Skip to content

Commit

Permalink
Merge pull request #6 from ChristopherDavenport/explicitPipelining
Browse files Browse the repository at this point in the history
Add Explicit Pipeline Support
  • Loading branch information
ChristopherDavenport authored Aug 12, 2020
2 parents 44fabee + ce7516d commit d7093d9
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package io.chrisdavenport.rediculous

import cats._
import cats.implicits._
import cats.data._
import cats.effect._

/**
* For When you don't trust automatic pipelining.
*
* ClusterMode: Multi Key Operations Will use for the first key
* provided.
*
* [[pipeline]] method converts the Pipeline state to the Redis Monad.
**/
final case class RedisPipeline[A](value: RedisTransaction.RedisTxState[RedisTransaction.Queued[A]]){
def pipeline[F[_]: Concurrent]: Redis[F, A] = RedisPipeline.pipeline[F](this)
}

object RedisPipeline {

implicit val ctx: RedisCtx[RedisPipeline] = new RedisCtx[RedisPipeline]{
def keyed[A: RedisResult](key: String, command: NonEmptyList[String]): RedisPipeline[A] =
RedisPipeline(RedisTransaction.RedisTxState{for {
(i, base, value) <- State.get
_ <- State.set((i + 1, command :: base, value.orElse(Some(key))))
} yield RedisTransaction.Queued(l => RedisResult[A].decode(l(i)))})

def unkeyed[A: RedisResult](command: NonEmptyList[String]): RedisPipeline[A] = RedisPipeline(RedisTransaction.RedisTxState{for {
(i, base, value) <- State.get
_ <- State.set((i + 1, command :: base, value))
} yield RedisTransaction.Queued(l => RedisResult[A].decode(l(i)))})
}

implicit val applicative: Applicative[RedisPipeline] = new Applicative[RedisPipeline]{
def pure[A](a: A) = RedisPipeline(Monad[RedisTransaction.RedisTxState].pure(Monad[RedisTransaction.Queued].pure(a)))

override def ap[A, B](ff: RedisPipeline[A => B])(fa: RedisPipeline[A]): RedisPipeline[B] =
RedisPipeline(RedisTransaction.RedisTxState(
Nested(ff.value.value).ap(Nested(fa.value.value)).value
))
}

val fromTransaction = new (RedisTransaction ~> RedisPipeline){
def apply[A](fa: RedisTransaction[A]): RedisPipeline[A] = RedisPipeline(fa.value)
}

val toTransaction = new (RedisPipeline ~> RedisTransaction){
def apply[A](fa: RedisPipeline[A]): RedisTransaction[A] = RedisTransaction(fa.value)
}

def pipeline[F[_]] = new SendPipelinePartiallyApplied[F]


class SendPipelinePartiallyApplied[F[_]]{
def apply[A](tx: RedisPipeline[A])(implicit F: Concurrent[F]): Redis[F, A] = {
Redis(Kleisli{c: RedisConnection[F] =>
val ((_, commandsR, key), RedisTransaction.Queued(f)) = tx.value.value.run((0, List.empty, None)).value
val commands = commandsR.reverse.toNel
commands.traverse(nelCommands => RedisConnection.runRequestInternal(c)(nelCommands, key) // We Have to Actually Send A Command
.map{fNel => RedisConnection.closeReturn(fNel.map(a => f(a.toList)))}
).flatMap{fOpt =>
fOpt.map(_.pure[F]).getOrElse(F.raiseError(new Throwable("Rediculous: Attempted to Pipeline Empty Command")))
}
})
}
}

}
35 changes: 35 additions & 0 deletions examples/src/main/scala/PipelineExample.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import io.chrisdavenport.rediculous._
import cats.implicits._
import cats.effect._
import fs2.io.tcp._

// Send a Single Set of Pipelined Commands to the Redis Server
object PipelineExample extends IOApp {

def run(args: List[String]): IO[ExitCode] = {
val r = for {
blocker <- Blocker[IO]
sg <- SocketGroup[IO](blocker)
// maxQueued: How many elements before new submissions semantically block. Tradeoff of memory to queue jobs.
// Default 10000 is good for small servers. But can easily take 100,000.
// workers: How many threads will process pipelined messages.
connection <- RedisConnection.queued[IO](sg, "localhost", 6379, maxQueued = 10000, workers = 2)
} yield connection

r.use {client =>
val r = (
RedisCommands.ping[RedisPipeline],
RedisCommands.del[RedisPipeline]("foo"),
RedisCommands.get[RedisPipeline]("foo"),
RedisCommands.set[RedisPipeline]("foo", "value"),
RedisCommands.get[RedisPipeline]("foo")
).tupled

val multi = r.pipeline[IO]

multi.run(client).flatTap(output => IO(println(output)))

}.as(ExitCode.Success)

}
}

0 comments on commit d7093d9

Please sign in to comment.