Windowing functions cause errors in lineage harvesting #262

Closed
mfrictionless opened this issue Jun 24, 2021 · 32 comments

mfrictionless commented Jun 24, 2021

Spline Team,

While testing some windowing functions in CTEs and temporary tables, we get errors during lineage processing. I've attached my sample notebook. The first two INSERT commands work fine; the final two fail.

  • spark-3.1-spline-agent-bundle_2.12-0.6.1
  • Databricks Runtime 8.3

spline-test-windowing-function.ipynb.txt
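
For context, an agent in this kind of setup is typically wired in through the cluster's Spark config, roughly as below. The listener class is the one that appears in the stack trace; the producer URL is a placeholder and the exact property set depends on your version, so check the Spline agent docs:

spark.sql.queryExecutionListeners  za.co.absa.spline.harvester.listener.SplineQueryExecutionListener
spark.spline.producer.url          https://<your-spline-gateway>/producer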

21/06/24 03:10:58 ERROR SplineQueryExecutionListener: Unexpected error occurred during lineage processing for application: Databricks Shell #local-1624504140464
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.WindowExpression cannot be cast to org.apache.spark.sql.catalyst.expressions.NamedExpression
	at scala.collection.LinearSeqOptimized.find(LinearSeqOptimized.scala:115)
	at scala.collection.LinearSeqOptimized.find$(LinearSeqOptimized.scala:112)
	at scala.collection.immutable.List.find(List.scala:89)
	at za.co.absa.spline.harvester.builder.WindowNodeBuilder.resolveAttributeChild(WindowNodeBuilder.scala:31)
	at za.co.absa.spline.harvester.builder.OperationNodeBuilder$$anon$1$$anonfun$$lessinit$greater$1.apply(OperationNodeBuilder.scala:46)
	at za.co.absa.spline.harvester.builder.OperationNodeBuilder$$anon$1$$anonfun$$lessinit$greater$1.apply(OperationNodeBuilder.scala:46)
	at za.co.absa.spline.harvester.converter.AttributeConverter.convert(AttributeConverter.scala:41)
	at za.co.absa.spline.harvester.builder.OperationNodeBuilder$$anon$1.za$co$absa$commons$lang$CachingConverter$$super$convert(OperationNodeBuilder.scala:44)
	at za.co.absa.commons.lang.CachingConverter.$anonfun$convert$1(converters.scala:47)
	at scala.collection.mutable.MapLike.getOrElseUpdate(MapLike.scala:209)
	at scala.collection.mutable.MapLike.getOrElseUpdate$(MapLike.scala:206)
	at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:82)
	at za.co.absa.commons.lang.CachingConverter.convert(converters.scala:47)
	at za.co.absa.commons.lang.CachingConverter.convert$(converters.scala:44)
	at za.co.absa.spline.harvester.builder.OperationNodeBuilder$$anon$1.convert(OperationNodeBuilder.scala:44)
	at za.co.absa.spline.harvester.builder.OperationNodeBuilder.$anonfun$outputAttributes$1(OperationNodeBuilder.scala:71)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at za.co.absa.spline.harvester.builder.OperationNodeBuilder.outputAttributes(OperationNodeBuilder.scala:71)
	at za.co.absa.spline.harvester.builder.OperationNodeBuilder.outputAttributes$(OperationNodeBuilder.scala:70)
	at za.co.absa.spline.harvester.builder.GenericNodeBuilder.outputAttributes$lzycompute(GenericNodeBuilder.scala:26)
	at za.co.absa.spline.harvester.builder.GenericNodeBuilder.outputAttributes(GenericNodeBuilder.scala:26)
	at za.co.absa.spline.harvester.builder.GenericNodeBuilder.build(GenericNodeBuilder.scala:41)
	at za.co.absa.spline.harvester.builder.GenericNodeBuilder.build(GenericNodeBuilder.scala:26)
	at za.co.absa.spline.harvester.LineageHarvester.$anonfun$harvest$10(LineageHarvester.scala:88)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at za.co.absa.spline.harvester.LineageHarvester.$anonfun$harvest$8(LineageHarvester.scala:88)
	at scala.Option.flatMap(Option.scala:271)
	at za.co.absa.spline.harvester.LineageHarvester.harvest(LineageHarvester.scala:81)
	at za.co.absa.spline.harvester.QueryExecutionEventHandler.onSuccess(QueryExecutionEventHandler.scala:42)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$2(SplineQueryExecutionListener.scala:40)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$2$adapted(SplineQueryExecutionListener.scala:40)
	at scala.Option.foreach(Option.scala:407)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$1(SplineQueryExecutionListener.scala:40)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.withErrorHandling(SplineQueryExecutionListener.scala:49)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.onSuccess(SplineQueryExecutionListener.scala:40)
	at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:155)
	at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:131)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.sql.util.ExecutionListenerBus.postToAll(QueryExecutionListener.scala:131)
	at org.apache.spark.sql.util.ExecutionListenerBus.onOtherEvent(QueryExecutionListener.scala:135)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:84)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1523)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
@wajda wajda added bug Something isn't working dependency: Spark 3.0+ labels Jun 24, 2021
@wajda wajda added this to the 0.6.2 milestone Jun 24, 2021
wajda commented Aug 5, 2021

The Spark code that I extracted from the log above:

DROP TABLE IF EXISTS playground.test_set01_customer;
DROP TABLE IF EXISTS playground.test_set01_order;
DROP TABLE IF EXISTS playground.test_set01_item;
DROP TABLE IF EXISTS playground.test_set01_order_item;
DROP TABLE IF EXISTS playground.test_set01_country_aggregate

CREATE TABLE IF NOT EXISTS playground.test_set01_customer (
  id STRING NOT NULL,
  name STRING NOT NULL,
  email STRING NOT NULL,
  city STRING,
  state STRING,
  country STRING,
  catagory INT NOT NULL
) USING PARQUET;

CREATE TABLE IF NOT EXISTS playground.test_set01_order (
  id STRING NOT NULL,
  customer_id STRING NOT NULL,
  ts TIMESTAMP NOT NULL
) USING PARQUET;

CREATE TABLE IF NOT EXISTS playground.test_set01_item (
  id STRING NOT NULL,
  name STRING NOT NULL,
  cost FLOAT NOT NULL,
  color STRING NOT NULL
) USING PARQUET;

CREATE TABLE IF NOT EXISTS playground.test_set01_order_item (
  order_id STRING NOT NULL,
  item_id STRING NOT NULL,
  price FLOAT NOT NULL,
  discount FLOAT COMMENT 'Order discount as floating percentage value (example: .16 would be a 16% discount)',
  cost_rank INT COMMENT 'Cost ranking of the item at the time of order with the 1 being the most expensive item'
) USING PARQUET;

INSERT INTO playground.test_set01_customer VALUES (
  'a925eaf2-e342-497b-85fb-773e81bf57f5',
  'Michael Fline',
  'test@test1.test',
  'Santa Monica',
  'CA',
  'USA',
  1
);

INSERT INTO playground.test_set01_customer VALUES (
  'a92702b8-db3a-4aaa-b424-51e7ae30d185',
  'Bobby Socks',
  'test@test2.test',
  'Portland',
  'OR',
  'USA',
  2
)

INSERT INTO playground.test_set01_order VALUES (
  '32b366fc-3644-4390-a02e-1fed6d658120',
  'a925eaf2-e342-497b-85fb-773e81bf57f5',
  current_timestamp
)

INSERT INTO playground.test_set01_order VALUES (
  '32b366fc-3644-4390-a02e-1fed6d658121',
  'a92702b8-db3a-4aaa-b424-51e7ae30d185',
  current_timestamp
)

INSERT INTO playground.test_set01_item VALUES (
  '42adb6d9-785d-47d3-8899-8eb0dbe1057c',
  'Teemo Bath Mat',
  17.25,
  'Green'
)

INSERT INTO playground.test_set01_item VALUES (
  '0630e2cf-59f7-4926-ae12-0c45b51baed3',
  'Ahri Hand Soap',
  1.15,
  'Blue'
)

INSERT INTO playground.test_set01_item VALUES (
  '5b560453-706e-48e6-a855-3af70447d3fa',
  'Garen Tooth Brush',.75,'Blue')

INSERT INTO playground.test_set01_item VALUES (
  '79700b78-f820-4451-b4c2-79408f4b96e7',
  'Darius Hair Brush',2.25,'Red')

spark.sql("""SELECT id, name, cost, color FROM playground.test_set01_item WHERE color = 'Blue'""").createOrReplaceTempView("test_set01_blue_item")

INSERT INTO playground.test_set01_order_item
   SELECT o.id,
      i.id,
      i.cost * 1.1,
      CASE WHEN c.catagory = 1 THEN .1 ELSE 0 END as discount,
      0
     FROM playground.test_set01_order o
    INNER JOIN playground.test_set01_customer c ON (o.customer_id = c.id)
    INNER JOIN test_set01_blue_item i ON (o.id = '32b366fc-3644-4390-a02e-1fed6d658120')


WITH test_set01_blue_item AS (
  SELECT id, name, cost, color
    FROM playground.test_set01_item
   WHERE color = 'Blue'
  )

INSERT INTO playground.test_set01_order_item
         SELECT o.id, i.id, i.cost * 1.1, CASE WHEN c.catagory = 1 THEN .1 ELSE 0 END as discount, 0
           FROM playground.test_set01_order o
          INNER JOIN playground.test_set01_customer c ON (o.customer_id = c.id)
          INNER JOIN test_set01_blue_item i ON (o.id = '32b366fc-3644-4390-a02e-1fed6d658120')

WITH test_set01_ranked_item AS (
  SELECT id, name, cost, RANK() OVER (PARTITION BY color ORDER BY cost DESC) color_cost_rank
    FROM playground.test_set01_item
)

INSERT INTO playground.test_set01_order_item
       SELECT o.id, i.id, i.cost * 1.1, CASE WHEN c.catagory = 1 THEN .1 ELSE 0 END as discount, i.color_cost_rank
         FROM playground.test_set01_order o
        INNER JOIN playground.test_set01_customer c ON (o.customer_id = c.id)
        INNER JOIN test_set01_ranked_item i ON (o.id = '32b366fc-3644-4390-a02e-1fed6d658120')
        WHERE i.color_cost_rank = 1

spark.sql("""SELECT id, name, cost, RANK() OVER (PARTITION BY color ORDER BY cost DESC) color_cost_rank FROM playground.test_set01_item""").createOrReplaceTempView("test_set01_ranked_item")

INSERT INTO playground.test_set01_order_item
       SELECT o.id, i.id, i.cost * 1.1, CASE WHEN c.catagory = 1 THEN .1 ELSE 0 END as discount, i.color_cost_rank
         FROM playground.test_set01_order o
        INNER JOIN playground.test_set01_customer c ON (o.customer_id = c.id)
        INNER JOIN test_set01_ranked_item i ON (o.id = '32b366fc-3644-4390-a02e-1fed6d658120')
        WHERE i.color_cost_rank = 1

cerveada commented Aug 6, 2021

Databricks Runtime 8.3 uses Apache Spark 3.1.1.

I tested it on Spark 3.1.1 and 3.1.2 without any issues.
Maybe there is some change present only on Databricks that is causing this.

I will probably have to test this on an actual Databricks instance to find the issue.

Spark Scala code I used for testing:

            spark.sql("DROP TABLE IF EXISTS playground.test_set01_customer;")
            spark.sql("DROP TABLE IF EXISTS playground.test_set01_order;")
            spark.sql("DROP TABLE IF EXISTS playground.test_set01_item;")
            spark.sql("DROP TABLE IF EXISTS playground.test_set01_order_item;")
            spark.sql("DROP TABLE IF EXISTS playground.test_set01_country_aggregate")

            spark.sql( """
                        CREATE TABLE IF NOT EXISTS playground.test_set01_customer (
                          id STRING NOT NULL,
                          name STRING NOT NULL,
                          email STRING NOT NULL,
                          city STRING,
                          state STRING,
                          country STRING,
                          catagory INT NOT NULL
                        ) USING PARQUET;
                        """)

            spark.sql( """
                        CREATE TABLE IF NOT EXISTS playground.test_set01_order (
                          id STRING NOT NULL,
                          customer_id STRING NOT NULL,
                          ts TIMESTAMP NOT NULL
                        ) USING PARQUET;
                        """)

            spark.sql( """
                        CREATE TABLE IF NOT EXISTS playground.test_set01_item (
                          id STRING NOT NULL,
                          name STRING NOT NULL,
                          cost FLOAT NOT NULL,
                          color STRING NOT NULL
                        ) USING PARQUET;
                        """)

            spark.sql( """
                        CREATE TABLE IF NOT EXISTS playground.test_set01_order_item (
                          order_id STRING NOT NULL,
                          item_id STRING NOT NULL,
                          price FLOAT NOT NULL,
                          discount FLOAT COMMENT 'Order discount as floating percentage value (example: .16 would be a 16% discount)',
                        cost_rank INT COMMENT 'Cost ranking of the item at the time of order with the 1 being the most expensive item'
                        ) USING PARQUET;
                        """)

            spark.sql( """
                        INSERT INTO playground.test_set01_customer VALUES (
                          'a925eaf2-e342-497b-85fb-773e81bf57f5',
                        'Michael Fline',
                        'test@test1.test',
                        'Santa Monica',
                        'CA',
                        'USA',
                        1
                        );
                        """)

            spark.sql( """
                        INSERT INTO playground.test_set01_customer VALUES (
                          'a92702b8-db3a-4aaa-b424-51e7ae30d185',
                        'Bobby Socks',
                        'test@test2.test',
                        'Portland',
                        'OR',
                        'USA',
                        2
                        )
                        """)

            spark.sql( """
                        INSERT INTO playground.test_set01_order VALUES (
                        '32b366fc-3644-4390-a02e-1fed6d658120',
                        'a925eaf2-e342-497b-85fb-773e81bf57f5',
                        current_timestamp
                        )
                        """)

            spark.sql( """
                        INSERT INTO playground.test_set01_order VALUES (
                        '32b366fc-3644-4390-a02e-1fed6d658121',
                        'a92702b8-db3a-4aaa-b424-51e7ae30d185',
                        current_timestamp
                        )
                        """)

            spark.sql( """
                        INSERT INTO playground.test_set01_item VALUES (
                        '42adb6d9-785d-47d3-8899-8eb0dbe1057c',
                        'Teemo Bath Mat',
                        17.25,
                        'Green'
                        )
                        """)

            spark.sql( """
                        INSERT INTO playground.test_set01_item VALUES (
                        '0630e2cf-59f7-4926-ae12-0c45b51baed3',
                        'Ahri Hand Soap',
                        1.15,
                        'Blue'
                        )
                        """)

            spark.sql( """
                        INSERT INTO playground.test_set01_item VALUES (
                        '5b560453-706e-48e6-a855-3af70447d3fa',
                        'Garen Tooth Brush',.75,'Blue')
                        """)

            spark.sql( """
                        INSERT INTO playground.test_set01_item VALUES (
                        '79700b78-f820-4451-b4c2-79408f4b96e7',
                        'Darius Hair Brush',2.25,'Red')
                        """)

            spark
              .sql("""SELECT id, name, cost, color FROM playground.test_set01_item WHERE color = 'Blue'""")
              .createOrReplaceTempView("test_set01_blue_item")


            spark.sql( """
                        INSERT INTO playground.test_set01_order_item
                        SELECT o.id,
                        i.id,
                        i.cost * 1.1,
                        CASE WHEN c.catagory = 1 THEN .1 ELSE 0 END as discount,
                        0
                        FROM playground.test_set01_order o
                          INNER JOIN playground.test_set01_customer c ON (o.customer_id = c.id)
                        INNER JOIN test_set01_blue_item i ON (o.id = '32b366fc-3644-4390-a02e-1fed6d658120')
                        """)

            spark.sql( """
                        WITH test_set01_blue_item AS (
                          SELECT id, name, cost, color
                            FROM playground.test_set01_item
                            WHERE color = 'Blue'
                        )

                        INSERT INTO playground.test_set01_order_item
                        SELECT o.id, i.id, i.cost * 1.1, CASE WHEN c.catagory = 1 THEN .1 ELSE 0 END as discount, 0
                        FROM playground.test_set01_order o
                          INNER JOIN playground.test_set01_customer c ON (o.customer_id = c.id)
                        INNER JOIN test_set01_blue_item i ON (o.id = '32b366fc-3644-4390-a02e-1fed6d658120')
                        """)

            spark.sql( """
                        WITH test_set01_ranked_item AS (
                          SELECT id, name, cost, RANK() OVER (PARTITION BY color ORDER BY cost DESC) color_cost_rank
                            FROM playground.test_set01_item
                        )

                        INSERT INTO playground.test_set01_order_item
                        SELECT o.id, i.id, i.cost * 1.1, CASE WHEN c.catagory = 1 THEN .1 ELSE 0 END as discount, i.color_cost_rank
                        FROM playground.test_set01_order o
                          INNER JOIN playground.test_set01_customer c ON (o.customer_id = c.id)
                        INNER JOIN test_set01_ranked_item i ON (o.id = '32b366fc-3644-4390-a02e-1fed6d658120')
                        WHERE i.color_cost_rank = 1
                        """)

            spark
              .sql("""SELECT id, name, cost, RANK() OVER (PARTITION BY color ORDER BY cost DESC) color_cost_rank FROM playground.test_set01_item""")
              .createOrReplaceTempView("test_set01_ranked_item")

            spark.sql( """
                        INSERT INTO playground.test_set01_order_item
                        SELECT o.id, i.id, i.cost * 1.1, CASE WHEN c.catagory = 1 THEN .1 ELSE 0 END as discount, i.color_cost_rank
                        FROM playground.test_set01_order o
                          INNER JOIN playground.test_set01_customer c ON (o.customer_id = c.id)
                        INNER JOIN test_set01_ranked_item i ON (o.id = '32b366fc-3644-4390-a02e-1fed6d658120')
                        WHERE i.color_cost_rank = 1
                        """)

wajda commented Aug 9, 2021

@GolamRashed can you share the code to reproduce the issue, please?

@cerveada

I can replicate this on Databricks 8.3.
So it's a Databricks-specific issue. It seems to be caused by any Spark Window operation.
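
For reference, no SQL is needed to hit this: any window expression built through the DataFrame API produces the same logical Window node. A minimal sketch (illustrative only, not code from this thread; the output path is a placeholder):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rank

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Any window expression yields a logical Window operator in the plan,
// which is what the lineage harvesting trips over on Databricks.
val items = Seq(("mat", "Green", 17.25), ("soap", "Blue", 1.15))
  .toDF("name", "color", "cost")
val ranked = items.withColumn(
  "color_cost_rank",
  rank().over(Window.partitionBy($"color").orderBy($"cost".desc)))
ranked.write.mode("overwrite").parquet("/tmp/ranked_items") // the write triggers lineage capture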

@GolamRashed

I can replicate this on Databricks 8.3.
So it's a Databricks-specific issue. It seems to be caused by any Spark Window operation.

What could be the solution for this? Should I contact Databricks or the Apache Spark dev team?

@cerveada

The problem is that Databricks changed some code in Spark that we rely on. Now we will have to implement special handling for that in Spline. This is further complicated by the fact that Databricks is closed source, so we can't just look at the code and see the difference.

But we are working on it.

GolamRashed commented Aug 13, 2021

I have raised a ticket through Azure Databricks. Just to confirm, is the issue confined to spark.sql.queryExecutionListeners?

@cerveada

No, it's not a bug in Azure; their Spark simply differs from the open-source one. I tried to explain it in the previous comment.

GolamRashed commented Aug 13, 2021

No, it's not a bug in Azure; their Spark simply differs from the open-source one. I tried to explain it in the previous comment.

I understand what you mean; Azure is now contacting Databricks to find a solution for this. But my question is: is this issue related to the spark.sql.queryExecutionListeners mechanism?

@cerveada

The Spline agent itself is a QueryExecutionListener, so in that sense it is.
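
For readers unfamiliar with the mechanism: a query execution listener is a public Spark hook, and the stack traces above show harvesting starting from its onSuccess callback. A minimal sketch of the interface (the class body here is illustrative, not Spline's code):

import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Spark calls onSuccess after each successful action, handing over the
// QueryExecution whose plans the lineage is harvested from.
class PrintingListener extends QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    println(s"'$funcName' succeeded in ${durationNs / 1000000} ms:\n${qe.analyzed}")
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
    println(s"'$funcName' failed: ${exception.getMessage}")
}

Such a listener is registered either via the spark.sql.queryExecutionListeners config property (as Spline's is) or programmatically with spark.listenerManager.register(new PrintingListener).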

wajda commented Aug 13, 2021

It would be awesome if the Databricks folks contributed to Spline and added proper support for their closed-source stuff. Otherwise it will always be a trial-and-error approach.

@GolamRashed, could you perhaps reference this issue in the Databricks ticket that you created?

@GolamRashed

It would be awesome if the Databricks folks contributed to Spline and added proper support for their closed-source stuff. Otherwise it will always be a trial-and-error approach.

Great thinking, it would be awesome if Databricks contributed! I have included the link to this thread in the support ticket. Spline could be a very good data lineage solution for the company.

@cerveada

Output from ObjectStructureDumper for a Databricks Window:

operation: org.apache.spark.sql.catalyst.plans.logical.Window 
  windowOutputSet: org.apache.spark.sql.catalyst.expressions.AttributeSet = {revenue_difference#73}
  validConstraints: org.apache.spark.sql.catalyst.expressions.ExpressionSet = ExpressionSet(isnotnull(revenue#68))
  projectList: scala.collection.Seq = 
    ArrayBuffer(
        product#66, 
        category#67, 
        revenue#68, 
        (   
            max(revenue#68) 
                windowspecdefinition(
                    category#67, 
                    revenue#68 ASC NULLS FIRST, 
                    specifiedwindowframe(
                        RangeFrame, 
                        unboundedpreceding$(), 
                        unboundedfollowing$()
                    )
                ) 
            - revenue#68
        ) AS revenue_difference#73
    )
  partitionSpec: scala.collection.Seq = ArrayBuffer(category#67)
  orderSpec: scala.collection.Seq = ArrayBuffer(revenue#68 ASC NULLS FIRST)
  bitmap$trans$0: boolean = true
  bitmap$0: boolean = true
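
For the curious: a field listing like the one above can be produced with plain Java reflection. The sketch below is a rough illustration and assumes nothing about Spline's actual ObjectStructureDumper:

// Print the declared fields of any object, e.g. a logical plan node.
// setAccessible may be restricted on newer JVMs; illustration only.
def dumpFields(obj: AnyRef): Unit =
  obj.getClass.getDeclaredFields.foreach { f =>
    f.setAccessible(true)
    println(s"${f.getName}: ${f.getType.getName} = ${f.get(obj)}")
  }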

wajda pushed a commit that referenced this issue Aug 16, 2021
* spark agent #262 special Databricks Window handling

* spark agent #262 add method comment
@GolamRashed

@wajda Tried it, the same error persists:

ERROR SplineQueryExecutionListener: Unexpected error occurred during lineage processing for application: Databricks Shell #app-20210816122957-0000
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.WindowExpression cannot be cast to org.apache.spark.sql.catalyst.expressions.NamedExpression

wajda commented Aug 16, 2021

Can you show the stack trace, please?

@GolamRashed

Can you show the stack trace, please?

This?

21/08/16 13:24:18 ERROR SplineQueryExecutionListener: Unexpected error occurred during lineage processing for application: Databricks Shell #app-20210816122957-0000
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.WindowExpression cannot be cast to org.apache.spark.sql.catalyst.expressions.NamedExpression
	at scala.collection.LinearSeqOptimized.find(LinearSeqOptimized.scala:115)
	at scala.collection.LinearSeqOptimized.find$(LinearSeqOptimized.scala:112)
	at scala.collection.immutable.List.find(List.scala:89)
	at za.co.absa.spline.harvester.builder.WindowNodeBuilder.resolveAttributeChild(WindowNodeBuilder.scala:34)
	at za.co.absa.spline.harvester.builder.OperationNodeBuilder$$anon$1$$anonfun$$lessinit$greater$1.apply(OperationNodeBuilder.scala:46)
	at za.co.absa.spline.harvester.builder.OperationNodeBuilder$$anon$1$$anonfun$$lessinit$greater$1.apply(OperationNodeBuilder.scala:46)
	at za.co.absa.spline.harvester.converter.AttributeConverter.convert(AttributeConverter.scala:44)
	at za.co.absa.spline.harvester.builder.OperationNodeBuilder$$anon$1.za$co$absa$commons$lang$CachingConverter$$super$convert(OperationNodeBuilder.scala:44)
	at za.co.absa.commons.lang.CachingConverter.$anonfun$convert$1(converters.scala:47)
	at scala.collection.mutable.MapLike.getOrElseUpdate(MapLike.scala:209)
	at scala.collection.mutable.MapLike.getOrElseUpdate$(MapLike.scala:206)
	at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:82)
	at za.co.absa.commons.lang.CachingConverter.convert(converters.scala:47)
	at za.co.absa.commons.lang.CachingConverter.convert$(converters.scala:44)
	at za.co.absa.spline.harvester.builder.OperationNodeBuilder$$anon$1.convert(OperationNodeBuilder.scala:44)
	at za.co.absa.spline.harvester.builder.OperationNodeBuilder.$anonfun$outputAttributes$1(OperationNodeBuilder.scala:71)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at za.co.absa.spline.harvester.builder.OperationNodeBuilder.outputAttributes(OperationNodeBuilder.scala:71)
	at za.co.absa.spline.harvester.builder.OperationNodeBuilder.outputAttributes$(OperationNodeBuilder.scala:70)
	at za.co.absa.spline.harvester.builder.GenericNodeBuilder.outputAttributes$lzycompute(GenericNodeBuilder.scala:26)
	at za.co.absa.spline.harvester.builder.GenericNodeBuilder.outputAttributes(GenericNodeBuilder.scala:26)
	at za.co.absa.spline.harvester.builder.GenericNodeBuilder.build(GenericNodeBuilder.scala:41)
	at za.co.absa.spline.harvester.builder.GenericNodeBuilder.build(GenericNodeBuilder.scala:26)
	at za.co.absa.spline.harvester.LineageHarvester.$anonfun$harvest$10(LineageHarvester.scala:88)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at za.co.absa.spline.harvester.LineageHarvester.$anonfun$harvest$8(LineageHarvester.scala:88)
	at scala.Option.flatMap(Option.scala:271)
	at za.co.absa.spline.harvester.LineageHarvester.harvest(LineageHarvester.scala:81)
	at za.co.absa.spline.harvester.QueryExecutionEventHandler.onSuccess(QueryExecutionEventHandler.scala:42)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$2(SplineQueryExecutionListener.scala:40)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$2$adapted(SplineQueryExecutionListener.scala:40)
	at scala.Option.foreach(Option.scala:407)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$1(SplineQueryExecutionListener.scala:40)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.withErrorHandling(SplineQueryExecutionListener.scala:49)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.onSuccess(SplineQueryExecutionListener.scala:40)
	at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:155)
	at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:131)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:119)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:103)
	at org.apache.spark.sql.util.ExecutionListenerBus.postToAll(QueryExecutionListener.scala:131)
	at org.apache.spark.sql.util.ExecutionListenerBus.onOtherEvent(QueryExecutionListener.scala:135)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:102)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:119)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:103)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1585)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)

wajda commented Aug 16, 2021

Thanks. Hm, I see that the fix doesn't actually address this error for some reason. I guess Adam faced another issue while trying to reproduce it and fixed that one, but not the original one. I can take a look at it. It would be great if you could assist me with testing, as the Databricks Community Cloud that I'm using seems to have been down for the last couple of days: creating a cluster takes an hour and then fails.

wajda commented Aug 16, 2021

@GolamRashed

It seemed to be working; there were no errors and the related lineage was captured in the Spline UI. Does this mean the bug was in the Spline agent, not in Databricks? If yes, then I need to close the ticket with Azure/Databricks. I am more than happy to test Spline in Databricks for you guys!

wajda commented Aug 18, 2021

Thanks Rashed!
There was no bug per se. It's a mismatch between open Spark and the Databricks code. I have added an additional check that filters out expressions that don't extend the NamedExpression trait. It eliminates the error and works as a workaround. But since I can't see inside their code, I cannot be sure that this doesn't break the correctness of the captured lineage. The best way to fix it would be either for us to add proper support for the Databricks Spark variant (for which we need their source code or a Databricks contribution), or for Databricks to use open-source-compatible abstractions that extend open Spark without introducing breaking changes for third-party libraries.
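
In spirit, the workaround replaces an unconditional cast with a type-safe filter. A sketch of the idea (illustrative, not the actual patch):

import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}

// Instead of exprs.map(_.asInstanceOf[NamedExpression]), which throws the
// ClassCastException seen above when Databricks puts plain Expressions into
// the list, keep only the elements that really are NamedExpressions.
def namedOnly(exprs: Seq[Expression]): Seq[NamedExpression] =
  exprs.collect { case ne: NamedExpression => ne }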

@GolamRashed

Can you please describe the issue in more detail so that I can forward it to Databricks? I didn't quite get it from your explanation above.

wajda commented Aug 18, 2021

This is the definition of the Window operation in open Spark - https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala#L982
In particular, we are interested in windowExpressions, which is defined as a sequence of NamedExpression:

case class Window(
    windowExpressions: Seq[NamedExpression], // <-- This one is different in the Databricks runtime
    partitionSpec: Seq[Expression],
    orderSpec: Seq[SortOrder],
    child: LogicalPlan) extends UnaryNode

But as you can see from the debug info here - #262 (comment) - the Databricks runtime provides an incompatible structure for the Window class. Instead of windowExpressions there is another property, projectList, which apparently contains other Expressions in addition to NamedExpressions.

Without access to the source code it's difficult to reason about the semantics of those expressions. Likewise, there is no guarantee that this won't change silently in another Databricks version.
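
A library compiled against open Spark can still probe such a runtime-only property defensively via reflection. A hedged sketch: the field name projectList is taken from the dump above, everything else is illustrative:

import scala.util.Try
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical: fetch `projectList` if the runtime Window class exposes it;
// returns None on a stock Spark build where the method doesn't exist.
def projectListOf(node: LogicalPlan): Option[Seq[Expression]] =
  Try(node.getClass.getMethod("projectList").invoke(node))
    .map(_.asInstanceOf[Seq[Expression]])
    .toOption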

@GolamRashed

Thank you @wajda , I will forward this to Databricks.

Just a request regarding the Spline UI. Is it possible to add an option to have each lineage capture event be named after the table/file that is written? Right now, it just says 'Databricks Shell'.

@mfrictionless

Update on my side. With the SNAPSHOT bundle provided above, I now see plans with windowing functions captured successfully! It sounds like the above is still an issue due to the closed-source nature of Databricks' implementation of the windowing function.

I'll celebrate success for the moment as this clears a significant hurdle for broad use.

[Screenshot: Screen Shot 2021-08-18 at 11.43.24 AM]

Thank you @wajda and team!

wajda commented Aug 19, 2021

Thank you all, I'm closing the ticket as resolved then.

Just a request regarding the Spline UI. Is it possible to add an option to have each lineage capture event be named after the table/file that is written? Right now, it just says 'Databricks Shell'.

Not sure I get it. It also shows the target destination, both a short name and the full path.
Anyway, that's an unrelated topic. Please create another ticket for it (for UI issues there is the spline-ui repo).

@wajda wajda closed this as completed Aug 19, 2021
@GolamRashed

@wajda The Databricks team replied through Azure support that they have opened an internal ticket for their engineering team to make the Spline library work with DBR. I hope this is good news for the Spline dev team, and I will keep you posted with updates from Databricks.

@wajda wajda added this to Spline Mar 31, 2022
@wajda wajda moved this to New in Spline Mar 31, 2022
@wajda wajda moved this from New to Closed in Spline Apr 2, 2022
wajda commented Apr 23, 2023

@harishyadavdevops you are referring to an old development snapshot of the agent release 0.6.2. Can't you use a released version?

wajda commented Apr 24, 2023

@harishyadavdevops your question doesn't have anything to do with the current ticket. Moreover, this ticket is closed, so your messages might simply be ignored by people. Please create separate tickets for separate issues. This time I'll create it for you and move your messages there.

harishyadavdevops commented Apr 24, 2023 via email

wajda commented Apr 24, 2023

answered in #665

@AbsaOSS AbsaOSS locked as off-topic and limited conversation to collaborators Apr 24, 2023