# [Kernel] Rename TableClient to Engine (delta-io#3015)

#### Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [X] Kernel
- [ ] Other (fill in here)

## Description

Renames `TableClient` to `Engine` in package names and code: `io.delta.kernel.client.TableClient` becomes `io.delta.kernel.engine.Engine`, and `io.delta.kernel.defaults.client.DefaultTableClient` becomes `io.delta.kernel.defaults.engine.DefaultEngine`.

A follow-up PR will be needed to update the `USER_GUIDE` and `README`.
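
For connectors, the migration is mechanical: swap the `TableClient` types for their `Engine` counterparts and pass the engine to the same entry points. A minimal before/after sketch (the class name and table path are illustrative, not part of this PR):

```java
import org.apache.hadoop.conf.Configuration;

import io.delta.kernel.Snapshot;
import io.delta.kernel.Table;
import io.delta.kernel.TableNotFoundException;
import io.delta.kernel.engine.Engine;                 // was: io.delta.kernel.client.TableClient
import io.delta.kernel.defaults.engine.DefaultEngine; // was: io.delta.kernel.defaults.client.DefaultTableClient

public class EngineRenameExample {
    public static void main(String[] args) throws TableNotFoundException {
        // Before: TableClient tableClient = DefaultTableClient.create(new Configuration());
        Engine engine = DefaultEngine.create(new Configuration());

        // Every Kernel API that previously took a TableClient now takes an Engine.
        Table table = Table.forPath(engine, "/path/to/delta-table"); // path is illustrative
        Snapshot snapshot = table.getLatestSnapshot(engine);
        System.out.println("Latest version: " + snapshot.getVersion(engine));
    }
}
```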

## How was this patch tested?

Existing tests suffice.

## Does this PR introduce _any_ user-facing changes?

Yes, this renames the public `TableClient` interface to `Engine`.

allisonport-db authored May 3, 2024
1 parent e7fa94d commit f727b84
Showing 67 changed files with 536 additions and 545 deletions.

@@ -22,11 +22,11 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path

 import io.delta.kernel.{Table, TableNotFoundException}
-import io.delta.kernel.defaults.client.DefaultTableClient
-import io.delta.kernel.internal.{SnapshotImpl => SnapshotImplKernel, TableImpl}
+import io.delta.kernel.defaults.engine.DefaultEngine
+import io.delta.kernel.internal.{TableImpl, SnapshotImpl => SnapshotImplKernel}
 import io.delta.standalone.VersionLog
 import io.delta.standalone.actions.{CommitInfo => CommitInfoJ}
-import io.delta.standalone.internal.{SnapshotImpl => StandaloneSnapshotImpl, InitialSnapshotImpl => StandaloneInitialSnapshotImpl}
+import io.delta.standalone.internal.{InitialSnapshotImpl => StandaloneInitialSnapshotImpl, SnapshotImpl => StandaloneSnapshotImpl}
 import io.delta.standalone.internal.util.{Clock, SystemClock}

 class KernelOptTxn(kernelDeltaLog: KernelDeltaLogDelegator, kernelSnapshot: KernelSnapshotDelegator)
@@ -44,7 +44,7 @@ class KernelOptTxn(kernelDeltaLog: KernelDeltaLogDelegator, kernelSnapshot: Kern
  * provides features used by flink+startTransaction.
  */
 class KernelDeltaLogDelegator(
-    tableClient: DefaultTableClient,
+    engine: DefaultEngine,
     table: TableImpl,
     standaloneDeltaLog: DeltaLogImpl,
    hadoopConf: Configuration,
@@ -68,7 +68,7 @@ class KernelDeltaLogDelegator(
   override def update(): StandaloneSnapshotImpl = {
     // get latest snapshot via kernel
     val kernelSnapshot = try {
-      table.getLatestSnapshot(tableClient).asInstanceOf[SnapshotImplKernel]
+      table.getLatestSnapshot(engine).asInstanceOf[SnapshotImplKernel]
     } catch {
       case e: TableNotFoundException =>
         return new StandaloneInitialSnapshotImpl(hadoopConf, logPath, this)
@@ -82,7 +82,7 @@
       kernelSnapshotWrapper,
       hadoopConf,
       logPath,
-      kernelSnapshot.getVersion(tableClient), // note: tableClient isn't used
+      kernelSnapshot.getVersion(engine), // note: engine isn't used
       this,
       standaloneDeltaLog
     ))
@@ -128,9 +128,9 @@ object KernelDeltaLogDelegator {
     // the kernel does not
     val standaloneDeltaLog = new DeltaLogImpl(hadoopConf, logPath, dataPathFromLog, clock)
     standaloneDeltaLog.ensureLogDirectoryExist()
-    val tableClient = DefaultTableClient.create(hadoopConf)
-    val table = Table.forPath(tableClient, dataPath).asInstanceOf[TableImpl]
+    val engine = DefaultEngine.create(hadoopConf)
+    val table = Table.forPath(engine, dataPath).asInstanceOf[TableImpl]
     // Todo: Potentially we could get the resolved paths out of the table above
-    new KernelDeltaLogDelegator(tableClient, table, standaloneDeltaLog, hadoopConf, logPath, dataPathFromLog, clock)
+    new KernelDeltaLogDelegator(engine, table, standaloneDeltaLog, hadoopConf, logPath, dataPathFromLog, clock)
   }
 }

@@ -73,7 +73,7 @@ public Metadata getMetadata() {
      */
     @Override
     public long getVersion() {
-        // WARNING: getVersion in SnapshotImpl currently doesn't use the table client, so we can
+        // WARNING: getVersion in SnapshotImpl currently doesn't use the engine so we can
         // pass null, but if this changes this code could break
         return kernelSnapshot.getVersion(null);
     }

@@ -37,14 +37,14 @@
 import org.apache.hadoop.conf.Configuration;

 import io.delta.kernel.TableNotFoundException;
-import io.delta.kernel.client.TableClient;
+import io.delta.kernel.engine.Engine;
 import io.delta.kernel.data.FilteredColumnarBatch;
 import io.delta.kernel.data.Row;
 import io.delta.kernel.expressions.Predicate;
 import io.delta.kernel.types.*;
 import io.delta.kernel.utils.CloseableIterator;

-import io.delta.kernel.defaults.client.DefaultTableClient;
+import io.delta.kernel.defaults.engine.DefaultEngine;

 /**
  * Base class for reading Delta Lake tables using the Delta Kernel APIs.
@@ -53,11 +53,11 @@ public abstract class BaseTableReader {
     public static final int DEFAULT_LIMIT = 20;

     protected final String tablePath;
-    protected final TableClient tableClient;
+    protected final Engine engine;

     public BaseTableReader(String tablePath) {
         this.tablePath = requireNonNull(tablePath);
-        this.tableClient = DefaultTableClient.create(new Configuration());
+        this.engine = DefaultEngine.create(new Configuration());
     }

     /**

@@ -26,7 +26,7 @@
 import org.apache.commons.cli.Options;

 import io.delta.kernel.*;
-import io.delta.kernel.client.TableClient;
+import io.delta.kernel.engine.Engine;
 import io.delta.kernel.data.ColumnarBatch;
 import io.delta.kernel.data.FilteredColumnarBatch;
 import io.delta.kernel.data.Row;
@@ -78,15 +78,15 @@ public MultiThreadedTableReader(int numThreads, String tablePath) {

     public int show(int limit, Optional<List<String>> columnsOpt, Optional<Predicate> predicate)
             throws TableNotFoundException {
-        Table table = Table.forPath(tableClient, tablePath);
-        Snapshot snapshot = table.getLatestSnapshot(tableClient);
-        StructType readSchema = pruneSchema(snapshot.getSchema(tableClient), columnsOpt);
+        Table table = Table.forPath(engine, tablePath);
+        Snapshot snapshot = table.getLatestSnapshot(engine);
+        StructType readSchema = pruneSchema(snapshot.getSchema(engine), columnsOpt);

-        ScanBuilder scanBuilder = snapshot.getScanBuilder(tableClient)
-            .withReadSchema(tableClient, readSchema);
+        ScanBuilder scanBuilder = snapshot.getScanBuilder(engine)
+            .withReadSchema(engine, readSchema);

         if (predicate.isPresent()) {
-            scanBuilder = scanBuilder.withFilter(tableClient, predicate.get());
+            scanBuilder = scanBuilder.withFilter(engine, predicate.get());
         }

         return new Reader(limit)
@@ -140,15 +140,15 @@ private static class ScanFile {
         /**
          * Get the deserialized scan state as {@link Row} object
          */
-        Row getScanRow(TableClient tableClient) {
-            return RowSerDe.deserializeRowFromJson(tableClient, stateJson);
+        Row getScanRow(Engine engine) {
+            return RowSerDe.deserializeRowFromJson(engine, stateJson);
         }

         /**
          * Get the deserialized scan file as {@link Row} object
          */
-        Row getScanFileRow(TableClient tableClient) {
-            return RowSerDe.deserializeRowFromJson(tableClient, fileJson);
+        Row getScanFileRow(Engine engine) {
+            return RowSerDe.deserializeRowFromJson(engine, fileJson);
         }
     }

@@ -199,9 +199,9 @@ int readData(StructType readSchema, Scan scan) {
        private Runnable workGenerator(Scan scan) {
            return (() -> {
-               Row scanStateRow = scan.getScanState(tableClient);
+               Row scanStateRow = scan.getScanState(engine);
                try(CloseableIterator<FilteredColumnarBatch> scanFileIter =
-                       scan.getScanFiles(tableClient)) {
+                       scan.getScanFiles(engine)) {

                    while (scanFileIter.hasNext() && !stopSignal.get()) {
                        try (CloseableIterator<Row> scanFileRows = scanFileIter.next().getRows()) {
@@ -231,23 +231,23 @@ private Runnable workConsumer(int workerId) {
                    if (work == ScanFile.POISON_PILL) {
                        return; // exit as there are no more work units
                    }
-                   Row scanState = work.getScanRow(tableClient);
-                   Row scanFile = work.getScanFileRow(tableClient);
+                   Row scanState = work.getScanRow(engine);
+                   Row scanFile = work.getScanFileRow(engine);
                    FileStatus fileStatus =
                        InternalScanFileUtils.getAddFileStatus(scanFile);
                    StructType physicalReadSchema =
-                       ScanStateRow.getPhysicalDataReadSchema(tableClient, scanState);
+                       ScanStateRow.getPhysicalDataReadSchema(engine, scanState);

                    CloseableIterator<ColumnarBatch> physicalDataIter =
-                       tableClient.getParquetHandler().readParquetFiles(
+                       engine.getParquetHandler().readParquetFiles(
                            singletonCloseableIterator(fileStatus),
                            physicalReadSchema,
                            Optional.empty());

                    try (
                        CloseableIterator<FilteredColumnarBatch> dataIter =
                            Scan.transformPhysicalData(
-                               tableClient,
+                               engine,
                                scanState,
                                scanFile,
                                physicalDataIter)) {

@@ -56,15 +56,15 @@ public SingleThreadedTableReader(String tablePath) {
     @Override
     public int show(int limit, Optional<List<String>> columnsOpt, Optional<Predicate> predicate)
             throws TableNotFoundException, IOException {
-        Table table = Table.forPath(tableClient, tablePath);
-        Snapshot snapshot = table.getLatestSnapshot(tableClient);
-        StructType readSchema = pruneSchema(snapshot.getSchema(tableClient), columnsOpt);
+        Table table = Table.forPath(engine, tablePath);
+        Snapshot snapshot = table.getLatestSnapshot(engine);
+        StructType readSchema = pruneSchema(snapshot.getSchema(engine), columnsOpt);

-        ScanBuilder scanBuilder = snapshot.getScanBuilder(tableClient)
-            .withReadSchema(tableClient, readSchema);
+        ScanBuilder scanBuilder = snapshot.getScanBuilder(engine)
+            .withReadSchema(engine, readSchema);

         if (predicate.isPresent()) {
-            scanBuilder = scanBuilder.withFilter(tableClient, predicate.get());
+            scanBuilder = scanBuilder.withFilter(engine, predicate.get());
         }

         return readData(readSchema, scanBuilder.build(), limit);
@@ -95,13 +95,13 @@ public static void main(String[] args)
     private int readData(StructType readSchema, Scan scan, int maxRowCount) throws IOException {
         printSchema(readSchema);

-        Row scanState = scan.getScanState(tableClient);
-        CloseableIterator<FilteredColumnarBatch> scanFileIter = scan.getScanFiles(tableClient);
+        Row scanState = scan.getScanState(engine);
+        CloseableIterator<FilteredColumnarBatch> scanFileIter = scan.getScanFiles(engine);

         int readRecordCount = 0;
         try {
             StructType physicalReadSchema =
-                ScanStateRow.getPhysicalDataReadSchema(tableClient, scanState);
+                ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
             while (scanFileIter.hasNext()) {
                 FilteredColumnarBatch scanFilesBatch = scanFileIter.next();
                 try (CloseableIterator<Row> scanFileRows = scanFilesBatch.getRows()) {
@@ -110,14 +110,14 @@ private int readData(StructType readSchema, Scan scan, int maxRowCount) throws I
                     FileStatus fileStatus =
                         InternalScanFileUtils.getAddFileStatus(scanFileRow);
                     CloseableIterator<ColumnarBatch> physicalDataIter =
-                        tableClient.getParquetHandler().readParquetFiles(
+                        engine.getParquetHandler().readParquetFiles(
                            singletonCloseableIterator(fileStatus),
                            physicalReadSchema,
                            Optional.empty());
                    try (
                        CloseableIterator<FilteredColumnarBatch> transformedData =
                            Scan.transformPhysicalData(
-                               tableClient,
+                               engine,
                                scanState,
                                scanFileRow,
                                physicalDataIter)) {

@@ -24,7 +24,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;

-import io.delta.kernel.client.TableClient;
+import io.delta.kernel.engine.Engine;
 import io.delta.kernel.data.Row;
 import io.delta.kernel.types.*;

@@ -59,12 +59,12 @@ public static String serializeRowToJson(Row row) {
     /**
      * Utility method to deserialize a {@link Row} object from the JSON form.
      */
-    public static Row deserializeRowFromJson(TableClient tableClient, String jsonRowWithSchema) {
+    public static Row deserializeRowFromJson(Engine engine, String jsonRowWithSchema) {
         try {
             JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonRowWithSchema);
             JsonNode schemaNode = jsonNode.get("schema");
             StructType schema =
-                tableClient.getJsonHandler().deserializeStructType(schemaNode.asText());
+                engine.getJsonHandler().deserializeStructType(schemaNode.asText());
             return parseRowFromJsonWithSchema((ObjectNode) jsonNode.get("row"), schema);
         } catch (JsonProcessingException ex) {
             throw new UncheckedIOException(ex);
24 changes: 12 additions & 12 deletions kernel/kernel-api/src/main/java/io/delta/kernel/Scan.java

@@ -20,8 +20,8 @@
 import java.util.Optional;

 import io.delta.kernel.annotation.Evolving;
-import io.delta.kernel.client.TableClient;
 import io.delta.kernel.data.*;
+import io.delta.kernel.engine.Engine;
 import io.delta.kernel.expressions.Predicate;
 import io.delta.kernel.types.StructField;
 import io.delta.kernel.types.StructType;
@@ -47,7 +47,7 @@ public interface Scan {
     /**
      * Get an iterator of data files to scan.
     *
-     * @param tableClient {@link TableClient} instance to use in Delta Kernel.
+     * @param engine {@link Engine} instance to use in Delta Kernel.
     * @return iterator of {@link FilteredColumnarBatch}s where each selected row in
     *         the batch corresponds to one scan file. Schema of each row is defined as follows:
     * <p>
@@ -87,7 +87,7 @@ public interface Scan {
     * </ul></li>
     * </ol>
     */
-    CloseableIterator<FilteredColumnarBatch> getScanFiles(TableClient tableClient);
+    CloseableIterator<FilteredColumnarBatch> getScanFiles(Engine engine);

    /**
     * Get the remaining filter that is not guaranteed to be satisfied for the data Delta Kernel
@@ -101,17 +101,17 @@
     * Get the scan state associated with the current scan. This state is common across all
     * files in the scan to be read.
     *
-     * @param tableClient {@link TableClient} instance to use in Delta Kernel.
+     * @param engine {@link Engine} instance to use in Delta Kernel.
     * @return Scan state in {@link Row} format.
     */
-    Row getScanState(TableClient tableClient);
+    Row getScanState(Engine engine);

    /**
     * Transform the physical data read from the table data file into the logical data that expected
     * out of the Delta table.
     *
-     * @param tableClient Connector provided {@link TableClient} implementation.
-     * @param scanState Scan state returned by {@link Scan#getScanState(TableClient)}
+     * @param engine Connector provided {@link Engine} implementation.
+     * @param scanState Scan state returned by {@link Scan#getScanState(Engine)}
     * @param scanFile Scan file from where the physical data {@code physicalDataIter} is
     *                 read from.
     * @param physicalDataIter Iterator of {@link ColumnarBatch}s containing the physical data read
@@ -123,7 +123,7 @@
     * @throws IOException when error occurs while reading the data.
     */
    static CloseableIterator<FilteredColumnarBatch> transformPhysicalData(
-           TableClient tableClient,
+           Engine engine,
            Row scanState,
            Row scanFile,
            CloseableIterator<ColumnarBatch> physicalDataIter) throws IOException {
@@ -142,8 +142,8 @@ private void initIfRequired() {
            if (inited) {
                return;
            }
-           physicalReadSchema = ScanStateRow.getPhysicalSchema(tableClient, scanState);
-           logicalReadSchema = ScanStateRow.getLogicalSchema(tableClient, scanState);
+           physicalReadSchema = ScanStateRow.getPhysicalSchema(engine, scanState);
+           logicalReadSchema = ScanStateRow.getLogicalSchema(engine, scanState);

            tablePath = ScanStateRow.getTableRoot(scanState);
            inited = true;
@@ -182,7 +182,7 @@ public FilteredColumnarBatch next() {
                }
                if (!dv.equals(currDV)) {
                    Tuple2<DeletionVectorDescriptor, RoaringBitmapArray> dvInfo =
-                       DeletionVectorUtils.loadNewDvAndBitmap(tableClient, tablePath, dv);
+                       DeletionVectorUtils.loadNewDvAndBitmap(engine, tablePath, dv);
                    this.currDV = dvInfo._1;
                    this.currBitmap = dvInfo._2;
                }
@@ -197,7 +197,7 @@ public FilteredColumnarBatch next() {
                // Add partition columns
                nextDataBatch =
                    PartitionUtils.withPartitionColumns(
-                       tableClient.getExpressionHandler(),
+                       engine.getExpressionHandler(),
                        nextDataBatch,
                        InternalScanFileUtils.getPartitionValues(scanFile),
                        physicalReadSchema
10 changes: 5 additions & 5 deletions kernel/kernel-api/src/main/java/io/delta/kernel/ScanBuilder.java

@@ -17,7 +17,7 @@
 package io.delta.kernel;

 import io.delta.kernel.annotation.Evolving;
-import io.delta.kernel.client.TableClient;
+import io.delta.kernel.engine.Engine;
 import io.delta.kernel.expressions.Predicate;
 import io.delta.kernel.types.StructType;

@@ -33,21 +33,21 @@ public interface ScanBuilder {
     * Apply the given filter expression to prune any files that do not contain data satisfying
     * the given filter.
     *
-     * @param tableClient {@link TableClient} instance to use in Delta Kernel.
+     * @param engine {@link Engine} instance to use in Delta Kernel.
     * @param predicate a {@link Predicate} to prune the metadata or data.
     * @return A {@link ScanBuilder} with filter applied.
     */
-    ScanBuilder withFilter(TableClient tableClient, Predicate predicate);
+    ScanBuilder withFilter(Engine engine, Predicate predicate);

    /**
     * Apply the given <i>readSchema</i>. If the builder already has a projection applied, calling
     * this again replaces the existing projection.
     *
-     * @param tableClient {@link TableClient} instance to use in Delta Kernel.
+     * @param engine {@link Engine} instance to use in Delta Kernel.
     * @param readSchema Subset of columns to read from the Delta table.
     * @return A {@link ScanBuilder} with projection pruning.
     */
-    ScanBuilder withReadSchema(TableClient tableClient, StructType readSchema);
+    ScanBuilder withReadSchema(Engine engine, StructType readSchema);

    /**
     * @return Build the {@link Scan instance}
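
Taken together, the renamed `Scan` and `ScanBuilder` APIs compose as below. A sketch of the single-threaded read path under the new names, mirroring `SingleThreadedTableReader` from this diff (the class name is hypothetical, and schema pruning is simplified):

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import io.delta.kernel.*;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;

import io.delta.kernel.defaults.engine.DefaultEngine;

public class ScanWithEngineExample {

    // Walks the renamed read path: every call that previously took a
    // TableClient now takes an Engine.
    public static void scanTable(String tablePath)
            throws TableNotFoundException, IOException {
        Engine engine = DefaultEngine.create(new Configuration());

        Snapshot snapshot = Table.forPath(engine, tablePath).getLatestSnapshot(engine);
        StructType readSchema = snapshot.getSchema(engine); // full schema; prune as needed

        Scan scan = snapshot.getScanBuilder(engine)
            .withReadSchema(engine, readSchema)
            .build();

        // Scan state is shared across all files; each selected scan-file row
        // identifies one data file to read.
        Row scanState = scan.getScanState(engine);
        try (CloseableIterator<FilteredColumnarBatch> scanFiles = scan.getScanFiles(engine)) {
            while (scanFiles.hasNext()) {
                try (CloseableIterator<Row> scanFileRows = scanFiles.next().getRows()) {
                    while (scanFileRows.hasNext()) {
                        Row scanFileRow = scanFileRows.next();
                        // Read the file's physical data via engine.getParquetHandler(),
                        // then convert it to logical data with:
                        //   Scan.transformPhysicalData(engine, scanState, scanFileRow, physicalDataIter)
                    }
                }
            }
        }
    }
}
```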