From f5d8f38301edab7e95666a96f822f2a3c9ebadb1 Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Tue, 24 Dec 2019 10:29:53 -0800 Subject: [PATCH 1/2] Include Caffeine Integration --- build.sbt | 14 +++++- modules/bench/README.md | 17 +++++++ .../io/chrisdavenport/mules/LookupBench.scala | 40 +++++++++++++--- .../mules/caffeine/CaffeineCache.scala | 48 +++++++++++++++++++ .../mules/caffeine/CaffeineCacheSpec.scala | 45 +++++++++++++++++ .../scala/io/chrisdavenport/mules/Cache.scala | 5 +- .../mules/DispatchOneCache.scala | 2 +- .../chrisdavenport/mules/noop/NoOpCache.scala | 3 +- 8 files changed, 159 insertions(+), 15 deletions(-) create mode 100644 modules/caffeine/src/main/scala/io/chrisdavenport/mules/caffeine/CaffeineCache.scala create mode 100644 modules/caffeine/src/test/scala/io/chrisdavenport/mules/caffeine/CaffeineCacheSpec.scala diff --git a/build.sbt b/build.sbt index 58881594..01bc3893 100644 --- a/build.sbt +++ b/build.sbt @@ -2,14 +2,14 @@ lazy val mules = project.in(file(".")) .disablePlugins(MimaPlugin) .settings(skip in publish := true) .settings(commonSettings) - .aggregate(core, reload, noop) + .aggregate(core, caffeine, reload, noop, bench) lazy val bench = project.in(file("modules/bench")) .disablePlugins(MimaPlugin) .enablePlugins(JmhPlugin) .settings(skip in publish := true) .settings(commonSettings) - .dependsOn(core) + .dependsOn(core, caffeine) lazy val core = project.in(file("modules/core")) .settings(commonSettings) @@ -17,6 +17,16 @@ lazy val core = project.in(file("modules/core")) name := "mules" ) +lazy val caffeine = project.in(file("modules/caffeine")) + .settings(commonSettings) + .dependsOn(core) + .settings( + name := "mules-caffeine", + libraryDependencies ++= Seq( + "com.github.ben-manes.caffeine" % "caffeine" % "2.8.0" + ) + ) + lazy val noop = project.in(file("modules/noop")) .settings(commonSettings) .dependsOn(core) diff --git a/modules/bench/README.md b/modules/bench/README.md index c0c11c81..4f67b506 100644 --- a/modules/bench/README.md +++ b/modules/bench/README.md @@ -4,4 +4,21 @@ sbt project bench jmh:run -i 3 -wi 3 -f1 -t15 // iterations 3, warmup iterations 3, forks 1, threads 15 +``` + + +## Numbers presently + + +``` +[info] Benchmark Mode Cnt Score Error Units +[info] LookUpBench.contentionCaffeine thrpt 10 68602.693 ± 183.322 ops/s +[info] LookUpBench.contentionConcurrentHashMap thrpt 10 26815.305 ± 47.615 ops/s +[info] LookUpBench.contentionSingleImmutableMap thrpt 10 21853.931 ± 82.138 ops/s +[info] LookUpBench.contentionReadsCaffeine thrpt 10 88898.190 ± 676.454 ops/s +[info] LookUpBench.contentionReadsConcurrentHashMap thrpt 10 28990.070 ± 161.409 ops/s +[info] LookUpBench.contentionReadsSingleImmutableMap thrpt 10 24290.804 ± 233.290 ops/s +[info] LookUpBench.contentionWritesCaffeine thrpt 10 74592.814 ± 811.518 ops/s +[info] LookUpBench.contentionWritesConcurrentHashMap thrpt 10 40196.853 ± 247.774 ops/s +[info] LookUpBench.contentionWritesSingleImmutableMap thrpt 10 28423.209 ± 215.411 ops/s ``` \ No newline at end of file diff --git a/modules/bench/src/main/scala/io/chrisdavenport/mules/LookupBench.scala b/modules/bench/src/main/scala/io/chrisdavenport/mules/LookupBench.scala index eff91528..00849efe 100644 --- a/modules/bench/src/main/scala/io/chrisdavenport/mules/LookupBench.scala +++ b/modules/bench/src/main/scala/io/chrisdavenport/mules/LookupBench.scala @@ -1,14 +1,15 @@ package io.chrisdavenport.mules -import java.util.concurrent.TimeUnit +// import java.util.concurrent.TimeUnit import org.openjdk.jmh.annotations._ import cats.implicits._ import cats.effect._ +import io.chrisdavenport.mules.caffeine.CaffeineCache -@BenchmarkMode(Array(Mode.AverageTime)) -@OutputTimeUnit(TimeUnit.MILLISECONDS) +@BenchmarkMode(Array(Mode.Throughput)) +// @OutputTimeUnit(TimeUnit.MILLISECONDS) class LookUpBench { import LookUpBench._ @@ -20,7 +21,11 @@ class LookUpBench { def contentionConcurrentHashMap(in: BenchStateCHM) = testUnderContention(in.memoryCache, in.readList, in.writeList)(in.CS) - def testUnderContention(m: MemoryCache[IO, Int, String], r: List[Int], w: List[Int])(implicit CS: ContextShift[IO]) = { + @Benchmark + def contentionCaffeine(in: BenchStateCaffeine) = + testUnderContention(in.cache, in.readList, in.writeList)(in.CS) + + def testUnderContention(m: Cache[IO, Int, String], r: List[Int], w: List[Int])(implicit CS: ContextShift[IO]) = { val set = w.traverse( m.insert(_, "foo")) val read = r.traverse(m.lookup(_)) val action = (set, read).parMapN((_, _) => ()) @@ -35,7 +40,11 @@ class LookUpBench { def contentionReadsConcurrentHashMap(in: BenchStateCHM) = underContentionWaitReads(in.memoryCache, in.readList, in.writeList)(in.CS) - def underContentionWaitReads(m: MemoryCache[IO, Int, String], r: List[Int], w: List[Int])(implicit CS: ContextShift[IO]) = { + @Benchmark + def contentionReadsCaffeine(in: BenchStateCaffeine) = + underContentionWaitReads(in.cache, in.readList, in.writeList)(in.CS) + + def underContentionWaitReads(m: Cache[IO, Int, String], r: List[Int], w: List[Int])(implicit CS: ContextShift[IO]) = { val set = w.traverse(m.insert(_, "foo")) val read = r.traverse(m.lookup(_)) Concurrent[IO].bracket(set.start)( @@ -51,7 +60,11 @@ class LookUpBench { def contentionWritesConcurrentHashMap(in: BenchStateCHM) = underContentionWaitWrites(in.memoryCache, in.readList, in.writeList)(in.CS) - def underContentionWaitWrites(m: MemoryCache[IO, Int, String],r: List[Int], w: List[Int])(implicit CS: ContextShift[IO]) = { + @Benchmark + def contentionWritesCaffeine(in: BenchStateCaffeine) = + underContentionWaitWrites(in.cache, in.readList, in.writeList)(in.CS) + + def underContentionWaitWrites(m: Cache[IO, Int, String],r: List[Int], w: List[Int])(implicit CS: ContextShift[IO]) = { val set = w.traverse( m.insert(_, "foo")) val read = r.traverse(m.lookup(_)) Concurrent[IO].bracket(read.start)( @@ -91,4 +104,19 @@ object LookUpBench { } } + + @State(Scope.Benchmark) + class BenchStateCaffeine { + var cache: Cache[IO, Int, String] = _ + val writeList: List[Int] = (1 to 100).toList + val readList : List[Int] = (1 to 100).toList + implicit val T = IO.timer(scala.concurrent.ExecutionContext.global) + implicit val CS = IO.contextShift(scala.concurrent.ExecutionContext.global) + + @Setup(Level.Trial) + def setup(): Unit = { + cache = CaffeineCache.build[IO, Int, String](None, None, None).unsafeRunSync() + cache.insert(1, "yellow").unsafeRunSync() + } + } } \ No newline at end of file diff --git a/modules/caffeine/src/main/scala/io/chrisdavenport/mules/caffeine/CaffeineCache.scala b/modules/caffeine/src/main/scala/io/chrisdavenport/mules/caffeine/CaffeineCache.scala new file mode 100644 index 00000000..771d7191 --- /dev/null +++ b/modules/caffeine/src/main/scala/io/chrisdavenport/mules/caffeine/CaffeineCache.scala @@ -0,0 +1,48 @@ +package io.chrisdavenport.mules.caffeine + + +import cats.implicits._ +import io.chrisdavenport.mules.Cache +import com.github.benmanes.caffeine.cache.{Caffeine, Cache => CCache} +import cats.effect._ +import io.chrisdavenport.mules.TimeSpec +import java.util.concurrent.TimeUnit + +private class CaffeineCache[F[_], K, V](cc: CCache[K, V])(implicit sync: Sync[F]) extends Cache[F, K, V]{ + // Members declared in io.chrisdavenport.mules.Delete + def delete(k: K): F[Unit] = sync.delay(cc.invalidate(k)) + + // Members declared in io.chrisdavenport.mules.Insert + def insert(k: K, v: V): F[Unit] = sync.delay(cc.put(k, v)) + + // Members declared in io.chrisdavenport.mules.Lookup + def lookup(k: K): F[Option[V]] = + sync.delay{ + Option(cc.getIfPresent(k)) + } +} + + +object CaffeineCache { + + /** + * insertWithTimeout does not operate as the underlying cache is fully responsible for these values + **/ + def build[F[_]: Sync, K, V]( + defaultTimeout: Option[TimeSpec], + accessTimeout: Option[TimeSpec], + maxSize: Option[Long] + ): F[Cache[F, K, V]] = { + Sync[F].delay(Caffeine.newBuilder()) + .map(b => defaultTimeout.fold(b)(ts => b.expireAfterWrite(ts.nanos, TimeUnit.NANOSECONDS))) + .map(b => accessTimeout.fold(b)(ts => b.expireAfterAccess(ts.nanos, TimeUnit.NANOSECONDS))) + .map(b => maxSize.fold(b)(b.maximumSize)) + .map(_.build[K, V]()) + .map(fromCache(_)) + } + + /** Build a Cache from a Caffeine Cache **/ + def fromCache[F[_]: Sync, K, V](cache: CCache[K, V]): Cache[F, K, V] = + new CaffeineCache[F, K, V](cache) + +} \ No newline at end of file diff --git a/modules/caffeine/src/test/scala/io/chrisdavenport/mules/caffeine/CaffeineCacheSpec.scala b/modules/caffeine/src/test/scala/io/chrisdavenport/mules/caffeine/CaffeineCacheSpec.scala new file mode 100644 index 00000000..9739f9bd --- /dev/null +++ b/modules/caffeine/src/test/scala/io/chrisdavenport/mules/caffeine/CaffeineCacheSpec.scala @@ -0,0 +1,45 @@ +package io.chrisdavenport.mules.caffeine + +import org.specs2.mutable.Specification +import scala.concurrent.duration._ +import cats.effect._ +// import cats.effect.implicits._ +import cats.effect.IO +import cats.effect.specs2.CatsIO +import io.chrisdavenport.mules.TimeSpec + +class CaffeineCacheSpec extends Specification with CatsIO { + "CaffeineCache" should { + "get a value in a quicker period than the timeout" in { + val setup = for { + cache <- CaffeineCache.build[IO, String, Int](Some(TimeSpec.unsafeFromDuration(1.second)), None, None) + _ <- cache.insert("Foo", 1) + _ <- Timer[IO].sleep(1.milli) + value <- cache.lookup("Foo") + } yield value + setup.map(_ must_=== Some(1)) + } + + + "remove a value after delete" in { + val setup = for { + cache <- CaffeineCache.build[IO, String, Int](None, None, None) + _ <- cache.insert("Foo", 1) + _ <- cache.delete("Foo") + value <- cache.lookup("Foo") + } yield value + setup.map(_ must_=== None) + } + + + "Lookup after interval fails to get a value" in { + val setup = for { + cache <- CaffeineCache.build[IO, String, Int](Some(TimeSpec.unsafeFromDuration(1.second)), None, None) + _ <- cache.insert("Foo", 1) + _ <- Timer[IO].sleep(2.second) + value <- cache.lookup("Foo") + } yield value + setup.map(_ must_=== None) + } + } +} \ No newline at end of file diff --git a/modules/core/src/main/scala/io/chrisdavenport/mules/Cache.scala b/modules/core/src/main/scala/io/chrisdavenport/mules/Cache.scala index 1685ccee..1155745c 100644 --- a/modules/core/src/main/scala/io/chrisdavenport/mules/Cache.scala +++ b/modules/core/src/main/scala/io/chrisdavenport/mules/Cache.scala @@ -10,7 +10,6 @@ trait Get[F[_], K, V]{ trait Insert[F[_], K, V]{ def insert(k: K, v: V): F[Unit] - def insertWithTimeout(optionTimeout: Option[TimeSpec])(k: K, v: V): F[Unit] } trait Delete[F[_], K]{ @@ -20,6 +19,4 @@ trait Delete[F[_], K]{ trait Cache[F[_], K, V] extends Lookup[F, K, V] with Insert[F, K, V] - with Delete[F, K] - -trait GetCache[F[_], K, V] extends Get[F, K, V] with Cache[F, K, V] \ No newline at end of file + with Delete[F, K] \ No newline at end of file diff --git a/modules/core/src/main/scala/io/chrisdavenport/mules/DispatchOneCache.scala b/modules/core/src/main/scala/io/chrisdavenport/mules/DispatchOneCache.scala index c7ab2221..228ecf4e 100644 --- a/modules/core/src/main/scala/io/chrisdavenport/mules/DispatchOneCache.scala +++ b/modules/core/src/main/scala/io/chrisdavenport/mules/DispatchOneCache.scala @@ -16,7 +16,7 @@ final class DispatchOneCache[F[_], K, V] private[DispatchOneCache] ( private val purgeExpiredEntriesOpt : Option[Long => F[List[K]]], // Optional Performance Improvement over Default val defaultExpiration: Option[TimeSpec], private val createItem: K => F[V] -)(implicit val F: Concurrent[F], val C: Clock[F]) extends GetCache[F, K, V] { +)(implicit val F: Concurrent[F], val C: Clock[F]) extends Cache[F, K, V] with Get[F, K, V] { import DispatchOneCache.DispatchOneCacheItem import DispatchOneCache.CancelationDuringDispatchOneCacheInsertProcessing diff --git a/modules/noop/src/main/scala/io/chrisdavenport/mules/noop/NoOpCache.scala b/modules/noop/src/main/scala/io/chrisdavenport/mules/noop/NoOpCache.scala index b07bea8d..97e623bb 100644 --- a/modules/noop/src/main/scala/io/chrisdavenport/mules/noop/NoOpCache.scala +++ b/modules/noop/src/main/scala/io/chrisdavenport/mules/noop/NoOpCache.scala @@ -1,6 +1,6 @@ package io.chrisdavenport.mules.noop -import io.chrisdavenport.mules.{Cache, TimeSpec} +import io.chrisdavenport.mules.Cache import cats.Applicative private class NoOpCache[F[_], K, V](implicit F: Applicative[F]) extends Cache[F, K, V]{ @@ -10,7 +10,6 @@ private class NoOpCache[F[_], K, V](implicit F: Applicative[F]) extends Cache[F, // Members declared in io.chrisdavenport.mules.Insert def insert(k: K, v: V): F[Unit] = F.unit - def insertWithTimeout(optionTimeout: Option[TimeSpec])(k: K, v: V): F[Unit] = F.unit // Members declared in io.chrisdavenport.mules.Lookup def lookup(k: K): F[Option[V]] = noneF From 4c9f3b45711a49f9396cc04d3df3e0c39807aef0 Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Tue, 24 Dec 2019 14:02:24 -0800 Subject: [PATCH 2/2] 2.12 Hacks that gets to underlying object --- .../chrisdavenport/mules/caffeine/CaffeineCache.scala | 11 ++++++----- .../mules/caffeine/CaffeineCacheSpec.scala | 2 ++ 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/modules/caffeine/src/main/scala/io/chrisdavenport/mules/caffeine/CaffeineCache.scala b/modules/caffeine/src/main/scala/io/chrisdavenport/mules/caffeine/CaffeineCache.scala index 771d7191..1f47fc12 100644 --- a/modules/caffeine/src/main/scala/io/chrisdavenport/mules/caffeine/CaffeineCache.scala +++ b/modules/caffeine/src/main/scala/io/chrisdavenport/mules/caffeine/CaffeineCache.scala @@ -17,9 +17,8 @@ private class CaffeineCache[F[_], K, V](cc: CCache[K, V])(implicit sync: Sync[F] // Members declared in io.chrisdavenport.mules.Lookup def lookup(k: K): F[Option[V]] = - sync.delay{ - Option(cc.getIfPresent(k)) - } + sync.delay(Option(cc.getIfPresent(k))) + } @@ -37,12 +36,14 @@ object CaffeineCache { .map(b => defaultTimeout.fold(b)(ts => b.expireAfterWrite(ts.nanos, TimeUnit.NANOSECONDS))) .map(b => accessTimeout.fold(b)(ts => b.expireAfterAccess(ts.nanos, TimeUnit.NANOSECONDS))) .map(b => maxSize.fold(b)(b.maximumSize)) - .map(_.build[K, V]()) - .map(fromCache(_)) + .map(_.build[K with Object, V with Object]()) + .map(_.asInstanceOf[CCache[K, V]]) // 2.12 hack + .map(fromCache[F, K, V](_)) } /** Build a Cache from a Caffeine Cache **/ def fromCache[F[_]: Sync, K, V](cache: CCache[K, V]): Cache[F, K, V] = new CaffeineCache[F, K, V](cache) + } \ No newline at end of file diff --git a/modules/caffeine/src/test/scala/io/chrisdavenport/mules/caffeine/CaffeineCacheSpec.scala b/modules/caffeine/src/test/scala/io/chrisdavenport/mules/caffeine/CaffeineCacheSpec.scala index 9739f9bd..53278de6 100644 --- a/modules/caffeine/src/test/scala/io/chrisdavenport/mules/caffeine/CaffeineCacheSpec.scala +++ b/modules/caffeine/src/test/scala/io/chrisdavenport/mules/caffeine/CaffeineCacheSpec.scala @@ -41,5 +41,7 @@ class CaffeineCacheSpec extends Specification with CatsIO { } yield value setup.map(_ must_=== None) } + + } } \ No newline at end of file