Convert DeltaLakeInsertTableHandle to record
findepi committed Apr 11, 2024
1 parent 807caf8 commit 694aeb5
Showing 3 changed files with 43 additions and 98 deletions.
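The commit replaces a hand-written immutable handle class (final fields, an @JsonCreator constructor, @JsonProperty getters) with a Java record: the record components define the serialized shape, a compact constructor keeps the null checks and the defensive ImmutableList copy, and call sites move from getXxx()/isXxx() getters to the generated xxx() accessors. The self-contained sketch below shows the same pattern under simplified assumptions — the class and field names are invented, the plain ObjectMapper stands in for Trino's actual JSON codec wiring, and it presumes Jackson 2.12+ built-in record support.

// Standalone sketch of the pattern applied in this commit (illustrative names, not Trino code):
// a value class with @JsonCreator/@JsonProperty boilerplate becomes a record whose components
// define the JSON shape, with a compact constructor for validation and defensive copying.
// Assumes jackson-databind 2.12+, which handles Java records without annotations.
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.List;

import static java.util.Objects.requireNonNull;

public class RecordHandleExample
{
    public record InsertHandle(String tableName, String location, List<String> inputColumns, long readVersion)
    {
        // Compact constructor: keeps the null checks and defensive copy that the old
        // hand-written constructor performed; accessors are tableName(), location(), ...
        public InsertHandle
        {
            requireNonNull(tableName, "tableName is null");
            requireNonNull(location, "location is null");
            inputColumns = List.copyOf(requireNonNull(inputColumns, "inputColumns is null"));
        }
    }

    public static void main(String[] args)
            throws Exception
    {
        ObjectMapper mapper = new ObjectMapper();
        InsertHandle handle = new InsertHandle("sales", "s3://bucket/sales", List.of("region", "amount"), 42);

        // Serialization reads the record components; deserialization goes through the
        // canonical constructor, so no @JsonCreator/@JsonProperty annotations are needed.
        String json = mapper.writeValueAsString(handle);
        InsertHandle roundTripped = mapper.readValue(json, InsertHandle.class);
        System.out.println(json);
        System.out.println(handle.equals(roundTripped));
    }
}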
DeltaLakeInsertTableHandle.java

@@ -13,8 +13,6 @@
  */
 package io.trino.plugin.deltalake;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
 import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
 import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
@@ -25,76 +23,23 @@
 
 import static java.util.Objects.requireNonNull;
 
-public class DeltaLakeInsertTableHandle
+public record DeltaLakeInsertTableHandle(
+        SchemaTableName tableName,
+        String location,
+        MetadataEntry metadataEntry,
+        ProtocolEntry protocolEntry,
+        List<DeltaLakeColumnHandle> inputColumns,
+        long readVersion,
+        boolean retriesEnabled)
         implements ConnectorInsertTableHandle
 {
-    private final SchemaTableName tableName;
-    private final String location;
-    private final MetadataEntry metadataEntry;
-    private final ProtocolEntry protocolEntry;
-    private final List<DeltaLakeColumnHandle> inputColumns;
-    private final long readVersion;
-    private final boolean retriesEnabled;
-
-    @JsonCreator
-    public DeltaLakeInsertTableHandle(
-            @JsonProperty("tableName") SchemaTableName tableName,
-            @JsonProperty("location") String location,
-            @JsonProperty("metadataEntry") MetadataEntry metadataEntry,
-            @JsonProperty("protocolEntry") ProtocolEntry protocolEntry,
-            @JsonProperty("inputColumns") List<DeltaLakeColumnHandle> inputColumns,
-            @JsonProperty("readVersion") long readVersion,
-            @JsonProperty("retriesEnabled") boolean retriesEnabled)
-    {
-        this.tableName = requireNonNull(tableName, "tableName is null");
-        this.metadataEntry = requireNonNull(metadataEntry, "metadataEntry is null");
-        this.protocolEntry = requireNonNull(protocolEntry, "protocolEntry is null");
-        this.inputColumns = ImmutableList.copyOf(inputColumns);
-        this.location = requireNonNull(location, "location is null");
-        this.readVersion = readVersion;
-        this.retriesEnabled = retriesEnabled;
-    }
-
-    @JsonProperty
-    public SchemaTableName getTableName()
-    {
-        return tableName;
-    }
-
-    @JsonProperty
-    public String getLocation()
-    {
-        return location;
-    }
-
-    @JsonProperty
-    public MetadataEntry getMetadataEntry()
-    {
-        return metadataEntry;
-    }
-
-    @JsonProperty
-    public ProtocolEntry getProtocolEntry()
-    {
-        return protocolEntry;
-    }
-
-    @JsonProperty
-    public List<DeltaLakeColumnHandle> getInputColumns()
-    {
-        return inputColumns;
-    }
-
-    @JsonProperty
-    public long getReadVersion()
-    {
-        return readVersion;
-    }
-
-    @JsonProperty
-    public boolean isRetriesEnabled()
+    public DeltaLakeInsertTableHandle
     {
-        return retriesEnabled;
+        requireNonNull(tableName, "tableName is null");
+        requireNonNull(metadataEntry, "metadataEntry is null");
+        requireNonNull(protocolEntry, "protocolEntry is null");
+        inputColumns = ImmutableList.copyOf(requireNonNull(inputColumns, "inputColumns is null"));
+        requireNonNull(location, "location is null");
     }
 
     @Override
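One detail of the compact constructor above worth spelling out: a statement like inputColumns = ImmutableList.copyOf(...) reassigns the constructor parameter, and the implicit this.inputColumns = inputColumns assignment runs after the compact-constructor body, so the record stores the immutable copy rather than the caller's list. A minimal standalone sketch of that semantics (names are illustrative):

import java.util.ArrayList;
import java.util.List;

public class CompactConstructorDemo
{
    // Reassigning a component parameter inside the compact constructor changes the
    // value that the implicit canonical constructor subsequently stores in the field.
    record Columns(List<String> names)
    {
        Columns
        {
            names = List.copyOf(names); // defensive copy, analogous to ImmutableList.copyOf
        }
    }

    public static void main(String[] args)
    {
        List<String> mutable = new ArrayList<>(List.of("a", "b"));
        Columns columns = new Columns(mutable);
        mutable.add("c"); // later mutation of the caller's list does not leak into the record
        System.out.println(columns.names()); // prints [a, b]
    }
}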
DeltaLakeMetadata.java

@@ -1830,18 +1830,18 @@ public Optional<ConnectorOutputMetadata> finishInsert(
                 .map(dataFileInfoCodec::fromJson)
                 .collect(toImmutableList());
 
-        if (handle.isRetriesEnabled()) {
-            cleanExtraOutputFiles(session, Location.of(handle.getLocation()), dataFileInfos);
+        if (handle.retriesEnabled()) {
+            cleanExtraOutputFiles(session, Location.of(handle.location()), dataFileInfos);
         }
 
         boolean writeCommitted = false;
         try {
-            IsolationLevel isolationLevel = getIsolationLevel(handle.getMetadataEntry());
-            AtomicReference<Long> readVersion = new AtomicReference<>(handle.getReadVersion());
+            IsolationLevel isolationLevel = getIsolationLevel(handle.metadataEntry());
+            AtomicReference<Long> readVersion = new AtomicReference<>(handle.readVersion());
             long commitVersion = Failsafe.with(TRANSACTION_CONFLICT_RETRY_POLICY)
                     .get(context -> commitInsertOperation(session, handle, sourceTableHandles, isolationLevel, dataFileInfos, readVersion, context.getAttemptCount()));
             writeCommitted = true;
-            writeCheckpointIfNeeded(session, handle.getTableName(), handle.getLocation(), handle.getReadVersion(), handle.getMetadataEntry().getCheckpointInterval(), commitVersion);
+            writeCheckpointIfNeeded(session, handle.tableName(), handle.location(), handle.readVersion(), handle.metadataEntry().getCheckpointInterval(), commitVersion);
 
             if (isCollectExtendedStatisticsColumnStatisticsOnWrite(session) && !computedStatistics.isEmpty() && !dataFileInfos.isEmpty()) {
                 // TODO (https://github.com/trinodb/trino/issues/16088) Add synchronization when version conflict for INSERT is resolved.
@@ -1852,20 +1852,20 @@ public Optional<ConnectorOutputMetadata> finishInsert(
                 updateTableStatistics(
                         session,
                         Optional.empty(),
-                        handle.getTableName(),
-                        handle.getLocation(),
+                        handle.tableName(),
+                        handle.location(),
                         maxFileModificationTime,
                         computedStatistics,
-                        getExactColumnNames(handle.getMetadataEntry()),
-                        Optional.of(extractSchema(handle.getMetadataEntry(), handle.getProtocolEntry(), typeManager).stream()
+                        getExactColumnNames(handle.metadataEntry()),
+                        Optional.of(extractSchema(handle.metadataEntry(), handle.protocolEntry(), typeManager).stream()
                                 .collect(toImmutableMap(DeltaLakeColumnMetadata::getName, DeltaLakeColumnMetadata::getPhysicalName))),
                         true);
             }
         }
         catch (Exception e) {
             if (!writeCommitted) {
                 // TODO perhaps it should happen in a background thread (https://github.com/trinodb/trino/issues/12011)
-                cleanupFailedWrite(session, handle.getLocation(), dataFileInfos);
+                cleanupFailedWrite(session, handle.location(), dataFileInfos);
             }
             throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Failed to write Delta Lake transaction log entry", e);
         }
@@ -1884,18 +1884,18 @@ private long commitInsertOperation(
             throws IOException
     {
         TrinoFileSystem fileSystem = fileSystemFactory.create(session);
-        long currentVersion = getMandatoryCurrentVersion(fileSystem, handle.getLocation(), handle.getReadVersion());
+        long currentVersion = getMandatoryCurrentVersion(fileSystem, handle.location(), handle.readVersion());
 
         List<DeltaLakeTableHandle> sameAsTargetSourceTableHandles = sourceTableHandles.stream()
                 .filter(sourceTableHandle -> sourceTableHandle instanceof DeltaLakeTableHandle)
                 .map(DeltaLakeTableHandle.class::cast)
-                .filter(tableHandle -> handle.getTableName().equals(tableHandle.getSchemaTableName())
+                .filter(tableHandle -> handle.tableName().equals(tableHandle.getSchemaTableName())
                         // disregard time travel table handles
                         && !tableHandle.isTimeTravel())
                 .collect(toImmutableList());
         long readVersionValue = readVersion.get();
         if (currentVersion > readVersionValue) {
-            String transactionLogDirectory = getTransactionLogDir(handle.getLocation());
+            String transactionLogDirectory = getTransactionLogDir(handle.location());
             for (long version = readVersionValue + 1; version <= currentVersion; version++) {
                 List<DeltaLakeTransactionLogEntry> transactionLogEntries;
                 try {
@@ -2012,15 +2012,15 @@ private void writeTransactionLogForInsertOperation(
             throws IOException
     {
         // it is not obvious why we need to persist this readVersion
-        TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, insertTableHandle.getLocation());
+        TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, insertTableHandle.location());
         transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, isolationLevel, commitVersion, Instant.now().toEpochMilli(), INSERT_OPERATION, currentVersion, isBlindAppend));
 
-        ColumnMappingMode columnMappingMode = getColumnMappingMode(insertTableHandle.getMetadataEntry(), insertTableHandle.getProtocolEntry());
+        ColumnMappingMode columnMappingMode = getColumnMappingMode(insertTableHandle.metadataEntry(), insertTableHandle.protocolEntry());
         List<String> partitionColumns = getPartitionColumns(
-                insertTableHandle.getMetadataEntry().getOriginalPartitionColumns(),
-                insertTableHandle.getInputColumns(),
+                insertTableHandle.metadataEntry().getOriginalPartitionColumns(),
+                insertTableHandle.inputColumns(),
                 columnMappingMode);
-        List<String> exactColumnNames = getExactColumnNames(insertTableHandle.getMetadataEntry());
+        List<String> exactColumnNames = getExactColumnNames(insertTableHandle.metadataEntry());
         appendAddFileEntries(transactionLogWriter, dataFileInfos, partitionColumns, exactColumnNames, true);
 
         transactionLogWriter.flush();
@@ -2110,7 +2110,7 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle merg
         List<DataFileInfo> newFiles = ImmutableList.copyOf(split.get(true));
         List<DataFileInfo> cdcFiles = ImmutableList.copyOf(split.get(false));
 
-        if (mergeHandle.insertTableHandle().isRetriesEnabled()) {
+        if (mergeHandle.insertTableHandle().retriesEnabled()) {
             cleanExtraOutputFiles(session, Location.of(handle.getLocation()), allFiles);
         }
 
@@ -2138,7 +2138,7 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle merg
         ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry(), handle.getProtocolEntry());
         List<String> partitionColumns = getPartitionColumns(
                 handle.getMetadataEntry().getOriginalPartitionColumns(),
-                mergeHandle.insertTableHandle().getInputColumns(),
+                mergeHandle.insertTableHandle().inputColumns(),
                 columnMappingMode);
 
         if (!cdcFiles.isEmpty()) {
DeltaLakePageSinkProvider.java

@@ -119,17 +119,17 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
     public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle insertTableHandle, ConnectorPageSinkId pageSinkId)
     {
         DeltaLakeInsertTableHandle tableHandle = (DeltaLakeInsertTableHandle) insertTableHandle;
-        MetadataEntry metadataEntry = tableHandle.getMetadataEntry();
-        DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(metadataEntry, tableHandle.getProtocolEntry(), typeManager);
+        MetadataEntry metadataEntry = tableHandle.metadataEntry();
+        DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(metadataEntry, tableHandle.protocolEntry(), typeManager);
         return new DeltaLakePageSink(
                 typeManager.getTypeOperators(),
-                tableHandle.getInputColumns(),
-                tableHandle.getMetadataEntry().getOriginalPartitionColumns(),
+                tableHandle.inputColumns(),
+                tableHandle.metadataEntry().getOriginalPartitionColumns(),
                 pageIndexerFactory,
                 fileSystemFactory,
                 maxPartitionsPerWriter,
                 dataFileInfoCodec,
-                Location.of(tableHandle.getLocation()),
+                Location.of(tableHandle.location()),
                 session,
                 stats,
                 trinoVersion,
@@ -168,7 +168,7 @@ public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transaction
         DeltaLakeMergeTableHandle merge = (DeltaLakeMergeTableHandle) mergeHandle;
         DeltaLakeInsertTableHandle tableHandle = merge.insertTableHandle();
         ConnectorPageSink pageSink = createPageSink(transactionHandle, session, tableHandle, pageSinkId);
-        DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), typeManager);
+        DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(tableHandle.metadataEntry(), tableHandle.protocolEntry(), typeManager);
 
         return new DeltaLakeMergeSink(
                 typeManager.getTypeOperators(),
@@ -179,12 +179,12 @@ public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transaction
                 dataFileInfoCodec,
                 mergeResultJsonCodec,
                 stats,
-                Location.of(tableHandle.getLocation()),
+                Location.of(tableHandle.location()),
                 pageSink,
-                tableHandle.getInputColumns(),
+                tableHandle.inputColumns(),
                 domainCompactionThreshold,
                 () -> createCdfPageSink(merge, session),
-                changeDataFeedEnabled(tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry()).orElse(false),
+                changeDataFeedEnabled(tableHandle.metadataEntry(), tableHandle.protocolEntry()).orElse(false),
                 parquetSchemaMapping);
     }
 
