Skip to content

Commit

Permalink
Add support for UPDATE in iceberg
Browse files Browse the repository at this point in the history
This commit allows users to perform row-level updates when using
the Iceberg connector with Java executors.

This is achieved by improving on the IcebergUpdatablePageSource
to implement the updateRows method. The implementation passes
a  generated row ID column as a field in the page required by
updateRows. Then during updateRows, generated a positionDelete
file entry for the row ID, and also writes the row's updated value to a
new page sink for the newly updated data.

These new files are then commited in a rowDelta transaction within
the Iceberg connector metadata after processing is complete.

Co-Authored-By: Nidhin Varghese <Nidhin.Varghese1@ibm.com>
Co-Authored-By: Anoop V S <anoop.v.s@ibm.com>
  • Loading branch information
3 people committed Dec 18, 2024
1 parent 2c55764 commit e61ff60
Show file tree
Hide file tree
Showing 31 changed files with 965 additions and 272 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class CommitTaskData
private final Optional<String> partitionDataJson;
private final FileFormat fileFormat;
private final Optional<String> referencedDataFile;
private final FileContent content;

@JsonCreator
public CommitTaskData(
Expand All @@ -38,7 +39,8 @@ public CommitTaskData(
@JsonProperty("partitionSpecJson") int partitionSpecId,
@JsonProperty("partitionDataJson") Optional<String> partitionDataJson,
@JsonProperty("fileFormat") FileFormat fileFormat,
@JsonProperty("referencedDataFile") String referencedDataFile)
@JsonProperty("referencedDataFile") String referencedDataFile,
@JsonProperty("content") FileContent content)
{
this.path = requireNonNull(path, "path is null");
this.fileSizeInBytes = fileSizeInBytes;
Expand All @@ -47,6 +49,7 @@ public CommitTaskData(
this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null");
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
this.referencedDataFile = Optional.ofNullable(referencedDataFile);
this.content = requireNonNull(content, "content is null");
}

@JsonProperty
Expand Down Expand Up @@ -90,4 +93,10 @@ public Optional<String> getReferencedDataFile()
{
return referencedDataFile;
}

@JsonProperty
public FileContent getContent()
{
return content;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,23 @@ public static FileFormat fromIcebergFileFormat(org.apache.iceberg.FileFormat for

return prestoFileFormat;
}

public org.apache.iceberg.FileFormat toIceberg()
{
org.apache.iceberg.FileFormat fileFormat;
switch (this) {
case ORC:
fileFormat = org.apache.iceberg.FileFormat.ORC;
break;
case PARQUET:
fileFormat = org.apache.iceberg.FileFormat.PARQUET;
break;
case AVRO:
fileFormat = org.apache.iceberg.FileFormat.AVRO;
break;
default:
throw new PrestoException(NOT_SUPPORTED, "Unsupported file format: " + this);
}
return fileFormat;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.facebook.presto.hive.HivePartition;
import com.facebook.presto.hive.HiveWrittenPartitions;
import com.facebook.presto.hive.NodeVersion;
import com.facebook.presto.iceberg.changelog.ChangelogOperation;
import com.facebook.presto.iceberg.changelog.ChangelogUtil;
import com.facebook.presto.iceberg.statistics.StatisticsFileCache;
import com.facebook.presto.spi.ColumnHandle;
Expand Down Expand Up @@ -69,12 +70,12 @@
import com.google.common.collect.Maps;
import io.airlift.slice.Slice;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.IsolationLevel;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.MetricsModes.None;
import org.apache.iceberg.PartitionField;
Expand All @@ -89,6 +90,7 @@
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NoSuchViewException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
Expand All @@ -110,6 +112,8 @@
import java.util.stream.Collectors;

import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.HiveUtil.PRESTO_QUERY_ID;
import static com.facebook.presto.hive.MetadataUtils.getCombinedRemainingPredicate;
import static com.facebook.presto.hive.MetadataUtils.getDiscretePredicates;
import static com.facebook.presto.hive.MetadataUtils.getPredicate;
Expand All @@ -119,6 +123,9 @@
import static com.facebook.presto.iceberg.IcebergColumnHandle.DATA_SEQUENCE_NUMBER_COLUMN_METADATA;
import static com.facebook.presto.iceberg.IcebergColumnHandle.PATH_COLUMN_HANDLE;
import static com.facebook.presto.iceberg.IcebergColumnHandle.PATH_COLUMN_METADATA;
import static com.facebook.presto.iceberg.IcebergColumnHandle.PRESTO_UPDATE_ROW_ID_COLUMN_ID;
import static com.facebook.presto.iceberg.IcebergColumnHandle.PRESTO_UPDATE_ROW_ID_COLUMN_NAME;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBEG_COMMIT_ERROR;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.DATA_SEQUENCE_NUMBER;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.FILE_PATH;
Expand Down Expand Up @@ -163,6 +170,9 @@
import static com.facebook.presto.iceberg.TableStatisticsMaker.getSupportedColumnStatistics;
import static com.facebook.presto.iceberg.TypeConverter.toIcebergType;
import static com.facebook.presto.iceberg.TypeConverter.toPrestoType;
import static com.facebook.presto.iceberg.changelog.ChangelogOperation.INSERT;
import static com.facebook.presto.iceberg.changelog.ChangelogOperation.UPDATE_AFTER;
import static com.facebook.presto.iceberg.changelog.ChangelogOperation.UPDATE_BEFORE;
import static com.facebook.presto.iceberg.changelog.ChangelogUtil.getRowTypeFromColumnMeta;
import static com.facebook.presto.iceberg.optimizer.IcebergPlanOptimizer.getEnforcedColumns;
import static com.facebook.presto.iceberg.util.StatisticsUtil.calculateBaseTableStatistics;
Expand All @@ -180,6 +190,8 @@
import static org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP;
import static org.apache.iceberg.SnapshotSummary.REMOVED_EQ_DELETES_PROP;
import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT;

public abstract class IcebergAbstractMetadata
implements ConnectorMetadata
Expand Down Expand Up @@ -473,7 +485,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
@Override
public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
return finishWrite((IcebergOutputTableHandle) tableHandle, fragments);
return finishWrite(session, (IcebergOutputTableHandle) tableHandle, fragments, INSERT);
}

protected ConnectorInsertTableHandle beginIcebergTableInsert(ConnectorSession session, IcebergTableHandle table, Table icebergTable)
Expand All @@ -495,10 +507,10 @@ protected ConnectorInsertTableHandle beginIcebergTableInsert(ConnectorSession se
@Override
public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
return finishWrite((IcebergInsertTableHandle) insertHandle, fragments);
return finishWrite(session, (IcebergInsertTableHandle) insertHandle, fragments, INSERT);
}

private Optional<ConnectorOutputMetadata> finishWrite(IcebergWritableTableHandle writableTableHandle, Collection<Slice> fragments)
private Optional<ConnectorOutputMetadata> finishWrite(ConnectorSession session, IcebergWritableTableHandle writableTableHandle, Collection<Slice> fragments, ChangelogOperation operationType)
{
if (fragments.isEmpty()) {
transaction.commitTransaction();
Expand All @@ -511,30 +523,74 @@ private Optional<ConnectorOutputMetadata> finishWrite(IcebergWritableTableHandle
.map(slice -> commitTaskCodec.fromJson(slice.getBytes()))
.collect(toImmutableList());

Type[] partitionColumnTypes = icebergTable.spec().fields().stream()
.map(field -> field.transform().getResultType(
icebergTable.schema().findType(field.sourceId())))
.toArray(Type[]::new);
RowDelta rowDelta = transaction.newRowDelta();
writableTableHandle.getTableName().getSnapshotId().map(icebergTable::snapshot).ifPresent(s -> rowDelta.validateFromSnapshot(s.snapshotId()));
IsolationLevel isolationLevel = IsolationLevel.fromName(icebergTable.properties().getOrDefault(DELETE_ISOLATION_LEVEL, DELETE_ISOLATION_LEVEL_DEFAULT));
if (isolationLevel == IsolationLevel.SERIALIZABLE) {
rowDelta.validateNoConflictingDataFiles();
}

AppendFiles appendFiles = transaction.newFastAppend();
for (CommitTaskData task : commitTasks) {
DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
.withPath(task.getPath())
.withFileSizeInBytes(task.getFileSizeInBytes())
.withFormat(FileFormat.fromString(writableTableHandle.getFileFormat().name()))
.withMetrics(task.getMetrics().metrics());
// Ensure a row that is updated by this commit was not deleted by a separate commit
if (operationType == UPDATE_BEFORE || operationType == UPDATE_AFTER) {
rowDelta.validateDeletedFiles();
rowDelta.validateNoConflictingDeleteFiles();
}

if (!icebergTable.spec().fields().isEmpty()) {
String partitionDataJson = task.getPartitionDataJson()
.orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
ImmutableSet.Builder<String> writtenFiles = ImmutableSet.builder();
ImmutableSet.Builder<String> referencedDataFiles = ImmutableSet.builder();
for (CommitTaskData task : commitTasks) {
PartitionSpec partitionSpec = icebergTable.specs().get(task.getPartitionSpecId());
Type[] partitionColumnTypes = partitionSpec.fields().stream()
.map(field -> field.transform().getResultType(icebergTable.schema().findType(field.sourceId())))
.toArray(Type[]::new);
switch (task.getContent()) {
case POSITION_DELETES:
FileMetadata.Builder deleteBuilder = FileMetadata.deleteFileBuilder(partitionSpec)
.withPath(task.getPath())
.withFormat(task.getFileFormat().toIceberg())
.ofPositionDeletes()
.withFileSizeInBytes(task.getFileSizeInBytes())
.withMetrics(task.getMetrics().metrics());

if (!partitionSpec.fields().isEmpty()) {
String partitionDataJson = task.getPartitionDataJson()
.orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
deleteBuilder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
}

rowDelta.addDeletes(deleteBuilder.build());
writtenFiles.add(task.getPath());
task.getReferencedDataFile().ifPresent(referencedDataFiles::add);
break;
case DATA:
DataFiles.Builder builder = DataFiles.builder(partitionSpec)
.withPath(task.getPath())
.withFormat(task.getFileFormat().toIceberg())
.withFileSizeInBytes(task.getFileSizeInBytes())
.withMetrics(task.getMetrics().metrics());

if (!icebergTable.spec().fields().isEmpty()) {
String partitionDataJson = task.getPartitionDataJson()
.orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
}
rowDelta.addRows(builder.build());
writtenFiles.add(task.getPath());
break;
default:
throw new UnsupportedOperationException("Unsupported task content: " + task.getContent());
}

appendFiles.appendFile(builder.build());
}

appendFiles.commit();
transaction.commitTransaction();
rowDelta.validateDataFilesExist(referencedDataFiles.build());
try {
rowDelta.set(PRESTO_QUERY_ID, session.getQueryId());
rowDelta.commit();
transaction.commitTransaction();
}
catch (ValidationException e) {
log.error(e, "ValidationException in finishWrite");
throw new PrestoException(ICEBEG_COMMIT_ERROR, "Failed to commit Iceberg update to table: " + writableTableHandle.getTableName(), e);
}

return Optional.of(new HiveWrittenPartitions(commitTasks.stream()
.map(CommitTaskData::getPath)
Expand All @@ -544,7 +600,7 @@ private Optional<ConnectorOutputMetadata> finishWrite(IcebergWritableTableHandle
@Override
public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
{
return IcebergColumnHandle.create(ROW_POSITION, typeManager, IcebergColumnHandle.ColumnType.REGULAR);
return IcebergColumnHandle.create(ROW_POSITION, typeManager, REGULAR);
}

@Override
Expand Down Expand Up @@ -829,7 +885,9 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
tryGetProperties(table),
tableSchemaJson,
Optional.empty(),
Optional.empty());
Optional.empty(),
Optional.empty(),
ImmutableList.of());
}

@Override
Expand Down Expand Up @@ -1082,4 +1140,67 @@ else if (tableVersion.getVersionExpressionType() instanceof VarcharType) {
}
throw new PrestoException(NOT_SUPPORTED, "Unsupported table version type: " + tableVersion.getVersionType());
}

/**
* The row ID update column handle is a struct type which represents the unmodified columns of
* the query.
*
* @return A column handle for the Row ID update column.
*/
public ColumnHandle getUpdateRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> updatedColumns)
{
List<Types.NestedField> unmodifiedColumns = new ArrayList<>();
unmodifiedColumns.add(ROW_POSITION);
// Include all the non-updated columns. These are needed when writing the new data file with updated column values.
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
Set<Integer> updatedFields = updatedColumns.stream()
.map(IcebergColumnHandle.class::cast)
.map(IcebergColumnHandle::getId)
.collect(toImmutableSet());
for (Types.NestedField column : SchemaParser.fromJson(table.getTableSchemaJson().get()).columns()) {
if (!updatedFields.contains(column.fieldId())) {
unmodifiedColumns.add(column);
}
}
Types.NestedField field = Types.NestedField.required(PRESTO_UPDATE_ROW_ID_COLUMN_ID, PRESTO_UPDATE_ROW_ID_COLUMN_NAME, Types.StructType.of(unmodifiedColumns));
return new IcebergColumnHandle(ColumnIdentity.createColumnIdentity(field),
toPrestoType(field.type(), typeManager),
Optional.ofNullable(field.doc()),
REGULAR);
}

@Override
public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> updatedColumns)
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
int formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion();
if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE) {
throw new PrestoException(NOT_SUPPORTED, "Iceberg table updates require at least format version 2");
}
validateTableMode(session, icebergTable);
transaction = icebergTable.newTransaction();
return handle
.withUpdatedColumns(updatedColumns.stream()
.map(IcebergColumnHandle.class::cast)
.collect(toImmutableList()));
}

@Override
public void finishUpdate(ConnectorSession session, ConnectorTableHandle tableHandle, Collection<Slice> fragments)
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
Table icebergTable = getIcebergTable(session, ((IcebergTableHandle) tableHandle).getSchemaTableName());
IcebergOutputTableHandle outputTableHandle = new IcebergOutputTableHandle(
handle.getSchemaName(),
handle.getIcebergTableName(),
toPrestoSchema(icebergTable.schema(), typeManager),
toPrestoPartitionSpec(icebergTable.spec(), typeManager),
handle.getUpdatedColumns(),
icebergTable.location(),
getFileFormat(icebergTable),
getCompressionCodec(session),
icebergTable.properties());
finishWrite(session, outputTableHandle, fragments, UPDATE_AFTER);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ public class IcebergColumnHandle
public static final IcebergColumnHandle DATA_SEQUENCE_NUMBER_COLUMN_HANDLE = getIcebergColumnHandle(DATA_SEQUENCE_NUMBER);
public static final ColumnMetadata DATA_SEQUENCE_NUMBER_COLUMN_METADATA = getColumnMetadata(DATA_SEQUENCE_NUMBER);

// Iceberg reserved row ids begin at INTEGER.MAX_VALUE and count down. Starting with MIN_VALUE here to avoid conflicts.
public static final int PRESTO_UPDATE_ROW_ID_COLUMN_ID = Integer.MIN_VALUE;
public static final String PRESTO_UPDATE_ROW_ID_COLUMN_NAME = "$row_id";
private final ColumnIdentity columnIdentity;
private final Type type;

Expand Down Expand Up @@ -96,6 +99,12 @@ public boolean isRowPositionColumn()
return columnIdentity.getId() == ROW_POSITION.fieldId();
}

@JsonIgnore
public boolean isUpdateRowIdColumn()
{
return getId() == PRESTO_UPDATE_ROW_ID_COLUMN_ID;
}

@Override
public ColumnHandle withRequiredSubfields(List<Subfield> subfields)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public enum IcebergErrorCode
ICEBERG_INVALID_TABLE_TIMESTAMP(12, USER_ERROR),
ICEBERG_ROLLBACK_ERROR(13, EXTERNAL),
ICEBERG_INVALID_FORMAT_VERSION(14, USER_ERROR),
ICEBERG_UNKNOWN_MANIFEST_TYPE(15, EXTERNAL);
ICEBERG_UNKNOWN_MANIFEST_TYPE(15, EXTERNAL),
ICEBEG_COMMIT_ERROR(16, EXTERNAL);

private final ErrorCode errorCode;

Expand Down
Loading

0 comments on commit e61ff60

Please sign in to comment.