[03.09.20] Set up configurations for consuming and DLQing to Artemis. Removed need for GraphQL but have left this is for future use.

This commit is contained in:
Andy Sotheran 2020-09-03 20:07:50 +01:00
parent d8abb86904
commit 47f296a06c
32 changed files with 800 additions and 168 deletions

View File

@ -53,6 +53,21 @@ spec:
secretKeyRef:
name: jdbc
key: jdbc.password
- name: BROKER_URL
valueFrom:
configMapKeyRef:
name: endpoints
key: amq.url
- name: BROKER_USER
valueFrom:
secretKeyRef:
name: amq
key: amq.username
- name: BROKER_PASSWORD
valueFrom:
secretKeyRef:
name: amq
key: amq.password
ports:
- containerPort: 9090
name: RESOURCE_NAME

View File

@ -81,6 +81,12 @@
<version>4.3.8.Final</version>
</dependency>
<!-- Artemis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-artemis</artifactId>
</dependency>
<!--- Utils -->
<dependency>
<groupId>org.projectlombok</groupId>

View File

@ -1,4 +1,4 @@
package cryptosky.me.graphql;
package cryptosky.me;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

View File

@ -0,0 +1,18 @@
package cryptosky.me;
public enum ArtemisSyncMessaging {
MESSAGE_SAVE_SYNC_ID("MESSAGE_SAVE_SYNC_ID"),
SAVE_MESSAGE_DATABASE_EXCEPTION("SAVE_MESSAGE_DATABASE_EXCEPTION"),
CONSUME_PRICE_EVENT("CONSUME_PRICE_EVENT");
private final String syncMessage;
ArtemisSyncMessaging(String syncMessage) {
this.syncMessage = syncMessage;
}
public String getMessage() {
return syncMessage;
}
}

View File

@ -0,0 +1,7 @@
package cryptosky.me;
public class SupportedCurrencies {
public static final String BTC = "btc_usd";
}

View File

@ -0,0 +1,34 @@
package cryptosky.me.configurations;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import cryptosky.me.helpers.StringTrimModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
import static com.fasterxml.jackson.databind.MapperFeature.DEFAULT_VIEW_INCLUSION;
import static com.fasterxml.jackson.databind.MapperFeature.SORT_PROPERTIES_ALPHABETICALLY;
import static com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS;
@org.springframework.context.annotation.Configuration
public class Configuration {
private static final Logger LOGGER = LoggerFactory.getLogger(Configuration.class);
@Bean
@Primary
public ObjectMapper objectMapper() {
return new ObjectMapper()
.registerModule(new JavaTimeModule())
.registerModule(new StringTrimModule())
.registerModule(new Jdk8Module())
.enable(SORT_PROPERTIES_ALPHABETICALLY)
.disable(WRITE_DATES_AS_TIMESTAMPS)
.disable(FAIL_ON_UNKNOWN_PROPERTIES)
.disable(DEFAULT_VIEW_INCLUSION);
}
}

View File

@ -0,0 +1,41 @@
package cryptosky.me.configurations;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
@Configuration
@EnableJms
public class ConsumerConfig {
@Value("${spring.artemis.broker-url}")
private String brokerUrl;
@Value("${spring.artemis.user}")
private String user;
@Value("${spring.artemis.password}")
private String pass;
@Bean
public ActiveMQConnectionFactory receiverActiveMQConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerUrl);
activeMQConnectionFactory.setUser(user);
activeMQConnectionFactory.setPassword(pass);
return activeMQConnectionFactory;
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory
.setConnectionFactory(receiverActiveMQConnectionFactory());
factory.setConcurrency("3-10");
return factory;
}
}

View File

@ -0,0 +1,28 @@
package cryptosky.me.configurations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InjectionPoint;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.core.MethodParameter;
import cryptosky.me.Application;
import static org.springframework.beans.factory.config.ConfigurableBeanFactory.SCOPE_PROTOTYPE;
@Configuration
public class LoggerConfiguration {
@Bean
@Scope(SCOPE_PROTOTYPE)
public Logger logger(InjectionPoint injectionPoint) {
Class<?> loggerClass = Application.class;
MethodParameter methodParameter = injectionPoint.getMethodParameter();
if (methodParameter != null) {
loggerClass = methodParameter.getContainingClass();
}
return LoggerFactory.getLogger(loggerClass);
}
}

View File

@ -0,0 +1,123 @@
package cryptosky.me.consumers;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import cryptosky.me.dlq.DlqInfo;
import cryptosky.me.dlq.DlqService;
import cryptosky.me.exceptions.NoBodyOrStringException;
import cryptosky.me.pricing.models.entities.CryptoPriceModel;
import cryptosky.me.pricing.service.PriceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import static cryptosky.me.ArtemisSyncMessaging.*;
import static cryptosky.me.helpers.Utils.getCorrelationId;
import static java.lang.String.format;
@Component
public class PriceConsumer {
private final Logger logger = LoggerFactory.getLogger(PriceConsumer.class);
private final ObjectMapper objectMapper;
private final PriceService priceService;
private final DlqService dlqService;
private final String dlqName;
private CountDownLatch latch = new CountDownLatch(1);
@Autowired
public PriceConsumer(
ObjectMapper objectMapper,
PriceService priceService,
DlqService dlqService,
@Value("${destinations.pricing.priceDlq}") String dlqName
) {
this.objectMapper = objectMapper;
this.priceService = priceService;
this.dlqService = dlqService;
this.dlqName = dlqName;
}
@JmsListener(destination = "${destinations.pricing.priceSave}")
public void receive(TextMessage message) throws JMSException {
String correlationId = getCorrelationId(message);
String syncId = UUID.randomUUID().toString();
logger.info("Received Message: " + message.getBody(String.class));
try {
if (message.getBody(String.class) == null | message.getBody(String.class).equals("")) {
throw new NoBodyOrStringException();
}
logger.info("Processing message [{}] as synId [{}] ...", correlationId, syncId);
CryptoPriceModel cryptoPriceModel;
try {
JsonNode messageJson = objectMapper.readTree(message.getText());
cryptoPriceModel = objectMapper.readValue(messageJson.traverse(), CryptoPriceModel.class);
logger.info("Message with syncId of [{}] is for customer [{}]", syncId, cryptoPriceModel);
priceService.createRecord(cryptoPriceModel, correlationId, syncId, message);
latch.countDown();
} catch (IOException e) {
logger.info(format(
"Message [%s] for pricingModel [%s] was not synced due to a readTree exception on getting the text",
syncId,
correlationId
), e);
latch.countDown();
throw e;
}
} catch (NoBodyOrStringException e) {
String dlqReason = format("No body or body isn't a String format for incoming %s for price event [%s]",
CONSUME_PRICE_EVENT, correlationId);
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqName,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, e, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
latch.countDown();
} catch (JMSException jmsException) {
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
latch.countDown();
} catch (Exception e) {
String dlqReason = format("Couldn't sync incoming %s for price event [%s]", CONSUME_PRICE_EVENT, correlationId);
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqName,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, e, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
latch.countDown();
}
}
}

View File

@ -0,0 +1,25 @@
package cryptosky.me.dlq;
import lombok.*;
@Getter
@Setter
@NoArgsConstructor
@Builder
@ToString
@EqualsAndHashCode
public class DlqInfo {
private String messageId;
private String dlqName;
private String dlqReason;
private String redeliveryCount;
private String correlationId;
public DlqInfo(String messageId, String dlqName, String dlqReason, String redeliveryCount, String correlationId) {
this.messageId = messageId;
this.dlqName = dlqName;
this.dlqReason = dlqReason;
this.redeliveryCount = redeliveryCount;
this.correlationId = correlationId;
}
}

View File

@ -0,0 +1,68 @@
package cryptosky.me.dlq;
import cryptosky.me.dlq.DlqInfo;
import org.slf4j.Logger;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import java.util.Collections;
import static java.lang.String.format;
@Component
public class DlqService {
private final JmsTemplate jmsTemplate;
private final Logger logger;
public DlqService(JmsTemplate jmsTemplate, Logger logger) {
this.jmsTemplate = jmsTemplate;
this.logger = logger;
}
public void sendToDlq(TextMessage message, String messageType, DlqInfo dlqInfo, Exception exception, MessageTransformer messageTransformer) {
toDlq(message, messageType, dlqInfo, exception, messageTransformer);
}
private void toDlq(TextMessage message, String messageType, DlqInfo dlqInfo, Exception exception, MessageTransformer messageTransformer) {
logger.error(format("DLQing message [%s] for customer [%s] because of an exception", dlqInfo.getMessageId(), dlqInfo.getCorrelationId()), exception);
logDlqInfo(dlqInfo, messageType);
jmsTemplate.send(dlqInfo.getDlqName(), session -> {
TextMessage textMessage = session.createTextMessage(message.getText());
textMessage.setStringProperty("Exception", exception.getClass().getName());
textMessage.setStringProperty("ExceptionMessage", exception.getMessage());
textMessage.setJMSCorrelationID(message.getJMSCorrelationID());
//noinspection unchecked
Collections.list(message.getPropertyNames()).forEach((Object prop) -> {
try {
textMessage.setObjectProperty(prop.toString(), message.getObjectProperty(prop.toString()));
} catch (JMSException e) {
logger.warn(format("Unable to set header property [%s] on DLQ'd message as an exception was thrown", prop), e);
}
});
return messageTransformer.transform(textMessage);
});
}
private void logDlqInfo(DlqInfo dlqInfo, String messageType) {
String message = format(
// Don't change this message, it is used to create monitoring dashboards!
"Sending message to DLQ - %s for %s to DLQ %s for event %s because of %s",
dlqInfo.getMessageId(),
dlqInfo.getCorrelationId(),
dlqInfo.getDlqName(),
messageType,
dlqInfo.getDlqReason()
);
logger.info(message);
}
public interface MessageTransformer {
TextMessage transform(TextMessage textMessage) throws JMSException;
}
}

View File

@ -0,0 +1,9 @@
package cryptosky.me.exceptions;
public class DatabaseViolationException extends RuntimeException {
public DatabaseViolationException() {}
public DatabaseViolationException(Throwable cause) {
super(cause);
}
}

View File

@ -0,0 +1,34 @@
package cryptosky.me.exceptions;
import cryptosky.me.consumers.PriceConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ExceptionHandler;
import static org.springframework.http.HttpStatus.CONFLICT;
import static org.springframework.http.HttpStatus.NOT_FOUND;
import static org.springframework.http.MediaType.APPLICATION_JSON_UTF8;
import static cryptosky.me.exceptions.MessageCodes.*;
public class GatewayExceptionHandler {
private final Logger logger = LoggerFactory.getLogger(PriceConsumer.class);
@ExceptionHandler(value = NotSupportedCurrencyTypeException.class)
public ResponseEntity<MessageCodes> handleNotSupportedCurrencyTypeException(NotSupportedCurrencyTypeException e) {
logger.error(ResponseEntity.status(NOT_FOUND).body(E01).toString());
return ResponseEntity.status(NOT_FOUND).contentType(APPLICATION_JSON_UTF8).body(E01);
}
@ExceptionHandler(value = DatabaseViolationException.class)
public ResponseEntity<MessageCodes> handleDatabaseViolationException(DatabaseViolationException e) {
logger.error(ResponseEntity.status(CONFLICT).body(E02).toString());
return ResponseEntity.status(CONFLICT).contentType(APPLICATION_JSON_UTF8).body(E02);
}
@ExceptionHandler(value = NoBodyOrStringException.class)
public ResponseEntity<MessageCodes> handleNoBodyOrStringException(NoBodyOrStringException e) {
logger.error(ResponseEntity.status(NOT_FOUND).body(E03).toString());
return ResponseEntity.status(NOT_FOUND).contentType(APPLICATION_JSON_UTF8).body(E03);
}
}

View File

@ -0,0 +1,35 @@
package cryptosky.me.exceptions;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@JsonFormat(shape = JsonFormat.Shape.OBJECT)
public enum MessageCodes {
E01("01", "Not supported on Services"),
E02("02", "Database Violation - Key Pair or other Constraint Violated"),
E03("03", "No body specified or not String format");
private final String code;
private final String message;
private final ObjectMapper objectMapper = new ObjectMapper();
MessageCodes(String code, String message) {
this.code = code;
this.message = message;
}
public String getCode() {
return "GATEWAYSVC-" + code;
}
public String getMessage() {
return message;
}
public String toJson() throws JsonProcessingException {
return objectMapper.writeValueAsString(this);
}
}

View File

@ -0,0 +1,9 @@
package cryptosky.me.exceptions;
public class NoBodyOrStringException extends RuntimeException {
public NoBodyOrStringException() {}
public NoBodyOrStringException(Throwable cause) {
super(cause);
}
}

View File

@ -0,0 +1,9 @@
package cryptosky.me.exceptions;
public class NotSupportedCurrencyTypeException extends RuntimeException {
public NotSupportedCurrencyTypeException() {}
public NotSupportedCurrencyTypeException(Throwable cause) {
super(cause);
}
}

View File

@ -1,128 +0,0 @@
package cryptosky.me.graphql.pricing.service;
import cryptosky.me.graphql.pricing.models.entities.BtcPriceModel;
import cryptosky.me.graphql.pricing.models.repositories.BtcPriceRepository;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.time.LocalDateTime;
import static cryptosky.me.helpers.Utils.format;
@Service
public class BtcPriceService {
private final BtcPriceRepository btcPriceRepository;
public BtcPriceService(final BtcPriceRepository btcPriceRepository) {
this.btcPriceRepository = btcPriceRepository;
}
@Transactional
public BtcPriceModel createBtc(final String createdDate, final String type,
final float av_price,
final float h_price,
final float l_price,
final float o_price,
final float c_price,
final float volume ) {
final BtcPriceModel btcPrice = new BtcPriceModel();
btcPrice.setTimestamp(LocalDateTime.parse((createdDate)));
btcPrice.setType(type);
btcPrice.setAverage_price(av_price);
btcPrice.setHigh_price(h_price);
btcPrice.setLow_price(l_price);
btcPrice.setOpen_price(o_price);
btcPrice.setClose_price(c_price);
btcPrice.setVolume(volume);
return this.btcPriceRepository.save(btcPrice);
}
@Transactional(readOnly = true)
public List<BtcPriceModel> getAllPrices( final int count ) {
return this.btcPriceRepository.findAll().stream()
.limit(count)
.collect(Collectors.toList());
}
@Transactional(readOnly = true)
public List<BtcPriceModel> getPricesBetweenCounts( final int startCount, final int endCount ) {
return this.btcPriceRepository.findAll().stream()
.skip(startCount)
.limit(endCount - startCount)
.collect(Collectors.toList());
}
@Transactional(readOnly = true)
public Optional<BtcPriceModel> getLatest() {
return this.btcPriceRepository.findAll().stream().findFirst();
}
@Transactional(readOnly = true)
public Optional<BtcPriceModel> getPriceForCreatedDate( final String createdDate ) {
return this.btcPriceRepository.findAll().stream()
.filter(createdDateList -> createdDateList.getTimestamp().equals(format(createdDate)))
.findFirst();
}
@Transactional(readOnly = true)
public List<BtcPriceModel> getPriceBetweenDates( final String startDate, final String endDate ) {
return this.btcPriceRepository.findAll().stream()
.filter(createdDateList -> format(createdDateList.getTimestamp().toString()).isBefore(format(endDate)))
.filter(createdDateList -> format(createdDateList.getTimestamp().toString()).isAfter(format(startDate)))
.collect(Collectors.toList());
}
// @Transactional(readOnly = true)
// public List<BtcPriceModel> getAllLatest() {
// return this.btcPriceRepository.findAll().stream()
// .filter(distinctByKey(CryptoPriceModel::getType))
// .collect(Collectors.toList());
// }
// @Transactional(readOnly = true)
// public List<CryptoPriceModel> getLimitPricesByType(final int count, final String type ) {
// return this.btcPriceRepository.findAll().stream()
// .filter(typeList -> typeList.getType().equals(type))
// .limit(count)
// .collect(Collectors.toList());
// }
// @Transactional(readOnly = true)
// public List<CryptoPriceModel> getAllByType(final String type ) {
// return this.btcPriceRepository.findAll().stream()
// .filter(typeList -> typeList.getType().equals(type))
// .collect(Collectors.toList());
// }
// @Transactional(readOnly = true)
// public Optional<CryptoPriceModel> getLatestByType(final String type ) {
// return this.cryptoPriceRepository.findAll().stream()
// .filter(typeList -> typeList.getType().equals(type))
// .limit(1)
// .findFirst();
// }
// @Transactional(readOnly = true)
// public Optional<CryptoPriceModel> getPriceByCreatedDateForType( final String type, final String createdDate ) {
// return this.cryptoPriceRepository.findAll().stream()
// .filter(typeList -> typeList.getType().equals(type))
// .filter(createdDateList -> createdDateList.getTimestamp().equals(LocalDate.parse(createdDate)))
// .findFirst();
// }
//
// @Transactional(readOnly = true)
// public Optional<CryptoPriceModel> getPriceBetweenCreatedDatesForType( final String type, final String startDate, final String endDate ) {
// return this.cryptoPriceRepository.findAll().stream()
// .filter(typeList -> typeList.getType().equals(type))
// .filter(createdDateList -> createdDateList.getTimestamp().isBefore(LocalDate.parse(endDate)))
// .filter(createdDateList -> createdDateList.getTimestamp().isAfter(LocalDate.parse(startDate)))
// .findFirst();
// }
}

View File

@ -0,0 +1,29 @@
package cryptosky.me.helpers;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.deser.std.StdScalarDeserializer;
import com.fasterxml.jackson.databind.module.SimpleModule;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
public class StringTrimModule extends SimpleModule {
public StringTrimModule() {
addDeserializer(String.class, new StdScalarDeserializer<String>(String.class) {
@Override
public String deserialize(JsonParser jsonParser, DeserializationContext ctx) throws IOException {
if (!jsonParser.getCurrentToken().equals(JsonToken.VALUE_STRING)) {
throw new JsonMappingException(
jsonParser,
jsonParser.getCurrentName() + " expected to be a String"
);
}
return StringUtils.trim(jsonParser.getValueAsString());
}
});
}
}

View File

@ -9,8 +9,31 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.jms.Message;
import static java.util.UUID.randomUUID;
import static org.apache.commons.lang3.StringUtils.isBlank;
public class Utils {
private static final String CORRELATION_ID_KEY = "CS-Correlation-ID";
public static String getCorrelationId(Message message) {
String correlationId = null;
try {
correlationId = message.getJMSCorrelationID();
if (isBlank(correlationId)) {
correlationId = message.getStringProperty(CORRELATION_ID_KEY);
}
} catch (Throwable e) {
// NOOP
}
if (isBlank(correlationId)) {
correlationId = randomUUID().toString();
}
return correlationId;
}
// Utility Function to be able to get one of all types
public static <T> Predicate<T> distinctByKey(Function<? super T, Object> keyExtractor) {
Map<Object, Boolean> seen = new ConcurrentHashMap<>();

View File

@ -1,17 +1,17 @@
package cryptosky.me.graphql.pricing.mutations;
package cryptosky.me.pricing.graphql.mutations;
import com.coxautodev.graphql.tools.GraphQLMutationResolver;
import cryptosky.me.graphql.pricing.models.entities.BtcPriceModel;
import cryptosky.me.pricing.models.entities.BtcPriceModel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import cryptosky.me.graphql.pricing.service.BtcPriceService;
import cryptosky.me.pricing.service.PriceService;
@Component
public class BtcPriceMutation implements GraphQLMutationResolver {
@Autowired
private BtcPriceService btcPriceService;
private PriceService priceService;
public BtcPriceModel createBtc(final String createdDate, final String type,
final float av_price,
@ -20,7 +20,7 @@ public class BtcPriceMutation implements GraphQLMutationResolver {
final float o_price,
final float c_price,
final float volume ) {
return this.btcPriceService.createBtc(createdDate, type, av_price, h_price, l_price, o_price, c_price, volume);
return this.priceService.createBtc(createdDate, type, av_price, h_price, l_price, o_price, c_price, volume);
}
}

View File

@ -1,8 +1,9 @@
package cryptosky.me.graphql.pricing.queries;
package cryptosky.me.pricing.graphql.queries;
import com.coxautodev.graphql.tools.GraphQLQueryResolver;
import cryptosky.me.graphql.pricing.models.entities.BtcPriceModel;
import cryptosky.me.graphql.pricing.service.BtcPriceService;
import cryptosky.me.pricing.models.entities.BtcPriceModel;
import cryptosky.me.pricing.models.entities.CryptoPriceModel;
import cryptosky.me.pricing.service.PriceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -13,25 +14,25 @@ import java.util.Optional;
public class BtcPriceQuery implements GraphQLQueryResolver {
@Autowired
private BtcPriceService btcPriceService;
private PriceService priceService;
public List<BtcPriceModel> getAllPrices( final int count ) {
return this.btcPriceService.getAllPrices(count);
public List<BtcPriceModel> getAllPrices(final int count ) {
return this.priceService.getAllPrices(count);
}
public List<BtcPriceModel> getPricesBetweenCounts( final int startCount, final int endCount ) {
return this.btcPriceService.getPricesBetweenCounts(startCount, endCount);
return this.priceService.getPricesBetweenCounts(startCount, endCount);
}
public Optional<BtcPriceModel> getLatest() {
return this.btcPriceService.getLatest();
return this.priceService.getLatest();
}
public Optional<BtcPriceModel> getPriceForCreatedDate( final String createdDate ) {
return this.btcPriceService.getPriceForCreatedDate(createdDate);
return this.priceService.getPriceForCreatedDate(createdDate);
}
public List<BtcPriceModel> getPriceBetweenDates(final String startDate, final String endDate ) {
return this.btcPriceService.getPriceBetweenDates(startDate, endDate);
return this.priceService.getPriceBetweenDates(startDate, endDate);
}
}

View File

@ -1,4 +1,4 @@
package cryptosky.me.graphql.pricing.models.entities;
package cryptosky.me.pricing.models.entities;
import lombok.Data;
import lombok.EqualsAndHashCode;

View File

@ -1,4 +1,4 @@
package cryptosky.me.graphql.pricing.models.entities;
package cryptosky.me.pricing.models.entities;
import lombok.*;
@ -13,9 +13,11 @@ public class CryptoPriceModel {
@Id
@Column(name = "ID", nullable = false)
@GeneratedValue(strategy = GenerationType.IDENTITY)
private int id;
@Column(name = "syncId", nullable = false)
private String syncId;
@Column(name = "timestamp", nullable = false)
private LocalDateTime timestamp;

View File

@ -1,7 +1,6 @@
package cryptosky.me.graphql.pricing.models.repositories;
package cryptosky.me.pricing.models.repositories;
import cryptosky.me.graphql.pricing.models.entities.BtcPriceModel;
import org.springframework.context.annotation.PropertySource;
import cryptosky.me.pricing.models.entities.BtcPriceModel;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

View File

@ -0,0 +1,235 @@
package cryptosky.me.pricing.service;
import cryptosky.me.dlq.DlqInfo;
import cryptosky.me.dlq.DlqService;
import cryptosky.me.exceptions.MessageCodes;
import cryptosky.me.pricing.models.entities.BtcPriceModel;
import cryptosky.me.pricing.models.entities.CryptoPriceModel;
import cryptosky.me.pricing.models.repositories.BtcPriceRepository;
import cryptosky.me.exceptions.NotSupportedCurrencyTypeException;
import cryptosky.me.exceptions.DatabaseViolationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.time.LocalDateTime;
import static cryptosky.me.ArtemisSyncMessaging.*;
import static cryptosky.me.SupportedCurrencies.*;
import static cryptosky.me.helpers.Utils.format;
@Service
public class PriceService {
private final BtcPriceRepository btcPriceRepository;
private final DlqService dlqService;
private final String dlqName;
private final Logger logger = LoggerFactory.getLogger(PriceService.class);
@Autowired
public PriceService(
final BtcPriceRepository btcPriceRepository,
DlqService dlqService,
@Value("${destinations.pricing.priceDlq}") String dlqName
) {
this.btcPriceRepository = btcPriceRepository;
this.dlqService = dlqService;
this.dlqName = dlqName;
}
@Transactional
public BtcPriceModel createBtc(final String createdDate, final String type,
final float av_price,
final float h_price,
final float l_price,
final float o_price,
final float c_price,
final float volume ) {
final BtcPriceModel btcPrice = new BtcPriceModel();
btcPrice.setTimestamp(LocalDateTime.parse((createdDate)));
btcPrice.setType(type);
btcPrice.setAverage_price(av_price);
btcPrice.setHigh_price(h_price);
btcPrice.setLow_price(l_price);
btcPrice.setOpen_price(o_price);
btcPrice.setClose_price(c_price);
btcPrice.setVolume(volume);
return btcPrice;
// return this.btcPriceRepository.save(btcPrice);
}
@Transactional
public void createRecord(CryptoPriceModel cryptoPriceModel, String correlationId, String syncId, TextMessage message) {
try {
switch (cryptoPriceModel.getType()) {
case BTC:
BtcPriceModel priceModel = new BtcPriceModel();
priceModel.setId(getLatestId() + 1);
priceModel.setTimestamp(cryptoPriceModel.getTimestamp());
priceModel.setSyncId(syncId);
priceModel.setType(cryptoPriceModel.getType());
priceModel.setAverage_price(cryptoPriceModel.getAverage_price());
priceModel.setHigh_price(cryptoPriceModel.getHigh_price());
priceModel.setLow_price(cryptoPriceModel.getLow_price());
priceModel.setOpen_price(cryptoPriceModel.getOpen_price());
priceModel.setClose_price(cryptoPriceModel.getClose_price());
priceModel.setVolume(cryptoPriceModel.getVolume());
logger.info("Saving {} record to the database for [{}] with syncId of [{}]", cryptoPriceModel.getType(), correlationId, syncId);
// this.btcPriceRepository.save(priceModel);
return;
default:
logger.error("Message for [{}] has failed with [{}] due to the currency [{}] {}", syncId, MessageCodes.E01.getCode(), cryptoPriceModel.getType(), MessageCodes.E01.getMessage());
throw new NotSupportedCurrencyTypeException();
}
} catch (DataIntegrityViolationException e) {
logger.error("Message for [{}] has failed with [{}] due to the currency [{}] {}", syncId, MessageCodes.E02.getCode(), cryptoPriceModel.getType(), MessageCodes.E02.getMessage());
String dlqReason = String.format("Couldn't sync incoming %s for price event [%s]", CONSUME_PRICE_EVENT, correlationId);
try {
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqName,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, SAVE_MESSAGE_DATABASE_EXCEPTION.getMessage(), dlqInfo, e, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
logger.info("Sending [{}] to the DLQ", syncId);
} catch (JMSException jmsException) {
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
}
} catch (Exception e) {
String dlqReason = String.format("Couldn't sync incoming %s for price event [%s]", CONSUME_PRICE_EVENT, correlationId);
try {
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqName,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, e, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
} catch (JMSException jmsException) {
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
}
}
}
@Transactional(readOnly = true)
public List<BtcPriceModel> getAllPrices( final int count ) {
return this.btcPriceRepository.findAll().stream()
.limit(count)
.collect(Collectors.toList());
}
@Transactional(readOnly = true)
public List<BtcPriceModel> getPricesBetweenCounts( final int startCount, final int endCount ) {
return this.btcPriceRepository.findAll().stream()
.skip(startCount)
.limit(endCount - startCount)
.collect(Collectors.toList());
}
@Transactional(readOnly = true)
public Optional<BtcPriceModel> getLatest() {
return this.btcPriceRepository.findAll().stream().findFirst();
}
@Transactional(readOnly = true)
public int getLatestId() {
List<BtcPriceModel> records = new ArrayList<>(this.btcPriceRepository.findAll());
BtcPriceModel latest = records.stream().skip(records.size() - 1).findFirst().get();
return latest.getId();
}
@Transactional(readOnly = true)
public Optional<BtcPriceModel> getPriceForCreatedDate( final String createdDate ) {
return this.btcPriceRepository.findAll().stream()
.filter(createdDateList -> createdDateList.getTimestamp().equals(format(createdDate)))
.findFirst();
}
@Transactional(readOnly = true)
public List<BtcPriceModel> getPriceBetweenDates( final String startDate, final String endDate ) {
return this.btcPriceRepository.findAll().stream()
.filter(createdDateList -> format(createdDateList.getTimestamp().toString()).isBefore(format(endDate)))
.filter(createdDateList -> format(createdDateList.getTimestamp().toString()).isAfter(format(startDate)))
.collect(Collectors.toList());
}
// @Transactional(readOnly = true)
// public List<BtcPriceModel> getAllLatest() {
// return this.btcPriceRepository.findAll().stream()
// .filter(distinctByKey(CryptoPriceModel::getType))
// .collect(Collectors.toList());
// }
// @Transactional(readOnly = true)
// public List<CryptoPriceModel> getLimitPricesByType(final int count, final String type ) {
// return this.btcPriceRepository.findAll().stream()
// .filter(typeList -> typeList.getType().equals(type))
// .limit(count)
// .collect(Collectors.toList());
// }
// @Transactional(readOnly = true)
// public List<CryptoPriceModel> getAllByType(final String type ) {
// return this.btcPriceRepository.findAll().stream()
// .filter(typeList -> typeList.getType().equals(type))
// .collect(Collectors.toList());
// }
// @Transactional(readOnly = true)
// public Optional<CryptoPriceModel> getLatestByType(final String type ) {
// return this.cryptoPriceRepository.findAll().stream()
// .filter(typeList -> typeList.getType().equals(type))
// .limit(1)
// .findFirst();
// }
// @Transactional(readOnly = true)
// public Optional<CryptoPriceModel> getPriceByCreatedDateForType( final String type, final String createdDate ) {
// return this.cryptoPriceRepository.findAll().stream()
// .filter(typeList -> typeList.getType().equals(type))
// .filter(createdDateList -> createdDateList.getTimestamp().equals(LocalDate.parse(createdDate)))
// .findFirst();
// }
//
// @Transactional(readOnly = true)
// public Optional<CryptoPriceModel> getPriceBetweenCreatedDatesForType( final String type, final String startDate, final String endDate ) {
// return this.cryptoPriceRepository.findAll().stream()
// .filter(typeList -> typeList.getType().equals(type))
// .filter(createdDateList -> createdDateList.getTimestamp().isBefore(LocalDate.parse(endDate)))
// .filter(createdDateList -> createdDateList.getTimestamp().isAfter(LocalDate.parse(startDate)))
// .findFirst();
// }
}

View File

@ -1,4 +1,4 @@
package cryptosky.me.graphql.tweets.models.entities;
package cryptosky.me.tweets.models.entities;
import lombok.Data;
import lombok.EqualsAndHashCode;

View File

@ -1,7 +1,6 @@
package cryptosky.me.graphql.tweets.models.entities;
package cryptosky.me.tweets.models.entities;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

View File

@ -1,6 +1,6 @@
package cryptosky.me.graphql.tweets.models.repositories;
package cryptosky.me.tweets.models.repositories;
import cryptosky.me.graphql.tweets.models.entities.BtcTweetModel;
import cryptosky.me.tweets.models.entities.BtcTweetModel;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

View File

@ -1,8 +1,8 @@
package cryptosky.me.graphql.tweets.mutations;
package cryptosky.me.tweets.mutations;
import com.coxautodev.graphql.tools.GraphQLMutationResolver;
import cryptosky.me.graphql.tweets.models.entities.BtcTweetModel;
import cryptosky.me.graphql.tweets.service.BtcTweetService;
import cryptosky.me.tweets.models.entities.BtcTweetModel;
import cryptosky.me.tweets.service.BtcTweetService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

View File

@ -1,8 +1,8 @@
package cryptosky.me.graphql.tweets.queries;
package cryptosky.me.tweets.queries;
import com.coxautodev.graphql.tools.GraphQLQueryResolver;
import cryptosky.me.graphql.tweets.models.entities.BtcTweetModel;
import cryptosky.me.graphql.tweets.service.BtcTweetService;
import cryptosky.me.tweets.models.entities.BtcTweetModel;
import cryptosky.me.tweets.service.BtcTweetService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

View File

@ -1,7 +1,7 @@
package cryptosky.me.graphql.tweets.service;
package cryptosky.me.tweets.service;
import cryptosky.me.graphql.tweets.models.entities.BtcTweetModel;
import cryptosky.me.graphql.tweets.models.repositories.BtcTweetRepository;
import cryptosky.me.tweets.models.entities.BtcTweetModel;
import cryptosky.me.tweets.models.repositories.BtcTweetRepository;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

View File

@ -1,10 +1,15 @@
spring:
application:
name: @project.artifactId@
artemis:
broker-url: ${BROKER_URL:tcp://localhost:61616}
user: ${BROKER_USER:admin}
password: ${BROKER_PASSWORD:admin}
datasource:
url: ${JDBC_URL}
username: ${JDBC_USERNAME}
password: ${JDBC_PASSWORD}
url: ${JDBC_URL:jdbc:postgresql://localhost:25060/db}
username: ${JDBC_USERNAME:admin}
password: ${JDBC_PASSWORD:admin}
driver-class-name: org.postgresql.Driver
hikari:
maximum-pool-size: 100
@ -13,12 +18,18 @@ spring:
jpa:
hibernate:
ddl-auto: validate
show-sql: true
show-sql: false
properties:
org.hibernate.envers.revision_field_name: revision_id
org.hibernate.envers.revision_type_field_name: revision_type
org.hibernate.jdbc.lob.non_contextual_creation: true
format_sql: true
database: postgresql
destinations:
pricing:
priceSave: PricingSave
priceDlq: PricingSave.dlq
server:
port: 9090