Compare commits
No commits in common. "master" and "1.0.0-b41" have entirely different histories.
@ -97,7 +97,7 @@ spec:
|
|||||||
cpu: 10m
|
cpu: 10m
|
||||||
memory: 256Mi
|
memory: 256Mi
|
||||||
limits:
|
limits:
|
||||||
cpu: 100m
|
cpu: 250m
|
||||||
memory: 512Mi
|
memory: 512Mi
|
||||||
securityContext:
|
securityContext:
|
||||||
capabilities:
|
capabilities:
|
||||||
|
|||||||
38
pom.xml
38
pom.xml
@ -48,6 +48,23 @@
|
|||||||
<artifactId>spring-boot-starter-data-jpa</artifactId>
|
<artifactId>spring-boot-starter-data-jpa</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<!-- GraghQL -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.graphql-java</groupId>
|
||||||
|
<artifactId>graphql-spring-boot-starter</artifactId>
|
||||||
|
<version>5.0.2</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.graphql-java</groupId>
|
||||||
|
<artifactId>graphql-java-tools</artifactId>
|
||||||
|
<version>5.2.4</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.graphql-java</groupId>
|
||||||
|
<artifactId>graphiql-spring-boot-starter</artifactId>
|
||||||
|
<version>5.0.2</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<!-- Local database -->
|
<!-- Local database -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.h2database</groupId>
|
<groupId>com.h2database</groupId>
|
||||||
@ -67,23 +84,6 @@
|
|||||||
<version>4.3.8.Final</version>
|
<version>4.3.8.Final</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<!-- GraghQL -->
|
|
||||||
<dependency>
|
|
||||||
<groupId>com.graphql-java</groupId>
|
|
||||||
<artifactId>graphql-spring-boot-starter</artifactId>
|
|
||||||
<version>5.0.2</version>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>com.graphql-java</groupId>
|
|
||||||
<artifactId>graphql-java-tools</artifactId>
|
|
||||||
<version>5.2.4</version>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>com.graphql-java</groupId>
|
|
||||||
<artifactId>graphiql-spring-boot-starter</artifactId>
|
|
||||||
<version>5.0.2</version>
|
|
||||||
</dependency>
|
|
||||||
|
|
||||||
<!-- Artemis -->
|
<!-- Artemis -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.springframework.boot</groupId>
|
<groupId>org.springframework.boot</groupId>
|
||||||
@ -98,10 +98,6 @@
|
|||||||
<version>1.18.8</version>
|
<version>1.18.8</version>
|
||||||
<optional>true</optional>
|
<optional>true</optional>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.commons</groupId>
|
|
||||||
<artifactId>commons-lang3</artifactId>
|
|
||||||
</dependency>
|
|
||||||
|
|
||||||
<!--- Logging -->
|
<!--- Logging -->
|
||||||
<dependency>
|
<dependency>
|
||||||
|
|||||||
@ -4,8 +4,7 @@ public enum ArtemisSyncMessaging {
|
|||||||
|
|
||||||
MESSAGE_SAVE_SYNC_ID("MESSAGE_SAVE_SYNC_ID"),
|
MESSAGE_SAVE_SYNC_ID("MESSAGE_SAVE_SYNC_ID"),
|
||||||
SAVE_MESSAGE_DATABASE_EXCEPTION("SAVE_MESSAGE_DATABASE_EXCEPTION"),
|
SAVE_MESSAGE_DATABASE_EXCEPTION("SAVE_MESSAGE_DATABASE_EXCEPTION"),
|
||||||
CONSUME_PRICE_EVENT("CONSUME_PRICE_EVENT"),
|
CONSUME_PRICE_EVENT("CONSUME_PRICE_EVENT");
|
||||||
CONSUME_SENTIMENT_EVENT("CONSUME_SENTIMENT_EVENT");
|
|
||||||
|
|
||||||
private final String syncMessage;
|
private final String syncMessage;
|
||||||
|
|
||||||
|
|||||||
@ -17,12 +17,12 @@ import org.springframework.stereotype.Component;
|
|||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.UUID;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
import static cryptosky.me.ArtemisSyncMessaging.*;
|
import static cryptosky.me.ArtemisSyncMessaging.*;
|
||||||
|
|
||||||
import static cryptosky.me.helpers.Utils.getCorrelationId;
|
import static cryptosky.me.helpers.Utils.getCorrelationId;
|
||||||
import static cryptosky.me.helpers.Utils.getSyncId;
|
|
||||||
import static cryptosky.me.logging.CorrelationInfo.setCorrelationId;
|
import static cryptosky.me.logging.CorrelationInfo.setCorrelationId;
|
||||||
import static cryptosky.me.logging.CorrelationInfo.setSyncId;
|
import static cryptosky.me.logging.CorrelationInfo.setSyncId;
|
||||||
import static java.lang.String.format;
|
import static java.lang.String.format;
|
||||||
@ -57,12 +57,12 @@ public class PriceConsumer {
|
|||||||
@JmsListener(destination = "${destinations.pricing.priceSave}")
|
@JmsListener(destination = "${destinations.pricing.priceSave}")
|
||||||
public void receive(TextMessage message) throws JMSException {
|
public void receive(TextMessage message) throws JMSException {
|
||||||
String correlationId = getCorrelationId(message);
|
String correlationId = getCorrelationId(message);
|
||||||
String syncId = getSyncId(message);
|
String syncId = UUID.randomUUID().toString();
|
||||||
|
|
||||||
setCorrelationId(correlationId);
|
setCorrelationId(correlationId);
|
||||||
setSyncId(syncId);
|
setSyncId("");
|
||||||
|
|
||||||
logger.info("Received Message: " + message.getBody(String.class) + " for syncId :: [{}]", syncId);
|
logger.info("Received Message: " + message.getBody(String.class));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
@ -78,9 +78,9 @@ public class PriceConsumer {
|
|||||||
JsonNode messageJson = objectMapper.readTree(message.getText());
|
JsonNode messageJson = objectMapper.readTree(message.getText());
|
||||||
cryptoPriceModel = objectMapper.readValue(messageJson.traverse(), CryptoPriceModel.class);
|
cryptoPriceModel = objectMapper.readValue(messageJson.traverse(), CryptoPriceModel.class);
|
||||||
|
|
||||||
logger.info("Message with syncId of [{}] is for [{}]", syncId, cryptoPriceModel);
|
logger.info("Message with syncId of [{}] is for customer [{}]", syncId, cryptoPriceModel);
|
||||||
|
|
||||||
priceService.createRecord(cryptoPriceModel, correlationId, message);
|
priceService.createRecord(cryptoPriceModel, correlationId, syncId, message);
|
||||||
|
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
|||||||
@ -1,144 +0,0 @@
|
|||||||
package cryptosky.me.consumers;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.JsonNode;
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
import cryptosky.me.dlq.DlqInfo;
|
|
||||||
import cryptosky.me.dlq.DlqService;
|
|
||||||
import cryptosky.me.exceptions.NoBodyOrStringException;
|
|
||||||
import cryptosky.me.sentiment.models.entities.SentimentModel;
|
|
||||||
import cryptosky.me.sentiment.service.SentimentService;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
|
||||||
import org.springframework.jms.annotation.JmsListener;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import javax.jms.JMSException;
|
|
||||||
import javax.jms.TextMessage;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
|
|
||||||
import static cryptosky.me.ArtemisSyncMessaging.*;
|
|
||||||
import static cryptosky.me.helpers.Utils.getCorrelationId;
|
|
||||||
import static cryptosky.me.helpers.Utils.getSyncId;
|
|
||||||
import static cryptosky.me.logging.CorrelationInfo.setCorrelationId;
|
|
||||||
import static cryptosky.me.logging.CorrelationInfo.setSyncId;
|
|
||||||
import static java.lang.String.format;
|
|
||||||
|
|
||||||
@Component
|
|
||||||
public class TweetConsumer {
|
|
||||||
|
|
||||||
private final Logger logger = LoggerFactory.getLogger(TweetConsumer.class);
|
|
||||||
private final ObjectMapper objectMapper;
|
|
||||||
private final SentimentService sentimentService;
|
|
||||||
private final DlqService dlqService;
|
|
||||||
private final String dlqName;
|
|
||||||
private final String dlqGateway;
|
|
||||||
|
|
||||||
private CountDownLatch latch = new CountDownLatch(1);
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
public TweetConsumer(
|
|
||||||
ObjectMapper objectMapper,
|
|
||||||
SentimentService sentimentService,
|
|
||||||
DlqService dlqService,
|
|
||||||
@Value("${destinations.tweet.tweetDlq}") String dlqName,
|
|
||||||
@Value("${destinations.gateway.gatewayDlq}") String dlqGateway
|
|
||||||
) {
|
|
||||||
this.objectMapper = objectMapper;
|
|
||||||
this.sentimentService = sentimentService;
|
|
||||||
this.dlqService = dlqService;
|
|
||||||
this.dlqName = dlqName;
|
|
||||||
this.dlqGateway = dlqGateway;
|
|
||||||
}
|
|
||||||
|
|
||||||
@JmsListener(destination = "${destinations.tweet.tweetSave}")
|
|
||||||
public void receive(TextMessage message) throws JMSException {
|
|
||||||
String correlationId = getCorrelationId(message);
|
|
||||||
String syncId = getSyncId(message);
|
|
||||||
|
|
||||||
setCorrelationId(correlationId);
|
|
||||||
setSyncId(syncId);
|
|
||||||
|
|
||||||
logger.info("Received Message: " + message.getBody(String.class));
|
|
||||||
|
|
||||||
try {
|
|
||||||
|
|
||||||
if (message.getBody(String.class) == null | message.getBody(String.class).equals("")) {
|
|
||||||
throw new NoBodyOrStringException();
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info("Processing message [{}] as synId [{}] ...", correlationId, syncId);
|
|
||||||
|
|
||||||
SentimentModel sentimentModel;
|
|
||||||
|
|
||||||
try {
|
|
||||||
JsonNode messageJson = objectMapper.readTree(message.getText());
|
|
||||||
sentimentModel = objectMapper.readValue(messageJson.traverse(), SentimentModel.class);
|
|
||||||
|
|
||||||
logger.info("Message with syncId of [{}] is for [{}]", syncId, sentimentModel);
|
|
||||||
|
|
||||||
sentimentService.createRecord(sentimentModel, correlationId, message);
|
|
||||||
|
|
||||||
latch.countDown();
|
|
||||||
} catch (IOException e) {
|
|
||||||
logger.info(format(
|
|
||||||
"Message [%s] for sentimentModel [%s] was not synced due to a readTree exception on getting the text",
|
|
||||||
syncId,
|
|
||||||
correlationId
|
|
||||||
), e);
|
|
||||||
latch.countDown();
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
} catch (NoBodyOrStringException e) {
|
|
||||||
String dlqReason = format("No body or body isn't a String format for incoming %s for price event [%s]",
|
|
||||||
CONSUME_SENTIMENT_EVENT, correlationId);
|
|
||||||
|
|
||||||
DlqInfo dlqInfo = new DlqInfo(
|
|
||||||
message.getJMSMessageID(),
|
|
||||||
dlqName,
|
|
||||||
dlqReason,
|
|
||||||
"0",
|
|
||||||
correlationId
|
|
||||||
);
|
|
||||||
dlqService.sendToDlq(message, CONSUME_SENTIMENT_EVENT.getMessage(), dlqInfo, e, textMessage -> {
|
|
||||||
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
|
|
||||||
return textMessage;
|
|
||||||
});
|
|
||||||
latch.countDown();
|
|
||||||
} catch (JMSException jmsException) {
|
|
||||||
String dlqReason = format("An Exception [%s] has occurred with a JMS action [%s] on [%s]", CONSUME_SENTIMENT_EVENT, jmsException,
|
|
||||||
correlationId);
|
|
||||||
|
|
||||||
DlqInfo dlqInfo = new DlqInfo(
|
|
||||||
message.getJMSMessageID(),
|
|
||||||
dlqGateway,
|
|
||||||
dlqReason,
|
|
||||||
"0",
|
|
||||||
correlationId
|
|
||||||
);
|
|
||||||
dlqService.sendToDlq(message, CONSUME_SENTIMENT_EVENT.getMessage(), dlqInfo, jmsException, textMessage -> {
|
|
||||||
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
|
|
||||||
return textMessage;
|
|
||||||
});
|
|
||||||
latch.countDown();
|
|
||||||
} catch (Exception e) {
|
|
||||||
String dlqReason = format("Couldn't sync incoming %s for price event [%s]", CONSUME_SENTIMENT_EVENT, correlationId);
|
|
||||||
|
|
||||||
DlqInfo dlqInfo = new DlqInfo(
|
|
||||||
message.getJMSMessageID(),
|
|
||||||
dlqName,
|
|
||||||
dlqReason,
|
|
||||||
"0",
|
|
||||||
correlationId
|
|
||||||
);
|
|
||||||
dlqService.sendToDlq(message, CONSUME_SENTIMENT_EVENT.getMessage(), dlqInfo, e, textMessage -> {
|
|
||||||
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
|
|
||||||
return textMessage;
|
|
||||||
});
|
|
||||||
latch.countDown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@ -16,8 +16,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
|
|||||||
|
|
||||||
public class Utils {
|
public class Utils {
|
||||||
|
|
||||||
private static final String CORRELATION_ID_KEY = "X-CRYPTO-Correlation-ID";
|
private static final String CORRELATION_ID_KEY = "CS-Correlation-ID";
|
||||||
private static final String SYNC_ID_KEY = "X-CRYPTO-Sync-ID";
|
|
||||||
|
|
||||||
public static String getCorrelationId(Message message) {
|
public static String getCorrelationId(Message message) {
|
||||||
String correlationId = null;
|
String correlationId = null;
|
||||||
@ -35,19 +34,6 @@ public class Utils {
|
|||||||
return correlationId;
|
return correlationId;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String getSyncId(Message message) {
|
|
||||||
String syncId = null;
|
|
||||||
try {
|
|
||||||
syncId = message.getStringProperty(SYNC_ID_KEY);
|
|
||||||
} catch (Throwable e) {
|
|
||||||
// NOOP
|
|
||||||
}
|
|
||||||
if (isBlank(syncId)) {
|
|
||||||
syncId = randomUUID().toString();
|
|
||||||
}
|
|
||||||
return syncId;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Utility Function to be able to get one of all types
|
// Utility Function to be able to get one of all types
|
||||||
public static <T> Predicate<T> distinctByKey(Function<? super T, Object> keyExtractor) {
|
public static <T> Predicate<T> distinctByKey(Function<? super T, Object> keyExtractor) {
|
||||||
Map<Object, Boolean> seen = new ConcurrentHashMap<>();
|
Map<Object, Boolean> seen = new ConcurrentHashMap<>();
|
||||||
|
|||||||
@ -0,0 +1,26 @@
|
|||||||
|
package cryptosky.me.pricing.graphql.mutations;
|
||||||
|
|
||||||
|
import com.coxautodev.graphql.tools.GraphQLMutationResolver;
|
||||||
|
import cryptosky.me.pricing.models.entities.BtcPriceModel;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import cryptosky.me.pricing.service.PriceService;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class BtcPriceMutation implements GraphQLMutationResolver {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private PriceService priceService;
|
||||||
|
|
||||||
|
public BtcPriceModel createBtc(final String createdDate, final String type,
|
||||||
|
final float av_price,
|
||||||
|
final float h_price,
|
||||||
|
final float l_price,
|
||||||
|
final float o_price,
|
||||||
|
final float c_price,
|
||||||
|
final float volume ) {
|
||||||
|
return this.priceService.createBtc(createdDate, type, av_price, h_price, l_price, o_price, c_price, volume);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@ -2,6 +2,7 @@ package cryptosky.me.pricing.graphql.queries;
|
|||||||
|
|
||||||
import com.coxautodev.graphql.tools.GraphQLQueryResolver;
|
import com.coxautodev.graphql.tools.GraphQLQueryResolver;
|
||||||
import cryptosky.me.pricing.models.entities.BtcPriceModel;
|
import cryptosky.me.pricing.models.entities.BtcPriceModel;
|
||||||
|
import cryptosky.me.pricing.models.entities.CryptoPriceModel;
|
||||||
import cryptosky.me.pricing.service.PriceService;
|
import cryptosky.me.pricing.service.PriceService;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|||||||
@ -9,6 +9,7 @@ import cryptosky.me.pricing.models.entities.CryptoPriceModel;
|
|||||||
import cryptosky.me.pricing.models.repositories.BtcPriceRepository;
|
import cryptosky.me.pricing.models.repositories.BtcPriceRepository;
|
||||||
|
|
||||||
import cryptosky.me.exceptions.NotSupportedCurrencyTypeException;
|
import cryptosky.me.exceptions.NotSupportedCurrencyTypeException;
|
||||||
|
import cryptosky.me.exceptions.DatabaseViolationException;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -21,16 +22,17 @@ import org.springframework.transaction.annotation.Transactional;
|
|||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Comparator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
import static cryptosky.me.ArtemisSyncMessaging.*;
|
import static cryptosky.me.ArtemisSyncMessaging.*;
|
||||||
import static cryptosky.me.SupportedCurrencies.*;
|
import static cryptosky.me.SupportedCurrencies.*;
|
||||||
|
|
||||||
import static cryptosky.me.helpers.Utils.format;
|
import static cryptosky.me.helpers.Utils.format;
|
||||||
import static cryptosky.me.logging.CorrelationInfo.setSyncId;
|
import static cryptosky.me.logging.CorrelationInfo.setSyncId;
|
||||||
import static java.util.UUID.randomUUID;
|
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
public class PriceService {
|
public class PriceService {
|
||||||
@ -53,10 +55,32 @@ public class PriceService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Transactional
|
@Transactional
|
||||||
public void createRecord(CryptoPriceModel cryptoPriceModel, String correlationId, TextMessage message) {
|
public BtcPriceModel createBtc(final String createdDate, final String type,
|
||||||
String syncId = randomUUID().toString();
|
final float av_price,
|
||||||
setSyncId(syncId);
|
final float h_price,
|
||||||
|
final float l_price,
|
||||||
|
final float o_price,
|
||||||
|
final float c_price,
|
||||||
|
final float volume ) {
|
||||||
|
|
||||||
|
final BtcPriceModel btcPrice = new BtcPriceModel();
|
||||||
|
btcPrice.setTimestamp(LocalDateTime.parse((createdDate)));
|
||||||
|
btcPrice.setType(type);
|
||||||
|
btcPrice.setAverage_price(av_price);
|
||||||
|
btcPrice.setHigh_price(h_price);
|
||||||
|
btcPrice.setLow_price(l_price);
|
||||||
|
btcPrice.setOpen_price(o_price);
|
||||||
|
btcPrice.setClose_price(c_price);
|
||||||
|
btcPrice.setVolume(volume);
|
||||||
|
|
||||||
|
return btcPrice;
|
||||||
|
// return this.btcPriceRepository.save(btcPrice);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Transactional
|
||||||
|
public void createRecord(CryptoPriceModel cryptoPriceModel, String correlationId, String syncId, TextMessage message) {
|
||||||
try {
|
try {
|
||||||
|
setSyncId(syncId);
|
||||||
|
|
||||||
switch (cryptoPriceModel.getType()) {
|
switch (cryptoPriceModel.getType()) {
|
||||||
case BTC:
|
case BTC:
|
||||||
@ -101,7 +125,7 @@ public class PriceService {
|
|||||||
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
|
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
String dlqReason = String.format("Couldn't sync incoming %s for price event [%s] due to a generic exception", CONSUME_PRICE_EVENT, correlationId);
|
String dlqReason = String.format("Couldn't sync incoming %s for price event [%s]", CONSUME_PRICE_EVENT, correlationId);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
DlqInfo dlqInfo = new DlqInfo(
|
DlqInfo dlqInfo = new DlqInfo(
|
||||||
@ -115,7 +139,6 @@ public class PriceService {
|
|||||||
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
|
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
|
||||||
return textMessage;
|
return textMessage;
|
||||||
});
|
});
|
||||||
logger.info("Sending [{}] to the DLQ", syncId);
|
|
||||||
} catch (JMSException jmsException) {
|
} catch (JMSException jmsException) {
|
||||||
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
|
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,9 +0,0 @@
|
|||||||
package cryptosky.me.sentiment.models.repositories;
|
|
||||||
|
|
||||||
import cryptosky.me.sentiment.models.entities.BtcSentimentModel;
|
|
||||||
import org.springframework.data.jpa.repository.JpaRepository;
|
|
||||||
import org.springframework.stereotype.Repository;
|
|
||||||
|
|
||||||
@Repository
|
|
||||||
public interface BtcSentimentRepository extends JpaRepository<BtcSentimentModel, Integer> {
|
|
||||||
}
|
|
||||||
@ -1,207 +0,0 @@
|
|||||||
package cryptosky.me.sentiment.service;
|
|
||||||
|
|
||||||
import cryptosky.me.dlq.DlqInfo;
|
|
||||||
import cryptosky.me.dlq.DlqService;
|
|
||||||
import cryptosky.me.exceptions.MessageCodes;
|
|
||||||
import cryptosky.me.exceptions.NotSupportedCurrencyTypeException;
|
|
||||||
import cryptosky.me.pricing.models.entities.BtcPriceModel;
|
|
||||||
import cryptosky.me.sentiment.models.entities.BtcSentimentModel;
|
|
||||||
import cryptosky.me.sentiment.models.entities.SentimentModel;
|
|
||||||
import cryptosky.me.sentiment.models.repositories.BtcSentimentRepository;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
|
||||||
import org.springframework.dao.DataIntegrityViolationException;
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
|
||||||
|
|
||||||
import javax.jms.JMSException;
|
|
||||||
import javax.jms.TextMessage;
|
|
||||||
import java.time.DateTimeException;
|
|
||||||
import java.time.LocalDateTime;
|
|
||||||
import java.time.LocalTime;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import static cryptosky.me.ArtemisSyncMessaging.*;
|
|
||||||
import static cryptosky.me.SupportedCurrencies.BTC;
|
|
||||||
import static cryptosky.me.helpers.Utils.format;
|
|
||||||
import static cryptosky.me.logging.CorrelationInfo.setSyncId;
|
|
||||||
import static java.util.UUID.randomUUID;
|
|
||||||
|
|
||||||
@Service
|
|
||||||
public class SentimentService {
|
|
||||||
|
|
||||||
private final BtcSentimentRepository btcSentimentRepository;
|
|
||||||
private final DlqService dlqService;
|
|
||||||
private final String dlqName;
|
|
||||||
|
|
||||||
private final Logger logger = LoggerFactory.getLogger(SentimentService.class);
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
public SentimentService(
|
|
||||||
final BtcSentimentRepository btcSentimentRepository,
|
|
||||||
DlqService dlqService,
|
|
||||||
@Value("${destinations.pricing.priceDlq}") String dlqName
|
|
||||||
) {
|
|
||||||
this.btcSentimentRepository = btcSentimentRepository;
|
|
||||||
this.dlqService = dlqService;
|
|
||||||
this.dlqName = dlqName;
|
|
||||||
}
|
|
||||||
|
|
||||||
// @Transactional
|
|
||||||
// public BtcSentimentModel createTweet(final String createdDate, final String rawTweet, final float sentimentScore,
|
|
||||||
// final float positiveScore, final float neutralScore, final float negativeScore,
|
|
||||||
// final float compoundScore ) {
|
|
||||||
//
|
|
||||||
// final BtcSentimentModel tweetModel = new BtcSentimentModel();
|
|
||||||
// tweetModel.setTimestamp(format(createdDate).toString());
|
|
||||||
// tweetModel.setRawTweet(rawTweet);
|
|
||||||
// tweetModel.setSentimentScore(sentimentScore);
|
|
||||||
// tweetModel.setPositiveScore(positiveScore);
|
|
||||||
// tweetModel.setNegativeScore(negativeScore);
|
|
||||||
// tweetModel.setNeutralScore(neutralScore);
|
|
||||||
// tweetModel.setCompoundScore(compoundScore);
|
|
||||||
|
|
||||||
// return this.btcSentimentRepository.save(tweetModel);
|
|
||||||
// }
|
|
||||||
|
|
||||||
@Transactional
|
|
||||||
public void createRecord(SentimentModel sentimentModel, String correlationId, TextMessage message) {
|
|
||||||
String syncId = randomUUID().toString();
|
|
||||||
setSyncId(syncId);
|
|
||||||
|
|
||||||
try {
|
|
||||||
|
|
||||||
switch (sentimentModel.getType()) {
|
|
||||||
case BTC:
|
|
||||||
BtcSentimentModel btcSentimentModel = new BtcSentimentModel();
|
|
||||||
btcSentimentModel.setId(getLatestId() + 1);
|
|
||||||
btcSentimentModel.setTimestamp(sentimentModel.getTimestamp());
|
|
||||||
btcSentimentModel.setSyncId(syncId);
|
|
||||||
btcSentimentModel.setPositiveScore(sentimentModel.getPositiveScore());
|
|
||||||
btcSentimentModel.setNeutralScore(sentimentModel.getNeutralScore());
|
|
||||||
btcSentimentModel.setNegativeScore(sentimentModel.getNegativeScore());
|
|
||||||
btcSentimentModel.setCompoundScore(sentimentModel.getCompoundScore());
|
|
||||||
btcSentimentModel.setType(sentimentModel.getType());
|
|
||||||
|
|
||||||
logger.info("Saving {} record to the database for [{}] with syncId of [{}]", sentimentModel.getType(), correlationId, syncId);
|
|
||||||
this.btcSentimentRepository.save(btcSentimentModel);
|
|
||||||
return;
|
|
||||||
default:
|
|
||||||
logger.error("Message for [{}] has failed with [{}] due to the currency [{}] {}", syncId, MessageCodes.E01.getCode(), sentimentModel.getType(), MessageCodes.E01.getMessage());
|
|
||||||
throw new NotSupportedCurrencyTypeException();
|
|
||||||
}
|
|
||||||
} catch (DataIntegrityViolationException e) {
|
|
||||||
logger.error("Message for [{}] has failed with [{}] due to the currency [{}] {}", syncId, MessageCodes.E02.getCode(), sentimentModel.getType(), MessageCodes.E02.getMessage());
|
|
||||||
|
|
||||||
String dlqReason = String.format("Couldn't sync incoming %s for price event [%s]", CONSUME_PRICE_EVENT, correlationId);
|
|
||||||
|
|
||||||
try {
|
|
||||||
DlqInfo dlqInfo = new DlqInfo(
|
|
||||||
message.getJMSMessageID(),
|
|
||||||
dlqName,
|
|
||||||
dlqReason,
|
|
||||||
"0",
|
|
||||||
correlationId
|
|
||||||
);
|
|
||||||
dlqService.sendToDlq(message, SAVE_MESSAGE_DATABASE_EXCEPTION.getMessage(), dlqInfo, e, textMessage -> {
|
|
||||||
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
|
|
||||||
return textMessage;
|
|
||||||
});
|
|
||||||
logger.info("Sending [{}] to the DLQ", syncId);
|
|
||||||
} catch (JMSException jmsException) {
|
|
||||||
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
String dlqReason = String.format("Couldn't sync incoming %s for price event [%s] due to a generic exception", CONSUME_PRICE_EVENT, correlationId);
|
|
||||||
|
|
||||||
try {
|
|
||||||
DlqInfo dlqInfo = new DlqInfo(
|
|
||||||
message.getJMSMessageID(),
|
|
||||||
dlqName,
|
|
||||||
dlqReason,
|
|
||||||
"0",
|
|
||||||
correlationId
|
|
||||||
);
|
|
||||||
dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, e, textMessage -> {
|
|
||||||
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
|
|
||||||
return textMessage;
|
|
||||||
});
|
|
||||||
logger.info("Sending [{}] to the DLQ", syncId);
|
|
||||||
} catch (JMSException jmsException) {
|
|
||||||
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Transactional(readOnly = true)
|
|
||||||
public int getLatestId() {
|
|
||||||
List<BtcSentimentModel> records = new ArrayList<>(this.btcSentimentRepository.findAll());
|
|
||||||
BtcSentimentModel latest = records.stream().skip(records.size() - 1).findFirst().get();
|
|
||||||
return latest.getId();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Transactional(readOnly = true)
|
|
||||||
public Optional<BtcSentimentModel> getCurrentTweet() {
|
|
||||||
return this.btcSentimentRepository.findAll().stream().findFirst();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Transactional(readOnly = true)
|
|
||||||
public List<BtcSentimentModel> getAllTweets(final int count ) {
|
|
||||||
return this.btcSentimentRepository.findAll().stream()
|
|
||||||
.limit(count)
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Transactional(readOnly = true)
|
|
||||||
public List<BtcSentimentModel> getTweetsForDay(final String startDate, final String endDate ) {
|
|
||||||
LocalDateTime r_start = format(startDate);
|
|
||||||
LocalDateTime r_end = format(endDate);
|
|
||||||
|
|
||||||
r_start = r_start.toLocalDate().atStartOfDay();
|
|
||||||
r_end = r_end.toLocalDate().atTime(LocalTime.MAX);
|
|
||||||
|
|
||||||
if ( r_end.isAfter(r_start) ) {
|
|
||||||
if ( r_end.equals(r_start.toLocalDate().atTime(LocalTime.MAX))) {
|
|
||||||
LocalDateTime finalR_end = r_end;
|
|
||||||
LocalDateTime finalR_start = r_start;
|
|
||||||
return this.btcSentimentRepository.findAll().stream()
|
|
||||||
.filter(createdDateList -> format(createdDateList.getTimestamp()).isBefore(finalR_end))
|
|
||||||
.filter(createdDateList -> format(createdDateList.getTimestamp()).isAfter(finalR_start))
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
} else {
|
|
||||||
// Logger
|
|
||||||
throw new DateTimeException(r_start +" and "+ r_end +" are not on the same day");
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Logger
|
|
||||||
throw new DateTimeException("End Date "+ r_end +" is not after "+ r_start);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Transactional(readOnly = true)
|
|
||||||
public List<BtcSentimentModel> getTweetsForPeriod(final String startDate, final String endDate ) {
|
|
||||||
LocalDateTime r_start = format(startDate);
|
|
||||||
LocalDateTime r_end = format(endDate);
|
|
||||||
|
|
||||||
r_start = r_start.toLocalDate().atStartOfDay();
|
|
||||||
r_end = r_end.toLocalDate().atTime(LocalTime.MAX);
|
|
||||||
|
|
||||||
if ( r_end.isAfter(r_start) ) {
|
|
||||||
LocalDateTime finalR_end = r_end;
|
|
||||||
LocalDateTime finalR_start = r_start;
|
|
||||||
return this.btcSentimentRepository.findAll().stream()
|
|
||||||
.filter(createdDateList -> format(createdDateList.getTimestamp()).isBefore(finalR_end))
|
|
||||||
.filter(createdDateList -> format(createdDateList.getTimestamp()).isAfter(finalR_start))
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
} else {
|
|
||||||
// Logger
|
|
||||||
throw new DateTimeException("End Date "+ r_end +" is not after "+ r_start);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,4 +1,4 @@
|
|||||||
package cryptosky.me.sentiment.models.entities;
|
package cryptosky.me.tweets.models.entities;
|
||||||
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.EqualsAndHashCode;
|
import lombok.EqualsAndHashCode;
|
||||||
@ -9,7 +9,7 @@ import javax.persistence.Table;
|
|||||||
@EqualsAndHashCode(callSuper = true)
|
@EqualsAndHashCode(callSuper = true)
|
||||||
@Data
|
@Data
|
||||||
@Entity
|
@Entity
|
||||||
@Table(name = "btc_sentiment")
|
@Table(name = "btc_tweet")
|
||||||
public class BtcSentimentModel extends SentimentModel {
|
public class BtcTweetModel extends TweetModel {
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -1,4 +1,4 @@
|
|||||||
package cryptosky.me.sentiment.models.entities;
|
package cryptosky.me.tweets.models.entities;
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
@ -10,17 +10,21 @@ import javax.persistence.*;
|
|||||||
@NoArgsConstructor
|
@NoArgsConstructor
|
||||||
@Data
|
@Data
|
||||||
@MappedSuperclass
|
@MappedSuperclass
|
||||||
public class SentimentModel {
|
public class TweetModel {
|
||||||
|
|
||||||
@Id
|
@Id
|
||||||
@Column(name = "ID", nullable = false)
|
@Column(name = "ID", nullable = false)
|
||||||
|
@GeneratedValue(strategy = GenerationType.IDENTITY)
|
||||||
private int id;
|
private int id;
|
||||||
|
|
||||||
@Column(name = "timestamp",nullable = false)
|
@Column(name = "timestamp",nullable = false)
|
||||||
private String timestamp;
|
private String timestamp;
|
||||||
|
|
||||||
@Column(name = "syncId", nullable = false)
|
@Column(name = "raw_tweet", nullable = false)
|
||||||
private String syncId;
|
private String rawTweet;
|
||||||
|
|
||||||
|
@Column(name = "sentiment")
|
||||||
|
private float sentimentScore;
|
||||||
|
|
||||||
@Column(name = "pos")
|
@Column(name = "pos")
|
||||||
private float positiveScore;
|
private float positiveScore;
|
||||||
@ -34,7 +38,4 @@ public class SentimentModel {
|
|||||||
@Column(name = "compound", nullable = false)
|
@Column(name = "compound", nullable = false)
|
||||||
private float compoundScore;
|
private float compoundScore;
|
||||||
|
|
||||||
@Column(name = "type")
|
|
||||||
private String type;
|
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -0,0 +1,9 @@
|
|||||||
|
package cryptosky.me.tweets.models.repositories;
|
||||||
|
|
||||||
|
import cryptosky.me.tweets.models.entities.BtcTweetModel;
|
||||||
|
import org.springframework.data.jpa.repository.JpaRepository;
|
||||||
|
import org.springframework.stereotype.Repository;
|
||||||
|
|
||||||
|
@Repository
|
||||||
|
public interface BtcTweetRepository extends JpaRepository<BtcTweetModel, Integer> {
|
||||||
|
}
|
||||||
@ -0,0 +1,22 @@
|
|||||||
|
package cryptosky.me.tweets.mutations;
|
||||||
|
|
||||||
|
import com.coxautodev.graphql.tools.GraphQLMutationResolver;
|
||||||
|
import cryptosky.me.tweets.models.entities.BtcTweetModel;
|
||||||
|
import cryptosky.me.tweets.service.BtcTweetService;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class BtcTweetMutation implements GraphQLMutationResolver {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
BtcTweetService btcTweetService;
|
||||||
|
|
||||||
|
public BtcTweetModel createTweet(final String createdDate, final String rawTweet, final float sentimentScore,
|
||||||
|
final float positiveScore, final float neutralScore, final float negativeScore,
|
||||||
|
final float compoundScore ) {
|
||||||
|
return this.btcTweetService.createTweet( createdDate, rawTweet, sentimentScore, positiveScore, neutralScore,
|
||||||
|
negativeScore,compoundScore );
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
33
src/main/java/cryptosky/me/tweets/queries/BtcTweetQuery.java
Normal file
33
src/main/java/cryptosky/me/tweets/queries/BtcTweetQuery.java
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
package cryptosky.me.tweets.queries;
|
||||||
|
|
||||||
|
import com.coxautodev.graphql.tools.GraphQLQueryResolver;
|
||||||
|
import cryptosky.me.tweets.models.entities.BtcTweetModel;
|
||||||
|
import cryptosky.me.tweets.service.BtcTweetService;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class BtcTweetQuery implements GraphQLQueryResolver {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private BtcTweetService btcTweetService;
|
||||||
|
|
||||||
|
public Optional<BtcTweetModel> getCurrentTweet() {
|
||||||
|
return this.btcTweetService.getCurrentTweet();
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<BtcTweetModel> getAllTweets( final int count ) {
|
||||||
|
return this.btcTweetService.getAllTweets(count);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<BtcTweetModel> getTweetsForDay( final String startDate, final String endDate ) {
|
||||||
|
return this.btcTweetService.getTweetsForDay(startDate, endDate);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<BtcTweetModel> getTweetsForPeriod( final String startDate, final String endDate ) {
|
||||||
|
return this.btcTweetService.getTweetsForPeriod( startDate, endDate );
|
||||||
|
}
|
||||||
|
}
|
||||||
101
src/main/java/cryptosky/me/tweets/service/BtcTweetService.java
Normal file
101
src/main/java/cryptosky/me/tweets/service/BtcTweetService.java
Normal file
@ -0,0 +1,101 @@
|
|||||||
|
package cryptosky.me.tweets.service;
|
||||||
|
|
||||||
|
import cryptosky.me.tweets.models.entities.BtcTweetModel;
|
||||||
|
import cryptosky.me.tweets.models.repositories.BtcTweetRepository;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
|
||||||
|
import java.time.DateTimeException;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.time.LocalTime;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static cryptosky.me.helpers.Utils.format;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
public class BtcTweetService {
|
||||||
|
|
||||||
|
private final BtcTweetRepository btcTweetRepository;
|
||||||
|
|
||||||
|
public BtcTweetService(final BtcTweetRepository btcTweetRepository) {
|
||||||
|
this.btcTweetRepository = btcTweetRepository;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Transactional
|
||||||
|
public BtcTweetModel createTweet(final String createdDate, final String rawTweet, final float sentimentScore,
|
||||||
|
final float positiveScore, final float neutralScore, final float negativeScore,
|
||||||
|
final float compoundScore ) {
|
||||||
|
|
||||||
|
final BtcTweetModel tweetModel = new BtcTweetModel();
|
||||||
|
tweetModel.setTimestamp(format(createdDate).toString());
|
||||||
|
tweetModel.setRawTweet(rawTweet);
|
||||||
|
tweetModel.setSentimentScore(sentimentScore);
|
||||||
|
tweetModel.setPositiveScore(positiveScore);
|
||||||
|
tweetModel.setNegativeScore(negativeScore);
|
||||||
|
tweetModel.setNeutralScore(neutralScore);
|
||||||
|
tweetModel.setCompoundScore(compoundScore);
|
||||||
|
|
||||||
|
return this.btcTweetRepository.save(tweetModel);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Transactional(readOnly = true)
|
||||||
|
public Optional<BtcTweetModel> getCurrentTweet() {
|
||||||
|
return this.btcTweetRepository.findAll().stream().findFirst();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Transactional(readOnly = true)
|
||||||
|
public List<BtcTweetModel> getAllTweets( final int count ) {
|
||||||
|
return this.btcTweetRepository.findAll().stream()
|
||||||
|
.limit(count)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Transactional(readOnly = true)
|
||||||
|
public List<BtcTweetModel> getTweetsForDay( final String startDate, final String endDate ) {
|
||||||
|
LocalDateTime r_start = format(startDate);
|
||||||
|
LocalDateTime r_end = format(endDate);
|
||||||
|
|
||||||
|
r_start = r_start.toLocalDate().atStartOfDay();
|
||||||
|
r_end = r_end.toLocalDate().atTime(LocalTime.MAX);
|
||||||
|
|
||||||
|
if ( r_end.isAfter(r_start) ) {
|
||||||
|
if ( r_end.equals(r_start.toLocalDate().atTime(LocalTime.MAX))) {
|
||||||
|
LocalDateTime finalR_end = r_end;
|
||||||
|
LocalDateTime finalR_start = r_start;
|
||||||
|
return this.btcTweetRepository.findAll().stream()
|
||||||
|
.filter(createdDateList -> format(createdDateList.getTimestamp()).isBefore(finalR_end))
|
||||||
|
.filter(createdDateList -> format(createdDateList.getTimestamp()).isAfter(finalR_start))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
} else {
|
||||||
|
// Logger
|
||||||
|
throw new DateTimeException(r_start +" and "+ r_end +" are not on the same day");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Logger
|
||||||
|
throw new DateTimeException("End Date "+ r_end +" is not after "+ r_start);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Transactional(readOnly = true)
|
||||||
|
public List<BtcTweetModel> getTweetsForPeriod( final String startDate, final String endDate ) {
|
||||||
|
LocalDateTime r_start = format(startDate);
|
||||||
|
LocalDateTime r_end = format(endDate);
|
||||||
|
|
||||||
|
r_start = r_start.toLocalDate().atStartOfDay();
|
||||||
|
r_end = r_end.toLocalDate().atTime(LocalTime.MAX);
|
||||||
|
|
||||||
|
if ( r_end.isAfter(r_start) ) {
|
||||||
|
LocalDateTime finalR_end = r_end;
|
||||||
|
LocalDateTime finalR_start = r_start;
|
||||||
|
return this.btcTweetRepository.findAll().stream()
|
||||||
|
.filter(createdDateList -> format(createdDateList.getTimestamp()).isBefore(finalR_end))
|
||||||
|
.filter(createdDateList -> format(createdDateList.getTimestamp()).isAfter(finalR_start))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
} else {
|
||||||
|
// Logger
|
||||||
|
throw new DateTimeException("End Date "+ r_end +" is not after "+ r_start);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -30,9 +30,6 @@ destinations:
|
|||||||
pricing:
|
pricing:
|
||||||
priceSave: PricingSave
|
priceSave: PricingSave
|
||||||
priceDlq: PricingSave.dlq
|
priceDlq: PricingSave.dlq
|
||||||
tweet:
|
|
||||||
tweetSave: TweetSave
|
|
||||||
tweetDlq: TweetSave.dlq
|
|
||||||
gateway:
|
gateway:
|
||||||
gatewayDlq: DBGateway.dlq
|
gatewayDlq: DBGateway.dlq
|
||||||
|
|
||||||
|
|||||||
@ -10,7 +10,7 @@ type BtcPrice {
|
|||||||
volume: Float
|
volume: Float
|
||||||
}
|
}
|
||||||
|
|
||||||
type sentimentModel {
|
type tweetModel {
|
||||||
id: ID!,
|
id: ID!,
|
||||||
timestamp: String!,
|
timestamp: String!,
|
||||||
rawTweet: String,
|
rawTweet: String,
|
||||||
@ -28,8 +28,13 @@ type Query {
|
|||||||
priceForCreatedDate(createdDate: String):BtcPrice,
|
priceForCreatedDate(createdDate: String):BtcPrice,
|
||||||
priceBetweenDates(startDate: String, endDate: String):[BtcPrice],
|
priceBetweenDates(startDate: String, endDate: String):[BtcPrice],
|
||||||
################################################################
|
################################################################
|
||||||
currentTweet:sentimentModel,
|
currentTweet:tweetModel,
|
||||||
allTweets(count: Int):[sentimentModel],
|
allTweets(count: Int):[tweetModel],
|
||||||
tweetsForDay(startDate: String, endDate: String):[sentimentModel],
|
tweetsForDay(startDate: String, endDate: String):[tweetModel],
|
||||||
tweetsForPeriod(startDate: String, endDate: String):[sentimentModel]
|
tweetsForPeriod(startDate: String, endDate: String):[tweetModel]
|
||||||
|
}
|
||||||
|
|
||||||
|
type Mutation {
|
||||||
|
createBtc(createdDate: String!, type: String!, average_price: Float!, high_price: Float, low_price: Float, open_price: Float, close_price: Float, volume: Float):BtcPrice
|
||||||
|
createTweet(createdDate: String!, rawTweet: String!, sentimentScore: Float!, positiveScore: Float, neutralScore: Float, negativeScore: Float, compoundScore: Float!):tweetModel
|
||||||
}
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user