From a27d601d4107193040762742c7cafdb8c55cb2e5 Mon Sep 17 00:00:00 2001 From: Thanh Le Date: Mon, 26 Feb 2024 16:20:43 +0700 Subject: [PATCH] Move clean back to Executor --- app/src/main/scala/AppState.scala | 22 ++++++++-------------- app/src/main/scala/Executor.scala | 14 +++++++++++++- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/app/src/main/scala/AppState.scala b/app/src/main/scala/AppState.scala index ef56105..e88f3b2 100644 --- a/app/src/main/scala/AppState.scala +++ b/app/src/main/scala/AppState.scala @@ -48,24 +48,18 @@ object AppState: case Some(move) => state -> monitor.notAcquired(move, apikey) - def clean(monitor: Monitor)(since: Instant)(using Logger[IO]): (AppState, IO[Unit]) = - val timedOut: List[Work.Move] = state.values.filter(_.acquiredBefore(since)).toList - val logIfTimedOut = - if timedOut.nonEmpty then - Logger[IO].debug(s"cleaning ${timedOut.size} of ${state.size} moves") >> - timedOut.traverse_(m => Logger[IO].info(s"Timeout move: $m")) - else IO.unit - val (newState, gavedUpMoves) = timedOut.foldLeft[(AppState, List[Work.Move])](state -> Nil): (x, m) => - val (newState, move) = x._1.updateOrGiveUp(m.timeout) - (newState, move.fold(x._2)(_ :: x._2)) - newState -> logIfTimedOut - *> gavedUpMoves.traverse_(m => Logger[IO].warn(s"Give up move: $m")) - *> monitor.updateSize(newState) + def acquiredBefore(since: Instant): List[Work.Move] = + state.values.filter(_.acquiredBefore(since)).toList def earliestNonAcquiredMove: Option[Work.Move] = state.values.filter(_.nonAcquired).minByOption(_.createdAt) + def updateOrGiveUp(candidates: List[Work.Move]): (AppState, List[Work.Move]) = + candidates.foldLeft[(AppState, List[Work.Move])](state -> Nil): (x, m) => + val (newState, move) = x._1.updateOrGiveUp(m.timeout) + newState -> move.fold(x._2)(_ :: x._2) + def updateOrGiveUp(move: Work.Move): (AppState, Option[Work.Move]) = val newState = state - move.id if move.isOutOfTries then (newState, move.some) - else (newState + (move.id -> move), none) + else newState.updated(move.id, move) -> none diff --git a/app/src/main/scala/Executor.scala b/app/src/main/scala/Executor.scala index bf27084..704a8e7 100644 --- a/app/src/main/scala/Executor.scala +++ b/app/src/main/scala/Executor.scala @@ -20,6 +20,7 @@ trait Executor: def move(workId: WorkId, fishnetKey: ClientKey, move: BestMove): IO[Unit] // add work to queue def add(work: Lila.Request): IO[Unit] + // clean up all works that are acquired before the given time def clean(before: Instant): IO[Unit] object Executor: @@ -50,7 +51,18 @@ object Executor: ref.flatModify(_.applyMove(monitor, client)(workId, apikey, move)) def clean(since: Instant): IO[Unit] = - ref.flatModify(_.clean(monitor)(since)) + ref.flatModify: state => + val timedOut = state.acquiredBefore(since) + val logs = logIfTimedOut(state, timedOut) + val (newState, gavedUpMoves) = state.updateOrGiveUp(timedOut) + newState -> logs + *> gavedUpMoves.traverse_(m => Logger[IO].warn(s"Give up move: $m")) + *> monitor.updateSize(newState) + + private def logIfTimedOut(state: AppState, timeOut: List[Work.Move]): IO[Unit] = + IO.whenA(timeOut.nonEmpty): + Logger[IO].debug(s"cleaning ${timeOut.size} of ${state.size} moves") + *> timeOut.traverse_(m => Logger[IO].info(s"Timeout move: $m")) def fromRequest(req: Lila.Request): IO[Move] = (IO(Work.makeId), IO.realTimeInstant).mapN: (id, now) =>