Skip to content

Commit

Permalink
fix: Add log recap total messages ingested
Browse files Browse the repository at this point in the history
  • Loading branch information
svariant committed Nov 29, 2024
1 parent 322f1e7 commit ee05780
Showing 1 changed file with 40 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public void ingestPaymentPositions(List<String> messages) {
"PaymentPosition ingestion called at {} for payment positions with events list size {}",
LocalDateTime.now(),
messages.size());

int nullMessages = 0;
int errorMessages = 0;
// persist the item
for (String msg : messages) {
try {
Expand All @@ -73,6 +74,7 @@ public void ingestPaymentPositions(List<String> messages) {
msg, new TypeReference<DataCaptureMessage<PaymentPosition>>() {});

if (paymentPosition == null) {
nullMessages += 1;
continue;
}
PaymentPosition valuesBefore = paymentPosition.getBefore();
Expand Down Expand Up @@ -101,42 +103,56 @@ public void ingestPaymentPositions(List<String> messages) {
if (response) {
log.debug("PaymentPosition ingestion sent to eventhub at {}", LocalDateTime.now());
} else {
errorMessages += 1;
log.error(
"PaymentPosition ingestion unable to send to eventhub at {}", LocalDateTime.now());
}
} catch (JsonProcessingException e) {
errorMessages += 1;
log.error(
"PaymentPosition ingestion error JsonProcessingException at {}",
LocalDateTime.now(),
e);
} catch (PDVTokenizerException e) {
errorMessages += 1;
log.error(
"PaymentPosition ingestion error PDVTokenizerException at {}", LocalDateTime.now(), e);
} catch (PDVTokenizerUnexpectedException e) {
errorMessages += 1;
log.error(
"PaymentPosition ingestion error PDVTokenizerUnexpectedException at {}",
LocalDateTime.now(),
e);
} catch (Exception e) {
errorMessages += 1;
log.error(
"PaymentPosition ingestion error Generic exception at {}", LocalDateTime.now(), e);
}
}

log.debug(
"PaymentPosition ingested at {}: total messages {}, {} null and {} errors",
LocalDateTime.now(),
messages.size(),
nullMessages,
errorMessages);
}

public void ingestPaymentOptions(List<String> messages) {
log.debug(
"PaymentOption ingestion called at {} for payment positions with events list size {}",
LocalDateTime.now(),
messages.size());

int nullMessages = 0;
int errorMessages = 0;
// persist the item
for (String msg : messages) {
try {
DataCaptureMessage<PaymentOption> paymentOption =
objectMapper.readValue(msg, new TypeReference<DataCaptureMessage<PaymentOption>>() {});

if (paymentOption == null) {
nullMessages += 1;
continue;
}
PaymentOption valuesBefore = paymentOption.getBefore();
Expand All @@ -152,16 +168,26 @@ public void ingestPaymentOptions(List<String> messages) {
if (response) {
log.debug("PaymentOption ingestion sent to eventhub at {}", LocalDateTime.now());
} else {
errorMessages += 1;
log.error(
"PaymentOption ingestion unable to send to eventhub at {}", LocalDateTime.now());
}
} catch (JsonProcessingException e) {
errorMessages += 1;
log.error(
"PaymentOption ingestion error JsonProcessingException at {}", LocalDateTime.now(), e);
} catch (Exception e) {
errorMessages += 1;
log.error("PaymentOption ingestion error Generic exception at {}", LocalDateTime.now(), e);
}
}

log.debug(
"PaymentOption ingested at {}: total messages {}, {} null and {} errors",
LocalDateTime.now(),
messages.size(),
nullMessages,
errorMessages);
}

public void ingestTransfers(List<String> messages) {
Expand All @@ -170,13 +196,16 @@ public void ingestTransfers(List<String> messages) {
LocalDateTime.now(),
messages.size());

int nullMessages = 0;
int errorMessages = 0;
// persist the item
for (String msg : messages) {
try {
DataCaptureMessage<Transfer> transfer =
objectMapper.readValue(msg, new TypeReference<DataCaptureMessage<Transfer>>() {});

if (transfer == null) {
nullMessages += 1;
continue;
}
Transfer valuesBefore = transfer.getBefore();
Expand All @@ -192,13 +221,22 @@ public void ingestTransfers(List<String> messages) {
if (response) {
log.debug("Transfer ingestion sent to eventhub at {}", LocalDateTime.now());
} else {
errorMessages += 1;
log.error("Transfer ingestion unable to send to eventhub at {}", LocalDateTime.now());
}
} catch (JsonProcessingException e) {
errorMessages += 1;
log.error("Transfer ingestion error JsonProcessingException at {}", LocalDateTime.now(), e);
} catch (Exception e) {
errorMessages += 1;
log.error("Transfer ingestion error Generic exception at {}", LocalDateTime.now(), e);
}
}
log.debug(
"Transfer ingested at {}: total messages {}, {} null and {} errors",
LocalDateTime.now(),
messages.size(),
nullMessages,
errorMessages);
}
}

0 comments on commit ee05780

Please sign in to comment.