Skip to content

Commit

Permalink
Make exporter append (#4921)
Browse files Browse the repository at this point in the history
  • Loading branch information
olivergrabinski authored May 2, 2024
1 parent 11febd5 commit ff41e48
Showing 1 changed file with 8 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.query.{RefreshStrategy, StreamingQ
import doobie.Fragments
import doobie.implicits._
import doobie.util.query.Query0
import fs2.Stream
import fs2.io.file.{Files, Path}
import fs2.{text, Stream}
import fs2.io.file.{Files, Flags, Path}
import io.circe.syntax.EncoderOps

import java.time.Instant
Expand Down Expand Up @@ -55,7 +55,7 @@ object Exporter {
targetDirectory = config.target / query.output.value
_ <- Files[IO].createDirectory(targetDirectory)
exportFile = targetDirectory / s"$start.json"
_ <- exportToFile(q, query.offset, exportFile)
_ <- exportToFile(q, query.offset, exportFile, config.batchSize)
end <- clock.realTimeInstant
exportSuccess = targetDirectory / s"$start.success"
_ <- writeSuccessFile(query, exportSuccess)
Expand All @@ -68,11 +68,13 @@ object Exporter {
semaphore.permit.use { _ => exportIO }
}

private def exportToFile(query: Offset => Query0[RowEvent], start: Offset, targetFile: Path) = {
private def exportToFile(query: Offset => Query0[RowEvent], start: Offset, targetFile: Path, batchSize: Int) = {
StreamingQuery[RowEvent](start, query, _.ordering, queryConfig, xas)
.map(_.asJson.noSpaces)
.chunkN(batchSize)
.map(chunk => chunk.toList.map(_.asJson.noSpaces).mkString("\n"))
.intersperse("\n")
.through(Files[IO].writeUtf8(targetFile))
.through(text.utf8.encode)
.through(Files[IO].writeAll(targetFile, Flags.Append))
.compile
.drain
}
Expand Down

0 comments on commit ff41e48

Please sign in to comment.