Partition export file to have a maximum number of events (#5066)
* Partition export file to have a maximum number of events

* Revert change on temp directory fixture

---------

Co-authored-by: Simon Dumas <simon.dumas@epfl.ch>
imsdu and Simon Dumas authored Jul 24, 2024
1 parent 7cee429 commit 3aaa707
Showing 9 changed files with 180 additions and 51 deletions.
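In short: Exporter.events no longer returns a single JSON file plus start/end instants; it writes a directory of partitioned event files and returns that directory together with the path of the .success marker. A minimal sketch of consuming the reshaped result (illustrative only, not part of the commit; it assumes only the names visible in the diff below):

// Illustrative only: how a caller might use the new ExportResult shape.
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.{ExportEventQuery, Exporter}

def runExport(exporter: Exporter, query: ExportEventQuery): IO[Unit] =
  exporter.events(query).flatMap { result =>
    // result.targetDirectory holds one or more partition files, each capped at
    // `limit-per-file` events; result.success marks that the export completed.
    IO.println(s"export written under ${result.targetDirectory}, marker ${result.success}")
  }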
2 changes: 2 additions & 0 deletions delta/app/src/main/resources/app.conf
@@ -49,6 +49,8 @@ app {
# Database export configuration
export {
batch-size = 30
# Limit the number of events per file (this default value should give ~1GB files)
limit-per-file = 32000
# Max number of concurrent exports
permits = 1
# Target directory for exports
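For context on the default above: 32000 events per file matches the stated ~1 GB target only under an assumed average serialized event size of roughly 32 KB (32000 × 32 KB ≈ 1 GB), so deployments with much smaller or larger events may want to tune limit-per-file accordingly.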
@@ -1,6 +1,5 @@
package ch.epfl.bluebrain.nexus.delta.wiring

import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.Main.pluginsMaxPriority
import ch.epfl.bluebrain.nexus.delta.config.AppConfig
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
@@ -16,8 +15,8 @@ import izumi.distage.model.definition.{Id, ModuleDef}
// $COVERAGE-OFF$
object ExportModule extends ModuleDef {

make[Exporter].fromEffect { (config: AppConfig, clock: Clock[IO], xas: Transactors) =>
Exporter(config.`export`, clock, xas)
make[Exporter].fromEffect { (config: AppConfig, xas: Transactors) =>
Exporter(config.`export`, xas)
}

make[ExportRoutes].from {
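The Clock[IO] dependency disappears from the wiring because the Exporter now derives the export duration from cats-effect's IO#timed (see the exportToFile change further down) rather than taking wall-clock instants before and after. A minimal sketch of that mechanism, outside this commit:

// timed pairs an IO's result with the time it took to run.
import cats.effect.IO
import scala.concurrent.duration.FiniteDuration

def measured[A](work: IO[A]): IO[FiniteDuration] =
  work.timed.map { case (elapsed, _) => elapsed }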
@@ -15,8 +15,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.{ExportEventQuery, Export
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Authenticated, Group}
import fs2.io.file.Path

import java.time.Instant

class ExportRoutesSpec extends BaseRouteSpec {

private val caller = Caller(alice, Set(alice, Anonymous, Authenticated(realm), Group("group", realm)))
@@ -31,7 +29,7 @@ class ExportRoutesSpec extends BaseRouteSpec {

private val exporter = new Exporter {
override def events(query: ExportEventQuery): IO[ExportResult] =
exportTrigger.set(true).as(ExportResult(Path("json"), Path("Success"), Instant.EPOCH, Instant.EPOCH))
exportTrigger.set(true).as(ExportResult(Path("target"), Path("success")))
}

private lazy val routes = Route.seal(
@@ -5,7 +5,7 @@ import pureconfig.ConfigConvert.catchReadError
import pureconfig.generic.semiauto.deriveReader
import pureconfig.{ConfigConvert, ConfigReader}

final case class ExportConfig(batchSize: Int, permits: Int, target: Path)
final case class ExportConfig(batchSize: Int, limitPerFile: Int, permits: Int, target: Path)

object ExportConfig {

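With the extra field the case class mirrors the export block of app.conf shown earlier; pureconfig's derived reader, with its default hints, maps the kebab-case keys (batch-size, limit-per-file) onto the camelCase fields. A hedged construction example, with illustrative values and assuming ExportConfig is in scope:

import fs2.io.file.Path

val exportConfig = ExportConfig(
  batchSize = 30,               // rows fetched per streaming query
  limitPerFile = 32000,         // maximum events written to a single export file
  permits = 1,                  // concurrent exports allowed
  target = Path("/tmp/exports") // illustrative target directory
)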
@@ -1,7 +1,6 @@
package ch.epfl.bluebrain.nexus.delta.sourcing.exporter

import cats.effect.IO
import cats.effect.kernel.Clock
import cats.effect.std.Semaphore
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
@@ -10,15 +9,14 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.Exporter.ExportResult
import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.query.{RefreshStrategy, StreamingQuery}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.utils.StreamingUtils
import doobie.Fragments
import doobie.implicits._
import doobie.util.query.Query0
import fs2.Stream
import fs2.io.file.{Files, Path}
import fs2.io.file._
import io.circe.syntax.EncoderOps

import java.time.Instant

trait Exporter {

def events(query: ExportEventQuery): IO[ExportResult]
@@ -29,15 +27,16 @@ object Exporter {

private val logger = Logger[Exporter]

final case class ExportResult(json: Path, success: Path, start: Instant, end: Instant)
private val fileFormat = "%09d"

final case class ExportResult(targetDirectory: Path, success: Path)

def apply(config: ExportConfig, clock: Clock[IO], xas: Transactors): IO[Exporter] =
Semaphore[IO](config.permits.toLong).map(new ExporterImpl(config, _, clock, xas))
def apply(config: ExportConfig, xas: Transactors): IO[Exporter] =
Semaphore[IO](config.permits.toLong).map(new ExporterImpl(config, _, xas))

private class ExporterImpl(config: ExportConfig, semaphore: Semaphore[IO], clock: Clock[IO], xas: Transactors)
extends Exporter {
private class ExporterImpl(config: ExportConfig, semaphore: Semaphore[IO], xas: Transactors) extends Exporter {

val queryConfig = QueryConfig(config.batchSize, RefreshStrategy.Stop)
private val queryConfig = QueryConfig(config.batchSize, RefreshStrategy.Stop)
override def events(query: ExportEventQuery): IO[ExportResult] = {
val projectFilter = Fragments.orOpt(
query.projects.map { project => sql"(org = ${project.organization} and project = ${project.project})" }
@@ -51,35 +50,43 @@ object Exporter {
|""".stripMargin.query[RowEvent]

val exportIO = for {
start <- clock.realTimeInstant
_ <- logger.info(s"Starting export for projects ${query.projects} from offset ${query.offset}")
targetDirectory = config.target / query.output.value
_ <- Files[IO].createDirectory(targetDirectory)
exportFile = targetDirectory / s"$start.json"
_ <- exportToFile(q, query.offset, exportFile)
end <- clock.realTimeInstant
exportSuccess = targetDirectory / s"$start.success"
exportDuration <- exportToFile(q, query.offset, targetDirectory)
exportSuccess = targetDirectory / s"${paddedOffset(query.offset)}.success"
_ <- writeSuccessFile(query, exportSuccess)
_ <-
logger.info(
s"Export for projects ${query.projects} from offset' ${query.offset}' after ${end.getEpochSecond - start.getEpochSecond} seconds."
s"Export for projects ${query.projects} from offset' ${query.offset.value}' after ${exportDuration.toSeconds} seconds."
)
} yield ExportResult(exportFile, exportSuccess, start, end)
} yield ExportResult(targetDirectory, exportSuccess)

semaphore.permit.use { _ => exportIO }
}

private def exportToFile(query: Offset => Query0[RowEvent], start: Offset, targetFile: Path) = {
StreamingQuery[RowEvent](start, query, _.ordering, queryConfig, xas)
.map(_.asJson.noSpaces)
.intersperse("\n")
.through(Files[IO].writeUtf8(targetFile))
private def exportToFile(query: Offset => Query0[RowEvent], start: Offset, targetDirectory: Path) =
Stream
.eval(IO.ref(start))
.flatMap { offsetRef =>
def computePath = offsetRef.get.map { o =>
targetDirectory / s"${paddedOffset(o)}.json"
}

StreamingQuery[RowEvent](start, query, _.ordering, queryConfig, xas)
.evalTap { rowEvent => offsetRef.set(rowEvent.ordering) }
.map(_.asJson.noSpaces)
.through(StreamingUtils.writeRotate(computePath, config.limitPerFile))
}
.compile
.drain
}
.timed
.map(_._1)

private def writeSuccessFile(query: ExportEventQuery, targetFile: Path) =
Stream(query.asJson.toString()).through(Files[IO].writeUtf8(targetFile)).compile.drain

private def paddedOffset(offset: Offset) = fileFormat.format(offset.value)
}

}
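For a sense of what an export directory ends up containing: each data file is named after the zero-padded offset captured when the file is opened, and the success marker after the query's starting offset. The padding is plain String formatting; a small illustration (offset values made up):

// "%09d" left-pads the offset so partition files sort lexicographically in offset order.
"%09d".format(0L)     // "000000000" -> first partition of an export from Offset.start
"%09d".format(48123L) // "000048123"
// A finished export under <target>/<output> thus holds several NNNNNNNNN.json
// partitions plus a single NNNNNNNNN.success file containing the query as JSON.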
@@ -0,0 +1,75 @@
package ch.epfl.bluebrain.nexus.delta.sourcing.stream.utils

import cats.effect.{IO, Resource}
import cats.effect.std.Hotswap
import fs2.io.file.{FileHandle, Files, Flag, Flags, Path, WriteCursor}
import fs2.{text, Pipe, Pull, Stream}

object StreamingUtils {

private val flags = Flags.Write

private val lineSeparator = "\n"

private val newLine = Stream.emit(lineSeparator)

def readLines(path: Path) =
Files[IO].readUtf8Lines(path).filter(_.nonEmpty)

/**
* Writes all data to a sequence of files, each limited to a maximum number of lines
*
* Adapted from fs2.io.file.Files.writeRotate (which does not preserve line boundaries)
*
* @param computePath
*   to compute the path of the first file and of each subsequent one
* @param limit
*   maximum number of lines per file
*/
def writeRotate(computePath: IO[Path], limit: Int): Pipe[IO, String, Nothing] = {
def openNewFile: Resource[IO, FileHandle[IO]] =
Resource
.eval(computePath)
.flatMap(p => Files[IO].open(p, flags.addIfAbsent(Flag.Write)))

def newCursor(file: FileHandle[IO]): IO[WriteCursor[IO]] =
Files[IO].writeCursorFromFileHandle(file, flags.contains(Flag.Append))

def go(
fileHotswap: Hotswap[IO, FileHandle[IO]],
cursor: WriteCursor[IO],
acc: Int,
s: Stream[IO, String]
): Pull[IO, Unit, Unit] = {
s.pull.unconsLimit(limit - acc).flatMap {
case Some((hd, tl)) =>
val newAcc = acc + hd.size
val hdAsBytes =
Stream.chunk(hd).intersperse(lineSeparator).append(newLine).through(text.utf8.encode)
cursor.writeAll(hdAsBytes).flatMap { nc =>
if (newAcc >= limit)
Pull
.eval {
fileHotswap
.swap(openNewFile)
.flatMap(newCursor)
}
.flatMap(nc => go(fileHotswap, nc, 0, tl))
else
go(fileHotswap, nc, newAcc, tl)
}
case None => Pull.done
}
}

in =>
Stream
.resource(Hotswap(openNewFile))
.flatMap { case (fileHotswap, fileHandle) =>
Stream.eval(newCursor(fileHandle)).flatMap { cursor =>
go(fileHotswap, cursor, 0, in).stream.drain
}
}
}

}
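The rotation leans on Hotswap so the previous file handle is released before the next one is opened, and on unconsLimit(limit - acc) so a pulled chunk can never overshoot the per-file budget. A minimal usage sketch with made-up paths and limit (the StreamingUtilsSuite further down exercises the same behaviour):

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.utils.StreamingUtils
import fs2.Stream
import fs2.io.file.Path

val lines = Stream.emits(List("a", "b", "c", "d", "e")).covary[IO]

// Each evaluation of nextPath names the next partition; rotation happens every 2 lines.
val write: IO[Unit] =
  IO.ref(0).flatMap { counter =>
    val nextPath = counter.updateAndGet(_ + 1).map(n => Path(s"/tmp/demo/part-$n.txt"))
    lines.through(StreamingUtils.writeRotate(nextPath, 2)).compile.drain
  }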
@@ -10,6 +10,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, User}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.utils.StreamingUtils
import ch.epfl.bluebrain.nexus.testkit.clock.FixedClock
import ch.epfl.bluebrain.nexus.testkit.file.TempDirectory
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
@@ -23,11 +24,11 @@ class ExporterSuite extends NexusSuite with Doobie.Fixture with TempDirectory.Fi

private lazy val doobieFixture = doobieInject(
PullRequest.eventStore(_, event1, event2, event3, event4, event5, event6),
Exporter(exporterConfig, clock, _)
Exporter(exporterConfig, _)
)
override def munitFixtures: Seq[AnyFixture[_]] = List(tempDirectory, doobieFixture)

private lazy val exporterConfig = ExportConfig(5, 3, exportDirectory)
private lazy val exporterConfig = ExportConfig(5, 4, 3, exportDirectory)
private lazy val (_, _, exporter) = doobieFixture()
private lazy val exportDirectory = tempDirectory()

@@ -55,13 +56,21 @@ class ExporterSuite extends NexusSuite with Doobie.Fixture with TempDirectory.Fi
private def orderingValue(obj: JsonObject) =
obj("ordering").flatMap(_.asNumber.flatMap(_.toInt))

private def readJsonLines(path: Path) = Files[IO]
.readUtf8Lines(path)
.evalMap { line =>
IO.fromEither(parseAsObject(line))
}
.compile
.toList
private def readDataFiles(path: Path): IO[(Int, List[JsonObject])] = {
def readDataFile(path: Path) =
StreamingUtils
.readLines(path)
.evalMap { line =>
IO.fromEither(parseAsObject(line))
}
.compile
.toList

for {
dataFiles <- Files[IO].list(path).filter(_.extName.equals(".json")).compile.toList
jsonEvents <- dataFiles.sortBy(_.fileName.toString).flatTraverse(readDataFile)
} yield (dataFiles.size, jsonEvents)
}

private def readSuccess(path: Path) = Files[IO]
.readUtf8(path)
@@ -71,29 +80,32 @@ class ExporterSuite extends NexusSuite with Doobie.Fixture with TempDirectory.Fi
.compile
.lastOrError

private def assertExport(result: Exporter.ExportResult, query: ExportEventQuery, expectedOrdering: List[Int])(implicit
location: Location
) =
for {
exportContent <- readJsonLines(result.json)
orderingValues = exportContent.mapFilter(orderingValue)
_ = assertEquals(orderingValues, expectedOrdering)
_ <- readSuccess(result.success).assertEquals(query)
} yield ()
private def assertExport(
result: Exporter.ExportResult,
query: ExportEventQuery,
expectedFileCount: Int,
expectedOrdering: List[Int]
)(implicit location: Location) =
readDataFiles(result.targetDirectory).map { case (fileCount, exportContent) =>
assertEquals(fileCount, expectedFileCount)
val orderingValues = exportContent.mapFilter(orderingValue)
assertEquals(orderingValues, expectedOrdering)
} >>
readSuccess(result.success).assertEquals(query)

test(s"Export all events for $project1 and $project3") {
val query = ExportEventQuery(Label.unsafe("export1"), NonEmptyList.of(project1, project3), Offset.start)
for {
result <- exporter.events(query)
_ <- assertExport(result, query, List(1, 2, 3, 4, 6))
_ <- assertExport(result, query, 2, List(1, 2, 3, 4, 6))
} yield ()
}

test(s"Export all events for $project1 and $project3 from offset 3") {
val query = ExportEventQuery(Label.unsafe("export2"), NonEmptyList.of(project1, project3), Offset.at(2L))
for {
result <- exporter.events(query)
_ <- assertExport(result, query, List(3, 4, 6))
_ <- assertExport(result, query, 1, List(3, 4, 6))
} yield ()
}
}
@@ -0,0 +1,34 @@
package ch.epfl.bluebrain.nexus.delta.sourcing.stream.utils

import cats.effect.IO
import ch.epfl.bluebrain.nexus.testkit.file.TempDirectory
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
import fs2.Stream
import fs2.io.file.Files
import munit.AnyFixture

class StreamingUtilsSuite extends NexusSuite with TempDirectory.Fixture {

override def munitFixtures: Seq[AnyFixture[_]] = List(tempDirectory)

private lazy val exportDirectory = tempDirectory()

private val limitPerFile = 3
private val lines = Stream.emits(List("A", "B", "C", "D", "E"))

test(s"Write stream of lines in a file rotating every $limitPerFile lines") {
for {
refCompute <- IO.ref(0)
computePath = refCompute
.updateAndGet(_ + 1)
.map { counter => exportDirectory / s"part-$counter.txt" }
_ <- lines.through(StreamingUtils.writeRotate(computePath, limitPerFile)).compile.drain
_ <- Files[IO].list(exportDirectory).assertSize(2)
firstFile = exportDirectory / "part-1.txt"
_ <- Files[IO].readUtf8Lines(firstFile).assert("A", "B", "C")
secondFile = exportDirectory / "part-2.txt"
_ <- Files[IO].readUtf8Lines(secondFile).assert("D", "E")
} yield ()
}

}
@@ -6,6 +6,7 @@ import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.RowEvent
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.utils.StreamingUtils
import ch.epfl.bluebrain.nexus.ship.EventStreamer.logger
import fs2.io.file.{Files, Path}
import fs2.{text, Stream}
@@ -57,6 +58,7 @@ object EventStreamer {
.readFileMultipart(bucket, path.toString)
.through(text.utf8.decode)
.through(text.lines)
.filter(_.nonEmpty)
} yield lines

override def fileList(path: Path): IO[List[Path]] =
@@ -73,7 +75,7 @@
def localStreamer: EventStreamer = new EventStreamer {

override def streamLines(path: Path): Stream[IO, String] =
Files[IO].readUtf8Lines(path)
StreamingUtils.readLines(path)

override def fileList(path: Path): IO[List[Path]] =
Files[IO].list(path).compile.toList
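The added .filter(_.nonEmpty) mirrors StreamingUtils.readLines: writeRotate terminates every written chunk with a newline, so partition files end in a trailing line separator and line-splitting can surface a final empty string that must not be decoded as an event. A small, hedged illustration in plain fs2:

import fs2.{Stream, text}

// A newline-terminated payload, as produced by writeRotate.
val decoded = Stream("event1\nevent2\n").through(text.lines).toList
// Depending on the fs2 version, decoded may end with an empty string,
// hence the nonEmpty filter before the JSON decoding step.
val events = decoded.filter(_.nonEmpty) // List("event1", "event2")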
