Skip to content

Commit

Permalink
Better logging and removing work after giving up
Browse files Browse the repository at this point in the history
  • Loading branch information
lenguyenthanh committed Feb 27, 2024
1 parent b9b6132 commit 58714db
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 19 deletions.
11 changes: 11 additions & 0 deletions app/src/main/scala/AppState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import lila.fishnet.Work.Move

type AppState = Map[WorkId, Work.Move]

enum GetWorkResult:
case NotFound
case Found(move: Work.Move)
case AcquiredByOther(move: Work.Move)

object AppState:
val empty: AppState = Map.empty
extension (state: AppState)
Expand All @@ -29,6 +34,12 @@ object AppState:
def earliestNonAcquiredMove: Option[Work.Move] =
state.values.filter(_.nonAcquired).minByOption(_.createdAt)

def getWork(workId: WorkId, key: ClientKey): GetWorkResult =
state.get(workId) match
case None => GetWorkResult.NotFound
case Some(move) if move.isAcquiredBy(key) => GetWorkResult.Found(move)
case Some(move) => GetWorkResult.AcquiredByOther(move)

def updateOrGiveUp(candidates: List[Work.Move]): (AppState, List[Work.Move]) =
candidates.foldLeft[(AppState, List[Work.Move])](state -> Nil) { case ((state, xs), m) =>
m.clearAssginedKey match
Expand Down
38 changes: 24 additions & 14 deletions app/src/main/scala/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,40 +47,50 @@ object Executor:
IO.realTimeInstant.flatMap: at =>
ref.modify(_.tryAcquireMove(key, at))

def move(workId: WorkId, key: ClientKey, move: BestMove): IO[Unit] =
def move(workId: WorkId, key: ClientKey, response: BestMove): IO[Unit] =
ref.flatModify: state =>
state.get(workId) match
case None => state -> Logger[IO].info(s"Received unknown work $workId by $key")
case Some(work) if work.isAcquiredBy(key) =>
move.uci match
state.getWork(workId, key) match
case GetWorkResult.NotFound => state -> logNotFound(workId, key)
case GetWorkResult.AcquiredByOther(move) => state -> logNotAcquired(move, key)
case GetWorkResult.Found(work) =>
response.uci match
case Some(uci) =>
state - work.id -> (monitor.success(work) >>
client.send(Lila.Move(work.request.id, work.request.moves, uci)))
case _ =>
val (newState, io) = work.clearAssginedKey match
case None => state -> Logger[IO].warn(s"Give up move: $work")
case Some(clearWork) => state.updated(work.id, clearWork) -> IO.unit
newState ->
io *> Logger[IO].warn(s"Received invalid move $workId for ${work.request.id} by $key")
case Some(move) =>
state -> Logger[IO].info(
s"Received unacquired move ${workId} for ${move.request.id} by $key. Work current tries: ${move.tries} acquired: ${move.acquired}"
)
case None =>
state - workId -> Logger[IO].warn(
s"Give up move due to invalid move $response of key $key for $work"
)
case Some(updated) => state.updated(work.id, updated) -> IO.unit
newState -> io *> failure(work, key)

def clean(since: Instant): IO[Unit] =
ref.flatModify: state =>
val timedOut = state.acquiredBefore(since)
val logs = logIfTimedOut(state, timedOut)
val (newState, gavedUpMoves) = state.updateOrGiveUp(timedOut)
newState -> logs
*> gavedUpMoves.traverse_(m => Logger[IO].warn(s"Give up move: $m"))
*> gavedUpMoves.traverse_(m => Logger[IO].warn(s"Give up move due to clean up: $m"))
*> monitor.updateSize(newState)

private def logIfTimedOut(state: AppState, timeOut: List[Work.Move]): IO[Unit] =
IO.whenA(timeOut.nonEmpty):
Logger[IO].debug(s"cleaning ${timeOut.size} of ${state.size} moves")
*> timeOut.traverse_(m => Logger[IO].info(s"Timeout move: $m"))

private def failure(work: Work.Move, clientKey: ClientKey) =
Logger[IO].warn(s"Received invalid move ${work.id} for ${work.request.id} by $clientKey")

private def logNotFound(id: WorkId, clientKey: ClientKey) =
Logger[IO].info(s"Received unknown work $id by $clientKey")

private def logNotAcquired(work: Work.Move, clientKey: ClientKey) =
Logger[IO].info(
s"Received unacquired move ${work.id} for ${work.request.id} by $clientKey. Work current tries: ${work.tries} acquired: ${work.acquired}"
)

def fromRequest(req: Lila.Request): IO[Move] =
(IO(Work.makeId), IO.realTimeInstant).mapN: (id, now) =>
Move(
Expand Down
7 changes: 2 additions & 5 deletions app/src/test/scala/Helper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@ object Helper:

val noopMonitor: Monitor =
new Monitor:
def success(work: Work.Move): IO[Unit] = IO.unit
def failure(work: Work.Move, clientKey: ClientKey, e: Exception): IO[Unit] = IO.unit
def notFound(id: WorkId, clientKey: ClientKey): IO[Unit] = IO.unit
def notAcquired(work: Work.Move, clientKey: ClientKey): IO[Unit] = IO.unit
def updateSize(map: Map[WorkId, Work.Move]): IO[Unit] = IO.unit
def success(work: Work.Move): IO[Unit] = IO.unit
def updateSize(map: Map[WorkId, Work.Move]): IO[Unit] = IO.unit

val noopLilaClient: LilaClient =
new LilaClient:
Expand Down

0 comments on commit 58714db

Please sign in to comment.