Skip to content

Commit

Permalink
Initial attempt at an API refactor (#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
miguel-vila authored Aug 3, 2020
1 parent ca67228 commit d6bd17f
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 318 deletions.
205 changes: 100 additions & 105 deletions docs/essentials/statsd.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ uses ZIO's concurrency toolbox such as `Queue`, `Stream`, `Schedule` as well as

Its main components, besides the `Metrics` themselves are the `Encoder` module
with instances for StatsD and DogStatsD and the `Client` itself, which is not a
module but has separate instances for StatsD and DogStatsD.
module but is used for separate instances for StatsD and DogStatsD.

# Metrics
## StatsD Metrics
Expand All @@ -20,7 +20,7 @@ We're gonna need the following import:
import zio.metrics._
```

Although each metric can be created directly, th preferred way is to di it
Although each metric can be created directly, the preferred way is to do it
through the `StatsDClient` which has helper methods for all metrics.

### Gauge
Expand All @@ -41,13 +41,13 @@ If we use the client instead we save ourselves issues as with `tags` above:

```scala mdoc:silent
import zio.metrics.statsd._
import zio.Queue
import zio.metrics.Client.ClientEnv
import zio.ZManaged

val uioQueue = Queue.bounded[Metric](100) // details on Client Section
val client = StatsDClient()
uioQueue >>= (q =>
client.gauge("name", 34.0)(q)
)
val statsDClient: ZManaged[ClientEnv, Throwable, StatsDClient] = StatsDClient()
statsDClient.use { client =>
client.gauge("name", 34.0)
}
```

Also note, this methods are asynchronous by default, we'll talk more about that
Expand All @@ -62,13 +62,12 @@ the values will be actually sent the StatsD server. A sampleRate of 1.0 (or
above) ensures that all values are sent.

```scala mdoc:silent
uioQueue >>= (queue => {
implicit val q = queue // make it implicit so we don't pass it everytime
client.counter("counterName", 3.0) // no sample rate
client .counter("counterName", 2.0, 0.75) // 75% sample rate
statsDClient.use { client =>
client.counter("counterName", 3.0) // no sample rate
client.counter("counterName", 2.0, 0.75) // 75% sample rate
client.increment("counterName")
client.decrement("counterName", 0.9) // 90% sample rate
})
client.decrement("counterName", 0.9) // 90% sample rate
}
```

### Timers
Expand All @@ -80,35 +79,33 @@ The number of milliseconds between a start and end time.
import zio.clock.Clock
import zio.duration.Duration

for {
q <- uioQueue
statsDClient.use { client =>
for {
clock <- RIO.environment[Clock]
t1 <- clock.get.currentTime(TimeUnit.MILLISECONDS)
_ <- clock.get.sleep(Duration(75, TimeUnit.MILLISECONDS))
t2 <- clock.get.currentTime(TimeUnit.MILLISECONDS)
_ <- client.timer("zmetrics.timer", (t2 - t1).toDouble, 0.9)(q)
} yield ()
_ <- client.timer("zmetrics.timer", (t2 - t1).toDouble, 0.9)
} yield ()
}
```

### Meter
Measures the rate of events over time, calculated at the server.

```scala mdoc:silent
for {
q <- uioQueue
_ <- client.meter("zmetrics.meter", 2.5)(q)
} yield ()
statsDClient.use { client =>
client.meter("zmetrics.meter", 2.5)
}
```

### Set
Counts the number of unique occurrences of events over a period of time.


```scala mdoc:silent
for {
q <- uioQueue
_ <- client.set("zmetrics.meter", "ocurrence")(q)
} yield ()
statsDClient.use { client =>
client.set("zmetrics.meter", "ocurrence")
}
```

## DogStatsD Metrics
Expand All @@ -122,34 +119,32 @@ Allows you to measure the statistical distribution of a set of values.
```scala mdoc:silent
import zio.metrics.dogstatsd._

val dogClient = DogStatsDClient()
for {
q <- uioQueue
_ <- dogClient.histogram(
val dogStatsDClient = DogStatsDClient()
dogStatsDClient.use { dogClient =>
dogClient.histogram(
"zmetrics.hist",
2.5, // value
0.9, // sample rate
Seq(Tag("key1","val1"), Tag("key2","val2"))
)(q)
} yield ()
)
}
```

### ServiceCheck
Allow you to characterize the status of a service in order to monitor it within Datadog.

```scala mdoc:silent
for {
q <- uioQueue
_ <- dogClient.serviceCheck(
"zmetrics.checks",
ServiceCheckOk,
None, // defaults to current time
None, // optional hostname
Some("Check custom message"),
Seq(Tag("key1","val1")),
true // use synchronous version
)(q) // don't forget the queue
} yield ()
dogStatsDClient.use { dogClient =>
dogClient.serviceCheck(
"zmetrics.checks",
ServiceCheckOk,
None, // defaults to current time
None, // optional hostname
Some("Check custom message"),
Seq(Tag("key1","val1")),
true // use synchronous version
)
}
```

### Event
Expand All @@ -160,9 +155,8 @@ displayed.
```scala mdoc:silent
val tagEnv = Tag("env", "prod")
val tagVersion = Tag("version", "0.1.0")
for {
q <- uioQueue
_ <- dogClient.event(
dogStatsDClient.use { dogClient =>
dogClient.event(
"zmetrics.dog.event", // name
"something amazing happened", // event text/message
None, // timestamp, encoder defaults to 'now'
Expand All @@ -173,22 +167,36 @@ displayed.
Some(EventAlertError), // 'None' defaults to 'info'
Seq(tagEnv, tagVersion),
false // use asynchronous version
)(q) // alternatively we could have an 'implicit' queue
} yield ()
)
}
```


# Client

The base class for the clients is `zio.metrics.Client` which has sub-clases
`StatsDClient` and `DogStatsDClient`. Each of them takes 3 parameters but
provides also constructors with default values for each such that:
The different reporting clients (`StatsDClient` and `DogStatsDClient`)
use a `zio.metrics.Client` which is a simple UDP client.
Each of them takes a `Client` as a parameter but
also provides constructors with default values for each such that:

```scala mdoc:silent
Client() == Client(5, 5000, 100, None, None)
```

The first two parameters (`bufferSize` and `timeout`) define how to agregate each
The `Client` constructors return a `ZManaged`. You can create your own specific client
by reusing the default client constructors like this:

```scala mdoc:silent
Client().map(client => new StatsDClient(client))
```

or you can just use one of the custom constructors that wrap this process:

```scala mdoc:silent
StatsDClient() == StatsDClient(5, 5000, 100, None, None)
```

The first two parameters (`bufferSize` and `timeout`) define how to aggregate each
batch of metrics sent to the StatsD server. The third parameters (`queueCapacity`)
determines the size of the internal queue used to batch said metrics. Before
going into detail let's see how a `Client` looks like. **NOTE** however that
Expand All @@ -207,38 +215,30 @@ this is a base client, ZIO-Metrics-StatsD also provide `StatsD` and a

val program = {
val messages = List(1.0, 2.2, 3.4, 4.6, 5.1, 6.0, 7.9)
val client = Client()
client.queue >>= (queue => {
implicit val q = queue
val createClient = Client()
createClient.use { client =>
for {
z <- client.listen // uses implicit 'q'
opt <- RIO.foreach(messages)(d => Task(Counter("clientbar", d, 1.0, Seq.empty[Tag])))
_ <- RIO.collectAll(opt.map(m => client.send(q)(m)))
} yield z
})
_ <- RIO.collectAll(opt.map(m => client.sendM(true)(m)))
} yield ()
}
}
```

`client.queue` is the client's internal queue, however, this been a `zio.Queue`
the actual type for queue is `UIO[Queue[Metric]]` which is why we need to
`flatmap` it (aka `>>=`) to obtain the actual `Queue[Metric]`. In this example I
have cerated a `q` value just to make it implicit, this is for demonstrating it
can be done, you can just passed it explicitly to `client.listen` instead.

Since the client was created with default values, `client.queue` is a [back-pressured bounded
queue](https://zio.dev/docs/datatypes/datatypes_queue) with a maximum capacity
for a 100 items. This means that any fiber that offers items when thew queue is
Since the client was created with default values, the internal queue is a [back-pressured bounded
queue](https://zio.dev/docs/datatypes/datatypes_queue) with a maximum capacity of 100 items.
This means that any fiber that offers items when the queue is
full, will be suspended until the queue is able to add the item.

## Batching
The next relevant bit is `client.listen` which will batch items based on the
constructor values for `bufferSize` and `timeout`. Essentially, items in the
The next relevant bit how the underlying client works. The `Client` will batch
items based on the constructor values for `bufferSize` and `timeout`. Essentially, items in the
queue will be processed once `bufferSize` is reached or the `timeout` (in
milliseconds) expires, whichever happens first. So for the default values of 5
and 5000, items in the queue will be processed either when 5 items are collected
or after 5 seconds have passed.

`client.listen` will use the default message processor which will sample metrics
The default constructors will use the default message processor which will sample metrics
according to their defined `sampleRate` (Counter, Timer and Histogram only),
then it encodes each Metric according to the provided `Encoder` and then sends
them to the specified UDP StatsD server. In this case, the default uses
Expand All @@ -248,14 +248,14 @@ them to the specified UDP StatsD server. In this case, the default uses
The next line in the sample client creates 7 `Counter`s with a `sampleRate` of
`1.0`, ensuring every metric will be encoded and sent to the StatsD server. The
last line in the sample client is the one that actually `sends/offer` the metric
to the queue. There are two variation sto this method `send: Queue[Metric] =>
to the queue. There are two variations of this method `send: Queue[Metric] =>
Metric => Task[Unit]` and `sendAsync: Queue[Metric] => Metric => Task[Unit] `.
The asynchronous version, [forks a
fiber](https://zio.dev/docs/datatypes/datatypes_fiber) and returns immediately.
The synchronous one offers the metric in the same fiber the rest of the program
has running, while this operation is normally almost as fast as the
asynchronous, issues may arise in case of a full queue since the fiber will
block until ther is space to add more items to the queue. In such a case, the
block until there is space to add more items to the queue. In such a case, the
asynchronous version comes in handy since the main execution of our `program`
will not block, the risk for this scenario is that we keep creating and
suspending fibers which might become a memory leak.
Expand Down Expand Up @@ -285,20 +285,19 @@ from `UDPClient`.
} yield l
```

and we can use this instead of the default changing the `client.listen` call so:
and we can use this instead of the default behavior by using the `withListener` constructor:

```scala mdoc:silent
val messages = List(1.0, 2.2, 3.4, 4.6, 5.1, 6.0, 7.9)
client.queue >>= (queue => {
implicit val q = queue
for {
z <- client.listen[List, Int]{ l =>
myudp(l).provideSomeLayer[Encoder](Console.live)
}
opt <- RIO.foreach(messages)(d => Task(Counter("clientbar", d, 1.0, Seq.empty[Tag])))
_ <- RIO.collectAll(opt.map(m => client.send(q)(m)))
} yield z
})
val createCustomClient = Client.withListener { l =>
myudp(l).provideSomeLayer[Encoder](Console.live)
}
createCustomClient.use { client =>
for {
opt <- RIO.foreach(messages)(d => Task(Counter("clientbar", d, 1.0, Seq.empty[Tag])))
_ <- RIO.collectAll(opt.map(m => client.sendM(true)(m)))
} yield ()
}
```

A message processor is defined as: `List[Metric] => RIO[Encoder, F[A]]`, since
Expand All @@ -307,18 +306,17 @@ A message processor is defined as: `List[Metric] => RIO[Encoder, F[A]]`, since

## StatsD Client
Most of the client's functionality is provided by `Client`, you won't normally use
it though since ypu can use `StatsDClient` instead which has methods to help
it though since you can use `StatsDClient` instead which has methods to help
create and offer/send metrics to the queue. Here's a sample of how it's used.

```scala mdoc:silent
import zio.Schedule

val schd = Schedule.recurs(10) // optional, used to send more samples to StatsD
val statsDClient = StatsDClient() // StatsD default client
val createStatsDClient = StatsDClient() // StatsD default client

def program(r: Long)(implicit queue: Queue[Metric]) =
def program(r: Long)(statsDClient: StatsDClient) =
for {
_ <- statsDClient.listen
clock <- RIO.environment[Clock]
t1 <- clock.get.currentTime(TimeUnit.MILLISECONDS)
_ <- statsDClient.increment("zmetrics.counter", 0.9)
Expand All @@ -334,12 +332,11 @@ We can reuse `rt`, the runtime created earlier to run our `program`:
def main1(args: Array[String]): Unit = {
val timeouts = Seq(34L, 76L, 52L)
rt.unsafeRun(
statsDClient .queue >>= (
q =>
RIO
.foreach(timeouts)(l => program(l)(q))
.repeat(schd)
)
createStatsDClient.use { statsDClient =>
RIO
.foreach(timeouts)(l => program(l)(statsDClient))
.repeat(schd)
}
)
Thread.sleep(10000) // wait for all messages to be consumed
}
Expand All @@ -351,11 +348,10 @@ help you create and send histograms, service checks and events which are not
supported by StatsD.

```scala mdoc:silent
val dogStatsDClient = DogStatsDClient()
val createDogStatsDClient = DogStatsDClient()

def dogProgram(r: Long)(implicit queue: Queue[Metric]) =
def dogProgram(r: Long)(dogStatsDClient: DogStatsDClient) =
for {
_ <- dogStatsDClient.listen
clock <- RIO.environment[Clock]
t1 <- clock.get.currentTime(TimeUnit.MILLISECONDS)
_ <- dogStatsDClient.increment("zmetrics.dog.counter", 0.9)
Expand All @@ -378,12 +374,11 @@ create a new runtime to support it.
def main2(args: Array[String]): Unit = {
val timeouts = Seq(34L, 76L, 52L)
rtDog.unsafeRun(
dogStatsDClient .queue >>= (
q =>
RIO
.foreach(timeouts)(l => program(l)(q))
.repeat(schd)
)
createDogStatsDClient.use { dogStatsDClient =>
RIO
.foreach(timeouts)(l => dogProgram(l)(dogStatsDClient))
.repeat(schd)
}
)
Thread.sleep(10000)
}
Expand Down
Loading

0 comments on commit d6bd17f

Please sign in to comment.