Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

single queries should not be auto-batched #456

Merged
merged 5 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions dynamodb/src/main/scala/zio/dynamodb/DynamoDBExecutor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package zio.dynamodb

import zio.aws.dynamodb.DynamoDb
import zio.stm.{ STM, TMap }
import zio.{ ULayer, URLayer, ZIO, ZLayer }
import zio.{ Ref, ULayer, URLayer, ZIO, ZLayer }

trait DynamoDBExecutor {
def execute[A](atomicQuery: DynamoDBQuery[_, A]): ZIO[Any, DynamoDBError, A]
def execute[A](query: DynamoDBQuery[_, A]): ZIO[Any, DynamoDBError, A]
}

object DynamoDBExecutor {
Expand All @@ -16,16 +16,18 @@ object DynamoDBExecutor {

lazy val test: ULayer[DynamoDBExecutor with TestDynamoDBExecutor] = {
val effect = for {
ref <- Ref.make(List.empty[DynamoDBQuery[_, _]])
test <- (for {
tableMap <- TMap.empty[String, TMap[PrimaryKey, Item]]
tablePkNameMap <- TMap.empty[String, String]
} yield TestDynamoDBExecutorImpl(tableMap, tablePkNameMap)).commit
} yield TestDynamoDBExecutorImpl(ref, tableMap, tablePkNameMap)).commit
} yield test
ZLayer.fromZIO(effect)
}

def test(tableDefs: TableNameAndPK*): ULayer[DynamoDBExecutor with TestDynamoDBExecutor] = {
val effect = for {
ref <- Ref.make(List.empty[DynamoDBQuery[_, _]])
test <- (for {
tableMap <- TMap.empty[String, TMap[PrimaryKey, Item]]
tablePkNameMap <- TMap.empty[String, String]
Expand All @@ -37,7 +39,7 @@ object DynamoDBExecutor {
_ <- tableMap.put(tableName, pkToItemMap)
} yield ()
}
} yield TestDynamoDBExecutorImpl(tableMap, tablePkNameMap)).commit
} yield TestDynamoDBExecutorImpl(ref, tableMap, tablePkNameMap)).commit
} yield test
ZLayer.fromZIO(effect)
}
Expand Down
11 changes: 6 additions & 5 deletions dynamodb/src/main/scala/zio/dynamodb/DynamoDBExecutorImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,9 @@ private[dynamodb] final case class DynamoDBExecutorImpl private[dynamodb] (dynam
case Fail(dynamoDBError) => ZIO.fail(dynamoDBError())
}

override def execute[A](atomicQuery: DynamoDBQuery[_, A]): ZIO[Any, DynamoDBError, A] = {
val result = atomicQuery match {
override def execute[A](query: DynamoDBQuery[_, A]): ZIO[Any, DynamoDBError, A] = {

val result = query match {
case constructor: Constructor[_, A] => executeConstructor(constructor)
case zip @ Zip(_, _, _) => executeZip(zip)
case map @ Map(_, _) => executeMap(map)
Expand Down Expand Up @@ -153,7 +154,7 @@ private[dynamodb] final case class DynamoDBExecutorImpl private[dynamodb] (dynam
private def executeDeleteItem(deleteItem: DeleteItem): ZIO[Any, Throwable, Option[Item]] =
dynamoDb
.deleteItem(awsDeleteItemRequest(deleteItem))
.mapBoth(_.toThrowable, _.attributes.toOption.map(dynamoDBItem(_)))
.mapBoth(_.toThrowable, _.attributes.toOption.map(dynamoDBItem))

private def executeDeleteTable(deleteTable: DeleteTable): ZIO[Any, Throwable, Unit] =
dynamoDb
Expand All @@ -162,13 +163,13 @@ private[dynamodb] final case class DynamoDBExecutorImpl private[dynamodb] (dynam
.unit

private def executePutItem(putItem: PutItem): ZIO[Any, Throwable, Option[Item]] =
dynamoDb.putItem(awsPutItemRequest(putItem)).mapBoth(_.toThrowable, _.attributes.toOption.map(dynamoDBItem(_)))
dynamoDb.putItem(awsPutItemRequest(putItem)).mapBoth(_.toThrowable, _.attributes.toOption.map(dynamoDBItem))

private def executeGetItem(getItem: GetItem): ZIO[Any, Throwable, Option[Item]] =
dynamoDb
.getItem(awsGetItemRequest(getItem))
.mapBoth(_.toThrowable, _.item.map(dynamoDBItem))
.map(_.toOption)
.map(_.toOption.flatMap(item => if (item.map.isEmpty) None else Some(item)))

private def executeUpdateItem(updateItem: UpdateItem): ZIO[Any, Throwable, Option[Item]] =
dynamoDb.updateItem(awsUpdateItemRequest(updateItem)).mapBoth(_.toThrowable, optionalItem)
Expand Down
52 changes: 31 additions & 21 deletions dynamodb/src/main/scala/zio/dynamodb/DynamoDBQuery.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1003,40 +1003,50 @@ object DynamoDBQuery {
hasNoProjections || matchedPrimaryKeys.filter(_ == true).size == pk.map.size
}

val isSingleQuery = constructors.size == 1 // single queries are not batched

val (indexedNonBatched, indexedGets, indexedWrites) =
constructors.zipWithIndex.foldLeft[(Chunk[IndexedConstructor], Chunk[IndexedGetItem], Chunk[IndexedWriteItem])](
(Chunk.empty, Chunk.empty, Chunk.empty)
) {
case ((nonBatched, gets, writes), (get @ GetItem(_, pk, pes, _, _, _), index)) =>
if (projectionsContainPrimaryKey(pes, pk))
if (isSingleQuery)
(nonBatched :+ (get -> index), gets, writes)
else if (projectionsContainPrimaryKey(pes, pk))
(nonBatched, gets :+ (get -> index), writes)
else
(nonBatched :+ (get -> index), gets, writes)
case ((nonBatched, gets, writes), (put @ PutItem(_, _, conditionExpression, _, _, returnValues, _), index)) =>
conditionExpression match {
case Some(_) =>
(nonBatched :+ (put -> index), gets, writes)
case None =>
if (returnValues != ReturnValues.None)
(nonBatched :+ (put -> index), gets, writes)
else
(nonBatched, gets, writes :+ (put -> index))
}
if (isSingleQuery)
(nonBatched :+ (put -> index), gets, writes)
else
conditionExpression match {
case Some(_) =>
(nonBatched :+ (put -> index), gets, writes)
case None =>
if (returnValues != ReturnValues.None)
(nonBatched :+ (put -> index), gets, writes)
else
(nonBatched, gets, writes :+ (put -> index))
}
case (
(nonBatched, gets, writes),
(delete @ DeleteItem(_, _, conditionExpression, _, _, returnValues, _), index)
) =>
conditionExpression match {
case Some(_) =>
(nonBatched :+ (delete -> index), gets, writes)
case None =>
if (returnValues != ReturnValues.None)
(nonBatched :+ (delete -> index), gets, writes)
else
(nonBatched, gets, writes :+ (delete -> index))
}
case ((nonBatched, gets, writes), (nonGetItem, index)) =>
(nonBatched :+ (nonGetItem -> index), gets, writes)
if (isSingleQuery)
(nonBatched :+ (delete -> index), gets, writes)
else
conditionExpression match {
case Some(_) =>
(nonBatched :+ (delete -> index), gets, writes)
case None =>
if (returnValues != ReturnValues.None)
(nonBatched :+ (delete -> index), gets, writes)
else
(nonBatched, gets, writes :+ (delete -> index))
}
case ((nonBatched, gets, writes), (nonBatchable, index)) =>
(nonBatched :+ (nonBatchable -> index), gets, writes)
}

val indexedBatchGetItem: (BatchGetItem, Chunk[Int]) = indexedGets
Expand Down
17 changes: 17 additions & 0 deletions dynamodb/src/main/scala/zio/dynamodb/TestDynamoDBExecutor.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package zio.dynamodb

import zio.{ UIO, ZIO }
import zio.dynamodb.DynamoDBQuery.{ BatchGetItem, BatchWriteItem }

/**
* A Fake implementation of `DynamoDBExecutor.Service` that currently has the very modest aspiration of providing bare minimum
Expand Down Expand Up @@ -33,6 +34,7 @@ import zio.{ UIO, ZIO }
trait TestDynamoDBExecutor {
def addTable(tableName: String, pkFieldName: String, pkAndItems: PkAndItem*): UIO[Unit]
def addItems(tableName: String, pkAndItems: PkAndItem*): ZIO[Any, DynamoDBError, Unit]
def queries: UIO[List[DynamoDBQuery[_, _]]]
}

object TestDynamoDBExecutor {
Expand All @@ -47,4 +49,19 @@ object TestDynamoDBExecutor {
def addItems(tableName: String, pkAndItems: PkAndItem*): ZIO[TestDynamoDBExecutor, DynamoDBError, Unit] =
ZIO.serviceWithZIO[TestDynamoDBExecutor](_.addItems(tableName, pkAndItems: _*))

def queries: ZIO[TestDynamoDBExecutor, Nothing, List[DynamoDBQuery[_, _]]] =
ZIO.serviceWithZIO[TestDynamoDBExecutor](_.queries)

def isEmptyBatchWrite(q: DynamoDBQuery[_, _]) =
q match {
case BatchWriteItem(items, _, _, _, _) => items.isEmpty
case _ => false
}

def isEmptyBatchGet(q: DynamoDBQuery[_, _]) =
q match {
case BatchGetItem(items, _, _, _) => items.isEmpty
case _ => false
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@ import zio.dynamodb.DynamoDBQuery.BatchGetItem.TableGet
import zio.dynamodb.DynamoDBQuery._
import zio.stm.{ STM, TMap, ZSTM }
import zio.stream.{ Stream, ZStream }
import zio.{ Chunk, IO, UIO, ZIO }
import zio.{ Chunk, IO, Ref, UIO, ZIO }
import scala.annotation.nowarn

@nowarn
private[dynamodb] final case class TestDynamoDBExecutorImpl private[dynamodb] (
recordedQueries: Ref[List[DynamoDBQuery[_, _]]],
tableMap: TMap[String, TMap[PrimaryKey, Item]],
tablePkNameMap: TMap[String, String]
) extends DynamoDBExecutor
with TestDynamoDBExecutor {
self =>

override def execute[A](atomicQuery: DynamoDBQuery[_, A]): ZIO[Any, DynamoDBError, A] = {
val result: ZIO[Any, DynamoDBError, A] = atomicQuery match {
override def execute[A](query: DynamoDBQuery[_, A]): ZIO[Any, DynamoDBError, A] = {
val result: ZIO[Any, DynamoDBError, A] = query match {
case BatchGetItem(requestItemsMap, _, _, _) =>
val requestItems: Seq[(TableName, TableGet)] = requestItemsMap.toList

Expand Down Expand Up @@ -84,7 +85,7 @@ private[dynamodb] final case class TestDynamoDBExecutorImpl private[dynamodb] (
ZIO.die(new Exception(s"Constructor $unknown not implemented yet"))
}

result
recordedQueries.update(_ :+ query) *> result
}

private def tableError(tableName: String): DynamoDBError =
Expand Down Expand Up @@ -231,4 +232,6 @@ private[dynamodb] final case class TestDynamoDBExecutorImpl private[dynamodb] (
case (pk, item) => tableMap.put(pk, item)
}
} yield ()).commit

override def queries: UIO[List[DynamoDBQuery[_, _]]] = self.recordedQueries.get
}
42 changes: 35 additions & 7 deletions dynamodb/src/test/scala/zio/dynamodb/BatchingDSLSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package zio.dynamodb
import zio.Chunk
import zio.dynamodb.DynamoDBQuery._
import zio.test.Assertion._
import zio.test.{ assert, TestAspect, ZIOSpecDefault }
import zio.test.{ assert, assertTrue, TestAspect, ZIOSpecDefault }
import zio.test.Spec

object BatchingDSLSpec extends ZIOSpecDefault with DynamoDBFixtures {
Expand All @@ -19,15 +19,35 @@ object BatchingDSLSpec extends ZIOSpecDefault with DynamoDBFixtures {
)

override def spec: Spec[Environment, Any] =
suite("Batching")(crudSuite, scanAndQuerySuite, batchingSuite).provideLayer(DynamoDBExecutor.test)
suite("Batching")(crudSuite, scanAndQuerySuite, batchingSuite, singleQueryDoesNotBatchSuite).provideLayer(
DynamoDBExecutor.test
)

private val crudSuite = suite("single Item CRUD suite")(
test("getItem") {
private val singleQueryDoesNotBatchSuite = suite("single query does not batch CRUD suite")(
test("a single getItem does not get auto-batch") {
for {
_ <- TestDynamoDBExecutor.addTable(tableName1.value, "k1", primaryKeyT1 -> itemT1, primaryKeyT1_2 -> itemT1_2)
result <- getItemT1.execute
} yield assert(result)(equalTo(Some(itemT1)))
_ <- TestDynamoDBExecutor.addTable(tableName1.value, "k1", primaryKeyT1 -> itemT1, primaryKeyT1_2 -> itemT1_2)
result <- getItemT1.execute
queries <- TestDynamoDBExecutor.queries
} yield assert(result)(equalTo(Some(itemT1))) && assertQueryNotBatched(queries)
},
test("a single putItem does not get auto-batch") {
for {
_ <- TestDynamoDBExecutor.addTable(tableName1.value, "k1", primaryKeyT1 -> itemT1, primaryKeyT1_2 -> itemT1_2)
result <- putItemT1.execute
queries <- TestDynamoDBExecutor.queries
} yield assert(result)(equalTo(None)) && assertQueryNotBatched(queries)
},
test("a single deleteItem does not get auto-batch") {
for {
_ <- TestDynamoDBExecutor.addTable(tableName1.value, "k1", primaryKeyT1 -> itemT1, primaryKeyT1_2 -> itemT1_2)
result <- deleteItemT1.execute
queries <- TestDynamoDBExecutor.queries
} yield assert(result)(equalTo(None)) && assertQueryNotBatched(queries)
}
)

private val crudSuite = suite("single Item CRUD suite")(
test("getItem returns an error when table does not exist") {
for {
result <- getItem("TABLE_DOES_NOT_EXISTS", primaryKeyT1).execute.either
Expand Down Expand Up @@ -175,4 +195,12 @@ object BatchingDSLSpec extends ZIOSpecDefault with DynamoDBFixtures {
} yield assert(result)(equalTo(List(Some(itemT1), Some(itemT1_2))))
} @@ beforeAddTable1AndTable2
)

private def assertQueryNotBatched(queries: List[DynamoDBQuery[_, _]]) =
assertTrue(queries.size == 3) && assertTrue(
queries.find(TestDynamoDBExecutor.isEmptyBatchWrite).isDefined
) && assertTrue(
queries.find(TestDynamoDBExecutor.isEmptyBatchGet).isDefined
)

}
Loading