Skip to content

The goal of this library is to provide a compatibility layer that makes it easier to adopt Spark Connect. The library is designed to be simply imported in your application and will then monkey-patch the existing API to provide the legacy functionality.

License

Notifications You must be signed in to change notification settings

databricks/congruity

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

66 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

congruity

GitHub Actions Build PyPI Downloads

In many ways, the migration from using classic Spark applications using the full power and flexibility to be using only the Spark Connect compatible DataFrame API can be challenging.

The goal of this library is to provide a compatibility layer that makes it easier to adopt Spark Connect. The library is designed to be simply imported in your application and will then monkey-patch the existing API to provide the legacy functionality.

Non-Goals

This library is not intended to be a long-term solution. The goal is to provide a compatibility layer that becomes obsolete over time. In addition, we do not aim to provide compatibility for all methods and features but only a select subset. Lastly, we do not aim to achieve the same performance as using some of the native RDD APIs.

Usage

Spark JVM & Spark Connect compatibility library.

pip install spark-congruity
import congruity.patch

Example

Here is code that works on Spark JVM:

from pyspark.sql import SparkSession

spark = SparkSession.builder.remote("sc://localhost").getOrCreate()
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
spark.sparkContext.parallelize(data).toDF()

This code doesn't work with Spark Connect. The congruity library rearranges the code under the hood, so the old syntax works on Spark Connect clusters as well:

import congruity.patch  # noqa: F401
from pyspark.sql import SparkSession

spark = SparkSession.builder.remote("sc://localhost").getOrCreate()
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
spark.sparkContext.parallelize(data).toDF()

Contributing

We very much welcome contributions to this project. The easiest way to start is to pick any of the below RDD or SparkContext methods and implement the compatibility layer. Once you have done that open a pull request and we will review it.

What's supported?

RDD

RDD API Comment
aggregate
aggregateByKey
barrier
cache
cartesian
checkpoint
cleanShuffleDependencies
coalesce
cogroup
collect
collectAsMap
collectWithJobGroup
combineByKey
count
countApprox
countByKey
countByValue
distinct
filter
first
flatMap
fold First version
foreach
foreachPartition
fullOuterJoin
getCheckpointFile
getNumPartitions
getResourceProfile
getStorageLevel
glom
groupBy
groupByKey
groupWith
histogram
id
intersection
isCheckpointed
isEmpty
isLocallyCheckpointed
join
keyBy
keys
leftOuterJoin
localCheckpoint
lookup
map
mapPartitions First version, based on mapInArrow.
mapPartitionsWithIndex
mapPartitionsWithSplit
mapValues
max
mean
meanApprox
min
name
partitionBy
persist
pipe
randomSplit
reduce
reduceByKey
repartition
repartitionAndSortWithinPartition
rightOuterJoin
sample
sampleByKey
sampleStdev
sampleVariance
saveAsHadoopDataset
saveAsHadoopFile
saveAsNewAPIHadoopDataset
saveAsNewAPIHadoopFile
saveAsPickleFile
saveAsTextFile
setName
sortBy
sortByKey
stats
stdev
subtract
substractByKey
sum First version.
sumApprox
take Ordering might not be guaranteed in the same way as it is in RDD.
takeOrdered
takeSample
toDF
toDebugString
toLocalIterator
top
treeAggregate
treeReduce
union
unpersist
values
variance
withResources
zip
zipWithIndex
zipWithUniqueId

SparkContext

RDD API Comment
parallelize Does not support numSlices yet.

Limitations

  • Error handling and checking is kind of limited right now. We try to emulate the existing behavior, but this is not always possible because the invariants are not encode in Python but rather somewhere in Scala.
  • numSlices - we don't emulate this behavior for now.

About

The goal of this library is to provide a compatibility layer that makes it easier to adopt Spark Connect. The library is designed to be simply imported in your application and will then monkey-patch the existing API to provide the legacy functionality.

Resources

License

Stars

Watchers

Forks

Packages

No packages published