Skip to content

Commit

Permalink
Same fix for Clustered
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopherDavenport committed Apr 4, 2023
1 parent 5a6f7ac commit e92c19e
Showing 1 changed file with 13 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ object RedisConnection{
chunk.head.liftTo[F](RedisError.Generic("Rediculous: Impossible Return List was Empty but we guarantee output matches input"))

// Can Be used to implement any low level protocols.
def runRequest[F[_]: Concurrent, A: RedisResult](connection: RedisConnection[F])(input: NonEmptyList[ByteVector], key: Option[ByteVector]): F[Either[Resp, A]] =
def runRequest[F[_]: Concurrent, A: RedisResult](connection: RedisConnection[F])(input: NonEmptyList[ByteVector], key: Option[ByteVector]): F[Either[Resp, A]] =
runRequestInternal(connection)(Chunk.singleton(input), key).flatMap(head[F]).map(resp => RedisResult[A].decode(resp))

def runRequestTotal[F[_]: Concurrent, A: RedisResult](input: NonEmptyList[ByteVector], key: Option[ByteVector]): Redis[F, A] = Redis(Kleisli{(connection: RedisConnection[F]) =>
Expand Down Expand Up @@ -585,22 +585,22 @@ object RedisConnection{
server.orElse(s.flatMap(key => topo.served(HashSlot.find(key)))).getOrElse(default) // Explicitly Set Server, Key Hashslot Server, or a default server if none selected.
}.toSeq
).evalMap{
case (server, rest) =>
case (server, rest) =>
keypool.take(server).attempt.use{
case Right(m) =>
val out = Chunk.seq(rest.map(_._5))
explicitPipelineRequest(m.value, out).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize
explicitPipelineRequest(m.value, out).map(c => (c, rest)).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize
case Left(_) => m.canBeReused.set(Reusable.DontReuse)
case _ => Applicative[F].unit
}
case l@Left(_) => l.rightCast[Chunk[Resp]].pure[F]
case l@Left(_) => l.rightCast[(Chunk[Resp], List[((Either[Throwable, Resp]) => F[Unit], Option[ByteVector], Option[(Host,Port)], Int, Resp)])].pure[F]
}.flatMap{
case Right(n) =>
n.zipWithIndex.traverse_{
case (ref, i) =>
val (toSet, key, _, retries, initialCommand) = rest(i)
case Right((n, thisChunk)) =>
thisChunk.zipWithIndex.traverse_{
case ((toSet, key, _, retries, initialCommand), i) =>
val ref = Either.catchNonFatal(n(i))
ref match {
case e@Resp.Error(s) if (s.startsWith("MOVED") && retries <= 5) => // MOVED 1234-2020 127.0.0.1:6381
case Right(e@Resp.Error(s)) if (s.startsWith("MOVED") && retries <= 5) => // MOVED 1234-2020 127.0.0.1:6381
refreshTopology.attempt.void >>
// Offer To Have it reprocessed.
// If the queue is full return the error to the user
Expand All @@ -609,15 +609,15 @@ object RedisConnection{
Applicative[F].unit,
toSet(Either.right(e)).void
)
case e@Resp.Error(s) if (s.startsWith("ASK") && retries <= 5) => // ASK 1234-2020 127.0.0.1:6381
case Right(e@Resp.Error(s)) if (s.startsWith("ASK") && retries <= 5) => // ASK 1234-2020 127.0.0.1:6381
val serverRedirect = extractServer(s)
serverRedirect match {
case s@Some(_) => // This is a Special One Off, Requires a Redirect
// Deferred[F, Either[Throwable, Resp]].flatMap{d => // No One Cares About this Callback
val asking = ({(_: Either[Throwable, Resp]) => Applicative[F].unit}, key, s, 6, Resp.renderRequest(NonEmptyList.of(ByteVector.encodeAscii("ASKING").fold(throw _, identity(_))))) // Never Repeat Asking
val repeat = (toSet, key, s, retries + 1, initialCommand)
val chunk = Chunk(asking, repeat)
cluster.queue.tryOffer(chunk) // Offer To Have it reprocessed.
cluster.queue.tryOffer(chunk) // Offer To Have it reprocessed.
//If the queue is full return the error to the user
.ifM(
Applicative[F].unit,
Expand All @@ -627,8 +627,9 @@ object RedisConnection{
case None =>
toSet(Either.right(e))
}
case otherwise =>
case Right(otherwise) =>
toSet(Either.right(otherwise))
case Left(_) => toSet(Either.left(RedisError.Generic("Rediculous: Clustered Command did not get response, this likely indicates an EOF during a read")))
}
}
case e@Left(_) =>
Expand Down

0 comments on commit e92c19e

Please sign in to comment.