Added support for LZ4_RAW compression codec for Parquet #4446

Merged · 18 commits · Sep 18, 2023

37 changes: 20 additions & 17 deletions extensions/parquet/compression/README.md
@@ -4,32 +4,35 @@ This project abstracts over parquet's expected compression formats in a way to m
parquet reader/writer code to be able to use them.

There are two types in parquet that ostensibly offer compression codecs,
* org.apache.parquet.compression.CompressionCodecFactory, which depends on
* org.apache.hadoop.io.compress.CompressionCodecFactory
* `org.apache.parquet.compression.CompressionCodecFactory`, which depends on
* `org.apache.hadoop.io.compress.CompressionCodecFactory`

With no other information, it would seem like the parquet version gets its base functionality from the more general
hadoop type, and while it is true that both factories provide access to org.apache.hadoop.io.compress.CompressionCodec
hadoop type, and while it is true that both factories provide access to `org.apache.hadoop.io.compress.CompressionCodec`
instances, the parquet implementation disregards the hadoop implementation's ability to select codecs from either
configuration or from the classpath (via service loader), and instead relies on hardcoded fully-qualified classnames
found in `org.apache.parquet.hadoop.metadata.CompressionCodecName`.
found in `org.apache.parquet.hadoop.metadata.CompressionCodecName`. That is why we use the hadoop implementation.

Of those, most are present in the hadoop-common or parquet-hadoop. Most of these are self-contained and entirely
Most of these codecs are present in hadoop-common or parquet-hadoop, and are self-contained and entirely
implemented in bytecode. One counter-example would be the LZ4 codec, provided by `org.apache.hadoop.io.compress.Lz4Codec`,
which requires an external dependency that tries to load native code (but can fall back to bytecode). Two implementations
aren't provided at all:
* org.apache.hadoop.io.compress.BrotliCodec - No implementation is available of this in Maven Central, though other
repositories have an implementation, though one that is limited to only native implementations for x86 platforms.
* com.hadoop.compression.lzo.LzoCodec - There are GPL implementations of the LZO codec available, either bytecode or
native, but this license isn't compatible with many other projects, so we disregard it.

We ignore the provided codec for snappy, `org.apache.hadoop.io.compress.SnappyCodec`, since it requires native code and
doesn't provide all supported platforms. Instead, the configuration is modified to replace this ServiceLoader-provided
implementation with the pure-java implementation `io.airlift.compress.snappy.SnappyCodec`. This can be disabled through
deephaven configuration settings.
which requires an external dependency that tries to load native code (but can fall back to bytecode).

Two implementations aren't provided at all in hadoop-common or parquet-hadoop:
* `org.apache.hadoop.io.compress.BrotliCodec` - No implementation of this is available in Maven Central, though other
repositories have one. For our testing, we use `jbrotli-native-darwin-x86-amd64`, which is limited to native
implementations for x86 platforms.
* `com.hadoop.compression.lzo.LzoCodec` - There are GPL implementations of the LZO codec available, either bytecode or
native, but that license isn't compatible with many other projects, so we disregard it. Instead, we use
`io.airlift.compress.lzo.LzoCodec`, which is licensed under the Apache-2.0 License.

We also ignore the provided codec for snappy, `org.apache.hadoop.io.compress.SnappyCodec`, since it's not compatible with
other parquet implementations which claim to use Snappy. Instead, the configuration is modified to replace this
ServiceLoader-provided implementation with `org.apache.parquet.hadoop.codec.SnappyCodec`, which is the classname
hardcoded in `org.apache.parquet.compression.CompressionCodecFactory`.

Note that `org.apache.parquet.hadoop.metadata.ColumnChunkMetaData` instances created by Deephaven table writing code
do still require `CompressionCodecName` instances, which means that we must still have a way to translate our own codecs
into this enum's values, and only officially supported compression codecs can ever be used to write.

So, this project offers codecs from org.apache.hadoop.io.compress.CompressionCodecFactory, with configuration options,
So, this project offers codecs from `org.apache.hadoop.io.compress.CompressionCodecFactory`, with configuration options,
and utilities to map back to official codec names.
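
To make the lookup path concrete, here is a minimal sketch (not part of this PR; the class name and codec choice are illustrative) of resolving a codec by alias through the hadoop factory, which consults configuration and ServiceLoader-registered codecs rather than hardcoded classnames:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class HadoopCodecLookup {
    public static void main(String[] args) {
        // The factory reads extra codec classnames from configuration
        // ("io.compression.codecs") and also discovers ServiceLoader-provided
        // codecs on the classpath.
        final Configuration conf = new Configuration();
        final CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        // Lookup is by canonical classname or by case-insensitive codec alias.
        final CompressionCodec codec = factory.getCodecByName("gzip");
        System.out.println(codec == null ? "not found" : codec.getClass().getName());
    }
}
```
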
1 change: 0 additions & 1 deletion extensions/parquet/compression/build.gradle
@@ -6,7 +6,6 @@ plugins {
dependencies {
api project(':Util')

// TODO(deephaven-core#3148): LZ4_RAW parquet support
Classpaths.inheritParquetHadoop(project)
Classpaths.inheritParquetHadoopConfiguration(project)

DeephavenCompressorAdapterFactory.java

@@ -18,6 +18,7 @@
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.hadoop.codec.SnappyCodec;
import org.apache.parquet.hadoop.codec.Lz4RawCodec;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.IOException;
@@ -61,6 +62,10 @@ private static DeephavenCompressorAdapterFactory createInstance() {
// does use platform-specific implementations, but has native implementations for the
// platforms we support today.
SnappyCodec.class, CompressionCodecName.SNAPPY,

// Use the Parquet LZ4_RAW codec, which internally uses aircompressor
Lz4RawCodec.class, CompressionCodecName.LZ4_RAW,

// The rest of these are aircompressor codecs which have fast / pure java implementations
JdkGzipCodec.class, CompressionCodecName.GZIP,
LzoCodec.class, CompressionCodecName.LZO,
@@ -82,14 +87,14 @@ private static DeephavenCompressorAdapterFactory createInstance() {
return new DeephavenCompressorAdapterFactory(factory, Collections.unmodifiableMap(codecToNames));
}

private static class CodecWrappingCompressorAdapter implements CompressorAdapter {
static class CodecWrappingCompressorAdapter implements CompressorAdapter {
private final CompressionCodec compressionCodec;
private final CompressionCodecName compressionCodecName;

private boolean innerCompressorPooled;
private Compressor innerCompressor;

private CodecWrappingCompressorAdapter(CompressionCodec compressionCodec,
CodecWrappingCompressorAdapter(CompressionCodec compressionCodec,
CompressionCodecName compressionCodecName) {
this.compressionCodec = Objects.requireNonNull(compressionCodec);
this.compressionCodecName = Objects.requireNonNull(compressionCodecName);
@@ -178,26 +183,37 @@ private DeephavenCompressorAdapterFactory(CompressionCodecFactory compressionCod
}

/**
* Returns a compressor with the given codec name.
* Returns a compressor with the given codec name. The returned adapter can be internally stateful in some cases,
* and therefore a single instance should not be re-used across files (check
* {@link LZ4WithLZ4RawBackupCompressorAdapter} for more details).
*
* @param codecName the name of the codec to search for.
* @return a compressor instance with a name matching the given codec.
*/
public CompressorAdapter getByName(String codecName) {
public CompressorAdapter getByName(final String codecName) {
if (codecName.equalsIgnoreCase("UNCOMPRESSED")) {
return CompressorAdapter.PASSTHRU;
}
final CompressionCodec codec = compressionCodecFactory.getCodecByName(codecName);
CompressionCodec codec = compressionCodecFactory.getCodecByName(codecName);
if (codec == null) {
throw new IllegalArgumentException(
String.format("Failed to find CompressionCodec for codecName=%s", codecName));
if (codecName.equalsIgnoreCase("LZ4_RAW")) {
// Hacky work-around since codec factory refers to LZ4_RAW as LZ4RAW
codec = compressionCodecFactory.getCodecByName("LZ4RAW");
}
if (codec == null) {
throw new IllegalArgumentException(
String.format("Failed to find CompressionCodec for codecName=%s", codecName));
}
}
final CompressionCodecName ccn = codecClassnameToCodecName.get(codec.getClass().getName());
if (ccn == null) {
throw new IllegalArgumentException(String.format(
"Failed to find CompressionCodecName for codecName=%s, codec=%s, codec.getDefaultExtension()=%s",
codecName, codec, codec.getDefaultExtension()));
}
if (ccn == CompressionCodecName.LZ4) {
return new LZ4WithLZ4RawBackupCompressorAdapter(codec, ccn);
}
return new CodecWrappingCompressorAdapter(codec, ccn);
}
}
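
Usage of the factory, pieced together from calls that appear elsewhere in this diff (`getInstance()` and `getByName("LZ4_RAW")` both occur in the fallback adapter below); treat this as a sketch rather than documented API:

```java
// "UNCOMPRESSED" short-circuits to CompressorAdapter.PASSTHRU; unknown names
// throw IllegalArgumentException; "LZ4" is wrapped with the LZ4_RAW fallback adapter.
CompressorAdapter lz4Raw = DeephavenCompressorAdapterFactory.getInstance().getByName("LZ4_RAW");
CompressorAdapter lz4WithFallback = DeephavenCompressorAdapterFactory.getInstance().getByName("LZ4");
```
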
LZ4WithLZ4RawBackupCompressorAdapter.java (new file)

@@ -0,0 +1,79 @@
/**
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.parquet.compress;

import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;

/**
* This is the default adapter for LZ4 files. It attempts to decompress with LZ4 and falls back to LZ4_RAW on failure.
* After that, it always uses LZ4_RAW for decompression. This fallback mechanism is particularly useful for
* decompressing parquet files that are compressed with LZ4_RAW but tagged as LZ4 in the metadata. This adapter is
* internally stateful in some cases and therefore a single instance should not be re-used across files.
*/
class LZ4WithLZ4RawBackupCompressorAdapter extends DeephavenCompressorAdapterFactory.CodecWrappingCompressorAdapter {
private enum DecompressionMode {
INIT, LZ4, LZ4_RAW
}

private DecompressionMode mode = DecompressionMode.INIT;

/**
* Only initialized if we hit an exception while decompressing with LZ4.
*/
private CompressorAdapter lz4RawAdapter = null;

LZ4WithLZ4RawBackupCompressorAdapter(CompressionCodec compressionCodec,
CompressionCodecName compressionCodecName) {
super(compressionCodec, compressionCodecName);
}

@Override
public BytesInput decompress(final InputStream inputStream, final int compressedSize,
final int uncompressedSize) throws IOException {
if (mode == DecompressionMode.LZ4) {
return super.decompress(inputStream, compressedSize, uncompressedSize);
}
if (mode == DecompressionMode.LZ4_RAW) {
// LZ4_RAW adapter should have been initialized if we hit this case.
return lz4RawAdapter.decompress(inputStream, compressedSize, uncompressedSize);
}
// Buffer input data in case we need to retry with LZ4_RAW.
final BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream, compressedSize);
bufferedInputStream.mark(compressedSize);
BytesInput ret;
try {
ret = super.decompress(bufferedInputStream, compressedSize, uncompressedSize);
mode = DecompressionMode.LZ4;
} catch (IOException e) {
super.reset();
bufferedInputStream.reset();
lz4RawAdapter = DeephavenCompressorAdapterFactory.getInstance().getByName("LZ4_RAW");
ret = lz4RawAdapter.decompress(bufferedInputStream, compressedSize, uncompressedSize);
mode = DecompressionMode.LZ4_RAW;
}
return ret;
}

@Override
public void reset() {
super.reset();
if (lz4RawAdapter != null) {
lz4RawAdapter.reset();
}
}

@Override
public void close() {
super.close();
if (lz4RawAdapter != null) {
lz4RawAdapter.close();
}
}
}
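
The fallback above hinges on `BufferedInputStream.mark`/`reset`: the stream is marked before the first LZ4 attempt so the exact same bytes can be replayed for LZ4_RAW. A self-contained sketch of that pattern (hypothetical payload, not Deephaven code):

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;

public class MarkResetSketch {
    public static void main(String[] args) throws IOException {
        final byte[] compressed = {0x04, 0x22, 0x4D, 0x18}; // hypothetical compressed bytes
        final BufferedInputStream in =
                new BufferedInputStream(new ByteArrayInputStream(compressed), compressed.length);
        in.mark(compressed.length); // mark stays valid for up to compressedSize bytes
        final int first = in.read(); // a failed decode attempt consumes bytes like this
        in.reset(); // rewind to the mark for the second attempt
        final int replayed = in.read();
        System.out.println(first == replayed); // true: the same byte is re-read
    }
}
```
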
@@ -94,24 +94,17 @@ public boolean equals(Object obj) {
}
}

private void compressionCodecTestHelper(final String codec) {
final String currentCodec = ParquetInstructions.getDefaultCompressionCodecName();
try {
ParquetInstructions.setDefaultCompressionCodecName(codec);
String path = rootFile + File.separator + "Table1.parquet";
final Table table1 = getTableFlat(10000, false);
ParquetTools.writeTable(table1, path);
assertTrue(new File(path).length() > 0);
final Table table2 = ParquetTools.readTable(path);
TstUtils.assertTableEquals(table1, table2);
} finally {
ParquetInstructions.setDefaultCompressionCodecName(currentCodec);
}
private void compressionCodecTestHelper(final ParquetInstructions codec) {
File dest = new File(rootFile + File.separator + "Table1.parquet");
final Table table1 = getTableFlat(10000, false);
ParquetTools.writeTable(table1, dest, codec);
assertTrue(dest.length() > 0L);
final Table table2 = ParquetTools.readTable(dest);
TstUtils.assertTableEquals(table1, table2);
}

@Test
public void testParquetBrotliCompressionCodec() {
compressionCodecTestHelper("BROTLI");

compressionCodecTestHelper(ParquetTools.BROTLI);
}
}
ParquetInstructions.java

@@ -29,9 +29,10 @@ public abstract class ParquetInstructions implements ColumnToCodecMappings {
/**
* Set the default for {@link #getCompressionCodecName()}.
*
* @deprecated Use {@link Builder#setCompressionCodecName(String)} instead.
* @param name The new default
* @see Builder#setCompressionCodecName(String)
*/
@Deprecated
public static void setDefaultCompressionCodecName(final String name) {
defaultCompressionCodecName = name;
}
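
Per the deprecation note, codec selection moves from a process-wide default to per-instance instructions; a minimal sketch of the builder pattern this diff references:

```java
// Builder-based replacement for the deprecated global setter (sketch).
final ParquetInstructions zstd =
        ParquetInstructions.builder().setCompressionCodecName("ZSTD").build();
```
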
ParquetTools.java

@@ -841,14 +841,33 @@ public static Pair<List<ColumnDefinition<?>>, ParquetInstructions> convertSchema
s -> s.replace(" ", "_"), takenNames)));
}

public static final ParquetInstructions UNCOMPRESSED =
ParquetInstructions.builder().setCompressionCodecName("UNCOMPRESSED").build();

/**
* @deprecated Use LZ4_RAW instead, as explained
* <a href="https://github.com/apache/parquet-format/blob/master/Compression.md">here</a>
*/
@Deprecated
public static final ParquetInstructions LZ4 = ParquetInstructions.builder().setCompressionCodecName("LZ4").build();
public static final ParquetInstructions LZ4_RAW =
ParquetInstructions.builder().setCompressionCodecName("LZ4_RAW").build();
public static final ParquetInstructions LZO = ParquetInstructions.builder().setCompressionCodecName("LZO").build();
public static final ParquetInstructions GZIP =
ParquetInstructions.builder().setCompressionCodecName("GZIP").build();
public static final ParquetInstructions ZSTD =
ParquetInstructions.builder().setCompressionCodecName("ZSTD").build();
public static final ParquetInstructions SNAPPY =
ParquetInstructions.builder().setCompressionCodecName("SNAPPY").build();
public static final ParquetInstructions BROTLI =
ParquetInstructions.builder().setCompressionCodecName("BROTLI").build();
public static final ParquetInstructions LEGACY = ParquetInstructions.builder().setIsLegacyParquet(true).build();

/**
* @deprecated Do not use this method; instead, pass the codec constants above as arguments to the
* {@link #writeTable(Table, File, ParquetInstructions)} method
*/
@Deprecated
public static void setDefaultCompressionCodecName(final String compressionCodecName) {
ParquetInstructions.setDefaultCompressionCodecName(compressionCodecName);
}
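
Putting the new constants to work, a hedged sketch of a write/read round trip (the table and path are hypothetical; the `writeTable`/`readTable` signatures are taken from the test change above, and the `TableTools` helper is an assumption beyond this diff):

```java
import java.io.File;

import io.deephaven.engine.table.Table;
import io.deephaven.engine.util.TableTools;

// Hypothetical round trip using the new LZ4_RAW instructions constant.
final Table source = TableTools.emptyTable(10_000).update("X = ii");
final File dest = new File("/tmp/Table1.parquet");
ParquetTools.writeTable(source, dest, ParquetTools.LZ4_RAW);
final Table roundTripped = ParquetTools.readTable(dest);
```
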