diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchCoordinator.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchCoordinator.scala index b88701ddf5..ee393dd805 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchCoordinator.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchCoordinator.scala @@ -10,7 +10,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchC import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.config.ElasticSearchViewsConfig import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.IndexingViewDef.{ActiveViewDef, DeprecatedViewDef} import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution -import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientError.HttpClientStatusError +import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientError.{HttpClientStatusError, HttpServerStatusError} import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef import ch.epfl.bluebrain.nexus.delta.sourcing.model.ElemStream @@ -60,35 +60,46 @@ object ElasticSearchCoordinator { fetchViews(offset).evalMap { elem => elem .traverse { v => - cache.get(v.ref).flatMap { cachedView => - (cachedView, v) match { - case (Some(cached), active: ActiveViewDef) if cached.index == active.index => - for { - _ <- cache.put(active.ref, active) - _ <- logger.info(s"Index ${active.index} already exists and will not be recreated.") - } yield () - case (cached, active: ActiveViewDef) => - compile(active) - .flatMap { projection => - cleanupCurrent(cached, active.ref) >> - supervisor.run( - projection, - for { - _ <- createIndex(active) - _ <- cache.put(active.ref, active) - } yield () - ) - } - case (cached, deprecated: DeprecatedViewDef) => - cleanupCurrent(cached, deprecated.ref) + cache + .get(v.ref) + .flatMap { cachedView => + (cachedView, v) match { + case (Some(cached), active: ActiveViewDef) if cached.index == active.index => + for { + _ <- cache.put(active.ref, active) + _ <- logger.info(s"Index ${active.index} already exists and will not be recreated.") + } yield () + case (cached, active: ActiveViewDef) => + compile(active) + .flatMap { projection => + cleanupCurrent(cached, active.ref) >> + supervisor.run( + projection, + for { + _ <- createIndex(active) + _ <- cache.put(active.ref, active) + } yield () + ) + } + case (cached, deprecated: DeprecatedViewDef) => + cleanupCurrent(cached, deprecated.ref) + } } - } } - .onErrorRecover { + .onErrorRecoverWith { // If the current view does not translate to a projection or if there is a problem // with the mapping with the mapping / setting then we mark it as failed and move along - case p: ProjectionErr => elem.failed(p) - case http: HttpClientStatusError if http.code == StatusCodes.BadRequest => elem.failed(http) + case p: ProjectionErr => + val message = s"Projection for '${elem.project}/${elem.id}' failed for a compilation problem." + logger.error(p)(message).as(elem.failed(p)) + case http: HttpClientStatusError if http.code == StatusCodes.BadRequest => + val message = + s"Projection for '${elem.project}/${elem.id}' failed at index creation. Please check its mapping/settings." + logger.error(http)(message).as(elem.failed(http)) + case http: HttpServerStatusError => + val message = + s"Projection for '${elem.project}/${elem.id}' failed at index creation. Please check its mapping/settings." + logger.error(http)(message).as(elem.failed(http)) } .map(_.void) }