ORC-998: Sharing compression output buffer among treeWriter: Refactoring within OutStream for portability

### What changes were proposed in this pull request?
Each TreeWriter holds its own OutStream instance, created by the WriterContext#createStream method. Within OutStream there are three buffers in total:

- `current`: the regular input buffer holding uncompressed, unencrypted bytes.
- `compress`: the output buffer holding compressed bytes.
- `overflow`: same as `compress`, but only used when the last compression output is larger than the remaining capacity of the `compress` buffer.

Potentially, the `compress` and `overflow` buffers don't have to be allocated individually within each OutStream object; they could be shared across all of them to save memory allocations. The current layout is sketched below.
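For orientation, here is a condensed, compilable sketch of the layout just described; the field names and comments follow the existing OutStream code shown in the diff below, while the wrapper class name is illustrative only.

```java
import java.nio.ByteBuffer;

// Condensed illustration of the three per-stream buffers described above.
class OutStreamBuffersSketch {
  // Regular input buffer: uncompressed, unencrypted bytes written by the TreeWriter.
  ByteBuffer current;

  // Accumulates compressed bytes until full, then is sent to the receiver;
  // stays null when no compression codec is configured.
  ByteBuffer compressed;

  // Takes codec output that does not fit into "compressed"; once "compressed"
  // has been flushed to the receiver, "overflow" becomes the new "compressed".
  ByteBuffer overflow;
}
```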

This PR is the first step toward sharing the compression output buffer: it refactors the internals of OutStream and bundles the logically related objects together (so the details of handling overflow don't have to be visible). The refactoring makes it easier to share the compression output buffer as a passed-in argument in the follow-up PR.
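As a rough illustration of where this is headed (no such constructor exists in this PR, and the names below are made up), bundling the two output buffers into one object would let a caller hand the same instance to several streams in the follow-up:

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the follow-up idea; these types do not exist in the
// ORC codebase as of this commit.
class CompressedBufferBundle {
  ByteBuffer compressed;  // shared compression output buffer
  ByteBuffer overflow;    // shared overflow buffer
}

class StreamSketch {
  private final CompressedBufferBundle outputBuffer;

  // Instead of each stream allocating its own output buffers, the caller
  // could pass one shared bundle to every stream it creates.
  StreamSketch(CompressedBufferBundle sharedOutputBuffer) {
    this.outputBuffer = sharedOutputBuffer;
  }
}
```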

### Why are the changes needed?
In the context of [ORC-997](https://issues.apache.org/jira/browse/ORC-997), this change turns the compression output buffer into a single entity that is easier to share and pass in from the caller.

### How was this patch tested?
There are no functional changes in this PR, so all existing unit tests pass. A new test was also added in TestOutStream to make sure the scenario where codec.compress() returns false (meaning the compression output is larger than the original input) is covered by the unit tests.
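The core of the new test's codec override, condensed here as a standalone sketch; the real test subclasses ZlibCodec (see the diff below), and the class name here is illustrative only.

```java
import java.nio.ByteBuffer;
import java.util.Random;

// Sketch of the test idea: randomly report the output as not worth keeping so
// that the path where codec.compress() returns false is exercised as well.
class RandomRejectCodecSketch {
  private final Random rand = new Random();

  // Mirrors the boolean contract of CompressionCodec#compress: returning false
  // tells OutStream to fall back to writing the original, uncompressed bytes.
  boolean compress(ByteBuffer in, ByteBuffer out, ByteBuffer overflow) {
    // A real codec would fill "out"/"overflow" here; the actual test delegates
    // to ZlibCodec.compress() and then randomizes the returned flag.
    return rand.nextBoolean();
  }
}
```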

This closes #909

Closes #1633 from mystic-lama/autumnust-refactor_compress_buffer.

Lead-authored-by: mystic-lama <mysticlama000@gmail.com>
Co-authored-by: Lei Sun <autumnust@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2 people authored and dongjoon-hyun committed Nov 20, 2023
1 parent 8a1cb35 commit 22a06b4
Showing 2 changed files with 192 additions and 78 deletions.
219 changes: 141 additions & 78 deletions java/core/src/java/org/apache/orc/impl/OutStream.java
@@ -58,21 +58,10 @@ public class OutStream extends PositionedOutputStream {
private ByteBuffer current = null;

/**
* Stores the compressed bytes until we have a full buffer and then outputs
* them to the receiver. If no compression is being done, this (and overflow)
* will always be null and the current buffer will be sent directly to the
* receiver.
* Lazily initialized: Won't allocate byte buffer until invocation of init()
*/
private ByteBuffer compressed = null;
public OutputCompressedBuffer compressedBuffer = new OutputCompressedBuffer();

/**
* Since the compressed buffer may start with contents from previous
* compression blocks, we allocate an overflow buffer so that the
* output of the codec can be split between the two buffers. After the
* compressed buffer is sent to the receiver, the overflow buffer becomes
* the new compressed buffer.
*/
private ByteBuffer overflow = null;
private final int bufferSize;
private final CompressionCodec codec;
private final CompressionCodec.Options options;
@@ -223,13 +212,6 @@ public static void assertBufferSizeValid(int bufferSize) throws IllegalArgumentE
}
}

/**
* Allocate a new output buffer if we are compressing.
*/
private ByteBuffer getNewOutputBuffer() {
return ByteBuffer.allocate(bufferSize + HEADER_SIZE);
}

private void flip() {
current.limit(current.position());
current.position(codec == null ? 0 : HEADER_SIZE);
Expand Down Expand Up @@ -266,7 +248,130 @@ public void write(byte[] bytes, int offset, int length) throws IOException {
}
}

private void spill() throws java.io.IOException {

/**
* An abstraction over compressed buffer and the overflow associated with it.
* See comments for {@link #compressed} and {@link #overflow} for details.
*/
private class OutputCompressedBuffer {
/**
* Stores the compressed bytes until we have a full buffer and then outputs
* them to the receiver. If no compression is being done, this (and overflow)
* will always be null and the current buffer will be sent directly to the
* receiver.
*/
ByteBuffer compressed = null;

/**
* Since the compressed buffer may start with contents from previous
* compression blocks, we allocate an overflow buffer so that the
* output of the codec can be split between the two buffers. After the
* compressed buffer is sent to the receiver, the overflow buffer becomes
* the new compressed buffer.
*/
ByteBuffer overflow = null;

public void init() {
if (compressed == null) {
compressed = getNewOutputBuffer();
} else if (overflow == null) {
overflow = getNewOutputBuffer();
}
}

public int getCurrentPosn() {
if (compressed != null) {
return compressed.position();
} else {
throw new IllegalStateException("Output Compression buffer not being init'ed properly");
}
}

public void advanceTo(int newPosn) {
compressed.position(newPosn);
}

public int getCapacity() {
int result = 0;

if (compressed != null) {
result += compressed.capacity();
}
if (overflow != null) {
result += overflow.capacity();
}

return result;
}

/**
* Commit the compression by
* 1) Writing the header,
* 2) Checking if the buffer is filled (so it can be sent to
* {@link org.apache.orc.PhysicalWriter.OutputReceiver}) and preparing for the upcoming compression.
*
* @return the total length of the compressed bytes, including the header.
*/
public long commitCompress(int startPosn) throws IOException {
// find the total bytes in the chunk
int totalBytes = compressed.position() - startPosn - HEADER_SIZE;
if (overflow != null) {
totalBytes += overflow.position();
}
writeHeader(compressed, startPosn, totalBytes, false);
// if we have less than the next header left, spill it.
if (compressed.remaining() < HEADER_SIZE) {
compressed.flip();
outputBuffer(compressed);
compressed = overflow;
overflow = null;
}
return totalBytes + HEADER_SIZE;
}

public void abortCompress(int currentPosn) throws IOException {
// we are using the original, but need to spill the current
// compressed buffer first for ordering. So back up to where we started,
// flip it and add it to done.
if (currentPosn != 0) {
compressed.position(currentPosn);
compressed.flip();
outputBuffer(compressed);
compressed = null;
// if we have an overflow, clear it and make it the new compress
// buffer
if (overflow != null) {
overflow.clear();
compressed = overflow;
overflow = null;
}
} else {
compressed.clear();
if (overflow != null) {
overflow.clear();
}
}
}

public void reset() throws IOException {
if (compressed != null && compressed.position() != 0) {
compressed.flip();
outputBuffer(compressed);
}

compressed = null;
overflow = null;
}

/**
* Allocate a new output buffer if we are compressing.
*/
private ByteBuffer getNewOutputBuffer() {
return ByteBuffer.allocate(bufferSize + HEADER_SIZE);
}
}

private void spill() throws IOException {
// if there isn't anything in the current buffer, don't spill
if (current == null ||
current.position() == (codec == null ? 0 : HEADER_SIZE)) {
@@ -277,56 +382,25 @@ private void spill() throws java.io.IOException {
outputBuffer(current);
getNewInputBuffer();
} else {
if (compressed == null) {
compressed = getNewOutputBuffer();
} else if (overflow == null) {
overflow = getNewOutputBuffer();
}
int sizePosn = compressed.position();
compressed.position(compressed.position() + HEADER_SIZE);
if (codec.compress(current, compressed, overflow, options)) {
uncompressedBytes = 0;
compressedBuffer.init();
int currentPosn = compressedBuffer.getCurrentPosn();
compressedBuffer.advanceTo(currentPosn + HEADER_SIZE);

// Worth compression
if (codec.compress(current, compressedBuffer.compressed,
compressedBuffer.overflow, options)) {
// move position back to after the header
uncompressedBytes = 0;

current.position(HEADER_SIZE);
current.limit(current.capacity());
// find the total bytes in the chunk
int totalBytes = compressed.position() - sizePosn - HEADER_SIZE;
if (overflow != null) {
totalBytes += overflow.position();
}
compressedBytes += totalBytes + HEADER_SIZE;
writeHeader(compressed, sizePosn, totalBytes, false);
// if we have less than the next header left, spill it.
if (compressed.remaining() < HEADER_SIZE) {
compressed.flip();
outputBuffer(compressed);
compressed = overflow;
overflow = null;
}

compressedBytes += compressedBuffer.commitCompress(currentPosn);
} else {
compressedBytes += uncompressedBytes + HEADER_SIZE;
uncompressedBytes = 0;
// we are using the original, but need to spill the current
// compressed buffer first. So back up to where we started,
// flip it and add it to done.
if (sizePosn != 0) {
compressed.position(sizePosn);
compressed.flip();
outputBuffer(compressed);
compressed = null;
// if we have an overflow, clear it and make it the new compress
// buffer
if (overflow != null) {
overflow.clear();
compressed = overflow;
overflow = null;
}
} else {
compressed.clear();
if (overflow != null) {
overflow.clear();
}
}

compressedBuffer.abortCompress(currentPosn);

// now add the current buffer into the done list and get a new one.
current.position(0);
Expand All @@ -351,17 +425,12 @@ public void getPosition(PositionRecorder recorder) {
@Override
public void flush() throws IOException {
spill();
if (compressed != null && compressed.position() != 0) {
compressed.flip();
outputBuffer(compressed);
}
compressedBuffer.reset();
if (cipher != null) {
finishEncryption();
}
compressed = null;
uncompressedBytes = 0;
compressedBytes = 0;
overflow = null;
current = null;
}

@@ -379,13 +448,7 @@ public long getBufferSize() {
if (current != null) {
result += current.capacity();
}
if (compressed != null) {
result += compressed.capacity();
}
if (overflow != null) {
result += overflow.capacity();
}
return result + compressedBytes;
return result + compressedBuffer.getCapacity() + compressedBytes;
}
}

51 changes: 51 additions & 0 deletions java/core/src/test/org/apache/orc/impl/TestOutStream.java
@@ -35,6 +35,7 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.Key;
import java.util.Random;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -59,6 +60,56 @@ public void testFlush() throws Exception {
}
}

/**
* Randomly decides whether a compression result should be committed or aborted (so that
* the isOriginal bit is set to true and the data is flushed as uncompressed).
* This class should be used for testing purposes only.
*/
private static class TestZlibCodec extends ZlibCodec {
private Random rand = new Random();

@Override
public boolean compress(ByteBuffer in, ByteBuffer out, ByteBuffer overflow, Options options) {
super.compress(in, out, overflow, options);
return rand.nextBoolean();
}
}

@Test
public void testCompressWithoutEncryption() throws Exception {
TestInStream.OutputCollector receiver = new TestInStream.OutputCollector();
CompressionCodec codec = new TestZlibCodec();
StreamOptions options = new StreamOptions(1024)
.withCodec(codec, codec.getDefaultOptions());

try (OutStream stream = new OutStream("test", options, receiver)) {
for (int i = 0; i < 20000; ++i) {
stream.write(("The Cheesy Poofs " + i + "\n")
.getBytes(StandardCharsets.UTF_8));
}
stream.flush();
}

byte[] compressed = receiver.buffer.get();

// use InStream to decompress it
BufferChunkList ranges = new BufferChunkList();
ranges.add(new BufferChunk(ByteBuffer.wrap(compressed), 0));
try (InStream decompressedStream = InStream.create("test", ranges.get(), 0,
compressed.length,
InStream.options().withCodec(new TestZlibCodec()).withBufferSize(1024));
BufferedReader reader
= new BufferedReader(new InputStreamReader(decompressedStream,
StandardCharsets.UTF_8))) {
// check the contents of the decompressed stream
for (int i = 0; i < 20000; ++i) {
assertEquals("The Cheesy Poofs " + i, reader.readLine(), "i = " + i);
}
assertNull(reader.readLine());
}

}

@Test
public void testAssertBufferSizeValid() {
try {
