diff --git a/.github/workflows/ci_cd.yml b/.github/workflows/automated_tests.yml
similarity index 66%
rename from .github/workflows/ci_cd.yml
rename to .github/workflows/automated_tests.yml
index a421902..2304454 100644
--- a/.github/workflows/ci_cd.yml
+++ b/.github/workflows/automated_tests.yml
@@ -1,4 +1,4 @@
-name: checks
+name: formatting_checks
 
 on:
   pull_request:
@@ -7,7 +7,7 @@ on:
     branches: ["master", "dev"]
 
 jobs:
-  run_checks:
+  check_linting_and_formatting:
     runs-on: macos-latest
 
     steps:
@@ -32,9 +32,26 @@ jobs:
         run: |
           python -m pip install black flake8 isort
 
-      - name: Run Python checks
+      - name: Run Python formatting and linting
        if: steps.changed-python-files.outputs.any_changed == 'true'
        run: |
          flake8 ${{ steps.changed-python-files.outputs.all_changed_files }} --extend-ignore=E203,W503 --max-line-length=120
          black --check ${{ steps.changed-python-files.outputs.all_changed_files }}
-         isort --profile black ${{ steps.changed-python-files.outputs.all_changed_files }}
\ No newline at end of file
+         isort --profile black ${{ steps.changed-python-files.outputs.all_changed_files }}
+
+  run_unit_tests:
+    runs-on: macos-latest
+    steps:
+      - name: Checkout branch
+        uses: actions/checkout@v3
+        with:
+          fetch-depth: 0
+
+      - name: Setup Python environment
+        uses: actions/setup-python@v3
+
+      - name: Run Unit Tests
+        run: |
+          pip install -r requirements.txt
+          pip install pytest
+          pytest -rf tests/*
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index ba652b1..57dadcc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,4 +4,5 @@
 
 ### DIRECTORIES
 venv/
-.vscode/
\ No newline at end of file
+.vscode/
+dev/
\ No newline at end of file
diff --git a/README.md b/README.md
index 728a438..b448900 100644
--- a/README.md
+++ b/README.md
@@ -2,12 +2,39 @@
 
 
 
-Klondike offers a lightweight API to read and write data to Google BigQuery using rust-optimized Polars DataFrames.
+Klondike offers a lightweight API to read and write data to Google BigQuery using Polars DataFrames.
 
 ## Installation
 
+### Installing Klondike
 Install at the command line
 
 ```
 pip install klondike==0.1.0
+```
+
+### Installing Rust
+Since Polars is built on Rust, you may also need Rust installed in your environment (for example, when a prebuilt wheel is not available for your platform and `pip` has to compile Polars from source). See the Rust installation guide [here](https://www.rust-lang.org/tools/install).
+
+
+## Usage
+
+```
+# Instantiate a connection to BigQuery
+from klondike import BigQueryConnector
+
+bq = BigQueryConnector(
+    app_creds="/path/to/your/service_account.json"
+)
+
+# Read data from BigQuery
+sql = "SELECT * FROM nba_dbt.staging__nyk_players"
+df = bq.read_dataframe_from_bigquery(sql=sql)
+
+# Write data to BigQuery
+bq.write_dataframe_to_bigquery(
+    df=df,
+    table_name="nba_dbt.my_new_table",
+    if_exists="truncate"
+)
 ```
\ No newline at end of file
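The README example above authenticates with a service-account file path. Judging from the `BigQueryConnector.__init__` changes later in this diff, two other credential paths appear to be supported; a minimal sketch (paths and dictionary values are placeholders, not from the diff):

```
import os

from klondike import BigQueryConnector

# 1. Service-account file path, as in the README example
bq = BigQueryConnector(app_creds="/path/to/your/service_account.json")

# 2. Service-account info already loaded as a dictionary
bq = BigQueryConnector(app_creds={"type": "service_account", "project_id": "my-project"})

# 3. No app_creds at all: the connector falls back to GOOGLE_APPLICATION_CREDENTIALS
#    and raises OSError if that variable is not set
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/your/service_account.json"
bq = BigQueryConnector()
```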
diff --git a/klondike/__init__.py b/klondike/__init__.py
index 1b868e5..7e39c33 100644
--- a/klondike/__init__.py
+++ b/klondike/__init__.py
@@ -24,7 +24,7 @@
 __all__ = []
 
 POLAR_OBJECTS = [
-    ("klondike.bigquery.bigquery", "PolarBigQuery"),
+    ("klondike.bigquery.bigquery", "BigQueryConnector"),
 ]
 
 for module_, object_ in POLAR_OBJECTS:
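The hunk above only renames the registry entry from `PolarBigQuery` to `BigQueryConnector`; the body of the loop that consumes `POLAR_OBJECTS` sits outside the hunk. Given the visible `__all__ = []` and loop header, it presumably follows a dynamic-import pattern along these lines (an assumption, not shown in the diff):

```
import importlib

__all__ = []

POLAR_OBJECTS = [
    ("klondike.bigquery.bigquery", "BigQueryConnector"),
]

# Import each registered module and re-export the named object at package level
for module_, object_ in POLAR_OBJECTS:
    globals()[object_] = getattr(importlib.import_module(module_), object_)
    __all__.append(object_)
```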
diff --git a/klondike/bigquery/bigquery.py b/klondike/bigquery/bigquery.py
index 84368ee..30e70c6 100644
--- a/klondike/bigquery/bigquery.py
+++ b/klondike/bigquery/bigquery.py
@@ -1,11 +1,14 @@
+import io
 import json
 import os
 import tempfile
-from contextlib import contextmanager
 from typing import Optional, Union
 
+import polars as pl
 from google.cloud import bigquery
-from google.cloud.bigquery import dbapi
+from google.cloud.bigquery import LoadJobConfig
+
+from klondike import logger
 
 
 ##########
@@ -15,39 +18,54 @@
             "https://www.googleapis.com/auth/bigquery",
         ]
     },
-)
+)[0]
 
 
-class PolarBigQuery:
+class BigQueryConnector:
     """
     Establish and authenticate a connection to a BigQuery warehouse
+
+    Args:
+        app_creds: Google service account, either as a relative path or a dictionary instance
+        project: Name of Google Project
+        location: Location of Google Project
+        timeout: Temporal threshold to kill a stalled job, defaults to 60s
+        client_options: API scopes
+        google_environment_variable: Provided for flexibility, defaults to `GOOGLE_APPLICATION_CREDENTIALS`
     """
 
     def __init__(
         self,
         app_creds: Optional[Union[str, dict]] = None,
-        gcp_project: Optional[str] = None,
+        project: Optional[str] = None,
+        location: Optional[str] = None,
         timeout: int = 60,
         client_options: dict = SCOPES,
         google_environment_variable: str = "GOOGLE_APPLICATION_CREDENTIALS",
     ):
         self.app_creds = app_creds
-        self.gcp_project = gcp_project
+        self.project = project
+        self.location = location
         self.timeout = timeout
         self.client_options = client_options
 
         self._client = None
         self.dialect = "bigquery"
-        self._dbapi = dbapi
 
-        self.__setup_google_app_creds(
-            app_creds=app_creds, env_variable=google_environment_variable
-        )
+        if not self.app_creds:
+            if not os.environ.get(google_environment_variable):
+                raise OSError("No app_creds provided")
+            else:
+                logger.info(
+                    f"Using `{google_environment_variable}` variable defined in environment"
+                )
+        else:
+            self.__setup_google_app_creds(
+                app_creds=self.app_creds, env_variable=google_environment_variable
+            )
 
     def __setup_google_app_creds(self, app_creds: Union[str, dict], env_variable: str):
-        """
-        Sets runtime environment variablefor Google SDK
-        """
+        "Sets runtime environment variable for Google SDK"
 
         if isinstance(app_creds, dict):
             creds = json.dumps(app_creds)
@@ -61,6 +79,70 @@ def __setup_google_app_creds(self, app_creds: Union[str, dict], env_variable: st
 
         os.environ[env_variable] = creds
 
+    def __set_load_job_config(
+        self,
+        base_job_config: Optional[LoadJobConfig] = None,
+        max_bad_records: int = 0,
+        table_schema: list = None,
+        if_exists: str = "fail",
+        **kwargs,
+    ):
+        "Defines `LoadJobConfig` when writing to BigQuery"
+
+        if not base_job_config:
+            logger.debug("No job config provided, starting fresh")
+            base_job_config = LoadJobConfig()
+
+        # This is default behavior for this class
+        base_job_config.source_format = bigquery.SourceFormat.PARQUET
+
+        # Create table schema mapping if provided
+        if table_schema:
+            base_job_config.schema = self.__set_table_schema(table_schema=table_schema)
+        else:
+            base_job_config.schema = None
+
+        base_job_config.max_bad_records = max_bad_records
+
+        base_job_config.write_disposition = self.__set_write_disposition(
+            if_exists=if_exists
+        )
+
+        # List of LoadJobConfig attributes
+        _attributes = [x for x in dict(vars(LoadJobConfig)).keys()]
+
+        # Attributes that will not be overwritten
+        _reserved_attributes = [
+            "source_format",
+            "schema",
+            "max_bad_records",
+            "write_disposition",
+        ]
+
+        # Loop through keyword arguments and update `LoadJobConfig` object
+        for k, v in kwargs.items():
+            if k in _attributes and k not in _reserved_attributes:
+                logger.debug(f"Updating {k} parameter in job config [value={v}]")
+                setattr(base_job_config, k, v)
+
+        return base_job_config
+
+    def __set_table_schema(self, table_schema: list):
+        "Converts a list of dictionaries into BigQuery `SchemaField` objects"
+
+        return [bigquery.SchemaField(**x) for x in table_schema]
+
+    def __set_write_disposition(self, if_exists: str):
+        "Maps an `if_exists` value to a BigQuery `WriteDisposition`"
+
+        DISPOSITION_MAP = {
+            "fail": bigquery.WriteDisposition.WRITE_EMPTY,
+            "append": bigquery.WriteDisposition.WRITE_APPEND,
+            "truncate": bigquery.WriteDisposition.WRITE_TRUNCATE,
+        }
+
+        return DISPOSITION_MAP[if_exists]
+
     @property
     def client(self):
         """
@@ -73,17 +155,81 @@ def client(self):
             location=self.location,
             client_options=self.client_options,
         )
+
         return self._client
 
-    @contextmanager
-    def connection(self):
+    def read_dataframe_from_bigquery(self, sql: str) -> pl.DataFrame:
+        """
+        Executes a SQL query and returns a Polars DataFrame.
+        TODO - Make this more flexible and incorporate query params
+
+        Args:
+            sql: String representation of SQL query
+
+        Returns:
+            Polars DataFrame object
+        """
+
+        # Define query job
+        logger.debug(f"Running SQL: {sql}")
+        query_job = self.client.query(query=sql, timeout=self.timeout)
+
+        # Execute and wait for results
+        result = query_job.result()
+
+        # Populate DataFrame using PyArrow
+        df = pl.from_arrow(result.to_arrow())
+
+        if df.is_empty():
+            logger.info("No results returned from SQL call")
+            return
+
+        logger.info(f"Successfully read {len(df)} rows from BigQuery")
+        return df
+
+    def write_dataframe_to_bigquery(
+        self,
+        df: pl.DataFrame,
+        table_name: str,
+        load_job_config: Optional[LoadJobConfig] = None,
+        max_bad_records: int = 0,
+        table_schema: list = None,
+        if_exists: str = "fail",
+        **load_kwargs,
+    ) -> None:
         """
-        TODO - Fill me in
+        Writes a Polars DataFrame to BigQuery
+
+        Args:
+            df: Polars DataFrame
+            table_name: Destination table name to write to - `dataset.table` convention
+            load_job_config: `LoadJobConfig` object. If none is supplied, several defaults are applied
+            max_bad_records: Tolerance for bad records in the load job, defaults to 0
+            table_schema: List of column names, types, and optional flags to include
+            if_exists: One of `fail`, `drop`, `append`, `truncate`
+            load_kwargs: See here for list of accepted values
+                https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.LoadJobConfig
         """
 
-        conn_ = self._dbapi.connect(self.client)
+        if if_exists == "drop":
+            self.client.delete_table(table=table_name)
+            if_exists = "fail"
+
+        load_job_config = self.__set_load_job_config(
+            base_job_config=load_job_config,
+            max_bad_records=max_bad_records,
+            table_schema=table_schema,
+            if_exists=if_exists,
+            **load_kwargs,
+        )
+
+        logger.info(f"Writing to {table_name}...")
+        with io.BytesIO() as stream_:
+            df.write_parquet(stream_)
+            stream_.seek(0)
+            load_job = self.client.load_table_from_file(
+                stream_, destination=table_name, job_config=load_job_config
+            )
 
-        try:
-            yield conn_
-        finally:
-            conn_.close()
+            load_job.result()
+            logger.info(f"Successfully wrote {len(df)} rows to {table_name}")
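Because `__set_table_schema` unpacks each dictionary directly into `bigquery.SchemaField(**x)`, the `table_schema` argument appears to expect keys matching that constructor (`name`, `field_type`, and optionally `mode`), while any extra `load_kwargs` are copied onto the `LoadJobConfig` by attribute name. A sketch continuing the README example (the column names and the `clustering_fields` value are illustrative, not from the diff):

```
# Assumes `bq` and `df` from the README usage example above
table_schema = [
    {"name": "city", "field_type": "STRING", "mode": "REQUIRED"},
    {"name": "state", "field_type": "STRING"},
]

bq.write_dataframe_to_bigquery(
    df=df,
    table_name="nba_dbt.my_new_table",
    table_schema=table_schema,
    if_exists="truncate",
    # Forwarded to LoadJobConfig via __set_load_job_config
    clustering_fields=["state"],
)
```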
diff --git a/requirements.txt b/requirements.txt
index 5894718..f73718d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,3 @@
 google-cloud-bigquery==3.19.0
 polars==0.20.16
-petl==1.7.15
-psycopg==3.1.18
\ No newline at end of file
+pyarrow==15.0.2
\ No newline at end of file
diff --git a/run_checks.sh b/run_checks.sh
new file mode 100644
index 0000000..315b677
--- /dev/null
+++ b/run_checks.sh
@@ -0,0 +1,3 @@
+python -m flake8 --extend-ignore=E203,W503 --max-line-length=120 klondike/ tests/
+python -m black klondike/ tests/
+python -m isort --profile black klondike/ tests/
\ No newline at end of file
diff --git a/klondike/base/__init__.py b/tests/__init__.py
similarity index 100%
rename from klondike/base/__init__.py
rename to tests/__init__.py
diff --git a/tests/test_bigquery_connector.py b/tests/test_bigquery_connector.py
new file mode 100644
index 0000000..23d5789
--- /dev/null
+++ b/tests/test_bigquery_connector.py
@@ -0,0 +1,69 @@
+import os
+from unittest import mock
+
+import polars as pl
+
+from klondike import BigQueryConnector
+
+from .test_utils import KlondikeTestCase
+
+##########
+
+
+class TestBigQuery(KlondikeTestCase):
+    def setUp(self):
+        super().setUp()
+        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self._credentials_path
+
+    def tearDown(self):
+        super().tearDown()
+        del os.environ["GOOGLE_APPLICATION_CREDENTIALS"]
+
+    def _build_mock_cursor(self, query_results=None):
+        cursor = mock.MagicMock()
+        cursor.execute.return_value = None
+        cursor.fetchmany.side_effect = [query_results, []]
+
+        if query_results is not None:
+            cursor.description = query_results
+
+        # Create a mock that will play the role of the connection
+        connection = mock.MagicMock()
+        connection.cursor.return_value = cursor
+
+        # Create a mock that will play the role of our GoogleBigQuery client
+        client = mock.MagicMock()
+
+        bq = BigQueryConnector()
+        bq.connection = connection
+        bq._client = client
+
+        return bq
+
+    @mock.patch("polars.from_arrow")
+    def test_read_dataframe_from_bigquery(self, mock_from_arrow):
+        "Tests read functionality for the `BigQueryConnector` object"
+
+        sql = "select * from my_table"
+        tbl = pl.DataFrame(
+            [
+                {"city": "Brooklyn", "state": "New York"},
+                {"city": "San Francisco", "state": "California"},
+                {"city": "Richmond", "state": "Virginia"},
+            ]
+        )
+
+        bq = self._build_mock_cursor(query_results=tbl)
+        df = bq.read_dataframe_from_bigquery(sql=sql)
+
+        assert df is None
+
+    @mock.patch("polars.DataFrame.write_parquet")
+    def test_write_dataframe_to_bigquery(self, mock_write_parquet):
+        "Tests write functionality for the `BigQueryConnector` object"
+
+        df = mock.MagicMock()
+        table_name = "foo.bar"
+
+        bq = self._build_mock_cursor()
+        bq.write_dataframe_to_bigquery(df=df, table_name=table_name)
diff --git a/tests/test_utils.py b/tests/test_utils.py
new file mode 100644
index 0000000..d16381b
--- /dev/null
+++ b/tests/test_utils.py
@@ -0,0 +1,35 @@
+import json
+import os
+import tempfile
+import unittest
+
+##########
+
+
+class KlondikeTestCase(unittest.TestCase):
+    def setUp(self):
+        self._temp_directory = tempfile.TemporaryDirectory()
+
+        self._credentials_path = os.path.join(
+            self._temp_directory.name, "service_account.json"
+        )
+
+        self._service_account = {
+            "type": "foo",
+            "project_id": "bar",
+            "private_key_id": "biz",
+            "private_key": "bap",
+            "client_email": "bim",
+            "client_id": "top",
+            "auth_uri": "hat",
+            "token_uri": "tap",
+            "auth_provider_x509_cert_url": "dance",
+            "client_x509_cert_url": "good",
+            "universe_domain": "stuff",
+        }
+
+        with open(self._credentials_path, "w") as f:
+            json.dump(self._service_account, f)
+
+    def tearDown(self):
+        self._temp_directory.cleanup()
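For reference, because the test assigns `bq._client` directly, the `client` property returns that `MagicMock`, so the write test could also assert on the load call itself. A possible extension inside `test_write_dataframe_to_bigquery` (a suggestion, not part of the diff):

```
# The load job should be submitted exactly once and its result awaited
bq._client.load_table_from_file.assert_called_once()
bq._client.load_table_from_file.return_value.result.assert_called_once()
```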