From b71d5a83f33c614e59438674ad32b99a2f41369f Mon Sep 17 00:00:00 2001 From: Simon Date: Mon, 21 Aug 2023 12:21:30 +0200 Subject: [PATCH] Fix elasticsearch indexing when the resource is indexed twice in the same chunk (#4177) * Fix elasticsearch indexing when the resource is indexed twice in the same chunk --------- Co-authored-by: Simon Dumas --- .../indexing/ElasticSearchSink.scala | 8 +-- .../indexing/ElasticSearchSinkSuite.scala | 61 ++++++++++++++++--- 2 files changed, 57 insertions(+), 12 deletions(-) diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSink.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSink.scala index 1c089c4112..0826626771 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSink.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSink.scala @@ -43,14 +43,14 @@ final class ElasticSearchSink private ( override def inType: Typeable[Json] = Typeable[Json] override def apply(elements: Chunk[Elem[Json]]): Task[Chunk[Elem[Unit]]] = { - val bulk = elements.foldLeft(List.empty[ElasticSearchBulk]) { + val bulk = elements.foldLeft(Vector.empty[ElasticSearchBulk]) { case (acc, successElem @ Elem.SuccessElem(_, _, _, _, _, json, _)) => if (json.isEmpty()) { - ElasticSearchBulk.Delete(index, documentId(successElem)) :: acc + acc :+ ElasticSearchBulk.Delete(index, documentId(successElem)) } else - ElasticSearchBulk.Index(index, documentId(successElem), json) :: acc + acc :+ ElasticSearchBulk.Index(index, documentId(successElem), json) case (acc, droppedElem: Elem.DroppedElem) => - ElasticSearchBulk.Delete(index, documentId(droppedElem)) :: acc + acc :+ ElasticSearchBulk.Delete(index, documentId(droppedElem)) case (acc, _: Elem.FailedElem) => acc } diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSinkSuite.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSinkSuite.scala index 19d920ecb2..bdcd072ae5 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSinkSuite.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSinkSuite.scala @@ -1,10 +1,12 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing +import akka.http.scaladsl.model.StatusCodes import akka.http.scaladsl.model.Uri.Query import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchClientSetup import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{IndexLabel, QueryBuilder} import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.ElasticSearchSink.BulkUpdateException +import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset @@ -12,6 +14,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedEl import ch.epfl.bluebrain.nexus.testkit.CirceLiteral import ch.epfl.bluebrain.nexus.testkit.bio.BioSuite import fs2.Chunk +import io.circe.Json import munit.AnyFixture import java.time.Instant @@ -21,12 +24,15 @@ class ElasticSearchSinkSuite extends BioSuite with ElasticSearchClientSetup.Fixt override def munitFixtures: Seq[AnyFixture[_]] = List(esClient) - private lazy val client = esClient() - private lazy val sink = ElasticSearchSink.events(client, 2, 50.millis, index, Refresh.True) + private def createSink(index: IndexLabel) = + ElasticSearchSink.states(client, 2, 50.millis, index, Refresh.True) private val membersEntity = EntityType("members") private val index = IndexLabel.unsafe("test_members") + private lazy val client = esClient() + private lazy val sink = createSink(index) + private val alice = (nxv + "alice", json"""{"name": "Alice", "age": 25 }""") private val bob = (nxv + "bob", json"""{"name": "Bob", "age": 32 }""") private val brian = (nxv + "brian", json"""{"name": "Brian", "age": 19 }""") @@ -36,14 +42,20 @@ class ElasticSearchSinkSuite extends BioSuite with ElasticSearchClientSetup.Fixt val rev = 1 + private def asChunk(values: Iterable[(Iri, Json)]) = + Chunk.iterable(values).zipWithIndex.map { case ((id, json), index) => + SuccessElem(membersEntity, id, None, Instant.EPOCH, Offset.at(index.toLong + 1), json, rev) + } + + private def dropped(id: Iri, offset: Offset) = + DroppedElem(membersEntity, id, None, Instant.EPOCH, offset, rev) + test("Create the index") { client.createIndex(index, None, None).assert(true) } test("Index a chunk of documents and retrieve them") { - val chunk = Chunk.iterable(members).zipWithIndex.map { case ((id, json), index) => - SuccessElem(membersEntity, id, None, Instant.EPOCH, Offset.at(index.toLong + 1), json, rev) - } + val chunk = asChunk(members) for { _ <- sink.apply(chunk).assert(chunk.map(_.void)) @@ -55,9 +67,7 @@ class ElasticSearchSinkSuite extends BioSuite with ElasticSearchClientSetup.Fixt } test("Delete dropped items from the index") { - val chunk = Chunk(brian, alice).map { case (id, _) => - DroppedElem(membersEntity, id, None, Instant.EPOCH, Offset.at(members.size.toLong + 1), rev) - } + val chunk = Chunk(brian, alice).map { case (id, _) => dropped(id, Offset.at(members.size.toLong + 1)) } for { _ <- sink.apply(chunk).assert(chunk.map(_.void)) @@ -108,4 +118,39 @@ class ElasticSearchSinkSuite extends BioSuite with ElasticSearchClientSetup.Fixt } yield () } + test("When the same resource appears twice in a chunk, only the last update prevails") { + val index = IndexLabel.unsafe("test_last_update") + val charlie = (nxv + "charlie", json"""{"name": "Charlie", "age": 34 }""") + val rose = (nxv + "rose", json"""{"name": "Rose", "age": 66 }""") + val charlie_2 = (nxv + "charlie", json"""{"name": "Charlie M.", "age": 35 }""") + + val chunk = asChunk(List(charlie, rose, charlie_2)) + val sink = ElasticSearchSink.states(client, 2, 50.millis, index, Refresh.True) + + for { + _ <- client.createIndex(index, None, None).assert(true) + _ <- sink.apply(chunk).assert(chunk.map(_.void)) + _ <- client.getSource[Json](index, charlie_2._1.toString).assert(charlie_2._2) + } yield () + } + + test("When the same resource appears twice in a chunk, only the last deletion prevails") { + val index = IndexLabel.unsafe("test_last_delete") + val charlie = (nxv + "charlie", json"""{"name": "Charlie", "age": 34 }""") + val rose = (nxv + "rose", json"""{"name": "Rose", "age": 66 }""") + + val indexingChunk = asChunk(List(charlie, rose)) + val deleteChunk = Chunk.singleton(dropped(charlie._1, Offset.at(indexingChunk.size.toLong + 1))) + + val chunk = Chunk.concat(List(indexingChunk, deleteChunk)) + + val sink = ElasticSearchSink.states(client, 2, 50.millis, index, Refresh.True) + + for { + _ <- client.createIndex(index, None, None).assert(true) + _ <- sink.apply(chunk).assert(chunk.map(_.void)) + _ <- client.getSource[Json](index, charlie._1.toString).assertError(_.errorCode.contains(StatusCodes.NotFound)) + } yield () + } + }