Skip to content

Commit

Permalink
Merge pull request #3 from ChristopherDavenport/transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopherDavenport authored Aug 11, 2020
2 parents 4b4cf40 + 14e8997 commit 35aecbd
Show file tree
Hide file tree
Showing 8 changed files with 547 additions and 354 deletions.
642 changes: 320 additions & 322 deletions core/src/main/scala/io/chrisdavenport/rediculous/RedisCommands.scala

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import scala.concurrent.duration._

sealed trait RedisConnection[F[_]]
object RedisConnection{
private case class Queued[F[_]](queue: Queue[F, (Deferred[F, Either[Throwable, Resp]], Resp)]) extends RedisConnection[F]
private case class Queued[F[_]](queue: Queue[F, Chunk[(Deferred[F, Either[Throwable, Resp]], Resp)]], usePool: Resource[F, Managed[F, Socket[F]]]) extends RedisConnection[F]
private case class PooledConnection[F[_]](
pool: KeyPool[F, Unit, (Socket[F], F[Unit])]
) extends RedisConnection[F]
Expand All @@ -28,7 +28,7 @@ object RedisConnection{
def getTillEqualSize(acc: List[List[Resp]], lastArr: Array[Byte]): F[List[Resp]] =
socket.read(maxBytes, timeout).flatMap{
case None =>
ApplicativeError[F, Throwable].raiseError[List[Resp]](new Throwable("Terminated Before reaching Equal size"))
ApplicativeError[F, Throwable].raiseError[List[Resp]](new Throwable("Rediculous: Terminated Before reaching Equal size"))
case Some(bytes) =>
Resp.parseAll(lastArr.toArray ++ bytes.toArray.toIterable) match {
case e@Resp.ParseError(_, _) => ApplicativeError[F, Throwable].raiseError[List[Resp]](e)
Expand All @@ -50,31 +50,29 @@ object RedisConnection{
} else Applicative[F].pure(List.empty)
}

// Can Be used to implement any low level protocols.
def runRequest[F[_]: Concurrent, A: RedisResult](connection: RedisConnection[F])(input: NonEmptyList[String]): F[F[Either[Resp, A]]] = {
// All Commands Appear to share this encoding.
val resp = Resp.Array(
Some(
input.toList.map(a => Resp.BulkString(Some(a)))
)
)
def withSocket(socket: Socket[F]): F[Resp] = explicitPipelineRequest[F](socket, Chunk.singleton(resp)).map(_.head)
connection match {
def runRequestInternal[F[_]: Concurrent](connection: RedisConnection[F])(
inputs: NonEmptyList[NonEmptyList[String]]): F[F[NonEmptyList[Resp]]] = {
val chunk = Chunk.seq(inputs.toList.map(Resp.renderRequest))
def withSocket(socket: Socket[F]): F[NonEmptyList[Resp]] = explicitPipelineRequest[F](socket, chunk).flatMap(l => Sync[F].delay(l.toNel.getOrElse(throw new Throwable("Rediculous: Impossible Return List was Empty but we guarantee output matches input"))))
connection match {
case PooledConnection(pool) => pool.map(_._1).take(()).use{
m => withSocket(m.value).attempt.flatTap{
case Left(_) => m.canBeReused.set(Reusable.DontReuse)
case _ => Applicative[F].unit
}
}.rethrow.map(RedisResult[A].decode).map(_.pure[F])
case DirectConnection(socket) => withSocket(socket).map(RedisResult[A].decode).map(_.pure[F])
case Queued(queue) => Deferred[F, Either[Throwable, Resp]].flatMap{d =>
queue.enqueue1((d, resp)).as {
d.get.rethrow.map(RedisResult[A].decode)
}
}.rethrow.map(_.pure[F])
case DirectConnection(socket) => withSocket(socket).map(_.pure[F])
case Queued(queue, _) => chunk.traverse(resp => Deferred[F, Either[Throwable, Resp]].map((_, resp))).flatMap{ c =>
queue.enqueue1(c).as {
c.traverse(_._1.get).flatMap(_.sequence.traverse(l => Sync[F].delay(l.toNel.getOrElse(throw new Throwable("Rediculous: Impossible Return List was Empty but we guarantee output matches input"))))).rethrow
}
}
}
}

// Can Be used to implement any low level protocols.
def runRequest[F[_]: Concurrent, A: RedisResult](connection: RedisConnection[F])(input: NonEmptyList[String]): F[F[Either[Resp, A]]] =
runRequestInternal(connection)(NonEmptyList.of(input)).map(_.map(nel => RedisResult[A].decode(nel.head)))

def runRequestTotal[F[_]: Concurrent, A: RedisResult](input: NonEmptyList[String]): Redis[F, A] = Redis(Kleisli{connection: RedisConnection[F] =>
runRequest(connection)(input).map{ fE =>
Expand All @@ -86,6 +84,13 @@ object RedisConnection{
}
})

private[rediculous] def closeReturn[F[_]: MonadError[*[_], Throwable], A](fE: F[Either[Resp, A]]): F[A] =
fE.flatMap{
case Right(a) => a.pure[F]
case Left(e@Resp.Error(_)) => ApplicativeError[F, Throwable].raiseError[A](e)
case Left(other) => ApplicativeError[F, Throwable].raiseError[A](new Throwable(s"Rediculous: Incompatible Return Type: Got $other"))
}

def single[F[_]: Concurrent: ContextShift](sg: SocketGroup, address: InetSocketAddress): Resource[F, RedisConnection[F]] =
sg.client[F](address).map(RedisConnection.DirectConnection(_))

Expand All @@ -98,14 +103,15 @@ object RedisConnection{
// Only allows 1k queued actions, before new actions block to be accepted.
def queued[F[_]: Concurrent: Timer: ContextShift](sg: SocketGroup, address: InetSocketAddress, maxQueued: Int = 1000, workers: Int = 2): Resource[F, RedisConnection[F]] =
for {
queue <- Resource.liftF(Queue.bounded[F, (Deferred[F, Either[Throwable,Resp]], Resp)](maxQueued))
queue <- Resource.liftF(Queue.bounded[F, Chunk[(Deferred[F, Either[Throwable,Resp]], Resp)]](maxQueued))
keypool <- KeyPoolBuilder[F, Unit, (Socket[F], F[Unit])](
{_ => sg.client[F](address).allocated},
{ case (_, shutdown) => shutdown}
).build
_ <-
queue.dequeue.chunks.map{chunk =>
if (chunk.nonEmpty) {
queue.dequeue.chunks.map{chunkChunk =>
val chunk = chunkChunk.flatten
val s = if (chunk.nonEmpty) {
Stream.eval(keypool.map(_._1).take(()).use{m =>
val out = chunk.map(_._2)
explicitPipelineRequest(m.value, out).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize
Expand All @@ -121,14 +127,14 @@ object RedisConnection{
}
case e@Left(_) =>
chunk.traverse_{ case (deff, _) => deff.complete(e.asInstanceOf[Either[Throwable, Resp]])}
}) ++ Stream.eval_(ContextShift[F].shift)
})
} else {
Stream.empty
}

s ++ Stream.eval_(ContextShift[F].shift)
}.parJoin(workers) // Worker Threads
.compile
.drain
.background
} yield Queued(queue)
} yield Queued(queue, keypool.take(()).map(_.map(_._1)))
}
17 changes: 17 additions & 0 deletions core/src/main/scala/io/chrisdavenport/rediculous/RedisCtx.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.chrisdavenport.rediculous

import cats.data.NonEmptyList
import cats.effect.Concurrent

trait RedisCtx[F[_]]{
def run[A: RedisResult](command: NonEmptyList[String]): F[A]
}

object RedisCtx {
def apply[F[_]](implicit ev: RedisCtx[F]): ev.type = ev

implicit def redis[F[_]: Concurrent]: RedisCtx[Redis[F, *]] = new RedisCtx[Redis[F, *]]{
def run[A: RedisResult](command: NonEmptyList[String]): Redis[F, A] =
RedisConnection.runRequestTotal(command)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package io.chrisdavenport.rediculous

import cats._
import cats.implicits._
import cats.data._
import cats.effect._
import RedisProtocol._


/**
* Transactions Operate via typeclasses. RedisCtx allows us to abstract our operations into
* different types depending on the behavior we want. In the case of transactions that is
* [[RedisTransaction]]. These can be composed together via its Applicative
* instance to form a transaction consisting of multiple commands, then transacted via
* either multiExec or transact on the class.
*
* @example
* {{{
* import io.chrisdavenport.rediculous._
* import cats.effect.Concurrent
* val tx = (
* RedisCommands.ping[RedisTransaction],
* RedisCommands.del[RedisTransaction](List("foo")),
* RedisCommands.get[RedisTransaction]("foo"),
* RedisCommands.set[RedisTransaction]("foo", "value"),
* RedisCommands.get[RedisTransaction]("foo")
* ).tupled
*
* def operation[F[_]: Concurrent] = tx.transact[F]
* }}}
**/
final case class RedisTransaction[A](value: RedisTransaction.RedisTxState[RedisTransaction.Queued[A]]){
def transact[F[_]: Concurrent]: Redis[F, RedisTransaction.TxResult[A]] =
RedisTransaction.multiExec[F](this)
}

object RedisTransaction {

implicit val ctx: RedisCtx[RedisTransaction] = new RedisCtx[RedisTransaction]{
def run[A: RedisResult](command: NonEmptyList[String]): RedisTransaction[A] = RedisTransaction(RedisTxState{for {
(i, base) <- State.get
_ <- State.set((i + 1, command :: base))
} yield Queued(l => RedisResult[A].decode(l(i)))})
}
implicit val applicative: Applicative[RedisTransaction] = new Applicative[RedisTransaction]{
def pure[A](a: A) = RedisTransaction(Monad[RedisTxState].pure(Monad[Queued].pure(a)))

override def ap[A, B](ff: RedisTransaction[A => B])(fa: RedisTransaction[A]): RedisTransaction[B] =
RedisTransaction(RedisTxState(
Nested(ff.value.value).ap(Nested(fa.value.value)).value
))
}

/**
* A TxResult Represent the state of a RedisTransaction when run.
* Success means it completed succesfully, Aborted means we received
* a Nil Arrary from Redis which represent that at least one key being watched
* has been modified. An error occurs depending on the succesful execution of
* the function built in Queued.
*/
sealed trait TxResult[+A]
object TxResult {
final case class Success[A](value: A) extends TxResult[A]
final case object Aborted extends TxResult[Nothing]
final case class Error(value: String) extends TxResult[Nothing]
}

final case class RedisTxState[A](value: State[(Int, List[NonEmptyList[String]]), A])
object RedisTxState {

implicit val m: Monad[RedisTxState] = new StackSafeMonad[RedisTxState]{
def pure[A](a: A): RedisTxState[A] = RedisTxState(Monad[State[(Int, List[NonEmptyList[String]]), *]].pure(a))
def flatMap[A, B](fa: RedisTxState[A])(f: A => RedisTxState[B]): RedisTxState[B] = RedisTxState(
fa.value.flatMap(f.andThen(_.value))
)
}
}
final case class Queued[A](f: List[Resp] => Either[Resp, A])
object Queued {
implicit val m: Monad[Queued] = new StackSafeMonad[Queued]{
def pure[A](a: A) = Queued{_ => Either.right(a)}
def flatMap[A, B](fa: Queued[A])(f: A => Queued[B]): Queued[B] = {
Queued{l =>
for {
a <- fa.f(l)
b <- f(a).f(l)
} yield b
}
}
}
}

// ----------
// Operations
// ----------
def watch[F[_]: Concurrent](keys: List[String]): Redis[F, Status] =
RedisCtx[Redis[F,*]].run(NonEmptyList("WATCH", keys))

def unwatch[F[_]: Concurrent]: Redis[F, Status] =
RedisCtx[Redis[F,*]].run(NonEmptyList.of("UNWATCH"))

def multiExec[F[_]] = new MultiExecPartiallyApplied[F]

class MultiExecPartiallyApplied[F[_]]{

def apply[A](tx: RedisTransaction[A])(implicit F: Concurrent[F]): Redis[F, TxResult[A]] = {
Redis(Kleisli{c: RedisConnection[F] =>
val ((_, commandsR), Queued(f)) = tx.value.value.run((0, List.empty)).value
val commands = commandsR.reverse
RedisConnection.runRequestInternal(c)(NonEmptyList(
NonEmptyList.of("MULTI"),
commands ++
List(NonEmptyList.of("EXEC"))
)).map{_.flatMap{_.last match {
case Resp.Array(Some(a)) => f(a).fold[TxResult[A]](e => TxResult.Error(e.toString), TxResult.Success(_)).pure[F]
case Resp.Array(None) => (TxResult.Aborted: TxResult[A]).pure[F]
case other => ApplicativeError[F, Throwable].raiseError(new Throwable(s"EXEC returned $other"))
}}}
})
}
}



}
11 changes: 11 additions & 0 deletions core/src/main/scala/io/chrisdavenport/rediculous/Resp.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.chrisdavenport.rediculous

import scala.collection.mutable
import cats.data.NonEmptyList
import cats.implicits._
import scala.util.control.NonFatal
import java.nio.charset.StandardCharsets
Expand All @@ -25,6 +26,16 @@ object Resp {

private[Resp] val CRLF = "\r\n".getBytes

def renderRequest(nel: NonEmptyList[String]): Resp = {
Resp.Array(Some(
nel.toList.map(renderArg)
))
}

def renderArg(arg: String): Resp = {
Resp.BulkString(Some(arg))
}

def encode(resp: Resp): SArray[Byte] = {
resp match {
case s@SimpleString(_) => SimpleString.encode(s)
Expand Down
8 changes: 4 additions & 4 deletions examples/src/main/scala/BasicExample.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ object BasicExample extends IOApp {

r.use {client =>
val r = (
RedisCommands.ping[IO],
RedisCommands.get[IO]("foo"),
RedisCommands.set[IO]("foo", "value"),
RedisCommands.get[IO]("foo")
RedisCommands.ping[Redis[IO, *]],
RedisCommands.get[Redis[IO, *]]("foo"),
RedisCommands.set[Redis[IO, *]]("foo", "value"),
RedisCommands.get[Redis[IO, *]]("foo")
).parTupled

val r2= List.fill(10)(r.run(client)).parSequence.map{_.flatMap{
Expand Down
36 changes: 36 additions & 0 deletions examples/src/main/scala/TransactionExample.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import io.chrisdavenport.rediculous._
import cats.implicits._
import cats.effect._
import fs2.io.tcp._
import java.net.InetSocketAddress

// Send a Single Transaction to the Redis Server
object TransactionExample extends IOApp {

def run(args: List[String]): IO[ExitCode] = {
val r = for {
blocker <- Blocker[IO]
sg <- SocketGroup[IO](blocker)
// maxQueued: How many elements before new submissions semantically block. Tradeoff of memory to queue jobs.
// Default 1000 is good for small servers. But can easily take 100,000.
// workers: How many threads will process pipelined messages.
connection <- RedisConnection.queued[IO](sg, new InetSocketAddress("localhost", 6379), maxQueued = 10000, workers = 2)
} yield connection

r.use {client =>
val r = (
RedisCommands.ping[RedisTransaction],
RedisCommands.del[RedisTransaction](List("foo")),
RedisCommands.get[RedisTransaction]("foo"),
RedisCommands.set[RedisTransaction]("foo", "value"),
RedisCommands.get[RedisTransaction]("foo")
).tupled

val multi = r.transact[IO]

multi.run(client).flatTap(output => IO(println(output)))

}.as(ExitCode.Success)

}
}
8 changes: 4 additions & 4 deletions site/docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ object BasicExample extends IOApp {

r.use {client =>
val r = (
RedisCommands.ping[IO],
RedisCommands.get[IO]("foo"),
RedisCommands.set[IO]("foo", "value"),
RedisCommands.get[IO]("foo")
RedisCommands.ping[Redis[IO, *]],
RedisCommands.get[Redis[IO, *]]("foo"),
RedisCommands.set[Redis[IO, *]]("foo", "value"),
RedisCommands.get[Redis[IO, *]]("foo")
).parTupled

val r2= List.fill(10)(r.run(client)).parSequence.map{_.flatMap{
Expand Down

0 comments on commit 35aecbd

Please sign in to comment.