Skip to content

Commit

Permalink
Add support for splitting rollup queries (#1853)
Browse files Browse the repository at this point in the history
* Add an SLA config flag for rollup intervals

Adds a configuration option for rollup intervals to specify their
maximum acceptable delay. Queries that cover a time between now and that
maximum delay will need to query other tables for that time interval.

* Add global config flag to enable splitting queries

Adds a global config flag to enable splitting queries that would hit the
rollup table, but the rollup table has a delay SLA configured.
In that case, this feature allows splitting a query into two: one that
gets the data from the rollups table until the time where it's
guaranteed to be available, and the rest from the raw table.

* Add a new SplitRollupQuery

Adds a SplitRollupQuery class that supports splitting a rollup query into
two separate queries.
This is useful for when a rollup table is filled by e.g. a batch job
that processes the data from the previous day on a daily basis. Rollup
data for yesterday will then only be available some time today. This
delay SLA can be configured on a per-table basis. The delay would
specify by how much time the table can be behind real time.

If a query comes in that would query data from that blackout period
where data is only available in the raw table, but not yet guaranteed to
be in the rollup table, the incoming query can be split into two using
the SplitRollupQuery class. It wraps a query that queries the rollup
table until the last guaranteed to be available timestamp based on the
SLA; and one that gets the remaining data from the raw table.

* Extract an AbstractQuery

Extracts an AbstractQuery from the TsdbQuery implementation since we'd
like to reuse some parts of it in other Query classes (in this case
SplitRollupQuery)

* Extract an AbstractSpanGroup

* Avoid NullPointerException when setting start time

Avoids a NullPointerException that happened when we were trying to set
the start time on a query that would be eligible to split, but due to
the SLA config only hit the raw table anyway.

* Scale timestamps to milliseconds for split queries

Scales all timestamps for split queries to milliseconds. It's important
to maintain consistent units between all the partial queries that make
up the bigger one.

* Fix starting time error for split queries

Fixes a bug that would happen when the start time of a query aligns
perfectly with the time configured in the SLA for the delay of a rollup
table.
For a defined SLA, e.g. 1 day, if the start time of the query was
exactly 1 day ago, the end time of the rollups part of the query would
be updated and then be equal to its start time. That isn't allowed and
causes a query exception.
  • Loading branch information
muffix authored Feb 24, 2021
1 parent f08dd76 commit 1d35f45
Show file tree
Hide file tree
Showing 31 changed files with 2,508 additions and 139 deletions.
10 changes: 10 additions & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ dist_noinst_DATA = pom.xml.in build-aux/rpm/opentsdb.conf \
build-aux/rpm/logback.xml build-aux/rpm/init.d/opentsdb \
build-aux/rpm/systemd/opentsdb@.service
tsdb_SRC := \
src/core/AbstractSpanGroup.java \
src/core/AbstractQuery.java \
src/core/AggregationIterator.java \
src/core/Aggregator.java \
src/core/Aggregators.java \
Expand All @@ -48,6 +50,7 @@ tsdb_SRC := \
src/core/DownsamplingSpecification.java \
src/core/FillingDownsampler.java \
src/core/FillPolicy.java \
src/core/GroupCallback.java \
src/core/Histogram.java \
src/core/HistogramAggregation.java \
src/core/HistogramAggregationIterator.java \
Expand Down Expand Up @@ -81,11 +84,14 @@ tsdb_SRC := \
src/core/iRowSeq.java \
src/core/SaltScanner.java \
src/core/SeekableView.java \
src/core/SeekableViewChain.java \
src/core/SimpleHistogram.java \
src/core/SimpleHistogramDataPointAdapter.java \
src/core/SimpleHistogramDecoder.java \
src/core/Span.java \
src/core/SpanGroup.java \
src/core/SplitRollupQuery.java \
src/core/SplitRollupSpanGroup.java \
src/core/TSDB.java \
src/core/Tags.java \
src/core/TsdbQuery.java \
Expand Down Expand Up @@ -317,8 +323,11 @@ test_SRC := \
test/core/TestRowKey.java \
test/core/TestRowSeq.java \
test/core/TestSaltScanner.java \
test/core/TestSeekableViewChain.java \
test/core/TestSpan.java \
test/core/TestSpanGroup.java \
test/core/TestSplitRollupQuery.java \
test/core/TestSplitRollupSpanGroup.java \
test/core/TestTags.java \
test/core/TestTSDB.java \
test/core/TestTSDBAddPoint.java \
Expand Down Expand Up @@ -376,6 +385,7 @@ test_SRC := \
test/query/pojo/TestTimeSpan.java \
test/rollup/TestRollupConfig.java \
test/rollup/TestRollupInterval.java \
test/rollup/TestRollupQuery.java \
test/rollup/TestRollupSeq.java \
test/rollup/TestRollupUtils.java \
test/search/TestSearchPlugin.java \
Expand Down
66 changes: 66 additions & 0 deletions src/core/AbstractQuery.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// This file is part of OpenTSDB.
// Copyright (C) 2010-2012 The OpenTSDB Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 2.1 of the License, or (at your
// option) any later version. This program is distributed in the hope that it
// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details. You should have received a copy
// of the GNU Lesser General Public License along with this program. If not,
// see <http://www.gnu.org/licenses/>.
package net.opentsdb.core;

import org.hbase.async.HBaseException;

/**
 * Base class for {@link Query} implementations that supplies the blocking
 * {@code run} methods by waiting on their asynchronous counterparts.
 */
public abstract class AbstractQuery implements Query {
  /**
   * Runs this query, blocking until {@link #runAsync} completes.
   *
   * @return The data points matched by this query.
   * <p>
   * Each element in the non-{@code null} but possibly empty array returned
   * corresponds to one time series for which some data points have been
   * matched by the query.
   * @throws HBaseException if there was a problem communicating with HBase to
   * perform the search.
   */
  @Override
  public DataPoints[] run() throws HBaseException {
    try {
      return runAsync().joinUninterruptibly();
    } catch (RuntimeException e) {
      // Unchecked failures are passed through to the caller untouched.
      throw e;
    } catch (Exception e) {
      // Any other exception is wrapped, keeping the original as the cause.
      throw new RuntimeException("Should never be here", e);
    }
  }

  /**
   * Runs this histogram query, blocking until {@link #runHistogramAsync}
   * completes.
   *
   * @return The data points matched by this query and applied with
   * percentile calculation.
   * <p>
   * Each element in the non-{@code null} but possibly empty array returned
   * corresponds to one time series for which some data points have been
   * matched by the query.
   * @throws HBaseException if there was a problem communicating with HBase to
   * perform the search.
   * @throws IllegalStateException if the query is not a histogram query
   */
  @Override
  public DataPoints[] runHistogram() throws HBaseException {
    if (!isHistogramQuery()) {
      // Matches the documented contract; IllegalStateException is still a
      // RuntimeException, so callers catching the broader type keep working.
      throw new IllegalStateException(
          "runHistogram() called on a query that is not a histogram query");
    }

    try {
      return runHistogramAsync().joinUninterruptibly();
    } catch (RuntimeException e) {
      // Unchecked failures are passed through to the caller untouched.
      throw e;
    } catch (Exception e) {
      // Any other exception is wrapped, keeping the original as the cause.
      throw new RuntimeException("Should never be here", e);
    }
  }
}
70 changes: 70 additions & 0 deletions src/core/AbstractSpanGroup.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// This file is part of OpenTSDB.
// Copyright (C) 2010-2012 The OpenTSDB Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 2.1 of the License, or (at your
// option) any later version. This program is distributed in the hope that it
// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details. You should have received a copy
// of the GNU Lesser General Public License along with this program. If not,
// see <http://www.gnu.org/licenses/>.
package net.opentsdb.core;

import java.util.*;

import org.hbase.async.Bytes;
import org.hbase.async.Bytes.ByteMap;

import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;

import net.opentsdb.meta.Annotation;
import net.opentsdb.rollup.RollupQuery;

/**
* Groups multiple spans together and offers a dynamic "view" on them.
* <p>
* This is used for queries to the TSDB, where we might group multiple
* {@link Span}s that are for the same time series but different tags
* together. We need to "hide" data points that are outside of the
* time period of the query and do on-the-fly aggregation of the data
* points coming from the different Spans, using an {@link Aggregator}.
* Since not all the Spans will have their data points at exactly the
* same time, we also do on-the-fly linear interpolation. If needed,
* this view can also return the rate of change instead of the actual
* data points.
* <p>
* This is one of the rare (if not the only) implementations of
* {@link DataPoints} for which {@link #getTags} can potentially return
* an empty map.
* <p>
* The implementation can also dynamically downsample the data when a
* sampling interval a downsampling function (in the form of an
* {@link Aggregator}) are given. This is done by using a special
* iterator when using the {@link Span.DownsamplingIterator}.
*/
abstract class AbstractSpanGroup implements DataPoints {
  /**
   * Looks up the {@code i}th data point of this group by walking an iterator
   * obtained from {@link #iterator}, which takes {@code O(n)} where
   * {@code n} is the number of data points in this group.
   *
   * @param i Zero-based position of the wanted data point.
   * @return The data point the iterator yielded at that position.
   * @throws IndexOutOfBoundsException if {@code i} is negative, or if the
   * iterator ran out before reaching position {@code i}.
   */
  protected DataPoint getDataPoint(int i) {
    if (i < 0) {
      throw new IndexOutOfBoundsException("negative index: " + i);
    }
    final SeekableView view = iterator();
    DataPoint found = null;
    int remaining = i;
    // hasNext() is evaluated before the counter check on every pass.
    for (; view.hasNext() && remaining >= 0; remaining--) {
      found = view.next();
    }
    // The counter lands on exactly -1 only after i + 1 points were consumed.
    if (remaining == -1 && found != null) {
      return found;
    }
    throw new IndexOutOfBoundsException("index " + i
        + " too large (it's >= " + size() + ") for " + this);
  }
}
30 changes: 30 additions & 0 deletions src/core/GroupCallback.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// This file is part of OpenTSDB.
// Copyright (C) 2010-2012 The OpenTSDB Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 2.1 of the License, or (at your
// option) any later version. This program is distributed in the hope that it
// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details. You should have received a copy
// of the GNU Lesser General Public License along with this program. If not,
// see <http://www.gnu.org/licenses/>.
package net.opentsdb.core;

import com.stumbleupon.async.Callback;

import java.util.ArrayList;

/**
 * Callback over a list of grouped results that discards them and yields
 * {@code null}.
 */
class GroupCallback implements Callback<Object, ArrayList<Object>> {
  /**
   * Used when the only thing of interest is that every individual callback
   * has completed; the values they produced are not inspected.
   *
   * @param ignored The return values of the individual callbacks - ignored
   * @return always {@code null}
   */
  @Override
  public Object call(final ArrayList<Object> ignored) {
    return null;
  }
}
32 changes: 30 additions & 2 deletions src/core/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,23 @@ public void setTimeSeries(final List<String> tsuids,
*/
public Deferred<Object> configureFromQuery(final TSQuery query,
final int index);


/**
* Prepares a query against HBase by setting up group bys and resolving
* strings to UIDs asynchronously. This replaces calls to all of the setters
* like the {@link #setTimeSeries}, {@link #setStartTime}, etc.
* Make sure to wait on the deferred return before calling {@link #runAsync}.
* @param query The main query to fetch the start and end time from
* @param index The index of which sub query we're executing
* @param force_raw If true, always get the data from the raw table; disables rollups
* @throws IllegalArgumentException if the query was missing sub queries or
* the index was out of bounds.
* @throws NoSuchUniqueName if the name of a metric, or a tag name/value
* does not exist. (Bubbles up through the deferred)
* @since 2.4
*/
Deferred<Object> configureFromQuery(final TSQuery query, final int index, boolean force_raw);

/**
* Downsamples the results by specifying a fixed interval between points.
* <p>
Expand Down Expand Up @@ -262,7 +278,19 @@ public Deferred<Object> configureFromQuery(final TSQuery query,
* @return
*/
public boolean isHistogramQuery();


/**
* @return Whether or not this is a rollup query
* @since 2.4
*/
public boolean isRollupQuery();

/**
* @return whether this query needs to be split.
* @since 2.4
*/
public boolean needsSplitting();

/**
* Set the percentile calculation parameters for this query if this is
* a histogram query
Expand Down
97 changes: 97 additions & 0 deletions src/core/SeekableViewChain.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// This file is part of OpenTSDB.
// Copyright (C) 2014 The OpenTSDB Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 2.1 of the License, or (at your
// option) any later version. This program is distributed in the hope that it
// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details. You should have received a copy
// of the GNU Lesser General Public License along with this program. If not,
// see <http://www.gnu.org/licenses/>.
package net.opentsdb.core;

import java.util.List;
import java.util.NoSuchElementException;

/**
 * A {@link SeekableView} that presents a list of underlying views as one
 * continuous sequence: the views are consumed in list order, moving on to the
 * next one as soon as the current one has no more elements.
 */
public class SeekableViewChain implements SeekableView {

  /** The chained views, consumed in list order. */
  private final List<SeekableView> iterators;
  /** Index into {@link #iterators} of the view currently being consumed. */
  private int currentIterator;

  /**
   * @param iterators The views to chain; the list is used as given (not
   * copied).
   */
  SeekableViewChain(List<SeekableView> iterators) {
    this.iterators = iterators;
  }

  /**
   * Returns {@code true} if this view has more elements, i.e. if the current
   * or any later underlying view still has an element.
   */
  @Override
  public boolean hasNext() {
    // getCurrentIterator() only ever returns a view whose hasNext() is true.
    return getCurrentIterator() != null;
  }

  /**
   * Returns the next data point from the first underlying view that still
   * has elements.
   * <p>
   * The object is returned exactly as the underlying view produced it. As the
   * original documentation of this method warns, it may be a reused
   * <em>view</em> whose contents change on the following {@code next()} call,
   * so it must not be stored.
   *
   * @throws NoSuchElementException if there were no more elements to iterate
   * on (in which case {@link #hasNext} would have returned {@code false}).
   */
  @Override
  public DataPoint next() {
    final SeekableView iterator = getCurrentIterator();
    if (iterator == null) {
      throw new NoSuchElementException("No elements left in iterator");
    }
    // No eager advance needed: the next getCurrentIterator() call skips this
    // view once it is exhausted.
    return iterator.next();
  }

  /**
   * Unsupported operation.
   *
   * @throws UnsupportedOperationException always.
   */
  @Override
  public void remove() {
    throw new UnsupportedOperationException("Removing items is not supported");
  }

  /**
   * Advances every underlying view to the given point in time and restarts
   * the chain from the first view.
   * <p>
   * The timestamp is handed to each underlying view unchanged, so its unit
   * and valid range are whatever those views accept.
   * NOTE(review): confirm whether callers pass seconds or milliseconds.
   *
   * @param timestamp The timestamp to seek to.
   * @throws IllegalArgumentException presumably raised by an underlying view
   * if it rejects the timestamp; this class performs no validation itself.
   */
  @Override
  public void seek(long timestamp) {
    for (final SeekableView it : iterators) {
      it.seek(timestamp);
    }
    // Every view was just repositioned, so views that were already passed may
    // have elements again. Restart from the first one; views that are still
    // exhausted get skipped by getCurrentIterator().
    currentIterator = 0;
  }

  /**
   * Moves {@link #currentIterator} past exhausted views.
   *
   * @return the first view at or after the current position that has an
   * element left, or {@code null} if every remaining view is exhausted.
   */
  private SeekableView getCurrentIterator() {
    while (currentIterator < iterators.size()) {
      final SeekableView candidate = iterators.get(currentIterator);
      if (candidate.hasNext()) {
        return candidate;
      }
      currentIterator++;
    }
    return null;
  }
}
Loading

0 comments on commit 1d35f45

Please sign in to comment.