Skip to content

Commit

Permalink
Support optional prefix for statsd (#110)
Browse files Browse the repository at this point in the history
  • Loading branch information
toxicafunk authored Oct 25, 2021
1 parent 56a4a16 commit c4b5bb6
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 28 deletions.
50 changes: 39 additions & 11 deletions statsd/src/main/scala/zio/metrics/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,32 @@ import zio.duration.Duration.Finite
import zio.metrics.encoders._
import zio.stream.ZStream

final class Client(val bufferSize: Int, val timeout: Long, host: Option[String], port: Option[Int])(
final class Client(
val bufferSize: Int,
val timeout: Long,
host: Option[String],
port: Option[Int],
prefix: Option[String]
)(
private val queue: Queue[Metric]
) {

type UDPQueue = ZQueue[Nothing, Any, Encoder, Throwable, Nothing, Metric]

private val duration: Duration = Finite(timeout)

private def addPrefix(prefix: String, metric: Metric): Metric = metric match {
case c @ Counter(name, _, _, _) => c.copy(name = s"$prefix.$name")
case e @ Event(name, _, _, _, _, _, _, _, _) => e.copy(name = s"$prefix.$name")
case g @ Gauge(name, _, _) => g.copy(name = s"$prefix.$name")
case h @ Histogram(name, _, _, _) => h.copy(name = s"$prefix.$name")
case m @ Meter(name, _, _) => m.copy(name = s"$prefix.$name")
case sc @ ServiceCheck(name, _, _, _, _, _) => sc.copy(name = s"$prefix.$name")
case s @ Set(name, _, _) => s.copy(name = s"$prefix.$name")
case t @ Timer(name, _, _, _) => t.copy(name = s"$prefix.$name")
case d @ Distribution(name, _, _, _) => d.copy(name = s"$prefix.$name")
}

private val udpClient: ZManaged[Any, Throwable, UDPClient] = (host, port) match {
case (None, None) => UDPClient()
case (Some(h), Some(p)) => UDPClient(h, p)
Expand Down Expand Up @@ -58,10 +76,18 @@ final class Client(val bufferSize: Int, val timeout: Long, host: Option[String],
.fork

val send: Metric => UIO[Unit] =
metric => queue.offer(metric).unit
metric =>
prefix match {
case Some(p) if !p.isEmpty => queue.offer(addPrefix(p, metric)).unit
case _ => queue.offer(metric).unit
}

val sendAsync: Metric => UIO[Unit] =
metric => queue.offer(metric).fork.unit
metric =>
prefix match {
case Some(p) if !p.isEmpty => queue.offer(addPrefix(p, metric)).fork.unit
case _ => queue.offer(metric).fork.unit
}

def sendM(sync: Boolean): Metric => UIO[Unit] =
if (sync) {
Expand All @@ -76,44 +102,46 @@ object Client {

type ClientEnv = Encoder with Clock //with Console

def apply(): ZManaged[ClientEnv, Throwable, Client] = apply(5, 5000, 100, None, None)
def apply(): ZManaged[ClientEnv, Throwable, Client] = apply(5, 5000, 100, None, None, None)

def apply(bufferSize: Int, timeout: Long): ZManaged[ClientEnv, Throwable, Client] =
apply(bufferSize, timeout, 100, None, None)
apply(bufferSize, timeout, 100, None, None, None)

def apply(bufferSize: Int, timeout: Long, queueCapacity: Int): ZManaged[ClientEnv, Throwable, Client] =
apply(bufferSize, timeout, queueCapacity, None, None)
apply(bufferSize, timeout, queueCapacity, None, None, None)

def apply(
bufferSize: Int,
timeout: Long,
queueCapacity: Int,
host: Option[String],
port: Option[Int]
port: Option[Int],
prefix: Option[String]
): ZManaged[ClientEnv, Throwable, Client] =
ZManaged.make {
for {
queue <- ZQueue.bounded[Metric](queueCapacity)
client = new Client(bufferSize, timeout, host, port)(queue)
client = new Client(bufferSize, timeout, host, port, prefix)(queue)
fiber <- client.listen
} yield (client, fiber)
} { case (client, fiber) => client.queue.shutdown *> fiber.join.orDie }
.map(_._1)

def withListener[F[_], A](listener: Chunk[Metric] => RIO[Encoder, F[A]]): ZManaged[ClientEnv, Throwable, Client] =
withListener(5, 5000, 100, None, None)(listener)
withListener(5, 5000, 100, None, None, None)(listener)

def withListener[F[_], A](
bufferSize: Int,
timeout: Long,
queueCapacity: Int,
host: Option[String],
port: Option[Int]
port: Option[Int],
prefix: Option[String]
)(listener: Chunk[Metric] => RIO[Encoder, F[A]]): ZManaged[ClientEnv, Throwable, Client] =
ZManaged.make {
for {
queue <- ZQueue.bounded[Metric](queueCapacity)
client = new Client(bufferSize, timeout, host, port)(queue)
client = new Client(bufferSize, timeout, host, port, prefix)(queue)
fiber <- client.listen(listener)
} yield (client, fiber)
} { case (client, fiber) => client.queue.shutdown *> fiber.join.orDie }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,21 +142,22 @@ final class DogStatsDClient(client: Client) {

object DogStatsDClient {

def apply(): ZManaged[ClientEnv, Throwable, DogStatsDClient] = apply(5, 5000, 100, None, None)
def apply(): ZManaged[ClientEnv, Throwable, DogStatsDClient] = apply(5, 5000, 100, None, None, None)

def apply(bufferSize: Int, timeout: Long): ZManaged[ClientEnv, Throwable, DogStatsDClient] =
apply(bufferSize, timeout, 100, None, None)
apply(bufferSize, timeout, 100, None, None, None)

def apply(bufferSize: Int, timeout: Long, queueCapacity: Int): ZManaged[ClientEnv, Throwable, DogStatsDClient] =
apply(bufferSize, timeout, queueCapacity, None, None)
apply(bufferSize, timeout, queueCapacity, None, None, None)

def apply(
bufferSize: Int,
timeout: Long,
queueCapacity: Int,
host: Option[String],
port: Option[Int]
port: Option[Int],
prefix: Option[String]
): ZManaged[ClientEnv, Throwable, DogStatsDClient] =
Client(bufferSize, timeout, queueCapacity, host, port).map { new DogStatsDClient(_) }
Client(bufferSize, timeout, queueCapacity, host, port, prefix).map { new DogStatsDClient(_) }

}
11 changes: 6 additions & 5 deletions statsd/src/main/scala/zio/metrics/statsd/StatsDClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,21 @@ class StatsDClient(client: Client) {
}

object StatsDClient {
def apply(): ZManaged[ClientEnv, Throwable, StatsDClient] = apply(5, 5000, 100, None, None)
def apply(): ZManaged[ClientEnv, Throwable, StatsDClient] = apply(5, 5000, 100, None, None, None)

def apply(bufferSize: Int, timeout: Long): ZManaged[ClientEnv, Throwable, StatsDClient] =
apply(bufferSize, timeout, 100, None, None)
apply(bufferSize, timeout, 100, None, None, None)

def apply(bufferSize: Int, timeout: Long, queueCapacity: Int): ZManaged[ClientEnv, Throwable, StatsDClient] =
apply(bufferSize, timeout, queueCapacity, None, None)
apply(bufferSize, timeout, queueCapacity, None, None, None)

def apply(
bufferSize: Int,
timeout: Long,
queueCapacity: Int,
host: Option[String],
port: Option[Int]
port: Option[Int],
prefix: Option[String]
): ZManaged[ClientEnv, Throwable, StatsDClient] =
Client(bufferSize, timeout, queueCapacity, host, port).map(new StatsDClient(_))
Client(bufferSize, timeout, queueCapacity, host, port, prefix).map(new StatsDClient(_))
}
14 changes: 7 additions & 7 deletions statsd/src/test/scala/zio/metrics/DogStatsDClientTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ object DogStatsDClientTest extends DefaultRunnableSpec {
suite("DogStatsDClient")(
testM("Sends correct information via UDP") {
val clientWithAgent = for {
d <- DogStatsDClient(500, 5000, 100, Some("localhost"), Some(port))
d <- DogStatsDClient(500, 5000, 100, Some("localhost"), Some(port), Some("zio"))
u <- UDPAgent(port)
} yield (d, u)

Expand All @@ -38,12 +38,12 @@ object DogStatsDClientTest extends DefaultRunnableSpec {
_ <- client.event("TestEvent", "something amazing happened")
eventMetric <- agent.nextReceivedMetric
} yield {
assert(timerMetric)(equalTo("TestTimer:12|ms")) &&
assert(counterMetric)(equalTo("TestCounter:1|c|@0.9")) &&
assert(histMetric)(equalTo("TestHistogram:1|h0.9")) &&
assert(distributionMetric)(equalTo("TestDistribution:20|d")) &&
assert(serviceCheckMetric)(containsString("TestServiceCheck|0|d")) &&
assert(eventMetric)(containsString("_e{9,26}:TestEvent|something amazing happened|d:"))
assert(timerMetric)(equalTo("zio.TestTimer:12|ms")) &&
assert(counterMetric)(equalTo("zio.TestCounter:1|c|@0.9")) &&
assert(histMetric)(equalTo("zio.TestHistogram:1|h0.9")) &&
assert(distributionMetric)(equalTo("zio.TestDistribution:20|d")) &&
assert(serviceCheckMetric)(containsString("zio.TestServiceCheck|0|d")) &&
assert(eventMetric)(containsString("_e{13,26}:zio.TestEvent|something amazing happened|d:"))
}
}

Expand Down

0 comments on commit c4b5bb6

Please sign in to comment.