Skip to content

Commit

Permalink
Make AppState opaque
Browse files Browse the repository at this point in the history
  • Loading branch information
lenguyenthanh committed Feb 28, 2024
1 parent f672cb5 commit 91e4b46
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 35 deletions.
33 changes: 20 additions & 13 deletions app/src/main/scala/AppState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import cats.syntax.all.*
import java.time.Instant
import lila.fishnet.Work.Task

type AppState = Map[WorkId, Work.Task]
opaque type AppState = Map[WorkId, Work.Task]

enum GetTaskResult:
case NotFound
Expand All @@ -15,25 +15,26 @@ object AppState:
val empty: AppState = Map.empty
extension (state: AppState)

inline def isFull(maxSize: Int): Boolean =
state.sizeIs >= maxSize

inline def add(task: Task): AppState =
state + (task.id -> task)

inline def remove(id: WorkId): AppState =
state - id

inline def size: Int = state.size

inline def count(p: Task => Boolean): Int = state.count(x => p(x._2))

def tryAcquireTask(key: ClientKey, at: Instant): (AppState, Option[Task]) =
state.earliestNonAcquiredTask
.map: newTask =>
val assignedTask = newTask.assignTo(key, at)
state.updated(assignedTask.id, assignedTask) -> assignedTask.some
.getOrElse(state -> none)

def isFull(maxSize: Int): Boolean =
state.sizeIs >= maxSize

def addTask(move: Task): AppState =
state + (move.id -> move)

def acquiredBefore(since: Instant): List[Work.Task] =
state.values.filter(_.acquiredBefore(since)).toList

def earliestNonAcquiredTask: Option[Work.Task] =
state.values.filter(_.nonAcquired).minByOption(_.createdAt)

def apply(workId: WorkId, key: ClientKey): GetTaskResult =
state.get(workId) match
case None => GetTaskResult.NotFound
Expand All @@ -46,3 +47,9 @@ object AppState:
case None => (state - task.id, task :: xs)
case Some(unAssignedTask) => (state.updated(task.id, unAssignedTask), xs)
}

def acquiredBefore(since: Instant): List[Work.Task] =
state.values.filter(_.acquiredBefore(since)).toList

def earliestNonAcquiredTask: Option[Work.Task] =
state.values.filter(_.nonAcquired).minByOption(_.createdAt)
8 changes: 4 additions & 4 deletions app/src/main/scala/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ object Executor:
AppState.empty ->
Logger[IO].warn(s"StateSize=${state.size} maxSize=${config.maxSize}. Dropping all!")
else state -> IO.unit
newState.addTask(task) -> effect
newState.add(task) -> effect

def acquire(key: ClientKey): IO[Option[Work.Task]] =
IO.realTimeInstant.flatMap: at =>
Expand All @@ -55,15 +55,15 @@ object Executor:
case GetTaskResult.Found(task) =>
response.uci match
case Some(uci) =>
state - task.id -> (monitor.success(task) >>
state.remove(task.id) -> (monitor.success(task) >>
client.send(Lila.Move(task.request.id, task.request.moves, uci)))
case _ =>
val (newState, io) = task.clearAssignedKey match
case None =>
state - workId -> Logger[IO].warn(
state.remove(workId) -> Logger[IO].warn(
s"Give up move due to invalid move $response of $key for $task"
)
case Some(updated) => state.updated(task.id, updated) -> IO.unit
case Some(updated) => state.add(updated) -> IO.unit
newState -> io *> failure(task, key)

def clean(since: Instant): IO[Unit] =
Expand Down
24 changes: 12 additions & 12 deletions app/src/main/scala/Monitor.scala
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
package lila.fishnet

import cats.effect.IO
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit
import kamon.Kamon
import kamon.metric.Timer
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit

trait Monitor:
def success(work: Work.Task): IO[Unit]
def updateSize(map: Map[WorkId, Work.Task]): IO[Unit]
def updateSize(map: AppState): IO[Unit]

object Monitor:

val dbSize = Kamon.gauge("db.size").withoutTags()
val dbQueued = Kamon.gauge("db.queued").withoutTags()
val dbAcquired = Kamon.gauge("db.acquired").withoutTags()
val lvl8AcquiredTimeRequest: Timer = Kamon.timer("move.acquired.lvl8").withoutTags()
val lvl1FullTimeRequest = Kamon.timer("move.full.lvl1").withoutTags()
val dbSize = Kamon.gauge("db.size").withoutTags()
val dbQueued = Kamon.gauge("db.queued").withoutTags()
val dbAcquired = Kamon.gauge("db.acquired").withoutTags()
val lvl8AcquiredTimeRequest = Kamon.timer("move.acquired.lvl8").withoutTags()
val lvl1FullTimeRequest = Kamon.timer("move.full.lvl1").withoutTags()

def apply: Monitor =
new Monitor:
Expand All @@ -27,10 +27,10 @@ object Monitor:
work.acquiredAt.foreach(at => record(lvl8AcquiredTimeRequest, at, now))
if work.request.level == 1 then record(lvl1FullTimeRequest, work.createdAt, now)

def updateSize(map: Map[WorkId, Work.Task]): IO[Unit] =
IO.delay(dbSize.update(map.size.toDouble)) *>
IO.delay(dbQueued.update(map.count(_._2.nonAcquired).toDouble)) *>
IO.delay(dbAcquired.update(map.count(_._2.isAcquired).toDouble)).void
def updateSize(map: AppState): IO[Unit] =
IO(dbSize.update(map.size.toDouble)) *>
IO(dbQueued.update(map.count(_.nonAcquired).toDouble)) *>
IO(dbAcquired.update(map.count(_.isAcquired).toDouble)).void

private def record(timer: Timer, start: Instant, end: Instant): Unit =
val _ = timer.record(start.until(end, ChronoUnit.MILLIS), TimeUnit.MILLISECONDS)
9 changes: 5 additions & 4 deletions app/src/test/scala/ExecutorTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ object ExecutorTest extends SimpleIOSuite:
clock = None
)

val key = ClientKey("key")
val key = ClientKey("key")
val key2 = ClientKey("key2")

val validMove = BestMove("e2e4")
val invalidMove = BestMove("2e4")
Expand All @@ -34,7 +35,7 @@ object ExecutorTest extends SimpleIOSuite:
acquired <- executor.acquire(key)
yield assert(acquired.isEmpty)

test("acquire when there is work should return work.some"):
test("acquire when there is work should return some work"):
for
executor <- createExecutor()
_ <- executor.add(request)
Expand All @@ -56,7 +57,7 @@ object ExecutorTest extends SimpleIOSuite:
executor <- createExecutor()
_ <- executor.add(request)
_ <- executor.acquire(key)
acquired <- executor.acquire(key)
acquired <- executor.acquire(key2)
yield assert(acquired.isEmpty)

test("post move after acquire should send move"):
Expand Down Expand Up @@ -147,7 +148,7 @@ object ExecutorTest extends SimpleIOSuite:
_ <- executor.add(request.copy(id = GameId("3")))
_ <- executor.add(request.copy(id = GameId("4")))
acquired <- executor.acquire(key)
empty <- executor.acquire(ClientKey("key2"))
empty <- executor.acquire(key2)
yield assert(acquired.isDefined && empty.isEmpty)

def createExecutor(config: ExecutorConfig = ExecutorConfig(300)): IO[Executor] =
Expand Down
4 changes: 2 additions & 2 deletions app/src/test/scala/Helper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ object Helper:

val noopMonitor: Monitor =
new Monitor:
def success(work: Work.Task): IO[Unit] = IO.unit
def updateSize(map: Map[WorkId, Work.Task]): IO[Unit] = IO.unit
def success(work: Work.Task): IO[Unit] = IO.unit
def updateSize(map: AppState): IO[Unit] = IO.unit

val noopLilaClient: LilaClient =
new LilaClient:
Expand Down

0 comments on commit 91e4b46

Please sign in to comment.