Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Purify AppState #273

Merged
merged 10 commits into from
Feb 27, 2024
72 changes: 23 additions & 49 deletions app/src/main/scala/AppState.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package lila.fishnet

import cats.syntax.all.*
import cats.effect.IO
import java.time.Instant
import lila.fishnet.Work.Move
import org.typelevel.log4cats.Logger

type AppState = Map[WorkId, Work.Move]

enum GetWorkResult:
case NotFound
case Found(move: Work.Move)
case AcquiredByOther(move: Work.Move)

object AppState:
val empty: AppState = Map.empty
extension (state: AppState)
Expand All @@ -19,56 +22,27 @@ object AppState:
state.updated(move.id, move) -> move.toRequestWithId.some
.getOrElse(state -> none)

def clearIfFull(maxSize: Int)(using Logger[IO]): (AppState, IO[Unit]) =
if state.size >= maxSize then
Map.empty -> Logger[IO].warn(s"MoveDB collection is full! maxSize=${maxSize}. Dropping all now!")
else state -> IO.unit

def addWork(move: Move, maxSize: Int)(using Logger[IO]): (AppState, IO[Unit]) =
val (newState, effect) = state.clearIfFull(maxSize)
newState + (move.id -> move) -> effect
def isFull(maxSize: Int): Boolean =
state.sizeIs >= maxSize

def applyMove(monitor: Monitor, client: LilaClient)(workId: WorkId, apikey: ClientKey, move: BestMove)(
using Logger[IO]
): (AppState, IO[Unit]) =
state.get(workId) match
case None =>
state -> monitor.notFound(workId, apikey)
case Some(work) if work.isAcquiredBy(apikey) =>
move.uci match
case Some(uci) =>
state - work.id -> (monitor.success(work) >> client.send(
Lila.Move(
work.request.id,
work.request.moves,
uci
)
))
case _ =>
val (newState, failedMove) = state.updateOrGiveUp(work.invalid)
newState -> (Logger[IO].warn(s"Give up move: $failedMove") >>
monitor.failure(work, apikey, new Exception("Missing move")))
case Some(move) =>
state -> monitor.notAcquired(move, apikey)
def addWork(move: Move): AppState =
state + (move.id -> move)

def clean(monitor: Monitor)(since: Instant)(using Logger[IO]): (AppState, IO[Unit]) =
val timedOut: List[Work.Move] = state.values.filter(_.acquiredBefore(since)).toList
val logIfTimedOut =
if timedOut.nonEmpty then
Logger[IO].debug(s"cleaning ${timedOut.size} of ${state.size} moves") >>
timedOut.traverse_(m => Logger[IO].info(s"Timeout move: $m"))
else IO.unit
val (newState, gavedUpMoves) = timedOut.foldLeft[(AppState, List[Work.Move])](state -> Nil): (x, m) =>
val (newState, move) = x._1.updateOrGiveUp(m.timeout)
(newState, move.fold(x._2)(_ :: x._2))
newState -> logIfTimedOut
*> gavedUpMoves.traverse_(m => Logger[IO].warn(s"Give up move: $m"))
*> monitor.updateSize(newState)
def acquiredBefore(since: Instant): List[Work.Move] =
state.values.filter(_.acquiredBefore(since)).toList

def earliestNonAcquiredMove: Option[Work.Move] =
state.values.filter(_.nonAcquired).minByOption(_.createdAt)

def updateOrGiveUp(move: Work.Move): (AppState, Option[Work.Move]) =
val newState = state - move.id
if move.isOutOfTries then (newState, move.some)
else (newState + (move.id -> move), none)
def getWork(workId: WorkId, key: ClientKey): GetWorkResult =
state.get(workId) match
case None => GetWorkResult.NotFound
case Some(move) if move.isAcquiredBy(key) => GetWorkResult.Found(move)
case Some(move) => GetWorkResult.AcquiredByOther(move)

def updateOrGiveUp(candidates: List[Work.Move]): (AppState, List[Work.Move]) =
candidates.foldLeft[(AppState, List[Work.Move])](state -> Nil) { case ((state, xs), m) =>
m.clearAssignedKey match
case None => (state - m.id, m :: xs)
case Some(move) => (state.updated(m.id, move), xs)
}
55 changes: 50 additions & 5 deletions app/src/main/scala/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ trait Executor:
def move(workId: WorkId, fishnetKey: ClientKey, move: BestMove): IO[Unit]
// add work to queue
def add(work: Lila.Request): IO[Unit]
// clean up all works that are acquired before the given time
def clean(before: Instant): IO[Unit]

object Executor:
Expand All @@ -28,23 +29,67 @@ object Executor:

def instance(client: LilaClient, monitor: Monitor, config: ExecutorConfig)(using Logger[IO]): IO[Executor] =
Ref
.of[IO, AppState](Map.empty)
.of[IO, AppState](AppState.empty)
.map: ref =>
new Executor:

def add(work: Request): IO[Unit] =
fromRequest(work).flatMap: move =>
ref.flatModify(_.addWork(move, config.maxSize))
ref.flatModify: state =>
val (newState, effect) =
if state.isFull(config.maxSize) then
AppState.empty ->
Logger[IO].warn(s"StateSize=${state.size} maxSize=${config.maxSize}. Dropping all!")
else state -> IO.unit
newState.addWork(move) -> effect

def acquire(key: ClientKey): IO[Option[Work.RequestWithId]] =
IO.realTimeInstant.flatMap: at =>
ref.modify(_.tryAcquireMove(key, at))

def move(workId: WorkId, apikey: ClientKey, move: BestMove): IO[Unit] =
ref.flatModify(_.applyMove(monitor, client)(workId, apikey, move))
def move(workId: WorkId, key: ClientKey, response: BestMove): IO[Unit] =
ref.flatModify: state =>
state.getWork(workId, key) match
case GetWorkResult.NotFound => state -> logNotFound(workId, key)
case GetWorkResult.AcquiredByOther(move) => state -> logNotAcquired(move, key)
case GetWorkResult.Found(work) =>
response.uci match
case Some(uci) =>
state - work.id -> (monitor.success(work) >>
client.send(Lila.Move(work.request.id, work.request.moves, uci)))
case _ =>
val (newState, io) = work.clearAssignedKey match
case None =>
state - workId -> Logger[IO].warn(
s"Give up move due to invalid move $response of key $key for $work"
)
case Some(updated) => state.updated(work.id, updated) -> IO.unit
newState -> io *> failure(work, key)

def clean(since: Instant): IO[Unit] =
ref.flatModify(_.clean(monitor)(since))
ref.flatModify: state =>
val timedOut = state.acquiredBefore(since)
val logs = logIfTimedOut(state, timedOut)
val (newState, gavedUpMoves) = state.updateOrGiveUp(timedOut)
newState -> logs
*> gavedUpMoves.traverse_(m => Logger[IO].warn(s"Give up move due to clean up: $m"))
*> monitor.updateSize(newState)

private def logIfTimedOut(state: AppState, timeOut: List[Work.Move]): IO[Unit] =
IO.whenA(timeOut.nonEmpty):
Logger[IO].debug(s"cleaning ${timeOut.size} of ${state.size} moves")
*> timeOut.traverse_(m => Logger[IO].info(s"Timeout move: $m"))

private def failure(work: Work.Move, clientKey: ClientKey) =
Logger[IO].warn(s"Received invalid move ${work.id} for ${work.request.id} by $clientKey")

private def logNotFound(id: WorkId, clientKey: ClientKey) =
Logger[IO].info(s"Received unknown work $id by $clientKey")

private def logNotAcquired(work: Work.Move, clientKey: ClientKey) =
Logger[IO].info(
s"Received unacquired move ${work.id} for ${work.request.id} by $clientKey. Work current tries: ${work.tries} acquired: ${work.acquired}"
)

def fromRequest(req: Lila.Request): IO[Move] =
(IO(Work.makeId), IO.realTimeInstant).mapN: (id, now) =>
Expand Down
17 changes: 1 addition & 16 deletions app/src/main/scala/Monitor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,10 @@ import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit
import kamon.Kamon
import kamon.metric.Timer
import org.typelevel.log4cats.Logger
import java.time.Instant

trait Monitor:
def success(work: Work.Move): IO[Unit]
def failure(work: Work.Move, clientKey: ClientKey, e: Exception): IO[Unit]
def notFound(id: WorkId, clientKey: ClientKey): IO[Unit]
def notAcquired(work: Work.Move, clientKey: ClientKey): IO[Unit]
def updateSize(map: Map[WorkId, Work.Move]): IO[Unit]

object Monitor:
Expand All @@ -23,25 +19,14 @@ object Monitor:
val lvl8AcquiredTimeRequest: Timer = Kamon.timer("move.acquired.lvl8").withoutTags()
val lvl1FullTimeRequest = Kamon.timer("move.full.lvl1").withoutTags()

def apply(using Logger[IO]): Monitor =
def apply: Monitor =
new Monitor:
def success(work: Work.Move): IO[Unit] =
IO.realTimeInstant.map: now =>
if work.request.level == 8 then
work.acquiredAt.foreach(at => record(lvl8AcquiredTimeRequest, at, now))
if work.request.level == 1 then record(lvl1FullTimeRequest, work.createdAt, now)

def failure(work: Work.Move, clientKey: ClientKey, e: Exception) =
Logger[IO].warn(e)(s"Received invalid move ${work.id} for ${work.request.id} by $clientKey")

def notFound(id: WorkId, clientKey: ClientKey) =
Logger[IO].info(s"Received unknown work $id by $clientKey")

def notAcquired(work: Work.Move, clientKey: ClientKey) =
Logger[IO].info(
s"Received unacquired move ${work.id} for ${work.request.id} by $clientKey. Work current tries: ${work.tries} acquired: ${work.acquired}"
)

def updateSize(map: Map[WorkId, Work.Move]): IO[Unit] =
IO.delay(dbSize.update(map.size.toDouble)) *>
IO.delay(dbQueued.update(map.count(_._2.nonAcquired).toDouble)) *>
Expand Down
7 changes: 4 additions & 3 deletions app/src/main/scala/Work.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,14 @@ object Work:
def assignTo(clientKey: ClientKey, at: Instant) =
copy(acquired = Some(Acquired(clientKey = clientKey, date = at)), tries = tries + 1)

def timeout = copy(acquired = None)
def invalid = copy(acquired = None)

def isOutOfTries = tries >= 3

def similar(to: Move) = request.id == to.request.id && request.moves == to.request.moves

// returns the move without the acquired key if it's not out of tries
def clearAssignedKey: Option[Work.Move] =
Option.when(!isOutOfTries)(copy(acquired = None))

override def toString =
s"id:$id game:${request.id} variant:${request.variant.key} level:${request.level} tries:$tries created:$createdAt acquired:$acquired move: ${request.moves}"

Expand Down
7 changes: 2 additions & 5 deletions app/src/test/scala/Helper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@ object Helper:

val noopMonitor: Monitor =
new Monitor:
def success(work: Work.Move): IO[Unit] = IO.unit
def failure(work: Work.Move, clientKey: ClientKey, e: Exception): IO[Unit] = IO.unit
def notFound(id: WorkId, clientKey: ClientKey): IO[Unit] = IO.unit
def notAcquired(work: Work.Move, clientKey: ClientKey): IO[Unit] = IO.unit
def updateSize(map: Map[WorkId, Work.Move]): IO[Unit] = IO.unit
def success(work: Work.Move): IO[Unit] = IO.unit
def updateSize(map: Map[WorkId, Work.Move]): IO[Unit] = IO.unit

val noopLilaClient: LilaClient =
new LilaClient:
Expand Down