diff --git a/configuration/kubernetes/deployment.yaml b/configuration/kubernetes/deployment.yaml
index b1ce8e0..3d2cde7 100644
--- a/configuration/kubernetes/deployment.yaml
+++ b/configuration/kubernetes/deployment.yaml
@@ -53,6 +53,21 @@ spec:
secretKeyRef:
name: jdbc
key: jdbc.password
+ - name: BROKER_URL
+ valueFrom:
+ configMapKeyRef:
+ name: endpoints
+ key: amq.url
+ - name: BROKER_USER
+ valueFrom:
+ secretKeyRef:
+ name: amq
+ key: amq.username
+ - name: BROKER_PASSWORD
+ valueFrom:
+ secretKeyRef:
+ name: amq
+ key: amq.password
ports:
- containerPort: 9090
name: RESOURCE_NAME
diff --git a/pom.xml b/pom.xml
index 2c3480f..326a23e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,6 +81,12 @@
4.3.8.Final
+
+
+ org.springframework.boot
+ spring-boot-starter-artemis
+
+
org.projectlombok
diff --git a/src/main/java/cryptosky/me/graphql/Application.java b/src/main/java/cryptosky/me/Application.java
similarity index 93%
rename from src/main/java/cryptosky/me/graphql/Application.java
rename to src/main/java/cryptosky/me/Application.java
index 35f2fb4..a8490f0 100644
--- a/src/main/java/cryptosky/me/graphql/Application.java
+++ b/src/main/java/cryptosky/me/Application.java
@@ -1,4 +1,4 @@
-package cryptosky.me.graphql;
+package cryptosky.me;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
diff --git a/src/main/java/cryptosky/me/ArtemisSyncMessaging.java b/src/main/java/cryptosky/me/ArtemisSyncMessaging.java
new file mode 100644
index 0000000..0a2c7f3
--- /dev/null
+++ b/src/main/java/cryptosky/me/ArtemisSyncMessaging.java
@@ -0,0 +1,18 @@
+package cryptosky.me;
+
+public enum ArtemisSyncMessaging {
+
+ MESSAGE_SAVE_SYNC_ID("MESSAGE_SAVE_SYNC_ID"),
+ SAVE_MESSAGE_DATABASE_EXCEPTION("SAVE_MESSAGE_DATABASE_EXCEPTION"),
+ CONSUME_PRICE_EVENT("CONSUME_PRICE_EVENT");
+
+ private final String syncMessage;
+
+ ArtemisSyncMessaging(String syncMessage) {
+ this.syncMessage = syncMessage;
+ }
+
+ public String getMessage() {
+ return syncMessage;
+ }
+}
diff --git a/src/main/java/cryptosky/me/SupportedCurrencies.java b/src/main/java/cryptosky/me/SupportedCurrencies.java
new file mode 100644
index 0000000..251d6f0
--- /dev/null
+++ b/src/main/java/cryptosky/me/SupportedCurrencies.java
@@ -0,0 +1,7 @@
+package cryptosky.me;
+
+public class SupportedCurrencies {
+
+ public static final String BTC = "btc_usd";
+
+}
diff --git a/src/main/java/cryptosky/me/configurations/Configuration.java b/src/main/java/cryptosky/me/configurations/Configuration.java
new file mode 100644
index 0000000..6dd46bd
--- /dev/null
+++ b/src/main/java/cryptosky/me/configurations/Configuration.java
@@ -0,0 +1,34 @@
+package cryptosky.me.configurations;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import cryptosky.me.helpers.StringTrimModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
+
+import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static com.fasterxml.jackson.databind.MapperFeature.DEFAULT_VIEW_INCLUSION;
+import static com.fasterxml.jackson.databind.MapperFeature.SORT_PROPERTIES_ALPHABETICALLY;
+import static com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS;
+
+@org.springframework.context.annotation.Configuration
+public class Configuration {
+ private static final Logger LOGGER = LoggerFactory.getLogger(Configuration.class);
+
+ @Bean
+ @Primary
+ public ObjectMapper objectMapper() {
+ return new ObjectMapper()
+ .registerModule(new JavaTimeModule())
+ .registerModule(new StringTrimModule())
+ .registerModule(new Jdk8Module())
+ .enable(SORT_PROPERTIES_ALPHABETICALLY)
+ .disable(WRITE_DATES_AS_TIMESTAMPS)
+ .disable(FAIL_ON_UNKNOWN_PROPERTIES)
+ .disable(DEFAULT_VIEW_INCLUSION);
+ }
+
+}
diff --git a/src/main/java/cryptosky/me/configurations/ConsumerConfig.java b/src/main/java/cryptosky/me/configurations/ConsumerConfig.java
new file mode 100644
index 0000000..d80d19b
--- /dev/null
+++ b/src/main/java/cryptosky/me/configurations/ConsumerConfig.java
@@ -0,0 +1,41 @@
+package cryptosky.me.configurations;
+
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.jms.annotation.EnableJms;
+import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
+
+@Configuration
+@EnableJms
+public class ConsumerConfig {
+
+ @Value("${spring.artemis.broker-url}")
+ private String brokerUrl;
+
+ @Value("${spring.artemis.user}")
+ private String user;
+
+ @Value("${spring.artemis.password}")
+ private String pass;
+
+ @Bean
+ public ActiveMQConnectionFactory receiverActiveMQConnectionFactory() {
+ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerUrl);
+ activeMQConnectionFactory.setUser(user);
+ activeMQConnectionFactory.setPassword(pass);
+ return activeMQConnectionFactory;
+ }
+
+ @Bean
+ public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
+ DefaultJmsListenerContainerFactory factory =
+ new DefaultJmsListenerContainerFactory();
+ factory
+ .setConnectionFactory(receiverActiveMQConnectionFactory());
+ factory.setConcurrency("3-10");
+
+ return factory;
+ }
+}
diff --git a/src/main/java/cryptosky/me/configurations/LoggerConfiguration.java b/src/main/java/cryptosky/me/configurations/LoggerConfiguration.java
new file mode 100644
index 0000000..42d9b64
--- /dev/null
+++ b/src/main/java/cryptosky/me/configurations/LoggerConfiguration.java
@@ -0,0 +1,28 @@
+package cryptosky.me.configurations;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.InjectionPoint;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Scope;
+import org.springframework.core.MethodParameter;
+import cryptosky.me.Application;
+
+import static org.springframework.beans.factory.config.ConfigurableBeanFactory.SCOPE_PROTOTYPE;
+
+@Configuration
+public class LoggerConfiguration {
+
+ @Bean
+ @Scope(SCOPE_PROTOTYPE)
+ public Logger logger(InjectionPoint injectionPoint) {
+ Class> loggerClass = Application.class;
+
+ MethodParameter methodParameter = injectionPoint.getMethodParameter();
+ if (methodParameter != null) {
+ loggerClass = methodParameter.getContainingClass();
+ }
+ return LoggerFactory.getLogger(loggerClass);
+ }
+}
diff --git a/src/main/java/cryptosky/me/consumers/PriceConsumer.java b/src/main/java/cryptosky/me/consumers/PriceConsumer.java
new file mode 100644
index 0000000..a71b249
--- /dev/null
+++ b/src/main/java/cryptosky/me/consumers/PriceConsumer.java
@@ -0,0 +1,123 @@
+package cryptosky.me.consumers;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import cryptosky.me.dlq.DlqInfo;
+import cryptosky.me.dlq.DlqService;
+import cryptosky.me.exceptions.NoBodyOrStringException;
+import cryptosky.me.pricing.models.entities.CryptoPriceModel;
+import cryptosky.me.pricing.service.PriceService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.jms.annotation.JmsListener;
+import org.springframework.stereotype.Component;
+
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+
+import static cryptosky.me.ArtemisSyncMessaging.*;
+
+import static cryptosky.me.helpers.Utils.getCorrelationId;
+import static java.lang.String.format;
+
+@Component
+public class PriceConsumer {
+
+ private final Logger logger = LoggerFactory.getLogger(PriceConsumer.class);
+ private final ObjectMapper objectMapper;
+ private final PriceService priceService;
+ private final DlqService dlqService;
+ private final String dlqName;
+
+ private CountDownLatch latch = new CountDownLatch(1);
+
+ @Autowired
+ public PriceConsumer(
+ ObjectMapper objectMapper,
+ PriceService priceService,
+ DlqService dlqService,
+ @Value("${destinations.pricing.priceDlq}") String dlqName
+ ) {
+ this.objectMapper = objectMapper;
+ this.priceService = priceService;
+ this.dlqService = dlqService;
+ this.dlqName = dlqName;
+ }
+
+ @JmsListener(destination = "${destinations.pricing.priceSave}")
+ public void receive(TextMessage message) throws JMSException {
+ String correlationId = getCorrelationId(message);
+ String syncId = UUID.randomUUID().toString();
+
+ logger.info("Received Message: " + message.getBody(String.class));
+
+ try {
+ if (message.getBody(String.class) == null | message.getBody(String.class).equals("")) {
+ throw new NoBodyOrStringException();
+ }
+
+ logger.info("Processing message [{}] as synId [{}] ...", correlationId, syncId);
+
+ CryptoPriceModel cryptoPriceModel;
+
+ try {
+ JsonNode messageJson = objectMapper.readTree(message.getText());
+ cryptoPriceModel = objectMapper.readValue(messageJson.traverse(), CryptoPriceModel.class);
+
+ logger.info("Message with syncId of [{}] is for customer [{}]", syncId, cryptoPriceModel);
+
+ priceService.createRecord(cryptoPriceModel, correlationId, syncId, message);
+
+ latch.countDown();
+ } catch (IOException e) {
+ logger.info(format(
+ "Message [%s] for pricingModel [%s] was not synced due to a readTree exception on getting the text",
+ syncId,
+ correlationId
+ ), e);
+ latch.countDown();
+ throw e;
+ }
+ } catch (NoBodyOrStringException e) {
+ String dlqReason = format("No body or body isn't a String format for incoming %s for price event [%s]",
+ CONSUME_PRICE_EVENT, correlationId);
+
+ DlqInfo dlqInfo = new DlqInfo(
+ message.getJMSMessageID(),
+ dlqName,
+ dlqReason,
+ "0",
+ correlationId
+ );
+ dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, e, textMessage -> {
+ textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
+ return textMessage;
+ });
+ latch.countDown();
+ } catch (JMSException jmsException) {
+ logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
+ latch.countDown();
+ } catch (Exception e) {
+ String dlqReason = format("Couldn't sync incoming %s for price event [%s]", CONSUME_PRICE_EVENT, correlationId);
+
+ DlqInfo dlqInfo = new DlqInfo(
+ message.getJMSMessageID(),
+ dlqName,
+ dlqReason,
+ "0",
+ correlationId
+ );
+ dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, e, textMessage -> {
+ textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
+ return textMessage;
+ });
+ latch.countDown();
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/cryptosky/me/dlq/DlqInfo.java b/src/main/java/cryptosky/me/dlq/DlqInfo.java
new file mode 100644
index 0000000..bf13e35
--- /dev/null
+++ b/src/main/java/cryptosky/me/dlq/DlqInfo.java
@@ -0,0 +1,25 @@
+package cryptosky.me.dlq;
+
+import lombok.*;
+
+@Getter
+@Setter
+@NoArgsConstructor
+@Builder
+@ToString
+@EqualsAndHashCode
+public class DlqInfo {
+ private String messageId;
+ private String dlqName;
+ private String dlqReason;
+ private String redeliveryCount;
+ private String correlationId;
+
+ public DlqInfo(String messageId, String dlqName, String dlqReason, String redeliveryCount, String correlationId) {
+ this.messageId = messageId;
+ this.dlqName = dlqName;
+ this.dlqReason = dlqReason;
+ this.redeliveryCount = redeliveryCount;
+ this.correlationId = correlationId;
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/cryptosky/me/dlq/DlqService.java b/src/main/java/cryptosky/me/dlq/DlqService.java
new file mode 100644
index 0000000..34b2d3f
--- /dev/null
+++ b/src/main/java/cryptosky/me/dlq/DlqService.java
@@ -0,0 +1,68 @@
+package cryptosky.me.dlq;
+
+import cryptosky.me.dlq.DlqInfo;
+import org.slf4j.Logger;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.stereotype.Component;
+
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+import java.util.Collections;
+
+import static java.lang.String.format;
+
+@Component
+public class DlqService {
+ private final JmsTemplate jmsTemplate;
+ private final Logger logger;
+
+ public DlqService(JmsTemplate jmsTemplate, Logger logger) {
+ this.jmsTemplate = jmsTemplate;
+ this.logger = logger;
+ }
+
+ public void sendToDlq(TextMessage message, String messageType, DlqInfo dlqInfo, Exception exception, MessageTransformer messageTransformer) {
+ toDlq(message, messageType, dlqInfo, exception, messageTransformer);
+ }
+
+ private void toDlq(TextMessage message, String messageType, DlqInfo dlqInfo, Exception exception, MessageTransformer messageTransformer) {
+ logger.error(format("DLQing message [%s] for customer [%s] because of an exception", dlqInfo.getMessageId(), dlqInfo.getCorrelationId()), exception);
+ logDlqInfo(dlqInfo, messageType);
+
+ jmsTemplate.send(dlqInfo.getDlqName(), session -> {
+ TextMessage textMessage = session.createTextMessage(message.getText());
+ textMessage.setStringProperty("Exception", exception.getClass().getName());
+ textMessage.setStringProperty("ExceptionMessage", exception.getMessage());
+ textMessage.setJMSCorrelationID(message.getJMSCorrelationID());
+
+ //noinspection unchecked
+ Collections.list(message.getPropertyNames()).forEach((Object prop) -> {
+ try {
+ textMessage.setObjectProperty(prop.toString(), message.getObjectProperty(prop.toString()));
+ } catch (JMSException e) {
+ logger.warn(format("Unable to set header property [%s] on DLQ'd message as an exception was thrown", prop), e);
+ }
+ });
+
+ return messageTransformer.transform(textMessage);
+ });
+ }
+
+ private void logDlqInfo(DlqInfo dlqInfo, String messageType) {
+ String message = format(
+ // Don't change this message, it is used to create monitoring dashboards!
+ "Sending message to DLQ - %s for %s to DLQ %s for event %s because of %s",
+ dlqInfo.getMessageId(),
+ dlqInfo.getCorrelationId(),
+ dlqInfo.getDlqName(),
+ messageType,
+ dlqInfo.getDlqReason()
+ );
+
+ logger.info(message);
+ }
+
+ public interface MessageTransformer {
+ TextMessage transform(TextMessage textMessage) throws JMSException;
+ }
+}
diff --git a/src/main/java/cryptosky/me/exceptions/DatabaseViolationException.java b/src/main/java/cryptosky/me/exceptions/DatabaseViolationException.java
new file mode 100644
index 0000000..60bdd6e
--- /dev/null
+++ b/src/main/java/cryptosky/me/exceptions/DatabaseViolationException.java
@@ -0,0 +1,9 @@
+package cryptosky.me.exceptions;
+
+public class DatabaseViolationException extends RuntimeException {
+ public DatabaseViolationException() {}
+
+ public DatabaseViolationException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/src/main/java/cryptosky/me/exceptions/GatewayExceptionHandler.java b/src/main/java/cryptosky/me/exceptions/GatewayExceptionHandler.java
new file mode 100644
index 0000000..89d33d2
--- /dev/null
+++ b/src/main/java/cryptosky/me/exceptions/GatewayExceptionHandler.java
@@ -0,0 +1,34 @@
+package cryptosky.me.exceptions;
+
+import cryptosky.me.consumers.PriceConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.ExceptionHandler;
+
+import static org.springframework.http.HttpStatus.CONFLICT;
+import static org.springframework.http.HttpStatus.NOT_FOUND;
+import static org.springframework.http.MediaType.APPLICATION_JSON_UTF8;
+import static cryptosky.me.exceptions.MessageCodes.*;
+
+public class GatewayExceptionHandler {
+ private final Logger logger = LoggerFactory.getLogger(PriceConsumer.class);
+
+ @ExceptionHandler(value = NotSupportedCurrencyTypeException.class)
+ public ResponseEntity handleNotSupportedCurrencyTypeException(NotSupportedCurrencyTypeException e) {
+ logger.error(ResponseEntity.status(NOT_FOUND).body(E01).toString());
+ return ResponseEntity.status(NOT_FOUND).contentType(APPLICATION_JSON_UTF8).body(E01);
+ }
+
+ @ExceptionHandler(value = DatabaseViolationException.class)
+ public ResponseEntity handleDatabaseViolationException(DatabaseViolationException e) {
+ logger.error(ResponseEntity.status(CONFLICT).body(E02).toString());
+ return ResponseEntity.status(CONFLICT).contentType(APPLICATION_JSON_UTF8).body(E02);
+ }
+
+ @ExceptionHandler(value = NoBodyOrStringException.class)
+ public ResponseEntity handleNoBodyOrStringException(NoBodyOrStringException e) {
+ logger.error(ResponseEntity.status(NOT_FOUND).body(E03).toString());
+ return ResponseEntity.status(NOT_FOUND).contentType(APPLICATION_JSON_UTF8).body(E03);
+ }
+}
diff --git a/src/main/java/cryptosky/me/exceptions/MessageCodes.java b/src/main/java/cryptosky/me/exceptions/MessageCodes.java
new file mode 100644
index 0000000..8d19eed
--- /dev/null
+++ b/src/main/java/cryptosky/me/exceptions/MessageCodes.java
@@ -0,0 +1,35 @@
+package cryptosky.me.exceptions;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+@JsonFormat(shape = JsonFormat.Shape.OBJECT)
+public enum MessageCodes {
+
+ E01("01", "Not supported on Services"),
+ E02("02", "Database Violation - Key Pair or other Constraint Violated"),
+ E03("03", "No body specified or not String format");
+
+ private final String code;
+ private final String message;
+
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ MessageCodes(String code, String message) {
+ this.code = code;
+ this.message = message;
+ }
+
+ public String getCode() {
+ return "GATEWAYSVC-" + code;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public String toJson() throws JsonProcessingException {
+ return objectMapper.writeValueAsString(this);
+ }
+}
diff --git a/src/main/java/cryptosky/me/exceptions/NoBodyOrStringException.java b/src/main/java/cryptosky/me/exceptions/NoBodyOrStringException.java
new file mode 100644
index 0000000..d2548b5
--- /dev/null
+++ b/src/main/java/cryptosky/me/exceptions/NoBodyOrStringException.java
@@ -0,0 +1,9 @@
+package cryptosky.me.exceptions;
+
+public class NoBodyOrStringException extends RuntimeException {
+ public NoBodyOrStringException() {}
+
+ public NoBodyOrStringException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/src/main/java/cryptosky/me/exceptions/NotSupportedCurrencyTypeException.java b/src/main/java/cryptosky/me/exceptions/NotSupportedCurrencyTypeException.java
new file mode 100644
index 0000000..b074cb8
--- /dev/null
+++ b/src/main/java/cryptosky/me/exceptions/NotSupportedCurrencyTypeException.java
@@ -0,0 +1,9 @@
+package cryptosky.me.exceptions;
+
+public class NotSupportedCurrencyTypeException extends RuntimeException {
+ public NotSupportedCurrencyTypeException() {}
+
+ public NotSupportedCurrencyTypeException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/src/main/java/cryptosky/me/graphql/pricing/service/BtcPriceService.java b/src/main/java/cryptosky/me/graphql/pricing/service/BtcPriceService.java
deleted file mode 100644
index 1b7cf55..0000000
--- a/src/main/java/cryptosky/me/graphql/pricing/service/BtcPriceService.java
+++ /dev/null
@@ -1,128 +0,0 @@
-package cryptosky.me.graphql.pricing.service;
-
-import cryptosky.me.graphql.pricing.models.entities.BtcPriceModel;
-import cryptosky.me.graphql.pricing.models.repositories.BtcPriceRepository;
-import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
-
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import java.time.LocalDateTime;
-
-import static cryptosky.me.helpers.Utils.format;
-
-@Service
-public class BtcPriceService {
-
- private final BtcPriceRepository btcPriceRepository;
-
- public BtcPriceService(final BtcPriceRepository btcPriceRepository) {
- this.btcPriceRepository = btcPriceRepository;
- }
-
- @Transactional
- public BtcPriceModel createBtc(final String createdDate, final String type,
- final float av_price,
- final float h_price,
- final float l_price,
- final float o_price,
- final float c_price,
- final float volume ) {
-
- final BtcPriceModel btcPrice = new BtcPriceModel();
- btcPrice.setTimestamp(LocalDateTime.parse((createdDate)));
- btcPrice.setType(type);
- btcPrice.setAverage_price(av_price);
- btcPrice.setHigh_price(h_price);
- btcPrice.setLow_price(l_price);
- btcPrice.setOpen_price(o_price);
- btcPrice.setClose_price(c_price);
- btcPrice.setVolume(volume);
-
- return this.btcPriceRepository.save(btcPrice);
- }
-
- @Transactional(readOnly = true)
- public List getAllPrices( final int count ) {
- return this.btcPriceRepository.findAll().stream()
- .limit(count)
- .collect(Collectors.toList());
- }
-
- @Transactional(readOnly = true)
- public List getPricesBetweenCounts( final int startCount, final int endCount ) {
- return this.btcPriceRepository.findAll().stream()
- .skip(startCount)
- .limit(endCount - startCount)
- .collect(Collectors.toList());
- }
-
- @Transactional(readOnly = true)
- public Optional getLatest() {
- return this.btcPriceRepository.findAll().stream().findFirst();
- }
-
- @Transactional(readOnly = true)
- public Optional getPriceForCreatedDate( final String createdDate ) {
- return this.btcPriceRepository.findAll().stream()
- .filter(createdDateList -> createdDateList.getTimestamp().equals(format(createdDate)))
- .findFirst();
- }
-
- @Transactional(readOnly = true)
- public List getPriceBetweenDates( final String startDate, final String endDate ) {
- return this.btcPriceRepository.findAll().stream()
- .filter(createdDateList -> format(createdDateList.getTimestamp().toString()).isBefore(format(endDate)))
- .filter(createdDateList -> format(createdDateList.getTimestamp().toString()).isAfter(format(startDate)))
- .collect(Collectors.toList());
- }
-
-// @Transactional(readOnly = true)
-// public List getAllLatest() {
-// return this.btcPriceRepository.findAll().stream()
-// .filter(distinctByKey(CryptoPriceModel::getType))
-// .collect(Collectors.toList());
-// }
-
-// @Transactional(readOnly = true)
-// public List getLimitPricesByType(final int count, final String type ) {
-// return this.btcPriceRepository.findAll().stream()
-// .filter(typeList -> typeList.getType().equals(type))
-// .limit(count)
-// .collect(Collectors.toList());
-// }
-
-// @Transactional(readOnly = true)
-// public List getAllByType(final String type ) {
-// return this.btcPriceRepository.findAll().stream()
-// .filter(typeList -> typeList.getType().equals(type))
-// .collect(Collectors.toList());
-// }
-
-// @Transactional(readOnly = true)
-// public Optional getLatestByType(final String type ) {
-// return this.cryptoPriceRepository.findAll().stream()
-// .filter(typeList -> typeList.getType().equals(type))
-// .limit(1)
-// .findFirst();
-// }
-
-// @Transactional(readOnly = true)
-// public Optional getPriceByCreatedDateForType( final String type, final String createdDate ) {
-// return this.cryptoPriceRepository.findAll().stream()
-// .filter(typeList -> typeList.getType().equals(type))
-// .filter(createdDateList -> createdDateList.getTimestamp().equals(LocalDate.parse(createdDate)))
-// .findFirst();
-// }
-//
-// @Transactional(readOnly = true)
-// public Optional getPriceBetweenCreatedDatesForType( final String type, final String startDate, final String endDate ) {
-// return this.cryptoPriceRepository.findAll().stream()
-// .filter(typeList -> typeList.getType().equals(type))
-// .filter(createdDateList -> createdDateList.getTimestamp().isBefore(LocalDate.parse(endDate)))
-// .filter(createdDateList -> createdDateList.getTimestamp().isAfter(LocalDate.parse(startDate)))
-// .findFirst();
-// }
-}
diff --git a/src/main/java/cryptosky/me/helpers/StringTrimModule.java b/src/main/java/cryptosky/me/helpers/StringTrimModule.java
new file mode 100644
index 0000000..5d9e209
--- /dev/null
+++ b/src/main/java/cryptosky/me/helpers/StringTrimModule.java
@@ -0,0 +1,29 @@
+package cryptosky.me.helpers;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.deser.std.StdScalarDeserializer;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+
+public class StringTrimModule extends SimpleModule {
+ public StringTrimModule() {
+ addDeserializer(String.class, new StdScalarDeserializer(String.class) {
+ @Override
+ public String deserialize(JsonParser jsonParser, DeserializationContext ctx) throws IOException {
+ if (!jsonParser.getCurrentToken().equals(JsonToken.VALUE_STRING)) {
+ throw new JsonMappingException(
+ jsonParser,
+ jsonParser.getCurrentName() + " expected to be a String"
+ );
+ }
+
+ return StringUtils.trim(jsonParser.getValueAsString());
+ }
+ });
+ }
+}
diff --git a/src/main/java/cryptosky/me/helpers/Utils.java b/src/main/java/cryptosky/me/helpers/Utils.java
index 8f31195..f23e7e7 100644
--- a/src/main/java/cryptosky/me/helpers/Utils.java
+++ b/src/main/java/cryptosky/me/helpers/Utils.java
@@ -9,8 +9,31 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;
+import javax.jms.Message;
+
+import static java.util.UUID.randomUUID;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+
public class Utils {
+ private static final String CORRELATION_ID_KEY = "CS-Correlation-ID";
+
+ public static String getCorrelationId(Message message) {
+ String correlationId = null;
+ try {
+ correlationId = message.getJMSCorrelationID();
+ if (isBlank(correlationId)) {
+ correlationId = message.getStringProperty(CORRELATION_ID_KEY);
+ }
+ } catch (Throwable e) {
+ // NOOP
+ }
+ if (isBlank(correlationId)) {
+ correlationId = randomUUID().toString();
+ }
+ return correlationId;
+ }
+
// Utility Function to be able to get one of all types
public static Predicate distinctByKey(Function super T, Object> keyExtractor) {
Map