
Certain Python execution plans aren't captured on Databricks #278

Closed
GolamRashed opened this issue Aug 5, 2021 · 18 comments
Labels: bug (Something isn't working), dependency: Databricks, duplicate (This issue or pull request already exists)
GolamRashed (Author) opened Aug 5, 2021:
As I understand it, the Spline UI shows execution events once we write to a file. I am writing to a CSV file using the code below, but the Spline UI is not showing anything. There are several cells of SQL transformations happening before the write to CSV, but nothing is getting captured. Is there any way to resolve this issue?

%python
from datetime import datetime  # needed for datetime.now() below

AbsenceExtractAttendance = spark.table('Absence_Extract_Attendance')
Attendance_Path = '/mnt/Absence_Extract_Attendance/{0}'.format(datetime.now().year)
dbutils.fs.rm(Attendance_Path, True)  # remove the previous run's output first
AbsenceExtractAttendance.write.format('csv').options(header='true', quote='"', quoteMode='all').save(Attendance_Path)
wajda (Contributor) commented Aug 5, 2021

Please verify that the Spline agent is enabled and listening to Spark events.
If everything is configured correctly, you should see something like the following in the Spark logs:

21/08/05 12:26:04 INFO QueryExecutionEventHandlerFactory: Initializing Spline agent...
21/08/05 12:26:04 INFO QueryExecutionEventHandlerFactory: Spline init type: PROGRAMMATIC
21/08/05 12:26:04 INFO QueryExecutionEventHandlerFactory: Spline version: 0.6.2-SNAPSHOT (rev. )
21/08/05 12:26:04 INFO QueryExecutionEventHandlerFactory: Spline mode: BEST_EFFORT
...
21/08/05 12:26:06 INFO QueryExecutionEventHandlerFactory: Spline successfully initialized. Spark Lineage tracking is ENABLED.

GolamRashed (Author):

> Please verify that the Spline agent is enabled and listening to Spark events.
> If everything is configured correctly, you should see something like the following in the Spark logs:

Here is what I am seeing in the Driver logs, and it looks similar to what you mentioned above:

[screenshot: Spline initialization messages in the driver log]

Please note that I am saving the transformed files in CSV format, is that the reason why the lineage is not showing up? Should I save files in Parquet format instead?

wajda (Contributor) commented Aug 5, 2021

No, the format should not matter.
Are there any error messages in the logs?
Try enabling the logging dispatcher and see if it prints some JSON to the logs:

spline.lineageDispatcher=logging

GolamRashed (Author) commented Aug 5, 2021

> spline.lineageDispatcher=logging

Should I enable it in the Spark config, or add it at the beginning of the notebook?

And, where would this logging information be written if enabled?

wajda (Contributor) commented Aug 5, 2021

In the Spark config, the same place where the rest of the Spline settings are put.

A lineage JSON should be printed to the Spark logs, the same logs where you've seen those Spline INFO messages.
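For reference, a Databricks cluster's Spark config box takes space-separated key-value pairs, and Spline agent settings carry a `spark.` prefix there. A minimal sketch of the setting above in that format (the mode value is taken from the config shown later in this thread):

```
spark.spline.mode REQUIRED
spark.spline.lineageDispatcher logging
```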

wajda (Contributor) commented Aug 5, 2021

What Spark version are you using?

GolamRashed (Author) commented Aug 5, 2021

> What Spark version are you using?

Spark 3.1.1 and Scala 2.12. Please note that I'm writing the CSV file using Python.

I should add that the example Scala code given in the Spline Databricks guide works perfectly.

wajda (Contributor) commented Aug 5, 2021

Ok, we'll investigate.
Could you please send the entire Spark log file? Or check for messages like:

XXXX was not recognized as a write-command

@wajda wajda transferred this issue from AbsaOSS/spline-getting-started Aug 5, 2021
GolamRashed (Author):

> Ok, we'll investigate.
> Could you please send the entire Spark log file? Or check for messages like:

I have emailed you, please check.

@wajda wajda changed the title Spline is not capturing the execution plan Certain Python execution plans aren't captured Aug 6, 2021
@wajda wajda self-assigned this Aug 6, 2021
@wajda wajda changed the title Certain Python execution plans aren't captured Certain Python execution plans aren't captured on Databricks Aug 6, 2021
wajda (Contributor) commented Aug 6, 2021

I was able to (sort of) reproduce the issue.
It looks like a Databricks issue. In certain cases the Spline agent does not receive any events from the Databricks runtime about the performed operation. The odd thing is that the issue is intermittent: at some point it starts working, and then after some idle time it stops again.

I was playing with different code snippets both on Databricks and on my local PC, and from my observations the issue is specific to the Databricks + Python + "no transformations" combination.
By "no transformations" I mean that I was only able to reproduce it when reading from a table and immediately writing to a file, just like in your example:

df = spark.table('dummy1')
df.write.mode('overwrite').format('csv').options(header='true', quote='"', quoteMode='all').save('/data/dummy2.csv')

Adding any transformation operation, even a dummy filter after the read like spark.table('dummy1').filter('1 == 1'), fixes the issue. (At least I wasn't able to reproduce it on non-empty pipelines.)

On pure PySpark it works for all cases.

@wajda wajda added dependency: Databricks bug Something isn't working labels Aug 6, 2021
GolamRashed (Author) commented Aug 6, 2021

The notebook I have is SQL, so the transformations happen in SQL before the results are written in Python. Do you think I should try writing the CSVs in Scala? What is the best solution for this issue, in your opinion?

wajda (Contributor) commented Aug 6, 2021

For some reason I cannot reproduce it anymore. Maybe it actually worked for me all along, but the logs were slow to update, which is why I didn't see the captured events. I'm not sure :\

At this point I cannot suggest anything in particular. Try different Databricks runtime versions, and if that doesn't help, try performing the write in Scala.

GolamRashed (Author):

Which Databricks runtime do you recommend for Spline 0.6.1? I'm currently using 8.2.

wajda (Contributor) commented Aug 6, 2021

I tested on 8.3

GolamRashed (Author):

> Try enabling the logging dispatcher and see if it prints some JSON to the logs:
>
> spline.lineageDispatcher=logging

Is this syntax correct? The Spark Config text box takes only key-value pairs (otherwise an error is reported).

Anyway, I have entered the parameters below in the Spark config, but the http entry disappears automatically and I don't see any executions in the Spline UI. Once I remove the logging entry, execution events are captured again (and the http entry no longer disappears).

spark.spline.mode REQUIRED
spark.spline.lineageDispatcher http
spark.spline.lineageDispatcher.http.producer.url http://xxx.xxx.xxx.xxx:8080/producer
spark.spline.lineageDispatcher logging
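A Spark config map can hold only one value per key, which is why the two `spark.spline.lineageDispatcher` entries above conflict and one of them disappears. If both the HTTP and the logging dispatchers are needed at once, the Spline agent README documents a composite dispatcher for this purpose; a hedged sketch (verify that your agent version supports it):

```
spark.spline.lineageDispatcher composite
spark.spline.lineageDispatcher.composite.dispatchers http,logging
spark.spline.lineageDispatcher.http.producer.url http://xxx.xxx.xxx.xxx:8080/producer
```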

GolamRashed (Author):

I have now tried writing files in Parquet format, still no sign of the lineage being captured.

GolamRashed (Author) commented Aug 9, 2021

What does this error mean? It seems to be related to the Spline agent installed in Databricks (this is on Runtime 8.0, which includes Apache Spark 3.1.1 and Scala 2.12):

21/08/09 05:52:57 ERROR SplineQueryExecutionListener: Unexpected error occurred during lineage processing for application: Databricks Shell #app-20210809052607-0000
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.WindowExpression cannot be cast to org.apache.spark.sql.catalyst.expressions.NamedExpression
	at scala.collection.LinearSeqOptimized.find(LinearSeqOptimized.scala:115)
	at scala.collection.LinearSeqOptimized.find$(LinearSeqOptimized.scala:112)
	at scala.collection.immutable.List.find(List.scala:89)
	at za.co.absa.spline.harvester.builder.WindowNodeBuilder.resolveAttributeChild(WindowNodeBuilder.scala:31)
	at za.co.absa.spline.harvester.builder.OperationNodeBuilder$$anon$1$$anonfun$$lessinit$greater$1.apply(OperationNodeBuilder.scala:46)
	at za.co.absa.spline.harvester.builder.OperationNodeBuilder$$anon$1$$anonfun$$lessinit$greater$1.apply(OperationNodeBuilder.scala:46)
	at za.co.absa.spline.harvester.converter.AttributeConverter.convert(AttributeConverter.scala:41)
	at za.co.absa.spline.harvester.builder.OperationNodeBuilder$$anon$1.za$co$absa$commons$lang$CachingConverter$$super$convert(OperationNodeBuilder.scala:44)
	at za.co.absa.commons.lang.CachingConverter.$anonfun$convert$1(converters.scala:47)
	at scala.collection.mutable.MapLike.getOrElseUpdate(MapLike.scala:209)
	at scala.collection.mutable.MapLike.getOrElseUpdate$(MapLike.scala:206)
	at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:82)
	at za.co.absa.commons.lang.CachingConverter.convert(converters.scala:47)
	at za.co.absa.commons.lang.CachingConverter.convert$(converters.scala:44)
	at za.co.absa.spline.harvester.builder.OperationNodeBuilder$$anon$1.convert(OperationNodeBuilder.scala:44)
	at za.co.absa.spline.harvester.builder.OperationNodeBuilder.$anonfun$outputAttributes$1(OperationNodeBuilder.scala:71)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at za.co.absa.spline.harvester.builder.OperationNodeBuilder.outputAttributes(OperationNodeBuilder.scala:71)
	at za.co.absa.spline.harvester.builder.OperationNodeBuilder.outputAttributes$(OperationNodeBuilder.scala:70)
	at za.co.absa.spline.harvester.builder.GenericNodeBuilder.outputAttributes$lzycompute(GenericNodeBuilder.scala:26)
	at za.co.absa.spline.harvester.builder.GenericNodeBuilder.outputAttributes(GenericNodeBuilder.scala:26)
	at za.co.absa.spline.harvester.builder.GenericNodeBuilder.build(GenericNodeBuilder.scala:41)
	at za.co.absa.spline.harvester.builder.GenericNodeBuilder.build(GenericNodeBuilder.scala:26)
	at za.co.absa.spline.harvester.LineageHarvester.$anonfun$harvest$10(LineageHarvester.scala:88)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at za.co.absa.spline.harvester.LineageHarvester.$anonfun$harvest$8(LineageHarvester.scala:88)
	at scala.Option.flatMap(Option.scala:271)
	at za.co.absa.spline.harvester.LineageHarvester.harvest(LineageHarvester.scala:81)
	at za.co.absa.spline.harvester.QueryExecutionEventHandler.onSuccess(QueryExecutionEventHandler.scala:42)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$2(SplineQueryExecutionListener.scala:40)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$2$adapted(SplineQueryExecutionListener.scala:40)
	at scala.Option.foreach(Option.scala:407)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$1(SplineQueryExecutionListener.scala:40)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.withErrorHandling(SplineQueryExecutionListener.scala:49)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.onSuccess(SplineQueryExecutionListener.scala:40)
	at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:155)
	at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:131)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:119)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:103)
	at org.apache.spark.sql.util.ExecutionListenerBus.postToAll(QueryExecutionListener.scala:131)
	at org.apache.spark.sql.util.ExecutionListenerBus.onOtherEvent(QueryExecutionListener.scala:135)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:102)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:119)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:103)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1585)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)

wajda (Contributor) commented Aug 9, 2021

That might be the cause.
The bug has already been reported here: #262

@wajda wajda added this to Spline Mar 31, 2022
@wajda wajda moved this to New in Spline Mar 31, 2022
@wajda wajda added duplicate This issue or pull request already exists and removed investigating labels May 19, 2022
@wajda wajda closed this as completed May 19, 2022
Repository owner moved this from New to Done in Spline May 19, 2022