Compare commits
No commits in common. "master" and "1.0.0-b25" have entirely different histories.
@ -53,21 +53,6 @@ spec:
|
||||
secretKeyRef:
|
||||
name: jdbc
|
||||
key: jdbc.password
|
||||
- name: BROKER_URL
|
||||
valueFrom:
|
||||
configMapKeyRef:
|
||||
name: endpoints
|
||||
key: amq.url
|
||||
- name: BROKER_USER
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: amq
|
||||
key: amq.username
|
||||
- name: BROKER_PASSWORD
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: amq
|
||||
key: amq.password
|
||||
ports:
|
||||
- containerPort: 9090
|
||||
name: RESOURCE_NAME
|
||||
@ -97,7 +82,7 @@ spec:
|
||||
cpu: 10m
|
||||
memory: 256Mi
|
||||
limits:
|
||||
cpu: 100m
|
||||
cpu: 250m
|
||||
memory: 512Mi
|
||||
securityContext:
|
||||
capabilities:
|
||||
|
||||
@ -32,7 +32,7 @@ try {
|
||||
timestamps {
|
||||
node ("${env.SLAVE_LABEL}") {
|
||||
stage('Initialise') {
|
||||
checkout([$class: 'GitSCM', branches: [[name: 'master']], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: 'Github', url: env.GIT_REPOSITORY_URL]]])
|
||||
checkout([$class: 'GitSCM', branches: [[name: 'master']], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: 'Github', url: 'https://github.com/andyjk15/db-gateway.git']]])
|
||||
|
||||
env.APPLICATION_VERSION = get_application_version()
|
||||
|
||||
@ -47,7 +47,7 @@ try {
|
||||
) {
|
||||
sh "doctl auth init --access-token ${DOCTL_TOKEN}"
|
||||
sh "doctl registry login"
|
||||
sh "doctl kubernetes cluster kubeconfig save cryptosky-cluster"
|
||||
sh "doctl kubernetes cluster kubeconfig save cryptosky-kubernetes-cluster"
|
||||
}
|
||||
}
|
||||
|
||||
@ -83,6 +83,7 @@ try {
|
||||
sh "docker push ${env.DOCKER_REPOSITORY}/${env.APPLICATION_NAME}:${env.APPLICATION_VERSION}"
|
||||
sh "docker push ${env.DOCKER_REPOSITORY}/${env.APPLICATION_NAME}:latest"
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
stage('Tag Repository') {
|
||||
|
||||
@ -5,4 +5,6 @@ APPLICATION_NAME=$1
|
||||
kubectl apply -f configuration/kubernetes/deployment.yaml
|
||||
kubectl apply -f configuration/kubernetes/service.yaml
|
||||
|
||||
kubectl get pods
|
||||
|
||||
kubectl rollout status deployment/${APPLICATION_NAME} --namespace=production
|
||||
56
pom.xml
56
pom.xml
@ -21,7 +21,6 @@
|
||||
<java.version>1.8</java.version>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
|
||||
<logback.contrib.version>0.1.5</logback.contrib.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
@ -34,12 +33,10 @@
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-web</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-websocket</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<!-- Persistent Data -->
|
||||
@ -48,6 +45,23 @@
|
||||
<artifactId>spring-boot-starter-data-jpa</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- GraghQL -->
|
||||
<dependency>
|
||||
<groupId>com.graphql-java</groupId>
|
||||
<artifactId>graphql-spring-boot-starter</artifactId>
|
||||
<version>5.0.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.graphql-java</groupId>
|
||||
<artifactId>graphql-java-tools</artifactId>
|
||||
<version>5.2.4</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.graphql-java</groupId>
|
||||
<artifactId>graphiql-spring-boot-starter</artifactId>
|
||||
<version>5.0.2</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Local database -->
|
||||
<dependency>
|
||||
<groupId>com.h2database</groupId>
|
||||
@ -67,30 +81,6 @@
|
||||
<version>4.3.8.Final</version>
|
||||
</dependency>
|
||||
|
||||
<!-- GraghQL -->
|
||||
<dependency>
|
||||
<groupId>com.graphql-java</groupId>
|
||||
<artifactId>graphql-spring-boot-starter</artifactId>
|
||||
<version>5.0.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.graphql-java</groupId>
|
||||
<artifactId>graphql-java-tools</artifactId>
|
||||
<version>5.2.4</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.graphql-java</groupId>
|
||||
<artifactId>graphiql-spring-boot-starter</artifactId>
|
||||
<version>5.0.2</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Artemis -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-artemis</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<!--- Utils -->
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
@ -98,18 +88,6 @@
|
||||
<version>1.18.8</version>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!--- Logging -->
|
||||
<dependency>
|
||||
<groupId>net.logstash.logback</groupId>
|
||||
<artifactId>logstash-logback-encoder</artifactId>
|
||||
<version>6.4</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
<!-- Testing -->
|
||||
<dependency>
|
||||
|
||||
@ -1,19 +0,0 @@
|
||||
package cryptosky.me;
|
||||
|
||||
public enum ArtemisSyncMessaging {
|
||||
|
||||
MESSAGE_SAVE_SYNC_ID("MESSAGE_SAVE_SYNC_ID"),
|
||||
SAVE_MESSAGE_DATABASE_EXCEPTION("SAVE_MESSAGE_DATABASE_EXCEPTION"),
|
||||
CONSUME_PRICE_EVENT("CONSUME_PRICE_EVENT"),
|
||||
CONSUME_SENTIMENT_EVENT("CONSUME_SENTIMENT_EVENT");
|
||||
|
||||
private final String syncMessage;
|
||||
|
||||
ArtemisSyncMessaging(String syncMessage) {
|
||||
this.syncMessage = syncMessage;
|
||||
}
|
||||
|
||||
public String getMessage() {
|
||||
return syncMessage;
|
||||
}
|
||||
}
|
||||
@ -1,7 +0,0 @@
|
||||
package cryptosky.me;
|
||||
|
||||
public class SupportedCurrencies {
|
||||
|
||||
public static final String BTC = "btc_usd";
|
||||
|
||||
}
|
||||
@ -1,34 +0,0 @@
|
||||
package cryptosky.me.configurations;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||
import cryptosky.me.helpers.StringTrimModule;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Primary;
|
||||
|
||||
import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
|
||||
import static com.fasterxml.jackson.databind.MapperFeature.DEFAULT_VIEW_INCLUSION;
|
||||
import static com.fasterxml.jackson.databind.MapperFeature.SORT_PROPERTIES_ALPHABETICALLY;
|
||||
import static com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS;
|
||||
|
||||
@org.springframework.context.annotation.Configuration
|
||||
public class Configuration {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(Configuration.class);
|
||||
|
||||
@Bean
|
||||
@Primary
|
||||
public ObjectMapper objectMapper() {
|
||||
return new ObjectMapper()
|
||||
.registerModule(new JavaTimeModule())
|
||||
.registerModule(new StringTrimModule())
|
||||
.registerModule(new Jdk8Module())
|
||||
.enable(SORT_PROPERTIES_ALPHABETICALLY)
|
||||
.disable(WRITE_DATES_AS_TIMESTAMPS)
|
||||
.disable(FAIL_ON_UNKNOWN_PROPERTIES)
|
||||
.disable(DEFAULT_VIEW_INCLUSION);
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,41 +0,0 @@
|
||||
package cryptosky.me.configurations;
|
||||
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.jms.annotation.EnableJms;
|
||||
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
|
||||
|
||||
@Configuration
|
||||
@EnableJms
|
||||
public class ConsumerConfig {
|
||||
|
||||
@Value("${spring.artemis.broker-url}")
|
||||
private String brokerUrl;
|
||||
|
||||
@Value("${spring.artemis.user}")
|
||||
private String user;
|
||||
|
||||
@Value("${spring.artemis.password}")
|
||||
private String pass;
|
||||
|
||||
@Bean
|
||||
public ActiveMQConnectionFactory receiverActiveMQConnectionFactory() {
|
||||
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerUrl);
|
||||
activeMQConnectionFactory.setUser(user);
|
||||
activeMQConnectionFactory.setPassword(pass);
|
||||
return activeMQConnectionFactory;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
|
||||
DefaultJmsListenerContainerFactory factory =
|
||||
new DefaultJmsListenerContainerFactory();
|
||||
factory
|
||||
.setConnectionFactory(receiverActiveMQConnectionFactory());
|
||||
factory.setConcurrency("3-10");
|
||||
|
||||
return factory;
|
||||
}
|
||||
}
|
||||
@ -1,28 +0,0 @@
|
||||
package cryptosky.me.configurations;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.InjectionPoint;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Scope;
|
||||
import org.springframework.core.MethodParameter;
|
||||
import cryptosky.me.Application;
|
||||
|
||||
import static org.springframework.beans.factory.config.ConfigurableBeanFactory.SCOPE_PROTOTYPE;
|
||||
|
||||
@Configuration
|
||||
public class LoggerConfiguration {
|
||||
|
||||
@Bean
|
||||
@Scope(SCOPE_PROTOTYPE)
|
||||
public Logger logger(InjectionPoint injectionPoint) {
|
||||
Class<?> loggerClass = Application.class;
|
||||
|
||||
MethodParameter methodParameter = injectionPoint.getMethodParameter();
|
||||
if (methodParameter != null) {
|
||||
loggerClass = methodParameter.getContainingClass();
|
||||
}
|
||||
return LoggerFactory.getLogger(loggerClass);
|
||||
}
|
||||
}
|
||||
@ -1,145 +0,0 @@
|
||||
package cryptosky.me.consumers;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import cryptosky.me.dlq.DlqInfo;
|
||||
import cryptosky.me.dlq.DlqService;
|
||||
import cryptosky.me.exceptions.NoBodyOrStringException;
|
||||
import cryptosky.me.pricing.models.entities.CryptoPriceModel;
|
||||
import cryptosky.me.pricing.service.PriceService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.jms.annotation.JmsListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.TextMessage;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import static cryptosky.me.ArtemisSyncMessaging.*;
|
||||
|
||||
import static cryptosky.me.helpers.Utils.getCorrelationId;
|
||||
import static cryptosky.me.helpers.Utils.getSyncId;
|
||||
import static cryptosky.me.logging.CorrelationInfo.setCorrelationId;
|
||||
import static cryptosky.me.logging.CorrelationInfo.setSyncId;
|
||||
import static java.lang.String.format;
|
||||
|
||||
@Component
|
||||
public class PriceConsumer {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(PriceConsumer.class);
|
||||
private final ObjectMapper objectMapper;
|
||||
private final PriceService priceService;
|
||||
private final DlqService dlqService;
|
||||
private final String dlqName;
|
||||
private final String dlqGateway;
|
||||
|
||||
private CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
@Autowired
|
||||
public PriceConsumer(
|
||||
ObjectMapper objectMapper,
|
||||
PriceService priceService,
|
||||
DlqService dlqService,
|
||||
@Value("${destinations.pricing.priceDlq}") String dlqName,
|
||||
@Value("${destinations.gateway.gatewayDlq}") String dlqGateway
|
||||
) {
|
||||
this.objectMapper = objectMapper;
|
||||
this.priceService = priceService;
|
||||
this.dlqService = dlqService;
|
||||
this.dlqName = dlqName;
|
||||
this.dlqGateway = dlqGateway;
|
||||
}
|
||||
|
||||
@JmsListener(destination = "${destinations.pricing.priceSave}")
|
||||
public void receive(TextMessage message) throws JMSException {
|
||||
String correlationId = getCorrelationId(message);
|
||||
String syncId = getSyncId(message);
|
||||
|
||||
setCorrelationId(correlationId);
|
||||
setSyncId(syncId);
|
||||
|
||||
logger.info("Received Message: " + message.getBody(String.class) + " for syncId :: [{}]", syncId);
|
||||
|
||||
try {
|
||||
|
||||
if (message.getBody(String.class) == null | message.getBody(String.class).equals("")) {
|
||||
throw new NoBodyOrStringException();
|
||||
}
|
||||
|
||||
logger.info("Processing message [{}] as synId [{}] ...", correlationId, syncId);
|
||||
|
||||
CryptoPriceModel cryptoPriceModel;
|
||||
|
||||
try {
|
||||
JsonNode messageJson = objectMapper.readTree(message.getText());
|
||||
cryptoPriceModel = objectMapper.readValue(messageJson.traverse(), CryptoPriceModel.class);
|
||||
|
||||
logger.info("Message with syncId of [{}] is for [{}]", syncId, cryptoPriceModel);
|
||||
|
||||
priceService.createRecord(cryptoPriceModel, correlationId, message);
|
||||
|
||||
latch.countDown();
|
||||
} catch (IOException e) {
|
||||
logger.info(format(
|
||||
"Message [%s] for pricingModel [%s] was not synced due to a readTree exception on getting the text",
|
||||
syncId,
|
||||
correlationId
|
||||
), e);
|
||||
latch.countDown();
|
||||
throw e;
|
||||
}
|
||||
} catch (NoBodyOrStringException e) {
|
||||
String dlqReason = format("No body or body isn't a String format for incoming %s for price event [%s]",
|
||||
CONSUME_PRICE_EVENT, correlationId);
|
||||
|
||||
DlqInfo dlqInfo = new DlqInfo(
|
||||
message.getJMSMessageID(),
|
||||
dlqName,
|
||||
dlqReason,
|
||||
"0",
|
||||
correlationId
|
||||
);
|
||||
dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, e, textMessage -> {
|
||||
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
|
||||
return textMessage;
|
||||
});
|
||||
latch.countDown();
|
||||
} catch (JMSException jmsException) {
|
||||
String dlqReason = format("An Exception [%s] has occurred with a JMS action [%s] on [%s]", CONSUME_PRICE_EVENT, jmsException,
|
||||
correlationId);
|
||||
|
||||
DlqInfo dlqInfo = new DlqInfo(
|
||||
message.getJMSMessageID(),
|
||||
dlqGateway,
|
||||
dlqReason,
|
||||
"0",
|
||||
correlationId
|
||||
);
|
||||
dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, jmsException, textMessage -> {
|
||||
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
|
||||
return textMessage;
|
||||
});
|
||||
latch.countDown();
|
||||
} catch (Exception e) {
|
||||
String dlqReason = format("Couldn't sync incoming %s for price event [%s]", CONSUME_PRICE_EVENT, correlationId);
|
||||
|
||||
DlqInfo dlqInfo = new DlqInfo(
|
||||
message.getJMSMessageID(),
|
||||
dlqName,
|
||||
dlqReason,
|
||||
"0",
|
||||
correlationId
|
||||
);
|
||||
dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, e, textMessage -> {
|
||||
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
|
||||
return textMessage;
|
||||
});
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,144 +0,0 @@
|
||||
package cryptosky.me.consumers;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import cryptosky.me.dlq.DlqInfo;
|
||||
import cryptosky.me.dlq.DlqService;
|
||||
import cryptosky.me.exceptions.NoBodyOrStringException;
|
||||
import cryptosky.me.sentiment.models.entities.SentimentModel;
|
||||
import cryptosky.me.sentiment.service.SentimentService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.jms.annotation.JmsListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.TextMessage;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import static cryptosky.me.ArtemisSyncMessaging.*;
|
||||
import static cryptosky.me.helpers.Utils.getCorrelationId;
|
||||
import static cryptosky.me.helpers.Utils.getSyncId;
|
||||
import static cryptosky.me.logging.CorrelationInfo.setCorrelationId;
|
||||
import static cryptosky.me.logging.CorrelationInfo.setSyncId;
|
||||
import static java.lang.String.format;
|
||||
|
||||
@Component
|
||||
public class TweetConsumer {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(TweetConsumer.class);
|
||||
private final ObjectMapper objectMapper;
|
||||
private final SentimentService sentimentService;
|
||||
private final DlqService dlqService;
|
||||
private final String dlqName;
|
||||
private final String dlqGateway;
|
||||
|
||||
private CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
@Autowired
|
||||
public TweetConsumer(
|
||||
ObjectMapper objectMapper,
|
||||
SentimentService sentimentService,
|
||||
DlqService dlqService,
|
||||
@Value("${destinations.tweet.tweetDlq}") String dlqName,
|
||||
@Value("${destinations.gateway.gatewayDlq}") String dlqGateway
|
||||
) {
|
||||
this.objectMapper = objectMapper;
|
||||
this.sentimentService = sentimentService;
|
||||
this.dlqService = dlqService;
|
||||
this.dlqName = dlqName;
|
||||
this.dlqGateway = dlqGateway;
|
||||
}
|
||||
|
||||
@JmsListener(destination = "${destinations.tweet.tweetSave}")
|
||||
public void receive(TextMessage message) throws JMSException {
|
||||
String correlationId = getCorrelationId(message);
|
||||
String syncId = getSyncId(message);
|
||||
|
||||
setCorrelationId(correlationId);
|
||||
setSyncId(syncId);
|
||||
|
||||
logger.info("Received Message: " + message.getBody(String.class));
|
||||
|
||||
try {
|
||||
|
||||
if (message.getBody(String.class) == null | message.getBody(String.class).equals("")) {
|
||||
throw new NoBodyOrStringException();
|
||||
}
|
||||
|
||||
logger.info("Processing message [{}] as synId [{}] ...", correlationId, syncId);
|
||||
|
||||
SentimentModel sentimentModel;
|
||||
|
||||
try {
|
||||
JsonNode messageJson = objectMapper.readTree(message.getText());
|
||||
sentimentModel = objectMapper.readValue(messageJson.traverse(), SentimentModel.class);
|
||||
|
||||
logger.info("Message with syncId of [{}] is for [{}]", syncId, sentimentModel);
|
||||
|
||||
sentimentService.createRecord(sentimentModel, correlationId, message);
|
||||
|
||||
latch.countDown();
|
||||
} catch (IOException e) {
|
||||
logger.info(format(
|
||||
"Message [%s] for sentimentModel [%s] was not synced due to a readTree exception on getting the text",
|
||||
syncId,
|
||||
correlationId
|
||||
), e);
|
||||
latch.countDown();
|
||||
throw e;
|
||||
}
|
||||
} catch (NoBodyOrStringException e) {
|
||||
String dlqReason = format("No body or body isn't a String format for incoming %s for price event [%s]",
|
||||
CONSUME_SENTIMENT_EVENT, correlationId);
|
||||
|
||||
DlqInfo dlqInfo = new DlqInfo(
|
||||
message.getJMSMessageID(),
|
||||
dlqName,
|
||||
dlqReason,
|
||||
"0",
|
||||
correlationId
|
||||
);
|
||||
dlqService.sendToDlq(message, CONSUME_SENTIMENT_EVENT.getMessage(), dlqInfo, e, textMessage -> {
|
||||
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
|
||||
return textMessage;
|
||||
});
|
||||
latch.countDown();
|
||||
} catch (JMSException jmsException) {
|
||||
String dlqReason = format("An Exception [%s] has occurred with a JMS action [%s] on [%s]", CONSUME_SENTIMENT_EVENT, jmsException,
|
||||
correlationId);
|
||||
|
||||
DlqInfo dlqInfo = new DlqInfo(
|
||||
message.getJMSMessageID(),
|
||||
dlqGateway,
|
||||
dlqReason,
|
||||
"0",
|
||||
correlationId
|
||||
);
|
||||
dlqService.sendToDlq(message, CONSUME_SENTIMENT_EVENT.getMessage(), dlqInfo, jmsException, textMessage -> {
|
||||
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
|
||||
return textMessage;
|
||||
});
|
||||
latch.countDown();
|
||||
} catch (Exception e) {
|
||||
String dlqReason = format("Couldn't sync incoming %s for price event [%s]", CONSUME_SENTIMENT_EVENT, correlationId);
|
||||
|
||||
DlqInfo dlqInfo = new DlqInfo(
|
||||
message.getJMSMessageID(),
|
||||
dlqName,
|
||||
dlqReason,
|
||||
"0",
|
||||
correlationId
|
||||
);
|
||||
dlqService.sendToDlq(message, CONSUME_SENTIMENT_EVENT.getMessage(), dlqInfo, e, textMessage -> {
|
||||
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
|
||||
return textMessage;
|
||||
});
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,25 +0,0 @@
|
||||
package cryptosky.me.dlq;
|
||||
|
||||
import lombok.*;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
@NoArgsConstructor
|
||||
@Builder
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
public class DlqInfo {
|
||||
private String messageId;
|
||||
private String dlqName;
|
||||
private String dlqReason;
|
||||
private String redeliveryCount;
|
||||
private String correlationId;
|
||||
|
||||
public DlqInfo(String messageId, String dlqName, String dlqReason, String redeliveryCount, String correlationId) {
|
||||
this.messageId = messageId;
|
||||
this.dlqName = dlqName;
|
||||
this.dlqReason = dlqReason;
|
||||
this.redeliveryCount = redeliveryCount;
|
||||
this.correlationId = correlationId;
|
||||
}
|
||||
}
|
||||
@ -1,68 +0,0 @@
|
||||
package cryptosky.me.dlq;
|
||||
|
||||
import cryptosky.me.dlq.DlqInfo;
|
||||
import org.slf4j.Logger;
|
||||
import org.springframework.jms.core.JmsTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.TextMessage;
|
||||
import java.util.Collections;
|
||||
|
||||
import static java.lang.String.format;
|
||||
|
||||
@Component
|
||||
public class DlqService {
|
||||
private final JmsTemplate jmsTemplate;
|
||||
private final Logger logger;
|
||||
|
||||
public DlqService(JmsTemplate jmsTemplate, Logger logger) {
|
||||
this.jmsTemplate = jmsTemplate;
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
public void sendToDlq(TextMessage message, String messageType, DlqInfo dlqInfo, Exception exception, MessageTransformer messageTransformer) {
|
||||
toDlq(message, messageType, dlqInfo, exception, messageTransformer);
|
||||
}
|
||||
|
||||
private void toDlq(TextMessage message, String messageType, DlqInfo dlqInfo, Exception exception, MessageTransformer messageTransformer) {
|
||||
logger.error(format("DLQing message [%s] for customer [%s] because of an exception", dlqInfo.getMessageId(), dlqInfo.getCorrelationId()), exception);
|
||||
logDlqInfo(dlqInfo, messageType);
|
||||
|
||||
jmsTemplate.send(dlqInfo.getDlqName(), session -> {
|
||||
TextMessage textMessage = session.createTextMessage(message.getText());
|
||||
textMessage.setStringProperty("Exception", exception.getClass().getName());
|
||||
textMessage.setStringProperty("ExceptionMessage", exception.getMessage());
|
||||
textMessage.setJMSCorrelationID(message.getJMSCorrelationID());
|
||||
|
||||
//noinspection unchecked
|
||||
Collections.list(message.getPropertyNames()).forEach((Object prop) -> {
|
||||
try {
|
||||
textMessage.setObjectProperty(prop.toString(), message.getObjectProperty(prop.toString()));
|
||||
} catch (JMSException e) {
|
||||
logger.warn(format("Unable to set header property [%s] on DLQ'd message as an exception was thrown", prop), e);
|
||||
}
|
||||
});
|
||||
|
||||
return messageTransformer.transform(textMessage);
|
||||
});
|
||||
}
|
||||
|
||||
private void logDlqInfo(DlqInfo dlqInfo, String messageType) {
|
||||
String message = format(
|
||||
// Don't change this message, it is used to create monitoring dashboards!
|
||||
"Sending message to DLQ - %s for %s to DLQ %s for event %s because of %s",
|
||||
dlqInfo.getMessageId(),
|
||||
dlqInfo.getCorrelationId(),
|
||||
dlqInfo.getDlqName(),
|
||||
messageType,
|
||||
dlqInfo.getDlqReason()
|
||||
);
|
||||
|
||||
logger.info(message);
|
||||
}
|
||||
|
||||
public interface MessageTransformer {
|
||||
TextMessage transform(TextMessage textMessage) throws JMSException;
|
||||
}
|
||||
}
|
||||
@ -1,9 +0,0 @@
|
||||
package cryptosky.me.exceptions;
|
||||
|
||||
public class DatabaseViolationException extends RuntimeException {
|
||||
public DatabaseViolationException() {}
|
||||
|
||||
public DatabaseViolationException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
}
|
||||
@ -1,34 +0,0 @@
|
||||
package cryptosky.me.exceptions;
|
||||
|
||||
import cryptosky.me.consumers.PriceConsumer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.ExceptionHandler;
|
||||
|
||||
import static org.springframework.http.HttpStatus.CONFLICT;
|
||||
import static org.springframework.http.HttpStatus.NOT_FOUND;
|
||||
import static org.springframework.http.MediaType.APPLICATION_JSON_UTF8;
|
||||
import static cryptosky.me.exceptions.MessageCodes.*;
|
||||
|
||||
public class GatewayExceptionHandler {
|
||||
private final Logger logger = LoggerFactory.getLogger(PriceConsumer.class);
|
||||
|
||||
@ExceptionHandler(value = NotSupportedCurrencyTypeException.class)
|
||||
public ResponseEntity<MessageCodes> handleNotSupportedCurrencyTypeException(NotSupportedCurrencyTypeException e) {
|
||||
logger.error(ResponseEntity.status(NOT_FOUND).body(E01).toString());
|
||||
return ResponseEntity.status(NOT_FOUND).contentType(APPLICATION_JSON_UTF8).body(E01);
|
||||
}
|
||||
|
||||
@ExceptionHandler(value = DatabaseViolationException.class)
|
||||
public ResponseEntity<MessageCodes> handleDatabaseViolationException(DatabaseViolationException e) {
|
||||
logger.error(ResponseEntity.status(CONFLICT).body(E02).toString());
|
||||
return ResponseEntity.status(CONFLICT).contentType(APPLICATION_JSON_UTF8).body(E02);
|
||||
}
|
||||
|
||||
@ExceptionHandler(value = NoBodyOrStringException.class)
|
||||
public ResponseEntity<MessageCodes> handleNoBodyOrStringException(NoBodyOrStringException e) {
|
||||
logger.error(ResponseEntity.status(NOT_FOUND).body(E03).toString());
|
||||
return ResponseEntity.status(NOT_FOUND).contentType(APPLICATION_JSON_UTF8).body(E03);
|
||||
}
|
||||
}
|
||||
@ -1,35 +0,0 @@
|
||||
package cryptosky.me.exceptions;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonFormat;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
@JsonFormat(shape = JsonFormat.Shape.OBJECT)
|
||||
public enum MessageCodes {
|
||||
|
||||
E01("01", "Not supported on Services"),
|
||||
E02("02", "Database Violation - Key Pair or other Constraint Violated"),
|
||||
E03("03", "No body specified or not String format");
|
||||
|
||||
private final String code;
|
||||
private final String message;
|
||||
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
MessageCodes(String code, String message) {
|
||||
this.code = code;
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
public String getCode() {
|
||||
return "GATEWAYSVC-" + code;
|
||||
}
|
||||
|
||||
public String getMessage() {
|
||||
return message;
|
||||
}
|
||||
|
||||
public String toJson() throws JsonProcessingException {
|
||||
return objectMapper.writeValueAsString(this);
|
||||
}
|
||||
}
|
||||
@ -1,9 +0,0 @@
|
||||
package cryptosky.me.exceptions;
|
||||
|
||||
public class NoBodyOrStringException extends RuntimeException {
|
||||
public NoBodyOrStringException() {}
|
||||
|
||||
public NoBodyOrStringException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
}
|
||||
@ -1,9 +0,0 @@
|
||||
package cryptosky.me.exceptions;
|
||||
|
||||
public class NotSupportedCurrencyTypeException extends RuntimeException {
|
||||
public NotSupportedCurrencyTypeException() {}
|
||||
|
||||
public NotSupportedCurrencyTypeException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
}
|
||||
@ -1,4 +1,4 @@
|
||||
package cryptosky.me;
|
||||
package cryptosky.me.graphql;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
@ -1,4 +1,4 @@
|
||||
package cryptosky.me.pricing.models.entities;
|
||||
package cryptosky.me.graphql.pricing.models.entities;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
@ -1,4 +1,4 @@
|
||||
package cryptosky.me.pricing.models.entities;
|
||||
package cryptosky.me.graphql.pricing.models.entities;
|
||||
|
||||
import lombok.*;
|
||||
|
||||
@ -13,11 +13,9 @@ public class CryptoPriceModel {
|
||||
|
||||
@Id
|
||||
@Column(name = "ID", nullable = false)
|
||||
@GeneratedValue(strategy = GenerationType.IDENTITY)
|
||||
private int id;
|
||||
|
||||
@Column(name = "syncId", nullable = false)
|
||||
private String syncId;
|
||||
|
||||
@Column(name = "timestamp", nullable = false)
|
||||
private LocalDateTime timestamp;
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package cryptosky.me.pricing.models.repositories;
|
||||
package cryptosky.me.graphql.pricing.models.repositories;
|
||||
|
||||
import cryptosky.me.pricing.models.entities.BtcPriceModel;
|
||||
import cryptosky.me.graphql.pricing.models.entities.BtcPriceModel;
|
||||
import org.springframework.context.annotation.PropertySource;
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
@ -0,0 +1,26 @@
|
||||
package cryptosky.me.graphql.pricing.mutations;
|
||||
|
||||
import com.coxautodev.graphql.tools.GraphQLMutationResolver;
|
||||
import cryptosky.me.graphql.pricing.models.entities.BtcPriceModel;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import cryptosky.me.graphql.pricing.service.BtcPriceService;
|
||||
|
||||
@Component
|
||||
public class BtcPriceMutation implements GraphQLMutationResolver {
|
||||
|
||||
@Autowired
|
||||
private BtcPriceService btcPriceService;
|
||||
|
||||
public BtcPriceModel createBtc(final String createdDate, final String type,
|
||||
final float av_price,
|
||||
final float h_price,
|
||||
final float l_price,
|
||||
final float o_price,
|
||||
final float c_price,
|
||||
final float volume ) {
|
||||
return this.btcPriceService.createBtc(createdDate, type, av_price, h_price, l_price, o_price, c_price, volume);
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,8 +1,8 @@
|
||||
package cryptosky.me.pricing.graphql.queries;
|
||||
package cryptosky.me.graphql.pricing.queries;
|
||||
|
||||
import com.coxautodev.graphql.tools.GraphQLQueryResolver;
|
||||
import cryptosky.me.pricing.models.entities.BtcPriceModel;
|
||||
import cryptosky.me.pricing.service.PriceService;
|
||||
import cryptosky.me.graphql.pricing.models.entities.BtcPriceModel;
|
||||
import cryptosky.me.graphql.pricing.service.BtcPriceService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@ -13,25 +13,25 @@ import java.util.Optional;
|
||||
public class BtcPriceQuery implements GraphQLQueryResolver {
|
||||
|
||||
@Autowired
|
||||
private PriceService priceService;
|
||||
private BtcPriceService btcPriceService;
|
||||
|
||||
public List<BtcPriceModel> getAllPrices(final int count ) {
|
||||
return this.priceService.getAllPrices(count);
|
||||
public List<BtcPriceModel> getAllPrices( final int count ) {
|
||||
return this.btcPriceService.getAllPrices(count);
|
||||
}
|
||||
|
||||
public List<BtcPriceModel> getPricesBetweenCounts( final int startCount, final int endCount ) {
|
||||
return this.priceService.getPricesBetweenCounts(startCount, endCount);
|
||||
return this.btcPriceService.getPricesBetweenCounts(startCount, endCount);
|
||||
}
|
||||
|
||||
public Optional<BtcPriceModel> getLatest() {
|
||||
return this.priceService.getLatest();
|
||||
return this.btcPriceService.getLatest();
|
||||
}
|
||||
|
||||
public Optional<BtcPriceModel> getPriceForCreatedDate( final String createdDate ) {
|
||||
return this.priceService.getPriceForCreatedDate(createdDate);
|
||||
return this.btcPriceService.getPriceForCreatedDate(createdDate);
|
||||
}
|
||||
|
||||
public List<BtcPriceModel> getPriceBetweenDates(final String startDate, final String endDate ) {
|
||||
return this.priceService.getPriceBetweenDates(startDate, endDate);
|
||||
return this.btcPriceService.getPriceBetweenDates(startDate, endDate);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,128 @@
|
||||
package cryptosky.me.graphql.pricing.service;
|
||||
|
||||
import cryptosky.me.graphql.pricing.models.entities.BtcPriceModel;
|
||||
import cryptosky.me.graphql.pricing.models.repositories.BtcPriceRepository;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
import static cryptosky.me.helpers.Utils.format;
|
||||
|
||||
@Service
|
||||
public class BtcPriceService {
|
||||
|
||||
private final BtcPriceRepository btcPriceRepository;
|
||||
|
||||
public BtcPriceService(final BtcPriceRepository btcPriceRepository) {
|
||||
this.btcPriceRepository = btcPriceRepository;
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public BtcPriceModel createBtc(final String createdDate, final String type,
|
||||
final float av_price,
|
||||
final float h_price,
|
||||
final float l_price,
|
||||
final float o_price,
|
||||
final float c_price,
|
||||
final float volume ) {
|
||||
|
||||
final BtcPriceModel btcPrice = new BtcPriceModel();
|
||||
btcPrice.setTimestamp(LocalDateTime.parse((createdDate)));
|
||||
btcPrice.setType(type);
|
||||
btcPrice.setAverage_price(av_price);
|
||||
btcPrice.setHigh_price(h_price);
|
||||
btcPrice.setLow_price(l_price);
|
||||
btcPrice.setOpen_price(o_price);
|
||||
btcPrice.setClose_price(c_price);
|
||||
btcPrice.setVolume(volume);
|
||||
|
||||
return this.btcPriceRepository.save(btcPrice);
|
||||
}
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
public List<BtcPriceModel> getAllPrices( final int count ) {
|
||||
return this.btcPriceRepository.findAll().stream()
|
||||
.limit(count)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
public List<BtcPriceModel> getPricesBetweenCounts( final int startCount, final int endCount ) {
|
||||
return this.btcPriceRepository.findAll().stream()
|
||||
.skip(startCount)
|
||||
.limit(endCount - startCount)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
public Optional<BtcPriceModel> getLatest() {
|
||||
return this.btcPriceRepository.findAll().stream().findFirst();
|
||||
}
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
public Optional<BtcPriceModel> getPriceForCreatedDate( final String createdDate ) {
|
||||
return this.btcPriceRepository.findAll().stream()
|
||||
.filter(createdDateList -> createdDateList.getTimestamp().equals(format(createdDate)))
|
||||
.findFirst();
|
||||
}
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
public List<BtcPriceModel> getPriceBetweenDates( final String startDate, final String endDate ) {
|
||||
return this.btcPriceRepository.findAll().stream()
|
||||
.filter(createdDateList -> format(createdDateList.getTimestamp().toString()).isBefore(format(endDate)))
|
||||
.filter(createdDateList -> format(createdDateList.getTimestamp().toString()).isAfter(format(startDate)))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
// @Transactional(readOnly = true)
|
||||
// public List<BtcPriceModel> getAllLatest() {
|
||||
// return this.btcPriceRepository.findAll().stream()
|
||||
// .filter(distinctByKey(CryptoPriceModel::getType))
|
||||
// .collect(Collectors.toList());
|
||||
// }
|
||||
|
||||
// @Transactional(readOnly = true)
|
||||
// public List<CryptoPriceModel> getLimitPricesByType(final int count, final String type ) {
|
||||
// return this.btcPriceRepository.findAll().stream()
|
||||
// .filter(typeList -> typeList.getType().equals(type))
|
||||
// .limit(count)
|
||||
// .collect(Collectors.toList());
|
||||
// }
|
||||
|
||||
// @Transactional(readOnly = true)
|
||||
// public List<CryptoPriceModel> getAllByType(final String type ) {
|
||||
// return this.btcPriceRepository.findAll().stream()
|
||||
// .filter(typeList -> typeList.getType().equals(type))
|
||||
// .collect(Collectors.toList());
|
||||
// }
|
||||
|
||||
// @Transactional(readOnly = true)
|
||||
// public Optional<CryptoPriceModel> getLatestByType(final String type ) {
|
||||
// return this.cryptoPriceRepository.findAll().stream()
|
||||
// .filter(typeList -> typeList.getType().equals(type))
|
||||
// .limit(1)
|
||||
// .findFirst();
|
||||
// }
|
||||
|
||||
// @Transactional(readOnly = true)
|
||||
// public Optional<CryptoPriceModel> getPriceByCreatedDateForType( final String type, final String createdDate ) {
|
||||
// return this.cryptoPriceRepository.findAll().stream()
|
||||
// .filter(typeList -> typeList.getType().equals(type))
|
||||
// .filter(createdDateList -> createdDateList.getTimestamp().equals(LocalDate.parse(createdDate)))
|
||||
// .findFirst();
|
||||
// }
|
||||
//
|
||||
// @Transactional(readOnly = true)
|
||||
// public Optional<CryptoPriceModel> getPriceBetweenCreatedDatesForType( final String type, final String startDate, final String endDate ) {
|
||||
// return this.cryptoPriceRepository.findAll().stream()
|
||||
// .filter(typeList -> typeList.getType().equals(type))
|
||||
// .filter(createdDateList -> createdDateList.getTimestamp().isBefore(LocalDate.parse(endDate)))
|
||||
// .filter(createdDateList -> createdDateList.getTimestamp().isAfter(LocalDate.parse(startDate)))
|
||||
// .findFirst();
|
||||
// }
|
||||
}
|
||||
@ -1,4 +1,4 @@
|
||||
package cryptosky.me.sentiment.models.entities;
|
||||
package cryptosky.me.graphql.tweets.models.entities;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
@ -9,7 +9,7 @@ import javax.persistence.Table;
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@Data
|
||||
@Entity
|
||||
@Table(name = "btc_sentiment")
|
||||
public class BtcSentimentModel extends SentimentModel {
|
||||
@Table(name = "btc_tweet")
|
||||
public class BtcTweetModel extends TweetModel {
|
||||
|
||||
}
|
||||
@ -1,6 +1,7 @@
|
||||
package cryptosky.me.sentiment.models.entities;
|
||||
package cryptosky.me.graphql.tweets.models.entities;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@ -10,17 +11,21 @@ import javax.persistence.*;
|
||||
@NoArgsConstructor
|
||||
@Data
|
||||
@MappedSuperclass
|
||||
public class SentimentModel {
|
||||
public class TweetModel {
|
||||
|
||||
@Id
|
||||
@Column(name = "ID", nullable = false)
|
||||
@GeneratedValue(strategy = GenerationType.IDENTITY)
|
||||
private int id;
|
||||
|
||||
@Column(name = "timestamp",nullable = false)
|
||||
private String timestamp;
|
||||
|
||||
@Column(name = "syncId", nullable = false)
|
||||
private String syncId;
|
||||
@Column(name = "raw_tweet", nullable = false)
|
||||
private String rawTweet;
|
||||
|
||||
@Column(name = "sentiment")
|
||||
private float sentimentScore;
|
||||
|
||||
@Column(name = "pos")
|
||||
private float positiveScore;
|
||||
@ -34,7 +39,4 @@ public class SentimentModel {
|
||||
@Column(name = "compound", nullable = false)
|
||||
private float compoundScore;
|
||||
|
||||
@Column(name = "type")
|
||||
private String type;
|
||||
|
||||
}
|
||||
@ -0,0 +1,9 @@
|
||||
package cryptosky.me.graphql.tweets.models.repositories;
|
||||
|
||||
import cryptosky.me.graphql.tweets.models.entities.BtcTweetModel;
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
@Repository
|
||||
public interface BtcTweetRepository extends JpaRepository<BtcTweetModel, Integer> {
|
||||
}
|
||||
@ -0,0 +1,22 @@
|
||||
package cryptosky.me.graphql.tweets.mutations;
|
||||
|
||||
import com.coxautodev.graphql.tools.GraphQLMutationResolver;
|
||||
import cryptosky.me.graphql.tweets.models.entities.BtcTweetModel;
|
||||
import cryptosky.me.graphql.tweets.service.BtcTweetService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class BtcTweetMutation implements GraphQLMutationResolver {
|
||||
|
||||
@Autowired
|
||||
BtcTweetService btcTweetService;
|
||||
|
||||
public BtcTweetModel createTweet(final String createdDate, final String rawTweet, final float sentimentScore,
|
||||
final float positiveScore, final float neutralScore, final float negativeScore,
|
||||
final float compoundScore ) {
|
||||
return this.btcTweetService.createTweet( createdDate, rawTweet, sentimentScore, positiveScore, neutralScore,
|
||||
negativeScore,compoundScore );
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,33 @@
|
||||
package cryptosky.me.graphql.tweets.queries;
|
||||
|
||||
import com.coxautodev.graphql.tools.GraphQLQueryResolver;
|
||||
import cryptosky.me.graphql.tweets.models.entities.BtcTweetModel;
|
||||
import cryptosky.me.graphql.tweets.service.BtcTweetService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
@Component
|
||||
public class BtcTweetQuery implements GraphQLQueryResolver {
|
||||
|
||||
@Autowired
|
||||
private BtcTweetService btcTweetService;
|
||||
|
||||
public Optional<BtcTweetModel> getCurrentTweet() {
|
||||
return this.btcTweetService.getCurrentTweet();
|
||||
}
|
||||
|
||||
public List<BtcTweetModel> getAllTweets( final int count ) {
|
||||
return this.btcTweetService.getAllTweets(count);
|
||||
}
|
||||
|
||||
public List<BtcTweetModel> getTweetsForDay( final String startDate, final String endDate ) {
|
||||
return this.btcTweetService.getTweetsForDay(startDate, endDate);
|
||||
}
|
||||
|
||||
public List<BtcTweetModel> getTweetsForPeriod( final String startDate, final String endDate ) {
|
||||
return this.btcTweetService.getTweetsForPeriod( startDate, endDate );
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,101 @@
|
||||
package cryptosky.me.graphql.tweets.service;
|
||||
|
||||
import cryptosky.me.graphql.tweets.models.entities.BtcTweetModel;
|
||||
import cryptosky.me.graphql.tweets.models.repositories.BtcTweetRepository;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.time.DateTimeException;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.LocalTime;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static cryptosky.me.helpers.Utils.format;
|
||||
|
||||
@Service
|
||||
public class BtcTweetService {
|
||||
|
||||
private final BtcTweetRepository btcTweetRepository;
|
||||
|
||||
public BtcTweetService(final BtcTweetRepository btcTweetRepository) {
|
||||
this.btcTweetRepository = btcTweetRepository;
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public BtcTweetModel createTweet(final String createdDate, final String rawTweet, final float sentimentScore,
|
||||
final float positiveScore, final float neutralScore, final float negativeScore,
|
||||
final float compoundScore ) {
|
||||
|
||||
final BtcTweetModel tweetModel = new BtcTweetModel();
|
||||
tweetModel.setTimestamp(format(createdDate).toString());
|
||||
tweetModel.setRawTweet(rawTweet);
|
||||
tweetModel.setSentimentScore(sentimentScore);
|
||||
tweetModel.setPositiveScore(positiveScore);
|
||||
tweetModel.setNegativeScore(negativeScore);
|
||||
tweetModel.setNeutralScore(neutralScore);
|
||||
tweetModel.setCompoundScore(compoundScore);
|
||||
|
||||
return this.btcTweetRepository.save(tweetModel);
|
||||
}
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
public Optional<BtcTweetModel> getCurrentTweet() {
|
||||
return this.btcTweetRepository.findAll().stream().findFirst();
|
||||
}
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
public List<BtcTweetModel> getAllTweets( final int count ) {
|
||||
return this.btcTweetRepository.findAll().stream()
|
||||
.limit(count)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
public List<BtcTweetModel> getTweetsForDay( final String startDate, final String endDate ) {
|
||||
LocalDateTime r_start = format(startDate);
|
||||
LocalDateTime r_end = format(endDate);
|
||||
|
||||
r_start = r_start.toLocalDate().atStartOfDay();
|
||||
r_end = r_end.toLocalDate().atTime(LocalTime.MAX);
|
||||
|
||||
if ( r_end.isAfter(r_start) ) {
|
||||
if ( r_end.equals(r_start.toLocalDate().atTime(LocalTime.MAX))) {
|
||||
LocalDateTime finalR_end = r_end;
|
||||
LocalDateTime finalR_start = r_start;
|
||||
return this.btcTweetRepository.findAll().stream()
|
||||
.filter(createdDateList -> format(createdDateList.getTimestamp()).isBefore(finalR_end))
|
||||
.filter(createdDateList -> format(createdDateList.getTimestamp()).isAfter(finalR_start))
|
||||
.collect(Collectors.toList());
|
||||
} else {
|
||||
// Logger
|
||||
throw new DateTimeException(r_start +" and "+ r_end +" are not on the same day");
|
||||
}
|
||||
} else {
|
||||
// Logger
|
||||
throw new DateTimeException("End Date "+ r_end +" is not after "+ r_start);
|
||||
}
|
||||
}
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
public List<BtcTweetModel> getTweetsForPeriod( final String startDate, final String endDate ) {
|
||||
LocalDateTime r_start = format(startDate);
|
||||
LocalDateTime r_end = format(endDate);
|
||||
|
||||
r_start = r_start.toLocalDate().atStartOfDay();
|
||||
r_end = r_end.toLocalDate().atTime(LocalTime.MAX);
|
||||
|
||||
if ( r_end.isAfter(r_start) ) {
|
||||
LocalDateTime finalR_end = r_end;
|
||||
LocalDateTime finalR_start = r_start;
|
||||
return this.btcTweetRepository.findAll().stream()
|
||||
.filter(createdDateList -> format(createdDateList.getTimestamp()).isBefore(finalR_end))
|
||||
.filter(createdDateList -> format(createdDateList.getTimestamp()).isAfter(finalR_start))
|
||||
.collect(Collectors.toList());
|
||||
} else {
|
||||
// Logger
|
||||
throw new DateTimeException("End Date "+ r_end +" is not after "+ r_start);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,29 +0,0 @@
|
||||
package cryptosky.me.helpers;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.core.JsonToken;
|
||||
import com.fasterxml.jackson.databind.DeserializationContext;
|
||||
import com.fasterxml.jackson.databind.JsonMappingException;
|
||||
import com.fasterxml.jackson.databind.deser.std.StdScalarDeserializer;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class StringTrimModule extends SimpleModule {
|
||||
public StringTrimModule() {
|
||||
addDeserializer(String.class, new StdScalarDeserializer<String>(String.class) {
|
||||
@Override
|
||||
public String deserialize(JsonParser jsonParser, DeserializationContext ctx) throws IOException {
|
||||
if (!jsonParser.getCurrentToken().equals(JsonToken.VALUE_STRING)) {
|
||||
throw new JsonMappingException(
|
||||
jsonParser,
|
||||
jsonParser.getCurrentName() + " expected to be a String"
|
||||
);
|
||||
}
|
||||
|
||||
return StringUtils.trim(jsonParser.getValueAsString());
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
@ -9,45 +9,8 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import javax.jms.Message;
|
||||
|
||||
import static java.util.UUID.randomUUID;
|
||||
import static org.apache.commons.lang3.StringUtils.isBlank;
|
||||
|
||||
public class Utils {
|
||||
|
||||
private static final String CORRELATION_ID_KEY = "X-CRYPTO-Correlation-ID";
|
||||
private static final String SYNC_ID_KEY = "X-CRYPTO-Sync-ID";
|
||||
|
||||
public static String getCorrelationId(Message message) {
|
||||
String correlationId = null;
|
||||
try {
|
||||
correlationId = message.getJMSCorrelationID();
|
||||
if (isBlank(correlationId)) {
|
||||
correlationId = message.getStringProperty(CORRELATION_ID_KEY);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
// NOOP
|
||||
}
|
||||
if (isBlank(correlationId)) {
|
||||
correlationId = randomUUID().toString();
|
||||
}
|
||||
return correlationId;
|
||||
}
|
||||
|
||||
public static String getSyncId(Message message) {
|
||||
String syncId = null;
|
||||
try {
|
||||
syncId = message.getStringProperty(SYNC_ID_KEY);
|
||||
} catch (Throwable e) {
|
||||
// NOOP
|
||||
}
|
||||
if (isBlank(syncId)) {
|
||||
syncId = randomUUID().toString();
|
||||
}
|
||||
return syncId;
|
||||
}
|
||||
|
||||
// Utility Function to be able to get one of all types
|
||||
public static <T> Predicate<T> distinctByKey(Function<? super T, Object> keyExtractor) {
|
||||
Map<Object, Boolean> seen = new ConcurrentHashMap<>();
|
||||
|
||||
@ -1,22 +0,0 @@
|
||||
package cryptosky.me.logging;
|
||||
|
||||
import org.slf4j.MDC;
|
||||
|
||||
public class CorrelationInfo {
|
||||
|
||||
public static final String CORRELATION_ID = "X-CRYPTO-Correlation-ID";
|
||||
public static final String SYNC_ID = "X-CRYPTO-Sync-ID";
|
||||
|
||||
public static void setCorrelationId(String correlationId) {
|
||||
MDC.put(CORRELATION_ID, correlationId);
|
||||
}
|
||||
|
||||
public static void setSyncId(String syncId) {
|
||||
MDC.put(SYNC_ID, syncId);
|
||||
}
|
||||
|
||||
public static void clear() {
|
||||
MDC.remove(CORRELATION_ID);
|
||||
MDC.remove(SYNC_ID);
|
||||
}
|
||||
}
|
||||
@ -1,215 +0,0 @@
|
||||
package cryptosky.me.pricing.service;
|
||||
|
||||
import cryptosky.me.dlq.DlqInfo;
|
||||
import cryptosky.me.dlq.DlqService;
|
||||
import cryptosky.me.exceptions.MessageCodes;
|
||||
|
||||
import cryptosky.me.pricing.models.entities.BtcPriceModel;
|
||||
import cryptosky.me.pricing.models.entities.CryptoPriceModel;
|
||||
import cryptosky.me.pricing.models.repositories.BtcPriceRepository;
|
||||
|
||||
import cryptosky.me.exceptions.NotSupportedCurrencyTypeException;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.dao.DataIntegrityViolationException;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.TextMessage;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static cryptosky.me.ArtemisSyncMessaging.*;
|
||||
import static cryptosky.me.SupportedCurrencies.*;
|
||||
|
||||
import static cryptosky.me.helpers.Utils.format;
|
||||
import static cryptosky.me.logging.CorrelationInfo.setSyncId;
|
||||
import static java.util.UUID.randomUUID;
|
||||
|
||||
@Service
|
||||
public class PriceService {
|
||||
|
||||
private final BtcPriceRepository btcPriceRepository;
|
||||
private final DlqService dlqService;
|
||||
private final String dlqName;
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(PriceService.class);
|
||||
|
||||
@Autowired
|
||||
public PriceService(
|
||||
final BtcPriceRepository btcPriceRepository,
|
||||
DlqService dlqService,
|
||||
@Value("${destinations.pricing.priceDlq}") String dlqName
|
||||
) {
|
||||
this.btcPriceRepository = btcPriceRepository;
|
||||
this.dlqService = dlqService;
|
||||
this.dlqName = dlqName;
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public void createRecord(CryptoPriceModel cryptoPriceModel, String correlationId, TextMessage message) {
|
||||
String syncId = randomUUID().toString();
|
||||
setSyncId(syncId);
|
||||
try {
|
||||
|
||||
switch (cryptoPriceModel.getType()) {
|
||||
case BTC:
|
||||
BtcPriceModel priceModel = new BtcPriceModel();
|
||||
priceModel.setId(getLatestId() + 1);
|
||||
priceModel.setTimestamp(cryptoPriceModel.getTimestamp());
|
||||
priceModel.setSyncId(syncId);
|
||||
priceModel.setType(cryptoPriceModel.getType());
|
||||
priceModel.setAverage_price(cryptoPriceModel.getAverage_price());
|
||||
priceModel.setHigh_price(cryptoPriceModel.getHigh_price());
|
||||
priceModel.setLow_price(cryptoPriceModel.getLow_price());
|
||||
priceModel.setOpen_price(cryptoPriceModel.getOpen_price());
|
||||
priceModel.setClose_price(cryptoPriceModel.getClose_price());
|
||||
priceModel.setVolume(cryptoPriceModel.getVolume());
|
||||
|
||||
logger.info("Saving {} record to the database for [{}] with syncId of [{}]", cryptoPriceModel.getType(), correlationId, syncId);
|
||||
this.btcPriceRepository.save(priceModel);
|
||||
return;
|
||||
default:
|
||||
logger.error("Message for [{}] has failed with [{}] due to the currency [{}] {}", syncId, MessageCodes.E01.getCode(), cryptoPriceModel.getType(), MessageCodes.E01.getMessage());
|
||||
throw new NotSupportedCurrencyTypeException();
|
||||
}
|
||||
} catch (DataIntegrityViolationException e) {
|
||||
logger.error("Message for [{}] has failed with [{}] due to the currency [{}] {}", syncId, MessageCodes.E02.getCode(), cryptoPriceModel.getType(), MessageCodes.E02.getMessage());
|
||||
|
||||
String dlqReason = String.format("Couldn't sync incoming %s for price event [%s]", CONSUME_PRICE_EVENT, correlationId);
|
||||
|
||||
try {
|
||||
DlqInfo dlqInfo = new DlqInfo(
|
||||
message.getJMSMessageID(),
|
||||
dlqName,
|
||||
dlqReason,
|
||||
"0",
|
||||
correlationId
|
||||
);
|
||||
dlqService.sendToDlq(message, SAVE_MESSAGE_DATABASE_EXCEPTION.getMessage(), dlqInfo, e, textMessage -> {
|
||||
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
|
||||
return textMessage;
|
||||
});
|
||||
logger.info("Sending [{}] to the DLQ", syncId);
|
||||
} catch (JMSException jmsException) {
|
||||
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
String dlqReason = String.format("Couldn't sync incoming %s for price event [%s] due to a generic exception", CONSUME_PRICE_EVENT, correlationId);
|
||||
|
||||
try {
|
||||
DlqInfo dlqInfo = new DlqInfo(
|
||||
message.getJMSMessageID(),
|
||||
dlqName,
|
||||
dlqReason,
|
||||
"0",
|
||||
correlationId
|
||||
);
|
||||
dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, e, textMessage -> {
|
||||
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
|
||||
return textMessage;
|
||||
});
|
||||
logger.info("Sending [{}] to the DLQ", syncId);
|
||||
} catch (JMSException jmsException) {
|
||||
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
public List<BtcPriceModel> getAllPrices( final int count ) {
|
||||
return this.btcPriceRepository.findAll().stream()
|
||||
.limit(count)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
public List<BtcPriceModel> getPricesBetweenCounts( final int startCount, final int endCount ) {
|
||||
return this.btcPriceRepository.findAll().stream()
|
||||
.skip(startCount)
|
||||
.limit(endCount - startCount)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
public Optional<BtcPriceModel> getLatest() {
|
||||
return this.btcPriceRepository.findAll().stream().findFirst();
|
||||
}
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
public int getLatestId() {
|
||||
List<BtcPriceModel> records = new ArrayList<>(this.btcPriceRepository.findAll());
|
||||
BtcPriceModel latest = records.stream().skip(records.size() - 1).findFirst().get();
|
||||
return latest.getId();
|
||||
}
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
public Optional<BtcPriceModel> getPriceForCreatedDate( final String createdDate ) {
|
||||
return this.btcPriceRepository.findAll().stream()
|
||||
.filter(createdDateList -> createdDateList.getTimestamp().equals(format(createdDate)))
|
||||
.findFirst();
|
||||
}
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
public List<BtcPriceModel> getPriceBetweenDates( final String startDate, final String endDate ) {
|
||||
return this.btcPriceRepository.findAll().stream()
|
||||
.filter(createdDateList -> format(createdDateList.getTimestamp().toString()).isBefore(format(endDate)))
|
||||
.filter(createdDateList -> format(createdDateList.getTimestamp().toString()).isAfter(format(startDate)))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
// @Transactional(readOnly = true)
|
||||
// public List<BtcPriceModel> getAllLatest() {
|
||||
// return this.btcPriceRepository.findAll().stream()
|
||||
// .filter(distinctByKey(CryptoPriceModel::getType))
|
||||
// .collect(Collectors.toList());
|
||||
// }
|
||||
|
||||
// @Transactional(readOnly = true)
|
||||
// public List<CryptoPriceModel> getLimitPricesByType(final int count, final String type ) {
|
||||
// return this.btcPriceRepository.findAll().stream()
|
||||
// .filter(typeList -> typeList.getType().equals(type))
|
||||
// .limit(count)
|
||||
// .collect(Collectors.toList());
|
||||
// }
|
||||
|
||||
// @Transactional(readOnly = true)
|
||||
// public List<CryptoPriceModel> getAllByType(final String type ) {
|
||||
// return this.btcPriceRepository.findAll().stream()
|
||||
// .filter(typeList -> typeList.getType().equals(type))
|
||||
// .collect(Collectors.toList());
|
||||
// }
|
||||
|
||||
// @Transactional(readOnly = true)
|
||||
// public Optional<CryptoPriceModel> getLatestByType(final String type ) {
|
||||
// return this.cryptoPriceRepository.findAll().stream()
|
||||
// .filter(typeList -> typeList.getType().equals(type))
|
||||
// .limit(1)
|
||||
// .findFirst();
|
||||
// }
|
||||
|
||||
// @Transactional(readOnly = true)
|
||||
// public Optional<CryptoPriceModel> getPriceByCreatedDateForType( final String type, final String createdDate ) {
|
||||
// return this.cryptoPriceRepository.findAll().stream()
|
||||
// .filter(typeList -> typeList.getType().equals(type))
|
||||
// .filter(createdDateList -> createdDateList.getTimestamp().equals(LocalDate.parse(createdDate)))
|
||||
// .findFirst();
|
||||
// }
|
||||
//
|
||||
// @Transactional(readOnly = true)
|
||||
// public Optional<CryptoPriceModel> getPriceBetweenCreatedDatesForType( final String type, final String startDate, final String endDate ) {
|
||||
// return this.cryptoPriceRepository.findAll().stream()
|
||||
// .filter(typeList -> typeList.getType().equals(type))
|
||||
// .filter(createdDateList -> createdDateList.getTimestamp().isBefore(LocalDate.parse(endDate)))
|
||||
// .filter(createdDateList -> createdDateList.getTimestamp().isAfter(LocalDate.parse(startDate)))
|
||||
// .findFirst();
|
||||
// }
|
||||
}
|
||||
@ -1,9 +0,0 @@
|
||||
package cryptosky.me.sentiment.models.repositories;
|
||||
|
||||
import cryptosky.me.sentiment.models.entities.BtcSentimentModel;
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
@Repository
|
||||
public interface BtcSentimentRepository extends JpaRepository<BtcSentimentModel, Integer> {
|
||||
}
|
||||
@ -1,207 +0,0 @@
|
||||
package cryptosky.me.sentiment.service;
|
||||
|
||||
import cryptosky.me.dlq.DlqInfo;
|
||||
import cryptosky.me.dlq.DlqService;
|
||||
import cryptosky.me.exceptions.MessageCodes;
|
||||
import cryptosky.me.exceptions.NotSupportedCurrencyTypeException;
|
||||
import cryptosky.me.pricing.models.entities.BtcPriceModel;
|
||||
import cryptosky.me.sentiment.models.entities.BtcSentimentModel;
|
||||
import cryptosky.me.sentiment.models.entities.SentimentModel;
|
||||
import cryptosky.me.sentiment.models.repositories.BtcSentimentRepository;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.dao.DataIntegrityViolationException;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.TextMessage;
|
||||
import java.time.DateTimeException;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.LocalTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static cryptosky.me.ArtemisSyncMessaging.*;
|
||||
import static cryptosky.me.SupportedCurrencies.BTC;
|
||||
import static cryptosky.me.helpers.Utils.format;
|
||||
import static cryptosky.me.logging.CorrelationInfo.setSyncId;
|
||||
import static java.util.UUID.randomUUID;
|
||||
|
||||
@Service
|
||||
public class SentimentService {
|
||||
|
||||
private final BtcSentimentRepository btcSentimentRepository;
|
||||
private final DlqService dlqService;
|
||||
private final String dlqName;
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(SentimentService.class);
|
||||
|
||||
@Autowired
|
||||
public SentimentService(
|
||||
final BtcSentimentRepository btcSentimentRepository,
|
||||
DlqService dlqService,
|
||||
@Value("${destinations.pricing.priceDlq}") String dlqName
|
||||
) {
|
||||
this.btcSentimentRepository = btcSentimentRepository;
|
||||
this.dlqService = dlqService;
|
||||
this.dlqName = dlqName;
|
||||
}
|
||||
|
||||
// @Transactional
|
||||
// public BtcSentimentModel createTweet(final String createdDate, final String rawTweet, final float sentimentScore,
|
||||
// final float positiveScore, final float neutralScore, final float negativeScore,
|
||||
// final float compoundScore ) {
|
||||
//
|
||||
// final BtcSentimentModel tweetModel = new BtcSentimentModel();
|
||||
// tweetModel.setTimestamp(format(createdDate).toString());
|
||||
// tweetModel.setRawTweet(rawTweet);
|
||||
// tweetModel.setSentimentScore(sentimentScore);
|
||||
// tweetModel.setPositiveScore(positiveScore);
|
||||
// tweetModel.setNegativeScore(negativeScore);
|
||||
// tweetModel.setNeutralScore(neutralScore);
|
||||
// tweetModel.setCompoundScore(compoundScore);
|
||||
|
||||
// return this.btcSentimentRepository.save(tweetModel);
|
||||
// }
|
||||
|
||||
@Transactional
|
||||
public void createRecord(SentimentModel sentimentModel, String correlationId, TextMessage message) {
|
||||
String syncId = randomUUID().toString();
|
||||
setSyncId(syncId);
|
||||
|
||||
try {
|
||||
|
||||
switch (sentimentModel.getType()) {
|
||||
case BTC:
|
||||
BtcSentimentModel btcSentimentModel = new BtcSentimentModel();
|
||||
btcSentimentModel.setId(getLatestId() + 1);
|
||||
btcSentimentModel.setTimestamp(sentimentModel.getTimestamp());
|
||||
btcSentimentModel.setSyncId(syncId);
|
||||
btcSentimentModel.setPositiveScore(sentimentModel.getPositiveScore());
|
||||
btcSentimentModel.setNeutralScore(sentimentModel.getNeutralScore());
|
||||
btcSentimentModel.setNegativeScore(sentimentModel.getNegativeScore());
|
||||
btcSentimentModel.setCompoundScore(sentimentModel.getCompoundScore());
|
||||
btcSentimentModel.setType(sentimentModel.getType());
|
||||
|
||||
logger.info("Saving {} record to the database for [{}] with syncId of [{}]", sentimentModel.getType(), correlationId, syncId);
|
||||
this.btcSentimentRepository.save(btcSentimentModel);
|
||||
return;
|
||||
default:
|
||||
logger.error("Message for [{}] has failed with [{}] due to the currency [{}] {}", syncId, MessageCodes.E01.getCode(), sentimentModel.getType(), MessageCodes.E01.getMessage());
|
||||
throw new NotSupportedCurrencyTypeException();
|
||||
}
|
||||
} catch (DataIntegrityViolationException e) {
|
||||
logger.error("Message for [{}] has failed with [{}] due to the currency [{}] {}", syncId, MessageCodes.E02.getCode(), sentimentModel.getType(), MessageCodes.E02.getMessage());
|
||||
|
||||
String dlqReason = String.format("Couldn't sync incoming %s for price event [%s]", CONSUME_PRICE_EVENT, correlationId);
|
||||
|
||||
try {
|
||||
DlqInfo dlqInfo = new DlqInfo(
|
||||
message.getJMSMessageID(),
|
||||
dlqName,
|
||||
dlqReason,
|
||||
"0",
|
||||
correlationId
|
||||
);
|
||||
dlqService.sendToDlq(message, SAVE_MESSAGE_DATABASE_EXCEPTION.getMessage(), dlqInfo, e, textMessage -> {
|
||||
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
|
||||
return textMessage;
|
||||
});
|
||||
logger.info("Sending [{}] to the DLQ", syncId);
|
||||
} catch (JMSException jmsException) {
|
||||
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
String dlqReason = String.format("Couldn't sync incoming %s for price event [%s] due to a generic exception", CONSUME_PRICE_EVENT, correlationId);
|
||||
|
||||
try {
|
||||
DlqInfo dlqInfo = new DlqInfo(
|
||||
message.getJMSMessageID(),
|
||||
dlqName,
|
||||
dlqReason,
|
||||
"0",
|
||||
correlationId
|
||||
);
|
||||
dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, e, textMessage -> {
|
||||
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
|
||||
return textMessage;
|
||||
});
|
||||
logger.info("Sending [{}] to the DLQ", syncId);
|
||||
} catch (JMSException jmsException) {
|
||||
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
public int getLatestId() {
|
||||
List<BtcSentimentModel> records = new ArrayList<>(this.btcSentimentRepository.findAll());
|
||||
BtcSentimentModel latest = records.stream().skip(records.size() - 1).findFirst().get();
|
||||
return latest.getId();
|
||||
}
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
public Optional<BtcSentimentModel> getCurrentTweet() {
|
||||
return this.btcSentimentRepository.findAll().stream().findFirst();
|
||||
}
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
public List<BtcSentimentModel> getAllTweets(final int count ) {
|
||||
return this.btcSentimentRepository.findAll().stream()
|
||||
.limit(count)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
public List<BtcSentimentModel> getTweetsForDay(final String startDate, final String endDate ) {
|
||||
LocalDateTime r_start = format(startDate);
|
||||
LocalDateTime r_end = format(endDate);
|
||||
|
||||
r_start = r_start.toLocalDate().atStartOfDay();
|
||||
r_end = r_end.toLocalDate().atTime(LocalTime.MAX);
|
||||
|
||||
if ( r_end.isAfter(r_start) ) {
|
||||
if ( r_end.equals(r_start.toLocalDate().atTime(LocalTime.MAX))) {
|
||||
LocalDateTime finalR_end = r_end;
|
||||
LocalDateTime finalR_start = r_start;
|
||||
return this.btcSentimentRepository.findAll().stream()
|
||||
.filter(createdDateList -> format(createdDateList.getTimestamp()).isBefore(finalR_end))
|
||||
.filter(createdDateList -> format(createdDateList.getTimestamp()).isAfter(finalR_start))
|
||||
.collect(Collectors.toList());
|
||||
} else {
|
||||
// Logger
|
||||
throw new DateTimeException(r_start +" and "+ r_end +" are not on the same day");
|
||||
}
|
||||
} else {
|
||||
// Logger
|
||||
throw new DateTimeException("End Date "+ r_end +" is not after "+ r_start);
|
||||
}
|
||||
}
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
public List<BtcSentimentModel> getTweetsForPeriod(final String startDate, final String endDate ) {
|
||||
LocalDateTime r_start = format(startDate);
|
||||
LocalDateTime r_end = format(endDate);
|
||||
|
||||
r_start = r_start.toLocalDate().atStartOfDay();
|
||||
r_end = r_end.toLocalDate().atTime(LocalTime.MAX);
|
||||
|
||||
if ( r_end.isAfter(r_start) ) {
|
||||
LocalDateTime finalR_end = r_end;
|
||||
LocalDateTime finalR_start = r_start;
|
||||
return this.btcSentimentRepository.findAll().stream()
|
||||
.filter(createdDateList -> format(createdDateList.getTimestamp()).isBefore(finalR_end))
|
||||
.filter(createdDateList -> format(createdDateList.getTimestamp()).isAfter(finalR_start))
|
||||
.collect(Collectors.toList());
|
||||
} else {
|
||||
// Logger
|
||||
throw new DateTimeException("End Date "+ r_end +" is not after "+ r_start);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,15 +1,10 @@
|
||||
spring:
|
||||
application:
|
||||
name: @project.artifactId@
|
||||
artemis:
|
||||
broker-url: ${BROKER_URL:tcp://localhost:61616}
|
||||
user: ${BROKER_USER:admin}
|
||||
password: ${BROKER_PASSWORD:admin}
|
||||
|
||||
datasource:
|
||||
url: ${JDBC_URL:jdbc:postgresql://localhost:25060/db}
|
||||
username: ${JDBC_USERNAME:admin}
|
||||
password: ${JDBC_PASSWORD:admin}
|
||||
url: ${JDBC_URL}
|
||||
username: ${JDBC_USERNAME}
|
||||
password: ${JDBC_PASSWORD}
|
||||
driver-class-name: org.postgresql.Driver
|
||||
hikari:
|
||||
maximum-pool-size: 100
|
||||
@ -18,23 +13,12 @@ spring:
|
||||
jpa:
|
||||
hibernate:
|
||||
ddl-auto: validate
|
||||
show-sql: false
|
||||
show-sql: true
|
||||
properties:
|
||||
org.hibernate.envers.revision_field_name: revision_id
|
||||
org.hibernate.envers.revision_type_field_name: revision_type
|
||||
org.hibernate.jdbc.lob.non_contextual_creation: true
|
||||
format_sql: true
|
||||
database: postgresql
|
||||
|
||||
destinations:
|
||||
pricing:
|
||||
priceSave: PricingSave
|
||||
priceDlq: PricingSave.dlq
|
||||
tweet:
|
||||
tweetSave: TweetSave
|
||||
tweetDlq: TweetSave.dlq
|
||||
gateway:
|
||||
gatewayDlq: DBGateway.dlq
|
||||
|
||||
server:
|
||||
port: 9090
|
||||
@ -1,35 +1,40 @@
|
||||
type BtcPrice {
|
||||
id: ID!,
|
||||
id: ID!,
|
||||
timestamp: String!,
|
||||
type: String,
|
||||
average_price: Float!,
|
||||
high_price: Float,
|
||||
low_price: Float,
|
||||
open_price: Float,
|
||||
close_price: Float,
|
||||
volume: Float
|
||||
type: String,
|
||||
average_price: Float!,
|
||||
high_price: Float,
|
||||
low_price: Float,
|
||||
open_price: Float,
|
||||
close_price: Float,
|
||||
volume: Float
|
||||
}
|
||||
|
||||
type sentimentModel {
|
||||
id: ID!,
|
||||
timestamp: String!,
|
||||
rawTweet: String,
|
||||
sentimentScore: Float!,
|
||||
positiveScore: Float,
|
||||
neutralScore: Float,
|
||||
negativeScore: Float,
|
||||
compoundScore: Float!
|
||||
type tweetModel {
|
||||
id: ID!,
|
||||
timestamp: String!,
|
||||
rawTweet: String,
|
||||
sentimentScore: Float!,
|
||||
positiveScore: Float,
|
||||
neutralScore: Float,
|
||||
negativeScore: Float,
|
||||
compoundScore: Float!
|
||||
}
|
||||
|
||||
type Query {
|
||||
allPrices(count: Int):[BtcPrice],
|
||||
pricesBetweenCounts(startCount: Int, endCount: Int):[BtcPrice]
|
||||
latest:BtcPrice,
|
||||
priceForCreatedDate(createdDate: String):BtcPrice,
|
||||
priceBetweenDates(startDate: String, endDate: String):[BtcPrice],
|
||||
################################################################
|
||||
currentTweet:sentimentModel,
|
||||
allTweets(count: Int):[sentimentModel],
|
||||
tweetsForDay(startDate: String, endDate: String):[sentimentModel],
|
||||
tweetsForPeriod(startDate: String, endDate: String):[sentimentModel]
|
||||
allPrices(count: Int):[BtcPrice],
|
||||
pricesBetweenCounts(startCount: Int, endCount: Int):[BtcPrice]
|
||||
latest:BtcPrice,
|
||||
priceForCreatedDate(createdDate: String):BtcPrice,
|
||||
priceBetweenDates(startDate: String, endDate: String):[BtcPrice],
|
||||
################################################################
|
||||
currentTweet:tweetModel,
|
||||
allTweets(count: Int):[tweetModel],
|
||||
tweetsForDay(startDate: String, endDate: String):[tweetModel],
|
||||
tweetsForPeriod(startDate: String, endDate: String):[tweetModel]
|
||||
}
|
||||
|
||||
type Mutation {
|
||||
createBtc(createdDate: String!, type: String!, average_price: Float!, high_price: Float, low_price: Float, open_price: Float, close_price: Float, volume: Float):BtcPrice
|
||||
createTweet(createdDate: String!, rawTweet: String!, sentimentScore: Float!, positiveScore: Float, neutralScore: Float, negativeScore: Float, compoundScore: Float!):tweetModel
|
||||
}
|
||||
@ -1,36 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration>
|
||||
<property name="LOGS" value="./logs" />
|
||||
|
||||
<appender name="logstash" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
|
||||
<fieldNames>
|
||||
<version>[ignore]</version>
|
||||
<levelValue>[ignore]</levelValue>
|
||||
</fieldNames>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<appender name="console"
|
||||
class="ch.qos.logback.core.ConsoleAppender">
|
||||
<layout class="ch.qos.logback.classic.PatternLayout">
|
||||
<Pattern>
|
||||
%black(%d{ISO8601}) %highlight(%-5level) [%blue(%t)] %yellow(%C{1.}): %msg {%mdc}%n%throwable
|
||||
</Pattern>
|
||||
</layout>
|
||||
</appender>
|
||||
|
||||
<springProfile name="!local">
|
||||
<root level="INFO">
|
||||
<appender-ref ref="logstash"/>
|
||||
</root>
|
||||
</springProfile>
|
||||
|
||||
<springProfile name="local">
|
||||
<root level="INFO">
|
||||
<appender-ref ref="console"/>
|
||||
</root>
|
||||
</springProfile>
|
||||
|
||||
<jmxConfigurator/>
|
||||
</configuration>
|
||||
Loading…
x
Reference in New Issue
Block a user