package cryptosky.me.consumers; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import cryptosky.me.dlq.DlqInfo; import cryptosky.me.dlq.DlqService; import cryptosky.me.exceptions.NoBodyOrStringException; import cryptosky.me.pricing.models.entities.CryptoPriceModel; import cryptosky.me.pricing.service.PriceService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.TextMessage; import java.io.IOException; import java.util.UUID; import java.util.concurrent.CountDownLatch; import static cryptosky.me.ArtemisSyncMessaging.*; import static cryptosky.me.helpers.Utils.getCorrelationId; import static java.lang.String.format; @Component public class PriceConsumer { private final Logger logger = LoggerFactory.getLogger(PriceConsumer.class); private final ObjectMapper objectMapper; private final PriceService priceService; private final DlqService dlqService; private final String dlqName; private final String dlqGateway; private CountDownLatch latch = new CountDownLatch(1); @Autowired public PriceConsumer( ObjectMapper objectMapper, PriceService priceService, DlqService dlqService, @Value("${destinations.pricing.priceDlq}") String dlqName, @Value("${destinations.gateway.gatewayDlq}") String dlqGateway ) { this.objectMapper = objectMapper; this.priceService = priceService; this.dlqService = dlqService; this.dlqName = dlqName; this.dlqGateway = dlqGateway; } @JmsListener(destination = "${destinations.pricing.priceSave}") public void receive(TextMessage message) throws JMSException { String correlationId = getCorrelationId(message); String syncId = UUID.randomUUID().toString(); logger.info("Received Message: " + message.getBody(String.class)); try { if (message.getBody(String.class) == null | message.getBody(String.class).equals("")) { throw new NoBodyOrStringException(); } logger.info("Processing message [{}] as synId [{}] ...", correlationId, syncId); CryptoPriceModel cryptoPriceModel; try { JsonNode messageJson = objectMapper.readTree(message.getText()); cryptoPriceModel = objectMapper.readValue(messageJson.traverse(), CryptoPriceModel.class); logger.info("Message with syncId of [{}] is for customer [{}]", syncId, cryptoPriceModel); priceService.createRecord(cryptoPriceModel, correlationId, syncId, message); latch.countDown(); } catch (IOException e) { logger.info(format( "Message [%s] for pricingModel [%s] was not synced due to a readTree exception on getting the text", syncId, correlationId ), e); latch.countDown(); throw e; } } catch (NoBodyOrStringException e) { String dlqReason = format("No body or body isn't a String format for incoming %s for price event [%s]", CONSUME_PRICE_EVENT, correlationId); DlqInfo dlqInfo = new DlqInfo( message.getJMSMessageID(), dlqName, dlqReason, "0", correlationId ); dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, e, textMessage -> { textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId); return textMessage; }); latch.countDown(); } catch (JMSException jmsException) { String dlqReason = format("An Exception [%s] has occurred with a JMS action [%s] on [%s]", CONSUME_PRICE_EVENT, jmsException, correlationId); DlqInfo dlqInfo = new DlqInfo( message.getJMSMessageID(), dlqGateway, dlqReason, "0", correlationId ); dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, jmsException, textMessage -> { textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId); return textMessage; }); latch.countDown(); } catch (Exception e) { String dlqReason = format("Couldn't sync incoming %s for price event [%s]", CONSUME_PRICE_EVENT, correlationId); DlqInfo dlqInfo = new DlqInfo( message.getJMSMessageID(), dlqName, dlqReason, "0", correlationId ); dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, e, textMessage -> { textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId); return textMessage; }); latch.countDown(); } } }