From cc2a1a20328b69092370a9ba208064aaadabe1cc Mon Sep 17 00:00:00 2001
From: Mohammed Ammer
Date: Thu, 11 Jun 2020 14:52:41 +0200
Subject: [PATCH] Elasticsearch: Make Elasticsearch job to ignore service(s)
 spans (#92)

Signed-off-by: Mohammed Ammer
---
 README.md                                     |  1 +
 .../elastic/ElasticsearchDependenciesJob.java | 36 ++++++++++++++++---
 2 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/README.md b/README.md
index bc4c151..3b3e31c 100644
--- a/README.md
+++ b/README.md
@@ -80,6 +80,7 @@ Elasticsearch is used when `STORAGE=elasticsearch`.
  * `ES_INDEX_PREFIX`: index prefix of Jaeger indices. By default unset.
  * `ES_TIME_RANGE`: How far in the past the job should look to for spans, the maximum and default is `24h`.
    Any value accepted by [date-math](https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#date-math) can be used here, but the anchor is always `now`.
+ * `ES_SERVICES_IGNORE`: Comma separated (,) names of the services that will be ignored by the job (e.g. `serviceX,serviceY`).
 
 Example usage:
 
diff --git a/jaeger-spark-dependencies-elasticsearch/src/main/java/io/jaegertracing/spark/dependencies/elastic/ElasticsearchDependenciesJob.java b/jaeger-spark-dependencies-elasticsearch/src/main/java/io/jaegertracing/spark/dependencies/elastic/ElasticsearchDependenciesJob.java
index 42426c0..503393a 100644
--- a/jaeger-spark-dependencies-elasticsearch/src/main/java/io/jaegertracing/spark/dependencies/elastic/ElasticsearchDependenciesJob.java
+++ b/jaeger-spark-dependencies-elasticsearch/src/main/java/io/jaegertracing/spark/dependencies/elastic/ElasticsearchDependenciesJob.java
@@ -29,6 +29,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -60,6 +62,7 @@ public static final class Builder {
     Boolean nodesWanOnly = Boolean.parseBoolean(Utils.getEnv("ES_NODES_WAN_ONLY", "false"));
     String indexPrefix = Utils.getEnv("ES_INDEX_PREFIX", null);
     String spanRange = Utils.getEnv("ES_TIME_RANGE", "24h");
+    String servicesIgnore = Utils.getEnv("ES_SERVICES_IGNORE", null);
 
     final Map<String, String> sparkProperties = new LinkedHashMap<>();
 
@@ -124,6 +127,12 @@ public Builder spanRange(String spanRange) {
       return this;
     }
 
+    /** Ignore services by name. By default empty */
+    public Builder servicesIgnore(String servicesIgnore) {
+      this.servicesIgnore = servicesIgnore;
+      return this;
+    }
+
     /** Day to process dependencies for. Defaults to today. */
     public Builder day(LocalDate day) {
       this.day = day.atStartOfDay(ZoneOffset.UTC);
@@ -166,6 +175,7 @@ private static String getSystemPropertyAsFileResource(String key) {
   private final SparkConf conf;
   private final String indexPrefix;
   private final String spanRange;
+  private final String servicesIgnore;
 
   ElasticsearchDependenciesJob(Builder builder) {
     this.day = builder.day;
@@ -195,6 +205,8 @@ private static String getSystemPropertyAsFileResource(String key) {
     }
     this.indexPrefix = builder.indexPrefix;
     this.spanRange = builder.spanRange;
+    this.servicesIgnore = builder.servicesIgnore;
+
   }
 
   /**
@@ -228,10 +240,7 @@ void run(String[] spanIndices, String[] depIndices,String peerServiceTag) {
       String spanIndex = spanIndices[i];
       String depIndex = depIndices[i];
       log.info("Running Dependencies job for {}, reading from {} index, result storing to {}", day, spanIndex, depIndex);
-      // Send raw query to ES to select only the docs / spans we want to consider for this job
-      // This doesn't change the default behavior as the daily indexes only contain up to 24h of data
-      String esQuery = String.format("{\"range\": {\"startTimeMillis\": { \"gte\": \"now-%s\" }}}", spanRange);
-      JavaPairRDD<String, Iterable<Span>> traces = JavaEsSpark.esJsonRDD(sc, spanIndex, esQuery)
+      JavaPairRDD<String, Iterable<Span>> traces = JavaEsSpark.esJsonRDD(sc, spanIndex, esQuery())
           .map(new ElasticTupleToSpan())
           .groupBy(Span::getTraceId);
       List<Dependency> dependencyLinks = DependenciesSparkHelper.derive(traces,peerServiceTag);
@@ -254,6 +263,25 @@ void run(String[] spanIndices, String[] depIndices,String peerServiceTag) {
     }
   }
 
+  /**
+   * Create ElasticSearch query to be applied by the job while retrieving the spans.
+   * @return ElasticSearch query to be applied by the job while retrieving the spans.
+   */
+  private String esQuery() {
+    // Send raw query to ES to select only the docs / spans we want to consider for this job
+    // This doesn't change the default behavior as the daily indexes only contain up to 24h of data
+    String esMustQuery = String.format("{\"range\": {\"startTimeMillis\": { \"gte\": \"now-%s\" }}}", spanRange);
+    String esMustNotQuery = "";
+    if (servicesIgnore != null) {
+      esMustNotQuery = Stream.of(servicesIgnore.split(","))
+          .filter(serviceName -> !serviceName.trim().isEmpty())
+          .map(serviceName -> String.format("{\"match_phrase\":{\"process.serviceName\":{\"query\":\"%s\"}}}", serviceName.trim()))
+          .collect(Collectors.toList())
+          .toString();
+    }
+    return String.format("{\"bool\":{\"must_not\":[%s],\"must\":[%s]}}", esMustNotQuery, esMustQuery);
+  }
+
   private EsMajorVersion getEsVersion() {
     RestClient client = new RestClient(new SparkSettings(conf));
     try {
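For reference, the short sketch below is not part of the patch: it mirrors the new esQuery() string building in a standalone class so the query body passed to JavaEsSpark.esJsonRDD(...) can be printed and inspected. The "serviceX,serviceY" list and the "24h" range are placeholder values standing in for ES_SERVICES_IGNORE and ES_TIME_RANGE.

import java.util.stream.Collectors;
import java.util.stream.Stream;

// Standalone sketch mirroring the patch's esQuery() logic; it only prints the
// resulting bool query, it does not talk to Elasticsearch or Spark.
public class EsQuerySketch {

  public static void main(String[] args) {
    // Placeholder values standing in for ES_TIME_RANGE and ES_SERVICES_IGNORE
    System.out.println(esQuery("24h", "serviceX,serviceY"));
  }

  static String esQuery(String spanRange, String servicesIgnore) {
    // Range clause restricting spans to the configured time window (same as before the patch)
    String esMustQuery = String.format(
        "{\"range\": {\"startTimeMillis\": { \"gte\": \"now-%s\" }}}", spanRange);

    // One match_phrase clause per ignored service name, as in the patch
    String esMustNotQuery = "";
    if (servicesIgnore != null) {
      esMustNotQuery = Stream.of(servicesIgnore.split(","))
          .filter(serviceName -> !serviceName.trim().isEmpty())
          .map(serviceName -> String.format(
              "{\"match_phrase\":{\"process.serviceName\":{\"query\":\"%s\"}}}", serviceName.trim()))
          .collect(Collectors.toList())
          .toString();
    }
    return String.format("{\"bool\":{\"must_not\":[%s],\"must\":[%s]}}", esMustNotQuery, esMustQuery);
  }
}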
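As a usage note, the new builder setter has the same effect as exporting ES_SERVICES_IGNORE before running the job. The builder() and build() entry points in the snippet below are assumed from the surrounding class and are not shown in this diff.

// Assumed wiring sketch: builder() and build() come from the existing class,
// not from this diff; "serviceX,serviceY" is a placeholder service list.
ElasticsearchDependenciesJob job = ElasticsearchDependenciesJob.builder()
    .servicesIgnore("serviceX,serviceY") // same effect as ES_SERVICES_IGNORE=serviceX,serviceY
    .build();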