diff --git a/app/src/main/scala/AppState.scala b/app/src/main/scala/AppState.scala index c35d906..2edce14 100644 --- a/app/src/main/scala/AppState.scala +++ b/app/src/main/scala/AppState.scala @@ -4,7 +4,7 @@ import cats.syntax.all.* import java.time.Instant import lila.fishnet.Work.Task -type AppState = Map[WorkId, Work.Task] +opaque type AppState = Map[WorkId, Work.Task] enum GetTaskResult: case NotFound @@ -15,6 +15,19 @@ object AppState: val empty: AppState = Map.empty extension (state: AppState) + inline def isFull(maxSize: Int): Boolean = + state.sizeIs >= maxSize + + inline def add(task: Task): AppState = + state + (task.id -> task) + + inline def remove(id: WorkId): AppState = + state - id + + inline def size: Int = state.size + + inline def count(p: Task => Boolean): Int = state.count(x => p(x._2)) + def tryAcquireTask(key: ClientKey, at: Instant): (AppState, Option[Task]) = state.earliestNonAcquiredTask .map: newTask => @@ -22,18 +35,6 @@ object AppState: state.updated(assignedTask.id, assignedTask) -> assignedTask.some .getOrElse(state -> none) - def isFull(maxSize: Int): Boolean = - state.sizeIs >= maxSize - - def addTask(move: Task): AppState = - state + (move.id -> move) - - def acquiredBefore(since: Instant): List[Work.Task] = - state.values.filter(_.acquiredBefore(since)).toList - - def earliestNonAcquiredTask: Option[Work.Task] = - state.values.filter(_.nonAcquired).minByOption(_.createdAt) - def apply(workId: WorkId, key: ClientKey): GetTaskResult = state.get(workId) match case None => GetTaskResult.NotFound @@ -46,3 +47,9 @@ object AppState: case None => (state - task.id, task :: xs) case Some(unAssignedTask) => (state.updated(task.id, unAssignedTask), xs) } + + def acquiredBefore(since: Instant): List[Work.Task] = + state.values.filter(_.acquiredBefore(since)).toList + + def earliestNonAcquiredTask: Option[Work.Task] = + state.values.filter(_.nonAcquired).minByOption(_.createdAt) diff --git a/app/src/main/scala/Executor.scala b/app/src/main/scala/Executor.scala index edc78ad..120d508 100644 --- a/app/src/main/scala/Executor.scala +++ b/app/src/main/scala/Executor.scala @@ -41,7 +41,7 @@ object Executor: AppState.empty -> Logger[IO].warn(s"StateSize=${state.size} maxSize=${config.maxSize}. Dropping all!") else state -> IO.unit - newState.addTask(task) -> effect + newState.add(task) -> effect def acquire(key: ClientKey): IO[Option[Work.Task]] = IO.realTimeInstant.flatMap: at => @@ -55,15 +55,15 @@ object Executor: case GetTaskResult.Found(task) => response.uci match case Some(uci) => - state - task.id -> (monitor.success(task) >> + state.remove(task.id) -> (monitor.success(task) >> client.send(Lila.Move(task.request.id, task.request.moves, uci))) case _ => val (newState, io) = task.clearAssignedKey match case None => - state - workId -> Logger[IO].warn( + state.remove(workId) -> Logger[IO].warn( s"Give up move due to invalid move $response of $key for $task" ) - case Some(updated) => state.updated(task.id, updated) -> IO.unit + case Some(updated) => state.add(updated) -> IO.unit newState -> io *> failure(task, key) def clean(since: Instant): IO[Unit] = diff --git a/app/src/main/scala/Monitor.scala b/app/src/main/scala/Monitor.scala index 38259bc..57501ff 100644 --- a/app/src/main/scala/Monitor.scala +++ b/app/src/main/scala/Monitor.scala @@ -1,23 +1,23 @@ package lila.fishnet import cats.effect.IO -import java.time.temporal.ChronoUnit -import java.util.concurrent.TimeUnit import kamon.Kamon import kamon.metric.Timer import java.time.Instant +import java.time.temporal.ChronoUnit +import java.util.concurrent.TimeUnit trait Monitor: def success(work: Work.Task): IO[Unit] - def updateSize(map: Map[WorkId, Work.Task]): IO[Unit] + def updateSize(map: AppState): IO[Unit] object Monitor: - val dbSize = Kamon.gauge("db.size").withoutTags() - val dbQueued = Kamon.gauge("db.queued").withoutTags() - val dbAcquired = Kamon.gauge("db.acquired").withoutTags() - val lvl8AcquiredTimeRequest: Timer = Kamon.timer("move.acquired.lvl8").withoutTags() - val lvl1FullTimeRequest = Kamon.timer("move.full.lvl1").withoutTags() + val dbSize = Kamon.gauge("db.size").withoutTags() + val dbQueued = Kamon.gauge("db.queued").withoutTags() + val dbAcquired = Kamon.gauge("db.acquired").withoutTags() + val lvl8AcquiredTimeRequest = Kamon.timer("move.acquired.lvl8").withoutTags() + val lvl1FullTimeRequest = Kamon.timer("move.full.lvl1").withoutTags() def apply: Monitor = new Monitor: @@ -27,10 +27,10 @@ object Monitor: work.acquiredAt.foreach(at => record(lvl8AcquiredTimeRequest, at, now)) if work.request.level == 1 then record(lvl1FullTimeRequest, work.createdAt, now) - def updateSize(map: Map[WorkId, Work.Task]): IO[Unit] = - IO.delay(dbSize.update(map.size.toDouble)) *> - IO.delay(dbQueued.update(map.count(_._2.nonAcquired).toDouble)) *> - IO.delay(dbAcquired.update(map.count(_._2.isAcquired).toDouble)).void + def updateSize(map: AppState): IO[Unit] = + IO(dbSize.update(map.size.toDouble)) *> + IO(dbQueued.update(map.count(_.nonAcquired).toDouble)) *> + IO(dbAcquired.update(map.count(_.isAcquired).toDouble)).void private def record(timer: Timer, start: Instant, end: Instant): Unit = val _ = timer.record(start.until(end, ChronoUnit.MILLIS), TimeUnit.MILLISECONDS) diff --git a/app/src/test/scala/ExecutorTest.scala b/app/src/test/scala/ExecutorTest.scala index 34b24d6..d96eafa 100644 --- a/app/src/test/scala/ExecutorTest.scala +++ b/app/src/test/scala/ExecutorTest.scala @@ -23,7 +23,8 @@ object ExecutorTest extends SimpleIOSuite: clock = None ) - val key = ClientKey("key") + val key = ClientKey("key") + val key2 = ClientKey("key2") val validMove = BestMove("e2e4") val invalidMove = BestMove("2e4") @@ -34,7 +35,7 @@ object ExecutorTest extends SimpleIOSuite: acquired <- executor.acquire(key) yield assert(acquired.isEmpty) - test("acquire when there is work should return work.some"): + test("acquire when there is work should return some work"): for executor <- createExecutor() _ <- executor.add(request) @@ -56,7 +57,7 @@ object ExecutorTest extends SimpleIOSuite: executor <- createExecutor() _ <- executor.add(request) _ <- executor.acquire(key) - acquired <- executor.acquire(key) + acquired <- executor.acquire(key2) yield assert(acquired.isEmpty) test("post move after acquire should send move"): @@ -147,7 +148,7 @@ object ExecutorTest extends SimpleIOSuite: _ <- executor.add(request.copy(id = GameId("3"))) _ <- executor.add(request.copy(id = GameId("4"))) acquired <- executor.acquire(key) - empty <- executor.acquire(ClientKey("key2")) + empty <- executor.acquire(key2) yield assert(acquired.isDefined && empty.isEmpty) def createExecutor(config: ExecutorConfig = ExecutorConfig(300)): IO[Executor] = diff --git a/app/src/test/scala/Helper.scala b/app/src/test/scala/Helper.scala index bae0ce6..e5dc985 100644 --- a/app/src/test/scala/Helper.scala +++ b/app/src/test/scala/Helper.scala @@ -6,8 +6,8 @@ object Helper: val noopMonitor: Monitor = new Monitor: - def success(work: Work.Task): IO[Unit] = IO.unit - def updateSize(map: Map[WorkId, Work.Task]): IO[Unit] = IO.unit + def success(work: Work.Task): IO[Unit] = IO.unit + def updateSize(map: AppState): IO[Unit] = IO.unit val noopLilaClient: LilaClient = new LilaClient: