Commit

Merge pull request #2 from IanRFerguson/ian/bigquery

BigQuery Connector

IanRFerguson authored Mar 26, 2024
2 parents 377ee73 + 1ceefd6 commit dd4de91

Showing 10 changed files with 327 additions and 30 deletions.
@@ -1,4 +1,4 @@
name: checks
name: formatting_checks

on:
pull_request:
@@ -7,7 +7,7 @@ on:
branches: ["master", "dev"]

jobs:
run_checks:
check_linting_and_formatting:
runs-on: macos-latest
steps:

@@ -32,9 +32,26 @@ jobs:
run: |
python -m pip install black flake8 isort
- name: Run Python checks
- name: Run Python formatting and linting
if: steps.changed-python-files.outputs.any_changed == 'true'
run: |
flake8 ${{ steps.changed-python-files.outputs.all_changed_files }} --extend-ignore=E203,W503 --max-line-length=120
black --check ${{ steps.changed-python-files.outputs.all_changed_files }}
isort --profile black ${{ steps.changed-python-files.outputs.all_changed_files }}
isort --profile black ${{ steps.changed-python-files.outputs.all_changed_files }}
run_unit_tests:
runs-on: macos-latest
steps:
- name: Checkout branch
uses: actions/checkout@v3
with:
fetch-depth: 0

- name: Setup Python environment
uses: actions/setup-python@v3

- name: Run Unit Tests
run: |
pip install -r requirements.txt
pip install pytest
pytest -rf tests/*
3 changes: 2 additions & 1 deletion .gitignore
@@ -4,4 +4,5 @@

### DIRECTORIES
venv/
.vscode/
.vscode/
dev/
29 changes: 28 additions & 1 deletion README.md
@@ -2,12 +2,39 @@

<img src="https://upload.wikimedia.org/wikipedia/en/d/d5/Klondike_logo.svg">

Klondike offers a lightweight API to read and write data to Google BigQuery using rust-optimized Polars DataFrames.
Klondike offers a lightweight API to read and write data to Google BigQuery using Polars DataFrames.

## Installation

### Installing Klondike
Install from the command line:

```
pip install klondike==0.1.0
```

### Installing Rust
Since Polars leverages Rust speedups, you need to have Rust installed in your environment as well. See the Rust installation guide [here](https://www.rust-lang.org/tools/install).


## Usage

```
# Instantiate a connection to BigQuery
from klondike import BigQueryConnector
bq = BigQueryConnector(
app_creds="/path/to/your/service_account.json"
)
# Read data from BigQuery
sql = "SELECT * FROM nba_dbt.staging__nyk_players"
df = bq.read_dataframe_from_bigquery(sql=sql)
# Write data to BigQuery
bq.write_dataframe_to_bigquery(
df=df,
table_name="nba_dbt.my_new_table",
if_exists="truncate"
)
```
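
Since `app_creds` also accepts a dictionary instance (per the `BigQueryConnector` docstring), credentials can be passed in-memory as well. A minimal sketch, with placeholder service account values:

```
# Hypothetical credential dict; all field values below are placeholders
service_account_info = {
    "type": "service_account",
    "project_id": "my-project",
    "private_key": "-----BEGIN PRIVATE KEY-----\n...",
    "client_email": "svc@my-project.iam.gserviceaccount.com",
}

bq = BigQueryConnector(app_creds=service_account_info)
```
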
2 changes: 1 addition & 1 deletion klondike/__init__.py
@@ -24,7 +24,7 @@

__all__ = []
POLAR_OBJECTS = [
("klondike.bigquery.bigquery", "PolarBigQuery"),
("klondike.bigquery.bigquery", "BigQueryConnector"),
]

for module_, object_ in POLAR_OBJECTS:
188 changes: 167 additions & 21 deletions klondike/bigquery/bigquery.py
@@ -1,11 +1,14 @@
import io
import json
import os
import tempfile
from contextlib import contextmanager
from typing import Optional, Union

import polars as pl
from google.cloud import bigquery
from google.cloud.bigquery import dbapi
from google.cloud.bigquery import LoadJobConfig

from klondike import logger

##########

@@ -15,39 +18,54 @@
"https://www.googleapis.com/auth/bigquery",
]
},
)
)[0]


class PolarBigQuery:
class BigQueryConnector:
"""
Establish and authenticate a connection to a BigQuery warehouse
Args:
app_creds: Google service account, either as a relative path or a dictionary instance
project: Name of Google Project
location: Location of Google Project
timeout: Temporal threshold to kill a stalled job, defaults to 60s
client_options: API scopes
google_environment_variable: Provided for flexibility, defaults to `GOOGLE_APPLICATION_CREDENTIALS`
"""

def __init__(
self,
app_creds: Optional[Union[str, dict]] = None,
gcp_project: Optional[str] = None,
project: Optional[str] = None,
location: Optional[str] = None,
timeout: int = 60,
client_options: dict = SCOPES,
google_environment_variable: str = "GOOGLE_APPLICATION_CREDENTIALS",
):
self.app_creds = app_creds
self.gcp_project = gcp_project
self.project = project
self.location = location
self.timeout = timeout
self.client_options = client_options

self._client = None
self.dialect = "bigquery"
self._dbapi = dbapi

self.__setup_google_app_creds(
app_creds=app_creds, env_variable=google_environment_variable
)
if not self.app_creds:
if not os.environ.get(google_environment_variable):
raise OSError("No app_creds provided")
else:
logger.info(
f"Using `{google_environment_variable}` variable defined in environment"
)
else:
self.__setup_google_app_creds(
app_creds=self.app_creds, env_variable=google_environment_variable
)

def __setup_google_app_creds(self, app_creds: Union[str, dict], env_variable: str):
"""
Sets runtime environment variablefor Google SDK
"""
"Sets runtime environment variable for Google SDK"

if isinstance(app_creds, dict):
creds = json.dumps(app_creds)
@@ -61,6 +79,70 @@ def __setup_google_app_creds(self, app_creds: Union[str, dict], env_variable: st

os.environ[env_variable] = creds

def __set_load_job_config(
self,
base_job_config: Optional[LoadJobConfig] = None,
max_bad_records: int = 0,
table_schema: list = None,
if_exists: str = "fail",
**kwargs,
):
"Defines `LoadConfigJob` when writing to BigQuery"

if not base_job_config:
logger.debug("No job config provided, starting fresh")
base_job_config = LoadJobConfig()

# This is default behavior for this class
base_job_config.source_format = bigquery.SourceFormat.PARQUET

# Create table schema mapping if provided
if table_schema:
base_job_config.schema = self.__set_table_schema(table_schema=table_schema)
else:
base_job_config.schema = None

base_job_config.max_bad_records = max_bad_records

base_job_config.write_disposition = self.__set_write_disposition(
if_exists=if_exists
)

# List of LoadJobConfig attributes
_attributes = [x for x in dict(vars(LoadJobConfig)).keys()]

# Attributes that will not be overwritten
_reserved_attributes = [
"source_format",
"schema",
"max_bad_records",
"write_disposition",
]

# Loop through keyword arguments and update `LoadJobConfig` object
for k, v in kwargs.items():
if k in _attributes and k not in _reserved_attributes:
logger.debug(f"Updating {k} parameter in job config [value={v}]")
setattr(base_job_config, k, v)

return base_job_config

def __set_table_schema(self, table_schema: list):
"TODO - Write about me"

return [bigquery.SchemaField(**x) for x in table_schema]

def __set_write_disposition(self, if_exists: str):
"TODO - Write about me"

DISPOSITION_MAP = {
"fail": bigquery.WriteDisposition.WRITE_EMPTY,
"append": bigquery.WriteDisposition.WRITE_APPEND,
"truncate": bigquery.WriteDisposition.WRITE_TRUNCATE,
}

return DISPOSITION_MAP[if_exists]

@property
def client(self):
"""
@@ -73,17 +155,81 @@ def client(self):
location=self.location,
client_options=self.client_options,
)

return self._client

@contextmanager
def connection(self):
def read_dataframe_from_bigquery(self, sql: str) -> pl.DataFrame:
"""
Executes a SQL query and returns a Polars DataFrame.
TODO - Make this more flexible and incorporate query params
Args:
sql: String representation of SQL query
Returns:
Polars DataFrame object
"""

# Define query job
logger.debug(f"Running SQL: {sql}")
query_job = self.client.query(query=sql, timeout=self.timeout)

# Execute and wait for results
result = query_job.result()

# Populate DataFrame using PyArrow
df = pl.from_arrow(result.to_arrow())

if df.is_empty():
logger.info("No results returned from SQL call")
return

logger.info(f"Successfully read {len(df)} rows from BigQuery")
return df

def write_dataframe_to_bigquery(
self,
df: pl.DataFrame,
table_name: str,
load_job_config: Optional[LoadJobConfig] = None,
max_bad_records: int = 0,
table_schema: list = None,
if_exists: str = "fail",
**load_kwargs,
) -> None:
"""
TODO - Fill me in
Writes a Polars DataFrame to BigQuery
Args:
df: Polars DataFrame
table_name: Destination table name to write to - `dataset.table` convention
load_job_config: `LoadJobConfig` object. If none is supplied, several defaults are applied
max_bad_records: Tolerance for bad records in the load job, defaults to 0
table_schema: List of column names, types, and optional flags to include
if_exists: One of `fail`, `drop`, `append`, `truncate`
load_kwargs: See here for list of accepted values
https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.LoadJobConfig
"""

conn_ = self._dbapi.connect(self.client)
if if_exists == "drop":
self.client.delete_table(table=table_name)
if_exists = "fail"

load_job_config = self.__set_load_job_config(
base_job_config=load_job_config,
max_bad_records=max_bad_records,
table_schema=table_schema,
if_exists=if_exists,
**load_kwargs,
)

logger.info(f"Writing to {table_name}...")
with io.BytesIO() as stream_:
df.write_parquet(stream_)
stream_.seek(0)
load_job = self.client.load_table_from_file(
stream_, destination=table_name, job_config=load_job_config
)

try:
yield conn_
finally:
conn_.close()
load_job.result()
logger.info(f"Successfully wrote {len(df)} rows to {table_name}")
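
A rough usage sketch of the write path above, assuming the `BigQueryConnector` API in this file; the DataFrame, table name, and schema entries are hypothetical. `table_schema` entries are unpacked into `bigquery.SchemaField(**entry)`, so each dict needs at least a `name` and `field_type`:

```
import polars as pl

from klondike import BigQueryConnector

bq = BigQueryConnector(app_creds="/path/to/your/service_account.json")

# Hypothetical DataFrame and destination table
df = pl.DataFrame({"player": ["Jalen Brunson"], "points": [32]})

bq.write_dataframe_to_bigquery(
    df=df,
    table_name="nba_dbt.my_new_table",
    table_schema=[
        {"name": "player", "field_type": "STRING"},
        {"name": "points", "field_type": "INTEGER"},
    ],
    if_exists="append",  # maps to WriteDisposition.WRITE_APPEND
)
```

Note that `if_exists="drop"` deletes the destination table and then falls back to the `fail` disposition, so `append` and `truncate` are the non-destructive options for existing tables.
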
3 changes: 1 addition & 2 deletions requirements.txt
@@ -1,4 +1,3 @@
google-cloud-bigquery==3.19.0
polars==0.20.16
petl==1.7.15
psycopg==3.1.18
pyarrow==15.0.2
3 changes: 3 additions & 0 deletions run_checks.sh
@@ -0,0 +1,3 @@
python -m flake8 --extend-ignore=E203,W503 --max-line-length=120 klondike/ tests/
python -m black klondike/ tests/
python -m isort --profile black klondike/ tests/
File renamed without changes.