[BUG][Spark] Unable to run Spark Connect 3.5.1 with Delta 3.2.0 #3332

Open
matt-gorman opened this issue Jul 2, 2024 · 8 comments
Labels: bug Something isn't working

matt-gorman commented Jul 2, 2024

Bug

Which Delta project/connector is this regarding?

  • Spark
  • Standalone

Describe the problem

Reading and writing Delta from a client connected to a Spark Connect server (started via start-connect-server.sh with the Delta packages) fails with class conversion errors or reports a missing storage class. I was unable to find documentation on running Delta 3.2.0 on Spark 3.5.1 and whether any additional packages or configuration are needed. The same packages work from a PySpark shell, but with Spark Connect they give different results.

Steps to reproduce

  1. Install Spark 3.5.1 in Standalone Mode
  2. Start Spark Connect using included startup scripts, specifically sbin/start-connect-server.sh
  3. Connect via a client and attempt to read/write Delta (a minimal client-side sketch is shown below)
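
A minimal client-side sketch of step 3, assuming the Connect server from step 2 is reachable at sc://master-node (the host and /tmp/delta-table path are illustrative):

from pyspark.sql import SparkSession

# Connect to the Spark Connect server started in step 2.
spark = SparkSession.builder.remote("sc://master-node").getOrCreate()

# Round-trip a small Delta table to reproduce the write/read failures.
spark.range(0, 5).write.format("delta").mode("overwrite").save("/tmp/delta-table")
spark.read.format("delta").load("/tmp/delta-table").show()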

Observed results

PySpark (Works)

Using the Delta documentation, this DOES work using a PySpark shell:

# ~: pyspark --packages io.delta:delta-spark_2.12:3.2.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

Python 3.10.12 (main, Nov 20 2023, 15:14:05) [GCC 11.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
:: loading settings :: url = jar:file:/opt/spark-3.5.1-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/hdfs/.ivy2/cache
The jars for the packages stored in: /home/hdfs/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-bf5546b9-5a4c-449b-b59d-89def6e14dcd;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.2.0 in central
	found io.delta#delta-storage;3.2.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 137ms :: artifacts dl 5ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.2.0 from central in [default]
	io.delta#delta-storage;3.2.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |   0   ||   3   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-bf5546b9-5a4c-449b-b59d-89def6e14dcd
	confs: [default]
	0 artifacts copied, 3 already retrieved (0kB/5ms)
24/07/02 14:19:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/02 14:19:49 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.1
      /_/

Using Python version 3.10.12 (main, Nov 20 2023 15:14:05)
Spark context Web UI available at http://master-node:4041
Spark context available as 'sc' (master = spark://master-node:7077, app id = app-20240702141949-0003).
SparkSession available as 'spark'.
>>> data = spark.range(0, 5)
>>> data.write.format("delta").save("/tmp/delta-table")
>>> test = spark.read.format("delta").load("/country_codes.delta")
>>> test.show()
+---------------+----------------+--------------------+--------+
|COUNTRY_DIGRAPH|COUNTRY_TRIGRAPH|                NAME|  REGION|
+---------------+----------------+--------------------+--------+
|             SH|             SHN|Saint Helena, Asc...|  Africa|
|             KN|             KNA|Saint Kitts and N...|Americas|
|             LC|             LCA|         Saint Lucia|Americas|
|             MF|             MAF|Saint Martin (Fre...|Americas|
|             PM|             SPM|Saint Pierre and ...|Americas|
|             VC|             VCT|Saint Vincent and...|Americas|
|             WS|             WSM|               Samoa| Oceania|
|             SM|             SMR|          San Marino|  Europe|
|             ST|             STP|Sao Tome and Prin...|  Africa|
|             SA|             SAU|        Saudi Arabia|    Asia|
|             SN|             SEN|             Senegal|  Africa|
|             RS|             SRB|              Serbia|  Europe|
|             SC|             SYC|          Seychelles|  Africa|
|             SL|             SLE|        Sierra Leone|  Africa|
|             SG|             SGP|           Singapore|    Asia|
|             SX|             SXM|Sint Maarten (Dut...|Americas|
|             SK|             SVK|            Slovakia|  Europe|
|             SI|             SVN|            Slovenia|  Europe|
|             SB|             SLB|     Solomon Islands| Oceania|
|             SO|             SOM|             Somalia|  Africa|
+---------------+----------------+--------------------+--------+
only showing top 20 rows

This successfully reads from and writes to the Hadoop cluster in Delta format.

Spark Connect (Doesn't Work)

Running Spark Connect with the same options does not have the same effect:

./start-connect-server.sh --master spark://master-node:7077 --packages org.apache.spark:spark-connect_2.12:3.5.1,io.delta:delta-spark_2.12:3.2.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

Connect to Spark Connect according to the documentation:

# ~: pyspark --remote "sc://master-node"
Python 3.10.12 (main, Nov 20 2023, 15:14:05) [GCC 11.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.1
      /_/

Using Python version 3.10.12 (main, Nov 20 2023 15:14:05)
Client connected to the Spark Connect server at master-node
SparkSession available as 'spark'.
>>> 
>>> data = spark.range(0, 5)
>>> data.write.format("delta").save("/tmp/delta-table")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/spark-3.5.1-bin-hadoop3/python/pyspark/sql/connect/readwriter.py", line 601, in save
    self._spark.client.execute_command(self._write.command(self._spark.client))
  File "/opt/spark-3.5.1-bin-hadoop3/python/pyspark/sql/connect/client/core.py", line 982, in execute_command
    data, _, _, _, properties = self._execute_and_fetch(req)
  File "/opt/spark-3.5.1-bin-hadoop3/python/pyspark/sql/connect/client/core.py", line 1283, in _execute_and_fetch
    for response in self._execute_and_fetch_as_iterator(req):
  File "/opt/spark-3.5.1-bin-hadoop3/python/pyspark/sql/connect/client/core.py", line 1264, in _execute_and_fetch_as_iterator
    self._handle_error(error)
  File "/opt/spark-3.5.1-bin-hadoop3/python/pyspark/sql/connect/client/core.py", line 1503, in _handle_error
    self._handle_rpc_error(error)
  File "/opt/spark-3.5.1-bin-hadoop3/python/pyspark/sql/connect/client/core.py", line 1539, in _handle_rpc_error
    raise convert_exception(info, status.message) from None
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to stage failure: Task 2 in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1.0 (TID 13) (X.X.X.X executor 1): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.sql.execution.datasources.WriteJobDescription.statsTrackers of type scala.collection.Seq in instance of org.apache.spark.sql.execution.datasources.WriteJobDescription
	at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076)
	at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039)
	at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293)
	at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2134)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1675)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
	at java.base/java.io...
>>> 
>>> test = spark.read.format("delta").load("/country_codes.delta")
>>> test.show()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/spark-3.5.1-bin-hadoop3/python/pyspark/sql/connect/dataframe.py", line 996, in show
    print(self._show_string(n, truncate, vertical))
  File "/opt/spark-3.5.1-bin-hadoop3/python/pyspark/sql/connect/dataframe.py", line 753, in _show_string
    ).toPandas()
  File "/opt/spark-3.5.1-bin-hadoop3/python/pyspark/sql/connect/dataframe.py", line 1663, in toPandas
    return self._session.client.to_pandas(query)
  File "/opt/spark-3.5.1-bin-hadoop3/python/pyspark/sql/connect/client/core.py", line 873, in to_pandas
    table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(
  File "/opt/spark-3.5.1-bin-hadoop3/python/pyspark/sql/connect/client/core.py", line 1283, in _execute_and_fetch
    for response in self._execute_and_fetch_as_iterator(req):
  File "/opt/spark-3.5.1-bin-hadoop3/python/pyspark/sql/connect/client/core.py", line 1264, in _execute_and_fetch_as_iterator
    self._handle_error(error)
  File "/opt/spark-3.5.1-bin-hadoop3/python/pyspark/sql/connect/client/core.py", line 1503, in _handle_error
    self._handle_rpc_error(error)
  File "/opt/spark-3.5.1-bin-hadoop3/python/pyspark/sql/connect/client/core.py", line 1539, in _handle_rpc_error
    raise convert_exception(info, status.message) from None
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to stage failure: Task 0 in stage 4.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4.0 (TID 26) (X.X.X.X executor 1): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF
	at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076)
	at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039)
	at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293)
	at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2134)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1675)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2134)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1675...

Comparing environment settings, the only differences were the additional JARs on Spark Connect (which was expected):

org.apache.spark_spark-connect_2.12-3.5.1.jar
org.spark-project.spark_unused-1.0.0.jar

And a few Spark configurations that were set in the PySpark app but not in Spark Connect:

spark.rdd.compress=True
spark.serializer.objectStreamReset=100
spark.sql.catalogImplementation=hive
spark.ui.showConsoleProgress=true

I re-ran Spark Connect with these settings added, but the result was the same:

./start-connect-server.sh --master spark://master-node:7077 --packages org.apache.spark:spark-connect_2.12:3.5.1,io.delta:delta-spark_2.12:3.2.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" --conf "spark.rdd.compress=True" --conf "spark.serializer.objectStreamReset=100" --conf "spark.sql.catalogImplementation=hive" --conf "spark.ui.showConsoleProgress=true"

Expected results

I expected Spark Connect to behave the same as a PySpark session on the master node.

Further details

Behavior is the same when connecting from a JupyterHub server:

from pyspark.sql import SparkSession

# -- Spark context
spark = (
    SparkSession.builder
        .remote("sc://master-node")
        .getOrCreate()
)

Running the same commands through Spark Connect produces the same errors. The goal is to work from JupyterHub through a Spark Connect remote client. Previously, the following configuration with Spark Connect worked without any of the issues above on Spark 3.4.0 and Delta (delta-core) 2.4.0.

./start-connect-server.sh --master spark://master-node:7077 --packages org.apache.spark:spark-connect_2.12:3.4.0,io.delta:delta-core_2.12:2.4.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

Spark/Delta 4.0.0 preview

I additionally set this up using the documentation here. Spark 4.0.0 also uses delta-spark, and the result was the same error messages as above.

Spark 3.4.3/Delta 2.4.0

These versions are close to what was originally being run without issues (3.4.0/2.4.0), and setting this up was successful. This combination uses delta-core instead of delta-spark.

Environment information

  • Delta Lake version: 3.2.0
  • Spark version: 3.5.1
  • Scala version: 2.12
  • Java: 11
  • OS: Ubuntu Server 22.04.1

Connect Python Deps:

Package                      Version         
---------------------------- ----------------
...
googleapis-common-protos     1.63.2
grpcio                       1.64.1
grpcio-status                1.64.1
pandas                       2.0.3
protobuf                     5.27.2
py4j                         0.10.9.7
pyarrow                      12.0.1
pyspark                      3.5.1
...

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • No. I cannot contribute a bug fix at this time.
matt-gorman added the bug label Jul 2, 2024
rhazegh commented Jul 15, 2024

I am having the same issue. Using:

  • spark 3.5.1
  • Scala 2.12.18
  • OpenJDK 64-Bit Server VM, Java 17.0.11

This does NOT work using delta 3.2.0 (I have also tried 3.1.0):

from pyspark.sql import SparkSession

url = "spark://${SPARK_MASTER_IP}:7077"

spark = (
    SparkSession.builder.master(url)
    .appName("myapp")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:3.2.0,io.delta:delta-contribs_2.12:3.2.0")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

Here is the error log:

File /usr/local/spark/python/pyspark/sql/session.py:497, in SparkSession.Builder.getOrCreate(self)
    495     sparkConf.set(key, value)
    496 # This SparkContext may be an existing one.
--> 497 sc = SparkContext.getOrCreate(sparkConf)
    498 # Do not update `SparkConf` for existing `SparkContext`, as it's shared
    499 # by all sessions.
    500 session = SparkSession(sc, options=self._options)

File /usr/local/spark/python/pyspark/context.py:515, in SparkContext.getOrCreate(cls, conf)
    513 with SparkContext._lock:
    514     if SparkContext._active_spark_context is None:
--> 515         SparkContext(conf=conf or SparkConf())
    516     assert SparkContext._active_spark_context is not None
    517     return SparkContext._active_spark_context

File /usr/local/spark/python/pyspark/context.py:201, in SparkContext.__init__(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, gateway, jsc, profiler_cls, udf_profiler_cls, memory_profiler_cls)
    195 if gateway is not None and gateway.gateway_parameters.auth_token is None:
    196     raise ValueError(
    197         "You are trying to pass an insecure Py4j gateway to Spark. This"
    198         " is not allowed as it is a security risk."
    199     )
--> 201 SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
    202 try:
    203     self._do_init(
    204         master,
    205         appName,
   (...)
    215         memory_profiler_cls,
    216     )

File /usr/local/spark/python/pyspark/context.py:436, in SparkContext._ensure_initialized(cls, instance, gateway, conf)
    434 with SparkContext._lock:
    435     if not SparkContext._gateway:
--> 436         SparkContext._gateway = gateway or launch_gateway(conf)
    437         SparkContext._jvm = SparkContext._gateway.jvm
    439     if instance:

File /usr/local/spark/python/pyspark/java_gateway.py:107, in launch_gateway(conf, popen_kwargs)
    104     time.sleep(0.1)
    106 if not os.path.isfile(conn_info_file):
--> 107     raise PySparkRuntimeError(
    108         error_class="JAVA_GATEWAY_EXITED",
    109         message_parameters={},
    110     )
    112 with open(conn_info_file, "rb") as info:
    113     gateway_port = read_int(info)

PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

But this works (using delta 2.4.0):

from pyspark.sql import SparkSession

url = "spark://${SPARK_MASTER_IP}:7077"

spark = (
    SparkSession.builder.master(url)
    .appName("myapp")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0,io.delta:delta-contribs_2.12:2.4.0")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

I am not sure if I can use delta 2.4.0 with Spark 3.5.1 though. Can I?

MrPowers (Collaborator) commented:

Thanks for reporting this. Spark Connect support will be added in Delta 4, see this issue.

Feel free to chime in on #3240 if you have any suggestions!

longvu-db (Contributor) commented:

Hey @matt-gorman, you said that you followed the steps at https://docs.delta.io/latest/delta-spark-connect.html and got an error? Could you please confirm what error you are seeing?

@rhazegh Delta over Spark Connect will only be fully available in Delta 4.0; however, you can already try the Delta 4.0 Preview. Could you give that a try?

matt-gorman (Author) commented:

@longvu-db, using Delta Connect on the Spark 4.0 preview with the Delta 4.0 Preview gave the same experience and errors as running Delta 3.2.0 on Spark 3.5.1.

Attempting to write a DataFrame:

pyspark.errors.exceptions.connect.SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to stage failure: Task 2 in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1.0 (TID 13) (X.X.X.X executor 1): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.sql.execution.datasources.WriteJobDescription.statsTrackers of type scala.collection.Seq in instance of org.apache.spark.sql.execution.datasources.WriteJobDescription

Attempting to read a DataFrame from HDFS:

pyspark.errors.exceptions.connect.SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to stage failure: Task 0 in stage 4.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4.0 (TID 26) (X.X.X.X executor 1): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF

The tracebacks similarly pointed back to core.py, and this appeared to be a missing package or something similar when running delta-core with the new versions instead of delta-spark with the previous versions.

longvu-db (Contributor) commented Jul 31, 2024

@matt-gorman Why are you using delta-core? AFAIK, this only works with delta-spark.

Are you able to start two processes, a server and a client, on the same local machine and have them work with each other?
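
For reference, a minimal local smoke test might look like the following, assuming start-connect-server.sh has been launched on the same machine with the Delta packages discussed above and is listening on the default port 15002 (the table path is illustrative):

from pyspark.sql import SparkSession

# Connect to a Spark Connect server running on this machine (default port 15002).
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# Round-trip a tiny Delta table to verify the server-side Delta setup.
spark.range(5).write.format("delta").mode("overwrite").save("/tmp/local-delta-test")
spark.read.format("delta").load("/tmp/local-delta-test").show()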

matt-gorman (Author) commented:

@longvu-db sorry, I mixed these up: delta-core is what we're using now with Spark 3.4.3, and running Connect with delta-spark is where we're running into issues. We are using these packages:

org.apache.spark:spark-connect_2.12:3.4.3
io.delta:delta-core_2.12:2.4.0

Connect starts and works fine with this command:

start-connect-server.sh --master spark://master-node:7077 --packages org.apache.spark:spark-connect_2.12:3.4.3,io.delta:delta-core_2.12:2.4.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

When trying to run Spark 3.5.1, delta-spark was the package, and Connect was started with these package versions:

org.apache.spark:spark-connect_2.12:3.5.1
io.delta:delta-spark_2.12:3.2.0
io.delta:delta-storage:3.2.0

Starting Connect with this command gives the errors above:

start-connect-server.sh --master spark://master-node:7077 --packages org.apache.spark:spark-connect_2.12:3.5.1,io.delta:delta-spark_2.12:3.2.0,io.delta:delta-storage:3.2.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

I can run PySpark locally on the master with those same packages without problems:

pyspark --packages io.delta:delta-spark_2.12:3.2.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

However, if I run a PySpark client from the master against a Connect server started with those packages (the second start-connect-server.sh command above, with delta-spark_2.12:3.2.0), I get the errors above.

longvu-db (Contributor) commented:

@matt-gorman I understand the problem now: some fixes needed to go into Spark for Delta over Spark Connect, and those fixes landed after 3.5.1. Could you please use the Spark Connect 4.0.0 preview1 package as in the guide here?

longvu-db (Contributor) commented:

So for Delta over Spark Connect to work, you need to use the Spark Connect 4.0 preview version as well.
