Skip to content

Commit

Permalink
Fix #11659: Add support for filter patterns in dbt workflow (#12063)
Browse files Browse the repository at this point in the history
  • Loading branch information
ulixius9 authored Jun 26, 2023
1 parent 18c8eb3 commit a3fd6e9
Show file tree
Hide file tree
Showing 14 changed files with 276 additions and 50 deletions.
16 changes: 16 additions & 0 deletions ingestion/src/metadata/examples/workflows/dbt.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,22 @@ source:
# dbtUpdateDescriptions: true or false
# includeTags: true or false
# dbtClassificationName: dbtTags
# databaseFilterPattern:
# includes:
# - .*db.*
# excludes:
# - .*demo.*
# schemaFilterPattern:
# includes:
# - .*schema.*
# excludes:
# - .*demo.*
# tableFilterPattern:
# includes:
# - .*table.*
# excludes:
# - .*demo.*

sink:
type: metadata-rest
config: {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
import json
import traceback
from functools import singledispatch
from typing import Any, Optional, Tuple
from typing import Optional, Tuple

import requests
from pydantic import BaseModel

from metadata.generated.schema.metadataIngestion.dbtconfig.dbtAzureConfig import (
DbtAzureConfig,
Expand All @@ -37,27 +36,17 @@
from metadata.generated.schema.metadataIngestion.dbtconfig.dbtS3Config import (
DbtS3Config,
)
from metadata.ingestion.source.database.dbt.constants import (
DBT_CATALOG_FILE_NAME,
DBT_MANIFEST_FILE_NAME,
DBT_RUN_RESULTS_FILE_NAME,
)
from metadata.ingestion.source.database.dbt.models import DbtFiles
from metadata.utils.credentials import set_google_credentials
from metadata.utils.logger import ometa_logger

logger = ometa_logger()

DBT_CATALOG_FILE_NAME = "catalog.json"
DBT_MANIFEST_FILE_NAME = "manifest.json"
DBT_RUN_RESULTS_FILE_NAME = "run_results.json"


class DbtFiles(BaseModel):
dbt_catalog: Optional[dict]
dbt_manifest: dict
dbt_run_results: Optional[dict]


class DbtObjects(BaseModel):
dbt_catalog: Optional[Any]
dbt_manifest: Any
dbt_run_results: Optional[Any]


class DBTConfigException(Exception):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from metadata.generated.schema.api.tests.createTestDefinition import (
CreateTestDefinitionRequest,
)
from metadata.generated.schema.metadataIngestion.dbtPipeline import DbtPipeline
from metadata.generated.schema.tests.basic import TestCaseResult
from metadata.ingestion.api.source import Source
from metadata.ingestion.api.topology_runner import TopologyRunnerMixin
Expand All @@ -33,7 +34,14 @@
create_source_context,
)
from metadata.ingestion.source.database.database_service import DataModelLink
from metadata.utils.dbt_config import DbtFiles, DbtObjects, get_dbt_details
from metadata.ingestion.source.database.dbt.dbt_config import get_dbt_details
from metadata.ingestion.source.database.dbt.models import (
DbtFiles,
DbtFilteredModel,
DbtObjects,
)
from metadata.utils import fqn
from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()
Expand Down Expand Up @@ -131,6 +139,7 @@ class DbtServiceSource(TopologyRunnerMixin, Source, ABC):

topology = DbtServiceTopology()
context = create_source_context(topology)
source_config: DbtPipeline

def remove_manifest_non_required_keys(self, manifest_dict: dict):
"""
Expand All @@ -152,9 +161,7 @@ def remove_manifest_non_required_keys(self, manifest_dict: dict):
)

def get_dbt_files(self) -> DbtFiles:
dbt_files = get_dbt_details(
self.source_config.dbtConfigSource # pylint: disable=no-member
)
dbt_files = get_dbt_details(self.source_config.dbtConfigSource)
self.context.dbt_files = dbt_files
yield dbt_files

Expand Down Expand Up @@ -246,3 +253,30 @@ def add_dbt_test_result(self, dbt_test: dict):
"""
After test cases has been processed, add the tests results info
"""

def is_filtered(
    self, database_name: str, schema_name: str, table_name: str
) -> DbtFilteredModel:
    """
    Evaluate a dbt model against the configured database/schema/table
    filter patterns and report whether it should be skipped.

    Returns a DbtFilteredModel carrying the filtered flag, the built
    model FQN, and a human-readable reason message when filtered.
    """
    # fqn._build is the internal FQN joiner; no public equivalent here.
    # pylint: disable=protected-access
    model_fqn = fqn._build(str(database_name), str(schema_name), str(table_name))

    # Evaluate every level; the last matching level wins as the reported
    # reason (database overrides schema, which overrides table).
    checks = (
        ("table", filter_by_table(self.source_config.tableFilterPattern, table_name)),
        ("schema", filter_by_schema(self.source_config.schemaFilterPattern, schema_name)),
        (
            "database",
            filter_by_database(self.source_config.databaseFilterPattern, database_name),
        ),
    )
    reason = None
    for level, matched in checks:
        if matched:
            reason = level

    message = f"Model Filtered due to {reason} filter pattern" if reason else None
    return DbtFilteredModel(
        is_filtered=reason is not None, message=message, model_fqn=model_fqn
    )
24 changes: 23 additions & 1 deletion ingestion/src/metadata/ingestion/source/database/dbt/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ def add_dbt_tests(
None,
)

# pylint: disable=too-many-locals
def yield_data_models(self, dbt_objects: DbtObjects) -> Iterable[DataModelLink]:
"""
Yield the data models
Expand Down Expand Up @@ -359,6 +360,17 @@ def yield_data_models(self, dbt_objects: DbtObjects) -> Iterable[DataModelLink]:
continue

model_name = get_dbt_model_name(manifest_node)

# Filter the dbt models based on filter patterns
filter_model = self.is_filtered(
database_name=get_corrected_name(manifest_node.database),
schema_name=get_corrected_name(manifest_node.schema_),
table_name=model_name,
)
if filter_model.is_filtered:
self.status.filter(filter_model.model_fqn, filter_model.message)
continue

logger.debug(f"Processing DBT node: {model_name}")

catalog_node = None
Expand Down Expand Up @@ -387,6 +399,7 @@ def yield_data_models(self, dbt_objects: DbtObjects) -> Iterable[DataModelLink]:
schema_name=get_corrected_name(manifest_node.schema_),
table_name=model_name,
)

table_entity: Optional[
Union[Table, List[Table]]
] = get_entity_from_es_result(
Expand Down Expand Up @@ -448,6 +461,15 @@ def parse_upstream_nodes(self, manifest_entities, dbt_node):
for node in dbt_node.depends_on.nodes:
try:
parent_node = manifest_entities[node]
table_name = get_dbt_model_name(parent_node)

filter_model = self.is_filtered(
database_name=get_corrected_name(parent_node.database),
schema_name=get_corrected_name(parent_node.schema_),
table_name=table_name,
)
if filter_model.is_filtered:
continue

# check if the node is an ephemeral node
# Recursively store the upstream of the ephemeral node in the upstream list
Expand All @@ -462,7 +484,7 @@ def parse_upstream_nodes(self, manifest_entities, dbt_node):
service_name=self.config.serviceName,
database_name=get_corrected_name(parent_node.database),
schema_name=get_corrected_name(parent_node.schema_),
table_name=get_dbt_model_name(parent_node),
table_name=table_name,
)

# check if the parent table exists in OM before adding it to the upstream list
Expand Down
35 changes: 35 additions & 0 deletions ingestion/src/metadata/ingestion/source/database/dbt/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Models required for dbt
"""

from typing import Any, Optional

from pydantic import BaseModel


class DbtFiles(BaseModel):
    """Raw dbt artifact files parsed into JSON dicts.

    Only the manifest is mandatory; catalog and run results may be absent
    depending on what the dbt config source provides.
    """

    dbt_catalog: Optional[dict]
    dbt_manifest: dict
    dbt_run_results: Optional[dict]


class DbtObjects(BaseModel):
    """Deserialized dbt artifact objects built from DbtFiles.

    Typed as ``Any`` because the concrete types come from the external
    dbt artifact parser — presumably dbt_artifacts_parser models;
    confirm at the call sites that construct this.
    """

    dbt_catalog: Optional[Any]
    dbt_manifest: Any
    dbt_run_results: Optional[Any]


class DbtFilteredModel(BaseModel):
    """Outcome of evaluating a dbt model against the filter patterns."""

    # True when the model matched a database/schema/table filter pattern.
    is_filtered: Optional[bool] = False
    # Human-readable reason, set only when is_filtered is True.
    message: Optional[str]
    # Fully qualified name of the model the verdict applies to.
    model_fqn: Optional[str]
2 changes: 1 addition & 1 deletion ingestion/tests/unit/test_dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
get_dbt_raw_query,
)
from metadata.ingestion.source.database.dbt.metadata import DbtSource
from metadata.utils.dbt_config import DbtFiles, DbtObjects
from metadata.ingestion.source.database.dbt.models import DbtFiles, DbtObjects
from metadata.utils.tag_utils import get_tag_labels

mock_dbt_config = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,21 @@ source:
# dbtPrefixConfig:
# dbtBucketName: bucket
# dbtObjectPrefix: "dbt/"
# databaseFilterPattern:
# includes:
# - .*db.*
# excludes:
# - .*demo.*
# schemaFilterPattern:
# includes:
# - .*schema.*
# excludes:
# - .*demo.*
# tableFilterPattern:
# includes:
# - .*table.*
# excludes:
# - .*demo.*
sink:
type: metadata-rest
config: {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ Add the details of the AWS s3 bucket in the above config:
The `get_dbt_details` method takes in the source config provided in the json and detects source type (gcp, s3, local or file server) based on the fields provided in the config.

```python
from metadata.utils.dbt_config import get_dbt_details
from metadata.ingestion.source.database.dbt.dbt_config import get_dbt_details
dbt_details = get_dbt_details(self.source_config.dbtConfigSource)
self.dbt_catalog = dbt_details[0] if dbt_details else None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,18 @@
"description": "Custom OpenMetadata Classification name for dbt tags.",
"type": "string",
"default": "dbtTags"
},
"schemaFilterPattern": {
"description": "Regex to only fetch schemas that match the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
},
"tableFilterPattern": {
"description": "Regex to only fetch tables that match the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
},
"databaseFilterPattern": {
"description": "Regex to only fetch databases that match the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
}
},
"additionalProperties": false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,9 @@ const AddIngestion = ({
dbtUpdateDescriptions: dbtConfigSource?.dbtUpdateDescriptions,
includeTags: dbtConfigSource?.includeTags,
dbtClassificationName: dbtConfigSource?.dbtClassificationName,
databaseFilterPattern: databaseFilterPattern,
schemaFilterPattern: schemaFilterPattern,
tableFilterPattern: tableFilterPattern,
};
}

Expand Down Expand Up @@ -770,6 +773,9 @@ const AddIngestion = ({
cancelText={t('label.cancel')}
data={state}
formType={status}
getExcludeValue={getExcludeValue}
getIncludeValue={getIncludeValue}
handleShowFilter={handleShowFilter}
okText={t('label.next')}
onCancel={handleCancelClick}
onChange={handleStateChange}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import {
PipelineType,
} from '../../generated/entity/services/ingestionPipelines/ingestionPipeline';
import { DbtPipelineClass } from '../../generated/metadataIngestion/dbtPipeline';

import {
DBT_SOURCES,
GCS_CONFIG,
Expand Down Expand Up @@ -92,7 +91,12 @@ export type ScheduleIntervalProps = {
export type ModifiedDbtConfig = DbtConfig &
Pick<
DbtPipelineClass,
'dbtUpdateDescriptions' | 'dbtClassificationName' | 'includeTags'
| 'dbtUpdateDescriptions'
| 'dbtClassificationName'
| 'includeTags'
| 'databaseFilterPattern'
| 'schemaFilterPattern'
| 'tableFilterPattern'
>;

export interface AddIngestionState {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* limitations under the License.
*/

import { FilterPatternEnum } from 'enums/filterPattern.enum';
import { FormSubmitType } from '../../../enums/form.enum';
import {
Credentials,
Expand All @@ -33,9 +34,11 @@ export interface DBTFormCommonProps {
export interface DBTConfigFormProps extends DBTFormCommonProps {
formType: FormSubmitType;
data: AddIngestionState;

onChange: (newState: Partial<AddIngestionState>) => void;
onFocus: (fieldName: string) => void;
getExcludeValue: (value: string[], type: FilterPatternEnum) => void;
getIncludeValue: (value: string[], type: FilterPatternEnum) => void;
handleShowFilter: (value: boolean, type: string) => void;
}

export type DbtConfigCloud = Pick<
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ const handleSubmit = jest.fn();
const handleFocus = jest.fn();
const handleCancel = jest.fn();
const handleChange = jest.fn();
const mockGetExculdeValue = jest.fn();
const mockGetIncludeValue = jest.fn();
const mockHandleShowFilter = jest.fn();

describe('DBTConfigFormBuilder', () => {
it('renders the DBTCloudConfig form when dbtConfigSourceType is "cloud"', async () => {
Expand All @@ -32,6 +35,9 @@ describe('DBTConfigFormBuilder', () => {
cancelText="Cancel"
data={data as AddIngestionState}
formType={FormSubmitType.ADD}
getExcludeValue={mockGetExculdeValue}
getIncludeValue={mockGetIncludeValue}
handleShowFilter={mockHandleShowFilter}
okText="Ok"
onCancel={handleCancel}
onChange={handleChange}
Expand Down
Loading

0 comments on commit a3fd6e9

Please sign in to comment.