diff --git a/delta/app/src/main/resources/app.conf b/delta/app/src/main/resources/app.conf index fab990a813..52e0e12892 100644 --- a/delta/app/src/main/resources/app.conf +++ b/delta/app/src/main/resources/app.conf @@ -306,6 +306,8 @@ app { delete-expired-every = 5 minutes # the time after which stored projection errors will be deleted failed-elem-ttl = 14 days + # the time after which tombstones will be deleted + tombstone-ttl = 7 days # the time after which stored restarts will be deleted restart-ttl = 1 hour query = ${app.defaults.query} diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/StreamModule.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/StreamModule.scala index 527413b0d3..ea4fa6bd8b 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/StreamModule.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/StreamModule.scala @@ -5,8 +5,10 @@ import ch.epfl.bluebrain.nexus.delta.sdk.ResourceShifts import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream import ch.epfl.bluebrain.nexus.delta.sourcing.config.{ProjectionConfig, QueryConfig} import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectionErrors, Projections} +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.PurgeProjectionCoordinator.PurgeProjection import ch.epfl.bluebrain.nexus.delta.sourcing.stream._ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes._ +import ch.epfl.bluebrain.nexus.delta.sourcing.tombstone.TombstoneStore import ch.epfl.bluebrain.nexus.delta.sourcing.{DeleteExpired, PurgeElemFailures, Transactors} import izumi.distage.model.definition.ModuleDef @@ -41,7 +43,7 @@ object StreamModule extends ModuleDef { } make[Projections].from { (xas: Transactors, cfg: ProjectionConfig, clock: Clock[IO]) => - Projections(xas, cfg.query, cfg.restartTtl, clock) + Projections(xas, cfg.query, clock) } make[ProjectionErrors].from { (xas: Transactors, clock: Clock[IO], cfg: 
ProjectionConfig) => @@ -53,17 +55,23 @@ object StreamModule extends ModuleDef { projections: Projections, projectionErrors: ProjectionErrors, cfg: ProjectionConfig - ) => - Supervisor(projections, projectionErrors, cfg) + ) => Supervisor(projections, projectionErrors, cfg) + } + + make[PurgeProjectionCoordinator.type].fromEffect { + (supervisor: Supervisor, clock: Clock[IO], projections: Set[PurgeProjection]) => + PurgeProjectionCoordinator(supervisor, clock, projections) + } + + many[PurgeProjection].add { (config: ProjectionConfig, xas: Transactors) => + DeleteExpired(config.deleteExpiredEvery, xas) } - make[DeleteExpired].fromEffect { - (supervisor: Supervisor, config: ProjectionConfig, xas: Transactors, clock: Clock[IO]) => - DeleteExpired(supervisor, config, xas, clock) + many[PurgeProjection].add { (config: ProjectionConfig, xas: Transactors) => + PurgeElemFailures(config.failedElemPurge, xas) } - make[PurgeElemFailures].fromEffect { - (supervisor: Supervisor, config: ProjectionConfig, xas: Transactors, clock: Clock[IO]) => - PurgeElemFailures(supervisor, config, xas, clock) + many[PurgeProjection].add { (config: ProjectionConfig, xas: Transactors) => + TombstoneStore.deleteExpired(config.tombstonePurge, xas) } } diff --git a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutesSpec.scala b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutesSpec.scala index e7ea72e7e7..b802baaee8 100644 --- a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutesSpec.scala +++ b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutesSpec.scala @@ -22,11 +22,10 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.FailedElem import 
ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionProgress import java.time.Instant -import scala.concurrent.duration._ class BlazegraphViewsIndexingRoutesSpec extends BlazegraphViewRoutesFixtures { - private lazy val projections = Projections(xas, queryConfig, 1.hour, clock) + private lazy val projections = Projections(xas, queryConfig, clock) private lazy val projectionErrors = ProjectionErrors(xas, queryConfig, clock) private val myId = nxv + "myid" diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsPluginModule.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsPluginModule.scala index 760f1fcc1e..cebcc8e8e3 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsPluginModule.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsPluginModule.scala @@ -37,6 +37,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors import ch.epfl.bluebrain.nexus.delta.sourcing.config.ProjectionConfig import ch.epfl.bluebrain.nexus.delta.sourcing.projections.ProjectionErrors +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.PurgeProjectionCoordinator.PurgeProjection import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{PipeChain, ReferenceRegistry, Supervisor} import distage.ModuleDef import izumi.distage.model.definition.Id @@ -139,28 +140,30 @@ class CompositeViewsPluginModule(priority: Int) extends ModuleDef { ) } - make[CompositeProjections].fromEffect { + make[CompositeRestartStore].from { (xas: Transactors) => + new CompositeRestartStore(xas) + } + + make[CompositeProjections].from { ( - supervisor: Supervisor, - xas: Transactors, + compositeRestartStore: CompositeRestartStore, config: CompositeViewsConfig, 
projectionConfig: ProjectionConfig, - clock: Clock[IO] + clock: Clock[IO], + xas: Transactors ) => - val compositeRestartStore = new CompositeRestartStore(xas) - val compositeProjections = - CompositeProjections( - compositeRestartStore, - xas, - projectionConfig.query, - projectionConfig.batch, - config.restartCheckInterval, - clock - ) - - CompositeRestartStore - .deleteExpired(compositeRestartStore, supervisor, projectionConfig, clock) - .as(compositeProjections) + CompositeProjections( + compositeRestartStore, + xas, + projectionConfig.query, + projectionConfig.batch, + config.restartCheckInterval, + clock + ) + } + + many[PurgeProjection].add { (compositeRestartStore: CompositeRestartStore, projectionConfig: ProjectionConfig) => + CompositeRestartStore.purgeExpiredRestarts(compositeRestartStore, projectionConfig.restartPurge) } make[CompositeSpaces].from { diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/store/CompositeRestartStore.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/store/CompositeRestartStore.scala index 68c6f7238a..862711790b 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/store/CompositeRestartStore.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/store/CompositeRestartStore.scala @@ -1,6 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.store -import cats.effect.{Clock, IO} +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeRestart import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeRestart.entityType @@ -8,14 +8,14 @@ import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.store.CompositeResta import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import 
ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors -import ch.epfl.bluebrain.nexus.delta.sourcing.config.ProjectionConfig +import ch.epfl.bluebrain.nexus.delta.sourcing.config.PurgeConfig import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.stream._ import doobie.implicits._ import doobie.postgres.implicits._ -import fs2.Stream +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.PurgeProjectionCoordinator.PurgeProjection import io.circe.Json import io.circe.syntax.EncoderOps @@ -98,33 +98,11 @@ object CompositeRestartStore { - * Register the task to delete expired restarts in the supervisor + * Creates a [[PurgeProjection]] deleting the expired composite restarts so that it can be scheduled in the supervisor * @param store * the store - * @param supervisor - * the supervisor * @param config - * the projection config + * the purge config */ - def deleteExpired( + def purgeExpiredRestarts( store: CompositeRestartStore, - supervisor: Supervisor, - config: ProjectionConfig, - clock: Clock[IO] - ): IO[Unit] = { - val deleteExpiredRestarts = - clock.realTimeInstant.flatMap { now => - store.deleteExpired(now.minusMillis(config.restartTtl.toMillis)) - } - supervisor - .run( - CompiledProjection.fromStream( - purgeCompositeRestartMetadata, - ExecutionStrategy.TransientSingleNode, - _ => - Stream - .awakeEvery[IO](config.deleteExpiredEvery) - .evalTap(_ => deleteExpiredRestarts) - .drain - ) - ) - .void - } + config: PurgeConfig + ): PurgeProjection = PurgeProjection(purgeCompositeRestartMetadata, config, store.deleteExpired) } diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutesSpec.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutesSpec.scala index 3484f44801..7950070b34 100644 ---
a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutesSpec.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutesSpec.scala @@ -28,13 +28,12 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{PipeChain, ProjectionProgr import io.circe.JsonObject import java.time.Instant -import scala.concurrent.duration._ class ElasticSearchIndexingRoutesSpec extends ElasticSearchViewsRoutesFixtures { implicit private val uuidF: UUIDF = UUIDF.fixed(uuid) - private lazy val projections = Projections(xas, queryConfig, 1.hour, clock) + private lazy val projections = Projections(xas, queryConfig, clock) private lazy val projectionErrors = ProjectionErrors(xas, queryConfig, clock) implicit private val fetchContext: FetchContext = FetchContextDummy(Map(project.value.ref -> project.value.context)) diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/DeleteExpired.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/DeleteExpired.scala index 5389dbd100..c43e611720 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/DeleteExpired.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/DeleteExpired.scala @@ -1,22 +1,24 @@ package ch.epfl.bluebrain.nexus.delta.sourcing -import cats.effect.{Clock, IO} +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.sourcing.DeleteExpired.logger -import ch.epfl.bluebrain.nexus.delta.sourcing.config.ProjectionConfig -import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{CompiledProjection, ExecutionStrategy, ProjectionMetadata, Supervisor} +import ch.epfl.bluebrain.nexus.delta.sourcing.config.PurgeConfig +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.PurgeProjectionCoordinator.PurgeProjection +import 
ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionMetadata import doobie.implicits._ import doobie.postgres.implicits._ -import fs2.Stream + +import java.time.Instant +import scala.concurrent.duration.{DurationInt, FiniteDuration} /** * Allow to delete expired ephemeral states */ -final class DeleteExpired private[sourcing] (xas: Transactors, clock: Clock[IO]) { +final class DeleteExpired private[sourcing] (xas: Transactors) { - def apply(): IO[Unit] = { + def apply(instant: Instant): IO[Unit] = { for { - instant <- clock.realTimeInstant deleted <- sql""" | DELETE FROM public.ephemeral_states | WHERE expires < $instant @@ -32,18 +34,12 @@ object DeleteExpired { private val metadata: ProjectionMetadata = ProjectionMetadata("system", "delete-expired", None, None) /** - * Creates a [[DeleteExpired]] instance and schedules in the supervisor the deletion of expired ephemeral states + * Creates a [[PurgeProjection]] instance so that it can be scheduled in the supervisor */ - def apply(supervisor: Supervisor, config: ProjectionConfig, xas: Transactors, clock: Clock[IO]): IO[DeleteExpired] = { - val deleteExpired = new DeleteExpired(xas, clock) - - val stream = Stream - .awakeEvery[IO](config.deleteExpiredEvery) - .evalTap(_ => deleteExpired()) - .drain - - val deleteExpiredProjection = - CompiledProjection.fromStream(metadata, ExecutionStrategy.TransientSingleNode, _ => stream) - supervisor.run(deleteExpiredProjection).as(deleteExpired) + def apply(deleteExpiredEvery: FiniteDuration, xas: Transactors): PurgeProjection = { + // The ttl is defined in the ephemeral state via the expire column + val purgeConfig = PurgeConfig(deleteExpiredEvery, 0.second) + val deleteExpired = new DeleteExpired(xas) + PurgeProjection(metadata, purgeConfig, deleteExpired.apply) } } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/PurgeElemFailures.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/PurgeElemFailures.scala 
index d7a3162c03..9c8c4b2ba4 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/PurgeElemFailures.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/PurgeElemFailures.scala @@ -1,29 +1,28 @@ package ch.epfl.bluebrain.nexus.delta.sourcing -import cats.effect.{Clock, IO} +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.sourcing.PurgeElemFailures.logger -import ch.epfl.bluebrain.nexus.delta.sourcing.config.ProjectionConfig -import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{CompiledProjection, ExecutionStrategy, ProjectionMetadata, Supervisor} +import ch.epfl.bluebrain.nexus.delta.sourcing.config.PurgeConfig +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.PurgeProjectionCoordinator.PurgeProjection +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionMetadata import doobie.implicits._ import doobie.postgres.implicits._ -import fs2.Stream -import scala.concurrent.duration._ +import java.time.Instant -final class PurgeElemFailures private[sourcing] (xas: Transactors, ttl: FiniteDuration, clock: Clock[IO]) { +final class PurgeElemFailures private[sourcing] (xas: Transactors) { /** - * Deletes the projection errors that are older than the given `ttl`. + * Deletes the projection errors that are older than the given instant. 
*/ - def apply(): IO[Unit] = + def apply(instant: Instant): IO[Unit] = for { - threshold <- clock.realTimeInstant.map(_.minusMillis(ttl.toMillis)) - deleted <- sql""" + deleted <- sql""" | DELETE FROM public.failed_elem_logs - | WHERE instant < $threshold + | WHERE instant < $instant """.stripMargin.update.run.transact(xas.write) - _ <- IO.whenA(deleted > 0)(logger.info(s"Deleted $deleted old indexing failures.")) + _ <- IO.whenA(deleted > 0)(logger.info(s"Deleted $deleted old indexing failures.")) } yield () } @@ -33,24 +32,11 @@ object PurgeElemFailures { private val metadata = ProjectionMetadata("system", "delete-old-failed-elem", None, None) /** - * Creates a [[PurgeElemFailures]] instance and schedules in the supervisor the deletion of old projection errors. + * Creates a [[PurgeProjection]] to schedule in the supervisor the deletion of old projection errors. */ - def apply( - supervisor: Supervisor, - config: ProjectionConfig, - xas: Transactors, - clock: Clock[IO] - ): IO[PurgeElemFailures] = { - val purgeElemFailures = new PurgeElemFailures(xas, config.failedElemTtl, clock) - - val stream = Stream - .awakeEvery[IO](config.deleteExpiredEvery) - .evalTap(_ => purgeElemFailures()) - .drain - - supervisor - .run(CompiledProjection.fromStream(metadata, ExecutionStrategy.TransientSingleNode, _ => stream)) - .as(purgeElemFailures) + def apply(config: PurgeConfig, xas: Transactors): PurgeProjection = { + val purgeElemFailures = new PurgeElemFailures(xas) + PurgeProjection(metadata, config, purgeElemFailures.apply) } } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/ProjectionConfig.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/ProjectionConfig.scala index 33312c1f63..80960bf647 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/ProjectionConfig.scala +++ 
b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/ProjectionConfig.scala @@ -25,6 +25,8 @@ import scala.concurrent.duration.FiniteDuration * the life span of projection errors in database * @param restartTtl * the life span of projection restarts + * @param tombstoneTtl + * the life span of tombstones * @param query * a configuration for how to interact with the underlying store */ @@ -36,8 +38,15 @@ final case class ProjectionConfig( deleteExpiredEvery: FiniteDuration, failedElemTtl: FiniteDuration, restartTtl: FiniteDuration, + tombstoneTtl: FiniteDuration, query: QueryConfig -) +) { + + def failedElemPurge: PurgeConfig = PurgeConfig(deleteExpiredEvery, failedElemTtl) + def restartPurge: PurgeConfig = PurgeConfig(deleteExpiredEvery, restartTtl) + def tombstonePurge: PurgeConfig = PurgeConfig(deleteExpiredEvery, tombstoneTtl) + +} object ProjectionConfig { implicit final val projectionConfigReader: ConfigReader[ProjectionConfig] = diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/PurgeConfig.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/PurgeConfig.scala new file mode 100644 index 0000000000..52aaf2bd96 --- /dev/null +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/PurgeConfig.scala @@ -0,0 +1,5 @@ +package ch.epfl.bluebrain.nexus.delta.sourcing.config + +import scala.concurrent.duration.FiniteDuration + +case class PurgeConfig(deleteExpiredEvery: FiniteDuration, ttl: FiniteDuration) diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/Projections.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/Projections.scala index ffa29840d9..3741dc40cd 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/Projections.scala +++ 
b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/Projections.scala @@ -1,16 +1,17 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.projections import cats.effect.{Clock, IO} -import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig +import ch.epfl.bluebrain.nexus.delta.sourcing.config.{PurgeConfig, QueryConfig} import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.projections.model.ProjectionRestart import ch.epfl.bluebrain.nexus.delta.sourcing.query.{SelectFilter, StreamingQuery} +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.PurgeProjectionCoordinator.PurgeProjection import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{ProjectionMetadata, ProjectionProgress, ProjectionStore} import ch.epfl.bluebrain.nexus.delta.sourcing.{ProgressStatistics, Transactors} -import scala.concurrent.duration.FiniteDuration +import java.time.Instant trait Projections { @@ -81,7 +82,7 @@ trait Projections { /** * Deletes projection restarts older than the configured period */ - def deleteExpiredRestarts(): IO[Unit] + def deleteExpiredRestarts(instant: Instant): IO[Unit] /** * Returns the statistics for the given projection in the given project @@ -98,7 +99,7 @@ trait Projections { object Projections { - def apply(xas: Transactors, config: QueryConfig, restartTtl: FiniteDuration, clock: Clock[IO]): Projections = + def apply(xas: Transactors, config: QueryConfig, clock: Clock[IO]): Projections = new Projections { private val projectionStore = ProjectionStore(xas, config, clock) private val projectionRestartStore = new ProjectionRestartStore(xas, config) @@ -122,10 +123,7 @@ object Projections { override def acknowledgeRestart(id: Offset): IO[Unit] = projectionRestartStore.acknowledge(id) - override def deleteExpiredRestarts(): IO[Unit] = - 
clock.realTimeInstant.flatMap { now => - projectionRestartStore.deleteExpired(now.minusMillis(restartTtl.toMillis)) - } + override def deleteExpiredRestarts(instant: Instant): IO[Unit] = projectionRestartStore.deleteExpired(instant) override def statistics( project: ProjectRef, @@ -138,4 +136,8 @@ object Projections { StreamingQuery.remaining(project, selectFilter, current.fold(Offset.start)(_.offset), xas) } yield ProgressStatistics(current, remaining) } + + val purgeRestartMetadata = ProjectionMetadata("system", "purge-projection-restarts", None, None) + def purgeExpiredRestarts(projections: Projections, config: PurgeConfig): PurgeProjection = + PurgeProjection(purgeRestartMetadata, config, projections.deleteExpiredRestarts) } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/PurgeProjectionCoordinator.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/PurgeProjectionCoordinator.scala new file mode 100644 index 0000000000..0c67c1c9c0 --- /dev/null +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/PurgeProjectionCoordinator.scala @@ -0,0 +1,37 @@ +package ch.epfl.bluebrain.nexus.delta.sourcing.stream + +import cats.syntax.all._ +import cats.effect.{Clock, IO} +import ch.epfl.bluebrain.nexus.delta.sourcing.config.PurgeConfig +import fs2.Stream +import java.time.Instant + +sealed trait PurgeProjectionCoordinator + +object PurgeProjectionCoordinator extends PurgeProjectionCoordinator { + + final case class PurgeProjection(metadata: ProjectionMetadata, config: PurgeConfig, task: Instant => IO[Unit]) + + def apply( + supervisor: Supervisor, + clock: Clock[IO], + purgeProjections: Set[PurgeProjection] + ): IO[PurgeProjectionCoordinator.type] = + purgeProjections.toList + .traverse { projection => + val config = projection.config + def purgeForInstant = + clock.realTimeInstant.flatMap { now => projection.task(now.minusMillis(config.ttl.toMillis)) } + val 
compiledProjection = CompiledProjection.fromStream( + projection.metadata, + ExecutionStrategy.TransientSingleNode, + _ => + Stream + .awakeEvery[IO](config.deleteExpiredEvery) + .evalTap(_ => purgeForInstant) + .drain + ) + supervisor.run(compiledProjection) + } + .as(PurgeProjectionCoordinator) +} diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Supervisor.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Supervisor.scala index 5ec5c517a8..5d14ace5e6 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Supervisor.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Supervisor.scala @@ -1,13 +1,14 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.stream -import cats.effect.{Fiber, IO, Resource} - -import ch.epfl.bluebrain.nexus.delta.kernel.{Logger, RetryStrategy} +import cats.effect._ +import cats.effect.std.Semaphore +import cats.implicits._ import ch.epfl.bluebrain.nexus.delta.kernel.syntax._ +import ch.epfl.bluebrain.nexus.delta.kernel.{Logger, RetryStrategy} import ch.epfl.bluebrain.nexus.delta.sourcing.config.ProjectionConfig import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset -import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectionErrors, Projections} import ch.epfl.bluebrain.nexus.delta.sourcing.projections.model.ProjectionRestart +import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectionErrors, Projections} import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{FailedElem, SuccessElem} import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ExecutionStatus.Ignored import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ExecutionStrategy.{EveryNode, PersistentSingleNode, TransientSingleNode} @@ -16,9 +17,6 @@ import fs2.concurrent.SignallingRef import scala.concurrent.TimeoutException import scala.concurrent.duration._ -import cats.effect.Ref -import 
cats.effect.std.Semaphore -import cats.implicits._ /** * Supervises the execution of projections based on a defined [[ExecutionStrategy]] that describes whether projections @@ -119,7 +117,6 @@ object Supervisor { ) private[sourcing] val watchRestartMetadata = ProjectionMetadata("system", "watch-restarts", None, None) - private[sourcing] val purgeRestartMetadata = ProjectionMetadata("system", "purge-projection-restarts", None, None) /** * Constructs a new [[Supervisor]] instance using the provided `store` and `cfg`. @@ -147,7 +144,6 @@ object Supervisor { supervisor = new Impl(projections, projectionErrors.saveFailedElems, cfg, semaphore, mapRef, signal, supervisionRef) _ <- watchRestarts(supervisor, projections) - _ <- purgeRestarts(supervisor, projections, cfg.deleteExpiredEvery) _ <- log.info("Delta supervisor is up") } yield supervisor @@ -226,21 +222,6 @@ object Supervisor { ) } - private def purgeRestarts(supervisor: Supervisor, projections: Projections, deleteExpiredEvery: FiniteDuration) = { - val stream = Stream - .awakeEvery[IO](deleteExpiredEvery) - .evalTap(_ => projections.deleteExpiredRestarts()) - .drain - supervisor - .run( - CompiledProjection.fromStream( - purgeRestartMetadata, - ExecutionStrategy.TransientSingleNode, - _ => stream - ) - ) - } - final private case class Supervised( metadata: ProjectionMetadata, executionStrategy: ExecutionStrategy, diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/tombstone/TombstoneStore.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/tombstone/TombstoneStore.scala index e128a71af6..832bd25ed3 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/tombstone/TombstoneStore.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/tombstone/TombstoneStore.scala @@ -1,11 +1,17 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.tombstone +import cats.effect.IO import cats.syntax.all._ 
+import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri +import ch.epfl.bluebrain.nexus.delta.sourcing.config.PurgeConfig +import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ResourceRef, Tag} import ch.epfl.bluebrain.nexus.delta.sourcing.state.State.ScopedState +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionMetadata +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.PurgeProjectionCoordinator.PurgeProjection import doobie._ import doobie.implicits._ import doobie.postgres.implicits._ @@ -14,8 +20,12 @@ import io.circe.generic.extras.Configuration import io.circe.generic.extras.semiauto.deriveConfiguredCodec import io.circe.syntax.EncoderOps +import java.time.Instant + object TombstoneStore { + private val logger = Logger[TombstoneStore.type] + /** * Saves a tombstone for the given entity for the provided tag so that indexing processes can take it into account */ @@ -67,28 +77,43 @@ object TombstoneStore { | )""".stripMargin.update.run.void } + def deleteExpired(instant: Instant, xas: Transactors): IO[Unit] = + sql"""DELETE FROM public.scoped_tombstones WHERE instant < $instant""".update.run + .transact(xas.write) + .flatTap { deleted => + IO.whenA(deleted > 0)(logger.info(s"Deleted $deleted tombstones.")) + } + .void + final private[tombstone] case class Cause(deleted: Boolean, types: Set[Iri], schema: Option[ResourceRef]) private[tombstone] object Cause { val deleted: Cause = Cause(deleted = true, Set.empty, None) - implicit val causeEncoder: Codec[Cause] = { + implicit val causeCodec: Codec[Cause] = { implicit val configuration: Configuration = Configuration.default deriveConfiguredCodec[Cause] } + implicit val causeGet: Get[Cause] = pgDecoderGetT[Cause] + def diff(types: Set[Iri], schema: 
Option[ResourceRef]): Cause = Cause(deleted = false, types, schema) def diff[S <: ScopedState](original: Option[S], newState: S): Option[Cause] = original.flatMap { o => - val removedTypes = o.types.diff(newState.types) - val modifiedSchema = Option.when(o.schema != newState.schema)(o.schema) - Option.when(removedTypes.nonEmpty || modifiedSchema.isDefined)( - Cause.diff(removedTypes, modifiedSchema) - ) + val removedTypes = o.types.diff(newState.types) + Option.when(removedTypes.nonEmpty)(Cause.diff(removedTypes, None)) } } + private val metadata = ProjectionMetadata("system", "delete-old-tombstones", None, None) + + /** + * Creates a [[PurgeProjection]] to schedule in the supervisor the deletion of old tombstones. + */ + def deleteExpired(config: PurgeConfig, xas: Transactors): PurgeProjection = + PurgeProjection(metadata, config, TombstoneStore.deleteExpired(_, xas)) + } diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/FailedElemLogStoreSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/FailedElemLogStoreSuite.scala index d774a85062..ce9d84adb5 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/FailedElemLogStoreSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/FailedElemLogStoreSuite.scala @@ -18,7 +18,6 @@ import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite import munit.{AnyFixture, Location} import java.time.Instant -import scala.concurrent.duration.{DurationInt, FiniteDuration} class FailedElemLogStoreSuite extends NexusSuite with MutableClock.Fixture with Doobie.Fixture with Doobie.Assertions { @@ -26,6 +25,7 @@ class FailedElemLogStoreSuite extends NexusSuite with MutableClock.Fixture with private lazy val xas = doobie() + private val start = Instant.EPOCH implicit private lazy val mutableClock: MutableClock = mutableClockFixture() private lazy val store = 
FailedElemLogStore(xas, QueryConfig(10, RefreshStrategy.Stop), mutableClock) @@ -48,7 +48,7 @@ class FailedElemLogStoreSuite extends NexusSuite with MutableClock.Fixture with private val entityType = EntityType("Test") private def createFailedElem(project: ProjectRef, offset: Long) = - FailedElem(entityType, id, Some(project), Instant.EPOCH.plusSeconds(offset), Offset.at(offset), error, rev) + FailedElem(entityType, id, Some(project), start.plusSeconds(offset), Offset.at(offset), error, rev) private val fail1 = createFailedElem(project1, 1L) private val fail2 = createFailedElem(project1, 2L) @@ -56,11 +56,9 @@ class FailedElemLogStoreSuite extends NexusSuite with MutableClock.Fixture with private val fail4 = createFailedElem(project1, 4L) private val fail5 = createFailedElem(project2, 5L) - private def assertSave(metadata: ProjectionMetadata, failed: FailedElem) = - for { - _ <- mutableClock.set(failed.instant) - _ <- store.save(metadata, List(failed)) - } yield () + private def saveFailedElem(metadata: ProjectionMetadata, failed: FailedElem) = + mutableClock.set(failed.instant) >> + store.save(metadata, List(failed)) private def assertStream(metadata: ProjectionMetadata, offset: Offset, expected: List[FailedElem])(implicit loc: Location @@ -97,11 +95,11 @@ class FailedElemLogStoreSuite extends NexusSuite with MutableClock.Fixture with test("Insert several failures") { for { - _ <- assertSave(metadata11, fail1) - _ <- assertSave(metadata12, fail2) - _ <- assertSave(metadata12, fail3) - _ <- assertSave(metadata12, fail4) - _ <- assertSave(metadata21, fail5) + _ <- saveFailedElem(metadata11, fail1) + _ <- saveFailedElem(metadata12, fail2) + _ <- saveFailedElem(metadata12, fail3) + _ <- saveFailedElem(metadata12, fail4) + _ <- saveFailedElem(metadata21, fail5) } yield () } @@ -174,20 +172,15 @@ class FailedElemLogStoreSuite extends NexusSuite with MutableClock.Fixture with store.count(project1, projection12, between).assertEquals(1L) } - test("Purge failures after 
predefined ttl") { - val failedElemTtl = 14.days - val purgeElemFailures = new PurgeElemFailures(xas, failedElemTtl, mutableClock) - - def timeTravel(duration: FiniteDuration) = mutableClock.set(Instant.EPOCH.plusMillis(duration.toMillis)) + test("Purge failures before given instant") { + val purgeElemFailures = new PurgeElemFailures(xas) for { _ <- store.count.assertEquals(5L) - _ <- timeTravel(failedElemTtl - 500.millis) - _ <- purgeElemFailures() - // no elements are deleted after 13 days + _ <- purgeElemFailures(start.minusMillis(500L)) + // no elements are deleted before the start instant _ <- store.count.assertEquals(5L) - _ <- timeTravel(failedElemTtl + 10.seconds) - _ <- purgeElemFailures() + _ <- purgeElemFailures(start.plusSeconds(10L)) - // all elements were deleted after 14 days + // all elements are deleted once the threshold passes their instants _ <- store.count.assertEquals(0L) } yield () diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/EphemeralStateStoreSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/EphemeralStateStoreSuite.scala index 716f8ac360..8f1ed0d140 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/EphemeralStateStoreSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/EphemeralStateStoreSuite.scala @@ -7,7 +7,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, User} import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie import ch.epfl.bluebrain.nexus.delta.sourcing.{DeleteExpired, Message} -import ch.epfl.bluebrain.nexus.testkit.clock.FixedClock import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite import doobie.implicits._ import munit.AnyFixture @@ -37,7 +36,7 @@ class EphemeralStateStoreSuite extends NexusSuite with Doobie.Fixture with Doobi private val m2 = nxv + "m2" private val message2 = MessageState(m2, project1, "Bye !",
alice, Instant.EPOCH.plusSeconds(60L), Anonymous) - private lazy val deleteExpired = new DeleteExpired(xas, FixedClock.atInstant(Instant.EPOCH.plusSeconds(6L))) + private lazy val deleteExpired = new DeleteExpired(xas) test("save the states") { for { @@ -55,8 +54,9 @@ class EphemeralStateStoreSuite extends NexusSuite with Doobie.Fixture with Doobi } test("delete expired state " + m1) { + val threshold = Instant.EPOCH.plusSeconds(6L) for { - _ <- deleteExpired() + _ <- deleteExpired(threshold) _ <- store.get(project1, m1).assertEquals(None) _ <- store.get(project1, m2).assertEquals(Some(message2)) } yield () diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/PurgeProjectionCoordinatorSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/PurgeProjectionCoordinatorSuite.scala new file mode 100644 index 0000000000..26b1e6ab35 --- /dev/null +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/PurgeProjectionCoordinatorSuite.scala @@ -0,0 +1,38 @@ +package ch.epfl.bluebrain.nexus.delta.sourcing.stream + +import cats.effect.IO +import cats.effect.kernel.Ref +import ch.epfl.bluebrain.nexus.delta.sourcing.config.PurgeConfig +import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite +import ch.epfl.bluebrain.nexus.testkit.mu.ce.PatienceConfig +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.PurgeProjectionCoordinator.PurgeProjection +import munit.AnyFixture + +import java.time.Instant +import scala.concurrent.duration._ + +class PurgeProjectionCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture { + + override def munitFixtures: Seq[AnyFixture[_]] = List(supervisor) + + implicit private val patienceConfig: PatienceConfig = PatienceConfig(5.seconds, 10.millis) + + private lazy val sv = supervisor().supervisor + + test("Schedule and have the supervisor executing the given purge projection") { + val metadata = ProjectionMetadata("test", "purge") + 
val config = PurgeConfig(100.millis, 5.days) + val expectedInstant = Instant.EPOCH.minusMillis(config.ttl.toMillis) + + for { + ref <- Ref.of[IO, Instant](Instant.MIN) + purgeProjection = PurgeProjection(metadata, config, ref.set) + _ <- PurgeProjectionCoordinator(sv, clock, Set(purgeProjection)) + _ <- sv.describe(metadata.name) + .map(_.map(_.status)) + .assertEquals(Some(ExecutionStatus.Running)) + .eventually + _ <- ref.get.assertEquals(expectedInstant).eventually + } yield () + } +} diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/SupervisorSetup.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/SupervisorSetup.scala index 18af30a30d..6f95404e39 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/SupervisorSetup.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/SupervisorSetup.scala @@ -34,6 +34,7 @@ object SupervisorSetup { 10.millis, 14.days, 1.second, + 14.days, defaultQueryConfig ) resource(config, clock) @@ -44,7 +45,7 @@ object SupervisorSetup { clock: Clock[IO] ): Resource[IO, SupervisorSetup] = Doobie.resource().flatMap { xas => - val projections = Projections(xas, config.query, config.restartTtl, clock) + val projections = Projections(xas, config.query, clock) val projectionErrors = ProjectionErrors(xas, config.query, clock) Supervisor(projections, projectionErrors, config).map(s => SupervisorSetup(s, projections, projectionErrors)) } diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/tombstone/TombstoneStoreSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/tombstone/TombstoneStoreSuite.scala index d1efd0ae35..e0068d2cc0 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/tombstone/TombstoneStoreSuite.scala +++ 
b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/tombstone/TombstoneStoreSuite.scala @@ -1,6 +1,5 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.tombstone -import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Anonymous @@ -14,105 +13,77 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.tombstone.TombstoneStoreSuite.{ent import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite import doobie.implicits._ -import io.circe.Json -import io.circe.syntax.EncoderOps import munit.AnyFixture import java.time.Instant class TombstoneStoreSuite extends NexusSuite with Doobie.Fixture { - override def munitFixtures: Seq[AnyFixture[_]] = List(doobie) + override def munitFixtures: Seq[AnyFixture[_]] = List(doobieTruncateAfterTest) - private lazy val xas = doobie() + private lazy val xas = doobieTruncateAfterTest() - private val id1 = nxv + "id" - private val state = SimpleResource( - id1, - Set(nxv + "SimpleResource", nxv + "SimpleResource2", nxv + "SimpleResource3"), - Latest(nxv + "schema") - ) + private val id1 = nxv + "id" + private val originalTypes = Set(nxv + "SimpleResource", nxv + "SimpleResource2", nxv + "SimpleResource3") + private val originalState = SimpleResource(id1, originalTypes, Instant.EPOCH) - private def select(id: Iri, tag: Tag) = + private def selectCause(id: Iri, tag: Tag) = sql""" | SELECT cause | FROM public.scoped_tombstones - | WHERE id = $id AND tag = $tag""".stripMargin.query[Json].option.transact(xas.read) + | WHERE id = $id AND tag = $tag""".stripMargin.query[Cause].option.transact(xas.read) - private def selectAsCause(id: Iri, tag: Tag) = - select(id, tag).flatMap { - case None => IO.none - case Some(json) => IO.fromEither(json.as[Cause]).map(Some(_)) - } + private def count = sql"""SELECT count(*) FROM 
public.scoped_tombstones""".query[Long].unique.transact(xas.read) test("Save a tombstone for the given tag") { val tag = UserTag.unsafe("v1") for { - _ <- TombstoneStore.save(entityType, state, tag).transact(xas.write).assert - _ <- select(id1, tag).assertEquals(Some(Cause.deleted.asJson)) + _ <- TombstoneStore.save(entityType, originalState, tag).transact(xas.write) + _ <- selectCause(id1, tag).assertEquals(Some(Cause.deleted)) } yield () } test("Not save a tombstone for a new resource") { val id2 = nxv + "id2" - val newState = SimpleResource(id2, Set(nxv + "SimpleResource2"), state.schema) + val newState = SimpleResource(id2, Set(nxv + "SimpleResource2"), Instant.EPOCH) for { - _ <- TombstoneStore - .save(entityType, None, newState) - .transact(xas.write) - .assert - _ <- select(id2, Tag.latest).assertEquals(None) + _ <- TombstoneStore.save(entityType, None, newState).transact(xas.write) + _ <- selectCause(id2, Tag.latest).assertEquals(None) } yield () } - test("Not save a tombstone for a resource when no type has been removed and schema remains the same") { + test("Not save a tombstone for a resource when no type has been removed") { val id2 = nxv + "id2" - val newState = SimpleResource(id2, state.types + (nxv + "SimpleResource4"), state.schema) + val newState = SimpleResource(id2, originalTypes + (nxv + "SimpleResource4"), Instant.EPOCH) for { - _ <- TombstoneStore - .save(entityType, Some(state), newState) - .transact(xas.write) - .assert - _ <- select(id2, Tag.latest).assertEquals(None) + _ <- TombstoneStore.save(entityType, Some(originalState), newState).transact(xas.write) + _ <- selectCause(id2, Tag.latest).assertEquals(None) } yield () } - test("Save a tombstone for a resource where types have been removed and schema remains the same") { + test("Save a tombstone for a resource where types have been removed") { val id3 = nxv + "id3" - val newState = SimpleResource(id3, Set(nxv + "SimpleResource2"), Latest(nxv + "schema")) + val newState = 
SimpleResource(id3, Set(nxv + "SimpleResource2"), Instant.EPOCH) for { - _ <- TombstoneStore.save(entityType, Some(state), newState).transact(xas.write).assert - _ <- selectAsCause(id3, Tag.latest).assertEquals( - Some( - Cause.diff(Set(nxv + "SimpleResource", nxv + "SimpleResource3"), None) - ) + _ <- TombstoneStore.save(entityType, Some(originalState), newState).transact(xas.write) + _ <- selectCause(id3, Tag.latest).assertEquals( + Some(Cause.diff(Set(nxv + "SimpleResource", nxv + "SimpleResource3"), None)) ) } yield () } - test("Save a tombstone for a resource where no type has been removed and schema changed") { - val id4 = nxv + "id4" - val newState = SimpleResource(id4, state.types, Latest(nxv + "schema2")) - for { - _ <- TombstoneStore.save(entityType, Some(state), newState).transact(xas.write).assert - _ <- selectAsCause(id4, Tag.latest).assertEquals( - Some( - Cause.diff(Set.empty[Iri], Some(state.schema)) - ) - ) - } yield () - } + test("Purge tombstone older than the given instant") { + def stateAt(instant: Instant) = SimpleResource(nxv + "id", Set(nxv + "SimpleResource2"), instant) + val threshold = Instant.now() + val toDelete = stateAt(threshold.minusMillis(1L)) + val toKeep = stateAt(threshold.plusMillis(1L)) - test("Save a tombstone for a resource where types have been removed and schema changed") { - val id5 = nxv + "id5" - val newState = SimpleResource(id5, Set(nxv + "SimpleResource2"), Latest(nxv + "schema2")) for { - _ <- TombstoneStore.save(entityType, Some(state), newState).transact(xas.write).assert - _ <- selectAsCause(id5, Tag.latest).assertEquals( - Some( - Cause.diff(Set(nxv + "SimpleResource", nxv + "SimpleResource3"), Some(state.schema)) - ) - ) + _ <- TombstoneStore.save(entityType, Some(originalState), toDelete).transact(xas.write) + _ <- TombstoneStore.save(entityType, Some(originalState), toKeep).transact(xas.write) + _ <- count.assertEquals(2L) + _ <- TombstoneStore.deleteExpired(threshold, xas) + _ <- count.assertEquals(1L) } 
yield () } @@ -122,11 +93,12 @@ object TombstoneStoreSuite { private val entityType = EntityType("simple") - final private[tombstone] case class SimpleResource(id: Iri, types: Set[Iri], schema: ResourceRef) - extends ScopedState { + final private[tombstone] case class SimpleResource(id: Iri, types: Set[Iri], updatedAt: Instant) extends ScopedState { override def project: ProjectRef = ProjectRef.unsafe("org", "proj") + override def schema: ResourceRef = Latest(nxv + "schema") + override def rev: Int = 0 override def deprecated: Boolean = false @@ -135,8 +107,6 @@ object TombstoneStoreSuite { override def createdBy: Identity.Subject = Anonymous - override def updatedAt: Instant = Instant.EPOCH - override def updatedBy: Identity.Subject = Anonymous }