145 lines
4.6 KiB
Java
145 lines
4.6 KiB
Java
package cryptosky.me.consumers;
|
|
|
|
import com.fasterxml.jackson.databind.JsonNode;
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
import cryptosky.me.dlq.DlqInfo;
|
|
import cryptosky.me.dlq.DlqService;
|
|
import cryptosky.me.exceptions.NoBodyOrStringException;
|
|
import cryptosky.me.pricing.models.entities.CryptoPriceModel;
|
|
import cryptosky.me.pricing.service.PriceService;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
import org.springframework.jms.annotation.JmsListener;
|
|
import org.springframework.stereotype.Component;
|
|
|
|
import javax.jms.JMSException;
|
|
import javax.jms.TextMessage;
|
|
import java.io.IOException;
|
|
import java.util.UUID;
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
|
import static cryptosky.me.ArtemisSyncMessaging.*;
|
|
|
|
import static cryptosky.me.helpers.Utils.getCorrelationId;
|
|
import static cryptosky.me.logging.CorrelationInfo.setCorrelationId;
|
|
import static cryptosky.me.logging.CorrelationInfo.setSyncId;
|
|
import static java.lang.String.format;
|
|
|
|
@Component
|
|
public class PriceConsumer {
|
|
|
|
private final Logger logger = LoggerFactory.getLogger(PriceConsumer.class);
|
|
private final ObjectMapper objectMapper;
|
|
private final PriceService priceService;
|
|
private final DlqService dlqService;
|
|
private final String dlqName;
|
|
private final String dlqGateway;
|
|
|
|
private CountDownLatch latch = new CountDownLatch(1);
|
|
|
|
@Autowired
|
|
public PriceConsumer(
|
|
ObjectMapper objectMapper,
|
|
PriceService priceService,
|
|
DlqService dlqService,
|
|
@Value("${destinations.pricing.priceDlq}") String dlqName,
|
|
@Value("${destinations.gateway.gatewayDlq}") String dlqGateway
|
|
) {
|
|
this.objectMapper = objectMapper;
|
|
this.priceService = priceService;
|
|
this.dlqService = dlqService;
|
|
this.dlqName = dlqName;
|
|
this.dlqGateway = dlqGateway;
|
|
}
|
|
|
|
@JmsListener(destination = "${destinations.pricing.priceSave}")
|
|
public void receive(TextMessage message) throws JMSException {
|
|
String correlationId = getCorrelationId(message);
|
|
String syncId = UUID.randomUUID().toString();
|
|
|
|
setCorrelationId(correlationId);
|
|
setSyncId("");
|
|
|
|
logger.info("Received Message: " + message.getBody(String.class));
|
|
|
|
try {
|
|
|
|
if (message.getBody(String.class) == null | message.getBody(String.class).equals("")) {
|
|
throw new NoBodyOrStringException();
|
|
}
|
|
|
|
logger.info("Processing message [{}] as synId [{}] ...", correlationId, syncId);
|
|
|
|
CryptoPriceModel cryptoPriceModel;
|
|
|
|
try {
|
|
JsonNode messageJson = objectMapper.readTree(message.getText());
|
|
cryptoPriceModel = objectMapper.readValue(messageJson.traverse(), CryptoPriceModel.class);
|
|
|
|
logger.info("Message with syncId of [{}] is for customer [{}]", syncId, cryptoPriceModel);
|
|
|
|
priceService.createRecord(cryptoPriceModel, correlationId, syncId, message);
|
|
|
|
latch.countDown();
|
|
} catch (IOException e) {
|
|
logger.info(format(
|
|
"Message [%s] for pricingModel [%s] was not synced due to a readTree exception on getting the text",
|
|
syncId,
|
|
correlationId
|
|
), e);
|
|
latch.countDown();
|
|
throw e;
|
|
}
|
|
} catch (NoBodyOrStringException e) {
|
|
String dlqReason = format("No body or body isn't a String format for incoming %s for price event [%s]",
|
|
CONSUME_PRICE_EVENT, correlationId);
|
|
|
|
DlqInfo dlqInfo = new DlqInfo(
|
|
message.getJMSMessageID(),
|
|
dlqName,
|
|
dlqReason,
|
|
"0",
|
|
correlationId
|
|
);
|
|
dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, e, textMessage -> {
|
|
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
|
|
return textMessage;
|
|
});
|
|
latch.countDown();
|
|
} catch (JMSException jmsException) {
|
|
String dlqReason = format("An Exception [%s] has occurred with a JMS action [%s] on [%s]", CONSUME_PRICE_EVENT, jmsException,
|
|
correlationId);
|
|
|
|
DlqInfo dlqInfo = new DlqInfo(
|
|
message.getJMSMessageID(),
|
|
dlqGateway,
|
|
dlqReason,
|
|
"0",
|
|
correlationId
|
|
);
|
|
dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, jmsException, textMessage -> {
|
|
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
|
|
return textMessage;
|
|
});
|
|
latch.countDown();
|
|
} catch (Exception e) {
|
|
String dlqReason = format("Couldn't sync incoming %s for price event [%s]", CONSUME_PRICE_EVENT, correlationId);
|
|
|
|
DlqInfo dlqInfo = new DlqInfo(
|
|
message.getJMSMessageID(),
|
|
dlqName,
|
|
dlqReason,
|
|
"0",
|
|
correlationId
|
|
);
|
|
dlqService.sendToDlq(message, CONSUME_PRICE_EVENT.getMessage(), dlqInfo, e, textMessage -> {
|
|
textMessage.setStringProperty(MESSAGE_SAVE_SYNC_ID.getMessage(), syncId);
|
|
return textMessage;
|
|
});
|
|
latch.countDown();
|
|
}
|
|
}
|
|
|
|
} |