Skip to content

Commit

Permalink
Fixes #17718 : Notification Debuggability (#18199)
Browse files Browse the repository at this point in the history
* create EventSubscriptionDiagnosticInfo

* API for failedEvents

* migrations

* add index on source

* list all failedEvents optionally filtered by source

* refactor

* refactor

* getSuccessfullySentChangeEventsForAlert API

* increase defaultValue of limit to 100

* resolve conflicts

* listEvents API with 'failed', 'successful,' and 'unprocessed' query parameters

* fix description

* eventSource as enums

* refactor name.

---------

Co-authored-by: Sriharsha Chintalapani <harshach@users.noreply.github.com>
  • Loading branch information
Siddhanttimeline and harshach authored Oct 30, 2024
1 parent 23f1381 commit f0fc482
Show file tree
Hide file tree
Showing 20 changed files with 953 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,6 @@ WHERE serviceType IN ('Athena','BigQuery','Mssql','Mysql','Oracle','Postgres','R
update dbservice_entity
set json = JSON_SET(json, '$.connection.config.supportsSystemProfile', true)
where serviceType in ('Snowflake', 'Redshift', 'BigQuery');

-- Update all rows in the consumers_dlq table to set the source column to 'publisher'
UPDATE consumers_dlq SET source = 'publisher';
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,10 @@ CREATE TABLE IF NOT EXISTS apps_data_store (
identifier VARCHAR(256) NOT NULL,
type VARCHAR(256) NOT NULL,
json JSON NOT NULL
);
);

-- Add the source column to the consumers_dlq table
ALTER TABLE consumers_dlq ADD COLUMN source VARCHAR(255) NOT NULL;

-- Create an index on the source column in the consumers_dlq table
CREATE INDEX idx_consumers_dlq_source ON consumers_dlq (source);
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,6 @@ WHERE serviceType IN ('Athena','BigQuery','Mssql','Mysql','Oracle','Postgres','R
UPDATE dbservice_entity
SET json = jsonb_set(json::jsonb, '{connection,config,supportsSystemProfile}', 'true'::jsonb)
WHERE serviceType IN ('Snowflake', 'Redshift', 'BigQuery');

-- Update all rows in the consumers_dlq table to set the source column to 'publisher'
UPDATE consumers_dlq SET source = 'publisher';
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,10 @@ CREATE TABLE IF NOT EXISTS apps_data_store (
identifier VARCHAR(256) NOT NULL,
type VARCHAR(256) NOT NULL,
json JSON NOT NULL
);
);

-- Add the source column to the consumers_dlq table
ALTER TABLE consumers_dlq ADD COLUMN source VARCHAR(255) NOT NULL;

-- Create an index on the source column in the consumers_dlq table
CREATE INDEX idx_consumers_dlq_source ON consumers_dlq (source);
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public abstract class AbstractEventConsumer
public static final String OFFSET_EXTENSION = "eventSubscription.Offset";
public static final String METRICS_EXTENSION = "eventSubscription.metrics";
public static final String FAILED_EVENT_EXTENSION = "eventSubscription.failedEvent";

private long offset = -1;
private AlertMetrics alertMetrics;

Expand All @@ -77,8 +78,13 @@ protected void doInit(JobExecutionContext context) {
// To be implemented by the Subclass if needed
}

public enum FailureTowards {
SUBSCRIBER,
PUBLISHER
}

@Override
public void handleFailedEvent(EventPublisherException ex) {
public void handleFailedEvent(EventPublisherException ex, boolean errorOnSub) {
UUID failingSubscriptionId = ex.getChangeEventWithSubscription().getLeft();
ChangeEvent changeEvent = ex.getChangeEventWithSubscription().getRight();
LOG.debug(
Expand All @@ -87,6 +93,8 @@ public void handleFailedEvent(EventPublisherException ex) {
failingSubscriptionId,
changeEvent);

FailureTowards source = errorOnSub ? FailureTowards.SUBSCRIBER : FailureTowards.PUBLISHER;

Entity.getCollectionDAO()
.eventSubscriptionDAO()
.upsertFailedEvent(
Expand All @@ -97,7 +105,9 @@ public void handleFailedEvent(EventPublisherException ex) {
.withFailingSubscriptionId(failingSubscriptionId)
.withChangeEvent(changeEvent)
.withRetriesLeft(eventSubscription.getRetries())
.withTimestamp(System.currentTimeMillis())));
.withReason(ex.getMessage())
.withTimestamp(System.currentTimeMillis())),
source.toString());
}

private long loadInitialOffset(JobExecutionContext context) {
Expand Down Expand Up @@ -164,7 +174,7 @@ public void publishEvents(Map<ChangeEvent, Set<UUID>> events) {
alertMetrics.withSuccessEvents(alertMetrics.getSuccessEvents() + 1);
} catch (EventPublisherException e) {
alertMetrics.withFailedEvents(alertMetrics.getFailedEvents() + 1);
handleFailedEvent(e);
handleFailedEvent(e, false);
}
}
}
Expand All @@ -176,13 +186,15 @@ public void commit(JobExecutionContext jobExecutionContext) {
// Upsert Offset
EventSubscriptionOffset eventSubscriptionOffset =
new EventSubscriptionOffset().withOffset(offset).withTimestamp(currentTime);

Entity.getCollectionDAO()
.eventSubscriptionDAO()
.upsertSubscriberExtension(
eventSubscription.getId().toString(),
OFFSET_EXTENSION,
"eventSubscriptionOffset",
JsonUtils.pojoToJson(eventSubscriptionOffset));

jobExecutionContext
.getJobDetail()
.getJobDataMap()
Expand All @@ -195,13 +207,15 @@ public void commit(JobExecutionContext jobExecutionContext) {
.withFailedEvents(alertMetrics.getFailedEvents())
.withSuccessEvents(alertMetrics.getSuccessEvents())
.withTimestamp(currentTime);

Entity.getCollectionDAO()
.eventSubscriptionDAO()
.upsertSubscriberExtension(
eventSubscription.getId().toString(),
METRICS_EXTENSION,
"alertMetrics",
JsonUtils.pojoToJson(metrics));

jobExecutionContext.getJobDetail().getJobDataMap().put(METRICS_EXTENSION, alertMetrics);

// Populate the Destination map
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ public void sendAlert(UUID receiverId, ChangeEvent event) throws EventPublisherE
if (destinationMap.containsKey(receiverId)) {
Destination<ChangeEvent> destination = destinationMap.get(receiverId);
if (Boolean.TRUE.equals(destination.getEnabled())) {
destination.sendMessage(event);
try {
destination.sendMessage(event);
} catch (EventPublisherException ex) {
handleFailedEvent(ex, true);
}
} else {
LOG.debug(
"Event Subscription:{} Skipping sending message since, disabled subscription with Id: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public interface Consumer<T> {

void publishEvents(Map<ChangeEvent, Set<UUID>> events);

void handleFailedEvent(EventPublisherException e);
void handleFailedEvent(EventPublisherException e, boolean errorOnSub);

void commit(JobExecutionContext jobExecutionContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ public void sendMessage(ChangeEvent event) throws EventPublisherException {
String message =
CatalogExceptionMessage.eventPublisherFailedToPublish(EMAIL, event, e.getMessage());
LOG.error(message);
throw new EventPublisherException(message, Pair.of(subscriptionDestination.getId(), event));
throw new EventPublisherException(
CatalogExceptionMessage.eventPublisherFailedToPublish(EMAIL, e.getMessage()),
Pair.of(subscriptionDestination.getId(), event));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public void sendMessage(ChangeEvent changeEvent) throws EventPublisherException
ACTIVITY_FEED, changeEvent, ex.getMessage());
LOG.error(message);
throw new EventPublisherException(
message, Pair.of(subscriptionDestination.getId(), changeEvent));
CatalogExceptionMessage.eventPublisherFailedToPublish(ACTIVITY_FEED, ex.getMessage()),
Pair.of(subscriptionDestination.getId(), changeEvent));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ public void sendMessage(ChangeEvent event) throws EventPublisherException {
String message =
CatalogExceptionMessage.eventPublisherFailedToPublish(G_CHAT, event, e.getMessage());
LOG.error(message);
throw new EventPublisherException(message, Pair.of(subscriptionDestination.getId(), event));
throw new EventPublisherException(
CatalogExceptionMessage.eventPublisherFailedToPublish(G_CHAT, e.getMessage()),
Pair.of(subscriptionDestination.getId(), event));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ private void handleException(long attemptTime, ChangeEvent event, Exception ex)
String message =
CatalogExceptionMessage.eventPublisherFailedToPublish(WEBHOOK, event, ex.getMessage());
LOG.error(message);
throw new EventPublisherException(message, Pair.of(subscriptionDestination.getId(), event));
throw new EventPublisherException(
CatalogExceptionMessage.eventPublisherFailedToPublish(WEBHOOK, ex.getMessage()),
Pair.of(subscriptionDestination.getId(), event));
}

private void handleException(long attemptTime, Exception ex) throws EventPublisherException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ public void sendMessage(ChangeEvent event) throws EventPublisherException {
String message =
CatalogExceptionMessage.eventPublisherFailedToPublish(MS_TEAMS, event, e.getMessage());
LOG.error(message);
throw new EventPublisherException(message, Pair.of(subscriptionDestination.getId(), event));
throw new EventPublisherException(
CatalogExceptionMessage.eventPublisherFailedToPublish(MS_TEAMS, e.getMessage()),
Pair.of(subscriptionDestination.getId(), event));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ public void sendMessage(ChangeEvent event) throws EventPublisherException {
String message =
CatalogExceptionMessage.eventPublisherFailedToPublish(SLACK, event, e.getMessage());
LOG.error(message);
throw new EventPublisherException(message, Pair.of(subscriptionDestination.getId(), event));
throw new EventPublisherException(
CatalogExceptionMessage.eventPublisherFailedToPublish(SLACK, e.getMessage()),
Pair.of(subscriptionDestination.getId(), event));
}
}

Expand Down
Loading

0 comments on commit f0fc482

Please sign in to comment.