Commit

storage reads from backfill tags table and backfill job_name column if they are populated (#25497)

## Summary & Motivation
See #25460 for additional
context

Updates backfill queries to use the `job_name` column and the `backfill_tags` table if the migrations have run.
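
The gist of the read path is a migration-gated fallback: storage checks `has_built_index(BACKFILL_JOB_NAME_AND_TAGS)` and queries the new `backfill_tags` table / `job_name` column only when the migration has run; otherwise it derives the answer from run tags as before. Below is a minimal, self-contained sketch of that pattern — table and column names mirror the PR, but this is a simplified illustration using plain SQLAlchemy, not Dagster's actual storage layer:

```python
# Toy sketch of the migration-gated read path; assumes only SQLAlchemy.
import sqlalchemy as db

engine = db.create_engine("sqlite://")
metadata = db.MetaData()

bulk_actions = db.Table(
    "bulk_actions",
    metadata,
    db.Column("key", db.String, primary_key=True),  # backfill id
    db.Column("job_name", db.String, nullable=True),  # added by the migration
)
backfill_tags = db.Table(
    "backfill_tags",
    metadata,
    db.Column("backfill_id", db.String),
    db.Column("key", db.String),
    db.Column("value", db.String),
)
metadata.create_all(engine)


def migration_has_run(conn) -> bool:
    # stand-in for RunStorage.has_built_index(BACKFILL_JOB_NAME_AND_TAGS)
    return db.inspect(conn).has_table("backfill_tags")


def backfills_by_tag(conn, key: str, value: str):
    query = db.select(bulk_actions.c.key)
    if migration_has_run(conn):
        # fast path: join the dedicated backfill_tags table directly
        query = query.select_from(
            bulk_actions.join(
                backfill_tags, bulk_actions.c.key == backfill_tags.c.backfill_id
            )
        ).where(db.and_(backfill_tags.c.key == key, backfill_tags.c.value == value))
    # (the pre-migration fallback joins run tags instead; omitted here)
    return conn.execute(query).fetchall()


with engine.connect() as conn:
    conn.execute(bulk_actions.insert(), [{"key": "bf_1", "job_name": "my_job"}])
    conn.execute(
        backfill_tags.insert(), [{"backfill_id": "bf_1", "key": "team", "value": "data"}]
    )
    print(backfills_by_tag(conn, "team", "data"))  # [('bf_1',)]
```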

## How I Tested These Changes

## Changelog

Added a new column to the BulkActions table and a new BackfillTags table
to improve performance when filtering Backfills. To take advantage of
these performance improvements, run `dagster instance migrate`. This
migration involves a schema migration to add the new column and table,
and a data migration to populate the new column for historical
backfills.
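
After upgrading, the new filters are exercised through the instance API. A hedged usage sketch (assumes `DAGSTER_HOME` points at a migrated instance; `my_job` and the tag values are placeholders, and combining `job_name` with `tags` in one `BulkActionsFilter` is illustrative):

```python
# Hedged usage sketch: filter backfills after running `dagster instance migrate`.
from dagster import DagsterInstance
from dagster._core.execution.backfill import BulkActionsFilter

instance = DagsterInstance.get()  # assumes DAGSTER_HOME is configured
backfills = instance.get_backfills(
    filters=BulkActionsFilter(job_name="my_job", tags={"team": "data"})
)
for backfill in backfills:
    print(backfill.backfill_id, backfill.status)
```
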
jamiedemaria authored Oct 24, 2024
1 parent edfee57 commit 2772968
Showing 5 changed files with 314 additions and 45 deletions.
100 changes: 65 additions & 35 deletions python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py
@@ -57,6 +57,7 @@
 )
 from dagster._core.storage.runs.base import RunStorage
 from dagster._core.storage.runs.migration import (
+    BACKFILL_JOB_NAME_AND_TAGS,
     OPTIONAL_DATA_MIGRATIONS,
     REQUIRED_DATA_MIGRATIONS,
     RUN_BACKFILL_ID,
@@ -862,49 +863,76 @@ def wipe_daemon_heartbeats(self) -> None:
         # https://stackoverflow.com/a/54386260/324449
         conn.execute(DaemonHeartbeatsTable.delete())
 
-    def _backfills_query(self, filters: Optional[BulkActionsFilter] = None):
-        query = db_select([BulkActionsTable.c.body, BulkActionsTable.c.timestamp])
-        if filters and filters.tags:
-            # Backfills do not have a corresponding tags table. However, all tags that are on a backfill are
-            # applied to the runs the backfill launches. So we can query for runs that match the tags and
-            # are also part of a backfill to find the backfills that match the tags.
-
-            backfills_with_tags_query = db_select([RunTagsTable.c.value]).where(
-                RunTagsTable.c.key == BACKFILL_ID_TAG
-            )
-
-            for i, (key, value) in enumerate(filters.tags.items()):
-                run_tags_alias = db.alias(RunTagsTable, f"run_tags_filter{i}")
-                backfills_with_tags_query = backfills_with_tags_query.where(
-                    db.and_(
-                        RunTagsTable.c.run_id == run_tags_alias.c.run_id,
-                        run_tags_alias.c.key == key,
-                        (run_tags_alias.c.value == value)
-                        if isinstance(value, str)
-                        else run_tags_alias.c.value.in_(value),
-                    ),
-                )
-
-            query = query.where(BulkActionsTable.c.key.in_(db_subquery(backfills_with_tags_query)))
+    def _add_backfill_filters_to_table(
+        self, table: db.Table, filters: Optional[BulkActionsFilter]
+    ) -> db.Table:
+        if filters and filters.tags and self.has_built_index(BACKFILL_JOB_NAME_AND_TAGS):
+            for i, (key, value) in enumerate(filters.tags.items()):
+                backfill_tags_alias = db.alias(BackfillTagsTable, f"backfill_tags_filter{i}")
+
+                table = table.join(
+                    backfill_tags_alias,
+                    db.and_(
+                        BulkActionsTable.c.key == backfill_tags_alias.c.backfill_id,
+                        backfill_tags_alias.c.key == key,
+                        (backfill_tags_alias.c.value == value)
+                        if isinstance(value, str)
+                        else backfill_tags_alias.c.value.in_(value),
+                    ),
+                )
+            return table
+        return table
+
+    def _backfills_query(self, filters: Optional[BulkActionsFilter] = None):
+        query = db_select([BulkActionsTable.c.body, BulkActionsTable.c.timestamp])
+        if filters and filters.tags:
+            if not self.has_built_index(BACKFILL_JOB_NAME_AND_TAGS):
+                # If the migration has run, tags filtering was already added in
+                # _add_backfill_filters_to_table; here the BackfillTags table has not
+                # been built. However, all tags that are on a backfill are applied to
+                # the runs the backfill launches, so we can query for runs that match
+                # the tags and are also part of a backfill to find the backfills that
+                # match the tags.
+                backfills_with_tags_query = db_select([RunTagsTable.c.value]).where(
+                    RunTagsTable.c.key == BACKFILL_ID_TAG
+                )
+
+                for i, (key, value) in enumerate(filters.tags.items()):
+                    run_tags_alias = db.alias(RunTagsTable, f"run_tags_filter{i}")
+                    backfills_with_tags_query = backfills_with_tags_query.where(
+                        db.and_(
+                            RunTagsTable.c.run_id == run_tags_alias.c.run_id,
+                            run_tags_alias.c.key == key,
+                            (run_tags_alias.c.value == value)
+                            if isinstance(value, str)
+                            else run_tags_alias.c.value.in_(value),
+                        ),
+                    )
+
+                query = query.where(
+                    BulkActionsTable.c.key.in_(db_subquery(backfills_with_tags_query))
+                )
 
         if filters and filters.job_name:
-            run_tags_table = RunTagsTable
-
-            runs_in_backfill_with_job_name = run_tags_table.join(
-                RunsTable,
-                db.and_(
-                    RunTagsTable.c.run_id == RunsTable.c.run_id,
-                    RunTagsTable.c.key == BACKFILL_ID_TAG,
-                    RunsTable.c.pipeline_name == filters.job_name,
-                ),
-            )
-
-            backfills_with_job_name_query = db_select([RunTagsTable.c.value]).select_from(
-                runs_in_backfill_with_job_name
-            )
-            query = query.where(
-                BulkActionsTable.c.key.in_(db_subquery(backfills_with_job_name_query))
-            )
+            if self.has_built_index(BACKFILL_JOB_NAME_AND_TAGS):
+                query = query.where(BulkActionsTable.c.job_name == filters.job_name)
+            else:
+                run_tags_table = RunTagsTable
+
+                runs_in_backfill_with_job_name = run_tags_table.join(
+                    RunsTable,
+                    db.and_(
+                        RunTagsTable.c.run_id == RunsTable.c.run_id,
+                        RunTagsTable.c.key == BACKFILL_ID_TAG,
+                        RunsTable.c.pipeline_name == filters.job_name,
+                    ),
+                )
+
+                backfills_with_job_name_query = db_select([RunTagsTable.c.value]).select_from(
+                    runs_in_backfill_with_job_name
+                )
+                query = query.where(
+                    BulkActionsTable.c.key.in_(db_subquery(backfills_with_job_name_query))
+                )
         if filters and filters.statuses:
             query = query.where(
                 BulkActionsTable.c.status.in_([status.value for status in filters.statuses])
@@ -966,13 +994,15 @@ def get_backfills(
         if status is not None:
             filters = BulkActionsFilter(statuses=[status])
 
-        query = self._backfills_query(filters=filters)
+        table = self._add_backfill_filters_to_table(BulkActionsTable, filters)
+        query = self._backfills_query(filters=filters).select_from(table)
         query = self._add_cursor_limit_to_backfills_query(query, cursor=cursor, limit=limit)
         query = query.order_by(BulkActionsTable.c.id.desc())
         rows = self.fetchall(query)
         backfill_candidates = deserialize_values((row["body"] for row in rows), PartitionBackfill)
 
-        if filters and filters.tags:
+        if filters and filters.tags and not self.has_built_index(BACKFILL_JOB_NAME_AND_TAGS):
+            # if we are still using the run tags table to get backfills by tag, we need to do an additional check.
             # runs can have more tags than the backfill that launched them. Since we filtered tags by
             # querying for runs with those tags, we need to do an additional check that the backfills
             # also have the requested tags
@@ -31,7 +31,7 @@
 from dagster._core.errors import DagsterInvalidInvocationError
 from dagster._core.events import DagsterEvent, StepMaterializationData
 from dagster._core.events.log import EventLogEntry
-from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
+from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus, PartitionBackfill
 from dagster._core.execution.plan.outputs import StepOutputHandle
 from dagster._core.execution.plan.state import KnownExecutionState
 from dagster._core.instance import DagsterInstance, InstanceRef
@@ -45,7 +45,7 @@
 from dagster._core.storage.event_log.migration import migrate_event_log_data
 from dagster._core.storage.event_log.sql_event_log import SqlEventLogStorage
 from dagster._core.storage.migration.utils import upgrading_instance
-from dagster._core.storage.runs.migration import RUN_BACKFILL_ID
+from dagster._core.storage.runs.migration import BACKFILL_JOB_NAME_AND_TAGS, RUN_BACKFILL_ID
 from dagster._core.storage.sqlalchemy_compat import db_select
 from dagster._core.storage.tags import (
     BACKFILL_ID_TAG,
@@ -1338,6 +1338,23 @@ def test_add_backfill_tags():
             backfill_timestamp=get_current_timestamp(),
         )
         instance.add_backfill(before_migration)
+        # filtering pre-migration relies on filtering runs, so add a run with the expected tags
+        pre_migration_run = instance.run_storage.add_run(
+            DagsterRun(
+                job_name="foo",
+                run_id=make_new_run_id(),
+                tags={"before": "migration", BACKFILL_ID_TAG: before_migration.backfill_id},
+                status=DagsterRunStatus.NOT_STARTED,
+            )
+        )
+
+        # filtering by tags works before migration
+        assert (
+            instance.get_backfills(filters=BulkActionsFilter(tags={"before": "migration"}))[
+                0
+            ].backfill_id
+            == before_migration.backfill_id
+        )
 
         instance.upgrade()
         assert "backfill_tags" in get_sqlite3_tables(db_path)
@@ -1362,6 +1379,24 @@ def test_add_backfill_tags():
         assert ids_to_tags.get(before_migration.backfill_id) == before_migration.tags
         assert ids_to_tags[after_migration.backfill_id] == after_migration.tags
 
+        # filtering by tags works after migration
+        assert instance.run_storage.has_built_index(BACKFILL_JOB_NAME_AND_TAGS)
+        # delete the run that was added pre-migration to prove that tags filtering is
+        # happening on the backfill_tags table
+        instance.delete_run(pre_migration_run.run_id)
+        assert (
+            instance.get_backfills(filters=BulkActionsFilter(tags={"before": "migration"}))[
+                0
+            ].backfill_id
+            == before_migration.backfill_id
+        )
+        assert (
+            instance.get_backfills(filters=BulkActionsFilter(tags={"after": "migration"}))[
+                0
+            ].backfill_id
+            == after_migration.backfill_id
+        )
+
         # test downgrade
         instance._run_storage._alembic_downgrade(rev="1aca709bba64")
         assert get_current_alembic_version(db_path) == "1aca709bba64"
@@ -1392,7 +1427,7 @@ def test_add_bulk_actions_job_name_column():
                 ),
                 repository_name="the_repo",
             ),
-            partition_set_name=partition_set_snap_name_for_job_name("foo"),
+            partition_set_name=partition_set_snap_name_for_job_name("before_migration"),
         )
         before_migration = PartitionBackfill(
             "before_job_migration",
@@ -1403,12 +1438,38 @@
             backfill_timestamp=get_current_timestamp(),
         )
         instance.add_backfill(before_migration)
+        # filtering pre-migration relies on filtering runs, so add a run with the expected job_name
+        pre_migration_run = instance.run_storage.add_run(
+            DagsterRun(
+                job_name=before_migration.job_name,
+                run_id=make_new_run_id(),
+                tags={BACKFILL_ID_TAG: before_migration.backfill_id},
+                status=DagsterRunStatus.NOT_STARTED,
+            )
+        )
+
+        # filtering by job_name works before migration
+        assert (
+            instance.get_backfills(
+                filters=BulkActionsFilter(job_name=before_migration.job_name)
+            )[0].backfill_id
+            == before_migration.backfill_id
+        )
+
         instance.upgrade()
 
         backfill_columns = get_sqlite3_columns(db_path, "bulk_actions")
         assert "job_name" in backfill_columns
 
+        partition_set_origin = RemotePartitionSetOrigin(
+            repository_origin=RemoteRepositoryOrigin(
+                code_location_origin=GrpcServerCodeLocationOrigin(
+                    host="localhost", port=1234, location_name="test_location"
+                ),
+                repository_name="the_repo",
+            ),
+            partition_set_name=partition_set_snap_name_for_job_name("after_migration"),
+        )
         after_migration = PartitionBackfill(
             "after_job_migration",
             partition_set_origin=partition_set_origin,
@@ -1429,6 +1490,24 @@
         assert ids_to_job_name[before_migration.backfill_id] == before_migration.job_name
         assert ids_to_job_name[after_migration.backfill_id] == after_migration.job_name
 
+        # filtering by job_name works after migration
+        assert instance.run_storage.has_built_index(BACKFILL_JOB_NAME_AND_TAGS)
+        # delete the run that was added pre-migration to prove that job_name filtering is
+        # happening on the new bulk_actions job_name column
+        instance.delete_run(pre_migration_run.run_id)
+        assert (
+            instance.get_backfills(
+                filters=BulkActionsFilter(job_name=before_migration.job_name)
+            )[0].backfill_id
+            == before_migration.backfill_id
+        )
+        assert (
+            instance.get_backfills(
+                filters=BulkActionsFilter(job_name=after_migration.job_name)
+            )[0].backfill_id
+            == after_migration.backfill_id
+        )
+
         # test downgrade
         instance._run_storage._alembic_downgrade(rev="1aca709bba64")
 