From c1483039e275714986b968a5086befa7cfe2c91f Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Tue, 4 Apr 2023 10:31:27 -0700 Subject: [PATCH 01/11] Fix Hanging Queue Responses --- .../rediculous/RedisConnection.scala | 10 ++--- .../rediculous/RedisConnectionSpec.scala | 41 +++++++++++++++++++ 2 files changed, 46 insertions(+), 5 deletions(-) create mode 100644 core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala diff --git a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala index 4d86ffa..daa1f23 100644 --- a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala +++ b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala @@ -83,7 +83,6 @@ object RedisConnection{ out.flatMap(socket.write) >> Stream.eval(socket.read(maxBytes)) .repeat - .debug(c => c.map(_.toByteVector.decodeAsciiLenient).toString) .unNoneTerminate .unchunks .through(fs2.interop.scodec.StreamDecoder.many(Resp.CodecUtils.codec).toPipeByte) @@ -409,10 +408,11 @@ object RedisConnection{ case l@Left(_) => l.rightCast[Chunk[Resp]].pure[F] }.flatMap{ case Right(n) => - n.zipWithIndex.traverse_{ - case (ref, i) => - val (toSet, _) = chunk(i) - toSet(Either.right(ref)) + chunk.zipWithIndex.traverse_{ + case ((toSet, _), i) => + val ref = Either.catchNonFatal(n(i)) + .leftMap(_ => RedisError.Generic("Rediculous: Queued Command did not get response, this likely indicates an EOF during a read")) // TODO should this be something more specific + toSet(ref) } case e@Left(_) => chunk.traverse_{ case (deff, _) => deff(e.asInstanceOf[Either[Throwable, Resp]])} diff --git a/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala b/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala new file mode 100644 index 0000000..9ba9677 --- /dev/null +++ b/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala @@ -0,0 +1,41 @@ +package io.chrisdavenport.rediculous + +import munit.CatsEffectSuite +import cats.effect._ +import fs2.Chunk +import com.comcast.ip4s.{IpAddress, SocketAddress} +import fs2.Pipe +import scala.concurrent.duration._ +import java.util.concurrent.TimeoutException + +class RedisConnectionSpec extends CatsEffectSuite { + test("Test Suite"){ + val fakeSocket = new fs2.io.net.Socket[IO]{ + def read(maxBytes: Int): IO[Option[Chunk[Byte]]] = IO(None) + + def readN(numBytes: Int): IO[Chunk[Byte]] = ??? + + def reads: fs2.Stream[IO,Byte] = ??? + + def endOfInput: IO[Unit] = ??? + + def endOfOutput: IO[Unit] = ??? + + def isOpen: IO[Boolean] = ??? + + def remoteAddress: IO[SocketAddress[IpAddress]] = ??? + + def localAddress: IO[SocketAddress[IpAddress]] = ??? + + def write(bytes: Chunk[Byte]): IO[Unit] = IO.unit + + def writes: Pipe[IO,Byte,Nothing] = _.chunks.evalMap(write).drain + + } + val test = RedisConnection.explicitPipelineRequest(fakeSocket, Chunk(Resp.SimpleString("PING"))) + + test.map( + c => assertEquals(c, Chunk()) + ) + } +} \ No newline at end of file From 54ed295bbe242e07c1580f097f61b30c011cc24b Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Tue, 4 Apr 2023 11:18:59 -0700 Subject: [PATCH 02/11] Add Test To Catch what happens on an EOF --- .../rediculous/RedisConnectionSpec.scala | 37 +++++++++++-------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala b/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala index 9ba9677..e38e549 100644 --- a/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala +++ b/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala @@ -2,23 +2,21 @@ package io.chrisdavenport.rediculous import munit.CatsEffectSuite import cats.effect._ -import fs2.Chunk -import com.comcast.ip4s.{IpAddress, SocketAddress} -import fs2.Pipe -import scala.concurrent.duration._ -import java.util.concurrent.TimeoutException +import fs2.{Chunk, Pipe} +import com.comcast.ip4s.{Host, Port,IpAddress, SocketAddress} +import fs2.io.net.{Socket, SocketOption, SocketGroup} class RedisConnectionSpec extends CatsEffectSuite { - test("Test Suite"){ + test("Queued Connection Does Not Hang on EOF"){ val fakeSocket = new fs2.io.net.Socket[IO]{ def read(maxBytes: Int): IO[Option[Chunk[Byte]]] = IO(None) - + def readN(numBytes: Int): IO[Chunk[Byte]] = ??? - + def reads: fs2.Stream[IO,Byte] = ??? - + def endOfInput: IO[Unit] = ??? - + def endOfOutput: IO[Unit] = ??? def isOpen: IO[Boolean] = ??? @@ -28,14 +26,23 @@ class RedisConnectionSpec extends CatsEffectSuite { def localAddress: IO[SocketAddress[IpAddress]] = ??? def write(bytes: Chunk[Byte]): IO[Unit] = IO.unit - + def writes: Pipe[IO,Byte,Nothing] = _.chunks.evalMap(write).drain + + } + + val sg = new SocketGroup[IO] { + def client(to: SocketAddress[Host], options: List[SocketOption]): Resource[IO,Socket[IO]] = Resource.pure(fakeSocket) + + def server(address: Option[Host], port: Option[Port], options: List[SocketOption]): fs2.Stream[IO,Socket[IO]] = ??? + + def serverResource(address: Option[Host], port: Option[Port], options: List[SocketOption]): Resource[IO,(SocketAddress[IpAddress], fs2.Stream[IO,Socket[IO]])] = ??? } - val test = RedisConnection.explicitPipelineRequest(fakeSocket, Chunk(Resp.SimpleString("PING"))) - test.map( - c => assertEquals(c, Chunk()) - ) + RedisConnection.queued[IO].withSocketGroup(sg).build + .use(c => + RedisCommands.ping[RedisIO].run(c) + ).intercept[RedisError.QueuedExceptionError] // We catch the redis error from the empty returned chunk } } \ No newline at end of file From 5e5189758a24ac1f82646f1d48b3d16ff434dd59 Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Tue, 4 Apr 2023 11:21:12 -0700 Subject: [PATCH 03/11] Better explanation of the test --- .../io/chrisdavenport/rediculous/RedisConnectionSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala b/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala index e38e549..d4664f4 100644 --- a/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala +++ b/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala @@ -43,6 +43,6 @@ class RedisConnectionSpec extends CatsEffectSuite { RedisConnection.queued[IO].withSocketGroup(sg).build .use(c => RedisCommands.ping[RedisIO].run(c) - ).intercept[RedisError.QueuedExceptionError] // We catch the redis error from the empty returned chunk + ).intercept[RedisError.QueuedExceptionError] // We catch the redis error from the empty returned chunk, previously to #69 this would hang. } } \ No newline at end of file From d8e447ddc12f47c692705beb17838036326cc806 Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Tue, 4 Apr 2023 11:30:29 -0700 Subject: [PATCH 04/11] Add epollcat for native testing with socketgroup --- build.sbt | 4 ++++ .../io/chrisdavenport/rediculous/RedisConnectionSpec.scala | 1 + 2 files changed, 5 insertions(+) diff --git a/build.sbt b/build.sbt index fdb8fb6..dfc4524 100644 --- a/build.sbt +++ b/build.sbt @@ -62,6 +62,10 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) libraryDependencies ++= Seq( "io.chrisdavenport" %%% "whale-tail-manager" % "0.0.8" % Test, ) + ).platformsSettings(NativePlatform)( + libraryDependencies ++= Seq( + "com.armanbilge" %%% "epollcat" % "0.1.4" + ) ) lazy val examples = crossProject(JVMPlatform, JSPlatform) diff --git a/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala b/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala index d4664f4..e8f465f 100644 --- a/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala +++ b/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala @@ -7,6 +7,7 @@ import com.comcast.ip4s.{Host, Port,IpAddress, SocketAddress} import fs2.io.net.{Socket, SocketOption, SocketGroup} class RedisConnectionSpec extends CatsEffectSuite { + test("Queued Connection Does Not Hang on EOF"){ val fakeSocket = new fs2.io.net.Socket[IO]{ def read(maxBytes: Int): IO[Option[Chunk[Byte]]] = IO(None) From 73d2f91f07585b96f045effc7e476a7c33c6f580 Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Tue, 4 Apr 2023 11:34:39 -0700 Subject: [PATCH 05/11] Update build.sbt Co-authored-by: Arman Bilge --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index dfc4524..fac543c 100644 --- a/build.sbt +++ b/build.sbt @@ -64,7 +64,7 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) ) ).platformsSettings(NativePlatform)( libraryDependencies ++= Seq( - "com.armanbilge" %%% "epollcat" % "0.1.4" + "com.armanbilge" %%% "epollcat" % "0.1.4" % Test ) ) From 8ebd7085523f89113ebc869377539f1adc1c5bcb Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Tue, 4 Apr 2023 11:41:21 -0700 Subject: [PATCH 06/11] RediculousCrossSuite for runtime switch --- .../chrisdavenport/rediculous/RediculousCrossSuite.scala | 3 +++ .../chrisdavenport/rediculous/RediculousCrosssuite.scala | 7 +++++++ .../io/chrisdavenport/rediculous/RedisConnectionSpec.scala | 3 +-- 3 files changed, 11 insertions(+), 2 deletions(-) create mode 100644 core/js-jvm/src/test/scala/io/chrisdavenport/rediculous/RediculousCrossSuite.scala create mode 100644 core/native/src/test/scala/io/chrisdavenport/rediculous/RediculousCrosssuite.scala diff --git a/core/js-jvm/src/test/scala/io/chrisdavenport/rediculous/RediculousCrossSuite.scala b/core/js-jvm/src/test/scala/io/chrisdavenport/rediculous/RediculousCrossSuite.scala new file mode 100644 index 0000000..1055a49 --- /dev/null +++ b/core/js-jvm/src/test/scala/io/chrisdavenport/rediculous/RediculousCrossSuite.scala @@ -0,0 +1,3 @@ +package io.chrisdavenport.rediculous + +trait RediculousCrossSuite extends munit.CatsEffectSuite \ No newline at end of file diff --git a/core/native/src/test/scala/io/chrisdavenport/rediculous/RediculousCrosssuite.scala b/core/native/src/test/scala/io/chrisdavenport/rediculous/RediculousCrosssuite.scala new file mode 100644 index 0000000..9511e6c --- /dev/null +++ b/core/native/src/test/scala/io/chrisdavenport/rediculous/RediculousCrosssuite.scala @@ -0,0 +1,7 @@ +package io.chrisdavenport.rediculous + +import epollcat.unsafe.EpollRuntime + +trait RediculousCrossSuite extends munit.CatsEffectSuite { + override def munitIORuntime = EpollRuntime.global +} \ No newline at end of file diff --git a/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala b/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala index e8f465f..6606863 100644 --- a/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala +++ b/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala @@ -1,12 +1,11 @@ package io.chrisdavenport.rediculous -import munit.CatsEffectSuite import cats.effect._ import fs2.{Chunk, Pipe} import com.comcast.ip4s.{Host, Port,IpAddress, SocketAddress} import fs2.io.net.{Socket, SocketOption, SocketGroup} -class RedisConnectionSpec extends CatsEffectSuite { +class RedisConnectionSpec extends RediculousCrossSuite { test("Queued Connection Does Not Hang on EOF"){ val fakeSocket = new fs2.io.net.Socket[IO]{ From 8e22191d21f44a25d9b234974761a4bf6ec04499 Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Tue, 4 Apr 2023 11:50:16 -0700 Subject: [PATCH 07/11] Setup Brew to Link Native --- build.sbt | 8 ++++++-- project/plugins.sbt | 2 ++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/build.sbt b/build.sbt index fac543c..e5f6a51 100644 --- a/build.sbt +++ b/build.sbt @@ -62,10 +62,14 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) libraryDependencies ++= Seq( "io.chrisdavenport" %%% "whale-tail-manager" % "0.0.8" % Test, ) - ).platformsSettings(NativePlatform)( + ) + .nativeEnablePlugins(ScalaNativeBrewedConfigPlugin) + .platformsSettings(NativePlatform)( libraryDependencies ++= Seq( "com.armanbilge" %%% "epollcat" % "0.1.4" % Test - ) + ), + Test / nativeBrewFormulas ++= Set("s2n", "utf8proc"), + Test / envVars ++= Map("S2N_DONT_MLOCK" -> "1") ) lazy val examples = crossProject(JVMPlatform, JSPlatform) diff --git a/project/plugins.sbt b/project/plugins.sbt index 7fd15bd..8b7ee9c 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -5,3 +5,5 @@ addSbtPlugin("org.scala-js" % "sbt-scalajs" % "1.13.0") addSbtPlugin("org.portable-scala" % "sbt-scalajs-crossproject" % "1.2.0") addSbtPlugin("org.scala-native" % "sbt-scala-native" % "0.4.12") addSbtPlugin("org.portable-scala" % "sbt-scala-native-crossproject" % "1.2.0") + +addSbtPlugin("com.armanbilge" % "sbt-scala-native-config-brew" % "0.1.2") From 1da622fd208ce1065c820791538799e5aa23998e Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Tue, 4 Apr 2023 12:05:27 -0700 Subject: [PATCH 08/11] Update project/plugins.sbt Co-authored-by: Arman Bilge --- project/plugins.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/plugins.sbt b/project/plugins.sbt index 8b7ee9c..4920265 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -6,4 +6,4 @@ addSbtPlugin("org.portable-scala" % "sbt-scalajs-crossproject" % "1.2.0") addSbtPlugin("org.scala-native" % "sbt-scala-native" % "0.4.12") addSbtPlugin("org.portable-scala" % "sbt-scala-native-crossproject" % "1.2.0") -addSbtPlugin("com.armanbilge" % "sbt-scala-native-config-brew" % "0.1.2") +addSbtPlugin("com.armanbilge" % "sbt-scala-native-config-brew-github-actions" % "0.1.2") From 5a6f7ac67d3fd0fcb23be2d8f00242be79468697 Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Tue, 4 Apr 2023 12:31:11 -0700 Subject: [PATCH 09/11] Add workflow steps --- .github/workflows/ci.yml | 4 ++++ build.sbt | 2 ++ 2 files changed, 6 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 86bbef7..9fd4e66 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -66,6 +66,10 @@ jobs: ~/Library/Caches/Coursier/v1 key: ${{ runner.os }}-sbt-cache-v2-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }} + - name: Install brew formulae (ubuntu) + if: startsWith(matrix.os, 'ubuntu') + run: /home/linuxbrew/.linuxbrew/bin/brew install s2n utf8proc + - name: Check that workflows are up to date run: sbt githubWorkflowCheck diff --git a/build.sbt b/build.sbt index e5f6a51..ff4839a 100644 --- a/build.sbt +++ b/build.sbt @@ -13,6 +13,8 @@ ThisBuild / tlCiReleaseBranches := Seq("main") // true by default, set to false to publish to s01.oss.sonatype.org ThisBuild / tlSonatypeUseLegacyHost := true +ThisBuild / githubWorkflowBuildPreamble ++= nativeBrewInstallWorkflowSteps.value + val catsV = "2.9.0" val catsEffectV = "3.4.8" From e92c19e2003afd6e41c818a0d9fd98fb0f71fa4e Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Tue, 4 Apr 2023 13:24:51 -0700 Subject: [PATCH 10/11] Same fix for Clustered --- .../rediculous/RedisConnection.scala | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala index daa1f23..f56b431 100644 --- a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala +++ b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala @@ -103,7 +103,7 @@ object RedisConnection{ chunk.head.liftTo[F](RedisError.Generic("Rediculous: Impossible Return List was Empty but we guarantee output matches input")) // Can Be used to implement any low level protocols. - def runRequest[F[_]: Concurrent, A: RedisResult](connection: RedisConnection[F])(input: NonEmptyList[ByteVector], key: Option[ByteVector]): F[Either[Resp, A]] = + def runRequest[F[_]: Concurrent, A: RedisResult](connection: RedisConnection[F])(input: NonEmptyList[ByteVector], key: Option[ByteVector]): F[Either[Resp, A]] = runRequestInternal(connection)(Chunk.singleton(input), key).flatMap(head[F]).map(resp => RedisResult[A].decode(resp)) def runRequestTotal[F[_]: Concurrent, A: RedisResult](input: NonEmptyList[ByteVector], key: Option[ByteVector]): Redis[F, A] = Redis(Kleisli{(connection: RedisConnection[F]) => @@ -585,22 +585,22 @@ object RedisConnection{ server.orElse(s.flatMap(key => topo.served(HashSlot.find(key)))).getOrElse(default) // Explicitly Set Server, Key Hashslot Server, or a default server if none selected. }.toSeq ).evalMap{ - case (server, rest) => + case (server, rest) => keypool.take(server).attempt.use{ case Right(m) => val out = Chunk.seq(rest.map(_._5)) - explicitPipelineRequest(m.value, out).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize + explicitPipelineRequest(m.value, out).map(c => (c, rest)).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize case Left(_) => m.canBeReused.set(Reusable.DontReuse) case _ => Applicative[F].unit } - case l@Left(_) => l.rightCast[Chunk[Resp]].pure[F] + case l@Left(_) => l.rightCast[(Chunk[Resp], List[((Either[Throwable, Resp]) => F[Unit], Option[ByteVector], Option[(Host,Port)], Int, Resp)])].pure[F] }.flatMap{ - case Right(n) => - n.zipWithIndex.traverse_{ - case (ref, i) => - val (toSet, key, _, retries, initialCommand) = rest(i) + case Right((n, thisChunk)) => + thisChunk.zipWithIndex.traverse_{ + case ((toSet, key, _, retries, initialCommand), i) => + val ref = Either.catchNonFatal(n(i)) ref match { - case e@Resp.Error(s) if (s.startsWith("MOVED") && retries <= 5) => // MOVED 1234-2020 127.0.0.1:6381 + case Right(e@Resp.Error(s)) if (s.startsWith("MOVED") && retries <= 5) => // MOVED 1234-2020 127.0.0.1:6381 refreshTopology.attempt.void >> // Offer To Have it reprocessed. // If the queue is full return the error to the user @@ -609,7 +609,7 @@ object RedisConnection{ Applicative[F].unit, toSet(Either.right(e)).void ) - case e@Resp.Error(s) if (s.startsWith("ASK") && retries <= 5) => // ASK 1234-2020 127.0.0.1:6381 + case Right(e@Resp.Error(s)) if (s.startsWith("ASK") && retries <= 5) => // ASK 1234-2020 127.0.0.1:6381 val serverRedirect = extractServer(s) serverRedirect match { case s@Some(_) => // This is a Special One Off, Requires a Redirect @@ -617,7 +617,7 @@ object RedisConnection{ val asking = ({(_: Either[Throwable, Resp]) => Applicative[F].unit}, key, s, 6, Resp.renderRequest(NonEmptyList.of(ByteVector.encodeAscii("ASKING").fold(throw _, identity(_))))) // Never Repeat Asking val repeat = (toSet, key, s, retries + 1, initialCommand) val chunk = Chunk(asking, repeat) - cluster.queue.tryOffer(chunk) // Offer To Have it reprocessed. + cluster.queue.tryOffer(chunk) // Offer To Have it reprocessed. //If the queue is full return the error to the user .ifM( Applicative[F].unit, @@ -627,8 +627,9 @@ object RedisConnection{ case None => toSet(Either.right(e)) } - case otherwise => + case Right(otherwise) => toSet(Either.right(otherwise)) + case Left(_) => toSet(Either.left(RedisError.Generic("Rediculous: Clustered Command did not get response, this likely indicates an EOF during a read"))) } } case e@Left(_) => From 9767cb3a94a95a4ed0e21565593f6aac912762c3 Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Tue, 4 Apr 2023 13:32:43 -0700 Subject: [PATCH 11/11] Use rest which was still in lexical scope --- .../io/chrisdavenport/rediculous/RedisConnection.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala index f56b431..0de57eb 100644 --- a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala +++ b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala @@ -589,14 +589,14 @@ object RedisConnection{ keypool.take(server).attempt.use{ case Right(m) => val out = Chunk.seq(rest.map(_._5)) - explicitPipelineRequest(m.value, out).map(c => (c, rest)).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize + explicitPipelineRequest(m.value, out).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize case Left(_) => m.canBeReused.set(Reusable.DontReuse) case _ => Applicative[F].unit } - case l@Left(_) => l.rightCast[(Chunk[Resp], List[((Either[Throwable, Resp]) => F[Unit], Option[ByteVector], Option[(Host,Port)], Int, Resp)])].pure[F] + case l@Left(_) => l.rightCast[Chunk[Resp]].pure[F] }.flatMap{ - case Right((n, thisChunk)) => - thisChunk.zipWithIndex.traverse_{ + case Right(n) => + rest.zipWithIndex.traverse_{ case ((toSet, key, _, retries, initialCommand), i) => val ref = Either.catchNonFatal(n(i)) ref match {