Skip to content

Commit

Permalink
fix match error in RedisPubSub.fromConnection by unwrapping `Timeou…
Browse files Browse the repository at this point in the history
…tConnection`
  • Loading branch information
buntec committed Nov 22, 2023
1 parent 6e89288 commit 1a4916f
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -671,10 +671,10 @@ object RedisConnection{
private def raceNThrowFirst[F[_]: Concurrent, A](nel: NonEmptyList[F[A]]): F[A] =
Stream(Stream.emits(nel.toList).evalMap(identity)).covary[F].parJoinUnbounded.take(1).compile.lastOrError

private class TimeoutConnection[F[_]: Temporal](rC: RedisConnection[F], duration: Duration) extends RedisConnection[F] {
private[rediculous] case class TimeoutConnection[F[_]: Temporal](rC: RedisConnection[F], duration: Duration) extends RedisConnection[F] {

def runRequest(inputs: Chunk[NonEmptyList[ByteVector]], key: Option[ByteVector]): F[Chunk[Resp]] =
rC.runRequest(inputs, key).timeout(duration)

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ object RedisPubSub {
* connections to all nodes.
**/
def fromConnection[F[_]: Concurrent](connection: RedisConnection[F], maxBytes: Int = 8096, clusterBroadcast: Boolean = false): Resource[F, RedisPubSub[F]] = connection match {
case RedisConnection.TimeoutConnection(conn, _) => fromConnection(conn, maxBytes, clusterBroadcast)
case RedisConnection.Queued(_, sockets) =>
sockets.flatMap{managed =>
val messagesR = Concurrent[F].ref(Map[String, RedisPubSub.PubSubMessage => F[Unit]]())
Expand Down Expand Up @@ -284,4 +285,4 @@ object RedisPubSub {
Array(Some(List(BulkString(Some(message)), BulkString(Some(foo)), BulkString(Some(hi there!)))))
Array(Some(List(BulkString(Some(punsubscribe)), BulkString(Some(foos)), Integer(1))))
*/
}
}

0 comments on commit 1a4916f

Please sign in to comment.