Skip to content

Commit

Permalink
Delta Test Suite Refactoring
Browse files Browse the repository at this point in the history
Closes delta-io#2292

GitOrigin-RevId: 27e88f1e2dc5c8b8dad5ea36fa348476dd6f3d6c
  • Loading branch information
ericm-db authored and allisonport-db committed Nov 20, 2023
1 parent 246fcf3 commit 35e5d69
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
} else {
Seq(
// This makes it move to the next batch.
AssertOnQuery(waitUntilBatchProcessed(1, _)),
AssertOnQuery { q =>
eventually("Next batch was never processed") {
// Ensure we only processed a single batch with the DML commands.
Expand All @@ -208,18 +207,6 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
testStream(df)((baseActions ++ expectations): _*)
}

protected def waitUntilBatchProcessed(batchId: Int, currentStream: StreamExecution): Boolean = {
eventually("Next batch was never processed") {
if (!currentStream.exception.isDefined) {
assert(currentStream.commitLog.getLatestBatchId().get >= batchId)
}
}
if (currentStream.exception.isDefined) {
throw currentStream.exception.get
}
true
}

protected def eventually[T](message: String)(func: => T): T = {
try {
Eventually.eventually(Timeout(streamingTimeout)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,9 @@ trait StreamingSchemaEvolutionSuiteBase extends ColumnMappingStreamingTestUtils

protected def testSchemaEvolution(
testName: String,
columnMapping: Boolean = true)(f: DeltaLog => Unit): Unit = {
super.test(testName) {
columnMapping: Boolean = true,
tags: Seq[org.scalatest.Tag] = Seq.empty)(f: DeltaLog => Unit): Unit = {
super.test(testName, tags: _*) {
if (columnMapping) {
withStarterTable { log =>
f(log)
Expand Down Expand Up @@ -1415,7 +1416,8 @@ trait StreamingSchemaEvolutionSuiteBase extends ColumnMappingStreamingTestUtils
}
}

testSchemaEvolution("multiple sources with schema evolution") { implicit log =>
testSchemaEvolution("multiple sources with schema evolution"
) { implicit log =>
val v5 = log.update().version // v5 has an ADD file action with value (4, 4)
renameColumn("b", "c")
addData(5 until 10)
Expand Down

0 comments on commit 35e5d69

Please sign in to comment.