diff --git a/java/core/src/java/org/apache/orc/impl/OutStream.java b/java/core/src/java/org/apache/orc/impl/OutStream.java
index d71b558b63..686f168a2b 100644
--- a/java/core/src/java/org/apache/orc/impl/OutStream.java
+++ b/java/core/src/java/org/apache/orc/impl/OutStream.java
@@ -58,21 +58,10 @@ public class OutStream extends PositionedOutputStream {
   private ByteBuffer current = null;
 
   /**
-   * Stores the compressed bytes until we have a full buffer and then outputs
-   * them to the receiver. If no compression is being done, this (and overflow)
-   * will always be null and the current buffer will be sent directly to the
-   * receiver.
+   * Lazily initialized: no byte buffer is allocated until init() is invoked.
    */
-  private ByteBuffer compressed = null;
+  public OutputCompressedBuffer compressedBuffer = new OutputCompressedBuffer();
 
-  /**
-   * Since the compressed buffer may start with contents from previous
-   * compression blocks, we allocate an overflow buffer so that the
-   * output of the codec can be split between the two buffers. After the
-   * compressed buffer is sent to the receiver, the overflow buffer becomes
-   * the new compressed buffer.
-   */
-  private ByteBuffer overflow = null;
   private final int bufferSize;
   private final CompressionCodec codec;
   private final CompressionCodec.Options options;
@@ -223,13 +212,6 @@ public static void assertBufferSizeValid(int bufferSize) throws IllegalArgumentE
     }
   }
 
-  /**
-   * Allocate a new output buffer if we are compressing.
-   */
-  private ByteBuffer getNewOutputBuffer() {
-    return ByteBuffer.allocate(bufferSize + HEADER_SIZE);
-  }
-
   private void flip() {
     current.limit(current.position());
     current.position(codec == null ? 0 : HEADER_SIZE);
@@ -266,7 +248,130 @@ public void write(byte[] bytes, int offset, int length) throws IOException {
     }
   }
 
-  private void spill() throws java.io.IOException {
+
+  /**
+   * An abstraction over the compressed buffer and the overflow buffer associated with it.
+   * See the comments on {@link #compressed} and {@link #overflow} for details.
+   */
+  private class OutputCompressedBuffer {
+    /**
+     * Stores the compressed bytes until we have a full buffer and then outputs
+     * them to the receiver. If no compression is being done, this (and overflow)
+     * will always be null and the current buffer will be sent directly to the
+     * receiver.
+     */
+    ByteBuffer compressed = null;
+
+    /**
+     * Since the compressed buffer may start with contents from previous
+     * compression blocks, we allocate an overflow buffer so that the
+     * output of the codec can be split between the two buffers. After the
+     * compressed buffer is sent to the receiver, the overflow buffer becomes
+     * the new compressed buffer.
+     */
+    ByteBuffer overflow = null;
+
+    public void init() {
+      if (compressed == null) {
+        compressed = getNewOutputBuffer();
+      } else if (overflow == null) {
+        overflow = getNewOutputBuffer();
+      }
+    }
+
+    public int getCurrentPosn() {
+      if (compressed != null) {
+        return compressed.position();
+      } else {
+        throw new IllegalStateException("Output compression buffer has not been initialized");
+      }
+    }
+
+    public void advanceTo(int newPosn) {
+      compressed.position(newPosn);
+    }
+
+    public int getCapacity() {
+      int result = 0;
+
+      if (compressed != null) {
+        result += compressed.capacity();
+      }
+      if (overflow != null) {
+        result += overflow.capacity();
+      }
+
+      return result;
+    }
+
+    /**
+     * Commit the compression by
+     * 1) writing the chunk header, and
+     * 2) checking whether the buffer is full (so it can be sent to the
+     * {@link org.apache.orc.PhysicalWriter.OutputReceiver}) and preparing for the next compression.
+     *
+     * @return the total number of compressed bytes, including the header.
+     */
+    public long commitCompress(int startPosn) throws IOException {
+      // find the total bytes in the chunk
+      int totalBytes = compressed.position() - startPosn - HEADER_SIZE;
+      if (overflow != null) {
+        totalBytes += overflow.position();
+      }
+      writeHeader(compressed, startPosn, totalBytes, false);
+      // if we have less than the next header left, spill it.
+      if (compressed.remaining() < HEADER_SIZE) {
+        compressed.flip();
+        outputBuffer(compressed);
+        compressed = overflow;
+        overflow = null;
+      }
+      return totalBytes + HEADER_SIZE;
+    }
+
+    public void abortCompress(int currentPosn) throws IOException {
+      // we are using the original, but need to spill the current
+      // compressed buffer first for ordering. So back up to where we started,
+      // flip it and add it to done.
+      if (currentPosn != 0) {
+        compressed.position(currentPosn);
+        compressed.flip();
+        outputBuffer(compressed);
+        compressed = null;
+        // if we have an overflow, clear it and make it the new compress
+        // buffer
+        if (overflow != null) {
+          overflow.clear();
+          compressed = overflow;
+          overflow = null;
+        }
+      } else {
+        compressed.clear();
+        if (overflow != null) {
+          overflow.clear();
+        }
+      }
+    }
+
+    public void reset() throws IOException {
+      if (compressed != null && compressed.position() != 0) {
+        compressed.flip();
+        outputBuffer(compressed);
+      }
+
+      compressed = null;
+      overflow = null;
+    }
+
+    /**
+     * Allocate a new output buffer if we are compressing.
+     */
+    private ByteBuffer getNewOutputBuffer() {
+      return ByteBuffer.allocate(bufferSize + HEADER_SIZE);
+    }
+  }
+
+  private void spill() throws IOException {
     // if there isn't anything in the current buffer, don't spill
     if (current == null || current.position() == (codec == null ?
         0 : HEADER_SIZE)) {
@@ -277,56 +382,25 @@ private void spill() throws java.io.IOException {
       outputBuffer(current);
       getNewInputBuffer();
     } else {
-      if (compressed == null) {
-        compressed = getNewOutputBuffer();
-      } else if (overflow == null) {
-        overflow = getNewOutputBuffer();
-      }
-      int sizePosn = compressed.position();
-      compressed.position(compressed.position() + HEADER_SIZE);
-      if (codec.compress(current, compressed, overflow, options)) {
-        uncompressedBytes = 0;
+      compressedBuffer.init();
+      int currentPosn = compressedBuffer.getCurrentPosn();
+      compressedBuffer.advanceTo(currentPosn + HEADER_SIZE);
+
+      // check whether the compression was worthwhile
+      if (codec.compress(current, compressedBuffer.compressed,
+          compressedBuffer.overflow, options)) {
         // move position back to after the header
+        uncompressedBytes = 0;
+
         current.position(HEADER_SIZE);
         current.limit(current.capacity());
-        // find the total bytes in the chunk
-        int totalBytes = compressed.position() - sizePosn - HEADER_SIZE;
-        if (overflow != null) {
-          totalBytes += overflow.position();
-        }
-        compressedBytes += totalBytes + HEADER_SIZE;
-        writeHeader(compressed, sizePosn, totalBytes, false);
-        // if we have less than the next header left, spill it.
-        if (compressed.remaining() < HEADER_SIZE) {
-          compressed.flip();
-          outputBuffer(compressed);
-          compressed = overflow;
-          overflow = null;
-        }
+
+        compressedBytes += compressedBuffer.commitCompress(currentPosn);
       } else {
         compressedBytes += uncompressedBytes + HEADER_SIZE;
         uncompressedBytes = 0;
-        // we are using the original, but need to spill the current
-        // compressed buffer first. So back up to where we started,
-        // flip it and add it to done.
-        if (sizePosn != 0) {
-          compressed.position(sizePosn);
-          compressed.flip();
-          outputBuffer(compressed);
-          compressed = null;
-          // if we have an overflow, clear it and make it the new compress
-          // buffer
-          if (overflow != null) {
-            overflow.clear();
-            compressed = overflow;
-            overflow = null;
-          }
-        } else {
-          compressed.clear();
-          if (overflow != null) {
-            overflow.clear();
-          }
-        }
+
+        compressedBuffer.abortCompress(currentPosn);
         // now add the current buffer into the done list and get a new one.
         current.position(0);
@@ -351,17 +425,12 @@ public void getPosition(PositionRecorder recorder) {
 
   @Override
   public void flush() throws IOException {
     spill();
-    if (compressed != null && compressed.position() != 0) {
-      compressed.flip();
-      outputBuffer(compressed);
-    }
+    compressedBuffer.reset();
     if (cipher != null) {
       finishEncryption();
     }
-    compressed = null;
     uncompressedBytes = 0;
     compressedBytes = 0;
-    overflow = null;
     current = null;
   }
 
@@ -379,13 +448,7 @@ public long getBufferSize() {
     if (current != null) {
       result += current.capacity();
     }
-    if (compressed != null) {
-      result += compressed.capacity();
-    }
-    if (overflow != null) {
-      result += overflow.capacity();
-    }
-    return result + compressedBytes;
+    return result + compressedBuffer.getCapacity() + compressedBytes;
   }
 }
diff --git a/java/core/src/test/org/apache/orc/impl/TestOutStream.java b/java/core/src/test/org/apache/orc/impl/TestOutStream.java
index a673314647..aa3e9ec4f6 100644
--- a/java/core/src/test/org/apache/orc/impl/TestOutStream.java
+++ b/java/core/src/test/org/apache/orc/impl/TestOutStream.java
@@ -35,6 +35,7 @@
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.security.Key;
+import java.util.Random;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -59,6 +60,56 @@ public void testFlush() throws Exception {
     }
   }
 
+  /**
+   * Introduces randomness into whether a compression is committed or aborted (so that
+   * sometimes the isOriginal bit is set to true and the data is flushed as uncompressed).
+   * This class should be used for testing purposes only.
+   */
+  private static class TestZlibCodec extends ZlibCodec {
+    private Random rand = new Random();
+
+    @Override
+    public boolean compress(ByteBuffer in, ByteBuffer out, ByteBuffer overflow, Options options) {
+      super.compress(in, out, overflow, options);
+      return rand.nextBoolean();
+    }
+  }
+
+  @Test
+  public void testCompressWithoutEncryption() throws Exception {
+    TestInStream.OutputCollector receiver = new TestInStream.OutputCollector();
+    CompressionCodec codec = new TestZlibCodec();
+    StreamOptions options = new StreamOptions(1024)
+        .withCodec(codec, codec.getDefaultOptions());
+
+    try (OutStream stream = new OutStream("test", options, receiver)) {
+      for (int i = 0; i < 20000; ++i) {
+        stream.write(("The Cheesy Poofs " + i + "\n")
+            .getBytes(StandardCharsets.UTF_8));
+      }
+      stream.flush();
+    }
+
+    byte[] compressed = receiver.buffer.get();
+
+    // use InStream to decompress it
+    BufferChunkList ranges = new BufferChunkList();
+    ranges.add(new BufferChunk(ByteBuffer.wrap(compressed), 0));
+    try (InStream decompressedStream = InStream.create("test", ranges.get(), 0,
+             compressed.length,
+             InStream.options().withCodec(new TestZlibCodec()).withBufferSize(1024));
+         BufferedReader reader
+             = new BufferedReader(new InputStreamReader(decompressedStream,
+             StandardCharsets.UTF_8))) {
+      // check the contents of the decompressed stream
+      for (int i = 0; i < 20000; ++i) {
+        assertEquals("The Cheesy Poofs " + i, reader.readLine(), "i = " + i);
+      }
+      assertNull(reader.readLine());
+    }
+
+  }
+
   @Test
   public void testAssertBufferSizeValid() {
     try {
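
Reviewer note (not part of the patch): commitCompress() fills in the chunk header via writeHeader(), and the isOriginal path mentioned in the test's TestZlibCodec comment is what abortCompress() falls back to. For readers unfamiliar with that header, the sketch below shows the 3-byte ORC compressed-chunk header layout described in the ORC specification (23-bit little-endian chunk length, low bit of the first byte = isOriginal flag). The helper name putChunkHeader is made up for illustration only; it is not an OutStream method.

    // Illustrative only: what OutStream.writeHeader() is expected to produce,
    // per the ORC spec's compressed-chunk header (3 bytes, little-endian).
    static void putChunkHeader(java.nio.ByteBuffer buffer, int position,
                               int chunkLength, boolean isOriginal) {
      int header = (chunkLength << 1) | (isOriginal ? 1 : 0); // low bit = isOriginal
      buffer.put(position, (byte) header);                    // bits 0-7
      buffer.put(position + 1, (byte) (header >>> 8));        // bits 8-15
      buffer.put(position + 2, (byte) (header >>> 16));       // bits 16-23
    }

With this encoding, a decompressing reader (such as InStream in the test above) can distinguish a committed compressed chunk (isOriginal = 0) from an aborted one that was flushed as raw bytes (isOriginal = 1).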