Skip to content

Commit

Permalink
revert datetime aware
Browse files (browse the repository at this point in the history)
  • Loading branch information
pmbrull committed Jun 6, 2024
1 parent f22c2d9 commit dbcbba3
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 12 deletions.
10 changes: 8 additions & 2 deletions ingestion/src/metadata/ingestion/source/database/sas/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import json
import re
import traceback
from datetime import datetime
from datetime import datetime, timezone
from typing import Any, Iterable, Optional, Tuple, Union

from requests.exceptions import HTTPError
Expand Down Expand Up @@ -572,11 +572,17 @@ def create_table_entity(self, table) -> Iterable[Either[CreateTableRequest]]:
):
return

raw_create_date: Optional[datetime] = table_entity_instance.get(
"creationTimeStamp"
)
if raw_create_date:
raw_create_date = raw_create_date.replace(tzinfo=timezone.utc)

# create Profiles & Data Quality Column
table_profile_request = CreateTableProfileRequest(
tableProfile=TableProfile(
timestamp=self.timestamp,
createDateTime=table_entity_instance["creationTimeStamp"],
createDateTime=raw_create_date,
rowCount=int(table_extension.get("rowCount", 0)),
columnCount=int(table_extension.get("columnCount", 0)),
sizeInByte=int(table_extension.get("dataSize", 0)),
Expand Down
10 changes: 8 additions & 2 deletions ingestion/src/metadata/profiler/processor/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from __future__ import annotations

import traceback
from datetime import datetime
from datetime import datetime, timezone
from typing import Any, Dict, Generic, List, Optional, Set, Tuple, Type

from pydantic import ValidationError
Expand Down Expand Up @@ -578,11 +578,17 @@ def get_profile(self) -> CreateTableProfileRequest:
)
]

raw_create_date: Optional[datetime] = self._table_results.get(
"createDateTime"
)
if raw_create_date:
raw_create_date = raw_create_date.replace(tzinfo=timezone.utc)

table_profile = TableProfile(
timestamp=self.profile_ts,
columnCount=self._table_results.get("columnCount"),
rowCount=self._table_results.get(RowCount.name()),
createDateTime=self._table_results.get("createDateTime"),
createDateTime=raw_create_date,
sizeInByte=self._table_results.get("sizeInBytes"),
profileSample=(
self.profile_sample_config.profile_sample
Expand Down
4 changes: 2 additions & 2 deletions ingestion/tests/unit/test_workflow_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ def test_parsing_ingestion_pipeline_mysql(self):
},
"airflowConfig": {
"retries": 0,
"startDate": "2023-12-19 00:00:00",
"startDate": "2023-12-19T00:00:00.000000Z",
"retryDelay": 300,
"concurrency": 1,
"maxActiveRuns": 1,
Expand Down Expand Up @@ -428,7 +428,7 @@ def test_parsing_ingestion_pipeline_dagster(self):
},
"airflowConfig": {
"retries": 0,
"startDate": "2023-12-19 00:00:00",
"startDate": "2023-12-19T00:00:00.000000Z",
"retryDelay": 300,
"concurrency": 1,
"maxActiveRuns": 1,
Expand Down
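13 changes: 9 additions & 4 deletions [file header missing from this capture; counts derived from the commit totals. Path is presumably openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py (unverified)]
Original file line number Diff line number Diff line change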
Expand Up @@ -17,7 +17,6 @@
from functools import partial
from typing import Callable, Optional, Union

import airflow
from airflow import DAG
from openmetadata_managed_apis.api.utils import clean_dag_id
from pydantic import ValidationError
Expand Down Expand Up @@ -246,14 +245,20 @@ def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict:
:param ingestion_pipeline: pipeline configs
:return: dict to use as kwargs
"""

if ingestion_pipeline.airflowConfig.startDate:
start_date = datetime.strptime(
ingestion_pipeline.airflowConfig.startDate.root, "%Y-%m-%dT%H:%M:%S.%fZ"
).replace(tzinfo=None)
else:
start_date = datetime.now() - timedelta(days=1)

return {
"dag_id": clean_dag_id(ingestion_pipeline.name.root),
"description": ingestion_pipeline.description.root
if ingestion_pipeline.description is not None
else None,
"start_date": ingestion_pipeline.airflowConfig.startDate.root
if ingestion_pipeline.airflowConfig.startDate
else airflow.utils.dates.days_ago(1),
"start_date": start_date,
"end_date": ingestion_pipeline.airflowConfig.endDate.root
if ingestion_pipeline.airflowConfig.endDate
else None,
Expand Down
9 changes: 7 additions & 2 deletions scripts/datamodel_generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,17 @@
# Supporting timezone-aware datetimes is too complex for the profiler
DATETIME_AWARE_FILE_PATHS = [
f"{ingestion_path}src/metadata/generated/schema/type/basic.py",
f"{ingestion_path}src/metadata/generated/schema/entity/data/table.py",
# f"{ingestion_path}src/metadata/generated/schema/entity/data/table.py",
]

for file_path in DATETIME_AWARE_FILE_PATHS:
with open(file_path, "r", encoding=UTF_8) as file_:
content = file_.read()
content = content.replace("AwareDatetime", "NaiveDatetime")
content = content.replace(
"from pydantic import AnyUrl, AwareDatetime, ConfigDict, EmailStr, Field, RootModel",
"from pydantic import AnyUrl, ConfigDict, EmailStr, Field, RootModel"
)
content = content.replace("from datetime import date, time", "from datetime import date, time, datetime")
content = content.replace("AwareDatetime", "datetime")
with open(file_path, "w", encoding=UTF_8) as file_:
file_.write(content)

0 comments on commit dbcbba3

Please sign in to comment.