From 9b5a34cdc8d8a80a69a0dd89d0cdb33e80d69338 Mon Sep 17 00:00:00 2001 From: LiuGuoHua <129264181+sjgllgh@users.noreply.github.com> Date: Tue, 26 Nov 2024 15:17:34 +0800 Subject: [PATCH] feat: add support for sparkmeasure (#5202) * add support for spark measure * 1. Improve the code * Modify the default path. * add third dependencies * Refactor SparkSqlMeasure.scala * Fix potential compilation issues that may arise when using asJava * remove debug info --- linkis-dist/release-docs/LICENSE | 1 + .../licenses/LICENSE-spark-measure.txt | 201 ++++++++++++++++++ linkis-engineconn-plugins/spark/pom.xml | 32 +++ .../spark/config/SparkConfiguration.scala | 23 ++ .../spark/executor/SparkSqlExecutor.scala | 61 +++++- .../factory/SparkSqlExecutorFactory.scala | 2 +- .../launch/SparkEngineConnLaunchBuilder.scala | 78 ++++++- .../spark/sparkmeasure/SparkSqlMeasure.scala | 133 ++++++++++++ .../spark/executor/TestSparkSqlExecutor.scala | 3 +- tool/dependencies/known-dependencies.txt | 1 + 10 files changed, 528 insertions(+), 7 deletions(-) create mode 100644 linkis-dist/release-docs/licenses/LICENSE-spark-measure.txt create mode 100644 linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/sparkmeasure/SparkSqlMeasure.scala diff --git a/linkis-dist/release-docs/LICENSE b/linkis-dist/release-docs/LICENSE index 462e0be824..756c58407c 100644 --- a/linkis-dist/release-docs/LICENSE +++ b/linkis-dist/release-docs/LICENSE @@ -600,6 +600,7 @@ See licenses/ for text of these licenses. (Apache License, version 2.0) seatunnel-core-flink (org.apache.seatunnel:seatunnel-core-flink:2.1.2 - https://seatunnel.apache.org) (Apache License, version 2.0) seatunnel-core-flink-sql (org.apache.seatunnel:seatunnel-core-flink-sql:2.1.2 - https://seatunnel.apache.org) (Apache License, version 2.0) seatunnel-core-spark (org.apache.seatunnel:seatunnel-core-spark:2.1.2 - https://seatunnel.apache.org) + (Apache License, version 2.0) spark-measure (ch.cern.sparkmeasure:spark-measure_2.12:0.24 - https://github.com/LucaCanali/sparkMeasure) ======================================================================== Third party CDDL licenses diff --git a/linkis-dist/release-docs/licenses/LICENSE-spark-measure.txt b/linkis-dist/release-docs/licenses/LICENSE-spark-measure.txt new file mode 100644 index 0000000000..9c8f3ea087 --- /dev/null +++ b/linkis-dist/release-docs/licenses/LICENSE-spark-measure.txt @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/linkis-engineconn-plugins/spark/pom.xml b/linkis-engineconn-plugins/spark/pom.xml index 62f3da22f3..49a73d911e 100644 --- a/linkis-engineconn-plugins/spark/pom.xml +++ b/linkis-engineconn-plugins/spark/pom.xml @@ -433,6 +433,38 @@ kubernetes-model-core ${kubernetes-client.version} + + + ch.cern.sparkmeasure + spark-measure_2.12 + 0.24 + + + org.xerial.snappy + snappy-java + + + com.squareup.retrofit2 + retrofit + + + org.msgpack + msgpack-core + + + com.squareup.moshi + moshi + + + org.influxdb + influxdb-java + + + com.squareup.retrofit2 + converter-moshi + + + diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala index 429048c77f..9fea6ec70d 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala @@ -188,6 +188,29 @@ object SparkConfiguration extends Logging { val SCALA_PARSE_APPEND_CODE = CommonVars("linkis.scala.parse.append.code", "val linkisVar=1").getValue + val SPARKMEASURE_AGGREGATE_TYPE = "linkis.sparkmeasure.aggregate.type" + + val SPARKMEASURE_FLIGHT_RECORDER_TYPE = + CommonVars[String]("linkis.sparkmeasure.flight.recorder.type", "") + + val SPARKMEASURE_OUTPUT_PREFIX = + CommonVars[String]("linkis.sparkmeasure.output.prefix", "/appcom/sparkmeasure") + + val SPARKMEASURE_FLIGHT_STAGE_CLASS = + "ch.cern.sparkmeasure.FlightRecorderStageMetrics" + + val SPARKMEASURE_FLIGHT_TASK_CLASS = "ch.cern.sparkmeasure.FlightRecorderTaskMetrics" + + val SPARKMEASURE_FLIGHT_RECORDER_KEY = "spark.extraListeners" + + val SPARKMEASURE_FLIGHT_RECORDER_OUTPUT_FORMAT_KEY = "spark.sparkmeasure.outputFormat" + + val SPARKMEASURE_FLIGHT_RECORDER_OUTPUT_FORMAT_JSON = "json" + + val SPARKMEASURE_FLIGHT_RECORDER_OUTPUT_FORMAT_JSON_HADOOP = "json_to_hadoop" + + val SPARKMEASURE_FLIGHT_RECORDER_OUTPUT_FILENAME_KEY = "spark.sparkmeasure.outputFilename" + private def getMainJarName(): String = { val somePath = ClassUtils.jarOfClass(classOf[SparkEngineConnFactory]) if (somePath.isDefined) { diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSqlExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSqlExecutor.scala index f435314d5d..193fd5516c 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSqlExecutor.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSqlExecutor.scala @@ -17,23 +17,36 @@ package org.apache.linkis.engineplugin.spark.executor +import org.apache.linkis.common.io.FsPath import org.apache.linkis.common.utils.Utils import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext import org.apache.linkis.engineplugin.spark.common.{Kind, SparkSQL} import org.apache.linkis.engineplugin.spark.config.SparkConfiguration import org.apache.linkis.engineplugin.spark.entity.SparkEngineSession -import org.apache.linkis.engineplugin.spark.utils.{ArrowUtils, DirectPushCache, EngineUtils} +import org.apache.linkis.engineplugin.spark.sparkmeasure.SparkSqlMeasure +import org.apache.linkis.engineplugin.spark.utils.{DirectPushCache, EngineUtils} import org.apache.linkis.governance.common.constant.job.JobRequestConstants import org.apache.linkis.governance.common.paser.SQLCodeParser +import org.apache.linkis.governance.common.utils.JobUtils +import org.apache.linkis.manager.label.utils.LabelUtil import org.apache.linkis.scheduler.executer._ import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.spark.sql.DataFrame import java.lang.reflect.InvocationTargetException +import java.util +import java.util.Date -class SparkSqlExecutor(sparkEngineSession: SparkEngineSession, id: Long) - extends SparkEngineConnExecutor(sparkEngineSession.sparkContext, id) { +import scala.collection.JavaConverters._ + +import ch.cern.sparkmeasure.{StageMetrics, TaskMetrics} + +class SparkSqlExecutor( + sparkEngineSession: SparkEngineSession, + id: Long, + options: util.Map[String, String] +) extends SparkEngineConnExecutor(sparkEngineSession.sparkContext, id) { override def init(): Unit = { @@ -83,6 +96,17 @@ class SparkSqlExecutor(sparkEngineSession: SparkEngineSession, id: Long) .setContextClassLoader(sparkEngineSession.sparkSession.sharedState.jarClassLoader) val extensions = org.apache.linkis.engineplugin.spark.extension.SparkSqlExtension.getSparkSqlExtensions() + + // Start capturing Spark metrics + val sparkMeasure: Option[SparkSqlMeasure] = + createSparkMeasure(engineExecutionContext, sparkEngineSession, code) + val sparkMetrics: Option[Either[StageMetrics, TaskMetrics]] = sparkMeasure.flatMap { + measure => + val metrics = measure.getSparkMetrics + metrics.foreach(measure.begin) + metrics + } + val df = sparkEngineSession.sqlContext.sql(code) Utils.tryQuietly( @@ -109,6 +133,13 @@ class SparkSqlExecutor(sparkEngineSession: SparkEngineSession, id: Long) engineExecutionContext ) } + + // Stop capturing Spark metrics and output the records to the specified file. + sparkMeasure.foreach { measure => + sparkMetrics.foreach(measure.end) + sparkMetrics.foreach(measure.outputMetrics) + } + SuccessExecuteResponse() } catch { case e: InvocationTargetException => @@ -124,5 +155,29 @@ class SparkSqlExecutor(sparkEngineSession: SparkEngineSession, id: Long) } } + private def createSparkMeasure( + engineExecutionContext: EngineExecutionContext, + sparkEngineSession: SparkEngineSession, + code: String + ): Option[SparkSqlMeasure] = { + val sparkMeasureType = engineExecutionContext.getProperties + .getOrDefault(SparkConfiguration.SPARKMEASURE_AGGREGATE_TYPE, "") + .toString + + if (sparkMeasureType.nonEmpty) { + val outputPrefix = SparkConfiguration.SPARKMEASURE_OUTPUT_PREFIX.getValue(options) + val outputPath = FsPath.getFsPath( + outputPrefix, + LabelUtil.getUserCreator(engineExecutionContext.getLabels.toList.asJava)._1, + sparkMeasureType, + JobUtils.getJobIdFromMap(engineExecutionContext.getProperties), + new Date().getTime.toString + ) + Some(new SparkSqlMeasure(sparkEngineSession.sparkSession, code, sparkMeasureType, outputPath)) + } else { + None + } + } + override protected def getExecutorIdPreFix: String = "SparkSqlExecutor_" } diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkSqlExecutorFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkSqlExecutorFactory.scala index b98bd63cb1..66fee8dd5b 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkSqlExecutorFactory.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkSqlExecutorFactory.scala @@ -41,7 +41,7 @@ class SparkSqlExecutorFactory extends ComputationExecutorFactory { ): ComputationExecutor = { engineConn.getEngineConnSession match { case sparkEngineSession: SparkEngineSession => - new SparkSqlExecutor(sparkEngineSession, id) + new SparkSqlExecutor(sparkEngineSession, id, engineCreationContext.getOptions) case _ => throw NotSupportSparkSqlTypeException(INVALID_CREATE_SPARKSQL.getErrorDesc) } diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkEngineConnLaunchBuilder.scala index 8be047e2ff..8df105ed46 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkEngineConnLaunchBuilder.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkEngineConnLaunchBuilder.scala @@ -17,12 +17,13 @@ package org.apache.linkis.engineplugin.spark.launch +import org.apache.linkis.common.io.FsPath import org.apache.linkis.common.utils.JsonUtils +import org.apache.linkis.engineplugin.spark.config.{SparkConfiguration, SparkResourceConfiguration} import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.{ SPARK_CONF_DIR_ENV, SPARK_HOME_ENV } -import org.apache.linkis.engineplugin.spark.config.SparkResourceConfiguration import org.apache.linkis.hadoop.common.conf.HadoopConf import org.apache.linkis.manager.common.protocol.bml.BmlResource import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration @@ -36,6 +37,10 @@ import org.apache.linkis.manager.label.entity.engine.{ UserCreatorLabel } import org.apache.linkis.manager.label.utils.LabelUtil +import org.apache.linkis.storage.FSFactory +import org.apache.linkis.storage.utils.StorageUtils + +import org.apache.commons.lang3.StringUtils import java.util @@ -48,8 +53,13 @@ class SparkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder { override protected def getCommands(implicit engineConnBuildRequest: EngineConnBuildRequest ): Array[String] = { + val properties = engineConnBuildRequest.engineConnCreationDesc.properties + putSparkMeasureParams( + properties, + getUser(engineConnBuildRequest), + getTicketId(engineConnBuildRequest) + ) if (isOnceMode) { - val properties = engineConnBuildRequest.engineConnCreationDesc.properties properties.put( EnvConfiguration.ENGINE_CONN_MEMORY.key, SparkResourceConfiguration.LINKIS_SPARK_DRIVER_MEMORY.getValue(properties) @@ -70,6 +80,17 @@ class SparkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder { EngineConnMode.toEngineConnMode(engineConnMode) == EngineConnMode.Once } + private def getUser(engineConnBuildRequest: EngineConnBuildRequest): String = { + engineConnBuildRequest.labels.asScala + .find(_.isInstanceOf[UserCreatorLabel]) + .map { case label: UserCreatorLabel => label.getUser } + .get + } + + private def getTicketId(engineConnBuildRequest: EngineConnBuildRequest): String = { + engineConnBuildRequest.ticketId + } + override def getEnvironment(implicit engineConnBuildRequest: EngineConnBuildRequest ): util.Map[String, String] = { @@ -168,4 +189,57 @@ class SparkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder { Lists.newArrayList("JarUDFLoadECMHook") } + private def putSparkMeasureParams( + properties: util.Map[String, String], + userName: String, + ticketId: String + ): Unit = { + val flightRecorderType = + SparkConfiguration.SPARKMEASURE_FLIGHT_RECORDER_TYPE.getValue(properties) + val sparkMeasureOutput = + SparkConfiguration.SPARKMEASURE_OUTPUT_PREFIX.getValue(properties) + + if (StringUtils.isNotBlank(flightRecorderType)) { + if ("stage".equals(flightRecorderType)) { + properties.put( + SparkConfiguration.SPARKMEASURE_FLIGHT_RECORDER_KEY, + SparkConfiguration.SPARKMEASURE_FLIGHT_STAGE_CLASS + ) + } else if ("task".equals(flightRecorderType)) { + properties.put( + SparkConfiguration.SPARKMEASURE_FLIGHT_RECORDER_KEY, + SparkConfiguration.SPARKMEASURE_FLIGHT_TASK_CLASS + ) + } + val fsPath = FsPath.getFsPath( + new FsPath(sparkMeasureOutput).getSchemaPath, + userName, + "flight_" + flightRecorderType, + ticketId + ) + val fs = FSFactory.getFs(fsPath) + if (!fs.exists(fsPath.getParent)) fs.mkdirs(fsPath.getParent) + if (StorageUtils.HDFS == fsPath.getFsType) { + val outputPath = StorageUtils.HDFS_SCHEMA + fsPath.getPath + properties.put( + SparkConfiguration.SPARKMEASURE_FLIGHT_RECORDER_OUTPUT_FORMAT_KEY, + SparkConfiguration.SPARKMEASURE_FLIGHT_RECORDER_OUTPUT_FORMAT_JSON_HADOOP + ) + properties.put( + SparkConfiguration.SPARKMEASURE_FLIGHT_RECORDER_OUTPUT_FILENAME_KEY, + outputPath + ) + } else { + properties.put( + SparkConfiguration.SPARKMEASURE_FLIGHT_RECORDER_OUTPUT_FORMAT_KEY, + SparkConfiguration.SPARKMEASURE_FLIGHT_RECORDER_OUTPUT_FORMAT_JSON + ) + properties.put( + SparkConfiguration.SPARKMEASURE_FLIGHT_RECORDER_OUTPUT_FILENAME_KEY, + fsPath.getPath + ) + } + } + } + } diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/sparkmeasure/SparkSqlMeasure.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/sparkmeasure/SparkSqlMeasure.scala new file mode 100644 index 0000000000..9b613f0556 --- /dev/null +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/sparkmeasure/SparkSqlMeasure.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.spark.sparkmeasure + +import org.apache.linkis.common.io.FsPath +import org.apache.linkis.common.utils.Logging +import org.apache.linkis.storage.FSFactory + +import org.apache.commons.collections4.MapUtils +import org.apache.commons.io.IOUtils +import org.apache.spark.sql.{DataFrame, SparkSession} + +import java.util + +import ch.cern.sparkmeasure.{StageMetrics, TaskMetrics} +import com.fasterxml.jackson.databind.ObjectMapper + +class SparkSqlMeasure( + sparkSession: SparkSession, + sql: String, + metricType: String, + outputPath: FsPath +) extends Logging { + + private val sqlType: String = determineSqlType + + def begin(metrics: Either[StageMetrics, TaskMetrics]): Unit = { + metrics match { + case Left(stageMetrics) => + stageMetrics.begin() + case Right(taskMetrics) => + taskMetrics.begin() + } + } + + def end(metrics: Either[StageMetrics, TaskMetrics]): Unit = { + metrics match { + case Left(stageMetrics) => + stageMetrics.end() + case Right(taskMetrics) => + taskMetrics.end() + } + } + + private def enableSparkMeasure: Boolean = { + sqlType match { + case "SELECT" | "INSERT" => true + case _ => false + } + } + + def getSparkMetrics: Option[Either[StageMetrics, TaskMetrics]] = { + if (enableSparkMeasure) { + metricType match { + case "stage" => Some(Left(StageMetrics(sparkSession))) + case "task" => Some(Right(TaskMetrics(sparkSession))) + case _ => None + } + } else { + None + } + } + + def outputMetrics(metrics: Either[StageMetrics, TaskMetrics]): Unit = { + if (enableSparkMeasure) { + val metricsMap = collectMetrics(metrics) + + if (MapUtils.isNotEmpty(metricsMap)) { + val retMap = new util.HashMap[String, Object]() + retMap.put("execution_code", sql) + retMap.put("metrics", metricsMap) + + val mapper = new ObjectMapper() + val bytes = mapper.writeValueAsBytes(retMap) + + val fs = FSFactory.getFs(outputPath) + try { + if (!fs.exists(outputPath.getParent)) fs.mkdirs(outputPath.getParent) + val out = fs.write(outputPath, true) + try { + out.write(bytes) + } finally { + IOUtils.closeQuietly(out) + } + } finally { + fs.close() + } + } + } + } + + private def determineSqlType: String = { + val parser = sparkSession.sessionState.sqlParser + val logicalPlan = parser.parsePlan(sql) + + logicalPlan.getClass.getSimpleName match { + case "UnresolvedWith" | "Project" | "GlobalLimit" => "SELECT" + case "InsertIntoStatement" | "CreateTableAsSelectStatement" | "CreateTableAsSelect" => + "INSERT" + case planName => + logger.info(s"Unsupported sql type") + planName + } + } + + private def collectMetrics( + metrics: Either[StageMetrics, TaskMetrics] + ): java.util.Map[String, Long] = { + metrics match { + case Left(stageMetrics) => + stageMetrics.aggregateStageMetricsJavaMap() + case Right(taskMetrics) => + taskMetrics.aggregateTaskMetricsJavaMap() + case _ => new util.HashMap[String, Long]() + } + } + +} diff --git a/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/executor/TestSparkSqlExecutor.scala b/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/executor/TestSparkSqlExecutor.scala index e5edf08546..abc894988f 100644 --- a/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/executor/TestSparkSqlExecutor.scala +++ b/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/executor/TestSparkSqlExecutor.scala @@ -68,7 +68,8 @@ class TestSparkSqlExecutor { sparkSession, outputDir ) - val sparkSqlExecutor = new SparkSqlExecutor(sparkEngineSession, 1L) + val sparkSqlExecutor = + new SparkSqlExecutor(sparkEngineSession, 1L, new java.util.HashMap[String, String]()) Assertions.assertFalse(sparkSqlExecutor.isEngineInitialized) sparkSqlExecutor.init() Assertions.assertTrue(sparkSqlExecutor.isEngineInitialized) diff --git a/tool/dependencies/known-dependencies.txt b/tool/dependencies/known-dependencies.txt index aaa27eefb6..de1652e8ca 100644 --- a/tool/dependencies/known-dependencies.txt +++ b/tool/dependencies/known-dependencies.txt @@ -824,3 +824,4 @@ zookeeper-3.9.2.jar zookeeper-jute-3.9.2.jar zstd-jni-1.4.5-6.jar zstd-jni-1.5.0-4.jar +spark-measure_2.12-0.24.jar \ No newline at end of file