Skip to content

Commit

Permalink
Feat: Update partitioning by DATE, DATETIME, TIMESTAMP, _PARTITIONDATE (
Browse files Browse the repository at this point in the history
#1113)

* adds additional functionality to cover more partitioning capability

* Updates the partitioning algorithm and tests

* Updates special case and tests

* Updates test in possible effort to increase coverage.

* Tweaks the conditionals in time partitioning process

* Updates linting

* chore(deps): update all dependencies (#1136)

* chore(deps): update all dependencies

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Update protobuf to 5.28.3

* pin google-crc32c for python 3.7/3.8

* Pin mako for python 3.8

* Pin markupsafe for Python 3.8

* Pin pyparsing for python 3.8

* Pin pyparsing for Python 3.8

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
Co-authored-by: Anthonios Partheniou <partheniou@google.com>

* chore(deps): update all dependencies (#1140)

* chore(deps): update all dependencies

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>

* Removes duplicate test

---------

Co-authored-by: Mend Renovate <bot@renovateapp.com>
Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
Co-authored-by: Anthonios Partheniou <partheniou@google.com>
  • Loading branch information
4 people authored Nov 22, 2024
1 parent fb8f009 commit 413cd24
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 91 deletions.
97 changes: 77 additions & 20 deletions sqlalchemy_bigquery/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -812,42 +812,99 @@ def _raise_for_type(self, option, value, expected_type):
)

def _process_time_partitioning(
self, table: Table, time_partitioning: TimePartitioning
self,
table: Table,
time_partitioning: TimePartitioning,
):
"""
Generates a SQL 'PARTITION BY' clause for partitioning a table by a date or timestamp.
Generates a SQL 'PARTITION BY' clause for partitioning a table,
Args:
- table (Table): The SQLAlchemy table object representing the BigQuery table to be partitioned.
- table (Table): The SQLAlchemy table object representing the BigQuery
table to be partitioned.
- time_partitioning (TimePartitioning): The time partitioning details,
including the field to be used for partitioning.
Returns:
- str: A SQL 'PARTITION BY' clause that uses either TIMESTAMP_TRUNC or DATE_TRUNC to
partition data on the specified field.
- str: A SQL 'PARTITION BY' clause.
Example:
- Given a table with a TIMESTAMP type column 'event_timestamp' and setting
'time_partitioning.field' to 'event_timestamp', the function returns
- Given a table with an 'event_timestamp' and setting time_partitioning.type
as DAY and by setting 'time_partitioning.field' as 'event_timestamp', the
function returns:
"PARTITION BY TIMESTAMP_TRUNC(event_timestamp, DAY)".
Current inputs allowed by BQ and covered by this function include:
* _PARTITIONDATE
* DATETIME_TRUNC(<datetime_column>, DAY/HOUR/MONTH/YEAR)
* TIMESTAMP_TRUNC(<timestamp_column>, DAY/HOUR/MONTH/YEAR)
* DATE_TRUNC(<date_column>, MONTH/YEAR)
Additional options allowed by BQ but not explicitly covered by this
function include:
* DATE(_PARTITIONTIME)
* DATE(<timestamp_column>)
* DATE(<datetime_column>)
* DATE column
"""
field = "_PARTITIONDATE"
trunc_fn = "DATE_TRUNC"

sqltypes = {
"_PARTITIONDATE": ("_PARTITIONDATE", None),
"TIMESTAMP": ("TIMESTAMP_TRUNC", {"DAY", "HOUR", "MONTH", "YEAR"}),
"DATETIME": ("DATETIME_TRUNC", {"DAY", "HOUR", "MONTH", "YEAR"}),
"DATE": ("DATE_TRUNC", {"MONTH", "YEAR"}),
}

# Extract field (i.e <column_name> or _PARTITIONDATE)
# AND extract the name of the column_type (i.e. "TIMESTAMP", "DATE",
# "DATETIME", "_PARTITIONDATE")
if time_partitioning.field is not None:
field = time_partitioning.field
if isinstance(
table.columns[time_partitioning.field].type,
sqlalchemy.sql.sqltypes.DATE,
):
return f"PARTITION BY {field}"
elif isinstance(
table.columns[time_partitioning.field].type,
sqlalchemy.sql.sqltypes.TIMESTAMP,
):
trunc_fn = "TIMESTAMP_TRUNC"
column_type = table.columns[field].type.__visit_name__.upper()

else:
field = "_PARTITIONDATE"
column_type = "_PARTITIONDATE"

# Extract time_partitioning.type_ (DAY, HOUR, MONTH, YEAR)
# i.e. generates one partition per type (1/DAY, 1/HOUR)
# NOTE: if time_partitioning.type_ == None, it gets
# immediately overwritten by python-bigquery to a default of DAY.
partitioning_period = time_partitioning.type_

# Extract the truncation_function (i.e. DATE_TRUNC)
# and the set of allowable partition_periods
# that can be used in that function
trunc_fn, allowed_partitions = sqltypes[column_type]

# Create output:
# Special Case: _PARTITIONDATE does NOT use a function or partitioning_period
if trunc_fn == "_PARTITIONDATE":
return f"PARTITION BY {field}"

# Special Case: BigQuery will not accept DAY as partitioning_period for
# DATE_TRUNC.
# However, the default argument in python-bigquery for TimePartioning
# is DAY. This case overwrites that to avoid making a breaking change in
# python-bigquery.
# https://github.com/googleapis/python-bigquery/blob/a4d9534a900f13ae7355904cda05097d781f27e3/google/cloud/bigquery/table.py#L2916
if trunc_fn == "DATE_TRUNC" and partitioning_period == "DAY":
raise ValueError(
"The TimePartitioning.type_ must be one of: "
f"{allowed_partitions}, received {partitioning_period}."
"NOTE: the `default` value for TimePartioning.type_ as set in "
"python-bigquery is 'DAY', if you wish to use 'DATE_TRUNC' "
"ensure that you overwrite the default TimePartitioning.type_. "
)

# Generic Case
if partitioning_period not in allowed_partitions:
raise ValueError(
"The TimePartitioning.type_ must be one of: "
f"{allowed_partitions}, received {partitioning_period}."
)

return f"PARTITION BY {trunc_fn}({field}, {time_partitioning.type_})"
return f"PARTITION BY {trunc_fn}({field}, {partitioning_period})"

def _process_range_partitioning(
self, table: Table, range_partitioning: RangePartitioning
Expand Down
11 changes: 10 additions & 1 deletion tests/system/test_sqlalchemy_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,16 @@ def test_dml(engine, session, table_dml):
assert len(result) == 0


@pytest.mark.parametrize("time_partitioning_field", ["timestamp_c", "date_c"])
@pytest.mark.parametrize(
"time_partitioning_field",
[
("timestamp_c"),
("datetime_c"),
# Fails because python-bigquery TimePartitioning.type_ defaults to "DAY", but
# the DATE_TRUNC() function only allows "MONTH"/"YEAR"
pytest.param("date_c", marks=[pytest.mark.xfail]),
],
)
def test_create_table(engine, bigquery_dataset, time_partitioning_field):
meta = MetaData()
Table(
Expand Down
193 changes: 123 additions & 70 deletions tests/unit/test_table_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,111 +104,161 @@ def test_table_clustering_fields_dialect_option_type_error(faux_conn):
)


def test_table_time_partitioning_dialect_option(faux_conn):
# expect table creation to fail as SQLite does not support partitioned tables
with pytest.raises(sqlite3.OperationalError):
setup_table(
faux_conn,
"some_table",
sqlalchemy.Column("id", sqlalchemy.Integer),
sqlalchemy.Column("createdAt", sqlalchemy.DateTime),
bigquery_time_partitioning=TimePartitioning(),
)
@pytest.mark.parametrize(
"column_dtype,time_partitioning_type,func_name",
[
# DATE dtype
pytest.param(
sqlalchemy.DATE,
TimePartitioningType.HOUR, # Only MONTH/YEAR are permitted in BigQuery
"DATE_TRUNC",
marks=pytest.mark.xfail,
),
pytest.param(
sqlalchemy.DATE,
TimePartitioningType.DAY, # Only MONTH/YEAR are permitted in BigQuery
"DATE_TRUNC",
marks=pytest.mark.xfail,
),
(sqlalchemy.DATE, TimePartitioningType.MONTH, "DATE_TRUNC"),
(sqlalchemy.DATE, TimePartitioningType.YEAR, "DATE_TRUNC"),
# TIMESTAMP dtype
(sqlalchemy.TIMESTAMP, TimePartitioningType.HOUR, "TIMESTAMP_TRUNC"),
(sqlalchemy.TIMESTAMP, TimePartitioningType.DAY, "TIMESTAMP_TRUNC"),
(sqlalchemy.TIMESTAMP, TimePartitioningType.MONTH, "TIMESTAMP_TRUNC"),
(sqlalchemy.TIMESTAMP, TimePartitioningType.YEAR, "TIMESTAMP_TRUNC"),
# DATETIME dtype
(sqlalchemy.DATETIME, TimePartitioningType.HOUR, "DATETIME_TRUNC"),
(sqlalchemy.DATETIME, TimePartitioningType.DAY, "DATETIME_TRUNC"),
(sqlalchemy.DATETIME, TimePartitioningType.MONTH, "DATETIME_TRUNC"),
(sqlalchemy.DATETIME, TimePartitioningType.YEAR, "DATETIME_TRUNC"),
# TimePartitioning.type_ == None
(sqlalchemy.DATETIME, None, "DATETIME_TRUNC"),
],
)
def test_table_time_partitioning_given_field_and_type__dialect_options(
faux_conn, column_dtype, time_partitioning_type, func_name
):
"""NOTE: Expect table creation to fail as SQLite does not support
partitioned tables, despite that, we are still able to test the generation
of SQL statements.
assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
"CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )"
" PARTITION BY DATE_TRUNC(_PARTITIONDATE, DAY)"
)
Each parametrization ensures that the appropriate function is generated
depending on whether the column datatype is DATE, TIMESTAMP, DATETIME and
whether the TimePartitioningType is HOUR, DAY, MONTH, YEAR.
`DATE_TRUNC` only returns a result if TimePartitioningType is DAY, MONTH,
YEAR. BigQuery cannot partition on DATE by HOUR, so that is expected to
xfail.
def test_table_require_partition_filter_dialect_option(faux_conn):
# expect table creation to fail as SQLite does not support partitioned tables
with pytest.raises(sqlite3.OperationalError):
setup_table(
faux_conn,
"some_table",
sqlalchemy.Column("createdAt", sqlalchemy.DateTime),
bigquery_time_partitioning=TimePartitioning(field="createdAt"),
bigquery_require_partition_filter=True,
)
A distinguishing characteristic of this test is we provide an argument to
the TimePartitioning class for both field and type_.
assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
"CREATE TABLE `some_table` ( `createdAt` DATETIME )"
" PARTITION BY DATE_TRUNC(createdAt, DAY)"
" OPTIONS(require_partition_filter=true)"
)
Special case: IF time_partitioning_type is None, the __init__() in the
TimePartitioning class will overwrite it with TimePartitioningType.DAY as
the default.
"""

if time_partitioning_type is None:
time_partitioning_type = TimePartitioningType.DAY

def test_table_time_partitioning_with_field_dialect_option(faux_conn):
# expect table creation to fail as SQLite does not support partitioned tables
with pytest.raises(sqlite3.OperationalError):
setup_table(
faux_conn,
"some_table",
sqlalchemy.Column("id", sqlalchemy.Integer),
sqlalchemy.Column("createdAt", sqlalchemy.DateTime),
bigquery_time_partitioning=TimePartitioning(field="createdAt"),
sqlalchemy.Column("createdAt", column_dtype),
bigquery_time_partitioning=TimePartitioning(
field="createdAt", type_=time_partitioning_type
),
)

assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
"CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )"
" PARTITION BY DATE_TRUNC(createdAt, DAY)"
result = " ".join(faux_conn.test_data["execute"][-1][0].strip().split())
expected = (
f"CREATE TABLE `some_table` ( `id` INT64, `createdAt` {column_dtype.__visit_name__} )"
f" PARTITION BY {func_name}(createdAt, {time_partitioning_type})"
)
assert result == expected


def test_table_time_partitioning_by_month_dialect_option(faux_conn):
# expect table creation to fail as SQLite does not support partitioned tables
with pytest.raises(sqlite3.OperationalError):
setup_table(
faux_conn,
"some_table",
sqlalchemy.Column("id", sqlalchemy.Integer),
sqlalchemy.Column("createdAt", sqlalchemy.DateTime),
bigquery_time_partitioning=TimePartitioning(
field="createdAt",
type_=TimePartitioningType.MONTH,
),
)
def test_table_time_partitioning_given_field_but_no_type__dialect_option(faux_conn):
"""Expect table creation to fail as SQLite does not support partitioned tables
assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
"CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )"
" PARTITION BY DATE_TRUNC(createdAt, MONTH)"
)
Confirms that if the column datatype is DATETIME but no TimePartitioning.type_
has been supplied, the system will default to DAY.
A distinguishing characteristic of this test is we provide an argument to
the TimePartitioning class for field but not type_.
"""

def test_table_time_partitioning_with_timestamp_dialect_option(faux_conn):
# expect table creation to fail as SQLite does not support partitioned tables
with pytest.raises(sqlite3.OperationalError):
setup_table(
faux_conn,
"some_table",
sqlalchemy.Column("id", sqlalchemy.Integer),
sqlalchemy.Column("createdAt", sqlalchemy.TIMESTAMP),
sqlalchemy.Column("createdAt", sqlalchemy.DateTime),
bigquery_time_partitioning=TimePartitioning(field="createdAt"),
)

assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
"CREATE TABLE `some_table` ( `id` INT64, `createdAt` TIMESTAMP )"
" PARTITION BY TIMESTAMP_TRUNC(createdAt, DAY)"
result = " ".join(faux_conn.test_data["execute"][-1][0].strip().split())
expected = (
"CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )"
" PARTITION BY DATETIME_TRUNC(createdAt, DAY)"
)
assert result == expected


def test_table_time_partitioning_with_date_dialect_option(faux_conn):
# expect table creation to fail as SQLite does not support partitioned tables
@pytest.mark.parametrize(
"column_dtype,time_partitioning_type",
[
pytest.param(
sqlalchemy.DATE,
TimePartitioningType.HOUR,
marks=pytest.mark.xfail,
),
(sqlalchemy.DATE, TimePartitioningType.DAY),
(sqlalchemy.DATE, TimePartitioningType.MONTH),
(sqlalchemy.DATE, TimePartitioningType.YEAR),
],
)
def test_table_time_partitioning_given_type__but_no_field_dialect_option(
faux_conn,
column_dtype,
time_partitioning_type,
):
"""NOTE: Expect table creation to fail as SQLite does not support
partitioned tables, despite that, we are still able to test the generation
of SQL statements
If the `field` argument to TimePartitioning() is not provided, it defaults to
None. That causes the pseudocolumn "_PARTITIONDATE" to be used by default as
the column to partition by.
_PARTITIONTIME only returns a result if TimePartitioningType is DAY, MONTH,
YEAR. BigQuery cannot partition on _PARTITIONDATE by HOUR, so that is
expected to xfail.
A distinguishing characteristic of this test is we provide an argument to
the TimePartitioning class for type_ but not field.
"""

with pytest.raises(sqlite3.OperationalError):
setup_table(
faux_conn,
"some_table_2",
sqlalchemy.Column("id", sqlalchemy.Integer),
sqlalchemy.Column("createdAt", sqlalchemy.DATE),
bigquery_time_partitioning=TimePartitioning(field="createdAt"),
sqlalchemy.Column("createdAt", column_dtype),
bigquery_time_partitioning=TimePartitioning(type_=time_partitioning_type),
)

# confirm that the following code creates the correct SQL string
assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
"CREATE TABLE `some_table_2` ( `id` INT64, `createdAt` DATE )"
" PARTITION BY createdAt"
result = " ".join(faux_conn.test_data["execute"][-1][0].strip().split())

# We need two versions of expected depending on whether we use _PARTITIONDATE
expected = (
f"CREATE TABLE `some_table_2` ( `id` INT64, `createdAt` {column_dtype.__visit_name__} )"
f" PARTITION BY _PARTITIONDATE"
)
assert result == expected


def test_table_time_partitioning_dialect_option_partition_expiration_days(faux_conn):
Expand All @@ -227,7 +277,7 @@ def test_table_time_partitioning_dialect_option_partition_expiration_days(faux_c

assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
"CREATE TABLE `some_table` ( `createdAt` DATETIME )"
" PARTITION BY DATE_TRUNC(createdAt, DAY)"
" PARTITION BY DATETIME_TRUNC(createdAt, DAY)"
" OPTIONS(partition_expiration_days=0.25)"
)

Expand Down Expand Up @@ -400,13 +450,16 @@ def test_table_all_dialect_option(faux_conn):
),
)

assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
result = " ".join(faux_conn.test_data["execute"][-1][0].strip().split())
expected = (
"CREATE TABLE `some_table` ( `id` INT64, `country` STRING, `town` STRING, `createdAt` DATETIME )"
" PARTITION BY DATE_TRUNC(createdAt, DAY)"
" PARTITION BY DATETIME_TRUNC(createdAt, DAY)"
" CLUSTER BY country, town"
" OPTIONS(partition_expiration_days=30.0, expiration_timestamp=TIMESTAMP '2038-01-01 00:00:00+00:00', require_partition_filter=true, default_rounding_mode='ROUND_HALF_EVEN')"
)

assert result == expected


def test_validate_friendly_name_value_type(ddl_compiler):
# expect option value to be transformed as a string expression
Expand Down

0 comments on commit 413cd24

Please sign in to comment.