Skip to content

Commit

Permalink
Fix/slow notification queries (#2786)
Browse files Browse the repository at this point in the history
* fix: add some timing to the notification tasks

* fix: attempt to speed up the queries run to generate notifications

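- Limit the use of the `data_ids` field, as `any(data_ids)` lookups are very slow; source table and reference dataset queries now match on `table_schema` and `table_name` instead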
- Only send notifications for published datasets
- For data cut queries only, add `LIMIT 10` to the changelog query so that an email contains at most 10 changes (this query still filters on `data_ids` and is very slow)
  • Loading branch information
niross authored Oct 4, 2023
1 parent cce6e85 commit de7f9d1
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 12 deletions.
26 changes: 19 additions & 7 deletions dataworkspace/dataworkspace/apps/datasets/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -961,14 +961,27 @@ def send_notification_emails():

def create_notifications():
logger.info("send_notification_emails: Creating notifications")
for notifiable_object in (
list(SourceTable.objects.order_by("id"))
+ list(CustomDatasetQuery.objects.order_by("id"))
+ list(ReferenceDataset.objects.order_by("id"))
):
source_objects = (
list(
SourceTable.objects.filter(
dataset__deleted=False, dataset__published=True
).order_by("id")
)
+ list(
CustomDatasetQuery.objects.filter(
dataset__deleted=False, dataset__published=True
).order_by("id")
)
+ list(ReferenceDataset.objects.filter(deleted=False, published=True).order_by("id"))
)
logger.info(
"send_notification_emails: Creating notifications for %d source objects",
len(source_objects),
)
for notifiable_object in source_objects:
logger.info(
"send_notification_emails: Creating notification for %s %s",
type(notifiable_object),
notifiable_object.__class__.__name__,
notifiable_object.id,
)
changelog = get_detailed_changelog(notifiable_object)
Expand Down Expand Up @@ -1001,7 +1014,6 @@ def create_notifications():
"send_notification_emails: Processing notifications for dataset %s",
dataset.name,
)
queryset = DataSetSubscription.objects.none()
queryset = (
dataset.subscriptions.filter(notify_on_schema_change=True)
if schema_change
Expand Down
14 changes: 9 additions & 5 deletions dataworkspace/dataworkspace/datasets_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,16 @@ def get_source_table_changelog(source_table):
SELECT id, source_data_modified_utc, table_schema||'.'||table_name, table_structure, data_hash_v1
FROM dataflow.metadata
WHERE data_type = {}
AND {} = any(data_ids)
AND table_schema = {}
AND table_name = {}
AND source_data_modified_utc IS NOT NULL
AND table_structure IS NOT NULL
ORDER BY id ASC;
"""
).format(
Literal(DataSetType.MASTER),
Literal(str(source_table.id)),
Literal(source_table.schema),
Literal(source_table.table),
)
)
return get_changelog_from_metadata_rows(cursor.fetchall())
Expand All @@ -197,7 +199,8 @@ def get_custom_dataset_query_changelog(query):
AND {} = any(data_ids)
AND source_data_modified_utc IS NOT NULL
AND table_structure IS NOT NULL
ORDER BY id ASC;
ORDER BY id ASC
LIMIT 10;
"""
).format(
Literal(DataSetType.DATACUT),
Expand All @@ -216,14 +219,15 @@ def get_reference_dataset_changelog(dataset):
SELECT id, source_data_modified_utc, table_schema||'.'||table_name, table_structure, data_hash_v1
FROM dataflow.metadata
WHERE data_type = {}
AND {} = any(data_ids)
AND table_schema = 'public'
AND table_name = {}
AND source_data_modified_utc IS NOT NULL
AND table_structure IS NOT NULL
ORDER BY id ASC;
"""
).format(
Literal(DataSetType.REFERENCE),
Literal(str(dataset.id)),
Literal(dataset.table_name),
)
)
return get_changelog_from_metadata_rows(cursor.fetchall())
Expand Down

0 comments on commit de7f9d1

Please sign in to comment.