From dbce7335040bbc01791d6ff2fe1fb9bd2d8cb864 Mon Sep 17 00:00:00 2001 From: Swoorup Joshi Date: Tue, 4 Jan 2022 20:55:16 -0800 Subject: [PATCH 1/5] Add Base Redis Streams Implementation --- .../rediculous/RedisCommands.scala | 111 +++++++++++++++++- .../rediculous/RedisStream.scala | 61 ++++++++++ .../rediculous/RedisCommandsSpec.scala | 26 +++- .../rediculous/RedisStreamSpec.scala | 63 ++++++++++ 4 files changed, 255 insertions(+), 6 deletions(-) create mode 100644 core/src/main/scala/io/chrisdavenport/rediculous/RedisStream.scala create mode 100644 core/src/test/scala/io/chrisdavenport/rediculous/RedisStreamSpec.scala diff --git a/core/src/main/scala/io/chrisdavenport/rediculous/RedisCommands.scala b/core/src/main/scala/io/chrisdavenport/rediculous/RedisCommands.scala index 43a6c06..f47f221 100644 --- a/core/src/main/scala/io/chrisdavenport/rediculous/RedisCommands.scala +++ b/core/src/main/scala/io/chrisdavenport/rediculous/RedisCommands.scala @@ -197,8 +197,115 @@ object RedisCommands { // TODO Scan // TODO LEX - // TODO xadd - // TODO xread + + sealed trait Trimming + object Trimming { + case object Approximate extends Trimming + case object Exact extends Trimming + implicit val arg: RedisArg[Trimming] = RedisArg[String].contramap[Trimming]{ + case Approximate => "~" + case Exact => "=" + } + } + + final case class XAddOpts( + id: Option[String], + maxLength: Option[Long], + trimming: Option[Trimming], + noMkStream: Boolean, + minId: Option[String], + limit: Option[Long] + ) + object XAddOpts { + val default = XAddOpts(None, None, None, false, None, None) + } + + def xadd[F[_]: RedisCtx](stream: String, map: List[(String, String)], xaddOpts: XAddOpts = XAddOpts.default): F[String] = { + val maxLen = xaddOpts.maxLength.toList.flatMap{ l => List("MAXLEN".some, xaddOpts.trimming.map(_.encode), l.encode.some).flattenOption } + val minId = xaddOpts.minId.toList.flatMap{ l => List("MINID".some, xaddOpts.trimming.map(_.encode), l.encode.some).flattenOption } + val limit = xaddOpts.limit.toList.flatMap(l=> if (xaddOpts.trimming.contains(Trimming.Approximate)) List("LIMIT", l.encode) else List.empty) + val noMkStream = Alternative[List].guard(xaddOpts.noMkStream).as("NOMKSTREAM") + val id = List(xaddOpts.id.getOrElse("*")) + val body = map.foldLeft(List.empty[String]){ case (s, (k,v)) => s ::: List(k.encode, v.encode) } + + RedisCtx[F].unkeyed(NEL("XADD", stream :: maxLen ::: minId ::: limit ::: noMkStream ::: id ::: body)) + } + + final case class XReadOpts( + blockMillisecond: Option[Long], + count: Option[Long], + noAck: Boolean + ) + object XReadOpts { + val default = XReadOpts(None, None, false) + } + + sealed trait StreamOffset { + def stream: String + def offset: String + } + + object StreamOffset { + case class All(stream: String) extends StreamOffset { + override def offset: String = "0" + } + case class Latest(stream: String) extends StreamOffset { + override def offset: String = "$" + } + case class From(stream: String, offset: String) extends StreamOffset + } + + case class StreamsRecord( + recordId: String, + keyValues: List[(String, String)] + ) + + object StreamsRecord { + implicit val result : RedisResult[StreamsRecord] = new RedisResult[StreamsRecord] { + def decode(resp: Resp): Either[Resp,StreamsRecord] = { + def two[A](l: List[A], acc: List[(A, A)] = List.empty): List[(A, A)] = l match { + case first :: second :: rest => two(rest, (first, second):: acc) + case otherwise => acc.reverse + } + resp match { + case Resp.Array(Some(Resp.BulkString(Some(recordId)) :: Resp.Array(Some(rawKeyValues)) :: Nil)) => + for { + keyValuesList <- rawKeyValues.traverse(RedisResult[String].decode).map(two(_)) + } yield StreamsRecord(recordId, keyValuesList) + case otherwise => Left(otherwise) + } + } + } + } + + case class XReadResponse( + stream: String, + records: List[StreamsRecord] + ) + object XReadResponse{ + implicit val result: RedisResult[XReadResponse] = new RedisResult[XReadResponse] { + def decode(resp: Resp): Either[Resp,XReadResponse] = { + resp match { + case Resp.Array(Some(Resp.BulkString(Some(stream)) :: Resp.Array(Some(list)) :: Nil)) => + list.traverse(RedisResult[StreamsRecord].decode).map(l => + XReadResponse(stream, l) + ) + case otherwise => Left(otherwise) + } + } + } + } + + def xread[F[_]: RedisCtx](streams: Set[StreamOffset], xreadOpts: XReadOpts = XReadOpts.default): F[Option[List[XReadResponse]]] = {//F[Option[List[List[(String, List[List[(String, List[(String, String)])]])]]]] = { + val block = xreadOpts.blockMillisecond.toList.flatMap(l => List("BLOCK", l.encode)) + val count = xreadOpts.count.toList.flatMap(l => List("COUNT", l.encode)) + val noAck = Alternative[List].guard(xreadOpts.noAck).as("NOACK") + val streamKeys = streams.map(_.stream.encode).toList + val streamOffsets = streams.map(_.offset.encode).toList + val streamPairs = "STREAMS" :: streamKeys ::: streamOffsets + + RedisCtx[F].unkeyed(NEL("XREAD", block ::: count ::: noAck ::: streamPairs)) + } def xgroupcreate[F[_]: RedisCtx](stream: String, groupName: String, startId: String): F[Status] = RedisCtx[F].unkeyed(NEL.of("XGROUP", "CREATE", stream, groupName, startId)) diff --git a/core/src/main/scala/io/chrisdavenport/rediculous/RedisStream.scala b/core/src/main/scala/io/chrisdavenport/rediculous/RedisStream.scala new file mode 100644 index 0000000..edc6be5 --- /dev/null +++ b/core/src/main/scala/io/chrisdavenport/rediculous/RedisStream.scala @@ -0,0 +1,61 @@ +package io.chrisdavenport.rediculous + +import cats.implicits._ +import fs2.{Stream, Pipe} +import scala.concurrent.duration.Duration +import RedisCommands.{XAddOpts, XReadOpts, StreamOffset, Trimming, xadd, xread} +import cats.effect._ + + +trait RedisStream[F[_]] { + def append(messages: List[RedisStream.XAddMessage]): F[List[String]] + + def read( + keys: Set[String], + chunkSize: Int, + initialOffset: String => StreamOffset = StreamOffset.All, + block: Duration = Duration.Zero, + count: Option[Long] = None + ): Stream[F, RedisCommands.XReadResponse] +} + +object RedisStream { + + final case class XAddMessage(stream: String, body: List[(String, String)], approxMaxlen: Option[Long] = None) + + /** + * Create a RedisStream from a connection. + * + **/ + def fromConnection[F[_]: Async](connection: RedisConnection[F]): RedisStream[F] = new RedisStream[F] { + def append(messages: List[XAddMessage]): F[List[String]] = { + messages + .traverse{ case msg => + val opts = msg.approxMaxlen.map(l => XAddOpts.default.copy(maxLength = l.some, trimming = Trimming.Approximate.some)) + xadd[RedisPipeline](msg.stream, msg.body, opts getOrElse XAddOpts.default) + } + .pipeline[F] + .run(connection) + } + + private val nextOffset: String => RedisCommands.StreamsRecord => StreamOffset = + key => msg => StreamOffset.From(key, msg.recordId) + + private val offsetsByKey: List[RedisCommands.StreamsRecord] => Map[String, Option[StreamOffset]] = + list => list.groupBy(_.recordId).map { case (k, values) => k -> values.lastOption.map(nextOffset(k)) } + + def read(keys: Set[String], chunkSize: Int, initialOffset: String => StreamOffset, block: Duration, count: Option[Long]): Stream[F, RedisCommands.XReadResponse] = { + val initial = keys.map(k => k -> initialOffset(k)).toMap + val opts = XReadOpts.default.copy(blockMillisecond = block.toMillis.some, count = count) + Stream.eval(Ref.of[F, Map[String, StreamOffset]](initial)).flatMap { ref => + (for { + offsets <- Stream.eval(ref.get) + list <- Stream.eval(xread(offsets.values.toSet, opts).run(connection)).flattenOption + newOffsets = offsetsByKey(list.flatMap(_.records)).collect { case (key, Some(value)) => key -> value }.toList + _ <- Stream.eval(newOffsets.map { case (k, v) => ref.update(_.updated(k, v)) }.sequence) + result <- Stream.emits(list) + } yield result).repeat + } + } + } +} \ No newline at end of file diff --git a/core/src/test/scala/io/chrisdavenport/rediculous/RedisCommandsSpec.scala b/core/src/test/scala/io/chrisdavenport/rediculous/RedisCommandsSpec.scala index b0abbb8..accb75f 100644 --- a/core/src/test/scala/io/chrisdavenport/rediculous/RedisCommandsSpec.scala +++ b/core/src/test/scala/io/chrisdavenport/rediculous/RedisCommandsSpec.scala @@ -4,8 +4,8 @@ import cats.syntax.all._ import cats.effect._ import munit.CatsEffectSuite import scala.concurrent.duration._ -import io.chrisdavenport.whaletail.Docker -import io.chrisdavenport.whaletail.manager._ +import _root_.io.chrisdavenport.whaletail.Docker +import _root_.io.chrisdavenport.whaletail.manager._ import com.comcast.ip4s.Host import com.comcast.ip4s.Port @@ -40,14 +40,32 @@ class RedisCommandsSpec extends CatsEffectSuite { override def munitFixtures: Seq[Fixture[_]] = Seq( redisConnection ) - test("set a value"){ //connection => + test("set/get parity"){ //connection => redisConnection().flatMap{connection => val key = "foo" val value = "bar" - val action = RedisCommands.set[RedisIO](key, value) >> RedisCommands.get[RedisIO](key) + val action = RedisCommands.set[RedisIO](key, value) >> + RedisCommands.get[RedisIO](key) <* + RedisCommands.del[RedisIO]("foo") action.run(connection) }.map{ assertEquals(_, Some("bar")) } } + + test("xadd/xread parity"){ + redisConnection().flatMap{ connection => + val kv = "bar" -> "baz" + val action = RedisCommands.xadd[RedisIO]("foo", List(kv)) >> + RedisCommands.xread[RedisIO](Set(RedisCommands.StreamOffset.All("foo"))) <* + RedisCommands.del[RedisIO]("foo") + + val extract = (resp: Option[List[RedisCommands.XReadResponse]]) => + resp.flatMap(_.headOption).flatMap(_.records.headOption).flatMap(_.keyValues.headOption) + + action.run(connection).map{ resp => + assertEquals(extract(resp), Some(kv)) + } + } + } } diff --git a/core/src/test/scala/io/chrisdavenport/rediculous/RedisStreamSpec.scala b/core/src/test/scala/io/chrisdavenport/rediculous/RedisStreamSpec.scala new file mode 100644 index 0000000..be2bb69 --- /dev/null +++ b/core/src/test/scala/io/chrisdavenport/rediculous/RedisStreamSpec.scala @@ -0,0 +1,63 @@ +package io.chrisdavenport.rediculous + +import cats.syntax.all._ +import cats.effect._ +import munit.CatsEffectSuite +import scala.concurrent.duration._ +import _root_.io.chrisdavenport.whaletail.Docker +import _root_.io.chrisdavenport.whaletail.manager._ +import com.comcast.ip4s.Host +import com.comcast.ip4s.Port + +class RedisStreamSpec extends CatsEffectSuite { + val resource = Docker.default[IO].flatMap(client => + WhaleTailContainer.build(client, "redis", "latest".some, Map(6379 -> None), Map.empty, Map.empty) + .evalTap( + ReadinessStrategy.checkReadiness( + client, + _, + ReadinessStrategy.LogRegex(".*Ready to accept connections.*\\s".r), + 30.seconds + ) + ) + ).flatMap(container => + for { + t <- Resource.eval( + container.ports.get(6379).liftTo[IO](new Throwable("Missing Port")) + ) + (hostS, portI) = t + host <- Resource.eval(Host.fromString(hostS).liftTo[IO](new Throwable("Invalid Host"))) + port <- Resource.eval(Port.fromInt(portI).liftTo[IO](new Throwable("Invalid Port"))) + connection <- RedisConnection.pool[IO].withHost(host).withPort(port).build + } yield connection + + ) + // Not available on scala.js + val redisConnection = UnsafeResourceSuiteLocalDeferredFixture( + "redisconnection", + resource + ) + override def munitFixtures: Seq[Fixture[_]] = Seq( + redisConnection + ) + test("send a single message"){ //connection => + val messages = List( + RedisStream.XAddMessage("foo", List("bar" -> "baz", "zoom" -> "zad")) + ) + redisConnection().flatMap{connection => + + val rStream = RedisStream.fromConnection(connection) + rStream.append(messages) >> + rStream.read(Set("foo"), 512).take(1).compile.lastOrError + + }.map{ xrr => + val i = xrr.stream + assertEquals(xrr.stream, "foo") + val i2 = xrr.records.flatMap(sr => sr.keyValues) + assertEquals(i2, messages.flatMap(_.body)) + } + } + +} + + From 46fe0a0a9723fe871d8b8da81343d4ee39e64256 Mon Sep 17 00:00:00 2001 From: Swoorup Joshi Date: Tue, 4 Jan 2022 21:01:24 -0800 Subject: [PATCH 2/5] Add RedisStream Example --- build.sbt | 2 +- examples/src/main/scala/StreamsExample.scala | 91 ++++++++++++++++++++ 2 files changed, 92 insertions(+), 1 deletion(-) create mode 100644 examples/src/main/scala/StreamsExample.scala diff --git a/build.sbt b/build.sbt index 37873f6..db9ccb5 100644 --- a/build.sbt +++ b/build.sbt @@ -3,7 +3,7 @@ import sbtcrossproject.CrossPlugin.autoImport.{crossProject, CrossType} val catsV = "2.6.1" val catsEffectV = "3.2.0" // val fs2V = "3.0.6" -val fs2V = "3.1.0" +val fs2V = "3.2.4" val munitCatsEffectV = "1.0.7" diff --git a/examples/src/main/scala/StreamsExample.scala b/examples/src/main/scala/StreamsExample.scala new file mode 100644 index 0000000..68cbe8f --- /dev/null +++ b/examples/src/main/scala/StreamsExample.scala @@ -0,0 +1,91 @@ +import io.chrisdavenport.rediculous._ +import java.util.concurrent.TimeoutException +import scala.collection.immutable.Queue +import scala.concurrent.duration._ +import scala.util.Random +import cats.effect._ +import cats.implicits._ +import fs2._ +import fs2.timeseries.{TimeStamped, TimeSeries} +import fs2.io.net._ +import com.comcast.ip4s._ + +object StreamRate { + def rate[A] = + TimeStamped.withPerSecondRate[Option[Chunk[A]], Float](_.map(chunk => chunk.size.toFloat).getOrElse(0.0f)) + + def averageRate[A] = + rate[A].andThen(Scan.stateful1(Queue.empty[Float]) { + case (q, tsv @ TimeStamped(_, Right(_))) => (q, tsv) + case (q, TimeStamped(t, Left(sample))) => + val q2 = (sample +: q).take(10) + val average = q2.sum / q2.size + (q, TimeStamped(t, Left(average))) + }) + + implicit class Logger[F[_]: Temporal, A](input: Stream[F, A]) { + def logAverageRate(logger: Float => F[Unit]): Stream[F, A] = + TimeSeries.timePulled(input.chunks, 1.second, 1.second) + .through(averageRate.toPipe) + .flatMap { + case TimeStamped(_, Left(rate)) => Stream.exec(logger(rate)) + case TimeStamped(_, Right(Some(chunk))) => Stream.chunk(chunk) + case TimeStamped(_, Right(None)) => Stream.empty + } + } +} + +object StreamProducerExample extends IOApp { + import StreamRate._ + + def putStrLn[A](a: A): IO[Unit] = IO(println(a)) + + def randomMessage: IO[List[(String, String)]] = { + val rndKey = IO(Random.nextInt(1000).toString) + val rndValue = IO(Random.nextString(10)) + (rndKey, rndValue).parMapN{ case (k, v) => List(k -> v) } + } + + def run(args: List[String]): IO[ExitCode] = { + val mystream = "mystream" + + RedisConnection.pool[IO].withHost(host"localhost").withPort(port"6379").build + .map(RedisStream.fromConnection[IO]) + .use { rs => + val consumer = rs + .read(Set(mystream), 10000) + .evalMap(putStrLn) + .onError(err => Stream.exec(IO.println(s"Consumer err: $err"))) + .logAverageRate(rate => IO.println(s"Consumer rate: $rate/s")) + + val producer = + Stream + .repeatEval(randomMessage) + .map(RedisStream.XAddMessage(mystream, _)) + .chunkMin(10000) + .flatMap{ chunk => + Stream.evalSeq(rs.append(chunk.toList)) + } + .onError(err => Stream.exec(IO.println(s"Producer err: $err"))) + .logAverageRate(rate => IO.println(s"Producer rate: $rate/s")) + + val stream = + // Stream.exec( RedisCommands.del[RedisPipeline]("mystream").pipeline[IO].run(client).void) ++ + Stream.exec(IO.println("Started")) ++ + consumer + .concurrently(producer) + .interruptAfter(7.second) + + // Stream.eval( RedisCommands.xlen[RedisPipeline]("mystream").pipeline[IO].run(client).flatMap(length => IO.println(s"Finished: $length"))) + + stream.compile.count.flatTap(l => putStrLn(s"Length: $l")) + } + .redeem( + { t => + IO.println(s"Error: $t, Something went wrong") + ExitCode(1) + }, + _ => ExitCode.Success + ) + } +} \ No newline at end of file From aced191a04ff833fc3f8bddcf4bcd51dee374332 Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Tue, 4 Jan 2022 21:08:14 -0800 Subject: [PATCH 3/5] Cross Scala Versions --- .../scala/io/chrisdavenport/rediculous/RedisCommands.scala | 6 +++--- .../scala/io/chrisdavenport/rediculous/RedisStream.scala | 2 +- examples/src/main/scala/StreamsExample.scala | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/io/chrisdavenport/rediculous/RedisCommands.scala b/core/src/main/scala/io/chrisdavenport/rediculous/RedisCommands.scala index f47f221..6415ab1 100644 --- a/core/src/main/scala/io/chrisdavenport/rediculous/RedisCommands.scala +++ b/core/src/main/scala/io/chrisdavenport/rediculous/RedisCommands.scala @@ -198,7 +198,7 @@ object RedisCommands { // TODO Scan // TODO LEX - sealed trait Trimming + sealed trait Trimming object Trimming { case object Approximate extends Trimming case object Exact extends Trimming @@ -255,7 +255,7 @@ object RedisCommands { case class From(stream: String, offset: String) extends StreamOffset } - case class StreamsRecord( + final case class StreamsRecord( recordId: String, keyValues: List[(String, String)] ) @@ -278,7 +278,7 @@ object RedisCommands { } } - case class XReadResponse( + final case class XReadResponse( stream: String, records: List[StreamsRecord] ) diff --git a/core/src/main/scala/io/chrisdavenport/rediculous/RedisStream.scala b/core/src/main/scala/io/chrisdavenport/rediculous/RedisStream.scala index edc6be5..377783e 100644 --- a/core/src/main/scala/io/chrisdavenport/rediculous/RedisStream.scala +++ b/core/src/main/scala/io/chrisdavenport/rediculous/RedisStream.scala @@ -13,7 +13,7 @@ trait RedisStream[F[_]] { def read( keys: Set[String], chunkSize: Int, - initialOffset: String => StreamOffset = StreamOffset.All, + initialOffset: String => StreamOffset = {(s: String) => StreamOffset.All(s)}, block: Duration = Duration.Zero, count: Option[Long] = None ): Stream[F, RedisCommands.XReadResponse] diff --git a/examples/src/main/scala/StreamsExample.scala b/examples/src/main/scala/StreamsExample.scala index 68cbe8f..869b825 100644 --- a/examples/src/main/scala/StreamsExample.scala +++ b/examples/src/main/scala/StreamsExample.scala @@ -55,7 +55,7 @@ object StreamProducerExample extends IOApp { val consumer = rs .read(Set(mystream), 10000) .evalMap(putStrLn) - .onError(err => Stream.exec(IO.println(s"Consumer err: $err"))) + .onError{ case err => Stream.exec(IO.println(s"Consumer err: $err"))} .logAverageRate(rate => IO.println(s"Consumer rate: $rate/s")) val producer = @@ -66,7 +66,7 @@ object StreamProducerExample extends IOApp { .flatMap{ chunk => Stream.evalSeq(rs.append(chunk.toList)) } - .onError(err => Stream.exec(IO.println(s"Producer err: $err"))) + .onError{ case err => Stream.exec(IO.println(s"Producer err: $err"))} .logAverageRate(rate => IO.println(s"Producer rate: $rate/s")) val stream = From d7425eed52beda9adbc96bddb43526abc802b70b Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Tue, 4 Jan 2022 21:14:10 -0800 Subject: [PATCH 4/5] Correct Cross-Version --- build.sbt | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/build.sbt b/build.sbt index db9ccb5..410bf83 100644 --- a/build.sbt +++ b/build.sbt @@ -1,9 +1,8 @@ import sbtcrossproject.CrossPlugin.autoImport.{crossProject, CrossType} -val catsV = "2.6.1" -val catsEffectV = "3.2.0" -// val fs2V = "3.0.6" -val fs2V = "3.2.4" +val catsV = "2.7.0" +val catsEffectV = "3.3.3" +val fs2V = "3.2.3" val munitCatsEffectV = "1.0.7" From 8de48fea63ea577e7fb83c48ad66aeaed0237805 Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Tue, 4 Jan 2022 21:31:49 -0800 Subject: [PATCH 5/5] Change Param Names --- .../scala/io/chrisdavenport/rediculous/RedisCommands.scala | 6 +++--- .../scala/io/chrisdavenport/rediculous/RedisStream.scala | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/io/chrisdavenport/rediculous/RedisCommands.scala b/core/src/main/scala/io/chrisdavenport/rediculous/RedisCommands.scala index 6415ab1..f0eedc8 100644 --- a/core/src/main/scala/io/chrisdavenport/rediculous/RedisCommands.scala +++ b/core/src/main/scala/io/chrisdavenport/rediculous/RedisCommands.scala @@ -220,15 +220,15 @@ object RedisCommands { val default = XAddOpts(None, None, None, false, None, None) } - def xadd[F[_]: RedisCtx](stream: String, map: List[(String, String)], xaddOpts: XAddOpts = XAddOpts.default): F[String] = { + def xadd[F[_]: RedisCtx](stream: String, body: List[(String, String)], xaddOpts: XAddOpts = XAddOpts.default): F[String] = { val maxLen = xaddOpts.maxLength.toList.flatMap{ l => List("MAXLEN".some, xaddOpts.trimming.map(_.encode), l.encode.some).flattenOption } val minId = xaddOpts.minId.toList.flatMap{ l => List("MINID".some, xaddOpts.trimming.map(_.encode), l.encode.some).flattenOption } val limit = xaddOpts.limit.toList.flatMap(l=> if (xaddOpts.trimming.contains(Trimming.Approximate)) List("LIMIT", l.encode) else List.empty) val noMkStream = Alternative[List].guard(xaddOpts.noMkStream).as("NOMKSTREAM") val id = List(xaddOpts.id.getOrElse("*")) - val body = map.foldLeft(List.empty[String]){ case (s, (k,v)) => s ::: List(k.encode, v.encode) } + val bodyEnd = body.foldLeft(List.empty[String]){ case (s, (k,v)) => s ::: List(k.encode, v.encode) } - RedisCtx[F].unkeyed(NEL("XADD", stream :: maxLen ::: minId ::: limit ::: noMkStream ::: id ::: body)) + RedisCtx[F].unkeyed(NEL("XADD", stream :: maxLen ::: minId ::: limit ::: noMkStream ::: id ::: bodyEnd)) } final case class XReadOpts( diff --git a/core/src/main/scala/io/chrisdavenport/rediculous/RedisStream.scala b/core/src/main/scala/io/chrisdavenport/rediculous/RedisStream.scala index 377783e..cb2f696 100644 --- a/core/src/main/scala/io/chrisdavenport/rediculous/RedisStream.scala +++ b/core/src/main/scala/io/chrisdavenport/rediculous/RedisStream.scala @@ -11,7 +11,7 @@ trait RedisStream[F[_]] { def append(messages: List[RedisStream.XAddMessage]): F[List[String]] def read( - keys: Set[String], + streams: Set[String], chunkSize: Int, initialOffset: String => StreamOffset = {(s: String) => StreamOffset.All(s)}, block: Duration = Duration.Zero,