diff --git a/app/src/main/scala/AppState.scala b/app/src/main/scala/AppState.scala index fd6eb80..278b0f8 100644 --- a/app/src/main/scala/AppState.scala +++ b/app/src/main/scala/AppState.scala @@ -6,6 +6,11 @@ import lila.fishnet.Work.Move type AppState = Map[WorkId, Work.Move] +enum GetWorkResult: + case NotFound + case Found(move: Work.Move) + case AcquiredByOther(move: Work.Move) + object AppState: val empty: AppState = Map.empty extension (state: AppState) @@ -29,6 +34,12 @@ object AppState: def earliestNonAcquiredMove: Option[Work.Move] = state.values.filter(_.nonAcquired).minByOption(_.createdAt) + def getWork(workId: WorkId, key: ClientKey): GetWorkResult = + state.get(workId) match + case None => GetWorkResult.NotFound + case Some(move) if move.isAcquiredBy(key) => GetWorkResult.Found(move) + case Some(move) => GetWorkResult.AcquiredByOther(move) + def updateOrGiveUp(candidates: List[Work.Move]): (AppState, List[Work.Move]) = candidates.foldLeft[(AppState, List[Work.Move])](state -> Nil) { case ((state, xs), m) => m.clearAssginedKey match diff --git a/app/src/main/scala/Executor.scala b/app/src/main/scala/Executor.scala index 6d30d7c..760ee7e 100644 --- a/app/src/main/scala/Executor.scala +++ b/app/src/main/scala/Executor.scala @@ -47,25 +47,24 @@ object Executor: IO.realTimeInstant.flatMap: at => ref.modify(_.tryAcquireMove(key, at)) - def move(workId: WorkId, key: ClientKey, move: BestMove): IO[Unit] = + def move(workId: WorkId, key: ClientKey, response: BestMove): IO[Unit] = ref.flatModify: state => - state.get(workId) match - case None => state -> Logger[IO].info(s"Received unknown work $workId by $key") - case Some(work) if work.isAcquiredBy(key) => - move.uci match + state.getWork(workId, key) match + case GetWorkResult.NotFound => state -> logNotFound(workId, key) + case GetWorkResult.AcquiredByOther(move) => state -> logNotAcquired(move, key) + case GetWorkResult.Found(work) => + response.uci match case Some(uci) => state - work.id -> (monitor.success(work) >> client.send(Lila.Move(work.request.id, work.request.moves, uci))) case _ => val (newState, io) = work.clearAssginedKey match - case None => state -> Logger[IO].warn(s"Give up move: $work") - case Some(clearWork) => state.updated(work.id, clearWork) -> IO.unit - newState -> - io *> Logger[IO].warn(s"Received invalid move $workId for ${work.request.id} by $key") - case Some(move) => - state -> Logger[IO].info( - s"Received unacquired move ${workId} for ${move.request.id} by $key. Work current tries: ${move.tries} acquired: ${move.acquired}" - ) + case None => + state - workId -> Logger[IO].warn( + s"Give up move due to invalid move $response of key $key for $work" + ) + case Some(updated) => state.updated(work.id, updated) -> IO.unit + newState -> io *> failure(work, key) def clean(since: Instant): IO[Unit] = ref.flatModify: state => @@ -73,7 +72,7 @@ object Executor: val logs = logIfTimedOut(state, timedOut) val (newState, gavedUpMoves) = state.updateOrGiveUp(timedOut) newState -> logs - *> gavedUpMoves.traverse_(m => Logger[IO].warn(s"Give up move: $m")) + *> gavedUpMoves.traverse_(m => Logger[IO].warn(s"Give up move due to clean up: $m")) *> monitor.updateSize(newState) private def logIfTimedOut(state: AppState, timeOut: List[Work.Move]): IO[Unit] = @@ -81,6 +80,17 @@ object Executor: Logger[IO].debug(s"cleaning ${timeOut.size} of ${state.size} moves") *> timeOut.traverse_(m => Logger[IO].info(s"Timeout move: $m")) + private def failure(work: Work.Move, clientKey: ClientKey) = + Logger[IO].warn(s"Received invalid move ${work.id} for ${work.request.id} by $clientKey") + + private def logNotFound(id: WorkId, clientKey: ClientKey) = + Logger[IO].info(s"Received unknown work $id by $clientKey") + + private def logNotAcquired(work: Work.Move, clientKey: ClientKey) = + Logger[IO].info( + s"Received unacquired move ${work.id} for ${work.request.id} by $clientKey. Work current tries: ${work.tries} acquired: ${work.acquired}" + ) + def fromRequest(req: Lila.Request): IO[Move] = (IO(Work.makeId), IO.realTimeInstant).mapN: (id, now) => Move( diff --git a/app/src/test/scala/Helper.scala b/app/src/test/scala/Helper.scala index ea03b65..c507735 100644 --- a/app/src/test/scala/Helper.scala +++ b/app/src/test/scala/Helper.scala @@ -6,11 +6,8 @@ object Helper: val noopMonitor: Monitor = new Monitor: - def success(work: Work.Move): IO[Unit] = IO.unit - def failure(work: Work.Move, clientKey: ClientKey, e: Exception): IO[Unit] = IO.unit - def notFound(id: WorkId, clientKey: ClientKey): IO[Unit] = IO.unit - def notAcquired(work: Work.Move, clientKey: ClientKey): IO[Unit] = IO.unit - def updateSize(map: Map[WorkId, Work.Move]): IO[Unit] = IO.unit + def success(work: Work.Move): IO[Unit] = IO.unit + def updateSize(map: Map[WorkId, Work.Move]): IO[Unit] = IO.unit val noopLilaClient: LilaClient = new LilaClient: