diff --git a/java/core/pom.xml b/java/core/pom.xml index 3c6fc0f4e9..a750911b7f 100644 --- a/java/core/pom.xml +++ b/java/core/pom.xml @@ -76,6 +76,10 @@ org.threeten threeten-extra + + com.aayushatharva.brotli4j + brotli4j + diff --git a/java/core/src/java/org/apache/orc/CompressionKind.java b/java/core/src/java/org/apache/orc/CompressionKind.java index 3395298108..f5615acf80 100644 --- a/java/core/src/java/org/apache/orc/CompressionKind.java +++ b/java/core/src/java/org/apache/orc/CompressionKind.java @@ -23,5 +23,5 @@ * can be applied to ORC files. */ public enum CompressionKind { - NONE, ZLIB, SNAPPY, LZO, LZ4, ZSTD + NONE, ZLIB, SNAPPY, LZO, LZ4, ZSTD, BROTLI } diff --git a/java/core/src/java/org/apache/orc/impl/BrotliCodec.java b/java/core/src/java/org/apache/orc/impl/BrotliCodec.java new file mode 100644 index 0000000000..6e45d1a5f0 --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/BrotliCodec.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.impl; + +import com.aayushatharva.brotli4j.Brotli4jLoader; +import com.aayushatharva.brotli4j.decoder.DecoderJNI; +import com.aayushatharva.brotli4j.encoder.Encoder; +import org.apache.orc.CompressionCodec; +import org.apache.orc.CompressionKind; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class BrotliCodec implements CompressionCodec, DirectDecompressionCodec { + // load jni library. + static { + Brotli4jLoader.ensureAvailability(); + } + + public BrotliCodec() { + } + + static class BrotliOptions implements Options { + + private Encoder.Mode mode = Encoder.Mode.GENERIC; + private int quality = -1; + private int lgwin = -1; + + BrotliOptions() { + + } + + BrotliOptions(int quality, int lgwin, Encoder.Mode mode) { + this.quality = quality; + this.lgwin = lgwin; + this.mode = mode; + } + + @Override + public Options copy() { + return new BrotliOptions(quality, lgwin, mode); + } + + @Override + public Options setSpeed(SpeedModifier newValue) { + switch (newValue) { + case FAST: + // best speed + 1. + quality = 1; + break; + case DEFAULT: + // best quality. Keep default with default value. + quality = -1; + break; + case FASTEST: + // best speed. + quality = 0; + break; + default: + break; + } + return this; + } + + @Override + public Options setData(DataKind newValue) { + switch (newValue) { + case BINARY: + mode = Encoder.Mode.GENERIC; + break; + case TEXT: + mode = Encoder.Mode.TEXT; + break; + default: + break; + } + return this; + } + + public Encoder.Parameters brotli4jParameter() { + return new Encoder.Parameters() + .setQuality(quality).setWindow(lgwin).setMode(mode); + } + } + + private static final BrotliCodec.BrotliOptions DEFAULT_OPTIONS = new BrotliOptions(); + + @Override + public Options getDefaultOptions() { + return DEFAULT_OPTIONS; + } + + @Override + public boolean compress( + ByteBuffer in, + ByteBuffer out, + ByteBuffer overflow, + Options options) throws IOException { + BrotliOptions brotliOptions = (BrotliOptions) options; + int inBytes = in.remaining(); + byte[] compressed = Encoder.compress( + in.array(), in.arrayOffset() + in.position(), inBytes, brotliOptions.brotli4jParameter()); + int outBytes = compressed.length; + if (outBytes < inBytes) { + int remaining = out.remaining(); + if (remaining >= outBytes) { + System.arraycopy(compressed, 0, out.array(), out.arrayOffset() + + out.position(), outBytes); + out.position(out.position() + outBytes); + } else { + System.arraycopy(compressed, 0, out.array(), out.arrayOffset() + + out.position(), remaining); + out.position(out.limit()); + System.arraycopy(compressed, remaining, overflow.array(), + overflow.arrayOffset(), outBytes - remaining); + overflow.position(outBytes - remaining); + } + return true; + } else { + return false; + } + } + + @Override + public void decompress(ByteBuffer in, ByteBuffer out) throws IOException { + int compressedBytes = in.remaining(); + DecoderJNI.Wrapper decoder = new DecoderJNI.Wrapper(compressedBytes); + try { + decoder.getInputBuffer().put(in); + decoder.push(compressedBytes); + while (decoder.getStatus() != DecoderJNI.Status.DONE) { + switch (decoder.getStatus()) { + case OK: + decoder.push(0); + break; + + case NEEDS_MORE_OUTPUT: + ByteBuffer buffer = decoder.pull(); + out.put(buffer); + break; + + case NEEDS_MORE_INPUT: + // Give decoder a chance to process the remaining of the buffered byte. + decoder.push(0); + // If decoder still needs input, this means that stream is truncated. + if (decoder.getStatus() == DecoderJNI.Status.NEEDS_MORE_INPUT) { + return; + } + break; + + default: + return; + } + } + } finally { + out.flip(); + decoder.destroy(); + } + } + + @Override + public boolean isAvailable() { + return true; + } + + @Override + public CompressionKind getKind() { + return CompressionKind.BROTLI; + } + + + @Override + public void directDecompress(ByteBuffer in, ByteBuffer out) throws IOException { + // decompress work well for both direct and heap. + decompress(in, out); + } + + @Override + public void reset() { + } + + @Override + public void destroy() { + } + + @Override + public void close() { + OrcCodecPool.returnCodec(CompressionKind.BROTLI, this); + } +} diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java index f2e150cea5..b80094b581 100644 --- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java +++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java @@ -654,6 +654,7 @@ private static OrcProto.PostScript extractPostScript(BufferChunk buffer, case LZO: case LZ4: case ZSTD: + case BROTLI: break; default: throw new IllegalArgumentException("Unknown compression"); diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java index b5bb64387f..9100aa3741 100644 --- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java +++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java @@ -290,6 +290,8 @@ public static CompressionCodec createCodec(CompressionKind kind) { case ZSTD: return new AircompressorCodec(kind, new ZstdCompressor(), new ZstdDecompressor()); + case BROTLI: + return new BrotliCodec(); default: throw new IllegalArgumentException("Unknown compression codec: " + kind); @@ -579,6 +581,7 @@ private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) { case LZO: return OrcProto.CompressionKind.LZO; case LZ4: return OrcProto.CompressionKind.LZ4; case ZSTD: return OrcProto.CompressionKind.ZSTD; + case BROTLI: return OrcProto.CompressionKind.BROTLI; default: throw new IllegalArgumentException("Unknown compression " + kind); } diff --git a/java/core/src/test/org/apache/orc/impl/TestBrotli.java b/java/core/src/test/org/apache/orc/impl/TestBrotli.java new file mode 100644 index 0000000000..e5d23ca452 --- /dev/null +++ b/java/core/src/test/org/apache/orc/impl/TestBrotli.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.impl; + +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; + +import static org.junit.jupiter.api.Assertions.*; + +public class TestBrotli { + @Test + public void testOutputLargerThanBefore() { + ByteBuffer in = ByteBuffer.allocate(10); + ByteBuffer out = ByteBuffer.allocate(10); + in.put(new byte[]{1, 2, 3, 4, 5, 6, 7, 10}); + in.flip(); + try (BrotliCodec brotliCodec = new BrotliCodec()) { + // The compressed data length is larger than the original data. + assertFalse(brotliCodec.compress(in, out, null, + brotliCodec.getDefaultOptions())); + } catch (Exception e) { + fail(e); + } + } + + @Test + public void testCompress() { + ByteBuffer in = ByteBuffer.allocate(10000); + ByteBuffer out = ByteBuffer.allocate(500); + ByteBuffer result = ByteBuffer.allocate(10000); + for (int i = 0; i < 10000; i++) { + in.put((byte) i); + } + in.flip(); + try (BrotliCodec brotliCodec = new BrotliCodec()) { + assertTrue(brotliCodec.compress(in, out, null, + brotliCodec.getDefaultOptions())); + out.flip(); + brotliCodec.decompress(out, result); + assertArrayEquals(result.array(), in.array()); + } catch (Exception e) { + fail(e); + } + } + + @Test + public void testCompressNotFromStart() { + ByteBuffer in = ByteBuffer.allocate(10000); + ByteBuffer out = ByteBuffer.allocate(10000); + ByteBuffer result = ByteBuffer.allocate(10000); + for (int i = 0; i < 10000; i++) { + in.put((byte) i); + } + in.flip(); + in.get(); + + ByteBuffer slice = in.slice(); + byte[] originalBytes = new byte[slice.remaining()]; + slice.get(originalBytes); + + try (BrotliCodec brotliCodec = new BrotliCodec()) { + // The compressed data length is larger than the original data. + assertTrue(brotliCodec.compress(in, out, null, + brotliCodec.getDefaultOptions())); + + out.flip(); + brotliCodec.decompress(out, result); + + byte[] resultArray = new byte[result.remaining()]; + result.get(resultArray); + assertArrayEquals(resultArray, originalBytes); + } catch (Exception e) { + fail(e); + } + } + + @Test + public void testCompressWithOverflow() { + ByteBuffer in = ByteBuffer.allocate(10000); + ByteBuffer out = ByteBuffer.allocate(1); + ByteBuffer overflow = ByteBuffer.allocate(10000); + ByteBuffer result = ByteBuffer.allocate(10000); + for (int i = 0; i < 10000; i++) { + in.put((byte) i); + } + in.flip(); + try (BrotliCodec brotliCodec = new BrotliCodec()) { + assertTrue(brotliCodec.compress(in, out, overflow, + brotliCodec.getDefaultOptions())); + out.flip(); + overflow.flip(); + + // copy out, overflow to compressed + byte[] compressed = new byte[out.remaining() + overflow.remaining()]; + System.arraycopy(out.array(), out.arrayOffset() + out.position(), compressed, 0, out.remaining()); + System.arraycopy(overflow.array(), overflow.arrayOffset() + overflow.position(), compressed, out.remaining(), overflow.remaining()); + // decompress compressedBuffer and check the result. + ByteBuffer compressedBuffer = ByteBuffer.allocate(compressed.length); + compressedBuffer.put(compressed); + compressedBuffer.flip(); + brotliCodec.decompress(compressedBuffer, result); + assertArrayEquals(result.array(), in.array()); + } catch (Exception e) { + fail(e); + } + } + + @Test + public void testDirectDecompress() { + ByteBuffer in = ByteBuffer.allocate(10000); + ByteBuffer out = ByteBuffer.allocate(10000); + ByteBuffer directOut = ByteBuffer.allocateDirect(10000); + ByteBuffer directResult = ByteBuffer.allocateDirect(10000); + for (int i = 0; i < 10000; i++) { + in.put((byte) i); + } + in.flip(); + try (BrotliCodec brotliCodec = new BrotliCodec()) { + // write bytes to heap buffer. + assertTrue(brotliCodec.compress(in, out, null, + brotliCodec.getDefaultOptions())); + out.flip(); + // copy heap buffer to direct buffer. + directOut.put(out.array()); + directOut.flip(); + + brotliCodec.decompress(directOut, directResult); + + // copy result from direct buffer to heap. + byte[] heapBytes = new byte[in.array().length]; + directResult.get(heapBytes, 0, directResult.limit()); + + assertArrayEquals(in.array(), heapBytes); + } catch (Exception e) { + fail(e); + } + } +} diff --git a/java/pom.xml b/java/pom.xml index d597144f70..b3ef977724 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -60,6 +60,7 @@ + 1.15.0 10.12.0 ${project.basedir}/../../examples 3.3.6 @@ -202,6 +203,11 @@ threeten-extra 1.7.1 + + com.aayushatharva.brotli4j + brotli4j + ${brotli4j.version} +