This repository has been archived by the owner on Mar 29, 2020. It is now read-only.

Merge pull request #31 from Falmarri/master
Update to kamon 2.0.0-M4
ivantopo authored Jun 26, 2019
2 parents 575eaa7 + c7a594e commit c2656e5
Showing 13 changed files with 409 additions and 311 deletions.
20 changes: 7 additions & 13 deletions build.sbt
@@ -17,36 +17,30 @@ import com.typesafe.sbt.SbtScalariform.ScalariformKeys
import scalariform.formatter.preferences._


val kamonCore = "io.kamon" %% "kamon-core" % "1.1.6"
val kamonTestKit = "io.kamon" %% "kamon-testkit" % "1.1.6"
val kamonCore = "io.kamon" %% "kamon-core" % "2.0.0-RC1"
val kamonTestKit = "io.kamon" %% "kamon-testkit" % "2.0.0-RC1"
val asyncHttpClient = "com.squareup.okhttp3" % "okhttp" % "3.10.0"
val asyncHttpClientMock = "com.squareup.okhttp3" % "mockwebserver" % "3.10.0"
val scalatest = "org.scalatest" %% "scalatest" % "3.0.8"

lazy val root = (project in file("."))
.settings(name := "kamon-datadog")
.settings(
libraryDependencies ++=
compileScope(kamonCore, asyncHttpClient, scalaCompact.value, playJsonVersion.value) ++
compileScope(kamonCore, asyncHttpClient, playJsonVersion.value) ++
testScope(scalatest, slf4jApi, slf4jnop, kamonCore, kamonTestKit, asyncHttpClientMock),
crossScalaVersions := Seq("2.11.12", "2.12.7", "2.13.0"),
ScalariformKeys.preferences := formatSettings(ScalariformKeys.preferences.value))


def playJsonVersion = Def.setting {
scalaBinaryVersion.value match {
case "2.10" => "com.typesafe.play" %% "play-json" % "2.4.11"
case "2.12" | "2.11" => "com.typesafe.play" %% "play-json" % "2.6.9"
case "2.12" | "2.11" | "2.13" => "com.typesafe.play" %% "play-json" % "2.7.4"
}
}


def scalaCompact = Def.setting {
scalaBinaryVersion.value match {
case "2.10" | "2.11" => "org.scala-lang.modules" %% "scala-java8-compat" % "0.5.0"
case "2.12" => "org.scala-lang.modules" %% "scala-java8-compat" % "0.8.0"
}
}

/* Changing Kamon configuration in real-time seems to turn tests unstable */
/* Changing Kamon configuration in real-time seems to turn tests unstable */
parallelExecution in Test := false

def formatSettings(prefs: IFormattingPreferences) = prefs
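
In practical terms, this build.sbt change moves kamon-datadog onto the Kamon 2.x API and extends the cross-build to Scala 2.13, dropping the scala-java8-compat shim and bumping play-json to 2.7.4. A sketch of a downstream project's build, assuming the module is eventually published for this branch; only the kamon-core version comes from the diff above, the kamon-datadog version string is a placeholder:

// build.sbt of a consuming project (sketch)
scalaVersion := "2.13.0"

libraryDependencies ++= Seq(
  "io.kamon" %% "kamon-core"    % "2.0.0-RC1", // matches kamonCore in the diff above
  "io.kamon" %% "kamon-datadog" % "2.0.0-RC1"  // placeholder version, not taken from this commit
)
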
2 changes: 1 addition & 1 deletion project/plugins.sbt
@@ -1,2 +1,2 @@
lazy val root = project in file(".") dependsOn(RootProject(uri("git://github.com/kamon-io/kamon-sbt-umbrella.git#kamon-1.x")))
lazy val root = project in file(".") dependsOn(RootProject(uri("git://github.com/kamon-io/kamon-sbt-umbrella.git#kamon-2.x")))
addSbtPlugin("org.scalariform" % "sbt-scalariform" % "1.8.2")
95 changes: 59 additions & 36 deletions src/main/resources/reference.conf
@@ -16,48 +16,52 @@ kamon {

# Max packet size for UDP metrics data sent to Datadog.
max-packet-size = 1024 bytes

measurement-formatter = "default"

packetbuffer = "default"

}

#
# Settings relevant to the DatadogAPIReporter
# Settings relevant to the DatadogSpanReporter
#
http {

api-url = "https://app.datadoghq.com/api/v1/series"
trace {

# Default to agent URL (https://docs.datadoghq.com/api/?lang=python#tracing)
api-url = "http://localhost:8126/v0.4/traces"

# Datadog API key to use to send metrics to datadog directly over HTTPS.
# If this is not set, metrics are sent as statsd packets over UDP to dogstatsd.
api-key = ""

using-agent = false
# FQCN of the "kamon.datadog.KamonDataDogTranslator" implementation that will convert Kamon Spans into Datadog
# Spans, or "defult" to use the built-in translator.
translator = "default"

# HTTP client timeout settings:
# - connect-timeout: how long to wait for an HTTP connection to establish before failing the request.
# - read-timeout: how long to wait for a read IO operation to complete before failing the request.
# - write-timeout: how long to wait for a write IO operation to complete before failing the request.
#
connect-timeout = 5 seconds
read-timeout = 5 seconds
request-timeout = 5 seconds
write-timeout = 5 seconds
}


#
# Settings relevant to the DatadogSpanReporter
# Settings relevant to the DatadogAPIReporter
#
trace.http {
api {

# Default to agent URL (https://docs.datadoghq.com/api/?lang=python#tracing)
api-url = "http://localhost:8126/v0.3/traces"

api-key = ${kamon.datadog.http.api-key}
# API endpoint to which metrics time series data will be posted.
api-url = "https://app.datadoghq.com/api/v1/series"

using-agent = true
# Datadog API key to use to send metrics to Datadog directly over HTTPS. The API key will be combined with the
# API URL to get the complete endpoint used for posting time series to Datadog.
api-key = ""

connect-timeout = ${kamon.datadog.http.connect-timeout}
read-timeout = ${kamon.datadog.http.read-timeout}
request-timeout = ${kamon.datadog.http.request-timeout}
# HTTP client timeout settings:
# - connect-timeout: how long to wait for an HTTP connection to establish before failing the request.
# - read-timeout: how long to wait for a read IO operation to complete before failing the request.
# - write-timeout: how long to wait for a write IO operation to complete before failing the request.
#
connect-timeout = 5 seconds
read-timeout = 5 seconds
write-timeout = 5 seconds
}


@@ -71,22 +75,41 @@
# Value "b" is equivalent to omitting the setting
information-unit = "b"

additional-tags {
service = "yes"
host = "yes"
instance = "yes"
blacklisted-tags = []
environment-tags {
include-service = "yes"
include-host = "yes"
include-instance = "yes"
exclude = []

filter {
includes = ["**"]
excludes = []
}
}
}

filter-config-key = "datadog-tag-filter"
modules {
datadog-agent {
enabled = true
name = "DatadogAgent"
description = "Datadog agent reporter"
factory = "kamon.datadog.DatadogAgentReporterFactory"
}

}
datadog-trace-agent {
enabled = true
name = "DatadogSpanReporter"
description = "Datadog Span reporter"
factory = "kamon.datadog.DatadogSpanReporterFactory"
}

util.filters {
datadog-tag-filter {
includes = ["**"]
excludes = []
datadog-api {
enabled = false
name = "DatadogHttp"
description = "Datadog HTTP reporter"
factory = "kamon.datadog.DatadogAPIReporterFactory"
}
}
}


}
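
With the new reference.conf layout, the reporters are registered through Kamon's kamon.modules block and instantiated from the listed factory classes: the agent-based metric and span reporters are enabled by default, while the HTTP API reporter (datadog-api) stays off until it is enabled and given an API key. A minimal sketch of switching the HTTP reporter on from application code; the key value is a placeholder, Kamon.init(config) is assumed to be the Kamon 2.x entry point that starts the enabled modules, and the same overrides could simply live in application.conf:

import com.typesafe.config.ConfigFactory
import kamon.Kamon

object EnableDatadogHttpReporter extends App {
  // Overrides against the reference.conf shown above: enable the HTTP API
  // reporter and supply an API key (placeholder value).
  val overrides = ConfigFactory.parseString(
    """
      |kamon.modules.datadog-api.enabled = yes
      |kamon.datadog.api.api-key = "<your-datadog-api-key>"
    """.stripMargin)

  // Assumed Kamon 2.x entry point: loads configuration and starts enabled modules.
  Kamon.init(overrides.withFallback(ConfigFactory.load()))
}
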
116 changes: 70 additions & 46 deletions src/main/scala/kamon/datadog/DatadogAPIReporter.scala
@@ -24,27 +24,33 @@ import java.util.Locale

import com.typesafe.config.Config
import kamon.metric.MeasurementUnit.Dimension.{ Information, Time }
import kamon.metric.{ MeasurementUnit, MetricDistribution, MetricValue, PeriodSnapshot }
import kamon.util.{ EnvironmentTagBuilder, Matcher }
import kamon.{ Kamon, MetricReporter }
import kamon.metric.{ MeasurementUnit, MetricSnapshot, PeriodSnapshot }
import kamon.tag.{ Tag, TagSet }
import kamon.util.{ EnvironmentTags, Filter }
import kamon.{ module, Kamon }
import kamon.datadog.DatadogAPIReporter.Configuration
import kamon.module.{ MetricReporter, ModuleFactory }
import org.slf4j.LoggerFactory

import scala.util.{ Failure, Success }

class DatadogAPIReporter extends MetricReporter {
class DatadogAPIReporterFactory extends ModuleFactory {
override def create(settings: ModuleFactory.Settings): DatadogAPIReporter = {
val config = DatadogAPIReporter.readConfiguration(settings.config)
new DatadogAPIReporter(config, new HttpClient(config.httpConfig, usingAgent = false))
}
}

class DatadogAPIReporter(@volatile private var configuration: Configuration, @volatile private var httpClient: HttpClient) extends MetricReporter {
import DatadogAPIReporter._

private val logger = LoggerFactory.getLogger(classOf[DatadogAPIReporter])
private val symbols = DecimalFormatSymbols.getInstance(Locale.US)
symbols.setDecimalSeparator('.') // Just in case there is some weird locale config we are not aware of.

private val valueFormat = new DecimalFormat("#0.#########", symbols)
private var configuration = readConfiguration(Kamon.config())
private var httpClient: HttpClient = new HttpClient(configuration.httpConfig)

override def start(): Unit = {
logger.info("Started the Datadog API reporter.")
}
logger.info("Started the Datadog API reporter.")

override def stop(): Unit = {
logger.info("Stopped the Datadog API reporter.")
@@ -53,15 +59,15 @@ class DatadogAPIReporter extends MetricReporter {
override def reconfigure(config: Config): Unit = {
val newConfiguration = readConfiguration(config)
configuration = newConfiguration
httpClient = new HttpClient(configuration.httpConfig)
httpClient = new HttpClient(configuration.httpConfig, usingAgent = false)
}

override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {
httpClient.doPost("application/json; charset=utf-8", buildRequestBody(snapshot)) match {
case Failure(e) =>
logger.error(e.getMessage)
case Success(response) =>
logger.info(response)
logger.trace(response)
}
}

@@ -72,20 +78,23 @@
val interval = Math.round(Duration.between(snapshot.from, snapshot.to).toMillis() / 1000D)
val seriesBuilder = new StringBuilder()

def addDistribution(metric: MetricDistribution): Unit = {
import metric._

val average = if (distribution.count > 0L) (distribution.sum / distribution.count) else 0L
addMetric(name + ".avg", valueFormat.format(scale(average, unit)), gauge, metric.tags)
addMetric(name + ".count", valueFormat.format(distribution.count), count, metric.tags)
addMetric(name + ".median", valueFormat.format(scale(distribution.percentile(50D).value, unit)), gauge, metric.tags)
addMetric(name + ".95percentile", valueFormat.format(scale(distribution.percentile(95D).value, unit)), gauge, metric.tags)
addMetric(name + ".max", valueFormat.format(scale(distribution.max, unit)), gauge, metric.tags)
addMetric(name + ".min", valueFormat.format(scale(distribution.min, unit)), gauge, metric.tags)
def addDistribution(metric: MetricSnapshot.Distributions): Unit = {
val unit = metric.settings.unit
metric.instruments.foreach { d =>
val dist = d.value

val average = if (dist.count > 0L) (dist.sum / dist.count) else 0L
addMetric(metric.name + ".avg", valueFormat.format(scale(average, unit)), gauge, d.tags)
addMetric(metric.name + ".count", valueFormat.format(dist.count), count, d.tags)
addMetric(metric.name + ".median", valueFormat.format(scale(dist.percentile(50D).value, unit)), gauge, d.tags)
addMetric(metric.name + ".95percentile", valueFormat.format(scale(dist.percentile(95D).value, unit)), gauge, d.tags)
addMetric(metric.name + ".max", valueFormat.format(scale(dist.max, unit)), gauge, d.tags)
addMetric(metric.name + ".min", valueFormat.format(scale(dist.min, unit)), gauge, d.tags)
}
}

def addMetric(metricName: String, value: String, metricType: String, tags: Map[String, String]): Unit = {
val customTags = (configuration.extraTags ++ tags.filterKeys(configuration.tagFilter.accept)).map { case (k, v) ⇒ quote"$k:$v" }.toSeq
def addMetric(metricName: String, value: String, metricType: String, tags: TagSet): Unit = {
val customTags = (configuration.extraTags ++ tags.iterator(_.toString).map(p => p.key -> p.value).filter(t => configuration.tagFilter.accept(t._1))).map { case (k, v) ⇒ quote"$k:$v" }
val allTagsString = customTags.mkString("[", ",", "]")

if (seriesBuilder.length() > 0) seriesBuilder.append(",")
@@ -94,13 +103,28 @@
.append(s"""{"metric":"$metricName","interval":$interval,"points":[[$timestamp,$value]],"type":"$metricType","host":"$host","tags":$allTagsString}""")
}

def add(metric: MetricValue, metricType: String): Unit =
addMetric(metric.name, valueFormat.format(scale(metric.value, metric.unit)), metricType, metric.tags)

snapshot.metrics.counters.foreach(add(_, count))
snapshot.metrics.gauges.foreach(add(_, gauge))
snapshot.counters.foreach { snap =>
snap.instruments.foreach { instrument =>
addMetric(
snap.name,
valueFormat.format(scale(instrument.value, snap.settings.unit)),
count,
instrument.tags
)
}
}
snapshot.gauges.foreach { snap =>
snap.instruments.foreach { instrument =>
addMetric(
snap.name,
valueFormat.format(scale(instrument.value, snap.settings.unit)),
gauge,
instrument.tags
)
}
}

(snapshot.metrics.histograms ++ snapshot.metrics.rangeSamplers).foreach(addDistribution)
(snapshot.histograms ++ snapshot.rangeSamplers).foreach(addDistribution)

seriesBuilder
.insert(0, "{\"series\":[")
@@ -110,36 +134,36 @@

}

private def scale(value: Long, unit: MeasurementUnit): Double = unit.dimension match {
private def scale(value: Double, unit: MeasurementUnit): Double = unit.dimension match {
case Time if unit.magnitude != configuration.timeUnit.magnitude =>
MeasurementUnit.scale(value, unit, configuration.timeUnit)
MeasurementUnit.convert(value, unit, configuration.timeUnit)

case Information if unit.magnitude != configuration.informationUnit.magnitude =>
MeasurementUnit.scale(value, unit, configuration.informationUnit)

case _ => value.toDouble
}
MeasurementUnit.convert(value, unit, configuration.informationUnit)

private def readConfiguration(config: Config): Configuration = {
val datadogConfig = config.getConfig("kamon.datadog")
Configuration(
datadogConfig.getConfig("http"),
timeUnit = readTimeUnit(datadogConfig.getString("time-unit")),
informationUnit = readInformationUnit(datadogConfig.getString("information-unit")),
// Remove the "host" tag since it gets added to the datadog payload separately
EnvironmentTagBuilder.create(datadogConfig.getConfig("additional-tags")) - "host",
Kamon.filter(datadogConfig.getString("filter-config-key"))
)
case _ => value
}
}

private object DatadogAPIReporter {
val count = "count"
val gauge = "gauge"

case class Configuration(httpConfig: Config, timeUnit: MeasurementUnit, informationUnit: MeasurementUnit, extraTags: Map[String, String], tagFilter: Matcher)
case class Configuration(httpConfig: Config, timeUnit: MeasurementUnit, informationUnit: MeasurementUnit, extraTags: Seq[(String, String)], tagFilter: Filter)

implicit class QuoteInterp(val sc: StringContext) extends AnyVal {
def quote(args: Any*): String = "\"" + sc.s(args: _*) + "\""
}

def readConfiguration(config: Config): Configuration = {
val datadogConfig = config.getConfig("kamon.datadog")
Configuration(
datadogConfig.getConfig("api"),
timeUnit = readTimeUnit(datadogConfig.getString("time-unit")),
informationUnit = readInformationUnit(datadogConfig.getString("information-unit")),
// Remove the "host" tag since it gets added to the datadog payload separately
EnvironmentTags.from(Kamon.environment, datadogConfig.getConfig("environment-tags")).without("host").all().map(p => p.key -> Tag.unwrapValue(p).toString),
Kamon.filter("kamon.datadog.environment-tags.filter")
)
}
}
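
Beyond the metrics API changes, the new DatadogAPIReporterFactory illustrates the Kamon 2.x module pattern: instead of constructing a reporter and handing it to Kamon programmatically, a ModuleFactory named in the kamon.modules configuration builds it from the settings block. A minimal sketch of the same pattern with hypothetical names; the reporter and factory below are illustrative and not part of this commit:

package example

import com.typesafe.config.Config
import kamon.metric.PeriodSnapshot
import kamon.module.{ MetricReporter, ModuleFactory }

// Registered through configuration, mirroring the reference.conf blocks above:
//   kamon.modules.logging-reporter {
//     enabled = true
//     name = "LoggingReporter"
//     description = "Prints counter values on every tick"
//     factory = "example.LoggingReporterFactory"
//   }
class LoggingReporterFactory extends ModuleFactory {
  override def create(settings: ModuleFactory.Settings): LoggingReporter =
    new LoggingReporter(settings.config)
}

class LoggingReporter(@volatile private var config: Config) extends MetricReporter {

  override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit =
    // In Kamon 2.x the snapshot exposes counters, gauges, histograms and
    // rangeSamplers directly; each metric carries one instrument snapshot
    // per tag combination, with its own TagSet and value.
    snapshot.counters.foreach { metric =>
      metric.instruments.foreach { instrument =>
        println(s"${metric.name} ${instrument.tags} = ${instrument.value}")
      }
    }

  override def stop(): Unit = ()
  override def reconfigure(newConfig: Config): Unit =
    config = newConfig
}
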
