Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename table operator #5

Merged
merged 1 commit into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ Run `sbt package`, then add

### Schema diff

`schema_diff` is a Spark SQL table-valued function. The expression
`refs_data_diff` is a Spark SQL table-valued function. The expression

```sql
schema_diff(PREFIX, FROM_SCHEMA, TO_SCHEMA, TABLE)
refs_data_diff(PREFIX, FROM_SCHEMA, TO_SCHEMA, TABLE)
```

yields a relation that compares the "from" table `PREFIX.FROM_SCHEMA.TABLE`
Expand All @@ -46,7 +46,7 @@ with the "to" table `PREFIX.TO_SCHEMA.TABLE`. Elements of "to" but not
For instance,

```sql
SELECT lakefs_change, Player, COUNT(*) FROM schema_diff('lakefs', 'main~', 'main', 'db.allstar_games')
SELECT lakefs_change, Player, COUNT(*) FROM refs_data_diff('lakefs', 'main~', 'main', 'db.allstar_games')
GROUP BY lakefs_change, Player;
```

Expand All @@ -58,5 +58,5 @@ you can set up a view with it:

```sql
CREATE TEMPORARY VIEW diff_allstar_games_main_last_commit AS
schema_diff('lakefs', 'main~', 'main', 'db.allstar_games');
refs_data_diff('lakefs', 'main~', 'main', 'db.allstar_games');
```
12 changes: 6 additions & 6 deletions src/main/scala/io/lakefs/iceberg/extension/Extension.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.apache.spark.sql.catalyst.expressions.StringLiteral

// A table-valued function to compute the difference between the same table
// at two schemas.
object SchemaDiff {
object TableDataDiff {
private def computeString(e: Expression): String = {
val literalValue = StringLiteral.unapply(e)
literalValue match {
Expand Down Expand Up @@ -53,15 +53,15 @@ object SchemaDiff {
spark.sql(sqlString).queryExecution.logical
}

val function = (FunctionIdentifier("schema_diff"),
new ExpressionInfo("io.lakefs.iceberg.extension.SchemaDiff$",
"", "schema_diff", "schema_diff('TABLE_PREFIX', 'FROM_SCHEMA', 'TO_SCHEMA', 'TABLE_SUFFIX')",
"schema_diff('TABLE_PREFIX', 'FROM_SCHEMA', 'TO_SCHEMA', 'TABLE_SUFFIX')"),
val function = (FunctionIdentifier("refs_data_diff"),
new ExpressionInfo("io.lakefs.iceberg.extension.TableDataDiff$",
"", "refs_data_diff", "refs_data_diff('TABLE_PREFIX', 'FROM_SCHEMA', 'TO_SCHEMA', 'TABLE_SUFFIX')",
"refs_data_diff('TABLE_PREFIX', 'FROM_SCHEMA', 'TO_SCHEMA', 'TABLE_SUFFIX')"),
tdfBuilder _)
}

class LakeFSSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
override def apply(extensions: SparkSessionExtensions): Unit = {
extensions.injectTableFunction(SchemaDiff.function)
extensions.injectTableFunction(TableDataDiff.function)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class ExtensionSpec extends AnyFunSpec
val df2 = Seq(("a", 1), ("xyzzy", 2), ("c", 3), ("d", 4)).toDF
df2.writeTo("spark_catalog.second.table").create()

val diff = spark.sql("SELECT * FROM schema_diff('spark_catalog', 'first', 'second', 'table')")
val diff = spark.sql("SELECT * FROM refs_data_diff('spark_catalog', 'first', 'second', 'table')")
.collect()
.toSet
diff should equal(Set(Row("-", "b", 2), Row("+", "xyzzy", 2), Row("+", "d", 4)))
Expand Down
Loading