diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 86bbef7..9fd4e66 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -66,6 +66,10 @@ jobs: ~/Library/Caches/Coursier/v1 key: ${{ runner.os }}-sbt-cache-v2-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }} + - name: Install brew formulae (ubuntu) + if: startsWith(matrix.os, 'ubuntu') + run: /home/linuxbrew/.linuxbrew/bin/brew install s2n utf8proc + - name: Check that workflows are up to date run: sbt githubWorkflowCheck diff --git a/build.sbt b/build.sbt index fdb8fb6..ff4839a 100644 --- a/build.sbt +++ b/build.sbt @@ -13,6 +13,8 @@ ThisBuild / tlCiReleaseBranches := Seq("main") // true by default, set to false to publish to s01.oss.sonatype.org ThisBuild / tlSonatypeUseLegacyHost := true +ThisBuild / githubWorkflowBuildPreamble ++= nativeBrewInstallWorkflowSteps.value + val catsV = "2.9.0" val catsEffectV = "3.4.8" @@ -63,6 +65,14 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) "io.chrisdavenport" %%% "whale-tail-manager" % "0.0.8" % Test, ) ) + .nativeEnablePlugins(ScalaNativeBrewedConfigPlugin) + .platformsSettings(NativePlatform)( + libraryDependencies ++= Seq( + "com.armanbilge" %%% "epollcat" % "0.1.4" % Test + ), + Test / nativeBrewFormulas ++= Set("s2n", "utf8proc"), + Test / envVars ++= Map("S2N_DONT_MLOCK" -> "1") + ) lazy val examples = crossProject(JVMPlatform, JSPlatform) .crossType(CrossType.Pure) diff --git a/core/js-jvm/src/test/scala/io/chrisdavenport/rediculous/RediculousCrossSuite.scala b/core/js-jvm/src/test/scala/io/chrisdavenport/rediculous/RediculousCrossSuite.scala new file mode 100644 index 0000000..1055a49 --- /dev/null +++ b/core/js-jvm/src/test/scala/io/chrisdavenport/rediculous/RediculousCrossSuite.scala @@ -0,0 +1,3 @@ +package io.chrisdavenport.rediculous + +trait RediculousCrossSuite extends munit.CatsEffectSuite \ No newline at end of file diff --git a/core/native/src/test/scala/io/chrisdavenport/rediculous/RediculousCrosssuite.scala b/core/native/src/test/scala/io/chrisdavenport/rediculous/RediculousCrosssuite.scala new file mode 100644 index 0000000..9511e6c --- /dev/null +++ b/core/native/src/test/scala/io/chrisdavenport/rediculous/RediculousCrosssuite.scala @@ -0,0 +1,7 @@ +package io.chrisdavenport.rediculous + +import epollcat.unsafe.EpollRuntime + +trait RediculousCrossSuite extends munit.CatsEffectSuite { + override def munitIORuntime = EpollRuntime.global +} \ No newline at end of file diff --git a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala index 4d86ffa..0de57eb 100644 --- a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala +++ b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala @@ -83,7 +83,6 @@ object RedisConnection{ out.flatMap(socket.write) >> Stream.eval(socket.read(maxBytes)) .repeat - .debug(c => c.map(_.toByteVector.decodeAsciiLenient).toString) .unNoneTerminate .unchunks .through(fs2.interop.scodec.StreamDecoder.many(Resp.CodecUtils.codec).toPipeByte) @@ -104,7 +103,7 @@ object RedisConnection{ chunk.head.liftTo[F](RedisError.Generic("Rediculous: Impossible Return List was Empty but we guarantee output matches input")) // Can Be used to implement any low level protocols. - def runRequest[F[_]: Concurrent, A: RedisResult](connection: RedisConnection[F])(input: NonEmptyList[ByteVector], key: Option[ByteVector]): F[Either[Resp, A]] = + def runRequest[F[_]: Concurrent, A: RedisResult](connection: RedisConnection[F])(input: NonEmptyList[ByteVector], key: Option[ByteVector]): F[Either[Resp, A]] = runRequestInternal(connection)(Chunk.singleton(input), key).flatMap(head[F]).map(resp => RedisResult[A].decode(resp)) def runRequestTotal[F[_]: Concurrent, A: RedisResult](input: NonEmptyList[ByteVector], key: Option[ByteVector]): Redis[F, A] = Redis(Kleisli{(connection: RedisConnection[F]) => @@ -409,10 +408,11 @@ object RedisConnection{ case l@Left(_) => l.rightCast[Chunk[Resp]].pure[F] }.flatMap{ case Right(n) => - n.zipWithIndex.traverse_{ - case (ref, i) => - val (toSet, _) = chunk(i) - toSet(Either.right(ref)) + chunk.zipWithIndex.traverse_{ + case ((toSet, _), i) => + val ref = Either.catchNonFatal(n(i)) + .leftMap(_ => RedisError.Generic("Rediculous: Queued Command did not get response, this likely indicates an EOF during a read")) // TODO should this be something more specific + toSet(ref) } case e@Left(_) => chunk.traverse_{ case (deff, _) => deff(e.asInstanceOf[Either[Throwable, Resp]])} @@ -585,7 +585,7 @@ object RedisConnection{ server.orElse(s.flatMap(key => topo.served(HashSlot.find(key)))).getOrElse(default) // Explicitly Set Server, Key Hashslot Server, or a default server if none selected. }.toSeq ).evalMap{ - case (server, rest) => + case (server, rest) => keypool.take(server).attempt.use{ case Right(m) => val out = Chunk.seq(rest.map(_._5)) @@ -595,12 +595,12 @@ object RedisConnection{ } case l@Left(_) => l.rightCast[Chunk[Resp]].pure[F] }.flatMap{ - case Right(n) => - n.zipWithIndex.traverse_{ - case (ref, i) => - val (toSet, key, _, retries, initialCommand) = rest(i) + case Right(n) => + rest.zipWithIndex.traverse_{ + case ((toSet, key, _, retries, initialCommand), i) => + val ref = Either.catchNonFatal(n(i)) ref match { - case e@Resp.Error(s) if (s.startsWith("MOVED") && retries <= 5) => // MOVED 1234-2020 127.0.0.1:6381 + case Right(e@Resp.Error(s)) if (s.startsWith("MOVED") && retries <= 5) => // MOVED 1234-2020 127.0.0.1:6381 refreshTopology.attempt.void >> // Offer To Have it reprocessed. // If the queue is full return the error to the user @@ -609,7 +609,7 @@ object RedisConnection{ Applicative[F].unit, toSet(Either.right(e)).void ) - case e@Resp.Error(s) if (s.startsWith("ASK") && retries <= 5) => // ASK 1234-2020 127.0.0.1:6381 + case Right(e@Resp.Error(s)) if (s.startsWith("ASK") && retries <= 5) => // ASK 1234-2020 127.0.0.1:6381 val serverRedirect = extractServer(s) serverRedirect match { case s@Some(_) => // This is a Special One Off, Requires a Redirect @@ -617,7 +617,7 @@ object RedisConnection{ val asking = ({(_: Either[Throwable, Resp]) => Applicative[F].unit}, key, s, 6, Resp.renderRequest(NonEmptyList.of(ByteVector.encodeAscii("ASKING").fold(throw _, identity(_))))) // Never Repeat Asking val repeat = (toSet, key, s, retries + 1, initialCommand) val chunk = Chunk(asking, repeat) - cluster.queue.tryOffer(chunk) // Offer To Have it reprocessed. + cluster.queue.tryOffer(chunk) // Offer To Have it reprocessed. //If the queue is full return the error to the user .ifM( Applicative[F].unit, @@ -627,8 +627,9 @@ object RedisConnection{ case None => toSet(Either.right(e)) } - case otherwise => + case Right(otherwise) => toSet(Either.right(otherwise)) + case Left(_) => toSet(Either.left(RedisError.Generic("Rediculous: Clustered Command did not get response, this likely indicates an EOF during a read"))) } } case e@Left(_) => diff --git a/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala b/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala new file mode 100644 index 0000000..6606863 --- /dev/null +++ b/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala @@ -0,0 +1,48 @@ +package io.chrisdavenport.rediculous + +import cats.effect._ +import fs2.{Chunk, Pipe} +import com.comcast.ip4s.{Host, Port,IpAddress, SocketAddress} +import fs2.io.net.{Socket, SocketOption, SocketGroup} + +class RedisConnectionSpec extends RediculousCrossSuite { + + test("Queued Connection Does Not Hang on EOF"){ + val fakeSocket = new fs2.io.net.Socket[IO]{ + def read(maxBytes: Int): IO[Option[Chunk[Byte]]] = IO(None) + + def readN(numBytes: Int): IO[Chunk[Byte]] = ??? + + def reads: fs2.Stream[IO,Byte] = ??? + + def endOfInput: IO[Unit] = ??? + + def endOfOutput: IO[Unit] = ??? + + def isOpen: IO[Boolean] = ??? + + def remoteAddress: IO[SocketAddress[IpAddress]] = ??? + + def localAddress: IO[SocketAddress[IpAddress]] = ??? + + def write(bytes: Chunk[Byte]): IO[Unit] = IO.unit + + def writes: Pipe[IO,Byte,Nothing] = _.chunks.evalMap(write).drain + + } + + val sg = new SocketGroup[IO] { + def client(to: SocketAddress[Host], options: List[SocketOption]): Resource[IO,Socket[IO]] = Resource.pure(fakeSocket) + + def server(address: Option[Host], port: Option[Port], options: List[SocketOption]): fs2.Stream[IO,Socket[IO]] = ??? + + def serverResource(address: Option[Host], port: Option[Port], options: List[SocketOption]): Resource[IO,(SocketAddress[IpAddress], fs2.Stream[IO,Socket[IO]])] = ??? + + } + + RedisConnection.queued[IO].withSocketGroup(sg).build + .use(c => + RedisCommands.ping[RedisIO].run(c) + ).intercept[RedisError.QueuedExceptionError] // We catch the redis error from the empty returned chunk, previously to #69 this would hang. + } +} \ No newline at end of file diff --git a/project/plugins.sbt b/project/plugins.sbt index 7fd15bd..4920265 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -5,3 +5,5 @@ addSbtPlugin("org.scala-js" % "sbt-scalajs" % "1.13.0") addSbtPlugin("org.portable-scala" % "sbt-scalajs-crossproject" % "1.2.0") addSbtPlugin("org.scala-native" % "sbt-scala-native" % "0.4.12") addSbtPlugin("org.portable-scala" % "sbt-scala-native-crossproject" % "1.2.0") + +addSbtPlugin("com.armanbilge" % "sbt-scala-native-config-brew-github-actions" % "0.1.2")