From 902830369662f5a84e987b3a97e23f916da104ca Mon Sep 17 00:00:00 2001 From: Tim Brown Date: Fri, 29 Mar 2024 11:17:54 -0500 Subject: [PATCH] Hudi uniform support (#2333) #### Which Delta project/connector is this regarding? - [ ] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [x] Other (Uniform) ## Description - This change aims to add support for Hudi in Uniform - The changes were mostly copied from [OneTable](https://github.com/onetable-io/onetable) which has a working version of Delta to Hudi already ## How was this patch tested? Some basic tests are added ## Does this PR introduce _any_ user-facing changes? Yes, this allows users to expose their Delta tables as Hudi --- build.sbt | 53 ++- hudi/README.md | 22 ++ .../hudi/HudiConversionTransaction.scala | 357 ++++++++++++++++++ .../spark/sql/delta/hudi/HudiConverter.scala | 336 +++++++++++++++++ .../sql/delta/hudi/HudiSchemaUtils.scala | 86 +++++ .../sql/delta/hudi/HudiTransactionUtils.scala | 141 +++++++ .../sql/delta/hudi/ConvertToHudiSuite.scala | 293 ++++++++++++++ .../apache/spark/sql/delta/DeltaErrors.scala | 29 ++ .../sql/delta/OptimisticTransaction.scala | 3 +- .../sql/delta/ProvidesUniFormConverters.scala | 18 + .../spark/sql/delta/UniversalFormat.scala | 38 +- .../commands/CreateDeltaTableCommand.scala | 8 +- .../sql/delta/hooks/HudiConverterHook.scala | 49 +++ .../sql/delta/sources/DeltaSQLConf.scala | 9 + .../spark/sql/delta/DeltaVacuumSuite.scala | 15 + 15 files changed, 1452 insertions(+), 5 deletions(-) create mode 100644 hudi/README.md create mode 100644 hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConversionTransaction.scala create mode 100644 hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConverter.scala create mode 100644 hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiSchemaUtils.scala create mode 100644 hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiTransactionUtils.scala create mode 100644 hudi/src/test/scala/org/apache/spark/sql/delta/hudi/ConvertToHudiSuite.scala create mode 100644 spark/src/main/scala/org/apache/spark/sql/delta/hooks/HudiConverterHook.scala diff --git a/build.sbt b/build.sbt index f7fc22399ee..004f9fe972c 100644 --- a/build.sbt +++ b/build.sbt @@ -484,6 +484,57 @@ lazy val icebergShaded = (project in file("icebergShaded")) // Make the 'compile' invoke the 'assembly' task to generate the uber jar. ) +lazy val hudi = (project in file("hudi")) + .dependsOn(spark % "compile->compile;test->test;provided->provided") + .settings ( + name := "delta-hudi", + commonSettings, + scalaStyleSettings, + releaseSettings, + libraryDependencies ++= Seq( + "org.apache.hudi" % "hudi-java-client" % "0.14.0" % "compile" excludeAll( + ExclusionRule(organization = "org.apache.hadoop"), + ExclusionRule(organization = "org.apache.zookeeper"), + ), + "org.apache.spark" %% "spark-avro" % sparkVersion % "test" excludeAll ExclusionRule(organization = "org.apache.hadoop"), + "org.apache.parquet" % "parquet-avro" % "1.12.3" % "compile" + ), + assembly / assemblyJarName := s"${name.value}-assembly_${scalaBinaryVersion.value}-${version.value}.jar", + assembly / logLevel := Level.Info, + assembly / test := {}, + assembly / assemblyMergeStrategy := { + // Project hudi `dependsOn` spark and accidentally brings in it, along with its + // compile-time dependencies (like delta-storage). We want these excluded from the + // delta-hudi jar. 
+      case PathList("io", "delta", xs @ _*) =>
+        // - delta-storage will bring in classes: io/delta/storage
+        // - delta-spark will bring in classes: io/delta/exceptions/, io/delta/implicits,
+        //   io/delta/package, io/delta/sql, io/delta/tables,
+        MergeStrategy.discard
+      case PathList("com", "databricks", xs @ _*) =>
+        // delta-spark will bring in com/databricks/spark/util
+        MergeStrategy.discard
+      case PathList("org", "apache", "spark", "sql", "delta", "hudi", xs @ _*) =>
+        MergeStrategy.first
+      case PathList("org", "apache", "spark", xs @ _*) =>
+        MergeStrategy.discard
+      // Discard `module-info.class` to fix the `different file contents found` error.
+      // TODO Upgrade SBT to 1.5 which will do this automatically
+      case "module-info.class" => MergeStrategy.discard
+      // Discard unused `parquet.thrift` so that we don't conflict with the file used by the user
+      case "parquet.thrift" => MergeStrategy.discard
+      // Hudi metadata writer requires this service file to be present on the classpath
+      case "META-INF/services/org.apache.hadoop.hbase.regionserver.MetricsRegionServerSourceFactory" => MergeStrategy.first
+      // Discard the jackson service configs that we don't need. These files are not shaded, so
+      // including them may conflict with the jackson version used by the user.
+      case PathList("META-INF", "services", xs @ _*) => MergeStrategy.discard
+      case x =>
+        MergeStrategy.first
+    },
+    // Make the 'compile' invoke the 'assembly' task to generate the uber jar.
+    Compile / packageBin := assembly.value
+  )
+
 lazy val hive = (project in file("connectors/hive"))
   .dependsOn(standaloneCosmetic)
   .settings (
@@ -1120,7 +1171,7 @@ val createTargetClassesDir = taskKey[Unit]("create target classes dir")
 
 // Don't use these groups for any other projects
 lazy val sparkGroup = project
-  .aggregate(spark, contribs, storage, storageS3DynamoDB, iceberg, testDeltaIcebergJar, sharing)
+  .aggregate(spark, contribs, storage, storageS3DynamoDB, iceberg, testDeltaIcebergJar, sharing, hudi)
   .settings(
     // crossScalaVersions must be set to Nil on the aggregating project
     crossScalaVersions := Nil,
diff --git a/hudi/README.md b/hudi/README.md
new file mode 100644
index 00000000000..54b6f40634e
--- /dev/null
+++ b/hudi/README.md
@@ -0,0 +1,22 @@
+# Converting to Hudi with UniForm
+## Create a table with Hudi UniForm enabled
+Using spark-sql, you can create a table and insert a few records into it. You will need to include the delta-hudi-assembly jar on the classpath via `--jars`.
+```
+spark-sql --packages io.delta:delta-spark_2.12:3.2.0-SNAPSHOT --jars delta-hudi-assembly_2.12-3.2.0-SNAPSHOT.jar --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
+```
+Then you can create a table with Hudi UniForm enabled.
+```
+CREATE TABLE `delta_table_with_hudi` (col1 INT) USING DELTA TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi') LOCATION '/tmp/delta-table-with-hudi';
+```
+And insert a record into it.
+```
+INSERT INTO delta_table_with_hudi VALUES (1);
+```
+
+## Read the table with Hudi
+Hudi does not currently support Spark 3.5.x, so you will need to launch a spark-shell with Spark 3.4.x or earlier.
+Instructions for launching the spark-shell with Hudi can be found [here](https://hudi.apache.org/docs/quick-start-guide#spark-shellsql); an illustrative launch command is also sketched below.
+After launching the shell, you can read the table by enabling the Hudi metadata table in the reader and loading from the path used in the create table step.
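+To launch the shell, a command along the lines of the Hudi quick start might look like the following (the Hudi bundle coordinates here are illustrative; pick the bundle matching your Spark version):
+```
+spark-shell --packages org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.1 --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"
+```
+Then read the table from the shell: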
+```scala +val df = spark.read.format("hudi").option("hoodie.metadata.enable", "true").load("/tmp/delta-table-with-hudi") +``` \ No newline at end of file diff --git a/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConversionTransaction.scala b/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConversionTransaction.scala new file mode 100644 index 00000000000..5eff0192a18 --- /dev/null +++ b/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConversionTransaction.scala @@ -0,0 +1,357 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.hudi + +import org.apache.avro.Schema + +import scala.util.control.NonFatal +import org.apache.spark.sql.delta.Snapshot +import org.apache.spark.sql.delta.actions.Action +import org.apache.spark.sql.delta.hudi.HudiSchemaUtils._ +import org.apache.spark.sql.delta.hudi.HudiTransactionUtils._ +import org.apache.spark.sql.delta.metering.DeltaLogging +import org.apache.commons.lang3.exception.ExceptionUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hudi.avro.model.HoodieActionInstant +import org.apache.hudi.avro.model.HoodieCleanFileInfo +import org.apache.hudi.avro.model.HoodieCleanerPlan +import org.apache.hudi.client.HoodieJavaWriteClient +import org.apache.hudi.client.HoodieTimelineArchiver +import org.apache.hudi.client.WriteStatus +import org.apache.hudi.client.common.HoodieJavaEngineContext +import org.apache.hudi.common.HoodieCleanStat +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.engine.HoodieEngineContext +import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieBaseFile, HoodieCleaningPolicy} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieInstantTimeGenerator, HoodieTimeline, TimelineMetadataUtils} +import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.{MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH, SECS_INSTANT_ID_LENGTH, SECS_INSTANT_TIMESTAMP_FORMAT} +import org.apache.hudi.common.util.CleanerUtils +import org.apache.hudi.common.util.ExternalFilePathUtil +import org.apache.hudi.common.util.{Option => HudiOption} +import org.apache.hudi.common.util.collection.Pair +import org.apache.hudi.config.HoodieArchivalConfig +import org.apache.hudi.config.HoodieCleanConfig +import org.apache.hudi.config.HoodieIndexConfig +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY +import org.apache.hudi.table.HoodieJavaTable +import org.apache.hudi.table.action.clean.CleanPlanner + +import java.io.{IOException, UncheckedIOException} +import java.time.{Instant, LocalDateTime, ZoneId} +import java.time.format.{DateTimeFormatterBuilder, DateTimeParseException} +import java.time.temporal.{ChronoField, ChronoUnit} +import java.util +import java.util.stream.Collectors +import java.util.{Collections, Properties} +import 
collection.mutable._ +import scala.collection.JavaConverters._ + +/** + * Used to prepare (convert) and then commit a set of Delta actions into the Hudi table located + * at the same path as [[postCommitSnapshot]] + * + * + * @param conf Configuration for Hudi Hadoop interactions. + * @param postCommitSnapshot Latest Delta snapshot associated with this Hudi commit. + */ +class HudiConversionTransaction( + protected val conf: Configuration, + protected val postCommitSnapshot: Snapshot, + protected val providedMetaClient: HoodieTableMetaClient, + protected val lastConvertedDeltaVersion: Option[Long] = None) extends DeltaLogging { + + ////////////////////// + // Member variables // + ////////////////////// + + private val tablePath = postCommitSnapshot.deltaLog.dataPath + private val hudiSchema: Schema = + convertDeltaSchemaToHudiSchema(postCommitSnapshot.metadata.schema) + private var metaClient = providedMetaClient + private val instantTime = convertInstantToCommit( + Instant.ofEpochMilli(postCommitSnapshot.timestamp)) + private var writeStatuses: util.List[WriteStatus] = Collections.emptyList[WriteStatus] + private var partitionToReplacedFileIds: util.Map[String, util.List[String]] = + Collections.emptyMap[String, util.List[String]] + + private val version = postCommitSnapshot.version + /** Tracks if this transaction has already committed. You can only commit once. */ + private var committed = false + + ///////////////// + // Public APIs // + ///////////////// + + def setCommitFileUpdates(actions: scala.collection.Seq[Action]): Unit = { + // for all removed files, group by partition path and then map to + // the file group ID (name in this case) + partitionToReplacedFileIds = actions + .map(_.wrap) + .filter(action => action.remove != null) + .map(_.remove) + .map(remove => { + val path = remove.toPath + val partitionPath = getPartitionPath(tablePath, path) + (partitionPath, path.getName)}) + .groupBy(_._1).map(v => (v._1, v._2.map(_._2).asJava)) + .asJava + // Convert the AddFiles to write statuses for the commit + writeStatuses = actions + .map(_.wrap) + .filter(action => action.add != null) + .map(_.add) + .map(add => { + convertAddFile(add, tablePath, instantTime) + }) + .asJava + } + + def commit(): Unit = { + assert(!committed, "Cannot commit. 
Transaction already committed.") + val writeConfig = getWriteConfig(hudiSchema, getNumInstantsToRetain, 10, 7*24) + val engineContext: HoodieEngineContext = new HoodieJavaEngineContext(metaClient.getHadoopConf) + val writeClient = new HoodieJavaWriteClient[AnyRef](engineContext, writeConfig) + try { + writeClient.startCommitWithTime(instantTime, HoodieTimeline.REPLACE_COMMIT_ACTION) + metaClient.getActiveTimeline.transitionReplaceRequestedToInflight( + new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, + instantTime), + HudiOption.empty[Array[Byte]]) + val syncMetadata: Map[String, String] = Map( + HudiConverter.DELTA_VERSION_PROPERTY -> version.toString, + HudiConverter.DELTA_TIMESTAMP_PROPERTY -> postCommitSnapshot.timestamp.toString) + writeClient.commit(instantTime, + writeStatuses, + HudiOption.of(syncMetadata.asJava), + HoodieTimeline.REPLACE_COMMIT_ACTION, + partitionToReplacedFileIds) + // if the metaclient was created before the table's first commit, we need to reload it to + // pick up the metadata table context + if (!metaClient.getTableConfig.isMetadataTableAvailable) { + metaClient = HoodieTableMetaClient.reload(metaClient) + } + val table = HoodieJavaTable.create(writeClient.getConfig, engineContext, metaClient) + // clean up old commits and archive them + markInstantsAsCleaned(table, writeClient.getConfig, engineContext) + runArchiver(table, writeClient.getConfig, engineContext) + } catch { + case NonFatal(e) => + recordHudiCommit(Some(e)) + throw e + } finally { + if (writeClient != null) writeClient.close() + recordHudiCommit() + } + committed = true + } + + //////////////////// + // Helper Methods // + //////////////////// + + private def getNumInstantsToRetain = { + val commitCutoff = convertInstantToCommit( + parseFromInstantTime(instantTime).minus(7*24, ChronoUnit.HOURS)) + // count number of completed commits after the cutoff + metaClient + .getActiveTimeline + .filterCompletedInstants + .findInstantsAfter(commitCutoff) + .countInstants + } + + private def markInstantsAsCleaned(table: HoodieJavaTable[_], + writeConfig: HoodieWriteConfig, engineContext: HoodieEngineContext): Unit = { + val planner = new CleanPlanner(engineContext, table, writeConfig) + val earliestInstant = planner.getEarliestCommitToRetain + // since we're retaining based on time, we should exit early if earliestInstant is empty + if (!earliestInstant.isPresent) return + var partitionsToClean: util.List[String] = null + try partitionsToClean = planner.getPartitionPathsToClean(earliestInstant) + catch { + case ex: IOException => + throw new UncheckedIOException("Unable to get partitions to clean", ex) + } + if (partitionsToClean.isEmpty) return + val activeTimeline = metaClient.getActiveTimeline + val fsView = table.getHoodieView + val cleanInfoPerPartition = partitionsToClean.asScala.map(partition => + Pair.of(partition, planner.getDeletePaths(partition, earliestInstant))) + .filter(deletePaths => !deletePaths.getValue.getValue.isEmpty) + .map(deletePathsForPartition => deletePathsForPartition.getKey -> { + val partition = deletePathsForPartition.getKey + // we need to manipulate the path to properly clean from the metadata table, + // so we map the file path to the base file + val baseFiles = fsView.getAllReplacedFileGroups(partition) + .flatMap(fileGroup => fileGroup.getAllBaseFiles) + .collect(Collectors.toList[HoodieBaseFile]) + val baseFilesByPath = baseFiles.asScala + .map(baseFile => baseFile.getPath -> baseFile).toMap + 
deletePathsForPartition.getValue.getValue.asScala.map(cleanFileInfo => { + val baseFile = baseFilesByPath.getOrElse(cleanFileInfo.getFilePath, null) + new HoodieCleanFileInfo(ExternalFilePathUtil.appendCommitTimeAndExternalFileMarker( + baseFile.getFileName, baseFile.getCommitTime), false) + }).asJava + }).toMap.asJava + // there is nothing to clean, so exit early + if (cleanInfoPerPartition.isEmpty) return + // create a clean instant write after this latest commit + val cleanTime = convertInstantToCommit(parseFromInstantTime(instantTime) + .plus(1, ChronoUnit.SECONDS)) + // create a metadata table writer in order to mark files as deleted in the table + // the deleted entries are cleaned up in the metadata table during compaction to control the + // growth of the table + val hoodieTableMetadataWriter = table.getMetadataWriter(cleanTime).get + try { + val earliestInstantToRetain = earliestInstant + .map[HoodieActionInstant]((earliestInstantToRetain: HoodieInstant) => + new HoodieActionInstant( + earliestInstantToRetain.getTimestamp, + earliestInstantToRetain.getAction, + earliestInstantToRetain.getState.name)) + .orElse(null) + val cleanerPlan = new HoodieCleanerPlan(earliestInstantToRetain, instantTime, + writeConfig.getCleanerPolicy.name, Collections.emptyMap[String, util.List[String]], + CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanInfoPerPartition, + Collections.emptyList[String]) + // create a clean instant and mark it as requested with the clean plan + val requestedCleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, + HoodieTimeline.CLEAN_ACTION, cleanTime) + activeTimeline.saveToCleanRequested( + requestedCleanInstant, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan)) + val inflightClean = activeTimeline + .transitionCleanRequestedToInflight(requestedCleanInstant, HudiOption.empty[Array[Byte]]) + val cleanStats = cleanInfoPerPartition.entrySet.asScala.map(entry => { + val partitionPath = entry.getKey + val deletePaths = entry.getValue.asScala.map(_.getFilePath).asJava + new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_COMMITS, partitionPath, deletePaths, + deletePaths, Collections.emptyList[String], earliestInstant.get.getTimestamp, instantTime) + }).toSeq.asJava + val cleanMetadata = + CleanerUtils.convertCleanMetadata(cleanTime, HudiOption.empty[java.lang.Long], cleanStats) + // update the metadata table with the clean metadata so the files' metadata are marked for + // deletion + hoodieTableMetadataWriter.performTableServices(HudiOption.empty[String]) + hoodieTableMetadataWriter.update(cleanMetadata, cleanTime) + // mark the commit as complete on the table timeline + activeTimeline.transitionCleanInflightToComplete(inflightClean, + TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata)) + } catch { + case ex: IOException => + throw new UncheckedIOException("Unable to clean Hudi timeline", ex) + } finally if (hoodieTableMetadataWriter != null) hoodieTableMetadataWriter.close() + } + + private def runArchiver(table: HoodieJavaTable[_ <: HoodieAvroPayload], + config: HoodieWriteConfig, engineContext: HoodieEngineContext): Unit = { + // trigger archiver manually + val archiver = new HoodieTimelineArchiver(config, table) + archiver.archiveIfRequired(engineContext, true) + } + + private def getWriteConfig(schema: Schema, numCommitsToKeep: Int, + maxNumDeltaCommitsBeforeCompaction: Int, timelineRetentionInHours: Int) = { + val properties = new Properties + properties.setProperty(HoodieMetadataConfig.AUTO_INITIALIZE.key, "false") + HoodieWriteConfig.newBuilder 
+ .withIndexConfig(HoodieIndexConfig.newBuilder.withIndexType(INMEMORY).build) + .withPath(metaClient.getBasePathV2.toString) + .withPopulateMetaFields(metaClient.getTableConfig.populateMetaFields) + .withEmbeddedTimelineServerEnabled(false) + .withSchema(if (schema == null) "" else schema.toString) + .withArchivalConfig(HoodieArchivalConfig.newBuilder + .archiveCommitsWith(Math.max(0, numCommitsToKeep - 1), Math.max(1, numCommitsToKeep)) + .withAutoArchive(false) + .build) + .withCleanConfig( + HoodieCleanConfig.newBuilder + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) + .cleanerNumHoursRetained(timelineRetentionInHours) + .withAutoClean(false) + .build) + .withMetadataConfig(HoodieMetadataConfig.newBuilder + .enable(true) + .withProperties(properties) + .withMetadataIndexColumnStats(true) + .withMaxNumDeltaCommitsBeforeCompaction(maxNumDeltaCommitsBeforeCompaction) + .build) + .build + } + + /** + * Copied mostly from {@link + * org.apache.hudi.common.table.timeline.HoodieActiveTimeline#parseDateFromInstantTime(String)} + * but forces the timestamp to use UTC unlike the Hudi code. + * + * @param timestamp input commit timestamp + * @return timestamp parsed as Instant + */ + private def parseFromInstantTime(timestamp: String): Instant = { + try { + var timestampInMillis: String = timestamp + if (isSecondGranularity(timestamp)) { + timestampInMillis = timestamp + "999" + } + else { + if (timestamp.length > MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH) { + timestampInMillis = timestamp.substring(0, MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH) + } + } + val dt: LocalDateTime = LocalDateTime.parse(timestampInMillis, MILLIS_INSTANT_TIME_FORMATTER) + dt.atZone(ZoneId.of("UTC")).toInstant + } catch { + case ex: DateTimeParseException => + throw new RuntimeException("Unable to parse date from commit timestamp: " + timestamp, ex) + } + } + + private def isSecondGranularity(instant: String) = instant.length == SECS_INSTANT_ID_LENGTH + + private def convertInstantToCommit(instant: Instant): String = { + val instantTime = instant.atZone(ZoneId.of("UTC")).toLocalDateTime + HoodieInstantTimeGenerator.getInstantFromTemporalAccessor(instantTime) + } + + private def recordHudiCommit(errorOpt: Option[Throwable] = None): Unit = { + + val errorData = errorOpt.map { e => + Map( + "exception" -> ExceptionUtils.getMessage(e), + "stackTrace" -> ExceptionUtils.getStackTrace(e) + ) + }.getOrElse(Map.empty) + + + recordDeltaEvent( + postCommitSnapshot.deltaLog, + s"delta.hudi.conversion.commit.${if (errorOpt.isEmpty) "success" else "error"}", + data = Map( + "version" -> postCommitSnapshot.version, + "timestamp" -> postCommitSnapshot.timestamp, + "prevConvertedDeltaVersion" -> lastConvertedDeltaVersion + ) ++ errorData + ) + } + + private val MILLIS_INSTANT_TIME_FORMATTER = new DateTimeFormatterBuilder() + .appendPattern(SECS_INSTANT_TIMESTAMP_FORMAT) + .appendValue(ChronoField.MILLI_OF_SECOND, 3) + .toFormatter + .withZone(ZoneId.of("UTC")) +} diff --git a/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConverter.scala b/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConverter.scala new file mode 100644 index 00000000000..18eade6b43d --- /dev/null +++ b/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConverter.scala @@ -0,0 +1,336 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.hudi + +import org.apache.commons.lang3.exception.ExceptionUtils +import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieReplaceCommitMetadata} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.delta.actions.Action +import org.apache.spark.sql.delta.hooks.HudiConverterHook +import org.apache.spark.sql.delta.hudi.HudiTransactionUtils._ +import org.apache.spark.sql.delta.metering.DeltaLogging +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta._ + +import java.io.{IOException, UncheckedIOException} +import java.util.concurrent.atomic.AtomicReference +import javax.annotation.concurrent.GuardedBy +import scala.collection.JavaConverters._ +import scala.util.control.NonFatal + +object HudiConverter { + /** + * Property to be set in translated Hudi commit metadata. + * Indicates the delta commit version # that it corresponds to. + */ + val DELTA_VERSION_PROPERTY = "delta-version" + + /** + * Property to be set in translated Hudi commit metadata. + * Indicates the timestamp (milliseconds) of the delta commit that it corresponds to. + */ + val DELTA_TIMESTAMP_PROPERTY = "delta-timestamp" +} + +/** + * This class manages the transformation of delta snapshots into their Hudi equivalent. + */ +class HudiConverter(spark: SparkSession) + extends UniversalFormatConverter(spark) + with DeltaLogging { + + // Save an atomic reference of the snapshot being converted, and the txn that triggered + // resulted in the specified snapshot + protected val currentConversion = + new AtomicReference[(Snapshot, OptimisticTransactionImpl)]() + protected val standbyConversion = + new AtomicReference[(Snapshot, OptimisticTransactionImpl)]() + + // Whether our async converter thread is active. We may already have an alive thread that is + // about to shutdown, but in such cases this value should return false. + @GuardedBy("asyncThreadLock") + private var asyncConverterThreadActive: Boolean = false + private val asyncThreadLock = new Object + + /** + * Enqueue the specified snapshot to be converted to Hudi. This will start an async + * job to run the conversion, unless there already is an async conversion running for + * this table. In that case, it will queue up the provided snapshot to be run after + * the existing job completes. + * Note that if there is another snapshot already queued, the previous snapshot will get + * removed from the wait queue. Only one snapshot is queued at any point of time. 
+ * + */ + override def enqueueSnapshotForConversion( + snapshotToConvert: Snapshot, + txn: OptimisticTransactionImpl): Unit = { + if (!UniversalFormat.hudiEnabled(snapshotToConvert.metadata)) { + return + } + val log = snapshotToConvert.deltaLog + // Replace any previously queued snapshot + val previouslyQueued = standbyConversion.getAndSet((snapshotToConvert, txn)) + asyncThreadLock.synchronized { + if (!asyncConverterThreadActive) { + val threadName = HudiConverterHook.ASYNC_HUDI_CONVERTER_THREAD_NAME + + s" [id=${snapshotToConvert.metadata.id}]" + val asyncConverterThread: Thread = new Thread(threadName) { + setDaemon(true) + + override def run(): Unit = + try { + var snapshotAndTxn = getNextSnapshot + while (snapshotAndTxn != null) { + val snapshotVal = snapshotAndTxn._1 + val prevTxn = snapshotAndTxn._2 + try { + logInfo(s"Converting Delta table [path=${log.logPath}, " + + s"tableId=${log.tableId}, version=${snapshotVal.version}] into Hudi") + convertSnapshot(snapshotVal, prevTxn) + } catch { + case NonFatal(e) => + logWarning(s"Error when writing Hudi metadata asynchronously", e) + recordDeltaEvent( + log, + "delta.hudi.conversion.async.error", + data = Map( + "exception" -> ExceptionUtils.getMessage(e), + "stackTrace" -> ExceptionUtils.getStackTrace(e) + ) + ) + } + currentConversion.set(null) + // Pick next snapshot to convert if there's a new one + snapshotAndTxn = getNextSnapshot + } + } finally { + // shuttingdown thread + asyncThreadLock.synchronized { + asyncConverterThreadActive = false + } + } + + // Get a snapshot to convert from the hudiQueue. Sets the queue to null after. + private def getNextSnapshot: (Snapshot, OptimisticTransactionImpl) = + asyncThreadLock.synchronized { + val potentialSnapshotAndTxn = standbyConversion.get() + currentConversion.set(potentialSnapshotAndTxn) + standbyConversion.compareAndSet(potentialSnapshotAndTxn, null) + if (potentialSnapshotAndTxn == null) { + asyncConverterThreadActive = false + } + potentialSnapshotAndTxn + } + } + asyncConverterThread.start() + asyncConverterThreadActive = true + } + } + + // If there already was a snapshot waiting to be converted, log that snapshot info. + if (previouslyQueued != null) { + recordDeltaEvent( + snapshotToConvert.deltaLog, + "delta.hudi.conversion.async.backlog", + data = Map( + "newVersion" -> snapshotToConvert.version, + "replacedVersion" -> previouslyQueued._1.version) + ) + } + } + + /** + * Convert the specified snapshot into Hudi for the given catalogTable + * @param snapshotToConvert the snapshot that needs to be converted to Hudi + * @param catalogTable the catalogTable this conversion targets. + * @return Converted Delta version and commit timestamp + */ + override def convertSnapshot( + snapshotToConvert: Snapshot, catalogTable: CatalogTable): Option[(Long, Long)] = { + if (!UniversalFormat.hudiEnabled(snapshotToConvert.metadata)) { + return None + } + convertSnapshot(snapshotToConvert, None, Option.apply(catalogTable.identifier.table)) + } + + /** + * Convert the specified snapshot into Hudi when performing an OptimisticTransaction + * on a delta table. + * @param snapshotToConvert the snapshot that needs to be converted to Hudi + * @param txn the transaction that triggers the conversion. It must + * contain the catalogTable this conversion targets. 
+ * @return Converted Delta version and commit timestamp + */ + override def convertSnapshot( + snapshotToConvert: Snapshot, txn: OptimisticTransactionImpl): Option[(Long, Long)] = { + if (!UniversalFormat.hudiEnabled(snapshotToConvert.metadata)) { + return None + } + convertSnapshot(snapshotToConvert, Some(txn), txn.catalogTable.map(_.identifier.table)) + } + + /** + * Convert the specified snapshot into Hudi. NOTE: This operation is blocking. Call + * enqueueSnapshotForConversion to run the operation asynchronously. + * @param snapshotToConvert the snapshot that needs to be converted to Hudi + * @param txnOpt the OptimisticTransaction that created snapshotToConvert. + * Used as a hint to avoid recomputing old metadata. + * @param catalogTable the catalogTable this conversion targets + * @return Converted Delta version and commit timestamp + */ + private def convertSnapshot( + snapshotToConvert: Snapshot, + txnOpt: Option[OptimisticTransactionImpl], + tableName: Option[String]): Option[(Long, Long)] = + recordFrameProfile("Delta", "HudiConverter.convertSnapshot") { + val log = snapshotToConvert.deltaLog + val metaClient = loadTableMetaClient(snapshotToConvert.deltaLog.dataPath.toString, + tableName, snapshotToConvert.metadata.partitionColumns, + log.newDeltaHadoopConf()) + val lastDeltaVersionConverted: Option[Long] = loadLastDeltaVersionConverted(metaClient) + val maxCommitsToConvert = + spark.sessionState.conf.getConf(DeltaSQLConf.HUDI_MAX_COMMITS_TO_CONVERT) + + // Nth to convert + if (lastDeltaVersionConverted.exists(_ == snapshotToConvert.version)) { + return None + } + + // Get the most recently converted delta snapshot, if applicable + val prevConvertedSnapshotOpt = (lastDeltaVersionConverted, txnOpt) match { + case (Some(version), Some(txn)) if version == txn.snapshot.version => + Some(txn.snapshot) + // Check how long it has been since we last converted to Hudi. If outside the threshold, + // fall back to state reconstruction to get the actions, to protect driver from OOMing. + case (Some(version), _) if snapshotToConvert.version - version <= maxCommitsToConvert => + try { + // TODO: We can optimize this by providing a checkpointHint to getSnapshotAt. Check if + // txn.snapshot.version < version. If true, use txn.snapshot's checkpoint as a hint. + Some(log.getSnapshotAt(version)) + } catch { + // If we can't load the file since the last time Hudi was converted, it's likely that + // the commit file expired. Treat this like a new Hudi table conversion. + case _: DeltaFileNotFoundException => None + } + case (_, _) => None + } + + val hudiTxn = new HudiConversionTransaction(log.newDeltaHadoopConf(), + snapshotToConvert, metaClient, lastDeltaVersionConverted) + + // Write out the actions taken since the last conversion (or since table creation). + // This is done in batches, with each batch corresponding either to one delta file, + // or to the specified batch size. + val actionBatchSize = + spark.sessionState.conf.getConf(DeltaSQLConf.HUDI_MAX_COMMITS_TO_CONVERT) + prevConvertedSnapshotOpt match { + case Some(prevSnapshot) => + // Read the actions directly from the delta json files. 
+ // TODO: Run this as a spark job on executors + val deltaFiles = DeltaFileProviderUtils.getDeltaFilesInVersionRange( + spark, log, prevSnapshot.version + 1, snapshotToConvert.version) + + recordDeltaEvent( + snapshotToConvert.deltaLog, + "delta.hudi.conversion.deltaCommitRange", + data = Map( + "fromVersion" -> (prevSnapshot.version + 1), + "toVersion" -> snapshotToConvert.version, + "numDeltaFiles" -> deltaFiles.length + ) + ) + + val actionsToConvert = DeltaFileProviderUtils.parallelReadAndParseDeltaFilesAsIterator( + log, spark, deltaFiles) + actionsToConvert.foreach { actionsIter => + try { + actionsIter.grouped(actionBatchSize).foreach { actionStrs => + runHudiConversionForActions( + hudiTxn, + actionStrs.map(Action.fromJson)) + } + } finally { + actionsIter.close() + } + } + + // If we don't have a snapshot of the last converted version, get all the table addFiles + // (via state reconstruction). + case None => + val actionsToConvert = snapshotToConvert.allFiles.toLocalIterator().asScala + + recordDeltaEvent( + snapshotToConvert.deltaLog, + "delta.hudi.conversion.batch", + data = Map( + "version" -> snapshotToConvert.version, + "numDeltaFiles" -> snapshotToConvert.numOfFiles + ) + ) + + actionsToConvert.grouped(actionBatchSize) + .foreach { actions => + runHudiConversionForActions(hudiTxn, actions) + } + } + hudiTxn.commit() + Some(snapshotToConvert.version, snapshotToConvert.timestamp) + } + + def loadLastDeltaVersionConverted(snapshot: Snapshot, table: CatalogTable): Option[Long] = { + val metaClient = loadTableMetaClient(snapshot.deltaLog.dataPath.toString, + Option.apply(table.identifier.table), snapshot.metadata.partitionColumns, + snapshot.deltaLog.newDeltaHadoopConf()) + loadLastDeltaVersionConverted(metaClient) + } + + private def loadLastDeltaVersionConverted(metaClient: HoodieTableMetaClient): Option[Long] = { + val lastCompletedCommit = metaClient.getCommitsTimeline.filterCompletedInstants.lastInstant + if (!lastCompletedCommit.isPresent) { + return None + } + val extraMetadata = parseCommitExtraMetadata(lastCompletedCommit.get(), metaClient) + extraMetadata.get(HudiConverter.DELTA_VERSION_PROPERTY).map(_.toLong) + } + + private def parseCommitExtraMetadata(instant: HoodieInstant, + metaClient: HoodieTableMetaClient): Map[String, String] = { + try { + if (instant.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) { + HoodieReplaceCommitMetadata.fromBytes( + metaClient.getActiveTimeline.getInstantDetails(instant).get, + classOf[HoodieReplaceCommitMetadata]).getExtraMetadata.asScala.toMap + } else { + HoodieCommitMetadata.fromBytes( + metaClient.getActiveTimeline.getInstantDetails(instant).get, + classOf[HoodieCommitMetadata]).getExtraMetadata.asScala.toMap + } + } catch { + case ex: IOException => + throw new UncheckedIOException("Unable to read Hudi commit metadata", ex) + } + } + + private[delta] def runHudiConversionForActions( + hudiTxn: HudiConversionTransaction, + actionsToCommit: Seq[Action]): Unit = { + hudiTxn.setCommitFileUpdates(actionsToCommit) + } +} diff --git a/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiSchemaUtils.scala b/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiSchemaUtils.scala new file mode 100644 index 00000000000..7b7fa9a6ed9 --- /dev/null +++ b/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiSchemaUtils.scala @@ -0,0 +1,86 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.hudi + +import org.apache.avro.{LogicalTypes, Schema} +import org.apache.spark.sql.delta.metering.DeltaLogging +import org.apache.spark.sql.types._ + +import java.util +import scala.collection.JavaConverters._ + +object HudiSchemaUtils extends DeltaLogging { + + ///////////////// + // Public APIs // + ///////////////// + def convertDeltaSchemaToHudiSchema(deltaSchema: StructType): Schema = { + /** + * Recursively (i.e. for all nested elements) transforms the delta DataType `elem` into its + * corresponding Avro type. + */ + def transform[E <: DataType](elem: E, isNullable: Boolean): Schema = elem match { + case StructType(fields) => + + val avroFields: util.List[Schema.Field] = fields.map(f => + new Schema.Field( + f.name, + transform(f.dataType, f.nullable), + f.getComment().orNull)).toList.asJava + finalizeSchema( + Schema.createRecord(elem.typeName, null, null, false, avroFields), + isNullable) + // TODO: Add List and Map support: https://github.com/delta-io/delta/issues/2738 + case ArrayType(elementType, containsNull) => + throw new UnsupportedOperationException("UniForm Hudi doesn't support Array columns") + + case MapType(keyType, valueType, valueContainsNull) => + throw new UnsupportedOperationException("UniForm Hudi doesn't support Map columns") + + case atomicType: AtomicType => convertAtomic(atomicType, isNullable) + + case other => + throw new UnsupportedOperationException(s"Cannot convert Delta type $other to Hudi") + } + + transform(deltaSchema, false) + } + + private def finalizeSchema(targetSchema: Schema, isNullable: Boolean): Schema = { + if (isNullable) return Schema.createUnion(Schema.create(Schema.Type.NULL), targetSchema) + targetSchema + } + + private def convertAtomic[E <: DataType](elem: E, isNullable: Boolean) = elem match { + case StringType => finalizeSchema(Schema.create(Schema.Type.STRING), isNullable) + case LongType => finalizeSchema(Schema.create(Schema.Type.LONG), isNullable) + case IntegerType | ShortType => finalizeSchema(Schema.create(Schema.Type.INT), isNullable) + case FloatType => finalizeSchema(Schema.create(Schema.Type.FLOAT), isNullable) + case DoubleType => finalizeSchema(Schema.create(Schema.Type.DOUBLE), isNullable) + case d: DecimalType => finalizeSchema(LogicalTypes.decimal(d.precision, d.scale) + .addToSchema(Schema.create(Schema.Type.BYTES)), isNullable) + case BooleanType => finalizeSchema(Schema.create(Schema.Type.BOOLEAN), isNullable) + case BinaryType => finalizeSchema(Schema.create(Schema.Type.BYTES), isNullable) + case DateType => finalizeSchema( + LogicalTypes.date.addToSchema(Schema.create(Schema.Type.INT)), isNullable) + case TimestampType => finalizeSchema( + LogicalTypes.timestampMicros.addToSchema(Schema.create(Schema.Type.LONG)), isNullable) + case TimestampNTZType => finalizeSchema( + LogicalTypes.localTimestampMicros.addToSchema(Schema.create(Schema.Type.LONG)), isNullable) + case _ => throw new 
UnsupportedOperationException(s"Could not convert atomic type $elem") + } +} diff --git a/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiTransactionUtils.scala b/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiTransactionUtils.scala new file mode 100644 index 00000000000..b67358bc0c7 --- /dev/null +++ b/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiTransactionUtils.scala @@ -0,0 +1,141 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.hudi + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hudi.client.WriteStatus +import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieTableType, HoodieTimelineTimeZone, HoodieDeltaWriteStat} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.util.ExternalFilePathUtil +import org.apache.hudi.exception.TableNotFoundException +import org.apache.spark.sql.delta.actions.AddFile +import org.apache.spark.sql.delta.metering.DeltaLogging + +object HudiTransactionUtils extends DeltaLogging { + + ///////////////// + // Public APIs // + ///////////////// + def convertAddFile(addFile: AddFile, + tablePath: Path, + commitTime: String): WriteStatus = { + + val writeStatus = new WriteStatus + val path = addFile.toPath + val partitionPath = getPartitionPath(tablePath, path) + val fileName = path.getName + val fileId = fileName + val filePath = if (partitionPath.isEmpty) fileName else partitionPath + "/" + fileName + writeStatus.setFileId(fileId) + writeStatus.setPartitionPath(partitionPath) + val writeStat = new HoodieDeltaWriteStat + writeStat.setFileId(fileId) + writeStat.setPath( + ExternalFilePathUtil.appendCommitTimeAndExternalFileMarker(filePath, commitTime)) + writeStat.setPartitionPath(partitionPath) + writeStat.setNumWrites(addFile.numLogicalRecords.getOrElse(0L)) + writeStat.setTotalWriteBytes(addFile.getFileSize) + writeStat.setFileSizeInBytes(addFile.getFileSize) + writeStatus.setStat(writeStat) + + writeStatus + } + + def getPartitionPath(tableBasePath: Path, filePath: Path): String = { + val fileName = filePath.getName + val pathStr = filePath.toUri.getPath + val tableBasePathStr = tableBasePath.toUri.getPath + if (pathStr.contains(tableBasePathStr)) { + // input file path is absolute + val startIndex = tableBasePath.toUri.getPath.length + 1 + val endIndex = pathStr.length - fileName.length - 1 + if (endIndex <= startIndex) "" + else pathStr.substring(startIndex, endIndex) + } else { + val lastSlash = pathStr.lastIndexOf("/") + if (lastSlash <= 0) "" + else pathStr.substring(0, pathStr.lastIndexOf("/")) + } + } + + /** + * Loads the meta client for the table at the base path if it exists. + * If it does not exist, initializes the Hudi table and returns the meta client. 
+ * + * @param tableDataPath the path for the table + * @param tableName the name of the table + * @param partitionFields the fields used for partitioning + * @param conf the hadoop configuration + * @return {@link HoodieTableMetaClient} for the existing table or that was created + */ + def loadTableMetaClient(tableDataPath: String, + tableName: Option[String], + partitionFields: Seq[String], + conf: Configuration): HoodieTableMetaClient = { + try HoodieTableMetaClient.builder + .setBasePath(tableDataPath).setConf(conf) + .setLoadActiveTimelineOnLoad(false) + .build + catch { + case ex: TableNotFoundException => + log.debug("Hudi table does not exist, creating now.") + if (tableName.isEmpty) { + log.warn("No name is specified for the table. " + + "Creating a new Hudi table with a default name: 'table'.") + } + initializeHudiTable(tableDataPath, tableName.getOrElse("table"), partitionFields, conf) + } + } + + /** + * Initializes a Hudi table with the provided properties + * + * @param tableDataPath the base path for the data files in the table + * @param tableName the name of the table + * @param partitionFields the fields used for partitioning + * @param conf the hadoop configuration + * @return {@link HoodieTableMetaClient} for the table that was created + */ + private def initializeHudiTable(tableDataPath: String, + tableName: String, + partitionFields: Seq[String], + conf: Configuration): HoodieTableMetaClient = { + val keyGeneratorClass = getKeyGeneratorClass(partitionFields) + HoodieTableMetaClient + .withPropertyBuilder + .setCommitTimezone(HoodieTimelineTimeZone.UTC) + .setHiveStylePartitioningEnable(true) + .setTableType(HoodieTableType.COPY_ON_WRITE) + .setTableName(tableName) + .setPayloadClass(classOf[HoodieAvroPayload]) + .setKeyGeneratorClassProp(keyGeneratorClass) + .setPopulateMetaFields(false) + .setPartitionFields(partitionFields.mkString(",")) + .initTable(conf, tableDataPath) + } + + private def getKeyGeneratorClass(partitionFields: Seq[String]): String = { + if (partitionFields.isEmpty) { + "org.apache.hudi.keygen.NonpartitionedKeyGenerator" + } else if (partitionFields.size > 1) { + "org.apache.hudi.keygen.CustomKeyGenerator" + } else { + "org.apache.hudi.keygen.SimpleKeyGenerator" + } + } +} diff --git a/hudi/src/test/scala/org/apache/spark/sql/delta/hudi/ConvertToHudiSuite.scala b/hudi/src/test/scala/org/apache/spark/sql/delta/hudi/ConvertToHudiSuite.scala new file mode 100644 index 00000000000..13b91bf3520 --- /dev/null +++ b/hudi/src/test/scala/org/apache/spark/sql/delta/hudi/ConvertToHudiSuite.scala @@ -0,0 +1,293 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.delta.hudi + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.engine.HoodieLocalEngineContext +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.HoodieBaseFile +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.metadata.HoodieMetadataFileSystemView +import org.apache.spark.SparkContext +import org.apache.spark.sql.{QueryTest, SparkSession} +import org.apache.spark.sql.avro.SchemaConverters +import org.apache.spark.sql.delta.DeltaOperations.Truncate +import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog, DeltaUnsupportedOperationException, OptimisticTransaction} +import org.apache.spark.sql.delta.actions.{Action, AddFile, Metadata, RemoveFile} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.{ManualClock, Utils} +import org.scalatest.concurrent.Eventually +import org.scalatest.time.SpanSugar._ + +import java.io.File +import java.time.Instant +import java.util.UUID +import java.util.stream.Collectors +import scala.collection.JavaConverters + +class ConvertToHudiSuite extends QueryTest with Eventually { + + private var _sparkSession: SparkSession = null + private var TMP_DIR: String = "" + private var testTableName: String = "" + private var testTablePath: String = "" + + override def spark: SparkSession = _sparkSession + + override def beforeAll(): Unit = { + super.beforeAll() + _sparkSession = createSparkSession() + _sparkSession.conf.set( + DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.defaultTablePropertyKey, "true") + } + + override def beforeEach(): Unit = { + super.beforeEach() + TMP_DIR = Utils.createTempDir().getCanonicalPath + testTableName = UUID.randomUUID().toString.replace("-", "_") + testTablePath = s"$TMP_DIR/$testTableName" + } + + override def afterEach(): Unit = { + super.afterEach() + _sparkSession.sql(s"DROP TABLE IF EXISTS $testTableName") + Utils.deleteRecursively(new File(testTablePath)) + } + + override def afterAll(): Unit = { + super.afterAll() + SparkContext.getActive.foreach(_.stop()) + } + + test("basic test - managed table created with SQL") { + _sparkSession.sql( + s"""CREATE TABLE `$testTableName` (col1 INT) USING DELTA + |LOCATION '$testTablePath' + |TBLPROPERTIES ( + | 'delta.universalFormat.enabledFormats' = 'hudi' + |)""".stripMargin) + _sparkSession.sql(s"INSERT INTO `$testTableName` VALUES (123)") + verifyFilesAndSchemaMatch() + } + + test("basic test - catalog table created with DataFrame") { + withDefaultTablePropsInSQLConf(false, { + _sparkSession.range(10).write.format("delta") + .option("path", testTablePath) + .saveAsTable(testTableName) + }) + verifyFilesAndSchemaMatch() + + withDefaultTablePropsInSQLConf(false, { + _sparkSession.range(10, 20, 1) + .write.format("delta").mode("append") + .save(testTablePath) + }) + verifyFilesAndSchemaMatch() + } + + for (isPartitioned <- Seq(true, false)) { + test(s"validate multiple commits (partitioned = $isPartitioned)") { + _sparkSession.sql( + s"""CREATE TABLE `$testTableName` (col1 INT, col2 STRING, col3 STRING) USING DELTA + |${if (isPartitioned) "PARTITIONED BY (col3)" else ""} + |LOCATION '$testTablePath' + |TBLPROPERTIES ( + | 'delta.universalFormat.enabledFormats' = 'hudi', + | 'delta.enableDeletionVectors' = false + |)""".stripMargin) + // perform some inserts + _sparkSession.sql( + s"INSERT INTO `$testTableName` VALUES (1, 'instant1', 
'a'), (2, 'instant1', 'a')") + verifyFilesAndSchemaMatch() + + _sparkSession.sql( + s"INSERT INTO `$testTableName` VALUES (3, 'instant2', 'b'), (4, 'instant2', 'b')") + verifyFilesAndSchemaMatch() + + _sparkSession.sql( + s"INSERT INTO `$testTableName` VALUES (5, 'instant3', 'b'), (6, 'instant3', 'a')") + verifyFilesAndSchemaMatch() + + // update the data from the first instant + _sparkSession.sql(s"UPDATE `$testTableName` SET col2 = 'instant4' WHERE col2 = 'instant1'") + verifyFilesAndSchemaMatch() + + // delete a single row + _sparkSession.sql(s"DELETE FROM `$testTableName` WHERE col1 = 5") + verifyFilesAndSchemaMatch() + } + } + + test("Enabling Delete Vector Throws Exception") { + intercept[DeltaUnsupportedOperationException] { + _sparkSession.sql( + s"""CREATE TABLE `$testTableName` (col1 INT, col2 STRING) USING DELTA + |LOCATION '$testTablePath' + |TBLPROPERTIES ( + | 'delta.universalFormat.enabledFormats' = 'hudi', + | 'delta.enableDeletionVectors' = true + |)""".stripMargin) + } + } + + for (invalidFieldDef <- Seq("col3 ARRAY", "col3 MAP")) { + test(s"Table Throws Exception for Unsupported Type ($invalidFieldDef)") { + intercept[DeltaUnsupportedOperationException] { + _sparkSession.sql( + s"""CREATE TABLE `$testTableName` (col1 INT, col2 STRING, $invalidFieldDef) USING DELTA + |LOCATION '$testTablePath' + |TBLPROPERTIES ( + | 'delta.universalFormat.enabledFormats' = 'hudi', + | 'delta.enableDeletionVectors' = false + |)""".stripMargin) + } + } + } + + test("validate Hudi timeline archival and cleaning") { + val testOp = Truncate() + withDefaultTablePropsInSQLConf(true, { + val startTime = System.currentTimeMillis() - 12 * 24 * 60 * 60 * 1000 + val clock = new ManualClock(startTime) + val actualTestStartTime = System.currentTimeMillis() + val log = DeltaLog.forTable(_sparkSession, new Path(testTablePath), clock) + (1 to 20).foreach { i => + val txn = if (i == 1) startTxnWithManualLogCleanup(log) else log.startTransaction() + val file = AddFile(i.toString + ".parquet", Map.empty, 1, 1, true) :: Nil + val delete: Seq[Action] = if (i > 1) { + val timestamp = startTime + (System.currentTimeMillis() - actualTestStartTime) + RemoveFile((i - 1).toString + ".parquet", Some(timestamp), true) :: Nil + } else { + Nil + } + txn.commit(delete ++ file, testOp) + clock.advance(12.hours.toMillis) + // wait for each Hudi sync to complete + verifyNumHudiCommits(i) + } + + val metaClient: HoodieTableMetaClient = HoodieTableMetaClient.builder + .setConf(log.newDeltaHadoopConf()).setBasePath(log.dataPath.toString) + .setLoadActiveTimelineOnLoad(true) + .build + // Timeline requires a clean commit for proper removal of entries from the Hudi Metadata Table + assert(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() == 1, + "Cleaner timeline should have 1 instant") + // Older commits should move from active to archive timeline + assert(metaClient.getArchivedTimeline.getCommitsTimeline.filterInflights.countInstants == 2, + "Archived timeline should have 2 instants") + }) + } + + test("validate various data types") { + _sparkSession.sql( + s"""CREATE TABLE `$testTableName` (col1 BIGINT, col2 BOOLEAN, col3 DATE, + | col4 DOUBLE, col5 FLOAT, col6 INT, col7 STRING, col8 TIMESTAMP) + | USING DELTA + |LOCATION '$testTablePath' + |TBLPROPERTIES ( + | 'delta.universalFormat.enabledFormats' = 'hudi' + |)""".stripMargin) + val nowSeconds = Instant.now().getEpochSecond + _sparkSession.sql(s"INSERT INTO `$testTableName` VALUES (123, true, " + + s"date(from_unixtime($nowSeconds)), 32.1, 1.23, 456, 'hello 
world', " + + s"timestamp(from_unixtime($nowSeconds)))") + verifyFilesAndSchemaMatch() + } + + def buildHudiMetaClient(): HoodieTableMetaClient = { + val hadoopConf: Configuration = _sparkSession.sparkContext.hadoopConfiguration + HoodieTableMetaClient.builder + .setConf(hadoopConf).setBasePath(testTablePath) + .setLoadActiveTimelineOnLoad(true) + .build + } + + def verifyNumHudiCommits(count: Integer): Unit = { + eventually(timeout(30.seconds)) { + val metaClient: HoodieTableMetaClient = buildHudiMetaClient() + val activeCommits = metaClient.getActiveTimeline.getCommitsTimeline + .filterCompletedInstants.countInstants + val archivedCommits = metaClient.getArchivedTimeline.getCommitsTimeline + .filterCompletedInstants.countInstants + assert(activeCommits + archivedCommits == count) + } + } + + def verifyFilesAndSchemaMatch(): Unit = { + eventually(timeout(30.seconds)) { + // To avoid requiring Hudi spark dependencies, we first lookup the active base files and then + // assert by reading those active base files (parquet) directly + val hadoopConf: Configuration = _sparkSession.sparkContext.hadoopConfiguration + val metaClient: HoodieTableMetaClient = buildHudiMetaClient() + val engContext: HoodieLocalEngineContext = new HoodieLocalEngineContext(hadoopConf) + val fsView: HoodieMetadataFileSystemView = new HoodieMetadataFileSystemView(engContext, + metaClient, metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants, + HoodieMetadataConfig.newBuilder.enable(true).build) + val paths = JavaConverters.asScalaBuffer( + FSUtils.getAllPartitionPaths(engContext, testTablePath, true, false)) + .flatMap(partition => JavaConverters.asScalaBuffer(fsView.getLatestBaseFiles(partition) + .collect(Collectors.toList[HoodieBaseFile]))) + .map(baseFile => baseFile.getPath).sorted + val avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema + val hudiSchemaAsStruct = SchemaConverters.toSqlType(avroSchema).dataType + .asInstanceOf[StructType] + + val deltaDF = _sparkSession.sql(s"SELECT * FROM $testTableName") + // Assert file paths are equivalent + val expectedFiles = deltaDF.inputFiles.map(path => path.substring(5)).toSeq.sorted + assert(paths.equals(expectedFiles), + s"Files do not match.\nExpected: $expectedFiles\nActual: $paths") + // Assert schemas are equal + val expectedSchema = deltaDF.schema + assert(hudiSchemaAsStruct.equals(expectedSchema), + s"Schemas do not match.\nExpected: $expectedSchema\nActual: $hudiSchemaAsStruct") + } + } + + def withDefaultTablePropsInSQLConf(enableInCommitTimestamp: Boolean, f: => Unit): Unit = { + withSQLConf( + DeltaConfigs.COLUMN_MAPPING_MODE.defaultTablePropertyKey -> "name", + DeltaConfigs.UNIVERSAL_FORMAT_ENABLED_FORMATS.defaultTablePropertyKey -> "hudi", + DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.defaultTablePropertyKey -> + enableInCommitTimestamp.toString + ) { f } + } + + protected def startTxnWithManualLogCleanup(log: DeltaLog): OptimisticTransaction = { + val txn = log.startTransaction() + // This will pick up `spark.databricks.delta.properties.defaults.enableExpiredLogCleanup` to + // disable log cleanup. 
+ txn.updateMetadata(Metadata()) + txn + } + + def createSparkSession(): SparkSession = { + SparkSession.clearActiveSession() + SparkSession.clearDefaultSession() + SparkSession.builder() + .master("local[*]") + .appName("UniformSession") + .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") + .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .getOrCreate() + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala index a4bd7c27a27..4ce39179541 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala @@ -2826,6 +2826,15 @@ trait DeltaErrorsBase cause = cause) } + def hudiClassMissing(sparkConf: SparkConf, cause: Throwable): Throwable = { + new DeltaIllegalStateException( + errorClass = "DELTA_MISSING_HUDI_CLASS", + messageParameters = Array( + generateDocsLink( + sparkConf, "/delta-utility.html#convert-a-parquet-table-to-a-delta-table")), + cause = cause) + } + def streamingMetadataEvolutionException( newSchema: StructType, newConfigs: Map[String, String], @@ -3040,6 +3049,26 @@ trait DeltaErrorsBase ) } + def uniFormHudiDeleteVectorCompat(): Throwable = { + new DeltaUnsupportedOperationException( + errorClass = "DELTA_UNIVERSAL_FORMAT_VIOLATION", + messageParameters = Array( + UniversalFormat.HUDI_FORMAT, + "Requires delete vectors to be disabled." + ) + ) + } + + def uniFormHudiSchemaCompat(unsupportedType: DataType): Throwable = { + new DeltaUnsupportedOperationException( + errorClass = "DELTA_UNIVERSAL_FORMAT_VIOLATION", + messageParameters = Array( + UniversalFormat.HUDI_FORMAT, + s"DataType: $unsupportedType is not currently supported." + ) + ) + } + def icebergCompatVersionMutualExclusive(version: Int): Throwable = { new DeltaUnsupportedOperationException( errorClass = "DELTA_ICEBERG_COMPAT_VIOLATION.VERSION_MUTUAL_EXCLUSIVE", diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index bad9bd7edbb..b9bae649dcc 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.delta.catalog.DeltaTableV2 import org.apache.spark.sql.delta.commands.DeletionVectorUtils import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql.delta.files._ -import org.apache.spark.sql.delta.hooks.{CheckpointHook, GenerateSymlinkManifest, IcebergConverterHook, PostCommitHook, UpdateCatalogFactory} +import org.apache.spark.sql.delta.hooks.{CheckpointHook, GenerateSymlinkManifest, HudiConverterHook, IcebergConverterHook, PostCommitHook, UpdateCatalogFactory} import org.apache.spark.sql.delta.implicits.addFileEncoder import org.apache.spark.sql.delta.managedcommit.{Commit, CommitFailedException, CommitResponse, CommitStore, GetCommitsResponse, UpdatedActions} import org.apache.spark.sql.delta.metering.DeltaLogging @@ -332,6 +332,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite // The CheckpointHook will only checkpoint if necessary, so always register it to run. 
    registerPostCommitHook(CheckpointHook)
    registerPostCommitHook(IcebergConverterHook)
+   registerPostCommitHook(HudiConverterHook)

  /** The protocol of the snapshot that this transaction is reading at. */
  def protocol: Protocol = newProtocol.getOrElse(snapshot.protocol)
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/ProvidesUniFormConverters.scala b/spark/src/main/scala/org/apache/spark/sql/delta/ProvidesUniFormConverters.scala
index 6803d1a996c..55091892489 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/ProvidesUniFormConverters.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/ProvidesUniFormConverters.scala
@@ -44,9 +44,27 @@ trait ProvidesUniFormConverters { self: DeltaLog =>
      throw ExceptionUtils.getRootCause(e)
  }

+  protected lazy val _hudiConverter: UniversalFormatConverter = try {
+    val clazz =
+      Utils.classForName("org.apache.spark.sql.delta.hudi.HudiConverter")
+    val constructor = clazz.getConstructor(classOf[SparkSession])
+    constructor.newInstance(spark)
+  } catch {
+    case e: ClassNotFoundException =>
+      logError(s"Failed to find Hudi converter class", e)
+      throw DeltaErrors.hudiClassMissing(spark.sparkContext.getConf, e)
+    case e: InvocationTargetException =>
+      logError(s"Got error when creating a Hudi converter", e)
+      // The better error is within the cause
+      throw ExceptionUtils.getRootCause(e)
+  }
+
+
  /** Visible for tests (to be able to mock). */
  private[delta] var testIcebergConverter: Option[UniversalFormatConverter] = None
+  private[delta] var testHudiConverter: Option[UniversalFormatConverter] = None

  def icebergConverter: UniversalFormatConverter = testIcebergConverter.getOrElse(_icebergConverter)
+  def hudiConverter: UniversalFormatConverter = testHudiConverter.getOrElse(_hudiConverter)
 }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala b/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala
index ffa8e3f4a28..0b202836852 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala
@@ -19,9 +19,11 @@ package org.apache.spark.sql.delta
 import org.apache.spark.sql.delta.actions.{Action, Metadata, Protocol}
 import org.apache.spark.sql.delta.commands.WriteIntoDelta
 import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.schema.SchemaUtils

 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.types.{ArrayType, MapType, NullType}

 /**
  * Utils to validate the Universal Format (UniForm) Delta feature (NOT a table feature).
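For orientation, here is a minimal sketch (not part of this patch) of how the pieces added in this change compose once the delta-hudi assembly is on the classpath: the table property gates the feature via `UniversalFormat.hudiEnabled`, and the reflectively loaded converter is reached through the new `hudiConverter` accessor on `DeltaLog`. The helper name is hypothetical, and the `convertSnapshot(snapshot, catalogTable)` call shape is assumed from the `CreateDeltaTableCommand` call site further down in this patch.
```
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.delta.{DeltaLog, UniversalFormat}

// Hypothetical helper: synchronously convert the latest snapshot to Hudi when the
// "hudi" format is enabled on the table. hudiConverter is resolved reflectively from
// the delta-hudi assembly and throws DeltaErrors.hudiClassMissing if the jar is absent.
def convertLatestSnapshotToHudi(deltaLog: DeltaLog, table: CatalogTable): Unit = {
  val snapshot = deltaLog.update()
  if (UniversalFormat.hudiEnabled(snapshot.metadata)) {
    deltaLog.hudiConverter.convertSnapshot(snapshot, table)
  }
}
```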
@@ -47,12 +49,22 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable object UniversalFormat extends DeltaLogging { val ICEBERG_FORMAT = "iceberg" - val SUPPORTED_FORMATS = Set(ICEBERG_FORMAT) + val HUDI_FORMAT = "hudi" + val SUPPORTED_FORMATS = Set(HUDI_FORMAT, ICEBERG_FORMAT) def icebergEnabled(metadata: Metadata): Boolean = { DeltaConfigs.UNIVERSAL_FORMAT_ENABLED_FORMATS.fromMetaData(metadata).contains(ICEBERG_FORMAT) } + def hudiEnabled(metadata: Metadata): Boolean = { + DeltaConfigs.UNIVERSAL_FORMAT_ENABLED_FORMATS.fromMetaData(metadata).contains(HUDI_FORMAT) + } + + def hudiEnabled(properties: Map[String, String]): Boolean = { + properties.get(DeltaConfigs.UNIVERSAL_FORMAT_ENABLED_FORMATS.key) + .exists(value => value.contains(HUDI_FORMAT)) + } + def icebergEnabled(properties: Map[String, String]): Boolean = { properties.get(DeltaConfigs.UNIVERSAL_FORMAT_ENABLED_FORMATS.key) .exists(value => value.contains(ICEBERG_FORMAT)) @@ -70,10 +82,34 @@ object UniversalFormat extends DeltaLogging { newestMetadata: Metadata, isCreatingOrReorgTable: Boolean, actions: Seq[Action]): (Option[Protocol], Option[Metadata]) = { + enforceHudiDependencies(newestMetadata, snapshot) enforceIcebergInvariantsAndDependencies( snapshot, newestProtocol, newestMetadata, isCreatingOrReorgTable, actions) } + /** + * If you are enabling Hudi, this method ensures that Deletion Vectors are not enabled. New + * conditions may be added here in the future to make sure the source is compatible with Hudi. + * @param newestMetadata the newest metadata + * @param snapshot current snapshot + * @return N/A, throws exception if condition is not met + */ + def enforceHudiDependencies(newestMetadata: Metadata, snapshot: Snapshot): Any = { + if (hudiEnabled(newestMetadata)) { + if (DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.fromMetaData(newestMetadata)) { + throw DeltaErrors.uniFormHudiDeleteVectorCompat() + } + // TODO: remove once map/list support is added https://github.com/delta-io/delta/issues/2738 + SchemaUtils.findAnyTypeRecursively(newestMetadata.schema) { f => + f.isInstanceOf[MapType] || f.isInstanceOf[ArrayType] || f.isInstanceOf[NullType] + } match { + case Some(unsupportedType) => + throw DeltaErrors.uniFormHudiSchemaCompat(unsupportedType) + case _ => + } + } + } + /** * If you are enabling Universal Format (Iceberg), this method ensures that at least one of * IcebergCompat is enabled. 
If you are disabling Universal Format (Iceberg), this method diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala index 904c098a98e..94e1c9fde8c 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala @@ -25,8 +25,7 @@ import org.apache.spark.sql.delta.DeltaColumnMapping.{dropColumnMappingMetadata, import org.apache.spark.sql.delta.actions.{Action, Metadata, Protocol} import org.apache.spark.sql.delta.actions.DomainMetadata import org.apache.spark.sql.delta.commands.DMLUtils.TaggedCommitData -import org.apache.spark.sql.delta.hooks.{UpdateCatalog, UpdateCatalogFactory} -import org.apache.spark.sql.delta.hooks.IcebergConverterHook +import org.apache.spark.sql.delta.hooks.{HudiConverterHook, IcebergConverterHook, UpdateCatalog, UpdateCatalogFactory} import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.schema.SchemaUtils import org.apache.spark.sql.delta.sources.DeltaSQLConf @@ -198,6 +197,10 @@ case class CreateDeltaTableCommand( if (UniversalFormat.icebergEnabled(postCommitSnapshot.metadata)) { deltaLog.icebergConverter.convertSnapshot(postCommitSnapshot, tableWithLocation) } + + if (UniversalFormat.hudiEnabled(postCommitSnapshot.metadata)) { + deltaLog.hudiConverter.convertSnapshot(postCommitSnapshot, tableWithLocation) + } } /** @@ -682,6 +685,7 @@ case class CreateDeltaTableCommand( // During CREATE/REPLACE, we synchronously run conversion (if Uniform is enabled) so // we always remove the post commit hook here. txn.unregisterPostCommitHooksWhere(hook => hook.name == IcebergConverterHook.name) + txn.unregisterPostCommitHooksWhere(hook => hook.name == HudiConverterHook.name) txn } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/hooks/HudiConverterHook.scala b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/HudiConverterHook.scala new file mode 100644 index 00000000000..47425923d01 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/HudiConverterHook.scala @@ -0,0 +1,49 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.hooks + +import org.apache.spark.sql.delta.{OptimisticTransactionImpl, Snapshot, UniversalFormat} +import org.apache.spark.sql.delta.actions.Action +import org.apache.spark.sql.delta.metering.DeltaLogging + +import org.apache.spark.sql.SparkSession + +/** Write a new Hudi commit for the version committed by the txn, if required. 
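+ * The hook is registered for every transaction but is a no-op unless Hudi UniForm is enabled
+ * and the committed version matches the post-commit snapshot.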
 */
+object HudiConverterHook extends PostCommitHook with DeltaLogging {
+  override val name: String = "Post-commit Hudi metadata conversion"
+
+  val ASYNC_HUDI_CONVERTER_THREAD_NAME = "async-hudi-converter"
+
+  override def run(
+      spark: SparkSession,
+      txn: OptimisticTransactionImpl,
+      committedVersion: Long,
+      postCommitSnapshot: Snapshot,
+      committedActions: Seq[Action]): Unit = {
+    // Only convert to Hudi if the snapshot matches the version committed.
+    // This is to skip converting the same actions multiple times - they'll be written out
+    // by another commit anyway.
+    if (committedVersion != postCommitSnapshot.version ||
+        !UniversalFormat.hudiEnabled(postCommitSnapshot.metadata)) {
+      return
+    }
+    postCommitSnapshot
+      .deltaLog
+      .hudiConverter
+      .enqueueSnapshotForConversion(postCommitSnapshot, txn)
+  }
+}
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index 92576302264..f729dfc614b 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -1550,6 +1550,15 @@ trait DeltaSQLConfBase
    .intConf
    .createWithDefault(100)

+  val HUDI_MAX_COMMITS_TO_CONVERT = buildConf("hudi.maxPendingCommits")
+    .doc("""
+      |The maximum number of pending Delta commits to convert to Hudi incrementally.
+      |If the table hasn't been converted to Hudi for longer than this number of commits,
+      |we start from scratch, replacing the previously converted Hudi table contents.
+      |""".stripMargin)
+    .intConf
+    .createWithDefault(100)
+
  val ICEBERG_MAX_ACTIONS_TO_CONVERT = buildConf("iceberg.maxPendingActions")
    .doc("""
      |The maximum number of pending Delta actions to convert to Iceberg incrementally.
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala
index 0965a57175b..6b2fafdc189 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala
@@ -1042,6 +1042,21 @@ class DeltaVacuumSuite
    }
  }

+  test("hudi metadata dir") {
+    withEnvironment { (tempDir, clock) =>
+      spark.emptyDataset[Int].write.format("delta").save(tempDir)
+      val deltaLog = DeltaLog.forTable(spark, tempDir, clock)
+      gcTest(deltaLog, clock)(
+        CreateDirectory(".hoodie"),
+        CreateFile(".hoodie/00001.commit", false),
+
+        AdvanceClock(defaultTombstoneInterval + 1000),
+        GC(dryRun = false, Seq(tempDir)),
+        CheckFiles(Seq(".hoodie", ".hoodie/00001.commit"))
+      )
+    }
+  }
+
  // Helper method to remove the DVs in Delta table and rewrite the data files
  def purgeDVs(tableName: String): Unit = {
    withSQLConf(