Skip to content

Commit

Permalink
Merge pull request #69 from davenverse/fixHangingQueueResponses
Browse files Browse the repository at this point in the history
Fix Hanging Queue Responses
  • Loading branch information
ChristopherDavenport authored Apr 6, 2023
2 parents f6ff01d + 9767cb3 commit 1382c22
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 15 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ jobs:
~/Library/Caches/Coursier/v1
key: ${{ runner.os }}-sbt-cache-v2-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}

- name: Install brew formulae (ubuntu)
if: startsWith(matrix.os, 'ubuntu')
run: /home/linuxbrew/.linuxbrew/bin/brew install s2n utf8proc

- name: Check that workflows are up to date
run: sbt githubWorkflowCheck

Expand Down
10 changes: 10 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ ThisBuild / tlCiReleaseBranches := Seq("main")
// true by default, set to false to publish to s01.oss.sonatype.org
ThisBuild / tlSonatypeUseLegacyHost := true

ThisBuild / githubWorkflowBuildPreamble ++= nativeBrewInstallWorkflowSteps.value


val catsV = "2.9.0"
val catsEffectV = "3.4.8"
Expand Down Expand Up @@ -63,6 +65,14 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform)
"io.chrisdavenport" %%% "whale-tail-manager" % "0.0.8" % Test,
)
)
.nativeEnablePlugins(ScalaNativeBrewedConfigPlugin)
.platformsSettings(NativePlatform)(
libraryDependencies ++= Seq(
"com.armanbilge" %%% "epollcat" % "0.1.4" % Test
),
Test / nativeBrewFormulas ++= Set("s2n", "utf8proc"),
Test / envVars ++= Map("S2N_DONT_MLOCK" -> "1")
)

lazy val examples = crossProject(JVMPlatform, JSPlatform)
.crossType(CrossType.Pure)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package io.chrisdavenport.rediculous

trait RediculousCrossSuite extends munit.CatsEffectSuite
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.chrisdavenport.rediculous

import epollcat.unsafe.EpollRuntime

trait RediculousCrossSuite extends munit.CatsEffectSuite {
override def munitIORuntime = EpollRuntime.global
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ object RedisConnection{
out.flatMap(socket.write) >>
Stream.eval(socket.read(maxBytes))
.repeat
.debug(c => c.map(_.toByteVector.decodeAsciiLenient).toString)
.unNoneTerminate
.unchunks
.through(fs2.interop.scodec.StreamDecoder.many(Resp.CodecUtils.codec).toPipeByte)
Expand All @@ -104,7 +103,7 @@ object RedisConnection{
chunk.head.liftTo[F](RedisError.Generic("Rediculous: Impossible Return List was Empty but we guarantee output matches input"))

// Can Be used to implement any low level protocols.
def runRequest[F[_]: Concurrent, A: RedisResult](connection: RedisConnection[F])(input: NonEmptyList[ByteVector], key: Option[ByteVector]): F[Either[Resp, A]] =
def runRequest[F[_]: Concurrent, A: RedisResult](connection: RedisConnection[F])(input: NonEmptyList[ByteVector], key: Option[ByteVector]): F[Either[Resp, A]] =
runRequestInternal(connection)(Chunk.singleton(input), key).flatMap(head[F]).map(resp => RedisResult[A].decode(resp))

def runRequestTotal[F[_]: Concurrent, A: RedisResult](input: NonEmptyList[ByteVector], key: Option[ByteVector]): Redis[F, A] = Redis(Kleisli{(connection: RedisConnection[F]) =>
Expand Down Expand Up @@ -409,10 +408,11 @@ object RedisConnection{
case l@Left(_) => l.rightCast[Chunk[Resp]].pure[F]
}.flatMap{
case Right(n) =>
n.zipWithIndex.traverse_{
case (ref, i) =>
val (toSet, _) = chunk(i)
toSet(Either.right(ref))
chunk.zipWithIndex.traverse_{
case ((toSet, _), i) =>
val ref = Either.catchNonFatal(n(i))
.leftMap(_ => RedisError.Generic("Rediculous: Queued Command did not get response, this likely indicates an EOF during a read")) // TODO should this be something more specific
toSet(ref)
}
case e@Left(_) =>
chunk.traverse_{ case (deff, _) => deff(e.asInstanceOf[Either[Throwable, Resp]])}
Expand Down Expand Up @@ -585,7 +585,7 @@ object RedisConnection{
server.orElse(s.flatMap(key => topo.served(HashSlot.find(key)))).getOrElse(default) // Explicitly Set Server, Key Hashslot Server, or a default server if none selected.
}.toSeq
).evalMap{
case (server, rest) =>
case (server, rest) =>
keypool.take(server).attempt.use{
case Right(m) =>
val out = Chunk.seq(rest.map(_._5))
Expand All @@ -595,12 +595,12 @@ object RedisConnection{
}
case l@Left(_) => l.rightCast[Chunk[Resp]].pure[F]
}.flatMap{
case Right(n) =>
n.zipWithIndex.traverse_{
case (ref, i) =>
val (toSet, key, _, retries, initialCommand) = rest(i)
case Right(n) =>
rest.zipWithIndex.traverse_{
case ((toSet, key, _, retries, initialCommand), i) =>
val ref = Either.catchNonFatal(n(i))
ref match {
case e@Resp.Error(s) if (s.startsWith("MOVED") && retries <= 5) => // MOVED 1234-2020 127.0.0.1:6381
case Right(e@Resp.Error(s)) if (s.startsWith("MOVED") && retries <= 5) => // MOVED 1234-2020 127.0.0.1:6381
refreshTopology.attempt.void >>
// Offer To Have it reprocessed.
// If the queue is full return the error to the user
Expand All @@ -609,15 +609,15 @@ object RedisConnection{
Applicative[F].unit,
toSet(Either.right(e)).void
)
case e@Resp.Error(s) if (s.startsWith("ASK") && retries <= 5) => // ASK 1234-2020 127.0.0.1:6381
case Right(e@Resp.Error(s)) if (s.startsWith("ASK") && retries <= 5) => // ASK 1234-2020 127.0.0.1:6381
val serverRedirect = extractServer(s)
serverRedirect match {
case s@Some(_) => // This is a Special One Off, Requires a Redirect
// Deferred[F, Either[Throwable, Resp]].flatMap{d => // No One Cares About this Callback
val asking = ({(_: Either[Throwable, Resp]) => Applicative[F].unit}, key, s, 6, Resp.renderRequest(NonEmptyList.of(ByteVector.encodeAscii("ASKING").fold(throw _, identity(_))))) // Never Repeat Asking
val repeat = (toSet, key, s, retries + 1, initialCommand)
val chunk = Chunk(asking, repeat)
cluster.queue.tryOffer(chunk) // Offer To Have it reprocessed.
cluster.queue.tryOffer(chunk) // Offer To Have it reprocessed.
//If the queue is full return the error to the user
.ifM(
Applicative[F].unit,
Expand All @@ -627,8 +627,9 @@ object RedisConnection{
case None =>
toSet(Either.right(e))
}
case otherwise =>
case Right(otherwise) =>
toSet(Either.right(otherwise))
case Left(_) => toSet(Either.left(RedisError.Generic("Rediculous: Clustered Command did not get response, this likely indicates an EOF during a read")))
}
}
case e@Left(_) =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.chrisdavenport.rediculous

import cats.effect._
import fs2.{Chunk, Pipe}
import com.comcast.ip4s.{Host, Port,IpAddress, SocketAddress}
import fs2.io.net.{Socket, SocketOption, SocketGroup}

class RedisConnectionSpec extends RediculousCrossSuite {

test("Queued Connection Does Not Hang on EOF"){
val fakeSocket = new fs2.io.net.Socket[IO]{
def read(maxBytes: Int): IO[Option[Chunk[Byte]]] = IO(None)

def readN(numBytes: Int): IO[Chunk[Byte]] = ???

def reads: fs2.Stream[IO,Byte] = ???

def endOfInput: IO[Unit] = ???

def endOfOutput: IO[Unit] = ???

def isOpen: IO[Boolean] = ???

def remoteAddress: IO[SocketAddress[IpAddress]] = ???

def localAddress: IO[SocketAddress[IpAddress]] = ???

def write(bytes: Chunk[Byte]): IO[Unit] = IO.unit

def writes: Pipe[IO,Byte,Nothing] = _.chunks.evalMap(write).drain

}

val sg = new SocketGroup[IO] {
def client(to: SocketAddress[Host], options: List[SocketOption]): Resource[IO,Socket[IO]] = Resource.pure(fakeSocket)

def server(address: Option[Host], port: Option[Port], options: List[SocketOption]): fs2.Stream[IO,Socket[IO]] = ???

def serverResource(address: Option[Host], port: Option[Port], options: List[SocketOption]): Resource[IO,(SocketAddress[IpAddress], fs2.Stream[IO,Socket[IO]])] = ???

}

RedisConnection.queued[IO].withSocketGroup(sg).build
.use(c =>
RedisCommands.ping[RedisIO].run(c)
).intercept[RedisError.QueuedExceptionError] // We catch the redis error from the empty returned chunk, previously to #69 this would hang.
}
}
2 changes: 2 additions & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ addSbtPlugin("org.scala-js" % "sbt-scalajs" % "1.13.0")
addSbtPlugin("org.portable-scala" % "sbt-scalajs-crossproject" % "1.2.0")
addSbtPlugin("org.scala-native" % "sbt-scala-native" % "0.4.12")
addSbtPlugin("org.portable-scala" % "sbt-scala-native-crossproject" % "1.2.0")

addSbtPlugin("com.armanbilge" % "sbt-scala-native-config-brew-github-actions" % "0.1.2")

0 comments on commit 1382c22

Please sign in to comment.