Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for LZ4_RAW compression codec for Parquet #4446

Merged
merged 18 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,22 @@
import java.io.InputStream;

/**
* This is the default adapter for LZ4 data. It attempts to decompress with LZ4, and falls back to LZ4_RAW on failure.
* The fallback mechanism is particularly useful for decompressing parquet files that are compressed with LZ4_RAW but
* tagged as LZ4 in the metadata.
* This is the default adapter for LZ4 files. It attempts to decompress with LZ4 and falls back to LZ4_RAW on failure.
* After that, it always uses LZ4_RAW for decompression. This fallback mechanism is particularly useful for
* decompressing parquet files that are compressed with LZ4_RAW but tagged as LZ4 in the metadata. This adapter is
* internally stateful in some cases and therefore a single instance should not be re-used across files.
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
*/
class LZ4WithLZ4RawBackupCompressorAdapter extends DeephavenCompressorAdapterFactory.CodecWrappingCompressorAdapter {
private CompressorAdapter lz4RawAdapter = null; // Lazily initialized
private enum DecompressionMode {
INIT, LZ4, LZ4_RAW
}

private DecompressionMode mode = DecompressionMode.INIT;

/**
* Only initialized if we hit an exception while decompressing with LZ4.
*/
private CompressorAdapter lz4RawAdapter = null;

LZ4WithLZ4RawBackupCompressorAdapter(CompressionCodec compressionCodec,
CompressionCodecName compressionCodecName) {
Expand All @@ -27,19 +37,28 @@ class LZ4WithLZ4RawBackupCompressorAdapter extends DeephavenCompressorAdapterFac
@Override
public BytesInput decompress(final InputStream inputStream, final int compressedSize,
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
final int uncompressedSize) throws IOException {
if (mode == DecompressionMode.LZ4) {
return super.decompress(inputStream, compressedSize, uncompressedSize);
}
if (mode == DecompressionMode.LZ4_RAW) {
assert lz4RawAdapter != null;
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
return lz4RawAdapter.decompress(inputStream, compressedSize, uncompressedSize);
}
// Buffer input data in case we need to retry with LZ4_RAW.
final BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream, compressedSize);
bufferedInputStream.mark(compressedSize);
BytesInput ret;
try {
return super.decompress(bufferedInputStream, compressedSize, uncompressedSize);
ret = super.decompress(bufferedInputStream, compressedSize, uncompressedSize);
mode = DecompressionMode.LZ4;
} catch (IOException e) {
super.reset();
bufferedInputStream.reset();
if (lz4RawAdapter == null) {
lz4RawAdapter = DeephavenCompressorAdapterFactory.getInstance().getByName("LZ4_RAW");
}
return lz4RawAdapter.decompress(bufferedInputStream, compressedSize, uncompressedSize);
lz4RawAdapter = DeephavenCompressorAdapterFactory.getInstance().getByName("LZ4_RAW");
ret = lz4RawAdapter.decompress(bufferedInputStream, compressedSize, uncompressedSize);
mode = DecompressionMode.LZ4_RAW;
}
return ret;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,16 +343,27 @@ public void testParquetLz4CompressionCodec() {

@Test
public void test_lz4_compressed() {
// The following file is tagged as LZ4 compressed based on its metadata, but is actually compressed with LZ4_RAW
// We should be able to read it anyway with no exceptions.
// Write and read a LZ4 compressed file
File dest = new File(rootFile + File.separator + "Table.parquet");
final Table table = getTableFlat(100, false, false);
ParquetTools.writeTable(table, dest, ParquetTools.LZ4);
Table fromDisk = ParquetTools.readTable(dest).select();
TstUtils.assertTableEquals(fromDisk, table);

try {
// The following file is tagged as LZ4 compressed based on its metadata, but is actually compressed with
// LZ4_RAW. We should be able to read it anyway with no exceptions.
String path = TestParquetTools.class.getResource("/sample_lz4_compressed.parquet").getFile();
final Table fromDisk = ParquetTools.readTable(path).select();
final File dest = new File(rootFile, "random.parquet");
ParquetTools.writeTable(fromDisk, dest, ParquetTools.LZ4_RAW);
fromDisk = ParquetTools.readTable(path).select();
File randomDest = new File(rootFile, "random.parquet");
ParquetTools.writeTable(fromDisk, randomDest, ParquetTools.LZ4_RAW);
} catch (RuntimeException e) {
TestCase.fail("Failed to read LZ4 compressed parquet file");
TestCase.fail("Failed to read parquet file sample_lz4_compressed.parquet");
}

// Read the LZ4 compressed file again, to make sure we use a new adapter
fromDisk = ParquetTools.readTable(dest).select();
TstUtils.assertTableEquals(fromDisk, table);
}

@Test
Expand Down
Loading