Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add dbt freshness check test #18730

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
DBT_CATALOG_FILE_NAME = "catalog.json"
DBT_MANIFEST_FILE_NAME = "manifest.json"
DBT_RUN_RESULTS_FILE_NAME = "run_results"
DBT_SOURCES_FILE_NAME = "sources.json"


class SkipResourceTypeEnum(Enum):
Expand All @@ -91,6 +92,7 @@ class SkipResourceTypeEnum(Enum):

ANALYSIS = "analysis"
TEST = "test"
SOURCE = "source"


class CompiledQueriesEnum(Enum):
Expand Down Expand Up @@ -127,6 +129,7 @@ class DbtTestFailureEnum(Enum):

FAILURE = "failure"
FAIL = "fail"
ERROR = "error"


class DbtCommonEnum(Enum):
Expand All @@ -137,6 +140,7 @@ class DbtCommonEnum(Enum):
OWNER = "owner"
NODES = "nodes"
SOURCES = "sources"
SOURCES_FILE = "sources_file"
SOURCE = "source"
RESOURCETYPE = "resource_type"
MANIFEST_NODE = "manifest_node"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
DBT_CATALOG_FILE_NAME,
DBT_MANIFEST_FILE_NAME,
DBT_RUN_RESULTS_FILE_NAME,
DBT_SOURCES_FILE_NAME,
)
from metadata.ingestion.source.database.dbt.models import DbtFiles
from metadata.readers.file.config_source_factory import get_reader
Expand Down Expand Up @@ -85,6 +86,7 @@ def _(config: DbtLocalConfig):
config.dbtManifestFilePath,
config.dbtCatalogFilePath,
config.dbtRunResultsFilePath,
config.dbtSourcesFilePath,
]
yield from download_dbt_files(
blob_grouped_by_directory=blob_grouped_by_directory,
Expand Down Expand Up @@ -123,12 +125,22 @@ def _(config: DbtHttpConfig):
dbt_catalog = requests.get( # pylint: disable=missing-timeout
config.dbtCatalogHttpPath
)

dbt_sources = None
if config.dbtSourcesHttpPath:
logger.debug(
f"Requesting [dbtSourcesHttpPath] to: {config.dbtSourcesHttpPath}"
)
dbt_sources = requests.get( # pylint: disable=missing-timeout
config.dbtSourcesHttpPath
)
if not dbt_manifest:
raise DBTConfigException("Manifest file not found in file server")
yield DbtFiles(
dbt_catalog=dbt_catalog.json() if dbt_catalog else None,
dbt_manifest=dbt_manifest.json(),
dbt_run_results=[dbt_run_results.json()] if dbt_run_results else None,
dbt_sources=dbt_sources.json() if dbt_sources else None,
)
except DBTConfigException as exc:
raise exc
Expand Down Expand Up @@ -243,6 +255,7 @@ def get_blobs_grouped_by_dir(blobs: List[str]) -> Dict[str, List[str]]:
return blob_grouped_by_directory


# pylint: disable=too-many-locals, too-many-branches
def download_dbt_files(
blob_grouped_by_directory: Dict, config, client, bucket_name: Optional[str]
) -> Iterable[DbtFiles]:
Expand All @@ -255,6 +268,7 @@ def download_dbt_files(
) in blob_grouped_by_directory.items():
dbt_catalog = None
dbt_manifest = None
dbt_sources = None
dbt_run_results = []
kwargs = {}
if bucket_name:
Expand Down Expand Up @@ -285,12 +299,16 @@ def download_dbt_files(
logger.warning(
f"{DBT_RUN_RESULTS_FILE_NAME} not found in {key}: {exc}"
)
if DBT_SOURCES_FILE_NAME == blob_file_name.lower():
logger.debug(f"{DBT_SOURCES_FILE_NAME} found in {key}")
dbt_sources = reader.read(path=blob, **kwargs)
if not dbt_manifest:
raise DBTConfigException(f"Manifest file not found at: {key}")
yield DbtFiles(
dbt_catalog=json.loads(dbt_catalog) if dbt_catalog else None,
dbt_manifest=json.loads(dbt_manifest),
dbt_run_results=dbt_run_results if dbt_run_results else None,
dbt_sources=json.loads(dbt_sources) if dbt_sources else None,
)
except DBTConfigException as exc:
logger.warning(exc)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
from abc import ABC, abstractmethod
from typing import Iterable, List

from dbt_artifacts_parser.parser import parse_catalog, parse_manifest, parse_run_results
from dbt_artifacts_parser.parser import (
parse_catalog,
parse_manifest,
parse_run_results,
parse_sources,
)
from pydantic import Field
from typing_extensions import Annotated

Expand Down Expand Up @@ -209,11 +214,13 @@ def get_dbt_objects(self) -> Iterable[DbtObjects]:
self.remove_run_result_non_required_keys(
run_results=self.context.get().dbt_file.dbt_run_results
)

dbt_objects = DbtObjects(
dbt_catalog=parse_catalog(self.context.get().dbt_file.dbt_catalog)
if self.context.get().dbt_file.dbt_catalog
else None,
dbt_manifest=parse_manifest(self.context.get().dbt_file.dbt_manifest),
dbt_sources=parse_sources(self.context.get().dbt_file.dbt_sources),
dbt_run_results=[
parse_run_results(run_result_file)
for run_result_file in self.context.get().dbt_file.dbt_run_results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,20 @@ def create_test_case_parameter_definitions(dbt_test):
}
]
return test_case_param_definition
if hasattr(dbt_test, "freshness"):
test_case_param_definition = [
{
"name": "warn_after",
"displayName": "warn_after",
"required": False,
},
{
"name": "error_after",
"displayName": "error_after",
"required": False,
},
]
return test_case_param_definition
Comment on lines +47 to +60
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be derived from the dbt_test object, similar to how we do it above?

Copy link
Contributor Author

@mgorsk1 mgorsk1 Nov 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It cannot: a freshness check has a different model than a regular test: https://schemas.getdbt.com/dbt/sources/v3/index.html#tab-pane_results_items_anyOf_i1

It is also not stored in run_results.json.

What we could do is test_case_param_definition = [ {"name": x, "displayName": x, "required": False} for x in dbt_test.criteria.keys() ], but the criteria object also contains filter, and I didn't want to include it.

except Exception as err: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.error(
Expand All @@ -67,6 +81,21 @@ def create_test_case_parameter_values(dbt_test):
{"name": manifest_node.test_metadata.name, "value": dbt_test_values}
]
return test_case_param_values
if hasattr(manifest_node, "freshness"):
warn_after = manifest_node.freshness.warn_after
error_after = manifest_node.freshness.error_after

test_case_param_values = [
{
"name": "error_after",
"value": f"{error_after.count} {error_after.period.value}",
},
{
"name": "warn_after",
"value": f"{warn_after.count} {warn_after.period.value}",
},
]
Comment on lines +84 to +97
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same question here for the "name" values

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar answer

return test_case_param_values
except Exception as err: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
DBT source methods.
"""
import traceback
from copy import deepcopy
from datetime import datetime
from typing import Any, Iterable, List, Optional, Union

Expand Down Expand Up @@ -324,7 +325,41 @@ def add_dbt_tests(
None,
)

# pylint: disable=too-many-locals, too-many-branches
def _add_dbt_freshness_test_from_sources(
    self, key: str, manifest_node, manifest_entities, dbt_objects: DbtObjects
):
    """
    Register a freshness test for a dbt source node when the sources
    artifact holds a freshness result for it.

    Args:
        key: unique id of the source node; matched against the
            ``unique_id`` of each entry in ``dbt_objects.dbt_sources.results``
        manifest_node: manifest entry of the source node
        manifest_entities: manifest entities handed to ``parse_upstream_nodes``
        dbt_objects: parsed dbt artifacts, including the sources artifact

    Nothing is stored when no freshness result matches ``key``. Otherwise the
    test is stored in ``self.context.get().dbt_tests`` under
    ``key + "_freshness"``.
    """
    # Tolerate a missing sources artifact or a missing/empty results list
    # instead of failing on attribute access or iteration.
    results = getattr(dbt_objects.dbt_sources, "results", None) or []
    freshness_test_result = next(
        (item for item in results if item.unique_id == key),
        None,
    )
    if not freshness_test_result:
        return

    # In the dbt manifest, a source node's name is the table/view name (not a
    # test name, as it is for test nodes). To name the test case precisely,
    # the name is amended on a deep copy so the original node is left intact.
    # The copy is made only once a result is known to exist.
    manifest_node_new = deepcopy(manifest_node)
    manifest_node_new.name = manifest_node_new.name + "_freshness"

    # Build the whole entry before storing it, so a failure while resolving
    # upstream nodes cannot leave a partially populated test behind.
    self.context.get().dbt_tests[key + "_freshness"] = {
        DbtCommonEnum.MANIFEST_NODE.value: manifest_node_new,
        DbtCommonEnum.UPSTREAM.value: self.parse_upstream_nodes(
            manifest_entities, manifest_node
        ),
        DbtCommonEnum.RESULTS.value: freshness_test_result,
    }

def add_dbt_sources(
    self, key: str, manifest_node, manifest_entities, dbt_objects: DbtObjects
) -> None:
    """
    Queue the test cases derived from the dbt sources file for a source
    node so they can be processed later.

    Currently this delegates to the freshness-test registration only.
    """
    register_freshness_test = self._add_dbt_freshness_test_from_sources
    register_freshness_test(key, manifest_node, manifest_entities, dbt_objects)

# pylint: disable=too-many-locals, too-many-branches, too-many-statements
def yield_data_models(
self, dbt_objects: DbtObjects
) -> Iterable[Either[DataModelLink]]:
Expand Down Expand Up @@ -376,6 +411,17 @@ def yield_data_models(
)
continue

if (
dbt_objects.dbt_sources
and resource_type == SkipResourceTypeEnum.SOURCE.value
):
self.add_dbt_sources(
key,
manifest_node=manifest_node,
manifest_entities=manifest_entities,
dbt_objects=dbt_objects,
)

# Skip the ephemeral nodes since it is not materialized
if check_ephemeral_node(manifest_node):
logger.debug(f"Skipping ephemeral DBT node: {key}.")
Expand Down Expand Up @@ -549,6 +595,29 @@ def parse_upstream_nodes(self, manifest_entities, dbt_node):
f"Failed to parse the DBT node {node} to get upstream nodes: {exc}"
)
continue

if dbt_node.resource_type == SkipResourceTypeEnum.SOURCE.value:
parent_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name="*",
database_name=get_corrected_name(dbt_node.database),
schema_name=get_corrected_name(dbt_node.schema_),
table_name=dbt_node.name,
)

# check if the parent table exists in OM before adding it to the upstream list
parent_table_entity: Optional[
Union[Table, List[Table]]
] = get_entity_from_es_result(
entity_list=self.metadata.es_search_from_fqn(
entity_type=Table, fqn_search_string=parent_fqn
),
fetch_multiple_entities=False,
)
if parent_table_entity:
upstream_nodes.append(parent_fqn)

return upstream_nodes

def parse_data_model_columns(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
class DbtFiles(BaseModel):
    """
    Raw dbt artifacts as decoded JSON, before they are parsed into
    dbt artifact objects.
    """

    # Only the manifest is mandatory; every other artifact defaults to None.
    dbt_catalog: Optional[dict] = None
    dbt_manifest: dict
    # Content of the sources artifact (sources.json), when provided.
    dbt_sources: Optional[dict] = None
    # One dict per run results file that was read.
    dbt_run_results: Optional[List[dict]] = None


class DbtObjects(BaseModel):
    """
    Parsed counterparts of the raw dbt artifacts held by DbtFiles.
    """

    # Fields are typed as Any; only the manifest is mandatory.
    dbt_catalog: Optional[Any] = None
    dbt_manifest: Any
    # Parsed sources artifact; its results carry the freshness outcomes.
    dbt_sources: Optional[Any] = None
    # One parsed object per run results file.
    dbt_run_results: Optional[List[Any]] = None


Expand Down
3 changes: 2 additions & 1 deletion ingestion/tests/unit/test_dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"dbtCatalogFilePath": "sample/dbt_files/catalog.json",
"dbtManifestFilePath": "sample/dbt_files/manifest.json",
"dbtRunResultsFilePath": "sample/dbt_files/run_results.json",
"dbtSourcesFilePath": "sample/dbt_files/sources.json",
},
}
},
Expand Down Expand Up @@ -682,7 +683,7 @@ def check_yield_datamodel(self, dbt_objects, expected_data_models):
self.assertEqual(expected, original)

@patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn")
def test_updtream_nodes_for_lineage(self, es_search_from_fqn):
def test_upstream_nodes_for_lineage(self, es_search_from_fqn):
expected_upstream_nodes = [
"model.jaffle_shop.stg_customers",
"model.jaffle_shop.stg_orders",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
"title": "DBT Run Results HTTP File Path",
"description": "DBT run results http file path to extract the test results information.",
"type": "string"
},
"dbtSourcesHttpPath": {
"title": "DBT Sources HTTP File Path",
"description": "DBT sources http file path to extract freshness test results information.",
"type": "string"
}
},
"additionalProperties": false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
"title": "DBT Run Results File Path",
"description": "DBT run results file path to extract the test results information.",
"type": "string"
},
"dbtSourcesFilePath": {
"title": "DBT Sources File Path",
"description": "DBT sources file path to extract the freshness test result.",
"type": "string"
}
},
"additionalProperties": false,
Expand Down
Loading