Skip to content

Commit

Permalink
Merge pull request #66 from davenverse/useResourceKeypool
Browse files Browse the repository at this point in the history
Use Resource Keypool rather than tuple
  • Loading branch information
ChristopherDavenport authored Mar 29, 2023
2 parents 09d3d22 + a2f1c25 commit 754ca4f
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ import _root_.io.chrisdavenport.rediculous.cluster.ClusterCommands
import fs2.io.net.Socket
import fs2.io.net.tls.TLSContext
import fs2.io.net.tls.TLSParameters
import java.time.Instant
import _root_.io.chrisdavenport.rediculous.cluster.ClusterCommands.ClusterSlots
import fs2.io.net.SocketGroupCompanionPlatform
import scodec.bits.ByteVector
import fs2.io.net.Network

Expand All @@ -43,12 +41,12 @@ object RedisConnection{
}
}
private[rediculous] case class PooledConnection[F[_]: Concurrent](
pool: KeyPool[F, Unit, (Socket[F], F[Unit])]
pool: KeyPool[F, Unit, Socket[F]]
) extends RedisConnection[F]{
def runRequest(inputs: Chunk[NonEmptyList[ByteVector]], key: Option[ByteVector]): F[Chunk[Resp]] = {
val chunk = Chunk.seq(inputs.toList.map(Resp.renderRequest))
def withSocket(socket: Socket[F]): F[Chunk[Resp]] = explicitPipelineRequest[F](socket, chunk)
Functor[KeyPool[F, Unit, *]].map(pool)(_._1).take(()).use{
pool.take(()).use{
m => withSocket(m.value).attempt.flatTap{
case Left(_) => m.canBeReused.set(Reusable.DontReuse)
case _ => Applicative[F].unit
Expand Down Expand Up @@ -264,7 +262,7 @@ object RedisConnection{
.fold(Network[F].tlsContext.systemResource.attempt.map(_.toOption))(
_.some.pure[Resource[F, *]]
)
kp <- KeyPoolBuilder[F, Unit, (Socket[F], F[Unit])](
kp <- KeyPool.Builder[F, Unit, Socket[F]](
{_ => sg.client(SocketAddress(host,port), Nil)
.flatMap(elevateSocket(_, tlsContextOptWithDefault, tlsParameters, useTLS))
.evalTap(socket =>
Expand All @@ -276,9 +274,7 @@ object RedisConnection{
RedisCommands.auth[Redis[F, *]](password).run(DirectConnection(socket)).void
}
)
.allocated
},
{ case (_, shutdown) => shutdown}
}
).build
} yield PooledConnection[F](kp)

Expand Down Expand Up @@ -364,7 +360,7 @@ object RedisConnection{
.fold(Network[F].tlsContext.systemResource.attempt.map(_.toOption))(
_.some.pure[Resource[F, *]]
)
keypool <- KeyPoolBuilder[F, Unit, (Socket[F], F[Unit])](
keypool <- KeyPool.Builder.apply[F, Unit, Socket[F]](
{_ => sg.client(SocketAddress(host,port), Nil)
.flatMap(elevateSocket(_, tlsContextOptWithDefault, tlsParameters, useTLS))
.evalTap(socket =>
Expand All @@ -376,15 +372,13 @@ object RedisConnection{
RedisCommands.auth[Redis[F, *]](password).run(DirectConnection(socket)).void
}
)
.allocated
},
{ case (_, shutdown) => shutdown}
}
).build
_ <-
Stream.fromQueueUnterminatedChunk(queue, chunkSizeLimit).chunks.map{chunk =>
val s = if (chunk.nonEmpty) {
Stream.eval(
Functor[KeyPool[F, Unit, *]].map(keypool)(_._1).take(()).attempt.use{
keypool.take(()).attempt.use{
case Right(m) =>
val out = chunk.map(_._2)
explicitPipelineRequest(m.value, out).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize
Expand All @@ -410,7 +404,7 @@ object RedisConnection{
.compile
.drain
.background
} yield Queued(queue, keypool.take(()).map(Functor[Managed[F, *]].map(_)(_._1)))
} yield Queued(queue, keypool.take(()))
}
}

Expand Down Expand Up @@ -509,8 +503,8 @@ object RedisConnection{
.fold(Network[F].tlsContext.systemResource.attempt.map(_.toOption))(
_.some.pure[Resource[F, *]]
)
keypool <- KeyPoolBuilder[F, (Host, Port), (Socket[F], F[Unit])](
{(t: (Host, Port)) => sg.client(SocketAddress(host,port), Nil)
keypool <- KeyPool.Builder[F, (Host, Port), Socket[F]](
{case ((host: Host, port: Port)) => sg.client(SocketAddress(host, port), Nil)
.flatMap(elevateSocket(_, tlsContextOptWithDefault, tlsParameters, useTLS))
.evalTap(socket =>
auth match {
Expand All @@ -521,13 +515,11 @@ object RedisConnection{
RedisCommands.auth[Redis[F, *]](password).run(DirectConnection(socket)).void
}
)
.allocated
},
{ case (_, shutdown) => shutdown}
}
).build

// Cluster Topology Acquisition and Management
sockets <- Resource.eval(keypool.take((host, port)).map(_.value._1).map(DirectConnection(_)).use(ClusterCommands.clusterslots[Redis[F, *]].run(_)))
sockets <- Resource.eval(keypool.take((host, port)).map(_.value).map(DirectConnection(_)).use(ClusterCommands.clusterslots[Redis[F, *]].run(_)))
now <- Resource.eval(Temporal[F].realTime.map(_.toMillis))
refreshLock <- Resource.eval(Semaphore[F](1L))
refTopology <- Resource.eval(Ref[F].of((sockets, now)))
Expand All @@ -546,14 +538,14 @@ object RedisConnection{
case ((_, setAt), now) if setAt >= (now - cacheTopologySeconds.toMillis) => Applicative[F].unit
case ((l, _), _) =>
val nelActions: NonEmptyList[F[ClusterSlots]] = l.map{ case (host, port) =>
keypool.take((host, port)).map(_.value._1).map(DirectConnection(_)).use(ClusterCommands.clusterslots[Redis[F, *]].run(_))
keypool.take((host, port)).map(_.value).map(DirectConnection(_)).use(ClusterCommands.clusterslots[Redis[F, *]].run(_))
}
raceNThrowFirst(nelActions)
.flatMap(s => Clock[F].realTime.map(_.toMillis).flatMap(now => refTopology.set((s,now))))
}
)
queue <- Resource.eval(Queue.bounded[F, Chunk[(Either[Throwable,Resp] => F[Unit], Option[ByteVector], Option[(Host, Port)], Int, Resp)]](maxQueued))
cluster = Cluster(queue, refTopology.get.map(_._1), {case(host, port) => keypool.take((host, port)).map(_.map(_._1))})
cluster = Cluster(queue, refTopology.get.map(_._1), {case(host, port) => keypool.take((host, port))})
_ <-
Stream.fromQueueUnterminatedChunk(queue, chunkSizeLimit).chunks.map{chunk =>
val s = if (chunk.nonEmpty) {
Expand All @@ -565,7 +557,7 @@ object RedisConnection{
}.toSeq
).evalMap{
case (server, rest) =>
Functor[KeyPool[F, (Host, Port), *]].map(keypool)(_._1).take(server).attempt.use{
keypool.take(server).attempt.use{
case Right(m) =>
val out = Chunk.seq(rest.map(_._5))
explicitPipelineRequest(m.value, out).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ object RedisPubSub {
}
}
case RedisConnection.PooledConnection(pool) =>
pool.take(()).map(_.map(_._1)).flatMap{managed =>
pool.take(()).flatMap{managed =>
val messagesR = Concurrent[F].ref(Map[String, RedisPubSub.PubSubMessage => F[Unit]]())
val onNonMessageR = Concurrent[F].ref((_: PubSubReply) => Applicative[F].unit)
val onUnhandledMessageR = Concurrent[F].ref((_: PubSubMessage) => Applicative[F].unit)
Expand Down

0 comments on commit 754ca4f

Please sign in to comment.