From f59ad138d0d08c38e1bff0a583a003c61a5c39f2 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Mon, 18 Sep 2023 15:46:44 -0500 Subject: [PATCH] Added support for LZ4_RAW compression codec for Parquet (#4446) --- extensions/parquet/compression/README.md | 37 ++++---- extensions/parquet/compression/build.gradle | 1 - .../DeephavenCompressorAdapterFactory.java | 30 +++++-- .../LZ4WithLZ4RawBackupCompressorAdapter.java | 79 ++++++++++++++++++ .../BrotliParquetTableReadWriteTest.java | 23 ++--- .../parquet/table/ParquetInstructions.java | 3 +- .../deephaven/parquet/table/ParquetTools.java | 19 +++++ .../table/ParquetTableReadWriteTest.java | 68 ++++++++++----- .../parquet/table/TestParquetTools.java | 7 +- .../resources/sample_lz4_compressed.parquet | Bin 0 -> 1228 bytes py/server/tests/test_parquet.py | 17 ++-- 11 files changed, 213 insertions(+), 71 deletions(-) create mode 100644 extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/LZ4WithLZ4RawBackupCompressorAdapter.java create mode 100644 extensions/parquet/table/src/test/resources/sample_lz4_compressed.parquet diff --git a/extensions/parquet/compression/README.md b/extensions/parquet/compression/README.md index a706bd7fe21..e36212d8313 100644 --- a/extensions/parquet/compression/README.md +++ b/extensions/parquet/compression/README.md @@ -4,32 +4,35 @@ This project abstracts over parquet's expected compression formats in a way to make it easier for the parquet reader/writer code to be able to use them. There are two types in parquet that ostensibly offer compression codecs, -* org.apache.parquet.compression.CompressionCodecFactory, which depends on -* org.apache.hadoop.io.compress.CompressionCodecFactory +* `org.apache.parquet.compression.CompressionCodecFactory`, which depends on +* `org.apache.hadoop.io.compress.CompressionCodecFactory` With no other information, it would seem like the parquet version gets its base functionality from the more general -hadoop type, and while it is true that both factories provide access to org.apache.hadoop.io.compress.CompressionCodec +hadoop type, and while it is true that both factories provide access to `org.apache.hadoop.io.compress.CompressionCodec` instances, the parquet implementation disregards the hadoop implementation's ability to select codecs from either configuration or from the classpath (via service loader), and instead relies on hardcoded fully-qualified classnames -found in `org.apache.parquet.hadoop.metadata.CompressionCodecName`. +found in `org.apache.parquet.hadoop.metadata.CompressionCodecName`. That is why we use the hadoop implementation. -Of those, most are present in the hadoop-common or parquet-hadoop. Most of these are self-contained and entirely +Most of these codecs are present in hadoop-common or parquet-hadoop, and are self-contained and entirely implemented in bytecode. One counter-example would be the LZ4 codec, provided by `org.apache.hadoop.io.compress.Lz4Codec`, -which requires an external dependency that tries to load native code (but can fall back to bytecode). Two implementations -aren't provided at all: -* org.apache.hadoop.io.compress.BrotliCodec - No implementation is available of this in Maven Central, though other -repositories have an implementation, though one that is limited to only native implementations for x86 platforms. -* com.hadoop.compression.lzo.LzoCodec - There are GPL implementations of the LZO codec available, either bytecode or -native, but this license isn't compatible with many other projects, so we disregard it. 
- -We ignore the provided codec for snappy, `org.apache.hadoop.io.compress.SnappyCodec`, since it requires native code and -doesn't provide all supported platforms. Instead, the configuration is modified to replace this ServiceLoader-provided -implementation with the pure-java implementation `io.airlift.compress.snappy.SnappyCodec`. This can be disabled through -deephaven configuration settings. +which requires an external dependency that tries to load native code (but can fall back to bytecode). + +Two implementations aren't provided at all in hadoop-common or parquet-hadoop: +* `org.apache.hadoop.io.compress.BrotliCodec` - No implementation of this is available in Maven Central, though other +repositories have an implementation. For our testing, we use `jbrotli-native-darwin-x86-amd64`, which provides only +native implementations for x86 platforms. +* `com.hadoop.compression.lzo.LzoCodec` - There are GPL implementations of the LZO codec available, either bytecode or +native, but this license isn't compatible with many other projects, so we disregard it. Instead, we use +`io.airlift.compress.lzo.LzoCodec`, which is available under the Apache-2.0 license. + +We also ignore the provided codec for snappy, `org.apache.hadoop.io.compress.SnappyCodec`, since it is not compatible with +other parquet implementations that claim to use Snappy. Instead, the configuration is modified to replace this +ServiceLoader-provided implementation with `org.apache.parquet.hadoop.codec.SnappyCodec`, which is the classname +hardcoded in `org.apache.parquet.compression.CompressionCodecFactory`. Note that `org.apache.parquet.hadoop.metadata.ColumnChunkMetaData` instances created by Deephaven table writing code do still require `CompressionCodecName` instances, which means that we must still have a way to translate our own codecs into this enum's values, and only officially supported compression codecs can ever be used to write. -So, this project offers codecs from org.apache.hadoop.io.compress.CompressionCodecFactory, with configuration options, +So, this project offers codecs from `org.apache.hadoop.io.compress.CompressionCodecFactory`, with configuration options, and utilities to map back to official codec names. 
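To make the lookup path described in the README concrete, here is a minimal sketch of how calling code resolves a codec through this project. It assumes only the `DeephavenCompressorAdapterFactory.getInstance()` and `getByName(String)` entry points shown later in this patch; the `CodecLookupExample` wrapper class and the printed output are purely illustrative.

```java
import io.deephaven.parquet.compress.CompressorAdapter;
import io.deephaven.parquet.compress.DeephavenCompressorAdapterFactory;

public class CodecLookupExample {
    public static void main(String[] args) {
        // "UNCOMPRESSED" short-circuits to a pass-through adapter; any other name is resolved through the
        // hadoop CompressionCodecFactory and mapped back to an official CompressionCodecName for writing.
        final CompressorAdapter adapter = DeephavenCompressorAdapterFactory.getInstance().getByName("LZ4_RAW");
        try {
            System.out.println("Resolved adapter: " + adapter);
        } finally {
            // Adapters can hold pooled (de)compressors, so release them when done with the file.
            adapter.close();
        }
    }
}
```

Per the javadoc added in this change, the returned adapter can be stateful, so a fresh instance should be obtained per file rather than cached globally.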
diff --git a/extensions/parquet/compression/build.gradle b/extensions/parquet/compression/build.gradle index b71871e5516..8ee25720b66 100644 --- a/extensions/parquet/compression/build.gradle +++ b/extensions/parquet/compression/build.gradle @@ -6,7 +6,6 @@ plugins { dependencies { api project(':Util') - // TODO(deephaven-core#3148): LZ4_RAW parquet support Classpaths.inheritParquetHadoop(project) Classpaths.inheritParquetHadoopConfiguration(project) diff --git a/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java b/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java index 5b506d5f1fc..2d0ee895fd7 100644 --- a/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java +++ b/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java @@ -18,6 +18,7 @@ import org.apache.hadoop.io.compress.Decompressor; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.hadoop.codec.SnappyCodec; +import org.apache.parquet.hadoop.codec.Lz4RawCodec; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import java.io.IOException; @@ -61,6 +62,10 @@ private static DeephavenCompressorAdapterFactory createInstance() { // does use platform-specific implementations, but has native implementations for the // platforms we support today. SnappyCodec.class, CompressionCodecName.SNAPPY, + + // Use the Parquet LZ4_RAW codec, which internally uses aircompressor + Lz4RawCodec.class, CompressionCodecName.LZ4_RAW, + // The rest of these are aircompressor codecs which have fast / pure java implementations JdkGzipCodec.class, CompressionCodecName.GZIP, LzoCodec.class, CompressionCodecName.LZO, @@ -82,14 +87,14 @@ private static DeephavenCompressorAdapterFactory createInstance() { return new DeephavenCompressorAdapterFactory(factory, Collections.unmodifiableMap(codecToNames)); } - private static class CodecWrappingCompressorAdapter implements CompressorAdapter { + static class CodecWrappingCompressorAdapter implements CompressorAdapter { private final CompressionCodec compressionCodec; private final CompressionCodecName compressionCodecName; private boolean innerCompressorPooled; private Compressor innerCompressor; - private CodecWrappingCompressorAdapter(CompressionCodec compressionCodec, + CodecWrappingCompressorAdapter(CompressionCodec compressionCodec, CompressionCodecName compressionCodecName) { this.compressionCodec = Objects.requireNonNull(compressionCodec); this.compressionCodecName = Objects.requireNonNull(compressionCodecName); @@ -178,19 +183,27 @@ private DeephavenCompressorAdapterFactory(CompressionCodecFactory compressionCod } /** - * Returns a compressor with the given codec name. + * Returns a compressor with the given codec name. The returned adapter can be internally stateful in some cases + * and therefore a single instance should not be re-used across files (check + * {@link LZ4WithLZ4RawBackupCompressorAdapter} for more details). * * @param codecName the name of the codec to search for. * @return a compressor instance with a name matching the given codec. 
*/ - public CompressorAdapter getByName(String codecName) { + public CompressorAdapter getByName(final String codecName) { if (codecName.equalsIgnoreCase("UNCOMPRESSED")) { return CompressorAdapter.PASSTHRU; } - final CompressionCodec codec = compressionCodecFactory.getCodecByName(codecName); + CompressionCodec codec = compressionCodecFactory.getCodecByName(codecName); if (codec == null) { - throw new IllegalArgumentException( - String.format("Failed to find CompressionCodec for codecName=%s", codecName)); + if (codecName.equalsIgnoreCase("LZ4_RAW")) { + // Hacky work-around since codec factory refers to LZ4_RAW as LZ4RAW + codec = compressionCodecFactory.getCodecByName("LZ4RAW"); + } + if (codec == null) { + throw new IllegalArgumentException( + String.format("Failed to find CompressionCodec for codecName=%s", codecName)); + } } final CompressionCodecName ccn = codecClassnameToCodecName.get(codec.getClass().getName()); if (ccn == null) { @@ -198,6 +211,9 @@ public CompressorAdapter getByName(String codecName) { "Failed to find CompressionCodecName for codecName=%s, codec=%s, codec.getDefaultExtension()=%s", codecName, codec, codec.getDefaultExtension())); } + if (ccn == CompressionCodecName.LZ4) { + return new LZ4WithLZ4RawBackupCompressorAdapter(codec, ccn); + } return new CodecWrappingCompressorAdapter(codec, ccn); } } diff --git a/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/LZ4WithLZ4RawBackupCompressorAdapter.java b/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/LZ4WithLZ4RawBackupCompressorAdapter.java new file mode 100644 index 00000000000..fb08e723cdd --- /dev/null +++ b/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/LZ4WithLZ4RawBackupCompressorAdapter.java @@ -0,0 +1,79 @@ +/** + * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.parquet.compress; + +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * This is the default adapter for LZ4 files. It attempts to decompress with LZ4 and falls back to LZ4_RAW on failure. + * After that, it always uses LZ4_RAW for decompression. This fallback mechanism is particularly useful for + * decompressing parquet files that are compressed with LZ4_RAW but tagged as LZ4 in the metadata. This adapter is + * internally stateful in some cases and therefore a single instance should not be re-used across files. + */ +class LZ4WithLZ4RawBackupCompressorAdapter extends DeephavenCompressorAdapterFactory.CodecWrappingCompressorAdapter { + private enum DecompressionMode { + INIT, LZ4, LZ4_RAW + } + + private DecompressionMode mode = DecompressionMode.INIT; + + /** + * Only initialized if we hit an exception while decompressing with LZ4. 
+ */ + private CompressorAdapter lz4RawAdapter = null; + + LZ4WithLZ4RawBackupCompressorAdapter(CompressionCodec compressionCodec, + CompressionCodecName compressionCodecName) { + super(compressionCodec, compressionCodecName); + } + + @Override + public BytesInput decompress(final InputStream inputStream, final int compressedSize, + final int uncompressedSize) throws IOException { + if (mode == DecompressionMode.LZ4) { + return super.decompress(inputStream, compressedSize, uncompressedSize); + } + if (mode == DecompressionMode.LZ4_RAW) { + // LZ4_RAW adapter should have been initialized if we hit this case. + return lz4RawAdapter.decompress(inputStream, compressedSize, uncompressedSize); + } + // Buffer input data in case we need to retry with LZ4_RAW. + final BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream, compressedSize); + bufferedInputStream.mark(compressedSize); + BytesInput ret; + try { + ret = super.decompress(bufferedInputStream, compressedSize, uncompressedSize); + mode = DecompressionMode.LZ4; + } catch (IOException e) { + super.reset(); + bufferedInputStream.reset(); + lz4RawAdapter = DeephavenCompressorAdapterFactory.getInstance().getByName("LZ4_RAW"); + ret = lz4RawAdapter.decompress(bufferedInputStream, compressedSize, uncompressedSize); + mode = DecompressionMode.LZ4_RAW; + } + return ret; + } + + @Override + public void reset() { + super.reset(); + if (lz4RawAdapter != null) { + lz4RawAdapter.reset(); + } + } + + @Override + public void close() { + super.close(); + if (lz4RawAdapter != null) { + lz4RawAdapter.close(); + } + } +} diff --git a/extensions/parquet/table/src/brotliTest/java/io/deephaven/parquet/table/BrotliParquetTableReadWriteTest.java b/extensions/parquet/table/src/brotliTest/java/io/deephaven/parquet/table/BrotliParquetTableReadWriteTest.java index 2c403f4129a..68bca70ff5c 100644 --- a/extensions/parquet/table/src/brotliTest/java/io/deephaven/parquet/table/BrotliParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/brotliTest/java/io/deephaven/parquet/table/BrotliParquetTableReadWriteTest.java @@ -94,24 +94,17 @@ public boolean equals(Object obj) { } } - private void compressionCodecTestHelper(final String codec) { - final String currentCodec = ParquetInstructions.getDefaultCompressionCodecName(); - try { - ParquetInstructions.setDefaultCompressionCodecName(codec); - String path = rootFile + File.separator + "Table1.parquet"; - final Table table1 = getTableFlat(10000, false); - ParquetTools.writeTable(table1, path); - assertTrue(new File(path).length() > 0); - final Table table2 = ParquetTools.readTable(path); - TstUtils.assertTableEquals(table1, table2); - } finally { - ParquetInstructions.setDefaultCompressionCodecName(currentCodec); - } + private void compressionCodecTestHelper(final ParquetInstructions codec) { + File dest = new File(rootFile + File.separator + "Table1.parquet"); + final Table table1 = getTableFlat(10000, false); + ParquetTools.writeTable(table1, dest, codec); + assertTrue(dest.length() > 0L); + final Table table2 = ParquetTools.readTable(dest); + TstUtils.assertTableEquals(table1, table2); } @Test public void testParquetBrotliCompressionCodec() { - compressionCodecTestHelper("BROTLI"); - + compressionCodecTestHelper(ParquetTools.BROTLI); } } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java index ee15e2cd399..c455b5acaa0 100644 --- 
a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java @@ -30,9 +30,10 @@ public abstract class ParquetInstructions implements ColumnToCodecMappings { /** * Set the default for {@link #getCompressionCodecName()}. * + * @deprecated Use {@link Builder#setCompressionCodecName(String)} instead. * @param name The new default - * @see Builder#setCompressionCodecName(String) */ + @Deprecated public static void setDefaultCompressionCodecName(final String name) { defaultCompressionCodecName = name; } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java index 6bfe5ed8673..0efadb57f16 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java @@ -841,14 +841,33 @@ public static Pair<List<ColumnDefinition<?>>, ParquetInstructions> convertSchema s -> s.replace(" ", "_"), takenNames))); } + public static final ParquetInstructions UNCOMPRESSED = + ParquetInstructions.builder().setCompressionCodecName("UNCOMPRESSED").build(); + + /** + * @deprecated Use LZ4_RAW instead, as explained + * here + */ + @Deprecated public static final ParquetInstructions LZ4 = ParquetInstructions.builder().setCompressionCodecName("LZ4").build(); + public static final ParquetInstructions LZ4_RAW = + ParquetInstructions.builder().setCompressionCodecName("LZ4_RAW").build(); public static final ParquetInstructions LZO = ParquetInstructions.builder().setCompressionCodecName("LZO").build(); public static final ParquetInstructions GZIP = ParquetInstructions.builder().setCompressionCodecName("GZIP").build(); public static final ParquetInstructions ZSTD = ParquetInstructions.builder().setCompressionCodecName("ZSTD").build(); + public static final ParquetInstructions SNAPPY = + ParquetInstructions.builder().setCompressionCodecName("SNAPPY").build(); + public static final ParquetInstructions BROTLI = + ParquetInstructions.builder().setCompressionCodecName("BROTLI").build(); public static final ParquetInstructions LEGACY = ParquetInstructions.builder().setIsLegacyParquet(true).build(); + /** + * @deprecated Do not use this method; instead, pass the above codecs as arguments to the + * {@link #writeTable(Table, File, ParquetInstructions)} method + */ + @Deprecated public static void setDefaultCompressionCodecName(final String compressionCodecName) { ParquetInstructions.setDefaultCompressionCodecName(compressionCodecName); } diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index 24a878da4ec..c962e3882f6 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -320,52 +320,81 @@ public void groupingByBigInt() { TestCase.assertNotNull(fromDisk.getColumnSource("someBigInt").getGroupToRange()); } - private void compressionCodecTestHelper(final String codec) { - final String currentCodec = ParquetInstructions.getDefaultCompressionCodecName(); - try { - ParquetInstructions.setDefaultCompressionCodecName(codec); - String path = rootFile + File.separator + "Table1.parquet"; - 
final Table table1 = getTableFlat(10000, false, true); - ParquetTools.writeTable(table1, path); - assertTrue(new File(path).length() > 0); - final Table table2 = ParquetTools.readTable(path); - TstUtils.assertTableEquals(maybeFixBigDecimal(table1), table2); - } finally { - ParquetInstructions.setDefaultCompressionCodecName(currentCodec); - } + private void compressionCodecTestHelper(final ParquetInstructions codec) { + File dest = new File(rootFile + File.separator + "Table1.parquet"); + final Table table1 = getTableFlat(10000, false, true); + ParquetTools.writeTable(table1, dest, codec); + assertTrue(dest.length() > 0L); + final Table table2 = ParquetTools.readTable(dest); + TstUtils.assertTableEquals(maybeFixBigDecimal(table1), table2); + } + + @Test + public void testParquetUncompressedCompressionCodec() { + compressionCodecTestHelper(ParquetTools.UNCOMPRESSED); } @Test public void testParquetLzoCompressionCodec() { - compressionCodecTestHelper("LZO"); + compressionCodecTestHelper(ParquetTools.LZO); } @Test public void testParquetLz4CompressionCodec() { - compressionCodecTestHelper("LZ4"); + compressionCodecTestHelper(ParquetTools.LZ4); + } + + @Test + public void test_lz4_compressed() { + // Write and read a LZ4 compressed file + File dest = new File(rootFile + File.separator + "Table.parquet"); + final Table table = getTableFlat(100, false, false); + ParquetTools.writeTable(table, dest, ParquetTools.LZ4); + Table fromDisk = ParquetTools.readTable(dest).select(); + TstUtils.assertTableEquals(fromDisk, table); + + try { + // The following file is tagged as LZ4 compressed based on its metadata, but is actually compressed with + // LZ4_RAW. We should be able to read it anyway with no exceptions. + String path = TestParquetTools.class.getResource("/sample_lz4_compressed.parquet").getFile(); + fromDisk = ParquetTools.readTable(path).select(); + File randomDest = new File(rootFile, "random.parquet"); + ParquetTools.writeTable(fromDisk, randomDest, ParquetTools.LZ4_RAW); + } catch (RuntimeException e) { + TestCase.fail("Failed to read parquet file sample_lz4_compressed.parquet"); + } + + // Read the LZ4 compressed file again, to make sure we use a new adapter + fromDisk = ParquetTools.readTable(dest).select(); + TstUtils.assertTableEquals(fromDisk, table); + } + + @Test + public void testParquetLz4RawCompressionCodec() { + compressionCodecTestHelper(ParquetTools.LZ4_RAW); } @Ignore("See BrotliParquetReadWriteTest instead") @Test public void testParquetBrotliCompressionCodec() { - compressionCodecTestHelper("BROTLI"); + compressionCodecTestHelper(ParquetTools.BROTLI); } @Test public void testParquetZstdCompressionCodec() { - compressionCodecTestHelper("ZSTD"); + compressionCodecTestHelper(ParquetTools.ZSTD); } @Test public void testParquetGzipCompressionCodec() { - compressionCodecTestHelper("GZIP"); + compressionCodecTestHelper(ParquetTools.GZIP); } @Test public void testParquetSnappyCompressionCodec() { // while Snappy is covered by other tests, this is a very fast test to quickly confirm that it works in the same // way as the other similar codec tests. 
- compressionCodecTestHelper("SNAPPY"); + compressionCodecTestHelper(ParquetTools.SNAPPY); } @Test @@ -2355,6 +2384,5 @@ private void assertBigIntegerColumnStatistics(SerialObjectColumnIteratorN|T4;eU z;D8Y4p85ej@da?}q2-7WC&U*Z&K#Jr;}B>#Ak?R5XZOwQ+qbiFI;!iVfh9cKq+rYf zd=u|p{I~;z_oIk_vsfsjMHMERu5D!*+sbMs%uQiQ$J_;8Nr)q%d*jiM< zoV4VDkROEC@T`APb;|4&H6UjkfP;(0>JLqBo&Qpi`rJJ7w?y8GMCsV*v5Zc>xDk*7gGP!Em{1?&|AItbuYJs>j$l#|WO zGPZSe$S4Nf%;PLEducN@eI2~-ri;0eSAE-0&3+d0lkmRIc)K&y(Bm3^QJ!)Y&1qJ_ zHn*aCgNJ=Isl(?qel@^+GVuX(G~ts$T8`^1p5*)f@5`KUx8z$xT6BMk1pvpj^Qzlz z-@cS~&)(P0GcLx8u)+i3x>7qTjMWvTVpU60DkQ4K$(mAEIflO?tvD4{m~ZotNYyr2 zniipYo{+bAq