Memcontinuationed is an asynchronous memcached client for Scala. It is the fastest memcached client on the JVM, much faster than spymemcached or Whalin's client.
Memcontinuationed never blocks any threads. In contrast, spymemcached does not block the IO thread, but it does
block the user's thread. All Memcontinuationed APIs are @suspendable,
which means these methods can be invoked on one thread and return on another.
Memcontinuationed can merge multiple get or gets requests into one request.
In contrast, spymemcached sends every request immediately, without waiting for the previous response.
This approach consumes more CPU and incurs more TCP overhead than Memcontinuationed's.
Even worse, it is not compatible with some memcached servers.
Note: Because Memcontinuationed queues requests until all previous responses have been received,
you may need to create a connection pool of com.dongxiguo.memcontinuationed.Memcontinuationed
instances to maximize IOPS.
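One simple way to pool clients is round-robin selection. The following is a minimal sketch, not part of Memcontinuationed: RoundRobinPool is a hypothetical helper, and plain strings stand in for client instances so the example runs without a memcached server.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper, not part of Memcontinuationed: hands out pooled
// clients in round-robin order so requests spread across connections.
final class RoundRobinPool[A](clients: IndexedSeq[A]) {
  require(clients.nonEmpty, "the pool needs at least one client")
  private val counter = new AtomicInteger(0)

  def next(): A = {
    // Mask the sign bit so the index stays non-negative after overflow.
    val n = counter.getAndIncrement() & Int.MaxValue
    clients(n % clients.size)
  }
}

object RoundRobinPoolDemo {
  def main(args: Array[String]): Unit = {
    // Strings stand in for clients here. With the library you would pass
    // something like IndexedSeq.fill(4)(new Memcontinuationed(channelGroup, locator)).
    val pool = new RoundRobinPool(Vector("conn-0", "conn-1", "conn-2"))
    println(Seq.fill(4)(pool.next()).mkString(", "))
  }
}
```

Each call to next() picks the following client, so independent requests no longer queue behind one another on a single connection. The right pool size depends on your workload and is best found by measurement.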
import com.dongxiguo.memcontinuationed.Memcontinuationed
import com.dongxiguo.memcontinuationed.StorageAccessor
import java.io._
import java.net._
import java.nio.channels.AsynchronousChannelGroup
import java.util.concurrent.Executors
import scala.util.continuations.reset
import scala.util.control.Exception.Catcher
object Sample {
  def main(args: Array[String]) {
    val threadPool = Executors.newCachedThreadPool()
    val channelGroup = AsynchronousChannelGroup.withThreadPool(threadPool)

    // The locator determines where the memcached server is.
    // You may want to implement ketama hashing here.
    def locator(accessor: StorageAccessor[_]) = {
      new InetSocketAddress("localhost", 1978)
    }

    val memcontinuationed = new Memcontinuationed(channelGroup, locator)

    // The error handler
    implicit def catcher: Catcher[Unit] = {
      case e: Exception =>
        scala.Console.err.print(e)
        sys.exit(-1)
    }

    reset {
      memcontinuationed.set(MyKey("hello"), "Hello, World!")
      val result = memcontinuationed.require(MyKey("hello"))
      assert(result == "Hello, World!")
      println(result)
      sys.exit()
    }
  }
}
/**
 * `MyKey` specifies how to serialize the data of a key/value pair.
 */
case class MyKey(override val key: String) extends StorageAccessor[String] {
  override def encode(output: OutputStream, data: String, flags: Int) {
    output.write(data.getBytes("UTF-8"))
  }

  override def decode(input: InputStream, flags: Int): String = {
    val result = new Array[Byte](input.available)
    input.read(result)
    new String(result, "UTF-8")
  }
}
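The sample's locator always returns the same address. If you run several servers, the locator could instead consult a consistent-hash ring. The following is a simplified, ketama-style sketch under stated assumptions: HashRing, pointsPerServer, and the host names are hypothetical, and the point layout is not byte-compatible with libketama.

```scala
import java.net.InetSocketAddress
import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import java.util.TreeMap

// Hypothetical helper, not part of Memcontinuationed: a simplified
// consistent-hash ring that maps string keys to server addresses.
final class HashRing(servers: Seq[InetSocketAddress], pointsPerServer: Int = 160) {
  require(servers.nonEmpty, "the ring needs at least one server")

  private val ring = new TreeMap[Long, InetSocketAddress]()

  // Place several virtual points per server on the ring to even out the load.
  for (server <- servers; i <- 0 until pointsPerServer) {
    ring.put(hash(s"${server.getHostString}:${server.getPort}-$i"), server)
  }

  // First four bytes of the MD5 digest, read as an unsigned 32-bit value.
  private def hash(s: String): Long = {
    val d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8))
    ((d(3) & 0xFFL) << 24) | ((d(2) & 0xFFL) << 16) | ((d(1) & 0xFFL) << 8) | (d(0) & 0xFFL)
  }

  // The first point at or after the key's hash owns the key; wrap around at the end.
  def locate(key: String): InetSocketAddress = {
    val entry = ring.ceilingEntry(hash(key))
    if (entry != null) entry.getValue else ring.firstEntry.getValue
  }
}

object HashRingDemo {
  def main(args: Array[String]): Unit = {
    // Unresolved addresses avoid DNS lookups in this demo; use resolved ones in practice.
    val servers = Seq(
      InetSocketAddress.createUnresolved("cache-a", 11211),
      InetSocketAddress.createUnresolved("cache-b", 11211))
    val ring = new HashRing(servers)
    println(ring.locate("hello"))
  }
}
```

Assuming the accessor's key is the String declared in MyKey, the sample's locator body could then become ring.locate(accessor.key). Adding or removing a server only remaps the keys whose nearest ring points change, rather than reshuffling every key.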
There are a few things you need to know:
- get, set, and most other methods in Memcontinuationed are @suspendable. You must invoke them in reset or in another @suspendable function you defined.
- get, set, and most other methods in Memcontinuationed accept an implicit Catcher parameter. You must use the Catcher to handle exceptions from @suspendable functions, instead of try/catch.
- MyKey is the key you pass to the server, and it is a custom StorageAccessor. You should implement your own StorageAccessor for each type of data. If your value's format is Protocol Buffers, you can use com.dongxiguo.memcontinuationed.ProtobufAccessor as your custom key's super class.
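Because Catcher is the Scala standard library's alias for PartialFunction[Throwable, T], handlers can be built and combined like any other partial function. The sketch below illustrates this in isolation. CatcherDemo and its handlers are hypothetical names, and it returns String rather than Unit only so the results are easy to inspect.

```scala
import java.io.IOException
import scala.util.control.Exception.Catcher

object CatcherDemo {
  // Handles I/O failures specifically.
  val ioCatcher: Catcher[String] = {
    case e: IOException => s"I/O failure: ${e.getMessage}"
  }

  // Handles any other Exception; fatal Errors are deliberately left unhandled.
  val fallback: Catcher[String] = {
    case e: Exception => s"unexpected: ${e.getClass.getSimpleName}"
  }

  // orElse tries ioCatcher first and falls through to fallback.
  val catcher: Catcher[String] = ioCatcher orElse fallback

  def main(args: Array[String]): Unit = {
    println(catcher(new IOException("connection reset"))) // I/O failure: connection reset
    println(catcher(new IllegalStateException("boom")))   // unexpected: IllegalStateException
  }
}
```

In real code you would declare such a value as an implicit Catcher[Unit], as the sample does, so that the @suspendable methods pick it up.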
Add these lines to your build.sbt if you use sbt:
libraryDependencies += "com.dongxiguo" %% "memcontinuationed" % "0.3.2"

libraryDependencies <++= scalaBinaryVersion { bv =>
  bv match {
    case "2.10" => {
      Seq()
    }
    case _ => {
      Seq("org.scala-lang.plugins" % s"scala-continuations-library_$bv" % "1.0.1")
    }
  }
}

libraryDependencies <+= scalaVersion { sv =>
  if (sv.startsWith("2.10.")) {
    compilerPlugin("org.scala-lang.plugins" % "continuations" % sv)
  } else {
    compilerPlugin("org.scala-lang.plugins" % s"scala-continuations-plugin_$sv" % "1.0.1")
  }
}

scalacOptions += "-P:continuations:enable"
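The settings above use the <++= and <+= operators from older sbt releases. On newer sbt versions, where those operators are no longer available, the same settings can probably be written with .value instead. This is an untested sketch of that translation, not something taken from the project itself:

```scala
// build.sbt fragment: same dependencies as above, using the .value syntax.
libraryDependencies += "com.dongxiguo" %% "memcontinuationed" % "0.3.2"

libraryDependencies ++= {
  scalaBinaryVersion.value match {
    // Scala 2.10 bundles the continuations library with the standard distribution.
    case "2.10" => Seq.empty
    case bv     => Seq("org.scala-lang.plugins" % s"scala-continuations-library_$bv" % "1.0.1")
  }
}

libraryDependencies += {
  val sv = scalaVersion.value
  if (sv.startsWith("2.10.")) {
    compilerPlugin("org.scala-lang.plugins" % "continuations" % sv)
  } else {
    compilerPlugin("org.scala-lang.plugins" % s"scala-continuations-plugin_$sv" % "1.0.1")
  }
}

scalacOptions += "-P:continuations:enable"
```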
Memcontinuationed requires Scala 2.10.x or 2.11.x, and JRE 7.