Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduled and repeat API + retry refactoring #172

Merged
merged 40 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
f5a0581
refactor retry* functions to scheduled*
micossow Jul 3, 2024
1c14d0e
refactor Schedule.Every -> FixedRate
micossow Jul 3, 2024
c449e30
refactor schedule -> runScheduled and add repeat*
micossow Jul 4, 2024
f526f74
fix compilation error
micossow Jul 4, 2024
c27ed5d
fix compilation error and warnings
micossow Jul 4, 2024
3728c62
fix doc compile
micossow Jul 4, 2024
d511f02
add Schedule.InitialDelay, refactor Exponential.initialDelay -> first…
micossow Jul 4, 2024
373ee38
Schedule doc cleanup and disambiguation
micossow Jul 4, 2024
c68a9d6
add todo
micossow Jul 5, 2024
686e152
remove Schedule.FixedRate and move the start-to-start concept to runS…
micossow Jul 5, 2024
bfa4da0
fix default delayPolicy for retry
micossow Jul 5, 2024
a3e71b3
add tolerance to fixedRate tests
micossow Jul 5, 2024
cc73acc
reduce sleep time in fixedRate tests
micossow Jul 5, 2024
d0d94b0
refactor runScheduled -> scheduled
micossow Jul 8, 2024
6406eff
refactor RetryPolicy -> RetryConfig
micossow Jul 8, 2024
013bbdf
refactor Schedule.fallbackTo -> andThen
micossow Jul 8, 2024
e1a5805
close todo
micossow Jul 8, 2024
3ba7005
fix compileDocumentation
micossow Jul 8, 2024
90d2164
restore private modifier to Exponential.delay
micossow Jul 8, 2024
8c3128e
[doc] replace policy with config where applicable
micossow Jul 8, 2024
3569463
[doc] repeat and RepeatConfig doc
micossow Jul 8, 2024
8834a18
add initialDelay to RepeatConfig
micossow Jul 9, 2024
60e51b6
another Schedule refactoring
micossow Jul 9, 2024
5c6ef40
refactor DelayPolicy -> SleepMode
micossow Jul 9, 2024
88e9cec
fix initialDelay combined with forever schedule
micossow Jul 9, 2024
739f1a7
add ImmediateRepeatTest
micossow Jul 9, 2024
3715c25
add adr
micossow Jul 9, 2024
732c37e
update docs
micossow Jul 9, 2024
e9d7660
fix accidentally modified generated doc
micossow Jul 9, 2024
e716e77
reformat adr
micossow Jul 9, 2024
809275d
fix doc
micossow Jul 10, 2024
8f9cbc7
refactor attempt -> iteration in Schedule
micossow Jul 10, 2024
c966a90
remove shouldContinueOnError from repeat
micossow Jul 10, 2024
a893508
[doc] update repeat doc about combination with retry
micossow Jul 10, 2024
657534a
refactor variable/parameter names in scheduled
micossow Jul 10, 2024
0483281
document to prefer to use repeat, retry instead of scheduled
micossow Jul 10, 2024
6bfb177
[doc] mention how Retry/RetryConfig are special case of ScheduledConfig
micossow Jul 10, 2024
493888f
scheduled: rename iteration -> invocation
micossow Jul 10, 2024
a25f23e
fix doc compile
micossow Jul 10, 2024
5b3290c
combining Schedules refactoring and examples
micossow Jul 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions core/src/main/scala/ox/resilience/RetryPolicy.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package ox.resilience

import ox.scheduling.{Jitter, Schedule}

import scala.concurrent.duration.*

/** A policy that defines how to retry a failed operation.
Expand Down Expand Up @@ -88,7 +90,7 @@ object RetryPolicy:
maxDelay: FiniteDuration = 1.minute,
jitter: Jitter = Jitter.None
): RetryPolicy[E, T] =
RetryPolicy(Schedule.Backoff(maxRetries, initialDelay, maxDelay, jitter))
RetryPolicy(Schedule.Exponential(maxRetries, initialDelay, maxDelay, jitter))

/** Creates a policy that retries indefinitely, with an increasing delay (backoff) between subsequent attempts, using a default
* [[ResultPolicy]].
Expand All @@ -111,4 +113,4 @@ object RetryPolicy:
maxDelay: FiniteDuration = 1.minute,
jitter: Jitter = Jitter.None
): RetryPolicy[E, T] =
RetryPolicy(Schedule.Backoff.forever(initialDelay, maxDelay, jitter))
RetryPolicy(Schedule.Exponential.forever(initialDelay, maxDelay, jitter))
144 changes: 0 additions & 144 deletions core/src/main/scala/ox/resilience/Schedule.scala

This file was deleted.

44 changes: 11 additions & 33 deletions core/src/main/scala/ox/resilience/retry.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package ox.resilience

import ox.{EitherMode, ErrorMode, sleep}
import ox.{EitherMode, ErrorMode}
import ox.scheduling.*

import scala.annotation.tailrec
import scala.concurrent.duration.*
import scala.util.Try

/** Retries an operation returning a direct result until it succeeds or the policy decides to stop.
Expand Down Expand Up @@ -48,33 +47,12 @@ def retryEither[E, T](policy: RetryPolicy[E, T])(operation: => Either[E, T]): Ei
* - the error `E` in context `F` as returned by the last attempt if the policy decides to stop.
*/
def retryWithErrorMode[E, F[_], T](em: ErrorMode[E, F])(policy: RetryPolicy[E, T])(operation: => F[T]): F[T] =
@tailrec
def loop(attempt: Int, remainingAttempts: Option[Int], lastDelay: Option[FiniteDuration]): F[T] =
def sleepIfNeeded =
val delay = policy.schedule.nextDelay(attempt, lastDelay)
if delay.toMillis > 0 then sleep(delay)
delay

operation match
case v if em.isError(v) =>
val error = em.getError(v)
policy.onRetry(attempt, Left(error))

if policy.resultPolicy.isWorthRetrying(error) && remainingAttempts.forall(_ > 0) then
val delay = sleepIfNeeded
loop(attempt + 1, remainingAttempts.map(_ - 1), Some(delay))
else v
case v =>
val result = em.getT(v)
policy.onRetry(attempt, Right(result))

if !policy.resultPolicy.isSuccess(result) && remainingAttempts.forall(_ > 0) then
val delay = sleepIfNeeded
loop(attempt + 1, remainingAttempts.map(_ - 1), Some(delay))
else v

val remainingAttempts = policy.schedule match
case finiteSchedule: Schedule.Finite => Some(finiteSchedule.maxRetries)
case _ => None

loop(1, remainingAttempts, None)
runScheduledWithErrorMode(em)(
RunScheduledConfig(
policy.schedule,
policy.onRetry,
shouldContinueOnError = policy.resultPolicy.isWorthRetrying,
shouldContinueOnResult = t => !policy.resultPolicy.isSuccess(t),
delayPolicy = DelayPolicy.SinceTheEndOfTheLastInvocation
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't the delay policy be part of RetryPolicy, so that it's also user-configurable if needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My idea was to create a public API runScheduled* with the all configurable options and make retry* and repeat* as "thin" as possible - basically an opinionated variants of runScheduled*, specifically:

  • retry - that behaves similarly to resillience4j retries, where sleep time is counted since the end of previous operation, has an optional onRetry callback and initial delay (sleep before first operation) is not relevant
  • repeat - that behaves similarly to Akka's Source.tick, where sleep time is counted since the start of previous operation (interval), doesn't expose onRepeat callback (not relevant) and initial delay is allowed
    WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, sounds good. Let's just document these defaults quite clearly, plus all the flexibility that runScheduled (or maybe simply scheduled?) gives you

)
)(operation)
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package ox.resilience
package ox.scheduling

/** A random factor used for calculating the delay between subsequent retries when a backoff strategy is used for calculating the delay.
*
Expand Down
30 changes: 30 additions & 0 deletions core/src/main/scala/ox/scheduling/RepeatConfig.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package ox.scheduling

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.DurationInt

case class RepeatConfig[E, T](
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have RepeatConfig, but RetryPolicy - why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RetryPolicy has been refactored to RetryConfig

schedule: Schedule,
shouldContinueOnError: E => Boolean = (_: E) => false,
shouldContinueOnResult: T => Boolean = (_: T) => true
)

object RepeatConfig:
def immediate[E, T](repeats: Int): RepeatConfig[E, T] = RepeatConfig(Schedule.Immediate(repeats))
def immediateForever[E, T]: RepeatConfig[E, T] = RepeatConfig(Schedule.Immediate.forever)

def fixedRate[E, T](repeats: Int, delay: FiniteDuration): RepeatConfig[E, T] = RepeatConfig(Schedule.Delay(repeats, delay))
def fixedRateForever[E, T](delay: FiniteDuration): RepeatConfig[E, T] = RepeatConfig(Schedule.Delay.forever(delay))

def exponential[E, T](
repeats: Int,
firstDelay: FiniteDuration,
maxDelay: FiniteDuration = 1.minute,
jitter: Jitter = Jitter.None
): RepeatConfig[E, T] = RepeatConfig(Schedule.Exponential(repeats, firstDelay, maxDelay, jitter))

def exponentialForever[E, T](
firstDelay: FiniteDuration,
maxDelay: FiniteDuration = 1.minute,
jitter: Jitter = Jitter.None
): RepeatConfig[E, T] = RepeatConfig(Schedule.Exponential.forever(firstDelay, maxDelay, jitter))
Loading
Loading