Skip to content

Commit

Permalink
For branch next, add an expression function named FirstDifference, wh… (
Browse files Browse the repository at this point in the history
#1458)

* For branch next, add an expression function named FirstDifference, which calculates the first difference of a time series. I noticed there is MovingAverage calculation, so I thought maybe I can enrich the mathematics functions into that.

* add some unit tests for FirstDifference

Bump version to 2.5.0-SNAPSHOT.

Fix a compilation error about missing FirstDifference (#1471)

Signed-off-by: Chris Larsen <clarsen@yahoo-inc.com>

Bugfix of FsckOptions. (#1464)

Signed-off-by: Chris Larsen <clarsen@yahoo-inc.com>

CORE: (#1472)

- Add RpcResponder for handling callbacks asynchronously
UTILS:
 - Add two convenient methods in Config

Signed-off-by: Chris Larsen <clarsen@verizonmedia.com>

fix #1581 by correcting an edge case in TsdbQuery.getScanEndTimeSeconds() (#1582)

Dockerfile that works without a script. (#1739)

replace FOREVER with a valid value in table creation (#1967)

Co-authored-by: Ion DULGHERU <ion.dulgheru@gmail.com>

Jackson has a serious security problem in 2.9.5, which will cause RCE (#2034)

* Jackson has a serious security problem in 2.9.5, which will cause RCE

FasterXML/jackson-databind#2295

* Jackson has a serious security problem in 2.9.5, which will cause RCE

FasterXML/jackson-databind#2295

Co-authored-by: chi-chi weng <949409306@qq.com>

Pr 1663 (#1966)

* Make UniqueIdRpc aware of the mode

* Update javadoc on new method and rename test methods to be more descriptive

Co-authored-by: Simon Matic Langford <simon@exemel.co.uk>

Re-introduce query timeouts. (#2035)

Co-authored-by: Itamar Turner-Trauring <itamar@itamarst.org>

Updating maven central urls and versions to match what is available now (#2039)

Fixes #1899
Fixes #1941

always write cli tools to stdout (#1488)

Signed-off-by: Chris Larsen <clarsen@yahoo-inc.com>

Fix #1632 (#1634)

Add "check_tsd_v2" script (#1567)

Enhanced check_tsd script evaluates each individual metric group separately when given a filter

Collect stats from meta cache plugin if configured (#1649)

Fix SaltScanner race condition on spans maps (#1651)

* Fix SaltScanner race condition on spans maps

* Fix 1.6 compatibility

Synchronise the KVs list for scanner results

Synchronises the list that holds the KeyValues that have been produced
by the scanner callbacks. The list is accessed from multiple threads at
a time and wasn't thread-safe, causing inconsistent results and partial
loss of data in the response.

Relates to: #1753
Resolves: #1760

Allow rollup downsample and series aggregator to be different

Fix TestSaltScannerHistogram, looks like the method was renamed and the
UTs were not adjusted.

ExplicitTags filtering with FuzzyFilters

Fix PR 1896 with the fuzzy filter list so that it will honor the regex filter
and properly ignore rows that don't match the explicit filter. Also sort the
fuzzy filter list in ascending order and implement a static comparator instead
of instantiating one on each call.

Test rollup filter fix for #1083

Fix concurrent result reporting from scanners

Fixes a concurrency bug where scanners report their results into a map
and would overwrite each other's results

Resolves: #1753

Update Maven jars URLs with HTTPS access

Remove excess param in javadoc for RpcHandler

Fix check_tsd_v2 (#1937)

* renamed instancename of logger

The previous name was copied from another script, cosmetic change only

* Change behaviour of --ignore-recent option

Previous option would fetch data from opentsdb from --duration seconds
ago to time.now(), and then try to remove timestamps that was inside the
--ignore-recent seconds ago, however the logic was flawed and it
actually only included these seconds. Furthermore opentsdb supports
setting an "end" parameter, so we use this to only get the data we want.

for example -d 180 -I 80, would render a query parameter that looks like
`?start=180s-ago&end=80s-ago`. Keeps it simple.

Also added debuglogging to output the actual query sent to OpenTSDB if
--debug option is enabled.

* fixed logic of --percent-over parameter

Previous behaviour didn't work due to wrong logic, would set "crit" or
"warn" to True regardless. This change fixes that.

* better output from logging

Add logmessages to be consistent across alerting-scenarios, and changed
format of some floats. Fixed a log messaged that displayed "crit" value
where it should have been "warn" value.

* Fixed bug in logic that parses results

Removed an if statement that `continue`:ed the for-loop if a result was
neither a `crit` or `warn` already, however this check also made the logic
skip the test to see if no values were returned by opentsdb and -A flag was
specified to alert in such scenarios.

* changed check for timestamps type

Previous behaviour was to check if a timestamp could be cast as a float,
which is a bit weird, because opentsdb will return integers.

I do doubt that opentsdb would return a timestamp that is not an integer
to begin with, so i suspect this check is redundant, but leaving it in
for now regardless, as per discussion in PR.

Rename maxScannerUidtoStringTime into maxScannerUidToStringTime (#1875)

Fix the missing index from #1754 in the salt scanner.

Force Sunday as first day of week.

Tweak TestTsdbQueryQueries to pass in older java versions.

Fix the min case for doubles in AggregationIterator.

Fix the Screw Driver config.

Fix UT for JDK8

PR for SD config.

Fix: Rollup queries with count aggregator produce unexpected results (#1895)

Co-authored-by: Tony Di Nucci <tony.dinucci@skyscanner.net>

Fixed function description Fixes #841 (#2040)

Added tracking of metrics which are null due to auto_metric being disabled Fixes #786 (#2042)

Add support for splitting rollup queries (#1853)

* Add an SLA config flag for rollup intervals

Adds a configuration option for rollup intervals to specify their
maximum acceptable delay. Queries that cover a time between now and that
maximum delay will need to query other tables for that time interval.

* Add global config flag to enable splitting queries

Adds a global config flag to enable splitting queries that would hit the
rollup table, but the rollup table has a delay SLA configured.
In that case, this feature allows splitting a query into to; one that
gets the data from the rollups table until the time where it's
guaranteed to be available, and the rest from the raw table.

* Add a new SplitRollupQuery

Adds a SplitRollupQuery class that suports splitting a rollup query into
two separate queries.
This is useful for when a rollup table is filled by e.g. a batch job
that processes the data from the previous day on a daily basis. Rollup
data for yesterday will then only be available some time today. This
delay SLA can be configured on a per-table basis. The delay would
specify by how much time the table can be behind real time.

If a query comes in that would query data from that blackout period
where data is only available in the raw table, but not yet guaranteed to
be in the rollup table, the incoming query can be split into two using
the SplitRollupQuery class. It wraps a query that queries the rollup
table until the last guaranteed to be available timestamp based on the
SLA; and one that gets the remaining data from the raw table.

* Extract an AbstractQuery

Extracts an AbstractQuery from the TsdbQuery implementation since we'd
like to reuse some parts of it in other Query classes (in this case
SplitRollupQuery)

* Extract an AbstractSpanGroup

* Avoid NullPointerException when setting start time

Avoids a NullPointerException that happened when we were trying to set
the start time on a query that would be eligible to split, but due to
the SLA config only hit the raw table anyway.

* Scale timestamps to milliseconds for split queries

Scales all timestamps for split queries to milliseconds. It's important
to maintain consistent units between all the partial queries that make
up the bigger one.

* Fix starting time error for split queries

Fixes a bug that would happen when the start time of a query aligns
perfectly with the time configured in the SLA for the delay of a rollup
table.
For a defined SLA, e.g. 1 day, if the start time of the query was
exactly 1 day ago, the end time of the rollups part of the query would
be updated and then be equal to its start time. That isn't allowed and
causes a query exception.
  • Loading branch information
Eduardo95 authored and johann8384 committed Feb 24, 2021
1 parent 1d35f45 commit 74f9c9a
Show file tree
Hide file tree
Showing 32 changed files with 1,146 additions and 303 deletions.
8 changes: 4 additions & 4 deletions tools/docker/Dockerfile → Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ FROM java:openjdk-8-alpine

MAINTAINER jonathan.creasy@gmail.com

ENV VERSION 2.3.0-RC1
ENV VERSION 2.5.0-SNAPSHOT
ENV WORKDIR /usr/share/opentsdb
ENV LOGDIR /var/log/opentsdb
ENV DATADIR /data/opentsdb
Expand Down Expand Up @@ -32,10 +32,10 @@ ENV TSDB_PORT 4244

WORKDIR $WORKDIR

ADD libs $WORKDIR/libs
ADD logback.xml $WORKDIR
ADD third_party/*/*.jar $WORKDIR/libs/
ADD src/logback.xml $WORKDIR
ADD tsdb-$VERSION.jar $WORKDIR
ADD opentsdb.conf $ETCDIR/opentsdb.conf
ADD src/opentsdb.conf $ETCDIR/opentsdb.conf

VOLUME ["/etc/openstsdb"]
VOLUME ["/data/opentsdb"]
Expand Down
3 changes: 3 additions & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ tsdb_SRC := \
src/core/RequestBuilder.java \
src/core/RowKey.java \
src/core/RowSeq.java \
src/core/RpcResponder.java \
src/core/iRowSeq.java \
src/core/SaltScanner.java \
src/core/SeekableView.java \
Expand Down Expand Up @@ -126,6 +127,7 @@ tsdb_SRC := \
src/query/expression/ExpressionReader.java \
src/query/expression/Expressions.java \
src/query/expression/ExpressionTree.java \
src/query/expression/FirstDifference.java \
src/query/expression/HighestCurrent.java \
src/query/expression/HighestMax.java \
src/query/expression/IntersectionIterator.java \
Expand Down Expand Up @@ -322,6 +324,7 @@ test_SRC := \
test/core/TestRateSpan.java \
test/core/TestRowKey.java \
test/core/TestRowSeq.java \
test/core/TestRpcResponsder.java \
test/core/TestSaltScanner.java \
test/core/TestSeekableViewChain.java \
test/core/TestSpan.java \
Expand Down
3 changes: 2 additions & 1 deletion configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
# along with this library. If not, see <http://www.gnu.org/licenses/>.

# Semantic Versioning (see http://semver.org/).
AC_INIT([opentsdb], [2.4.1-SNAPSHOT], [opentsdb@googlegroups.com])
AC_INIT([opentsdb], [2.5.0-SNAPSHOT], [opentsdb@googlegroups.com])

AC_CONFIG_AUX_DIR([build-aux])
AM_INIT_AUTOMAKE([foreign])

Expand Down
71 changes: 14 additions & 57 deletions src/core/IncomingDataPoints.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
Expand Down Expand Up @@ -45,6 +46,11 @@ final class IncomingDataPoints implements WritableDataPoints {
*/
static final Histogram putlatency = new Histogram(16000, (short) 2, 100);

/**
* Keep track of the number of UIDs that came back null with auto_metric disabled.
*/
static final AtomicLong auto_metric_rejection_count = new AtomicLong();

/** The {@code TSDB} instance we belong to. */
private final TSDB tsdb;

Expand Down Expand Up @@ -137,9 +143,14 @@ static byte[] rowKeyTemplate(final TSDB tsdb, final String metric,

short pos = (short) Const.SALT_WIDTH();

copyInRowKey(row, pos,
(tsdb.config.auto_metric() ? tsdb.metrics.getOrCreateId(metric)
: tsdb.metrics.getId(metric)));
byte[] metric_id = (tsdb.config.auto_metric() ? tsdb.metrics.getOrCreateId(metric)
: tsdb.metrics.getId(metric));

if(!tsdb.config.auto_metric() && metric_id == null) {
auto_metric_rejection_count.incrementAndGet();
}

copyInRowKey(row, pos, metric_id);
pos += metric_width;

pos += Const.TIMESTAMP_BYTES;
Expand All @@ -151,60 +162,6 @@ static byte[] rowKeyTemplate(final TSDB tsdb, final String metric,
return row;
}

/**
* Returns a partially initialized row key for this metric and these tags. The
* only thing left to fill in is the base timestamp.
*
* @since 2.0
*/
static Deferred<byte[]> rowKeyTemplateAsync(final TSDB tsdb,
final String metric, final Map<String, String> tags) {
final short metric_width = tsdb.metrics.width();
final short tag_name_width = tsdb.tag_names.width();
final short tag_value_width = tsdb.tag_values.width();
final short num_tags = (short) tags.size();

int row_size = (Const.SALT_WIDTH() + metric_width + Const.TIMESTAMP_BYTES
+ tag_name_width * num_tags + tag_value_width * num_tags);
final byte[] row = new byte[row_size];

// Lookup or create the metric ID.
final Deferred<byte[]> metric_id;
if (tsdb.config.auto_metric()) {
metric_id = tsdb.metrics.getOrCreateIdAsync(metric, metric, tags);
} else {
metric_id = tsdb.metrics.getIdAsync(metric);
}

// Copy the metric ID at the beginning of the row key.
class CopyMetricInRowKeyCB implements Callback<byte[], byte[]> {
public byte[] call(final byte[] metricid) {
copyInRowKey(row, (short) Const.SALT_WIDTH(), metricid);
return row;
}
}

// Copy the tag IDs in the row key.
class CopyTagsInRowKeyCB implements
Callback<Deferred<byte[]>, ArrayList<byte[]>> {
public Deferred<byte[]> call(final ArrayList<byte[]> tags) {
short pos = (short) (Const.SALT_WIDTH() + metric_width);
pos += Const.TIMESTAMP_BYTES;
for (final byte[] tag : tags) {
copyInRowKey(row, pos, tag);
pos += tag.length;
}
// Once we've resolved all the tags, schedule the copy of the metric
// ID and return the row key we produced.
return metric_id.addCallback(new CopyMetricInRowKeyCB());
}
}

// Kick off the resolution of all tags.
return Tags.resolveOrCreateAllAsync(tsdb, metric, tags)
.addCallbackDeferring(new CopyTagsInRowKeyCB());
}

public void setSeries(final String metric, final Map<String, String> tags) {
checkMetricAndTags(metric, tags);
try {
Expand Down
2 changes: 1 addition & 1 deletion src/core/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public Deferred<Object> configureFromQuery(final TSQuery query,
* way we get this one data point is by aggregating all the data points of
* that interval together using an {@link Aggregator}. This enables you
* to compute things like the 5-minute average or 10 minute 99th percentile.
* @param interval Number of seconds wanted between each data point.
* @param interval Number of milliseconds wanted between each data point.
* @param downsampler Aggregation function to use to group data points
* within an interval.
*/
Expand Down
110 changes: 110 additions & 0 deletions src/core/RpcResponder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// This file is part of OpenTSDB.
// Copyright (C) 2010-2017 The OpenTSDB Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 2.1 of the License, or (at your
// option) any later version. This program is distributed in the hope that it
// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details. You should have received a copy
// of the GNU Lesser General Public License along with this program. If not,
// see <http://www.gnu.org/licenses/>.
package net.opentsdb.core;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import net.opentsdb.utils.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
*
* This class is responsible for building result of requests and
* respond to clients asynchronously.
*
* It can reduce requests that stacking in AsyncHBase, especially put requests.
* When a HBase's RPC has completed, the "AsyncHBase I/O worker" just decodes
* the response, and then do callback by this class asynchronously. We should
* take up workers as short as possible time so that workers can remove RPCs
* from in-flight state more quickly.
*
*/
public class RpcResponder {

private static final Logger LOG = LoggerFactory.getLogger(RpcResponder.class);

public static final String TSD_RESPONSE_ASYNC_KEY = "tsd.core.response.async";
public static final boolean TSD_RESPONSE_ASYNC_DEFAULT = true;

public static final String TSD_RESPONSE_WORKER_NUM_KEY =
"tsd.core.response.worker.num";
public static final int TSD_RESPONSE_WORKER_NUM_DEFAULT = 10;

private final boolean async;
private ExecutorService responders;
private volatile boolean running = true;

RpcResponder(final Config config) {
async = config.getBoolean(TSD_RESPONSE_ASYNC_KEY,
TSD_RESPONSE_ASYNC_DEFAULT);

if (async) {
int threads = config.getInt(TSD_RESPONSE_WORKER_NUM_KEY,
TSD_RESPONSE_WORKER_NUM_DEFAULT);
responders = Executors.newFixedThreadPool(threads,
new ThreadFactoryBuilder()
.setNameFormat("OpenTSDB Responder #%d")
.setDaemon(true)
.setUncaughtExceptionHandler(new ExceptionHandler())
.build());
}

LOG.info("RpcResponder mode: {}", async ? "async" : "sync");
}

public void response(Runnable run) {
if (async) {
if (running) {
responders.execute(run);
} else {
throw new IllegalStateException("RpcResponder is closing or closed.");
}
} else {
run.run();
}
}

public void close() {
if (running) {
running = false;
responders.shutdown();
}

boolean completed;
try {
completed = responders.awaitTermination(5, TimeUnit.MINUTES);
} catch (InterruptedException e) {
completed = false;
}

if (!completed) {
LOG.warn(
"There are still some results that are not returned to the clients.");
}
}

public boolean isAsync() {
return async;
}

private class ExceptionHandler implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
LOG.error("Run into an uncaught exception in thread: " + t.getName(), e);
}
}
}
22 changes: 20 additions & 2 deletions src/core/SaltScanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,8 @@ final class ScannerCB implements Callback<Object,
private long rows_pre_filter = 0;
private long dps_post_filter = 0;
private long rows_post_filter = 0;

private long query_timeout = tsdb.getConfig().getLong("tsd.query.timeout");

public ScannerCB(final Scanner scanner, final int index) {
this.scanner = scanner;
this.index = index;
Expand Down Expand Up @@ -554,7 +555,24 @@ public Object call(final ArrayList<ArrayList<KeyValue>> rows)
final List<Deferred<Object>> lookups =
filters != null && !filters.isEmpty() ?
new ArrayList<Deferred<Object>>(rows.size()) : null;


// fail the query when the timeout exceeded
if (this.query_timeout > 0 && fetch_time > (this.query_timeout * 1000000)) {
try {
close(false);
handleException(
new QueryException(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE,
"Sorry, your query timed out. Time limit: "
+ this.query_timeout + " ms, fetch time: "
+ (double)(fetch_time)/1000000 + " ms. Please try filtering "
+ "using more tags or decrease your time range."));
return false;
} catch (Exception e) {
LOG.error("Sorry, Scanner is closed: " + scanner, e);
return false;
}
}

// validation checking before processing the next set of results. It's
// kinda funky but we want to allow queries to sneak through that were
// just a *tad* over the limits so that's why we don't check at the
Expand Down
Loading

0 comments on commit 74f9c9a

Please sign in to comment.