Skip to content

Commit

Permalink
Add support for custom headers and JSON payload configuration in webhook
Browse files Browse the repository at this point in the history
  • Loading branch information
Siddhanttimeline committed Sep 30, 2024
1 parent 1d727d5 commit ec22ef7
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,79 +67,96 @@ public GenericPublisher(
public void sendMessage(ChangeEvent event) throws EventPublisherException {
long attemptTime = System.currentTimeMillis();
try {
// Post Message to default
String json = JsonUtils.pojoToJson(event);
if (webhook.getEndpoint() != null) {
if (webhook.getSecretKey() != null && !webhook.getSecretKey().isEmpty()) {
String hmac =
"sha256="
+ CommonUtil.calculateHMAC(decryptWebhookSecretKey(webhook.getSecretKey()), json);
postWebhookMessage(this, getTarget().header(RestUtil.SIGNATURE_HEADER, hmac), json);
} else {
postWebhookMessage(this, getTarget(), json);
}
}
String json =
CommonUtil.nullOrEmpty(webhook.getJson())
? JsonUtils.pojoToJson(event)
: webhook.getJson();

prepareAndSendMessage(json, getTarget());

// Post to Generic Webhook with Actions
String eventJson = JsonUtils.pojoToJson(event);
List<Invocation.Builder> targets =
getTargetsForWebhookAlert(
webhook, subscriptionDestination.getCategory(), WEBHOOK, client, event);
String eventJson = JsonUtils.pojoToJson(event);
for (Invocation.Builder actionTarget : targets) {
postWebhookMessage(this, actionTarget, eventJson);
}
} catch (Exception ex) {
Throwable cause = ex.getCause();
String message;
if (cause != null && cause.getClass() == UnknownHostException.class) {
message =
String.format(
"Unknown Host Exception for Generic Publisher : %s , WebhookEndpoint : %s",
subscriptionDestination.getId(), webhook.getEndpoint());
LOG.warn(message);
setErrorStatus(attemptTime, 400, "UnknownHostException");
} else {
message =
CatalogExceptionMessage.eventPublisherFailedToPublish(WEBHOOK, event, ex.getMessage());
LOG.error(message);
}
throw new EventPublisherException(message, Pair.of(subscriptionDestination.getId(), event));
handleException(attemptTime, event, ex);
}
}

@Override
public void sendTestMessage() throws EventPublisherException {
long attemptTime = System.currentTimeMillis();
try {
// Post Message to default
String json =
"This is a test message from OpenMetadata to confirm your webhook destination is configured correctly.";
if (webhook.getEndpoint() != null) {
if (webhook.getSecretKey() != null && !webhook.getSecretKey().isEmpty()) {
String hmac =
"sha256="
+ CommonUtil.calculateHMAC(decryptWebhookSecretKey(webhook.getSecretKey()), json);
postWebhookMessage(this, getTarget().header(RestUtil.SIGNATURE_HEADER, hmac), json);
} else {
postWebhookMessage(this, getTarget(), json);
}
}
CommonUtil.nullOrEmpty(webhook.getJson())
? "This is a test message from OpenMetadata to confirm your webhook destination is configured correctly."
: webhook.getJson();

prepareAndSendMessage(json, getTarget());
} catch (Exception ex) {
Throwable cause = ex.getCause();
String message;
if (cause != null && cause.getClass() == UnknownHostException.class) {
message =
String.format(
"Unknown Host Exception for Generic Publisher : %s , WebhookEndpoint : %s",
subscriptionDestination.getId(), webhook.getEndpoint());
LOG.warn(message);
setErrorStatus(attemptTime, 400, "UnknownHostException");
} else {
message = CatalogExceptionMessage.eventPublisherFailedToPublish(WEBHOOK, ex.getMessage());
LOG.error(message);
handleException(attemptTime, ex);
}
}

private void prepareAndSendMessage(String json, Invocation.Builder target) {
if (!CommonUtil.nullOrEmpty(webhook.getEndpoint())) {

// Add HMAC signature header if secret key is present
if (!CommonUtil.nullOrEmpty(webhook.getSecretKey())) {
String hmac =
"sha256="
+ CommonUtil.calculateHMAC(decryptWebhookSecretKey(webhook.getSecretKey()), json);
target.header(RestUtil.SIGNATURE_HEADER, hmac);
}
throw new EventPublisherException(message);

// Add custom headers if they exist
Map<String, String> headers = webhook.getHeaders();
if (!CommonUtil.nullOrEmpty(headers)) {
headers.forEach(target::header);
}

postWebhookMessage(this, target, json);
}
}

private void handleException(long attemptTime, ChangeEvent event, Exception ex)
throws EventPublisherException {
Throwable cause = ex.getCause();
String message;
if (cause != null && cause.getClass() == UnknownHostException.class) {
message =
String.format(
"Unknown Host Exception for Generic Publisher : %s , WebhookEndpoint : %s",
subscriptionDestination.getId(), webhook.getEndpoint());
LOG.warn(message);
setErrorStatus(attemptTime, 400, "UnknownHostException");
} else {
message =
CatalogExceptionMessage.eventPublisherFailedToPublish(WEBHOOK, event, ex.getMessage());
LOG.error(message);
}
throw new EventPublisherException(message, Pair.of(subscriptionDestination.getId(), event));
}

private void handleException(long attemptTime, Exception ex) throws EventPublisherException {
Throwable cause = ex.getCause();
String message;
if (cause != null && cause.getClass() == UnknownHostException.class) {
message =
String.format(
"Unknown Host Exception for Generic Publisher : %s , WebhookEndpoint : %s",
subscriptionDestination.getId(), webhook.getEndpoint());
LOG.warn(message);
setErrorStatus(attemptTime, 400, "UnknownHostException");
} else {
message = CatalogExceptionMessage.eventPublisherFailedToPublish(WEBHOOK, ex.getMessage());
LOG.error(message);
}
throw new EventPublisherException(message);
}

private Invocation.Builder getTarget() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@
"description": "Secret set by the webhook client used for computing HMAC SHA256 signature of webhook payload and sent in `X-OM-Signature` header in POST requests to publish the events.",
"type": "string"
},
"headers": {
"description": "Custom headers to be sent with the webhook request.",
"type": "object",
"existingJavaType": "java.util.Map<String, String>"
},
"json": {
"description": "Optional JSON payload to be sent with the webhook request.",
"type": "string"
},
"sendToAdmins": {
"description": "Send the Event to Admins",
"type": "boolean",
Expand Down

0 comments on commit ec22ef7

Please sign in to comment.