236 lines
10 KiB
Java
236 lines
10 KiB
Java
package cryptosky.me.pricing.service;
|
|
|
|
import cryptosky.me.dlq.DlqInfo;
|
|
import cryptosky.me.dlq.DlqService;
|
|
import cryptosky.me.exceptions.MessageCodes;
|
|
|
|
import cryptosky.me.pricing.models.entities.BtcPriceModel;
|
|
import cryptosky.me.pricing.models.entities.CryptoPriceModel;
|
|
import cryptosky.me.pricing.models.repositories.BtcPriceRepository;
|
|
|
|
import cryptosky.me.exceptions.NotSupportedCurrencyTypeException;
|
|
import cryptosky.me.exceptions.DatabaseViolationException;
|
|
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
import org.springframework.dao.DataIntegrityViolationException;
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.transaction.annotation.Transactional;
|
|
|
|
import javax.jms.JMSException;
|
|
import javax.jms.TextMessage;
|
|
import java.util.ArrayList;
|
|
import java.util.Comparator;
|
|
import java.util.List;
|
|
import java.util.Optional;
|
|
import java.util.stream.Collectors;
|
|
import java.time.LocalDateTime;
|
|
|
|
import static cryptosky.me.ArtemisSyncMessaging.*;
|
|
import static cryptosky.me.SupportedCurrencies.*;
|
|
|
|
import static cryptosky.me.helpers.Utils.format;
|
|
|
|
@Service
|
|
public class PriceService {
|
|
|
|
private final BtcPriceRepository btcPriceRepository;
|
|
private final DlqService dlqService;
|
|
private final String dlqName;
|
|
|
|
private final Logger logger = LoggerFactory.getLogger(PriceService.class);
|
|
|
|
/**
 * Creates the service with its collaborators.
 *
 * @param btcPriceRepository repository used to read BTC price records
 * @param dlqService         service used to forward failed messages to a dead-letter queue
 * @param dlqName            dead-letter queue name, resolved from the
 *                           {@code destinations.pricing.priceDlq} property
 */
@Autowired
public PriceService(
        final BtcPriceRepository btcPriceRepository,
        DlqService dlqService,
        @Value("${destinations.pricing.priceDlq}") String dlqName
) {
    // Plain wiring only; no validation or other work happens here.
    this.dlqName = dlqName;
    this.dlqService = dlqService;
    this.btcPriceRepository = btcPriceRepository;
}
|
|
|
|
@Transactional
|
|
public BtcPriceModel createBtc(final String createdDate, final String type,
|
|
final float av_price,
|
|
final float h_price,
|
|
final float l_price,
|
|
final float o_price,
|
|
final float c_price,
|
|
final float volume ) {
|
|
|
|
final BtcPriceModel btcPrice = new BtcPriceModel();
|
|
btcPrice.setTimestamp(LocalDateTime.parse((createdDate)));
|
|
btcPrice.setType(type);
|
|
btcPrice.setAverage_price(av_price);
|
|
btcPrice.setHigh_price(h_price);
|
|
btcPrice.setLow_price(l_price);
|
|
btcPrice.setOpen_price(o_price);
|
|
btcPrice.setClose_price(c_price);
|
|
btcPrice.setVolume(volume);
|
|
|
|
return btcPrice;
|
|
// return this.btcPriceRepository.save(btcPrice);
|
|
}
|
|
|
|
@Transactional
|
|
public void createRecord(CryptoPriceModel cryptoPriceModel, String correlationId, String syncId, TextMessage message) {
|
|
try {
|
|
switch (cryptoPriceModel.getType()) {
|
|
case BTC:
|
|
BtcPriceModel priceModel = new BtcPriceModel();
|
|
priceModel.setId(getLatestId() + 1);
|
|
priceModel.setTimestamp(cryptoPriceModel.getTimestamp());
|
|
priceModel.setSyncId(syncId);
|
|
priceModel.setType(cryptoPriceModel.getType());
|
|
priceModel.setAverage_price(cryptoPriceModel.getAverage_price());
|
|
priceModel.setHigh_price(cryptoPriceModel.getHigh_price());
|
|
priceModel.setLow_price(cryptoPriceModel.getLow_price());
|
|
priceModel.setOpen_price(cryptoPriceModel.getOpen_price());
|
|
priceModel.setClose_price(cryptoPriceModel.getClose_price());
|
|
priceModel.setVolume(cryptoPriceModel.getVolume());
|
|
|
|
logger.info("Saving {} record to the database for [{}] with syncId of [{}]", cryptoPriceModel.getType(), correlationId, syncId);
|
|
// this.btcPriceRepository.save(priceModel);
|
|
return;
|
|
default:
|
|
logger.error("Message for [{}] has failed with [{}] due to the currency [{}] {}", syncId, MessageCodes.E01.getCode(), cryptoPriceModel.getType(), MessageCodes.E01.getMessage());
|
|
throw new NotSupportedCurrencyTypeException();
|
|
}
|
|
} catch (DataIntegrityViolationException e) {
|
|
logger.error("Message for [{}] has failed with [{}] due to the currency [{}] {}", syncId, MessageCodes.E02.getCode(), cryptoPriceModel.getType(), MessageCodes.E02.getMessage());
|
|
|
|
String dlqReason = String.format("Couldn't sync incoming %s for price event [%s]", CONSUME_PRICE_EVENT, correlationId);
|
|
|
|
try {
|
|
DlqInfo dlqInfo = new DlqInfo(
|
|
message.getJMSMessageID(),
|
|
dlqName,
|
|
dlqReason,
|
|
"0",
|
|
correlationId
|
|
);
|
|
dlqService.sendToDlq(message, SAVE_MESSAGE_DATABASE_EXCEPTION.getMessage(), dlqInfo, e, textMessage -> {
|
|
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
|
|
return textMessage;
|
|
});
|
|
logger.info("Sending [{}] to the DLQ", syncId);
|
|
} catch (JMSException jmsException) {
|
|
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
|
|
}
|
|
} catch (Exception e) {
|
|
String dlqReason = String.format("Couldn't sync incoming %s for price event [%s]", CONSUME_PRICE_EVENT, correlationId);
|
|
|
|
try {
|
|
DlqInfo dlqInfo = new DlqInfo(
|
|
message.getJMSMessageID(),
|
|
dlqName,
|
|
dlqReason,
|
|
"0",
|
|
correlationId
|
|
);
|
|
dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, e, textMessage -> {
|
|
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
|
|
return textMessage;
|
|
});
|
|
} catch (JMSException jmsException) {
|
|
logger.error("An Exception has occurred with a JMS action [%s]", jmsException);
|
|
}
|
|
}
|
|
|
|
|
|
}
|
|
|
|
@Transactional(readOnly = true)
|
|
public List<BtcPriceModel> getAllPrices( final int count ) {
|
|
return this.btcPriceRepository.findAll().stream()
|
|
.limit(count)
|
|
.collect(Collectors.toList());
|
|
}
|
|
|
|
@Transactional(readOnly = true)
|
|
public List<BtcPriceModel> getPricesBetweenCounts( final int startCount, final int endCount ) {
|
|
return this.btcPriceRepository.findAll().stream()
|
|
.skip(startCount)
|
|
.limit(endCount - startCount)
|
|
.collect(Collectors.toList());
|
|
}
|
|
|
|
@Transactional(readOnly = true)
|
|
public Optional<BtcPriceModel> getLatest() {
|
|
return this.btcPriceRepository.findAll().stream().findFirst();
|
|
}
|
|
|
|
@Transactional(readOnly = true)
|
|
public int getLatestId() {
|
|
List<BtcPriceModel> records = new ArrayList<>(this.btcPriceRepository.findAll());
|
|
BtcPriceModel latest = records.stream().skip(records.size() - 1).findFirst().get();
|
|
return latest.getId();
|
|
}
|
|
|
|
@Transactional(readOnly = true)
|
|
public Optional<BtcPriceModel> getPriceForCreatedDate( final String createdDate ) {
|
|
return this.btcPriceRepository.findAll().stream()
|
|
.filter(createdDateList -> createdDateList.getTimestamp().equals(format(createdDate)))
|
|
.findFirst();
|
|
}
|
|
|
|
@Transactional(readOnly = true)
|
|
public List<BtcPriceModel> getPriceBetweenDates( final String startDate, final String endDate ) {
|
|
return this.btcPriceRepository.findAll().stream()
|
|
.filter(createdDateList -> format(createdDateList.getTimestamp().toString()).isBefore(format(endDate)))
|
|
.filter(createdDateList -> format(createdDateList.getTimestamp().toString()).isAfter(format(startDate)))
|
|
.collect(Collectors.toList());
|
|
}
|
|
|
|
// @Transactional(readOnly = true)
|
|
// public List<BtcPriceModel> getAllLatest() {
|
|
// return this.btcPriceRepository.findAll().stream()
|
|
// .filter(distinctByKey(CryptoPriceModel::getType))
|
|
// .collect(Collectors.toList());
|
|
// }
|
|
|
|
// @Transactional(readOnly = true)
|
|
// public List<CryptoPriceModel> getLimitPricesByType(final int count, final String type ) {
|
|
// return this.btcPriceRepository.findAll().stream()
|
|
// .filter(typeList -> typeList.getType().equals(type))
|
|
// .limit(count)
|
|
// .collect(Collectors.toList());
|
|
// }
|
|
|
|
// @Transactional(readOnly = true)
|
|
// public List<CryptoPriceModel> getAllByType(final String type ) {
|
|
// return this.btcPriceRepository.findAll().stream()
|
|
// .filter(typeList -> typeList.getType().equals(type))
|
|
// .collect(Collectors.toList());
|
|
// }
|
|
|
|
// @Transactional(readOnly = true)
|
|
// public Optional<CryptoPriceModel> getLatestByType(final String type ) {
|
|
// return this.cryptoPriceRepository.findAll().stream()
|
|
// .filter(typeList -> typeList.getType().equals(type))
|
|
// .limit(1)
|
|
// .findFirst();
|
|
// }
|
|
|
|
// @Transactional(readOnly = true)
|
|
// public Optional<CryptoPriceModel> getPriceByCreatedDateForType( final String type, final String createdDate ) {
|
|
// return this.cryptoPriceRepository.findAll().stream()
|
|
// .filter(typeList -> typeList.getType().equals(type))
|
|
// .filter(createdDateList -> createdDateList.getTimestamp().equals(LocalDate.parse(createdDate)))
|
|
// .findFirst();
|
|
// }
|
|
//
|
|
// @Transactional(readOnly = true)
|
|
// public Optional<CryptoPriceModel> getPriceBetweenCreatedDatesForType( final String type, final String startDate, final String endDate ) {
|
|
// return this.cryptoPriceRepository.findAll().stream()
|
|
// .filter(typeList -> typeList.getType().equals(type))
|
|
// .filter(createdDateList -> createdDateList.getTimestamp().isBefore(LocalDate.parse(endDate)))
|
|
// .filter(createdDateList -> createdDateList.getTimestamp().isAfter(LocalDate.parse(startDate)))
|
|
// .findFirst();
|
|
// }
|
|
}
|