Skip to content

Commit

Permalink
fix apis
Browse files Browse the repository at this point in the history
  • Loading branch information
pmbrull committed Jun 6, 2024
1 parent 2e45ce5 commit e4b7ce3
Showing 1 changed file with 4 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from typing import Callable, Optional, Union

from airflow import DAG
from metadata.generated.schema.type.basic import Uuid, Timestamp

from openmetadata_managed_apis.api.utils import clean_dag_id
from pydantic import ValidationError
from requests.utils import quote
Expand Down Expand Up @@ -314,7 +316,7 @@ def send_failed_status_callback(workflow_config: OpenMetadataWorkflowConfig, *_,
pipeline_status = metadata.get_pipeline_status(
workflow_config.ingestionPipelineFQN, str(workflow_config.pipelineRunId)
)
pipeline_status.endDate = datetime.now().timestamp() * 1000
pipeline_status.endDate = Timestamp(int(datetime.now().timestamp() * 1000))
pipeline_status.pipelineState = PipelineState.failed

metadata.create_or_update_pipeline_status(
Expand Down Expand Up @@ -352,7 +354,7 @@ def build_dag(
with DAG(**build_dag_configs(ingestion_pipeline)) as dag:
# Initialize with random UUID4. Will be used by the callback instead of
# generating it inside the Workflow itself.
workflow_config.pipelineRunId = str(uuid.uuid4())
workflow_config.pipelineRunId = Uuid(uuid.uuid4())

CustomPythonOperator(
task_id=task_name,
Expand Down

0 comments on commit e4b7ce3

Please sign in to comment.