Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add batch implementations for CDM (Synapse Link) #119

Merged
merged 2 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.sneaksanddata.arcane.framework
package services.consumers

import models.ArcaneSchema
import models.querygen.{MergeQuery, MergeQueryCommons, OnSegment, OverwriteQuery, WhenMatchedDelete, WhenMatchedUpdate, WhenNotMatchedInsert}

object MatchedAppendOnlyDelete:
  /**
   * Builds the `WHEN MATCHED ... THEN DELETE` clause of the merge: a target row is
   * removed only when the matching source row is flagged as a deletion (`IsDelete = true`).
   */
  def apply(): WhenMatchedDelete = new WhenMatchedDelete:
    override val segmentCondition: Option[String] =
      Some(s"${MergeQueryCommons.SOURCE_ALIAS}.IsDelete = true")

object MatchedAppendOnlyUpdate:
  /**
   * Builds the `WHEN MATCHED ... THEN UPDATE` clause of the merge: the listed columns
   * are overwritten only when the source row is not a deletion and carries a strictly
   * newer `versionnumber` than the row already in the target.
   *
   * @param cols columns to update on the matched target row
   */
  def apply(cols: Seq[String]): WhenMatchedUpdate = new WhenMatchedUpdate:
    override val columns: Seq[String] = cols
    override val segmentCondition: Option[String] =
      Some(s"${MergeQueryCommons.SOURCE_ALIAS}.IsDelete = false AND ${MergeQueryCommons.SOURCE_ALIAS}.versionnumber > ${MergeQueryCommons.TARGET_ALIAS}.versionnumber")

object NotMatchedAppendOnlyInsert:
  /**
   * Builds the `WHEN NOT MATCHED ... THEN INSERT` clause of the merge: unseen rows are
   * inserted unless the source row is itself a deletion (`IsDelete = true`), in which
   * case there is nothing to materialize in the target.
   *
   * @param cols columns to insert for the new target row
   */
  def apply(cols: Seq[String]): WhenNotMatchedInsert = new WhenNotMatchedInsert:
    override val segmentCondition: Option[String] =
      Some(s"${MergeQueryCommons.SOURCE_ALIAS}.IsDelete = false")
    override val columns: Seq[String] = cols

object SynapseLinkMergeQuery:
  /**
   * Assembles the full MERGE statement for a Synapse Link (CDM) batch: join on the merge
   * key (optionally constrained by partition values), delete matched rows flagged as
   * deleted, update matched rows with a newer version, and insert unseen non-deleted rows.
   *
   * @param targetName      fully qualified target table name
   * @param sourceQuery     deduplicated source (reduce) expression
   * @param partitionValues partition column values used to narrow the ON condition
   * @param mergeKey        column used to match source rows against target rows
   * @param columns         all batch columns; the merge key is excluded from the UPDATE set
   */
  def apply(targetName: String, sourceQuery: String, partitionValues: Map[String, List[String]], mergeKey: String, columns: Seq[String]): MergeQuery =
    // The merge key never changes for a matched row, so it is not part of the UPDATE set.
    val updateColumns = columns.filterNot(_ == mergeKey)
    MergeQuery(targetName, sourceQuery)
      ++ OnSegment(partitionValues, mergeKey)
      ++ MatchedAppendOnlyDelete()
      ++ MatchedAppendOnlyUpdate(updateColumns)
      ++ NotMatchedAppendOnlyInsert(columns)

object SynapseLinkBackfillQuery:
  /**
   * Wraps a reduce expression into an overwrite (full-refresh) query against the target.
   *
   * @param targetName  fully qualified target table name
   * @param sourceQuery deduplicated source expression providing the replacement rows
   */
  def apply(targetName: String, sourceQuery: String): OverwriteQuery =
    OverwriteQuery(sourceQuery, targetName)

/**
 * A backfill (full refresh) batch for a Synapse Link (CDM) source.
 *
 * @param batchName        name of the staged table holding the batch data
 * @param batchSchema      schema of the staged batch
 * @param targetName       fully qualified target table name
 * @param archiveTableName optional archive table name; defaults to
 *                         `<targetName>_stream_archive` (review note: the archive
 *                         table name should be a parameter — now configurable)
 */
class SynapseLinkBackfillBatch(batchName: String, batchSchema: ArcaneSchema, targetName: String, archiveTableName: Option[String] = None) extends StagedBackfillBatch:
  override val name: String = batchName
  override val schema: ArcaneSchema = batchSchema

  // Resolved once; falls back to the historical hard-coded suffix for backward compatibility.
  private val archiveTable: String = archiveTableName.getOrElse(s"${targetName}_stream_archive")

  override def reduceExpr: String =
    // Important to note that the append-only nature of the source must be taken into account:
    // after selecting the latest version per `Id`, we need to identify which of those latest
    // versions are deletions — for a backfill, deleted rows must be excluded entirely.
    s"""SELECT * FROM (
    | SELECT * FROM $name ORDER BY ROW_NUMBER() OVER (PARTITION BY Id ORDER BY versionnumber DESC) FETCH FIRST 1 ROWS WITH TIES
    |) WHERE IsDelete = false""".stripMargin

  /** Overwrite query that fully replaces the target with the reduced batch contents. */
  override val batchQuery: OverwriteQuery = SynapseLinkBackfillQuery(targetName, reduceExpr)

  /** Overwrites the archive table with the reduced batch (mirrors the backfill semantics). */
  def archiveExpr: String = s"INSERT OVERWRITE $archiveTable $reduceExpr"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The archive table name should be a parameter

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will send a PR for this


object SynapseLinkBackfillBatch:
  /**
   * Creates a backfill batch for a Synapse Link (CDM) source.
   *
   * @param batchName   name of the staged table holding the batch data
   * @param batchSchema schema of the staged batch
   * @param targetName  fully qualified target table name
   * @return a [[StagedBackfillBatch]] that overwrites the target with the reduced batch
   */
  def apply(batchName: String, batchSchema: ArcaneSchema, targetName: String): StagedBackfillBatch =
    new SynapseLinkBackfillBatch(batchName, batchSchema, targetName)

/**
 * A versioned (incremental) batch for a Synapse Link (CDM) source, applied via MERGE.
 *
 * @param batchName        name of the staged table holding the batch data
 * @param batchSchema      schema of the staged batch
 * @param targetName       fully qualified target table name
 * @param partitionValues  partition column values used to narrow the merge ON condition
 * @param mergeKey         column used to match source rows against target rows
 * @param archiveTableName optional archive table name; defaults to
 *                         `<targetName>_stream_archive` (review note: the archive
 *                         table name should be a parameter — now configurable)
 */
class SynapseLinkMergeBatch(batchName: String, batchSchema: ArcaneSchema, targetName: String, partitionValues: Map[String, List[String]], mergeKey: String, archiveTableName: Option[String] = None) extends StagedVersionedBatch:
  override val name: String = batchName
  override val schema: ArcaneSchema = batchSchema

  // Resolved once; falls back to the historical hard-coded suffix for backward compatibility.
  private val archiveTable: String = archiveTableName.getOrElse(s"${targetName}_stream_archive")

  override def reduceExpr: String =
    // For the merge query, we must carry deletions over (no IsDelete filter here), so they
    // can be applied in the MERGE statement by MatchedAppendOnlyDelete.
    s"""SELECT * FROM (
    | SELECT * FROM $name ORDER BY ROW_NUMBER() OVER (PARTITION BY Id ORDER BY versionnumber DESC) FETCH FIRST 1 ROWS WITH TIES
    |)""".stripMargin

  /** Merge query built from the reduced batch, partition filters and schema columns. */
  override val batchQuery: MergeQuery =
    SynapseLinkMergeQuery(targetName = targetName, sourceQuery = reduceExpr, partitionValues = partitionValues, mergeKey = mergeKey, columns = schema.map(f => f.name))

  /** Appends the reduced batch to the archive table (unlike backfill, which overwrites it). */
  def archiveExpr: String = s"INSERT INTO $archiveTable $reduceExpr"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

duplicate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it's archive for a different batch type


object SynapseLinkMergeBatch:
  /**
   * Creates a versioned merge batch for a Synapse Link (CDM) source; the merge key is
   * taken from the schema's merge key field.
   *
   * @param batchName       name of the staged table holding the batch data
   * @param batchSchema     schema of the staged batch (must contain a merge key field)
   * @param targetName      fully qualified target table name
   * @param partitionValues partition column values used to narrow the merge ON condition
   * @return a [[StagedVersionedBatch]] that merges the reduced batch into the target
   */
  def apply(batchName: String, batchSchema: ArcaneSchema, targetName: String, partitionValues: Map[String, List[String]]): StagedVersionedBatch =
    new SynapseLinkMergeBatch(batchName, batchSchema, targetName, partitionValues, batchSchema.mergeKey.name)
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
MERGE INTO test.table_a t_o
USING (SELECT * FROM (
SELECT * FROM test.staged_a ORDER BY ROW_NUMBER() OVER (PARTITION BY Id ORDER BY versionnumber DESC) FETCH FIRST 1 ROWS WITH TIES
)) t_s
ON t_o.ARCANE_MERGE_KEY = t_s.ARCANE_MERGE_KEY
WHEN MATCHED AND t_s.IsDelete = true THEN DELETE
WHEN MATCHED AND t_s.IsDelete = false AND t_s.versionnumber > t_o.versionnumber THEN UPDATE SET
colA = t_s.colA,
colB = t_s.colB,
Id = t_s.Id,
versionnumber = t_s.versionnumber
WHEN NOT MATCHED AND t_s.IsDelete = false THEN INSERT (ARCANE_MERGE_KEY,colA,colB,Id,versionnumber) VALUES (t_s.ARCANE_MERGE_KEY,
t_s.colA,
t_s.colB,
t_s.Id,
t_s.versionnumber)
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
MERGE INTO test.table_a t_o
USING (SELECT * FROM (
SELECT * FROM test.staged_a ORDER BY ROW_NUMBER() OVER (PARTITION BY Id ORDER BY versionnumber DESC) FETCH FIRST 1 ROWS WITH TIES
)) t_s
ON t_o.ARCANE_MERGE_KEY = t_s.ARCANE_MERGE_KEY AND t_o.colA IN ('a','b','c')
WHEN MATCHED AND t_s.IsDelete = true THEN DELETE
WHEN MATCHED AND t_s.IsDelete = false AND t_s.versionnumber > t_o.versionnumber THEN UPDATE SET
colA = t_s.colA,
colB = t_s.colB,
Id = t_s.Id,
versionnumber = t_s.versionnumber
WHEN NOT MATCHED AND t_s.IsDelete = false THEN INSERT (ARCANE_MERGE_KEY,colA,colB,Id,versionnumber) VALUES (t_s.ARCANE_MERGE_KEY,
t_s.colA,
t_s.colB,
t_s.Id,
t_s.versionnumber)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
INSERT OVERWRITE test.table_a
SELECT * FROM (
SELECT * FROM test.staged_a ORDER BY ROW_NUMBER() OVER (PARTITION BY Id ORDER BY versionnumber DESC) FETCH FIRST 1 ROWS WITH TIES
) WHERE IsDelete = false
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
MERGE INTO test.table_a t_o
USING (SELECT * FROM (
SELECT * FROM test.staged_a ORDER BY ROW_NUMBER() OVER (PARTITION BY Id ORDER BY versionnumber DESC) FETCH FIRST 1 ROWS WITH TIES
)) t_s
ON t_o.ARCANE_MERGE_KEY = t_s.ARCANE_MERGE_KEY AND t_o.colA IN ('a','b','c')
WHEN MATCHED AND t_s.IsDelete = true THEN DELETE
WHEN MATCHED AND t_s.IsDelete = false AND t_s.versionnumber > t_o.versionnumber THEN UPDATE SET
colA = t_s.colA,
colB = t_s.colB,
Id = t_s.Id,
versionnumber = t_s.versionnumber
WHEN NOT MATCHED AND t_s.IsDelete = false THEN INSERT (ARCANE_MERGE_KEY,colA,colB,Id,versionnumber) VALUES (t_s.ARCANE_MERGE_KEY,
t_s.colA,
t_s.colB,
t_s.Id,
t_s.versionnumber)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
INSERT OVERWRITE test.table_a
SELECT * FROM (
SELECT * FROM test.staged_a ORDER BY ROW_NUMBER() OVER (PARTITION BY Id ORDER BY versionnumber DESC) FETCH FIRST 1 ROWS WITH TIES
) WHERE IsDelete = false
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package com.sneaksanddata.arcane.framework
package services.consumers

import models.ArcaneType.{LongType, StringType}
import models.{Field, MergeKeyField}

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import scala.io.Source
import scala.util.Using

class SynapseLinkTests extends AnyFlatSpec with Matchers:

  /** Reads an expected-SQL fixture from test resources, joining lines with '\n'. */
  private def expectedSql(resource: String): String =
    Using(Source.fromURL(getClass.getResource(resource))) {
      _.getLines().mkString("\n")
    }.get

  // Reduce expression carrying deletions through (as used by merge queries).
  private val reduceKeepingDeletes: String =
    """SELECT * FROM (
      | SELECT * FROM test.staged_a ORDER BY ROW_NUMBER() OVER (PARTITION BY Id ORDER BY versionnumber DESC) FETCH FIRST 1 ROWS WITH TIES
      |)""".stripMargin

  private val allColumns = Seq("ARCANE_MERGE_KEY", "colA", "colB", "Id", "versionnumber")

  it should "generate a valid overwrite query" in {
    val query = SynapseLinkBackfillQuery(
      "test.table_a",
      """SELECT * FROM (
        | SELECT * FROM test.staged_a ORDER BY ROW_NUMBER() OVER (PARTITION BY Id ORDER BY versionnumber DESC) FETCH FIRST 1 ROWS WITH TIES
        |) WHERE IsDelete = false""".stripMargin
    )
    query.query should equal(expectedSql("/generate_an_overwrite_query_synapse_link.sql"))
  }

  it should "generate a valid merge query" in {
    val query = SynapseLinkMergeQuery(
      "test.table_a",
      reduceKeepingDeletes,
      Map(),
      "ARCANE_MERGE_KEY",
      allColumns
    )
    query.query should equal(expectedSql("/generate_a_valid_merge_query_synapse_link.sql"))
  }

  it should "generate a valid merge with partitions" in {
    val query = SynapseLinkMergeQuery(
      "test.table_a",
      reduceKeepingDeletes,
      Map("colA" -> List("a", "b", "c")),
      "ARCANE_MERGE_KEY",
      allColumns
    )
    query.query should equal(expectedSql("/generate_a_valid_merge_query_with_partitions_synapse_link.sql"))
  }

  "SynapseLinkBackfillBatch" should "generate a valid backfill batch" in {
    val batchSchema = Seq(
      MergeKeyField,
      Field(name = "colA", fieldType = StringType),
      Field(name = "colB", fieldType = StringType),
      Field(name = "versionnumber", fieldType = LongType),
      Field(name = "Id", fieldType = StringType)
    )
    val batch = SynapseLinkBackfillBatch("test.staged_a", batchSchema, "test.table_a")

    batch.batchQuery.query should equal(expectedSql("/generate_a_valid_synapse_link_backfill_batch_query.sql"))
  }

  "SynapseLinkMergeBatch" should "generate a valid versioned batch" in {
    val batchSchema = Seq(
      MergeKeyField,
      Field(name = "colA", fieldType = StringType),
      Field(name = "colB", fieldType = StringType),
      Field(name = "Id", fieldType = StringType),
      Field(name = "versionnumber", fieldType = LongType)
    )
    val batch = SynapseLinkMergeBatch(
      "test.staged_a",
      batchSchema,
      "test.table_a",
      Map("colA" -> List("a", "b", "c"))
    )

    batch.batchQuery.query should equal(expectedSql("/generate_a_valid_synapse_link_merge_query_with_partitions.sql"))
  }
Loading