Skip to content

Commit

Permalink
Merge pull request #281 from lichess-org/preserve-state-after-dead
Browse files Browse the repository at this point in the history
Preserve state after dead
  • Loading branch information
lenguyenthanh authored Mar 10, 2024
2 parents 791e9c6 + 37069b8 commit 84c18e2
Show file tree
Hide file tree
Showing 16 changed files with 231 additions and 83 deletions.
5 changes: 3 additions & 2 deletions app/src/main/scala/App.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ class FishnetApp(res: AppResources, config: AppConfig)(using Logger[IO]):
def run(): Resource[IO, Unit] =
for
lilaClient <- Resource.pure(LilaClient(res.redisPubsub))
monitor = Monitor.apply
executor <- Executor.instance(lilaClient, monitor, config.executor).toResource
monitor = Monitor.apply
repository = StateRepository.instance(config.repository.path)
executor <- Executor.instance(lilaClient, repository, monitor, config.executor)
httpApi = HttpApi(executor, HealthCheck(), config.server)
server <- MkHttpServer.apply.newEmber(config.server, httpApi.httpApp)
_ <- RedisSubscriberJob(executor, res.redisPubsub).run().background
Expand Down
11 changes: 9 additions & 2 deletions app/src/main/scala/AppConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@ object AppConfig:
RedisConfig.config,
HttpServerConfig.config,
KamonConfig.config,
ExecutorConfg.config
ExecutorConfg.config,
RepositoryConfig.config
).parMapN(AppConfig.apply)

case class AppConfig(
redis: RedisConfig,
server: HttpServerConfig,
kamon: KamonConfig,
executor: ExecutorConfig
executor: ExecutorConfig,
repository: RepositoryConfig
)

case class HttpServerConfig(host: Host, port: Port, apiLogger: Boolean)
Expand Down Expand Up @@ -49,3 +51,8 @@ case class ExecutorConfig(maxSize: Int)
object ExecutorConfg:
def maxSize = env("APP_MAX_MOVE_SIZE").or(prop("app.max.move.size")).as[Int].default(300)
def config = maxSize.map(ExecutorConfig.apply)

case class RepositoryConfig(path: Option[String])
object RepositoryConfig:
def path = env("APP_BACKUP_FILE").or(prop("app.backup.file")).as[String].option
def config = path.map(RepositoryConfig.apply)
5 changes: 5 additions & 0 deletions app/src/main/scala/AppState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@ enum GetTaskResult:

object AppState:
val empty: AppState = Map.empty

def fromTasks(tasks: List[Work.Task]): AppState = tasks.map(t => t.id -> t).toMap

extension (state: AppState)

def tasks: List[Work.Task] = state.values.toList

inline def isFull(maxSize: Int): Boolean =
state.sizeIs >= maxSize

Expand Down
132 changes: 71 additions & 61 deletions app/src/main/scala/Executor.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package lila.fishnet

import cats.effect.kernel.Resource
import cats.effect.{ IO, Ref }
import cats.syntax.all.*
import org.typelevel.log4cats.Logger
Expand Down Expand Up @@ -27,69 +28,78 @@ object Executor:

import AppState.*

def instance(client: LilaClient, monitor: Monitor, config: ExecutorConfig)(using Logger[IO]): IO[Executor] =
def instance(client: LilaClient, repository: StateRepository, monitor: Monitor, config: ExecutorConfig)(
using Logger[IO]
): Resource[IO, Executor] =
Ref
.of[IO, AppState](AppState.empty)
.map: ref =>
new Executor:

def add(work: Lila.Request): IO[Unit] =
fromRequest(work).flatMap: task =>
ref.flatModify: state =>
val (newState, effect) =
if state.isFull(config.maxSize) then
AppState.empty ->
Logger[IO].warn(s"stateSize=${state.size} maxSize=${config.maxSize}. Dropping all!")
else state -> IO.unit
newState.add(task) -> effect *> monitor.updateSize(newState)

def acquire(key: ClientKey): IO[Option[Work.Task]] =
IO.realTimeInstant.flatMap: at =>
ref.modify(_.tryAcquire(key, at))

def move(workId: WorkId, key: ClientKey, response: BestMove): IO[Unit] =
ref.flatModify: state =>
state(workId, key) match
case GetTaskResult.NotFound => state -> logNotFound(workId, key)
case GetTaskResult.AcquiredByOther(task) => state -> logNotAcquired(task, key)
case GetTaskResult.Found(task) =>
response.uci match
case Some(uci) =>
state.remove(task.id) -> (monitor.success(task) >>
client.send(Lila.Move(task.request.id, task.request.moves, uci)))
case _ =>
val (newState, io) = task.clearAssignedKey match
case None =>
state.remove(workId) -> Logger[IO].warn(
s"Give up move due to invalid move $response by $key for $task"
)
case Some(updated) => state.add(updated) -> IO.unit
newState -> io *> failure(task, key)

def clean(since: Instant): IO[Unit] =
ref.flatModify: state =>
val timedOut = state.acquiredBefore(since)
val timedOutLogs = logTimedOut(state, timedOut)
val (newState, gavedUpMoves) = state.updateOrGiveUp(timedOut)
newState -> timedOutLogs
*> gavedUpMoves.traverse_(m => Logger[IO].warn(s"Give up move due to clean up: $m"))
*> monitor.updateSize(newState)

private def logTimedOut(state: AppState, timeOut: List[Work.Task]): IO[Unit] =
IO.whenA(timeOut.nonEmpty):
Logger[IO].info(s"cleaning ${timeOut.size} of ${state.size} moves")
*> timeOut.traverse_(m => Logger[IO].info(s"Timeout move: $m"))

private def failure(work: Work.Task, clientKey: ClientKey) =
Logger[IO].warn(s"Received invalid move ${work.id} for ${work.request.id} by $clientKey")

private def logNotFound(id: WorkId, clientKey: ClientKey) =
Logger[IO].info(s"Received unknown work $id by $clientKey")

private def logNotAcquired(work: Work.Task, clientKey: ClientKey) =
Logger[IO].info(
s"Received unacquired move ${work.id} for ${work.request.id} by $clientKey. Work current tries: ${work.tries} acquired: ${work.acquired}"
)
.toResource
.flatMap: ref =>
Resource
.make(repository.get.flatMap(ref.set))(_ => ref.get.flatMap(repository.save))
.as(instance(ref, client, monitor, config))

def instance(ref: Ref[IO, AppState], client: LilaClient, monitor: Monitor, config: ExecutorConfig)(using
Logger[IO]
): Executor =
new Executor:
def add(work: Lila.Request): IO[Unit] =
fromRequest(work).flatMap: task =>
ref.flatModify: state =>
val (newState, effect) =
if state.isFull(config.maxSize) then
AppState.empty ->
Logger[IO].warn(s"stateSize=${state.size} maxSize=${config.maxSize}. Dropping all!")
else state -> IO.unit
newState.add(task) -> effect *> monitor.updateSize(newState)

def acquire(key: ClientKey): IO[Option[Work.Task]] =
IO.realTimeInstant.flatMap: at =>
ref.modify(_.tryAcquire(key, at))

def move(workId: WorkId, key: ClientKey, response: BestMove): IO[Unit] =
ref.flatModify: state =>
state(workId, key) match
case GetTaskResult.NotFound => state -> logNotFound(workId, key)
case GetTaskResult.AcquiredByOther(task) => state -> logNotAcquired(task, key)
case GetTaskResult.Found(task) =>
response.uci match
case Some(uci) =>
state.remove(task.id) -> (monitor.success(task) >>
client.send(Lila.Move(task.request.id, task.request.moves, uci)))
case _ =>
val (newState, io) = task.clearAssignedKey match
case None =>
state.remove(workId) -> Logger[IO].warn(
s"Give up move due to invalid move $response by $key for $task"
)
case Some(updated) => state.add(updated) -> IO.unit
newState -> io *> failure(task, key)

def clean(since: Instant): IO[Unit] =
ref.flatModify: state =>
val timedOut = state.acquiredBefore(since)
val timedOutLogs = logTimedOut(state, timedOut)
val (newState, gavedUpMoves) = state.updateOrGiveUp(timedOut)
newState -> timedOutLogs
*> gavedUpMoves.traverse_(m => Logger[IO].warn(s"Give up move due to clean up: $m"))
*> monitor.updateSize(newState)

private def logTimedOut(state: AppState, timeOut: List[Work.Task]): IO[Unit] =
IO.whenA(timeOut.nonEmpty):
Logger[IO].info(s"cleaning ${timeOut.size} of ${state.size} moves")
*> timeOut.traverse_(m => Logger[IO].info(s"Timeout move: $m"))

private def failure(work: Work.Task, clientKey: ClientKey) =
Logger[IO].warn(s"Received invalid move ${work.id} for ${work.request.id} by $clientKey")

private def logNotFound(id: WorkId, clientKey: ClientKey) =
Logger[IO].info(s"Received unknown work $id by $clientKey")

private def logNotAcquired(work: Work.Task, clientKey: ClientKey) =
Logger[IO].info(
s"Received unacquired move ${work.id} for ${work.request.id} by $clientKey. Work current tries: ${work.tries} acquired: ${work.acquired}"
)

def fromRequest(req: Lila.Request): IO[Work.Task] =
(makeId, IO.realTimeInstant).mapN: (id, now) =>
Expand Down
61 changes: 61 additions & 0 deletions app/src/main/scala/StateRepository.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package lila.fishnet

import cats.effect.IO
import cats.syntax.all.*
import org.typelevel.log4cats.Logger

trait StateRepository:
def get: IO[AppState]
def save(state: AppState): IO[Unit]

object StateRepository:

def instance(path: Option[String])(using Logger[IO]): StateRepository =
path.fold(noop)(file(_))

def noop(using Logger[IO]): StateRepository =
new StateRepository:
def get: IO[AppState] =
Logger[IO].info("There is no configed path, return empty AppState") *> IO(AppState.empty)
def save(state: AppState): IO[Unit] = Logger[IO].info("There is no configed path, do nothing")

def file(_path: String)(using Logger[IO]): StateRepository =
val path = fs2.io.file.Path(_path)
new StateRepository:
def get: IO[AppState] =
Logger[IO].info(s"Reading state from $path") *>
fs2.io.file
.Files[IO]
.readAll(path)
.through(TasksSerDe.deserialize)
.compile
.toList
.map(AppState.fromTasks)
.handleErrorWith: e =>
Logger[IO].error(e)(s"Failed to read state from $path") *> IO.pure(AppState.empty)

def save(state: AppState): IO[Unit] =
Logger[IO].info(s"Saving ${state.size} tasks to $path") *>
fs2.Stream
.emits(state.tasks)
.through(TasksSerDe.serialize)
.through(fs2.text.utf8.encode)
.through(fs2.io.file.Files[IO].writeAll(path))
.compile
.drain
.handleErrorWith: e =>
Logger[IO].error(e)(s"Failed to write state to $path")

object TasksSerDe:

import fs2.data.json.*
import fs2.data.json.circe.*

def deserialize: fs2.Pipe[IO, Byte, Work.Task] =
_.through(fs2.text.utf8.decode)
.through(tokens[IO, String])
.through(codec.deserialize[IO, Work.Task])

def serialize: fs2.Pipe[IO, Work.Task, String] =
_.through(codec.serialize[IO, Work.Task])
.through(render.compact)
7 changes: 5 additions & 2 deletions app/src/main/scala/Work.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package lila.fishnet

import io.circe.{ Codec, Decoder, Encoder }

import java.time.Instant

object Work:

case class Acquired(clientKey: ClientKey, date: Instant):
case class Acquired(clientKey: ClientKey, date: Instant) derives Codec.AsObject:
override def toString = s"by $clientKey at $date"

case class Task(
Expand All @@ -13,7 +15,7 @@ object Work:
tries: Int,
acquired: Option[Acquired],
createdAt: Instant
):
) derives Codec.AsObject:

def acquiredAt: Option[Instant] = acquired.map(_.date)
def isAcquired: Boolean = acquired.isDefined
Expand All @@ -38,5 +40,6 @@ object Work:
moves = request.moves,
variant = request.variant
)

override def toString =
s"id:$id game:${request.id} variant:${request.variant.key} level:${request.level} tries:$tries created:$createdAt acquired:$acquired move: ${request.moves}"
15 changes: 12 additions & 3 deletions app/src/main/scala/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,17 @@ object GameId:
given Decoder[GameId] = decodeString
extension (bm: GameId) def value: String = bm

object ChessCirceCodecs:
given Encoder[Fen.Epd] = encodeString.contramap(_.value)
given Decoder[Fen.Epd] = decodeString.map(Fen.Epd.apply)
given Encoder[Variant] = encodeString.contramap(_.name)
given Decoder[Variant] =
decodeString.emap: s =>
Variant.byName(s).toRight(s"Invalid variant: $s")

object Fishnet:

given Encoder[Fen.Epd] = Encoder.encodeString.contramap(_.value)
given Encoder[Variant] = Encoder.encodeString.contramap(_.name)
import ChessCirceCodecs.given

case class Acquire(fishnet: Fishnet) derives Codec.AsObject
case class Fishnet(version: String, apikey: ClientKey) derives Codec.AsObject
Expand All @@ -61,6 +68,8 @@ object Fishnet:

object Lila:

import ChessCirceCodecs.given

case class Move(gameId: GameId, moves: String, uci: Uci):
def sign = moves.takeRight(20).replace(" ", "")
def write = s"$gameId $sign ${uci.uci}"
Expand All @@ -72,7 +81,7 @@ object Lila:
moves: String,
level: Int,
clock: Option[Clock]
)
) derives Codec.AsObject

case class Clock(wtime: Int, btime: Int, inc: Int) derives Codec.AsObject

Expand Down
11 changes: 11 additions & 0 deletions app/src/test/scala/AppStateTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@ import Arbitraries.given

class AppStateTest extends ScalaCheckSuite:

override def scalaCheckInitialSeed = "lwfNzhdC038hCsaHpM4QBkFYs5eFtR9GLPHuzIE08KP="

test("tasks.fromTasks == identity"):
forAll: (state: AppState) =>
assertEquals(AppState.fromTasks(state.tasks), state)

test("tasks.fromTasks == tasks"):
forAll: (ts: List[Work.Task]) =>
val tasks = ts.distinctBy(_.id)
assertEquals(AppState.fromTasks(tasks).tasks.toSet, tasks.toSet)

test("isFull"):
forAll: (state: AppState) =>
state.isFull(10) || state.size < 10
Expand Down
2 changes: 2 additions & 0 deletions app/src/test/scala/CleaningJobTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,5 @@ object CleaningJobTest extends SimpleIOSuite:
def move(workId: WorkId, fishnetKey: ClientKey, move: BestMove) = IO.unit
def add(work: Lila.Request) = IO.unit
def clean(before: Instant) = ref.update(_ + 1)
def onStart = IO.unit
def onStop = IO.unit
Loading

0 comments on commit 84c18e2

Please sign in to comment.