Skip to content

Commit

Permalink
Merge pull request #448 from jluhrs/log_subscription
Browse files Browse the repository at this point in the history
Implemented logs subscription in GraphQl.
  • Loading branch information
jluhrs authored Nov 7, 2023
2 parents fc5ac1a + 93fc57a commit 6b79489
Show file tree
Hide file tree
Showing 12 changed files with 260 additions and 110 deletions.
20 changes: 19 additions & 1 deletion modules/web/server/src/main/resources/NewTCC.graphql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#import NonEmptyString, Sidereal, TargetId, Nonsidereal from "lucuma/schemas/ObservationDB.graphql"
#import SiderealInput, NonsiderealInput, WavelengthInput, AngleInput from "lucuma/schemas/ObservationDB.graphql"
#import Long, BigDecimal from "lucuma/schemas/ObservationDB.graphql"
#import Long, BigDecimal, Timestamp from "lucuma/schemas/ObservationDB.graphql"

"""Target properties input"""
input TargetPropertiesInput {
Expand Down Expand Up @@ -121,6 +121,21 @@ type Target {
nonsidereal: Nonsidereal
}

enum LogLevel {
ERROR
WARN
INFO
DEBUG
TRACE
}

type LogMessage {
timestamp: Timestamp!
level: LogLevel!
thread: String!
message: String!
}

enum OperationResult {
SUCCESS
FAILURE
Expand Down Expand Up @@ -149,3 +164,6 @@ type Mutation {
oiwfsFollow(enable: Boolean!): OperationOutcome!
}

type Subscription {
logMessage: LogMessage!
}
6 changes: 3 additions & 3 deletions modules/web/server/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
<configuration scan="true" scanPeriod="60 seconds">
<logger name="org.http4s" level="INFO"/>

<if condition='property("HOSTNAME").contains("observe")'>
<if condition='property("HOSTNAME").contains("navigate")'>
<!-- Configuration when running on the test or production servers -->
<then>
<!-- Files are rotated daily for up to 90 days -->
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>/gemsoft/var/log/observe/observe.log</file>
<file>/gemsoft/var/log/navigate/navigate.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>/gemsoft/var/log/observe/observe.%d{yyyy-MM-dd}.log</fileNamePattern>
<fileNamePattern>/gemsoft/var/log/navigate/navigate.%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>90</maxHistory>
<totalSizeCap>3GB</totalSizeCap>
</rollingPolicy>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ package navigate.web.server.http4s

import cats.effect.Async
import cats.syntax.all.*
import ch.qos.logback.classic.spi.ILoggingEvent
import fs2.compression.Compression
import fs2.concurrent.Topic
import lucuma.graphql.routes.GraphQLService
import lucuma.graphql.routes.Routes
import natchez.Trace
Expand All @@ -22,11 +24,15 @@ import org.http4s.server.websocket.WebSocketBuilder2
import org.typelevel.log4cats.Logger

class GraphQlRoutes[F[_]: Async: Logger: Trace: Compression](
eng: NavigateEngine[F]
eng: NavigateEngine[F],
logTopic: Topic[F, ILoggingEvent]
) extends Http4sDsl[F] {

private def commandServices(wsb: WebSocketBuilder2[F]): HttpRoutes[F] = GZip(
Routes.forService(_ => NavigateMappings(eng).map(GraphQLService[F](_).some), wsb, "navigate")
Routes.forService(_ => NavigateMappings(eng, logTopic).map(GraphQLService[F](_).some),
wsb,
"navigate"
)
)

def service(wsb: WebSocketBuilder2[F]): HttpRoutes[F] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import cats.data.NonEmptyChain
import cats.effect.Sync
import cats.effect.syntax.all.*
import cats.syntax.all.*
import ch.qos.logback.classic.spi.ILoggingEvent
import edu.gemini.schema.util.SchemaStitcher
import edu.gemini.schema.util.SourceResolver
import fs2.concurrent.Topic
import grackle.Cursor
import grackle.Env
import grackle.Mapping
Expand Down Expand Up @@ -84,8 +86,11 @@ import spire.math.Algebraic.Expr.Sub
import java.nio.file.Path as JPath
import scala.reflect.classTag

class NavigateMappings[F[_]: Sync](server: NavigateEngine[F])(override val schema: Schema)
extends CirceMapping[F] {
import encoder.given

class NavigateMappings[F[_]: Sync](server: NavigateEngine[F], logTopic: Topic[F, ILoggingEvent])(
override val schema: Schema
) extends CirceMapping[F] {
import NavigateMappings._

def mountPark(p: Path, env: Env): F[Result[OperationOutcome]] =
Expand Down Expand Up @@ -251,6 +256,7 @@ class NavigateMappings[F[_]: Sync](server: NavigateEngine[F])(override val schem
)

val MutationType: TypeRef = schema.ref("Mutation")
val SubscriptionType: TypeRef = schema.ref("Subscription")
val ParkStatusType: TypeRef = schema.ref("ParkStatus")
val FollowStatusType: TypeRef = schema.ref("FollowStatus")
val OperationOutcomeType: TypeRef = schema.ref("OperationOutcome")
Expand Down Expand Up @@ -320,6 +326,18 @@ class NavigateMappings[F[_]: Sync](server: NavigateEngine[F])(override val schem
RootEffect.computeEncodable("oiwfsFollow")((p, env) => oiwfsFollow(p, env))
)
),
ObjectMapping(
tpe = SubscriptionType,
List(
RootStream.computeCursor("logMessage") { (p, env) =>
logTopic
.subscribe(10)
.map(_.asJson)
.map(circeCursor(p, env, _))
.map(Result.success)
}
)
),
LeafMapping[ParkStatus](ParkStatusType),
LeafMapping[FollowStatus](FollowStatusType),
LeafMapping[OperationOutcome](OperationOutcomeType),
Expand All @@ -334,9 +352,13 @@ object NavigateMappings extends GrackleParsers {
.apply[F](JPath.of("NewTCC.graphql"), SourceResolver.fromResource(getClass.getClassLoader))
.build

def apply[F[_]: Sync](server: NavigateEngine[F]): F[NavigateMappings[F]] = loadSchema.flatMap {
case Result.Success(schema) => new NavigateMappings[F](server)(schema).pure[F]
case Result.Warning(problems, schema) => new NavigateMappings[F](server)(schema).pure[F]
def apply[F[_]: Sync](
server: NavigateEngine[F],
logTopic: Topic[F, ILoggingEvent]
): F[NavigateMappings[F]] = loadSchema.flatMap {
case Result.Success(schema) => new NavigateMappings[F](server, logTopic)(schema).pure[F]
case Result.Warning(problems, schema) =>
new NavigateMappings[F](server, logTopic)(schema).pure[F]
case Result.Failure(problems) =>
Sync[F].raiseError[NavigateMappings[F]](
new Throwable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import navigate.web.server.common.LogInitialization
import navigate.web.server.common.RedirectToHttpsRoutes
import navigate.web.server.common.StaticRoutes
import navigate.web.server.config.*
import navigate.web.server.logging.AppenderForClients
import navigate.web.server.logging.SubscriptionAppender
import navigate.web.server.logging.given
import navigate.web.server.security.AuthenticationService
import org.http4s.HttpRoutes
Expand Down Expand Up @@ -113,6 +113,7 @@ object WebServerLauncher extends IOApp with LogInitialization {
conf: NavigateConfiguration,
as: AuthenticationService[F],
outputs: Topic[F, NavigateEvent],
logTopic: Topic[F, ILoggingEvent],
se: NavigateEngine[F],
clientsDb: ClientsSetDb[F]
): Resource[F, Server] = {
Expand All @@ -132,7 +133,7 @@ object WebServerLauncher extends IOApp with LogInitialization {
"/api/navigate/commands" -> new NavigateCommandRoutes(as, se).service,
"/api" -> new NavigateUIApiRoutes(conf.site, conf.mode, as, clientsDb, outputs)
.service(wsBuilder),
"/graphqlapi" -> new GraphQlRoutes(se).service(wsBuilder)
"/graphqlapi" -> new GraphQlRoutes(se, logTopic).service(wsBuilder)
)

val pingRouter = Router[F](
Expand Down Expand Up @@ -175,14 +176,14 @@ object WebServerLauncher extends IOApp with LogInitialization {
// We need to manually update the configuration of the logging subsystem
// to support capturing log messages and forward them to the clients
def logToClients(
out: Topic[IO, NavigateEvent],
out: Topic[IO, ILoggingEvent],
dispatcher: Dispatcher[IO]
): IO[Appender[ILoggingEvent]] = IO.apply {
import ch.qos.logback.classic.{AsyncAppender, Logger, LoggerContext}
import org.slf4j.LoggerFactory

val asyncAppender = new AsyncAppender
val appender = new AppenderForClients(out)(dispatcher)
val appender = new SubscriptionAppender[IO](out)(using dispatcher)
Option(LoggerFactory.getILoggerFactory)
.collect { case lc: LoggerContext =>
lc
Expand Down Expand Up @@ -240,12 +241,13 @@ object WebServerLauncher extends IOApp with LogInitialization {
def webServerIO(
conf: NavigateConfiguration,
out: Topic[IO, NavigateEvent],
log: Topic[IO, ILoggingEvent],
en: NavigateEngine[IO],
cs: ClientsSetDb[IO]
): Resource[IO, Unit] =
for {
as <- Resource.eval(authService[IO](conf.mode, conf.authentication))
_ <- webServer[IO](conf, as, out, en, cs)
_ <- webServer[IO](conf, as, out, log, en, cs)
} yield ()

def publishStats[F[_]: Temporal](cs: ClientsSetDb[F]): Stream[F, Unit] =
Expand All @@ -259,14 +261,15 @@ object WebServerLauncher extends IOApp with LogInitialization {
_ <- Resource.eval(printBanner(conf))
cli <- client(10.seconds)
out <- Resource.eval(Topic[IO, NavigateEvent])
log <- Resource.eval(Topic[IO, ILoggingEvent])
dsp <- Dispatcher.sequential[IO]
_ <- Resource.eval(logToClients(out, dsp))
_ <- Resource.eval(logToClients(log, dsp))
cs <- Resource.eval(
Ref.of[IO, ClientsSetDb.ClientsSet](Map.empty).map(ClientsSetDb.apply[IO](_))
)
_ <- Resource.eval(publishStats(cs).compile.drain.start)
engine <- engineIO(conf, cli)
_ <- webServerIO(conf, out, engine, cs)
_ <- webServerIO(conf, out, log, engine, cs)
_ <- Resource.eval(
out.subscribers
.evalMap(l => Logger[IO].debug(s"Subscribers amount: $l").whenA(l > 1))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,20 @@

package navigate.web.server.http4s

package object encoder
import ch.qos.logback.classic.spi.ILoggingEvent
import io.circe.Encoder
import io.circe.Json
import io.circe.syntax.*
import lucuma.core.util.Timestamp

package object encoder {

given Encoder[ILoggingEvent] = e =>
Json.obj(
"timestamp" -> Timestamp.fromInstant(e.getInstant).getOrElse(Timestamp.Min).asJson,
"level" -> e.getLevel.toString.asJson,
"thread" -> e.getThreadName.asJson,
"message" -> e.getFormattedMessage.asJson
)

}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) 2016-2023 Association of Universities for Research in Astronomy, Inc. (AURA)
// For license information see LICENSE or https://opensource.org/licenses/BSD-3-Clause

package navigate.web.server.logging

import cats.effect.std.Dispatcher
import ch.qos.logback.classic.Level
import ch.qos.logback.classic.spi.ILoggingEvent
import ch.qos.logback.core.UnsynchronizedAppenderBase
import fs2.concurrent.Topic

class SubscriptionAppender[F[_]](logTopic: Topic[F, ILoggingEvent])(using dispatcher: Dispatcher[F])
extends UnsynchronizedAppenderBase[ILoggingEvent] {
// Remove some loggers. This is a weak form of protection where he don't send some
// loggers to the client, e.g. security related logs
private val blackListedLoggers = List(""".*\.security\..*""".r)

override def append(event: ILoggingEvent): Unit =
// Send a message to the clients if level is INFO or higher
// We are outside the normal execution loop, thus we need to call unsafePerformSync directly
if (
event.getLevel.isGreaterOrEqual(Level.INFO) && !blackListedLoggers.exists(
_.findFirstIn(event.getLoggerName).isDefined
)
)
dispatcher.unsafeRunAndForget(
logTopic.publish1(event)
)
else ()

}

This file was deleted.

Loading

0 comments on commit 6b79489

Please sign in to comment.