242 lines
10 KiB
Java

package cryptosky.me.pricing.service;
import cryptosky.me.dlq.DlqInfo;
import cryptosky.me.dlq.DlqService;
import cryptosky.me.exceptions.MessageCodes;
import cryptosky.me.pricing.models.entities.BtcPriceModel;
import cryptosky.me.pricing.models.entities.CryptoPriceModel;
import cryptosky.me.pricing.models.repositories.BtcPriceRepository;
import cryptosky.me.exceptions.NotSupportedCurrencyTypeException;
import cryptosky.me.exceptions.DatabaseViolationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.time.LocalDateTime;
import static cryptosky.me.ArtemisSyncMessaging.*;
import static cryptosky.me.SupportedCurrencies.*;
import static cryptosky.me.helpers.Utils.format;
import static cryptosky.me.logging.CorrelationInfo.setSyncId;
import static java.util.UUID.randomUUID;
@Service
public class PriceService {
private final BtcPriceRepository btcPriceRepository;
private final DlqService dlqService;
private final String dlqName;
private final Logger logger = LoggerFactory.getLogger(PriceService.class);
@Autowired
public PriceService(
final BtcPriceRepository btcPriceRepository,
DlqService dlqService,
@Value("${destinations.pricing.priceDlq}") String dlqName
) {
this.btcPriceRepository = btcPriceRepository;
this.dlqService = dlqService;
this.dlqName = dlqName;
}
@Transactional
public BtcPriceModel createBtc(final String createdDate, final String type,
final float av_price,
final float h_price,
final float l_price,
final float o_price,
final float c_price,
final float volume ) {
final BtcPriceModel btcPrice = new BtcPriceModel();
btcPrice.setTimestamp(LocalDateTime.parse((createdDate)));
btcPrice.setType(type);
btcPrice.setAverage_price(av_price);
btcPrice.setHigh_price(h_price);
btcPrice.setLow_price(l_price);
btcPrice.setOpen_price(o_price);
btcPrice.setClose_price(c_price);
btcPrice.setVolume(volume);
return btcPrice;
// return this.btcPriceRepository.save(btcPrice);
}
@Transactional
public void createRecord(CryptoPriceModel cryptoPriceModel, String correlationId, TextMessage message) {
String syncId = randomUUID().toString();
setSyncId(syncId);
try {
switch (cryptoPriceModel.getType()) {
case BTC:
BtcPriceModel priceModel = new BtcPriceModel();
priceModel.setId(getLatestId() + 1);
priceModel.setTimestamp(cryptoPriceModel.getTimestamp());
priceModel.setSyncId(syncId);
priceModel.setType(cryptoPriceModel.getType());
priceModel.setAverage_price(cryptoPriceModel.getAverage_price());
priceModel.setHigh_price(cryptoPriceModel.getHigh_price());
priceModel.setLow_price(cryptoPriceModel.getLow_price());
priceModel.setOpen_price(cryptoPriceModel.getOpen_price());
priceModel.setClose_price(cryptoPriceModel.getClose_price());
priceModel.setVolume(cryptoPriceModel.getVolume());
logger.info("Saving {} record to the database for [{}] with syncId of [{}]", cryptoPriceModel.getType(), correlationId, syncId);
this.btcPriceRepository.save(priceModel);
return;
default:
logger.error("Message for [{}] has failed with [{}] due to the currency [{}] {}", syncId, MessageCodes.E01.getCode(), cryptoPriceModel.getType(), MessageCodes.E01.getMessage());
throw new NotSupportedCurrencyTypeException();
}
} catch (DataIntegrityViolationException e) {
logger.error("Message for [{}] has failed with [{}] due to the currency [{}] {}", syncId, MessageCodes.E02.getCode(), cryptoPriceModel.getType(), MessageCodes.E02.getMessage());
String dlqReason = String.format("Couldn't sync incoming %s for price event [%s]", CONSUME_PRICE_EVENT, correlationId);
try {
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqName,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, SAVE_MESSAGE_DATABASE_EXCEPTION.getMessage(), dlqInfo, e, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
logger.info("Sending [{}] to the DLQ", syncId);
} catch (JMSException jmsException) {
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
}
} catch (Exception e) {
String dlqReason = String.format("Couldn't sync incoming %s for price event [%s] due to a generic exception", CONSUME_PRICE_EVENT, correlationId);
try {
DlqInfo dlqInfo = new DlqInfo(
message.getJMSMessageID(),
dlqName,
dlqReason,
"0",
correlationId
);
dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, e, textMessage -> {
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
return textMessage;
});
logger.info("Sending [{}] to the DLQ", syncId);
} catch (JMSException jmsException) {
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
}
}
}
@Transactional(readOnly = true)
public List<BtcPriceModel> getAllPrices( final int count ) {
return this.btcPriceRepository.findAll().stream()
.limit(count)
.collect(Collectors.toList());
}
@Transactional(readOnly = true)
public List<BtcPriceModel> getPricesBetweenCounts( final int startCount, final int endCount ) {
return this.btcPriceRepository.findAll().stream()
.skip(startCount)
.limit(endCount - startCount)
.collect(Collectors.toList());
}
@Transactional(readOnly = true)
public Optional<BtcPriceModel> getLatest() {
return this.btcPriceRepository.findAll().stream().findFirst();
}
@Transactional(readOnly = true)
public int getLatestId() {
List<BtcPriceModel> records = new ArrayList<>(this.btcPriceRepository.findAll());
BtcPriceModel latest = records.stream().skip(records.size() - 1).findFirst().get();
return latest.getId();
}
@Transactional(readOnly = true)
public Optional<BtcPriceModel> getPriceForCreatedDate( final String createdDate ) {
return this.btcPriceRepository.findAll().stream()
.filter(createdDateList -> createdDateList.getTimestamp().equals(format(createdDate)))
.findFirst();
}
@Transactional(readOnly = true)
public List<BtcPriceModel> getPriceBetweenDates( final String startDate, final String endDate ) {
return this.btcPriceRepository.findAll().stream()
.filter(createdDateList -> format(createdDateList.getTimestamp().toString()).isBefore(format(endDate)))
.filter(createdDateList -> format(createdDateList.getTimestamp().toString()).isAfter(format(startDate)))
.collect(Collectors.toList());
}
// @Transactional(readOnly = true)
// public List<BtcPriceModel> getAllLatest() {
// return this.btcPriceRepository.findAll().stream()
// .filter(distinctByKey(CryptoPriceModel::getType))
// .collect(Collectors.toList());
// }
// @Transactional(readOnly = true)
// public List<CryptoPriceModel> getLimitPricesByType(final int count, final String type ) {
// return this.btcPriceRepository.findAll().stream()
// .filter(typeList -> typeList.getType().equals(type))
// .limit(count)
// .collect(Collectors.toList());
// }
// @Transactional(readOnly = true)
// public List<CryptoPriceModel> getAllByType(final String type ) {
// return this.btcPriceRepository.findAll().stream()
// .filter(typeList -> typeList.getType().equals(type))
// .collect(Collectors.toList());
// }
// @Transactional(readOnly = true)
// public Optional<CryptoPriceModel> getLatestByType(final String type ) {
// return this.cryptoPriceRepository.findAll().stream()
// .filter(typeList -> typeList.getType().equals(type))
// .limit(1)
// .findFirst();
// }
// @Transactional(readOnly = true)
// public Optional<CryptoPriceModel> getPriceByCreatedDateForType( final String type, final String createdDate ) {
// return this.cryptoPriceRepository.findAll().stream()
// .filter(typeList -> typeList.getType().equals(type))
// .filter(createdDateList -> createdDateList.getTimestamp().equals(LocalDate.parse(createdDate)))
// .findFirst();
// }
//
// @Transactional(readOnly = true)
// public Optional<CryptoPriceModel> getPriceBetweenCreatedDatesForType( final String type, final String startDate, final String endDate ) {
// return this.cryptoPriceRepository.findAll().stream()
// .filter(typeList -> typeList.getType().equals(type))
// .filter(createdDateList -> createdDateList.getTimestamp().isBefore(LocalDate.parse(endDate)))
// .filter(createdDateList -> createdDateList.getTimestamp().isAfter(LocalDate.parse(startDate)))
// .findFirst();
// }
}