Compare commits

..

No commits in common. "master" and "1.0.0-b37" have entirely different histories.

23 changed files with 294 additions and 546 deletions

View File

@ -97,7 +97,7 @@ spec:
cpu: 10m cpu: 10m
memory: 256Mi memory: 256Mi
limits: limits:
cpu: 100m cpu: 250m
memory: 512Mi memory: 512Mi
securityContext: securityContext:
capabilities: capabilities:

View File

@ -32,7 +32,7 @@ try {
timestamps { timestamps {
node ("${env.SLAVE_LABEL}") { node ("${env.SLAVE_LABEL}") {
stage('Initialise') { stage('Initialise') {
checkout([$class: 'GitSCM', branches: [[name: 'master']], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: 'Github', url: env.GIT_REPOSITORY_URL]]]) checkout([$class: 'GitSCM', branches: [[name: 'master']], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: 'Github', url: 'https://github.com/andyjk15/db-gateway.git']]])
env.APPLICATION_VERSION = get_application_version() env.APPLICATION_VERSION = get_application_version()
@ -83,6 +83,7 @@ try {
sh "docker push ${env.DOCKER_REPOSITORY}/${env.APPLICATION_NAME}:${env.APPLICATION_VERSION}" sh "docker push ${env.DOCKER_REPOSITORY}/${env.APPLICATION_NAME}:${env.APPLICATION_VERSION}"
sh "docker push ${env.DOCKER_REPOSITORY}/${env.APPLICATION_NAME}:latest" sh "docker push ${env.DOCKER_REPOSITORY}/${env.APPLICATION_NAME}:latest"
} }
} }
stage('Tag Repository') { stage('Tag Repository') {

View File

@ -5,4 +5,6 @@ APPLICATION_NAME=$1
kubectl apply -f configuration/kubernetes/deployment.yaml kubectl apply -f configuration/kubernetes/deployment.yaml
kubectl apply -f configuration/kubernetes/service.yaml kubectl apply -f configuration/kubernetes/service.yaml
kubectl get pods
kubectl rollout status deployment/${APPLICATION_NAME} --namespace=production kubectl rollout status deployment/${APPLICATION_NAME} --namespace=production

47
pom.xml
View File

@ -21,7 +21,6 @@
<java.version>1.8</java.version> <java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<logback.contrib.version>0.1.5</logback.contrib.version>
</properties> </properties>
<dependencies> <dependencies>
@ -48,6 +47,23 @@
<artifactId>spring-boot-starter-data-jpa</artifactId> <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency> </dependency>
<!-- GraghQL -->
<dependency>
<groupId>com.graphql-java</groupId>
<artifactId>graphql-spring-boot-starter</artifactId>
<version>5.0.2</version>
</dependency>
<dependency>
<groupId>com.graphql-java</groupId>
<artifactId>graphql-java-tools</artifactId>
<version>5.2.4</version>
</dependency>
<dependency>
<groupId>com.graphql-java</groupId>
<artifactId>graphiql-spring-boot-starter</artifactId>
<version>5.0.2</version>
</dependency>
<!-- Local database --> <!-- Local database -->
<dependency> <dependency>
<groupId>com.h2database</groupId> <groupId>com.h2database</groupId>
@ -67,23 +83,6 @@
<version>4.3.8.Final</version> <version>4.3.8.Final</version>
</dependency> </dependency>
<!-- GraghQL -->
<dependency>
<groupId>com.graphql-java</groupId>
<artifactId>graphql-spring-boot-starter</artifactId>
<version>5.0.2</version>
</dependency>
<dependency>
<groupId>com.graphql-java</groupId>
<artifactId>graphql-java-tools</artifactId>
<version>5.2.4</version>
</dependency>
<dependency>
<groupId>com.graphql-java</groupId>
<artifactId>graphiql-spring-boot-starter</artifactId>
<version>5.0.2</version>
</dependency>
<!-- Artemis --> <!-- Artemis -->
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
@ -98,18 +97,6 @@
<version>1.18.8</version> <version>1.18.8</version>
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!--- Logging -->
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>6.4</version>
</dependency>
<!-- Testing --> <!-- Testing -->
<dependency> <dependency>

View File

@ -4,8 +4,7 @@ public enum ArtemisSyncMessaging {
MESSAGE_SAVE_SYNC_ID("MESSAGE_SAVE_SYNC_ID"), MESSAGE_SAVE_SYNC_ID("MESSAGE_SAVE_SYNC_ID"),
SAVE_MESSAGE_DATABASE_EXCEPTION("SAVE_MESSAGE_DATABASE_EXCEPTION"), SAVE_MESSAGE_DATABASE_EXCEPTION("SAVE_MESSAGE_DATABASE_EXCEPTION"),
CONSUME_PRICE_EVENT("CONSUME_PRICE_EVENT"), CONSUME_PRICE_EVENT("CONSUME_PRICE_EVENT");
CONSUME_SENTIMENT_EVENT("CONSUME_SENTIMENT_EVENT");
private final String syncMessage; private final String syncMessage;

View File

@ -17,14 +17,12 @@ import org.springframework.stereotype.Component;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import java.io.IOException; import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import static cryptosky.me.ArtemisSyncMessaging.*; import static cryptosky.me.ArtemisSyncMessaging.*;
import static cryptosky.me.helpers.Utils.getCorrelationId; import static cryptosky.me.helpers.Utils.getCorrelationId;
import static cryptosky.me.helpers.Utils.getSyncId;
import static cryptosky.me.logging.CorrelationInfo.setCorrelationId;
import static cryptosky.me.logging.CorrelationInfo.setSyncId;
import static java.lang.String.format; import static java.lang.String.format;
@Component @Component
@ -35,7 +33,6 @@ public class PriceConsumer {
private final PriceService priceService; private final PriceService priceService;
private final DlqService dlqService; private final DlqService dlqService;
private final String dlqName; private final String dlqName;
private final String dlqGateway;
private CountDownLatch latch = new CountDownLatch(1); private CountDownLatch latch = new CountDownLatch(1);
@ -44,28 +41,22 @@ public class PriceConsumer {
ObjectMapper objectMapper, ObjectMapper objectMapper,
PriceService priceService, PriceService priceService,
DlqService dlqService, DlqService dlqService,
@Value("${destinations.pricing.priceDlq}") String dlqName, @Value("${destinations.pricing.priceDlq}") String dlqName
@Value("${destinations.gateway.gatewayDlq}") String dlqGateway
) { ) {
this.objectMapper = objectMapper; this.objectMapper = objectMapper;
this.priceService = priceService; this.priceService = priceService;
this.dlqService = dlqService; this.dlqService = dlqService;
this.dlqName = dlqName; this.dlqName = dlqName;
this.dlqGateway = dlqGateway;
} }
@JmsListener(destination = "${destinations.pricing.priceSave}") @JmsListener(destination = "${destinations.pricing.priceSave}")
public void receive(TextMessage message) throws JMSException { public void receive(TextMessage message) throws JMSException {
String correlationId = getCorrelationId(message); String correlationId = getCorrelationId(message);
String syncId = getSyncId(message); String syncId = UUID.randomUUID().toString();
setCorrelationId(correlationId); logger.info("Received Message: " + message.getBody(String.class));
setSyncId(syncId);
logger.info("Received Message: " + message.getBody(String.class) + " for syncId :: [{}]", syncId);
try { try {
if (message.getBody(String.class) == null | message.getBody(String.class).equals("")) { if (message.getBody(String.class) == null | message.getBody(String.class).equals("")) {
throw new NoBodyOrStringException(); throw new NoBodyOrStringException();
} }
@ -78,9 +69,9 @@ public class PriceConsumer {
JsonNode messageJson = objectMapper.readTree(message.getText()); JsonNode messageJson = objectMapper.readTree(message.getText());
cryptoPriceModel = objectMapper.readValue(messageJson.traverse(), CryptoPriceModel.class); cryptoPriceModel = objectMapper.readValue(messageJson.traverse(), CryptoPriceModel.class);
logger.info("Message with syncId of [{}] is for [{}]", syncId, cryptoPriceModel); logger.info("Message with syncId of [{}] is for customer [{}]", syncId, cryptoPriceModel);
priceService.createRecord(cryptoPriceModel, correlationId, message); priceService.createRecord(cryptoPriceModel, correlationId, syncId, message);
latch.countDown(); latch.countDown();
} catch (IOException e) { } catch (IOException e) {
@ -109,20 +100,7 @@ public class PriceConsumer {
}); });
latch.countDown(); latch.countDown();
} catch (JMSException jmsException) { } catch (JMSException jmsException) {
String dlqReason = format("An Exception [%s] has occurred with a JMS action [%s] on [%s]", CONSUME_PRICE_EVENT, jmsException, logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
correlationId);
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqGateway,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, jmsException, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
latch.countDown(); latch.countDown();
} catch (Exception e) { } catch (Exception e) {
String dlqReason = format("Couldn't sync incoming %s for price event [%s]", CONSUME_PRICE_EVENT, correlationId); String dlqReason = format("Couldn't sync incoming %s for price event [%s]", CONSUME_PRICE_EVENT, correlationId);

View File

@ -1,144 +0,0 @@
package cryptosky.me.consumers;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import cryptosky.me.dlq.DlqInfo;
import cryptosky.me.dlq.DlqService;
import cryptosky.me.exceptions.NoBodyOrStringException;
import cryptosky.me.sentiment.models.entities.SentimentModel;
import cryptosky.me.sentiment.service.SentimentService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import static cryptosky.me.ArtemisSyncMessaging.*;
import static cryptosky.me.helpers.Utils.getCorrelationId;
import static cryptosky.me.helpers.Utils.getSyncId;
import static cryptosky.me.logging.CorrelationInfo.setCorrelationId;
import static cryptosky.me.logging.CorrelationInfo.setSyncId;
import static java.lang.String.format;
@Component
public class TweetConsumer {
private final Logger logger = LoggerFactory.getLogger(TweetConsumer.class);
private final ObjectMapper objectMapper;
private final SentimentService sentimentService;
private final DlqService dlqService;
private final String dlqName;
private final String dlqGateway;
private CountDownLatch latch = new CountDownLatch(1);
@Autowired
public TweetConsumer(
ObjectMapper objectMapper,
SentimentService sentimentService,
DlqService dlqService,
@Value("${destinations.tweet.tweetDlq}") String dlqName,
@Value("${destinations.gateway.gatewayDlq}") String dlqGateway
) {
this.objectMapper = objectMapper;
this.sentimentService = sentimentService;
this.dlqService = dlqService;
this.dlqName = dlqName;
this.dlqGateway = dlqGateway;
}
@JmsListener(destination = "${destinations.tweet.tweetSave}")
public void receive(TextMessage message) throws JMSException {
String correlationId = getCorrelationId(message);
String syncId = getSyncId(message);
setCorrelationId(correlationId);
setSyncId(syncId);
logger.info("Received Message: " + message.getBody(String.class));
try {
if (message.getBody(String.class) == null | message.getBody(String.class).equals("")) {
throw new NoBodyOrStringException();
}
logger.info("Processing message [{}] as synId [{}] ...", correlationId, syncId);
SentimentModel sentimentModel;
try {
JsonNode messageJson = objectMapper.readTree(message.getText());
sentimentModel = objectMapper.readValue(messageJson.traverse(), SentimentModel.class);
logger.info("Message with syncId of [{}] is for [{}]", syncId, sentimentModel);
sentimentService.createRecord(sentimentModel, correlationId, message);
latch.countDown();
} catch (IOException e) {
logger.info(format(
"Message [%s] for sentimentModel [%s] was not synced due to a readTree exception on getting the text",
syncId,
correlationId
), e);
latch.countDown();
throw e;
}
} catch (NoBodyOrStringException e) {
String dlqReason = format("No body or body isn't a String format for incoming %s for price event [%s]",
CONSUME_SENTIMENT_EVENT, correlationId);
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqName,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, CONSUME_SENTIMENT_EVENT.getMessage(), dlqInfo, e, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
latch.countDown();
} catch (JMSException jmsException) {
String dlqReason = format("An Exception [%s] has occurred with a JMS action [%s] on [%s]", CONSUME_SENTIMENT_EVENT, jmsException,
correlationId);
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqGateway,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, CONSUME_SENTIMENT_EVENT.getMessage(), dlqInfo, jmsException, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
latch.countDown();
} catch (Exception e) {
String dlqReason = format("Couldn't sync incoming %s for price event [%s]", CONSUME_SENTIMENT_EVENT, correlationId);
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqName,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, CONSUME_SENTIMENT_EVENT.getMessage(), dlqInfo, e, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
latch.countDown();
}
}
}

View File

@ -16,8 +16,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
public class Utils { public class Utils {
private static final String CORRELATION_ID_KEY = "X-CRYPTO-Correlation-ID"; private static final String CORRELATION_ID_KEY = "CS-Correlation-ID";
private static final String SYNC_ID_KEY = "X-CRYPTO-Sync-ID";
public static String getCorrelationId(Message message) { public static String getCorrelationId(Message message) {
String correlationId = null; String correlationId = null;
@ -35,19 +34,6 @@ public class Utils {
return correlationId; return correlationId;
} }
public static String getSyncId(Message message) {
String syncId = null;
try {
syncId = message.getStringProperty(SYNC_ID_KEY);
} catch (Throwable e) {
// NOOP
}
if (isBlank(syncId)) {
syncId = randomUUID().toString();
}
return syncId;
}
// Utility Function to be able to get one of all types // Utility Function to be able to get one of all types
public static <T> Predicate<T> distinctByKey(Function<? super T, Object> keyExtractor) { public static <T> Predicate<T> distinctByKey(Function<? super T, Object> keyExtractor) {
Map<Object, Boolean> seen = new ConcurrentHashMap<>(); Map<Object, Boolean> seen = new ConcurrentHashMap<>();

View File

@ -1,22 +0,0 @@
package cryptosky.me.logging;
import org.slf4j.MDC;
public class CorrelationInfo {
public static final String CORRELATION_ID = "X-CRYPTO-Correlation-ID";
public static final String SYNC_ID = "X-CRYPTO-Sync-ID";
public static void setCorrelationId(String correlationId) {
MDC.put(CORRELATION_ID, correlationId);
}
public static void setSyncId(String syncId) {
MDC.put(SYNC_ID, syncId);
}
public static void clear() {
MDC.remove(CORRELATION_ID);
MDC.remove(SYNC_ID);
}
}

View File

@ -0,0 +1,26 @@
package cryptosky.me.pricing.graphql.mutations;
import com.coxautodev.graphql.tools.GraphQLMutationResolver;
import cryptosky.me.pricing.models.entities.BtcPriceModel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import cryptosky.me.pricing.service.PriceService;
@Component
public class BtcPriceMutation implements GraphQLMutationResolver {
@Autowired
private PriceService priceService;
public BtcPriceModel createBtc(final String createdDate, final String type,
final float av_price,
final float h_price,
final float l_price,
final float o_price,
final float c_price,
final float volume ) {
return this.priceService.createBtc(createdDate, type, av_price, h_price, l_price, o_price, c_price, volume);
}
}

View File

@ -2,6 +2,7 @@ package cryptosky.me.pricing.graphql.queries;
import com.coxautodev.graphql.tools.GraphQLQueryResolver; import com.coxautodev.graphql.tools.GraphQLQueryResolver;
import cryptosky.me.pricing.models.entities.BtcPriceModel; import cryptosky.me.pricing.models.entities.BtcPriceModel;
import cryptosky.me.pricing.models.entities.CryptoPriceModel;
import cryptosky.me.pricing.service.PriceService; import cryptosky.me.pricing.service.PriceService;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;

View File

@ -9,6 +9,7 @@ import cryptosky.me.pricing.models.entities.CryptoPriceModel;
import cryptosky.me.pricing.models.repositories.BtcPriceRepository; import cryptosky.me.pricing.models.repositories.BtcPriceRepository;
import cryptosky.me.exceptions.NotSupportedCurrencyTypeException; import cryptosky.me.exceptions.NotSupportedCurrencyTypeException;
import cryptosky.me.exceptions.DatabaseViolationException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -21,16 +22,16 @@ import org.springframework.transaction.annotation.Transactional;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.time.LocalDateTime;
import static cryptosky.me.ArtemisSyncMessaging.*; import static cryptosky.me.ArtemisSyncMessaging.*;
import static cryptosky.me.SupportedCurrencies.*; import static cryptosky.me.SupportedCurrencies.*;
import static cryptosky.me.helpers.Utils.format; import static cryptosky.me.helpers.Utils.format;
import static cryptosky.me.logging.CorrelationInfo.setSyncId;
import static java.util.UUID.randomUUID;
@Service @Service
public class PriceService { public class PriceService {
@ -53,11 +54,31 @@ public class PriceService {
} }
@Transactional @Transactional
public void createRecord(CryptoPriceModel cryptoPriceModel, String correlationId, TextMessage message) { public BtcPriceModel createBtc(final String createdDate, final String type,
String syncId = randomUUID().toString(); final float av_price,
setSyncId(syncId); final float h_price,
try { final float l_price,
final float o_price,
final float c_price,
final float volume ) {
final BtcPriceModel btcPrice = new BtcPriceModel();
btcPrice.setTimestamp(LocalDateTime.parse((createdDate)));
btcPrice.setType(type);
btcPrice.setAverage_price(av_price);
btcPrice.setHigh_price(h_price);
btcPrice.setLow_price(l_price);
btcPrice.setOpen_price(o_price);
btcPrice.setClose_price(c_price);
btcPrice.setVolume(volume);
return btcPrice;
// return this.btcPriceRepository.save(btcPrice);
}
@Transactional
public void createRecord(CryptoPriceModel cryptoPriceModel, String correlationId, String syncId, TextMessage message) {
try {
switch (cryptoPriceModel.getType()) { switch (cryptoPriceModel.getType()) {
case BTC: case BTC:
BtcPriceModel priceModel = new BtcPriceModel(); BtcPriceModel priceModel = new BtcPriceModel();
@ -101,7 +122,7 @@ public class PriceService {
logger.error("An Exception has occurred with a JMS action [%s]", jmsException); logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
} }
} catch (Exception e) { } catch (Exception e) {
String dlqReason = String.format("Couldn't sync incoming %s for price event [%s] due to a generic exception", CONSUME_PRICE_EVENT, correlationId); String dlqReason = String.format("Couldn't sync incoming %s for price event [%s]", CONSUME_PRICE_EVENT, correlationId);
try { try {
DlqInfo dlqInfo = new DlqInfo( DlqInfo dlqInfo = new DlqInfo(
@ -115,7 +136,6 @@ public class PriceService {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId); textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage; return textMessage;
}); });
logger.info("Sending [{}] to the DLQ", syncId);
} catch (JMSException jmsException) { } catch (JMSException jmsException) {
logger.error("An Exception has occurred with a JMS action [%s]", jmsException); logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
} }

View File

@ -1,9 +0,0 @@
package cryptosky.me.sentiment.models.repositories;
import cryptosky.me.sentiment.models.entities.BtcSentimentModel;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface BtcSentimentRepository extends JpaRepository<BtcSentimentModel, Integer> {
}

View File

@ -1,207 +0,0 @@
package cryptosky.me.sentiment.service;
import cryptosky.me.dlq.DlqInfo;
import cryptosky.me.dlq.DlqService;
import cryptosky.me.exceptions.MessageCodes;
import cryptosky.me.exceptions.NotSupportedCurrencyTypeException;
import cryptosky.me.pricing.models.entities.BtcPriceModel;
import cryptosky.me.sentiment.models.entities.BtcSentimentModel;
import cryptosky.me.sentiment.models.entities.SentimentModel;
import cryptosky.me.sentiment.models.repositories.BtcSentimentRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import java.time.DateTimeException;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static cryptosky.me.ArtemisSyncMessaging.*;
import static cryptosky.me.SupportedCurrencies.BTC;
import static cryptosky.me.helpers.Utils.format;
import static cryptosky.me.logging.CorrelationInfo.setSyncId;
import static java.util.UUID.randomUUID;
@Service
public class SentimentService {
private final BtcSentimentRepository btcSentimentRepository;
private final DlqService dlqService;
private final String dlqName;
private final Logger logger = LoggerFactory.getLogger(SentimentService.class);
@Autowired
public SentimentService(
final BtcSentimentRepository btcSentimentRepository,
DlqService dlqService,
@Value("${destinations.pricing.priceDlq}") String dlqName
) {
this.btcSentimentRepository = btcSentimentRepository;
this.dlqService = dlqService;
this.dlqName = dlqName;
}
// @Transactional
// public BtcSentimentModel createTweet(final String createdDate, final String rawTweet, final float sentimentScore,
// final float positiveScore, final float neutralScore, final float negativeScore,
// final float compoundScore ) {
//
// final BtcSentimentModel tweetModel = new BtcSentimentModel();
// tweetModel.setTimestamp(format(createdDate).toString());
// tweetModel.setRawTweet(rawTweet);
// tweetModel.setSentimentScore(sentimentScore);
// tweetModel.setPositiveScore(positiveScore);
// tweetModel.setNegativeScore(negativeScore);
// tweetModel.setNeutralScore(neutralScore);
// tweetModel.setCompoundScore(compoundScore);
// return this.btcSentimentRepository.save(tweetModel);
// }
@Transactional
public void createRecord(SentimentModel sentimentModel, String correlationId, TextMessage message) {
String syncId = randomUUID().toString();
setSyncId(syncId);
try {
switch (sentimentModel.getType()) {
case BTC:
BtcSentimentModel btcSentimentModel = new BtcSentimentModel();
btcSentimentModel.setId(getLatestId() + 1);
btcSentimentModel.setTimestamp(sentimentModel.getTimestamp());
btcSentimentModel.setSyncId(syncId);
btcSentimentModel.setPositiveScore(sentimentModel.getPositiveScore());
btcSentimentModel.setNeutralScore(sentimentModel.getNeutralScore());
btcSentimentModel.setNegativeScore(sentimentModel.getNegativeScore());
btcSentimentModel.setCompoundScore(sentimentModel.getCompoundScore());
btcSentimentModel.setType(sentimentModel.getType());
logger.info("Saving {} record to the database for [{}] with syncId of [{}]", sentimentModel.getType(), correlationId, syncId);
this.btcSentimentRepository.save(btcSentimentModel);
return;
default:
logger.error("Message for [{}] has failed with [{}] due to the currency [{}] {}", syncId, MessageCodes.E01.getCode(), sentimentModel.getType(), MessageCodes.E01.getMessage());
throw new NotSupportedCurrencyTypeException();
}
} catch (DataIntegrityViolationException e) {
logger.error("Message for [{}] has failed with [{}] due to the currency [{}] {}", syncId, MessageCodes.E02.getCode(), sentimentModel.getType(), MessageCodes.E02.getMessage());
String dlqReason = String.format("Couldn't sync incoming %s for price event [%s]", CONSUME_PRICE_EVENT, correlationId);
try {
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqName,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, SAVE_MESSAGE_DATABASE_EXCEPTION.getMessage(), dlqInfo, e, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
logger.info("Sending [{}] to the DLQ", syncId);
} catch (JMSException jmsException) {
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
}
} catch (Exception e) {
String dlqReason = String.format("Couldn't sync incoming %s for price event [%s] due to a generic exception", CONSUME_PRICE_EVENT, correlationId);
try {
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqName,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, e, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
logger.info("Sending [{}] to the DLQ", syncId);
} catch (JMSException jmsException) {
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
}
}
}
@Transactional(readOnly = true)
public int getLatestId() {
List<BtcSentimentModel> records = new ArrayList<>(this.btcSentimentRepository.findAll());
BtcSentimentModel latest = records.stream().skip(records.size() - 1).findFirst().get();
return latest.getId();
}
@Transactional(readOnly = true)
public Optional<BtcSentimentModel> getCurrentTweet() {
return this.btcSentimentRepository.findAll().stream().findFirst();
}
@Transactional(readOnly = true)
public List<BtcSentimentModel> getAllTweets(final int count ) {
return this.btcSentimentRepository.findAll().stream()
.limit(count)
.collect(Collectors.toList());
}
@Transactional(readOnly = true)
public List<BtcSentimentModel> getTweetsForDay(final String startDate, final String endDate ) {
LocalDateTime r_start = format(startDate);
LocalDateTime r_end = format(endDate);
r_start = r_start.toLocalDate().atStartOfDay();
r_end = r_end.toLocalDate().atTime(LocalTime.MAX);
if ( r_end.isAfter(r_start) ) {
if ( r_end.equals(r_start.toLocalDate().atTime(LocalTime.MAX))) {
LocalDateTime finalR_end = r_end;
LocalDateTime finalR_start = r_start;
return this.btcSentimentRepository.findAll().stream()
.filter(createdDateList -> format(createdDateList.getTimestamp()).isBefore(finalR_end))
.filter(createdDateList -> format(createdDateList.getTimestamp()).isAfter(finalR_start))
.collect(Collectors.toList());
} else {
// Logger
throw new DateTimeException(r_start +" and "+ r_end +" are not on the same day");
}
} else {
// Logger
throw new DateTimeException("End Date "+ r_end +" is not after "+ r_start);
}
}
@Transactional(readOnly = true)
public List<BtcSentimentModel> getTweetsForPeriod(final String startDate, final String endDate ) {
LocalDateTime r_start = format(startDate);
LocalDateTime r_end = format(endDate);
r_start = r_start.toLocalDate().atStartOfDay();
r_end = r_end.toLocalDate().atTime(LocalTime.MAX);
if ( r_end.isAfter(r_start) ) {
LocalDateTime finalR_end = r_end;
LocalDateTime finalR_start = r_start;
return this.btcSentimentRepository.findAll().stream()
.filter(createdDateList -> format(createdDateList.getTimestamp()).isBefore(finalR_end))
.filter(createdDateList -> format(createdDateList.getTimestamp()).isAfter(finalR_start))
.collect(Collectors.toList());
} else {
// Logger
throw new DateTimeException("End Date "+ r_end +" is not after "+ r_start);
}
}
}

View File

@ -1,4 +1,4 @@
package cryptosky.me.sentiment.models.entities; package cryptosky.me.tweets.models.entities;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
@ -9,7 +9,7 @@ import javax.persistence.Table;
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
@Data @Data
@Entity @Entity
@Table(name = "btc_sentiment") @Table(name = "btc_tweet")
public class BtcSentimentModel extends SentimentModel { public class BtcTweetModel extends TweetModel {
} }

View File

@ -1,4 +1,4 @@
package cryptosky.me.sentiment.models.entities; package cryptosky.me.tweets.models.entities;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
@ -10,17 +10,21 @@ import javax.persistence.*;
@NoArgsConstructor @NoArgsConstructor
@Data @Data
@MappedSuperclass @MappedSuperclass
public class SentimentModel { public class TweetModel {
@Id @Id
@Column(name = "ID", nullable = false) @Column(name = "ID", nullable = false)
@GeneratedValue(strategy = GenerationType.IDENTITY)
private int id; private int id;
@Column(name = "timestamp",nullable = false) @Column(name = "timestamp",nullable = false)
private String timestamp; private String timestamp;
@Column(name = "syncId", nullable = false) @Column(name = "raw_tweet", nullable = false)
private String syncId; private String rawTweet;
@Column(name = "sentiment")
private float sentimentScore;
@Column(name = "pos") @Column(name = "pos")
private float positiveScore; private float positiveScore;
@ -34,7 +38,4 @@ public class SentimentModel {
@Column(name = "compound", nullable = false) @Column(name = "compound", nullable = false)
private float compoundScore; private float compoundScore;
@Column(name = "type")
private String type;
} }

View File

@ -0,0 +1,9 @@
package cryptosky.me.tweets.models.repositories;
import cryptosky.me.tweets.models.entities.BtcTweetModel;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface BtcTweetRepository extends JpaRepository<BtcTweetModel, Integer> {
}

View File

@ -0,0 +1,22 @@
package cryptosky.me.tweets.mutations;
import com.coxautodev.graphql.tools.GraphQLMutationResolver;
import cryptosky.me.tweets.models.entities.BtcTweetModel;
import cryptosky.me.tweets.service.BtcTweetService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class BtcTweetMutation implements GraphQLMutationResolver {
@Autowired
BtcTweetService btcTweetService;
public BtcTweetModel createTweet(final String createdDate, final String rawTweet, final float sentimentScore,
final float positiveScore, final float neutralScore, final float negativeScore,
final float compoundScore ) {
return this.btcTweetService.createTweet( createdDate, rawTweet, sentimentScore, positiveScore, neutralScore,
negativeScore,compoundScore );
}
}

View File

@ -0,0 +1,33 @@
package cryptosky.me.tweets.queries;
import com.coxautodev.graphql.tools.GraphQLQueryResolver;
import cryptosky.me.tweets.models.entities.BtcTweetModel;
import cryptosky.me.tweets.service.BtcTweetService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Optional;
@Component
public class BtcTweetQuery implements GraphQLQueryResolver {
@Autowired
private BtcTweetService btcTweetService;
public Optional<BtcTweetModel> getCurrentTweet() {
return this.btcTweetService.getCurrentTweet();
}
public List<BtcTweetModel> getAllTweets( final int count ) {
return this.btcTweetService.getAllTweets(count);
}
public List<BtcTweetModel> getTweetsForDay( final String startDate, final String endDate ) {
return this.btcTweetService.getTweetsForDay(startDate, endDate);
}
public List<BtcTweetModel> getTweetsForPeriod( final String startDate, final String endDate ) {
return this.btcTweetService.getTweetsForPeriod( startDate, endDate );
}
}

View File

@ -0,0 +1,101 @@
package cryptosky.me.tweets.service;
import cryptosky.me.tweets.models.entities.BtcTweetModel;
import cryptosky.me.tweets.models.repositories.BtcTweetRepository;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.DateTimeException;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static cryptosky.me.helpers.Utils.format;
@Service
public class BtcTweetService {
private final BtcTweetRepository btcTweetRepository;
public BtcTweetService(final BtcTweetRepository btcTweetRepository) {
this.btcTweetRepository = btcTweetRepository;
}
@Transactional
public BtcTweetModel createTweet(final String createdDate, final String rawTweet, final float sentimentScore,
final float positiveScore, final float neutralScore, final float negativeScore,
final float compoundScore ) {
final BtcTweetModel tweetModel = new BtcTweetModel();
tweetModel.setTimestamp(format(createdDate).toString());
tweetModel.setRawTweet(rawTweet);
tweetModel.setSentimentScore(sentimentScore);
tweetModel.setPositiveScore(positiveScore);
tweetModel.setNegativeScore(negativeScore);
tweetModel.setNeutralScore(neutralScore);
tweetModel.setCompoundScore(compoundScore);
return this.btcTweetRepository.save(tweetModel);
}
@Transactional(readOnly = true)
public Optional<BtcTweetModel> getCurrentTweet() {
return this.btcTweetRepository.findAll().stream().findFirst();
}
@Transactional(readOnly = true)
public List<BtcTweetModel> getAllTweets( final int count ) {
return this.btcTweetRepository.findAll().stream()
.limit(count)
.collect(Collectors.toList());
}
@Transactional(readOnly = true)
public List<BtcTweetModel> getTweetsForDay( final String startDate, final String endDate ) {
LocalDateTime r_start = format(startDate);
LocalDateTime r_end = format(endDate);
r_start = r_start.toLocalDate().atStartOfDay();
r_end = r_end.toLocalDate().atTime(LocalTime.MAX);
if ( r_end.isAfter(r_start) ) {
if ( r_end.equals(r_start.toLocalDate().atTime(LocalTime.MAX))) {
LocalDateTime finalR_end = r_end;
LocalDateTime finalR_start = r_start;
return this.btcTweetRepository.findAll().stream()
.filter(createdDateList -> format(createdDateList.getTimestamp()).isBefore(finalR_end))
.filter(createdDateList -> format(createdDateList.getTimestamp()).isAfter(finalR_start))
.collect(Collectors.toList());
} else {
// Logger
throw new DateTimeException(r_start +" and "+ r_end +" are not on the same day");
}
} else {
// Logger
throw new DateTimeException("End Date "+ r_end +" is not after "+ r_start);
}
}
@Transactional(readOnly = true)
public List<BtcTweetModel> getTweetsForPeriod( final String startDate, final String endDate ) {
LocalDateTime r_start = format(startDate);
LocalDateTime r_end = format(endDate);
r_start = r_start.toLocalDate().atStartOfDay();
r_end = r_end.toLocalDate().atTime(LocalTime.MAX);
if ( r_end.isAfter(r_start) ) {
LocalDateTime finalR_end = r_end;
LocalDateTime finalR_start = r_start;
return this.btcTweetRepository.findAll().stream()
.filter(createdDateList -> format(createdDateList.getTimestamp()).isBefore(finalR_end))
.filter(createdDateList -> format(createdDateList.getTimestamp()).isAfter(finalR_start))
.collect(Collectors.toList());
} else {
// Logger
throw new DateTimeException("End Date "+ r_end +" is not after "+ r_start);
}
}
}

View File

@ -30,11 +30,6 @@ destinations:
pricing: pricing:
priceSave: PricingSave priceSave: PricingSave
priceDlq: PricingSave.dlq priceDlq: PricingSave.dlq
tweet:
tweetSave: TweetSave
tweetDlq: TweetSave.dlq
gateway:
gatewayDlq: DBGateway.dlq
server: server:
port: 9090 port: 9090

View File

@ -1,35 +1,40 @@
type BtcPrice { type BtcPrice {
id: ID!, id: ID!,
timestamp: String!, timestamp: String!,
type: String, type: String,
average_price: Float!, average_price: Float!,
high_price: Float, high_price: Float,
low_price: Float, low_price: Float,
open_price: Float, open_price: Float,
close_price: Float, close_price: Float,
volume: Float volume: Float
} }
type sentimentModel { type tweetModel {
id: ID!, id: ID!,
timestamp: String!, timestamp: String!,
rawTweet: String, rawTweet: String,
sentimentScore: Float!, sentimentScore: Float!,
positiveScore: Float, positiveScore: Float,
neutralScore: Float, neutralScore: Float,
negativeScore: Float, negativeScore: Float,
compoundScore: Float! compoundScore: Float!
} }
type Query { type Query {
allPrices(count: Int):[BtcPrice], allPrices(count: Int):[BtcPrice],
pricesBetweenCounts(startCount: Int, endCount: Int):[BtcPrice] pricesBetweenCounts(startCount: Int, endCount: Int):[BtcPrice]
latest:BtcPrice, latest:BtcPrice,
priceForCreatedDate(createdDate: String):BtcPrice, priceForCreatedDate(createdDate: String):BtcPrice,
priceBetweenDates(startDate: String, endDate: String):[BtcPrice], priceBetweenDates(startDate: String, endDate: String):[BtcPrice],
################################################################ ################################################################
currentTweet:sentimentModel, currentTweet:tweetModel,
allTweets(count: Int):[sentimentModel], allTweets(count: Int):[tweetModel],
tweetsForDay(startDate: String, endDate: String):[sentimentModel], tweetsForDay(startDate: String, endDate: String):[tweetModel],
tweetsForPeriod(startDate: String, endDate: String):[sentimentModel] tweetsForPeriod(startDate: String, endDate: String):[tweetModel]
}
type Mutation {
createBtc(createdDate: String!, type: String!, average_price: Float!, high_price: Float, low_price: Float, open_price: Float, close_price: Float, volume: Float):BtcPrice
createTweet(createdDate: String!, rawTweet: String!, sentimentScore: Float!, positiveScore: Float, neutralScore: Float, negativeScore: Float, compoundScore: Float!):tweetModel
} }

View File

@ -1,36 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="LOGS" value="./logs" />
<appender name="logstash" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<fieldNames>
<version>[ignore]</version>
<levelValue>[ignore]</levelValue>
</fieldNames>
</encoder>
</appender>
<appender name="console"
class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%black(%d{ISO8601}) %highlight(%-5level) [%blue(%t)] %yellow(%C{1.}): %msg {%mdc}%n%throwable
</Pattern>
</layout>
</appender>
<springProfile name="!local">
<root level="INFO">
<appender-ref ref="logstash"/>
</root>
</springProfile>
<springProfile name="local">
<root level="INFO">
<appender-ref ref="console"/>
</root>
</springProfile>
<jmxConfigurator/>
</configuration>