From feea473d8cbb4e70de5a8c128099755c151636d4 Mon Sep 17 00:00:00 2001 From: Avinder Bahra Date: Wed, 24 Jul 2024 07:15:45 +0100 Subject: [PATCH] some clean up --- .../zio/dynamodb/TypeSafeApiCrudSpec.scala | 8 ++--- .../zio/dynamodb/DynamoDBExecutorImpl.scala | 23 ++------------ .../scala/zio/dynamodb/DynamoDBQuery.scala | 31 +++++-------------- .../src/main/scala/zio/dynamodb/package.scala | 1 - 4 files changed, 12 insertions(+), 51 deletions(-) diff --git a/dynamodb/src/it/scala/zio/dynamodb/TypeSafeApiCrudSpec.scala b/dynamodb/src/it/scala/zio/dynamodb/TypeSafeApiCrudSpec.scala index ab75d923..04b7e674 100644 --- a/dynamodb/src/it/scala/zio/dynamodb/TypeSafeApiCrudSpec.scala +++ b/dynamodb/src/it/scala/zio/dynamodb/TypeSafeApiCrudSpec.scala @@ -13,8 +13,6 @@ import zio.stream.ZStream import zio.ZIO import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException import zio.Scope -// import zio.Schedule -// import zio.durationInt object TypeSafeApiCrudSpec extends DynamoDBLocalSpec { @@ -766,14 +764,12 @@ object TypeSafeApiCrudSpec extends DynamoDBLocalSpec { val person1 = Person("1", "Smith", Some("John"), 21) val person2 = Person("2", "Jones", Some("Tarlochan"), 42) for { - _ <- forEach(Chunk(person1, person2))(person => put("tableName", person)) -// .withRetryPolicy(Schedule.recurs(5) && Schedule.exponential(50.milliseconds)) - .execute + _ <- forEach(Chunk(person1, person2))(person => put(tableName, person)).execute stream <- scanAll[Person](tableName).execute people <- stream.runCollect } yield assertTrue(people.sortBy(_.id) == Chunk(person1, person2)) } - } @@ TestAspect.withLiveClock, + }, test("with a delete query") { withSingleIdKeyTable { tableName => val person1 = Person("1", "Smith", Some("John"), 21) diff --git a/dynamodb/src/main/scala/zio/dynamodb/DynamoDBExecutorImpl.scala b/dynamodb/src/main/scala/zio/dynamodb/DynamoDBExecutorImpl.scala index 9d8187e1..05162ae0 100644 --- a/dynamodb/src/main/scala/zio/dynamodb/DynamoDBExecutorImpl.scala +++ b/dynamodb/src/main/scala/zio/dynamodb/DynamoDBExecutorImpl.scala @@ -225,24 +225,15 @@ private[dynamodb] final case class DynamoDBExecutorImpl private[dynamodb] (dynam case BatchRetryError() => true case _ => false } - println(s"XXXXXXXXXXXXXXXX Schedule: ${batchWriteItem.retryPolicy}") for { ref <- zio.Ref.make[MapOfSet[TableName, BatchWriteItem.Write]](batchWriteItem.requestItems) _ <- (for { unprocessedItems <- ref.get - _ <- - ZIO.debug( - s"RRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRR retrying with unprocessedItems.size=${unprocessedItems.size}" - ) response <- dynamoDb .batchWriteItem( awsBatchWriteItemRequest(batchWriteItem.copy(requestItems = unprocessedItems)) ) .mapError(_.toThrowable) - _ <- - ZIO.debug( - s"RRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRR retrying with unprocessedItems=${response.unprocessedItems}" - ) responseUnprocessedItemsOpt = response.unprocessedItems .map(map => mapOfListToMapOfSet(map.map { @@ -251,10 +242,8 @@ private[dynamodb] final case class DynamoDBExecutorImpl private[dynamodb] (dynam ) .toOption _ <- responseUnprocessedItemsOpt match { - case Some(responseUnprocessedItems) => - ref.set(responseUnprocessedItems) - case None => - ZIO.unit + case Some(responseUnprocessedItems) => ref.set(responseUnprocessedItems) + case None => ZIO.unit } _ <- ZIO .fail(BatchRetryError()) @@ -342,7 +331,6 @@ private[dynamodb] final case class DynamoDBExecutorImpl private[dynamodb] (dynam if (batchGetItem.requestItems.isEmpty) ZIO.succeed(BatchGetItem.Response()) else { - println(s"XXXXXXXXXXXXXXXXXXX batchGetItem.retryPolicy ${batchGetItem.retryPolicy}") val t: zio.Schedule[Any, Throwable, Any] = batchGetItem.retryPolicy.getOrElse(defaultRetryPolicy).whileInput { case BatchRetryError() => true case _ => false @@ -352,17 +340,10 @@ private[dynamodb] final case class DynamoDBExecutorImpl private[dynamodb] (dynam collectedItems <- zio.Ref.make[MapOfSet[TableName, Item]](MapOfSet.empty) _ <- (for { unprocessed <- unprocessedKeys.get - _ <- ZIO.debug( - s"RRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRR retrying with unprocessed.size=${unprocessed.size}" - ) currentCollection <- collectedItems.get response <- dynamoDb .batchGetItem(awsBatchGetItemRequest(batchGetItem.copy(requestItems = unprocessed))) .mapError(_.toThrowable) - _ <- - ZIO.debug( - s"RRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRR retrying with response.unprocessedKeys=${response.unprocessedKeys}" - ) responseUnprocessedOpt = response.unprocessedKeys .map(_.map { diff --git a/dynamodb/src/main/scala/zio/dynamodb/DynamoDBQuery.scala b/dynamodb/src/main/scala/zio/dynamodb/DynamoDBQuery.scala index 9308a53e..a2b32fab 100644 --- a/dynamodb/src/main/scala/zio/dynamodb/DynamoDBQuery.scala +++ b/dynamodb/src/main/scala/zio/dynamodb/DynamoDBQuery.scala @@ -45,9 +45,6 @@ sealed trait DynamoDBQuery[-In, +Out] { self => val (indexedConstructors, (batchGetItem, batchGetIndexes), (batchWriteItem, batchWriteIndexes)) = batched(constructors) - println(s"XXXXXXXXX execute self: $self") - println(s"XXXXXXXXX execute batchWriteItem: $batchWriteItem") - val indexedNonBatchedResults = ZIO.foreachPar(indexedConstructors) { case (constructor, index) => @@ -368,22 +365,12 @@ sealed trait DynamoDBQuery[-In, +Out] { self => Zip(left.withRetryPolicy(retryPolicy), right.withRetryPolicy(retryPolicy), zippable) case Map(query, mapper) => Map(query.withRetryPolicy(retryPolicy), mapper) case Absolve(query) => Absolve(query.withRetryPolicy(retryPolicy)) - case s: BatchWriteItem => - s.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]] - case s: BatchGetItem => - println(s"0 ZZZZZZZZZZZZZZZZZZZZZZ withRetryPolicy: $retryPolicy") - s.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]] - case p: PutItem => - println(s"1 ZZZZZZZZZZZZZZZZZZZZZZ withRetryPolicy: $retryPolicy") - p.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]] - case d: DeleteItem => - println(s"2 ZZZZZZZZZZZZZZZZZZZZZZ withRetryPolicy: $retryPolicy") - d.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]] - case g: GetItem => - println(s"3 ZZZZZZZZZZZZZZZZZZZZZZ withRetryPolicy: $retryPolicy") - g.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]] - case _ => - self // TODO: Avi - propagate retry to all atomic batchable queries eg Get/Put/Delete so that auto-batching can select it + case bw: BatchWriteItem => bw.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]] + case bg: BatchGetItem => bg.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]] + case p: PutItem => p.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]] + case d: DeleteItem => d.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]] + case g: GetItem => g.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]] + case _ => self } def sortOrder(ascending: Boolean): DynamoDBQuery[In, Out] = @@ -391,8 +378,8 @@ sealed trait DynamoDBQuery[-In, +Out] { self => case Zip(left, right, zippable) => Zip(left.sortOrder(ascending), right.sortOrder(ascending), zippable) case Map(query, mapper) => Map(query.sortOrder(ascending), mapper) case Absolve(query) => Absolve(query.sortOrder(ascending)) - case s: QuerySome => s.copy(ascending = ascending).asInstanceOf[DynamoDBQuery[In, Out]] - case s: QueryAll => s.copy(ascending = ascending).asInstanceOf[DynamoDBQuery[In, Out]] + case qs: QuerySome => qs.copy(ascending = ascending).asInstanceOf[DynamoDBQuery[In, Out]] + case qa: QueryAll => qa.copy(ascending = ascending).asInstanceOf[DynamoDBQuery[In, Out]] case _ => self } @@ -781,9 +768,7 @@ object DynamoDBQuery { itemMetrics: ReturnItemCollectionMetrics = ReturnItemCollectionMetrics.None, addList: Chunk[BatchWriteItem.Write] = Chunk.empty, retryPolicy: Option[Schedule[Any, Throwable, Any]] = None -// Schedule.recurs(3) && Schedule.exponential(50.milliseconds) // TODO: Avi delete ) extends Constructor[Any, BatchWriteItem.Response] { self => -// println(s"XXXXXXXXXXXX BatchWriteItem.retryPolicy ${self.retryPolicy}") def +[A](writeItem: Write[Any, A]): BatchWriteItem = writeItem match { case putItem @ PutItem(_, _, _, _, _, _, _) => diff --git a/dynamodb/src/main/scala/zio/dynamodb/package.scala b/dynamodb/src/main/scala/zio/dynamodb/package.scala index 6d631606..a431cec6 100644 --- a/dynamodb/src/main/scala/zio/dynamodb/package.scala +++ b/dynamodb/src/main/scala/zio/dynamodb/package.scala @@ -20,7 +20,6 @@ package object dynamodb { type Decoder[+A] = AttributeValue => Either[ItemError, A] private[dynamodb] def ddbExecute[A](query: DynamoDBQuery[_, A]): ZIO[DynamoDBExecutor, DynamoDBError, A] = - ZIO.debug(s"XXXXXXXXXXXXXXXXX ddbExecute query type ${query.getClass.getSimpleName}") *> ZIO.serviceWithZIO[DynamoDBExecutor](_.execute(query)) /**