Skip to content

Commit

Permalink
Fix elasticsearch indexing when the resource is indexed twice in the …
Browse files Browse the repository at this point in the history
…same chunk (#4177)

* Fix elasticsearch indexing when the resource is indexed twice in the same chunk

---------

Co-authored-by: Simon Dumas <simon.dumas@epfl.ch>
  • Loading branch information
imsdu and Simon Dumas authored Aug 21, 2023
1 parent 443a88e commit b71d5a8
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ final class ElasticSearchSink private (
override def inType: Typeable[Json] = Typeable[Json]

override def apply(elements: Chunk[Elem[Json]]): Task[Chunk[Elem[Unit]]] = {
val bulk = elements.foldLeft(List.empty[ElasticSearchBulk]) {
val bulk = elements.foldLeft(Vector.empty[ElasticSearchBulk]) {
case (acc, successElem @ Elem.SuccessElem(_, _, _, _, _, json, _)) =>
if (json.isEmpty()) {
ElasticSearchBulk.Delete(index, documentId(successElem)) :: acc
acc :+ ElasticSearchBulk.Delete(index, documentId(successElem))
} else
ElasticSearchBulk.Index(index, documentId(successElem), json) :: acc
acc :+ ElasticSearchBulk.Index(index, documentId(successElem), json)
case (acc, droppedElem: Elem.DroppedElem) =>
ElasticSearchBulk.Delete(index, documentId(droppedElem)) :: acc
acc :+ ElasticSearchBulk.Delete(index, documentId(droppedElem))
case (acc, _: Elem.FailedElem) => acc
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing

import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.Uri.Query
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchClientSetup
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{IndexLabel, QueryBuilder}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.ElasticSearchSink.BulkUpdateException
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem}
import ch.epfl.bluebrain.nexus.testkit.CirceLiteral
import ch.epfl.bluebrain.nexus.testkit.bio.BioSuite
import fs2.Chunk
import io.circe.Json
import munit.AnyFixture

import java.time.Instant
Expand All @@ -21,12 +24,15 @@ class ElasticSearchSinkSuite extends BioSuite with ElasticSearchClientSetup.Fixt

override def munitFixtures: Seq[AnyFixture[_]] = List(esClient)

private lazy val client = esClient()
private lazy val sink = ElasticSearchSink.events(client, 2, 50.millis, index, Refresh.True)
private def createSink(index: IndexLabel) =
ElasticSearchSink.states(client, 2, 50.millis, index, Refresh.True)

private val membersEntity = EntityType("members")
private val index = IndexLabel.unsafe("test_members")

private lazy val client = esClient()
private lazy val sink = createSink(index)

private val alice = (nxv + "alice", json"""{"name": "Alice", "age": 25 }""")
private val bob = (nxv + "bob", json"""{"name": "Bob", "age": 32 }""")
private val brian = (nxv + "brian", json"""{"name": "Brian", "age": 19 }""")
Expand All @@ -36,14 +42,20 @@ class ElasticSearchSinkSuite extends BioSuite with ElasticSearchClientSetup.Fixt

val rev = 1

private def asChunk(values: Iterable[(Iri, Json)]) =
Chunk.iterable(values).zipWithIndex.map { case ((id, json), index) =>
SuccessElem(membersEntity, id, None, Instant.EPOCH, Offset.at(index.toLong + 1), json, rev)
}

private def dropped(id: Iri, offset: Offset) =
DroppedElem(membersEntity, id, None, Instant.EPOCH, offset, rev)

test("Create the index") {
client.createIndex(index, None, None).assert(true)
}

test("Index a chunk of documents and retrieve them") {
val chunk = Chunk.iterable(members).zipWithIndex.map { case ((id, json), index) =>
SuccessElem(membersEntity, id, None, Instant.EPOCH, Offset.at(index.toLong + 1), json, rev)
}
val chunk = asChunk(members)

for {
_ <- sink.apply(chunk).assert(chunk.map(_.void))
Expand All @@ -55,9 +67,7 @@ class ElasticSearchSinkSuite extends BioSuite with ElasticSearchClientSetup.Fixt
}

test("Delete dropped items from the index") {
val chunk = Chunk(brian, alice).map { case (id, _) =>
DroppedElem(membersEntity, id, None, Instant.EPOCH, Offset.at(members.size.toLong + 1), rev)
}
val chunk = Chunk(brian, alice).map { case (id, _) => dropped(id, Offset.at(members.size.toLong + 1)) }

for {
_ <- sink.apply(chunk).assert(chunk.map(_.void))
Expand Down Expand Up @@ -108,4 +118,39 @@ class ElasticSearchSinkSuite extends BioSuite with ElasticSearchClientSetup.Fixt
} yield ()
}

test("When the same resource appears twice in a chunk, only the last update prevails") {
val index = IndexLabel.unsafe("test_last_update")
val charlie = (nxv + "charlie", json"""{"name": "Charlie", "age": 34 }""")
val rose = (nxv + "rose", json"""{"name": "Rose", "age": 66 }""")
val charlie_2 = (nxv + "charlie", json"""{"name": "Charlie M.", "age": 35 }""")

val chunk = asChunk(List(charlie, rose, charlie_2))
val sink = ElasticSearchSink.states(client, 2, 50.millis, index, Refresh.True)

for {
_ <- client.createIndex(index, None, None).assert(true)
_ <- sink.apply(chunk).assert(chunk.map(_.void))
_ <- client.getSource[Json](index, charlie_2._1.toString).assert(charlie_2._2)
} yield ()
}

test("When the same resource appears twice in a chunk, only the last deletion prevails") {
val index = IndexLabel.unsafe("test_last_delete")
val charlie = (nxv + "charlie", json"""{"name": "Charlie", "age": 34 }""")
val rose = (nxv + "rose", json"""{"name": "Rose", "age": 66 }""")

val indexingChunk = asChunk(List(charlie, rose))
val deleteChunk = Chunk.singleton(dropped(charlie._1, Offset.at(indexingChunk.size.toLong + 1)))

val chunk = Chunk.concat(List(indexingChunk, deleteChunk))

val sink = ElasticSearchSink.states(client, 2, 50.millis, index, Refresh.True)

for {
_ <- client.createIndex(index, None, None).assert(true)
_ <- sink.apply(chunk).assert(chunk.map(_.void))
_ <- client.getSource[Json](index, charlie._1.toString).assertError(_.errorCode.contains(StatusCodes.NotFound))
} yield ()
}

}

0 comments on commit b71d5a8

Please sign in to comment.