Skip to content

Commit

Permalink
data migration for populating backfill job name and backfill tags (#2…
Browse files Browse the repository at this point in the history
…5470)

## Summary & Motivation
See #25460 for additional
context

Adds a data migration that populates the BackfillTags table and the
BulkActionsTable.job_name column with historical data.

## How I Tested These Changes
  • Loading branch information
jamiedemaria authored Oct 24, 2024
1 parent a1d8f96 commit edfee57
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 10 deletions.
97 changes: 96 additions & 1 deletion python_modules/dagster/dagster/_core/storage/runs/migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@
from dagster._core.execution.job_backfill import PartitionBackfill
from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus, RunRecord
from dagster._core.storage.runs.base import RunStorage
from dagster._core.storage.runs.schema import BulkActionsTable, RunsTable, RunTagsTable
from dagster._core.storage.runs.schema import (
BackfillTagsTable,
BulkActionsTable,
RunsTable,
RunTagsTable,
)
from dagster._core.storage.sqlalchemy_compat import db_select
from dagster._core.storage.tags import (
BACKFILL_ID_TAG,
Expand All @@ -28,6 +33,7 @@
RUN_REPO_LABEL_TAGS = "run_repo_label_tags"
BULK_ACTION_TYPES = "bulk_action_types"
RUN_BACKFILL_ID = "run_backfill_id"
BACKFILL_JOB_NAME_AND_TAGS = "backfill_job_name_and_tags"

PrintFn: TypeAlias = Callable[[Any], None]
MigrationFn: TypeAlias = Callable[[RunStorage, Optional[PrintFn]], None]
Expand All @@ -38,6 +44,7 @@
RUN_REPO_LABEL_TAGS: lambda: migrate_run_repo_tags,
BULK_ACTION_TYPES: lambda: migrate_bulk_actions,
RUN_BACKFILL_ID: lambda: migrate_run_backfill_id,
BACKFILL_JOB_NAME_AND_TAGS: lambda: migrate_backfill_job_name_and_tags,
}
# for `dagster instance reindex`, optionally run for better read performance
OPTIONAL_DATA_MIGRATIONS: Final[Mapping[str, Callable[[], MigrationFn]]] = {
Expand Down Expand Up @@ -104,6 +111,31 @@ def chunked_run_records_iterator(
progress.update(len(chunk))


def chunked_backfill_iterator(
    storage: RunStorage, print_fn: Optional[PrintFn] = None, chunk_size: int = CHUNK_SIZE
) -> Iterator[PartitionBackfill]:
    """Yield every backfill in ``storage``, fetching them one chunk at a time.

    Args:
        storage: Run storage to read backfills from.
        print_fn: When truthy, a tqdm progress bar sized by
            ``storage.get_backfills_count()`` is shown while iterating. The callable
            itself is never invoked by this function.
        chunk_size: Passed as ``limit`` to ``storage.get_backfills``. A falsy value
            results in a single fetch.

    Yields:
        Each backfill, in the order returned by ``storage.get_backfills``.
    """
    with ExitStack() as stack:
        progress = None
        if print_fn:
            backfill_count = storage.get_backfills_count()
            # Registering the bar on the stack closes it when iteration ends.
            progress = stack.enter_context(tqdm(total=backfill_count))

        cursor = None
        has_more = True

        while has_more:
            chunk = storage.get_backfills(cursor=cursor, limit=chunk_size)
            # A chunk shorter than the requested size means there is nothing left to
            # fetch. bool() keeps has_more a real boolean when chunk_size is 0.
            has_more = bool(chunk_size) and len(chunk) >= chunk_size

            for backfill in chunk:
                # The id of the last backfill yielded becomes the next page's cursor.
                cursor = backfill.backfill_id
                yield backfill

            # Compare against None explicitly: a tqdm bar's truthiness depends on its
            # total, not on whether the bar exists.
            if progress is not None:
                progress.update(len(chunk))


def migrate_run_partition(storage: RunStorage, print_fn: Optional[PrintFn] = None) -> None:
"""Utility method to build an asset key index from the data in existing event log records.
Takes in event_log_storage, and a print_fn to keep track of progress.
Expand Down Expand Up @@ -299,3 +331,66 @@ def add_backfill_id(run_storage: RunStorage, run_id: str, backfill_id) -> None:
backfill_id=backfill_id,
)
)


def migrate_backfill_job_name_and_tags(
    storage: RunStorage, print_fn: Optional[PrintFn] = None
) -> None:
    """Backfill the backfill_tags table and the bulk_actions job_name column.

    Walks every backfill returned by ``chunked_backfill_iterator`` and, for each one,
    hands its tags (when non-empty) to ``add_backfill_tags`` and its job name (when not
    ``None``) to ``add_backfill_job_name``.

    Args:
        storage: Run storage to read backfills from and write the migrated data to.
        print_fn: Optional progress reporter; receives a single status message and also
            enables the iterator's progress bar.
    """
    if print_fn:
        print_fn("Querying run storage.")

    for partition_backfill in chunked_backfill_iterator(storage, print_fn):
        backfill_id = partition_backfill.backfill_id

        tags = partition_backfill.tags
        if tags:
            add_backfill_tags(run_storage=storage, backfill_id=backfill_id, tags=tags)

        job_name = partition_backfill.job_name
        if job_name is not None:
            add_backfill_job_name(
                run_storage=storage, backfill_id=backfill_id, job_name=job_name
            )


def add_backfill_tags(run_storage: RunStorage, backfill_id: str, tags: Mapping[str, str]) -> None:
    """Insert one backfill_tags row per tag for the given backfill.

    Does nothing when ``run_storage`` is not a ``SqlRunStorage`` or when ``tags`` is
    empty.

    Args:
        run_storage: Storage whose connection is used for the insert.
        backfill_id: Id of the backfill the tags belong to.
        tags: Tag key/value pairs to write.
    """
    # Imported locally, as in the original block, rather than at module level.
    from dagster._core.storage.runs.sql_run_storage import SqlRunStorage

    check.str_param(backfill_id, "backfill_id")
    check.dict_param(tags, "tags", key_type=str, value_type=str)
    check.inst_param(run_storage, "run_storage", RunStorage)

    if not isinstance(run_storage, SqlRunStorage):
        return

    rows = [dict(backfill_id=backfill_id, key=k, value=v) for k, v in tags.items()]
    if not rows:
        # Never hand execute() an empty parameter list; there is nothing to insert.
        return

    with run_storage.connect() as conn:
        conn.execute(BackfillTagsTable.insert(), rows)


def add_backfill_job_name(run_storage: RunStorage, backfill_id: str, job_name: str) -> None:
    """Set ``job_name`` on the bulk_actions row(s) whose ``key`` equals ``backfill_id``.

    Does nothing when ``run_storage`` is not a ``SqlRunStorage``.

    Args:
        run_storage: Storage whose connection is used for the update.
        backfill_id: Id of the backfill; matched against ``BulkActionsTable.c.key``.
        job_name: Job name to store.
    """
    # Imported locally, as in the original block, rather than at module level.
    from dagster._core.storage.runs.sql_run_storage import SqlRunStorage

    check.str_param(backfill_id, "backfill_id")
    check.str_param(job_name, "job_name")
    check.inst_param(run_storage, "run_storage", RunStorage)

    if not isinstance(run_storage, SqlRunStorage):
        return

    update_stmt = (
        BulkActionsTable.update()
        .values(job_name=job_name)
        .where(BulkActionsTable.c.key == backfill_id)
    )
    with run_storage.connect() as conn:
        conn.execute(update_stmt)
Original file line number Diff line number Diff line change
Expand Up @@ -1357,9 +1357,9 @@ def test_add_backfill_tags():
cursor.execute("SELECT backfill_id, key, value FROM backfill_tags")
rows = cursor.fetchall()

assert len(rows) == 1
assert len(rows) == 2
ids_to_tags = {row[0]: {row[1]: row[2]} for row in rows}
assert ids_to_tags.get(before_migration.backfill_id) is None
assert ids_to_tags.get(before_migration.backfill_id) == before_migration.tags
assert ids_to_tags[after_migration.backfill_id] == after_migration.tags

# test downgrade
Expand Down Expand Up @@ -1426,7 +1426,7 @@ def test_add_bulk_actions_job_name_column():

assert len(rows) == 3 # a backfill exists in the db snapshot
ids_to_job_name = {row[0]: row[1] for row in rows}
assert ids_to_job_name[before_migration.backfill_id] is None
assert ids_to_job_name[before_migration.backfill_id] == before_migration.job_name
assert ids_to_job_name[after_migration.backfill_id] == after_migration.job_name

# test downgrade
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,9 +640,9 @@ def test_add_backfill_tags(conn_string):
]
)
).fetchall()
assert len(rows) == 1
assert len(rows) == 2
ids_to_tags = {row[0]: {row[1]: row[2]} for row in rows}
assert ids_to_tags.get(before_migration.backfill_id) is None
assert ids_to_tags.get(before_migration.backfill_id) == before_migration.tags
assert ids_to_tags[after_migration.backfill_id] == after_migration.tags


Expand Down Expand Up @@ -710,5 +710,5 @@ def test_add_bulk_actions_job_name_column(conn_string):
).fetchall()
assert len(rows) == 2
ids_to_job_name = {row[0]: row[1] for row in rows}
assert ids_to_job_name[before_migration.backfill_id] is None
assert ids_to_job_name[before_migration.backfill_id] == before_migration.job_name
assert ids_to_job_name[after_migration.backfill_id] == after_migration.job_name
Original file line number Diff line number Diff line change
Expand Up @@ -1058,9 +1058,9 @@ def test_add_backfill_tags(hostname, conn_string):
]
)
).fetchall()
assert len(rows) == 1
assert len(rows) == 2
ids_to_tags = {row[0]: {row[1]: row[2]} for row in rows}
assert ids_to_tags.get(before_migration.backfill_id) is None
assert ids_to_tags.get(before_migration.backfill_id) == before_migration.tags
assert ids_to_tags[after_migration.backfill_id] == after_migration.tags


Expand Down Expand Up @@ -1131,5 +1131,5 @@ def test_add_bulk_actions_job_name_column(hostname, conn_string):
).fetchall()
assert len(rows) == 2
ids_to_job_name = {row[0]: row[1] for row in rows}
assert ids_to_job_name[before_migration.backfill_id] is None
assert ids_to_job_name[before_migration.backfill_id] == before_migration.job_name
assert ids_to_job_name[after_migration.backfill_id] == after_migration.job_name

0 comments on commit edfee57

Please sign in to comment.