Skip to content

Commit

Permalink
propagate retry policy for auto-batching (#454)
Browse files Browse the repository at this point in the history
  • Loading branch information
googley42 authored Jul 27, 2024
1 parent 5895216 commit ec11d1f
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 75 deletions.
4 changes: 2 additions & 2 deletions dynamodb/src/it/scala/zio/dynamodb/LiveSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ object LiveSpec extends DynamoDBLocalSpec {
count <- stream.runFold(0) { case (count, _) => count + 1 }
} yield assert(count)(equalTo(10000))
)
} @@ TestAspect.ignore, // DynamoDBLocal does not support parallel scan
},
test("parallel scan all typed") {
withTemporaryTable(
defaultTable,
Expand All @@ -698,7 +698,7 @@ object LiveSpec extends DynamoDBLocalSpec {
} yield assert(count)(equalTo(10000))
)
}
) @@ TestAspect.ignore, // DynamoDBLocal does not support parallel scan,
),
suite("query tables")(
test("query all projection expressions should handle keyword") {
withDefaultTable { tableName =>
Expand Down
16 changes: 8 additions & 8 deletions dynamodb/src/it/scala/zio/dynamodb/TypeSafeApiCrudSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,10 @@ object TypeSafeApiCrudSpec extends DynamoDBLocalSpec {
},
test("with forEach, catching a BatchError and resuming processing") {
withSingleIdKeyTable { tableName =>
type FailureWrapper = Either[String, Option[Person]]
val person1 = Person("1", "Smith", Some("John"), 21)
val person2 = Person("2", "Brown", None, 42)
val inputStream = ZStream(person1, person2)
val outputStream: ZStream[DynamoDBExecutor, DynamoDBError, FailureWrapper] = inputStream
val person1 = Person("1", "Smith", Some("John"), 21)
val person2 = Person("2", "Brown", None, 42)
val inputStream = ZStream(person1, person2)
val outputStream: ZStream[DynamoDBExecutor, DynamoDBError, Either[String, Option[Person]]] = inputStream
.grouped(2)
.mapZIO { chunk =>
val batchWriteItem = DynamoDBQuery
Expand All @@ -169,6 +168,7 @@ object TypeSafeApiCrudSpec extends DynamoDBLocalSpec {
for {
xs <- outputStream.runCollect
} yield assertTrue(xs == Chunk(Right(None), Right(None)))
// Note this test is only an example, we cannot force an AWS batch error with unprocessed item to occur in the local DynamoDB
}
}
)
Expand Down Expand Up @@ -333,7 +333,7 @@ object TypeSafeApiCrudSpec extends DynamoDBLocalSpec {
}
},
test(
"setIfNotExists fails silently when the attribute already exists" // this is AWS API behaviour
"setIfNotExists fails silently when the attribute already exists" // this is AWS API behavior
) {
withSingleIdKeyTable { tableName =>
val person = Person("1", "Smith", None, 21)
Expand Down Expand Up @@ -444,7 +444,7 @@ object TypeSafeApiCrudSpec extends DynamoDBLocalSpec {
}
},
test(
"remove'ing a map element when it does not exists fails silently" // this is AWS API behaviour
"remove'ing a map element when it does not exists fails silently" // this is AWS API behavior
) {
withSingleIdKeyTable { tableName =>
val person = PersonWithCollections(
Expand Down Expand Up @@ -785,7 +785,7 @@ object TypeSafeApiCrudSpec extends DynamoDBLocalSpec {
} yield assertTrue(people == Chunk.empty)
}
},
test("with an update query") { // not there is no AWS API for batch update so these queries are run in parallel
test("with an update query") { // note there is no AWS API for batch update so these queries are run in parallel
withSingleIdKeyTable { tableName =>
val person1 = Person("1", "Smith", Some("John"), 21)
val person2 = Person("2", "Brown", Some("Peter"), 42)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ object TypeSafeScanAndQuerySpec extends DynamoDBLocalSpec {
.sortBy(_.id) // parallel scan order is not guaranteed
)
}
} @@ TestAspect.ignore, // DynamoDBLocal does not support parallel scan
},
test("with filter on forename exists") {
withSingleIdKeyTable { tableName =>
for {
Expand Down
15 changes: 12 additions & 3 deletions dynamodb/src/main/scala/zio/dynamodb/DynamoDBExecutorImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,16 @@ import zio.{ Chunk, NonEmptyChunk, ZIO }
import scala.collection.immutable.{ Map => ScalaMap }
import software.amazon.awssdk.services.dynamodb.model.{ DynamoDbException => AwsSdkDynamoDbException }
import scala.annotation.nowarn
import zio.durationInt
import zio.Schedule

@nowarn
private[dynamodb] final case class DynamoDBExecutorImpl private[dynamodb] (dynamoDb: DynamoDb)
extends DynamoDBExecutor {
import DynamoDBExecutorImpl._

private val defaultRetryPolicy = Schedule.recurs(3) && Schedule.exponential(50.milliseconds)

def executeMap[A, B](map: Map[A, B]): ZIO[Any, Throwable, B] =
execute(map.query).map(map.mapper)

Expand Down Expand Up @@ -217,7 +221,7 @@ private[dynamodb] final case class DynamoDBExecutorImpl private[dynamodb] (dynam
if (batchWriteItem.requestItems.isEmpty) ZIO.succeed(BatchWriteItem.Response(None))
else {
// need to explicitly type this for Scala 3 compiler
val t: zio.Schedule[Any, Throwable, Any] = batchWriteItem.retryPolicy.whileInput {
val t: zio.Schedule[Any, Throwable, Any] = batchWriteItem.retryPolicy.getOrElse(defaultRetryPolicy).whileInput {
case BatchRetryError() => true
case _ => false
}
Expand Down Expand Up @@ -327,7 +331,7 @@ private[dynamodb] final case class DynamoDBExecutorImpl private[dynamodb] (dynam
if (batchGetItem.requestItems.isEmpty)
ZIO.succeed(BatchGetItem.Response())
else {
val t: zio.Schedule[Any, Throwable, Any] = batchGetItem.retryPolicy.whileInput {
val t: zio.Schedule[Any, Throwable, Any] = batchGetItem.retryPolicy.getOrElse(defaultRetryPolicy).whileInput {
case BatchRetryError() => true
case _ => false
}
Expand Down Expand Up @@ -666,7 +670,12 @@ case object DynamoDBExecutorImpl {
}

private[dynamodb] def writeRequestToBatchWrite(writeRequest: WriteRequest.ReadOnly): Option[BatchWriteItem.Write] =
writeRequest.putRequest.toOption.map(put => BatchWriteItem.Put(item = AttrMap(awsAttrMapToAttrMap(put.item))))
(writeRequest.putRequest.toOption, writeRequest.deleteRequest.toOption) match {
case (Some(put), None) => Some(BatchWriteItem.Put(item = AttrMap(awsAttrMapToAttrMap(put.item))))
case (None, Some(delete)) =>
Some(BatchWriteItem.Delete(key = AttrMap(awsAttrMapToAttrMap(delete.key))))
case _ => None
}

private def keysAndAttrsToTableGet(ka: KeysAndAttributes.ReadOnly): TableGet = {
val maybeProjectionExpressions = ka.projectionExpression.map(
Expand Down
55 changes: 30 additions & 25 deletions dynamodb/src/main/scala/zio/dynamodb/DynamoDBQuery.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import zio.dynamodb.UpdateExpression.Action
import zio.prelude.ForEachOps
import zio.schema.Schema
import zio.stream.Stream
import zio.{ Chunk, Schedule, ZIO, Zippable => _, _ }
import zio.{ Chunk, Schedule, ZIO }
import scala.annotation.nowarn

sealed trait DynamoDBQuery[-In, +Out] { self =>
Expand Down Expand Up @@ -365,19 +365,21 @@ sealed trait DynamoDBQuery[-In, +Out] { self =>
Zip(left.withRetryPolicy(retryPolicy), right.withRetryPolicy(retryPolicy), zippable)
case Map(query, mapper) => Map(query.withRetryPolicy(retryPolicy), mapper)
case Absolve(query) => Absolve(query.withRetryPolicy(retryPolicy))
case s: BatchWriteItem => s.copy(retryPolicy = retryPolicy).asInstanceOf[DynamoDBQuery[In, Out]]
case s: BatchGetItem => s.copy(retryPolicy = retryPolicy).asInstanceOf[DynamoDBQuery[In, Out]]
case _ =>
self // TODO: Avi - popogate retry to all primitives eg Get/Put/Delete so that autobatching can select it
case bw: BatchWriteItem => bw.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]]
case bg: BatchGetItem => bg.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]]
case p: PutItem => p.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]]
case d: DeleteItem => d.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]]
case g: GetItem => g.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]]
case _ => self
}

def sortOrder(ascending: Boolean): DynamoDBQuery[In, Out] =
self match {
case Zip(left, right, zippable) => Zip(left.sortOrder(ascending), right.sortOrder(ascending), zippable)
case Map(query, mapper) => Map(query.sortOrder(ascending), mapper)
case Absolve(query) => Absolve(query.sortOrder(ascending))
case s: QuerySome => s.copy(ascending = ascending).asInstanceOf[DynamoDBQuery[In, Out]]
case s: QueryAll => s.copy(ascending = ascending).asInstanceOf[DynamoDBQuery[In, Out]]
case qs: QuerySome => qs.copy(ascending = ascending).asInstanceOf[DynamoDBQuery[In, Out]]
case qa: QueryAll => qa.copy(ascending = ascending).asInstanceOf[DynamoDBQuery[In, Out]]
case _ => self
}

Expand Down Expand Up @@ -675,7 +677,8 @@ object DynamoDBQuery {
projections: List[ProjectionExpression[_, _]] =
List.empty, // If no attribute names are specified, then all attributes are returned
consistency: ConsistencyMode = ConsistencyMode.Weak,
capacity: ReturnConsumedCapacity = ReturnConsumedCapacity.None
capacity: ReturnConsumedCapacity = ReturnConsumedCapacity.None,
retryPolicy: Option[Schedule[Any, Throwable, Any]] = None
) extends Constructor[Any, Option[Item]]

private[dynamodb] final case class BatchRetryError() extends Throwable
Expand All @@ -685,7 +688,7 @@ object DynamoDBQuery {
capacity: ReturnConsumedCapacity = ReturnConsumedCapacity.None,
private[dynamodb] val orderedGetItems: Chunk[GetItem] =
Chunk.empty, // track order of added GetItems for later unpacking
retryPolicy: Schedule[Any, Throwable, Any] = Schedule.recurs(3) && Schedule.exponential(50.milliseconds)
retryPolicy: Option[Schedule[Any, Throwable, Any]] = None
) extends Constructor[Any, BatchGetItem.Response] { self =>

def +(getItem: GetItem): BatchGetItem = {
Expand All @@ -704,7 +707,8 @@ object DynamoDBQuery {
BatchGetItem(
self.requestItems + newEntry,
self.capacity,
self.orderedGetItems :+ getItem
self.orderedGetItems :+ getItem,
self.retryPolicy.orElse(getItem.retryPolicy) // inherit retry policy from GetItem if not set
)
}

Expand Down Expand Up @@ -763,26 +767,25 @@ object DynamoDBQuery {
capacity: ReturnConsumedCapacity = ReturnConsumedCapacity.None,
itemMetrics: ReturnItemCollectionMetrics = ReturnItemCollectionMetrics.None,
addList: Chunk[BatchWriteItem.Write] = Chunk.empty,
retryPolicy: Schedule[Any, Throwable, Any] =
Schedule.recurs(3) && Schedule.exponential(50.milliseconds)
retryPolicy: Option[Schedule[Any, Throwable, Any]] = None
) extends Constructor[Any, BatchWriteItem.Response] { self =>
def +[A](writeItem: Write[Any, A]): BatchWriteItem =
writeItem match {
case putItem @ PutItem(_, _, _, _, _, _) =>
case putItem @ PutItem(_, _, _, _, _, _, _) =>
BatchWriteItem(
self.requestItems + ((putItem.tableName, Put(putItem.item))),
self.capacity,
self.itemMetrics,
self.addList :+ Put(putItem.item),
self.retryPolicy
self.retryPolicy.orElse(putItem.retryPolicy) // inherit retry policy from PutItem if not set
)
case deleteItem @ DeleteItem(_, _, _, _, _, _) =>
case deleteItem @ DeleteItem(_, _, _, _, _, _, _) =>
BatchWriteItem(
self.requestItems + ((deleteItem.tableName, Delete(deleteItem.key))),
self.capacity,
self.itemMetrics,
self.addList :+ Delete(deleteItem.key),
self.retryPolicy
self.retryPolicy.orElse(deleteItem.retryPolicy) // inherit retry policy from DeleteItem if not set
)
}

Expand Down Expand Up @@ -922,7 +925,8 @@ object DynamoDBQuery {
conditionExpression: Option[ConditionExpression[_]] = None,
capacity: ReturnConsumedCapacity = ReturnConsumedCapacity.None,
itemMetrics: ReturnItemCollectionMetrics = ReturnItemCollectionMetrics.None,
returnValues: ReturnValues = ReturnValues.None // PutItem does not recognize any values other than NONE or ALL_OLD.
returnValues: ReturnValues = ReturnValues.None, // PutItem does not recognize any values other than NONE or ALL_OLD.
retryPolicy: Option[Schedule[Any, Throwable, Any]] = None
) extends Write[Any, Option[Item]]

private[dynamodb] final case class UpdateItem(
Expand All @@ -948,7 +952,8 @@ object DynamoDBQuery {
capacity: ReturnConsumedCapacity = ReturnConsumedCapacity.None,
itemMetrics: ReturnItemCollectionMetrics = ReturnItemCollectionMetrics.None,
returnValues: ReturnValues =
ReturnValues.None // DeleteItem does not recognize any values other than NONE or ALL_OLD.
ReturnValues.None, // DeleteItem does not recognize any values other than NONE or ALL_OLD.
retryPolicy: Option[Schedule[Any, Throwable, Any]] = None
) extends Write[Any, Option[Item]]

private[dynamodb] final case class CreateTable(
Expand Down Expand Up @@ -1002,12 +1007,12 @@ object DynamoDBQuery {
constructors.zipWithIndex.foldLeft[(Chunk[IndexedConstructor], Chunk[IndexedGetItem], Chunk[IndexedWriteItem])](
(Chunk.empty, Chunk.empty, Chunk.empty)
) {
case ((nonBatched, gets, writes), (get @ GetItem(_, pk, pes, _, _), index)) =>
case ((nonBatched, gets, writes), (get @ GetItem(_, pk, pes, _, _, _), index)) =>
if (projectionsContainPrimaryKey(pes, pk))
(nonBatched, gets :+ (get -> index), writes)
else
(nonBatched :+ (get -> index), gets, writes)
case ((nonBatched, gets, writes), (put @ PutItem(_, _, conditionExpression, _, _, returnValues), index)) =>
case ((nonBatched, gets, writes), (put @ PutItem(_, _, conditionExpression, _, _, returnValues, _), index)) =>
conditionExpression match {
case Some(_) =>
(nonBatched :+ (put -> index), gets, writes)
Expand All @@ -1019,7 +1024,7 @@ object DynamoDBQuery {
}
case (
(nonBatched, gets, writes),
(delete @ DeleteItem(_, _, conditionExpression, _, _, returnValues), index)
(delete @ DeleteItem(_, _, conditionExpression, _, _, returnValues, _), index)
) =>
conditionExpression match {
case Some(_) =>
Expand All @@ -1030,7 +1035,7 @@ object DynamoDBQuery {
else
(nonBatched, gets, writes :+ (delete -> index))
}
case ((nonBatched, gets, writes), (nonGetItem, index)) =>
case ((nonBatched, gets, writes), (nonGetItem, index)) =>
(nonBatched :+ (nonGetItem -> index), gets, writes)
}

Expand Down Expand Up @@ -1127,15 +1132,15 @@ object DynamoDBQuery {
}
)

case getItem @ GetItem(_, _, _, _, _) =>
case getItem @ GetItem(_, _, _, _, _, _) =>
(
Chunk(getItem),
(results: Chunk[Any]) => {
results.head.asInstanceOf[A]
}
)

case putItem @ PutItem(_, _, _, _, _, _) =>
case putItem @ PutItem(_, _, _, _, _, _, _) =>
(
Chunk(putItem),
(results: Chunk[Any]) => {
Expand All @@ -1159,7 +1164,7 @@ object DynamoDBQuery {
}
)

case deleteItem @ DeleteItem(_, _, _, _, _, _) =>
case deleteItem @ DeleteItem(_, _, _, _, _, _, _) =>
(
Chunk(deleteItem),
(results: Chunk[Any]) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ private[dynamodb] final case class TestDynamoDBExecutorImpl private[dynamodb] (
}
results.map(_ => BatchWriteItem.Response(None))

case GetItem(tableName, key, _, _, _) =>
case GetItem(tableName, key, _, _, _, _) =>
fakeGetItem(tableName.value, key)

case PutItem(tableName, item, _, _, _, _) =>
case PutItem(tableName, item, _, _, _, _, _) =>
fakePut(tableName.value, item)

// TODO Note UpdateItem is not currently supported as it uses an UpdateExpression

case DeleteItem(tableName, key, _, _, _, _) =>
case DeleteItem(tableName, key, _, _, _, _, _) =>
fakeDelete(tableName.value, key)

case ScanSome(tableName, limit, _, _, exclusiveStartKey, _, _, _, _) =>
Expand Down
Loading

0 comments on commit ec11d1f

Please sign in to comment.