diff --git a/README.md b/README.md index 5799bb5..a2cddb1 100644 --- a/README.md +++ b/README.md @@ -4,18 +4,12 @@ [![ustats Scala version support](https://index.scala-lang.org/jodersky/ustats/ustats/latest.svg)](https://index.scala-lang.org/jodersky/ustats/ustats) [![stability: soft](https://img.shields.io/badge/stability-soft-white)](https://www.crashbox.io/stability.html) -A simple and intuitive metrics collection library for Prometheus. - -## Highlights - -- ease of use: *does what you want it to do without any setup ceremony* - -- efficiency: *light memory footprint and extremely fast* +A simple and intuitive metrics collection library. ## Getting Started -ustats is available from maven central (for Scala 2.13 and Dotty). Add its -coordinates to your build config: +μstats is available from maven central for Scala 3.1 and above, for the JVM and +Scala Native. Add its coordinates to your build config: - mill: `ivy"io.crashbox::ustats:"` - sbt: `"io.crashbox" %% "ustats" % ""` @@ -28,10 +22,10 @@ where `` is Basic example, create a counter using the default collector: ```scala -val myCounter = ustats.counter("my_counter", "This is just a simple counter.") +val myCounter = ustats.global.counter("my_counter", "This is just a simple counter.") myCounter += 1 -println(ustats.metrics()) +println(ustats.global.metrics()) // # HELP my_counter This is just a simple counter. // # TYPE my_counter counter @@ -41,11 +35,11 @@ println(ustats.metrics()) You can also add label-value pairs to individual metrics: ```scala -val myGauge = ustats.gauge("my_gauge", labels = Seq("label1" -> "foo", "label2" -> 42)) +val myGauge = ustats.global.gauge("my_gauge", labels = Seq("label1" -> "foo", "label2" -> 42)) myGauge += 1 myGauge += 2 -println(ustats.metrics()) +println(ustats.global.metrics()) // # TYPE my_counter gauge // my_gauge{label1="foo", label2="42"} 3.0 ``` @@ -54,27 +48,27 @@ However, you'd usually want to declare one metric sharing a common basename, and add labels on demand: ```scala -val queueSizes = ustats.gauges("queue_size", labels = Seq("queue_name")) +val queueSizes = ustats.global.gauges("queue_size").labelled("queue_name") -queueSizes.labelled("queue1") += 10 -queueSizes.labelled("queue1") -= 1 -queueSizes.labelled("queue2") += 2 +queueSizes(queue_name = "queue1") += 10 +queueSizes(queue_name = "queue1") -= 1 +queueSizes(queue_name = "queue2") += 2 -println(ustats.metrics()) +println(ustats.global.metrics()) // # TYPE queue_size gauge // queue_size{queue_name="queue1"} 9.0 // queue_size{queue_name="queue2"} 2.0 ``` -Use your own collector: +User-defined grouping of metrics: ```scala -val collector = new ustats.Stats() +val mymetrics = ustats.Metrics() -val currentUsers = collector.gauge("my_app_current_users") +val currentUsers = mymetrics.gauge("my_app_current_users") currentUsers += 10 currentUsers -= 1 -println(collector.metrics()) +println(mymetrics.metrics()) ``` ## Probing @@ -85,23 +79,15 @@ modifying it. ustats has a builtin "probe" mechanism to run batch jobs repeatedly at customizable intervals. ```scala -val counter1 = ustats.counter("counter1") -val gauge1 = ustats.gauge("gauge1") +val counter1 = ustats.global.counter("counter1") +val gauge1 = ustats.global.gauge("gauge1") // run this action every 10 seconds -ustats.probe("query_database", 10){ +ustats.global.probe("query_database", 10){ // query database counter1 += 1 gauge1.set(42) } - -// also works with async code -ustats.probe.async("query_database", 10) { implicit ec => - val f: Future[_] = // something that returns a Future[_] - f.map{ _ => - counter1 += 1 - } -} ``` Note that failures of probes themselves are recorded and exposed as a metric. @@ -117,11 +103,11 @@ over HTTP, under the standard `/metrics` endpoint. The server module is based on ```scala // global server for global stats -ustats.server.start("localhost", 10000) +ustats.server.global.start("localhost", 10000) // custom server for custom stats -val stats = new ustats.Stats() -val server = new ustats.MetricsServer(stats) +val metrics = ustats.Metrics() +val server = ustats.server.MetricsServer(metrics) server.start("localhost", 10000) ``` diff --git a/benchmark/src/Counter.scala b/benchmark/src/Counter.scala index 5527a08..fd57b29 100644 --- a/benchmark/src/Counter.scala +++ b/benchmark/src/Counter.scala @@ -6,7 +6,7 @@ import org.openjdk.jmh.annotations.{Benchmark, BenchmarkMode, Mode, OutputTimeUn @State(Scope.Benchmark) class state { - val collector = new Stats() + val collector = new Metrics() val counter = collector.counter("counter") } diff --git a/build.sc b/build.sc index ada4a9b..34c75a8 100644 --- a/build.sc +++ b/build.sc @@ -1,10 +1,9 @@ import $file.jmh import jmh.Jmh -import mill._, scalalib._, scalafmt._, publish._ +import mill._, scalalib._, scalafmt._, publish._, scalanativelib._ -val scala213 = "2.13.7" -val scala3 = "3.0.2" -val dottyCustomVersion = Option(sys.props("dottyVersion")) +val scala3 = "3.1.2" +val scalaNative = "0.4.5" trait Publish extends PublishModule { def publishVersion = "0.5.0" @@ -21,76 +20,67 @@ trait Publish extends PublishModule { } trait Utest extends ScalaModule with TestModule { - def ivyDeps = Agg(ivy"com.lihaoyi::utest:0.7.10") + def ivyDeps = Agg(ivy"com.lihaoyi::utest::0.7.11") def testFramework = "utest.runner.Framework" } -class UstatsModule(val crossScalaVersion: String) - extends CrossScalaModule +trait UstatsModule + extends ScalaModule with ScalafmtModule with Publish { + def scalaVersion = scala3 def artifactName = "ustats" - object test extends Tests with Utest - // FIXME: scaladoc 3 is not supported by mill yet. Remove the override - // once it is. - override def docJar = - if (crossScalaVersion.startsWith("2")) super.docJar - else T { - val outDir = T.ctx().dest - val javadocDir = outDir / 'javadoc - os.makeDir.all(javadocDir) - mill.api.Result.Success(mill.modules.Jvm.createJar(Agg(javadocDir))(outDir)) - } + def ivyDeps = Agg( + ivy"com.lihaoyi::geny::1.0.0" + ) } -object ustats extends Cross[UstatsModule]((Seq(scala213, scala3) ++ dottyCustomVersion): _*) { +object ustats extends Module { + + object jvm extends UstatsModule { + override def millSourcePath = super.millSourcePath / os.up + def sources = T.sources(super.sources() ++ Seq(PathRef(millSourcePath / "src-jvm"))) + object test extends Tests with Utest + } + object native extends UstatsModule with ScalaNativeModule { + def scalaNativeVersion = scalaNative + override def millSourcePath = super.millSourcePath / os.up + def sources = T.sources(super.sources() ++ Seq(PathRef(millSourcePath / "src-native"))) + object test extends Tests with Utest + } - class UstatsServerModule(val crossScalaVersion: String) - extends CrossScalaModule - with ScalafmtModule - with Publish { - def artifactName = "ustats-server" - def moduleDeps = Seq(ustats(crossScalaVersion)) + object server extends ScalaModule with Publish { + def scalaVersion = scala3 + def moduleDeps = Seq(ustats.jvm) def ivyDeps = Agg( - ivy"io.undertow:undertow-core:2.2.3.Final" + ivy"io.undertow:undertow-core:2.3.0.Final" ) object test extends Tests with Utest - // FIXME: scaladoc 3 is not supported by mill yet. Remove the override - // once it is. - override def docJar = - if (crossScalaVersion.startsWith("2")) super.docJar - else T { - val outDir = T.ctx().dest - val javadocDir = outDir / 'javadoc - os.makeDir.all(javadocDir) - mill.api.Result.Success(mill.modules.Jvm.createJar(Agg(javadocDir))(outDir)) - } } - object server extends Cross[UstatsServerModule]((Seq(scala213, scala3) ++ dottyCustomVersion): _*) } object benchmark extends ScalaModule with Jmh { - def scalaVersion = scala213 - def moduleDeps = Seq(ustats(scala213)) + def scalaVersion = scala3 + def moduleDeps = Seq(ustats.jvm) } object examples extends Module { trait Example extends ScalaModule { - def scalaVersion = scala213 - def moduleDeps: Seq[ScalaModule] = Seq(ustats(scala213)) + def scalaVersion = scala3 + def moduleDeps: Seq[ScalaModule] = Seq(ustats.jvm) } object cask extends Example { def ivyDeps = Agg( - ivy"com.lihaoyi::cask:0.7.9" + ivy"com.lihaoyi::cask:0.8.1" ) } object cask2 extends Example { def ivyDeps = cask.ivyDeps } object probe extends Example { - def moduleDeps = Seq(ustats(scala213), ustats.server(scala213)) + def moduleDeps = Seq(ustats.jvm, ustats.server) } } diff --git a/examples/cask/src/Main.scala b/examples/cask/src/Main.scala index fbe4017..8c2a88f 100644 --- a/examples/cask/src/Main.scala +++ b/examples/cask/src/Main.scala @@ -1,7 +1,7 @@ object Main extends cask.MainRoutes { - val httpRequestsSeconds = ustats.histogram("http_requests_seconds", labels = Seq("path" -> "/index")) - val randomFailures = ustats.counter("random_failures") + val httpRequestsSeconds = ustats.global.histogram("http_requests_seconds", labels = Seq("path" -> "/index")) + val randomFailures = ustats.global.counter("random_failures") @cask.get("/") def index() = httpRequestsSeconds.time { @@ -12,7 +12,7 @@ object Main extends cask.MainRoutes { } @cask.get("/metrics") - def metrics() = ustats.metrics() + def metrics() = ustats.global.metrics() initialize() diff --git a/examples/cask2/src/Main.scala b/examples/cask2/src/Main.scala index 0c6269e..4779144 100644 --- a/examples/cask2/src/Main.scala +++ b/examples/cask2/src/Main.scala @@ -7,7 +7,7 @@ import io.undertow.server.HttpServerExchange trait Timed extends cask.Main { lazy val histograms = { - val hs = ustats.histograms("http_requests_seconds", labels = Seq("method", "path")) + val hs = ustats.global.histograms("http_requests_seconds").labelled("method", "path") // prepopulate histogram with all known routes for { @@ -16,7 +16,7 @@ trait Timed extends cask.Main { x: cask.router.EndpointMetadata[_] ) m <- route.endpoint.methods - } hs.labelled(m, route.endpoint.path) + } hs(m, route.endpoint.path) hs } @@ -27,15 +27,17 @@ trait Timed extends cask.Main { val timedHandler = new HttpHandler { def handleRequest(exchange: HttpServerExchange): Unit = { val effectiveMethod = exchange.getRequestMethod.toString.toLowerCase() - val endpoint = routeTries(effectiveMethod).lookup(cask.internal.Util.splitPath(exchange.getRequestPath).toList, Map()) - - endpoint match { - case None => parent.handleRequest(exchange) - case Some(((_,metadata), _, _)) => - histograms.labelled(effectiveMethod, metadata.endpoint.path).time( - parent.handleRequest(exchange) - ) - } + val endpoint = dispatchTrie.lookup(cask.internal.Util.splitPath(exchange.getRequestPath).toList, Map()) + endpoint match + case None => + parent.handleRequest(exchange) + case Some((methodMap, routeBindings, remaining)) => + methodMap.get(effectiveMethod) match + case None => parent.handleRequest(exchange) + case Some((routes, metadata)) => + histograms(method = effectiveMethod, path = metadata.endpoint.path).time( + parent.handleRequest(exchange) + ) } } @@ -62,7 +64,7 @@ object Main extends cask.MainRoutes with Timed { } @cask.get("/metrics") - def metrics() = ustats.metrics() + def metrics() = ustats.global.metrics() initialize() diff --git a/examples/probe/src/Main.scala b/examples/probe/src/Main.scala index da7e323..52133ca 100644 --- a/examples/probe/src/Main.scala +++ b/examples/probe/src/Main.scala @@ -1,16 +1,16 @@ -object Main extends App { - +@main +def main() = var database = collection.mutable.ListBuffer.empty[String] - val itemsTotal = ustats.gauge("items_total") + val itemsTotal = ustats.global.gauge("items_total") - ustats.probe("query_database", 10){ - val l = database.length + ustats.global.probe("query_database", 10){ + val l = synchronized {database.length} if (l > 5) sys.error("random failure") itemsTotal.set(l) } - ustats.server.start("localhost", 8081) + ustats.server.global.start("localhost", 8081) println("Go to http://localhost:8081/metrics to see current metrics.") println("Ctrl+D to exit") @@ -25,4 +25,3 @@ object Main extends App { } println("bye") -} diff --git a/mill b/mill index 4754fd0..4fd673d 100755 --- a/mill +++ b/mill @@ -3,7 +3,7 @@ # This is a wrapper script, that automatically download mill from GitHub release pages # You can give the required mill version with MILL_VERSION env variable # If no version is given, it falls back to the value of DEFAULT_MILL_VERSION -DEFAULT_MILL_VERSION=0.9.7 +DEFAULT_MILL_VERSION=0.10.8 set -e diff --git a/ustats/server/src/ustats/Server.scala b/ustats/server/src/ustats/Server.scala deleted file mode 100644 index 93833ba..0000000 --- a/ustats/server/src/ustats/Server.scala +++ /dev/null @@ -1,81 +0,0 @@ -package ustats - -import io.undertow.Undertow -import io.undertow.server.handlers.BlockingHandler -import io.undertow.server.HttpServerExchange -import io.undertow.server.HttpHandler -import io.undertow.util.HttpString -import io.undertow.util.Methods - -/** A convenience HTTP server to export metrics at the standard /metrics - * endpoint - * @param stats the statistics instance to export, or None for exporting the - * default instance - */ -class MetricsServer private[ustats] (stats: Option[Stats]) { - - val handler = new HttpHandler { - override def handleRequest(exchange: HttpServerExchange): Unit = { - if (exchange.getRequestMethod() != Methods.GET) { - exchange.setStatusCode(405) - exchange.getOutputStream().close() - return - } - - if (exchange.getRequestPath() != "/metrics") { - exchange.setStatusCode(404) - exchange.getOutputStream().close() - return - } - - exchange.setStatusCode(200) - exchange - .getResponseHeaders() - .put(new HttpString("content-type"), "text/plain") - stats match { - case Some(s) => s.writeMetricsTo(exchange.getOutputStream()) - case None => ustats.writeMetricsTo(exchange.getOutputStream()) - } - exchange.getOutputStream().close() - } - } - - def start( - host: String = "[::]", - port: Int = 10000, - verbose: Boolean = false - ) = { - if (!verbose) MetricsServer.silenceJboss() - val server = Undertow.builder - .addHttpListener(port, host) - .setHandler(new BlockingHandler(handler)) - .setWorkerOption( - org.xnio.Options.THREAD_DAEMON.asInstanceOf[org.xnio.Option[Any]], - true - ) - .build() - server.start() - server - } -} - -object MetricsServer { - def apply(stats: Stats) = new MetricsServer(Some(stats)) - - private def silenceJboss(): Unit = { - // Some jboss classes litter logs from their static initializers. This is a - // workaround to stop this rather annoying behavior. - val tmp = System.err - System.setErr(null) - org.jboss.threads.Version.getVersionString() // this causes the static initializer to be run - System.setErr(tmp) - - // Other loggers print way too much information. Set them to only print - // interesting stuff. - val level = java.util.logging.Level.WARNING - java.util.logging.Logger.getLogger("org.jboss").setLevel(level) - java.util.logging.Logger.getLogger("org.xnio").setLevel(level) - java.util.logging.Logger.getLogger("io.undertow").setLevel(level) - } - -} diff --git a/ustats/server/src/ustats/package.scala b/ustats/server/src/ustats/package.scala deleted file mode 100644 index 01adc7e..0000000 --- a/ustats/server/src/ustats/package.scala +++ /dev/null @@ -1,4 +0,0 @@ -package ustats - -/** Default global stats HTTP server. */ -object server extends MetricsServer(None) diff --git a/ustats/server/src/ustats/server/MetricsServer.scala b/ustats/server/src/ustats/server/MetricsServer.scala new file mode 100644 index 0000000..05ad32c --- /dev/null +++ b/ustats/server/src/ustats/server/MetricsServer.scala @@ -0,0 +1,64 @@ +package ustats.server + +import io.undertow.Undertow +import io.undertow.server.handlers.BlockingHandler +import io.undertow.server.HttpServerExchange +import io.undertow.server.HttpHandler +import io.undertow.util.HttpString +import io.undertow.util.Methods + +class MetricsServer(metrics: ustats.Metrics*): + val handler = new HttpHandler: + override def handleRequest(req: HttpServerExchange): Unit = + if req.getRequestMethod() != Methods.GET then + req.setStatusCode(405) + req.getOutputStream().close() + return + if req.getRequestPath() != "/metrics" then + req.setStatusCode(404) + req.getOutputStream().close() + return + + req.setStatusCode(200) + req.getResponseHeaders().put(HttpString("content-type"), "text/plain") + for m <- metrics do + m.writeBytesTo(req.getOutputStream()) + req.getOutputStream().close() + end handler + + def start( + host: String = "[::]", + port: Int = 10000, + verbose: Boolean = false + ): Undertow = + if !verbose then MetricsServer.silenceJboss() + val server = Undertow.builder() + .addHttpListener(port, host) + .setHandler(new BlockingHandler(handler)) + .setWorkerOption( + org.xnio.Options.THREAD_DAEMON.asInstanceOf[org.xnio.Option[Any]], + true + ) + .build() + server.start() + server + +object MetricsServer { + + private def silenceJboss(): Unit = { + // Some jboss classes litter logs from their static initializers. This is a + // workaround to stop this rather annoying behavior. + val tmp = System.err + System.setErr(null) + org.jboss.threads.Version.getVersionString() // this causes the static initializer to be run + System.setErr(tmp) + + // Other loggers print way too much information. Set them to only print + // interesting stuff. + val level = java.util.logging.Level.WARNING + java.util.logging.Logger.getLogger("org.jboss").setLevel(level) + java.util.logging.Logger.getLogger("org.xnio").setLevel(level) + java.util.logging.Logger.getLogger("io.undertow").setLevel(level) + } + +} diff --git a/ustats/server/src/ustats/server/package.scala b/ustats/server/src/ustats/server/package.scala new file mode 100644 index 0000000..a387d99 --- /dev/null +++ b/ustats/server/src/ustats/server/package.scala @@ -0,0 +1,4 @@ +package ustats.server + +/** Default global stats HTTP server. */ +object global extends MetricsServer(ustats.global) diff --git a/ustats/server/test/src/ServerTest.scala b/ustats/server/test/src/ServerTest.scala index 7d7a9b7..0bc998f 100644 --- a/ustats/server/test/src/ServerTest.scala +++ b/ustats/server/test/src/ServerTest.scala @@ -34,8 +34,8 @@ object ServerTest extends TestSuite { } - val stats = new ustats.Stats - val server = ustats.MetricsServer(stats) + val stats = new ustats.Metrics + val server = ustats.server.MetricsServer(stats) val http = server.start("localhost", 10000) val tests = Tests { test("invalid path") { diff --git a/ustats/src-jvm/ustats/package.scala b/ustats/src-jvm/ustats/package.scala new file mode 100644 index 0000000..e3b8702 --- /dev/null +++ b/ustats/src-jvm/ustats/package.scala @@ -0,0 +1,4 @@ +package ustats + +/** Default global stats collector. */ +object global extends Metrics with Probing diff --git a/ustats/src-jvm/ustats/probe.scala b/ustats/src-jvm/ustats/probe.scala new file mode 100644 index 0000000..b6e388a --- /dev/null +++ b/ustats/src-jvm/ustats/probe.scala @@ -0,0 +1,84 @@ +package ustats + +import java.util.{concurrent => juc} + +class ProbeFailedException(cause: Exception) extends Exception(cause) + +object Probing { + def makeThreadPool(n: Int) = { + val counter = new juc.atomic.AtomicInteger(0) + juc.Executors.newScheduledThreadPool( + n, + new juc.ThreadFactory { + override def newThread(r: Runnable) = { + val t = new Thread(r) + t.setDaemon(true) + t.setName(s"ustats-probe-${counter.incrementAndGet()}") + t + } + } + ) + } +} + +trait Probing { this: Metrics => + + lazy val probePool: juc.ScheduledExecutorService = Probing.makeThreadPool( + math.min(Runtime.getRuntime().availableProcessors(), 4) + ) + + // override this if you want to deactivate probe failure reporting + lazy val probeFailureCounter: Option[MetricsGroup[Counter]] = Some( + this.counters("ustats_probe_failures_count").labelled("probe") + ) + + + /** Run a given probing action regularly. + * + * Although an action can contain arbitrary code, it is intended to be used + * to measure something and set various ustats metrics (counters, gauges, + * histograms, etc) in its body. + * + * All actions are run in a dedicated thread pool. + */ + def probe( + name: String, + rateInSeconds: Long, + pool: juc.ScheduledExecutorService = probePool + )(action: => Any): juc.ScheduledFuture[_] = { + pool.scheduleAtFixedRate( + () => + try { + action + } catch { + case ex: Exception => + probeFailureCounter.foreach { counters => counters.labelled(name).inc() } + (new ProbeFailedException(ex)).printStackTrace() + }, + 0L, + rateInSeconds, + juc.TimeUnit.SECONDS + ) + } + + /** Async wrapper for apply(). */ + def probeAsync( + name: String, + rateInSeconds: Long, + pool: juc.ScheduledExecutorService = probePool + )( + action: scala.concurrent.ExecutionContext => scala.concurrent.Future[_] + ): juc.ScheduledFuture[_] = { + val ec = scala.concurrent.ExecutionContext.fromExecutorService(pool) + probe(name, rateInSeconds, pool) { + scala.concurrent.Await.result( + action(ec), + scala.concurrent.duration.FiniteDuration( + rateInSeconds, + juc.TimeUnit.SECONDS + ) + ) + } + } + +} diff --git a/ustats/src-jvm/ustats/types.scala b/ustats/src-jvm/ustats/types.scala new file mode 100644 index 0000000..b0e4f3b --- /dev/null +++ b/ustats/src-jvm/ustats/types.scala @@ -0,0 +1,7 @@ +package ustats + +object types: + + final type DoubleAdder = java.util.concurrent.atomic.DoubleAdder + final type ConcurrentHashMap[K, V] = java.util.concurrent.ConcurrentHashMap[K, V] + final type ConcurrentLinkedDeque[A] = java.util.concurrent.ConcurrentLinkedDeque[A] diff --git a/ustats/src-native/ustats/package.scala b/ustats/src-native/ustats/package.scala new file mode 100644 index 0000000..06a3486 --- /dev/null +++ b/ustats/src-native/ustats/package.scala @@ -0,0 +1,9 @@ +package ustats + +/** Default global stats collector. */ +object global extends Metrics + +object test: + @main + def foo = + global.counter("test").inc() diff --git a/ustats/src-native/ustats/types.scala b/ustats/src-native/ustats/types.scala new file mode 100644 index 0000000..05075e1 --- /dev/null +++ b/ustats/src-native/ustats/types.scala @@ -0,0 +1,28 @@ +package ustats + +object types: + + final type DoubleAdder = NativeDoubleAdder + final type ConcurrentHashMap[K, V] = NativeConcurrentHashMap[K, V] + final type ConcurrentLinkedDeque[A] = NativeConcurrentLinkedDeque[A] + + class NativeDoubleAdder: + private var value: Double = 0 + def add(v: Double) = value += v + def reset() = value = 0 + def sum(): Double = value + + class NativeConcurrentHashMap[K, V]: + private val data = collection.mutable.Map.empty[K, V] + + def computeIfAbsent(k: K, f: K => V): V = + data.getOrElseUpdate(k, f(k)) + + def get(k: K): V | Null = data.get(k) match + case None => null + case Some(v) => v + + class NativeConcurrentLinkedDeque[A]: + private val queue = collection.mutable.Queue.empty[A] + def iterator() = queue.iterator + def add(a: A) = queue.append(a) diff --git a/ustats/src/ustats/Counter.scala b/ustats/src/ustats/Counter.scala index 4f019a9..89694e5 100644 --- a/ustats/src/ustats/Counter.scala +++ b/ustats/src/ustats/Counter.scala @@ -1,6 +1,6 @@ package ustats -import java.util.concurrent.atomic.DoubleAdder +import types.DoubleAdder /** A counter may only ever be increased (or reset when the application restarts). */ class Counter(private val adder: DoubleAdder) extends AnyVal { diff --git a/ustats/src/ustats/Gauge.scala b/ustats/src/ustats/Gauge.scala index 88670bf..db616f8 100644 --- a/ustats/src/ustats/Gauge.scala +++ b/ustats/src/ustats/Gauge.scala @@ -1,6 +1,6 @@ package ustats -import java.util.concurrent.atomic.DoubleAdder +import types.DoubleAdder /** Gauges may be increased and decreased. */ class Gauge(private val adder: DoubleAdder) extends AnyVal { diff --git a/ustats/src/ustats/Histogram.scala b/ustats/src/ustats/Histogram.scala index 6539cec..448072c 100644 --- a/ustats/src/ustats/Histogram.scala +++ b/ustats/src/ustats/Histogram.scala @@ -1,6 +1,6 @@ package ustats -import java.util.concurrent.atomic.DoubleAdder +import types.DoubleAdder import scala.concurrent.ExecutionContext import scala.concurrent.Future diff --git a/ustats/src/ustats/Metrics.scala b/ustats/src/ustats/Metrics.scala index 4444a45..a6b7b85 100644 --- a/ustats/src/ustats/Metrics.scala +++ b/ustats/src/ustats/Metrics.scala @@ -1,23 +1,273 @@ package ustats -import java.util.concurrent.ConcurrentHashMap +import java.io.OutputStream +import java.io.ByteArrayOutputStream +import types.DoubleAdder +import types.ConcurrentLinkedDeque -class Metrics[A](size: Int, mkNew: Seq[Any] => A) { - private val all = new ConcurrentHashMap[Seq[Any], A] +/** A memory-efficient, concurrent-friendly metrics collection interface. + * + * @param BlockSize Metrics are stored in a linked list of arrays which are + * allocated on demand. The size of the arrays in each block + * is controlled by this parameter. It does not need to be + * changed for regular use, but can be tuned to a larger + * value, should you have many metrics. + */ +class Metrics(BlockSize: Int = 128) extends geny.Writable { - /** Return a metric only if it has already been added. + // For fast access and a light memory footprint, all data is stored as chunks + // of names and DoubleAdder arrays. DoubleAdders (as opposed to AtomicDoubles) + // are used to minimize contention when several threads update the same metric + // concurrently. + private class Block { + @volatile var next: Block = null + val metrics = new Array[String](BlockSize) // name + val data = Array.fill[DoubleAdder](BlockSize)(new DoubleAdder) + @volatile var i = 0 // next index to fill + } + private var curr = new Block + private var head = curr + + private val infos = new ConcurrentLinkedDeque[String] + + /** Write metrics as they are right now to the given output stream. * - * Compared to `labelled()`, this method does not create a new metric if it - * does not already exist. This is useful in situations where metrics can - * be pre-populated, and can help avoid accidental cardinality explosion. + * Concurrency note: this method does not block updating or adding new + * metrics while it is called. */ - def existing(values: Any*): Option[A] = Option(all.get(values)) + def writeMetricsTo(out: OutputStream, includeInfo: Boolean = true): Unit = { + if (includeInfo) { + val items = infos.iterator() + while (items.hasNext) { + out.write(items.next.getBytes("utf-8")) + } + } + var block = head + while (block != null) { + var i = 0 + while (i < block.i) { + out.write(block.metrics(i).getBytes("utf-8")) + out.write(32) // space + out.write(block.data(i).sum().toString().getBytes("utf-8")) + out.write(10) // newline + i += 1 + } + block = block.next + } + out.flush() + } + + override def httpContentType: Option[String] = Some("text/plain; charset=utf-8") + override def writeBytesTo(out: OutputStream): Unit = writeMetricsTo(out, includeInfo = true) + + /** Show metrics as they are right now. */ + def metrics(includeInfo: Boolean = true): String = { + val out = new ByteArrayOutputStream + writeMetricsTo(out, includeInfo) + out.close() + out.toString("utf-8") + } + + /** Add an informational comment, to be exported when metrics are read. */ + def addInfo( + name: String, + help: String = null, + tpe: String = null, + comment: String = null + ): String = { + def escape(line: String) = + line + .replace("""\""", """\\""") + .replace("\n", """\n""") + + val commentBuilder = new StringBuilder + if (comment != null) { + commentBuilder ++= "# " + commentBuilder ++= escape(comment) + commentBuilder ++= "\n" + } + if (help != null) { + commentBuilder ++= "# HELP " + commentBuilder ++= name + commentBuilder ++= " " + commentBuilder ++= escape(help) + commentBuilder ++= "\n" + } + if (tpe != null) { + commentBuilder ++= "# TYPE " + commentBuilder ++= name + commentBuilder ++= " " + commentBuilder ++= escape(tpe) + commentBuilder ++= "\n" + } - /** Create or find an existing metric with the given label values. */ - def labelled(values: Any*): A = { - require(values.size == size, "label size mismatch") - all.computeIfAbsent(values, values => mkNew(values)) + val info = commentBuilder.result() + infos.add(info) + info } - def apply(values: Any*): A = labelled(values: _*) + /** Add a metric manually. + * + * This is a low-level escape hatch that should be used only when no other + * functions apply. + */ + def addRawMetric( + name: String, + labels: Seq[(String, Any)] + ): DoubleAdder = synchronized { + if (curr.i >= BlockSize) { + val b = new Block + curr.next = b + curr = b + addRawMetric(name, labels) + } else { + val i = curr.i + curr.metrics(i) = util.labelify(name, labels) + curr.i += 1 + curr.data(i) + } + } + + private def addRawCounter( + name: String, + labels: Seq[(String, Any)] = Nil + ): Counter = + new Counter(addRawMetric(name, labels)) + + private def addRawGauge( + name: String, + labels: Seq[(String, Any)] = Nil + ): Gauge = + new Gauge(addRawMetric(name, labels)) + + protected def addRawHistogram( + name: String, + buckets: Seq[Double] = BucketDistribution.httpRequestDuration, + labels: Seq[(String, Any)] = Nil + ): Histogram = { + val buckets0: Seq[Double] = buckets.sorted + require( + labels.forall(_._1 != "le"), + "histograms may not contain the label \"le\"" + ) + require(buckets0.size >= 1, "histograms must have at least one bucket") + require(buckets0.forall(!_.isNaN()), "histograms may not have NaN buckets") + + val buckets1 = if (buckets0.last == Double.PositiveInfinity) { + Array.from(buckets0) + } else { + Array.from(buckets0 ++ Seq(Double.PositiveInfinity)) + } + + val adders = buckets1.zipWithIndex.map { + case (bucket, idx) => + val label = if (bucket.isPosInfinity) { + "+Inf" + } else { + bucket + } + + addRawMetric(name + "_bucket", labels ++ Seq("le" -> label)) + } + + new Histogram( + buckets1, + adders, + addRawMetric(name + "_count", labels), + addRawMetric(name + "_sum", labels) + ) + } + + /** Add a single counter with the given name and label-value pairs. */ + def counter( + name: String, + help: String = null, + labels: Seq[(String, Any)] = Nil + ): Counter = { + addInfo(name, help, "counter") + addRawCounter(name, labels) + } + + /** Add a single gauge with the given name and label-value pairs. */ + def gauge( + name: String, + help: String = null, + labels: Seq[(String, Any)] = Nil + ): Gauge = { + addInfo(name, help, "gauge") + addRawGauge(name, labels) + } + + /** Add a single histogram with the given name and label-value pairs. */ + def histogram( + name: String, + help: String = null, + buckets: Seq[Double] = BucketDistribution.httpRequestDuration, + labels: Seq[(String, Any)] = Nil + ): Histogram = { + addInfo(name, help, "histogram") + addRawHistogram(name, buckets, labels) + } + + /** Add a group of counters with the given name. */ + class counters( + name: String, + help: String = "" + ): + transparent inline final def labelled( + inline labels: String* + ): MetricsGroup[Counter] = + addInfo(name, help, "counter") + MetricsGroup.refine( + labels, + values => addRawCounter(name, labels.zip(values)) + ) + end counters + + /** Add a group of gauges with the given name. */ + class gauges( + name: String, + help: String = "" + ): + transparent inline final def labelled( + inline labels: String* + ): MetricsGroup[Gauge] = + addInfo(name, help, "gauge") + MetricsGroup.refine( + labels, + values => addRawGauge(name, labels.zip(values)) + ) + end gauges + + /** Add a group of histograms with the given name. */ + class histograms( + name: String, + help: String = "", + buckets: Seq[Double] = BucketDistribution.httpRequestDuration + ): + transparent inline final def labelled( + inline labels: String* + ): MetricsGroup[Histogram] = + addInfo(name, help, "histogram") + MetricsGroup.refine( + labels, + values => addRawHistogram(name, buckets, labels.zip(values)) + ) + end histograms + + /** A pseudo-metric used to expose application information in labels. + * + * E.g. + * {{{ + * buildInfo("version" -> "0.1.0", "commit" -> "deadbeef") + * }}} + * will create the metric + * {{{ + * build_info{version="0.1.0", commit="deadbeef"} 1.0 + * }}} + * Note that the actual value will always be 1. + */ + def buildInfo(properties: (String, Any)*): Unit = + addRawMetric("build_info", properties).add(1) + } diff --git a/ustats/src/ustats/MetricsGroup.scala b/ustats/src/ustats/MetricsGroup.scala new file mode 100644 index 0000000..0b9f19a --- /dev/null +++ b/ustats/src/ustats/MetricsGroup.scala @@ -0,0 +1,54 @@ +package ustats + +import types.DoubleAdder +import types.ConcurrentHashMap + +class MetricsGroup[A]( + labels: Seq[String], + mkNew: Seq[Any] => A +) extends Selectable: + private val all = new ConcurrentHashMap[Seq[Any], A] + + def applyDynamic(f: String)(labelValues: Any*): A = + all.computeIfAbsent(labelValues, values => mkNew(values)) + + def labelled(labelValues: Any*) = + require(labelValues.size == labels.size, "label size mismatch") + applyDynamic("")(labelValues*) + +object MetricsGroup: + import scala.quoted.Expr + import scala.quoted.Quotes + import scala.quoted.Type + import scala.quoted.Varargs + + transparent inline final def refine[A](inline labels: Seq[String], mkNew: Seq[Any] => A): MetricsGroup[A] = + ${refineImpl[A]('labels, 'mkNew)} + + private def refineImpl[A](using qctx: Quotes, tpe: Type[A])( + labels: Expr[Seq[String]], + mkNew: Expr[Seq[Any] => A] + ): Expr[MetricsGroup[A]] = + import qctx.reflect.* + + val strings: List[String] = labels.value match + case None => + report.error("labels must be statically known at compile time", labels) + Nil + case Some(xs) => xs.toList + + val mt = MethodType(strings)( + _ => strings.map(_ => TypeRepr.of[Any]), + _ => TypeRepr.of[A] + ) + + val tpe = Refinement( + TypeRepr.of[MetricsGroup[A]], + "apply", + mt + ).asType + + tpe match + case '[tpe] => + '{MetricsGroup[A](${Expr(strings)}, $mkNew).asInstanceOf[MetricsGroup[A] & tpe]} + diff --git a/ustats/src/ustats/Stats.scala b/ustats/src/ustats/Stats.scala deleted file mode 100644 index fee45f4..0000000 --- a/ustats/src/ustats/Stats.scala +++ /dev/null @@ -1,264 +0,0 @@ -package ustats - -import java.io.OutputStream -import java.io.ByteArrayOutputStream -import java.util.concurrent.ConcurrentLinkedDeque -import java.util.concurrent.atomic.DoubleAdder - -/** A memory-efficient, concurrent-friendly metrics collection interface. - * - * @param BlockSize Metrics are stored in a linked list of arrays which are - * allocated on demand. The size of the arrays in each block - * is controlled by this parameter. It does not need to be - * changed for regular use, but can be tuned to a larger - * value, should you have many metrics. - */ -class Stats(BlockSize: Int = 128) { - - // For fast access and a light memory footprint, all data is stored as chunks - // of names and DoubleAdder arrays. DoubleAdders (as opposed to AtomicDoubles) - // are used to minimize contention when several threads update the same metric - // concurrently. - private class Block { - @volatile var next: Block = null - val metrics = new Array[String](BlockSize) // name - val data = Array.fill[DoubleAdder](BlockSize)(new DoubleAdder) - @volatile var i = 0 // next index to fill - } - private var curr = new Block - private var head = curr - - private val infos = new ConcurrentLinkedDeque[String] - - /** Write metrics as they are right now to the given output stream. - * - * Concurrency note: this method does not block updating or adding new - * metrics while it is called. - */ - def writeMetricsTo(out: OutputStream, includeInfo: Boolean = true): Unit = { - if (includeInfo) { - val items = infos.iterator() - while (items.hasNext()) { - out.write(items.next().getBytes("utf-8")) - } - } - var block = head - while (block != null) { - var i = 0 - while (i < block.i) { - out.write(block.metrics(i).getBytes("utf-8")) - out.write(32) // space - out.write(block.data(i).sum().toString().getBytes("utf-8")) - out.write(10) // newline - i += 1 - } - block = block.next - } - out.flush() - } - - /** Show metrics as they are right now. */ - def metrics(includeInfo: Boolean = true): String = { - val out = new ByteArrayOutputStream - writeMetricsTo(out, includeInfo) - out.close() - out.toString("utf-8") - } - - /** Add an informational comment, to be exported when metrics are read. */ - def addInfo( - name: String, - help: String = null, - tpe: String = null, - comment: String = null - ): String = { - def escape(line: String) = - line - .replace("""\""", """\\""") - .replace("\n", """\n""") - - val commentBuilder = new StringBuilder - if (comment != null) { - commentBuilder ++= "# " - commentBuilder ++= escape(comment) - commentBuilder ++= "\n" - } - if (help != null) { - commentBuilder ++= "# HELP " - commentBuilder ++= name - commentBuilder ++= " " - commentBuilder ++= escape(help) - commentBuilder ++= "\n" - } - if (tpe != null) { - commentBuilder ++= "# TYPE " - commentBuilder ++= name - commentBuilder ++= " " - commentBuilder ++= escape(tpe) - commentBuilder ++= "\n" - } - - val info = commentBuilder.result() - infos.add(info) - info - } - - /** Add a metric manually. - * - * This is a low-level escape hatch that should be used only when no other - * functions apply. - */ - def addRawMetric( - name: String, - labels: Seq[(String, Any)] - ): DoubleAdder = synchronized { - if (curr.i >= BlockSize) { - val b = new Block - curr.next = b - curr = b - addRawMetric(name, labels) - } else { - val i = curr.i - curr.metrics(i) = util.labelify(name, labels) - curr.i += 1 - curr.data(i) - } - } - - private def addRawCounter( - name: String, - labels: Seq[(String, Any)] = Nil - ): Counter = - new Counter(addRawMetric(name, labels)) - - private def addRawGauge( - name: String, - labels: Seq[(String, Any)] = Nil - ): Gauge = - new Gauge(addRawMetric(name, labels)) - - protected def addRawHistogram( - name: String, - buckets: Seq[Double] = BucketDistribution.httpRequestDuration, - labels: Seq[(String, Any)] = Nil - ): Histogram = { - val buckets0: Seq[Double] = buckets.sorted - require( - labels.forall(_._1 != "le"), - "histograms may not contain the label \"le\"" - ) - require(buckets0.size >= 1, "histograms must have at least one bucket") - require(buckets0.forall(!_.isNaN()), "histograms may not have NaN buckets") - - val buckets1 = if (buckets0.last == Double.PositiveInfinity) { - Array.from(buckets0) - } else { - Array.from(buckets0 ++ Seq(Double.PositiveInfinity)) - } - - val adders = buckets1.zipWithIndex.map { - case (bucket, idx) => - val label = if (bucket.isPosInfinity) { - "+Inf" - } else { - bucket - } - - addRawMetric(name + "_bucket", labels ++ Seq("le" -> label)) - } - - new Histogram( - buckets1, - adders, - addRawMetric(name + "_count", labels), - addRawMetric(name + "_sum", labels) - ) - } - - /** Add a single counter with the given name and label-value pairs. */ - def counter( - name: String, - help: String = null, - labels: Seq[(String, Any)] = Nil - ): Counter = { - addInfo(name, help, "counter") - addRawCounter(name, labels) - } - - /** Add a single gauge with the given name and label-value pairs. */ - def gauge( - name: String, - help: String = null, - labels: Seq[(String, Any)] = Nil - ): Gauge = { - addInfo(name, help, "gauge") - addRawGauge(name, labels) - } - - /** Add a single histogram with the given name and label-value pairs. */ - def histogram( - name: String, - help: String = null, - buckets: Seq[Double] = BucketDistribution.httpRequestDuration, - labels: Seq[(String, Any)] = Nil - ): Histogram = { - addInfo(name, help, "histogram") - addRawHistogram(name, buckets, labels) - } - - /** Add a group of counters with the given name. */ - def counters( - name: String, - help: String = null, - labels: Seq[String] = Nil - ): Metrics[Counter] = { - addInfo(name, help, "counter") - new Metrics[Counter]( - labels.length, - values => addRawCounter(name, labels.zip(values)) - ) - } - - /** Add a group of gauges with the given name. */ - def gauges( - name: String, - help: String = null, - labels: Seq[String] = Nil - ): Metrics[Gauge] = { - addInfo(name, help, "gauge") - new Metrics[Gauge]( - labels.length, - values => addRawGauge(name, labels.zip(values)) - ) - } - - /** Add a group of histograms with the given name. */ - def histograms( - name: String, - help: String = null, - buckets: Seq[Double] = BucketDistribution.httpRequestDuration, - labels: Seq[String] = Nil - ): Metrics[Histogram] = { - addInfo(name, help, "histogram") - new Metrics[Histogram]( - labels.length, - values => addRawHistogram(name, buckets, labels.zip(values)) - ) - } - - /** A pseudo-metric used to expose application information in labels. - * - * E.g. - * {{{ - * buildInfo("version" -> "0.1.0", "commit" -> "deadbeef") - * }}} - * will create the metric - * {{{ - * build_info{version="0.1.0", commit="deadbeef"} 1.0 - * }}} - * Note that the actual value will always be 1. - */ - def buildInfo(properties: (String, Any)*): Unit = - addRawMetric("build_info", properties).add(1) - -} diff --git a/ustats/src/ustats/package.scala b/ustats/src/ustats/package.scala deleted file mode 100644 index 16c50e4..0000000 --- a/ustats/src/ustats/package.scala +++ /dev/null @@ -1,2 +0,0 @@ -/** Default global stats collector. */ -package object ustats extends Stats with Probing diff --git a/ustats/src/ustats/probe.scala b/ustats/src/ustats/probe.scala deleted file mode 100644 index 7f2af38..0000000 --- a/ustats/src/ustats/probe.scala +++ /dev/null @@ -1,86 +0,0 @@ -package ustats - -import java.util.{concurrent => juc} - -class ProbeFailedException(cause: Exception) extends Exception(cause) - -object Probing { - def makeThreadPool(n: Int) = { - val counter = new juc.atomic.AtomicInteger(0) - juc.Executors.newScheduledThreadPool( - n, - new juc.ThreadFactory { - override def newThread(r: Runnable) = { - val t = new Thread(r) - t.setDaemon(true) - t.setName(s"ustats-probe-${counter.incrementAndGet()}") - t - } - } - ) - } -} - -trait Probing { this: Stats => - - lazy val probePool: juc.ScheduledExecutorService = Probing.makeThreadPool( - math.min(Runtime.getRuntime().availableProcessors(), 4) - ) - - // override this if you want to deactivate probe failure reporting - lazy val probeFailureCounter: Option[Metrics[Counter]] = Some( - ustats.counters("ustats_probe_failures_count", labels = Seq("probe")) - ) - - object probe { - - /** Run a given probing action regularly. - * - * Although an action can contain arbitrary code, it is intended to be used - * to measure something and set various ustats metrics (counters, gauges, - * histograms, etc) in its body. - * - * All actions are run in a dedicated thread pool. - */ - def apply( - name: String, - rateInSeconds: Long, - pool: juc.ScheduledExecutorService = probePool - )(action: => Any): juc.ScheduledFuture[_] = { - pool.scheduleAtFixedRate( - () => - try { - action - } catch { - case ex: Exception => - probeFailureCounter.foreach { counter => counter(name).inc() } - (new ProbeFailedException(ex)).printStackTrace() - }, - 0L, - rateInSeconds, - juc.TimeUnit.SECONDS - ) - } - - /** Async wrapper for apply(). */ - def async( - name: String, - rateInSeconds: Long, - pool: juc.ScheduledExecutorService = probePool - )( - action: scala.concurrent.ExecutionContext => scala.concurrent.Future[_] - ): juc.ScheduledFuture[_] = { - val ec = scala.concurrent.ExecutionContext.fromExecutorService(pool) - apply(name, rateInSeconds, pool) { - scala.concurrent.Await.result( - action(ec), - scala.concurrent.duration.FiniteDuration( - rateInSeconds, - juc.TimeUnit.SECONDS - ) - ) - } - } - } - -} diff --git a/ustats/src/ustats/util.scala b/ustats/src/ustats/util.scala index 1911138..c26b086 100644 --- a/ustats/src/ustats/util.scala +++ b/ustats/src/ustats/util.scala @@ -15,13 +15,29 @@ object util { snake.result() } - def labelify(baseName: String, labels: Seq[(String, Any)]): String = + def labelify(baseName: String, labels: Iterable[(String, Any)]): String = if (labels.isEmpty) { baseName } else { - baseName + labels - .map { case (key, value) => s"""$key="$value"""" } - .mkString("{", ", ", "}") + val it = labels.iterator + val b = new StringBuilder + b ++= baseName + b += '{' + + val (key, value) = it.next() + b ++= key + b ++= "=\"" + b ++= value.toString + b += '"' + while it.hasNext do + val (key, value) = it.next() + b ++= ", " + b ++= key + b ++= "=\"" + b ++= value.toString + b += '"' + b += '}' + b.result() } } diff --git a/ustats/test/src/ustats/StatsTest.scala b/ustats/test/src/ustats/StatsTest.scala index 0e0a89f..1de448e 100644 --- a/ustats/test/src/ustats/StatsTest.scala +++ b/ustats/test/src/ustats/StatsTest.scala @@ -4,13 +4,13 @@ import utest._ object Test extends TestSuite { - def withStats(fct: Stats => Unit) = { - fct(new Stats()) + def withMetrics(fct: Metrics => Unit) = { + fct(new Metrics()) } val tests = Tests { test("basic") { - withStats { s => + withMetrics { s => val myFirstCounter = s.counter( "my_first_counter", @@ -25,7 +25,7 @@ object Test extends TestSuite { } } test("many blocks") { - withStats { s => + withMetrics { s => // this tests that there are no logic errors when traversing metrics split // over several blocks for (i <- 0 until 1000) { @@ -36,7 +36,7 @@ object Test extends TestSuite { s.metrics(false) ==> expected } } - test("histogram")(withStats { s => + test("histogram")(withMetrics { s => val myHist = s.histogram("my_hist", buckets = Seq(0, 1, 2)) myHist.observe(0.5) myHist.observe(1.5) @@ -55,7 +55,7 @@ object Test extends TestSuite { }) test("types") { test("counter") { - withStats { s => + withMetrics { s => val myFirstCounter = s.counter("my_first_counter") s.metrics() ==> """|# TYPE my_first_counter counter |my_first_counter 0.0 @@ -63,7 +63,7 @@ object Test extends TestSuite { } } test("gauge") { - withStats { s => + withMetrics { s => val megaGauge = s.gauge("mega_gauge") s.metrics() ==> """|# TYPE mega_gauge gauge |mega_gauge 0.0 @@ -71,7 +71,7 @@ object Test extends TestSuite { } } test("histogram") { - withStats { s => + withMetrics { s => val myHist = s.histogram("my_hist", buckets = Seq(0, 1, 2)) s.metrics() ==> """|# TYPE my_hist histogram |my_hist_bucket{le="0.0"} 0.0 @@ -85,7 +85,7 @@ object Test extends TestSuite { } } test("help") { - withStats { s => + withMetrics { s => val myFirstCounter = s.counter("my_first_counter", help = "some random counter") s.metrics() ==> """|# HELP my_first_counter some random counter @@ -95,8 +95,8 @@ object Test extends TestSuite { } } test("labels") { - withStats { s => - val c = s.counters("some_counter", labels = Seq("l1", "l2")) + withMetrics { s => + val c = s.counters("some_counter").labelled("l1", "l2") s.metrics(false) ==> "" intercept[IllegalArgumentException] { @@ -108,12 +108,13 @@ object Test extends TestSuite { c.labelled(1, 2) s.metrics(false) ==> "some_counter{l1=\"1\", l2=\"2\"} 0.0\n" - c.labelled(1, 2) += 1 + c(l1 = 1, l2 = 2) += 1 s.metrics(false) ==> "some_counter{l1=\"1\", l2=\"2\"} 1.0\n" + c.labelled(1, 2) += 1 + s.metrics(false) ==> "some_counter{l1=\"1\", l2=\"2\"} 2.0\n" c.labelled(2, 2) += 1 - s.metrics(false) ==> "some_counter{l1=\"1\", l2=\"2\"} 1.0\nsome_counter{l1=\"2\", l2=\"2\"} 1.0\n" - + s.metrics(false) ==> "some_counter{l1=\"1\", l2=\"2\"} 2.0\nsome_counter{l1=\"2\", l2=\"2\"} 1.0\n" } } }