Skip to content

Commit

Permalink
Merge pull request #63 from davenverse/addAuth
Browse files Browse the repository at this point in the history
Add Auth On Connection Initialization
  • Loading branch information
ChristopherDavenport authored Mar 29, 2023
2 parents 8e19d7f + 782f72a commit 09d3d22
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import _root_.io.chrisdavenport.whaletail.Docker
import _root_.io.chrisdavenport.whaletail.manager._
import com.comcast.ip4s.Host
import com.comcast.ip4s.Port
import _root_.fs2.io.net.Network

class RedisStreamSpec extends CatsEffectSuite {
val resource = Docker.default[IO].flatMap(client =>
Expand All @@ -29,7 +30,7 @@ class RedisStreamSpec extends CatsEffectSuite {
(hostS, portI) = t
host <- Resource.eval(Host.fromString(hostS).liftTo[IO](new Throwable("Invalid Host")))
port <- Resource.eval(Port.fromInt(portI).liftTo[IO](new Throwable("Invalid Port")))
connection <- RedisConnection.queued[IO].withHost(host).withPort(port).build
connection <- RedisConnection.queued[IO](Async[IO], Network[IO]).withHost(host).withPort(port).build
} yield connection

)
Expand All @@ -45,9 +46,9 @@ class RedisStreamSpec extends CatsEffectSuite {
val messages = fs2.Chunk.singleton(
RedisStream.XAddMessage("foo", List("bar" -> "baz", "zoom" -> "zad"))
)
redisConnection().flatMap{connection =>
val rStream = RedisStream.fromConnection(connection)
redisConnection().flatMap{connection =>

val rStream = RedisStream.fromConnection[IO](connection)
rStream.append(messages) >>
rStream.read(Set("foo")).take(1).compile.lastOrError

Expand All @@ -64,9 +65,9 @@ class RedisStreamSpec extends CatsEffectSuite {
RedisStream.XAddMessage("fee", List("2" -> "2")),
RedisStream.XAddMessage("fee", List("3" -> "3")),
)
redisConnection().flatMap{connection =>
redisConnection().flatMap{connection =>

val rStream = RedisStream.fromConnection(connection)
val rStream = RedisStream.fromConnection[IO](connection)
rStream.append(messages) >>
rStream
.read(Set("fee"), (_ => RedisCommands.StreamOffset.From("fee", "0-0")), Duration.Zero, 1L.some)
Expand All @@ -91,7 +92,7 @@ class RedisStreamSpec extends CatsEffectSuite {
)
redisConnection().flatMap{connection =>

val rStream = RedisStream.fromConnection(connection)
val rStream = RedisStream.fromConnection[IO](connection)
rStream.append(messages) >>
rStream
.read(Set("baf", "baz", "bar"), stream => RedisCommands.StreamOffset.From(stream, "0"), Duration.Zero, 1L.some)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ object RedisCommands {
def zrevrange[F[_]: RedisCtx](key: String, start: Long, stop: Long): F[List[String]] =
RedisCtx[F].keyed(key, NEL.of("ZREVRANGE", key.encode, start.encode, stop.encode))

def zrevrangewithscores[F[_]: RedisCtx](key: String, start: Long, stop: Long): F[List[(String, Double)]] =
def zrevrangewithscores[F[_]: RedisCtx](key: String, start: Long, stop: Long): F[List[(String, Double)]] =
RedisCtx[F].keyed(key, NEL.of("ZREVRANGE", key.encode, start.encode, stop.encode, "WITHSCORES"))

def zrangebyscore[F[_]: RedisCtx](key: String, min: Double, max: Double): F[List[String]] =
Expand Down Expand Up @@ -456,10 +456,18 @@ object RedisCommands {
RedisCtx[F].unkeyed(NEL("XCLAIM", stream :: consumerFragment ::: minIdleTime ::: messageIds ::: argFragment))
}

def auth[F[_]: RedisCtx](username: String, password: String): F[Status] = {
RedisCtx[F].unkeyed(NEL("AUTH", username :: password :: Nil))
}

def auth[F[_]: RedisCtx](password: String): F[Status] = {
RedisCtx[F].unkeyed(NEL("AUTH", password :: Nil))
}

def xclaimsummary[F[_]: RedisCtx](stream: String, consumer: Consumer, args: XClaimArgs, messageIds: List[String]): F[List[String]] =
xclaimraw(stream, consumer, args, true, messageIds)

def xclaimdetail[F[_]: RedisCtx](stream: String, consumer: Consumer, args: XClaimArgs, messageIds: List[String]): F[List[StreamsRecord]] =
def xclaimdetail[F[_]: RedisCtx](stream: String, consumer: Consumer, args: XClaimArgs, messageIds: List[String]): F[List[StreamsRecord]] =
xclaimraw(stream, consumer, args, false, messageIds)

final case class XAutoClaimArgs(
Expand Down
Loading

0 comments on commit 09d3d22

Please sign in to comment.