Skip to content

Commit

Permalink
#14859 - Fix signature of status callback
Browse files Browse the repository at this point in the history
  • Loading branch information
pmbrull committed Jan 26, 2024
1 parent 6d02fe1 commit 4371cb5
Showing 1 changed file with 3 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict:
}


def send_failed_status_callback(workflow_config: OpenMetadataWorkflowConfig, _):
def send_failed_status_callback(workflow_config: OpenMetadataWorkflowConfig, *_, **__):
"""
Airflow on_failure_callback to update workflow status if something unexpected
happens or if the DAG is externally killed.
Expand Down Expand Up @@ -315,7 +315,7 @@ def on_kill(self) -> None:
"""
workflow_config = self.op_kwargs.get("workflow_config")
if workflow_config:
send_failed_status_callback(workflow_config, None)
send_failed_status_callback(workflow_config)


def build_dag(
Expand All @@ -340,9 +340,7 @@ def build_dag(
# There's no need to retry if we have had an error. Wait until the next schedule or manual rerun.
retries=ingestion_pipeline.airflowConfig.retries or 0,
# each DAG will call its own OpenMetadataWorkflowConfig
on_failure_callback=partial(
send_failed_status_callback, workflow_config, None
),
on_failure_callback=partial(send_failed_status_callback, workflow_config),
# Add tag and ownership to easily identify DAGs generated by OM
owner=ingestion_pipeline.owner.name
if ingestion_pipeline.owner
Expand Down

0 comments on commit 4371cb5

Please sign in to comment.