Compare commits

...

17 Commits

Author SHA1 Message Date
57ab9b3e01 [07.12.20] Brought back graphql queries 2020-12-07 20:04:34 +00:00
487eb48322 Revert "[07.12.20] Updating dependencies"
This reverts commit a78cdb24
2020-12-07 19:34:45 +00:00
8955933a47 Revert "[07.12.20] Updating dependencies"
This reverts commit 0626395b
2020-12-07 19:34:37 +00:00
0626395b9f [07.12.20] Updating dependencies 2020-12-07 19:26:55 +00:00
a78cdb24fd [07.12.20] Updating dependencies 2020-12-07 19:14:19 +00:00
1b1bde8c76 [15.10.20] Logging changes 2020-10-15 20:33:57 +01:00
ddb5012b95 [11.10.20] Removed graphql deletion 2020-10-11 15:17:40 +01:00
80843da00e [11.10.20] Sentiment process service and lowered CPU limit 2020-10-11 14:53:05 +01:00
d8576a29ec [11.10.20] Sentiment process service and lowered CPU limit 2020-10-11 14:40:30 +01:00
602076977d [07.10.20] Wrong Header being got 2020-10-07 09:08:04 +01:00
51b6adadcb [06.10.20] Moved setting of correlationID to before first logging as it was generating a random ID on receiving a message 2020-10-06 19:29:57 +01:00
81b988e7b8 [06.10.20] Moved setting of correlationID to before first logging as it was generating a random ID on receiving a message 2020-10-06 19:29:16 +01:00
a396b0f002 [28.09.20] Logback and MDC ids 2020-09-28 17:55:42 +01:00
9603794f91 [26.09.20] Pipeline changes 2020-09-26 19:14:56 +01:00
9434872193 [25.09.20] Added GatewayDLQ use 2020-09-25 13:57:34 +01:00
cd82fec43a [19.09.20] Uncommented to allow saving to database 2020-09-19 20:34:37 +01:00
753ca449c4 [19.09.20] Changed cluster name 2020-09-19 19:24:22 +01:00
23 changed files with 548 additions and 296 deletions

View File

@ -97,7 +97,7 @@ spec:
cpu: 10m
memory: 256Mi
limits:
cpu: 250m
cpu: 100m
memory: 512Mi
securityContext:
capabilities:

View File

@ -32,7 +32,7 @@ try {
timestamps {
node ("${env.SLAVE_LABEL}") {
stage('Initialise') {
checkout([$class: 'GitSCM', branches: [[name: 'master']], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: 'Github', url: 'https://github.com/andyjk15/db-gateway.git']]])
checkout([$class: 'GitSCM', branches: [[name: 'master']], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: 'Github', url: env.GIT_REPOSITORY_URL]]])
env.APPLICATION_VERSION = get_application_version()
@ -47,7 +47,7 @@ try {
) {
sh "doctl auth init --access-token ${DOCTL_TOKEN}"
sh "doctl registry login"
sh "doctl kubernetes cluster kubeconfig save cryptosky-kubernetes-cluster"
sh "doctl kubernetes cluster kubeconfig save cryptosky-cluster"
}
}
@ -83,7 +83,6 @@ try {
sh "docker push ${env.DOCKER_REPOSITORY}/${env.APPLICATION_NAME}:${env.APPLICATION_VERSION}"
sh "docker push ${env.DOCKER_REPOSITORY}/${env.APPLICATION_NAME}:latest"
}
}
stage('Tag Repository') {

View File

@ -5,6 +5,4 @@ APPLICATION_NAME=$1
kubectl apply -f configuration/kubernetes/deployment.yaml
kubectl apply -f configuration/kubernetes/service.yaml
kubectl get pods
kubectl rollout status deployment/${APPLICATION_NAME} --namespace=production

47
pom.xml
View File

@ -21,6 +21,7 @@
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<logback.contrib.version>0.1.5</logback.contrib.version>
</properties>
<dependencies>
@ -47,23 +48,6 @@
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- GraghQL -->
<dependency>
<groupId>com.graphql-java</groupId>
<artifactId>graphql-spring-boot-starter</artifactId>
<version>5.0.2</version>
</dependency>
<dependency>
<groupId>com.graphql-java</groupId>
<artifactId>graphql-java-tools</artifactId>
<version>5.2.4</version>
</dependency>
<dependency>
<groupId>com.graphql-java</groupId>
<artifactId>graphiql-spring-boot-starter</artifactId>
<version>5.0.2</version>
</dependency>
<!-- Local database -->
<dependency>
<groupId>com.h2database</groupId>
@ -83,6 +67,23 @@
<version>4.3.8.Final</version>
</dependency>
<!-- GraghQL -->
<dependency>
<groupId>com.graphql-java</groupId>
<artifactId>graphql-spring-boot-starter</artifactId>
<version>5.0.2</version>
</dependency>
<dependency>
<groupId>com.graphql-java</groupId>
<artifactId>graphql-java-tools</artifactId>
<version>5.2.4</version>
</dependency>
<dependency>
<groupId>com.graphql-java</groupId>
<artifactId>graphiql-spring-boot-starter</artifactId>
<version>5.0.2</version>
</dependency>
<!-- Artemis -->
<dependency>
<groupId>org.springframework.boot</groupId>
@ -97,6 +98,18 @@
<version>1.18.8</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!--- Logging -->
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>6.4</version>
</dependency>
<!-- Testing -->
<dependency>

View File

@ -4,7 +4,8 @@ public enum ArtemisSyncMessaging {
MESSAGE_SAVE_SYNC_ID("MESSAGE_SAVE_SYNC_ID"),
SAVE_MESSAGE_DATABASE_EXCEPTION("SAVE_MESSAGE_DATABASE_EXCEPTION"),
CONSUME_PRICE_EVENT("CONSUME_PRICE_EVENT");
CONSUME_PRICE_EVENT("CONSUME_PRICE_EVENT"),
CONSUME_SENTIMENT_EVENT("CONSUME_SENTIMENT_EVENT");
private final String syncMessage;

View File

@ -17,12 +17,14 @@ import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import static cryptosky.me.ArtemisSyncMessaging.*;
import static cryptosky.me.helpers.Utils.getCorrelationId;
import static cryptosky.me.helpers.Utils.getSyncId;
import static cryptosky.me.logging.CorrelationInfo.setCorrelationId;
import static cryptosky.me.logging.CorrelationInfo.setSyncId;
import static java.lang.String.format;
@Component
@ -33,6 +35,7 @@ public class PriceConsumer {
private final PriceService priceService;
private final DlqService dlqService;
private final String dlqName;
private final String dlqGateway;
private CountDownLatch latch = new CountDownLatch(1);
@ -41,22 +44,28 @@ public class PriceConsumer {
ObjectMapper objectMapper,
PriceService priceService,
DlqService dlqService,
@Value("${destinations.pricing.priceDlq}") String dlqName
@Value("${destinations.pricing.priceDlq}") String dlqName,
@Value("${destinations.gateway.gatewayDlq}") String dlqGateway
) {
this.objectMapper = objectMapper;
this.priceService = priceService;
this.dlqService = dlqService;
this.dlqName = dlqName;
this.dlqGateway = dlqGateway;
}
@JmsListener(destination = "${destinations.pricing.priceSave}")
public void receive(TextMessage message) throws JMSException {
String correlationId = getCorrelationId(message);
String syncId = UUID.randomUUID().toString();
String syncId = getSyncId(message);
logger.info("Received Message: " + message.getBody(String.class));
setCorrelationId(correlationId);
setSyncId(syncId);
logger.info("Received Message: " + message.getBody(String.class) + " for syncId :: [{}]", syncId);
try {
if (message.getBody(String.class) == null | message.getBody(String.class).equals("")) {
throw new NoBodyOrStringException();
}
@ -69,9 +78,9 @@ public class PriceConsumer {
JsonNode messageJson = objectMapper.readTree(message.getText());
cryptoPriceModel = objectMapper.readValue(messageJson.traverse(), CryptoPriceModel.class);
logger.info("Message with syncId of [{}] is for customer [{}]", syncId, cryptoPriceModel);
logger.info("Message with syncId of [{}] is for [{}]", syncId, cryptoPriceModel);
priceService.createRecord(cryptoPriceModel, correlationId, syncId, message);
priceService.createRecord(cryptoPriceModel, correlationId, message);
latch.countDown();
} catch (IOException e) {
@ -100,7 +109,20 @@ public class PriceConsumer {
});
latch.countDown();
} catch (JMSException jmsException) {
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
String dlqReason = format("An Exception [%s] has occurred with a JMS action [%s] on [%s]", CONSUME_PRICE_EVENT, jmsException,
correlationId);
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqGateway,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, jmsException, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
latch.countDown();
} catch (Exception e) {
String dlqReason = format("Couldn't sync incoming %s for price event [%s]", CONSUME_PRICE_EVENT, correlationId);

View File

@ -0,0 +1,144 @@
package cryptosky.me.consumers;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import cryptosky.me.dlq.DlqInfo;
import cryptosky.me.dlq.DlqService;
import cryptosky.me.exceptions.NoBodyOrStringException;
import cryptosky.me.sentiment.models.entities.SentimentModel;
import cryptosky.me.sentiment.service.SentimentService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import static cryptosky.me.ArtemisSyncMessaging.*;
import static cryptosky.me.helpers.Utils.getCorrelationId;
import static cryptosky.me.helpers.Utils.getSyncId;
import static cryptosky.me.logging.CorrelationInfo.setCorrelationId;
import static cryptosky.me.logging.CorrelationInfo.setSyncId;
import static java.lang.String.format;
@Component
public class TweetConsumer {
private final Logger logger = LoggerFactory.getLogger(TweetConsumer.class);
private final ObjectMapper objectMapper;
private final SentimentService sentimentService;
private final DlqService dlqService;
private final String dlqName;
private final String dlqGateway;
private CountDownLatch latch = new CountDownLatch(1);
@Autowired
public TweetConsumer(
ObjectMapper objectMapper,
SentimentService sentimentService,
DlqService dlqService,
@Value("${destinations.tweet.tweetDlq}") String dlqName,
@Value("${destinations.gateway.gatewayDlq}") String dlqGateway
) {
this.objectMapper = objectMapper;
this.sentimentService = sentimentService;
this.dlqService = dlqService;
this.dlqName = dlqName;
this.dlqGateway = dlqGateway;
}
@JmsListener(destination = "${destinations.tweet.tweetSave}")
public void receive(TextMessage message) throws JMSException {
String correlationId = getCorrelationId(message);
String syncId = getSyncId(message);
setCorrelationId(correlationId);
setSyncId(syncId);
logger.info("Received Message: " + message.getBody(String.class));
try {
if (message.getBody(String.class) == null | message.getBody(String.class).equals("")) {
throw new NoBodyOrStringException();
}
logger.info("Processing message [{}] as synId [{}] ...", correlationId, syncId);
SentimentModel sentimentModel;
try {
JsonNode messageJson = objectMapper.readTree(message.getText());
sentimentModel = objectMapper.readValue(messageJson.traverse(), SentimentModel.class);
logger.info("Message with syncId of [{}] is for [{}]", syncId, sentimentModel);
sentimentService.createRecord(sentimentModel, correlationId, message);
latch.countDown();
} catch (IOException e) {
logger.info(format(
"Message [%s] for sentimentModel [%s] was not synced due to a readTree exception on getting the text",
syncId,
correlationId
), e);
latch.countDown();
throw e;
}
} catch (NoBodyOrStringException e) {
String dlqReason = format("No body or body isn't a String format for incoming %s for price event [%s]",
CONSUME_SENTIMENT_EVENT, correlationId);
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqName,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, CONSUME_SENTIMENT_EVENT.getMessage(), dlqInfo, e, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
latch.countDown();
} catch (JMSException jmsException) {
String dlqReason = format("An Exception [%s] has occurred with a JMS action [%s] on [%s]", CONSUME_SENTIMENT_EVENT, jmsException,
correlationId);
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqGateway,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, CONSUME_SENTIMENT_EVENT.getMessage(), dlqInfo, jmsException, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
latch.countDown();
} catch (Exception e) {
String dlqReason = format("Couldn't sync incoming %s for price event [%s]", CONSUME_SENTIMENT_EVENT, correlationId);
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqName,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, CONSUME_SENTIMENT_EVENT.getMessage(), dlqInfo, e, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
latch.countDown();
}
}
}

View File

@ -16,7 +16,8 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
public class Utils {
private static final String CORRELATION_ID_KEY = "CS-Correlation-ID";
private static final String CORRELATION_ID_KEY = "X-CRYPTO-Correlation-ID";
private static final String SYNC_ID_KEY = "X-CRYPTO-Sync-ID";
public static String getCorrelationId(Message message) {
String correlationId = null;
@ -34,6 +35,19 @@ public class Utils {
return correlationId;
}
public static String getSyncId(Message message) {
String syncId = null;
try {
syncId = message.getStringProperty(SYNC_ID_KEY);
} catch (Throwable e) {
// NOOP
}
if (isBlank(syncId)) {
syncId = randomUUID().toString();
}
return syncId;
}
// Utility Function to be able to get one of all types
public static <T> Predicate<T> distinctByKey(Function<? super T, Object> keyExtractor) {
Map<Object, Boolean> seen = new ConcurrentHashMap<>();

View File

@ -0,0 +1,22 @@
package cryptosky.me.logging;
import org.slf4j.MDC;
public class CorrelationInfo {
public static final String CORRELATION_ID = "X-CRYPTO-Correlation-ID";
public static final String SYNC_ID = "X-CRYPTO-Sync-ID";
public static void setCorrelationId(String correlationId) {
MDC.put(CORRELATION_ID, correlationId);
}
public static void setSyncId(String syncId) {
MDC.put(SYNC_ID, syncId);
}
public static void clear() {
MDC.remove(CORRELATION_ID);
MDC.remove(SYNC_ID);
}
}

View File

@ -1,26 +0,0 @@
package cryptosky.me.pricing.graphql.mutations;
import com.coxautodev.graphql.tools.GraphQLMutationResolver;
import cryptosky.me.pricing.models.entities.BtcPriceModel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import cryptosky.me.pricing.service.PriceService;
@Component
public class BtcPriceMutation implements GraphQLMutationResolver {
@Autowired
private PriceService priceService;
public BtcPriceModel createBtc(final String createdDate, final String type,
final float av_price,
final float h_price,
final float l_price,
final float o_price,
final float c_price,
final float volume ) {
return this.priceService.createBtc(createdDate, type, av_price, h_price, l_price, o_price, c_price, volume);
}
}

View File

@ -2,7 +2,6 @@ package cryptosky.me.pricing.graphql.queries;
import com.coxautodev.graphql.tools.GraphQLQueryResolver;
import cryptosky.me.pricing.models.entities.BtcPriceModel;
import cryptosky.me.pricing.models.entities.CryptoPriceModel;
import cryptosky.me.pricing.service.PriceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

View File

@ -9,7 +9,6 @@ import cryptosky.me.pricing.models.entities.CryptoPriceModel;
import cryptosky.me.pricing.models.repositories.BtcPriceRepository;
import cryptosky.me.exceptions.NotSupportedCurrencyTypeException;
import cryptosky.me.exceptions.DatabaseViolationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -22,16 +21,16 @@ import org.springframework.transaction.annotation.Transactional;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.time.LocalDateTime;
import static cryptosky.me.ArtemisSyncMessaging.*;
import static cryptosky.me.SupportedCurrencies.*;
import static cryptosky.me.helpers.Utils.format;
import static cryptosky.me.logging.CorrelationInfo.setSyncId;
import static java.util.UUID.randomUUID;
@Service
public class PriceService {
@ -54,31 +53,11 @@ public class PriceService {
}
@Transactional
public BtcPriceModel createBtc(final String createdDate, final String type,
final float av_price,
final float h_price,
final float l_price,
final float o_price,
final float c_price,
final float volume ) {
final BtcPriceModel btcPrice = new BtcPriceModel();
btcPrice.setTimestamp(LocalDateTime.parse((createdDate)));
btcPrice.setType(type);
btcPrice.setAverage_price(av_price);
btcPrice.setHigh_price(h_price);
btcPrice.setLow_price(l_price);
btcPrice.setOpen_price(o_price);
btcPrice.setClose_price(c_price);
btcPrice.setVolume(volume);
return btcPrice;
// return this.btcPriceRepository.save(btcPrice);
}
@Transactional
public void createRecord(CryptoPriceModel cryptoPriceModel, String correlationId, String syncId, TextMessage message) {
public void createRecord(CryptoPriceModel cryptoPriceModel, String correlationId, TextMessage message) {
String syncId = randomUUID().toString();
setSyncId(syncId);
try {
switch (cryptoPriceModel.getType()) {
case BTC:
BtcPriceModel priceModel = new BtcPriceModel();
@ -94,7 +73,7 @@ public class PriceService {
priceModel.setVolume(cryptoPriceModel.getVolume());
logger.info("Saving {} record to the database for [{}] with syncId of [{}]", cryptoPriceModel.getType(), correlationId, syncId);
// this.btcPriceRepository.save(priceModel);
this.btcPriceRepository.save(priceModel);
return;
default:
logger.error("Message for [{}] has failed with [{}] due to the currency [{}] {}", syncId, MessageCodes.E01.getCode(), cryptoPriceModel.getType(), MessageCodes.E01.getMessage());
@ -122,7 +101,7 @@ public class PriceService {
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
}
} catch (Exception e) {
String dlqReason = String.format("Couldn't sync incoming %s for price event [%s]", CONSUME_PRICE_EVENT, correlationId);
String dlqReason = String.format("Couldn't sync incoming %s for price event [%s] due to a generic exception", CONSUME_PRICE_EVENT, correlationId);
try {
DlqInfo dlqInfo = new DlqInfo(
@ -136,6 +115,7 @@ public class PriceService {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
logger.info("Sending [{}] to the DLQ", syncId);
} catch (JMSException jmsException) {
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
}

View File

@ -1,4 +1,4 @@
package cryptosky.me.tweets.models.entities;
package cryptosky.me.sentiment.models.entities;
import lombok.Data;
import lombok.EqualsAndHashCode;
@ -9,7 +9,7 @@ import javax.persistence.Table;
@EqualsAndHashCode(callSuper = true)
@Data
@Entity
@Table(name = "btc_tweet")
public class BtcTweetModel extends TweetModel {
@Table(name = "btc_sentiment")
public class BtcSentimentModel extends SentimentModel {
}

View File

@ -1,4 +1,4 @@
package cryptosky.me.tweets.models.entities;
package cryptosky.me.sentiment.models.entities;
import lombok.AllArgsConstructor;
import lombok.Data;
@ -10,21 +10,17 @@ import javax.persistence.*;
@NoArgsConstructor
@Data
@MappedSuperclass
public class TweetModel {
public class SentimentModel {
@Id
@Column(name = "ID", nullable = false)
@GeneratedValue(strategy = GenerationType.IDENTITY)
private int id;
@Column(name = "timestamp",nullable = false)
private String timestamp;
@Column(name = "raw_tweet", nullable = false)
private String rawTweet;
@Column(name = "sentiment")
private float sentimentScore;
@Column(name = "syncId", nullable = false)
private String syncId;
@Column(name = "pos")
private float positiveScore;
@ -38,4 +34,7 @@ public class TweetModel {
@Column(name = "compound", nullable = false)
private float compoundScore;
@Column(name = "type")
private String type;
}

View File

@ -0,0 +1,9 @@
package cryptosky.me.sentiment.models.repositories;
import cryptosky.me.sentiment.models.entities.BtcSentimentModel;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface BtcSentimentRepository extends JpaRepository<BtcSentimentModel, Integer> {
}

View File

@ -0,0 +1,207 @@
package cryptosky.me.sentiment.service;
import cryptosky.me.dlq.DlqInfo;
import cryptosky.me.dlq.DlqService;
import cryptosky.me.exceptions.MessageCodes;
import cryptosky.me.exceptions.NotSupportedCurrencyTypeException;
import cryptosky.me.pricing.models.entities.BtcPriceModel;
import cryptosky.me.sentiment.models.entities.BtcSentimentModel;
import cryptosky.me.sentiment.models.entities.SentimentModel;
import cryptosky.me.sentiment.models.repositories.BtcSentimentRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import java.time.DateTimeException;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static cryptosky.me.ArtemisSyncMessaging.*;
import static cryptosky.me.SupportedCurrencies.BTC;
import static cryptosky.me.helpers.Utils.format;
import static cryptosky.me.logging.CorrelationInfo.setSyncId;
import static java.util.UUID.randomUUID;
@Service
public class SentimentService {
private final BtcSentimentRepository btcSentimentRepository;
private final DlqService dlqService;
private final String dlqName;
private final Logger logger = LoggerFactory.getLogger(SentimentService.class);
@Autowired
public SentimentService(
final BtcSentimentRepository btcSentimentRepository,
DlqService dlqService,
@Value("${destinations.pricing.priceDlq}") String dlqName
) {
this.btcSentimentRepository = btcSentimentRepository;
this.dlqService = dlqService;
this.dlqName = dlqName;
}
// @Transactional
// public BtcSentimentModel createTweet(final String createdDate, final String rawTweet, final float sentimentScore,
// final float positiveScore, final float neutralScore, final float negativeScore,
// final float compoundScore ) {
//
// final BtcSentimentModel tweetModel = new BtcSentimentModel();
// tweetModel.setTimestamp(format(createdDate).toString());
// tweetModel.setRawTweet(rawTweet);
// tweetModel.setSentimentScore(sentimentScore);
// tweetModel.setPositiveScore(positiveScore);
// tweetModel.setNegativeScore(negativeScore);
// tweetModel.setNeutralScore(neutralScore);
// tweetModel.setCompoundScore(compoundScore);
// return this.btcSentimentRepository.save(tweetModel);
// }
@Transactional
public void createRecord(SentimentModel sentimentModel, String correlationId, TextMessage message) {
String syncId = randomUUID().toString();
setSyncId(syncId);
try {
switch (sentimentModel.getType()) {
case BTC:
BtcSentimentModel btcSentimentModel = new BtcSentimentModel();
btcSentimentModel.setId(getLatestId() + 1);
btcSentimentModel.setTimestamp(sentimentModel.getTimestamp());
btcSentimentModel.setSyncId(syncId);
btcSentimentModel.setPositiveScore(sentimentModel.getPositiveScore());
btcSentimentModel.setNeutralScore(sentimentModel.getNeutralScore());
btcSentimentModel.setNegativeScore(sentimentModel.getNegativeScore());
btcSentimentModel.setCompoundScore(sentimentModel.getCompoundScore());
btcSentimentModel.setType(sentimentModel.getType());
logger.info("Saving {} record to the database for [{}] with syncId of [{}]", sentimentModel.getType(), correlationId, syncId);
this.btcSentimentRepository.save(btcSentimentModel);
return;
default:
logger.error("Message for [{}] has failed with [{}] due to the currency [{}] {}", syncId, MessageCodes.E01.getCode(), sentimentModel.getType(), MessageCodes.E01.getMessage());
throw new NotSupportedCurrencyTypeException();
}
} catch (DataIntegrityViolationException e) {
logger.error("Message for [{}] has failed with [{}] due to the currency [{}] {}", syncId, MessageCodes.E02.getCode(), sentimentModel.getType(), MessageCodes.E02.getMessage());
String dlqReason = String.format("Couldn't sync incoming %s for price event [%s]", CONSUME_PRICE_EVENT, correlationId);
try {
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqName,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, SAVE_MESSAGE_DATABASE_EXCEPTION.getMessage(), dlqInfo, e, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
logger.info("Sending [{}] to the DLQ", syncId);
} catch (JMSException jmsException) {
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
}
} catch (Exception e) {
String dlqReason = String.format("Couldn't sync incoming %s for price event [%s] due to a generic exception", CONSUME_PRICE_EVENT, correlationId);
try {
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqName,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, e, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
logger.info("Sending [{}] to the DLQ", syncId);
} catch (JMSException jmsException) {
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
}
}
}
@Transactional(readOnly = true)
public int getLatestId() {
List<BtcSentimentModel> records = new ArrayList<>(this.btcSentimentRepository.findAll());
BtcSentimentModel latest = records.stream().skip(records.size() - 1).findFirst().get();
return latest.getId();
}
@Transactional(readOnly = true)
public Optional<BtcSentimentModel> getCurrentTweet() {
return this.btcSentimentRepository.findAll().stream().findFirst();
}
@Transactional(readOnly = true)
public List<BtcSentimentModel> getAllTweets(final int count ) {
return this.btcSentimentRepository.findAll().stream()
.limit(count)
.collect(Collectors.toList());
}
@Transactional(readOnly = true)
public List<BtcSentimentModel> getTweetsForDay(final String startDate, final String endDate ) {
LocalDateTime r_start = format(startDate);
LocalDateTime r_end = format(endDate);
r_start = r_start.toLocalDate().atStartOfDay();
r_end = r_end.toLocalDate().atTime(LocalTime.MAX);
if ( r_end.isAfter(r_start) ) {
if ( r_end.equals(r_start.toLocalDate().atTime(LocalTime.MAX))) {
LocalDateTime finalR_end = r_end;
LocalDateTime finalR_start = r_start;
return this.btcSentimentRepository.findAll().stream()
.filter(createdDateList -> format(createdDateList.getTimestamp()).isBefore(finalR_end))
.filter(createdDateList -> format(createdDateList.getTimestamp()).isAfter(finalR_start))
.collect(Collectors.toList());
} else {
// Logger
throw new DateTimeException(r_start +" and "+ r_end +" are not on the same day");
}
} else {
// Logger
throw new DateTimeException("End Date "+ r_end +" is not after "+ r_start);
}
}
@Transactional(readOnly = true)
public List<BtcSentimentModel> getTweetsForPeriod(final String startDate, final String endDate ) {
LocalDateTime r_start = format(startDate);
LocalDateTime r_end = format(endDate);
r_start = r_start.toLocalDate().atStartOfDay();
r_end = r_end.toLocalDate().atTime(LocalTime.MAX);
if ( r_end.isAfter(r_start) ) {
LocalDateTime finalR_end = r_end;
LocalDateTime finalR_start = r_start;
return this.btcSentimentRepository.findAll().stream()
.filter(createdDateList -> format(createdDateList.getTimestamp()).isBefore(finalR_end))
.filter(createdDateList -> format(createdDateList.getTimestamp()).isAfter(finalR_start))
.collect(Collectors.toList());
} else {
// Logger
throw new DateTimeException("End Date "+ r_end +" is not after "+ r_start);
}
}
}

View File

@ -1,9 +0,0 @@
package cryptosky.me.tweets.models.repositories;
import cryptosky.me.tweets.models.entities.BtcTweetModel;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface BtcTweetRepository extends JpaRepository<BtcTweetModel, Integer> {
}

View File

@ -1,22 +0,0 @@
package cryptosky.me.tweets.mutations;
import com.coxautodev.graphql.tools.GraphQLMutationResolver;
import cryptosky.me.tweets.models.entities.BtcTweetModel;
import cryptosky.me.tweets.service.BtcTweetService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class BtcTweetMutation implements GraphQLMutationResolver {
@Autowired
BtcTweetService btcTweetService;
public BtcTweetModel createTweet(final String createdDate, final String rawTweet, final float sentimentScore,
final float positiveScore, final float neutralScore, final float negativeScore,
final float compoundScore ) {
return this.btcTweetService.createTweet( createdDate, rawTweet, sentimentScore, positiveScore, neutralScore,
negativeScore,compoundScore );
}
}

View File

@ -1,33 +0,0 @@
package cryptosky.me.tweets.queries;
import com.coxautodev.graphql.tools.GraphQLQueryResolver;
import cryptosky.me.tweets.models.entities.BtcTweetModel;
import cryptosky.me.tweets.service.BtcTweetService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Optional;
@Component
public class BtcTweetQuery implements GraphQLQueryResolver {
@Autowired
private BtcTweetService btcTweetService;
public Optional<BtcTweetModel> getCurrentTweet() {
return this.btcTweetService.getCurrentTweet();
}
public List<BtcTweetModel> getAllTweets( final int count ) {
return this.btcTweetService.getAllTweets(count);
}
public List<BtcTweetModel> getTweetsForDay( final String startDate, final String endDate ) {
return this.btcTweetService.getTweetsForDay(startDate, endDate);
}
public List<BtcTweetModel> getTweetsForPeriod( final String startDate, final String endDate ) {
return this.btcTweetService.getTweetsForPeriod( startDate, endDate );
}
}

View File

@ -1,101 +0,0 @@
package cryptosky.me.tweets.service;
import cryptosky.me.tweets.models.entities.BtcTweetModel;
import cryptosky.me.tweets.models.repositories.BtcTweetRepository;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.DateTimeException;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static cryptosky.me.helpers.Utils.format;
@Service
public class BtcTweetService {
private final BtcTweetRepository btcTweetRepository;
public BtcTweetService(final BtcTweetRepository btcTweetRepository) {
this.btcTweetRepository = btcTweetRepository;
}
@Transactional
public BtcTweetModel createTweet(final String createdDate, final String rawTweet, final float sentimentScore,
final float positiveScore, final float neutralScore, final float negativeScore,
final float compoundScore ) {
final BtcTweetModel tweetModel = new BtcTweetModel();
tweetModel.setTimestamp(format(createdDate).toString());
tweetModel.setRawTweet(rawTweet);
tweetModel.setSentimentScore(sentimentScore);
tweetModel.setPositiveScore(positiveScore);
tweetModel.setNegativeScore(negativeScore);
tweetModel.setNeutralScore(neutralScore);
tweetModel.setCompoundScore(compoundScore);
return this.btcTweetRepository.save(tweetModel);
}
@Transactional(readOnly = true)
public Optional<BtcTweetModel> getCurrentTweet() {
return this.btcTweetRepository.findAll().stream().findFirst();
}
@Transactional(readOnly = true)
public List<BtcTweetModel> getAllTweets( final int count ) {
return this.btcTweetRepository.findAll().stream()
.limit(count)
.collect(Collectors.toList());
}
@Transactional(readOnly = true)
public List<BtcTweetModel> getTweetsForDay( final String startDate, final String endDate ) {
LocalDateTime r_start = format(startDate);
LocalDateTime r_end = format(endDate);
r_start = r_start.toLocalDate().atStartOfDay();
r_end = r_end.toLocalDate().atTime(LocalTime.MAX);
if ( r_end.isAfter(r_start) ) {
if ( r_end.equals(r_start.toLocalDate().atTime(LocalTime.MAX))) {
LocalDateTime finalR_end = r_end;
LocalDateTime finalR_start = r_start;
return this.btcTweetRepository.findAll().stream()
.filter(createdDateList -> format(createdDateList.getTimestamp()).isBefore(finalR_end))
.filter(createdDateList -> format(createdDateList.getTimestamp()).isAfter(finalR_start))
.collect(Collectors.toList());
} else {
// Logger
throw new DateTimeException(r_start +" and "+ r_end +" are not on the same day");
}
} else {
// Logger
throw new DateTimeException("End Date "+ r_end +" is not after "+ r_start);
}
}
@Transactional(readOnly = true)
public List<BtcTweetModel> getTweetsForPeriod( final String startDate, final String endDate ) {
LocalDateTime r_start = format(startDate);
LocalDateTime r_end = format(endDate);
r_start = r_start.toLocalDate().atStartOfDay();
r_end = r_end.toLocalDate().atTime(LocalTime.MAX);
if ( r_end.isAfter(r_start) ) {
LocalDateTime finalR_end = r_end;
LocalDateTime finalR_start = r_start;
return this.btcTweetRepository.findAll().stream()
.filter(createdDateList -> format(createdDateList.getTimestamp()).isBefore(finalR_end))
.filter(createdDateList -> format(createdDateList.getTimestamp()).isAfter(finalR_start))
.collect(Collectors.toList());
} else {
// Logger
throw new DateTimeException("End Date "+ r_end +" is not after "+ r_start);
}
}
}

View File

@ -30,6 +30,11 @@ destinations:
pricing:
priceSave: PricingSave
priceDlq: PricingSave.dlq
tweet:
tweetSave: TweetSave
tweetDlq: TweetSave.dlq
gateway:
gatewayDlq: DBGateway.dlq
server:
port: 9090

View File

@ -10,7 +10,7 @@ type BtcPrice {
volume: Float
}
type tweetModel {
type sentimentModel {
id: ID!,
timestamp: String!,
rawTweet: String,
@ -28,13 +28,8 @@ type Query {
priceForCreatedDate(createdDate: String):BtcPrice,
priceBetweenDates(startDate: String, endDate: String):[BtcPrice],
################################################################
currentTweet:tweetModel,
allTweets(count: Int):[tweetModel],
tweetsForDay(startDate: String, endDate: String):[tweetModel],
tweetsForPeriod(startDate: String, endDate: String):[tweetModel]
}
type Mutation {
createBtc(createdDate: String!, type: String!, average_price: Float!, high_price: Float, low_price: Float, open_price: Float, close_price: Float, volume: Float):BtcPrice
createTweet(createdDate: String!, rawTweet: String!, sentimentScore: Float!, positiveScore: Float, neutralScore: Float, negativeScore: Float, compoundScore: Float!):tweetModel
currentTweet:sentimentModel,
allTweets(count: Int):[sentimentModel],
tweetsForDay(startDate: String, endDate: String):[sentimentModel],
tweetsForPeriod(startDate: String, endDate: String):[sentimentModel]
}

View File

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="LOGS" value="./logs" />
<appender name="logstash" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<fieldNames>
<version>[ignore]</version>
<levelValue>[ignore]</levelValue>
</fieldNames>
</encoder>
</appender>
<appender name="console"
class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%black(%d{ISO8601}) %highlight(%-5level) [%blue(%t)] %yellow(%C{1.}): %msg {%mdc}%n%throwable
</Pattern>
</layout>
</appender>
<springProfile name="!local">
<root level="INFO">
<appender-ref ref="logstash"/>
</root>
</springProfile>
<springProfile name="local">
<root level="INFO">
<appender-ref ref="console"/>
</root>
</springProfile>
<jmxConfigurator/>
</configuration>