Skip to content

Commit

Permalink
Fix - e2e tests for pydantic V2 (open-metadata#16551)
Browse files Browse the repository at this point in the history
* Fix - e2e tests for pydantic V2

* add correct default

* add correct default

* revert datetime aware

* revert datetime aware

* revert datetime aware

* revert datetime aware

* revert datetime aware

* revert datetime aware

* revert datetime aware

* revert datetime aware

* fix apis

* format
  • Loading branch information
pmbrull authored Jun 7, 2024
1 parent 98945cb commit cb72a22
Show file tree
Hide file tree
Showing 115 changed files with 316 additions and 289 deletions.
2 changes: 1 addition & 1 deletion ingestion/operators/docker/exit_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def main():
raw_workflow_config = yaml.safe_load(config)
raw_workflow_config["pipelineRunId"] = pipeline_run_id

workflow_config = OpenMetadataWorkflowConfig.parse_obj(raw_workflow_config)
workflow_config = OpenMetadataWorkflowConfig.model_validate(raw_workflow_config)
metadata = OpenMetadata(
config=workflow_config.workflowConfig.openMetadataServerConfig
)
Expand Down
2 changes: 1 addition & 1 deletion ingestion/operators/docker/run_automation.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def main():
set_loggers_level(logging.INFO)

automation_workflow_dict = yaml.safe_load(config)
automation_workflow = AutomationWorkflow.parse_obj(automation_workflow_dict)
automation_workflow = AutomationWorkflow.model_validate(automation_workflow_dict)

execute(automation_workflow)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from __future__ import annotations

import abc
from datetime import datetime, timezone
from datetime import datetime
from typing import Callable, Iterable, Optional

from metadata.generated.schema.analytics.reportData import ReportData
Expand All @@ -43,7 +43,7 @@ def __init_subclass__(cls, *args, **kwargs) -> None:

def __init__(self, metadata: OpenMetadata):
self.metadata = metadata
self.timestamp = Timestamp(int(datetime.now(timezone.utc).timestamp() * 1000))
self.timestamp = Timestamp(int(datetime.now().timestamp() * 1000))
self.processor_status = Status()
self._refined_data = {}
self.post_hook: Optional[Callable] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
Interfaces with database for all database engine
supporting sqlalchemy abstraction layer
"""
from datetime import datetime, timezone
from datetime import datetime
from typing import Optional

from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface
Expand Down Expand Up @@ -94,7 +94,7 @@ def run_test_case(
test_handler = TestHandler(
self.dfs,
test_case=test_case,
execution_date=int(datetime.now(tz=timezone.utc).timestamp() * 1000),
execution_date=int(datetime.now().timestamp() * 1000),
)

return Validator(validator_obj=test_handler).validate()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
supporting sqlalchemy abstraction layer
"""

from datetime import datetime, timezone
from datetime import datetime
from typing import Optional, Union

from sqlalchemy.orm import DeclarativeMeta
Expand Down Expand Up @@ -169,7 +169,7 @@ def run_test_case(
test_handler = TestHandler(
self.runner,
test_case=test_case,
execution_date=int(datetime.now(tz=timezone.utc).timestamp() * 1000),
execution_date=int(datetime.now().timestamp() * 1000),
)

return Validator(validator_obj=test_handler).validate()
Expand Down
6 changes: 2 additions & 4 deletions ingestion/src/metadata/great_expectations/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"""
import logging
import traceback
from datetime import datetime, timezone
from datetime import datetime
from typing import Dict, List, Optional, Union, cast

from great_expectations.checkpoint.actions import ValidationAction
Expand Down Expand Up @@ -423,9 +423,7 @@ def _handle_test_case(

self.ometa_conn.add_test_case_results(
test_results=TestCaseResult(
timestamp=Timestamp(
int(datetime.now(tz=timezone.utc).timestamp() * 1000)
),
timestamp=Timestamp(int(datetime.now().timestamp() * 1000)),
testCaseStatus=TestCaseStatus.Success
if result["success"]
else TestCaseStatus.Failed,
Expand Down
9 changes: 4 additions & 5 deletions ingestion/src/metadata/ingestion/bulksink/metadata_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import os
import shutil
import traceback
from datetime import datetime, timezone
from datetime import datetime
from pathlib import Path
from typing import List, Optional

Expand All @@ -39,6 +39,7 @@
StackTraceError,
)
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.type.basic import Timestamp
from metadata.generated.schema.type.lifeCycle import AccessDetails, LifeCycle
from metadata.generated.schema.type.tableUsageCount import TableColumn, TableUsageCount
from metadata.generated.schema.type.usageRequest import UsageRequest
Expand Down Expand Up @@ -282,9 +283,7 @@ def __get_table_joins(
Method to get Table Joins
"""
# TODO: Clean up how we are passing dates from query parsing to here to use timestamps instead of strings
start_date = datetime.fromtimestamp(int(table_usage.date) / 1000).replace(
tzinfo=timezone.utc
)
start_date = datetime.fromtimestamp(int(table_usage.date) / 1000)
table_joins: TableJoins = TableJoins(
columnJoins=[], directTableJoins=[], startDate=start_date
)
Expand Down Expand Up @@ -374,7 +373,7 @@ def _get_table_life_cycle_data(
query_type = get_query_type(create_query=create_query)
if query_type:
access_details = AccessDetails(
timestamp=create_query.queryDate.root,
timestamp=Timestamp(create_query.queryDate.root),
accessedBy=user,
accessedByAProcess=process_user,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
creating a service
"""
import traceback
from datetime import datetime, timezone
from datetime import datetime
from functools import partial
from typing import Callable, List, Optional

Expand Down Expand Up @@ -148,14 +148,14 @@ def _test_connection_steps_automation_workflow(
break

test_connection_result.lastUpdatedAt = Timestamp(
int(datetime.now(timezone.utc).timestamp() * 1000)
int(datetime.now().timestamp() * 1000)
)
metadata.patch_automation_workflow_response(
automation_workflow, test_connection_result, WorkflowStatus.Running
)

test_connection_result.lastUpdatedAt = Timestamp(
int(datetime.now(timezone.utc).timestamp() * 1000)
int(datetime.now().timestamp() * 1000)
)

test_connection_result.status = (
Expand All @@ -174,7 +174,7 @@ def _test_connection_steps_automation_workflow(
f"Wild error happened while testing the connection in the workflow - {err}"
)
logger.debug(traceback.format_exc())
test_connection_result.lastUpdatedAt = datetime.now(tz=timezone.utc).timestamp()
test_connection_result.lastUpdatedAt = datetime.now().timestamp()
metadata.create_or_update(
CreateWorkflowRequest(
name=automation_workflow.name,
Expand Down
4 changes: 2 additions & 2 deletions ingestion/src/metadata/ingestion/ometa/auth_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import os.path
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timezone
from datetime import datetime

from dateutil.relativedelta import relativedelta

Expand Down Expand Up @@ -85,7 +85,7 @@ def __init__(self, config: OpenMetadataConnection):
self.config = config
self.security_config: OpenMetadataJWTClientConfig = self.config.securityConfig
self.jwt_token = None
self.expiry = datetime.now(tz=timezone.utc) - relativedelta(years=1)
self.expiry = datetime.now() - relativedelta(years=1)

@classmethod
def create(cls, config: OpenMetadataConnection):
Expand Down
4 changes: 2 additions & 2 deletions ingestion/src/metadata/ingestion/ometa/mixins/tests_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"""

import traceback
from datetime import datetime, timezone
from datetime import datetime
from typing import List, Optional, Type, Union
from urllib.parse import quote
from uuid import UUID
Expand Down Expand Up @@ -87,7 +87,7 @@ def get_or_create_test_suite(
test_suite_name: str,
test_suite_description: Optional[
str
] = f"Test Suite created on {datetime.now(timezone.utc).strftime('%Y-%m-%d')}",
] = f"Test Suite created on {datetime.now().strftime('%Y-%m-%d')}",
) -> TestSuite:
"""Get or create a TestSuite
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import copy
import os
import traceback
from datetime import datetime, timezone
from datetime import datetime
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Sequence, Set, Type, Union, cast

Expand Down Expand Up @@ -168,7 +168,7 @@ def __init__(
metadata: OpenMetadata,
):
super().__init__(config, metadata)
self.today = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d")
self.today = datetime.now().strftime("%Y-%m-%d")

self._explores_cache = {}
self._repo_credentials: Optional[ReadersCredentials] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def _get_metabase_session(self) -> str:
if self.config.password:
params[PASSWORD_HEADER] = self.config.password.get_secret_value()
self.resp = requests.post(
f"{self.config.hostPort}/{API_VERSION}/session/",
f"{self.config.hostPort}{API_VERSION}/session/",
data=json.dumps(params),
headers=SESSION_HEADERS,
timeout=DEFAULT_TIMEOUT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ def build_server_config(connection: TableauConnection) -> Dict[str, Dict[str, An
"""
tableau_server_config = {
f"{connection.env}": {
"server": connection.hostPort,
"api_version": connection.apiVersion,
"server": str(connection.hostPort),
"api_version": str(connection.apiVersion),
"site_name": connection.siteName if connection.siteName else "",
"site_url": connection.siteUrl if connection.siteUrl else "",
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
Source connection handler
"""
import os
from datetime import datetime, timezone
from datetime import datetime
from functools import partial
from typing import Optional

Expand Down Expand Up @@ -145,7 +145,7 @@ def test_connection_inner(engine):
engine=engine,
statement=BIGQUERY_TEST_STATEMENT.format(
region=service_connection.usageLocation,
creation_date=datetime.now(tz=timezone.utc).strftime("%Y-%m-%d"),
creation_date=datetime.now().strftime("%Y-%m-%d"),
),
),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ def yield_tag(
tags=[value],
classification_name=key,
tag_description="Bigquery Dataset Label",
classification_description="",
classification_description="BigQuery Dataset Classification",
include_tags=self.source_config.includeTags,
)
# Fetching policy tags on the column level
Expand All @@ -389,7 +389,7 @@ def yield_tag(
tags=[tag.display_name for tag in policy_tags],
classification_name=taxonomy.display_name,
tag_description="Bigquery Policy Tag",
classification_description="",
classification_description="BigQuery Policy Classification",
include_tags=self.source_config.includeTags,
)
except Exception as exc:
Expand Down Expand Up @@ -528,7 +528,7 @@ def yield_table_tags(self, table_name_and_type: Tuple[str, str]):
tags=[value],
classification_name=key,
tag_description="Bigquery Table Label",
classification_description="",
classification_description="BigQuery Table Classification",
include_tags=self.source_config.includeTags,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
Databricks lineage module
"""
import traceback
from datetime import datetime, timezone
from datetime import datetime
from typing import Iterator

from metadata.generated.schema.type.basic import DateTime
Expand Down Expand Up @@ -44,7 +44,7 @@ def yield_table_query(self) -> Iterator[TableQuery]:
userName=row.get("user_name"),
startTime=row.get("query_start_time_ms"),
endTime=row.get("execution_end_time_ms"),
analysisDate=DateTime(datetime.now(tz=timezone.utc)),
analysisDate=DateTime(datetime.now()),
serviceName=self.config.serviceName,
)
except Exception as exc:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@

logger = ingestion_logger()

DATABRICKS_TAG = "DATABRICK TAG"
DATABRICKS_TAG_CLASSIFICATION = "DATABRICK TAG CLASSIFICATION"
DATABRICKS_TAG = "DATABRICKS TAG"
DATABRICKS_TAG_CLASSIFICATION = "DATABRICKS TAG CLASSIFICATION"
DEFAULT_TAG_VALUE = "NONE"


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
Databricks usage module
"""
import traceback
from datetime import datetime, timezone
from datetime import datetime
from typing import Iterable

from metadata.generated.schema.type.basic import DateTime
Expand Down Expand Up @@ -49,7 +49,7 @@ def yield_table_queries(self) -> Iterable[TableQuery]:
userName=row.get("user_name"),
startTime=row.get("query_start_time_ms"),
endTime=row.get("execution_end_time_ms"),
analysisDate=DateTime(datetime.now(tz=timezone.utc)),
analysisDate=DateTime(datetime.now()),
serviceName=self.config.serviceName,
duration=row.get("duration")
if row.get("duration")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def __init__(

def _calculate_pipeline_status_parameters(self) -> Tuple[int, int]:
"""Calculate the needed 'start' and 'end' parameters based on the 'lookbackDays'."""
now = datetime.now(tz=timezone.utc)
now = datetime.now()

# We multiply the value by 1000 because our backend uses epoch_milliseconds instead of epoch_seconds.
start = int(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
DatabaseServiceMetadataPipeline,
)
from metadata.generated.schema.type.basic import Timestamp
from metadata.generated.schema.type.lifeCycle import AccessDetails, LifeCycle
from metadata.ingestion.api.models import Either, Entity
from metadata.ingestion.api.status import Status
Expand Down Expand Up @@ -103,8 +104,12 @@ def get_life_cycle_data(
if life_cycle_data:
life_cycle = LifeCycle(
created=AccessDetails(
timestamp=convert_timestamp_to_milliseconds(
life_cycle_data.created_at.timestamp()
timestamp=Timestamp(
int(
convert_timestamp_to_milliseconds(
life_cycle_data.created_at.timestamp()
)
)
)
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
Postgres lineage module
"""
import traceback
from datetime import datetime, timezone
from datetime import datetime
from typing import Iterable

from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
Expand Down Expand Up @@ -84,7 +84,7 @@ def process_table_query(self) -> Iterable[TableQuery]:
yield TableQuery(
query=row["query_text"],
userName=row["usename"],
analysisDate=DateTime(datetime.now(tz=timezone.utc)),
analysisDate=DateTime(datetime.now()),
aborted=self.get_aborted_status(row),
databaseName=self.get_database_name(row),
serviceName=self.config.serviceName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
Postgres usage module
"""
import traceback
from datetime import datetime, timezone
from datetime import datetime
from typing import Iterable

from metadata.generated.schema.type.basic import DateTime
Expand Down Expand Up @@ -53,7 +53,7 @@ def process_table_query(self) -> Iterable[TableQueries]:
TableQuery(
query=row["query_text"],
userName=row["usename"],
analysisDate=DateTime(datetime.now(tz=timezone.utc)),
analysisDate=DateTime(datetime.now()),
aborted=self.get_aborted_status(row),
databaseName=self.get_database_name(row),
serviceName=self.config.serviceName,
Expand Down
Loading

0 comments on commit cb72a22

Please sign in to comment.