Skip to content

Commit

Permalink
Raise exception when reading non-UTC adjusted timestamps in parquet (#…
Browse files Browse the repository at this point in the history
…4421)

Also moved all parquet files in core repo to Git LFS
  • Loading branch information
malhotrashivam committed Sep 19, 2023
1 parent e9e242f commit 59921bc
Show file tree
Hide file tree
Showing 33 changed files with 68 additions and 27 deletions.
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.parquet filter=lfs diff=lfs merge=lfs -text
2 changes: 2 additions & 0 deletions .github/workflows/check-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4
with:
lfs: true

- name: Setup JDK 11
id: setup-java-11
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/nightly-check-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4
with:
lfs: true

- name: Setup JDK 11
id: setup-java-11
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,17 @@ private static void buildChildren(Types.GroupBuilder builder, Iterator<SchemaEle
}

if (schemaElement.isSetLogicalType()) {
((Types.Builder) childBuilder)
.as(getLogicalTypeAnnotation(schemaElement.logicalType));
LogicalType logicalType = schemaElement.logicalType;
if (logicalType.isSetTIMESTAMP()) {
TimestampType timestamp = logicalType.getTIMESTAMP();
if (!timestamp.isAdjustedToUTC) {
// TODO(deephaven-core#976): Unable to read non UTC adjusted timestamps
throw new ParquetFileReaderException(String.format(
"Only UTC timestamp is supported, found time column `%s` with isAdjustedToUTC=false",
schemaElement.getName()));
}
}
((Types.Builder) childBuilder).as(getLogicalTypeAnnotation(logicalType));
}

if (schemaElement.isSetConverted_type()) {
Expand Down Expand Up @@ -365,12 +374,16 @@ private static LogicalTypeAnnotation getLogicalTypeAnnotation(ConvertedType type
case DATE:
return LogicalTypeAnnotation.dateType();
case TIME_MILLIS:
// TODO(deephaven-core#976) Assuming that time is adjusted to UTC
return LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MILLIS);
case TIME_MICROS:
// TODO(deephaven-core#976) Assuming that time is adjusted to UTC
return LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MICROS);
case TIMESTAMP_MILLIS:
// TODO(deephaven-core#976) Assuming that time is adjusted to UTC
return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS);
case TIMESTAMP_MICROS:
// TODO(deephaven-core#976) Assuming that time is adjusted to UTC
return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS);
case INTERVAL:
return LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ public Optional<Class<?>> visit(final LogicalTypeAnnotation.TimeLogicalTypeAnnot
@Override
public Optional<Class<?>> visit(
final LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
// TODO(deephaven-core#3588): Unable to read parquet TimestampLogicalTypeAnnotation that is not adjusted
// TODO(deephaven-core#976): Unable to read parquet TimestampLogicalTypeAnnotation that is not adjusted
// to UTC
if (timestampLogicalType.isAdjustedToUTC()) {
switch (timestampLogicalType.getUnit()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,8 @@ private static class LogicalTypeVisitor<ATTR extends Any>
@Override
public Optional<ToPage<ATTR, ?>> visit(
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
// TODO(deephaven-core#976): Unable to read parquet TimestampLogicalTypeAnnotation that is not adjusted
// to UTC
if (timestampLogicalType.isAdjustedToUTC()) {
return Optional
.of(ToInstantPage.create(componentType, timestampLogicalType.getUnit()));
Expand Down
3 changes: 2 additions & 1 deletion extensions/parquet/table/src/test/e0.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
"c": np.arange(3, 6).astype("u1"),
"d": np.arange(4.0, 7.0, dtype="float64"),
"e": [True, False, True],
"f": pd.date_range("20130101", periods=3),
# TODO(deephaven-core#976): Unable to read parquet TimestampLogicalTypeAnnotation that is not adjusted to UTC
# "f": pd.date_range("20130101", periods=3),
"g": pd.date_range("20130101", periods=3, tz="US/Eastern"),
"h": pd.Categorical(list("abc")),
"i": pd.Categorical(list("abc"), ordered=True),
Expand Down
2 changes: 1 addition & 1 deletion extensions/parquet/table/src/test/e0.requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
numpy==1.24.2
pandas==1.5.3
pyarrow==4.0.1
pyarrow==5.0.0
python-dateutil==2.8.2
pytz==2022.7.1
six==1.16.0
3 changes: 2 additions & 1 deletion extensions/parquet/table/src/test/e1.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
"c": np.arange(3, 6).astype("u1"),
"d": np.arange(4.0, 7.0, dtype="float64"),
"e": [True, False, True],
"f": pd.date_range("20130101", periods=3),
# TODO(deephaven-core#976): Unable to read parquet TimestampLogicalTypeAnnotation that is not adjusted to UTC
# "f": pd.date_range("20130101", periods=3),
"g": pd.date_range("20130101", periods=3, tz="US/Eastern"),
"h": pd.Categorical(list("abc")),
"i": pd.Categorical(list("abc"), ordered=True),
Expand Down
3 changes: 2 additions & 1 deletion extensions/parquet/table/src/test/e2.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
"c": np.arange(3, 6).astype("u1"),
"d": np.arange(4.0, 7.0, dtype="float64"),
"e": [True, False, True],
"f": pd.date_range("20130101", periods=3),
# TODO(deephaven-core#976): Unable to read parquet TimestampLogicalTypeAnnotation that is not adjusted to UTC
# "f": pd.date_range("20130101", periods=3),
"g": pd.date_range("20130101", periods=3, tz="US/Eastern"),
"h": pd.Categorical(list("abc")),
"i": pd.Categorical(list("abc"), ordered=True),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,24 +458,23 @@ public void e1() {
assertTableEquals(uncompressed, zstd);
}

// TODO(deephaven-core#3588): Unable to read parquet TimestampLogicalTypeAnnotation that is not adjusted to UTC
// @Test
// public void e2() {
// final Table uncompressed =
// ParquetTools.readTable(TestParquetTools.class.getResource("/e2/uncompressed.parquet").getFile());
//
// final Table gzip = ParquetTools.readTable(TestParquetTools.class.getResource("/e2/gzip.parquet").getFile());
// assertTableEquals(uncompressed, gzip);
//
// final Table lz4 = ParquetTools.readTable(TestParquetTools.class.getResource("/e2/lz4.parquet").getFile());
// assertTableEquals(uncompressed, lz4);
//
// final Table snappy = ParquetTools.readTable(TestParquetTools.class.getResource("/e2/snappy.parquet").getFile());
// assertTableEquals(uncompressed, snappy);
//
// final Table zstd = ParquetTools.readTable(TestParquetTools.class.getResource("/e2/zstd.parquet").getFile());
// assertTableEquals(uncompressed, zstd);
// }
@Test
public void e2() {
final Table uncompressed =
ParquetTools.readTable(TestParquetTools.class.getResource("/e2/uncompressed.parquet").getFile());

final Table gzip = ParquetTools.readTable(TestParquetTools.class.getResource("/e2/gzip.parquet").getFile());
assertTableEquals(uncompressed, gzip);

final Table lz4 = ParquetTools.readTable(TestParquetTools.class.getResource("/e2/lz4.parquet").getFile());
assertTableEquals(uncompressed, lz4);

final Table snappy = ParquetTools.readTable(TestParquetTools.class.getResource("/e2/snappy.parquet").getFile());
assertTableEquals(uncompressed, snappy);

final Table zstd = ParquetTools.readTable(TestParquetTools.class.getResource("/e2/zstd.parquet").getFile());
assertTableEquals(uncompressed, zstd);
}

private void testWriteRead(Table source, Function<Table, Table> transform) {
final File f2w = new File(testRoot, "testWriteRead.parquet");
Expand Down
Binary file modified extensions/parquet/table/src/test/resources/e0/brotli.parquet
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e0/gzip.parquet
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e0/lz4.parquet
Binary file not shown.
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e0/snappy.parquet
Binary file not shown.
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e0/zstd.parquet
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e1/brotli.parquet
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e1/gzip.parquet
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e1/lz4.parquet
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e1/snappy.parquet
Binary file not shown.
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e1/zstd.parquet
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e2/brotli.parquet
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e2/gzip.parquet
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e2/lz4.parquet
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e2/snappy.parquet
Binary file not shown.
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e2/zstd.parquet
Binary file not shown.
Binary file not shown.
Binary file modified py/server/tests/data/crypto_trades.parquet
Binary file not shown.
23 changes: 21 additions & 2 deletions py/server/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import pandas
import pyarrow.parquet

from deephaven import empty_table, dtypes, new_table
from deephaven import DHError, empty_table, dtypes, new_table
from deephaven import arrow as dharrow
from deephaven.column import InputColumn
from deephaven.pandas import to_pandas, to_table
Expand Down Expand Up @@ -304,7 +304,7 @@ def round_trip_with_compression(self, compression_codec_name, dh_table, vector_c
# result_table = read('data_from_pandas.parquet')
# self.assert_table_equals(dh_table, result_table)

def test_writing_via_pyarrow(self):
def test_writing_lists_via_pyarrow(self):
# This function tests that we can write tables with list types to parquet files via pyarrow and read them back
# through deephaven's parquet reader code with no exceptions
pa_table = pyarrow.table({'numList': [[2, 2, 4]],
Expand All @@ -314,6 +314,25 @@ def test_writing_via_pyarrow(self):
pa_table_from_disk = dharrow.to_arrow(from_disk)
self.assertTrue(pa_table.equals(pa_table_from_disk))

def test_writing_time_via_pyarrow(self):
def _test_writing_time_helper(filename):
metadata = pyarrow.parquet.read_metadata(filename)
if "isAdjustedToUTC=false" in str(metadata.row_group(0).column(0)):
# TODO(deephaven-core#976): Unable to read non UTC adjusted timestamps
with self.assertRaises(DHError) as e:
read(filename)
self.assertIn("ParquetFileReaderException", e.exception.root_cause)

df = pandas.DataFrame({
"f": pandas.date_range("20130101", periods=3),
})
df.to_parquet("pyarrow_26.parquet", engine='pyarrow', compression=None, version='2.6')
_test_writing_time_helper("pyarrow_26.parquet")
df.to_parquet("pyarrow_24.parquet", engine='pyarrow', compression=None, version='2.4')
_test_writing_time_helper("pyarrow_24.parquet")
df.to_parquet("pyarrow_10.parquet", engine='pyarrow', compression=None, version='1.0')
_test_writing_time_helper("pyarrow_10.parquet")

def test_dictionary_encoding(self):
dh_table = empty_table(10).update(formulas=[
"shortStringColumn = `Row ` + i",
Expand Down

0 comments on commit 59921bc

Please sign in to comment.