Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix elasticsearch indexing when the resource is indexed twice in the same chunk #4177

Merged
merged 4 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ final class ElasticSearchSink private (
override def inType: Typeable[Json] = Typeable[Json]

override def apply(elements: Chunk[Elem[Json]]): Task[Chunk[Elem[Unit]]] = {
val bulk = elements.foldLeft(List.empty[ElasticSearchBulk]) {
val bulk = elements.foldLeft(Vector.empty[ElasticSearchBulk]) {
case (acc, successElem @ Elem.SuccessElem(_, _, _, _, _, json, _)) =>
if (json.isEmpty()) {
ElasticSearchBulk.Delete(index, documentId(successElem)) :: acc
acc :+ ElasticSearchBulk.Delete(index, documentId(successElem))
} else
ElasticSearchBulk.Index(index, documentId(successElem), json) :: acc
acc :+ ElasticSearchBulk.Index(index, documentId(successElem), json)
case (acc, droppedElem: Elem.DroppedElem) =>
ElasticSearchBulk.Delete(index, documentId(droppedElem)) :: acc
acc :+ ElasticSearchBulk.Delete(index, documentId(droppedElem))
case (acc, _: Elem.FailedElem) => acc
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing

import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.Uri.Query
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchClientSetup
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{IndexLabel, QueryBuilder}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.ElasticSearchSink.BulkUpdateException
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem}
import ch.epfl.bluebrain.nexus.testkit.CirceLiteral
import ch.epfl.bluebrain.nexus.testkit.bio.BioSuite
import fs2.Chunk
import io.circe.Json
import munit.AnyFixture

import java.time.Instant
Expand All @@ -21,12 +24,15 @@ class ElasticSearchSinkSuite extends BioSuite with ElasticSearchClientSetup.Fixt

override def munitFixtures: Seq[AnyFixture[_]] = List(esClient)

private lazy val client = esClient()
private lazy val sink = ElasticSearchSink.events(client, 2, 50.millis, index, Refresh.True)
// Builds a sink through `ElasticSearchSink.states` that writes to the given index using `Refresh.True`.
// NOTE(review): `2` and `50.millis` are presumably the batch size and the maximum wait before flushing -
// confirm against the signature of `ElasticSearchSink.states`.
private def createSink(index: IndexLabel) =
ElasticSearchSink.states(client, 2, 50.millis, index, Refresh.True)

private val membersEntity = EntityType("members")
private val index = IndexLabel.unsafe("test_members")

private lazy val client = esClient()
private lazy val sink = createSink(index)

private val alice = (nxv + "alice", json"""{"name": "Alice", "age": 25 }""")
private val bob = (nxv + "bob", json"""{"name": "Bob", "age": 32 }""")
private val brian = (nxv + "brian", json"""{"name": "Brian", "age": 19 }""")
Expand All @@ -36,14 +42,20 @@ class ElasticSearchSinkSuite extends BioSuite with ElasticSearchClientSetup.Fixt

val rev = 1

// Turns (id, json) pairs into a chunk of success elems for the members entity.
// Offsets are 1-based and follow the iteration order of `values`.
private def asChunk(values: Iterable[(Iri, Json)]) =
  Chunk.iterable(values).zipWithIndex.map { case ((id, json), position) =>
    val offset = Offset.at(position.toLong + 1)
    SuccessElem(membersEntity, id, None, Instant.EPOCH, offset, json, rev)
  }

// Builds a dropped elem of the members entity for the given id at the given offset.
private def dropped(id: Iri, offset: Offset) = {
  val instant = Instant.EPOCH
  DroppedElem(membersEntity, id, None, instant, offset, rev)
}

// Creates `index` (the index the shared `sink` is built on) and asserts that the call yields `true`.
test("Create the index") {
client.createIndex(index, None, None).assert(true)
}

test("Index a chunk of documents and retrieve them") {
val chunk = Chunk.iterable(members).zipWithIndex.map { case ((id, json), index) =>
SuccessElem(membersEntity, id, None, Instant.EPOCH, Offset.at(index.toLong + 1), json, rev)
}
val chunk = asChunk(members)

for {
_ <- sink.apply(chunk).assert(chunk.map(_.void))
Expand All @@ -55,9 +67,7 @@ class ElasticSearchSinkSuite extends BioSuite with ElasticSearchClientSetup.Fixt
}

test("Delete dropped items from the index") {
val chunk = Chunk(brian, alice).map { case (id, _) =>
DroppedElem(membersEntity, id, None, Instant.EPOCH, Offset.at(members.size.toLong + 1), rev)
}
val chunk = Chunk(brian, alice).map { case (id, _) => dropped(id, Offset.at(members.size.toLong + 1)) }

for {
_ <- sink.apply(chunk).assert(chunk.map(_.void))
Expand Down Expand Up @@ -108,4 +118,39 @@ class ElasticSearchSinkSuite extends BioSuite with ElasticSearchClientSetup.Fixt
} yield ()
}

// Regression test: when one chunk carries two versions of the same resource, the bulk operations
// must be applied in chunk order so that the later version is the one left in the index.
test("When the same resource appears twice in a chunk, only the last update prevails") {
  val index          = IndexLabel.unsafe("test_last_update")
  val charlie        = (nxv + "charlie", json"""{"name": "Charlie", "age": 34 }""")
  val rose           = (nxv + "rose", json"""{"name": "Rose", "age": 66 }""")
  // Same id as `charlie` with a different payload; it comes last in the chunk
  val charlieUpdated = (nxv + "charlie", json"""{"name": "Charlie M.", "age": 35 }""")

  val chunk = asChunk(List(charlie, rose, charlieUpdated))
  // Use the suite helper so this sink is configured like the other sinks in the suite
  val sink  = createSink(index)

  for {
    _ <- client.createIndex(index, None, None).assert(true)
    _ <- sink.apply(chunk).assert(chunk.map(_.void))
    // The last version of the duplicated resource is the one that is stored
    _ <- client.getSource[Json](index, charlieUpdated._1.toString).assert(charlieUpdated._2)
    // The resource interleaved between the two versions is indexed as well
    _ <- client.getSource[Json](index, rose._1.toString).assert(rose._2)
  } yield ()
}

// Regression test: when one chunk indexes a resource and then drops it, the delete must be applied
// after the index operation so that the resource ends up absent from the index.
test("When the same resource appears twice in a chunk, only the last deletion prevails") {
  val index   = IndexLabel.unsafe("test_last_delete")
  val charlie = (nxv + "charlie", json"""{"name": "Charlie", "age": 34 }""")
  val rose    = (nxv + "rose", json"""{"name": "Rose", "age": 66 }""")

  val indexingChunk = asChunk(List(charlie, rose))
  // The dropped elem for `charlie` takes the offset right after the indexed elems
  val deleteChunk   = Chunk.singleton(dropped(charlie._1, Offset.at(indexingChunk.size.toLong + 1)))

  val chunk = Chunk.concat(List(indexingChunk, deleteChunk))

  // Use the suite helper so this sink is configured like the other sinks in the suite
  val sink = createSink(index)

  for {
    _ <- client.createIndex(index, None, None).assert(true)
    _ <- sink.apply(chunk).assert(chunk.map(_.void))
    // The trailing delete wins over the earlier index operation for the same id
    _ <- client.getSource[Json](index, charlie._1.toString).assertError(_.errorCode.contains(StatusCodes.NotFound))
    // The delete only affects the targeted resource
    _ <- client.getSource[Json](index, rose._1.toString).assert(rose._2)
  } yield ()
}

}