Skip to content

Commit

Permalink
Merge pull request #350 from lichess-org/improve-kamon
Browse files Browse the repository at this point in the history
Make kamon calls blocking
  • Loading branch information
lenguyenthanh authored Sep 27, 2024
2 parents f000529 + 8bbea1b commit ae1ec89
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 20 deletions.
6 changes: 3 additions & 3 deletions app/src/main/scala/App.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ object App extends IOApp.Simple:

def app: Resource[IO, Unit] =
for
config <- AppConfig.load.toResource
config <- AppConfig.load().toResource
_ <- Logger[IO].info(s"Starting lila-fishnet with config: $config").toResource
_ <- KamonInitiator.apply.init(config.kamon).toResource
_ <- KamonInitiator.apply().init(config.kamon).toResource
res <- AppResources.instance(config.redis)
_ <- FishnetApp(res, config).run()
yield ()
Expand All @@ -34,4 +34,4 @@ class FishnetApp(res: AppResources, config: AppConfig)(using Logger[IO]):
private def createExecutor: Resource[IO, Executor] =
val lilaClient = LilaClient(res.redisPubsub)
val repository = StateRepository.instance(config.repository.path)
Executor.instance(lilaClient, repository, Monitor(), config.executor)
Monitor().toResource.flatMap(Executor.instance(lilaClient, repository, _, config.executor))
2 changes: 1 addition & 1 deletion app/src/main/scala/AppConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import com.comcast.ip4s.*

object AppConfig:

def load: IO[AppConfig] = appConfig.load[IO]
def load(): IO[AppConfig] = appConfig.load[IO]

def appConfig = (
RedisConfig.config,
Expand Down
4 changes: 2 additions & 2 deletions app/src/main/scala/KamonInitiator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ trait KamonInitiator:
def init(config: KamonConfig): IO[Unit]

object KamonInitiator:
def apply: KamonInitiator = new:
def apply(): KamonInitiator = new:
def init(config: KamonConfig): IO[Unit] =
IO(Kamon.init()).whenA(config.enabled)
IO.blocking(Kamon.init()).whenA(config.enabled)
37 changes: 23 additions & 14 deletions app/src/main/scala/Monitor.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package lila.fishnet

import cats.effect.IO
import cats.syntax.all.*
import kamon.Kamon
import kamon.metric.Timer

Expand All @@ -20,17 +21,25 @@ object Monitor:
val lvl8AcquiredTimeRequest = Kamon.timer("move.acquired.lvl8").withoutTags()
val lvl1FullTimeRequest = Kamon.timer("move.full.lvl1").withoutTags()

def apply(): Monitor = new:
def success(work: Work.Task): IO[Unit] =
IO.realTimeInstant.map: now =>
if work.request.level == 8 then
work.acquiredAt.foreach(at => record(lvl8AcquiredTimeRequest, at, now))
if work.request.level == 1 then record(lvl1FullTimeRequest, work.createdAt, now)

def updateSize(state: AppState): IO[Unit] =
IO(dbSize.update(state.size.toDouble)) *>
IO(dbQueued.update(state.count(_.nonAcquired).toDouble)) *>
IO(dbAcquired.update(state.count(_.isAcquired).toDouble)).void

private def record(timer: Timer, start: Instant, end: Instant): Unit =
val _ = timer.record(start.until(end, ChronoUnit.MILLIS), TimeUnit.MILLISECONDS)
def apply(): IO[Monitor] =
(
IO.blocking(Kamon.gauge("db.size").withoutTags()),
IO.blocking(Kamon.gauge("db.queued").withoutTags()),
IO.blocking(Kamon.gauge("db.acquired").withoutTags()),
IO.blocking(Kamon.timer("move.acquired.lvl8").withoutTags()),
IO.blocking(Kamon.timer("move.full.lvl1").withoutTags())
).parMapN: (dbSize, dbQueued, dbAcquired, lvl8AcquiredTimeRequest, lvl1FullTimeRequest) =>
new Monitor:
def success(work: Work.Task): IO[Unit] =
IO.realTimeInstant.map: now =>
if work.request.level == 8 then
work.acquiredAt.foreach(at => record(lvl8AcquiredTimeRequest, at, now))
else if work.request.level == 1 then record(lvl1FullTimeRequest, work.createdAt, now)

def updateSize(state: AppState): IO[Unit] =
IO(dbSize.update(state.size.toDouble)) *>
IO(dbQueued.update(state.count(_.nonAcquired).toDouble)) *>
IO(dbAcquired.update(state.count(_.isAcquired).toDouble)).void

private def record(timer: Timer, start: Instant, end: Instant): Unit =
val _ = timer.record(start.until(end, ChronoUnit.MILLIS), TimeUnit.MILLISECONDS)

0 comments on commit ae1ec89

Please sign in to comment.