Commit

some clean up

googley42 committed Jul 24, 2024
1 parent 72eb75a commit feea473
Showing 4 changed files with 12 additions and 51 deletions.
8 changes: 2 additions & 6 deletions dynamodb/src/it/scala/zio/dynamodb/TypeSafeApiCrudSpec.scala
@@ -13,8 +13,6 @@ import zio.stream.ZStream
 import zio.ZIO
 import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException
 import zio.Scope
-// import zio.Schedule
-// import zio.durationInt
 
 object TypeSafeApiCrudSpec extends DynamoDBLocalSpec {
 
@@ -766,14 +764,12 @@ object TypeSafeApiCrudSpec extends DynamoDBLocalSpec {
 val person1 = Person("1", "Smith", Some("John"), 21)
 val person2 = Person("2", "Jones", Some("Tarlochan"), 42)
 for {
-_ <- forEach(Chunk(person1, person2))(person => put("tableName", person))
-// .withRetryPolicy(Schedule.recurs(5) && Schedule.exponential(50.milliseconds))
-.execute
+_ <- forEach(Chunk(person1, person2))(person => put(tableName, person)).execute
 stream <- scanAll[Person](tableName).execute
 people <- stream.runCollect
 } yield assertTrue(people.sortBy(_.id) == Chunk(person1, person2))
 }
-} @@ TestAspect.withLiveClock,
+},
 test("with a delete query") {
 withSingleIdKeyTable { tableName =>
 val person1 = Person("1", "Smith", Some("John"), 21)
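For reference, the retry hook removed from the test above remains part of the public API. Below is a minimal sketch of attaching a retry schedule to a batched write; it mirrors the forEach/put/withRetryPolicy calls visible in this diff, while the table name, the Person schema derivation, and the executor layer wiring are illustrative assumptions rather than code from this repository.

import zio._
import zio.schema.{ DeriveSchema, Schema }
import zio.dynamodb._
import zio.dynamodb.DynamoDBQuery.{ forEach, put }

object RetryPolicyUsageSketch {
  // Mirrors the Person shape used in TypeSafeApiCrudSpec above (assumed field order/types).
  final case class Person(id: String, surname: String, forename: Option[String], age: Int)
  object Person {
    implicit val schema: Schema[Person] = DeriveSchema.gen[Person]
  }

  val person1 = Person("1", "Smith", Some("John"), 21)
  val person2 = Person("2", "Jones", Some("Tarlochan"), 42)

  // forEach turns the individual puts into a single auto-batched write; withRetryPolicy
  // (the call removed from the test) overrides the schedule used to retry unprocessed items.
  // Running the effect requires a DynamoDBExecutor layer in the environment.
  val batchedPut =
    forEach(Chunk(person1, person2))(person => put("person-table", person))
      .withRetryPolicy(Schedule.recurs(5) && Schedule.exponential(50.milliseconds))
      .execute
}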
23 changes: 2 additions & 21 deletions dynamodb/src/main/scala/zio/dynamodb/DynamoDBExecutorImpl.scala
@@ -225,24 +225,15 @@ private[dynamodb] final case class DynamoDBExecutorImpl private[dynamodb] (dynam
 case BatchRetryError() => true
 case _ => false
 }
-println(s"XXXXXXXXXXXXXXXX Schedule: ${batchWriteItem.retryPolicy}")
 for {
 ref <- zio.Ref.make[MapOfSet[TableName, BatchWriteItem.Write]](batchWriteItem.requestItems)
 _ <- (for {
 unprocessedItems <- ref.get
-_ <-
-ZIO.debug(
-s"RRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRR retrying with unprocessedItems.size=${unprocessedItems.size}"
-)
 response <- dynamoDb
 .batchWriteItem(
 awsBatchWriteItemRequest(batchWriteItem.copy(requestItems = unprocessedItems))
 )
 .mapError(_.toThrowable)
-_ <-
-ZIO.debug(
-s"RRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRR retrying with unprocessedItems=${response.unprocessedItems}"
-)
 responseUnprocessedItemsOpt = response.unprocessedItems
 .map(map =>
 mapOfListToMapOfSet(map.map {
@@ -251,10 +242,8 @@ private[dynamodb] final case class DynamoDBExecutorImpl private[dynamodb] (dynam
 )
 .toOption
 _ <- responseUnprocessedItemsOpt match {
-case Some(responseUnprocessedItems) =>
-ref.set(responseUnprocessedItems)
-case None =>
-ZIO.unit
+case Some(responseUnprocessedItems) => ref.set(responseUnprocessedItems)
+case None => ZIO.unit
 }
 _ <- ZIO
 .fail(BatchRetryError())
@@ -342,7 +331,6 @@ private[dynamodb] final case class DynamoDBExecutorImpl private[dynamodb] (dynam
 if (batchGetItem.requestItems.isEmpty)
 ZIO.succeed(BatchGetItem.Response())
 else {
-println(s"XXXXXXXXXXXXXXXXXXX batchGetItem.retryPolicy ${batchGetItem.retryPolicy}")
 val t: zio.Schedule[Any, Throwable, Any] = batchGetItem.retryPolicy.getOrElse(defaultRetryPolicy).whileInput {
 case BatchRetryError() => true
 case _ => false
@@ -352,17 +340,10 @@ private[dynamodb] final case class DynamoDBExecutorImpl private[dynamodb] (dynam
 collectedItems <- zio.Ref.make[MapOfSet[TableName, Item]](MapOfSet.empty)
 _ <- (for {
 unprocessed <- unprocessedKeys.get
-_ <- ZIO.debug(
-s"RRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRR retrying with unprocessed.size=${unprocessed.size}"
-)
 currentCollection <- collectedItems.get
 response <- dynamoDb
 .batchGetItem(awsBatchGetItemRequest(batchGetItem.copy(requestItems = unprocessed)))
 .mapError(_.toThrowable)
-_ <-
-ZIO.debug(
-s"RRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRR retrying with response.unprocessedKeys=${response.unprocessedKeys}"
-)
 responseUnprocessedOpt =
 response.unprocessedKeys
 .map(_.map {
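The retry loop that the deleted ZIO.debug calls were instrumenting follows a common ZIO pattern: hold the unprocessed items in a Ref, fail the inner effect with a marker error while work remains, and restrict the caller-supplied Schedule with whileInput so that only the marker error is retried. A stripped-down, generic sketch of that pattern (an illustration, not the actual zio-dynamodb implementation) is shown below.

import zio._

// Marker error used only to drive the retry schedule, as in the executor code above.
final case class BatchRetryError() extends Throwable

// Repeatedly calls `attempt` with whatever is still unprocessed, retrying per `policy`,
// and returns the items that remain unprocessed once the schedule gives up.
def retryUnprocessed[A](
  initial: Set[A],
  attempt: Set[A] => Task[Set[A]], // one round trip; returns the still-unprocessed items
  policy: Schedule[Any, Throwable, Any]
): Task[Set[A]] =
  for {
    ref <- Ref.make(initial)
    _   <- (for {
             pending   <- ref.get
             leftovers <- attempt(pending)
             _         <- ref.set(leftovers)
             // Fail with the marker error while anything is left so the schedule keeps retrying.
             _         <- ZIO.fail(BatchRetryError()).when(leftovers.nonEmpty)
           } yield ())
             .retry(policy.whileInput {
               case BatchRetryError() => true
               case _                 => false
             })
             .catchSome { case BatchRetryError() => ZIO.unit } // retries exhausted: give up
    rest <- ref.get
  } yield rest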
31 changes: 8 additions & 23 deletions dynamodb/src/main/scala/zio/dynamodb/DynamoDBQuery.scala
@@ -45,9 +45,6 @@ sealed trait DynamoDBQuery[-In, +Out] { self =>
 val (indexedConstructors, (batchGetItem, batchGetIndexes), (batchWriteItem, batchWriteIndexes)) =
 batched(constructors)
 
-println(s"XXXXXXXXX execute self: $self")
-println(s"XXXXXXXXX execute batchWriteItem: $batchWriteItem")
-
 val indexedNonBatchedResults =
 ZIO.foreachPar(indexedConstructors) {
 case (constructor, index) =>
@@ -368,31 +365,21 @@
 Zip(left.withRetryPolicy(retryPolicy), right.withRetryPolicy(retryPolicy), zippable)
 case Map(query, mapper) => Map(query.withRetryPolicy(retryPolicy), mapper)
 case Absolve(query) => Absolve(query.withRetryPolicy(retryPolicy))
-case s: BatchWriteItem =>
-s.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]]
-case s: BatchGetItem =>
-println(s"0 ZZZZZZZZZZZZZZZZZZZZZZ withRetryPolicy: $retryPolicy")
-s.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]]
-case p: PutItem =>
-println(s"1 ZZZZZZZZZZZZZZZZZZZZZZ withRetryPolicy: $retryPolicy")
-p.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]]
-case d: DeleteItem =>
-println(s"2 ZZZZZZZZZZZZZZZZZZZZZZ withRetryPolicy: $retryPolicy")
-d.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]]
-case g: GetItem =>
-println(s"3 ZZZZZZZZZZZZZZZZZZZZZZ withRetryPolicy: $retryPolicy")
-g.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]]
-case _ =>
-self // TODO: Avi - propagate retry to all atomic batchable queries eg Get/Put/Delete so that auto-batching can select it
+case bw: BatchWriteItem => bw.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]]
+case bg: BatchGetItem => bg.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]]
+case p: PutItem => p.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]]
+case d: DeleteItem => d.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]]
+case g: GetItem => g.copy(retryPolicy = Some(retryPolicy)).asInstanceOf[DynamoDBQuery[In, Out]]
+case _ => self
 }
 
 def sortOrder(ascending: Boolean): DynamoDBQuery[In, Out] =
 self match {
 case Zip(left, right, zippable) => Zip(left.sortOrder(ascending), right.sortOrder(ascending), zippable)
 case Map(query, mapper) => Map(query.sortOrder(ascending), mapper)
 case Absolve(query) => Absolve(query.sortOrder(ascending))
-case s: QuerySome => s.copy(ascending = ascending).asInstanceOf[DynamoDBQuery[In, Out]]
-case s: QueryAll => s.copy(ascending = ascending).asInstanceOf[DynamoDBQuery[In, Out]]
+case qs: QuerySome => qs.copy(ascending = ascending).asInstanceOf[DynamoDBQuery[In, Out]]
+case qa: QueryAll => qa.copy(ascending = ascending).asInstanceOf[DynamoDBQuery[In, Out]]
 case _ => self
 }
 
@@ -781,9 +768,7 @@ object DynamoDBQuery {
 itemMetrics: ReturnItemCollectionMetrics = ReturnItemCollectionMetrics.None,
 addList: Chunk[BatchWriteItem.Write] = Chunk.empty,
 retryPolicy: Option[Schedule[Any, Throwable, Any]] = None
-// Schedule.recurs(3) && Schedule.exponential(50.milliseconds) // TODO: Avi delete
 ) extends Constructor[Any, BatchWriteItem.Response] { self =>
-// println(s"XXXXXXXXXXXX BatchWriteItem.retryPolicy ${self.retryPolicy}")
 def +[A](writeItem: Write[Any, A]): BatchWriteItem =
 writeItem match {
 case putItem @ PutItem(_, _, _, _, _, _, _) =>
1 change: 0 additions & 1 deletion dynamodb/src/main/scala/zio/dynamodb/package.scala
@@ -20,7 +20,6 @@ package object dynamodb {
 type Decoder[+A] = AttributeValue => Either[ItemError, A]
 
 private[dynamodb] def ddbExecute[A](query: DynamoDBQuery[_, A]): ZIO[DynamoDBExecutor, DynamoDBError, A] =
-ZIO.debug(s"XXXXXXXXXXXXXXXXX ddbExecute query type ${query.getClass.getSimpleName}") *>
 ZIO.serviceWithZIO[DynamoDBExecutor](_.execute(query))
 
 /**
