Skip to content

Commit

Permalink
feat: add dbt freshness check test (#18730)
Browse files Browse the repository at this point in the history
* add dbt freshness check

* docs

* run linting

* add test case param definition

* fix test case param definition

* add config for dbt http, fix linting

* refactor (only create freshness test definition when user executed one)

* fix dbt files class

* fix dbt files class 2

* fix dbt objects class

* fix linting

* fix pylint

* fix linting once and for all

---------

Co-authored-by: Teddy <teddy.crepineau@gmail.com>
  • Loading branch information
mgorsk1 and TeddyCr authored Nov 28, 2024
1 parent 6410583 commit da17676
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
# Expected file names of the dbt artifacts consumed during ingestion
DBT_CATALOG_FILE_NAME = "catalog.json"
DBT_MANIFEST_FILE_NAME = "manifest.json"
# NOTE(review): no ".json" suffix here — presumably matched as a filename
# prefix so suffixed files (e.g. run_results_1.json) are picked up; confirm
# against the matching logic in download_dbt_files
DBT_RUN_RESULTS_FILE_NAME = "run_results"
DBT_SOURCES_FILE_NAME = "sources.json"


class SkipResourceTypeEnum(Enum):
Expand All @@ -91,6 +92,7 @@ class SkipResourceTypeEnum(Enum):

ANALYSIS = "analysis"
TEST = "test"
SOURCE = "source"


class CompiledQueriesEnum(Enum):
Expand Down Expand Up @@ -127,6 +129,7 @@ class DbtTestFailureEnum(Enum):

FAILURE = "failure"
FAIL = "fail"
ERROR = "error"


class DbtCommonEnum(Enum):
Expand All @@ -137,6 +140,7 @@ class DbtCommonEnum(Enum):
OWNER = "owner"
NODES = "nodes"
SOURCES = "sources"
SOURCES_FILE = "sources_file"
SOURCE = "source"
RESOURCETYPE = "resource_type"
MANIFEST_NODE = "manifest_node"
Expand Down
18 changes: 18 additions & 0 deletions ingestion/src/metadata/ingestion/source/database/dbt/dbt_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
DBT_CATALOG_FILE_NAME,
DBT_MANIFEST_FILE_NAME,
DBT_RUN_RESULTS_FILE_NAME,
DBT_SOURCES_FILE_NAME,
)
from metadata.ingestion.source.database.dbt.models import DbtFiles
from metadata.readers.file.config_source_factory import get_reader
Expand Down Expand Up @@ -85,6 +86,7 @@ def _(config: DbtLocalConfig):
config.dbtManifestFilePath,
config.dbtCatalogFilePath,
config.dbtRunResultsFilePath,
config.dbtSourcesFilePath,
]
yield from download_dbt_files(
blob_grouped_by_directory=blob_grouped_by_directory,
Expand Down Expand Up @@ -123,12 +125,22 @@ def _(config: DbtHttpConfig):
dbt_catalog = requests.get( # pylint: disable=missing-timeout
config.dbtCatalogHttpPath
)

dbt_sources = None
if config.dbtSourcesHttpPath:
logger.debug(
f"Requesting [dbtSourcesHttpPath] to: {config.dbtSourcesHttpPath}"
)
dbt_sources = requests.get( # pylint: disable=missing-timeout
config.dbtSourcesHttpPath
)
if not dbt_manifest:
raise DBTConfigException("Manifest file not found in file server")
yield DbtFiles(
dbt_catalog=dbt_catalog.json() if dbt_catalog else None,
dbt_manifest=dbt_manifest.json(),
dbt_run_results=[dbt_run_results.json()] if dbt_run_results else None,
dbt_sources=dbt_sources.json() if dbt_sources else None,
)
except DBTConfigException as exc:
raise exc
Expand Down Expand Up @@ -243,6 +255,7 @@ def get_blobs_grouped_by_dir(blobs: List[str]) -> Dict[str, List[str]]:
return blob_grouped_by_directory


# pylint: disable=too-many-locals, too-many-branches
def download_dbt_files(
blob_grouped_by_directory: Dict, config, client, bucket_name: Optional[str]
) -> Iterable[DbtFiles]:
Expand All @@ -255,6 +268,7 @@ def download_dbt_files(
) in blob_grouped_by_directory.items():
dbt_catalog = None
dbt_manifest = None
dbt_sources = None
dbt_run_results = []
kwargs = {}
if bucket_name:
Expand Down Expand Up @@ -285,12 +299,16 @@ def download_dbt_files(
logger.warning(
f"{DBT_RUN_RESULTS_FILE_NAME} not found in {key}: {exc}"
)
if DBT_SOURCES_FILE_NAME == blob_file_name.lower():
logger.debug(f"{DBT_SOURCES_FILE_NAME} found in {key}")
dbt_sources = reader.read(path=blob, **kwargs)
if not dbt_manifest:
raise DBTConfigException(f"Manifest file not found at: {key}")
yield DbtFiles(
dbt_catalog=json.loads(dbt_catalog) if dbt_catalog else None,
dbt_manifest=json.loads(dbt_manifest),
dbt_run_results=dbt_run_results if dbt_run_results else None,
dbt_sources=json.loads(dbt_sources) if dbt_sources else None,
)
except DBTConfigException as exc:
logger.warning(exc)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
from abc import ABC, abstractmethod
from typing import Iterable, List

from dbt_artifacts_parser.parser import parse_catalog, parse_manifest, parse_run_results
from dbt_artifacts_parser.parser import (
parse_catalog,
parse_manifest,
parse_run_results,
parse_sources,
)
from pydantic import Field
from typing_extensions import Annotated

Expand Down Expand Up @@ -209,11 +214,13 @@ def get_dbt_objects(self) -> Iterable[DbtObjects]:
self.remove_run_result_non_required_keys(
run_results=self.context.get().dbt_file.dbt_run_results
)

dbt_objects = DbtObjects(
dbt_catalog=parse_catalog(self.context.get().dbt_file.dbt_catalog)
if self.context.get().dbt_file.dbt_catalog
else None,
dbt_manifest=parse_manifest(self.context.get().dbt_file.dbt_manifest),
dbt_sources=parse_sources(self.context.get().dbt_file.dbt_sources),
dbt_run_results=[
parse_run_results(run_result_file)
for run_result_file in self.context.get().dbt_file.dbt_run_results
Expand Down
29 changes: 29 additions & 0 deletions ingestion/src/metadata/ingestion/source/database/dbt/dbt_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,20 @@ def create_test_case_parameter_definitions(dbt_test):
}
]
return test_case_param_definition
if hasattr(dbt_test, "freshness"):
test_case_param_definition = [
{
"name": "warn_after",
"displayName": "warn_after",
"required": False,
},
{
"name": "error_after",
"displayName": "error_after",
"required": False,
},
]
return test_case_param_definition
except Exception as err: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.error(
Expand All @@ -67,6 +81,21 @@ def create_test_case_parameter_values(dbt_test):
{"name": manifest_node.test_metadata.name, "value": dbt_test_values}
]
return test_case_param_values
if hasattr(manifest_node, "freshness"):
warn_after = manifest_node.freshness.warn_after
error_after = manifest_node.freshness.error_after

test_case_param_values = [
{
"name": "error_after",
"value": f"{error_after.count} {error_after.period.value}",
},
{
"name": "warn_after",
"value": f"{warn_after.count} {warn_after.period.value}",
},
]
return test_case_param_values
except Exception as err: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.error(
Expand Down
71 changes: 70 additions & 1 deletion ingestion/src/metadata/ingestion/source/database/dbt/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
DBT source methods.
"""
import traceback
from copy import deepcopy
from datetime import datetime
from typing import Any, Iterable, List, Optional, Union

Expand Down Expand Up @@ -324,7 +325,41 @@ def add_dbt_tests(
None,
)

# pylint: disable=too-many-locals, too-many-branches
def _add_dbt_freshness_test_from_sources(
    self, key: str, manifest_node, manifest_entities, dbt_objects: DbtObjects
) -> None:
    """
    Register a dbt source-freshness test case for later processing.

    In the dbt manifest, a source node's name is the table/view name
    (unlike test nodes, which are named after the test), so we rename a
    deepcopy of the node with a "_freshness" suffix to give the test
    case a precise name. Nothing is registered when the sources file
    has no freshness result for this node.

    :param key: unique_id of the source node in the manifest
    :param manifest_node: manifest entry for the dbt source
    :param manifest_entities: manifest nodes used to resolve upstream lineage
    :param dbt_objects: parsed dbt artifacts (sources.json results are read)
    """
    freshness_test_result = next(
        (item for item in dbt_objects.dbt_sources.results if item.unique_id == key),
        None,
    )
    # No freshness result for this source: skip the (relatively costly)
    # deepcopy and leave no partial entry behind
    if not freshness_test_result:
        return

    manifest_node_new = deepcopy(manifest_node)
    manifest_node_new.name = manifest_node_new.name + "_freshness"

    # Build the whole entry atomically instead of three separate
    # assignments, so a failure in parse_upstream_nodes cannot leave a
    # half-populated test entry in the context
    self.context.get().dbt_tests[key + "_freshness"] = {
        DbtCommonEnum.MANIFEST_NODE.value: manifest_node_new,
        DbtCommonEnum.UPSTREAM.value: self.parse_upstream_nodes(
            manifest_entities, manifest_node
        ),
        DbtCommonEnum.RESULTS.value: freshness_test_result,
    }

def add_dbt_sources(
    self, key: str, manifest_node, manifest_entities, dbt_objects: DbtObjects
) -> None:
    """
    Register test cases derived from the dbt sources file (currently only
    the source-freshness check) for later processing.
    """
    self._add_dbt_freshness_test_from_sources(
        key=key,
        manifest_node=manifest_node,
        manifest_entities=manifest_entities,
        dbt_objects=dbt_objects,
    )

# pylint: disable=too-many-locals, too-many-branches, too-many-statements
def yield_data_models(
self, dbt_objects: DbtObjects
) -> Iterable[Either[DataModelLink]]:
Expand Down Expand Up @@ -376,6 +411,17 @@ def yield_data_models(
)
continue

if (
dbt_objects.dbt_sources
and resource_type == SkipResourceTypeEnum.SOURCE.value
):
self.add_dbt_sources(
key,
manifest_node=manifest_node,
manifest_entities=manifest_entities,
dbt_objects=dbt_objects,
)

# Skip the ephemeral nodes since it is not materialized
if check_ephemeral_node(manifest_node):
logger.debug(f"Skipping ephemeral DBT node: {key}.")
Expand Down Expand Up @@ -549,6 +595,29 @@ def parse_upstream_nodes(self, manifest_entities, dbt_node):
f"Failed to parse the DBT node {node} to get upstream nodes: {exc}"
)
continue

if dbt_node.resource_type == SkipResourceTypeEnum.SOURCE.value:
parent_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name="*",
database_name=get_corrected_name(dbt_node.database),
schema_name=get_corrected_name(dbt_node.schema_),
table_name=dbt_node.name,
)

# check if the parent table exists in OM before adding it to the upstream list
parent_table_entity: Optional[
Union[Table, List[Table]]
] = get_entity_from_es_result(
entity_list=self.metadata.es_search_from_fqn(
entity_type=Table, fqn_search_string=parent_fqn
),
fetch_multiple_entities=False,
)
if parent_table_entity:
upstream_nodes.append(parent_fqn)

return upstream_nodes

def parse_data_model_columns(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
class DbtFiles(BaseModel):
    """Raw dbt artifact files, loaded from JSON into plain dicts."""

    # catalog.json content; optional since catalog generation may be skipped
    dbt_catalog: Optional[dict] = None
    # manifest.json content; the only artifact that is always required
    dbt_manifest: dict
    # sources.json content carrying source-freshness check results
    dbt_sources: Optional[dict] = None
    # one dict per run_results file found (files may be suffixed/numbered)
    dbt_run_results: Optional[List[dict]] = None


class DbtObjects(BaseModel):
    """dbt artifacts parsed into dbt-artifacts-parser model objects
    (typed as Any because the parser returns version-specific models)."""

    # result of parse_catalog(); None when no catalog file was provided
    dbt_catalog: Optional[Any] = None
    # result of parse_manifest(); always present
    dbt_manifest: Any
    # result of parse_sources(); None when no sources file was provided
    dbt_sources: Optional[Any] = None
    # one parsed object per run_results file
    dbt_run_results: Optional[List[Any]] = None


Expand Down
3 changes: 2 additions & 1 deletion ingestion/tests/unit/test_dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"dbtCatalogFilePath": "sample/dbt_files/catalog.json",
"dbtManifestFilePath": "sample/dbt_files/manifest.json",
"dbtRunResultsFilePath": "sample/dbt_files/run_results.json",
"dbtSourcesFilePath": "sample/dbt_files/sources.json",
},
}
},
Expand Down Expand Up @@ -682,7 +683,7 @@ def check_yield_datamodel(self, dbt_objects, expected_data_models):
self.assertEqual(expected, original)

@patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn")
def test_updtream_nodes_for_lineage(self, es_search_from_fqn):
def test_upstream_nodes_for_lineage(self, es_search_from_fqn):
expected_upstream_nodes = [
"model.jaffle_shop.stg_customers",
"model.jaffle_shop.stg_orders",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
"title": "DBT Run Results HTTP File Path",
"description": "DBT run results http file path to extract the test results information.",
"type": "string"
},
"dbtSourcesHttpPath": {
"title": "DBT Sources HTTP File Path",
"description": "DBT sources http file path to extract freshness test results information.",
"type": "string"
}
},
"additionalProperties": false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
"title": "DBT Run Results File Path",
"description": "DBT run results file path to extract the test results information.",
"type": "string"
},
"dbtSourcesFilePath": {
"title": "DBT Sources File Path",
"description": "DBT sources file path to extract the freshness test result.",
"type": "string"
}
},
"additionalProperties": false,
Expand Down

0 comments on commit da17676

Please sign in to comment.