Skip to content

Commit

Permalink
ORC-1463: [Java] Support Brotli codec
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
This PR aims to add the Brotli codec defined by [RFC7932](https://www.ietf.org/rfc/rfc7932.txt).
This is a new PR whose implementation is based on [Brotli4j](https://github.com/hyperxpro/Brotli4j). Previous PR: #1565.

### Why are the changes needed?
To support more codecs in ORC.

### How was this patch tested?
Unit tests and the CI pipeline.

Tested Brotli with the `CompressionWriter` example: it creates a 10.2 KB file, while Snappy produces a 44.2 KB file.

Closes #1714 from deshanxiao/orc-1463.

Lead-authored-by: Deshan Xiao <deshanxiao@microsoft.com>
Co-authored-by: deshanxiao <deshanxiao@microsoft.com>
Signed-off-by: William Hyun <william@apache.org>
  • Loading branch information
deshanxiao authored and williamhyun committed Jan 2, 2024
1 parent ad8a8c5 commit 4bc2c58
Show file tree
Hide file tree
Showing 7 changed files with 376 additions and 1 deletion.
4 changes: 4 additions & 0 deletions java/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@
<groupId>org.threeten</groupId>
<artifactId>threeten-extra</artifactId>
</dependency>
<dependency>
<groupId>com.aayushatharva.brotli4j</groupId>
<artifactId>brotli4j</artifactId>
</dependency>

<!-- test inter-project -->
<dependency>
Expand Down
2 changes: 1 addition & 1 deletion java/core/src/java/org/apache/orc/CompressionKind.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@
* can be applied to ORC files.
*/
public enum CompressionKind {
// Diff view, removed line: the constant list as it was before this commit.
NONE, ZLIB, SNAPPY, LZO, LZ4, ZSTD
// Diff view, added line: the same list with BROTLI appended at the end.
NONE, ZLIB, SNAPPY, LZO, LZ4, ZSTD, BROTLI
}
206 changes: 206 additions & 0 deletions java/core/src/java/org/apache/orc/impl/BrotliCodec.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.orc.impl;

import com.aayushatharva.brotli4j.Brotli4jLoader;
import com.aayushatharva.brotli4j.decoder.DecoderJNI;
import com.aayushatharva.brotli4j.encoder.Encoder;
import org.apache.orc.CompressionCodec;
import org.apache.orc.CompressionKind;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
 * ORC compression codec for {@link CompressionKind#BROTLI}, implemented on top
 * of the Brotli4j JNI bindings.
 *
 * <p>Compression goes through {@link Encoder#compress}; decompression drives a
 * {@link DecoderJNI.Wrapper} by hand so the decoded bytes can be written
 * straight into the caller's output buffer.
 */
public class BrotliCodec implements CompressionCodec, DirectDecompressionCodec {
  // Load the JNI library once, when the class is initialized.
  static {
    Brotli4jLoader.ensureAvailability();
  }

  public BrotliCodec() {
  }

  /**
   * Brotli tuning knobs (quality, window size and encoder mode) exposed
   * through the generic {@link Options} interface.
   */
  static class BrotliOptions implements Options {

    private Encoder.Mode mode = Encoder.Mode.GENERIC;
    // -1 is passed through to Brotli4j unchanged; presumably it selects the
    // library default -- confirm against Encoder.Parameters.
    private int quality = -1;
    private int lgwin = -1;

    BrotliOptions() {

    }

    BrotliOptions(int quality, int lgwin, Encoder.Mode mode) {
      this.quality = quality;
      this.lgwin = lgwin;
      this.mode = mode;
    }

    /**
     * Create an independent options object carrying the same quality, window
     * and mode values.
     *
     * @return a new {@code BrotliOptions} with the current settings
     */
    @Override
    public Options copy() {
      return new BrotliOptions(quality, lgwin, mode);
    }

    /**
     * Translate the generic speed hint into a Brotli quality level.
     *
     * @param newValue the requested speed/ratio trade-off
     * @return this object, for call chaining
     */
    @Override
    public Options setSpeed(SpeedModifier newValue) {
      switch (newValue) {
        case FAST:
          // best speed + 1.
          quality = 1;
          break;
        case DEFAULT:
          // best quality. Keep default with default value.
          quality = -1;
          break;
        case FASTEST:
          // best speed.
          quality = 0;
          break;
        default:
          // Any other modifier leaves the current quality untouched.
          break;
      }
      return this;
    }

    /**
     * Translate the generic data-kind hint into a Brotli encoder mode.
     *
     * @param newValue the kind of data that will be compressed
     * @return this object, for call chaining
     */
    @Override
    public Options setData(DataKind newValue) {
      switch (newValue) {
        case BINARY:
          mode = Encoder.Mode.GENERIC;
          break;
        case TEXT:
          mode = Encoder.Mode.TEXT;
          break;
        default:
          // Any other data kind leaves the current mode untouched.
          break;
      }
      return this;
    }

    /**
     * Build the Brotli4j parameter object matching the current settings.
     *
     * @return freshly created encoder parameters
     */
    public Encoder.Parameters brotli4jParameter() {
      return new Encoder.Parameters()
          .setQuality(quality).setWindow(lgwin).setMode(mode);
    }
  }

  // NOTE(review): this instance is shared and mutable; a caller that invokes
  // setSpeed/setData on it without copy() first changes the defaults for
  // every user of the codec -- confirm that all callers copy.
  private static final BrotliCodec.BrotliOptions DEFAULT_OPTIONS = new BrotliOptions();

  @Override
  public Options getDefaultOptions() {
    return DEFAULT_OPTIONS;
  }

  /**
   * Compress the remaining bytes of {@code in}.
   *
   * <p>The compressed bytes are only kept when they are strictly smaller than
   * the input. They are written to {@code out} first and any excess spills
   * into {@code overflow}, starting at the beginning of its backing array
   * region.
   *
   * @param in the bytes to compress; must be array-backed
   * @param out the primary destination; must be array-backed
   * @param overflow receives whatever does not fit into {@code out}
   * @param options must be a {@link BrotliOptions} instance
   * @return true if the compressed form was written, false if compression did
   *     not shrink the data and nothing was written
   * @throws IOException if the Brotli encoder fails
   */
  @Override
  public boolean compress(
      ByteBuffer in,
      ByteBuffer out,
      ByteBuffer overflow,
      Options options) throws IOException {
    BrotliOptions brotliOptions = (BrotliOptions) options;
    int inBytes = in.remaining();
    byte[] compressed = Encoder.compress(
        in.array(), in.arrayOffset() + in.position(), inBytes, brotliOptions.brotli4jParameter());
    int outBytes = compressed.length;
    if (outBytes < inBytes) {
      int remaining = out.remaining();
      if (remaining >= outBytes) {
        // Everything fits into the primary buffer.
        System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
            out.position(), outBytes);
        out.position(out.position() + outBytes);
      } else {
        // Fill the primary buffer completely, then spill the rest.
        System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
            out.position(), remaining);
        out.position(out.limit());
        System.arraycopy(compressed, remaining, overflow.array(),
            overflow.arrayOffset(), outBytes - remaining);
        overflow.position(outBytes - remaining);
      }
      return true;
    } else {
      return false;
    }
  }

  /**
   * Decompress the remaining bytes of {@code in} into {@code out}.
   *
   * <p>{@code out} is flipped before this method returns or throws, so any
   * bytes decoded up to that point are readable from it.
   *
   * @param in the compressed bytes
   * @param out the destination for the decompressed bytes
   * @throws IOException if the compressed stream is truncated or the decoder
   *     reports an error status
   */
  @Override
  public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
    int compressedBytes = in.remaining();
    DecoderJNI.Wrapper decoder = new DecoderJNI.Wrapper(compressedBytes);
    try {
      decoder.getInputBuffer().put(in);
      decoder.push(compressedBytes);
      while (decoder.getStatus() != DecoderJNI.Status.DONE) {
        switch (decoder.getStatus()) {
          case OK:
            decoder.push(0);
            break;

          case NEEDS_MORE_OUTPUT:
            out.put(decoder.pull());
            break;

          case NEEDS_MORE_INPUT:
            // Give decoder a chance to process the remaining of the buffered byte.
            decoder.push(0);
            // If decoder still needs input, this means that stream is truncated.
            // Fail loudly instead of handing back a silently shortened result.
            if (decoder.getStatus() == DecoderJNI.Status.NEEDS_MORE_INPUT) {
              throw new IOException("Truncated Brotli stream: decoder needs more input after "
                  + compressedBytes + " compressed bytes");
            }
            break;

          default:
            // Any other status is a decoder failure; surface it to the caller
            // rather than returning partial output as if it were complete.
            throw new IOException("Failed to decompress Brotli stream: decoder status "
                + decoder.getStatus());
        }
      }
    } finally {
      out.flip();
      decoder.destroy();
    }
  }

  @Override
  public boolean isAvailable() {
    return true;
  }

  @Override
  public CompressionKind getKind() {
    return CompressionKind.BROTLI;
  }


  /**
   * Decompress between buffers by delegating to
   * {@link #decompress(ByteBuffer, ByteBuffer)}.
   *
   * @param in the compressed bytes
   * @param out the destination for the decompressed bytes
   * @throws IOException if the compressed stream cannot be decoded
   */
  @Override
  public void directDecompress(ByteBuffer in, ByteBuffer out) throws IOException {
    // decompress work well for both direct and heap.
    decompress(in, out);
  }

  @Override
  public void reset() {
    // Nothing to reset: the codec holds no fields between calls.
  }

  @Override
  public void destroy() {
    // Nothing to release: the native decoder is destroyed inside decompress().
  }

  /** Hand this codec back to the shared {@link OrcCodecPool}. */
  @Override
  public void close() {
    OrcCodecPool.returnCodec(CompressionKind.BROTLI, this);
  }
}
1 change: 1 addition & 0 deletions java/core/src/java/org/apache/orc/impl/ReaderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,7 @@ private static OrcProto.PostScript extractPostScript(BufferChunk buffer,
case LZO:
case LZ4:
case ZSTD:
case BROTLI:
break;
default:
throw new IllegalArgumentException("Unknown compression");
Expand Down
3 changes: 3 additions & 0 deletions java/core/src/java/org/apache/orc/impl/WriterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ public static CompressionCodec createCodec(CompressionKind kind) {
case ZSTD:
return new AircompressorCodec(kind, new ZstdCompressor(),
new ZstdDecompressor());
case BROTLI:
return new BrotliCodec();
default:
throw new IllegalArgumentException("Unknown compression codec: " +
kind);
Expand Down Expand Up @@ -579,6 +581,7 @@ private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) {
case LZO: return OrcProto.CompressionKind.LZO;
case LZ4: return OrcProto.CompressionKind.LZ4;
case ZSTD: return OrcProto.CompressionKind.ZSTD;
case BROTLI: return OrcProto.CompressionKind.BROTLI;
default:
throw new IllegalArgumentException("Unknown compression " + kind);
}
Expand Down
Loading

0 comments on commit 4bc2c58

Please sign in to comment.