Skip to content

Commit

Permalink
Skip view when Elasticsearch returns a 500 at index creation and log …
Browse files Browse the repository at this point in the history
…it (#4380)

* Skip view when Elasticsearch returns a 500 at index creation and log it

---------

Co-authored-by: Simon Dumas <simon.dumas@epfl.ch>
  • Loading branch information
imsdu and Simon Dumas authored Oct 19, 2023
1 parent fd3039e commit 2cf4e48
Showing 1 changed file with 37 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchC
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.config.ElasticSearchViewsConfig
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.IndexingViewDef.{ActiveViewDef, DeprecatedViewDef}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientError.HttpClientStatusError
import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientError.{HttpClientStatusError, HttpServerStatusError}
import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ElemStream
Expand Down Expand Up @@ -60,35 +60,46 @@ object ElasticSearchCoordinator {
fetchViews(offset).evalMap { elem =>
elem
.traverse { v =>
cache.get(v.ref).flatMap { cachedView =>
(cachedView, v) match {
case (Some(cached), active: ActiveViewDef) if cached.index == active.index =>
for {
_ <- cache.put(active.ref, active)
_ <- logger.info(s"Index ${active.index} already exists and will not be recreated.")
} yield ()
case (cached, active: ActiveViewDef) =>
compile(active)
.flatMap { projection =>
cleanupCurrent(cached, active.ref) >>
supervisor.run(
projection,
for {
_ <- createIndex(active)
_ <- cache.put(active.ref, active)
} yield ()
)
}
case (cached, deprecated: DeprecatedViewDef) =>
cleanupCurrent(cached, deprecated.ref)
cache
.get(v.ref)
.flatMap { cachedView =>
(cachedView, v) match {
case (Some(cached), active: ActiveViewDef) if cached.index == active.index =>
for {
_ <- cache.put(active.ref, active)
_ <- logger.info(s"Index ${active.index} already exists and will not be recreated.")
} yield ()
case (cached, active: ActiveViewDef) =>
compile(active)
.flatMap { projection =>
cleanupCurrent(cached, active.ref) >>
supervisor.run(
projection,
for {
_ <- createIndex(active)
_ <- cache.put(active.ref, active)
} yield ()
)
}
case (cached, deprecated: DeprecatedViewDef) =>
cleanupCurrent(cached, deprecated.ref)
}
}
}
}
.onErrorRecover {
.onErrorRecoverWith {
// If the current view does not translate to a projection or if there is a problem
// with the mapping with the mapping / setting then we mark it as failed and move along
case p: ProjectionErr => elem.failed(p)
case http: HttpClientStatusError if http.code == StatusCodes.BadRequest => elem.failed(http)
case p: ProjectionErr =>
val message = s"Projection for '${elem.project}/${elem.id}' failed for a compilation problem."
logger.error(p)(message).as(elem.failed(p))
case http: HttpClientStatusError if http.code == StatusCodes.BadRequest =>
val message =
s"Projection for '${elem.project}/${elem.id}' failed at index creation. Please check its mapping/settings."
logger.error(http)(message).as(elem.failed(http))
case http: HttpServerStatusError =>
val message =
s"Projection for '${elem.project}/${elem.id}' failed at index creation. Please check its mapping/settings."
logger.error(http)(message).as(elem.failed(http))
}
.map(_.void)
}
Expand Down

0 comments on commit 2cf4e48

Please sign in to comment.