Skip to content

Commit

Permalink
Add the project last update projection for incoming passivation featu…
Browse files Browse the repository at this point in the history
…re (#5172)

* Add the project last update projection for incoming passivation feature

---------

Co-authored-by: Simon Dumas <simon.dumas@epfl.ch>
  • Loading branch information
imsdu and Simon Dumas authored Oct 8, 2024
1 parent 24f88bc commit 75061c7
Show file tree
Hide file tree
Showing 19 changed files with 464 additions and 50 deletions.
14 changes: 14 additions & 0 deletions delta/app/src/main/resources/app.conf
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,20 @@ app {
}
}

# Configuration of the projection keeping track of the last update of each project
project-last-update {
# how updates are grouped before being pushed
batch {
# the maximum batching size, corresponding to the maximum number of elements being aggregated
# at the same time before pushing the update.
max-elements = 100
# the maximum batching duration.
max-interval = 1 seconds
}
# query settings handed to the projection
# NOTE(review): the meaning of the two values below is inferred from their names - confirm against QueryConfig
query {
# presumably the maximum number of rows fetched by a single query
batch-size = 30
# presumably the delay before querying again once no more rows are available
refresh-strategy = 1s
}
}

# Type hierarchy configuration
type-hierarchy {
# the type hierarchy event-log configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.resources.ResourcesConfig
import ch.epfl.bluebrain.nexus.delta.sdk.schemas.SchemasConfig
import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseConfig
import ch.epfl.bluebrain.nexus.delta.sdk.typehierarchy.TypeHierarchyConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{DatabaseConfig, ProjectionConfig}
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{DatabaseConfig, ProjectLastUpdateConfig, ProjectionConfig}
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.ExportConfig
import com.typesafe.config.Config
import pureconfig.ConfigReader
Expand Down Expand Up @@ -52,6 +52,7 @@ final case class AppConfig(
serviceAccount: ServiceAccountConfig,
sse: SseConfig,
projections: ProjectionConfig,
projectLastUpdate: ProjectLastUpdateConfig,
fusion: FusionConfig,
`export`: ExportConfig,
jws: JWSConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model._
import ch.epfl.bluebrain.nexus.delta.sdk.plugin.PluginDef
import ch.epfl.bluebrain.nexus.delta.sdk.projects.{OwnerPermissionsScopeInitialization, ProjectsConfig, ScopeInitializationErrorStore}
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{DatabaseConfig, ProjectionConfig, QueryConfig}
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{DatabaseConfig, ProjectLastUpdateConfig, ProjectionConfig, QueryConfig}
import ch.megard.akka.http.cors.scaladsl.settings.CorsSettings
import com.typesafe.config.Config
import izumi.distage.model.definition.{Id, ModuleDef}
Expand All @@ -57,6 +57,7 @@ class DeltaModule(appCfg: AppConfig, config: Config)(implicit classLoader: Class
make[FusionConfig].from { appCfg.fusion }
make[ProjectsConfig].from { appCfg.projects }
make[ProjectionConfig].from { appCfg.projections }
make[ProjectLastUpdateConfig].from { appCfg.projectLastUpdate }
make[QueryConfig].from { appCfg.projections.query }
make[BaseUri].from { appCfg.http.baseUri }
make[StrictEntity].from { appCfg.http.strictEntityTimeout }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package ch.epfl.bluebrain.nexus.delta.wiring
import cats.effect.{Clock, IO, Sync}
import ch.epfl.bluebrain.nexus.delta.sdk.ResourceShifts
import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{ProjectionConfig, QueryConfig}
import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectionErrors, Projections}
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{ProjectLastUpdateConfig, ProjectionConfig, QueryConfig}
import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectLastUpdateStore, ProjectionErrors, Projections}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.PurgeProjectionCoordinator.PurgeProjection
import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes._
Expand All @@ -18,13 +18,8 @@ import izumi.distage.model.definition.ModuleDef
object StreamModule extends ModuleDef {
addImplicit[Sync[IO]]

make[GraphResourceStream].from {
(
qc: QueryConfig,
xas: Transactors,
shifts: ResourceShifts
) =>
GraphResourceStream(qc, xas, shifts)
make[GraphResourceStream].from { (qc: QueryConfig, xas: Transactors, shifts: ResourceShifts) =>
GraphResourceStream(qc, xas, shifts)
}

many[PipeDef].add(DiscardMetadata)
Expand Down Expand Up @@ -58,6 +53,13 @@ object StreamModule extends ModuleDef {
) => Supervisor(projections, projectionErrors, cfg)
}

make[ProjectLastUpdateStore].from { (xas: Transactors) => ProjectLastUpdateStore(xas) }

make[ProjectLastUpdateProjection].fromEffect {
(supervisor: Supervisor, store: ProjectLastUpdateStore, xas: Transactors, config: ProjectLastUpdateConfig) =>
ProjectLastUpdateProjection(supervisor, store, xas, config.batch, config.query)
}

make[PurgeProjectionCoordinator.type].fromEffect {
(supervisor: Supervisor, clock: Clock[IO], projections: Set[PurgeProjection]) =>
PurgeProjectionCoordinator(supervisor, clock, projections)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package ch.epfl.bluebrain.nexus.delta.kernel.utils

object CollectionUtils {

  /**
    * Renders every element of the collection wrapped in single quotes, the quoted elements being separated by
    * commas. Elements are rendered as they are (quotes they may contain are not escaped) and an empty collection
    * is rendered as `''`.
    * @param iterable
    *   the collection to display
    * @return
    *   a string representation of the collection
    * @example
    *   `CollectionUtils.quote(List(1, 2, 3)) = "'1','2','3'"`
    */
  def quote(iterable: Iterable[_]): String = {
    // Join with the closing quote / comma / opening quote, then add the outermost quotes
    val joined = iterable.mkString("','")
    s"'$joined'"
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileState
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.sdk.resources.Resources
import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceState
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.{Scope, Transactors}
import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, EntityType, ProjectRef, Tag}
Expand Down Expand Up @@ -102,7 +102,7 @@ object GraphAnalyticsStream {
case _ => IO.pure(Noop)
}

StreamingQuery.elems(project, start, SelectFilter.latest, qc, xas, decode)
StreamingQuery.elems(Scope(project), start, SelectFilter.latest, qc, xas, decode)
}
// $COVERAGE-ON$

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.sdk.sse
import akka.http.scaladsl.model.sse.ServerSentEvent
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.RdfMarshalling.defaultPrinter
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.{Scope, Transactors}
import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
Expand Down Expand Up @@ -59,12 +59,12 @@ object SseElemStream {
def apply(qc: QueryConfig, xas: Transactors): SseElemStream = new SseElemStream {

override def continuous(project: ProjectRef, selectFilter: SelectFilter, start: Offset): ServerSentEventStream =
StreamingQuery.elems(project, start, selectFilter, qc, xas).map(toServerSentEvent)
StreamingQuery.elems(Scope(project), start, selectFilter, qc, xas).map(toServerSentEvent)

override def currents(project: ProjectRef, selectFilter: SelectFilter, start: Offset): ServerSentEventStream =
StreamingQuery
.elems(
project,
Scope(project),
start,
selectFilter,
qc.copy(refreshStrategy = RefreshStrategy.Stop),
Expand All @@ -77,7 +77,7 @@ object SseElemStream {
selectFilter: SelectFilter,
start: Offset
): IO[Option[RemainingElems]] =
StreamingQuery.remaining(project, selectFilter, start, xas)
StreamingQuery.remaining(Scope(project), selectFilter, start, xas)
}

private[sse] def toServerSentEvent(elem: Elem[Unit]): ServerSentEvent = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.sdk.stream

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.sdk.ResourceShifts
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.{Scope, Transactors}
import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
Expand Down Expand Up @@ -77,11 +77,11 @@ object GraphResourceStream {
): GraphResourceStream = new GraphResourceStream {

override def continuous(project: ProjectRef, selectFilter: SelectFilter, start: Offset): ElemStream[GraphResource] =
StreamingQuery.elems(project, start, selectFilter, qc, xas, shifts.decodeGraphResource(_, _))
StreamingQuery.elems(Scope(project), start, selectFilter, qc, xas, shifts.decodeGraphResource(_, _))

override def currents(project: ProjectRef, selectFilter: SelectFilter, start: Offset): ElemStream[GraphResource] =
StreamingQuery.elems(
project,
Scope(project),
start,
selectFilter,
qc.copy(refreshStrategy = RefreshStrategy.Stop),
Expand All @@ -94,7 +94,7 @@ object GraphResourceStream {
selectFilter: SelectFilter,
start: Offset
): IO[Option[RemainingElems]] =
StreamingQuery.remaining(project, selectFilter, start, xas)
StreamingQuery.remaining(Scope(project), selectFilter, start, xas)
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Holds at most one row per project: the primary key is the (org, project) pair
CREATE TABLE IF NOT EXISTS public.project_last_updates(
org text NOT NULL,
project text NOT NULL,
last_instant timestamptz NOT NULL,
last_state_ordering bigint NOT NULL,
PRIMARY KEY(org, project)
);

-- Index on last_instant, the column filtered on when fetching the updates after a given instant
CREATE INDEX IF NOT EXISTS project_last_updates_last_instant_idx ON public.project_last_updates(last_instant);

Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package ch.epfl.bluebrain.nexus.delta.sourcing.config

import pureconfig.ConfigReader
import pureconfig.generic.semiauto.deriveReader

/**
* Configuration of the projection keeping track of the last update of projects
* @param batch
* how updates are grouped before being pushed (the `batch` section of `project-last-update`)
* @param query
* the query settings handed to the projection (the `query` section of `project-last-update`)
*/
final case class ProjectLastUpdateConfig(batch: BatchConfig, query: QueryConfig)

object ProjectLastUpdateConfig {
// Reader derived from the fields of the case class
implicit final val projectLastUpdateConfig: ConfigReader[ProjectLastUpdateConfig] =
deriveReader[ProjectLastUpdateConfig]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package ch.epfl.bluebrain.nexus.delta.sourcing.projections

import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.projections.model.ProjectLastUpdate
import ch.epfl.bluebrain.nexus.delta.sourcing.projections.model.ProjectLastUpdate.ProjectLastUpdateMap
import doobie.Fragments
import doobie.syntax.all._
import doobie.postgres.implicits._

import java.time.Instant

/**
* Keeps track of the last update on a given project
*/
trait ProjectLastUpdateStore {

/**
* Delete the entry for the given project
* @param project
* the project whose entry is removed
*/
def delete(project: ProjectRef): IO[Unit]

/**
* Inserts/updates a list of updates
* @param updates
* the updates to save, an existing entry for the same project gets overwritten
*/
def save(updates: List[ProjectLastUpdate]): IO[Unit]

/**
* Fetch all updates from the database
* @return
* the updates indexed by their project
*/
def fetchAll: IO[ProjectLastUpdateMap]

/**
* Fetch the updates whose last instant is strictly after the given instant
* @param after
* the lower bound (exclusive) for the last instant of the returned updates
* @return
* the matching updates indexed by their project
*/
def fetchUpdates(after: Instant): IO[ProjectLastUpdateMap]

}

object ProjectLastUpdateStore {

  /**
    * Creates a store relying on the `project_last_updates` table
    * @param xas
    *   the transactors, `xas.write` is used to delete / save and `xas.read` to fetch
    */
  def apply(xas: Transactors): ProjectLastUpdateStore = new ProjectLastUpdateStore {

    override def delete(project: ProjectRef): IO[Unit] =
      sql"""DELETE FROM project_last_updates WHERE org = ${project.organization} and project = ${project.project}""".update.run
        .transact(xas.write)
        .void

    // All upserts of the list are composed and submitted through a single `transact` call.
    // Nothing is sent to the database when there is nothing to save.
    override def save(updates: List[ProjectLastUpdate]): IO[Unit] =
      if (updates.isEmpty) IO.unit
      else
        updates
          .traverse(saveOne)
          .transact(xas.write)
          .void

    // Upsert: the (org, project) primary key decides whether the row is inserted or overwritten
    private def saveOne(p: ProjectLastUpdate) =
      sql"""INSERT INTO project_last_updates (org, project, last_instant, last_state_ordering)
           |VALUES (${p.project.organization}, ${p.project.project} ,${p.lastInstant}, ${p.lastOrdering})
           |ON CONFLICT (org, project)
           |DO UPDATE set
           |  last_instant = EXCLUDED.last_instant,
           |  last_state_ordering = EXCLUDED.last_state_ordering;
           |""".stripMargin.update.run

    override def fetchAll: IO[ProjectLastUpdateMap] = fetch(None)

    override def fetchUpdates(after: Instant): IO[ProjectLastUpdateMap] = fetch(Some(after))

    // Fetches the rows, restricted to those strictly after the given instant when one is provided.
    // Columns are listed explicitly as rows are decoded by position: `SELECT *` would silently depend on the
    // physical column order of the table and break as soon as a column is added or moved.
    private def fetch(after: Option[Instant]): IO[ProjectLastUpdateMap] = {
      val afterFragment = after.map { a => fr"last_instant > $a" }
      sql"""SELECT org, project, last_instant, last_state_ordering
           |FROM project_last_updates
           |${Fragments.whereAndOpt(afterFragment)}""".stripMargin
        .query[ProjectLastUpdate]
        .map { plu => plu.project -> plu }
        .toMap
        .transact(xas.read)
    }
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.projections.model.ProjectionRestar
import ch.epfl.bluebrain.nexus.delta.sourcing.query.{SelectFilter, StreamingQuery}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.PurgeProjectionCoordinator.PurgeProjection
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{ProjectionMetadata, ProjectionProgress, ProjectionStore}
import ch.epfl.bluebrain.nexus.delta.sourcing.{ProgressStatistics, Transactors}
import ch.epfl.bluebrain.nexus.delta.sourcing.{ProgressStatistics, Scope, Transactors}
import fs2.Stream

import java.time.Instant
Expand Down Expand Up @@ -135,11 +135,12 @@ object Projections {
for {
current <- progress(projectionId)
remaining <-
StreamingQuery.remaining(project, selectFilter, current.fold(Offset.start)(_.offset), xas)
StreamingQuery.remaining(Scope(project), selectFilter, current.fold(Offset.start)(_.offset), xas)
} yield ProgressStatistics(current, remaining)
}

val purgeRestartMetadata = ProjectionMetadata("system", "purge-projection-restarts", None, None)
private val purgeRestartMetadata = ProjectionMetadata("system", "purge-projection-restarts", None, None)

def purgeExpiredRestarts(projections: Projections, config: PurgeConfig): PurgeProjection =
PurgeProjection(purgeRestartMetadata, config, projections.deleteExpiredRestarts)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package ch.epfl.bluebrain.nexus.delta.sourcing.projections.model

import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import doobie.postgres.implicits._
import doobie.Read

import java.time.Instant

/**
  * The last update recorded for a project
  * @param project
  *   the project the update relates to
  * @param lastInstant
  *   the instant read from / written to the `last_instant` column
  * @param lastOrdering
  *   the offset read from / written to the `last_state_ordering` column
  */
final case class ProjectLastUpdate(project: ProjectRef, lastInstant: Instant, lastOrdering: Offset)

object ProjectLastUpdate {

  /** Last updates indexed by the project they relate to */
  type ProjectLastUpdateMap = Map[ProjectRef, ProjectLastUpdate]

  /**
    * Decodes a row made of two labels (organization then project), an instant and an offset, the two labels being
    * assembled into the project reference
    */
  implicit val projectLastUpdateRead: Read[ProjectLastUpdate] =
    Read[(Label, Label, Instant, Offset)].map { row =>
      val (organization, projectLabel, lastInstant, lastOrdering) = row
      val projectRef                                              = ProjectRef(organization, projectLabel)
      ProjectLastUpdate(projectRef, lastInstant, lastOrdering)
    }

}
Loading

0 comments on commit 75061c7

Please sign in to comment.