Compare commits

..

No commits in common. "master" and "1.0.0-b7" have entirely different histories.

40 changed files with 430 additions and 1310 deletions

View File

@ -17,8 +17,6 @@ spec:
maxUnavailable: 0 maxUnavailable: 0
template: template:
metadata: metadata:
annotations:
linkerd.io/inject: enabled
labels: labels:
app: RESOURCE_NAME app: RESOURCE_NAME
spec: spec:
@ -53,21 +51,6 @@ spec:
secretKeyRef: secretKeyRef:
name: jdbc name: jdbc
key: jdbc.password key: jdbc.password
- name: BROKER_URL
valueFrom:
configMapKeyRef:
name: endpoints
key: amq.url
- name: BROKER_USER
valueFrom:
secretKeyRef:
name: amq
key: amq.username
- name: BROKER_PASSWORD
valueFrom:
secretKeyRef:
name: amq
key: amq.password
ports: ports:
- containerPort: 9090 - containerPort: 9090
name: RESOURCE_NAME name: RESOURCE_NAME
@ -76,8 +59,8 @@ spec:
path: /actuator/health path: /actuator/health
port: 9090 port: 9090
scheme: HTTP scheme: HTTP
initialDelaySeconds: 260 initialDelaySeconds: 180
periodSeconds: 60 periodSeconds: 30
timeoutSeconds: 2 timeoutSeconds: 2
successThreshold: 1 successThreshold: 1
failureThreshold: 3 failureThreshold: 3
@ -86,8 +69,8 @@ spec:
port: 9090 port: 9090
path: /actuator/info path: /actuator/info
scheme: HTTP scheme: HTTP
initialDelaySeconds: 280 initialDelaySeconds: 200
periodSeconds: 30 periodSeconds: 15
timeoutSeconds: 2 timeoutSeconds: 2
successThreshold: 1 successThreshold: 1
failureThreshold: 10 failureThreshold: 10
@ -95,15 +78,10 @@ spec:
resources: resources:
requests: requests:
cpu: 10m cpu: 10m
memory: 256Mi memory: 128Mi
limits: limits:
cpu: 100m cpu: 250m
memory: 512Mi memory: 256Mi
securityContext:
capabilities:
add:
- NET_ADMIN
- NET_RAW
restartPolicy: Always restartPolicy: Always
imagePullSecrets: imagePullSecrets:
- name: registry-cryptosky-image-registry - name: registry-cryptosky-image-registry

View File

@ -32,7 +32,7 @@ try {
timestamps { timestamps {
node ("${env.SLAVE_LABEL}") { node ("${env.SLAVE_LABEL}") {
stage('Initialise') { stage('Initialise') {
checkout([$class: 'GitSCM', branches: [[name: 'master']], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: 'Github', url: env.GIT_REPOSITORY_URL]]]) checkout([$class: 'GitSCM', branches: [[name: 'master']], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: 'Github', url: 'https://github.com/andyjk15/db-gateway.git']]])
env.APPLICATION_VERSION = get_application_version() env.APPLICATION_VERSION = get_application_version()
@ -47,7 +47,7 @@ try {
) { ) {
sh "doctl auth init --access-token ${DOCTL_TOKEN}" sh "doctl auth init --access-token ${DOCTL_TOKEN}"
sh "doctl registry login" sh "doctl registry login"
sh "doctl kubernetes cluster kubeconfig save cryptosky-cluster" sh "doctl kubernetes cluster kubeconfig save cryptosky-kubernetes-cluster"
} }
} }
@ -73,6 +73,10 @@ try {
env.APPLICATION_VERSION, env.APPLICATION_VERSION,
env.APPLICATION_LABEL) env.APPLICATION_LABEL)
}
stage('Tag Repository') {
withDockerServer([uri: "${env.DOCKER_REPOSITORY_TCP}"]) { withDockerServer([uri: "${env.DOCKER_REPOSITORY_TCP}"]) {
docker.build("${env.APPLICATION_NAME}:${env.APPLICATION_VERSION}") docker.build("${env.APPLICATION_NAME}:${env.APPLICATION_VERSION}")
docker.build("${env.APPLICATION_NAME}:latest") docker.build("${env.APPLICATION_NAME}:latest")
@ -83,9 +87,6 @@ try {
sh "docker push ${env.DOCKER_REPOSITORY}/${env.APPLICATION_NAME}:${env.APPLICATION_VERSION}" sh "docker push ${env.DOCKER_REPOSITORY}/${env.APPLICATION_NAME}:${env.APPLICATION_VERSION}"
sh "docker push ${env.DOCKER_REPOSITORY}/${env.APPLICATION_NAME}:latest" sh "docker push ${env.DOCKER_REPOSITORY}/${env.APPLICATION_NAME}:latest"
} }
}
stage('Tag Repository') {
withCredentials( withCredentials(
[usernamePassword( [usernamePassword(

View File

@ -5,4 +5,6 @@ APPLICATION_NAME=$1
kubectl apply -f configuration/kubernetes/deployment.yaml kubectl apply -f configuration/kubernetes/deployment.yaml
kubectl apply -f configuration/kubernetes/service.yaml kubectl apply -f configuration/kubernetes/service.yaml
kubectl get pods
kubectl rollout status deployment/${APPLICATION_NAME} --namespace=production kubectl rollout status deployment/${APPLICATION_NAME} --namespace=production

56
pom.xml
View File

@ -21,7 +21,6 @@
<java.version>1.8</java.version> <java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<logback.contrib.version>0.1.5</logback.contrib.version>
</properties> </properties>
<dependencies> <dependencies>
@ -34,12 +33,10 @@
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId> <artifactId>spring-boot-starter-web</artifactId>
<scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId> <artifactId>spring-boot-starter-websocket</artifactId>
<scope>provided</scope>
</dependency> </dependency>
<!-- Persistent Data --> <!-- Persistent Data -->
@ -48,6 +45,23 @@
<artifactId>spring-boot-starter-data-jpa</artifactId> <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency> </dependency>
<!-- GraghQL -->
<dependency>
<groupId>com.graphql-java</groupId>
<artifactId>graphql-spring-boot-starter</artifactId>
<version>5.0.2</version>
</dependency>
<dependency>
<groupId>com.graphql-java</groupId>
<artifactId>graphql-java-tools</artifactId>
<version>5.2.4</version>
</dependency>
<dependency>
<groupId>com.graphql-java</groupId>
<artifactId>graphiql-spring-boot-starter</artifactId>
<version>5.0.2</version>
</dependency>
<!-- Local database --> <!-- Local database -->
<dependency> <dependency>
<groupId>com.h2database</groupId> <groupId>com.h2database</groupId>
@ -67,30 +81,6 @@
<version>4.3.8.Final</version> <version>4.3.8.Final</version>
</dependency> </dependency>
<!-- GraghQL -->
<dependency>
<groupId>com.graphql-java</groupId>
<artifactId>graphql-spring-boot-starter</artifactId>
<version>5.0.2</version>
</dependency>
<dependency>
<groupId>com.graphql-java</groupId>
<artifactId>graphql-java-tools</artifactId>
<version>5.2.4</version>
</dependency>
<dependency>
<groupId>com.graphql-java</groupId>
<artifactId>graphiql-spring-boot-starter</artifactId>
<version>5.0.2</version>
</dependency>
<!-- Artemis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-artemis</artifactId>
<scope>provided</scope>
</dependency>
<!--- Utils --> <!--- Utils -->
<dependency> <dependency>
<groupId>org.projectlombok</groupId> <groupId>org.projectlombok</groupId>
@ -98,18 +88,6 @@
<version>1.18.8</version> <version>1.18.8</version>
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!--- Logging -->
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>6.4</version>
</dependency>
<!-- Testing --> <!-- Testing -->
<dependency> <dependency>

View File

@ -1,19 +0,0 @@
package cryptosky.me;
public enum ArtemisSyncMessaging {
MESSAGE_SAVE_SYNC_ID("MESSAGE_SAVE_SYNC_ID"),
SAVE_MESSAGE_DATABASE_EXCEPTION("SAVE_MESSAGE_DATABASE_EXCEPTION"),
CONSUME_PRICE_EVENT("CONSUME_PRICE_EVENT"),
CONSUME_SENTIMENT_EVENT("CONSUME_SENTIMENT_EVENT");
private final String syncMessage;
ArtemisSyncMessaging(String syncMessage) {
this.syncMessage = syncMessage;
}
public String getMessage() {
return syncMessage;
}
}

View File

@ -1,7 +0,0 @@
package cryptosky.me;
public class SupportedCurrencies {
public static final String BTC = "btc_usd";
}

View File

@ -1,34 +0,0 @@
package cryptosky.me.configurations;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import cryptosky.me.helpers.StringTrimModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
import static com.fasterxml.jackson.databind.MapperFeature.DEFAULT_VIEW_INCLUSION;
import static com.fasterxml.jackson.databind.MapperFeature.SORT_PROPERTIES_ALPHABETICALLY;
import static com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS;
@org.springframework.context.annotation.Configuration
public class Configuration {
private static final Logger LOGGER = LoggerFactory.getLogger(Configuration.class);
@Bean
@Primary
public ObjectMapper objectMapper() {
return new ObjectMapper()
.registerModule(new JavaTimeModule())
.registerModule(new StringTrimModule())
.registerModule(new Jdk8Module())
.enable(SORT_PROPERTIES_ALPHABETICALLY)
.disable(WRITE_DATES_AS_TIMESTAMPS)
.disable(FAIL_ON_UNKNOWN_PROPERTIES)
.disable(DEFAULT_VIEW_INCLUSION);
}
}

View File

@ -1,41 +0,0 @@
package cryptosky.me.configurations;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
@Configuration
@EnableJms
public class ConsumerConfig {
@Value("${spring.artemis.broker-url}")
private String brokerUrl;
@Value("${spring.artemis.user}")
private String user;
@Value("${spring.artemis.password}")
private String pass;
@Bean
public ActiveMQConnectionFactory receiverActiveMQConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerUrl);
activeMQConnectionFactory.setUser(user);
activeMQConnectionFactory.setPassword(pass);
return activeMQConnectionFactory;
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory
.setConnectionFactory(receiverActiveMQConnectionFactory());
factory.setConcurrency("3-10");
return factory;
}
}

View File

@ -1,28 +0,0 @@
package cryptosky.me.configurations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InjectionPoint;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.core.MethodParameter;
import cryptosky.me.Application;
import static org.springframework.beans.factory.config.ConfigurableBeanFactory.SCOPE_PROTOTYPE;
@Configuration
public class LoggerConfiguration {
@Bean
@Scope(SCOPE_PROTOTYPE)
public Logger logger(InjectionPoint injectionPoint) {
Class<?> loggerClass = Application.class;
MethodParameter methodParameter = injectionPoint.getMethodParameter();
if (methodParameter != null) {
loggerClass = methodParameter.getContainingClass();
}
return LoggerFactory.getLogger(loggerClass);
}
}

View File

@ -1,145 +0,0 @@
package cryptosky.me.consumers;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import cryptosky.me.dlq.DlqInfo;
import cryptosky.me.dlq.DlqService;
import cryptosky.me.exceptions.NoBodyOrStringException;
import cryptosky.me.pricing.models.entities.CryptoPriceModel;
import cryptosky.me.pricing.service.PriceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import static cryptosky.me.ArtemisSyncMessaging.*;
import static cryptosky.me.helpers.Utils.getCorrelationId;
import static cryptosky.me.helpers.Utils.getSyncId;
import static cryptosky.me.logging.CorrelationInfo.setCorrelationId;
import static cryptosky.me.logging.CorrelationInfo.setSyncId;
import static java.lang.String.format;
@Component
public class PriceConsumer {
private final Logger logger = LoggerFactory.getLogger(PriceConsumer.class);
private final ObjectMapper objectMapper;
private final PriceService priceService;
private final DlqService dlqService;
private final String dlqName;
private final String dlqGateway;
private CountDownLatch latch = new CountDownLatch(1);
@Autowired
public PriceConsumer(
ObjectMapper objectMapper,
PriceService priceService,
DlqService dlqService,
@Value("${destinations.pricing.priceDlq}") String dlqName,
@Value("${destinations.gateway.gatewayDlq}") String dlqGateway
) {
this.objectMapper = objectMapper;
this.priceService = priceService;
this.dlqService = dlqService;
this.dlqName = dlqName;
this.dlqGateway = dlqGateway;
}
@JmsListener(destination = "${destinations.pricing.priceSave}")
public void receive(TextMessage message) throws JMSException {
String correlationId = getCorrelationId(message);
String syncId = getSyncId(message);
setCorrelationId(correlationId);
setSyncId(syncId);
logger.info("Received Message: " + message.getBody(String.class) + " for syncId :: [{}]", syncId);
try {
if (message.getBody(String.class) == null | message.getBody(String.class).equals("")) {
throw new NoBodyOrStringException();
}
logger.info("Processing message [{}] as synId [{}] ...", correlationId, syncId);
CryptoPriceModel cryptoPriceModel;
try {
JsonNode messageJson = objectMapper.readTree(message.getText());
cryptoPriceModel = objectMapper.readValue(messageJson.traverse(), CryptoPriceModel.class);
logger.info("Message with syncId of [{}] is for [{}]", syncId, cryptoPriceModel);
priceService.createRecord(cryptoPriceModel, correlationId, message);
latch.countDown();
} catch (IOException e) {
logger.info(format(
"Message [%s] for pricingModel [%s] was not synced due to a readTree exception on getting the text",
syncId,
correlationId
), e);
latch.countDown();
throw e;
}
} catch (NoBodyOrStringException e) {
String dlqReason = format("No body or body isn't a String format for incoming %s for price event [%s]",
CONSUME_PRICE_EVENT, correlationId);
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqName,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, e, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
latch.countDown();
} catch (JMSException jmsException) {
String dlqReason = format("An Exception [%s] has occurred with a JMS action [%s] on [%s]", CONSUME_PRICE_EVENT, jmsException,
correlationId);
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqGateway,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, jmsException, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
latch.countDown();
} catch (Exception e) {
String dlqReason = format("Couldn't sync incoming %s for price event [%s]", CONSUME_PRICE_EVENT, correlationId);
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqName,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, e, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
latch.countDown();
}
}
}

View File

@ -1,144 +0,0 @@
package cryptosky.me.consumers;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import cryptosky.me.dlq.DlqInfo;
import cryptosky.me.dlq.DlqService;
import cryptosky.me.exceptions.NoBodyOrStringException;
import cryptosky.me.sentiment.models.entities.SentimentModel;
import cryptosky.me.sentiment.service.SentimentService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import static cryptosky.me.ArtemisSyncMessaging.*;
import static cryptosky.me.helpers.Utils.getCorrelationId;
import static cryptosky.me.helpers.Utils.getSyncId;
import static cryptosky.me.logging.CorrelationInfo.setCorrelationId;
import static cryptosky.me.logging.CorrelationInfo.setSyncId;
import static java.lang.String.format;
@Component
public class TweetConsumer {
private final Logger logger = LoggerFactory.getLogger(TweetConsumer.class);
private final ObjectMapper objectMapper;
private final SentimentService sentimentService;
private final DlqService dlqService;
private final String dlqName;
private final String dlqGateway;
private CountDownLatch latch = new CountDownLatch(1);
@Autowired
public TweetConsumer(
ObjectMapper objectMapper,
SentimentService sentimentService,
DlqService dlqService,
@Value("${destinations.tweet.tweetDlq}") String dlqName,
@Value("${destinations.gateway.gatewayDlq}") String dlqGateway
) {
this.objectMapper = objectMapper;
this.sentimentService = sentimentService;
this.dlqService = dlqService;
this.dlqName = dlqName;
this.dlqGateway = dlqGateway;
}
@JmsListener(destination = "${destinations.tweet.tweetSave}")
public void receive(TextMessage message) throws JMSException {
String correlationId = getCorrelationId(message);
String syncId = getSyncId(message);
setCorrelationId(correlationId);
setSyncId(syncId);
logger.info("Received Message: " + message.getBody(String.class));
try {
if (message.getBody(String.class) == null | message.getBody(String.class).equals("")) {
throw new NoBodyOrStringException();
}
logger.info("Processing message [{}] as synId [{}] ...", correlationId, syncId);
SentimentModel sentimentModel;
try {
JsonNode messageJson = objectMapper.readTree(message.getText());
sentimentModel = objectMapper.readValue(messageJson.traverse(), SentimentModel.class);
logger.info("Message with syncId of [{}] is for [{}]", syncId, sentimentModel);
sentimentService.createRecord(sentimentModel, correlationId, message);
latch.countDown();
} catch (IOException e) {
logger.info(format(
"Message [%s] for sentimentModel [%s] was not synced due to a readTree exception on getting the text",
syncId,
correlationId
), e);
latch.countDown();
throw e;
}
} catch (NoBodyOrStringException e) {
String dlqReason = format("No body or body isn't a String format for incoming %s for price event [%s]",
CONSUME_SENTIMENT_EVENT, correlationId);
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqName,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, CONSUME_SENTIMENT_EVENT.getMessage(), dlqInfo, e, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
latch.countDown();
} catch (JMSException jmsException) {
String dlqReason = format("An Exception [%s] has occurred with a JMS action [%s] on [%s]", CONSUME_SENTIMENT_EVENT, jmsException,
correlationId);
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqGateway,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, CONSUME_SENTIMENT_EVENT.getMessage(), dlqInfo, jmsException, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
latch.countDown();
} catch (Exception e) {
String dlqReason = format("Couldn't sync incoming %s for price event [%s]", CONSUME_SENTIMENT_EVENT, correlationId);
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqName,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, CONSUME_SENTIMENT_EVENT.getMessage(), dlqInfo, e, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
latch.countDown();
}
}
}

View File

@ -1,25 +0,0 @@
package cryptosky.me.dlq;
import lombok.*;
@Getter
@Setter
@NoArgsConstructor
@Builder
@ToString
@EqualsAndHashCode
public class DlqInfo {
private String messageId;
private String dlqName;
private String dlqReason;
private String redeliveryCount;
private String correlationId;
public DlqInfo(String messageId, String dlqName, String dlqReason, String redeliveryCount, String correlationId) {
this.messageId = messageId;
this.dlqName = dlqName;
this.dlqReason = dlqReason;
this.redeliveryCount = redeliveryCount;
this.correlationId = correlationId;
}
}

View File

@ -1,68 +0,0 @@
package cryptosky.me.dlq;
import cryptosky.me.dlq.DlqInfo;
import org.slf4j.Logger;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import java.util.Collections;
import static java.lang.String.format;
@Component
public class DlqService {
private final JmsTemplate jmsTemplate;
private final Logger logger;
public DlqService(JmsTemplate jmsTemplate, Logger logger) {
this.jmsTemplate = jmsTemplate;
this.logger = logger;
}
public void sendToDlq(TextMessage message, String messageType, DlqInfo dlqInfo, Exception exception, MessageTransformer messageTransformer) {
toDlq(message, messageType, dlqInfo, exception, messageTransformer);
}
private void toDlq(TextMessage message, String messageType, DlqInfo dlqInfo, Exception exception, MessageTransformer messageTransformer) {
logger.error(format("DLQing message [%s] for customer [%s] because of an exception", dlqInfo.getMessageId(), dlqInfo.getCorrelationId()), exception);
logDlqInfo(dlqInfo, messageType);
jmsTemplate.send(dlqInfo.getDlqName(), session -> {
TextMessage textMessage = session.createTextMessage(message.getText());
textMessage.setStringProperty("Exception", exception.getClass().getName());
textMessage.setStringProperty("ExceptionMessage", exception.getMessage());
textMessage.setJMSCorrelationID(message.getJMSCorrelationID());
//noinspection unchecked
Collections.list(message.getPropertyNames()).forEach((Object prop) -> {
try {
textMessage.setObjectProperty(prop.toString(), message.getObjectProperty(prop.toString()));
} catch (JMSException e) {
logger.warn(format("Unable to set header property [%s] on DLQ'd message as an exception was thrown", prop), e);
}
});
return messageTransformer.transform(textMessage);
});
}
private void logDlqInfo(DlqInfo dlqInfo, String messageType) {
String message = format(
// Don't change this message, it is used to create monitoring dashboards!
"Sending message to DLQ - %s for %s to DLQ %s for event %s because of %s",
dlqInfo.getMessageId(),
dlqInfo.getCorrelationId(),
dlqInfo.getDlqName(),
messageType,
dlqInfo.getDlqReason()
);
logger.info(message);
}
public interface MessageTransformer {
TextMessage transform(TextMessage textMessage) throws JMSException;
}
}

View File

@ -1,9 +0,0 @@
package cryptosky.me.exceptions;
public class DatabaseViolationException extends RuntimeException {
public DatabaseViolationException() {}
public DatabaseViolationException(Throwable cause) {
super(cause);
}
}

View File

@ -1,34 +0,0 @@
package cryptosky.me.exceptions;
import cryptosky.me.consumers.PriceConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ExceptionHandler;
import static org.springframework.http.HttpStatus.CONFLICT;
import static org.springframework.http.HttpStatus.NOT_FOUND;
import static org.springframework.http.MediaType.APPLICATION_JSON_UTF8;
import static cryptosky.me.exceptions.MessageCodes.*;
public class GatewayExceptionHandler {
private final Logger logger = LoggerFactory.getLogger(PriceConsumer.class);
@ExceptionHandler(value = NotSupportedCurrencyTypeException.class)
public ResponseEntity<MessageCodes> handleNotSupportedCurrencyTypeException(NotSupportedCurrencyTypeException e) {
logger.error(ResponseEntity.status(NOT_FOUND).body(E01).toString());
return ResponseEntity.status(NOT_FOUND).contentType(APPLICATION_JSON_UTF8).body(E01);
}
@ExceptionHandler(value = DatabaseViolationException.class)
public ResponseEntity<MessageCodes> handleDatabaseViolationException(DatabaseViolationException e) {
logger.error(ResponseEntity.status(CONFLICT).body(E02).toString());
return ResponseEntity.status(CONFLICT).contentType(APPLICATION_JSON_UTF8).body(E02);
}
@ExceptionHandler(value = NoBodyOrStringException.class)
public ResponseEntity<MessageCodes> handleNoBodyOrStringException(NoBodyOrStringException e) {
logger.error(ResponseEntity.status(NOT_FOUND).body(E03).toString());
return ResponseEntity.status(NOT_FOUND).contentType(APPLICATION_JSON_UTF8).body(E03);
}
}

View File

@ -1,35 +0,0 @@
package cryptosky.me.exceptions;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@JsonFormat(shape = JsonFormat.Shape.OBJECT)
public enum MessageCodes {
E01("01", "Not supported on Services"),
E02("02", "Database Violation - Key Pair or other Constraint Violated"),
E03("03", "No body specified or not String format");
private final String code;
private final String message;
private final ObjectMapper objectMapper = new ObjectMapper();
MessageCodes(String code, String message) {
this.code = code;
this.message = message;
}
public String getCode() {
return "GATEWAYSVC-" + code;
}
public String getMessage() {
return message;
}
public String toJson() throws JsonProcessingException {
return objectMapper.writeValueAsString(this);
}
}

View File

@ -1,9 +0,0 @@
package cryptosky.me.exceptions;
public class NoBodyOrStringException extends RuntimeException {
public NoBodyOrStringException() {}
public NoBodyOrStringException(Throwable cause) {
super(cause);
}
}

View File

@ -1,9 +0,0 @@
package cryptosky.me.exceptions;
public class NotSupportedCurrencyTypeException extends RuntimeException {
public NotSupportedCurrencyTypeException() {}
public NotSupportedCurrencyTypeException(Throwable cause) {
super(cause);
}
}

View File

@ -1,4 +1,4 @@
package cryptosky.me; package cryptosky.me.graphql;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;

View File

@ -1,4 +1,4 @@
package cryptosky.me.pricing.models.entities; package cryptosky.me.graphql.pricing.models.entities;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;

View File

@ -1,4 +1,4 @@
package cryptosky.me.pricing.models.entities; package cryptosky.me.graphql.pricing.models.entities;
import lombok.*; import lombok.*;
@ -13,11 +13,9 @@ public class CryptoPriceModel {
@Id @Id
@Column(name = "ID", nullable = false) @Column(name = "ID", nullable = false)
@GeneratedValue(strategy = GenerationType.IDENTITY)
private int id; private int id;
@Column(name = "syncId", nullable = false)
private String syncId;
@Column(name = "timestamp", nullable = false) @Column(name = "timestamp", nullable = false)
private LocalDateTime timestamp; private LocalDateTime timestamp;
@ -33,6 +31,12 @@ public class CryptoPriceModel {
@Column(name = "l_price", nullable = false) @Column(name = "l_price", nullable = false)
private float low_price; private float low_price;
@Column(name = "a_price", nullable = false)
private float ask_price;
@Column(name = "b_price", nullable = false)
private float bid_price;
@Column(name = "o_price") @Column(name = "o_price")
private float open_price; private float open_price;

View File

@ -1,6 +1,7 @@
package cryptosky.me.pricing.models.repositories; package cryptosky.me.graphql.pricing.models.repositories;
import cryptosky.me.pricing.models.entities.BtcPriceModel; import cryptosky.me.graphql.pricing.models.entities.BtcPriceModel;
import org.springframework.context.annotation.PropertySource;
import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;

View File

@ -0,0 +1,28 @@
package cryptosky.me.graphql.pricing.mutations;
import com.coxautodev.graphql.tools.GraphQLMutationResolver;
import cryptosky.me.graphql.pricing.models.entities.BtcPriceModel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import cryptosky.me.graphql.pricing.service.BtcPriceService;
@Component
public class BtcPriceMutation implements GraphQLMutationResolver {
@Autowired
private BtcPriceService btcPriceService;
public BtcPriceModel createBtc(final String createdDate, final String type,
final float av_price,
final float h_price,
final float l_price,
final float o_price,
final float a_price,
final float b_price,
final float c_price,
final float volume ) {
return this.btcPriceService.createBtc(createdDate, type, av_price, h_price, l_price, a_price, b_price, o_price, c_price, volume);
}
}

View File

@ -1,8 +1,8 @@
package cryptosky.me.pricing.graphql.queries; package cryptosky.me.graphql.pricing.queries;
import com.coxautodev.graphql.tools.GraphQLQueryResolver; import com.coxautodev.graphql.tools.GraphQLQueryResolver;
import cryptosky.me.pricing.models.entities.BtcPriceModel; import cryptosky.me.graphql.pricing.models.entities.BtcPriceModel;
import cryptosky.me.pricing.service.PriceService; import cryptosky.me.graphql.pricing.service.BtcPriceService;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -13,25 +13,25 @@ import java.util.Optional;
public class BtcPriceQuery implements GraphQLQueryResolver { public class BtcPriceQuery implements GraphQLQueryResolver {
@Autowired @Autowired
private PriceService priceService; private BtcPriceService btcPriceService;
public List<BtcPriceModel> getAllPrices(final int count ) { public List<BtcPriceModel> getAllPrices( final int count ) {
return this.priceService.getAllPrices(count); return this.btcPriceService.getAllPrices(count);
} }
public List<BtcPriceModel> getPricesBetweenCounts( final int startCount, final int endCount ) { public List<BtcPriceModel> getPricesBetweenCounts( final int startCount, final int endCount ) {
return this.priceService.getPricesBetweenCounts(startCount, endCount); return this.btcPriceService.getPricesBetweenCounts(startCount, endCount);
} }
public Optional<BtcPriceModel> getLatest() { public Optional<BtcPriceModel> getLatest() {
return this.priceService.getLatest(); return this.btcPriceService.getLatest();
} }
public Optional<BtcPriceModel> getPriceForCreatedDate( final String createdDate ) { public Optional<BtcPriceModel> getPriceForCreatedDate( final String createdDate ) {
return this.priceService.getPriceForCreatedDate(createdDate); return this.btcPriceService.getPriceForCreatedDate(createdDate);
} }
public List<BtcPriceModel> getPriceBetweenDates(final String startDate, final String endDate ) { public List<BtcPriceModel> getPriceBetweenDates(final String startDate, final String endDate ) {
return this.priceService.getPriceBetweenDates(startDate, endDate); return this.btcPriceService.getPriceBetweenDates(startDate, endDate);
} }
} }

View File

@ -0,0 +1,132 @@
package cryptosky.me.graphql.pricing.service;
import cryptosky.me.graphql.pricing.models.entities.BtcPriceModel;
import cryptosky.me.graphql.pricing.models.repositories.BtcPriceRepository;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.time.LocalDateTime;
import static cryptosky.me.helpers.Utils.format;
@Service
public class BtcPriceService {
private final BtcPriceRepository btcPriceRepository;
public BtcPriceService(final BtcPriceRepository btcPriceRepository) {
this.btcPriceRepository = btcPriceRepository;
}
@Transactional
public BtcPriceModel createBtc(final String createdDate, final String type,
final float av_price,
final float h_price,
final float l_price,
final float a_price,
final float b_price,
final float o_price,
final float c_price,
final float volume ) {
final BtcPriceModel btcPrice = new BtcPriceModel();
btcPrice.setTimestamp(LocalDateTime.parse((createdDate)));
btcPrice.setType(type);
btcPrice.setAverage_price(av_price);
btcPrice.setHigh_price(h_price);
btcPrice.setLow_price(l_price);
btcPrice.setAsk_price(a_price);
btcPrice.setBid_price(b_price);
btcPrice.setOpen_price(o_price);
btcPrice.setClose_price(c_price);
btcPrice.setVolume(volume);
return this.btcPriceRepository.save(btcPrice);
}
@Transactional(readOnly = true)
public List<BtcPriceModel> getAllPrices( final int count ) {
return this.btcPriceRepository.findAll().stream()
.limit(count)
.collect(Collectors.toList());
}
@Transactional(readOnly = true)
public List<BtcPriceModel> getPricesBetweenCounts( final int startCount, final int endCount ) {
return this.btcPriceRepository.findAll().stream()
.skip(startCount)
.limit(endCount - startCount)
.collect(Collectors.toList());
}
@Transactional(readOnly = true)
public Optional<BtcPriceModel> getLatest() {
return this.btcPriceRepository.findAll().stream().findFirst();
}
@Transactional(readOnly = true)
public Optional<BtcPriceModel> getPriceForCreatedDate( final String createdDate ) {
return this.btcPriceRepository.findAll().stream()
.filter(createdDateList -> createdDateList.getTimestamp().equals(format(createdDate)))
.findFirst();
}
@Transactional(readOnly = true)
public List<BtcPriceModel> getPriceBetweenDates( final String startDate, final String endDate ) {
return this.btcPriceRepository.findAll().stream()
.filter(createdDateList -> format(createdDateList.getTimestamp().toString()).isBefore(format(endDate)))
.filter(createdDateList -> format(createdDateList.getTimestamp().toString()).isAfter(format(startDate)))
.collect(Collectors.toList());
}
// @Transactional(readOnly = true)
// public List<BtcPriceModel> getAllLatest() {
// return this.btcPriceRepository.findAll().stream()
// .filter(distinctByKey(CryptoPriceModel::getType))
// .collect(Collectors.toList());
// }
// @Transactional(readOnly = true)
// public List<CryptoPriceModel> getLimitPricesByType(final int count, final String type ) {
// return this.btcPriceRepository.findAll().stream()
// .filter(typeList -> typeList.getType().equals(type))
// .limit(count)
// .collect(Collectors.toList());
// }
// @Transactional(readOnly = true)
// public List<CryptoPriceModel> getAllByType(final String type ) {
// return this.btcPriceRepository.findAll().stream()
// .filter(typeList -> typeList.getType().equals(type))
// .collect(Collectors.toList());
// }
// @Transactional(readOnly = true)
// public Optional<CryptoPriceModel> getLatestByType(final String type ) {
// return this.cryptoPriceRepository.findAll().stream()
// .filter(typeList -> typeList.getType().equals(type))
// .limit(1)
// .findFirst();
// }
// @Transactional(readOnly = true)
// public Optional<CryptoPriceModel> getPriceByCreatedDateForType( final String type, final String createdDate ) {
// return this.cryptoPriceRepository.findAll().stream()
// .filter(typeList -> typeList.getType().equals(type))
// .filter(createdDateList -> createdDateList.getTimestamp().equals(LocalDate.parse(createdDate)))
// .findFirst();
// }
//
// @Transactional(readOnly = true)
// public Optional<CryptoPriceModel> getPriceBetweenCreatedDatesForType( final String type, final String startDate, final String endDate ) {
// return this.cryptoPriceRepository.findAll().stream()
// .filter(typeList -> typeList.getType().equals(type))
// .filter(createdDateList -> createdDateList.getTimestamp().isBefore(LocalDate.parse(endDate)))
// .filter(createdDateList -> createdDateList.getTimestamp().isAfter(LocalDate.parse(startDate)))
// .findFirst();
// }
}

View File

@ -1,4 +1,4 @@
package cryptosky.me.sentiment.models.entities; package cryptosky.me.graphql.tweets.models.entities;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
@ -9,7 +9,7 @@ import javax.persistence.Table;
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
@Data @Data
@Entity @Entity
@Table(name = "btc_sentiment") @Table(name = "btc_tweet")
public class BtcSentimentModel extends SentimentModel { public class BtcTweetModel extends TweetModel {
} }

View File

@ -1,6 +1,7 @@
package cryptosky.me.sentiment.models.entities; package cryptosky.me.graphql.tweets.models.entities;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
@ -10,17 +11,21 @@ import javax.persistence.*;
@NoArgsConstructor @NoArgsConstructor
@Data @Data
@MappedSuperclass @MappedSuperclass
public class SentimentModel { public class TweetModel {
@Id @Id
@Column(name = "ID", nullable = false) @Column(name = "ID", nullable = false)
@GeneratedValue(strategy = GenerationType.IDENTITY)
private int id; private int id;
@Column(name = "timestamp",nullable = false) @Column(name = "timestamp",nullable = false)
private String timestamp; private String timestamp;
@Column(name = "syncId", nullable = false) @Column(name = "raw_tweet", nullable = false)
private String syncId; private String rawTweet;
@Column(name = "sentiment")
private float sentimentScore;
@Column(name = "pos") @Column(name = "pos")
private float positiveScore; private float positiveScore;
@ -34,7 +39,4 @@ public class SentimentModel {
@Column(name = "compound", nullable = false) @Column(name = "compound", nullable = false)
private float compoundScore; private float compoundScore;
@Column(name = "type")
private String type;
} }

View File

@ -0,0 +1,9 @@
package cryptosky.me.graphql.tweets.models.repositories;
import cryptosky.me.graphql.tweets.models.entities.BtcTweetModel;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface BtcTweetRepository extends JpaRepository<BtcTweetModel, Integer> {
}

View File

@ -0,0 +1,22 @@
package cryptosky.me.graphql.tweets.mutations;
import com.coxautodev.graphql.tools.GraphQLMutationResolver;
import cryptosky.me.graphql.tweets.models.entities.BtcTweetModel;
import cryptosky.me.graphql.tweets.service.BtcTweetService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class BtcTweetMutation implements GraphQLMutationResolver {
@Autowired
BtcTweetService btcTweetService;
public BtcTweetModel createTweet(final String createdDate, final String rawTweet, final float sentimentScore,
final float positiveScore, final float neutralScore, final float negativeScore,
final float compoundScore ) {
return this.btcTweetService.createTweet( createdDate, rawTweet, sentimentScore, positiveScore, neutralScore,
negativeScore,compoundScore );
}
}

View File

@ -0,0 +1,33 @@
package cryptosky.me.graphql.tweets.queries;
import com.coxautodev.graphql.tools.GraphQLQueryResolver;
import cryptosky.me.graphql.tweets.models.entities.BtcTweetModel;
import cryptosky.me.graphql.tweets.service.BtcTweetService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Optional;
@Component
public class BtcTweetQuery implements GraphQLQueryResolver {
@Autowired
private BtcTweetService btcTweetService;
public Optional<BtcTweetModel> getCurrentTweet() {
return this.btcTweetService.getCurrentTweet();
}
public List<BtcTweetModel> getAllTweets( final int count ) {
return this.btcTweetService.getAllTweets(count);
}
public List<BtcTweetModel> getTweetsForDay( final String startDate, final String endDate ) {
return this.btcTweetService.getTweetsForDay(startDate, endDate);
}
public List<BtcTweetModel> getTweetsForPeriod( final String startDate, final String endDate ) {
return this.btcTweetService.getTweetsForPeriod( startDate, endDate );
}
}

View File

@ -0,0 +1,101 @@
package cryptosky.me.graphql.tweets.service;
import cryptosky.me.graphql.tweets.models.entities.BtcTweetModel;
import cryptosky.me.graphql.tweets.models.repositories.BtcTweetRepository;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.DateTimeException;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static cryptosky.me.helpers.Utils.format;
@Service
public class BtcTweetService {
private final BtcTweetRepository btcTweetRepository;
public BtcTweetService(final BtcTweetRepository btcTweetRepository) {
this.btcTweetRepository = btcTweetRepository;
}
@Transactional
public BtcTweetModel createTweet(final String createdDate, final String rawTweet, final float sentimentScore,
final float positiveScore, final float neutralScore, final float negativeScore,
final float compoundScore ) {
final BtcTweetModel tweetModel = new BtcTweetModel();
tweetModel.setTimestamp(format(createdDate).toString());
tweetModel.setRawTweet(rawTweet);
tweetModel.setSentimentScore(sentimentScore);
tweetModel.setPositiveScore(positiveScore);
tweetModel.setNegativeScore(negativeScore);
tweetModel.setNeutralScore(neutralScore);
tweetModel.setCompoundScore(compoundScore);
return this.btcTweetRepository.save(tweetModel);
}
@Transactional(readOnly = true)
public Optional<BtcTweetModel> getCurrentTweet() {
return this.btcTweetRepository.findAll().stream().findFirst();
}
@Transactional(readOnly = true)
public List<BtcTweetModel> getAllTweets( final int count ) {
return this.btcTweetRepository.findAll().stream()
.limit(count)
.collect(Collectors.toList());
}
@Transactional(readOnly = true)
public List<BtcTweetModel> getTweetsForDay( final String startDate, final String endDate ) {
LocalDateTime r_start = format(startDate);
LocalDateTime r_end = format(endDate);
r_start = r_start.toLocalDate().atStartOfDay();
r_end = r_end.toLocalDate().atTime(LocalTime.MAX);
if ( r_end.isAfter(r_start) ) {
if ( r_end.equals(r_start.toLocalDate().atTime(LocalTime.MAX))) {
LocalDateTime finalR_end = r_end;
LocalDateTime finalR_start = r_start;
return this.btcTweetRepository.findAll().stream()
.filter(createdDateList -> format(createdDateList.getTimestamp()).isBefore(finalR_end))
.filter(createdDateList -> format(createdDateList.getTimestamp()).isAfter(finalR_start))
.collect(Collectors.toList());
} else {
// Logger
throw new DateTimeException(r_start +" and "+ r_end +" are not on the same day");
}
} else {
// Logger
throw new DateTimeException("End Date "+ r_end +" is not after "+ r_start);
}
}
@Transactional(readOnly = true)
public List<BtcTweetModel> getTweetsForPeriod( final String startDate, final String endDate ) {
LocalDateTime r_start = format(startDate);
LocalDateTime r_end = format(endDate);
r_start = r_start.toLocalDate().atStartOfDay();
r_end = r_end.toLocalDate().atTime(LocalTime.MAX);
if ( r_end.isAfter(r_start) ) {
LocalDateTime finalR_end = r_end;
LocalDateTime finalR_start = r_start;
return this.btcTweetRepository.findAll().stream()
.filter(createdDateList -> format(createdDateList.getTimestamp()).isBefore(finalR_end))
.filter(createdDateList -> format(createdDateList.getTimestamp()).isAfter(finalR_start))
.collect(Collectors.toList());
} else {
// Logger
throw new DateTimeException("End Date "+ r_end +" is not after "+ r_start);
}
}
}

View File

@ -1,29 +0,0 @@
package cryptosky.me.helpers;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.deser.std.StdScalarDeserializer;
import com.fasterxml.jackson.databind.module.SimpleModule;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
public class StringTrimModule extends SimpleModule {
public StringTrimModule() {
addDeserializer(String.class, new StdScalarDeserializer<String>(String.class) {
@Override
public String deserialize(JsonParser jsonParser, DeserializationContext ctx) throws IOException {
if (!jsonParser.getCurrentToken().equals(JsonToken.VALUE_STRING)) {
throw new JsonMappingException(
jsonParser,
jsonParser.getCurrentName() + " expected to be a String"
);
}
return StringUtils.trim(jsonParser.getValueAsString());
}
});
}
}

View File

@ -9,45 +9,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Predicate; import java.util.function.Predicate;
import javax.jms.Message;
import static java.util.UUID.randomUUID;
import static org.apache.commons.lang3.StringUtils.isBlank;
public class Utils { public class Utils {
private static final String CORRELATION_ID_KEY = "X-CRYPTO-Correlation-ID";
private static final String SYNC_ID_KEY = "X-CRYPTO-Sync-ID";
public static String getCorrelationId(Message message) {
String correlationId = null;
try {
correlationId = message.getJMSCorrelationID();
if (isBlank(correlationId)) {
correlationId = message.getStringProperty(CORRELATION_ID_KEY);
}
} catch (Throwable e) {
// NOOP
}
if (isBlank(correlationId)) {
correlationId = randomUUID().toString();
}
return correlationId;
}
public static String getSyncId(Message message) {
String syncId = null;
try {
syncId = message.getStringProperty(SYNC_ID_KEY);
} catch (Throwable e) {
// NOOP
}
if (isBlank(syncId)) {
syncId = randomUUID().toString();
}
return syncId;
}
// Utility Function to be able to get one of all types // Utility Function to be able to get one of all types
public static <T> Predicate<T> distinctByKey(Function<? super T, Object> keyExtractor) { public static <T> Predicate<T> distinctByKey(Function<? super T, Object> keyExtractor) {
Map<Object, Boolean> seen = new ConcurrentHashMap<>(); Map<Object, Boolean> seen = new ConcurrentHashMap<>();

View File

@ -1,22 +0,0 @@
package cryptosky.me.logging;
import org.slf4j.MDC;
public class CorrelationInfo {
public static final String CORRELATION_ID = "X-CRYPTO-Correlation-ID";
public static final String SYNC_ID = "X-CRYPTO-Sync-ID";
public static void setCorrelationId(String correlationId) {
MDC.put(CORRELATION_ID, correlationId);
}
public static void setSyncId(String syncId) {
MDC.put(SYNC_ID, syncId);
}
public static void clear() {
MDC.remove(CORRELATION_ID);
MDC.remove(SYNC_ID);
}
}

View File

@ -1,215 +0,0 @@
package cryptosky.me.pricing.service;
import cryptosky.me.dlq.DlqInfo;
import cryptosky.me.dlq.DlqService;
import cryptosky.me.exceptions.MessageCodes;
import cryptosky.me.pricing.models.entities.BtcPriceModel;
import cryptosky.me.pricing.models.entities.CryptoPriceModel;
import cryptosky.me.pricing.models.repositories.BtcPriceRepository;
import cryptosky.me.exceptions.NotSupportedCurrencyTypeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static cryptosky.me.ArtemisSyncMessaging.*;
import static cryptosky.me.SupportedCurrencies.*;
import static cryptosky.me.helpers.Utils.format;
import static cryptosky.me.logging.CorrelationInfo.setSyncId;
import static java.util.UUID.randomUUID;
@Service
public class PriceService {
private final BtcPriceRepository btcPriceRepository;
private final DlqService dlqService;
private final String dlqName;
private final Logger logger = LoggerFactory.getLogger(PriceService.class);
@Autowired
public PriceService(
final BtcPriceRepository btcPriceRepository,
DlqService dlqService,
@Value("${destinations.pricing.priceDlq}") String dlqName
) {
this.btcPriceRepository = btcPriceRepository;
this.dlqService = dlqService;
this.dlqName = dlqName;
}
@Transactional
public void createRecord(CryptoPriceModel cryptoPriceModel, String correlationId, TextMessage message) {
String syncId = randomUUID().toString();
setSyncId(syncId);
try {
switch (cryptoPriceModel.getType()) {
case BTC:
BtcPriceModel priceModel = new BtcPriceModel();
priceModel.setId(getLatestId() + 1);
priceModel.setTimestamp(cryptoPriceModel.getTimestamp());
priceModel.setSyncId(syncId);
priceModel.setType(cryptoPriceModel.getType());
priceModel.setAverage_price(cryptoPriceModel.getAverage_price());
priceModel.setHigh_price(cryptoPriceModel.getHigh_price());
priceModel.setLow_price(cryptoPriceModel.getLow_price());
priceModel.setOpen_price(cryptoPriceModel.getOpen_price());
priceModel.setClose_price(cryptoPriceModel.getClose_price());
priceModel.setVolume(cryptoPriceModel.getVolume());
logger.info("Saving {} record to the database for [{}] with syncId of [{}]", cryptoPriceModel.getType(), correlationId, syncId);
this.btcPriceRepository.save(priceModel);
return;
default:
logger.error("Message for [{}] has failed with [{}] due to the currency [{}] {}", syncId, MessageCodes.E01.getCode(), cryptoPriceModel.getType(), MessageCodes.E01.getMessage());
throw new NotSupportedCurrencyTypeException();
}
} catch (DataIntegrityViolationException e) {
logger.error("Message for [{}] has failed with [{}] due to the currency [{}] {}", syncId, MessageCodes.E02.getCode(), cryptoPriceModel.getType(), MessageCodes.E02.getMessage());
String dlqReason = String.format("Couldn't sync incoming %s for price event [%s]", CONSUME_PRICE_EVENT, correlationId);
try {
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqName,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, SAVE_MESSAGE_DATABASE_EXCEPTION.getMessage(), dlqInfo, e, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
logger.info("Sending [{}] to the DLQ", syncId);
} catch (JMSException jmsException) {
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
}
} catch (Exception e) {
String dlqReason = String.format("Couldn't sync incoming %s for price event [%s] due to a generic exception", CONSUME_PRICE_EVENT, correlationId);
try {
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqName,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, e, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
logger.info("Sending [{}] to the DLQ", syncId);
} catch (JMSException jmsException) {
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
}
}
}
@Transactional(readOnly = true)
public List<BtcPriceModel> getAllPrices( final int count ) {
return this.btcPriceRepository.findAll().stream()
.limit(count)
.collect(Collectors.toList());
}
@Transactional(readOnly = true)
public List<BtcPriceModel> getPricesBetweenCounts( final int startCount, final int endCount ) {
return this.btcPriceRepository.findAll().stream()
.skip(startCount)
.limit(endCount - startCount)
.collect(Collectors.toList());
}
@Transactional(readOnly = true)
public Optional<BtcPriceModel> getLatest() {
return this.btcPriceRepository.findAll().stream().findFirst();
}
@Transactional(readOnly = true)
public int getLatestId() {
List<BtcPriceModel> records = new ArrayList<>(this.btcPriceRepository.findAll());
BtcPriceModel latest = records.stream().skip(records.size() - 1).findFirst().get();
return latest.getId();
}
@Transactional(readOnly = true)
public Optional<BtcPriceModel> getPriceForCreatedDate( final String createdDate ) {
return this.btcPriceRepository.findAll().stream()
.filter(createdDateList -> createdDateList.getTimestamp().equals(format(createdDate)))
.findFirst();
}
@Transactional(readOnly = true)
public List<BtcPriceModel> getPriceBetweenDates( final String startDate, final String endDate ) {
return this.btcPriceRepository.findAll().stream()
.filter(createdDateList -> format(createdDateList.getTimestamp().toString()).isBefore(format(endDate)))
.filter(createdDateList -> format(createdDateList.getTimestamp().toString()).isAfter(format(startDate)))
.collect(Collectors.toList());
}
// @Transactional(readOnly = true)
// public List<BtcPriceModel> getAllLatest() {
// return this.btcPriceRepository.findAll().stream()
// .filter(distinctByKey(CryptoPriceModel::getType))
// .collect(Collectors.toList());
// }
// @Transactional(readOnly = true)
// public List<CryptoPriceModel> getLimitPricesByType(final int count, final String type ) {
// return this.btcPriceRepository.findAll().stream()
// .filter(typeList -> typeList.getType().equals(type))
// .limit(count)
// .collect(Collectors.toList());
// }
// @Transactional(readOnly = true)
// public List<CryptoPriceModel> getAllByType(final String type ) {
// return this.btcPriceRepository.findAll().stream()
// .filter(typeList -> typeList.getType().equals(type))
// .collect(Collectors.toList());
// }
// @Transactional(readOnly = true)
// public Optional<CryptoPriceModel> getLatestByType(final String type ) {
// return this.cryptoPriceRepository.findAll().stream()
// .filter(typeList -> typeList.getType().equals(type))
// .limit(1)
// .findFirst();
// }
// @Transactional(readOnly = true)
// public Optional<CryptoPriceModel> getPriceByCreatedDateForType( final String type, final String createdDate ) {
// return this.cryptoPriceRepository.findAll().stream()
// .filter(typeList -> typeList.getType().equals(type))
// .filter(createdDateList -> createdDateList.getTimestamp().equals(LocalDate.parse(createdDate)))
// .findFirst();
// }
//
// @Transactional(readOnly = true)
// public Optional<CryptoPriceModel> getPriceBetweenCreatedDatesForType( final String type, final String startDate, final String endDate ) {
// return this.cryptoPriceRepository.findAll().stream()
// .filter(typeList -> typeList.getType().equals(type))
// .filter(createdDateList -> createdDateList.getTimestamp().isBefore(LocalDate.parse(endDate)))
// .filter(createdDateList -> createdDateList.getTimestamp().isAfter(LocalDate.parse(startDate)))
// .findFirst();
// }
}

View File

@ -1,9 +0,0 @@
package cryptosky.me.sentiment.models.repositories;
import cryptosky.me.sentiment.models.entities.BtcSentimentModel;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface BtcSentimentRepository extends JpaRepository<BtcSentimentModel, Integer> {
}

View File

@ -1,207 +0,0 @@
package cryptosky.me.sentiment.service;
import cryptosky.me.dlq.DlqInfo;
import cryptosky.me.dlq.DlqService;
import cryptosky.me.exceptions.MessageCodes;
import cryptosky.me.exceptions.NotSupportedCurrencyTypeException;
import cryptosky.me.pricing.models.entities.BtcPriceModel;
import cryptosky.me.sentiment.models.entities.BtcSentimentModel;
import cryptosky.me.sentiment.models.entities.SentimentModel;
import cryptosky.me.sentiment.models.repositories.BtcSentimentRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import java.time.DateTimeException;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static cryptosky.me.ArtemisSyncMessaging.*;
import static cryptosky.me.SupportedCurrencies.BTC;
import static cryptosky.me.helpers.Utils.format;
import static cryptosky.me.logging.CorrelationInfo.setSyncId;
import static java.util.UUID.randomUUID;
@Service
public class SentimentService {
private final BtcSentimentRepository btcSentimentRepository;
private final DlqService dlqService;
private final String dlqName;
private final Logger logger = LoggerFactory.getLogger(SentimentService.class);
@Autowired
public SentimentService(
final BtcSentimentRepository btcSentimentRepository,
DlqService dlqService,
@Value("${destinations.pricing.priceDlq}") String dlqName
) {
this.btcSentimentRepository = btcSentimentRepository;
this.dlqService = dlqService;
this.dlqName = dlqName;
}
// @Transactional
// public BtcSentimentModel createTweet(final String createdDate, final String rawTweet, final float sentimentScore,
// final float positiveScore, final float neutralScore, final float negativeScore,
// final float compoundScore ) {
//
// final BtcSentimentModel tweetModel = new BtcSentimentModel();
// tweetModel.setTimestamp(format(createdDate).toString());
// tweetModel.setRawTweet(rawTweet);
// tweetModel.setSentimentScore(sentimentScore);
// tweetModel.setPositiveScore(positiveScore);
// tweetModel.setNegativeScore(negativeScore);
// tweetModel.setNeutralScore(neutralScore);
// tweetModel.setCompoundScore(compoundScore);
// return this.btcSentimentRepository.save(tweetModel);
// }
@Transactional
public void createRecord(SentimentModel sentimentModel, String correlationId, TextMessage message) {
String syncId = randomUUID().toString();
setSyncId(syncId);
try {
switch (sentimentModel.getType()) {
case BTC:
BtcSentimentModel btcSentimentModel = new BtcSentimentModel();
btcSentimentModel.setId(getLatestId() + 1);
btcSentimentModel.setTimestamp(sentimentModel.getTimestamp());
btcSentimentModel.setSyncId(syncId);
btcSentimentModel.setPositiveScore(sentimentModel.getPositiveScore());
btcSentimentModel.setNeutralScore(sentimentModel.getNeutralScore());
btcSentimentModel.setNegativeScore(sentimentModel.getNegativeScore());
btcSentimentModel.setCompoundScore(sentimentModel.getCompoundScore());
btcSentimentModel.setType(sentimentModel.getType());
logger.info("Saving {} record to the database for [{}] with syncId of [{}]", sentimentModel.getType(), correlationId, syncId);
this.btcSentimentRepository.save(btcSentimentModel);
return;
default:
logger.error("Message for [{}] has failed with [{}] due to the currency [{}] {}", syncId, MessageCodes.E01.getCode(), sentimentModel.getType(), MessageCodes.E01.getMessage());
throw new NotSupportedCurrencyTypeException();
}
} catch (DataIntegrityViolationException e) {
logger.error("Message for [{}] has failed with [{}] due to the currency [{}] {}", syncId, MessageCodes.E02.getCode(), sentimentModel.getType(), MessageCodes.E02.getMessage());
String dlqReason = String.format("Couldn't sync incoming %s for price event [%s]", CONSUME_PRICE_EVENT, correlationId);
try {
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqName,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, SAVE_MESSAGE_DATABASE_EXCEPTION.getMessage(), dlqInfo, e, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
logger.info("Sending [{}] to the DLQ", syncId);
} catch (JMSException jmsException) {
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
}
} catch (Exception e) {
String dlqReason = String.format("Couldn't sync incoming %s for price event [%s] due to a generic exception", CONSUME_PRICE_EVENT, correlationId);
try {
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqName,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, e, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
logger.info("Sending [{}] to the DLQ", syncId);
} catch (JMSException jmsException) {
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
}
}
}
@Transactional(readOnly = true)
public int getLatestId() {
List<BtcSentimentModel> records = new ArrayList<>(this.btcSentimentRepository.findAll());
BtcSentimentModel latest = records.stream().skip(records.size() - 1).findFirst().get();
return latest.getId();
}
@Transactional(readOnly = true)
public Optional<BtcSentimentModel> getCurrentTweet() {
return this.btcSentimentRepository.findAll().stream().findFirst();
}
@Transactional(readOnly = true)
public List<BtcSentimentModel> getAllTweets(final int count ) {
return this.btcSentimentRepository.findAll().stream()
.limit(count)
.collect(Collectors.toList());
}
@Transactional(readOnly = true)
public List<BtcSentimentModel> getTweetsForDay(final String startDate, final String endDate ) {
LocalDateTime r_start = format(startDate);
LocalDateTime r_end = format(endDate);
r_start = r_start.toLocalDate().atStartOfDay();
r_end = r_end.toLocalDate().atTime(LocalTime.MAX);
if ( r_end.isAfter(r_start) ) {
if ( r_end.equals(r_start.toLocalDate().atTime(LocalTime.MAX))) {
LocalDateTime finalR_end = r_end;
LocalDateTime finalR_start = r_start;
return this.btcSentimentRepository.findAll().stream()
.filter(createdDateList -> format(createdDateList.getTimestamp()).isBefore(finalR_end))
.filter(createdDateList -> format(createdDateList.getTimestamp()).isAfter(finalR_start))
.collect(Collectors.toList());
} else {
// Logger
throw new DateTimeException(r_start +" and "+ r_end +" are not on the same day");
}
} else {
// Logger
throw new DateTimeException("End Date "+ r_end +" is not after "+ r_start);
}
}
@Transactional(readOnly = true)
public List<BtcSentimentModel> getTweetsForPeriod(final String startDate, final String endDate ) {
LocalDateTime r_start = format(startDate);
LocalDateTime r_end = format(endDate);
r_start = r_start.toLocalDate().atStartOfDay();
r_end = r_end.toLocalDate().atTime(LocalTime.MAX);
if ( r_end.isAfter(r_start) ) {
LocalDateTime finalR_end = r_end;
LocalDateTime finalR_start = r_start;
return this.btcSentimentRepository.findAll().stream()
.filter(createdDateList -> format(createdDateList.getTimestamp()).isBefore(finalR_end))
.filter(createdDateList -> format(createdDateList.getTimestamp()).isAfter(finalR_start))
.collect(Collectors.toList());
} else {
// Logger
throw new DateTimeException("End Date "+ r_end +" is not after "+ r_start);
}
}
}

View File

@ -1,15 +1,10 @@
spring: spring:
application: application:
name: @project.artifactId@ name: @project.artifactId@
artemis:
broker-url: ${BROKER_URL:tcp://localhost:61616}
user: ${BROKER_USER:admin}
password: ${BROKER_PASSWORD:admin}
datasource: datasource:
url: ${JDBC_URL:jdbc:postgresql://localhost:25060/db} url: ${JDBC_URL}
username: ${JDBC_USERNAME:admin} username: ${JDBC_USERNAME}
password: ${JDBC_PASSWORD:admin} password: ${JDBC_PASSWORD}
driver-class-name: org.postgresql.Driver driver-class-name: org.postgresql.Driver
hikari: hikari:
maximum-pool-size: 100 maximum-pool-size: 100
@ -18,23 +13,12 @@ spring:
jpa: jpa:
hibernate: hibernate:
ddl-auto: validate ddl-auto: validate
show-sql: false show-sql: true
properties: properties:
org.hibernate.envers.revision_field_name: revision_id org.hibernate.envers.revision_field_name: revision_id
org.hibernate.envers.revision_type_field_name: revision_type org.hibernate.envers.revision_type_field_name: revision_type
org.hibernate.jdbc.lob.non_contextual_creation: true org.hibernate.jdbc.lob.non_contextual_creation: true
format_sql: true format_sql: true
database: postgresql database: postgresql
destinations:
pricing:
priceSave: PricingSave
priceDlq: PricingSave.dlq
tweet:
tweetSave: TweetSave
tweetDlq: TweetSave.dlq
gateway:
gatewayDlq: DBGateway.dlq
server: server:
port: 9090 port: 9090

View File

@ -1,35 +1,42 @@
type BtcPrice { type BtcPrice {
id: ID!, id: ID!,
timestamp: String!, timestamp: String!,
type: String, type: String,
average_price: Float!, average_price: Float!,
high_price: Float, high_price: Float,
low_price: Float, low_price: Float,
open_price: Float, ask_price: Float,
close_price: Float, bid_price: Float,
volume: Float open_price: Float,
close_price: Float,
volume: Float
} }
type sentimentModel { type tweetModel {
id: ID!, id: ID!,
timestamp: String!, timestamp: String!,
rawTweet: String, rawTweet: String,
sentimentScore: Float!, sentimentScore: Float!,
positiveScore: Float, positiveScore: Float,
neutralScore: Float, neutralScore: Float,
negativeScore: Float, negativeScore: Float,
compoundScore: Float! compoundScore: Float!
} }
type Query { type Query {
allPrices(count: Int):[BtcPrice], allPrices(count: Int):[BtcPrice],
pricesBetweenCounts(startCount: Int, endCount: Int):[BtcPrice] pricesBetweenCounts(startCount: Int, endCount: Int):[BtcPrice]
latest:BtcPrice, latest:BtcPrice,
priceForCreatedDate(createdDate: String):BtcPrice, priceForCreatedDate(createdDate: String):BtcPrice,
priceBetweenDates(startDate: String, endDate: String):[BtcPrice], priceBetweenDates(startDate: String, endDate: String):[BtcPrice],
################################################################ ################################################################
currentTweet:sentimentModel, currentTweet:tweetModel,
allTweets(count: Int):[sentimentModel], allTweets(count: Int):[tweetModel],
tweetsForDay(startDate: String, endDate: String):[sentimentModel], tweetsForDay(startDate: String, endDate: String):[tweetModel],
tweetsForPeriod(startDate: String, endDate: String):[sentimentModel] tweetsForPeriod(startDate: String, endDate: String):[tweetModel]
}
type Mutation {
createBtc(createdDate: String!, type: String!, average_price: Float!, high_price: Float, low_price: Float, ask_price: Float, bid_price: Float, open_price: Float, close_price: Float, volume: Float):BtcPrice
createTweet(createdDate: String!, rawTweet: String!, sentimentScore: Float!, positiveScore: Float, neutralScore: Float, negativeScore: Float, compoundScore: Float!):tweetModel
} }

View File

@ -1,36 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="LOGS" value="./logs" />
<appender name="logstash" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<fieldNames>
<version>[ignore]</version>
<levelValue>[ignore]</levelValue>
</fieldNames>
</encoder>
</appender>
<appender name="console"
class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%black(%d{ISO8601}) %highlight(%-5level) [%blue(%t)] %yellow(%C{1.}): %msg {%mdc}%n%throwable
</Pattern>
</layout>
</appender>
<springProfile name="!local">
<root level="INFO">
<appender-ref ref="logstash"/>
</root>
</springProfile>
<springProfile name="local">
<root level="INFO">
<appender-ref ref="console"/>
</root>
</springProfile>
<jmxConfigurator/>
</configuration>