Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Raise exception when reading non-UTC adjusted timestamps in parquet #4421

Merged
merged 13 commits into from
Sep 19, 2023
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,16 @@ private static void buildChildren(Types.GroupBuilder builder, Iterator<SchemaEle
}

if (schemaElement.isSetLogicalType()) {
((Types.Builder) childBuilder)
.as(getLogicalTypeAnnotation(schemaElement.logicalType));
LogicalType logicalType = schemaElement.logicalType;
if (logicalType.isSetTIMESTAMP()) {
TimestampType timestamp = logicalType.getTIMESTAMP();
if (!timestamp.isAdjustedToUTC) {
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
// TODO(deephaven-core#976): Unable to read non UTC adjusted timestamps
throw new ParquetFileReaderException(
"Only UTC timestamp is supported, found time column with isAdjustedToUTC=false");
}
}
((Types.Builder) childBuilder).as(getLogicalTypeAnnotation(logicalType));
}

if (schemaElement.isSetConverted_type()) {
Expand Down Expand Up @@ -365,12 +373,16 @@ private static LogicalTypeAnnotation getLogicalTypeAnnotation(ConvertedType type
case DATE:
return LogicalTypeAnnotation.dateType();
case TIME_MILLIS:
// TODO(deephaven-core#976) Assuming that time is adjusted to UTC
return LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MILLIS);
case TIME_MICROS:
// TODO(deephaven-core#976) Assuming that time is adjusted to UTC
return LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MICROS);
case TIMESTAMP_MILLIS:
// TODO(deephaven-core#976) Assuming that time is adjusted to UTC
return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS);
case TIMESTAMP_MICROS:
// TODO(deephaven-core#976) Assuming that time is adjusted to UTC
return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS);
case INTERVAL:
return LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ public Optional<Class<?>> visit(final LogicalTypeAnnotation.TimeLogicalTypeAnnot
@Override
public Optional<Class<?>> visit(
final LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
// TODO(deephaven-core#3588): Unable to read parquet TimestampLogicalTypeAnnotation that is not adjusted
// TODO(deephaven-core#976): Unable to read parquet TimestampLogicalTypeAnnotation that is not adjusted
// to UTC
if (timestampLogicalType.isAdjustedToUTC()) {
switch (timestampLogicalType.getUnit()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,8 @@ private static class LogicalTypeVisitor<ATTR extends Any>
@Override
public Optional<ToPage<ATTR, ?>> visit(
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
// TODO(deephaven-core#976): Unable to read parquet TimestampLogicalTypeAnnotation that is not adjusted
// to UTC
if (timestampLogicalType.isAdjustedToUTC()) {
return Optional
.of(ToInstantPage.create(componentType, timestampLogicalType.getUnit()));
Expand Down
3 changes: 2 additions & 1 deletion extensions/parquet/table/src/test/e0.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
"c": np.arange(3, 6).astype("u1"),
"d": np.arange(4.0, 7.0, dtype="float64"),
"e": [True, False, True],
"f": pd.date_range("20130101", periods=3),
# TODO(deephaven-core#976): Unable to read parquet TimestampLogicalTypeAnnotation that is not adjusted to UTC
# "f": pd.date_range("20130101", periods=3),
"g": pd.date_range("20130101", periods=3, tz="US/Eastern"),
"h": pd.Categorical(list("abc")),
"i": pd.Categorical(list("abc"), ordered=True),
Expand Down
2 changes: 1 addition & 1 deletion extensions/parquet/table/src/test/e0.requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
numpy==1.24.2
pandas==1.5.3
pyarrow==4.0.1
pyarrow==5.0.0
python-dateutil==2.8.2
pytz==2022.7.1
six==1.16.0
3 changes: 2 additions & 1 deletion extensions/parquet/table/src/test/e1.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
"c": np.arange(3, 6).astype("u1"),
"d": np.arange(4.0, 7.0, dtype="float64"),
"e": [True, False, True],
"f": pd.date_range("20130101", periods=3),
# TODO(deephaven-core#976): Unable to read parquet TimestampLogicalTypeAnnotation that is not adjusted to UTC
# "f": pd.date_range("20130101", periods=3),
"g": pd.date_range("20130101", periods=3, tz="US/Eastern"),
"h": pd.Categorical(list("abc")),
"i": pd.Categorical(list("abc"), ordered=True),
Expand Down
3 changes: 2 additions & 1 deletion extensions/parquet/table/src/test/e2.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
"c": np.arange(3, 6).astype("u1"),
"d": np.arange(4.0, 7.0, dtype="float64"),
"e": [True, False, True],
"f": pd.date_range("20130101", periods=3),
# TODO(deephaven-core#976): Unable to read parquet TimestampLogicalTypeAnnotation that is not adjusted to UTC
# "f": pd.date_range("20130101", periods=3),
"g": pd.date_range("20130101", periods=3, tz="US/Eastern"),
"h": pd.Categorical(list("abc")),
"i": pd.Categorical(list("abc"), ordered=True),
Expand Down
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@

import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.nio.file.Files;
import java.util.ArrayList;
Expand Down Expand Up @@ -432,8 +430,9 @@ public void e0() {
final Table gzip = ParquetTools.readTable(TestParquetTools.class.getResource("/e0/gzip.parquet").getFile());
assertTableEquals(uncompressed, gzip);

final Table lz4 = ParquetTools.readTable(TestParquetTools.class.getResource("/e0/lz4.parquet").getFile());
assertTableEquals(uncompressed, lz4);
// TODO(deephaven-core#3585): LZ4_RAW parquet support
// final Table lz4 = ParquetTools.readTable(TestParquetTools.class.getResource("/e0/lz4.parquet").getFile());
// assertTableEquals(uncompressed, lz4);

final Table snappy = ParquetTools.readTable(TestParquetTools.class.getResource("/e0/snappy.parquet").getFile());
assertTableEquals(uncompressed, snappy);
Expand Down Expand Up @@ -461,24 +460,24 @@ public void e1() {
assertTableEquals(uncompressed, zstd);
}

// TODO(deephaven-core#3588): Unable to read parquet TimestampLogicalTypeAnnotation that is not adjusted to UTC
// @Test
// public void e2() {
// final Table uncompressed =
// ParquetTools.readTable(TestParquetTools.class.getResource("/e2/uncompressed.parquet").getFile());
//
// final Table gzip = ParquetTools.readTable(TestParquetTools.class.getResource("/e2/gzip.parquet").getFile());
// assertTableEquals(uncompressed, gzip);
//
// final Table lz4 = ParquetTools.readTable(TestParquetTools.class.getResource("/e2/lz4.parquet").getFile());
// assertTableEquals(uncompressed, lz4);
//
// final Table snappy = ParquetTools.readTable(TestParquetTools.class.getResource("/e2/snappy.parquet").getFile());
// assertTableEquals(uncompressed, snappy);
//
// final Table zstd = ParquetTools.readTable(TestParquetTools.class.getResource("/e2/zstd.parquet").getFile());
// assertTableEquals(uncompressed, zstd);
// }
@Test
public void e2() {
final Table uncompressed =
ParquetTools.readTable(TestParquetTools.class.getResource("/e2/uncompressed.parquet").getFile());

final Table gzip = ParquetTools.readTable(TestParquetTools.class.getResource("/e2/gzip.parquet").getFile());
assertTableEquals(uncompressed, gzip);

// TODO(deephaven-core#3585): LZ4_RAW parquet support
// final Table lz4 = ParquetTools.readTable(TestParquetTools.class.getResource("/e2/lz4.parquet").getFile());
// assertTableEquals(uncompressed, lz4);

final Table snappy = ParquetTools.readTable(TestParquetTools.class.getResource("/e2/snappy.parquet").getFile());
assertTableEquals(uncompressed, snappy);

final Table zstd = ParquetTools.readTable(TestParquetTools.class.getResource("/e2/zstd.parquet").getFile());
assertTableEquals(uncompressed, zstd);
}

private void testWriteRead(Table source, Function<Table, Table> transform) {
final File f2w = new File(testRoot, "testWriteRead.parquet");
Expand Down
Binary file modified extensions/parquet/table/src/test/resources/e0/brotli.parquet
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e0/gzip.parquet
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e0/lz4.parquet
Binary file not shown.
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e0/snappy.parquet
Binary file not shown.
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e0/zstd.parquet
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e1/brotli.parquet
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e1/gzip.parquet
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e1/lz4.parquet
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e1/snappy.parquet
Binary file not shown.
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e1/zstd.parquet
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e2/brotli.parquet
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e2/gzip.parquet
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e2/lz4.parquet
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e2/snappy.parquet
Binary file not shown.
Binary file not shown.
Binary file modified extensions/parquet/table/src/test/resources/e2/zstd.parquet
Binary file not shown.
23 changes: 21 additions & 2 deletions py/server/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import pandas
import pyarrow.parquet

from deephaven import empty_table, dtypes, new_table
from deephaven import DHError, empty_table, dtypes, new_table
from deephaven import arrow as dharrow
from deephaven.column import InputColumn
from deephaven.pandas import to_pandas, to_table
Expand Down Expand Up @@ -297,7 +297,7 @@ def round_trip_with_compression(self, compression_codec_name, dh_table, vector_c
# result_table = read('data_from_pandas.parquet')
# self.assert_table_equals(dh_table, result_table)

def test_writing_via_pyarrow(self):
def test_writing_lists_via_pyarrow(self):
# This function tests that we can write tables with list types to parquet files via pyarrow and read them back
# through deephaven's parquet reader code with no exceptions
pa_table = pyarrow.table({'numList': [[2, 2, 4]],
Expand All @@ -307,6 +307,25 @@ def test_writing_via_pyarrow(self):
pa_table_from_disk = dharrow.to_arrow(from_disk)
self.assertTrue(pa_table.equals(pa_table_from_disk))

def test_writing_time_via_pyarrow(self):
def _test_writing_time_helper(filename):
metadata = pyarrow.parquet.read_metadata(filename)
if "isAdjustedToUTC=false" in str(metadata.row_group(0).column(0)):
# TODO(deephaven-core#976): Unable to read non UTC adjusted timestamps
with self.assertRaises(DHError) as e:
read(filename)
self.assertIn("ParquetFileReaderException", e.exception.root_cause)

df = pandas.DataFrame({
"f": pandas.date_range("20130101", periods=3),
})
df.to_parquet("pyarrow_26.parquet", engine='pyarrow', compression=None, version='2.6')
_test_writing_time_helper("pyarrow_26.parquet")
df.to_parquet("pyarrow_24.parquet", engine='pyarrow', compression=None, version='2.4')
_test_writing_time_helper("pyarrow_24.parquet")
df.to_parquet("pyarrow_10.parquet", engine='pyarrow', compression=None, version='1.0')
_test_writing_time_helper("pyarrow_10.parquet")

def test_dictionary_encoding(self):
dh_table = empty_table(10).update(formulas=[
"shortStringColumn = `Row ` + i",
Expand Down
Loading