class BigQueryProfilerInterface(SQAProfilerInterface):
    """
    SQLAlchemy profiler interface specialised for BigQuery.

    The only behaviour changed relative to ``SQAProfilerInterface`` is the
    sampler factory: samplers built through ``_instantiate_sampler`` are
    ``BigQuerySampler`` instances.
    """

    # Identifier of this interface implementation.
    _profiler_type: str = "BigQuery"

    # ``__init__`` is inherited on purpose: an override that only forwards
    # ``**kwargs`` to ``super().__init__`` adds nothing over the parent's.

    def _instantiate_sampler(
        self,
        session,
        table,
        sample_columns,
        profile_sample_config,
        partition_details,
        profile_sample_query,
    ):
        """
        Build the sampler used by this interface.

        Every argument is forwarded unchanged, by keyword, to
        ``BigQuerySampler``.

        Returns:
            BigQuerySampler: sampler bound to the given session and table
        """
        return BigQuerySampler(
            session=session,
            table=table,
            sample_columns=sample_columns,
            profile_sample_config=profile_sample_config,
            partition_details=partition_details,
            profile_sample_query=profile_sample_query,
        )
class BigQuerySampler(Sampler):
    """
    Sampler that renders the BigQuery sample-data query from ``sample_stmt``
    so sample rows are not read from the whole table.
    """

    # SQL template, filled with ``str.format`` in ``get_bq_sample_query``.
    sample_stmt = BIGQUERY_TABLESAMPLE
    # Percentage given to the template when the sample type is not PERCENTAGE.
    default_percent = 10

    # ``__init__`` is inherited on purpose: an override that only forwards
    # ``**kwargs`` to ``super().__init__`` adds nothing over the parent's.

    def get_bq_sample_query(self) -> str:
        """
        Render ``sample_stmt`` for the current table and sample settings.

        With a PERCENTAGE sample type, ``profile_sample`` is the sampled
        percentage and ``sample_limit`` caps the returned rows. For any other
        sample type, ``default_percent`` is the sampled percentage and
        ``profile_sample`` caps the returned rows.

        Returns:
            str: the rendered query text
        """
        if self.profile_sample_type == ProfileSampleType.PERCENTAGE:
            percent, result_limit = self.profile_sample, self.sample_limit
        else:
            # NOTE(review): if ``profile_sample`` can be unset on this path the
            # template renders ``LIMIT None`` - confirm callers always set it.
            percent, result_limit = self.default_percent, self.profile_sample

        # Column names are lower-cased and backtick-quoted for the SELECT list.
        quoted_columns = ", ".join(
            f"`{col_name.lower()}`" for col_name in self.sample_columns
        )

        return self.sample_stmt.format(
            table=self.table.__tablename__,
            col=quoted_columns,
            relative_table=self.table.__table__,
            percent=percent,
            result_limit=result_limit,
        )

    def fetch_sqa_sample_data(self) -> TableData:
        """
        Retrieve sample rows for the table.

        A user-supplied sample query takes precedence; otherwise the query
        from ``get_bq_sample_query`` is executed on the session.

        Returns:
            TableData: sample columns and rows to be added to the Table entity
        """
        if self._profile_sample_query:
            return self._fetch_sample_data_from_user_query()

        # NOTE(review): the statement is passed as a plain string; presumably
        # ``session`` is a SQLAlchemy session, where newer releases expect
        # ``text()`` - confirm against the pinned version.
        bq_sample = self.session.execute(self.get_bq_sample_query())
        return TableData(
            columns=list(self.sample_columns),
            rows=[list(row) for row in bq_sample],
        )
def create_profiler_interface(
    self,
    entity: Table,
    table_config: Optional[TableConfig],
) -> Union[SQAProfilerInterface, PandasProfilerInterface]:
    """
    Create the BigQuery profiler interface and keep it on ``self.interface``.

    Args:
        entity: table entity handed to ``ProfilerProtocol.create``
        table_config: optional table configuration, forwarded as is
    Returns:
        the interface created by ``ProfilerProtocol.create`` for the
        "BigQuery" profiler type
    """
    self.interface = ProfilerProtocol.create(
        "BigQuery",
        entity,
        table_config,
        self.source_config,
        self.service_conn_config,
        self.ometa_client,
        sqa_metadata=self.sqa_metadata,
    )  # type: ignore
    return self.interface
"""
BigQuery Queries for fetching sample data
"""

import textwrap

# Template for sampling a BigQuery table. Placeholders, filled with
# ``str.format``:
#   table          - table name, used to name the CTE
#   col            - SELECT list, already quoted by the caller
#   relative_table - table reference placed in the FROM clause
#   percent        - value for TABLESAMPLE SYSTEM (... PERCENT)
#   result_limit   - value for the final LIMIT
# The CTE name is backtick-quoted so table names that are not plain
# identifiers (dashes, spaces, leading digits) still render valid SQL.
BIGQUERY_TABLESAMPLE = textwrap.dedent(
    """
    WITH `{table}_cte` AS (
        SELECT {col}
        FROM `{relative_table}` TABLESAMPLE SYSTEM ({percent} PERCENT)
    )
    SELECT * FROM `{table}_cte`
    LIMIT {result_limit}
"""
)