propagate retry for batched get
googley42 committed Jul 23, 2024
1 parent 468a8cd commit 72eb75a
Showing 4 changed files with 49 additions and 22 deletions.
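In practical terms, the change lets a schedule attached with withRetryPolicy reach auto-batched gets, mirroring what already happened for batched writes. A minimal usage sketch in the spirit of the new test below, assuming the zio-dynamodb query API; the table name and keys are illustrative stand-ins:

import zio._
import zio.dynamodb._
import zio.dynamodb.DynamoDBQuery._

// Hypothetical keys, for illustration only.
val keyOne = PrimaryKey("id" -> "1")
val keyTwo = PrimaryKey("id" -> "2")

// Two GetItems zipped together are auto-batched into a single BatchGetItem;
// after this commit the custom schedule is propagated to that batch instead of being dropped.
val autoBatchedGetItems =
  (getItem("mockBatches", keyOne) zip getItem("mockBatches", keyTwo))
    .withRetryPolicy(Schedule.recurs(5))

// Executing requires a DynamoDBExecutor in the environment.
val program = autoBatchedGetItems.execute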
@@ -342,6 +342,7 @@ private[dynamodb] final case class DynamoDBExecutorImpl private[dynamodb] (dynam
if (batchGetItem.requestItems.isEmpty)
ZIO.succeed(BatchGetItem.Response())
else {
println(s"XXXXXXXXXXXXXXXXXXX batchGetItem.retryPolicy ${batchGetItem.retryPolicy}")
val t: zio.Schedule[Any, Throwable, Any] = batchGetItem.retryPolicy.getOrElse(defaultRetryPolicy).whileInput {
case BatchRetryError() => true
case _ => false
@@ -351,10 +352,17 @@ private[dynamodb] final case class DynamoDBExecutorImpl private[dynamodb] (dynam
collectedItems <- zio.Ref.make[MapOfSet[TableName, Item]](MapOfSet.empty)
_ <- (for {
unprocessed <- unprocessedKeys.get
_ <- ZIO.debug(
s"RRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRR retrying with unprocessed.size=${unprocessed.size}"
)
currentCollection <- collectedItems.get
response <- dynamoDb
.batchGetItem(awsBatchGetItemRequest(batchGetItem.copy(requestItems = unprocessed)))
.mapError(_.toThrowable)
_ <-
ZIO.debug(
s"RRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRR retrying with response.unprocessedKeys=${response.unprocessedKeys}"
)
responseUnprocessedOpt =
response.unprocessedKeys
.map(_.map {
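The retry loop above only re-issues the request while DynamoDB keeps reporting unprocessed keys: the query's retryPolicy (or a default) is narrowed with whileInput so that BatchRetryError is retried and any other failure surfaces immediately. A standalone sketch of that pattern, where defaultRetryPolicy is an assumed value for illustration rather than the library's actual default:

import zio._

final case class BatchRetryError() extends Throwable

// Assumed default schedule: up to 3 retries with exponential backoff.
val defaultRetryPolicy: Schedule[Any, Any, Any] =
  Schedule.recurs(3) && Schedule.exponential(50.milliseconds)

// Retry `call` with the supplied policy, but only while the error is a BatchRetryError.
def withBatchRetries[R, A](
  call: ZIO[R, Throwable, A],
  retryPolicy: Option[Schedule[Any, Throwable, Any]]
): ZIO[R, Throwable, A] =
  call.retry(
    retryPolicy
      .getOrElse(defaultRetryPolicy)
      .whileInput {
        case BatchRetryError() => true  // keep retrying while keys remain unprocessed
        case _                 => false // any other error fails straight away
      }
  )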
23 changes: 15 additions & 8 deletions dynamodb/src/main/scala/zio/dynamodb/DynamoDBQuery.scala
@@ -370,13 +370,18 @@ sealed trait DynamoDBQuery[-In, +Out] { self =>
case Absolve(query) => Absolve(query.withRetryPolicy(retryPolicy))
case s: BatchWriteItem =>
s.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]]
case s: BatchGetItem => s.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]]
case s: BatchGetItem =>
println(s"0 ZZZZZZZZZZZZZZZZZZZZZZ withRetryPolicy: $retryPolicy")
s.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]]
case p: PutItem =>
println(s"ZZZZZZZZZZZZZZZZZZZZZZ withRetryPolicy: $retryPolicy")
println(s"1 ZZZZZZZZZZZZZZZZZZZZZZ withRetryPolicy: $retryPolicy")
p.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]]
case d: DeleteItem =>
println(s"ZZZZZZZZZZZZZZZZZZZZZZ withRetryPolicy: $retryPolicy")
println(s"2 ZZZZZZZZZZZZZZZZZZZZZZ withRetryPolicy: $retryPolicy")
d.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]]
case g: GetItem =>
println(s"3 ZZZZZZZZZZZZZZZZZZZZZZ withRetryPolicy: $retryPolicy")
g.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]]
case _ =>
self // TODO: Avi - propagate retry to all atomic batchable queries eg Get/Put/Delete so that auto-batching can select it
}
@@ -685,7 +690,8 @@ object DynamoDBQuery {
projections: List[ProjectionExpression[_, _]] =
List.empty, // If no attribute names are specified, then all attributes are returned
consistency: ConsistencyMode = ConsistencyMode.Weak,
capacity: ReturnConsumedCapacity = ReturnConsumedCapacity.None
capacity: ReturnConsumedCapacity = ReturnConsumedCapacity.None,
retryPolicy: Option[Schedule[Any, Throwable, Any]] = None
) extends Constructor[Any, Option[Item]]

private[dynamodb] final case class BatchRetryError() extends Throwable
@@ -714,7 +720,8 @@ object DynamoDBQuery {
BatchGetItem(
self.requestItems + newEntry,
self.capacity,
self.orderedGetItems :+ getItem
self.orderedGetItems :+ getItem,
self.retryPolicy.orElse(getItem.retryPolicy) // inherit retry policy from GetItem if not set
)
}
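Read together, the two hunks above give the policy a path from an individual GetItem into the batch that absorbs it: withRetryPolicy copies the schedule onto each atomic query, and BatchGetItem.+ keeps its own policy when one is already set, otherwise adopting the policy of the GetItem being added. A deliberately simplified, hypothetical model of that propagation (the names below are illustrative, not the library's):

import zio._

// Each atomic query carries its own optional policy; withRetryPolicy just copies it in.
final case class Get(key: String, retryPolicy: Option[Schedule[Any, Throwable, Any]] = None) {
  def withRetryPolicy(p: Schedule[Any, Throwable, Any]): Get = copy(retryPolicy = Some(p))
}

// Folding gets into a batch: the batch keeps an existing policy,
// otherwise it inherits the policy of the query being added.
final case class Batch(gets: List[Get] = Nil, retryPolicy: Option[Schedule[Any, Throwable, Any]] = None) {
  def +(g: Get): Batch =
    copy(gets = gets :+ g, retryPolicy = retryPolicy.orElse(g.retryPolicy))
}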

@@ -776,7 +783,7 @@ object DynamoDBQuery {
retryPolicy: Option[Schedule[Any, Throwable, Any]] = None
// Schedule.recurs(3) && Schedule.exponential(50.milliseconds) // TODO: Avi delete
) extends Constructor[Any, BatchWriteItem.Response] { self =>
println(s"XXXXXXXXXXXX BatchWriteItem.retryPolicy ${self.retryPolicy}")
// println(s"XXXXXXXXXXXX BatchWriteItem.retryPolicy ${self.retryPolicy}")
def +[A](writeItem: Write[Any, A]): BatchWriteItem =
writeItem match {
case putItem @ PutItem(_, _, _, _, _, _, _) =>
@@ -1015,7 +1022,7 @@ object DynamoDBQuery {
constructors.zipWithIndex.foldLeft[(Chunk[IndexedConstructor], Chunk[IndexedGetItem], Chunk[IndexedWriteItem])](
(Chunk.empty, Chunk.empty, Chunk.empty)
) {
case ((nonBatched, gets, writes), (get @ GetItem(_, pk, pes, _, _), index)) =>
case ((nonBatched, gets, writes), (get @ GetItem(_, pk, pes, _, _, _), index)) =>
if (projectionsContainPrimaryKey(pes, pk))
(nonBatched, gets :+ (get -> index), writes)
else
@@ -1140,7 +1147,7 @@ object DynamoDBQuery {
}
)

case getItem @ GetItem(_, _, _, _, _) =>
case getItem @ GetItem(_, _, _, _, _, _) =>
(
Chunk(getItem),
(results: Chunk[Any]) => {
@@ -55,7 +55,7 @@ private[dynamodb] final case class TestDynamoDBExecutorImpl private[dynamodb] (
}
results.map(_ => BatchWriteItem.Response(None))

case GetItem(tableName, key, _, _, _) =>
case GetItem(tableName, key, _, _, _, _) =>
fakeGetItem(tableName.value, key)

case PutItem(tableName, item, _, _, _, _, _) =>
38 changes: 25 additions & 13 deletions dynamodb/src/test/scala/zio/dynamodb/AutoBatchedFailureSpec.scala
@@ -78,7 +78,7 @@ object AutoBatchedFailureSpec extends ZIOSpecDefault with DynamoDBFixtures {
retryPolicy = Some(Schedule.recurs(1))
)

val batchWriteDeleteItemRequestItemOneAndTwo = BatchWriteItem(
private val batchWriteDeleteItemRequestItemOneAndTwo = BatchWriteItem(
requestItems = MapOfSet.empty[TableName, BatchWriteItem.Write] + (TableName(mockBatches) -> BatchWriteItem.Delete(
itemOne
)) + (TableName(mockBatches) -> BatchWriteItem.Delete(itemTwo)),
@@ -98,7 +98,7 @@ object AutoBatchedFailureSpec extends ZIOSpecDefault with DynamoDBFixtures {
batchWriteDeleteItemRequestItemOneAndTwo
)

private def failedMockBatchGet(peSet: Set[ProjectionExpression[_, _]] = Set.empty): ULayer[DynamoDb] =
private def failedMockBatchGet(peSet: Set[ProjectionExpression[_, _]] = Set.empty, atMost: Int): ULayer[DynamoDb] =
DynamoDbMock
.BatchGetItem(
equalTo(getRequestItemOneAndTwo(peSet)),
@@ -117,7 +117,7 @@ object AutoBatchedFailureSpec extends ZIOSpecDefault with DynamoDBFixtures {
).asReadOnly
)
)
.atMost(4)
.atMost(atMost)

private val failedPartialMockBatchGetTwoItems: ULayer[DynamoDb] = DynamoDbMock
.BatchGetItem(
@@ -196,7 +196,7 @@ object AutoBatchedFailureSpec extends ZIOSpecDefault with DynamoDBFixtures {
)
)
)
}.provideLayer(failedMockBatchGet() >>> DynamoDBExecutor.live) @@ TestAspect.withLiveClock,
}.provideLayer(failedMockBatchGet(atMost = 4) >>> DynamoDBExecutor.live) @@ TestAspect.withLiveClock,
test("should return all keys in unprocessedKeys for forEach case") {
val autoBatched = forEach(List(itemOne, itemTwo))(item => getItem("mockBatches", item))
assertZIO(autoBatched.execute.exit)(
@@ -206,7 +206,7 @@ object AutoBatchedFailureSpec extends ZIOSpecDefault with DynamoDBFixtures {
)
)
)
}.provideLayer(failedMockBatchGet() >>> DynamoDBExecutor.live) @@ TestAspect.withLiveClock,
}.provideLayer(failedMockBatchGet(atMost = 4) >>> DynamoDBExecutor.live) @@ TestAspect.withLiveClock,
test("should return all keys in unprocessedKeys for forEach case using type safe API") {
val autoBatched = forEach(List("v1", "v2")) { id =>
get("mockBatches")(TestItem.k1.partitionKey === id)
@@ -219,7 +219,7 @@ object AutoBatchedFailureSpec extends ZIOSpecDefault with DynamoDBFixtures {
)
)
}.provideLayer(
(failedMockBatchGet(Set(ProjectionExpression.$("k1")))) >>> DynamoDBExecutor.live
(failedMockBatchGet(Set(ProjectionExpression.$("k1")), atMost = 4)) >>> DynamoDBExecutor.live
) @@ TestAspect.withLiveClock
),
suite("partial batched requests fail")(
@@ -246,18 +246,18 @@ object AutoBatchedFailureSpec extends ZIOSpecDefault with DynamoDBFixtures {
).provideLayer(failedPartialMockBatchGetTwoItems >>> DynamoDBExecutor.live) @@ TestAspect.withLiveClock
)

private val itemOneWriteRequest = Set(
private val itemOneWriteRequest = Set(
DynamoDBExecutorImpl.awsWriteRequest(BatchWriteItem.Put(itemOne))
)
private val itemOneAndTwoPutWriteRequest = Set(
private val itemOneAndTwoPutWriteRequest = Set(
DynamoDBExecutorImpl.awsWriteRequest(BatchWriteItem.Put(itemOne)),
DynamoDBExecutorImpl.awsWriteRequest(BatchWriteItem.Put(itemTwo))
)
val itemOneAndTwoDeleteWriteRequest = Set(
private val itemOneAndTwoDeleteWriteRequest = Set(
DynamoDBExecutorImpl.awsWriteRequest(BatchWriteItem.Delete(itemOne)),
DynamoDBExecutorImpl.awsWriteRequest(BatchWriteItem.Delete(itemTwo))
)
def failedMockBatchWritePutTwoItems(atMost: Int): ULayer[DynamoDb] =
private def failedMockBatchWritePutTwoItems(atMost: Int): ULayer[DynamoDb] =
DynamoDbMock
.BatchWriteItem(
equalTo(writePutItemRequestItemOneAndTwo),
@@ -272,7 +272,7 @@ object AutoBatchedFailureSpec extends ZIOSpecDefault with DynamoDBFixtures {
)
)
.atMost(atMost) // 4
def failedMockBatchWriteDeleteTwoItems(atMost: Int): ULayer[DynamoDb] =
private def failedMockBatchWriteDeleteTwoItems(atMost: Int): ULayer[DynamoDb] =
DynamoDbMock
.BatchWriteItem(
equalTo(writeDeleteItemRequestItemOneAndTwo),
@@ -365,7 +365,7 @@ object AutoBatchedFailureSpec extends ZIOSpecDefault with DynamoDBFixtures {
).provideLayer(successfulMockBatchWriteItemOneAndTwo >>> DynamoDBExecutor.live),
suite("all batched request fail")(
suite("custom retry policy")(
test("should propagate custom retry policy for auto-batched PutItems") {
test("should propagate custom retry policy for auto-batched PutItem's") {
val autoBatchedPutItems = putItem("mockBatches", itemOne) zip putItem("mockBatches", itemTwo)
assertZIO(autoBatchedPutItems.withRetryPolicy(Schedule.recurs(5)).execute.exit)(
fails(
@@ -377,7 +377,7 @@ object AutoBatchedFailureSpec extends ZIOSpecDefault with DynamoDBFixtures {
}.provideLayer(
failedMockBatchWritePutTwoItems(atMost = 6) >>> DynamoDBExecutor.live
) @@ TestAspect.withLiveClock,
test("should propagate custom retry policy for auto-batched DeleteItems") {
test("should propagate custom retry policy for auto-batched DeleteItem's") {
val autoBatchedDeleteItems = deleteItem("mockBatches", itemOne) zip deleteItem("mockBatches", itemTwo)
assertZIO(autoBatchedDeleteItems.withRetryPolicy(Schedule.recurs(5)).execute.exit)(
fails(
@@ -388,6 +388,18 @@ object AutoBatchedFailureSpec extends ZIOSpecDefault with DynamoDBFixtures {
) // withRetryPolicy has set recurs to a higher value than the default so we need to increase the atMost value
}.provideLayer(
failedMockBatchWriteDeleteTwoItems(atMost = 6) >>> DynamoDBExecutor.live
) @@ TestAspect.withLiveClock,
test("should propagate custom retry policy for auto-batched GetItem's") {
val autoBatchedGetItems = getItem("mockBatches", itemOne) zip getItem("mockBatches", itemTwo)
assertZIO(autoBatchedGetItems.withRetryPolicy(Schedule.recurs(5)).execute.exit)(
fails(
assertDynamoDBBatchGetError(
ScalaMap("mockBatches" -> Set(itemOne, itemTwo))
)
)
) // withRetryPolicy has set recurs to a higher value than the default so we need to increase the atMost value
}.provideLayer(
failedMockBatchGet(atMost = 6) >>> DynamoDBExecutor.live
) @@ TestAspect.withLiveClock
),
test("should return all keys in unprocessedItems in Zipped failure case") {
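As the inline comments note, Schedule.recurs(5) allows the initial call plus five retries, i.e. up to six BatchGetItem or BatchWriteItem invocations, which is why the mock layers in these retry-policy tests are built with atMost = 6 rather than the 4 that matches the default policy.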
