Skip to content

Commit

Permalink
[Kernel] Resolve first batch of comments of cc interface
Browse files Browse the repository at this point in the history
  • Loading branch information
EstherBear committed Jul 18, 2024
1 parent c74a3bf commit e10ffd6
Show file tree
Hide file tree
Showing 10 changed files with 214 additions and 168 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,8 @@ lazy val kernelApi = (project in file("kernel/kernel-api"))
libraryDependencies ++= Seq(
"org.roaringbitmap" % "RoaringBitmap" % "0.9.25",
"org.slf4j" % "slf4j-api" % "1.7.36",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5",

"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5" % "test",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"junit" % "junit" % "4.13.2" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
*/
public class Commit {

private long version;
private final long version;

private FileStatus fileStatus;
private final FileStatus fileStatus;

private long commitTimestamp;
private final long commitTimestamp;

public Commit(long version, FileStatus fileStatus, long commitTimestamp) {
this.version = version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@
*/
public class CommitFailedException extends Exception {

private boolean retryable;
private final boolean retryable;

private boolean conflict;
private final boolean conflict;

private String message;
private final String message;

public CommitFailedException(boolean retryable, boolean conflict, String message) {
this.retryable = retryable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
*/
public class CommitResponse {

private Commit commit;
private final Commit commit;

public CommitResponse(Commit commit) {
this.commit = commit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
* String, Map, Long, Long)}.
*/
public class GetCommitsResponse {
private List<Commit> commits;
private final List<Commit> commits;

private long latestTableVersion;
private final long latestTableVersion;

public GetCommitsResponse(List<Commit> commits, long latestTableVersion) {
this.commits = commits;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@
* Protocol/Metadata
*/
public class UpdatedActions {
private AbstractCommitInfo commitInfo;
private final AbstractCommitInfo commitInfo;

private AbstractMetadata newMetadata;
private final AbstractMetadata newMetadata;

private AbstractProtocol newProtocol;
private final AbstractProtocol newProtocol;

private AbstractMetadata oldMetadata;
private final AbstractMetadata oldMetadata;

private AbstractProtocol oldProtocol;
private final AbstractProtocol oldProtocol;

public UpdatedActions(
AbstractCommitInfo commitInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@
import java.util.function.Function;
import java.util.function.Predicate;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.delta.kernel.exceptions.InvalidConfigurationValueException;
import io.delta.kernel.exceptions.UnknownConfigurationException;
import io.delta.kernel.internal.actions.Metadata;
Expand All @@ -33,7 +29,6 @@
* from the table metadata.
*/
public class TableConfig<T> {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

/**
* The shortest duration we have to keep logically deleted data files around before deleting
Expand Down Expand Up @@ -141,12 +136,7 @@ public class TableConfig<T> {
if (v == null) {
return Collections.emptyMap();
}
try {
return OBJECT_MAPPER.readValue(
v, new TypeReference<Map<String, String>>() {});
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return parseStringToJsonMap(v);
},
value -> true,
"A string-to-string map of configuration properties for the" +
Expand All @@ -164,12 +154,7 @@ public class TableConfig<T> {
if (v == null) {
return Collections.emptyMap();
}
try {
return OBJECT_MAPPER.readValue(
v, new TypeReference<Map<String, String>>() {});
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return parseStringToJsonMap(v);
},
value -> true,
"A string-to-string map of configuration properties for" +
Expand Down Expand Up @@ -275,4 +260,35 @@ private void validate(String value) {
private static void addConfig(HashMap<String, TableConfig<?>> configs, TableConfig<?> config) {
configs.put(config.getKey().toLowerCase(Locale.ROOT), config);
}

private static Map<String, String> parseStringToJsonMap(String jsonString) {
if (!jsonString.startsWith("{") || !jsonString.endsWith("}")) {
throw new IllegalArgumentException("Input string is not a valid JSON object.");
}

Map<String, String> map = new HashMap<>();
String processedString = jsonString.trim().substring(1, jsonString.length() - 1);
String[] pairs = processedString.split(",");

for (String pair : pairs) {
String[] keyValue = pair.split(":");
if (keyValue.length != 2
|| !keyValue[0].trim().matches("^\".*\"$")
|| !keyValue[1].trim().matches("^\".*\"$")) {
throw new IllegalArgumentException(
"Input string contains invalid key-value pairs.");
}

String key = keyValue[0].trim().replaceAll("^\"|\"$", "");
String value = keyValue[1].trim();

if (value.matches("^\".*\"$")) { // Value is a string
// Unescape any escaped quotes within the string
value = value.substring(1, value.length() - 1).replace("\\\"", "\"");
}
map.put(key, value);
}

return map;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,23 @@
package io.delta.kernel.defaults.engine;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import io.delta.storage.LogStore;
import io.delta.storage.commit.CommitCoordinatorClient;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import io.delta.kernel.commit.Commit;
import io.delta.kernel.commit.CommitResponse;
import io.delta.kernel.commit.GetCommitsResponse;
import io.delta.kernel.commit.UpdatedActions;
import io.delta.kernel.commit.actions.AbstractCommitInfo;
import io.delta.kernel.commit.actions.AbstractMetadata;
import io.delta.kernel.commit.actions.AbstractProtocol;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.CommitCoordinatorClientHandler;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;
import io.delta.kernel.defaults.internal.coordinatedcommits.CommitCoordinatorProvider;
import io.delta.kernel.defaults.internal.coordinatedcommits.StorageKernelAPIAdapter;
import io.delta.kernel.defaults.internal.json.JsonUtils;
import io.delta.kernel.defaults.internal.logstore.LogStoreProvider;

Expand Down Expand Up @@ -90,8 +85,8 @@ public Map<String, String> registerTable(
return commitCoordinatorClient.registerTable(
new Path(logPath),
currentVersion,
convertAbstractMetadata(currentMetadata),
convertAbstractProtocol(currentProtocol));
StorageKernelAPIAdapter.toStorageAbstractMetadata(currentMetadata),
StorageKernelAPIAdapter.toStorageAbstractProtocol(currentProtocol));
}

@Override
Expand All @@ -103,7 +98,7 @@ public CommitResponse commit(
UpdatedActions updatedActions) {
Path path = new Path(logPath);
LogStore logStore = LogStoreProvider.getLogStore(hadoopConf, path.toUri().getScheme());
return convertCommitResponse(commitCoordinatorClient.commit(
return StorageKernelAPIAdapter.toKernelAPICommitResponse(commitCoordinatorClient.commit(
logStore,
hadoopConf,
path,
Expand All @@ -120,7 +115,7 @@ public String next() {
return JsonUtils.rowToJson(actions.next());
}
},
convertUpdatedActions(updatedActions)));
StorageKernelAPIAdapter.toStorageUpdatedActions(updatedActions)));
}

@Override
Expand All @@ -129,11 +124,12 @@ public GetCommitsResponse getCommits(
Map<String, String> tableConf,
Long startVersion,
Long endVersion) {
return convertGetCommitsResponse(commitCoordinatorClient.getCommits(
new Path(tablePath),
tableConf,
startVersion,
endVersion));
return StorageKernelAPIAdapter.toKernelAPIGetCommitsResponse(
commitCoordinatorClient.getCommits(
new Path(tablePath),
tableConf,
startVersion,
endVersion));
}

@Override
Expand Down Expand Up @@ -163,123 +159,4 @@ public Boolean semanticEquals(CommitCoordinatorClientHandler other) {
public CommitCoordinatorClient getCommitCoordinatorClient() {
return commitCoordinatorClient;
}

private io.delta.storage.commit.UpdatedActions convertUpdatedActions(
UpdatedActions updatedActions) {
if (updatedActions == null) {
return null;
}
return new io.delta.storage.commit.UpdatedActions(
convertAbstractCommitInfo(updatedActions.getCommitInfo()),
convertAbstractMetadata(updatedActions.getNewMetadata()),
convertAbstractProtocol(updatedActions.getNewProtocol()),
convertAbstractMetadata(updatedActions.getOldMetadata()),
convertAbstractProtocol(updatedActions.getOldProtocol()));
}

private CommitResponse convertCommitResponse(io.delta.storage.commit.CommitResponse response) {
return new CommitResponse(convertCommit(response.getCommit()));
}

private Commit convertCommit(io.delta.storage.commit.Commit commit) {
return new Commit(
commit.getVersion(),
convertFileStatus(commit.getFileStatus()),
commit.getCommitTimestamp());
}

private FileStatus convertFileStatus(org.apache.hadoop.fs.FileStatus hadoopFileStatus) {
return FileStatus.of(
hadoopFileStatus.getPath().toString(),
hadoopFileStatus.getLen(),
hadoopFileStatus.getModificationTime());
}

private GetCommitsResponse convertGetCommitsResponse(
io.delta.storage.commit.GetCommitsResponse response) {
List<Commit> commits = response.getCommits().stream()
.map(this::convertCommit)
.collect(Collectors.toList());
return new GetCommitsResponse(commits, response.getLatestTableVersion());
}

private io.delta.storage.commit.actions.AbstractMetadata convertAbstractMetadata(
AbstractMetadata metadata) {
return new io.delta.storage.commit.actions.AbstractMetadata() {
@Override
public String getId() {
return metadata.getId();
}

@Override
public String getName() {
return metadata.getName();
}

@Override
public String getDescription() {
return metadata.getDescription();
}

@Override
public String getProvider() {
return metadata.getProvider();
}

@Override
public Map<String, String> getFormatOptions() {
return metadata.getFormatOptions();
}

@Override
public String getSchemaString() {
return metadata.getSchemaString();
}

@Override
public List<String> getPartitionColumns() {
return metadata.getPartitionColumns();
}

@Override
public Map<String, String> getConfiguration() {
return metadata.getConfiguration();
}

@Override
public Long getCreatedTime() {
return metadata.getCreatedTime();
}
};
}

private io.delta.storage.commit.actions.AbstractProtocol convertAbstractProtocol(
AbstractProtocol protocol) {
return new io.delta.storage.commit.actions.AbstractProtocol() {
@Override
public int getMinReaderVersion() {
return protocol.getMinReaderVersion();
}

@Override
public int getMinWriterVersion() {
return protocol.getMinWriterVersion();
}

@Override
public Set<String> getReaderFeatures() {
return protocol.getReaderFeatures();
}

@Override
public Set<String> getWriterFeatures() {
return protocol.getWriterFeatures();
}
};
}

private io.delta.storage.commit.actions.AbstractCommitInfo convertAbstractCommitInfo(
AbstractCommitInfo commitInfo) {
return commitInfo::getCommitTimestamp;
}
}
Loading

0 comments on commit e10ffd6

Please sign in to comment.