Add purge routine for tombstones (#5031)

Co-authored-by: Simon Dumas <simon.dumas@epfl.ch>
imsdu and Simon Dumas authored Jun 19, 2024
1 parent 9846da9 commit 0ca8dbc
Showing 19 changed files with 269 additions and 237 deletions.
2 changes: 2 additions & 0 deletions delta/app/src/main/resources/app.conf
@@ -306,6 +306,8 @@ app {
delete-expired-every = 5 minutes
# the time after which stored projection errors will be deleted
failed-elem-ttl = 14 days
+ # the time after which tombstones will be deleted
+ tombstone-ttl = 7 days
# the time after which stored restarts will be deleted
restart-ttl = 1 hour
query = ${app.defaults.query}
@@ -5,8 +5,10 @@ import ch.epfl.bluebrain.nexus.delta.sdk.ResourceShifts
import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{ProjectionConfig, QueryConfig}
import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectionErrors, Projections}
+ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.PurgeProjectionCoordinator.PurgeProjection
import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes._
+ import ch.epfl.bluebrain.nexus.delta.sourcing.tombstone.TombstoneStore
import ch.epfl.bluebrain.nexus.delta.sourcing.{DeleteExpired, PurgeElemFailures, Transactors}
import izumi.distage.model.definition.ModuleDef

@@ -41,7 +43,7 @@ object StreamModule extends ModuleDef {
}

make[Projections].from { (xas: Transactors, cfg: ProjectionConfig, clock: Clock[IO]) =>
- Projections(xas, cfg.query, cfg.restartTtl, clock)
+ Projections(xas, cfg.query, clock)
}

make[ProjectionErrors].from { (xas: Transactors, clock: Clock[IO], cfg: ProjectionConfig) =>
@@ -53,17 +55,23 @@
projections: Projections,
projectionErrors: ProjectionErrors,
cfg: ProjectionConfig
- ) =>
- Supervisor(projections, projectionErrors, cfg)
+ ) => Supervisor(projections, projectionErrors, cfg)
}

+ make[PurgeProjectionCoordinator.type].fromEffect {
+ (supervisor: Supervisor, clock: Clock[IO], projections: Set[PurgeProjection]) =>
+ PurgeProjectionCoordinator(supervisor, clock, projections)
+ }
+
+ many[PurgeProjection].add { (config: ProjectionConfig, xas: Transactors) =>
+ DeleteExpired(config.deleteExpiredEvery, xas)
+ }
+
- make[DeleteExpired].fromEffect {
- (supervisor: Supervisor, config: ProjectionConfig, xas: Transactors, clock: Clock[IO]) =>
- DeleteExpired(supervisor, config, xas, clock)
+ many[PurgeProjection].add { (config: ProjectionConfig, xas: Transactors) =>
+ PurgeElemFailures(config.failedElemPurge, xas)
}

- make[PurgeElemFailures].fromEffect {
- (supervisor: Supervisor, config: ProjectionConfig, xas: Transactors, clock: Clock[IO]) =>
- PurgeElemFailures(supervisor, config, xas, clock)
+ many[PurgeProjection].add { (config: ProjectionConfig, xas: Transactors) =>
+ TombstoneStore.deleteExpired(config.tombstonePurge, xas)
}
}
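Note: the `PurgeProjection` record and the `PurgeProjectionCoordinator` bound above are not shown in this diff. A minimal, non-authoritative sketch of the contract these bindings rely on (member names and the scheduling loop are assumptions, inferred from the constructor calls in this commit and from the inline schedulers removed below) might look like:

```scala
import cats.effect.{Clock, IO}
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.sourcing.config.PurgeConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{CompiledProjection, ExecutionStrategy, ProjectionMetadata, Supervisor}
import fs2.Stream

import java.time.Instant

object PurgeProjectionCoordinator {

  // A purge is described by its projection metadata, how often / how far back to purge,
  // and a task that deletes everything strictly older than the provided instant.
  // The member names here are assumptions, not the actual implementation.
  final case class PurgeProjection(metadata: ProjectionMetadata, config: PurgeConfig, task: Instant => IO[Unit])

  // One supervised transient stream per registered purge: every `deleteExpiredEvery`,
  // compute `now - ttl` and hand that cut-off to the purge task.
  def apply(supervisor: Supervisor, clock: Clock[IO], projections: Set[PurgeProjection]): IO[PurgeProjectionCoordinator.type] =
    projections.toList
      .traverse { projection =>
        val purgeOnce = clock.realTimeInstant.flatMap { now =>
          projection.task(now.minusMillis(projection.config.ttl.toMillis))
        }
        val stream    = Stream.awakeEvery[IO](projection.config.deleteExpiredEvery).evalTap(_ => purgeOnce).drain
        supervisor.run(CompiledProjection.fromStream(projection.metadata, ExecutionStrategy.TransientSingleNode, _ => stream))
      }
      .as(PurgeProjectionCoordinator)
}
```

Each `many[PurgeProjection].add` entry above contributes one element to the `Set[PurgeProjection]` that the coordinator receives.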
@@ -22,11 +22,10 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.FailedElem
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionProgress

import java.time.Instant
- import scala.concurrent.duration._

class BlazegraphViewsIndexingRoutesSpec extends BlazegraphViewRoutesFixtures {

- private lazy val projections = Projections(xas, queryConfig, 1.hour, clock)
+ private lazy val projections = Projections(xas, queryConfig, clock)
private lazy val projectionErrors = ProjectionErrors(xas, queryConfig, clock)

private val myId = nxv + "myid"
@@ -37,6 +37,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.config.ProjectionConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.projections.ProjectionErrors
+ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.PurgeProjectionCoordinator.PurgeProjection
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{PipeChain, ReferenceRegistry, Supervisor}
import distage.ModuleDef
import izumi.distage.model.definition.Id
@@ -139,28 +140,30 @@ class CompositeViewsPluginModule(priority: Int) extends ModuleDef {
)
}

- make[CompositeProjections].fromEffect {
+ make[CompositeRestartStore].from { (xas: Transactors) =>
+ new CompositeRestartStore(xas)
+ }
+
+ make[CompositeProjections].from {
(
- supervisor: Supervisor,
- xas: Transactors,
+ compositeRestartStore: CompositeRestartStore,
config: CompositeViewsConfig,
projectionConfig: ProjectionConfig,
- clock: Clock[IO]
+ clock: Clock[IO],
+ xas: Transactors
) =>
- val compositeRestartStore = new CompositeRestartStore(xas)
- val compositeProjections =
- CompositeProjections(
- compositeRestartStore,
- xas,
- projectionConfig.query,
- projectionConfig.batch,
- config.restartCheckInterval,
- clock
- )
-
- CompositeRestartStore
- .deleteExpired(compositeRestartStore, supervisor, projectionConfig, clock)
- .as(compositeProjections)
+ CompositeProjections(
+ compositeRestartStore,
+ xas,
+ projectionConfig.query,
+ projectionConfig.batch,
+ config.restartCheckInterval,
+ clock
+ )
}

+ many[PurgeProjection].add { (compositeRestartStore: CompositeRestartStore, projectionConfig: ProjectionConfig) =>
+ CompositeRestartStore.purgeExpiredRestarts(compositeRestartStore, projectionConfig.restartPurge)
+ }
+
make[CompositeSpaces].from {
@@ -1,21 +1,21 @@
package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.store

- import cats.effect.{Clock, IO}
+ import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeRestart
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeRestart.entityType
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.store.CompositeRestartStore.logger
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
- import ch.epfl.bluebrain.nexus.delta.sourcing.config.ProjectionConfig
+ import ch.epfl.bluebrain.nexus.delta.sourcing.config.PurgeConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
import doobie.implicits._
import doobie.postgres.implicits._
- import fs2.Stream
+ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.PurgeProjectionCoordinator.PurgeProjection
import io.circe.Json
import io.circe.syntax.EncoderOps

@@ -98,33 +98,11 @@ object CompositeRestartStore {
* Register the task to delete expired restarts in the supervisor
* @param store
*   the store
- * @param supervisor
- *   the supervisor
* @param config
*   the projection config
*/
- def deleteExpired(
+ def purgeExpiredRestarts(
store: CompositeRestartStore,
- supervisor: Supervisor,
- config: ProjectionConfig,
- clock: Clock[IO]
- ): IO[Unit] = {
- val deleteExpiredRestarts =
- clock.realTimeInstant.flatMap { now =>
- store.deleteExpired(now.minusMillis(config.restartTtl.toMillis))
- }
- supervisor
- .run(
- CompiledProjection.fromStream(
- purgeCompositeRestartMetadata,
- ExecutionStrategy.TransientSingleNode,
- _ =>
- Stream
- .awakeEvery[IO](config.deleteExpiredEvery)
- .evalTap(_ => deleteExpiredRestarts)
- .drain
- )
- )
- .void
- }
+ config: PurgeConfig
+ ): PurgeProjection = PurgeProjection(purgeCompositeRestartMetadata, config, store.deleteExpired)
}
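With scheduling moved to the coordinator, `purgeExpiredRestarts` only pairs the store's existing `deleteExpired` method, which already has the required `Instant => IO[Unit]` shape, with a `PurgeConfig`. A hedged wiring sketch (in the codebase this happens through the `many[PurgeProjection].add` binding shown earlier; `xas` stands for an already-built `Transactors`, and the durations mirror the `app.conf` defaults for `delete-expired-every` and `restart-ttl`):

```scala
import scala.concurrent.duration._
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.store.CompositeRestartStore
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.config.PurgeConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.PurgeProjectionCoordinator.PurgeProjection

// Illustrative only: every 5 minutes, restarts older than 1 hour get deleted
// once the coordinator schedules this purge projection.
def restartPurge(xas: Transactors): PurgeProjection =
  CompositeRestartStore.purgeExpiredRestarts(new CompositeRestartStore(xas), PurgeConfig(5.minutes, 1.hour))
```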
@@ -28,13 +28,12 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{PipeChain, ProjectionProgr
import io.circe.JsonObject

import java.time.Instant
- import scala.concurrent.duration._

class ElasticSearchIndexingRoutesSpec extends ElasticSearchViewsRoutesFixtures {

implicit private val uuidF: UUIDF = UUIDF.fixed(uuid)

- private lazy val projections = Projections(xas, queryConfig, 1.hour, clock)
+ private lazy val projections = Projections(xas, queryConfig, clock)
private lazy val projectionErrors = ProjectionErrors(xas, queryConfig, clock)

implicit private val fetchContext: FetchContext = FetchContextDummy(Map(project.value.ref -> project.value.context))
@@ -1,22 +1,24 @@
package ch.epfl.bluebrain.nexus.delta.sourcing

- import cats.effect.{Clock, IO}
+ import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.sourcing.DeleteExpired.logger
- import ch.epfl.bluebrain.nexus.delta.sourcing.config.ProjectionConfig
- import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{CompiledProjection, ExecutionStrategy, ProjectionMetadata, Supervisor}
+ import ch.epfl.bluebrain.nexus.delta.sourcing.config.PurgeConfig
+ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.PurgeProjectionCoordinator.PurgeProjection
+ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionMetadata
import doobie.implicits._
import doobie.postgres.implicits._
- import fs2.Stream

+ import java.time.Instant
+ import scala.concurrent.duration.{DurationInt, FiniteDuration}
+
/**
* Allow to delete expired ephemeral states
*/
- final class DeleteExpired private[sourcing] (xas: Transactors, clock: Clock[IO]) {
+ final class DeleteExpired private[sourcing] (xas: Transactors) {

- def apply(): IO[Unit] = {
+ def apply(instant: Instant): IO[Unit] = {
for {
- instant <- clock.realTimeInstant
deleted <- sql"""
| DELETE FROM public.ephemeral_states
| WHERE expires < $instant
@@ -32,18 +34,12 @@ object DeleteExpired {
private val metadata: ProjectionMetadata = ProjectionMetadata("system", "delete-expired", None, None)

/**
- * Creates a [[DeleteExpired]] instance and schedules in the supervisor the deletion of expired ephemeral states
+ * Creates a [[PurgeProjection]] instance so that it can be scheduled in the supervisor
*/
- def apply(supervisor: Supervisor, config: ProjectionConfig, xas: Transactors, clock: Clock[IO]): IO[DeleteExpired] = {
- val deleteExpired = new DeleteExpired(xas, clock)
-
- val stream = Stream
- .awakeEvery[IO](config.deleteExpiredEvery)
- .evalTap(_ => deleteExpired())
- .drain
-
- val deleteExpiredProjection =
- CompiledProjection.fromStream(metadata, ExecutionStrategy.TransientSingleNode, _ => stream)
- supervisor.run(deleteExpiredProjection).as(deleteExpired)
+ def apply(deleteExpiredEvery: FiniteDuration, xas: Transactors): PurgeProjection = {
+ // The ttl is defined in the ephemeral state via the expire column
+ val purgeConfig = PurgeConfig(deleteExpiredEvery, 0.second)
+ val deleteExpired = new DeleteExpired(xas)
+ PurgeProjection(metadata, purgeConfig, deleteExpired.apply)
}
}
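The `0.second` ttl is deliberate: ephemeral states carry their own `expires` column, so the purge should remove every row whose expiry has already passed rather than apply an extra retention window. Assuming the coordinator derives its cut-off as `now - ttl` (the removed inline schedulers in this commit did exactly that), a zero ttl makes the cut-off "now":

```scala
import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.sourcing.config.PurgeConfig

import java.time.Instant

// Assumed cut-off computation (it lives in the coordinator, not in this file): with ttl = 0.second
// the threshold equals the current instant, so `DELETE ... WHERE expires < $instant` removes exactly
// the ephemeral states whose own expiry has passed.
def cutOff(clock: Clock[IO], config: PurgeConfig): IO[Instant] =
  clock.realTimeInstant.map(_.minusMillis(config.ttl.toMillis))
```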
@@ -1,29 +1,28 @@
package ch.epfl.bluebrain.nexus.delta.sourcing

- import cats.effect.{Clock, IO}
+ import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.sourcing.PurgeElemFailures.logger
- import ch.epfl.bluebrain.nexus.delta.sourcing.config.ProjectionConfig
- import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{CompiledProjection, ExecutionStrategy, ProjectionMetadata, Supervisor}
+ import ch.epfl.bluebrain.nexus.delta.sourcing.config.PurgeConfig
+ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.PurgeProjectionCoordinator.PurgeProjection
+ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionMetadata
import doobie.implicits._
import doobie.postgres.implicits._
- import fs2.Stream

- import scala.concurrent.duration._
+ import java.time.Instant

- final class PurgeElemFailures private[sourcing] (xas: Transactors, ttl: FiniteDuration, clock: Clock[IO]) {
+ final class PurgeElemFailures private[sourcing] (xas: Transactors) {

/**
- * Deletes the projection errors that are older than the given `ttl`.
+ * Deletes the projection errors that are older than the given instant.
*/
- def apply(): IO[Unit] =
+ def apply(instant: Instant): IO[Unit] =
for {
- threshold <- clock.realTimeInstant.map(_.minusMillis(ttl.toMillis))
deleted <- sql"""
| DELETE FROM public.failed_elem_logs
- | WHERE instant < $threshold
+ | WHERE instant < $instant
""".stripMargin.update.run.transact(xas.write)
_ <- IO.whenA(deleted > 0)(logger.info(s"Deleted $deleted old indexing failures."))
} yield ()
}

@@ -33,24 +32,11 @@ object PurgeElemFailures {
private val metadata = ProjectionMetadata("system", "delete-old-failed-elem", None, None)

/**
- * Creates a [[PurgeElemFailures]] instance and schedules in the supervisor the deletion of old projection errors.
+ * Creates a [[PurgeProjection]] to schedule in the supervisor the deletion of old projection errors.
*/
- def apply(
- supervisor: Supervisor,
- config: ProjectionConfig,
- xas: Transactors,
- clock: Clock[IO]
- ): IO[PurgeElemFailures] = {
- val purgeElemFailures = new PurgeElemFailures(xas, config.failedElemTtl, clock)
-
- val stream = Stream
- .awakeEvery[IO](config.deleteExpiredEvery)
- .evalTap(_ => purgeElemFailures())
- .drain
-
- supervisor
- .run(CompiledProjection.fromStream(metadata, ExecutionStrategy.TransientSingleNode, _ => stream))
- .as(purgeElemFailures)
+ def apply(config: PurgeConfig, xas: Transactors): PurgeProjection = {
+ val purgeElemFailures = new PurgeElemFailures(xas)
+ PurgeProjection(metadata, config, purgeElemFailures.apply)
}

}
@@ -25,6 +25,8 @@ import scala.concurrent.duration.FiniteDuration
* the life span of projection errors in database
* @param restartTtl
* the life span of projection restarts
+ * @param tombstoneTtl
+ *   the life span of tombstones
* @param query
* a configuration for how to interact with the underlying store
*/
@@ -36,8 +38,15 @@ final case class ProjectionConfig(
deleteExpiredEvery: FiniteDuration,
failedElemTtl: FiniteDuration,
restartTtl: FiniteDuration,
+ tombstoneTtl: FiniteDuration,
query: QueryConfig
- )
+ ) {
+
+ def failedElemPurge: PurgeConfig = PurgeConfig(deleteExpiredEvery, failedElemTtl)
+ def restartPurge: PurgeConfig = PurgeConfig(deleteExpiredEvery, restartTtl)
+ def tombstonePurge: PurgeConfig = PurgeConfig(deleteExpiredEvery, tombstoneTtl)
+
+ }

object ProjectionConfig {
implicit final val projectionConfigReader: ConfigReader[ProjectionConfig] =
@@ -0,0 +1,5 @@
+ package ch.epfl.bluebrain.nexus.delta.sourcing.config
+
+ import scala.concurrent.duration.FiniteDuration
+
+ case class PurgeConfig(deleteExpiredEvery: FiniteDuration, ttl: FiniteDuration)
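Each `*Purge` helper on `ProjectionConfig` pairs the shared sweep interval with the relevant ttl. A small sketch using the defaults from the `app.conf` hunk above (values taken from that config, nothing new introduced):

```scala
import scala.concurrent.duration._
import ch.epfl.bluebrain.nexus.delta.sourcing.config.PurgeConfig

// delete-expired-every = 5 minutes and tombstone-ttl = 7 days are the app.conf defaults above,
// so cfg.tombstonePurge resolves to:
val tombstonePurge = PurgeConfig(deleteExpiredEvery = 5.minutes, ttl = 7.days)
// i.e. every 5 minutes, delete tombstones that are older than 7 days.
```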