From c4b5bb64b143db3680ec90d7ec389b829a09e046 Mon Sep 17 00:00:00 2001 From: toxicafunk Date: Mon, 25 Oct 2021 13:48:18 +0200 Subject: [PATCH] Support optional prefix for statsd (#110) --- .../src/main/scala/zio/metrics/Client.scala | 50 +++++++++++++++---- .../metrics/dogstatsd/DogStatsDClient.scala | 11 ++-- .../zio/metrics/statsd/StatsDClient.scala | 11 ++-- .../zio/metrics/DogStatsDClientTest.scala | 14 +++--- 4 files changed, 58 insertions(+), 28 deletions(-) diff --git a/statsd/src/main/scala/zio/metrics/Client.scala b/statsd/src/main/scala/zio/metrics/Client.scala index 473a05a7..67b1adee 100644 --- a/statsd/src/main/scala/zio/metrics/Client.scala +++ b/statsd/src/main/scala/zio/metrics/Client.scala @@ -9,7 +9,13 @@ import zio.duration.Duration.Finite import zio.metrics.encoders._ import zio.stream.ZStream -final class Client(val bufferSize: Int, val timeout: Long, host: Option[String], port: Option[Int])( +final class Client( + val bufferSize: Int, + val timeout: Long, + host: Option[String], + port: Option[Int], + prefix: Option[String] +)( private val queue: Queue[Metric] ) { @@ -17,6 +23,18 @@ final class Client(val bufferSize: Int, val timeout: Long, host: Option[String], private val duration: Duration = Finite(timeout) + private def addPrefix(prefix: String, metric: Metric): Metric = metric match { + case c @ Counter(name, _, _, _) => c.copy(name = s"$prefix.$name") + case e @ Event(name, _, _, _, _, _, _, _, _) => e.copy(name = s"$prefix.$name") + case g @ Gauge(name, _, _) => g.copy(name = s"$prefix.$name") + case h @ Histogram(name, _, _, _) => h.copy(name = s"$prefix.$name") + case m @ Meter(name, _, _) => m.copy(name = s"$prefix.$name") + case sc @ ServiceCheck(name, _, _, _, _, _) => sc.copy(name = s"$prefix.$name") + case s @ Set(name, _, _) => s.copy(name = s"$prefix.$name") + case t @ Timer(name, _, _, _) => t.copy(name = s"$prefix.$name") + case d @ Distribution(name, _, _, _) => d.copy(name = s"$prefix.$name") + } + private val udpClient: ZManaged[Any, Throwable, UDPClient] = (host, port) match { case (None, None) => UDPClient() case (Some(h), Some(p)) => UDPClient(h, p) @@ -58,10 +76,18 @@ final class Client(val bufferSize: Int, val timeout: Long, host: Option[String], .fork val send: Metric => UIO[Unit] = - metric => queue.offer(metric).unit + metric => + prefix match { + case Some(p) if !p.isEmpty => queue.offer(addPrefix(p, metric)).unit + case _ => queue.offer(metric).unit + } val sendAsync: Metric => UIO[Unit] = - metric => queue.offer(metric).fork.unit + metric => + prefix match { + case Some(p) if !p.isEmpty => queue.offer(addPrefix(p, metric)).fork.unit + case _ => queue.offer(metric).fork.unit + } def sendM(sync: Boolean): Metric => UIO[Unit] = if (sync) { @@ -76,44 +102,46 @@ object Client { type ClientEnv = Encoder with Clock //with Console - def apply(): ZManaged[ClientEnv, Throwable, Client] = apply(5, 5000, 100, None, None) + def apply(): ZManaged[ClientEnv, Throwable, Client] = apply(5, 5000, 100, None, None, None) def apply(bufferSize: Int, timeout: Long): ZManaged[ClientEnv, Throwable, Client] = - apply(bufferSize, timeout, 100, None, None) + apply(bufferSize, timeout, 100, None, None, None) def apply(bufferSize: Int, timeout: Long, queueCapacity: Int): ZManaged[ClientEnv, Throwable, Client] = - apply(bufferSize, timeout, queueCapacity, None, None) + apply(bufferSize, timeout, queueCapacity, None, None, None) def apply( bufferSize: Int, timeout: Long, queueCapacity: Int, host: Option[String], - port: Option[Int] + port: Option[Int], + prefix: Option[String] ): ZManaged[ClientEnv, Throwable, Client] = ZManaged.make { for { queue <- ZQueue.bounded[Metric](queueCapacity) - client = new Client(bufferSize, timeout, host, port)(queue) + client = new Client(bufferSize, timeout, host, port, prefix)(queue) fiber <- client.listen } yield (client, fiber) } { case (client, fiber) => client.queue.shutdown *> fiber.join.orDie } .map(_._1) def withListener[F[_], A](listener: Chunk[Metric] => RIO[Encoder, F[A]]): ZManaged[ClientEnv, Throwable, Client] = - withListener(5, 5000, 100, None, None)(listener) + withListener(5, 5000, 100, None, None, None)(listener) def withListener[F[_], A]( bufferSize: Int, timeout: Long, queueCapacity: Int, host: Option[String], - port: Option[Int] + port: Option[Int], + prefix: Option[String] )(listener: Chunk[Metric] => RIO[Encoder, F[A]]): ZManaged[ClientEnv, Throwable, Client] = ZManaged.make { for { queue <- ZQueue.bounded[Metric](queueCapacity) - client = new Client(bufferSize, timeout, host, port)(queue) + client = new Client(bufferSize, timeout, host, port, prefix)(queue) fiber <- client.listen(listener) } yield (client, fiber) } { case (client, fiber) => client.queue.shutdown *> fiber.join.orDie } diff --git a/statsd/src/main/scala/zio/metrics/dogstatsd/DogStatsDClient.scala b/statsd/src/main/scala/zio/metrics/dogstatsd/DogStatsDClient.scala index 61b9eda4..3a6638e1 100644 --- a/statsd/src/main/scala/zio/metrics/dogstatsd/DogStatsDClient.scala +++ b/statsd/src/main/scala/zio/metrics/dogstatsd/DogStatsDClient.scala @@ -142,21 +142,22 @@ final class DogStatsDClient(client: Client) { object DogStatsDClient { - def apply(): ZManaged[ClientEnv, Throwable, DogStatsDClient] = apply(5, 5000, 100, None, None) + def apply(): ZManaged[ClientEnv, Throwable, DogStatsDClient] = apply(5, 5000, 100, None, None, None) def apply(bufferSize: Int, timeout: Long): ZManaged[ClientEnv, Throwable, DogStatsDClient] = - apply(bufferSize, timeout, 100, None, None) + apply(bufferSize, timeout, 100, None, None, None) def apply(bufferSize: Int, timeout: Long, queueCapacity: Int): ZManaged[ClientEnv, Throwable, DogStatsDClient] = - apply(bufferSize, timeout, queueCapacity, None, None) + apply(bufferSize, timeout, queueCapacity, None, None, None) def apply( bufferSize: Int, timeout: Long, queueCapacity: Int, host: Option[String], - port: Option[Int] + port: Option[Int], + prefix: Option[String] ): ZManaged[ClientEnv, Throwable, DogStatsDClient] = - Client(bufferSize, timeout, queueCapacity, host, port).map { new DogStatsDClient(_) } + Client(bufferSize, timeout, queueCapacity, host, port, prefix).map { new DogStatsDClient(_) } } diff --git a/statsd/src/main/scala/zio/metrics/statsd/StatsDClient.scala b/statsd/src/main/scala/zio/metrics/statsd/StatsDClient.scala index c6cd2630..97d89d5a 100644 --- a/statsd/src/main/scala/zio/metrics/statsd/StatsDClient.scala +++ b/statsd/src/main/scala/zio/metrics/statsd/StatsDClient.scala @@ -62,20 +62,21 @@ class StatsDClient(client: Client) { } object StatsDClient { - def apply(): ZManaged[ClientEnv, Throwable, StatsDClient] = apply(5, 5000, 100, None, None) + def apply(): ZManaged[ClientEnv, Throwable, StatsDClient] = apply(5, 5000, 100, None, None, None) def apply(bufferSize: Int, timeout: Long): ZManaged[ClientEnv, Throwable, StatsDClient] = - apply(bufferSize, timeout, 100, None, None) + apply(bufferSize, timeout, 100, None, None, None) def apply(bufferSize: Int, timeout: Long, queueCapacity: Int): ZManaged[ClientEnv, Throwable, StatsDClient] = - apply(bufferSize, timeout, queueCapacity, None, None) + apply(bufferSize, timeout, queueCapacity, None, None, None) def apply( bufferSize: Int, timeout: Long, queueCapacity: Int, host: Option[String], - port: Option[Int] + port: Option[Int], + prefix: Option[String] ): ZManaged[ClientEnv, Throwable, StatsDClient] = - Client(bufferSize, timeout, queueCapacity, host, port).map(new StatsDClient(_)) + Client(bufferSize, timeout, queueCapacity, host, port, prefix).map(new StatsDClient(_)) } diff --git a/statsd/src/test/scala/zio/metrics/DogStatsDClientTest.scala b/statsd/src/test/scala/zio/metrics/DogStatsDClientTest.scala index 2a18307b..4cf2dd9f 100644 --- a/statsd/src/test/scala/zio/metrics/DogStatsDClientTest.scala +++ b/statsd/src/test/scala/zio/metrics/DogStatsDClientTest.scala @@ -18,7 +18,7 @@ object DogStatsDClientTest extends DefaultRunnableSpec { suite("DogStatsDClient")( testM("Sends correct information via UDP") { val clientWithAgent = for { - d <- DogStatsDClient(500, 5000, 100, Some("localhost"), Some(port)) + d <- DogStatsDClient(500, 5000, 100, Some("localhost"), Some(port), Some("zio")) u <- UDPAgent(port) } yield (d, u) @@ -38,12 +38,12 @@ object DogStatsDClientTest extends DefaultRunnableSpec { _ <- client.event("TestEvent", "something amazing happened") eventMetric <- agent.nextReceivedMetric } yield { - assert(timerMetric)(equalTo("TestTimer:12|ms")) && - assert(counterMetric)(equalTo("TestCounter:1|c|@0.9")) && - assert(histMetric)(equalTo("TestHistogram:1|h0.9")) && - assert(distributionMetric)(equalTo("TestDistribution:20|d")) && - assert(serviceCheckMetric)(containsString("TestServiceCheck|0|d")) && - assert(eventMetric)(containsString("_e{9,26}:TestEvent|something amazing happened|d:")) + assert(timerMetric)(equalTo("zio.TestTimer:12|ms")) && + assert(counterMetric)(equalTo("zio.TestCounter:1|c|@0.9")) && + assert(histMetric)(equalTo("zio.TestHistogram:1|h0.9")) && + assert(distributionMetric)(equalTo("zio.TestDistribution:20|d")) && + assert(serviceCheckMetric)(containsString("zio.TestServiceCheck|0|d")) && + assert(eventMetric)(containsString("_e{13,26}:zio.TestEvent|something amazing happened|d:")) } }