From 431c5212f0c5bdf9709c41f4c32e7fdc6f193086 Mon Sep 17 00:00:00 2001
From: Zoltan Ratkai
Date: Thu, 18 May 2023 00:02:58 -0700
Subject: [PATCH] ORC-1413: Fix for ORC row level filter issue with ACID table

This PR fixes the ORC row-level filter issue with ACID tables. Without
this fix, Hive cannot use ORC 1.8.3 with ACID tables when row-level
filtering is enabled. A unit test is added.
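For background on the failure: when the file has an ACID schema, the
reader batch uses the ACID event schema, which wraps the user columns
in a trailing "row" struct. The old code walked the acid-stripped
schema over batch.cols directly, so it addressed the ACID metadata
vectors instead of the user columns. A minimal sketch of the layout
(the schema string matches the new unit test; the variable names are
illustrative only; uses org.apache.orc.TypeDescription,
org.apache.orc.impl.SchemaEvolution and the Hive vector classes):

    // The user schema gets wrapped by SchemaEvolution.createEventSchema:
    TypeDescription userSchema =
        TypeDescription.fromString("struct<int1:int,string1:string>");
    TypeDescription acidSchema = SchemaEvolution.createEventSchema(userSchema);
    // acidSchema is now:
    //   struct<operation:int,originalTransaction:bigint,bucket:int,
    //          rowId:bigint,currentTransaction:bigint,
    //          row:struct<int1:int,string1:string>>
    VectorizedRowBatch batch = acidSchema.createRowBatch();
    // The user columns are the fields of the last top-level column
    // (the "row" struct), which is where findColumnVectors must look:
    ColumnVector[] userColumns =
        ((StructColumnVector) batch.cols[batch.cols.length - 1]).fields;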
Closes #1495 from zratkai/ORC-1413-1.8.

Authored-by: Zoltan Ratkai
Signed-off-by: Dongjoon Hyun
---
 .../java/org/apache/orc/impl/ParserUtils.java |  17 ++-
 .../org/apache/orc/TestOrcFilterContext.java  | 132 +++++++++++++++++-
 2 files changed, 143 insertions(+), 6 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/impl/ParserUtils.java b/java/core/src/java/org/apache/orc/impl/ParserUtils.java
index 84bf049df2..1afd6c0fc5 100644
--- a/java/core/src/java/org/apache/orc/impl/ParserUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/ParserUtils.java
@@ -411,18 +411,22 @@ static class ColumnFinder implements TypeVisitor {
     private final ColumnVector[] result;
     private int resultIdx = 0;
 
-    ColumnFinder(TypeDescription schema, VectorizedRowBatch batch, int levels) {
+    ColumnFinder(TypeDescription schema, ColumnVector[] columnVectors, int levels) {
       if (schema.getCategory() == TypeDescription.Category.STRUCT) {
-        top = batch.cols;
+        top = columnVectors;
         result = new ColumnVector[levels];
       } else {
         result = new ColumnVector[levels + 1];
-        current = batch.cols[0];
+        current = columnVectors[0];
         top = null;
         addResult(current);
       }
     }
 
+    ColumnFinder(TypeDescription schema, VectorizedRowBatch vectorizedRowBatch, int levels) {
+      this(schema, vectorizedRowBatch.cols, levels);
+    }
+
     private void addResult(ColumnVector vector) {
       result[resultIdx] = vector;
       resultIdx += 1;
@@ -459,8 +463,11 @@ public static ColumnVector[] findColumnVectors(TypeDescription schema,
                                                 boolean isCaseSensitive,
                                                 VectorizedRowBatch batch) {
     List<String> names = ParserUtils.splitName(source);
-    ColumnFinder result = new ColumnFinder(schema, batch, names.size());
-    findColumn(removeAcid(schema), names, isCaseSensitive, result);
+    TypeDescription schemaToUse = removeAcid(schema);
+    ColumnVector[] columnVectors = SchemaEvolution.checkAcidSchema(schema) ?
+        ((StructColumnVector) batch.cols[batch.cols.length - 1]).fields : batch.cols;
+    ColumnFinder result = new ColumnFinder(schemaToUse, columnVectors, names.size());
+    findColumn(schemaToUse, names, isCaseSensitive, result);
     return result.result;
   }
 
diff --git a/java/core/src/test/org/apache/orc/TestOrcFilterContext.java b/java/core/src/test/org/apache/orc/TestOrcFilterContext.java
index c38e9081d5..8abc5a0d15 100644
--- a/java/core/src/test/org/apache/orc/TestOrcFilterContext.java
+++ b/java/core/src/test/org/apache/orc/TestOrcFilterContext.java
@@ -25,10 +25,24 @@
 import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.orc.impl.OrcFilterContextImpl;
+import org.apache.orc.impl.SchemaEvolution;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Random;
+import java.util.Arrays;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -61,9 +75,22 @@ public class TestOrcFilterContext {
           TypeDescription.createList(TypeDescription.createChar()))
       )
   );
+  private static Configuration configuration;
+  private static FileSystem fileSystem;
+  private static final Path workDir = new Path(System.getProperty("test.tmp.dir",
+      "target" + File.separator + "test"
+          + File.separator + "tmp"));
+  private static final Path filePath = new Path(workDir, "orc_filter_file.orc");
+
+  private static final int RowCount = 400;
+
   private final OrcFilterContext filterContext = new OrcFilterContextImpl(schema, false)
       .setBatch(schema.createRowBatch());
-
+  TypeDescription typeDescriptionACID =
+      TypeDescription.fromString("struct<int1:int,string1:string>");
+  TypeDescription acidSchema = SchemaEvolution.createEventSchema(typeDescriptionACID);
+  private final OrcFilterContext filterContextACID = new OrcFilterContextImpl(acidSchema, true)
+      .setBatch(acidSchema.createRowBatch());
   @BeforeEach
   public void setup() {
     filterContext.reset();
@@ -225,4 +252,107 @@ public void testRepeatingVector() {
     assertTrue(OrcFilterContext.isNull(vectorBranch, 1));
     assertTrue(OrcFilterContext.isNull(vectorBranch, 2));
   }
+
+  @Test
+  public void testACIDTable() {
+    ColumnVector[] columnVector = filterContextACID.findColumnVector("string1");
+    assertEquals(1, columnVector.length);
+    assertTrue(columnVector[0] instanceof BytesColumnVector,
+        "Expected a BytesColumnVector, but found " + columnVector[0].getClass());
+
+    columnVector = filterContextACID.findColumnVector("int1");
+    assertEquals(1, columnVector.length);
+    assertTrue(columnVector[0] instanceof LongColumnVector,
+        "Expected a LongColumnVector, but found " + columnVector[0].getClass());
+  }
+
+  @Test
+  public void testRowFilterWithACIDTable() throws IOException {
+    createAcidORCFile();
+    readSingleRowWithFilter(new Random().nextInt(RowCount));
+    fileSystem.delete(filePath, false);
+  }
+
+  private void createAcidORCFile() throws IOException {
+    configuration = new Configuration();
+    fileSystem = FileSystem.get(configuration);
+
+    try (Writer writer = OrcFile.createWriter(filePath,
+        OrcFile.writerOptions(configuration)
+            .fileSystem(fileSystem)
+            .overwrite(true)
+            .rowIndexStride(8192)
+            .setSchema(acidSchema))) {
+      Random random = new Random(1024);
+      VectorizedRowBatch vectorizedRowBatch = acidSchema.createRowBatch();
+      for (int rowId = 0; rowId < RowCount; rowId++) {
+        long v = random.nextLong();
+        populateColumnValues(acidSchema, vectorizedRowBatch.cols, vectorizedRowBatch.size, v);
+        // Populate the rowId
+        ((LongColumnVector) vectorizedRowBatch.cols[3]).vector[vectorizedRowBatch.size] = rowId;
+        StructColumnVector row = (StructColumnVector) vectorizedRowBatch.cols[5];
+        ((LongColumnVector) row.fields[0]).vector[vectorizedRowBatch.size] = rowId;
+        vectorizedRowBatch.size += 1;
+        if (vectorizedRowBatch.size == vectorizedRowBatch.getMaxSize()) {
+          writer.addRowBatch(vectorizedRowBatch);
+          vectorizedRowBatch.reset();
+        }
+      }
+      if (vectorizedRowBatch.size > 0) {
+        writer.addRowBatch(vectorizedRowBatch);
+        vectorizedRowBatch.reset();
+      }
+    }
+  }
+
+  private void populateColumnValues(TypeDescription typeDescription, ColumnVector[] columnVectors,
+                                    int index, long value) {
+    for (int columnId = 0; columnId < typeDescription.getChildren().size(); columnId++) {
+      switch (typeDescription.getChildren().get(columnId).getCategory()) {
+        case INT:
+        case LONG:
+          ((LongColumnVector) columnVectors[columnId]).vector[index] = value;
+          break;
+        case STRING:
+          ((BytesColumnVector) columnVectors[columnId]).setVal(index,
+              ("String-" + index).getBytes(StandardCharsets.UTF_8));
+          break;
+        case STRUCT:
+          populateColumnValues(typeDescription.getChildren().get(columnId),
+              ((StructColumnVector) columnVectors[columnId]).fields, index, value);
+          break;
+        default:
+          throw new IllegalArgumentException("Unhandled type: "
+              + typeDescription.getChildren().get(columnId).getCategory());
+      }
+    }
+  }
+
+  private void readSingleRowWithFilter(int id) throws IOException {
+    SearchArgument searchArgument = SearchArgumentFactory.newBuilder()
+        .in("int1", PredicateLeaf.Type.LONG, Long.valueOf(id))
+        .build();
+    VectorizedRowBatch vectorizedRowBatch = acidSchema.createRowBatch();
+    long rowCount = 0;
+    try (Reader reader = OrcFile.createReader(filePath,
+        OrcFile.readerOptions(configuration).filesystem(fileSystem))) {
+      Reader.Options readerOptions = reader.options()
+          .searchArgument(searchArgument, new String[] {"int1"})
+          .useSelected(true)
+          .allowSARGToFilter(true);
+      try (RecordReader recordReader = reader.rows(readerOptions)) {
+        assertTrue(recordReader.nextBatch(vectorizedRowBatch));
+        rowCount += vectorizedRowBatch.size;
+        assertEquals(6, vectorizedRowBatch.cols.length);
+        assertTrue(vectorizedRowBatch.cols[5] instanceof StructColumnVector);
+        assertTrue(((StructColumnVector) vectorizedRowBatch.cols[5]).fields[0]
+            instanceof LongColumnVector);
+        assertTrue(((StructColumnVector) vectorizedRowBatch.cols[5]).fields[1]
+            instanceof BytesColumnVector);
+        assertEquals(id, ((LongColumnVector) ((StructColumnVector) vectorizedRowBatch.cols[5])
+            .fields[0]).vector[vectorizedRowBatch.selected[0]]);
+        checkStringColumn(id, vectorizedRowBatch);
+        assertFalse(recordReader.nextBatch(vectorizedRowBatch));
+      }
+    }
+    assertEquals(1, rowCount);
+  }
+
+  private static void checkStringColumn(int id, VectorizedRowBatch vectorizedRowBatch) {
+    BytesColumnVector bytesColumnVector =
+        (BytesColumnVector) ((StructColumnVector) vectorizedRowBatch.cols[5]).fields[1];
+    assertEquals("String-" + id,
+        bytesColumnVector.toString(vectorizedRowBatch.selected[0]));
+  }
 }