Skip to content

Commit

Permalink
Refactor & add comments
Browse files Browse the repository at this point in the history
- Rename Work.Move => Work.Task
- Remove RequestWithId use Task instead
- Use GameId instead of String
  • Loading branch information
lenguyenthanh committed Feb 28, 2024
1 parent 1b1a080 commit e789994
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 88 deletions.
46 changes: 23 additions & 23 deletions app/src/main/scala/AppState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,47 @@ package lila.fishnet

import cats.syntax.all.*
import java.time.Instant
import lila.fishnet.Work.Move
import lila.fishnet.Work.Task

type AppState = Map[WorkId, Work.Move]
type AppState = Map[WorkId, Work.Task]

enum GetWorkResult:
enum GetTaskResult:
case NotFound
case Found(move: Work.Move)
case AcquiredByOther(move: Work.Move)
case Found(task: Work.Task)
case AcquiredByOther(task: Work.Task)

object AppState:
val empty: AppState = Map.empty
extension (state: AppState)

def tryAcquireMove(key: ClientKey, at: Instant): (AppState, Option[Work.RequestWithId]) =
state.earliestNonAcquiredMove
.map: m =>
val move = m.assignTo(key, at)
state.updated(move.id, move) -> move.toRequestWithId.some
def tryAcquireTask(key: ClientKey, at: Instant): (AppState, Option[Task]) =
state.earliestNonAcquiredTask
.map: newTask =>
val assignedTask = newTask.assignTo(key, at)
state.updated(assignedTask.id, assignedTask) -> assignedTask.some
.getOrElse(state -> none)

def isFull(maxSize: Int): Boolean =
state.sizeIs >= maxSize

def addWork(move: Move): AppState =
def addTask(move: Task): AppState =
state + (move.id -> move)

def acquiredBefore(since: Instant): List[Work.Move] =
def acquiredBefore(since: Instant): List[Work.Task] =
state.values.filter(_.acquiredBefore(since)).toList

def earliestNonAcquiredMove: Option[Work.Move] =
def earliestNonAcquiredTask: Option[Work.Task] =
state.values.filter(_.nonAcquired).minByOption(_.createdAt)

def getWork(workId: WorkId, key: ClientKey): GetWorkResult =
def apply(workId: WorkId, key: ClientKey): GetTaskResult =
state.get(workId) match
case None => GetWorkResult.NotFound
case Some(move) if move.isAcquiredBy(key) => GetWorkResult.Found(move)
case Some(move) => GetWorkResult.AcquiredByOther(move)

def updateOrGiveUp(candidates: List[Work.Move]): (AppState, List[Work.Move]) =
candidates.foldLeft[(AppState, List[Work.Move])](state -> Nil) { case ((state, xs), m) =>
m.clearAssignedKey match
case None => (state - m.id, m :: xs)
case Some(move) => (state.updated(m.id, move), xs)
case None => GetTaskResult.NotFound
case Some(task) if task.isAcquiredBy(key) => GetTaskResult.Found(task)
case Some(task) => GetTaskResult.AcquiredByOther(task)

def updateOrGiveUp(candidates: List[Work.Task]): (AppState, List[Work.Task]) =
candidates.foldLeft[(AppState, List[Work.Task])](state -> Nil) { case ((state, xs), task) =>
task.clearAssignedKey match
case None => (state - task.id, task :: xs)
case Some(unAssignedTask) => (state.updated(task.id, unAssignedTask), xs)
}
54 changes: 27 additions & 27 deletions app/src/main/scala/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import cats.effect.IO
import cats.effect.kernel.Ref
import cats.syntax.all.*
import java.time.Instant
import lila.fishnet.Lila.Request
import lila.fishnet.Work.Move
import org.typelevel.log4cats.Logger

/** Executor is responsible for: store work in memory
Expand All @@ -14,13 +12,15 @@ import org.typelevel.log4cats.Logger
* - adding work to the queue
*/
trait Executor:
// get a move from the queue return Work
def acquire(accquire: ClientKey): IO[Option[Work.RequestWithId]]
// get Work from Map => send to lila
// fishnet client tries to get an unassigned task
def acquire(accquire: ClientKey): IO[Option[Work.Task]]
// fishnet client sends the best move for it's assigned task
def move(workId: WorkId, fishnetKey: ClientKey, move: BestMove): IO[Unit]
// add work to queue
// Lila sends a position
def add(work: Lila.Request): IO[Unit]
// clean up all works that are acquired before the given time
// clean up all works that are acquired before a given time
// this is to prevent tasks from being stuck in the queue
// this will be called periodically
def clean(before: Instant): IO[Unit]

object Executor:
Expand All @@ -33,38 +33,38 @@ object Executor:
.map: ref =>
new Executor:

def add(work: Request): IO[Unit] =
fromRequest(work).flatMap: move =>
def add(work: Lila.Request): IO[Unit] =
fromRequest(work).flatMap: task =>
ref.flatModify: state =>
val (newState, effect) =
if state.isFull(config.maxSize) then
AppState.empty ->
Logger[IO].warn(s"StateSize=${state.size} maxSize=${config.maxSize}. Dropping all!")
else state -> IO.unit
newState.addWork(move) -> effect
newState.addTask(task) -> effect

def acquire(key: ClientKey): IO[Option[Work.RequestWithId]] =
def acquire(key: ClientKey): IO[Option[Work.Task]] =
IO.realTimeInstant.flatMap: at =>
ref.modify(_.tryAcquireMove(key, at))
ref.modify(_.tryAcquireTask(key, at))

def move(workId: WorkId, key: ClientKey, response: BestMove): IO[Unit] =
ref.flatModify: state =>
state.getWork(workId, key) match
case GetWorkResult.NotFound => state -> logNotFound(workId, key)
case GetWorkResult.AcquiredByOther(move) => state -> logNotAcquired(move, key)
case GetWorkResult.Found(work) =>
state(workId, key) match
case GetTaskResult.NotFound => state -> logNotFound(workId, key)
case GetTaskResult.AcquiredByOther(task) => state -> logNotAcquired(task, key)
case GetTaskResult.Found(task) =>
response.uci match
case Some(uci) =>
state - work.id -> (monitor.success(work) >>
client.send(Lila.Move(work.request.id, work.request.moves, uci)))
state - task.id -> (monitor.success(task) >>
client.send(Lila.Move(task.request.id, task.request.moves, uci)))
case _ =>
val (newState, io) = work.clearAssignedKey match
val (newState, io) = task.clearAssignedKey match
case None =>
state - workId -> Logger[IO].warn(
s"Give up move due to invalid move $response of key $key for $work"
s"Give up move due to invalid move $response of key $key for $task"
)
case Some(updated) => state.updated(work.id, updated) -> IO.unit
newState -> io *> failure(work, key)
case Some(updated) => state.updated(task.id, updated) -> IO.unit
newState -> io *> failure(task, key)

def clean(since: Instant): IO[Unit] =
ref.flatModify: state =>
Expand All @@ -75,25 +75,25 @@ object Executor:
*> gavedUpMoves.traverse_(m => Logger[IO].warn(s"Give up move due to clean up: $m"))
*> monitor.updateSize(newState)

private def logIfTimedOut(state: AppState, timeOut: List[Work.Move]): IO[Unit] =
private def logIfTimedOut(state: AppState, timeOut: List[Work.Task]): IO[Unit] =
IO.whenA(timeOut.nonEmpty):
Logger[IO].debug(s"cleaning ${timeOut.size} of ${state.size} moves")
*> timeOut.traverse_(m => Logger[IO].info(s"Timeout move: $m"))

private def failure(work: Work.Move, clientKey: ClientKey) =
private def failure(work: Work.Task, clientKey: ClientKey) =
Logger[IO].warn(s"Received invalid move ${work.id} for ${work.request.id} by $clientKey")

private def logNotFound(id: WorkId, clientKey: ClientKey) =
Logger[IO].info(s"Received unknown work $id by $clientKey")

private def logNotAcquired(work: Work.Move, clientKey: ClientKey) =
private def logNotAcquired(work: Work.Task, clientKey: ClientKey) =
Logger[IO].info(
s"Received unacquired move ${work.id} for ${work.request.id} by $clientKey. Work current tries: ${work.tries} acquired: ${work.acquired}"
)

def fromRequest(req: Lila.Request): IO[Move] =
def fromRequest(req: Lila.Request): IO[Work.Task] =
(IO(Work.makeId), IO.realTimeInstant).mapN: (id, now) =>
Move(
Work.Task(
id = id,
request = req,
tries = 0,
Expand Down
2 changes: 1 addition & 1 deletion app/src/main/scala/Jobs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ object RedisSubscriberJob:
.readMoveReq(msg.message)
.match
case Some(request) => executor.add(request)
case None => Logger[IO].warn(s"Failed to parse message: $msg")
case None => Logger[IO].warn(s"Failed to parse message from lila: $msg")
>> Logger[IO].debug(s"Received message: $msg")
) *> pubsub.runMessages

Expand Down
8 changes: 4 additions & 4 deletions app/src/main/scala/Monitor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import kamon.metric.Timer
import java.time.Instant

trait Monitor:
def success(work: Work.Move): IO[Unit]
def updateSize(map: Map[WorkId, Work.Move]): IO[Unit]
def success(work: Work.Task): IO[Unit]
def updateSize(map: Map[WorkId, Work.Task]): IO[Unit]

object Monitor:

Expand All @@ -21,13 +21,13 @@ object Monitor:

def apply: Monitor =
new Monitor:
def success(work: Work.Move): IO[Unit] =
def success(work: Work.Task): IO[Unit] =
IO.realTimeInstant.map: now =>
if work.request.level == 8 then
work.acquiredAt.foreach(at => record(lvl8AcquiredTimeRequest, at, now))
if work.request.level == 1 then record(lvl1FullTimeRequest, work.createdAt, now)

def updateSize(map: Map[WorkId, Work.Move]): IO[Unit] =
def updateSize(map: Map[WorkId, Work.Task]): IO[Unit] =
IO.delay(dbSize.update(map.size.toDouble)) *>
IO.delay(dbQueued.update(map.count(_._2.nonAcquired).toDouble)) *>
IO.delay(dbAcquired.update(map.count(_._2.isAcquired).toDouble)).void
Expand Down
38 changes: 15 additions & 23 deletions app/src/main/scala/Work.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,48 +4,40 @@ import java.time.Instant

object Work:

case class RequestWithId(id: WorkId, request: Lila.Request):
def toResponse =
Fishnet.WorkResponse(
work = Fishnet.Work(id = id, level = request.level, clock = request.clock),
game_id = request.id.value,
position = request.initialFen,
moves = request.moves,
variant = request.variant
)

case class Acquired(clientKey: ClientKey, date: Instant):
override def toString = s"by $clientKey at $date"

case class Move(
case class Task(
id: WorkId,
request: Lila.Request,
tries: Int,
acquired: Option[Acquired],
createdAt: Instant
):

def toRequestWithId =
RequestWithId(id, request)

def acquiredAt = acquired.map(_.date)
def acquiredByKey: Option[ClientKey] = acquired.map(_.clientKey)
def isAcquiredBy(clientKey: ClientKey) = acquiredByKey.contains(clientKey)
def isAcquired = acquired.isDefined
def nonAcquired = !isAcquired
def acquiredBefore(date: Instant) = acquiredAt.exists(_.isBefore(date))
def acquiredAt: Option[Instant] = acquired.map(_.date)
def isAcquired: Boolean = acquired.isDefined
def nonAcquired: Boolean = !isAcquired
def isAcquiredBy(clientKey: ClientKey): Boolean = acquired.exists(_.clientKey == clientKey)
def acquiredBefore(date: Instant): Boolean = acquiredAt.exists(_.isBefore(date))

def assignTo(clientKey: ClientKey, at: Instant) =
copy(acquired = Some(Acquired(clientKey = clientKey, date = at)), tries = tries + 1)

def isOutOfTries = tries >= 3

def similar(to: Move) = request.id == to.request.id && request.moves == to.request.moves

// returns the move without the acquired key if it's not out of tries
def clearAssignedKey: Option[Work.Move] =
def clearAssignedKey: Option[Work.Task] =
Option.when(!isOutOfTries)(copy(acquired = None))

def toResponse =
Fishnet.WorkResponse(
work = Fishnet.Work(id = id, level = request.level, clock = request.clock),
game_id = request.id,
position = request.initialFen,
moves = request.moves,
variant = request.variant
)
override def toString =
s"id:$id game:${request.id} variant:${request.variant.key} level:${request.level} tries:$tries created:$createdAt acquired:$acquired move: ${request.moves}"

Expand Down
6 changes: 3 additions & 3 deletions app/src/main/scala/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ object Fishnet:

case class WorkResponse(
work: Work,
game_id: String,
game_id: GameId,
position: Fen.Epd,
moves: String,
variant: Variant
Expand All @@ -65,8 +65,6 @@ object Lila:
def sign = moves.takeRight(20).replace(" ", "")
def write = s"$gameId $sign ${uci.uci}"

case class Clock(wtime: Int, btime: Int, inc: Int) derives Codec.AsObject

case class Request(
id: GameId,
initialFen: Fen.Epd,
Expand All @@ -76,6 +74,8 @@ object Lila:
clock: Option[Clock]
)

case class Clock(wtime: Int, btime: Int, inc: Int) derives Codec.AsObject

def readMoveReq(msg: String): Option[Request] =
msg.split(";", 6) match
case Array(gameId, levelS, clockS, variantS, initialFenS, moves) =>
Expand Down
2 changes: 1 addition & 1 deletion app/src/test/scala/ExecutorTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ object ExecutorTest extends SimpleIOSuite:
_ <- executor.move(workId, key, invalidMove)
acquiredOption <- executor.acquire(key)
acquired = acquiredOption.get
yield expect.same(acquired, Work.RequestWithId(workId, request))
yield expect.same(acquired.request, request)

test("should not give up after 2 tries"):
for
Expand Down
4 changes: 2 additions & 2 deletions app/src/test/scala/Helper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ object Helper:

val noopMonitor: Monitor =
new Monitor:
def success(work: Work.Move): IO[Unit] = IO.unit
def updateSize(map: Map[WorkId, Work.Move]): IO[Unit] = IO.unit
def success(work: Work.Task): IO[Unit] = IO.unit
def updateSize(map: Map[WorkId, Work.Task]): IO[Unit] = IO.unit

val noopLilaClient: LilaClient =
new LilaClient:
Expand Down
11 changes: 7 additions & 4 deletions app/src/test/scala/http/FishnetRoutesTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ object FishnetRoutesTest extends SimpleIOSuite:
"variant": "Standard"
}"""

val requestWithId = Work.RequestWithId(
val task = Work.Task(
id = WorkId("workid"),
request = Lila.Request(
id = GameId("gameid"),
Expand All @@ -56,7 +56,10 @@ object FishnetRoutesTest extends SimpleIOSuite:
variant = chess.variant.Standard,
level = 1,
clock = Some(Lila.Clock(wtime = 600, btime = 600, inc = 0))
)
),
tries = 1,
acquired = none,
createdAt = Instant.now
)

test("POST /fishnet/acquire should return work response"):
Expand Down Expand Up @@ -89,9 +92,9 @@ object FishnetRoutesTest extends SimpleIOSuite:

def createExecutor(): Executor =
new Executor:
def acquire(key: ClientKey) = IO.pure(requestWithId.some)
def acquire(key: ClientKey) = IO.pure(task.some)
def move(id: WorkId, key: ClientKey, move: BestMove): IO[Unit] =
if id == requestWithId.id then IO.unit
if id == task.id then IO.unit
else IO.raiseError(new Exception("invalid work id"))
def add(request: Lila.Request): IO[Unit] = IO.unit
def clean(before: Instant) = IO.unit

0 comments on commit e789994

Please sign in to comment.