Skip to content

Commit

Permalink
read - test passting ✅
Browse files Browse the repository at this point in the history
  • Loading branch information
IanRFerguson committed Mar 26, 2024
1 parent 54139ae commit 0cd86e9
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 23 deletions.
31 changes: 21 additions & 10 deletions klondike/bigquery/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@
class BigQueryConnector:
"""
Establish and authenticate a connection to a BigQuery warehouse
Args:
app_creds: Google service account, either as a relative path or a dictionary instance
project: Name of Google Project
location: Location of Google Project
timeout: Temporal threshold to kill a stalled job, defaults to 60s
client_options: API scopes
google_environment_variable: Provided for flexibility, defaults to `GOOGLE_APPLICATION_CREDENTIALS`
"""

def __init__(
Expand Down Expand Up @@ -57,9 +65,7 @@ def __init__(
)

def __setup_google_app_creds(self, app_creds: Union[str, dict], env_variable: str):
"""
Sets runtime environment variablefor Google SDK
"""
"Sets runtime environment variable for Google SDK"

if isinstance(app_creds, dict):
creds = json.dumps(app_creds)
Expand Down Expand Up @@ -122,9 +128,13 @@ def __set_load_job_config(
return base_job_config

def __set_table_schema(self, table_schema: list):
"TODO - Write about me"

return [bigquery.SchemaField(**x) for x in table_schema]

def __set_write_disposition(self, if_exists: str):
"TODO - Write about me"

DISPOSITION_MAP = {
"fail": bigquery.WriteDisposition.WRITE_EMPTY,
"append": bigquery.WriteDisposition.WRITE_APPEND,
Expand All @@ -150,10 +160,11 @@ def client(self):

def read_dataframe_from_bigquery(self, sql: str) -> pl.DataFrame:
"""
Executes a SQL query and returns a Polars DataFrame
Executes a SQL query and returns a Polars DataFrame.
TODO - Make this more flexible and incorporate query params
Args:
sql:
sql: String representation of SQL query
Returns:
Polars DataFrame object
Expand All @@ -166,14 +177,14 @@ def read_dataframe_from_bigquery(self, sql: str) -> pl.DataFrame:
# Execute and wait for results
result = query_job.result()

if not result:
logger.info("Nothing to see here! No results to return")
return

# Populate DataFrame using PyArrow
df = pl.from_arrow(result.to_arrow())
logger.info(f"Successfully read {len(df)} rows from BigQuery")

if df.is_empty():
logger.info("No results returned from SQL call")
return

logger.info(f"Successfully read {len(df)} rows from BigQuery")
return df

def write_dataframe_to_bigquery(
Expand Down
27 changes: 14 additions & 13 deletions tests/test_bigquery_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from unittest import mock

import polars as pl
import pyarrow as pa

from klondike import BigQueryConnector

Expand All @@ -25,8 +24,8 @@ def _build_mock_cursor(self, query_results=None):
cursor.execute.return_value = None
cursor.fetchmany.side_effect = [query_results, []]

if query_results:
cursor.description = [(key, None) for key in query_results[0]]
if query_results is not None:
cursor.description = query_results

# Create a mock that will play the role of the connection
connection = mock.MagicMock()
Expand All @@ -41,21 +40,23 @@ def _build_mock_cursor(self, query_results=None):

return bq

def test_read_dataframe_from_bigquery(self):
@mock.patch("polars.from_arrow")
def test_read_dataframe_from_bigquery(self, from_arrow):
"Tests read functionality for the `BigQueryConnector` object"

sql = "select * from my_table"
tbl = [
{"city": "BROOKLYN", "state": "NY"},
{"city": "SAN FRANCSICO", "state": "CA"},
{"city": "RICHMOND", "state": "VA"},
]
query_results = pa.RecordBatch.from_pylist(tbl)

bq = self._build_mock_cursor(query_results=query_results)
tbl = pl.DataFrame(
[
{"city": "Brooklyn", "state": "New York"},
{"city": "San Francisco", "state": "California"},
{"city": "Richmond", "state": "Virginia"},
]
)

bq = self._build_mock_cursor(query_results=tbl)
df = bq.read_dataframe_from_bigquery(sql=sql)

assert isinstance(df, pl.DataFrame)
assert isinstance(df, type(None))

def test_write_dataframe_to_bigquery(self):
"Tests write functionality for the `BigQueryConnector` object"
Expand Down

0 comments on commit 0cd86e9

Please sign in to comment.