Skip to content

Commit

Permalink
pass through values to KeyPool Builder
Browse files Browse the repository at this point in the history
  • Loading branch information
Shawn Garner committed Oct 3, 2023
1 parent 0d0c2d6 commit ea4bf27
Showing 1 changed file with 68 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ object RedisConnection{
val clusterCacheTopologySeconds: FiniteDuration = 1.second // How long topology will not be rechecked for after a succesful refresh
val useTLS: Boolean = false
val requestTimeout: Duration = 60.seconds
// same as KeyPool.Builder.Defaults
val idleTimeAllowedInPool: Duration = 30.seconds
val maxIdle: Int = 100
val maxTotal: Int = 100
}

def direct[F[_]: Temporal: Network]: DirectConnectionBuilder[F] =
Expand Down Expand Up @@ -222,6 +226,9 @@ object RedisConnection{
None,
Defaults.useTLS,
Defaults.requestTimeout,
Defaults.idleTimeAllowedInPool,
Defaults.maxIdle,
Defaults.maxTotal
)

@deprecated("Use overload that takes a Network", "0.4.1")
Expand All @@ -237,6 +244,9 @@ object RedisConnection{
private val auth: Option[(Option[String], String)],
private val useTLS: Boolean,
private val defaultTimeout: Duration,
private val idleTimeAllowedInPool: Duration,
private val maxIdle: Int,
private val maxTotal: Int
) { self =>

private def copy(
Expand All @@ -248,6 +258,9 @@ object RedisConnection{
auth: Option[(Option[String], String)] = self.auth,
useTLS: Boolean = self.useTLS,
defaultTimeout: Duration = self.defaultTimeout,
idleTimeAllowedInPool: Duration = self.idleTimeAllowedInPool,
maxIdle: Int = self.maxIdle,
maxTotal: Int = self.maxTotal
): PooledConnectionBuilder[F] = new PooledConnectionBuilder(
sg,
host,
Expand All @@ -256,7 +269,10 @@ object RedisConnection{
tlsParameters,
auth,
useTLS,
defaultTimeout
defaultTimeout,
idleTimeAllowedInPool,
maxIdle,
maxTotal
)

def withHost(host: Host) = copy(host = host)
Expand All @@ -271,6 +287,10 @@ object RedisConnection{
def withoutTLS = copy(useTLS = false)
def withRequestTimeout(timeout: Duration) = copy(defaultTimeout = timeout)

def withIdleTimeAllowedInPool(duration: Duration) = copy(idleTimeAllowedInPool = duration)
def withMaxIdle(maxIdle: Int) = copy(maxIdle = maxIdle)
def withMaxTotal(total: Int) = copy(maxTotal = total)

def build: Resource[F,RedisConnection[F]] = for {
tlsContextOptWithDefault <-
tlsContext
Expand All @@ -290,7 +310,11 @@ object RedisConnection{
}
)
}
).build
).withIdleTimeAllowedInPool(idleTimeAllowedInPool)
.withMaxIdle(maxIdle)
.withMaxTotal(maxTotal)
.withMaxPerKey(Function.const(maxTotal))
.build
} yield new TimeoutConnection(PooledConnection[F](kp), defaultTimeout)

}
Expand All @@ -308,6 +332,9 @@ object RedisConnection{
None,
Defaults.useTLS,
Defaults.requestTimeout,
Defaults.idleTimeAllowedInPool,
Defaults.maxIdle,
Defaults.maxTotal
)

@deprecated("Use overload that takes a Network", "0.4.1")
Expand All @@ -326,6 +353,9 @@ object RedisConnection{
private val auth: Option[(Option[String], String)],
private val useTLS: Boolean,
private val defaultTimeout: Duration,
private val idleTimeAllowedInPool: Duration,
private val maxIdle: Int,
private val maxTotal: Int
) { self =>

private def copy(
Expand All @@ -339,7 +369,10 @@ object RedisConnection{
chunkSizeLimit: Int = self.chunkSizeLimit,
auth: Option[(Option[String], String)] = self.auth,
useTLS: Boolean = self.useTLS,
defaultTimeout: Duration = self.defaultTimeout
defaultTimeout: Duration = self.defaultTimeout,
idleTimeAllowedInPool: Duration = self.idleTimeAllowedInPool,
maxIdle: Int = self.maxIdle,
maxTotal: Int = self.maxTotal
): QueuedConnectionBuilder[F] = new QueuedConnectionBuilder(
sg,
host,
Expand All @@ -352,6 +385,9 @@ object RedisConnection{
auth,
useTLS,
defaultTimeout,
idleTimeAllowedInPool,
maxIdle,
maxTotal
)

def withHost(host: Host) = copy(host = host)
Expand All @@ -371,6 +407,10 @@ object RedisConnection{
def withoutTLS = copy(useTLS = false)
def withRequestTimeout(timeout: Duration) = copy(defaultTimeout = timeout)

def withIdleTimeAllowedInPool(duration: Duration) = copy(idleTimeAllowedInPool = duration)
def withMaxIdle(maxIdle: Int) = copy(maxIdle = maxIdle)
def withMaxTotal(total: Int) = copy(maxTotal = total)

def build: Resource[F,RedisConnection[F]] = {
for {
queue <- Resource.eval(Queue.bounded[F, Chunk[(Either[Throwable,Resp] => F[Unit], Resp)]](maxQueued))
Expand All @@ -393,7 +433,10 @@ object RedisConnection{
}
)
}
).build
).withIdleTimeAllowedInPool(idleTimeAllowedInPool)
.withMaxIdle(maxIdle)
.withMaxTotal(maxTotal)
.withMaxPerKey(Function.const(maxTotal)).build
_ <-
Stream.fromQueueUnterminatedChunk(queue, chunkSizeLimit).chunks.map{chunk =>
val s = if (chunk.nonEmpty) {
Expand Down Expand Up @@ -445,6 +488,9 @@ object RedisConnection{
None,
Defaults.useTLS,
Defaults.requestTimeout,
Defaults.idleTimeAllowedInPool,
Defaults.maxIdle,
Defaults.maxTotal
)

@deprecated("Use overload that takes a Network", "0.4.1")
Expand All @@ -466,6 +512,9 @@ object RedisConnection{
private val auth: Option[(Option[String], String)],
private val useTLS: Boolean,
private val defaultTimeout: Duration,
private val idleTimeAllowedInPool: Duration,
private val maxIdle: Int,
private val maxTotal: Int
) { self =>

private def copy(
Expand All @@ -482,7 +531,10 @@ object RedisConnection{
cacheTopologySeconds: FiniteDuration = self.cacheTopologySeconds,
auth: Option[(Option[String], String)] = self.auth,
useTLS: Boolean = self.useTLS,
defaultTimeout: Duration = self.defaultTimeout
defaultTimeout: Duration = self.defaultTimeout,
idleTimeAllowedInPool: Duration = self.idleTimeAllowedInPool,
maxIdle: Int = self.maxIdle,
maxTotal: Int = self.maxTotal
): ClusterConnectionBuilder[F] = new ClusterConnectionBuilder(
sg,
host,
Expand All @@ -498,6 +550,9 @@ object RedisConnection{
auth,
useTLS,
defaultTimeout,
idleTimeAllowedInPool,
maxIdle,
maxTotal
)

def withHost(host: Host) = copy(host = host)
Expand All @@ -522,6 +577,10 @@ object RedisConnection{
def withoutTLS = copy(useTLS = false)
def withRequestTimeout(timeout: Duration) = copy(defaultTimeout = timeout)

def withIdleTimeAllowedInPool(duration: Duration) = copy(idleTimeAllowedInPool = duration)
def withMaxIdle(maxIdle: Int) = copy(maxIdle = maxIdle)
def withMaxTotal(total: Int) = copy(maxTotal = total)

def build: Resource[F,RedisConnection[F]] = {
for {
tlsContextOptWithDefault <-
Expand All @@ -545,7 +604,10 @@ object RedisConnection{
}
)
}
).build
).withIdleTimeAllowedInPool(idleTimeAllowedInPool)
.withMaxIdle(maxIdle)
.withMaxTotal(maxTotal)
.withMaxPerKey(Function.const(maxTotal)).build

// Cluster Topology Acquisition and Management
sockets <- Resource.eval(keypool.take((host, port)).map(_.value).map(DirectConnection(_)).use(ClusterCommands.clusterslots[Redis[F, *]].run(_)))
Expand Down

0 comments on commit ea4bf27

Please sign in to comment.