Added support for LZ4_RAW compression codec for Parquet #4446

Merged Sep 18, 2023 (18 commits). The diff below shows changes from 5 commits.
1 change: 0 additions & 1 deletion ParquetHadoop/build.gradle
@@ -17,7 +17,6 @@ tasks.withType(License) {
}

dependencies {
-    // TODO(deephaven-core#3148): LZ4_RAW parquet support
    api('org.apache.parquet:parquet-hadoop:1.13.0')

    // TODO(deephaven-core#806): Remove dependency on hadoop-common
37 changes: 20 additions & 17 deletions extensions/parquet/compression/README.md
@@ -4,32 +4,35 @@ This project abstracts over parquet's expected compression formats in a way to m
parquet reader/writer code to be able to use them.

There are two types in parquet that ostensibly offer compression codecs,
-* org.apache.parquet.compression.CompressionCodecFactory, which depends on
-* org.apache.hadoop.io.compress.CompressionCodecFactory
+* `org.apache.parquet.compression.CompressionCodecFactory`, which depends on
+* `org.apache.hadoop.io.compress.CompressionCodecFactory`

With no other information, it would seem like the parquet version gets its base functionality from the more general
-hadoop type, and while it is true that both factories provide access to org.apache.hadoop.io.compress.CompressionCodec
+hadoop type, and while it is true that both factories provide access to `org.apache.hadoop.io.compress.CompressionCodec`
instances, the parquet implementation disregards the hadoop implementation's ability to select codecs from either
configuration or from the classpath (via service loader), and instead relies on hardcoded fully-qualified classnames
-found in `org.apache.parquet.hadoop.metadata.CompressionCodecName`.
+found in `org.apache.parquet.hadoop.metadata.CompressionCodecName`. That is why we use the hadoop implementation.

-Of those, most are present in the hadoop-common or parquet-hadoop. Most of these are self-contained and entirely
+Most of these codecs are present in hadoop-common or parquet-hadoop, and are self-contained and entirely
implemented in bytecode. One counter-example would be the LZ4 codec, provided by `org.apache.hadoop.io.compress.Lz4Codec`,
-which requires an external dependency that tries to load native code (but can fall back to bytecode). Two implementations
-aren't provided at all:
-* org.apache.hadoop.io.compress.BrotliCodec - No implementation is available of this in Maven Central, though other
-repositories have an implementation, though one that is limited to only native implementations for x86 platforms.
-* com.hadoop.compression.lzo.LzoCodec - There are GPL implementations of the LZO codec available, either bytecode or
-native, but this license isn't compatible with many other projects, so we disregard it.
-
-We ignore the provided codec for snappy, `org.apache.hadoop.io.compress.SnappyCodec`, since it requires native code and
-doesn't provide all supported platforms. Instead, the configuration is modified to replace this ServiceLoader-provided
-implementation with the pure-java implementation `io.airlift.compress.snappy.SnappyCodec`. This can be disabled through
-deephaven configuration settings.
+which requires an external dependency that tries to load native code (but can fall back to bytecode).
+
+Two implementations aren't provided at all in hadoop-common or parquet-hadoop:
+* `org.apache.hadoop.io.compress.BrotliCodec` - No implementation of this is available in Maven Central, though other
+repositories have one. For our testing, we use `jbrotli-native-darwin-x86-amd64`, which is limited to native
+implementations for x86 platforms.
+* `com.hadoop.compression.lzo.LzoCodec` - There are GPL implementations of the LZO codec available, either bytecode or
+native, but this license isn't compatible with many other projects, so we disregard it. Instead, we use
+`io.airlift.compress.lzo.LzoCodec`, which is licensed under Apache-2.0.
+
+We also ignore the provided codec for snappy, `org.apache.hadoop.io.compress.SnappyCodec`, since it's not compatible with
+other parquet implementations which claim to use Snappy. Instead, the configuration is modified to replace this
+ServiceLoader-provided implementation with `org.apache.parquet.hadoop.codec.SnappyCodec`, which is the classname
+hardcoded in `org.apache.parquet.compression.CompressionCodecFactory`.

Note that `org.apache.parquet.hadoop.metadata.ColumnChunkMetaData` instances created by Deephaven table writing code
do still require `CompressionCodecName` instances, which means that we must still have a way to translate our own codecs
into this enum's values, and only officially supported compression codecs can ever be used to write.

-So, this project offers codecs from org.apache.hadoop.io.compress.CompressionCodecFactory, with configuration options,
+So, this project offers codecs from `org.apache.hadoop.io.compress.CompressionCodecFactory`, with configuration options,
and utilities to map back to official codec names.
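
To make the distinction concrete, here is a minimal sketch (not Deephaven's actual wiring) of driving codec selection through hadoop's factory; it assumes hadoop-common and parquet-hadoop are on the classpath, and uses only the standard `io.compression.codecs` configuration key and `getCodecByName` lookup:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public final class HadoopCodecLookupSketch {
    public static void main(String[] args) {
        // hadoop's factory reads codec classes from configuration (and from the
        // classpath via ServiceLoader); parquet's own factory ignores both and
        // relies on the classnames hardcoded in CompressionCodecName.
        final Configuration conf = new Configuration();
        conf.set("io.compression.codecs", "org.apache.parquet.hadoop.codec.SnappyCodec");
        final CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        // Short names are derived from the codec class name and matched
        // case-insensitively, so "SNAPPY" resolves to the codec registered above.
        final CompressionCodec snappy = factory.getCodecByName("SNAPPY");
        System.out.println(snappy.getClass().getName());
    }
}
```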
@@ -18,6 +18,7 @@
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.hadoop.codec.SnappyCodec;
+import org.apache.parquet.hadoop.codec.Lz4RawCodec;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.IOException;
@@ -61,6 +62,10 @@ private static DeephavenCompressorAdapterFactory createInstance() {
                // does use platform-specific implementations, but has native implementations for the
                // platforms we support today.
                SnappyCodec.class, CompressionCodecName.SNAPPY,
+
+                // Use the Parquet LZ4_RAW codec, which internally uses aircompressor
+                Lz4RawCodec.class, CompressionCodecName.LZ4_RAW,
+
                // The rest of these are aircompressor codecs which have fast / pure java implementations
                JdkGzipCodec.class, CompressionCodecName.GZIP,
                LzoCodec.class, CompressionCodecName.LZO,
@@ -183,14 +188,20 @@ private DeephavenCompressorAdapterFactory(CompressionCodecFactory compressionCod
     * @param codecName the name of the codec to search for.
     * @return a compressor instance with a name matching the given codec.
     */
-    public CompressorAdapter getByName(String codecName) {
+    public CompressorAdapter getByName(final String codecName) {
        if (codecName.equalsIgnoreCase("UNCOMPRESSED")) {
            return CompressorAdapter.PASSTHRU;
        }
-        final CompressionCodec codec = compressionCodecFactory.getCodecByName(codecName);
+        CompressionCodec codec = compressionCodecFactory.getCodecByName(codecName);
        if (codec == null) {
-            throw new IllegalArgumentException(
-                    String.format("Failed to find CompressionCodec for codecName=%s", codecName));
+            if (codecName.equalsIgnoreCase("LZ4_RAW")) {
+                // Hacky work-around since codec factory refers to LZ4_RAW as LZ4RAW
+                codec = compressionCodecFactory.getCodecByName("LZ4RAW");
+            }
+            if (codec == null) {
+                throw new IllegalArgumentException(
+                        String.format("Failed to find CompressionCodec for codecName=%s", codecName));
+            }
        }
        final CompressionCodecName ccn = codecClassnameToCodecName.get(codec.getClass().getName());
        if (ccn == null) {
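The `LZ4RAW` fallback above exists because hadoop's `CompressionCodecFactory` derives a codec's short name from its class name (`Lz4RawCodec` yields `LZ4RAW`), while parquet metadata spells the codec `LZ4_RAW`. A self-contained sketch of the same lookup outside the Deephaven factory; the `lookup` helper here is hypothetical:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

final class Lz4RawLookupSketch {
    // Hypothetical helper mirroring the fallback in getByName above
    static CompressionCodec lookup(final CompressionCodecFactory factory, final String codecName) {
        CompressionCodec codec = factory.getCodecByName(codecName);
        if (codec == null && codecName.equalsIgnoreCase("LZ4_RAW")) {
            // the factory only knows the name derived from Lz4RawCodec, without the underscore
            codec = factory.getCodecByName("LZ4RAW");
        }
        if (codec == null) {
            throw new IllegalArgumentException("Failed to find CompressionCodec for codecName=" + codecName);
        }
        return codec;
    }

    public static void main(String[] args) {
        final Configuration conf = new Configuration();
        // register parquet's LZ4_RAW codec so the factory can resolve it by name
        conf.set("io.compression.codecs", "org.apache.parquet.hadoop.codec.Lz4RawCodec");
        System.out.println(lookup(new CompressionCodecFactory(conf), "LZ4_RAW").getClass().getName());
    }
}
```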
@@ -94,24 +94,17 @@ public boolean equals(Object obj) {
}
}

-    private void compressionCodecTestHelper(final String codec) {
-        final String currentCodec = ParquetInstructions.getDefaultCompressionCodecName();
-        try {
-            ParquetInstructions.setDefaultCompressionCodecName(codec);
-            String path = rootFile + File.separator + "Table1.parquet";
-            final Table table1 = getTableFlat(10000, false);
-            ParquetTools.writeTable(table1, path);
-            assertTrue(new File(path).length() > 0);
-            final Table table2 = ParquetTools.readTable(path);
-            TstUtils.assertTableEquals(table1, table2);
-        } finally {
-            ParquetInstructions.setDefaultCompressionCodecName(currentCodec);
-        }
+    private void compressionCodecTestHelper(final ParquetInstructions codec) {
+        File dest = new File(rootFile + File.separator + "Table1.parquet");
+        final Table table1 = getTableFlat(10000, false);
+        ParquetTools.writeTable(table1, dest, codec);
+        assertTrue(dest.length() > 0L);
+        final Table table2 = ParquetTools.readTable(dest);
+        TstUtils.assertTableEquals(table1, table2);
    }

    @Test
    public void testParquetBrotliCompressionCodec() {
-        compressionCodecTestHelper("BROTLI");
+        compressionCodecTestHelper(ParquetTools.BROTLI);
    }
}
}
@@ -841,14 +841,33 @@ public static Pair<List<ColumnDefinition<?>>, ParquetInstructions> convertSchema
s -> s.replace(" ", "_"), takenNames)));
}

public static final ParquetInstructions UNCOMPRESSED =
ParquetInstructions.builder().setCompressionCodecName("UNCOMPRESSED").build();

/**
 * Deprecated: Use LZ4_RAW instead, as explained
 * <a href="https://github.com/apache/parquet-format/blob/master/Compression.md">here</a>
 */
@Deprecated
public static final ParquetInstructions LZ4 = ParquetInstructions.builder().setCompressionCodecName("LZ4").build();
public static final ParquetInstructions LZ4_RAW =
ParquetInstructions.builder().setCompressionCodecName("LZ4_RAW").build();
public static final ParquetInstructions LZO = ParquetInstructions.builder().setCompressionCodecName("LZO").build();
public static final ParquetInstructions GZIP =
ParquetInstructions.builder().setCompressionCodecName("GZIP").build();
public static final ParquetInstructions ZSTD =
ParquetInstructions.builder().setCompressionCodecName("ZSTD").build();
public static final ParquetInstructions SNAPPY =
ParquetInstructions.builder().setCompressionCodecName("SNAPPY").build();
public static final ParquetInstructions BROTLI =
ParquetInstructions.builder().setCompressionCodecName("BROTLI").build();
public static final ParquetInstructions LEGACY = ParquetInstructions.builder().setIsLegacyParquet(true).build();

/**
 * Deprecated: Do not use this method directly; instead pass the above codecs as arguments to the
 * {@link #writeTable(Table, File, ParquetInstructions)} method.
 */
@Deprecated
public static void setDefaultCompressionCodecName(final String compressionCodecName) {
ParquetInstructions.setDefaultCompressionCodecName(compressionCodecName);
}
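
For example, the per-call style recommended by the deprecation note might look like the sketch below; the package imports and the `TableTools.emptyTable(...).update(...)` table construction are assumptions, while the codec constants and the `writeTable`/`readTable` overloads are the ones shown in this diff:

```java
import java.io.File;

import io.deephaven.engine.table.Table;
import io.deephaven.engine.util.TableTools; // assumed package
import io.deephaven.parquet.table.ParquetTools; // assumed package

public class Lz4RawWriteExample {
    public static void main(String[] args) {
        // Build a small table and write it with the new LZ4_RAW codec, passing
        // the codec per call instead of mutating the deprecated global default.
        final Table source = TableTools.emptyTable(10_000).update("X = ii");
        final File dest = new File("/tmp/Table1.parquet");
        ParquetTools.writeTable(source, dest, ParquetTools.LZ4_RAW);

        // Read it back; the codec is recorded in the file's metadata.
        final Table roundTrip = ParquetTools.readTable(dest);
        TableTools.show(roundTrip);
    }
}
```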
@@ -317,52 +317,56 @@ public void groupingByBigInt() {
TestCase.assertNotNull(fromDisk.getColumnSource("someBigInt").getGroupToRange());
}

-    private void compressionCodecTestHelper(final String codec) {
-        final String currentCodec = ParquetInstructions.getDefaultCompressionCodecName();
-        try {
-            ParquetInstructions.setDefaultCompressionCodecName(codec);
-            String path = rootFile + File.separator + "Table1.parquet";
-            final Table table1 = getTableFlat(10000, false, true);
-            ParquetTools.writeTable(table1, path);
-            assertTrue(new File(path).length() > 0);
-            final Table table2 = ParquetTools.readTable(path);
-            TstUtils.assertTableEquals(maybeFixBigDecimal(table1), table2);
-        } finally {
-            ParquetInstructions.setDefaultCompressionCodecName(currentCodec);
-        }
-    }
+    private void compressionCodecTestHelper(final ParquetInstructions codec) {
+        File dest = new File(rootFile + File.separator + "Table1.parquet");
+        final Table table1 = getTableFlat(10000, false, true);
+        ParquetTools.writeTable(table1, dest, codec);
+        assertTrue(dest.length() > 0L);
+        final Table table2 = ParquetTools.readTable(dest);
+        TstUtils.assertTableEquals(maybeFixBigDecimal(table1), table2);
+    }

+    @Test
+    public void testParquetUncompressedCompressionCodec() {
+        compressionCodecTestHelper(ParquetTools.UNCOMPRESSED);
+    }

    @Test
    public void testParquetLzoCompressionCodec() {
-        compressionCodecTestHelper("LZO");
+        compressionCodecTestHelper(ParquetTools.LZO);
    }

    @Test
    public void testParquetLz4CompressionCodec() {
-        compressionCodecTestHelper("LZ4");
+        compressionCodecTestHelper(ParquetTools.LZ4);
    }

+    @Test
+    public void testParquetLz4RawCompressionCodec() {
+        compressionCodecTestHelper(ParquetTools.LZ4_RAW);
+    }

    @Ignore("See BrotliParquetReadWriteTest instead")
    @Test
    public void testParquetBrotliCompressionCodec() {
-        compressionCodecTestHelper("BROTLI");
+        compressionCodecTestHelper(ParquetTools.BROTLI);
    }

    @Test
    public void testParquetZstdCompressionCodec() {
-        compressionCodecTestHelper("ZSTD");
+        compressionCodecTestHelper(ParquetTools.ZSTD);
    }

    @Test
    public void testParquetGzipCompressionCodec() {
-        compressionCodecTestHelper("GZIP");
+        compressionCodecTestHelper(ParquetTools.GZIP);
    }

    @Test
    public void testParquetSnappyCompressionCodec() {
        // while Snappy is covered by other tests, this is a very fast test to quickly confirm that it works in the same
        // way as the other similar codec tests.
-        compressionCodecTestHelper("SNAPPY");
+        compressionCodecTestHelper(ParquetTools.SNAPPY);
    }

@Test
@@ -23,8 +23,6 @@

import java.io.File;
import java.io.IOException;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.nio.file.Files;
import java.util.ArrayList;
@@ -450,9 +448,8 @@ public void e1() {
        final Table gzip = ParquetTools.readTable(TestParquetTools.class.getResource("/e1/gzip.parquet").getFile());
        assertTableEquals(uncompressed, gzip);

-        // TODO(deephaven-core#3585): LZ4_RAW parquet support
-        // final Table lz4 = ParquetTools.readTable(TestParquetTools.class.getResource("/e1/lz4.parquet").getFile());
-        // assertTableEquals(uncompressed, lz4);
+        final Table lz4 = ParquetTools.readTable(TestParquetTools.class.getResource("/e1/lz4.parquet").getFile());
+        assertTableEquals(uncompressed, lz4);

        final Table snappy = ParquetTools.readTable(TestParquetTools.class.getResource("/e1/snappy.parquet").getFile());
        assertTableEquals(uncompressed, snappy);
16 changes: 11 additions & 5 deletions py/server/tests/test_parquet.py
@@ -239,10 +239,9 @@ def test_round_trip_data(self):
        dh_table = self.get_table_data()
        self.round_trip_with_compression("UNCOMPRESSED", dh_table)
        self.round_trip_with_compression("SNAPPY", dh_table)
-        # LZO is not fully supported in python/c++
-        # self.round_trip_with_compression("LZO", dh_table)
-        # TODO(deephaven-core#3148): LZ4_RAW parquet support
-        # self.round_trip_with_compression("LZ4", dh_table)
+        self.round_trip_with_compression("LZO", dh_table)
+        self.round_trip_with_compression("LZ4", dh_table)
+        self.round_trip_with_compression("LZ4_RAW", dh_table)
        self.round_trip_with_compression("GZIP", dh_table)
        self.round_trip_with_compression("ZSTD", dh_table)

@@ -262,6 +261,10 @@ def round_trip_with_compression(self, compression_codec_name, dh_table, vector_c
        result_table = read('data_from_dh.parquet')
        self.assert_table_equals(dh_table, result_table)

+        # LZO is not fully supported in pyarrow, so we can't do the rest of the tests
+        if compression_codec_name == 'LZO':
+            return

        # Read the parquet file as a pandas dataframe, convert it to deephaven table and compare
        if pandas.__version__.split('.')[0] == "1":
            dataframe = pandas.read_parquet("data_from_dh.parquet", use_nullable_dtypes=True)
@@ -285,8 +288,11 @@ def round_trip_with_compression(self, compression_codec_name, dh_table, vector_c
        self.assert_table_equals(dh_table, result_table)

        # Write the pandas dataframe back to parquet (via pyarrow) and read it back using deephaven.parquet to compare
+        # Pandas references LZ4_RAW as LZ4, so we need to convert the name
        dataframe.to_parquet('data_from_pandas.parquet',
-                             compression=None if compression_codec_name == 'UNCOMPRESSED' else compression_codec_name)
+                             compression=None if compression_codec_name == 'UNCOMPRESSED' else
+                             "LZ4" if compression_codec_name == 'LZ4_RAW'
+                             else compression_codec_name)
        result_table = read('data_from_pandas.parquet')
        self.assert_table_equals(dh_table, result_table)
