diff --git a/configuration/kubernetes/deployment.yaml b/configuration/kubernetes/deployment.yaml index 3d2cde7..50ae9e5 100644 --- a/configuration/kubernetes/deployment.yaml +++ b/configuration/kubernetes/deployment.yaml @@ -97,7 +97,7 @@ spec: cpu: 10m memory: 256Mi limits: - cpu: 250m + cpu: 100m memory: 512Mi securityContext: capabilities: diff --git a/src/main/java/cryptosky/me/ArtemisSyncMessaging.java b/src/main/java/cryptosky/me/ArtemisSyncMessaging.java index 0a2c7f3..05a2402 100644 --- a/src/main/java/cryptosky/me/ArtemisSyncMessaging.java +++ b/src/main/java/cryptosky/me/ArtemisSyncMessaging.java @@ -4,7 +4,8 @@ public enum ArtemisSyncMessaging { MESSAGE_SAVE_SYNC_ID("MESSAGE_SAVE_SYNC_ID"), SAVE_MESSAGE_DATABASE_EXCEPTION("SAVE_MESSAGE_DATABASE_EXCEPTION"), - CONSUME_PRICE_EVENT("CONSUME_PRICE_EVENT"); + CONSUME_PRICE_EVENT("CONSUME_PRICE_EVENT"), + CONSUME_SENTIMENT_EVENT("CONSUME_SENTIMENT_EVENT"); private final String syncMessage; diff --git a/src/main/java/cryptosky/me/consumers/PriceConsumer.java b/src/main/java/cryptosky/me/consumers/PriceConsumer.java index 0ebb28a..3d28aa8 100644 --- a/src/main/java/cryptosky/me/consumers/PriceConsumer.java +++ b/src/main/java/cryptosky/me/consumers/PriceConsumer.java @@ -17,12 +17,12 @@ import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.TextMessage; import java.io.IOException; -import java.util.UUID; import java.util.concurrent.CountDownLatch; import static cryptosky.me.ArtemisSyncMessaging.*; import static cryptosky.me.helpers.Utils.getCorrelationId; +import static cryptosky.me.helpers.Utils.getSyncId; import static cryptosky.me.logging.CorrelationInfo.setCorrelationId; import static cryptosky.me.logging.CorrelationInfo.setSyncId; import static java.lang.String.format; @@ -57,12 +57,12 @@ public class PriceConsumer { @JmsListener(destination = "${destinations.pricing.priceSave}") public void receive(TextMessage message) throws JMSException { String correlationId = getCorrelationId(message); - String syncId = UUID.randomUUID().toString(); + String syncId = getSyncId(message); setCorrelationId(correlationId); - setSyncId(""); + setSyncId(syncId); - logger.info("Received Message: " + message.getBody(String.class)); + logger.info("Received Message: " + message.getBody(String.class) + " for syncId :: [{}]", syncId); try { @@ -80,7 +80,7 @@ public class PriceConsumer { logger.info("Message with syncId of [{}] is for customer [{}]", syncId, cryptoPriceModel); - priceService.createRecord(cryptoPriceModel, correlationId, syncId, message); + priceService.createRecord(cryptoPriceModel, correlationId, message); latch.countDown(); } catch (IOException e) { diff --git a/src/main/java/cryptosky/me/consumers/TweetConsumer.java b/src/main/java/cryptosky/me/consumers/TweetConsumer.java new file mode 100644 index 0000000..5fc640e --- /dev/null +++ b/src/main/java/cryptosky/me/consumers/TweetConsumer.java @@ -0,0 +1,144 @@ +package cryptosky.me.consumers; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import cryptosky.me.dlq.DlqInfo; +import cryptosky.me.dlq.DlqService; +import cryptosky.me.exceptions.NoBodyOrStringException; +import cryptosky.me.sentiment.models.entities.SentimentModel; +import cryptosky.me.sentiment.service.SentimentService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.jms.annotation.JmsListener; +import org.springframework.stereotype.Component; + +import javax.jms.JMSException; +import javax.jms.TextMessage; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; + +import static cryptosky.me.ArtemisSyncMessaging.*; +import static cryptosky.me.helpers.Utils.getCorrelationId; +import static cryptosky.me.helpers.Utils.getSyncId; +import static cryptosky.me.logging.CorrelationInfo.setCorrelationId; +import static cryptosky.me.logging.CorrelationInfo.setSyncId; +import static java.lang.String.format; + +@Component +public class TweetConsumer { + + private final Logger logger = LoggerFactory.getLogger(TweetConsumer.class); + private final ObjectMapper objectMapper; + private final SentimentService sentimentService; + private final DlqService dlqService; + private final String dlqName; + private final String dlqGateway; + + private CountDownLatch latch = new CountDownLatch(1); + + @Autowired + public TweetConsumer( + ObjectMapper objectMapper, + SentimentService sentimentService, + DlqService dlqService, + @Value("${destinations.tweet.tweetDlq}") String dlqName, + @Value("${destinations.gateway.gatewayDlq}") String dlqGateway + ) { + this.objectMapper = objectMapper; + this.sentimentService = sentimentService; + this.dlqService = dlqService; + this.dlqName = dlqName; + this.dlqGateway = dlqGateway; + } + + @JmsListener(destination = "${destinations.tweet.tweetSave}") + public void receive(TextMessage message) throws JMSException { + String correlationId = getCorrelationId(message); + String syncId = getSyncId(message); + + setCorrelationId(correlationId); + setSyncId(syncId); + + logger.info("Received Message: " + message.getBody(String.class)); + + try { + + if (message.getBody(String.class) == null | message.getBody(String.class).equals("")) { + throw new NoBodyOrStringException(); + } + + logger.info("Processing message [{}] as synId [{}] ...", correlationId, syncId); + + SentimentModel sentimentModel; + + try { + JsonNode messageJson = objectMapper.readTree(message.getText()); + sentimentModel = objectMapper.readValue(messageJson.traverse(), SentimentModel.class); + + logger.info("Message with syncId of [{}] is for customer [{}]", syncId, sentimentModel); + + sentimentService.createRecord(sentimentModel, correlationId, message); + + latch.countDown(); + } catch (IOException e) { + logger.info(format( + "Message [%s] for sentimentModel [%s] was not synced due to a readTree exception on getting the text", + syncId, + correlationId + ), e); + latch.countDown(); + throw e; + } + } catch (NoBodyOrStringException e) { + String dlqReason = format("No body or body isn't a String format for incoming %s for price event [%s]", + CONSUME_SENTIMENT_EVENT, correlationId); + + DlqInfo dlqInfo = new DlqInfo( + message.getJMSMessageID(), + dlqName, + dlqReason, + "0", + correlationId + ); + dlqService.sendToDlq(message, CONSUME_SENTIMENT_EVENT.getMessage(), dlqInfo, e, textMessage -> { + textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId); + return textMessage; + }); + latch.countDown(); + } catch (JMSException jmsException) { + String dlqReason = format("An Exception [%s] has occurred with a JMS action [%s] on [%s]", CONSUME_SENTIMENT_EVENT, jmsException, + correlationId); + + DlqInfo dlqInfo = new DlqInfo( + message.getJMSMessageID(), + dlqGateway, + dlqReason, + "0", + correlationId + ); + dlqService.sendToDlq(message, CONSUME_SENTIMENT_EVENT.getMessage(), dlqInfo, jmsException, textMessage -> { + textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId); + return textMessage; + }); + latch.countDown(); + } catch (Exception e) { + String dlqReason = format("Couldn't sync incoming %s for price event [%s]", CONSUME_SENTIMENT_EVENT, correlationId); + + DlqInfo dlqInfo = new DlqInfo( + message.getJMSMessageID(), + dlqName, + dlqReason, + "0", + correlationId + ); + dlqService.sendToDlq(message, CONSUME_SENTIMENT_EVENT.getMessage(), dlqInfo, e, textMessage -> { + textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId); + return textMessage; + }); + latch.countDown(); + } + } + +} \ No newline at end of file diff --git a/src/main/java/cryptosky/me/helpers/Utils.java b/src/main/java/cryptosky/me/helpers/Utils.java index c2925c0..505b37b 100644 --- a/src/main/java/cryptosky/me/helpers/Utils.java +++ b/src/main/java/cryptosky/me/helpers/Utils.java @@ -17,6 +17,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank; public class Utils { private static final String CORRELATION_ID_KEY = "X-CRYPTO-Correlation-ID"; + private static final String SYNC_ID_KEY = "X-CRYPTO-Sync-ID"; public static String getCorrelationId(Message message) { String correlationId = null; @@ -34,6 +35,19 @@ public class Utils { return correlationId; } + public static String getSyncId(Message message) { + String syncId = null; + try { + syncId = message.getStringProperty(SYNC_ID_KEY); + } catch (Throwable e) { + // NOOP + } + if (isBlank(syncId)) { + syncId = randomUUID().toString(); + } + return syncId; + } + // Utility Function to be able to get one of all types public static Predicate distinctByKey(Function keyExtractor) { Map seen = new ConcurrentHashMap<>(); diff --git a/src/main/java/cryptosky/me/pricing/service/PriceService.java b/src/main/java/cryptosky/me/pricing/service/PriceService.java index 25dd97c..4b24690 100644 --- a/src/main/java/cryptosky/me/pricing/service/PriceService.java +++ b/src/main/java/cryptosky/me/pricing/service/PriceService.java @@ -33,6 +33,7 @@ import static cryptosky.me.SupportedCurrencies.*; import static cryptosky.me.helpers.Utils.format; import static cryptosky.me.logging.CorrelationInfo.setSyncId; +import static java.util.UUID.randomUUID; @Service public class PriceService { @@ -78,9 +79,10 @@ public class PriceService { } @Transactional - public void createRecord(CryptoPriceModel cryptoPriceModel, String correlationId, String syncId, TextMessage message) { + public void createRecord(CryptoPriceModel cryptoPriceModel, String correlationId, TextMessage message) { + String syncId = randomUUID().toString(); + setSyncId(syncId); try { - setSyncId(syncId); switch (cryptoPriceModel.getType()) { case BTC: @@ -125,7 +127,7 @@ public class PriceService { logger.error("An Exception has occurred with a JMS action [%s]", jmsException); } } catch (Exception e) { - String dlqReason = String.format("Couldn't sync incoming %s for price event [%s]", CONSUME_PRICE_EVENT, correlationId); + String dlqReason = String.format("Couldn't sync incoming %s for price event [%s] due to a generic exception", CONSUME_PRICE_EVENT, correlationId); try { DlqInfo dlqInfo = new DlqInfo( @@ -139,6 +141,7 @@ public class PriceService { textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId); return textMessage; }); + logger.info("Sending [{}] to the DLQ", syncId); } catch (JMSException jmsException) { logger.error("An Exception has occurred with a JMS action [%s]", jmsException); } diff --git a/src/main/java/cryptosky/me/tweets/models/entities/BtcTweetModel.java b/src/main/java/cryptosky/me/sentiment/models/entities/BtcSentimentModel.java similarity index 56% rename from src/main/java/cryptosky/me/tweets/models/entities/BtcTweetModel.java rename to src/main/java/cryptosky/me/sentiment/models/entities/BtcSentimentModel.java index f58fe6b..204f7dd 100644 --- a/src/main/java/cryptosky/me/tweets/models/entities/BtcTweetModel.java +++ b/src/main/java/cryptosky/me/sentiment/models/entities/BtcSentimentModel.java @@ -1,4 +1,4 @@ -package cryptosky.me.tweets.models.entities; +package cryptosky.me.sentiment.models.entities; import lombok.Data; import lombok.EqualsAndHashCode; @@ -9,7 +9,7 @@ import javax.persistence.Table; @EqualsAndHashCode(callSuper = true) @Data @Entity -@Table(name = "btc_tweet") -public class BtcTweetModel extends TweetModel { +@Table(name = "btc_sentiment") +public class BtcSentimentModel extends SentimentModel { } diff --git a/src/main/java/cryptosky/me/tweets/models/entities/TweetModel.java b/src/main/java/cryptosky/me/sentiment/models/entities/SentimentModel.java similarity index 75% rename from src/main/java/cryptosky/me/tweets/models/entities/TweetModel.java rename to src/main/java/cryptosky/me/sentiment/models/entities/SentimentModel.java index f0588f7..96423fd 100644 --- a/src/main/java/cryptosky/me/tweets/models/entities/TweetModel.java +++ b/src/main/java/cryptosky/me/sentiment/models/entities/SentimentModel.java @@ -1,4 +1,4 @@ -package cryptosky.me.tweets.models.entities; +package cryptosky.me.sentiment.models.entities; import lombok.AllArgsConstructor; import lombok.Data; @@ -10,7 +10,7 @@ import javax.persistence.*; @NoArgsConstructor @Data @MappedSuperclass -public class TweetModel { +public class SentimentModel { @Id @Column(name = "ID", nullable = false) @@ -20,11 +20,8 @@ public class TweetModel { @Column(name = "timestamp",nullable = false) private String timestamp; - @Column(name = "raw_tweet", nullable = false) - private String rawTweet; - - @Column(name = "sentiment") - private float sentimentScore; + @Column(name = "syncId", nullable = false) + private String syncId; @Column(name = "pos") private float positiveScore; @@ -38,4 +35,7 @@ public class TweetModel { @Column(name = "compound", nullable = false) private float compoundScore; + @Column(name = "type") + private String type; + } diff --git a/src/main/java/cryptosky/me/sentiment/models/repositories/BtcSentimentRepository.java b/src/main/java/cryptosky/me/sentiment/models/repositories/BtcSentimentRepository.java new file mode 100644 index 0000000..1516413 --- /dev/null +++ b/src/main/java/cryptosky/me/sentiment/models/repositories/BtcSentimentRepository.java @@ -0,0 +1,9 @@ +package cryptosky.me.sentiment.models.repositories; + +import cryptosky.me.sentiment.models.entities.BtcSentimentModel; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.stereotype.Repository; + +@Repository +public interface BtcSentimentRepository extends JpaRepository { +} diff --git a/src/main/java/cryptosky/me/sentiment/service/SentimentService.java b/src/main/java/cryptosky/me/sentiment/service/SentimentService.java new file mode 100644 index 0000000..65551ae --- /dev/null +++ b/src/main/java/cryptosky/me/sentiment/service/SentimentService.java @@ -0,0 +1,207 @@ +package cryptosky.me.sentiment.service; + +import cryptosky.me.dlq.DlqInfo; +import cryptosky.me.dlq.DlqService; +import cryptosky.me.exceptions.MessageCodes; +import cryptosky.me.exceptions.NotSupportedCurrencyTypeException; +import cryptosky.me.pricing.models.entities.BtcPriceModel; +import cryptosky.me.sentiment.models.entities.BtcSentimentModel; +import cryptosky.me.sentiment.models.entities.SentimentModel; +import cryptosky.me.sentiment.models.repositories.BtcSentimentRepository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.dao.DataIntegrityViolationException; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import javax.jms.JMSException; +import javax.jms.TextMessage; +import java.time.DateTimeException; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +import static cryptosky.me.ArtemisSyncMessaging.*; +import static cryptosky.me.SupportedCurrencies.BTC; +import static cryptosky.me.helpers.Utils.format; +import static cryptosky.me.logging.CorrelationInfo.setSyncId; +import static java.util.UUID.randomUUID; + +@Service +public class SentimentService { + + private final BtcSentimentRepository btcSentimentRepository; + private final DlqService dlqService; + private final String dlqName; + + private final Logger logger = LoggerFactory.getLogger(SentimentService.class); + + @Autowired + public SentimentService( + final BtcSentimentRepository btcSentimentRepository, + DlqService dlqService, + @Value("${destinations.pricing.priceDlq}") String dlqName + ) { + this.btcSentimentRepository = btcSentimentRepository; + this.dlqService = dlqService; + this.dlqName = dlqName; + } + +// @Transactional +// public BtcSentimentModel createTweet(final String createdDate, final String rawTweet, final float sentimentScore, +// final float positiveScore, final float neutralScore, final float negativeScore, +// final float compoundScore ) { +// +// final BtcSentimentModel tweetModel = new BtcSentimentModel(); +// tweetModel.setTimestamp(format(createdDate).toString()); +// tweetModel.setRawTweet(rawTweet); +// tweetModel.setSentimentScore(sentimentScore); +// tweetModel.setPositiveScore(positiveScore); +// tweetModel.setNegativeScore(negativeScore); +// tweetModel.setNeutralScore(neutralScore); +// tweetModel.setCompoundScore(compoundScore); + +// return this.btcSentimentRepository.save(tweetModel); +// } + + @Transactional + public void createRecord(SentimentModel sentimentModel, String correlationId, TextMessage message) { + String syncId = randomUUID().toString(); + setSyncId(syncId); + + try { + + switch (sentimentModel.getType()) { + case BTC: + BtcSentimentModel btcSentimentModel = new BtcSentimentModel(); + btcSentimentModel.setId(getLatestId() + 1); + btcSentimentModel.setTimestamp(sentimentModel.getTimestamp()); + btcSentimentModel.setSyncId(syncId); + btcSentimentModel.setPositiveScore(sentimentModel.getPositiveScore()); + btcSentimentModel.setNeutralScore(sentimentModel.getNeutralScore()); + btcSentimentModel.setNegativeScore(sentimentModel.getNegativeScore()); + btcSentimentModel.setCompoundScore(sentimentModel.getCompoundScore()); + btcSentimentModel.setType(sentimentModel.getType()); + + logger.info("Saving {} record to the database for [{}] with syncId of [{}]", sentimentModel.getType(), correlationId, syncId); + this.btcSentimentRepository.save(btcSentimentModel); + return; + default: + logger.error("Message for [{}] has failed with [{}] due to the currency [{}] {}", syncId, MessageCodes.E01.getCode(), sentimentModel.getType(), MessageCodes.E01.getMessage()); + throw new NotSupportedCurrencyTypeException(); + } + } catch (DataIntegrityViolationException e) { + logger.error("Message for [{}] has failed with [{}] due to the currency [{}] {}", syncId, MessageCodes.E02.getCode(), sentimentModel.getType(), MessageCodes.E02.getMessage()); + + String dlqReason = String.format("Couldn't sync incoming %s for price event [%s]", CONSUME_PRICE_EVENT, correlationId); + + try { + DlqInfo dlqInfo = new DlqInfo( + message.getJMSMessageID(), + dlqName, + dlqReason, + "0", + correlationId + ); + dlqService.sendToDlq(message, SAVE_MESSAGE_DATABASE_EXCEPTION.getMessage(), dlqInfo, e, textMessage -> { + textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId); + return textMessage; + }); + logger.info("Sending [{}] to the DLQ", syncId); + } catch (JMSException jmsException) { + logger.error("An Exception has occurred with a JMS action [%s]", jmsException); + } + } catch (Exception e) { + String dlqReason = String.format("Couldn't sync incoming %s for price event [%s] due to a generic exception", CONSUME_PRICE_EVENT, correlationId); + + try { + DlqInfo dlqInfo = new DlqInfo( + message.getJMSMessageID(), + dlqName, + dlqReason, + "0", + correlationId + ); + dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, e, textMessage -> { + textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId); + return textMessage; + }); + logger.info("Sending [{}] to the DLQ", syncId); + } catch (JMSException jmsException) { + logger.error("An Exception has occurred with a JMS action [%s]", jmsException); + } + } + + } + + @Transactional(readOnly = true) + public int getLatestId() { + List records = new ArrayList<>(this.btcSentimentRepository.findAll()); + BtcSentimentModel latest = records.stream().skip(records.size() - 1).findFirst().get(); + return latest.getId(); + } + + @Transactional(readOnly = true) + public Optional getCurrentTweet() { + return this.btcSentimentRepository.findAll().stream().findFirst(); + } + + @Transactional(readOnly = true) + public List getAllTweets(final int count ) { + return this.btcSentimentRepository.findAll().stream() + .limit(count) + .collect(Collectors.toList()); + } + + @Transactional(readOnly = true) + public List getTweetsForDay(final String startDate, final String endDate ) { + LocalDateTime r_start = format(startDate); + LocalDateTime r_end = format(endDate); + + r_start = r_start.toLocalDate().atStartOfDay(); + r_end = r_end.toLocalDate().atTime(LocalTime.MAX); + + if ( r_end.isAfter(r_start) ) { + if ( r_end.equals(r_start.toLocalDate().atTime(LocalTime.MAX))) { + LocalDateTime finalR_end = r_end; + LocalDateTime finalR_start = r_start; + return this.btcSentimentRepository.findAll().stream() + .filter(createdDateList -> format(createdDateList.getTimestamp()).isBefore(finalR_end)) + .filter(createdDateList -> format(createdDateList.getTimestamp()).isAfter(finalR_start)) + .collect(Collectors.toList()); + } else { + // Logger + throw new DateTimeException(r_start +" and "+ r_end +" are not on the same day"); + } + } else { + // Logger + throw new DateTimeException("End Date "+ r_end +" is not after "+ r_start); + } + } + + @Transactional(readOnly = true) + public List getTweetsForPeriod(final String startDate, final String endDate ) { + LocalDateTime r_start = format(startDate); + LocalDateTime r_end = format(endDate); + + r_start = r_start.toLocalDate().atStartOfDay(); + r_end = r_end.toLocalDate().atTime(LocalTime.MAX); + + if ( r_end.isAfter(r_start) ) { + LocalDateTime finalR_end = r_end; + LocalDateTime finalR_start = r_start; + return this.btcSentimentRepository.findAll().stream() + .filter(createdDateList -> format(createdDateList.getTimestamp()).isBefore(finalR_end)) + .filter(createdDateList -> format(createdDateList.getTimestamp()).isAfter(finalR_start)) + .collect(Collectors.toList()); + } else { + // Logger + throw new DateTimeException("End Date "+ r_end +" is not after "+ r_start); + } + } +} diff --git a/src/main/java/cryptosky/me/tweets/models/repositories/BtcTweetRepository.java b/src/main/java/cryptosky/me/tweets/models/repositories/BtcTweetRepository.java deleted file mode 100644 index 9bd9402..0000000 --- a/src/main/java/cryptosky/me/tweets/models/repositories/BtcTweetRepository.java +++ /dev/null @@ -1,9 +0,0 @@ -package cryptosky.me.tweets.models.repositories; - -import cryptosky.me.tweets.models.entities.BtcTweetModel; -import org.springframework.data.jpa.repository.JpaRepository; -import org.springframework.stereotype.Repository; - -@Repository -public interface BtcTweetRepository extends JpaRepository { -} diff --git a/src/main/java/cryptosky/me/tweets/mutations/BtcTweetMutation.java b/src/main/java/cryptosky/me/tweets/mutations/BtcTweetMutation.java deleted file mode 100644 index 48d93f0..0000000 --- a/src/main/java/cryptosky/me/tweets/mutations/BtcTweetMutation.java +++ /dev/null @@ -1,22 +0,0 @@ -package cryptosky.me.tweets.mutations; - -import com.coxautodev.graphql.tools.GraphQLMutationResolver; -import cryptosky.me.tweets.models.entities.BtcTweetModel; -import cryptosky.me.tweets.service.BtcTweetService; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -@Component -public class BtcTweetMutation implements GraphQLMutationResolver { - - @Autowired - BtcTweetService btcTweetService; - - public BtcTweetModel createTweet(final String createdDate, final String rawTweet, final float sentimentScore, - final float positiveScore, final float neutralScore, final float negativeScore, - final float compoundScore ) { - return this.btcTweetService.createTweet( createdDate, rawTweet, sentimentScore, positiveScore, neutralScore, - negativeScore,compoundScore ); - } - -} diff --git a/src/main/java/cryptosky/me/tweets/queries/BtcTweetQuery.java b/src/main/java/cryptosky/me/tweets/queries/BtcTweetQuery.java deleted file mode 100644 index 76d94db..0000000 --- a/src/main/java/cryptosky/me/tweets/queries/BtcTweetQuery.java +++ /dev/null @@ -1,33 +0,0 @@ -package cryptosky.me.tweets.queries; - -import com.coxautodev.graphql.tools.GraphQLQueryResolver; -import cryptosky.me.tweets.models.entities.BtcTweetModel; -import cryptosky.me.tweets.service.BtcTweetService; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import java.util.List; -import java.util.Optional; - -@Component -public class BtcTweetQuery implements GraphQLQueryResolver { - - @Autowired - private BtcTweetService btcTweetService; - - public Optional getCurrentTweet() { - return this.btcTweetService.getCurrentTweet(); - } - - public List getAllTweets( final int count ) { - return this.btcTweetService.getAllTweets(count); - } - - public List getTweetsForDay( final String startDate, final String endDate ) { - return this.btcTweetService.getTweetsForDay(startDate, endDate); - } - - public List getTweetsForPeriod( final String startDate, final String endDate ) { - return this.btcTweetService.getTweetsForPeriod( startDate, endDate ); - } -} diff --git a/src/main/java/cryptosky/me/tweets/service/BtcTweetService.java b/src/main/java/cryptosky/me/tweets/service/BtcTweetService.java deleted file mode 100644 index af53219..0000000 --- a/src/main/java/cryptosky/me/tweets/service/BtcTweetService.java +++ /dev/null @@ -1,101 +0,0 @@ -package cryptosky.me.tweets.service; - -import cryptosky.me.tweets.models.entities.BtcTweetModel; -import cryptosky.me.tweets.models.repositories.BtcTweetRepository; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; - -import java.time.DateTimeException; -import java.time.LocalDateTime; -import java.time.LocalTime; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; - -import static cryptosky.me.helpers.Utils.format; - -@Service -public class BtcTweetService { - - private final BtcTweetRepository btcTweetRepository; - - public BtcTweetService(final BtcTweetRepository btcTweetRepository) { - this.btcTweetRepository = btcTweetRepository; - } - - @Transactional - public BtcTweetModel createTweet(final String createdDate, final String rawTweet, final float sentimentScore, - final float positiveScore, final float neutralScore, final float negativeScore, - final float compoundScore ) { - - final BtcTweetModel tweetModel = new BtcTweetModel(); - tweetModel.setTimestamp(format(createdDate).toString()); - tweetModel.setRawTweet(rawTweet); - tweetModel.setSentimentScore(sentimentScore); - tweetModel.setPositiveScore(positiveScore); - tweetModel.setNegativeScore(negativeScore); - tweetModel.setNeutralScore(neutralScore); - tweetModel.setCompoundScore(compoundScore); - - return this.btcTweetRepository.save(tweetModel); - } - - @Transactional(readOnly = true) - public Optional getCurrentTweet() { - return this.btcTweetRepository.findAll().stream().findFirst(); - } - - @Transactional(readOnly = true) - public List getAllTweets( final int count ) { - return this.btcTweetRepository.findAll().stream() - .limit(count) - .collect(Collectors.toList()); - } - - @Transactional(readOnly = true) - public List getTweetsForDay( final String startDate, final String endDate ) { - LocalDateTime r_start = format(startDate); - LocalDateTime r_end = format(endDate); - - r_start = r_start.toLocalDate().atStartOfDay(); - r_end = r_end.toLocalDate().atTime(LocalTime.MAX); - - if ( r_end.isAfter(r_start) ) { - if ( r_end.equals(r_start.toLocalDate().atTime(LocalTime.MAX))) { - LocalDateTime finalR_end = r_end; - LocalDateTime finalR_start = r_start; - return this.btcTweetRepository.findAll().stream() - .filter(createdDateList -> format(createdDateList.getTimestamp()).isBefore(finalR_end)) - .filter(createdDateList -> format(createdDateList.getTimestamp()).isAfter(finalR_start)) - .collect(Collectors.toList()); - } else { - // Logger - throw new DateTimeException(r_start +" and "+ r_end +" are not on the same day"); - } - } else { - // Logger - throw new DateTimeException("End Date "+ r_end +" is not after "+ r_start); - } - } - - @Transactional(readOnly = true) - public List getTweetsForPeriod( final String startDate, final String endDate ) { - LocalDateTime r_start = format(startDate); - LocalDateTime r_end = format(endDate); - - r_start = r_start.toLocalDate().atStartOfDay(); - r_end = r_end.toLocalDate().atTime(LocalTime.MAX); - - if ( r_end.isAfter(r_start) ) { - LocalDateTime finalR_end = r_end; - LocalDateTime finalR_start = r_start; - return this.btcTweetRepository.findAll().stream() - .filter(createdDateList -> format(createdDateList.getTimestamp()).isBefore(finalR_end)) - .filter(createdDateList -> format(createdDateList.getTimestamp()).isAfter(finalR_start)) - .collect(Collectors.toList()); - } else { - // Logger - throw new DateTimeException("End Date "+ r_end +" is not after "+ r_start); - } - } -} diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index 6118342..6b11254 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -30,6 +30,9 @@ destinations: pricing: priceSave: PricingSave priceDlq: PricingSave.dlq + tweet: + tweetSave: TweetSave + tweetDlq: TweetSave.dlq gateway: gatewayDlq: DBGateway.dlq diff --git a/src/main/resources/graphql/configQL.graphqls b/src/main/resources/graphql/configQL.graphqls index 69a6752..6ca2324 100644 --- a/src/main/resources/graphql/configQL.graphqls +++ b/src/main/resources/graphql/configQL.graphqls @@ -10,7 +10,7 @@ type BtcPrice { volume: Float } -type tweetModel { +type sentimentModel { id: ID!, timestamp: String!, rawTweet: String, @@ -28,13 +28,13 @@ type Query { priceForCreatedDate(createdDate: String):BtcPrice, priceBetweenDates(startDate: String, endDate: String):[BtcPrice], ################################################################ - currentTweet:tweetModel, - allTweets(count: Int):[tweetModel], - tweetsForDay(startDate: String, endDate: String):[tweetModel], - tweetsForPeriod(startDate: String, endDate: String):[tweetModel] + currentTweet:sentimentModel, + allTweets(count: Int):[sentimentModel], + tweetsForDay(startDate: String, endDate: String):[sentimentModel], + tweetsForPeriod(startDate: String, endDate: String):[sentimentModel] } type Mutation { createBtc(createdDate: String!, type: String!, average_price: Float!, high_price: Float, low_price: Float, open_price: Float, close_price: Float, volume: Float):BtcPrice - createTweet(createdDate: String!, rawTweet: String!, sentimentScore: Float!, positiveScore: Float, neutralScore: Float, negativeScore: Float, compoundScore: Float!):tweetModel + createTweet(createdDate: String!, rawTweet: String!, sentimentScore: Float!, positiveScore: Float, neutralScore: Float, negativeScore: Float, compoundScore: Float!):sentimentModel } \ No newline at end of file