diff --git a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala index 72db947..1c22ed9 100644 --- a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala +++ b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala @@ -28,9 +28,9 @@ trait RedisConnection[F[_]]{ } object RedisConnection{ - private[rediculous] case class Queued[F[_]: Concurrent](queue: Queue[F, Chunk[(Either[Throwable, Resp] => F[Unit], Resp)]], usePool: Resource[F, Managed[F, Socket[F]]]) extends RedisConnection[F]{ + private[rediculous] case class Queued[F[_]: Temporal](queue: Queue[F, Chunk[(Either[Throwable, Resp] => F[Unit], Resp)]], usePool: Resource[F, Managed[F, Socket[F]]], commandTimeout: Duration) extends RedisConnection[F]{ def runRequest(inputs: Chunk[NonEmptyList[ByteVector]], key: Option[ByteVector]): F[Chunk[Resp]] = { - val chunk = Chunk.seq(inputs.toList.map(Resp.renderRequest)) + val chunk = Chunk.from(inputs.toList.map(Resp.renderRequest)) chunk.traverse(resp => Deferred[F, Either[Throwable, Resp]].map(d => (d, ({(e: Either[Throwable, Resp]) => d.complete(e).void}, resp)))).flatMap{ c => queue.offer(c.map(_._2)) >> { val x: F[Chunk[Either[Throwable, Resp]]] = c.traverse{ case (d, _) => d.get } @@ -38,57 +38,64 @@ object RedisConnection{ y } } - } + }.timeoutTo(commandTimeout, Defer[F].defer(Temporal[F].raiseError(RedisError.CommandTimeoutException(commandTimeout)))) } - private[rediculous] case class PooledConnection[F[_]: Concurrent]( - pool: KeyPool[F, Unit, Socket[F]] + private[rediculous] case class PooledConnection[F[_]: Temporal]( + pool: KeyPool[F, Unit, Socket[F]], + commandTimeout: Duration, + redisRequestTimeout: Duration, ) extends RedisConnection[F]{ def runRequest(inputs: Chunk[NonEmptyList[ByteVector]], key: Option[ByteVector]): F[Chunk[Resp]] = { - val chunk = Chunk.seq(inputs.toList.map(Resp.renderRequest)) - def withSocket(socket: Socket[F]): F[Chunk[Resp]] = explicitPipelineRequest[F](socket, chunk) + val chunk = Chunk.from(inputs.toList.map(Resp.renderRequest)) + def withSocket(socket: Socket[F]): F[Chunk[Resp]] = explicitPipelineRequest[F](socket, chunk, Defaults.maxBytes, redisRequestTimeout) pool.take(()).use{ - m => withSocket(m.value).attempt.flatTap{ + m => withSocket(m.value).timeout(redisRequestTimeout).attempt.flatTap{ case Left(_) => m.canBeReused.set(Reusable.DontReuse) case _ => Applicative[F].unit } }.rethrow - } + }.timeoutTo(commandTimeout, Defer[F].defer(Temporal[F].raiseError(RedisError.CommandTimeoutException(commandTimeout)))) } - private[rediculous] case class DirectConnection[F[_]: Concurrent](socket: Socket[F]) extends RedisConnection[F]{ + private[rediculous] case class DirectConnection[F[_]: Temporal](socket: Socket[F], commandTimeout: Duration, redisRequestTimeout: Duration) extends RedisConnection[F]{ def runRequest(inputs: Chunk[NonEmptyList[ByteVector]], key: Option[ByteVector]): F[Chunk[Resp]] = { - val chunk = Chunk.seq(inputs.toList.map(Resp.renderRequest)) - def withSocket(socket: Socket[F]): F[Chunk[Resp]] = explicitPipelineRequest[F](socket, chunk) + val chunk = Chunk.from(inputs.toList.map(Resp.renderRequest)) + def withSocket(socket: Socket[F]): F[Chunk[Resp]] = explicitPipelineRequest[F](socket, chunk, Defaults.maxBytes, redisRequestTimeout) withSocket(socket) - } + }.timeoutTo(commandTimeout, Defer[F].defer(Temporal[F].raiseError(RedisError.CommandTimeoutException(commandTimeout)))) } private[rediculous] case class Cluster[F[_]: Concurrent](queue: Queue[F, Chunk[(Either[Throwable, Resp] => F[Unit], Option[ByteVector], Option[(Host, Port)], Int, Resp)]], slots: F[ClusterSlots], usePool: (Host, Port) => Resource[F, Managed[F, Socket[F]]]) extends RedisConnection[F]{ def runRequest(inputs: Chunk[NonEmptyList[ByteVector]], key: Option[ByteVector]): F[Chunk[Resp]] = { - val chunk = Chunk.seq(inputs.toList.map(Resp.renderRequest)) - chunk.traverse(resp => Deferred[F, Either[Throwable, Resp]].map(d => (d, ({(e: Either[Throwable, Resp]) => d.complete(e).void}, key, None, 0, resp)))).flatMap{ c => + val chunk = Chunk.from(inputs.toList.map(Resp.renderRequest)) + chunk.traverse(resp => Deferred[F, Either[Throwable, Resp]].map(d => (d, ({(e: Either[Throwable, Resp]) => d.complete(e).void}, key, None, 0, resp)))).flatMap{ c => queue.offer(c.map(_._2)) >> { c.traverse(_._1.get).flatMap(_.sequence.liftTo[F].adaptError{case e => RedisError.QueuedExceptionError(e)}) } - } + } } } // Guarantees With Socket That Each Call Receives a Response // Chunk must be non-empty but to do so incurs a penalty - private[rediculous] def explicitPipelineRequest[F[_]: Concurrent](socket: Socket[F], calls: Chunk[Resp], maxBytes: Int = 16 * 1024 * 1024): F[Chunk[Resp]] = { + private[rediculous] def explicitPipelineRequest[F[_]: Temporal](socket: Socket[F], calls: Chunk[Resp], maxBytes: Int, redisRequestTimeout: Duration): F[Chunk[Resp]] = { val out = calls.flatMap(resp => Resp.CodecUtils.codec.encode(resp).toEither.traverse(bits => Chunk.byteVector(bits.bytes)) ).sequence.leftMap(err => new Throwable(s"Failed To Encode Response $err")).liftTo[F] - out.flatMap(socket.write) >> - Stream.eval(socket.read(maxBytes)) - .repeat - .unNoneTerminate - .unchunks - .through(fs2.interop.scodec.StreamDecoder.many(Resp.CodecUtils.codec).toPipeByte) - .take(calls.size.toLong) - .compile - .to(Chunk) + out.flatMap{bytes => + + val request = socket.write(bytes) >> + Stream.eval(socket.read(maxBytes)) + .repeat + .unNoneTerminate + .unchunks + .through(fs2.interop.scodec.StreamDecoder.many(Resp.CodecUtils.codec).toPipeByte) + .take(calls.size.toLong) + .compile + .to(Chunk) + + request.timeoutTo(redisRequestTimeout, Defer[F].defer(Temporal[F].raiseError(RedisError.RedisRequestTimeoutException(redisRequestTimeout)))) + } } def runRequestInternal[F[_]: Concurrent](connection: RedisConnection[F])( @@ -131,11 +138,21 @@ object RedisConnection{ val clusterUseDynamicRefreshSource: Boolean = true // Set to false to only use initially provided host for topology refresh val clusterCacheTopologySeconds: FiniteDuration = 1.second // How long topology will not be rechecked for after a succesful refresh val useTLS: Boolean = false - val requestTimeout: Duration = 60.seconds + // same as KeyPool.Builder.Defaults val idleTimeAllowedInPool: Duration = 30.seconds val maxIdle: Int = 100 val maxTotal: Int = 100 + + val commandTimeout: Duration = 30.seconds // If using a blocking operation this is likely inappropriate. + @deprecated("0.5.2", "Use Defaults.commandTimeout instead") + val requestTimeout: Duration = commandTimeout + + val redisRequestTimeout = 20.seconds // If using a blocking operation this is likely inappropriate. + + + // TODO config + private[rediculous] val maxBytes = 16 * 1024 * 1024 } def direct[F[_]: Temporal: Network]: DirectConnectionBuilder[F] = @@ -147,7 +164,8 @@ object RedisConnection{ TLSParameters.Default, None, Defaults.useTLS, - Defaults.requestTimeout + Defaults.commandTimeout, + Defaults.redisRequestTimeout, ) @deprecated("Use overload that takes a Network", "0.4.1") @@ -162,8 +180,9 @@ object RedisConnection{ private val tlsParameters: TLSParameters, private val auth: Option[(Option[String], String)], private val useTLS: Boolean, - private val defaultTimeout: Duration, - ) { self => + private val commandTimeout: Duration, + private val redisRequestTimeout: Duration + ) { self => private def copy( sg: SocketGroup[F] = self.sg, @@ -173,7 +192,8 @@ object RedisConnection{ tlsParameters: TLSParameters = self.tlsParameters, auth: Option[(Option[String], String)] = self.auth, useTLS: Boolean = self.useTLS, - defaultTimeout: Duration = self.defaultTimeout + commandTimeout: Duration = self.commandTimeout, + redisRequestTimeout: Duration = self.redisRequestTimeout, ): DirectConnectionBuilder[F] = new DirectConnectionBuilder( sg, host, @@ -182,7 +202,8 @@ object RedisConnection{ tlsParameters, auth, useTLS, - defaultTimeout + commandTimeout, + redisRequestTimeout, ) def withHost(host: Host) = copy(host = host) @@ -195,9 +216,12 @@ object RedisConnection{ def withoutAuth = copy(auth = None) def withTLS = copy(useTLS = true) def withoutTLS = copy(useTLS = false) - def withRequestTimeout(timeout: Duration) = copy(defaultTimeout = timeout) + @deprecated("0.5.2", "Use withCommandTimeout") + def withRequestTimeout(timeout: Duration) = withCommandTimeout(timeout) + def withCommandTimeout(timeout: Duration) = copy(commandTimeout = timeout) + def withRedisRequestTimeout(timeout: Duration) = copy(redisRequestTimeout = timeout) - def build: Resource[F,RedisConnection[F]] = + def build: Resource[F,RedisConnection[F]] = for { socket <- sg.client(SocketAddress(host,port), Nil) tlsContextOptWithDefault <- @@ -209,11 +233,11 @@ object RedisConnection{ _ <- Resource.eval(auth match { case None => ().pure[F] case Some((Some(username), password)) => - RedisCommands.auth[Redis[F, *]](username, password).run(DirectConnection(out)).void + RedisCommands.auth[Redis[F, *]](username, password).run(DirectConnection(out, commandTimeout, redisRequestTimeout)).void case Some((None, password)) => - RedisCommands.auth[Redis[F, *]](password).run(DirectConnection(out)).void + RedisCommands.auth[Redis[F, *]](password).run(DirectConnection(out, commandTimeout, redisRequestTimeout)).void }) - } yield new TimeoutConnection(RedisConnection.DirectConnection(out), defaultTimeout) + } yield RedisConnection.DirectConnection(out, commandTimeout, redisRequestTimeout) } def pool[F[_]: Temporal: Network]: PooledConnectionBuilder[F] = @@ -225,10 +249,11 @@ object RedisConnection{ TLSParameters.Default, None, Defaults.useTLS, - Defaults.requestTimeout, Defaults.idleTimeAllowedInPool, Defaults.maxIdle, - Defaults.maxTotal + Defaults.maxTotal, + Defaults.commandTimeout, + Defaults.redisRequestTimeout, ) @deprecated("Use overload that takes a Network", "0.4.1") @@ -243,10 +268,12 @@ object RedisConnection{ private val tlsParameters: TLSParameters, private val auth: Option[(Option[String], String)], private val useTLS: Boolean, - private val defaultTimeout: Duration, private val idleTimeAllowedInPool: Duration, private val maxIdle: Int, - private val maxTotal: Int + private val maxTotal: Int, + private val commandTimeout: Duration, + private val redisRequestTimeout: Duration, + ) { self => private def copy( @@ -257,10 +284,12 @@ object RedisConnection{ tlsParameters: TLSParameters = self.tlsParameters, auth: Option[(Option[String], String)] = self.auth, useTLS: Boolean = self.useTLS, - defaultTimeout: Duration = self.defaultTimeout, + idleTimeAllowedInPool: Duration = self.idleTimeAllowedInPool, maxIdle: Int = self.maxIdle, - maxTotal: Int = self.maxTotal + maxTotal: Int = self.maxTotal, + commandTimeout: Duration = self.commandTimeout, + redisRequestTimeout: Duration = self.redisRequestTimeout ): PooledConnectionBuilder[F] = new PooledConnectionBuilder( sg, host, @@ -269,10 +298,11 @@ object RedisConnection{ tlsParameters, auth, useTLS, - defaultTimeout, idleTimeAllowedInPool, maxIdle, - maxTotal + maxTotal, + commandTimeout, + redisRequestTimeout, ) def withHost(host: Host) = copy(host = host) @@ -285,7 +315,11 @@ object RedisConnection{ def withoutAuth = copy(auth = None) def withTLS = copy(useTLS = true) def withoutTLS = copy(useTLS = false) - def withRequestTimeout(timeout: Duration) = copy(defaultTimeout = timeout) + + @deprecated("0.5.2", "Use withCommandTimeout") + def withRequestTimeout(timeout: Duration) = withCommandTimeout(timeout) + def withCommandTimeout(timeout: Duration) = copy(commandTimeout = timeout) + def withRedisRequestTimeout(timeout: Duration) = copy(redisRequestTimeout = timeout) def withIdleTimeAllowedInPool(duration: Duration) = copy(idleTimeAllowedInPool = duration) def withMaxIdle(maxIdle: Int) = copy(maxIdle = maxIdle) @@ -304,9 +338,9 @@ object RedisConnection{ auth match { case None => ().pure[F] case Some((Some(username), password)) => - RedisCommands.auth[Redis[F, *]](username, password).run(DirectConnection(socket)).void + RedisCommands.auth[Redis[F, *]](username, password).run(DirectConnection(socket, commandTimeout, redisRequestTimeout)).void case Some((None, password)) => - RedisCommands.auth[Redis[F, *]](password).run(DirectConnection(socket)).void + RedisCommands.auth[Redis[F, *]](password).run(DirectConnection(socket, commandTimeout, redisRequestTimeout)).void } ) } @@ -315,7 +349,7 @@ object RedisConnection{ .withMaxTotal(maxTotal) .withMaxPerKey(Function.const(maxTotal)) .build - } yield new TimeoutConnection(PooledConnection[F](kp), defaultTimeout) + } yield new PooledConnection[F](kp, commandTimeout, redisRequestTimeout) } @@ -331,10 +365,11 @@ object RedisConnection{ Defaults.chunkSizeLimit, None, Defaults.useTLS, - Defaults.requestTimeout, Defaults.idleTimeAllowedInPool, Defaults.maxIdle, - Defaults.maxTotal + Defaults.maxTotal, + Defaults.commandTimeout, + Defaults.redisRequestTimeout, ) @deprecated("Use overload that takes a Network", "0.4.1") @@ -352,11 +387,13 @@ object RedisConnection{ private val chunkSizeLimit: Int, private val auth: Option[(Option[String], String)], private val useTLS: Boolean, - private val defaultTimeout: Duration, private val idleTimeAllowedInPool: Duration, private val maxIdle: Int, - private val maxTotal: Int - ) { self => + private val maxTotal: Int, + + private val commandTimeout: Duration, // Command Timeout + private val redisRequestTimeout: Duration, // Redis Interaction Timeout + ) { self => private def copy( sg: SocketGroup[F] = self.sg, @@ -369,10 +406,12 @@ object RedisConnection{ chunkSizeLimit: Int = self.chunkSizeLimit, auth: Option[(Option[String], String)] = self.auth, useTLS: Boolean = self.useTLS, - defaultTimeout: Duration = self.defaultTimeout, + idleTimeAllowedInPool: Duration = self.idleTimeAllowedInPool, maxIdle: Int = self.maxIdle, - maxTotal: Int = self.maxTotal + maxTotal: Int = self.maxTotal, + commandTimeout: Duration = self.commandTimeout, + redisRequestTimeout: Duration = self.redisRequestTimeout, ): QueuedConnectionBuilder[F] = new QueuedConnectionBuilder( sg, host, @@ -384,10 +423,11 @@ object RedisConnection{ chunkSizeLimit, auth, useTLS, - defaultTimeout, idleTimeAllowedInPool, maxIdle, - maxTotal + maxTotal, + commandTimeout, + redisRequestTimeout ) def withHost(host: Host) = copy(host = host) @@ -405,7 +445,10 @@ object RedisConnection{ def withTLS = copy(useTLS = true) def withoutTLS = copy(useTLS = false) - def withRequestTimeout(timeout: Duration) = copy(defaultTimeout = timeout) + @deprecated("0.5.2", "Use withCommandTimeout instaead") + def withRequestTimeout(timeout: Duration) = withCommandTimeout(timeout) + def withCommandTimeout(timeout: Duration) = copy(commandTimeout = timeout) + def withRedisRequestTimeout(timeout: Duration) = copy(redisRequestTimeout = timeout) def withIdleTimeAllowedInPool(duration: Duration) = copy(idleTimeAllowedInPool = duration) def withMaxIdle(maxIdle: Int) = copy(maxIdle = maxIdle) @@ -427,9 +470,9 @@ object RedisConnection{ auth match { case None => ().pure[F] case Some((Some(username), password)) => - RedisCommands.auth[Redis[F, *]](username, password).run(DirectConnection(socket)).void + RedisCommands.auth[Redis[F, *]](username, password).run(DirectConnection(socket, commandTimeout, redisRequestTimeout)).void case Some((None, password)) => - RedisCommands.auth[Redis[F, *]](password).run(DirectConnection(socket)).void + RedisCommands.auth[Redis[F, *]](password).run(DirectConnection(socket, commandTimeout, redisRequestTimeout)).void } ) } @@ -444,10 +487,13 @@ object RedisConnection{ keypool.take(()).attempt.use{ case Right(m) => val out = chunk.map(_._2) - explicitPipelineRequest(m.value, out).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize - case Left(_) => m.canBeReused.set(Reusable.DontReuse) - case _ => Applicative[F].unit - } + explicitPipelineRequest(m.value, out, Defaults.maxBytes, redisRequestTimeout) + .attempt + .timeout(redisRequestTimeout) // Apply Timeout To Call to Redis, this is independent of the timeout on individual calls + .flatTap{// Currently Guarantee Chunk.size === returnSize + case Left(_) => m.canBeReused.set(Reusable.DontReuse) + case _ => Applicative[F].unit + } case l@Left(_) => l.rightCast[Chunk[Resp]].pure[F] }.flatMap{ case Right(n) => @@ -459,7 +505,7 @@ object RedisConnection{ } case e@Left(_) => chunk.traverse_{ case (deff, _) => deff(e.asInstanceOf[Either[Throwable, Resp]])} - }) + }) } else { Stream.empty } @@ -468,7 +514,7 @@ object RedisConnection{ .compile .drain .background - } yield new TimeoutConnection(Queued(queue, keypool.take(())), defaultTimeout) + } yield new Queued(queue, keypool.take(()), commandTimeout) } } @@ -487,10 +533,12 @@ object RedisConnection{ Defaults.clusterCacheTopologySeconds, None, Defaults.useTLS, - Defaults.requestTimeout, + Defaults.idleTimeAllowedInPool, Defaults.maxIdle, - Defaults.maxTotal + Defaults.maxTotal, + Defaults.commandTimeout, + Defaults.redisRequestTimeout, ) @deprecated("Use overload that takes a Network", "0.4.1") @@ -511,10 +559,13 @@ object RedisConnection{ private val cacheTopologySeconds: FiniteDuration, // How long topology will not be rechecked for after a succesful refresh private val auth: Option[(Option[String], String)], private val useTLS: Boolean, - private val defaultTimeout: Duration, + private val idleTimeAllowedInPool: Duration, private val maxIdle: Int, - private val maxTotal: Int + private val maxTotal: Int, + + private val commandTimeout: Duration, + private val redisRequestTimeout: Duration, ) { self => private def copy( @@ -531,10 +582,11 @@ object RedisConnection{ cacheTopologySeconds: FiniteDuration = self.cacheTopologySeconds, auth: Option[(Option[String], String)] = self.auth, useTLS: Boolean = self.useTLS, - defaultTimeout: Duration = self.defaultTimeout, idleTimeAllowedInPool: Duration = self.idleTimeAllowedInPool, maxIdle: Int = self.maxIdle, - maxTotal: Int = self.maxTotal + maxTotal: Int = self.maxTotal, + commandTimeout: Duration = self.commandTimeout, + redisRequestTimeout: Duration = self.redisRequestTimeout, ): ClusterConnectionBuilder[F] = new ClusterConnectionBuilder( sg, host, @@ -549,10 +601,11 @@ object RedisConnection{ cacheTopologySeconds, auth, useTLS, - defaultTimeout, idleTimeAllowedInPool, maxIdle, - maxTotal + maxTotal, + commandTimeout, + redisRequestTimeout, ) def withHost(host: Host) = copy(host = host) @@ -575,7 +628,11 @@ object RedisConnection{ def withTLS = copy(useTLS = true) def withoutTLS = copy(useTLS = false) - def withRequestTimeout(timeout: Duration) = copy(defaultTimeout = timeout) + + def withCommandTimeout(timeout: Duration) = copy(commandTimeout = timeout) + @deprecated("0.5.2", "Use withCommandTimeout") + def withRequestTimeout(timeout: Duration) = withCommandTimeout(timeout) + def withRedisRequestTimeout(timeout: Duration) = copy(redisRequestTimeout = timeout) def withIdleTimeAllowedInPool(duration: Duration) = copy(idleTimeAllowedInPool = duration) def withMaxIdle(maxIdle: Int) = copy(maxIdle = maxIdle) @@ -598,9 +655,9 @@ object RedisConnection{ auth match { case None => ().pure[F] case Some((Some(username), password)) => - RedisCommands.auth[Redis[F, *]](username, password).run(DirectConnection(socket)).void + RedisCommands.auth[Redis[F, *]](username, password).run(DirectConnection(socket, commandTimeout, redisRequestTimeout)).void case Some((None, password)) => - RedisCommands.auth[Redis[F, *]](password).run(DirectConnection(socket)).void + RedisCommands.auth[Redis[F, *]](password).run(DirectConnection(socket, commandTimeout, redisRequestTimeout)).void } ) } @@ -610,7 +667,7 @@ object RedisConnection{ .withMaxPerKey(Function.const(maxTotal)).build // Cluster Topology Acquisition and Management - sockets <- Resource.eval(keypool.take((host, port)).map(_.value).map(DirectConnection(_)).use(ClusterCommands.clusterslots[Redis[F, *]].run(_))) + sockets <- Resource.eval(keypool.take((host, port)).map(_.value).map(DirectConnection(_, commandTimeout, redisRequestTimeout)).use(ClusterCommands.clusterslots[Redis[F, *]].run(_))) now <- Resource.eval(Temporal[F].realTime.map(_.toMillis)) refreshLock <- Resource.eval(Semaphore[F](1L)) refTopology <- Resource.eval(Ref[F].of((sockets, now))) @@ -629,7 +686,7 @@ object RedisConnection{ case ((_, setAt), now) if setAt >= (now - cacheTopologySeconds.toMillis) => Applicative[F].unit case ((l, _), _) => val nelActions: NonEmptyList[F[ClusterSlots]] = l.map{ case (host, port) => - keypool.take((host, port)).map(_.value).map(DirectConnection(_)).use(ClusterCommands.clusterslots[Redis[F, *]].run(_)) + keypool.take((host, port)).map(_.value).map(DirectConnection(_, commandTimeout, redisRequestTimeout)).use(ClusterCommands.clusterslots[Redis[F, *]].run(_)) } raceNThrowFirst(nelActions) .flatMap(s => Clock[F].realTime.map(_.toMillis).flatMap(now => refTopology.set((s,now)))) @@ -650,8 +707,8 @@ object RedisConnection{ case (server, rest) => keypool.take(server).attempt.use{ case Right(m) => - val out = Chunk.seq(rest.map(_._5)) - explicitPipelineRequest(m.value, out).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize + val out = Chunk.from(rest.map(_._5)) + explicitPipelineRequest(m.value, out, Defaults.maxBytes, commandTimeout).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize case Left(_) => m.canBeReused.set(Reusable.DontReuse) case _ => Applicative[F].unit } @@ -667,7 +724,7 @@ object RedisConnection{ // Offer To Have it reprocessed. // If the queue is full return the error to the user cluster.queue.tryOffer(Chunk.singleton((toSet, key, extractServer(s), retries + 1, initialCommand))) - .ifM( + .ifM( Applicative[F].unit, toSet(Either.right(e)).void ) @@ -707,7 +764,7 @@ object RedisConnection{ .compile .drain .background - } yield new TimeoutConnection(cluster, defaultTimeout) + } yield cluster } } @@ -733,10 +790,8 @@ object RedisConnection{ private def raceNThrowFirst[F[_]: Concurrent, A](nel: NonEmptyList[F[A]]): F[A] = Stream(Stream.emits(nel.toList).evalMap(identity)).covary[F].parJoinUnbounded.take(1).compile.lastOrError - private[rediculous] case class TimeoutConnection[F[_]: Temporal](rC: RedisConnection[F], duration: Duration) extends RedisConnection[F] { - - def runRequest(inputs: Chunk[NonEmptyList[ByteVector]], key: Option[ByteVector]): F[Chunk[Resp]] = - rC.runRequest(inputs, key).timeout(duration) - + // We create this to create custom Timeouts in timeO + private implicit def deferFromMonad[F[_]: cats.Monad]: cats.Defer[F] = new cats.Defer[F] { + def defer[A](fa: => F[A]): F[A] = Monad[F].unit.flatMap(_ => fa) } } diff --git a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisError.scala b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisError.scala index 27e40c1..33d7a0d 100644 --- a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisError.scala +++ b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisError.scala @@ -1,5 +1,8 @@ package io.chrisdavenport.rediculous +import scala.concurrent.duration.Duration +import java.util.concurrent.TimeoutException + /** Indicates a Error while processing for Rediculous */ trait RedisError extends RuntimeException { @@ -23,4 +26,11 @@ object RedisError { override val message: String = s"Error encountered in queue: ${baseCase.getMessage()}" override val cause: Option[Throwable] = Some(baseCase) } + + // TODO + trait RedisTimeoutException + + final case class CommandTimeoutException(timeout: Duration) extends TimeoutException(s"Redis Command Timed Out: $timeout") with RedisTimeoutException + final case class RedisRequestTimeoutException(timeout: Duration) extends TimeoutException(s"Redis Request Timed Out: $timeout") with RedisTimeoutException + } \ No newline at end of file diff --git a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisPubSub.scala b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisPubSub.scala index fcb8f13..aaffdd0 100644 --- a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisPubSub.scala +++ b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisPubSub.scala @@ -220,20 +220,20 @@ object RedisPubSub { * connections to all nodes. **/ def fromConnection[F[_]: Concurrent](connection: RedisConnection[F], maxBytes: Int = 8096, clusterBroadcast: Boolean = false): Resource[F, RedisPubSub[F]] = connection match { - case RedisConnection.TimeoutConnection(conn, _) => fromConnection(conn, maxBytes, clusterBroadcast) - case RedisConnection.Queued(_, sockets) => + // case RedisConnection.TimeoutConnection(conn, _, _, _) => fromConnection(conn, maxBytes, clusterBroadcast) + case RedisConnection.Queued(_, sockets, _) => sockets.flatMap{managed => val messagesR = Concurrent[F].ref(Map[String, RedisPubSub.PubSubMessage => F[Unit]]()) val onNonMessageR = Concurrent[F].ref((_: PubSubReply) => Applicative[F].unit) val onUnhandledMessageR = Concurrent[F].ref((_: PubSubMessage) => Applicative[F].unit) - Resource.eval((messagesR, onNonMessageR, onUnhandledMessageR).tupled).flatMap{case (ref, onNonMessage, onUnhandledMessage) => + Resource.eval((messagesR, onNonMessageR, onUnhandledMessageR).tupled).flatMap{case (ref, onNonMessage, onUnhandledMessage) => Resource.makeCase(socket(connection, managed.value :: Nil, maxBytes, onNonMessage, onUnhandledMessage, ref).pure[F]){ case (_, Resource.ExitCase.Errored(_)) | (_, Resource.ExitCase.Canceled) => managed.canBeReused.set(Reusable.DontReuse) case (pubsub, Resource.ExitCase.Succeeded) => pubsub.unsubscribeAll } } } - case RedisConnection.PooledConnection(pool) => + case RedisConnection.PooledConnection(pool, _, _) => pool.take(()).flatMap{managed => val messagesR = Concurrent[F].ref(Map[String, RedisPubSub.PubSubMessage => F[Unit]]()) val onNonMessageR = Concurrent[F].ref((_: PubSubReply) => Applicative[F].unit) @@ -245,7 +245,7 @@ object RedisPubSub { } } } - case RedisConnection.DirectConnection(s) => + case RedisConnection.DirectConnection(s, _, _) => val messagesR = Concurrent[F].ref(Map[String, RedisPubSub.PubSubMessage => F[Unit]]()) val onNonMessageR = Concurrent[F].ref((_: PubSubReply) => Applicative[F].unit) val onUnhandledMessageR = Concurrent[F].ref((_: PubSubMessage) => Applicative[F].unit) diff --git a/core/shared/src/test/scala/io/chrisdavenport/rediculous/BufferedSocket.scala b/core/shared/src/test/scala/io/chrisdavenport/rediculous/BufferedSocket.scala index 3e44fcd..59378d8 100644 --- a/core/shared/src/test/scala/io/chrisdavenport/rediculous/BufferedSocket.scala +++ b/core/shared/src/test/scala/io/chrisdavenport/rediculous/BufferedSocket.scala @@ -1,11 +1,9 @@ package io.chrisdavenport.rediculous.util -import cats._ import cats.syntax.all._ import fs2._ import fs2.io.net.Socket import cats.effect._ -import cats.effect.std.Queue import com.comcast.ip4s.{IpAddress, SocketAddress} private[rediculous] trait BufferedSocket[F[_]] extends Socket[F]{ @@ -30,12 +28,12 @@ private[rediculous] object BufferedSocket{ // This can return more bytes than max bytes, may want to refine this later def read(maxBytes: Int): F[Option[Chunk[Byte]]] = takeBuffer.flatMap{ - case s@Some(value) => value.some.pure[F] + case Some(value) => value.some.pure[F] case None => socket.read(maxBytes) } def readN(numBytes: Int): F[Chunk[Byte]] = takeBuffer.flatMap{ - case s@Some(value) => value.pure[F] + case Some(value) => value.pure[F] case None => socket.readN(numBytes) } @@ -53,7 +51,7 @@ private[rediculous] object BufferedSocket{ def write(bytes: Chunk[Byte]): F[Unit] = socket.write(bytes) - def writes: Pipe[F,Byte,INothing] = socket.writes + def writes: Pipe[F,Byte,Nothing] = socket.writes } diff --git a/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala b/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala index 6606863..db8ac38 100644 --- a/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala +++ b/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala @@ -18,7 +18,7 @@ class RedisConnectionSpec extends RediculousCrossSuite { def endOfInput: IO[Unit] = ??? def endOfOutput: IO[Unit] = ??? - + def isOpen: IO[Boolean] = ??? def remoteAddress: IO[SocketAddress[IpAddress]] = ??? @@ -37,7 +37,7 @@ class RedisConnectionSpec extends RediculousCrossSuite { def server(address: Option[Host], port: Option[Port], options: List[SocketOption]): fs2.Stream[IO,Socket[IO]] = ??? def serverResource(address: Option[Host], port: Option[Port], options: List[SocketOption]): Resource[IO,(SocketAddress[IpAddress], fs2.Stream[IO,Socket[IO]])] = ??? - + } RedisConnection.queued[IO].withSocketGroup(sg).build