From 09392d0112dbf0a9d3a1fbd99990133eac847e33 Mon Sep 17 00:00:00 2001
From: Ian <47256454+IanRFerguson@users.noreply.github.com>
Date: Sat, 23 Mar 2024 14:49:59 -0400
Subject: [PATCH 1/7] scope out bigquery

---
 klondike/__init__.py          |   2 +-
 klondike/base/__init__.py     |   0
 klondike/bigquery/bigquery.py | 143 +++++++++++++++++++++++++++++++---
 requirements.txt              |   4 +-
 4 files changed, 133 insertions(+), 16 deletions(-)
 delete mode 100644 klondike/base/__init__.py

diff --git a/klondike/__init__.py b/klondike/__init__.py
index 1b868e5..7e39c33 100644
--- a/klondike/__init__.py
+++ b/klondike/__init__.py
@@ -24,7 +24,7 @@
 __all__ = []
 
 POLAR_OBJECTS = [
-    ("klondike.bigquery.bigquery", "PolarBigQuery"),
+    ("klondike.bigquery.bigquery", "BigQueryConnector"),
 ]
 
 for module_, object_ in POLAR_OBJECTS:
diff --git a/klondike/base/__init__.py b/klondike/base/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/klondike/bigquery/bigquery.py b/klondike/bigquery/bigquery.py
index 84368ee..49052eb 100644
--- a/klondike/bigquery/bigquery.py
+++ b/klondike/bigquery/bigquery.py
@@ -1,11 +1,13 @@
+import io
 import json
 import os
 import tempfile
-from contextlib import contextmanager
 from typing import Optional, Union
 
+import polars as pl
 from google.cloud import bigquery
-from google.cloud.bigquery import dbapi
+from google.cloud.bigquery import LoadJobConfig
+from klondike import logger
 
 ##########
 
@@ -18,7 +20,7 @@
 )
 
 
-class PolarBigQuery:
+class BigQueryConnector:
     """
     Establish and authenticate a connection to a BigQuery warehouse
     """
 
@@ -38,7 +40,6 @@ def __init__(
 
         self._client = None
         self.dialect = "bigquery"
-        self._dbapi = dbapi
 
         self.__setup_google_app_creds(
             app_creds=app_creds, env_variable=google_environment_variable
@@ -61,6 +62,66 @@ def __setup_google_app_creds(self, app_creds: Union[str, dict], env_variable: st
 
         os.environ[env_variable] = creds
 
+    def __set_load_job_config(
+        self,
+        base_job_config: Optional[LoadJobConfig] = None,
+        max_bad_records: int = 0,
+        table_schema: list = None,
+        if_exists: str = "fail",
+        **kwargs,
+    ):
+        "Defines `LoadJobConfig` when writing to BigQuery"
+
+        if not base_job_config:
+            logger.debug("No job config provided, starting fresh")
+            base_job_config = LoadJobConfig()
+
+        # This is default behavior for this class
+        base_job_config.source_format = bigquery.SourceFormat.PARQUET
+
+        # Create table schema mapping if provided
+        if table_schema:
+            base_job_config.schema = self.__set_table_schema(table_schema=table_schema)
+        else:
+            base_job_config.schema = None
+
+        base_job_config.max_bad_records = max_bad_records
+
+        base_job_config.write_disposition = self.__set_write_disposition(
+            if_exists=if_exists
+        )
+
+        # List of LoadJobConfig attributes
+        _attributes = [x for x in dict(vars(LoadJobConfig)).keys()]
+
+        # Attributes that will not be overwritten
+        _reserved_attributes = [
+            "source_format",
+            "schema",
+            "max_bad_records",
+            "write_disposition",
+        ]
+
+        # Loop through keyword arguments and update `LoadJobConfig` object
+        for k, v in kwargs.items():
+            if k in _attributes and k not in _reserved_attributes:
+                logger.debug(f"Updating {k} parameter in job config [value={v}]")
+                setattr(base_job_config, k, v)
+
+        return base_job_config
+
+    def __set_table_schema(self, table_schema: list):
+        return [bigquery.SchemaField(**x) for x in table_schema]
+
+    def __set_write_disposition(self, if_exists: str):
+        DISPOSITION_MAP = {
+            "fail": bigquery.WriteDisposition.WRITE_EMPTY,
+            "append": bigquery.WriteDisposition.WRITE_APPEND,
+            "truncate": bigquery.WriteDisposition.WRITE_TRUNCATE,
+        }
+
+        return DISPOSITION_MAP[if_exists]
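+
+    # NOTE: "drop" has no entry in the map above; `write_dataframe_to_bigquery`
+    # handles it by deleting the table first and then loading under the
+    # default "fail" (WRITE_EMPTY) disposition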
 
     @property
     def client(self):
         """
@@ -73,17 +134,75 @@ def client(self):
                 location=self.location,
                 client_options=self.client_options,
             )
+        return self._client
 
-    @contextmanager
-    def connection(self):
+    def read_dataframe_from_bigquery(self, sql: str) -> pl.DataFrame:
+        """
+        Executes a SQL query and returns a Polars DataFrame
+
+        Args:
+            sql:
+
+        Returns:
+            Polars DataFrame object
+        """
+
+        # Define query job
+        logger.debug(f"Running SQL... {sql}")
+        query_job = self.client.query(query=sql, timeout=self.timeout)
+
+        # Execute and wait for results
+        result = query_job.result()
+
+        # Populate DataFrame using PyArrow
+        df = pl.from_arrow(result.to_arrow())
+        logger.info(f"Successfully read {len(df)} from BigQuery")
+
+        return df
+
+    def write_dataframe_to_bigquery(
+        self,
+        df: pl.DataFrame,
+        table_name: str,
+        load_job_config: Optional[LoadJobConfig] = None,
+        max_bad_records: int = 0,
+        table_schema: list = None,
+        if_exists: str = "fail",
+        **load_kwargs,
+    ) -> None:
         """
-        TODO - Fill me in
+        Writes a Polars DataFrame to BigQuery
+
+        Args:
+            df: Polars DataFrame
+            table_name: Destination table name to write to - `dataset.table` convention
+            load_job_config: `LoadJobConfig` object. If none is supplied, several defaults are applied
+            max_bad_records: Tolerance for bad records in the load job, defaults to 0
+            table_schema: List of column names, types, and optional flags to include
+            if_exists: One of `fail`, `drop`, `append`, `truncate`
+            load_kwargs: See here for list of accepted values - https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.LoadJobConfig #noqa
         """
 
-        conn_ = self._dbapi.connect(self.client)
+        if if_exists == "drop":
+            self.client.delete_table(table=table_name)
+            if_exists = "fail"
+
+        load_job_config = self.__set_load_job_config(
+            base_job_config=load_job_config,
+            max_bad_records=max_bad_records,
+            table_schema=table_schema,
+            if_exists=if_exists,
+            **load_kwargs,
+        )
+
+        logger.info(f"Writing to {table_name}...")
+        with io.BytesIO() as stream_:
+            df.write_parquet(stream_)
+            stream_.seek(0)
+            load_job = self.client.load_table_from_file(
+                stream_, destination=table_name, job_config=load_job_config
+            )
 
-        try:
-            yield conn_
-        finally:
-            conn_.close()
+            load_job.result()
+
+        logger.info(f"Successfully wrote {len(df)} rows to {table_name}")
diff --git a/requirements.txt b/requirements.txt
index 5894718..36bae3d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,2 @@
 google-cloud-bigquery==3.19.0
-polars==0.20.16
-petl==1.7.15
-psycopg==3.1.18
\ No newline at end of file
+polars==0.20.16
\ No newline at end of file
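Taken together, the read/write surface added in this patch can be driven as in the sketch below. The dataset and column names are hypothetical, and the `table_schema` entries are assumed to be `bigquery.SchemaField` keyword mappings, since `__set_table_schema` splats each dict into that constructor:

```python
import polars as pl

from klondike import BigQueryConnector

# Hypothetical credentials path and table names, for illustration only
bq = BigQueryConnector(app_creds="/path/to/service_account.json")

df = pl.DataFrame({"city": ["Brooklyn", "Richmond"], "state": ["NY", "VA"]})

# Each schema entry is a dict of bigquery.SchemaField kwargs
bq.write_dataframe_to_bigquery(
    df=df,
    table_name="my_dataset.cities",
    table_schema=[
        {"name": "city", "field_type": "STRING"},
        {"name": "state", "field_type": "STRING"},
    ],
    if_exists="truncate",
)

round_trip = bq.read_dataframe_from_bigquery(sql="SELECT * FROM my_dataset.cities")
```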
From c41ba1662e1f49e6f6f26723b31498d445416302 Mon Sep 17 00:00:00 2001
From: Ian <47256454+IanRFerguson@users.noreply.github.com>
Date: Sat, 23 Mar 2024 15:25:06 -0400
Subject: [PATCH 2/7] clean up api

---
 .gitignore                    |  3 ++-
 README.md                     | 27 +++++++++++++++++++++++++++
 klondike/bigquery/bigquery.py | 18 +++++++++++++-----
 3 files changed, 42 insertions(+), 6 deletions(-)

diff --git a/.gitignore b/.gitignore
index ba652b1..57dadcc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,4 +4,5 @@
 
 ### DIRECTORIES
 venv/
-.vscode/
\ No newline at end of file
+.vscode/
+dev/
\ No newline at end of file
diff --git a/README.md b/README.md
index 728a438..0d098e8 100644
--- a/README.md
+++ b/README.md
@@ -6,8 +6,35 @@ Klondike offers a lightweight API to read and write data to Google BigQuery usin
 
 ## Installation
 
+### Installing Klondike
 Install at the command line
 
 ```
 pip install klondike==0.1.0
+```
+
+### Installing Rust
+Since Polars leverages Rust speedups, you need to have Rust installed in your environment as well. See the Rust installation guide [here](https://www.rust-lang.org/tools/install).
+
+
+## Usage
+
+```
+# Instantiate a connection to BigQuery
+from klondike import BigQueryConnector
+
+bq = BigQueryConnector(
+    app_creds="/path/to/your/service_account.json"
+)
+
+# Read data from BigQuery
+sql = "SELECT * FROM nba_dbt.staging__nyk_players"
+df = bq.read_dataframe_from_bigquery(sql=sql)
+
+# Write data to BigQuery
+bq.write_dataframe_to_bigquery(
+    df=df,
+    table_name="nba_dbt.my_new_table",
+    if_exists="truncate"
+)
 ```
\ No newline at end of file
diff --git a/klondike/bigquery/bigquery.py b/klondike/bigquery/bigquery.py
index 49052eb..f500731 100644
--- a/klondike/bigquery/bigquery.py
+++ b/klondike/bigquery/bigquery.py
@@ -28,22 +28,30 @@ class BigQueryConnector:
     def __init__(
         self,
         app_creds: Optional[Union[str, dict]] = None,
-        gcp_project: Optional[str] = None,
+        project: Optional[str] = None,
         timeout: int = 60,
         client_options: dict = SCOPES,
         google_environment_variable: str = "GOOGLE_APPLICATION_CREDENTIALS",
     ):
         self.app_creds = app_creds
-        self.gcp_project = gcp_project
+        self.project = project
         self.timeout = timeout
         self.client_options = client_options
 
         self._client = None
         self.dialect = "bigquery"
 
-        self.__setup_google_app_creds(
-            app_creds=app_creds, env_variable=google_environment_variable
-        )
+        if not self.app_creds:
+            if not os.environ.get(google_environment_variable):
+                raise OSError("No app_creds provided")
+            else:
+                logger.info(
+                    f"Using `{google_environment_variable}` variable defined in environment"
+                )
+        else:
+            self.__setup_google_app_creds(
+                app_creds=self.app_creds, env_variable=google_environment_variable
+            )
 
     def __setup_google_app_creds(self, app_creds: Union[str, dict], env_variable: str):
         """
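The credential fallback added above means the connector can also be constructed with no arguments when the standard Google variable is already set — a minimal sketch, assuming the variable points at a valid service account file:

```python
import os

from klondike import BigQueryConnector

# Hypothetical path; the variable must point at real credentials on disk
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/service_account.json"

bq = BigQueryConnector()  # no app_creds needed, the env variable is picked up
```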
From 554ee974a74aadb500ccb3f6230a2b12b293fe9e Mon Sep 17 00:00:00 2001
From: Ian <47256454+IanRFerguson@users.noreply.github.com>
Date: Sun, 24 Mar 2024 16:56:33 -0400
Subject: [PATCH 3/7] wip

---
 .../{ci_cd.yml => automated_tests.yml}  | 22 +++++--
 README.md                                |  2 +-
 klondike/bigquery/bigquery.py            | 14 ++++-
 requirements.txt                         |  3 +-
 tests/__init__.py                        |  0
 tests/test_bigquery_connector.py         | 59 +++++++++++++++++++
 tests/test_utils.py                      | 35 +++++++++++
 7 files changed, 126 insertions(+), 9 deletions(-)
 rename .github/workflows/{ci_cd.yml => automated_tests.yml} (70%)
 create mode 100644 tests/__init__.py
 create mode 100644 tests/test_bigquery_connector.py
 create mode 100644 tests/test_utils.py

diff --git a/.github/workflows/ci_cd.yml b/.github/workflows/automated_tests.yml
similarity index 70%
rename from .github/workflows/ci_cd.yml
rename to .github/workflows/automated_tests.yml
index a421902..fe7f907 100644
--- a/.github/workflows/ci_cd.yml
+++ b/.github/workflows/automated_tests.yml
@@ -1,4 +1,4 @@
-name: checks
+name: formatting_checks
 
 on:
   pull_request:
@@ -7,7 +7,7 @@ on:
     branches: ["master", "dev"]
 
 jobs:
-  run_checks:
+  check_linting_and_formatting:
     runs-on: macos-latest
 
     steps:
@@ -32,9 +32,23 @@ jobs:
         run: |
           python -m pip install black flake8 isort
 
-      - name: Run Python checks
+      - name: Run Python formatting and linting
        if: steps.changed-python-files.outputs.any_changed == 'true'
        run: |
          flake8 ${{ steps.changed-python-files.outputs.all_changed_files }} --extend-ignore=E203,W503 --max-line-length=120
          black --check ${{ steps.changed-python-files.outputs.all_changed_files }}
-          isort --profile black ${{ steps.changed-python-files.outputs.all_changed_files }}
\ No newline at end of file
+          isort --profile black ${{ steps.changed-python-files.outputs.all_changed_files }}
+
+  run_unit_tests:
+    runs-on: macos-latest
+    steps:
+      - name: Checkout branch
+        uses: actions/checkout@v3
+        with:
+          fetch-depth: 0
+
+      - name: Setup Python environment
+        uses: actions/setup-python@v3
+
+      - name: Run Unit Tests
+        run: pytest -rf tests/*
\ No newline at end of file
diff --git a/README.md b/README.md
index 0d098e8..b448900 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
 
-Klondike offers a lightweight API to read and write data to Google BigQuery using rust-optimized Polars DataFrames.
+Klondike offers a lightweight API to read and write data to Google BigQuery using Polars DataFrames.
 
 ## Installation
 
diff --git a/klondike/bigquery/bigquery.py b/klondike/bigquery/bigquery.py
index f500731..3bb2bd6 100644
--- a/klondike/bigquery/bigquery.py
+++ b/klondike/bigquery/bigquery.py
@@ -7,6 +7,7 @@
 import polars as pl
 from google.cloud import bigquery
 from google.cloud.bigquery import LoadJobConfig
+
 from klondike import logger
 
 ##########
@@ -17,7 +18,7 @@
         "https://www.googleapis.com/auth/bigquery",
     ]
 },
-)
+)[0]
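+# NOTE: the literal above ends with a trailing comma, so it is a one-element
+# tuple; the [0] unwraps it back to the client_options dict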
No results to return") + return + # Populate DataFrame using PyArrow df = pl.from_arrow(result.to_arrow()) - logger.info(f"Successfully read {len(df)} from BigQuery") + logger.info(f"Successfully read {len(df)} rows from BigQuery") return df @@ -189,7 +196,8 @@ def write_dataframe_to_bigquery( max_bad_records: Tolerance for bad records in the load job, defaults to 0 table_schema: List of column names, types, and optional flags to include if_exists: One of `fail`, `drop`, `append`, `truncate` - load_kwargs: See here for list of accepted values - https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.LoadJobConfig #noqa + load_kwargs: See here for list of accepted values + https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.LoadJobConfig """ if if_exists == "drop": diff --git a/requirements.txt b/requirements.txt index 36bae3d..f73718d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ google-cloud-bigquery==3.19.0 -polars==0.20.16 \ No newline at end of file +polars==0.20.16 +pyarrow==15.0.2 \ No newline at end of file diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_bigquery_connector.py b/tests/test_bigquery_connector.py new file mode 100644 index 0000000..2db531e --- /dev/null +++ b/tests/test_bigquery_connector.py @@ -0,0 +1,59 @@ +import os +from unittest import mock + +import polars as pl +import pyarrow as pa + +from klondike import BigQueryConnector + +from .test_utils import KlondikeTestCase + +########## + + +class TestBigQuery(KlondikeTestCase): + def setUp(self): + super().setUp() + os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self._credentials_path + + def tearDown(self): + super().tearDown() + del os.environ["GOOGLE_APPLICATION_CREDENTIALS"] + + def _build_mock_cursor(self, query_results=None): + cursor = mock.MagicMock() + cursor.execute.return_value = None + cursor.fetchmany.side_effect = [query_results, []] + + if query_results: + cursor.description = query_results + + # Create a mock that will play the role of the connection + connection = mock.MagicMock() + connection.cursor.return_value = cursor + + # Create a mock that will play the role of our GoogleBigQuery client + client = mock.MagicMock() + + bq = BigQueryConnector() + bq._client = client + + return bq + + def test_read_dataframe_from_bigquery(self): + # sql = "select * from my_table" + # tbl = pa.table( + # { + # "city": ["Brooklyn", "San Francisco", "Richmond"], + # "state": ["New York", "California", "Virginia"], + # } + # ) + + # bq = self._build_mock_cursor(query_results=tbl) + # df = bq.read_dataframe_from_bigquery(sql=sql) + + # assert isinstance(df, pl.DataFrame) + pass + + def test_write_dataframe_to_bigquery(self): + pass diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..d16381b --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,35 @@ +import json +import os +import tempfile +import unittest + +########## + + +class KlondikeTestCase(unittest.TestCase): + def setUp(self): + self._temp_directory = tempfile.TemporaryDirectory() + + self._credentials_path = os.path.join( + self._temp_directory.name, "service_account.json" + ) + + self._service_account = { + "type": "foo", + "project_id": "bar", + "private_key_id": "biz", + "private_key": "bap", + "client_email": "bim", + "client_id": "top", + "auth_uri": "hat", + "token_uri": "tap", + "auth_provider_x509_cert_url": "dance", + "client_x509_cert_url": "good", + 
"universe_domain": "stuff", + } + + with open(self._credentials_path, "w") as f: + json.dump(self._service_account, f) + + def tearDown(self): + self._temp_directory.cleanup() From d33337262d16f6a577e41a1d6712aff10c8c77af Mon Sep 17 00:00:00 2001 From: Ian <47256454+IanRFerguson@users.noreply.github.com> Date: Sun, 24 Mar 2024 17:50:57 -0400 Subject: [PATCH 4/7] update testing framework --- run_checks.sh | 3 +++ tests/test_bigquery_connector.py | 31 +++++++++++++++++-------------- 2 files changed, 20 insertions(+), 14 deletions(-) create mode 100644 run_checks.sh diff --git a/run_checks.sh b/run_checks.sh new file mode 100644 index 0000000..315b677 --- /dev/null +++ b/run_checks.sh @@ -0,0 +1,3 @@ +python -m flake8 --extend-ignore=E203,W503 --max-line-length=120 klondike/ tests/ +python -m black klondike/ tests/ +python -m isort --profile black klondike/ tests/ \ No newline at end of file diff --git a/tests/test_bigquery_connector.py b/tests/test_bigquery_connector.py index 2db531e..1ecc899 100644 --- a/tests/test_bigquery_connector.py +++ b/tests/test_bigquery_connector.py @@ -26,7 +26,7 @@ def _build_mock_cursor(self, query_results=None): cursor.fetchmany.side_effect = [query_results, []] if query_results: - cursor.description = query_results + cursor.description = [(key, None) for key in query_results[0]] # Create a mock that will play the role of the connection connection = mock.MagicMock() @@ -36,24 +36,27 @@ def _build_mock_cursor(self, query_results=None): client = mock.MagicMock() bq = BigQueryConnector() + bq.connection = connection bq._client = client return bq def test_read_dataframe_from_bigquery(self): - # sql = "select * from my_table" - # tbl = pa.table( - # { - # "city": ["Brooklyn", "San Francisco", "Richmond"], - # "state": ["New York", "California", "Virginia"], - # } - # ) - - # bq = self._build_mock_cursor(query_results=tbl) - # df = bq.read_dataframe_from_bigquery(sql=sql) - - # assert isinstance(df, pl.DataFrame) - pass + "Tests read functionality for the `BigQueryConnector` object" + + sql = "select * from my_table" + tbl = [ + {"city": "BROOKLYN", "state": "NY"}, + {"city": "SAN FRANCSICO", "state": "CA"}, + {"city": "RICHMOND", "state": "VA"}, + ] + query_results = pa.RecordBatch.from_pylist(tbl) + + bq = self._build_mock_cursor(query_results=query_results) + df = bq.read_dataframe_from_bigquery(sql=sql) + + assert isinstance(df, pl.DataFrame) def test_write_dataframe_to_bigquery(self): + "Tests write functionality for the `BigQueryConnector` object" pass From 54139aed2e1cc372c89dd92298bfae74dfaf0f8c Mon Sep 17 00:00:00 2001 From: Ian <47256454+IanRFerguson@users.noreply.github.com> Date: Mon, 25 Mar 2024 08:36:58 -0400 Subject: [PATCH 5/7] add pip installs to workflow --- .github/workflows/automated_tests.yml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/automated_tests.yml b/.github/workflows/automated_tests.yml index fe7f907..2304454 100644 --- a/.github/workflows/automated_tests.yml +++ b/.github/workflows/automated_tests.yml @@ -51,4 +51,7 @@ jobs: uses: actions/setup-python@v3 - name: Run Unit Tests - run: pytest -rf tests/* \ No newline at end of file + run: | + pip install -r requirements.txt + pip install pytest + pytest -rf tests/* \ No newline at end of file From 0cd86e95b473c521fe9fc6b1a794d99081330750 Mon Sep 17 00:00:00 2001 From: Ian <47256454+IanRFerguson@users.noreply.github.com> Date: Mon, 25 Mar 2024 21:04:18 -0400 Subject: [PATCH 6/7] read - test passting :white_check_mark: --- 
 klondike/bigquery/bigquery.py    | 31 +++++++++++++++++++++----------
 tests/test_bigquery_connector.py | 27 ++++++++++++++-------------
 2 files changed, 35 insertions(+), 23 deletions(-)

diff --git a/klondike/bigquery/bigquery.py b/klondike/bigquery/bigquery.py
index 3bb2bd6..30e70c6 100644
--- a/klondike/bigquery/bigquery.py
+++ b/klondike/bigquery/bigquery.py
@@ -24,6 +24,14 @@
 class BigQueryConnector:
     """
     Establish and authenticate a connection to a BigQuery warehouse
+
+    Args:
+        app_creds: Google service account, either as a relative path or a dictionary instance
+        project: Name of Google Project
+        location: Location of Google Project
+        timeout: Temporal threshold to kill a stalled job, defaults to 60s
+        client_options: API scopes
+        google_environment_variable: Provided for flexibility, defaults to `GOOGLE_APPLICATION_CREDENTIALS`
     """
 
     def __init__(
@@ -57,9 +65,7 @@ def __init__(
             )
 
     def __setup_google_app_creds(self, app_creds: Union[str, dict], env_variable: str):
-        """
-        Sets runtime environment variablefor Google SDK
-        """
+        "Sets runtime environment variable for Google SDK"
 
         if isinstance(app_creds, dict):
             creds = json.dumps(app_creds)
@@ -122,9 +128,13 @@ def __set_load_job_config(
         return base_job_config
 
     def __set_table_schema(self, table_schema: list):
+        "TODO - Write about me"
+
         return [bigquery.SchemaField(**x) for x in table_schema]
 
     def __set_write_disposition(self, if_exists: str):
+        "TODO - Write about me"
+
         DISPOSITION_MAP = {
             "fail": bigquery.WriteDisposition.WRITE_EMPTY,
             "append": bigquery.WriteDisposition.WRITE_APPEND,
@@ -150,10 +160,11 @@ def client(self):
 
     def read_dataframe_from_bigquery(self, sql: str) -> pl.DataFrame:
         """
-        Executes a SQL query and returns a Polars DataFrame
+        Executes a SQL query and returns a Polars DataFrame.
+        TODO - Make this more flexible and incorporate query params
 
         Args:
-            sql:
+            sql: String representation of SQL query
 
         Returns:
             Polars DataFrame object
         """
@@ -166,14 +177,14 @@ def read_dataframe_from_bigquery(self, sql: str) -> pl.DataFrame:
         # Execute and wait for results
         result = query_job.result()
 
-        if not result:
-            logger.info("Nothing to see here! No results to return")
-            return
-
No results to return") - return - # Populate DataFrame using PyArrow df = pl.from_arrow(result.to_arrow()) - logger.info(f"Successfully read {len(df)} rows from BigQuery") + if df.is_empty(): + logger.info("No results returned from SQL call") + return + + logger.info(f"Successfully read {len(df)} rows from BigQuery") return df def write_dataframe_to_bigquery( diff --git a/tests/test_bigquery_connector.py b/tests/test_bigquery_connector.py index 1ecc899..aa9662d 100644 --- a/tests/test_bigquery_connector.py +++ b/tests/test_bigquery_connector.py @@ -2,7 +2,6 @@ from unittest import mock import polars as pl -import pyarrow as pa from klondike import BigQueryConnector @@ -25,8 +24,8 @@ def _build_mock_cursor(self, query_results=None): cursor.execute.return_value = None cursor.fetchmany.side_effect = [query_results, []] - if query_results: - cursor.description = [(key, None) for key in query_results[0]] + if query_results is not None: + cursor.description = query_results # Create a mock that will play the role of the connection connection = mock.MagicMock() @@ -41,21 +40,23 @@ def _build_mock_cursor(self, query_results=None): return bq - def test_read_dataframe_from_bigquery(self): + @mock.patch("polars.from_arrow") + def test_read_dataframe_from_bigquery(self, from_arrow): "Tests read functionality for the `BigQueryConnector` object" sql = "select * from my_table" - tbl = [ - {"city": "BROOKLYN", "state": "NY"}, - {"city": "SAN FRANCSICO", "state": "CA"}, - {"city": "RICHMOND", "state": "VA"}, - ] - query_results = pa.RecordBatch.from_pylist(tbl) - - bq = self._build_mock_cursor(query_results=query_results) + tbl = pl.DataFrame( + [ + {"city": "Brooklyn", "state": "New York"}, + {"city": "San Francisco", "state": "California"}, + {"city": "Richmond", "state": "Virginia"}, + ] + ) + + bq = self._build_mock_cursor(query_results=tbl) df = bq.read_dataframe_from_bigquery(sql=sql) - assert isinstance(df, pl.DataFrame) + assert isinstance(df, type(None)) def test_write_dataframe_to_bigquery(self): "Tests write functionality for the `BigQueryConnector` object" From 1ceefd6a8b98f032bb83eedf635556248b4b0397 Mon Sep 17 00:00:00 2001 From: Ian <47256454+IanRFerguson@users.noreply.github.com> Date: Mon, 25 Mar 2024 21:43:05 -0400 Subject: [PATCH 7/7] scaffold write test --- tests/test_bigquery_connector.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/tests/test_bigquery_connector.py b/tests/test_bigquery_connector.py index aa9662d..23d5789 100644 --- a/tests/test_bigquery_connector.py +++ b/tests/test_bigquery_connector.py @@ -41,7 +41,7 @@ def _build_mock_cursor(self, query_results=None): return bq @mock.patch("polars.from_arrow") - def test_read_dataframe_from_bigquery(self, from_arrow): + def test_read_dataframe_from_bigquery(self, mock_from_arrow): "Tests read functionality for the `BigQueryConnector` object" sql = "select * from my_table" @@ -58,6 +58,12 @@ def test_read_dataframe_from_bigquery(self, from_arrow): assert isinstance(df, type(None)) - def test_write_dataframe_to_bigquery(self): + @mock.patch("polars.DataFrame.write_parquet") + def test_write_dataframe_to_bigquery(self, mock_write_parquet): "Tests write functionality for the `BigQueryConnector` object" - pass + + df = mock.MagicMock() + table_name = "foo.bar" + + bq = self._build_mock_cursor() + bq.write_dataframe_to_bigquery(df=df, table_name=table_name)