Skip to content

Commit

Permalink
Fixes #12129 : enhance bigquery sample data query (#12130)
Browse files Browse the repository at this point in the history
* upgrade BigQuery Sampler

* beautify code

* revert old way of profiler & data quality, keep fetch new way sample

* Update profiler_source.py

* Update profiler_source.py

---------

Co-authored-by: hung.duong <hung.duong@be.com.vn>
Co-authored-by: Teddy <teddy.crepineau@gmail.com>
  • Loading branch information
3 people authored Jul 4, 2023
1 parent 97140e1 commit 64f147c
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 4 deletions.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Interfaces with database for BigQuery engine
supporting sqlalchemy abstraction layer
"""

from metadata.profiler.interface.sqlalchemy.profiler_interface import (
SQAProfilerInterface,
)
from metadata.profiler.processor.sqlalchemy.bigquery_sampler import BigQuerySampler


class BigQueryProfilerInterface(SQAProfilerInterface):
    """
    SQLAlchemy profiler interface specialised for BigQuery.

    It only differs from its parent in the profiler type label and in
    the sampler class it builds (``BigQuerySampler``).
    """

    _profiler_type: str = "BigQuery"

    def __init__(self, **kwargs):
        """Forward every keyword argument to ``SQAProfilerInterface``."""
        super().__init__(**kwargs)

    def _instantiate_sampler(
        self,
        session,
        table,
        sample_columns,
        profile_sample_config,
        partition_details,
        profile_sample_query,
    ):
        """
        Build the sampler used by this interface.

        Args:
            session: session handed over to the sampler
            table: table to sample
            sample_columns: columns to include in the sample
            profile_sample_config: sampling configuration
            partition_details: partitioning details for the table
            profile_sample_query: optional user-defined sample query
        Returns:
            BigQuerySampler: sampler created from the given arguments
        """
        sampler_kwargs = {
            "session": session,
            "table": table,
            "sample_columns": sample_columns,
            "profile_sample_config": profile_sample_config,
            "partition_details": partition_details,
            "profile_sample_query": profile_sample_query,
        }
        return BigQuerySampler(**sampler_kwargs)
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,6 @@ def _(
return None

except Exception as exc:

msg = f"Error trying to compute profile for {runner.table.__tablename__}.{column.name}: {exc}"
handle_query_exception(msg, exc, session)
if row:
Expand Down Expand Up @@ -388,7 +387,7 @@ def _create_thread_safe_sampler(
):
"""Create thread safe runner"""
if not hasattr(thread_local, "sampler"):
thread_local.sampler = Sampler(
thread_local.sampler = self._instantiate_sampler(
session=session,
table=table,
sample_columns=self._get_sample_columns(),
Expand Down Expand Up @@ -522,7 +521,7 @@ def fetch_sample_data(self, table) -> TableData:
Returns:
TableData: sample table data
"""
sampler = Sampler(
sampler = self._instantiate_sampler(
session=self.session,
table=table,
sample_columns=self._get_sample_columns(),
Expand Down Expand Up @@ -565,7 +564,7 @@ def get_hybrid_metrics(
Returns:
dictionnary of results
"""
sampler = Sampler(
sampler = self._instantiate_sampler(
session=self.session,
table=kwargs.get("table"),
sample_columns=self._get_sample_columns(),
Expand All @@ -582,6 +581,24 @@ def get_hybrid_metrics(
self.session.rollback()
return None

def _instantiate_sampler(
    self,
    session,
    table,
    sample_columns,
    profile_sample_config,
    partition_details,
    profile_sample_query,
):
    """
    Build the default ``Sampler`` from the given arguments.

    Kept as its own method so the sampler class is chosen in a single
    place instead of at every call site.

    Args:
        session: session handed over to the sampler
        table: table to sample
        sample_columns: columns to include in the sample
        profile_sample_config: sampling configuration
        partition_details: partitioning details for the table
        profile_sample_query: optional user-defined sample query
    Returns:
        Sampler: sampler created from the given arguments
    """
    sampler_kwargs = {
        "session": session,
        "table": table,
        "sample_columns": sample_columns,
        "profile_sample_config": profile_sample_config,
        "partition_details": partition_details,
        "profile_sample_query": profile_sample_query,
    }
    return Sampler(**sampler_kwargs)

def close(self):
    """Close the session held by this interface."""
    active_session = self.session
    active_session.close()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Helper module to handle BigQuery data sampling
for the profiler
"""

from metadata.generated.schema.entity.data.table import ProfileSampleType, TableData
from metadata.profiler.processor.sqlalchemy.sampler import Sampler
from metadata.profiler.source.bigquery.queries import BIGQUERY_TABLESAMPLE


class BigQuerySampler(Sampler):
    """
    Sampler that fetches sample data from BigQuery through the
    ``BIGQUERY_TABLESAMPLE`` statement instead of querying the
    whole table.
    """

    # SQL template used to build the sample query
    sample_stmt = BIGQUERY_TABLESAMPLE
    # TABLESAMPLE percentage used when the profile sample is not a percentage
    default_percent = 10

    def __init__(self, **kwargs):
        """Forward every keyword argument to ``Sampler``."""
        super().__init__(**kwargs)

    def get_bq_sample_query(self) -> str:
        """
        Render the sample statement for the current table.

        With a PERCENTAGE sample type, the profile sample is used as the
        TABLESAMPLE percentage and ``sample_limit`` caps the rows.
        Otherwise ``default_percent`` is used as the percentage and the
        profile sample becomes the row limit.

        Returns:
            str: formatted sample query
        """
        if self.profile_sample_type == ProfileSampleType.PERCENTAGE:
            percent, row_limit = self.profile_sample, self.sample_limit
        else:
            percent, row_limit = self.default_percent, self.profile_sample

        # Column names are lower-cased and wrapped in backticks
        quoted_columns = ", ".join(
            f"`{name.lower()}`" for name in self.sample_columns
        )

        return self.sample_stmt.format(
            table=self.table.__tablename__,
            col=quoted_columns,
            relative_table=self.table.__table__,
            percent=percent,
            result_limit=row_limit,
        )

    def fetch_sqa_sample_data(self) -> TableData:
        """
        Retrieve the sample rows for the table.

        A user-defined sample query takes precedence; otherwise the
        generated BigQuery sample query is executed on the session.

        Returns:
            TableData: sample columns and rows to attach to the Table Entity
        """
        if self._profile_sample_query:
            return self._fetch_sample_data_from_user_query()

        result = self.session.execute(self.get_bq_sample_query())
        column_names = list(self.sample_columns)
        sample_rows = [list(record) for record in result]
        return TableData(columns=column_names, rows=sample_rows)
32 changes: 32 additions & 0 deletions ingestion/src/metadata/profiler/source/bigquery/profiler_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
"""

from copy import deepcopy
from typing import Optional, Union

from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
BigQueryConnection,
)
Expand All @@ -27,6 +29,17 @@
MultipleProjectId,
SingleProjectId,
)
from metadata.profiler.api.models import TableConfig
from metadata.profiler.interface.pandas.profiler_interface import (
PandasProfilerInterface,
)
from metadata.profiler.interface.profiler_protocol import ProfilerProtocol
from metadata.profiler.interface.sqlalchemy.bigquery.profiler_interface import (
BigQueryProfilerInterface,
)
from metadata.profiler.interface.sqlalchemy.profiler_interface import (
SQAProfilerInterface,
)
from metadata.profiler.source.base_profiler_source import BaseProfilerSource


Expand Down Expand Up @@ -59,3 +72,22 @@ def _copy_service_config(
)

return config_copy

def create_profiler_interface(
    self,
    entity: Table,
    table_config: Optional[TableConfig],
) -> Union[SQAProfilerInterface, PandasProfilerInterface]:
    """
    Create the BigQuery profiler interface and keep it on ``self.interface``.

    Args:
        entity: table entity to profile
        table_config: optional table level configuration
    Returns:
        the interface returned by ``ProfilerProtocol.create`` for "BigQuery"
    """
    create_args = (
        entity,
        table_config,
        self.source_config,
        self.service_conn_config,
        self.ometa_client,
    )
    self.interface = ProfilerProtocol.create(
        "BigQuery",
        *create_args,
        sqa_metadata=self.sqa_metadata,
    )  # type: ignore
    return self.interface
26 changes: 26 additions & 0 deletions ingestion/src/metadata/profiler/source/bigquery/queries.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
BigQuery Queries for fetching sample data
"""

import textwrap

# Template of the query used to fetch sample data from a BigQuery table.
# It is meant to be rendered with ``str.format`` and expects:
#   {table}          - name used as prefix of the CTE (``{table}_cte``)
#   {col}            - comma separated list of columns to select
#   {relative_table} - table reference, wrapped in backticks by the template
#   {percent}        - percentage passed to ``TABLESAMPLE SYSTEM``
#   {result_limit}   - value of the final ``LIMIT`` clause
BIGQUERY_TABLESAMPLE = textwrap.dedent(
"""
WITH {table}_cte AS (
SELECT {col}
FROM `{relative_table}` TABLESAMPLE SYSTEM ({percent} PERCENT)
)
SELECT * FROM {table}_cte
LIMIT {result_limit}
"""
)

0 comments on commit 64f147c

Please sign in to comment.