Introducing rewriting batch inserts for Postgres (#5173)
* Introducing rewriting batch inserts for Postgres

---------

Co-authored-by: Simon Dumas <simon.dumas@epfl.ch>
imsdu and Simon Dumas authored Oct 8, 2024
1 parent 75061c7 commit a2ea991
Showing 6 changed files with 22 additions and 14 deletions.
4 changes: 4 additions & 0 deletions delta/app/src/main/resources/app.conf
@@ -35,6 +35,10 @@ app {
# when true it creates the tables on service boot
tables-autocreate = false

# This rewrites batch inserts from insert into foo (col1, col2, col3) values (1,2,3) into insert into foo (col1, col2, col3) values (1,2,3), (4,5,6)
# This provides a 2-3x performance improvement
rewrite-batch-inserts = true

cache {
# The max number of tokens in the partition cache
max-size = 1000
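The driver-side effect is easiest to see with plain JDBC. Below is a minimal sketch, not part of the commit, assuming a reachable local Postgres and an illustrative table foo(col1, col2, col3): with reWriteBatchedInserts=true on the URL, the queued rows are sent as a few multi-value inserts instead of one statement per row.

```scala
import java.sql.DriverManager

object BatchRewriteSketch {
  def main(args: Array[String]): Unit = {
    // reWriteBatchedInserts=true lets the PostgreSQL driver collapse the batch
    // below into: insert into foo (col1, col2, col3) values (1,2,3), (4,5,6), ...
    val url  = "jdbc:postgresql://localhost:5432/postgres?reWriteBatchedInserts=true"
    val conn = DriverManager.getConnection(url, "postgres", "postgres")
    try {
      val ps = conn.prepareStatement("insert into foo (col1, col2, col3) values (?, ?, ?)")
      (1 to 1000).foreach { i =>
        ps.setInt(1, i); ps.setInt(2, 2 * i); ps.setInt(3, 3 * i)
        ps.addBatch() // queued client-side, one logical row per addBatch
      }
      ps.executeBatch() // sent as a handful of rewritten multi-value inserts
      ps.close()
    } finally conn.close()
  }
}
```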
2 changes: 0 additions & 2 deletions delta/plugins/blazegraph/src/main/resources/blazegraph.conf
@@ -32,8 +32,6 @@ plugins.blazegraph {
prefix = ${app.defaults.indexing.prefix}
# configuration of the maximum number of view references allowed on an aggregated view
max-view-refs = 20
# the maximum idle duration in between events on the indexing stream after which the stream will be stopped (min. 10 minutes)
idle-timeout = 30 minutes
#the maximum duration allowed so that synchronous indexing can complete
sync-indexing-timeout = 10 seconds
# In order to disable this feature, set an infinite time ('Inf')
Expand Up @@ -9,7 +9,6 @@ import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientConfig
import ch.epfl.bluebrain.nexus.delta.sdk.model.search.PaginationConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{BatchConfig, EventLogConfig}
import com.typesafe.config.Config
import pureconfig.error.FailureReason
import pureconfig.generic.auto._
import pureconfig.generic.semiauto.deriveReader
import pureconfig.{ConfigReader, ConfigSource}
@@ -37,8 +36,6 @@ import scala.concurrent.duration._
* prefix for namespaces
* @param maxViewRefs
* configuration of the maximum number of view references allowed on an aggregated view
* @param idleTimeout
* the maximum idle duration in between events on the indexing stream after which the stream will be stopped
* @param syncIndexingTimeout
* the maximum duration for synchronous indexing to complete
* @param defaults
@@ -57,7 +54,6 @@ final case class BlazegraphViewsConfig(
batch: BatchConfig,
prefix: String,
maxViewRefs: Int,
idleTimeout: Duration,
syncIndexingTimeout: FiniteDuration,
defaults: Defaults,
indexingEnabled: Boolean
@@ -85,11 +81,5 @@ object BlazegraphViewsConfig {
.loadOrThrow[BlazegraphViewsConfig]

implicit final val blazegraphViewsConfigConfigReader: ConfigReader[BlazegraphViewsConfig] =
deriveReader[BlazegraphViewsConfig].emap { c =>
Either.cond(
c.idleTimeout.gteq(10.minutes),
c,
new FailureReason { override def description: String = "'idle-timeout' must be greater than 10 minutes" }
)
}
deriveReader[BlazegraphViewsConfig]
}
@@ -95,6 +95,7 @@ object Transactors {
username,
Secret(password),
tablesAutocreate = false,
rewriteBatchInserts = true,
5.seconds,
CacheConfig(500, 10.minutes)
)
@@ -105,12 +106,20 @@
config: DatabaseConfig
): Resource[IO, Transactors] = {

def jdbcUrl(access: DatabaseAccess, readOnly: Boolean) = {
val baseUrl = s"jdbc:postgresql://${access.host}:${access.port}/${config.name}"
if (!readOnly && config.rewriteBatchInserts)
s"$baseUrl?reWriteBatchedInserts=true"
else
baseUrl
}

def transactor(access: DatabaseAccess, readOnly: Boolean, poolName: String): Resource[IO, HikariTransactor[IO]] = {
for {
ec <- ExecutionContexts.fixedThreadPool[IO](access.poolSize)
dataSource <- Resource.make[IO, HikariDataSource](IO.delay {
val ds = new HikariDataSource
ds.setJdbcUrl(s"jdbc:postgresql://${access.host}:${access.port}/${config.name}")
ds.setJdbcUrl(jdbcUrl(access, readOnly))
ds.setUsername(config.username)
ds.setPassword(config.password.value)
ds.setDriverClassName("org.postgresql.Driver")
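On the application side, the batches that benefit are those issued through the write transactors, for instance via doobie's Update.updateMany. A minimal sketch, not from the commit, assuming a write-side Transactor[IO] built as above; the table foo and its columns are illustrative:

```scala
import cats.effect.IO
import doobie._
import doobie.implicits._

object BatchWrite {
  // updateMany issues a single JDBC batch; with reWriteBatchedInserts=true on
  // the write-side URL, the driver turns it into multi-value inserts.
  def insertAll(xa: Transactor[IO], rows: List[(Int, Int, Int)]): IO[Int] =
    Update[(Int, Int, Int)]("insert into foo (col1, col2, col3) values (?, ?, ?)")
      .updateMany(rows)
      .transact(xa)
}
```

Note that the read-only transactor keeps the plain URL, since the rewrite only matters for writes.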
@@ -24,6 +24,8 @@ import scala.concurrent.duration.FiniteDuration
* The database password
* @param tablesAutocreate
* When true it creates the tables on service boot
* @param rewriteBatchInserts
* When true, batch inserts are rewritten into multi-value inserts, improving write performance
* @param slowQueryThreshold
* Threshold allowing to trigger a warning log when a query execution time reaches this limit
* @param cache
@@ -37,6 +39,7 @@ final case class DatabaseConfig(
username: String,
password: Secret[String],
tablesAutocreate: Boolean,
rewriteBatchInserts: Boolean,
slowQueryThreshold: FiniteDuration,
cache: CacheConfig
)
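pureconfig's default kebab-case mapping is what ties the rewrite-batch-inserts key to the new rewriteBatchInserts field. A minimal sketch with a hypothetical trimmed-down case class (the real DatabaseConfig also carries access, credentials, slow-query and cache settings):

```scala
import pureconfig._
import pureconfig.generic.auto._

object DbTogglesExample {
  // Hypothetical mirror of the two boolean flags; not the real DatabaseConfig.
  final case class DbToggles(tablesAutocreate: Boolean, rewriteBatchInserts: Boolean)

  // Kebab-case keys map onto camelCase fields by default.
  val toggles: DbToggles = ConfigSource
    .string("{ tables-autocreate = false, rewrite-batch-inserts = true }")
    .loadOrThrow[DbToggles]
}
```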
4 changes: 4 additions & 0 deletions ship/src/main/resources/ship-default.conf
@@ -10,6 +10,10 @@ ship {
# when true it creates the tables on service boot
tables-autocreate = false

# This rewrites batch inserts from insert into foo (col1, col2, col3) values (1,2,3) into insert into foo (col1, col2, col3) values (1,2,3), (4,5,6)
# This provides a 2-3x performance improvement
rewrite-batch-inserts = true

cache {
# The max number of tokens in the partition cache
max-size = 1000
