From e789994bcf9c39e4b7b399e85e1c19042137c4e6 Mon Sep 17 00:00:00 2001 From: Thanh Le Date: Wed, 28 Feb 2024 12:37:22 +0700 Subject: [PATCH] Refactor & add comments - Rename Work.Move => Work.Task - Remove RequestWithId use Task instead - Use GameId instead of String --- app/src/main/scala/AppState.scala | 46 ++++++++-------- app/src/main/scala/Executor.scala | 54 +++++++++---------- app/src/main/scala/Jobs.scala | 2 +- app/src/main/scala/Monitor.scala | 8 +-- app/src/main/scala/Work.scala | 38 ++++++------- app/src/main/scala/model.scala | 6 +-- app/src/test/scala/ExecutorTest.scala | 2 +- app/src/test/scala/Helper.scala | 4 +- .../test/scala/http/FishnetRoutesTest.scala | 11 ++-- 9 files changed, 83 insertions(+), 88 deletions(-) diff --git a/app/src/main/scala/AppState.scala b/app/src/main/scala/AppState.scala index afae59e..c35d906 100644 --- a/app/src/main/scala/AppState.scala +++ b/app/src/main/scala/AppState.scala @@ -2,47 +2,47 @@ package lila.fishnet import cats.syntax.all.* import java.time.Instant -import lila.fishnet.Work.Move +import lila.fishnet.Work.Task -type AppState = Map[WorkId, Work.Move] +type AppState = Map[WorkId, Work.Task] -enum GetWorkResult: +enum GetTaskResult: case NotFound - case Found(move: Work.Move) - case AcquiredByOther(move: Work.Move) + case Found(task: Work.Task) + case AcquiredByOther(task: Work.Task) object AppState: val empty: AppState = Map.empty extension (state: AppState) - def tryAcquireMove(key: ClientKey, at: Instant): (AppState, Option[Work.RequestWithId]) = - state.earliestNonAcquiredMove - .map: m => - val move = m.assignTo(key, at) - state.updated(move.id, move) -> move.toRequestWithId.some + def tryAcquireTask(key: ClientKey, at: Instant): (AppState, Option[Task]) = + state.earliestNonAcquiredTask + .map: newTask => + val assignedTask = newTask.assignTo(key, at) + state.updated(assignedTask.id, assignedTask) -> assignedTask.some .getOrElse(state -> none) def isFull(maxSize: Int): Boolean = state.sizeIs >= maxSize - def addWork(move: Move): AppState = + def addTask(move: Task): AppState = state + (move.id -> move) - def acquiredBefore(since: Instant): List[Work.Move] = + def acquiredBefore(since: Instant): List[Work.Task] = state.values.filter(_.acquiredBefore(since)).toList - def earliestNonAcquiredMove: Option[Work.Move] = + def earliestNonAcquiredTask: Option[Work.Task] = state.values.filter(_.nonAcquired).minByOption(_.createdAt) - def getWork(workId: WorkId, key: ClientKey): GetWorkResult = + def apply(workId: WorkId, key: ClientKey): GetTaskResult = state.get(workId) match - case None => GetWorkResult.NotFound - case Some(move) if move.isAcquiredBy(key) => GetWorkResult.Found(move) - case Some(move) => GetWorkResult.AcquiredByOther(move) - - def updateOrGiveUp(candidates: List[Work.Move]): (AppState, List[Work.Move]) = - candidates.foldLeft[(AppState, List[Work.Move])](state -> Nil) { case ((state, xs), m) => - m.clearAssignedKey match - case None => (state - m.id, m :: xs) - case Some(move) => (state.updated(m.id, move), xs) + case None => GetTaskResult.NotFound + case Some(task) if task.isAcquiredBy(key) => GetTaskResult.Found(task) + case Some(task) => GetTaskResult.AcquiredByOther(task) + + def updateOrGiveUp(candidates: List[Work.Task]): (AppState, List[Work.Task]) = + candidates.foldLeft[(AppState, List[Work.Task])](state -> Nil) { case ((state, xs), task) => + task.clearAssignedKey match + case None => (state - task.id, task :: xs) + case Some(unAssignedTask) => (state.updated(task.id, unAssignedTask), xs) } diff --git a/app/src/main/scala/Executor.scala b/app/src/main/scala/Executor.scala index 4ef1960..1fe58e4 100644 --- a/app/src/main/scala/Executor.scala +++ b/app/src/main/scala/Executor.scala @@ -4,8 +4,6 @@ import cats.effect.IO import cats.effect.kernel.Ref import cats.syntax.all.* import java.time.Instant -import lila.fishnet.Lila.Request -import lila.fishnet.Work.Move import org.typelevel.log4cats.Logger /** Executor is responsible for: store work in memory @@ -14,13 +12,15 @@ import org.typelevel.log4cats.Logger * - adding work to the queue */ trait Executor: - // get a move from the queue return Work - def acquire(accquire: ClientKey): IO[Option[Work.RequestWithId]] - // get Work from Map => send to lila + // fishnet client tries to get an unassigned task + def acquire(accquire: ClientKey): IO[Option[Work.Task]] + // fishnet client sends the best move for it's assigned task def move(workId: WorkId, fishnetKey: ClientKey, move: BestMove): IO[Unit] - // add work to queue + // Lila sends a position def add(work: Lila.Request): IO[Unit] - // clean up all works that are acquired before the given time + // clean up all works that are acquired before a given time + // this is to prevent tasks from being stuck in the queue + // this will be called periodically def clean(before: Instant): IO[Unit] object Executor: @@ -33,38 +33,38 @@ object Executor: .map: ref => new Executor: - def add(work: Request): IO[Unit] = - fromRequest(work).flatMap: move => + def add(work: Lila.Request): IO[Unit] = + fromRequest(work).flatMap: task => ref.flatModify: state => val (newState, effect) = if state.isFull(config.maxSize) then AppState.empty -> Logger[IO].warn(s"StateSize=${state.size} maxSize=${config.maxSize}. Dropping all!") else state -> IO.unit - newState.addWork(move) -> effect + newState.addTask(task) -> effect - def acquire(key: ClientKey): IO[Option[Work.RequestWithId]] = + def acquire(key: ClientKey): IO[Option[Work.Task]] = IO.realTimeInstant.flatMap: at => - ref.modify(_.tryAcquireMove(key, at)) + ref.modify(_.tryAcquireTask(key, at)) def move(workId: WorkId, key: ClientKey, response: BestMove): IO[Unit] = ref.flatModify: state => - state.getWork(workId, key) match - case GetWorkResult.NotFound => state -> logNotFound(workId, key) - case GetWorkResult.AcquiredByOther(move) => state -> logNotAcquired(move, key) - case GetWorkResult.Found(work) => + state(workId, key) match + case GetTaskResult.NotFound => state -> logNotFound(workId, key) + case GetTaskResult.AcquiredByOther(task) => state -> logNotAcquired(task, key) + case GetTaskResult.Found(task) => response.uci match case Some(uci) => - state - work.id -> (monitor.success(work) >> - client.send(Lila.Move(work.request.id, work.request.moves, uci))) + state - task.id -> (monitor.success(task) >> + client.send(Lila.Move(task.request.id, task.request.moves, uci))) case _ => - val (newState, io) = work.clearAssignedKey match + val (newState, io) = task.clearAssignedKey match case None => state - workId -> Logger[IO].warn( - s"Give up move due to invalid move $response of key $key for $work" + s"Give up move due to invalid move $response of key $key for $task" ) - case Some(updated) => state.updated(work.id, updated) -> IO.unit - newState -> io *> failure(work, key) + case Some(updated) => state.updated(task.id, updated) -> IO.unit + newState -> io *> failure(task, key) def clean(since: Instant): IO[Unit] = ref.flatModify: state => @@ -75,25 +75,25 @@ object Executor: *> gavedUpMoves.traverse_(m => Logger[IO].warn(s"Give up move due to clean up: $m")) *> monitor.updateSize(newState) - private def logIfTimedOut(state: AppState, timeOut: List[Work.Move]): IO[Unit] = + private def logIfTimedOut(state: AppState, timeOut: List[Work.Task]): IO[Unit] = IO.whenA(timeOut.nonEmpty): Logger[IO].debug(s"cleaning ${timeOut.size} of ${state.size} moves") *> timeOut.traverse_(m => Logger[IO].info(s"Timeout move: $m")) - private def failure(work: Work.Move, clientKey: ClientKey) = + private def failure(work: Work.Task, clientKey: ClientKey) = Logger[IO].warn(s"Received invalid move ${work.id} for ${work.request.id} by $clientKey") private def logNotFound(id: WorkId, clientKey: ClientKey) = Logger[IO].info(s"Received unknown work $id by $clientKey") - private def logNotAcquired(work: Work.Move, clientKey: ClientKey) = + private def logNotAcquired(work: Work.Task, clientKey: ClientKey) = Logger[IO].info( s"Received unacquired move ${work.id} for ${work.request.id} by $clientKey. Work current tries: ${work.tries} acquired: ${work.acquired}" ) - def fromRequest(req: Lila.Request): IO[Move] = + def fromRequest(req: Lila.Request): IO[Work.Task] = (IO(Work.makeId), IO.realTimeInstant).mapN: (id, now) => - Move( + Work.Task( id = id, request = req, tries = 0, diff --git a/app/src/main/scala/Jobs.scala b/app/src/main/scala/Jobs.scala index 9e1604b..f16c1cd 100644 --- a/app/src/main/scala/Jobs.scala +++ b/app/src/main/scala/Jobs.scala @@ -22,7 +22,7 @@ object RedisSubscriberJob: .readMoveReq(msg.message) .match case Some(request) => executor.add(request) - case None => Logger[IO].warn(s"Failed to parse message: $msg") + case None => Logger[IO].warn(s"Failed to parse message from lila: $msg") >> Logger[IO].debug(s"Received message: $msg") ) *> pubsub.runMessages diff --git a/app/src/main/scala/Monitor.scala b/app/src/main/scala/Monitor.scala index 9e49e9b..38259bc 100644 --- a/app/src/main/scala/Monitor.scala +++ b/app/src/main/scala/Monitor.scala @@ -8,8 +8,8 @@ import kamon.metric.Timer import java.time.Instant trait Monitor: - def success(work: Work.Move): IO[Unit] - def updateSize(map: Map[WorkId, Work.Move]): IO[Unit] + def success(work: Work.Task): IO[Unit] + def updateSize(map: Map[WorkId, Work.Task]): IO[Unit] object Monitor: @@ -21,13 +21,13 @@ object Monitor: def apply: Monitor = new Monitor: - def success(work: Work.Move): IO[Unit] = + def success(work: Work.Task): IO[Unit] = IO.realTimeInstant.map: now => if work.request.level == 8 then work.acquiredAt.foreach(at => record(lvl8AcquiredTimeRequest, at, now)) if work.request.level == 1 then record(lvl1FullTimeRequest, work.createdAt, now) - def updateSize(map: Map[WorkId, Work.Move]): IO[Unit] = + def updateSize(map: Map[WorkId, Work.Task]): IO[Unit] = IO.delay(dbSize.update(map.size.toDouble)) *> IO.delay(dbQueued.update(map.count(_._2.nonAcquired).toDouble)) *> IO.delay(dbAcquired.update(map.count(_._2.isAcquired).toDouble)).void diff --git a/app/src/main/scala/Work.scala b/app/src/main/scala/Work.scala index 33f6fc9..9cab5d7 100644 --- a/app/src/main/scala/Work.scala +++ b/app/src/main/scala/Work.scala @@ -4,20 +4,10 @@ import java.time.Instant object Work: - case class RequestWithId(id: WorkId, request: Lila.Request): - def toResponse = - Fishnet.WorkResponse( - work = Fishnet.Work(id = id, level = request.level, clock = request.clock), - game_id = request.id.value, - position = request.initialFen, - moves = request.moves, - variant = request.variant - ) - case class Acquired(clientKey: ClientKey, date: Instant): override def toString = s"by $clientKey at $date" - case class Move( + case class Task( id: WorkId, request: Lila.Request, tries: Int, @@ -25,27 +15,29 @@ object Work: createdAt: Instant ): - def toRequestWithId = - RequestWithId(id, request) - - def acquiredAt = acquired.map(_.date) - def acquiredByKey: Option[ClientKey] = acquired.map(_.clientKey) - def isAcquiredBy(clientKey: ClientKey) = acquiredByKey.contains(clientKey) - def isAcquired = acquired.isDefined - def nonAcquired = !isAcquired - def acquiredBefore(date: Instant) = acquiredAt.exists(_.isBefore(date)) + def acquiredAt: Option[Instant] = acquired.map(_.date) + def isAcquired: Boolean = acquired.isDefined + def nonAcquired: Boolean = !isAcquired + def isAcquiredBy(clientKey: ClientKey): Boolean = acquired.exists(_.clientKey == clientKey) + def acquiredBefore(date: Instant): Boolean = acquiredAt.exists(_.isBefore(date)) def assignTo(clientKey: ClientKey, at: Instant) = copy(acquired = Some(Acquired(clientKey = clientKey, date = at)), tries = tries + 1) def isOutOfTries = tries >= 3 - def similar(to: Move) = request.id == to.request.id && request.moves == to.request.moves - // returns the move without the acquired key if it's not out of tries - def clearAssignedKey: Option[Work.Move] = + def clearAssignedKey: Option[Work.Task] = Option.when(!isOutOfTries)(copy(acquired = None)) + def toResponse = + Fishnet.WorkResponse( + work = Fishnet.Work(id = id, level = request.level, clock = request.clock), + game_id = request.id, + position = request.initialFen, + moves = request.moves, + variant = request.variant + ) override def toString = s"id:$id game:${request.id} variant:${request.variant.key} level:${request.level} tries:$tries created:$createdAt acquired:$acquired move: ${request.moves}" diff --git a/app/src/main/scala/model.scala b/app/src/main/scala/model.scala index e03632b..ae4b056 100644 --- a/app/src/main/scala/model.scala +++ b/app/src/main/scala/model.scala @@ -53,7 +53,7 @@ object Fishnet: case class WorkResponse( work: Work, - game_id: String, + game_id: GameId, position: Fen.Epd, moves: String, variant: Variant @@ -65,8 +65,6 @@ object Lila: def sign = moves.takeRight(20).replace(" ", "") def write = s"$gameId $sign ${uci.uci}" - case class Clock(wtime: Int, btime: Int, inc: Int) derives Codec.AsObject - case class Request( id: GameId, initialFen: Fen.Epd, @@ -76,6 +74,8 @@ object Lila: clock: Option[Clock] ) + case class Clock(wtime: Int, btime: Int, inc: Int) derives Codec.AsObject + def readMoveReq(msg: String): Option[Request] = msg.split(";", 6) match case Array(gameId, levelS, clockS, variantS, initialFenS, moves) => diff --git a/app/src/test/scala/ExecutorTest.scala b/app/src/test/scala/ExecutorTest.scala index 7ecaf42..34b24d6 100644 --- a/app/src/test/scala/ExecutorTest.scala +++ b/app/src/test/scala/ExecutorTest.scala @@ -117,7 +117,7 @@ object ExecutorTest extends SimpleIOSuite: _ <- executor.move(workId, key, invalidMove) acquiredOption <- executor.acquire(key) acquired = acquiredOption.get - yield expect.same(acquired, Work.RequestWithId(workId, request)) + yield expect.same(acquired.request, request) test("should not give up after 2 tries"): for diff --git a/app/src/test/scala/Helper.scala b/app/src/test/scala/Helper.scala index c507735..bae0ce6 100644 --- a/app/src/test/scala/Helper.scala +++ b/app/src/test/scala/Helper.scala @@ -6,8 +6,8 @@ object Helper: val noopMonitor: Monitor = new Monitor: - def success(work: Work.Move): IO[Unit] = IO.unit - def updateSize(map: Map[WorkId, Work.Move]): IO[Unit] = IO.unit + def success(work: Work.Task): IO[Unit] = IO.unit + def updateSize(map: Map[WorkId, Work.Task]): IO[Unit] = IO.unit val noopLilaClient: LilaClient = new LilaClient: diff --git a/app/src/test/scala/http/FishnetRoutesTest.scala b/app/src/test/scala/http/FishnetRoutesTest.scala index 4427018..4226784 100644 --- a/app/src/test/scala/http/FishnetRoutesTest.scala +++ b/app/src/test/scala/http/FishnetRoutesTest.scala @@ -47,7 +47,7 @@ object FishnetRoutesTest extends SimpleIOSuite: "variant": "Standard" }""" - val requestWithId = Work.RequestWithId( + val task = Work.Task( id = WorkId("workid"), request = Lila.Request( id = GameId("gameid"), @@ -56,7 +56,10 @@ object FishnetRoutesTest extends SimpleIOSuite: variant = chess.variant.Standard, level = 1, clock = Some(Lila.Clock(wtime = 600, btime = 600, inc = 0)) - ) + ), + tries = 1, + acquired = none, + createdAt = Instant.now ) test("POST /fishnet/acquire should return work response"): @@ -89,9 +92,9 @@ object FishnetRoutesTest extends SimpleIOSuite: def createExecutor(): Executor = new Executor: - def acquire(key: ClientKey) = IO.pure(requestWithId.some) + def acquire(key: ClientKey) = IO.pure(task.some) def move(id: WorkId, key: ClientKey, move: BestMove): IO[Unit] = - if id == requestWithId.id then IO.unit + if id == task.id then IO.unit else IO.raiseError(new Exception("invalid work id")) def add(request: Lila.Request): IO[Unit] = IO.unit def clean(before: Instant) = IO.unit