Skip to content

Commit

Permalink
[core] support null value for record-level.time-field (#4839)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangyuf authored Jan 6, 2025
1 parent f4ecba5 commit 39de7cc
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,14 @@
import javax.annotation.Nullable;

import java.time.Duration;
import java.util.Optional;
import java.util.function.Function;

import static org.apache.paimon.utils.Preconditions.checkArgument;

/** A factory to create {@link RecordReader} expires records by time. */
public class RecordLevelExpire {

private final int expireTime;
private final Function<InternalRow, Integer> fieldGetter;
private final Function<InternalRow, Optional<Integer>> fieldGetter;

@Nullable
public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
Expand All @@ -65,11 +64,13 @@ public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
}

DataType dataType = rowType.getField(timeFieldName).type();
Function<InternalRow, Integer> fieldGetter = createFieldGetter(dataType, fieldIndex);
Function<InternalRow, Optional<Integer>> fieldGetter =
createFieldGetter(dataType, fieldIndex);
return new RecordLevelExpire((int) expireTime.getSeconds(), fieldGetter);
}

private RecordLevelExpire(int expireTime, Function<InternalRow, Integer> fieldGetter) {
private RecordLevelExpire(
int expireTime, Function<InternalRow, Optional<Integer>> fieldGetter) {
this.expireTime = expireTime;
this.fieldGetter = fieldGetter;
}
Expand All @@ -80,38 +81,53 @@ public FileReaderFactory<KeyValue> wrap(FileReaderFactory<KeyValue> readerFactor

private RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
int currentTime = (int) (System.currentTimeMillis() / 1000);
return reader.filter(kv -> currentTime <= fieldGetter.apply(kv.value()) + expireTime);
return reader.filter(
keyValue ->
fieldGetter
.apply(keyValue.value())
.map(integer -> currentTime <= integer + expireTime)
.orElse(true));
}

private static Function<InternalRow, Integer> createFieldGetter(
private static Function<InternalRow, Optional<Integer>> createFieldGetter(
DataType dataType, int fieldIndex) {
final Function<InternalRow, Integer> fieldGetter;
final Function<InternalRow, Optional<Integer>> fieldGetter;
if (dataType instanceof IntType) {
fieldGetter = row -> row.getInt(fieldIndex);
fieldGetter =
row ->
row.isNullAt(fieldIndex)
? Optional.empty()
: Optional.of(row.getInt(fieldIndex));
} else if (dataType instanceof BigIntType) {
fieldGetter =
row -> {
if (row.isNullAt(fieldIndex)) {
return Optional.empty();
}
long value = row.getLong(fieldIndex);
// If it is milliseconds, convert it to seconds.
return (int) (value >= 1_000_000_000_000L ? value / 1000 : value);
return Optional.of(
(int) (value >= 1_000_000_000_000L ? value / 1000 : value));
};
} else if (dataType instanceof TimestampType
|| dataType instanceof LocalZonedTimestampType) {
int precision = DataTypeChecks.getPrecision(dataType);
fieldGetter =
row -> (int) (row.getTimestamp(fieldIndex, precision).getMillisecond() / 1000);
row ->
row.isNullAt(fieldIndex)
? Optional.empty()
: Optional.of(
(int)
(row.getTimestamp(fieldIndex, precision)
.getMillisecond()
/ 1000));
} else {
throw new IllegalArgumentException(
String.format(
"The record level time field type should be one of INT, BIGINT, or TIMESTAMP, but field type is %s.",
dataType));
}

return row -> {
checkArgument(
!row.isNullAt(fieldIndex),
"Time field for record-level expire should not be null.");
return fieldGetter.apply(row);
};
return fieldGetter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

class RecordLevelExpireTest extends PrimaryKeyTableTestBase {

Expand Down Expand Up @@ -102,12 +101,17 @@ public void test() throws Exception {
.containsExactlyInAnyOrder(GenericRow.of(currentSecs + 60 * 60));

writeCommit(GenericRow.of(1, 5, null));
compact(1);
assertThat(query())
.containsExactlyInAnyOrder(
GenericRow.of(1, 4, currentSecs + 60 * 60), GenericRow.of(1, 5, null));

// null time field for record-level expire is not supported yet.
assertThatThrownBy(() -> compact(1))
.hasMessageContaining("Time field for record-level expire should not be null.");
writeCommit(GenericRow.of(1, 5, currentSecs + 60 * 60));
// compact, merged
compact(1);
assertThat(query())
.containsExactlyInAnyOrder(
GenericRow.of(1, 4, currentSecs + 60 * 60),
GenericRow.of(1, 5, currentSecs + 60 * 60));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.TraceableFileIO;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

class RecordLevelExpireWithAggregationTest extends PrimaryKeyTableTestBase {

@Override
@BeforeEach
public void beforeEachBase() throws Exception {
CatalogContext context =
CatalogContext.create(
new Path(TraceableFileIO.SCHEME + "://" + tempPath.toString()));
Catalog catalog = CatalogFactory.createCatalog(context);
Identifier identifier = new Identifier("default", "T");
catalog.createDatabase(identifier.getDatabaseName(), true);
Schema schema =
Schema.newBuilder()
.column("pt", DataTypes.INT())
.column("pk", DataTypes.INT())
.column("col1", DataTypes.INT())
.partitionKeys("pt")
.primaryKey("pk", "pt")
.options(tableOptions().toMap())
.build();
catalog.createTable(identifier, schema, true);
table = (FileStoreTable) catalog.getTable(identifier);
commitUser = UUID.randomUUID().toString();
}

@Override
protected Options tableOptions() {
Options options = new Options();
options.set(CoreOptions.BUCKET, 1);
options.set(CoreOptions.RECORD_LEVEL_EXPIRE_TIME, Duration.ofSeconds(1));
options.set(CoreOptions.RECORD_LEVEL_TIME_FIELD, "col1");
options.set(CoreOptions.MERGE_ENGINE, CoreOptions.MergeEngine.AGGREGATE);
options.set(CoreOptions.FIELDS_DEFAULT_AGG_FUNC, "last_non_null_value");
options.set("fields.col1.ignore-retract", "true");
return options;
}

@Test
public void test() throws Exception {
writeCommit(GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 2));
// disordered retract message will generate null fields
writeCommit(
GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 3, 1),
GenericRow.ofKind(RowKind.DELETE, 1, 3, 1));

int currentSecs = (int) (System.currentTimeMillis() / 1000);
writeCommit(GenericRow.of(1, 4, currentSecs + 60 * 60));

Thread.sleep(2000);

// no compaction, can be queried
assertThat(query())
.containsExactlyInAnyOrder(
GenericRow.of(1, 1, 1),
GenericRow.of(1, 2, 2),
GenericRow.of(1, 3, null),
GenericRow.of(1, 4, currentSecs + 60 * 60));

// compact, expired
compact(1);
assertThat(query())
.containsExactlyInAnyOrder(
GenericRow.of(1, 3, null), GenericRow.of(1, 4, currentSecs + 60 * 60));

writeCommit(GenericRow.of(1, 3, currentSecs + 60 * 60));
compact(1);

// compact, merged
assertThat(query())
.containsExactlyInAnyOrder(
GenericRow.of(1, 4, currentSecs + 60 * 60),
GenericRow.of(1, 3, currentSecs + 60 * 60));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

class RecordLevelExpireWithMillisecondTest extends PrimaryKeyTableTestBase {
@Override
Expand Down Expand Up @@ -102,8 +101,10 @@ public void test() throws Exception {
assertThat(query(new int[] {0, 1}))
.containsExactlyInAnyOrder(GenericRow.of(1, 4), GenericRow.of(1, 5));

// null time field for record-level expire is not supported yet.
assertThatThrownBy(() -> compact(1))
.hasMessageContaining("Time field for record-level expire should not be null.");
writeCommit(GenericRow.of(1, 5, currentSecs + 60 * 60 * 1000));
// compact, merged
compact(1);
assertThat(query(new int[] {0, 1}))
.containsExactlyInAnyOrder(GenericRow.of(1, 4), GenericRow.of(1, 5));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.time.Duration;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

abstract class RecordLevelExpireWithTimestampBaseTest extends PrimaryKeyTableTestBase {

Expand Down Expand Up @@ -67,8 +66,10 @@ public void testTimestampTypeExpire() throws Exception {
assertThat(query(new int[] {0, 1}))
.containsExactlyInAnyOrder(GenericRow.of(1, 3), GenericRow.of(1, 5));

// null time field for record-level expire is not supported yet.
assertThatThrownBy(() -> compact(1))
.hasMessageContaining("Time field for record-level expire should not be null.");
writeCommit(GenericRow.of(1, 5, timestamp3));
// compact, merged
compact(1);
assertThat(query(new int[] {0, 1}))
.containsExactlyInAnyOrder(GenericRow.of(1, 3), GenericRow.of(1, 5));
}
}

0 comments on commit 39de7cc

Please sign in to comment.