Add support for Sparklens in Spark 3.0.0 and later versions of Spark with Scala 2.12 #63

Open · wants to merge 1 commit into base: SPARK30
25 changes: 20 additions & 5 deletions README.md
@@ -92,16 +92,31 @@ Note: Apart from the console based report, you can also get an UI based report s
`--conf spark.sparklens.report.email=<email>` along with other relevant confs mentioned below.
This functionality is available in Sparklens 0.3.2 and above.

Use the following arguments to `spark-submit` or `spark-shell`:
Use the following arguments to `spark-submit` or `spark-shell` for Spark 3.0.0 and later versions of Spark:
```
--packages qubole:sparklens:0.4.0-s_2.12
--conf spark.extraListeners=com.qubole.sparklens.QuboleJobListener
```

Use the following arguments to `spark-submit` or `spark-shell` for Spark 2.4.x and earlier versions of Spark:
```
--packages qubole:sparklens:0.3.2-s_2.11
--conf spark.extraListeners=com.qubole.sparklens.QuboleJobListener
```
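For reference, the flags above can be combined into a single submit command. A minimal sketch for Spark 3.0.0 and later; the application jar name (`my-app.jar`) is hypothetical:

```shell
# Sketch: assemble a full spark-submit invocation for Spark 3.0.0+ from the
# flags above. Only the package coordinate and listener conf come from the
# README; the application jar is a placeholder.
SPARKLENS_PKG="qubole:sparklens:0.4.0-s_2.12"
LISTENER_CONF="spark.extraListeners=com.qubole.sparklens.QuboleJobListener"
SUBMIT_CMD="./bin/spark-submit --packages ${SPARKLENS_PKG} --conf ${LISTENER_CONF} my-app.jar"
echo "${SUBMIT_CMD}"
```

For Spark 2.4.x and earlier, swap the package coordinate for the Scala 2.11 build (`qubole:sparklens:0.3.2-s_2.11`).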


#### 2. Run from Sparklens offline data ####

You can choose not to run sparklens inside the app, but at a later time. Run your app as above
with additional configuration parameters:
with additional configuration parameters.
For Spark 3.0.0 and later versions of Spark:
```
--packages qubole:sparklens:0.4.0-s_2.12
--conf spark.extraListeners=com.qubole.sparklens.QuboleJobListener
--conf spark.sparklens.reporting.disabled=true
```

For Spark 2.4.x and earlier versions of Spark:
```
--packages qubole:sparklens:0.3.2-s_2.11
--conf spark.extraListeners=com.qubole.sparklens.QuboleJobListener
@@ -111,7 +126,7 @@ with additional configuration parameters:
This will not run reporting, but instead create a Sparklens JSON file for the application which is
stored in the **spark.sparklens.data.dir** directory (by default, **/tmp/sparklens/**). Note that this will be stored on HDFS by default. To save this file to S3, set **spark.sparklens.data.dir** to an S3 path. This data file can now be used to run Sparklens reporting independently, using the `spark-submit` command as follows:

`./bin/spark-submit --packages qubole:sparklens:0.3.2-s_2.11 --class com.qubole.sparklens.app.ReporterApp qubole-dummy-arg <filename>`
`./bin/spark-submit --packages qubole:sparklens:0.4.0-s_2.12 --class com.qubole.sparklens.app.ReporterApp qubole-dummy-arg <filename>`

`<filename>` should be replaced by the full path of sparklens json file. If the file is on s3 use the full s3 path. For files on local file system, use file:// prefix with the local file location. HDFS is supported as well.

@@ -124,11 +139,11 @@ running via `sparklens-json-file` above) with another option specifying that it is an
event history file. This file can be in any of the formats the event history file supports, i.e. **text, snappy, lz4
or lzf**. Note the extra `source=history` parameter in this example:

`./bin/spark-submit --packages qubole:sparklens:0.3.2-s_2.11 --class com.qubole.sparklens.app.ReporterApp qubole-dummy-arg <filename> source=history`
`./bin/spark-submit --packages qubole:sparklens:0.4.0-s_2.12 --class com.qubole.sparklens.app.ReporterApp qubole-dummy-arg <filename> source=history`

It is also possible to convert an event history file to a Sparklens json file using the following command:

`./bin/spark-submit --packages qubole:sparklens:0.3.2-s_2.11 --class com.qubole.sparklens.app.EventHistoryToSparklensJson qubole-dummy-arg <srcDir> <targetDir>`
`./bin/spark-submit --packages qubole:sparklens:0.4.0-s_2.12 --class com.qubole.sparklens.app.EventHistoryToSparklensJson qubole-dummy-arg <srcDir> <targetDir>`

EventHistoryToSparklensJson is designed to work on local file system only. Please make sure that the source and target directories are on local file system.

6 changes: 3 additions & 3 deletions build.sbt
@@ -1,13 +1,13 @@
name := "sparklens"
organization := "com.qubole"

scalaVersion := "2.11.8"
scalaVersion := "2.12.10"

crossScalaVersions := Seq("2.10.6", "2.11.8")
crossScalaVersions := Seq("2.10.6", "2.11.12", "2.12.10")

spName := "qubole/sparklens"

sparkVersion := "2.0.0"
sparkVersion := "3.0.1"

spAppendScalaVersion := true

3 changes: 2 additions & 1 deletion src/main/scala/com/qubole/sparklens/QuboleJobListener.scala
@@ -257,7 +257,8 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener {
if (stageCompleted.stageInfo.failureReason.isDefined) {
//stage failed
val si = stageCompleted.stageInfo
failedStages += s""" Stage ${si.stageId} attempt ${si.attemptId} in job ${stageIDToJobID(si.stageId)} failed.
// StageInfo.attemptId is deprecated; from Spark 3.0.0, attemptNumber is used to get the attempt id
failedStages += s""" Stage ${si.stageId} attempt ${si.attemptNumber} in job ${stageIDToJobID(si.stageId)} failed.
Stage tasks: ${si.numTasks}
"""
stageTimeSpan.finalUpdate()
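The API change behind this diff can be illustrated without Spark on the classpath. The sketch below models `StageInfo` with a hypothetical stand-in case class, since from Spark 3.0.0 the listener must read `attemptNumber` rather than the deprecated `attemptId`:

```scala
// Stand-in for org.apache.spark.scheduler.StageInfo (hypothetical fields),
// used to show the message built from attemptNumber as in the diff above.
case class StageInfoStandIn(stageId: Int, attemptNumber: Int, numTasks: Int)

object FailedStageMessage {
  // Builds the failed-stage message the listener appends, using attemptNumber.
  def format(si: StageInfoStandIn, jobId: Int): String =
    s"Stage ${si.stageId} attempt ${si.attemptNumber} in job $jobId failed. Stage tasks: ${si.numTasks}"
}
```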
17 changes: 11 additions & 6 deletions src/main/scala/com/qubole/sparklens/helper/HDFSConfigHelper.scala
@@ -24,11 +24,16 @@ import org.apache.spark.deploy.SparkHadoopUtil
object HDFSConfigHelper {

def getHadoopConf(sparkConfOptional:Option[SparkConf]): Configuration = {
if (sparkConfOptional.isDefined) {
SparkHadoopUtil.get.newConfiguration(sparkConfOptional.get)
}else {
val sparkConf = new SparkConf()
SparkHadoopUtil.get.newConfiguration(sparkConf)
}
// From Spark 3.0.0, SparkHadoopUtil is private[spark] and can no longer be called directly;
// reflection is used here to access its newConfiguration method
val sparkHadoopUtilClass = Class.forName("org.apache.spark.deploy.SparkHadoopUtil")
val sparkHadoopUtil = sparkHadoopUtilClass.newInstance()
val newConfigurationMethod = sparkHadoopUtilClass.getMethod("newConfiguration", classOf[SparkConf])
if (sparkConfOptional.isDefined) {
newConfigurationMethod.invoke(sparkHadoopUtil, sparkConfOptional.get).asInstanceOf[Configuration]
} else {
val sparkConf = new SparkConf()
newConfigurationMethod.invoke(sparkHadoopUtil, sparkConf).asInstanceOf[Configuration]
}
}
}
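The reflection pattern used in `HDFSConfigHelper` (look up a class by name, instantiate it, invoke a method) can be exercised standalone. A minimal sketch that runs without spark-core on the classpath, using `java.lang.StringBuilder` as a stand-in for `SparkHadoopUtil`:

```scala
// Sketch of the reflection pattern above: Class.forName, instantiation, and a
// reflective method call. StringBuilder stands in for SparkHadoopUtil here so
// the snippet is self-contained; the target class and method are otherwise
// resolved exactly as in the diff.
object ReflectionSketch {
  def callReflectively(): String = {
    val clazz = Class.forName("java.lang.StringBuilder")
    val instance = clazz.getDeclaredConstructor().newInstance()
    // Resolve append(String) by name and parameter type, then invoke it.
    val append = clazz.getMethod("append", classOf[String])
    append.invoke(instance, "hello").toString
  }
}
```

This indirection is what lets the same jar load against both Spark 2.x and 3.x: the method is resolved at runtime rather than linked at compile time.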
2 changes: 1 addition & 1 deletion version.sbt
@@ -1 +1 @@
version in ThisBuild := "0.3.2"
version in ThisBuild := "0.4.0"