Skip to content

Commit

Permalink
Move clean back to Executor
Browse files Browse the repository at this point in the history
  • Loading branch information
lenguyenthanh committed Feb 26, 2024
1 parent e651e9f commit a27d601
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 15 deletions.
22 changes: 8 additions & 14 deletions app/src/main/scala/AppState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,18 @@ object AppState:
case Some(move) =>
state -> monitor.notAcquired(move, apikey)

def clean(monitor: Monitor)(since: Instant)(using Logger[IO]): (AppState, IO[Unit]) =
val timedOut: List[Work.Move] = state.values.filter(_.acquiredBefore(since)).toList
val logIfTimedOut =
if timedOut.nonEmpty then
Logger[IO].debug(s"cleaning ${timedOut.size} of ${state.size} moves") >>
timedOut.traverse_(m => Logger[IO].info(s"Timeout move: $m"))
else IO.unit
val (newState, gavedUpMoves) = timedOut.foldLeft[(AppState, List[Work.Move])](state -> Nil): (x, m) =>
val (newState, move) = x._1.updateOrGiveUp(m.timeout)
(newState, move.fold(x._2)(_ :: x._2))
newState -> logIfTimedOut
*> gavedUpMoves.traverse_(m => Logger[IO].warn(s"Give up move: $m"))
*> monitor.updateSize(newState)
def acquiredBefore(since: Instant): List[Work.Move] =
state.values.filter(_.acquiredBefore(since)).toList

def earliestNonAcquiredMove: Option[Work.Move] =
state.values.filter(_.nonAcquired).minByOption(_.createdAt)

def updateOrGiveUp(candidates: List[Work.Move]): (AppState, List[Work.Move]) =
candidates.foldLeft[(AppState, List[Work.Move])](state -> Nil): (x, m) =>
val (newState, move) = x._1.updateOrGiveUp(m.timeout)
newState -> move.fold(x._2)(_ :: x._2)

def updateOrGiveUp(move: Work.Move): (AppState, Option[Work.Move]) =
val newState = state - move.id
if move.isOutOfTries then (newState, move.some)
else (newState + (move.id -> move), none)
else newState.updated(move.id, move) -> none
14 changes: 13 additions & 1 deletion app/src/main/scala/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ trait Executor:
def move(workId: WorkId, fishnetKey: ClientKey, move: BestMove): IO[Unit]
// add work to queue
def add(work: Lila.Request): IO[Unit]
// clean up all works that are acquired before the given time
def clean(before: Instant): IO[Unit]

object Executor:
Expand Down Expand Up @@ -50,7 +51,18 @@ object Executor:
ref.flatModify(_.applyMove(monitor, client)(workId, apikey, move))

def clean(since: Instant): IO[Unit] =
ref.flatModify(_.clean(monitor)(since))
ref.flatModify: state =>
val timedOut = state.acquiredBefore(since)
val logs = logIfTimedOut(state, timedOut)
val (newState, gavedUpMoves) = state.updateOrGiveUp(timedOut)
newState -> logs
*> gavedUpMoves.traverse_(m => Logger[IO].warn(s"Give up move: $m"))
*> monitor.updateSize(newState)

private def logIfTimedOut(state: AppState, timeOut: List[Work.Move]): IO[Unit] =
IO.whenA(timeOut.nonEmpty):
Logger[IO].debug(s"cleaning ${timeOut.size} of ${state.size} moves")
*> timeOut.traverse_(m => Logger[IO].info(s"Timeout move: $m"))

def fromRequest(req: Lila.Request): IO[Move] =
(IO(Work.makeId), IO.realTimeInstant).mapN: (id, now) =>
Expand Down

0 comments on commit a27d601

Please sign in to comment.