From e3ec6771f1ce290f0aa77902e87748136dd86110 Mon Sep 17 00:00:00 2001 From: Yumingxuan Guo Date: Wed, 16 Oct 2024 13:04:22 -0700 Subject: [PATCH] [Delta] Update Coordinated Commits Usage Logs (#3774) #### Which Delta project/connector is this regarding? - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description 1. Renames the full error message with - `"exceptionString" -> exceptionString(e)` to record the full error message; - `"exceptionClass" -> e.getClass.getName` to retain the error class information. 2. Adds field `registeredCommitCoordinators`. 3. Removes nested JSONizing. ## How was this patch tested? Existed UTs. ## Does this PR introduce _any_ user-facing changes? No. --- .../main/scala/org/apache/spark/sql/delta/Snapshot.scala | 4 +++- .../coordinatedcommits/CoordinatedCommitsUsageLogs.scala | 2 +- .../coordinatedcommits/CoordinatedCommitsUtils.scala | 9 +++++++-- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala index b90f8c77a16..0ccf07e3c8d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala @@ -21,7 +21,7 @@ import scala.collection.mutable import org.apache.spark.sql.delta.actions._ import org.apache.spark.sql.delta.actions.Action.logSchema -import org.apache.spark.sql.delta.coordinatedcommits.{CommitCoordinatorClient, CoordinatedCommitsUsageLogs, CoordinatedCommitsUtils, TableCommitCoordinatorClient} +import org.apache.spark.sql.delta.coordinatedcommits.{CommitCoordinatorClient, CommitCoordinatorProvider, CoordinatedCommitsUsageLogs, CoordinatedCommitsUtils, TableCommitCoordinatorClient} import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.schema.SchemaUtils @@ -322,6 +322,8 @@ class Snapshot( CoordinatedCommitsUsageLogs.COMMIT_COORDINATOR_MISSING_IMPLEMENTATION_WRITE, data = Map( "commitCoordinatorName" -> coordinatorName.get, + "registeredCommitCoordinators" -> + CommitCoordinatorProvider.getRegisteredCoordinatorNames.mkString(", "), "readVersion" -> version.toString ) ) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUsageLogs.scala b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUsageLogs.scala index 52cacfe55b5..ad1507730e2 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUsageLogs.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUsageLogs.scala @@ -44,7 +44,7 @@ object CoordinatedCommitsUsageLogs { val FS_COMMIT_COORDINATOR_LISTING_UNEXPECTED_GAPS = s"$PREFIX.listDeltaAndCheckpointFiles.unexpectedGapsInResults" - // Usage log emitted when a requested Commit Coordinator implementation is missing + // Usage log emitted when a requested Commit Coordinator implementation is missing. val COMMIT_COORDINATOR_MISSING_IMPLEMENTATION = s"$PREFIX.commitCoordinator.missingImplementation" diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala index a19c6ef1849..a731ef95fec 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala @@ -79,7 +79,12 @@ object CoordinatedCommitsUtils extends DeltaLogging { response } catch { case NonFatal(e) => - recordEvent(Map("exception" -> Utils.exceptionString(e))) + recordEvent( + Map( + "exceptionClass" -> e.getClass.getName, + "exceptionString" -> Utils.exceptionString(e) + ) + ) throw e } } @@ -168,7 +173,7 @@ object CoordinatedCommitsUtils extends DeltaLogging { "commitCoordinatorName" -> commitCoordinatorStr, "registeredCommitCoordinators" -> CommitCoordinatorProvider.getRegisteredCoordinatorNames.mkString(", "), - "commitCoordinatorConf" -> JsonUtils.toJson(coordinatorConf), + "commitCoordinatorConf" -> coordinatorConf, "failIfImplUnavailable" -> failIfImplUnavailable.toString ) )