ORC-1413: Fix for ORC row level filter issue with ACID table
This PR fixes the ORC row-level filter issue with ACID tables.

Without this fix, Hive cannot use ORC 1.8.3 with ACID tables when row-level filtering is enabled.

Unit test added.
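
For context, ORC's ACID event layout wraps the user's columns in a trailing "row" struct, so the filter must not resolve column names against the top-level vectors. A minimal sketch of that layout (illustrative only, not code from this commit; it uses the same TypeDescription and SchemaEvolution APIs as the change below):

import org.apache.orc.TypeDescription;
import org.apache.orc.impl.SchemaEvolution;

public class AcidSchemaSketch {
  public static void main(String[] args) {
    TypeDescription user = TypeDescription.fromString("struct<int1:int,string1:string>");
    // Wrap the user schema in the ACID event layout, as the new unit test does.
    TypeDescription acid = SchemaEvolution.createEventSchema(user);
    // Prints the wrapper schema: operation, originalTransaction, bucket, rowId,
    // currentTransaction, and finally row:struct<int1:int,string1:string>.
    System.out.println(acid);
  }
}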

Closes #1495 from zratkai/ORC-1413-1.8.

Authored-by: Zoltan Ratkai <zratkai@cloudera.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
zratkai authored and dongjoon-hyun committed Nov 5, 2023
1 parent 488184b commit 431c521
Showing 2 changed files with 143 additions and 6 deletions.
java/core/src/java/org/apache/orc/impl/ParserUtils.java (12 additions, 5 deletions)
@@ -411,18 +411,22 @@ static class ColumnFinder implements TypeVisitor {
     private final ColumnVector[] result;
     private int resultIdx = 0;
 
-    ColumnFinder(TypeDescription schema, VectorizedRowBatch batch, int levels) {
+    ColumnFinder(TypeDescription schema, ColumnVector[] columnVectors, int levels) {
       if (schema.getCategory() == TypeDescription.Category.STRUCT) {
-        top = batch.cols;
+        top = columnVectors;
         result = new ColumnVector[levels];
       } else {
         result = new ColumnVector[levels + 1];
-        current = batch.cols[0];
+        current = columnVectors[0];
         top = null;
         addResult(current);
       }
     }
 
+    ColumnFinder(TypeDescription schema, VectorizedRowBatch vectorizedRowBatch, int levels) {
+      this(schema, vectorizedRowBatch.cols, levels);
+    }
+
     private void addResult(ColumnVector vector) {
       result[resultIdx] = vector;
       resultIdx += 1;
@@ -459,8 +463,11 @@ public static ColumnVector[] findColumnVectors(TypeDescription schema,
                                                  boolean isCaseSensitive,
                                                  VectorizedRowBatch batch) {
     List<String> names = ParserUtils.splitName(source);
-    ColumnFinder result = new ColumnFinder(schema, batch, names.size());
-    findColumn(removeAcid(schema), names, isCaseSensitive, result);
+    TypeDescription schemaToUse = removeAcid(schema);
+    ColumnVector[] columnVectors = SchemaEvolution.checkAcidSchema(schema)
+        ? ((StructColumnVector) batch.cols[batch.cols.length - 1]).fields : batch.cols;
+    ColumnFinder result = new ColumnFinder(schemaToUse, columnVectors, names.size());
+    findColumn(schemaToUse, names, isCaseSensitive, result);
     return result.result;
   }

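Why the indirection above matters: on an ACID batch, batch.cols[0] is the "operation" metadata vector, not the first user column, so the old code paired user-schema names with the wrong vectors. A minimal sketch of the shape involved (illustrative, not from this commit):

import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.TypeDescription;
import org.apache.orc.impl.SchemaEvolution;

public class AcidBatchSketch {
  public static void main(String[] args) {
    TypeDescription user = TypeDescription.fromString("struct<int1:int,string1:string>");
    VectorizedRowBatch batch = SchemaEvolution.createEventSchema(user).createRowBatch();
    // cols[0..4] hold the ACID metadata (operation, originalTransaction, bucket,
    // rowId, currentTransaction); the user's vectors sit inside the last struct.
    ColumnVector[] userCols =
        ((StructColumnVector) batch.cols[batch.cols.length - 1]).fields;
    System.out.println(userCols.length); // 2 -> int1 and string1
  }
}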
java/core/src/test/org/apache/orc/TestOrcFilterContext.java (131 additions, 1 deletion)
@@ -25,10 +25,24 @@
 import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.orc.impl.OrcFilterContextImpl;
+import org.apache.orc.impl.SchemaEvolution;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Random;
+import java.util.Arrays;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -61,9 +75,22 @@ public class TestOrcFilterContext {
           TypeDescription.createList(TypeDescription.createChar()))
       )
   );
+  private static Configuration configuration;
+  private static FileSystem fileSystem;
+  private static final Path workDir = new Path(System.getProperty("test.tmp.dir",
+      "target" + File.separator + "test" + File.separator + "tmp"));
+  private static final Path filePath = new Path(workDir, "orc_filter_file.orc");
+
+  private static final int RowCount = 400;
+
   private final OrcFilterContext filterContext = new OrcFilterContextImpl(schema, false)
       .setBatch(schema.createRowBatch());
 
+  TypeDescription typeDescriptionACID =
+      TypeDescription.fromString("struct<int1:int,string1:string>");
+  TypeDescription acidSchema = SchemaEvolution.createEventSchema(typeDescriptionACID);
+  private final OrcFilterContext filterContextACID = new OrcFilterContextImpl(acidSchema, true)
+      .setBatch(acidSchema.createRowBatch());
   @BeforeEach
   public void setup() {
     filterContext.reset();
@@ -225,4 +252,107 @@ public void testRepeatingVector() {
     assertTrue(OrcFilterContext.isNull(vectorBranch, 1));
     assertTrue(OrcFilterContext.isNull(vectorBranch, 2));
   }
+
+  @Test
+  public void testACIDTable() {
+    ColumnVector[] columnVector = filterContextACID.findColumnVector("string1");
+    assertEquals(1, columnVector.length);
+    assertTrue(columnVector[0] instanceof BytesColumnVector,
+        "Expected a BytesColumnVector, but found " + columnVector[0].getClass());
+
+    columnVector = filterContextACID.findColumnVector("int1");
+    assertEquals(1, columnVector.length);
+    assertTrue(columnVector[0] instanceof LongColumnVector,
+        "Expected a LongColumnVector, but found " + columnVector[0].getClass());
+  }
+
+  @Test
+  public void testRowFilterWithACIDTable() throws IOException {
+    createAcidORCFile();
+    readSingleRowWithFilter(new Random().nextInt(RowCount));
+    fileSystem.delete(filePath, false);
+  }
+
+  private void createAcidORCFile() throws IOException {
+    configuration = new Configuration();
+    fileSystem = FileSystem.get(configuration);
+
+    try (Writer writer = OrcFile.createWriter(filePath,
+        OrcFile.writerOptions(configuration)
+            .fileSystem(fileSystem)
+            .overwrite(true)
+            .rowIndexStride(8192)
+            .setSchema(acidSchema))) {
+      Random random = new Random(1024);
+      VectorizedRowBatch vectorizedRowBatch = acidSchema.createRowBatch();
+      for (int rowId = 0; rowId < RowCount; rowId++) {
+        long v = random.nextLong();
+        populateColumnValues(acidSchema, vectorizedRowBatch.cols, vectorizedRowBatch.size, v);
+        // Populate the rowId
+        ((LongColumnVector) vectorizedRowBatch.cols[3]).vector[vectorizedRowBatch.size] = rowId;
+        StructColumnVector row = (StructColumnVector) vectorizedRowBatch.cols[5];
+        ((LongColumnVector) row.fields[0]).vector[vectorizedRowBatch.size] = rowId;
+        vectorizedRowBatch.size += 1;
+        if (vectorizedRowBatch.size == vectorizedRowBatch.getMaxSize()) {
+          writer.addRowBatch(vectorizedRowBatch);
+          vectorizedRowBatch.reset();
+        }
+      }
+      if (vectorizedRowBatch.size > 0) {
+        writer.addRowBatch(vectorizedRowBatch);
+        vectorizedRowBatch.reset();
+      }
+    }
+  }
+
+  private void populateColumnValues(TypeDescription typeDescription, ColumnVector[] columnVectors,
+                                    int index, long value) {
+    for (int columnId = 0; columnId < typeDescription.getChildren().size(); columnId++) {
+      switch (typeDescription.getChildren().get(columnId).getCategory()) {
+        case INT:
+        case LONG:
+          ((LongColumnVector) columnVectors[columnId]).vector[index] = value;
+          break;
+        case STRING:
+          ((BytesColumnVector) columnVectors[columnId]).setVal(index,
+              ("String-" + index).getBytes(StandardCharsets.UTF_8));
+          break;
+        case STRUCT:
+          populateColumnValues(typeDescription.getChildren().get(columnId),
+              ((StructColumnVector) columnVectors[columnId]).fields, index, value);
+          break;
+        default:
+          throw new IllegalArgumentException();
+      }
+    }
+  }
+
+  private void readSingleRowWithFilter(int id) throws IOException {
+    Reader reader = OrcFile.createReader(filePath,
+        OrcFile.readerOptions(configuration).filesystem(fileSystem));
+    SearchArgument searchArgument = SearchArgumentFactory.newBuilder()
+        .in("int1", PredicateLeaf.Type.LONG, Long.valueOf(id))
+        .build();
+    Reader.Options readerOptions = reader.options()
+        .searchArgument(searchArgument, new String[] {"int1"})
+        .useSelected(true)
+        .allowSARGToFilter(true);
+    VectorizedRowBatch vectorizedRowBatch = acidSchema.createRowBatch();
+    long rowCount = 0;
+    try (RecordReader recordReader = reader.rows(readerOptions)) {
+      assertTrue(recordReader.nextBatch(vectorizedRowBatch));
+      rowCount += vectorizedRowBatch.size;
+      assertEquals(6, vectorizedRowBatch.cols.length);
+      assertTrue(vectorizedRowBatch.cols[5] instanceof StructColumnVector);
+      StructColumnVector row = (StructColumnVector) vectorizedRowBatch.cols[5];
+      assertTrue(row.fields[0] instanceof LongColumnVector);
+      assertTrue(row.fields[1] instanceof BytesColumnVector);
+      assertEquals(id, ((LongColumnVector) row.fields[0]).vector[vectorizedRowBatch.selected[0]]);
+      checkStringColumn(id, vectorizedRowBatch);
+      assertFalse(recordReader.nextBatch(vectorizedRowBatch));
+    }
+    assertEquals(1, rowCount);
+  }
+
+  private static void checkStringColumn(int id, VectorizedRowBatch vectorizedRowBatch) {
+    BytesColumnVector bytesColumnVector =
+        (BytesColumnVector) ((StructColumnVector) vectorizedRowBatch.cols[5]).fields[1];
+    assertEquals("String-" + id, bytesColumnVector.toString(id));
+  }
 }
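
The same resolution path serves user-supplied row filters, not just the SARG-driven one exercised above. A hypothetical callback (a sketch under assumed names; the Reader.Options wiring for attaching it, e.g. a setRowFilter-style hook, depends on the ORC version and is not shown in this commit):

import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.orc.OrcFilterContext;

public class EvenInt1Filter {
  // Keeps only rows whose int1 value is even. Resolving "int1" through the
  // ACID "row" struct is exactly what this fix makes work.
  public static void keepEvenInt1(OrcFilterContext ctx) {
    ColumnVector[] path = ctx.findColumnVector("int1");
    LongColumnVector int1 = (LongColumnVector) path[path.length - 1];
    int[] selected = ctx.getSelected();
    int newSize = 0;
    for (int i = 0; i < ctx.getSelectedSize(); i++) {
      // If no selection is in effect yet, rows are simply 0..size-1 in order.
      int row = ctx.isSelectedInUse() ? selected[i] : i;
      if (int1.vector[row] % 2 == 0) {
        selected[newSize++] = row;
      }
    }
    ctx.setSelectedInUse(true);
    ctx.setSelectedSize(newSize);
  }
}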
