
Commit

Merge pull request #3 from IanRFerguson/ian/add-snowflake
v0.2.0 - Add Snowflake Connector
IanRFerguson authored Jun 7, 2024
2 parents 6d1effb + 9992309 commit 5dca8c6
Showing 12 changed files with 488 additions and 167 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -7,6 +7,7 @@
venv/
.vscode/
dev/
service_accounts/

build/
dist/
10 changes: 5 additions & 5 deletions README.md
@@ -19,7 +19,7 @@ Since Polars leverages Rust speedups, you need to have Rust installed in your environment

## Usage

In this demo we'll connect to BigQuery, read data, transform it, and write it back to the data warehouse.
In this demo we'll connect to Google BigQuery, read data, transform it, and write it back to the data warehouse.

First, connect to the BigQuery warehouse by supplying the `BigQueryConnector()` object with the relative path to your service account credentials.

@@ -32,7 +32,7 @@ bq = BigQueryConnector(
)
```

Next, supply the object with a SQL query in the `read_dataframe_from_bigquery()` function to render a `DataFrame` object:
Next, supply the object with a SQL query in the `read_dataframe()` function to render a `DataFrame` object:

```
# Write some valid SQL
@@ -45,7 +45,7 @@ ORDER BY avg_points DESC
# Pull BigQuery data into a Polars DataFrame
nyk = bq.read_dataframe_from_bigquery(sql=sql)
nyk = bq.read_dataframe(sql=sql)
```

Now that your data is pulled into a local instance, you can clean and transform it using standard Polars functionality - [see the docs](https://docs.pola.rs/py-polars/html/reference/dataframe/index.html) for more information.
@@ -61,11 +61,11 @@ key_metrics = [
summary_stats = nyk[key_metrics].describe()
```

Finally, push your transformed data back to the BigQuery warehouse using the `write_dataframe_to_bigquery()` function:
Finally, push your transformed data back to the BigQuery warehouse using the `write_dataframe()` function:

```
# Write back to BigQuery
bq.write_dataframe_to_bigquery(
bq.write_dataframe(
df=summary_stats,
table_name="nba_dbt.summary_statistics",
if_exists="truncate"
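The README hunks above only show the changed lines in isolation. Pulling them together, here is a minimal end-to-end sketch of the renamed API, assuming a hypothetical service-account path plus an illustrative query and table (the real values are truncated out of this diff view):

```
from klondike import BigQueryConnector

# Hypothetical service-account path; substitute your own credentials file
bq = BigQueryConnector(app_creds="service_accounts/my-project.json")

# Illustrative query; the README's full SQL is cut off in this view
sql = """
SELECT team, AVG(points) AS avg_points
FROM nba_dbt.player_games
GROUP BY team
ORDER BY avg_points DESC
"""

# Renamed in this commit: read_dataframe() replaces read_dataframe_from_bigquery()
nyk = bq.read_dataframe(sql=sql)

# Standard Polars transformations work on the local DataFrame
summary_stats = nyk[["avg_points"]].describe()

# Renamed in this commit: write_dataframe() replaces write_dataframe_to_bigquery()
bq.write_dataframe(
    df=summary_stats,
    table_name="nba_dbt.summary_statistics",
    if_exists="truncate",
)
```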
1 change: 1 addition & 0 deletions klondike/__init__.py
@@ -25,6 +25,7 @@
__all__ = []
POLAR_OBJECTS = [
("klondike.bigquery.bigquery", "BigQueryConnector"),
("klondike.snowflake.snowflake", "SnowflakeConnector"),
]

for module_, object_ in POLAR_OBJECTS:
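The body of this loop sits below the fold, so what follows is only a sketch of how such a registry is typically resolved (an assumption, not the file's verbatim code): each module path is imported and the named connector class is re-exported from the package root.

```
import importlib

for module_, object_ in POLAR_OBJECTS:
    # e.g. importlib.import_module("klondike.bigquery.bigquery")
    module = importlib.import_module(module_)

    # Re-export BigQueryConnector / SnowflakeConnector at package level
    globals()[object_] = getattr(module, object_)
    __all__.append(object_)
```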
169 changes: 114 additions & 55 deletions klondike/bigquery/bigquery.py
@@ -7,8 +7,10 @@
import polars as pl
from google.cloud import bigquery
from google.cloud.bigquery import LoadJobConfig
from google.cloud.exceptions import NotFound

from klondike import logger
from klondike.utilities.utilities import validate_if_exists_behavior

##########

Expand All @@ -26,12 +28,18 @@ class BigQueryConnector:
Establish and authenticate a connection to a BigQuery warehouse
Args:
app_creds: Google service account, either as a relative path or a dictionary instance
project: Name of Google Project
location: Location of Google Project
timeout: Temporal threshold to kill a stalled job, defaults to 60s
client_options: API scopes
google_environment_variable: Provided for flexibility, defaults to `GOOGLE_APPLICATION_CREDENTIALS`
app_creds: `str`
Google service account, either as a relative path or a dictionary instance
project: `str`
Name of Google Project
location: `str`
Location of Google Project
timeout: `int`
Temporal threshold to kill a stalled job, defaults to 60s
client_options: `list`
API scopes
google_environment_variable: `str`
Provided for flexibility, defaults to `GOOGLE_APPLICATION_CREDENTIALS`
"""

def __init__(
@@ -46,11 +54,11 @@ def __init__(
self.app_creds = app_creds
self.project = project
self.location = location
self.timeout = timeout
self.client_options = client_options

self._client = None
self.dialect = "bigquery"
self.__client = None
self.__timeout = timeout

if not self.app_creds:
if not os.environ.get(google_environment_variable):
@@ -64,6 +72,30 @@ def __init__(
app_creds=self.app_creds, env_variable=google_environment_variable
)

@property
def client(self):
"""
Instantiate BigQuery client and assign it
as class property
"""

if not self.__client:
self.__client = bigquery.Client(
project=self.project,
location=self.location,
client_options=self.client_options,
)

return self.__client

@property
def timeout(self):
return self.__timeout

@timeout.setter
def timeout(self, timeout):
self.__timeout = timeout

def __setup_google_app_creds(self, app_creds: Union[str, dict], env_variable: str):
"Sets runtime environment variable for Google SDK"

@@ -89,6 +121,20 @@ def __set_load_job_config(
):
"Defines `LoadConfigJob` when writing to BigQuery"

def set_write_disposition(if_exists: str):
DISPOSITION_MAP = {
"fail": bigquery.WriteDisposition.WRITE_EMPTY,
"append": bigquery.WriteDisposition.WRITE_APPEND,
"truncate": bigquery.WriteDisposition.WRITE_TRUNCATE,
}

return DISPOSITION_MAP[if_exists]

def set_table_schema(table_schema: list):
return [bigquery.SchemaField(**x) for x in table_schema]

###

if not base_job_config:
logger.debug("No job config provided, starting fresh")
base_job_config = LoadJobConfig()
@@ -98,17 +144,16 @@ def __set_load_job_config(

# Create table schema mapping if provided
if table_schema:
base_job_config.schema = self.__set_table_schema(table_schema=table_schema)
base_job_config.schema = set_table_schema(table_schema=table_schema)
else:
base_job_config.schema = None

base_job_config.max_bad_records = max_bad_records
base_job_config.write_disposition = set_write_disposition(if_exists=if_exists)

base_job_config.write_disposition = self.__set_write_disposition(
if_exists=if_exists
)
###

# List of LoadJobConfig attributes
# List of available LoadJobConfig attributes
_attributes = [x for x in dict(vars(LoadJobConfig)).keys()]

# Attributes that will not be overwritten
@@ -127,44 +172,14 @@ def __set_load_job_config(

return base_job_config

def __set_table_schema(self, table_schema: list):
"TODO - Write about me"

return [bigquery.SchemaField(**x) for x in table_schema]

def __set_write_disposition(self, if_exists: str):
"TODO - Write about me"

DISPOSITION_MAP = {
"fail": bigquery.WriteDisposition.WRITE_EMPTY,
"append": bigquery.WriteDisposition.WRITE_APPEND,
"truncate": bigquery.WriteDisposition.WRITE_TRUNCATE,
}

return DISPOSITION_MAP[if_exists]

@property
def client(self):
"""
Instantiate BigQuery client
"""

if not self._client:
self._client = bigquery.Client(
project=self.project,
location=self.location,
client_options=self.client_options,
)

return self._client

def read_dataframe_from_bigquery(self, sql: str) -> pl.DataFrame:
def read_dataframe(self, sql: str) -> pl.DataFrame:
"""
Executes a SQL query and returns a Polars DataFrame.
TODO - Make this more flexible and incorporate query params
Args:
sql: String representation of SQL query
sql: `str`
String representation of SQL query
Returns:
Polars DataFrame object
@@ -185,9 +200,10 @@ def read_dataframe_from_bigquery(self, sql: str) -> pl.DataFrame:
return

logger.info(f"Successfully read {len(df)} rows from BigQuery")

return df

def write_dataframe_to_bigquery(
def write_dataframe(
self,
df: pl.DataFrame,
table_name: str,
@@ -201,16 +217,26 @@ def write_dataframe_to_bigquery(
Writes a Polars DataFrame to BigQuery
Args:
df: Polars DataFrame
table_name: Destination table name to write to - `dataset.table` convention
load_job_config: `LoadJobConfig` object. If none is supplied, several defaults are applied
max_bad_records: Tolerance for bad records in the load job, defaults to 0
table_schema: List of column names, types, and optional flags to include
if_exists: One of `fail`, `drop`, `append`, `truncate`
load_kwargs: See here for list of accepted values
https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.LoadJobConfig
df: `polars.DataFrame`
DataFrame to write to BigQuery
table_name: `str`
Destination table name to write to - `dataset.table` convention
load_job_config: `LoadJobConfig`
Configures load job; if none is supplied, several defaults are applied
max_bad_records: `int`
Tolerance for bad records in the load job, defaults to 0
table_schema: `list`
List of column names, types, and optional flags to include
if_exists: `str`
One of `fail`, `drop`, `append`, `truncate`
load_kwargs:
See here for list of accepted values \
https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.LoadJobConfig
"""

if not validate_if_exists_behavior(user_input=if_exists):
raise ValueError(f"{if_exists} is an invalid input")

if if_exists == "drop":
self.client.delete_table(table=table_name)
if_exists = "fail"
@@ -233,3 +259,36 @@

load_job.result()
logger.info(f"Successfuly wrote {len(df)} rows to {table_name}")

def table_exists(self, table_name: str) -> bool:
"""
Determines if a BigQuery table exists
Args:
table_name: `str`
BigQuery table name in `schema.table` or `project.schema.table` format
"""

try:
_ = self.client.get_table(table=table_name)
return True

except NotFound:
return False

def list_tables(self, schema_name: str) -> list:
"""
Gets a list of available tables in a BigQuery schema
Args:
schema_name: `str`
BigQuery schema name
Returns:
List of table names
"""

return [
x.full_table_id.replace(":", ".")
for x in self.client.list_tables(dataset=schema_name)
]
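The two new helpers round out the connector's introspection surface. A short usage sketch, again continuing the README example (the project id in the sample output is hypothetical):

```
# Continuing the README example (bq and summary_stats defined earlier)
if not bq.table_exists(table_name="nba_dbt.summary_statistics"):
    bq.write_dataframe(df=summary_stats, table_name="nba_dbt.summary_statistics")

print(bq.list_tables(schema_name="nba_dbt"))
# e.g. ['my-project.nba_dbt.summary_statistics', ...]
```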
File renamed without changes.