Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[Kernel] Minor cleanup #3783

Merged
merged 7 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,10 @@ public static Literal ofDecimal(BigDecimal value, int precision, int scale) {
BigDecimal valueToStore = value.setScale(scale);
checkArgument(
valueToStore.precision() <= precision,
String.format(
"Decimal precision=%s for decimal %s exceeds max precision %s",
valueToStore.precision(), valueToStore, precision));
"Decimal precision=%s for decimal %s exceeds max precision %s",
valueToStore.precision(),
valueToStore,
precision);
return new Literal(valueToStore, new DecimalType(precision, scale));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ private InternalScanFileUtils() {}
public static FileStatus getAddFileStatus(Row scanFileInfo) {
Row addFile = getAddFileEntry(scanFileInfo);
String path = addFile.getString(ADD_FILE_PATH_ORDINAL);
Long size = addFile.getLong(ADD_FILE_SIZE_ORDINAL);
Long modificationTime = addFile.getLong(ADD_FILE_MOD_TIME_ORDINAL);
long size = addFile.getLong(ADD_FILE_SIZE_ORDINAL);
long modificationTime = addFile.getLong(ADD_FILE_MOD_TIME_ORDINAL);

// TODO: this is hack until the path in `add.path` is converted to an absolute path
String tableRoot = scanFileInfo.getString(TABLE_ROOT_ORDINAL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public Metadata(
this.createdTime = createdTime;
this.configurationMapValue = requireNonNull(configurationMapValue, "configuration is null");
this.configuration = new Lazy<>(() -> VectorUtils.toJavaMap(configurationMapValue));
this.partitionColNames = new Lazy<>(() -> loadPartitionColNames());
this.partitionColNames = new Lazy<>(this::loadPartitionColNames);
this.dataSchema =
new Lazy<>(
() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ static RoaringBitmap[] deserialize(ByteBuffer buffer) throws IOException {
// This format is designed for sparse bitmaps, so numberOfBitmaps is only a lower bound
// for the actual size of the array.
int minimumArraySize = (int) numberOfBitmaps;
ArrayList<RoaringBitmap> bitmaps = new ArrayList(minimumArraySize);
ArrayList<RoaringBitmap> bitmaps = new ArrayList<>(minimumArraySize);
int lastIndex = 0;
for (long i = 0; i < numberOfBitmaps; i++) {
int key = buffer.getInt();
Expand Down Expand Up @@ -233,7 +233,7 @@ static RoaringBitmap[] deserialize(ByteBuffer buffer) throws IOException {
////////////////////////////////////////////////////////////////////////////////

static Tuple2<Integer, Integer> decomposeHighLowBytes(long value) {
return new Tuple2(highBytes(value), lowBytes(value));
return new Tuple2<>(highBytes(value), lowBytes(value));
}

public void add(long value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,7 @@ public ActionsIterator(
this.checkpointPredicate = checkpointPredicate;
this.filesList = new LinkedList<>();
this.filesList.addAll(
files.stream()
.map(file -> DeltaLogFile.forCommitOrCheckpoint(file))
.collect(Collectors.toList()));
files.stream().map(DeltaLogFile::forCommitOrCheckpoint).collect(Collectors.toList()));
this.readSchema = readSchema;
this.actionsIter = Optional.empty();
this.schemaContainsAddOrRemoveFiles = LogReplay.containsAddOrRemoveFileActions(readSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static DeltaLogFile forCommitOrCheckpoint(FileStatus file) {
} else if (FileNames.isClassicCheckpointFile(fileName)) {
logType = LogType.CHECKPOINT_CLASSIC;
version = FileNames.checkpointVersion(fileName);
} else if (FileNames.isMulitPartCheckpointFile(fileName)) {
} else if (FileNames.isMultiPartCheckpointFile(fileName)) {
logType = LogType.MULTIPART_CHECKPOINT;
version = FileNames.checkpointVersion(fileName);
} else if (FileNames.isV2CheckpointFile(fileName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ public StatsSchemaHelper(StructType dataSchema) {
public Tuple2<Column, Optional<Expression>> getMinColumn(Column column) {
checkArgument(
isSkippingEligibleMinMaxColumn(column),
String.format("%s is not a valid min column for data schema %s", column, dataSchema));
"%s is not a valid min column for data schema %s",
column,
dataSchema);
return new Tuple2<>(getStatsColumn(column, MIN), Optional.empty());
}

Expand All @@ -138,7 +140,9 @@ public Tuple2<Column, Optional<Expression>> getMinColumn(Column column) {
public Tuple2<Column, Optional<Expression>> getMaxColumn(Column column) {
checkArgument(
isSkippingEligibleMinMaxColumn(column),
String.format("%s is not a valid min column for data schema %s", column, dataSchema));
"%s is not a valid min column for data schema %s",
column,
dataSchema);
DataType dataType = logicalToDataType.get(column);
Column maxColumn = getStatsColumn(column, MAX);

Expand All @@ -164,8 +168,9 @@ public Tuple2<Column, Optional<Expression>> getMaxColumn(Column column) {
public Column getNullCountColumn(Column column) {
checkArgument(
isSkippingEligibleNullCountColumn(column),
String.format(
"%s is not a valid null_count column for data schema %s", column, dataSchema));
"%s is not a valid null_count column for data schema %s",
column,
dataSchema);
return getStatsColumn(column, NULL_COUNT);
}

Expand Down Expand Up @@ -279,7 +284,9 @@ private static StructType getNullCountSchema(StructType dataSchema) {
private Column getStatsColumn(Column column, String statType) {
checkArgument(
logicalToPhysicalColumn.containsKey(column),
String.format("%s is not a valid leaf column for data schema", column, dataSchema));
"%s is not a valid leaf column for data schema: %s",
column,
dataSchema);
return getChildColumn(logicalToPhysicalColumn.get(column), statType);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,15 @@ public static void verifyDeltaVersions(
v -> {
checkArgument(
!versions.isEmpty() && Objects.equals(versions.get(0), v),
format("Did not get the first delta file version %s to compute Snapshot", v));
"Did not get the first delta file version %s to compute Snapshot",
v);
});
expectedEndVersion.ifPresent(
v -> {
checkArgument(
!versions.isEmpty() && Objects.equals(versions.get(versions.size() - 1), v),
format("Did not get the last delta file version %s to compute Snapshot", v));
"Did not get the last delta file version %s to compute Snapshot",
v);
});
}

Expand Down Expand Up @@ -141,9 +143,9 @@ public Snapshot getSnapshotAt(Engine engine, long version) throws TableNotFoundE
Optional.of(version) /* versionToLoadOpt */,
Optional.empty() /* tableCommitHandlerOpt */);

// For non-coordinated commit table, the {@code getCoodinatedCommitsAwareSnapshot} will
// For non-coordinated commit table, the {@code getCoordinatedCommitsAwareSnapshot} will
// create the snapshot with the {@code logSegmentOpt} built here and will not trigger other
// operations. For coordinated commit table, the {@code getCoodinatedCommitsAwareSnapshot}
// operations. For coordinated commit table, the {@code getCoordinatedCommitsAwareSnapshot}
// will create the snapshot with the {@code logSegmentOpt} built here and will build the
// logSegment again by also fetching the unbackfilled commits from the commit coordinator.
// With the unbackfilled commits plus the backfilled commits in Delta log, a new snapshot
Expand Down Expand Up @@ -350,7 +352,9 @@ protected final Optional<List<FileStatus>> listDeltaAndCheckpointFiles(
v ->
checkArgument(
v >= startVersion,
format("versionToLoad=%s provided is less than startVersion=%s", v, startVersion)));
"versionToLoad=%s provided is less than startVersion=%s",
v,
startVersion));
logger.debug(
"startVersion: {}, versionToLoad: {}, coordinated commits enabled: {}",
startVersion,
Expand Down Expand Up @@ -403,7 +407,7 @@ protected final Optional<List<FileStatus>> listDeltaAndCheckpointFiles(
break;
}

// Ideally listFromOrNone should return lexiographically sorted
// Ideally listFromOrNone should return lexicographically sorted
// files and so maxDeltaVersionSeen should be equal to fileVersion.
// But we are being defensive here and taking max of all the
// fileVersions seen.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import io.delta.kernel.exceptions.KernelException;
import io.delta.kernel.internal.DeltaErrors;
import io.delta.kernel.internal.util.Preconditions;
import io.delta.kernel.types.*;
import java.io.IOException;
import java.io.StringWriter;
Expand Down Expand Up @@ -206,7 +205,8 @@ private static ArrayType parseArrayType(
JsonNode json, String fieldPath, FieldMetadata collationsMetadata) {
checkArgument(
json.isObject() && json.size() == 3,
String.format("Expected JSON object with 3 fields for array data type but got:\n%s", json));
"Expected JSON object with 3 fields for array data type but got:\n%s",
json);
boolean containsNull = getBooleanField(json, "containsNull");
DataType dataType =
parseDataType(
Expand All @@ -222,7 +222,8 @@ private static MapType parseMapType(
JsonNode json, String fieldPath, FieldMetadata collationsMetadata) {
checkArgument(
json.isObject() && json.size() == 4,
String.format("Expected JSON object with 4 fields for map data type but got:\n%s", json));
"Expected JSON object with 4 fields for map data type but got:\n%s",
json);
boolean valueContainsNull = getBooleanField(json, "valueContainsNull");
DataType keyType =
parseDataType(getNonNullField(json, "keyType"), fieldPath + ".key", collationsMetadata);
Expand All @@ -238,12 +239,10 @@ private static MapType parseMapType(
private static StructType parseStructType(JsonNode json) {
checkArgument(
json.isObject() && json.size() == 2,
String.format(
"Expected JSON object with 2 fields for struct data type but got:\n%s", json));
"Expected JSON object with 2 fields for struct data type but got:\n%s",
json);
JsonNode fieldsNode = getNonNullField(json, "fields");
Preconditions.checkArgument(
fieldsNode.isArray(),
String.format("Expected array for fieldName=%s in:\n%s", "fields", json));
checkArgument(fieldsNode.isArray(), "Expected array for fieldName=%s in:\n%s", "fields", json);
Iterator<JsonNode> fields = fieldsNode.elements();
List<StructField> parsedFields = new ArrayList<>();
while (fields.hasNext()) {
Expand All @@ -257,7 +256,7 @@ private static StructType parseStructType(JsonNode json) {
* struct field </a>
*/
private static StructField parseStructField(JsonNode json) {
Preconditions.checkArgument(json.isObject(), "Expected JSON object for struct field");
checkArgument(json.isObject(), "Expected JSON object for struct field");
String name = getStringField(json, "name");
FieldMetadata metadata = parseFieldMetadata(json.get("metadata"), false);
DataType type =
Expand All @@ -282,7 +281,7 @@ private static FieldMetadata parseFieldMetadata(
return FieldMetadata.empty();
}

Preconditions.checkArgument(json.isObject(), "Expected JSON object for struct field metadata");
checkArgument(json.isObject(), "Expected JSON object for struct field metadata");
final Iterator<Map.Entry<String, JsonNode>> iterator = json.fields();
final FieldMetadata.Builder builder = FieldMetadata.builder();
while (iterator.hasNext()) {
Expand Down Expand Up @@ -403,9 +402,8 @@ private static JsonNode getNonNullField(JsonNode rootNode, String fieldName) {

private static String getStringField(JsonNode rootNode, String fieldName) {
JsonNode node = getNonNullField(rootNode, fieldName);
Preconditions.checkArgument(
node.isTextual(),
String.format("Expected string for fieldName=%s in:\n%s", fieldName, rootNode));
checkArgument(
node.isTextual(), "Expected string for fieldName=%s in:\n%s", fieldName, rootNode);
return node.textValue(); // double check this only works for string values! and isTextual()!
}

Expand All @@ -427,9 +425,8 @@ private static FieldMetadata getCollationsMetadata(JsonNode fieldMetadata) {

private static boolean getBooleanField(JsonNode rootNode, String fieldName) {
JsonNode node = getNonNullField(rootNode, fieldName);
Preconditions.checkArgument(
node.isBoolean(),
String.format("Expected boolean for fieldName=%s in:\n%s", fieldName, rootNode));
checkArgument(
node.isBoolean(), "Expected boolean for fieldName=%s in:\n%s", fieldName, rootNode);
return node.booleanValue();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.delta.kernel.internal.util;

import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static java.lang.String.format;

import io.delta.kernel.expressions.Expression;
import io.delta.kernel.expressions.Predicate;
Expand All @@ -25,36 +24,31 @@
public class ExpressionUtils {
/** Return an expression cast as a predicate, throw an error if it is not a predicate */
public static Predicate asPredicate(Expression expression) {
checkArgument(
expression instanceof Predicate,
String.format("Expected predicate but got %s", expression));
checkArgument(expression instanceof Predicate, "Expected predicate but got %s", expression);
return (Predicate) expression;
}

/** Utility method to return the left child of the binary input expression */
public static Expression getLeft(Expression expression) {
List<Expression> children = expression.getChildren();
checkArgument(
children.size() == 2,
format("%s: expected two inputs, but got %s", expression, children.size()));
children.size() == 2, "%s: expected two inputs, but got %s", expression, children.size());
return children.get(0);
}

/** Utility method to return the right child of the binary input expression */
public static Expression getRight(Expression expression) {
List<Expression> children = expression.getChildren();
checkArgument(
children.size() == 2,
format("%s: expected two inputs, but got %s", expression, children.size()));
children.size() == 2, "%s: expected two inputs, but got %s", expression, children.size());
return children.get(1);
}

/** Utility method to return the single child of the unary input expression */
public static Expression getUnaryChild(Expression expression) {
List<Expression> children = expression.getChildren();
checkArgument(
children.size() == 1,
format("%s: expected one inputs, but got %s", expression, children.size()));
children.size() == 1, "%s: expected one inputs, but got %s", expression, children.size());
return children.get(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public static boolean isClassicCheckpointFile(String fileName) {
return CLASSIC_CHECKPOINT_FILE_PATTERN.matcher(fileName).matches();
}

public static boolean isMulitPartCheckpointFile(String fileName) {
public static boolean isMultiPartCheckpointFile(String fileName) {
return MULTI_PART_CHECKPOINT_FILE_PATTERN.matcher(fileName).matches();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ public static Map<String, Literal> validateAndSanitizePartitionValues(
// this shouldn't happen as we have already validated the partition column names
checkArgument(
partColField != null,
"Partition column " + partColName + " is not present in the table schema");
"Partition column %s is not present in the table schema",
partColName);
DataType partColType = partColField.getDataType();

if (!partColType.equivalent(partValue.getDataType())) {
Expand Down Expand Up @@ -230,7 +231,7 @@ public static Tuple2<Predicate, Predicate> splitMetadataAndDataPredicates(
combineWithAndOp(leftResult._2, rightResult._2));
}
if (hasNonPartitionColumns(children, partitionColNames)) {
return new Tuple2(ALWAYS_TRUE, predicate);
return new Tuple2<>(ALWAYS_TRUE, predicate);
} else {
return new Tuple2<>(predicate, ALWAYS_TRUE);
}
Expand Down Expand Up @@ -354,7 +355,8 @@ public static String getTargetDirectory(
Literal partitionValue = partitionValues.get(partitionColName);
checkArgument(
partitionValue != null,
"Partition column value is missing for column: " + partitionColName);
"Partition column value is missing for column: %s",
partitionColName);
String serializedValue = serializePartitionValue(partitionValue);
if (serializedValue == null) {
// Follow the delta-spark behavior to use "__HIVE_DEFAULT_PARTITION__" for null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ public static void validatePartitionColumns(StructType schema, List<String> part
partitionCol -> {
DataType dataType = columnNameToType.get(partitionCol.toLowerCase(Locale.ROOT));
checkArgument(
dataType != null,
"Partition column " + partitionCol + " not found in the schema");
dataType != null, "Partition column %s not found in the schema", partitionCol);

if (!(dataType instanceof BooleanType
|| dataType instanceof ByteType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,13 @@ public void close() {

@Override
public boolean isNullAt(int rowId) {
checkArgument(rowId >= 0 && rowId < values.size(), "Invalid rowId: " + rowId);
checkArgument(rowId >= 0 && rowId < values.size(), "Invalid rowId: %s", rowId);
return values.get(rowId) == null;
}

@Override
public String getString(int rowId) {
checkArgument(rowId >= 0 && rowId < values.size(), "Invalid rowId: " + rowId);
checkArgument(rowId >= 0 && rowId < values.size(), "Invalid rowId: %s", rowId);
return values.get(rowId);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public Optional<String> getVersion() {
*/
public static CollationIdentifier fromString(String identifier) {
long numDots = identifier.chars().filter(ch -> ch == '.').count();
checkArgument(numDots > 0, String.format("Invalid collation identifier: %s", identifier));
checkArgument(numDots > 0, "Invalid collation identifier: %s", identifier);
if (numDots == 1) {
String[] parts = identifier.split("\\.");
return new CollationIdentifier(parts[0], parts[1]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel
package io.delta.kernel.internal.checkpoints

import java.util.Optional

import scala.collection.JavaConverters._

import io.delta.kernel.internal.checkpoints.CheckpointInstance
import io.delta.kernel.internal.fs.Path
import org.scalatest.funsuite.AnyFunSuite

import java.util.Optional
import scala.collection.JavaConverters._

class CheckpointInstanceSuite extends AnyFunSuite {

private val FAKE_DELTA_LOG_PATH = new Path("/path/to/delta/log")
Expand Down
Loading
Loading