Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #12129 : enhance bigquery sample data query #12130

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Interfaces with database for BigQuery engine
supporting sqlalchemy abstraction layer
"""

from metadata.profiler.interface.sqlalchemy.profiler_interface import (
SQAProfilerInterface,
)
from metadata.profiler.processor.sqlalchemy.bigquery_sampler import BigQuerySampler


class BigQueryProfilerInterface(SQAProfilerInterface):
"""
Interface to interact with BigQuery registry.
"""

_profiler_type: str = "BigQuery"

def __init__(
self,
**kwargs,
):
super().__init__(**kwargs)

def _instantiate_sampler(
self,
session,
table,
sample_columns,
profile_sample_config,
partition_details,
profile_sample_query,
):
return BigQuerySampler(
session=session,
table=table,
sample_columns=sample_columns,
profile_sample_config=profile_sample_config,
partition_details=partition_details,
profile_sample_query=profile_sample_query,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of modifying this base interface class we should

  1. Consolidate the Sampler() instantiation object in profiler_interface.py. We instantiate the object on ln. 391, 525, and 568. Instead of explicitly calling the object there we should define a method (e.g. _instantiate_sampler(...) that will instantiate a sampler object
  2. create a bigquery interface in sqlalchemy/bigquery/profiler_interface.py BigQueryProfilerInterface with _profiler_type: str = "bigquery".
  3. override the _instantiate_sampler(...) to return a BigQuerySampler() object (in BigQueryInterface).
  4. In source/bigquery/profiler_source.py override create_profiler_interface to something like below
    def create_profiler_interface(
        self,
        entity: Table,
        table_config: Optional[TableConfig],
    ) -> Union[SQAProfilerInterface, PandasProfilerInterface]:
        """Create sqlalchemy profiler interface"""
        profiler_interface: BigQueryInterface = ProfilerProtocol.create(
            "bigquery",
            entity,
            table_config,
            self.source_config,
            self.service_conn_config,
            self.ometa_client,
            sqa_metadata=self.sqa_metadata,
        )  # type: ignore

        self.interface = profiler_interface
        return self.interface

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, @TeddyCr , super clear idea. I've already upgraded the old code and feel more sustainable with these new ones. Hope it's working out.

Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,6 @@ def _(
return None

except Exception as exc:

msg = f"Error trying to compute profile for {runner.table.__tablename__}.{column.name}: {exc}"
handle_query_exception(msg, exc, session)
if row:
Expand Down Expand Up @@ -388,7 +387,7 @@ def _create_thread_safe_sampler(
):
"""Create thread safe runner"""
if not hasattr(thread_local, "sampler"):
thread_local.sampler = Sampler(
thread_local.sampler = self._instantiate_sampler(
session=session,
table=table,
sample_columns=self._get_sample_columns(),
Expand Down Expand Up @@ -522,7 +521,7 @@ def fetch_sample_data(self, table) -> TableData:
Returns:
TableData: sample table data
"""
sampler = Sampler(
sampler = self._instantiate_sampler(
session=self.session,
table=table,
sample_columns=self._get_sample_columns(),
Expand Down Expand Up @@ -565,7 +564,7 @@ def get_hybrid_metrics(
Returns:
dictionnary of results
"""
sampler = Sampler(
sampler = self._instantiate_sampler(
session=self.session,
table=kwargs.get("table"),
sample_columns=self._get_sample_columns(),
Expand All @@ -582,6 +581,24 @@ def get_hybrid_metrics(
self.session.rollback()
return None

def _instantiate_sampler(
self,
session,
table,
sample_columns,
profile_sample_config,
partition_details,
profile_sample_query,
):
return Sampler(
session=session,
table=table,
sample_columns=sample_columns,
profile_sample_config=profile_sample_config,
partition_details=partition_details,
profile_sample_query=profile_sample_query,
)

def close(self):
"""Clean up session"""
self.session.close()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Helper module to handle BigQuery data sampling
for the profiler
"""

from metadata.generated.schema.entity.data.table import ProfileSampleType, TableData
from metadata.profiler.processor.sqlalchemy.sampler import Sampler
from metadata.profiler.source.bigquery.queries import BIGQUERY_TABLESAMPLE


class BigQuerySampler(Sampler):
"""
Generates a sample of the BigQuery to not
run the query in the whole table.
"""

sample_stmt = BIGQUERY_TABLESAMPLE
default_percent = 10

def __init__(
self,
**kwargs,
):
super().__init__(**kwargs)

def get_bq_sample_query(self) -> str:
"""get query for sample data"""
if self.profile_sample_type == ProfileSampleType.PERCENTAGE:
return self.sample_stmt.format(
table=self.table.__tablename__,
col=", ".join(
"`" + col_name.lower() + "`" for col_name in self.sample_columns
),
relative_table=self.table.__table__,
percent=self.profile_sample,
result_limit=self.sample_limit,
)

return self.sample_stmt.format(
table=self.table.__tablename__,
col=", ".join(
"`" + col_name.lower() + "`" for col_name in self.sample_columns
),
relative_table=self.table.__table__,
percent=self.default_percent,
result_limit=self.profile_sample,
)

def fetch_sqa_sample_data(self) -> TableData:
"""
Use the sampler to retrieve sample data rows as per limit given by user
:return: TableData to be added to the Table Entity
"""
if self._profile_sample_query:
return self._fetch_sample_data_from_user_query()

bq_sample = self.session.execute(self.get_bq_sample_query())
return TableData(
columns=list(self.sample_columns),
rows=[list(row) for row in bq_sample],
)
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
"""

from copy import deepcopy
from typing import Optional, Union

from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
BigQueryConnection,
)
Expand All @@ -27,6 +29,14 @@
MultipleProjectId,
SingleProjectId,
)
from metadata.profiler.api.models import TableConfig
from metadata.profiler.interface.pandas.profiler_interface import (
PandasProfilerInterface,
)
from metadata.profiler.interface.profiler_protocol import ProfilerProtocol
from metadata.profiler.interface.sqlalchemy.profiler_interface import (
SQAProfilerInterface,
)
from metadata.profiler.source.base_profiler_source import BaseProfilerSource


Expand Down Expand Up @@ -59,3 +69,22 @@ def _copy_service_config(
)

return config_copy

def create_profiler_interface(
self,
entity: Table,
table_config: Optional[TableConfig],
) -> Union[SQAProfilerInterface, PandasProfilerInterface]:
"""Create BigQuery profiler interface"""
profiler_interface: BigQueryProfilerSource = ProfilerProtocol.create(
duongphannamhung marked this conversation as resolved.
Show resolved Hide resolved
"BigQuery",
entity,
table_config,
self.source_config,
self.service_conn_config,
self.ometa_client,
sqa_metadata=self.sqa_metadata,
) # type: ignore

self.interface = profiler_interface
return self.interface
26 changes: 26 additions & 0 deletions ingestion/src/metadata/profiler/source/bigquery/queries.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
BigQuery Queries for fetching sample data
"""

import textwrap

BIGQUERY_TABLESAMPLE = textwrap.dedent(
"""
WITH {table}_cte AS (
SELECT {col}
FROM `{relative_table}` TABLESAMPLE SYSTEM ({percent} PERCENT)
)
SELECT * FROM {table}_cte
LIMIT {result_limit}
"""
)