diff --git a/ingestion/src/metadata/ingestion/source/database/sas/metadata.py b/ingestion/src/metadata/ingestion/source/database/sas/metadata.py
index 45aba365cce7..082280d9736c 100644
--- a/ingestion/src/metadata/ingestion/source/database/sas/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/database/sas/metadata.py
@@ -18,7 +18,7 @@
 import json
 import re
 import traceback
-from datetime import datetime
+from datetime import datetime, timezone
 from typing import Any, Iterable, Optional, Tuple, Union
 
 from requests.exceptions import HTTPError
@@ -572,11 +572,17 @@ def create_table_entity(self, table) -> Iterable[Either[CreateTableRequest]]:
         ):
             return
 
+        raw_create_date: Optional[datetime] = table_entity_instance.get(
+            "creationTimeStamp"
+        )
+        if raw_create_date:
+            raw_create_date = raw_create_date.replace(tzinfo=timezone.utc)
+
         # create Profiles & Data Quality Column
         table_profile_request = CreateTableProfileRequest(
             tableProfile=TableProfile(
                 timestamp=self.timestamp,
-                createDateTime=table_entity_instance["creationTimeStamp"],
+                createDateTime=raw_create_date,
                 rowCount=int(table_extension.get("rowCount", 0)),
                 columnCount=int(table_extension.get("columnCount", 0)),
                 sizeInByte=int(table_extension.get("dataSize", 0)),
diff --git a/ingestion/src/metadata/profiler/processor/core.py b/ingestion/src/metadata/profiler/processor/core.py
index bac3a91b1bdd..c77c14a04aaa 100644
--- a/ingestion/src/metadata/profiler/processor/core.py
+++ b/ingestion/src/metadata/profiler/processor/core.py
@@ -15,7 +15,7 @@
 from __future__ import annotations
 
 import traceback
-from datetime import datetime
+from datetime import datetime, timezone
 from typing import Any, Dict, Generic, List, Optional, Set, Tuple, Type
 
 from pydantic import ValidationError
@@ -578,11 +578,17 @@ def get_profile(self) -> CreateTableProfileRequest:
             )
         ]
 
+        raw_create_date: Optional[datetime] = self._table_results.get(
+            "createDateTime"
+        )
+        if raw_create_date:
+            raw_create_date = raw_create_date.replace(tzinfo=timezone.utc)
+
         table_profile = TableProfile(
             timestamp=self.profile_ts,
             columnCount=self._table_results.get("columnCount"),
             rowCount=self._table_results.get(RowCount.name()),
-            createDateTime=self._table_results.get("createDateTime"),
+            createDateTime=raw_create_date,
             sizeInByte=self._table_results.get("sizeInBytes"),
             profileSample=(
                 self.profile_sample_config.profile_sample
diff --git a/ingestion/tests/unit/test_workflow_parse.py b/ingestion/tests/unit/test_workflow_parse.py
index 47e5dca7fbc6..5803fd84f4df 100644
--- a/ingestion/tests/unit/test_workflow_parse.py
+++ b/ingestion/tests/unit/test_workflow_parse.py
@@ -343,7 +343,7 @@ def test_parsing_ingestion_pipeline_mysql(self):
             },
             "airflowConfig": {
                 "retries": 0,
-                "startDate": "2023-12-19 00:00:00",
+                "startDate": "2023-12-19T00:00:00.000000Z",
                 "retryDelay": 300,
                 "concurrency": 1,
                 "maxActiveRuns": 1,
@@ -428,7 +428,7 @@ def test_parsing_ingestion_pipeline_dagster(self):
             },
             "airflowConfig": {
                 "retries": 0,
-                "startDate": "2023-12-19 00:00:00",
+                "startDate": "2023-12-19T00:00:00.000000Z",
                 "retryDelay": 300,
                 "concurrency": 1,
                 "maxActiveRuns": 1,
diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py
index 44de0c45a501..1b6e001f61e5 100644
--- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py
+++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py
@@ -17,7 +17,6 @@
 from functools import partial
 from typing import Callable, Optional, Union
 
-import airflow
 from airflow import DAG
 from openmetadata_managed_apis.api.utils import clean_dag_id
 from pydantic import ValidationError
@@ -246,14 +245,20 @@ def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict:
     :param ingestion_pipeline: pipeline configs
     :return: dict to use as kwargs
     """
+
+    if ingestion_pipeline.airflowConfig.startDate:
+        start_date = datetime.strptime(
+            ingestion_pipeline.airflowConfig.startDate.root, "%Y-%m-%dT%H:%M:%S.%fZ"
+        ).replace(tzinfo=None)
+    else:
+        start_date = datetime.now() - timedelta(days=1)
+
     return {
         "dag_id": clean_dag_id(ingestion_pipeline.name.root),
         "description": ingestion_pipeline.description.root
         if ingestion_pipeline.description is not None
         else None,
-        "start_date": ingestion_pipeline.airflowConfig.startDate.root
-        if ingestion_pipeline.airflowConfig.startDate
-        else airflow.utils.dates.days_ago(1),
+        "start_date": start_date,
         "end_date": ingestion_pipeline.airflowConfig.endDate.root
         if ingestion_pipeline.airflowConfig.endDate
         else None,
diff --git a/scripts/datamodel_generation.py b/scripts/datamodel_generation.py
index a815e600e80d..085d3231d16f 100644
--- a/scripts/datamodel_generation.py
+++ b/scripts/datamodel_generation.py
@@ -85,12 +85,17 @@
 # Supporting timezone aware datetime is too complex for the profiler
 DATETIME_AWARE_FILE_PATHS = [
     f"{ingestion_path}src/metadata/generated/schema/type/basic.py",
-    f"{ingestion_path}src/metadata/generated/schema/entity/data/table.py",
+    # f"{ingestion_path}src/metadata/generated/schema/entity/data/table.py",
 ]
 
 for file_path in DATETIME_AWARE_FILE_PATHS:
     with open(file_path, "r", encoding=UTF_8) as file_:
         content = file_.read()
-        content = content.replace("AwareDatetime", "NaiveDatetime")
+        content = content.replace(
+            "from pydantic import AnyUrl, AwareDatetime, ConfigDict, EmailStr, Field, RootModel",
+            "from pydantic import AnyUrl, ConfigDict, EmailStr, Field, RootModel"
+        )
+        content = content.replace("from datetime import date, time", "from datetime import date, time, datetime")
+        content = content.replace("AwareDatetime", "datetime")
     with open(file_path, "w", encoding=UTF_8) as file_:
         file_.write(content)