diff --git a/Makefile.am b/Makefile.am index d3ce9287e7..9bf563d304 100644 --- a/Makefile.am +++ b/Makefile.am @@ -32,6 +32,8 @@ dist_noinst_DATA = pom.xml.in build-aux/rpm/opentsdb.conf \ build-aux/rpm/logback.xml build-aux/rpm/init.d/opentsdb \ build-aux/rpm/systemd/opentsdb@.service tsdb_SRC := \ + src/core/AbstractSpanGroup.java \ + src/core/AbstractQuery.java \ src/core/AggregationIterator.java \ src/core/Aggregator.java \ src/core/Aggregators.java \ @@ -48,6 +50,7 @@ tsdb_SRC := \ src/core/DownsamplingSpecification.java \ src/core/FillingDownsampler.java \ src/core/FillPolicy.java \ + src/core/GroupCallback.java \ src/core/Histogram.java \ src/core/HistogramAggregation.java \ src/core/HistogramAggregationIterator.java \ @@ -81,11 +84,14 @@ tsdb_SRC := \ src/core/iRowSeq.java \ src/core/SaltScanner.java \ src/core/SeekableView.java \ + src/core/SeekableViewChain.java \ src/core/SimpleHistogram.java \ src/core/SimpleHistogramDataPointAdapter.java \ src/core/SimpleHistogramDecoder.java \ src/core/Span.java \ src/core/SpanGroup.java \ + src/core/SplitRollupQuery.java \ + src/core/SplitRollupSpanGroup.java \ src/core/TSDB.java \ src/core/Tags.java \ src/core/TsdbQuery.java \ @@ -317,8 +323,11 @@ test_SRC := \ test/core/TestRowKey.java \ test/core/TestRowSeq.java \ test/core/TestSaltScanner.java \ + test/core/TestSeekableViewChain.java \ test/core/TestSpan.java \ test/core/TestSpanGroup.java \ + test/core/TestSplitRollupQuery.java \ + test/core/TestSplitRollupSpanGroup.java \ test/core/TestTags.java \ test/core/TestTSDB.java \ test/core/TestTSDBAddPoint.java \ @@ -376,6 +385,7 @@ test_SRC := \ test/query/pojo/TestTimeSpan.java \ test/rollup/TestRollupConfig.java \ test/rollup/TestRollupInterval.java \ + test/rollup/TestRollupQuery.java \ test/rollup/TestRollupSeq.java \ test/rollup/TestRollupUtils.java \ test/search/TestSearchPlugin.java \ diff --git a/src/core/AbstractQuery.java b/src/core/AbstractQuery.java new file mode 100644 index 0000000000..5a37c50f4a --- /dev/null +++ b/src/core/AbstractQuery.java @@ -0,0 +1,66 @@ +// This file is part of OpenTSDB. +// Copyright (C) 2010-2012 The OpenTSDB Authors. +// +// This program is free software: you can redistribute it and/or modify it +// under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 2.1 of the License, or (at your +// option) any later version. This program is distributed in the hope that it +// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty +// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. You should have received a copy +// of the GNU Lesser General Public License along with this program. If not, +// see . +package net.opentsdb.core; + +import org.hbase.async.HBaseException; + +public abstract class AbstractQuery implements Query { + /** + * Runs this query. + * + * @return The data points matched by this query. + *

+ * Each element in the non-{@code null} but possibly empty array returned + * corresponds to one time series for which some data points have been + * matched by the query. + * @throws HBaseException if there was a problem communicating with HBase to + * perform the search. + */ + @Override + public DataPoints[] run() throws HBaseException { + try { + return runAsync().joinUninterruptibly(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException("Should never be here", e); + } + } + + /** + * Runs this query. + * + * @return The data points matched by this query and applied with percentile calculation + *

+ * Each element in the non-{@code null} but possibly empty array returned + * corresponds to one time series for which some data points have been + * matched by the query. + * @throws HBaseException if there was a problem communicating with HBase to + * perform the search. + * @throws IllegalStateException if the query is not a histogram query + */ + @Override + public DataPoints[] runHistogram() throws HBaseException { + if (!isHistogramQuery()) { + throw new RuntimeException("Should never be here"); + } + + try { + return runHistogramAsync().joinUninterruptibly(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException("Should never be here", e); + } + } +} diff --git a/src/core/AbstractSpanGroup.java b/src/core/AbstractSpanGroup.java new file mode 100644 index 0000000000..f9605a6b05 --- /dev/null +++ b/src/core/AbstractSpanGroup.java @@ -0,0 +1,70 @@ +// This file is part of OpenTSDB. +// Copyright (C) 2010-2012 The OpenTSDB Authors. +// +// This program is free software: you can redistribute it and/or modify it +// under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 2.1 of the License, or (at your +// option) any later version. This program is distributed in the hope that it +// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty +// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. You should have received a copy +// of the GNU Lesser General Public License along with this program. If not, +// see . +package net.opentsdb.core; + +import java.util.*; + +import org.hbase.async.Bytes; +import org.hbase.async.Bytes.ByteMap; + +import com.stumbleupon.async.Callback; +import com.stumbleupon.async.Deferred; + +import net.opentsdb.meta.Annotation; +import net.opentsdb.rollup.RollupQuery; + +/** + * Groups multiple spans together and offers a dynamic "view" on them. + *

+ * This is used for queries to the TSDB, where we might group multiple + * {@link Span}s that are for the same time series but different tags + * together. We need to "hide" data points that are outside of the + * time period of the query and do on-the-fly aggregation of the data + * points coming from the different Spans, using an {@link Aggregator}. + * Since not all the Spans will have their data points at exactly the + * same time, we also do on-the-fly linear interpolation. If needed, + * this view can also return the rate of change instead of the actual + * data points. + *

+ * This is one of the rare (if not the only) implementations of + * {@link DataPoints} for which {@link #getTags} can potentially return + * an empty map. + *

+ * The implementation can also dynamically downsample the data when a + * sampling interval a downsampling function (in the form of an + * {@link Aggregator}) are given. This is done by using a special + * iterator when using the {@link Span.DownsamplingIterator}. + */ +abstract class AbstractSpanGroup implements DataPoints { + /** + * Finds the {@code i}th data point of this group in {@code O(n)}. + * Where {@code n} is the number of data points in this group. + */ + protected DataPoint getDataPoint(int i) { + if (i < 0) { + throw new IndexOutOfBoundsException("negative index: " + i); + } + final int saved_i = i; + final SeekableView it = iterator(); + DataPoint dp = null; + while (it.hasNext() && i >= 0) { + dp = it.next(); + i--; + } + if (i != -1 || dp == null) { + throw new IndexOutOfBoundsException("index " + saved_i + + " too large (it's >= " + size() + ") for " + this); + } + return dp; + } +} diff --git a/src/core/GroupCallback.java b/src/core/GroupCallback.java new file mode 100644 index 0000000000..da835dbd63 --- /dev/null +++ b/src/core/GroupCallback.java @@ -0,0 +1,30 @@ +// This file is part of OpenTSDB. +// Copyright (C) 2010-2012 The OpenTSDB Authors. +// +// This program is free software: you can redistribute it and/or modify it +// under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 2.1 of the License, or (at your +// option) any later version. This program is distributed in the hope that it +// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty +// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. You should have received a copy +// of the GNU Lesser General Public License along with this program. If not, +// see . +package net.opentsdb.core; + +import com.stumbleupon.async.Callback; + +import java.util.ArrayList; + +class GroupCallback implements Callback> { + /** + * We're only waiting for all callbacks to complete, ignoring their return values. + * + * @param ignored The return values of the individual callbacks - ignored + * @return null + */ + @Override + public Object call(ArrayList ignored) { + return null; + } +} diff --git a/src/core/Query.java b/src/core/Query.java index 3d95834004..455f641a25 100644 --- a/src/core/Query.java +++ b/src/core/Query.java @@ -171,7 +171,23 @@ public void setTimeSeries(final List tsuids, */ public Deferred configureFromQuery(final TSQuery query, final int index); - + + /** + * Prepares a query against HBase by setting up group bys and resolving + * strings to UIDs asynchronously. This replaces calls to all of the setters + * like the {@link setTimeSeries}, {@link setStartTime}, etc. + * Make sure to wait on the deferred return before calling {@link runAsync}. + * @param query The main query to fetch the start and end time from + * @param index The index of which sub query we're executing + * @param force_raw If true, always get the data from the raw table; disables rollups + * @throws IllegalArgumentException if the query was missing sub queries or + * the index was out of bounds. + * @throws NoSuchUniqueName if the name of a metric, or a tag name/value + * does not exist. (Bubbles up through the deferred) + * @since 2.4 + */ + Deferred configureFromQuery(final TSQuery query, final int index, boolean force_raw); + /** * Downsamples the results by specifying a fixed interval between points. *

@@ -262,7 +278,19 @@ public Deferred configureFromQuery(final TSQuery query, * @return */ public boolean isHistogramQuery(); - + + /** + * @return Whether or not this is a rollup query + * @since 2.4 + */ + public boolean isRollupQuery(); + + /** + * @return whether this query needs to be split. + * @since 2.4 + */ + public boolean needsSplitting(); + /** * Set the percentile calculation parameters for this query if this is * a histogram query diff --git a/src/core/SeekableViewChain.java b/src/core/SeekableViewChain.java new file mode 100644 index 0000000000..63c0454240 --- /dev/null +++ b/src/core/SeekableViewChain.java @@ -0,0 +1,97 @@ +// This file is part of OpenTSDB. +// Copyright (C) 2014 The OpenTSDB Authors. +// +// This program is free software: you can redistribute it and/or modify it +// under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 2.1 of the License, or (at your +// option) any later version. This program is distributed in the hope that it +// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty +// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. You should have received a copy +// of the GNU Lesser General Public License along with this program. If not, +// see . +package net.opentsdb.core; + +import java.util.List; +import java.util.NoSuchElementException; + +public class SeekableViewChain implements SeekableView { + + private final List iterators; + private int currentIterator; + + SeekableViewChain(List iterators) { + this.iterators = iterators; + } + + /** + * Returns {@code true} if this view has more elements. + */ + @Override + public boolean hasNext() { + SeekableView iterator = getCurrentIterator(); + return iterator != null && iterator.hasNext(); + } + + /** + * Returns a view on the next data point. + * No new object gets created, the referenced returned is always the same + * and must not be stored since its internal data structure will change the + * next time {@code next()} is called. + * + * @throws NoSuchElementException if there were no more elements to iterate + * on (in which case {@link #hasNext} would have returned {@code false}. + */ + @Override + public DataPoint next() { + SeekableView iterator = getCurrentIterator(); + if (iterator == null || !iterator.hasNext()) { + throw new NoSuchElementException("No elements left in iterator"); + } + + DataPoint next = iterator.next(); + + if (!iterator.hasNext()) { + currentIterator++; + } + + return next; + } + + /** + * Unsupported operation. + * + * @throws UnsupportedOperationException always. + */ + @Override + public void remove() { + throw new UnsupportedOperationException("Removing items is not supported"); + } + + /** + * Advances the iterator to the given point in time. + *

+ * This allows the iterator to skip all the data points that are strictly + * before the given timestamp. + * + * @param timestamp A strictly positive 32 bit UNIX timestamp (in seconds). + * @throws IllegalArgumentException if the timestamp is zero, or negative, + * or doesn't fit on 32 bits (think "unsigned int" -- yay Java!). + */ + @Override + public void seek(long timestamp) { + for (final SeekableView it : iterators) { + it.seek(timestamp); + } + } + + private SeekableView getCurrentIterator() { + while (currentIterator < iterators.size()) { + if (iterators.get(currentIterator).hasNext()) { + return iterators.get(currentIterator); + } + currentIterator++; + } + return null; + } +} diff --git a/src/core/SpanGroup.java b/src/core/SpanGroup.java index 94bed154d0..07beaaa5d5 100644 --- a/src/core/SpanGroup.java +++ b/src/core/SpanGroup.java @@ -12,14 +12,7 @@ // see . package net.opentsdb.core; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import org.hbase.async.Bytes; import org.hbase.async.Bytes.ByteMap; @@ -52,7 +45,7 @@ * {@link Aggregator}) are given. This is done by using a special * iterator when using the {@link Span.DownsamplingIterator}. */ -final class SpanGroup implements DataPoints { +final class SpanGroup extends AbstractSpanGroup { /** Annotations */ private final ArrayList annotations; @@ -109,7 +102,10 @@ final class SpanGroup implements DataPoints { /** The TSDB to which we belong, used for resolution */ private final TSDB tsdb; - + + /** The group we belong to */ + private byte[] group; + /** * Ctor. * @param tsdb The TSDB we belong to. @@ -231,7 +227,7 @@ final class SpanGroup implements DataPoints { final long query_end, final int query_index) { this(tsdb, start_time, end_time, spans, rate, rate_options, aggregator, - downsampler, query_start, query_end, query_index, null); + downsampler, query_start, query_end, query_index, null, new byte[0]); } /** @@ -265,7 +261,8 @@ final class SpanGroup implements DataPoints { final long query_start, final long query_end, final int query_index, - final RollupQuery rollup_query) { + final RollupQuery rollup_query, + byte[] group) { annotations = new ArrayList(); this.start_time = (start_time & Const.SECOND_MASK) == 0 ? start_time * 1000 : start_time; @@ -285,6 +282,7 @@ final class SpanGroup implements DataPoints { this.query_index = query_index; this.rollup_query = rollup_query; this.tsdb = tsdb; + this.group = group; } /** @@ -531,32 +529,22 @@ public SeekableView iterator() { rate, rate_options, rollup_query); } - /** - * Finds the {@code i}th data point of this group in {@code O(n)}. - * Where {@code n} is the number of data points in this group. - */ - private DataPoint getDataPoint(int i) { - if (i < 0) { - throw new IndexOutOfBoundsException("negative index: " + i); - } - final int saved_i = i; - final SeekableView it = iterator(); - DataPoint dp = null; - while (it.hasNext() && i >= 0) { - dp = it.next(); - i--; - } - if (i != -1 || dp == null) { - throw new IndexOutOfBoundsException("index " + saved_i - + " too large (it's >= " + size() + ") for " + this); - } - return dp; - } - public long timestamp(final int i) { return getDataPoint(i).timestamp(); } + /** + * Returns the group the spans in here belong to. + * + * Returns null if the NONE aggregator was requested in the query + * Returns an empty array if there were no group bys and they're all in the same group + * Returns the group otherwise + * @return The group + */ + public byte[] group() { + return group; + } + public boolean isInteger(final int i) { return getDataPoint(i).isInteger(); } @@ -585,7 +573,8 @@ private String toStringSharedAttributes() { + ", aggregator=" + aggregator + ", downsampler=" + downsampler + ", query_start=" + query_start - + ", query_end" + query_end + + ", query_end=" + query_end + + ", group=" + Arrays.toString(group) + ')'; } @@ -602,6 +591,10 @@ public boolean isPercentile() { public float getPercentile() { throw new UnsupportedOperationException("getPercentile not supported"); } + + public List getSpans() { + return spans; + } /** * Resolves the set of tag keys to their string names. diff --git a/src/core/SplitRollupQuery.java b/src/core/SplitRollupQuery.java new file mode 100644 index 0000000000..7121c99762 --- /dev/null +++ b/src/core/SplitRollupQuery.java @@ -0,0 +1,480 @@ +// This file is part of OpenTSDB. +// Copyright (C) 2010-2012 The OpenTSDB Authors. +// +// This program is free software: you can redistribute it and/or modify it +// under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 2.1 of the License, or (at your +// option) any later version. This program is distributed in the hope that it +// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty +// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. You should have received a copy +// of the GNU Lesser General Public License along with this program. If not, +// see . +package net.opentsdb.core; + +import com.stumbleupon.async.Callback; +import com.stumbleupon.async.Deferred; +import net.opentsdb.rollup.RollupQuery; +import net.opentsdb.uid.NoSuchUniqueName; +import org.hbase.async.Bytes; +import org.hbase.async.Bytes.ByteMap; +import org.hbase.async.HBaseException; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; + +public class SplitRollupQuery extends AbstractQuery { + + private TSDB tsdb; + + private TsdbQuery rollupQuery; + private TsdbQuery rawQuery; + + private Deferred rollupResolution; + private Deferred rawResolution; + + SplitRollupQuery(final TSDB tsdb, TsdbQuery rollupQuery, Deferred rollupResolution) { + if (rollupQuery == null) { + throw new IllegalArgumentException("Rollup query cannot be null"); + } + + this.tsdb = tsdb; + + this.rollupQuery = rollupQuery; + this.rollupResolution = rollupResolution; + } + + /** + * Returns the start time of the graph. + * + * @return A strictly positive integer. + * @throws IllegalStateException if {@link #setStartTime(long)} was never + * called on this instance before. + */ + @Override + public long getStartTime() { + return rollupQuery != null ? rollupQuery.getStartTime() : rawQuery.getStartTime(); + } + + /** + * Sets the start time of the graph. Converts the timestamp to milliseconds if necessary. + * + * @param timestamp The start time, all the data points returned will have a + * timestamp greater than or equal to this one. + * @throws IllegalArgumentException if timestamp is less than or equal to 0, + * or if it can't fit on 32 bits. + * @throws IllegalArgumentException if + * {@code timestamp >= }{@link #getEndTime getEndTime}. + */ + @Override + public void setStartTime(long timestamp) { + if ((timestamp & Const.SECOND_MASK) == 0) { timestamp *= 1000L; } + + if (rollupQuery == null) { + rawQuery.setStartTime(timestamp); + return; + } + + if (rollupQuery.getEndTime() <= timestamp) { + rollupQuery = null; + rawQuery.setStartTime(timestamp); + return; + } + + rollupQuery.setStartTime(timestamp); + } + + /** + * Returns the end time of the graph. + *

+ * If {@link #setEndTime} was never called before, this method will + * automatically execute + * {@link #setEndTime setEndTime}{@code (System.currentTimeMillis() / 1000)} + * to set the end time. + * + * @return A strictly positive integer. + */ + @Override + public long getEndTime() { + return rawQuery != null ? rawQuery.getEndTime() : rollupQuery.getEndTime(); + } + + /** + * Sets the end time of the graph. Converts the timestamp to milliseconds if necessary. + * + * @param timestamp The end time, all the data points returned will have a + * timestamp less than or equal to this one. + * @throws IllegalArgumentException if timestamp is less than or equal to 0, + * or if it can't fit on 32 bits. + * @throws IllegalArgumentException if + * {@code timestamp <= }{@link #getStartTime getStartTime}. + */ + @Override + public void setEndTime(long timestamp) { + if ((timestamp & Const.SECOND_MASK) == 0) { timestamp *= 1000L; } + + rawQuery.setEndTime(timestamp); + } + + /** + * Returns whether or not the data queried will be deleted. + * + * @return A boolean + * @since 2.4 + */ + @Override + public boolean getDelete() { + return rawQuery.getDelete(); + } + + /** + * Sets whether or not the data queried will be deleted. + * + * @param delete True if data should be deleted, false otherwise. + * @since 2.4 + */ + @Override + public void setDelete(boolean delete) { + if (rollupQuery != null) { + rollupQuery.setDelete(delete); + } + rawQuery.setDelete(delete); + } + + /** + * Sets the time series to the query. + * + * @param metric The metric to retrieve from the TSDB. + * @param tags The set of tags of interest. + * @param function The aggregation function to use. + * @param rate If true, the rate of the series will be used instead of the + * actual values. + * @param rate_options If included specifies additional options that are used + * when calculating and graph rate values + * @throws NoSuchUniqueName if the name of a metric, or a tag name/value + * does not exist. + * @since 2.4 + */ + @Override + public void setTimeSeries(String metric, + Map tags, + Aggregator function, + boolean rate, + RateOptions rate_options) throws NoSuchUniqueName { + if (rollupQuery != null) { + rollupQuery.setTimeSeries(metric, tags, function, rate, rate_options); + } + rawQuery.setTimeSeries(metric, tags, function, rate, rate_options); + } + + /** + * Sets the time series to the query. + * + * @param metric The metric to retrieve from the TSDB. + * @param tags The set of tags of interest. + * @param function The aggregation function to use. + * @param rate If true, the rate of the series will be used instead of the + * actual values. + * @throws NoSuchUniqueName if the name of a metric, or a tag name/value + * does not exist. + */ + @Override + public void setTimeSeries(String metric, Map tags, Aggregator function, boolean rate) throws NoSuchUniqueName { + if (rollupQuery != null) { + rollupQuery.setTimeSeries(metric, tags, function, rate); + } + rawQuery.setTimeSeries(metric, tags, function, rate); + } + + /** + * Sets up a query for the given timeseries UIDs. For now, all TSUIDs in the + * group must share a common metric. This is to avoid issues where the scanner + * may have to traverse the entire data table if one TSUID has a metric of + * 000001 and another has a metric of FFFFFF. After modifying the query code + * to run asynchronously and use different scanners, we can allow different + * TSUIDs. + * Note: This method will not check to determine if the TSUIDs are + * valid, since that wastes time and we *assume* that the user provides TSUIDs + * that are up to date. + * + * @param tsuids A list of one or more TSUIDs to scan for + * @param function The aggregation function to use on results + * @param rate Whether or not the results should be converted to a rate + * @throws IllegalArgumentException if the tsuid list is null, empty or the + * TSUIDs do not share a common metric + * @since 2.4 + */ + @Override + public void setTimeSeries(List tsuids, Aggregator function, boolean rate) { + if (rollupQuery != null) { + rollupQuery.setTimeSeries(tsuids, function, rate); + } + rawQuery.setTimeSeries(tsuids, function, rate); + } + + /** + * Sets up a query for the given timeseries UIDs. For now, all TSUIDs in the + * group must share a common metric. This is to avoid issues where the scanner + * may have to traverse the entire data table if one TSUID has a metric of + * 000001 and another has a metric of FFFFFF. After modifying the query code + * to run asynchronously and use different scanners, we can allow different + * TSUIDs. + * Note: This method will not check to determine if the TSUIDs are + * valid, since that wastes time and we *assume* that the user provides TSUIDs + * that are up to date. + * + * @param tsuids A list of one or more TSUIDs to scan for + * @param function The aggregation function to use on results + * @param rate Whether or not the results should be converted to a rate + * @param rate_options If included specifies additional options that are used + * when calculating and graph rate values + * @throws IllegalArgumentException if the tsuid list is null, empty or the + * TSUIDs do not share a common metric + * @since 2.4 + */ + @Override + public void setTimeSeries(List tsuids, Aggregator function, boolean rate, RateOptions rate_options) { + if (rollupQuery != null) { + rollupQuery.setTimeSeries(tsuids, function, rate, rate_options); + } + rawQuery.setTimeSeries(tsuids, function, rate, rate_options); + } + + /** + * Prepares a query against HBase by setting up group bys and resolving + * strings to UIDs asynchronously. This replaces calls to all of the setters + * like the {@link setTimeSeries}, {@link setStartTime}, etc. + * Make sure to wait on the deferred return before calling {@link runAsync}. + * + * @param query The main query to fetch the start and end time from + * @param index The index of which sub query we're executing + * @return A deferred to wait on for UID resolution. The result doesn't have + * any meaning and can be discarded. + * @throws IllegalArgumentException if the query was missing sub queries or + * the index was out of bounds. + * @throws NoSuchUniqueName if the name of a metric, or a tag name/value + * does not exist. (Bubbles up through the deferred) + * @since 2.4 + */ + @Override + public Deferred configureFromQuery(TSQuery query, int index) { + return configureFromQuery(query, index, false); + } + + @Override + public Deferred configureFromQuery(TSQuery query, int index, boolean force_raw) { + if (force_raw) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + if (!rollupQuery.needsSplitting()) { + return rollupResolution; + } + + rawQuery = new TsdbQuery(tsdb); + rawResolution = rollupQuery.split(query, index, rawQuery); + + if (rollupQuery.getRollupQuery().getLastRollupTimestampSeconds() * 1000L < rollupQuery.getStartTime()) { + // We're looking at a query that would normally hit a rollup table, but the table doesn't + // have data guaranteed to be available for the requested time period or any part of it + // (i.e. the last guaranteed rollup point is before the query actually starts) + // So we won't bother running it. + rollupQuery = null; + } + + return Deferred.group(rollupResolution, rawResolution).addCallback(new GroupCallback()); + } + + /** + * Downsamples the results by specifying a fixed interval between points. + *

+ * Technically, downsampling means reducing the sampling interval. Here + * the idea is similar. Instead of returning every single data point that + * matched the query, we want one data point per fixed time interval. The + * way we get this one data point is by aggregating all the data points of + * that interval together using an {@link Aggregator}. This enables you + * to compute things like the 5-minute average or 10 minute 99th percentile. + * + * @param interval Number of seconds wanted between each data point. + * @param downsampler Aggregation function to use to group data points + */ + @Override + public void downsample(long interval, Aggregator downsampler) { + if (rollupQuery != null) { + rollupQuery.downsample(interval, downsampler); + } + rawQuery.downsample(interval, downsampler); + } + + /** + * Sets an optional downsampling function on this query + * + * @param interval The interval, in milliseconds to rollup data points + * @param downsampler An aggregation function to use when rolling up data points + * @param fill_policy Policy specifying whether to interpolate or to fill + * missing intervals with special values. + * @throws NullPointerException if the aggregation function is null + * @throws IllegalArgumentException if the interval is not greater than 0 + * @since 2.4 + */ + @Override + public void downsample(long interval, Aggregator downsampler, FillPolicy fill_policy) { + if (rollupQuery != null) { + rollupQuery.downsample(interval, downsampler, fill_policy); + } + rawQuery.downsample(interval, downsampler, fill_policy); + } + + /** + * Executes the query asynchronously + * + * @return The data points matched by this query. + *

+ * Each element in the non-{@code null} but possibly empty array returned + * corresponds to one time series for which some data points have been + * matched by the query. + * @throws HBaseException if there was a problem communicating with HBase to + * perform the search. + * @since 1.2 + */ + @Override + public Deferred runAsync() throws HBaseException { + Deferred rollupResults = Deferred.fromResult(new DataPoints[0]); + if (rollupQuery != null) { + rollupResults = rollupQuery.runAsync(); + } + Deferred rawResults = rawQuery.runAsync(); + + return Deferred.groupInOrder(Arrays.asList(rollupResults, rawResults)).addCallback(new RunCB()); + } + + /** + * Runs this query asynchronously. + * + * @return The data points matched by this query and applied with percentile calculation + *

+ * Each element in the non-{@code null} but possibly empty array returned + * corresponds to one time series for which some data points have been + * matched by the query. + * @throws HBaseException if there was a problem communicating with HBase to + * perform the search. + * @throws IllegalStateException if the query is not a histogram query + */ + @Override + public Deferred runHistogramAsync() throws HBaseException { + Deferred rollupResults = Deferred.fromResult(new DataPoints[0]); + if (rollupQuery != null) { + rollupResults = rollupQuery.runHistogramAsync(); + } Deferred rawResults = rawQuery.runHistogramAsync(); + + return Deferred.groupInOrder(Arrays.asList(rollupResults, rawResults)).addCallback(new RunCB()); + } + + /** + * Returns an index for this sub-query in the original set of queries. + * + * @return A zero based index. + * @since 2.4 + */ + @Override + public int getQueryIdx() { + return rawQuery.getQueryIdx(); + } + + /** + * Check this is a histogram query or not + * + * @return + */ + @Override + public boolean isHistogramQuery() { + return rawQuery.isHistogramQuery(); + } + + /** + * Check this is a rollup query or not + * + * @return Whether or not this is a rollup query + * @since 2.4 + */ + @Override + public boolean isRollupQuery() { + return rollupQuery != null && RollupQuery.isValidQuery(rollupQuery.getRollupQuery()); + } + + /** + * @since 2.4 + */ + @Override + public boolean needsSplitting() { + // No further splitting supported + return false; + } + + /** + * Set the percentile calculation parameters for this query if this is + * a histogram query + * + * @param percentiles + */ + @Override + public void setPercentiles(List percentiles) { + if (rollupQuery != null) { + rollupQuery.setPercentiles(percentiles); + } + rawQuery.setPercentiles(percentiles); + } + + private class RunCB implements Callback> { + + private ByteMap makeSpanGroupMap(DataPoints[] dataPointsArray) { + ByteMap map = new ByteMap<>(); + + for (DataPoints points : dataPointsArray) { + if (!(points instanceof SpanGroup)) { + throw new IllegalArgumentException("Only SpanGroups implemented"); + } + SpanGroup spanGroup = (SpanGroup) points; + map.put(spanGroup.group(), spanGroup); + } + + return map; + } + + private DataPoints[] merge(DataPoints[] rollup, DataPoints[] raw) { + ByteMap rollupResults = makeSpanGroupMap(rollup); + ByteMap rawResults = makeSpanGroupMap(raw); + + TreeSet allGroups = new TreeSet<>(Bytes.MEMCMP); + allGroups.addAll(rollupResults.keySet()); + allGroups.addAll(rawResults.keySet()); + + List results = new ArrayList<>(allGroups.size()); + + for (byte[] group : allGroups) { + SpanGroup rawGroup = rawResults.get(group); + SpanGroup rollupGroup = rollupResults.get(group); + results.add(new SplitRollupSpanGroup(rollupGroup, rawGroup)); + } + + return results.toArray(new DataPoints[0]); + } + + /** + * After both queries have run, merge their results + * + * @param dataPointArrays The results from both queries + * @return The merged data points + */ + @Override + public DataPoints[] call(ArrayList dataPointArrays) { + DataPoints[] rollupResults = dataPointArrays.get(0); + DataPoints[] rawResults = dataPointArrays.get(1); + + return merge(rollupResults, rawResults); + } + } +} diff --git a/src/core/SplitRollupSpanGroup.java b/src/core/SplitRollupSpanGroup.java new file mode 100644 index 0000000000..dd6ab87830 --- /dev/null +++ b/src/core/SplitRollupSpanGroup.java @@ -0,0 +1,417 @@ +package net.opentsdb.core; + +import com.stumbleupon.async.Callback; +import com.stumbleupon.async.Deferred; +import net.opentsdb.meta.Annotation; +import org.hbase.async.Bytes; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class SplitRollupSpanGroup extends AbstractSpanGroup { + private final List spanGroups = new ArrayList<>(); + + public SplitRollupSpanGroup(SpanGroup... groups) { + for (SpanGroup group : groups) { + if (group != null) { + spanGroups.add(group); + } + } + + if (spanGroups.isEmpty()) { + throw new IllegalArgumentException("At least one SpanGroup must be non-null"); + } + } + + /** + * Returns the name of the series. + */ + @Override + public String metricName() { + return spanGroups.get(0).metricName(); + } + + /** + * Returns the name of the series. + * + * @since 1.2 + */ + @Override + public Deferred metricNameAsync() { + return spanGroups.get(0).metricNameAsync(); + } + + /** + * @return the metric UID + * @since 2.3 + */ + @Override + public byte[] metricUID() { + return spanGroups.get(0).metricUID(); + } + + /** + * Returns the tags associated with these data points. + * + * @return A non-{@code null} map of tag names (keys), tag values (values). + */ + @Override + public Map getTags() { + Map tags = new HashMap<>(); + + for (SpanGroup group : spanGroups) { + tags.putAll(group.getTags()); + } + + return tags; + } + + /** + * Returns the tags associated with these data points. + * + * @return A non-{@code null} map of tag names (keys), tag values (values). + * @since 1.2 + */ + @Override + public Deferred> getTagsAsync() { + class GetTagsCB implements Callback, ArrayList>> { + @Override + public Map call(ArrayList> resolvedTags) throws Exception { + Map tags = new HashMap<>(); + for (Map groupTags : resolvedTags) { + tags.putAll(groupTags); + } + return tags; + } + } + + List>> deferreds = new ArrayList<>(spanGroups.size()); + + for (SpanGroup group : spanGroups) { + deferreds.add(group.getTagsAsync()); + } + + return Deferred.groupInOrder(deferreds).addCallback(new GetTagsCB()); + } + + /** + * Returns a map of tag pairs as UIDs. + * When used on a span or row, it returns the tag set. When used on a span + * group it will return only the tag pairs that are common across all + * time series in the group. + * + * @return A potentially empty map of tagk to tagv pairs as UIDs + * @since 2.2 + */ + @Override + public Bytes.ByteMap getTagUids() { + Bytes.ByteMap tagUids = new Bytes.ByteMap<>(); + + for (SpanGroup group : spanGroups) { + tagUids.putAll(group.getTagUids()); + } + + return tagUids; + } + + /** + * Returns the tags associated with some but not all of the data points. + *

+ * When this instance represents the aggregation of multiple time series + * (same metric but different tags), {@link #getTags} returns the tags that + * are common to all data points (intersection set) whereas this method + * returns all the tags names that are not common to all data points (union + * set minus the intersection set, also called the symmetric difference). + *

+ * If this instance does not represent an aggregation of multiple time + * series, the list returned is empty. + * + * @return A non-{@code null} list of tag names. + */ + @Override + public List getAggregatedTags() { + List aggregatedTags = new ArrayList<>(); + + for (SpanGroup group : spanGroups) { + aggregatedTags.addAll(group.getAggregatedTags()); + } + + return aggregatedTags; + } + + /** + * Returns the tags associated with some but not all of the data points. + *

+ * When this instance represents the aggregation of multiple time series + * (same metric but different tags), {@link #getTags} returns the tags that + * are common to all data points (intersection set) whereas this method + * returns all the tags names that are not common to all data points (union + * set minus the intersection set, also called the symmetric difference). + *

+ * If this instance does not represent an aggregation of multiple time + * series, the list returned is empty. + * + * @return A non-{@code null} list of tag names. + * @since 1.2 + */ + @Override + public Deferred> getAggregatedTagsAsync() { + class GetAggregatedTagsCB implements Callback, ArrayList>> { + @Override + public List call(ArrayList> resolvedTags) throws Exception { + List aggregatedTags = new ArrayList<>(); + for (List groupTags : resolvedTags) { + aggregatedTags.addAll(groupTags); + } + return aggregatedTags; + } + } + + List>> deferreds = new ArrayList<>(spanGroups.size()); + for (SpanGroup group : spanGroups) { + deferreds.add(group.getAggregatedTagsAsync()); + } + + return Deferred.groupInOrder(deferreds).addCallback(new GetAggregatedTagsCB()); + } + + /** + * Returns the tagk UIDs associated with some but not all of the data points. + * + * @return a non-{@code null} list of tagk UIDs. + * @since 2.3 + */ + @Override + public List getAggregatedTagUids() { + List aggTagUids = new ArrayList<>(); + + for (SpanGroup group : spanGroups) { + aggTagUids.addAll(group.getAggregatedTagUids()); + } + + return aggTagUids; + } + + /** + * Returns a list of unique TSUIDs contained in the results + * + * @return an empty list if there were no results, otherwise a list of TSUIDs + */ + @Override + public List getTSUIDs() { + List tsuids = new ArrayList<>(); + + for (SpanGroup group : spanGroups) { + tsuids.addAll(group.getTSUIDs()); + } + + return tsuids; + } + + /** + * Compiles the annotations for each span into a new array list + * + * @return Null if none of the spans had any annotations, a list if one or + * more were found + */ + @Override + public List getAnnotations() { + List annotations = new ArrayList<>(); + + for (SpanGroup group : spanGroups) { + List groupAnnotations = group.getAnnotations(); + if (groupAnnotations != null) { + annotations.addAll(group.getAnnotations()); + } + } + + return annotations; + } + + /** + * Returns the number of data points. + *

+ * This method must be implemented in {@code O(1)} or {@code O(n)} + * where n = {@link #aggregatedSize} > 0. + * + * @return A positive integer. + */ + @Override + public int size() { + int size = 0; + for (SpanGroup group : spanGroups) { + size += group.size(); + } + return size; + } + + /** + * Returns the number of data points aggregated in this instance. + *

+ * When this instance represents the aggregation of multiple time series + * (same metric but different tags), {@link #size} returns the number of data + * points after aggregation, whereas this method returns the number of data + * points before aggregation. + *

+ * If this instance does not represent an aggregation of multiple time + * series, then 0 is returned. + * + * @return A positive integer. + */ + @Override + public int aggregatedSize() { + int aggregatedSize = 0; + for (SpanGroup group : spanGroups) { + aggregatedSize += group.aggregatedSize(); + } + return aggregatedSize; + } + + /** + * Returns a zero-copy view to go through {@code size()} data points. + *

+ * The iterator returned must return each {@link DataPoint} in {@code O(1)}. + * The {@link DataPoint} returned must not be stored and gets + * invalidated as soon as {@code next} is called on the iterator. If you + * want to store individual data points, you need to copy the timestamp + * and value out of each {@link DataPoint} into your own data structures. + */ + @Override + public SeekableView iterator() { + List iterators = new ArrayList<>(); + for (SpanGroup group : spanGroups) { + iterators.add(group.iterator()); + } + return new SeekableViewChain(iterators); + } + + /** + * Returns the timestamp associated with the {@code i}th data point. + * The first data point has index 0. + *

+ * This method must be implemented in + * O({@link #aggregatedSize}) or better. + *

+ * It is guaranteed that

timestamp(i) < timestamp(i+1)
+ * + * @param i + * @return A strictly positive integer. + * @throws IndexOutOfBoundsException if {@code i} is not in the range + * [0, {@link #size} - 1] + */ + @Override + public long timestamp(int i) { + return getDataPoint(i).timestamp(); + } + + /** + * Tells whether or not the {@code i}th value is of integer type. + * The first data point has index 0. + *

+ * This method must be implemented in + * O({@link #aggregatedSize}) or better. + * + * @param i + * @return {@code true} if the {@code i}th value is of integer type, + * {@code false} if it's of floating point type. + * @throws IndexOutOfBoundsException if {@code i} is not in the range + * [0, {@link #size} - 1] + */ + @Override + public boolean isInteger(int i) { + return getDataPoint(i).isInteger(); + } + + /** + * Returns the value of the {@code i}th data point as a long. + * The first data point has index 0. + *

+ * This method must be implemented in + * O({@link #aggregatedSize}) or better. + * Use {@link #iterator} to get successive {@code O(1)} accesses. + * + * @param i + * @throws IndexOutOfBoundsException if {@code i} is not in the range + * [0, {@link #size} - 1] + * @throws ClassCastException if the + * {@link #isInteger isInteger(i)} == false. + * @see #iterator + */ + @Override + public long longValue(int i) { + return getDataPoint(i).longValue(); + } + + /** + * Returns the value of the {@code i}th data point as a float. + * The first data point has index 0. + *

+ * This method must be implemented in + * O({@link #aggregatedSize}) or better. + * Use {@link #iterator} to get successive {@code O(1)} accesses. + * + * @param i + * @throws IndexOutOfBoundsException if {@code i} is not in the range + * [0, {@link #size} - 1] + * @throws ClassCastException if the + * {@link #isInteger isInteger(i)} == true. + * @see #iterator + */ + @Override + public double doubleValue(int i) { + return getDataPoint(i).doubleValue(); + } + + /** + * Return the query index that maps this datapoints to the original TSSubQuery. + * + * @return index of the query in the TSQuery class + * @throws UnsupportedOperationException if the implementing class can't map + * to a sub query. + * @since 2.2 + */ + @Override + public int getQueryIndex() { + return spanGroups.get(0).getQueryIndex(); + } + + /** + * Return whether these data points are the result of the percentile calculation + * on the histogram data points. The client can call {@code getPercentile} to get + * the percentile calculation parameter. + * + * @return true or false + * @since 2.4 + */ + @Override + public boolean isPercentile() { + return spanGroups.get(0).isPercentile(); + } + + /** + * Return the percentile calculation parameter. This interface and {@code isPercentile} are used + * to convert {@code HistogramDataPoints} to {@code DataPoints} + * + * @return the percentile parameter + * @since 2.4 + */ + @Override + public float getPercentile() { + return spanGroups.get(0).getPercentile(); + } + + /** + * Returns the group the spans in here belong to. + *

+ * Returns null if the NONE aggregator was requested in the query + * Returns an empty array if there were no group bys and they're all in the same group + * Returns the group otherwise + * + * @return The group + */ + public byte[] group() { + return spanGroups.get(0).group(); + } +} diff --git a/src/core/TSDB.java b/src/core/TSDB.java index ab6b95d68b..b3e5633cad 100644 --- a/src/core/TSDB.java +++ b/src/core/TSDB.java @@ -183,7 +183,13 @@ public enum OperationMode { /** Whether or not to block writing of derived rollups/pre-ags */ private final boolean rollups_block_derived; - + + /** + * Whether or not to enable splitting rollup queries if the rollup table is lagging + * Global config setting: tsd.rollups.split_query.enable = true + */ + private final boolean rollups_split_queries; + /** An optional histogram manger used when the TSD will be dealing with * histograms and sketches. Instantiated ONLY if * {@link #initializePlugins(boolean)} was called.*/ @@ -312,6 +318,7 @@ public TSDB(final HBaseClient client, final Config config) { agg_tag_key = config.getString("tsd.rollups.agg_tag_key"); raw_agg_tag_value = config.getString("tsd.rollups.raw_agg_tag_value"); rollups_block_derived = config.getBoolean("tsd.rollups.block_derived"); + rollups_split_queries = config.getBoolean("tsd.rollups.split_query.enable"); } else { rollup_config = null; default_interval = null; @@ -319,6 +326,7 @@ public TSDB(final HBaseClient client, final Config config) { agg_tag_key = null; raw_agg_tag_value = null; rollups_block_derived = false; + rollups_split_queries = false; } QueryStats.setEnableDuplicates( @@ -549,7 +557,7 @@ public void initializePlugins(final boolean init_rpcs) { uid_filter.getClass().getCanonicalName() + "] version: " + uid_filter.version()); } - + // finally load the histo manager after plugins have been loaded. if (config.hasProperty("tsd.core.histograms.config")) { histogram_manager = new HistogramCodecManager(this); @@ -566,7 +574,7 @@ public void initializePlugins(final boolean init_rpcs) { public final Authentication getAuth() { return this.authentication; } - + /** * Returns the configured HBase client * @return The HBase client @@ -2124,7 +2132,18 @@ public String getRawTagValue() { return raw_agg_tag_value; } - /** @return The optional histogram manager registered to this TSD. + /** + * Returns whether the global config setting allows splitting rollups queries + * if the rollups table to be hit is lagging. + * + * @return Whether or not splitting rollup queries is enabled + * @since 2.4 + */ + public boolean isRollupsSplittingEnabled() { + return rollups_split_queries; + } + + /** @return The optional histogram manager registered to this TSD. * @since 2.4 */ public HistogramCodecManager histogramManager() { return histogram_manager; diff --git a/src/core/TSQuery.java b/src/core/TSQuery.java index e571004d66..1d525b6c9f 100644 --- a/src/core/TSQuery.java +++ b/src/core/TSQuery.java @@ -240,8 +240,15 @@ public Deferred buildQueriesAsync(final TSDB tsdb) { final List> deferreds = new ArrayList>(queries.size()); for (int i = 0; i < queries.size(); i++) { - final Query query = tsdb.newQuery(); - deferreds.add(query.configureFromQuery(this, i)); + Query query = tsdb.newQuery(); + Deferred resolution = query.configureFromQuery(this, i); + + if (query.needsSplitting() && (query instanceof TsdbQuery)) { + query = new SplitRollupQuery(tsdb, (TsdbQuery) query, resolution); + resolution = query.configureFromQuery(this, i); + } + deferreds.add(resolution); + tsdb_queries[i] = query; } diff --git a/src/core/TsdbQuery.java b/src/core/TsdbQuery.java index 762e0cd272..de2a3779a2 100644 --- a/src/core/TsdbQuery.java +++ b/src/core/TsdbQuery.java @@ -61,7 +61,7 @@ /** * Non-synchronized implementation of {@link Query}. */ -final class TsdbQuery implements Query { +final class TsdbQuery extends AbstractQuery { private static final Logger LOG = LoggerFactory.getLogger(TsdbQuery.class); @@ -229,7 +229,7 @@ public boolean fallback() { return this == ROLLUP_FALLBACK || this == ROLLUP_FALLBACK_RAW; } } - + /** Constructor. */ public TsdbQuery(final TSDB tsdb) { this.tsdb = tsdb; @@ -429,10 +429,72 @@ public void setTimeSeries(final List tsuids, public void setExplicitTags(final boolean explicit_tags) { this.explicit_tags = explicit_tags; } - + + /** + * Splits this query into one query for the part that is covered by the rollup + * table (as defined in its SLA) and one to get the data for the remaining time + * range from the raw table. + * @param query The original TSQuery as parsed + * @param index The index of the TSQuery + * @param rawQuery A new TsdbQuery instance that will be configured to hit the raw table + * for the correct time range + * @return the deferred analogous to {@link TsdbQuery#configureFromQuery(TSQuery, int)} + * @throws IllegalStateException if the query is not eligible or splitting is disabled + */ + public Deferred split(final TSQuery query, final int index, final TsdbQuery rawQuery) { + if (!needsSplitting()) { + throw new IllegalStateException("Query is not eligible for splitting" + this.toString()); + } + + Deferred rawResolutionDeferred = rawQuery.configureFromQuery(query, index, true); + + long lastRollupTimestampMillis = rollup_query.getLastRollupTimestampSeconds() * 1000L; + + boolean needsRawAndRollupData = QueryUtil.isTimestampAfter(lastRollupTimestampMillis, getStartTime()); + if (needsRawAndRollupData) { + updateRollupSplitTimes(rawQuery, lastRollupTimestampMillis); + } + + return rawResolutionDeferred; + } + + /** + * Updates the timestamp of this query and the corresponding raw part in the case of a split. + * + * Sets the start and end times for this query so that it hits the rollup table until the given timestamp. + * Also updates the passed {@param rawQuery} with the new start time so that it hits the raw table for points from the + * given timestamp onwards. + * + * Makes sure that all timestamps are in milliseconds. + * + * @param rawQuery The raw query part + * @param splitTimestamp The timestamp until when rollup data is guaranteed to be available + */ + private void updateRollupSplitTimes(final TsdbQuery rawQuery, long splitTimestamp) { + setEndTime(splitTimestamp); + + boolean isStartTimeInSeconds = (getStartTime() & Const.SECOND_MASK) == 0; + if (isStartTimeInSeconds) { + setStartTime(getStartTime() * 1000L); + } + + boolean isRawEndTimeInSeconds = (rawQuery.getEndTime() & Const.SECOND_MASK) == 0; + if (isRawEndTimeInSeconds) { + rawQuery.setEndTime(rawQuery.getEndTime() * 1000L); + } + + rawQuery.setStartTime(splitTimestamp); + } + @Override - public Deferred configureFromQuery(final TSQuery query, - final int index) { + public Deferred configureFromQuery(final TSQuery query, + final int index) { + return configureFromQuery(query, index, false); + } + + + public Deferred configureFromQuery(final TSQuery query, + final int index, boolean force_raw) { if (query.getQueries() == null || query.getQueries().isEmpty()) { throw new IllegalArgumentException("Missing sub queries"); } @@ -477,7 +539,7 @@ public Deferred configureFromQuery(final TSQuery query, percentiles = sub_query.getPercentiles(); show_histogram_buckets = sub_query.getShowHistogramBuckets(); - if (rollup_usage != ROLLUP_USAGE.ROLLUP_RAW) { + if (!force_raw && rollup_usage != ROLLUP_USAGE.ROLLUP_RAW) { //Check whether the down sampler is set and rollup is enabled transformDownSamplerToRollupQuery(aggregator, sub_query.getDownsample()); } @@ -576,7 +638,7 @@ private List> resolveTagFilters() { return deferreds; } } - + // fire off the callback chain by resolving the metric first return tsdb.metrics.getIdAsync(sub_query.getMetric()) .addCallbackDeferring(new MetricCB()); @@ -587,7 +649,7 @@ private List> resolveTagFilters() { public void downsample(final long interval, final Aggregator downsampler, final FillPolicy fill_policy) { this.downsampler = new DownsamplingSpecification( - interval, downsampler,fill_policy); + interval, downsampler, fill_policy); } /** @@ -704,43 +766,11 @@ private void findGroupBys() { } } } - /** - * Executes the query. - * NOTE: Do not run the same query multiple times. Construct a new query with - * the same parameters again if needed - * TODO(cl) There are some strange occurrences when unit testing where the end - * time, if not set, can change between calls to run() - * @return An array of data points with one time series per array value - */ - @Override - public DataPoints[] run() throws HBaseException { - try { - return runAsync().joinUninterruptibly(); - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new RuntimeException("Should never be here", e); - } - } - - @Override - public DataPoints[] runHistogram() throws HBaseException { - if (!isHistogramQuery()) { - throw new RuntimeException("Should never be here"); - } - - try { - return runHistogramAsync().joinUninterruptibly(); - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new RuntimeException("Should never be here", e); - } - } - + @Override public Deferred runAsync() throws HBaseException { Deferred result = null; + if (use_multi_gets && override_multi_get) { result = this.findSpansWithMultiGetter().addCallback(new GroupByAndAggregateCB()); } else { @@ -777,10 +807,45 @@ public boolean isHistogramQuery() { if ((this.percentiles != null && this.percentiles.size() > 0) || show_histogram_buckets) { return true; } - + return false; } - + + @Override + public boolean isRollupQuery() { + return RollupQuery.isValidQuery(rollup_query); + } + + /** + * Returns whether this query needs to be split. It does if + * - splitting of queries is enabled globally AND + * - it can be split (i.e. it's a valid rollups query) AND + * - the table it is hitting has an SLA configured that describes the blackout period AND + * - the query is actually looking at data from the time beyond the SLA + * + * @return whether this query needs to be split. + * @since 2.4 + */ + @Override + public boolean needsSplitting() { + if (!tsdb.isRollupsSplittingEnabled()) { + // Don't split if the global config doesn't allow it + return false; + } + + if (!isRollupQuery()) { + // Don't split if it's hitting the raw table anyway + return false; + } + + if (rollup_query.getRollupInterval().getMaximumLag() <= 0) { + // Don't split if the table doesn't have a maximum lag configured + return false; + } + + return rollup_query.isInBlackoutPeriod(getEndTime()); + } + /** * Finds all the {@link Span}s that match this query. * This is what actually scans the HBase table and loads the data into @@ -838,8 +903,8 @@ private Deferred> findSpansWithMultiGetter() throws HBas new TreeMap(new SpanCmp(metric_width)); scan_start_time = System.nanoTime(); - - return new MultiGetQuery(tsdb, this, metric, row_key_literals_list, + + return new MultiGetQuery(tsdb, this, metric, row_key_literals_list, getScanStartTimeSeconds(), getScanEndTimeSeconds(), tableToBeScanned(), spans, null, 0, rollup_query, query_stats, query_index, 0, false, search_query_failure).fetch(); @@ -954,7 +1019,8 @@ public DataPoints[] call(final SortedMap spans) throws Exception { getStartTime(), getEndTime(), query_index, - rollup_query); + rollup_query, + null); group.add(span); groups[i++] = group; } @@ -974,7 +1040,8 @@ public DataPoints[] call(final SortedMap spans) throws Exception { getStartTime(), getEndTime(), query_index, - rollup_query); + rollup_query, + new byte[0]); if (query_stats != null) { query_stats.addStat(query_index, QueryStat.GROUP_BY_TIME, 0); } @@ -1018,6 +1085,11 @@ public DataPoints[] call(final SortedMap spans) throws Exception { //LOG.info("Span belongs to group " + Arrays.toString(group) + ": " + Arrays.toString(row)); SpanGroup thegroup = groups.get(group); if (thegroup == null) { + // Copy the array because we're going to keep `group' and overwrite + // its contents. So we want the collection to have an immutable copy. + final byte[] group_copy = new byte[group.length]; + System.arraycopy(group, 0, group_copy, 0, group.length); + thegroup = new SpanGroup(tsdb, getScanStartTimeSeconds(), getScanEndTimeSeconds(), null, rate, rate_options, aggregator, @@ -1025,11 +1097,8 @@ public DataPoints[] call(final SortedMap spans) throws Exception { getStartTime(), getEndTime(), query_index, - rollup_query); - // Copy the array because we're going to keep `group' and overwrite - // its contents. So we want the collection to have an immutable copy. - final byte[] group_copy = new byte[group.length]; - System.arraycopy(group, 0, group_copy, 0, group.length); + rollup_query, + group_copy); groups.put(group_copy, thegroup); } thegroup.add(entry.getValue()); @@ -1503,7 +1572,7 @@ else if (pre_aggregate) { } /** Returns the UNIX timestamp from which we must start scanning. */ - private long getScanStartTimeSeconds() { + long getScanStartTimeSeconds() { // Begin with the raw query start time. long start = getStartTime(); @@ -1545,7 +1614,7 @@ private long getScanStartTimeSeconds() { } /** Returns the UNIX timestamp at which we must stop scanning. */ - private long getScanEndTimeSeconds() { + long getScanEndTimeSeconds() { // Begin with the raw query end time. long end = getEndTime(); @@ -1823,6 +1892,14 @@ public int compare(final byte[] a, final byte[] b) { } + RateOptions getRateOptions() { return rate_options; } + boolean isRate() { return rate; } + Aggregator getAggregator() {return aggregator; } + DownsamplingSpecification getDownsampler() { return downsampler; } + RollupQuery getRollupQuery() { return rollup_query; } + int getQueryIndex() { return query_index; } + + /** Helps unit tests inspect private methods. */ @VisibleForTesting static class ForTesting { diff --git a/src/query/QueryUtil.java b/src/query/QueryUtil.java index 5324e0e822..bf19f2e46b 100644 --- a/src/query/QueryUtil.java +++ b/src/query/QueryUtil.java @@ -639,4 +639,24 @@ public static String byteRegexToString(final String regexp) { } return buf.toString(); } + + /** + * Compares two timestamps where either can be in seconds or milliseconds. + * + * @param ts1 The first timestamp in either seconds or milliseconds. + * @param ts2 The second timestamp in either seconds or milliseconds. + * @return Whether the first timestamp is after the second + */ + public static boolean isTimestampAfter(long ts1, long ts2) { + boolean ts1InSeconds = (ts1 & Const.SECOND_MASK) == 0; + boolean ts2InSeconds = (ts2 & Const.SECOND_MASK) == 0; + + if (ts1InSeconds && !ts2InSeconds) { + ts1 *= 1000L; + } else if (!ts1InSeconds && ts2InSeconds) { + ts2 *= 1000L; + } + + return ts1 > ts2; + } } diff --git a/src/rollup/RollupInterval.java b/src/rollup/RollupInterval.java index 3dea39542a..0ccedce6be 100644 --- a/src/rollup/RollupInterval.java +++ b/src/rollup/RollupInterval.java @@ -82,6 +82,16 @@ public class RollupInterval { * also it might be compacted. */ private final boolean is_default_interval; + + /** + * The delay SLA for this rollup interval. If a query is asking for data from a + * recent enough time interval that might not be available (or partially unavailable) + * in the table, the data points will be read from the raw table. + */ + private final String delay_sla; + + /** The delay SLA in seconds */ + private int max_delay_seconds; /** * Protected ctor used by the builder. @@ -93,6 +103,7 @@ protected RollupInterval(final Builder builder) { string_interval = builder.interval; row_span = builder.rowSpan; is_default_interval = builder.defaultInterval; + delay_sla = builder.delaySla != null ? builder.delaySla : ""; final String parsed_units = DateTime.getDurationUnits(row_span); if (parsed_units.length() > 1) { @@ -116,7 +127,8 @@ public String toString() { .append(", unit_multipier=").append(unit_multiplier) .append(", intervals=").append(intervals) .append(", interval=").append(interval) - .append(", interval_units=").append(interval_units); + .append(", interval_units=").append(interval_units) + .append(", delay_sla=").append(delay_sla); return buf.toString(); } @@ -133,6 +145,7 @@ public HashCode buildHashCode() { .putString(string_interval, Const.UTF8_CHARSET) .putString(row_span, Const.UTF8_CHARSET) .putBoolean(is_default_interval) + .putString(delay_sla, Const.UTF8_CHARSET) .hash(); } @@ -152,7 +165,8 @@ public boolean equals(final Object obj) { && Objects.equal(groupby_table_name, interval.groupby_table_name) && Objects.equal(row_span, interval.row_span) && Objects.equal(string_interval, interval.string_interval) - && Objects.equal(is_default_interval, interval.is_default_interval); + && Objects.equal(is_default_interval, interval.is_default_interval) + && Objects.equal(delay_sla, interval.delay_sla); } /** @@ -185,15 +199,20 @@ void validateAndCompile() { } interval = (int) (DateTime.parseDuration(string_interval) / 1000); - if (interval < 1) { - throw new IllegalArgumentException("Millisecond intervals are not supported"); - } + if (interval >= Integer.MAX_VALUE) { throw new IllegalArgumentException("Interval is too big: " + interval); } // The line above will validate for us interval_units = string_interval.charAt(string_interval.length() - 1); + if (delay_sla != null && !delay_sla.isEmpty()) { + max_delay_seconds = (int) (DateTime.parseDuration(delay_sla) / 1000); + if (max_delay_seconds < 1) { + throw new IllegalArgumentException("Milliseconds are not supported as the maximum delay"); + } + } + int num_span = 0; switch (units) { case 'h': @@ -303,6 +322,15 @@ public boolean isDefaultInterval() { public String getRowSpan() { return row_span; } + + /** + * Rollup tables can have an SLA configured specifying by how much time the + * data in the table can be delayed. + * @return the maximum delay in seconds for a table as configured. + */ + public int getMaximumLag() { + return max_delay_seconds; + } public static Builder builder() { return new Builder(); @@ -321,6 +349,8 @@ public static class Builder { private String rowSpan; @JsonProperty private boolean defaultInterval; + @JsonProperty + private String delaySla; public Builder setTable(final String table) { this.table = table; @@ -346,6 +376,11 @@ public Builder setDefaultInterval(final boolean defaultInterval) { this.defaultInterval = defaultInterval; return this; } + + public Builder setDelaySla(final String delaySla) { + this.delaySla = delaySla; + return this; + } public RollupInterval build() { return new RollupInterval(this); diff --git a/src/rollup/RollupQuery.java b/src/rollup/RollupQuery.java index 648d5b33ff..46d37cdce2 100644 --- a/src/rollup/RollupQuery.java +++ b/src/rollup/RollupQuery.java @@ -17,6 +17,7 @@ import net.opentsdb.core.Aggregator; import net.opentsdb.core.Aggregators; +import net.opentsdb.utils.DateTime; /** * Holds information about a rollup interval and rollup aggregator. @@ -185,4 +186,26 @@ public long getSampleIntervalInMS() { public boolean isLowerSamplingRate() { return this.rollup_interval.getIntervalSeconds() * 1000 < sample_interval_ms; } + + /** + * Looks at the SLA configured for the table to be queried and determines the + * timestamp of the latest data point that is guaranteed to be covered by the + * table. + * @return last timestamp in seconds of the period guaranteed to be covered + */ + public int getLastRollupTimestampSeconds() { + return (int) (DateTime.currentTimeMillis()/1000 - getRollupInterval().getMaximumLag()); + } + + /** + * Checks whether the passed timestamp is in the blackout period (between + * the latest guaranteed timestamp and now) + * @param timestampMillis The timestamp to check in milliseconds + * @return whether the timestamp is in the blackout period + */ + public boolean isInBlackoutPeriod(long timestampMillis) { + long latestRollupPointTimestamp = DateTime.currentTimeMillis() - getRollupInterval().getMaximumLag()*1000L; + + return timestampMillis > latestRollupPointTimestamp; + } } diff --git a/src/utils/Config.java b/src/utils/Config.java index f92f2f5b56..723df36c7a 100644 --- a/src/utils/Config.java +++ b/src/utils/Config.java @@ -577,6 +577,7 @@ protected void setDefaults() { default_map.put("tsd.rollups.agg_tag_key", "_aggregate"); default_map.put("tsd.rollups.raw_agg_tag_value", "RAW"); default_map.put("tsd.rollups.block_derived", "true"); + default_map.put("tsd.rollups.split_query.enable", "false"); default_map.put("tsd.rtpublisher.enable", "false"); default_map.put("tsd.rtpublisher.plugin", ""); default_map.put("tsd.search.enable", "false"); diff --git a/src/utils/DateTime.java b/src/utils/DateTime.java index 4649fc3097..8989ae85f5 100644 --- a/src/utils/DateTime.java +++ b/src/utils/DateTime.java @@ -18,6 +18,7 @@ import java.util.HashMap; import java.util.TimeZone; +import com.google.common.base.Strings; import net.opentsdb.core.Tags; /** @@ -184,6 +185,10 @@ public static final long parseDateTimeString(final String datetime, * @throws IllegalArgumentException if the interval was malformed. */ public static final long parseDuration(final String duration) { + if (duration == null || duration.isEmpty()) { + throw new IllegalArgumentException("Cannot parse null or empty duration"); + } + long interval; long multiplier; double temp; @@ -614,7 +619,7 @@ public static Calendar previousInterval(final long ts, final int interval, * @since 2.3 */ public static int unitsToCalendarType(final String units) { - if (units == null || units.isEmpty()) { + if (Strings.isNullOrEmpty(units)) { throw new IllegalArgumentException("Units cannot be null or empty"); } diff --git a/test/core/BaseTsdbTest.java b/test/core/BaseTsdbTest.java index 9dbc6c31e3..bafe0eae95 100644 --- a/test/core/BaseTsdbTest.java +++ b/test/core/BaseTsdbTest.java @@ -31,6 +31,8 @@ import net.opentsdb.auth.Authentication; import net.opentsdb.auth.Authorization; import net.opentsdb.meta.Annotation; +import net.opentsdb.rollup.RollupInterval; +import net.opentsdb.rollup.RollupQuery; import net.opentsdb.storage.MockBase; import net.opentsdb.uid.NoSuchUniqueId; import net.opentsdb.uid.NoSuchUniqueName; @@ -914,6 +916,23 @@ protected void storeAnnotation(final long timestamp) throws Exception { note.syncToStorage(tsdb, false).joinUninterruptibly(); } + RollupQuery makeRollupQuery() { + Whitebox.setInternalState(tsdb, "rollups_split_queries", true); + final RollupInterval oneHourWithDelay = RollupInterval.builder() + .setTable("fake-rollup-table") + .setPreAggregationTable("fake-preagg-table") + .setInterval("1h") + .setRowSpan("1d") + .setDelaySla("2d") + .build(); + return new RollupQuery( + oneHourWithDelay, + Aggregators.SUM, + 3600000, + Aggregators.SUM + ); + } + /** * A fake {@link org.jboss.netty.util.Timer} implementation. * Instead of executing the task it will store that task in a internal state @@ -984,4 +1003,4 @@ public UnitTestException(final String msg) { } private static final long serialVersionUID = -4404095849459619922L; } -} \ No newline at end of file +} diff --git a/test/core/TestSeekableViewChain.java b/test/core/TestSeekableViewChain.java new file mode 100644 index 0000000000..5da9efbc62 --- /dev/null +++ b/test/core/TestSeekableViewChain.java @@ -0,0 +1,99 @@ +// This file is part of OpenTSDB. +// Copyright (C) 2017 The OpenTSDB Authors. +// +// This program is free software: you can redistribute it and/or modify it +// under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 2.1 of the License, or (at your +// option) any later version. This program is distributed in the hope that it +// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty +// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. You should have received a copy +// of the GNU Lesser General Public License along with this program. If not, +// see . +package net.opentsdb.core; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +@RunWith(PowerMockRunner.class) +public class TestSeekableViewChain { + + private static final long BASE_TIME = 1356998400000L; + private static final DataPoint[] DATA_POINTS_1 = new DataPoint[]{ + MutableDataPoint.ofLongValue(BASE_TIME, 40), + MutableDataPoint.ofLongValue(BASE_TIME + 10000, 50), + MutableDataPoint.ofLongValue(BASE_TIME + 30000, 70) + }; + + @Before + public void before() throws Exception { + + } + + @Test + public void testIteratorChain() { + List iterators = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + iterators.add(SeekableViewsForTest.fromArray(DATA_POINTS_1)); + } + SeekableViewChain chain = new SeekableViewChain(iterators); + + int items = 0; + while (chain.hasNext()) { + chain.next(); + items += 1; + } + + assertEquals(9, items); + } + + @Test + public void testSeek() { + List iterators = new ArrayList<>(); + iterators.add(SeekableViewsForTest.generator( + BASE_TIME, 10000, 5, true + )); + iterators.add(SeekableViewsForTest.generator( + BASE_TIME + 50000, 10000, 5, true + )); + + SeekableViewChain chain = new SeekableViewChain(iterators); + + chain.seek(BASE_TIME + 75000); + + int items = 0; + while (chain.hasNext()) { + chain.next(); + items += 1; + } + + assertEquals(2, items); + } + + @Test + public void testEmptyChain() { + SeekableViewChain chain = new SeekableViewChain(new ArrayList<>()); + assertFalse(chain.hasNext()); + } + + @Test(expected = UnsupportedOperationException.class) + public void testRemoveUnsupported() { + makeChain(1).remove(); + } + + private SeekableViewChain makeChain(int numIterators) { + List iterators = new ArrayList<>(); + for (int i = 0; i < numIterators; i++) { + iterators.add(SeekableViewsForTest.fromArray(DATA_POINTS_1)); + } + return new SeekableViewChain(iterators); + } +} diff --git a/test/core/TestSplitRollupQuery.java b/test/core/TestSplitRollupQuery.java new file mode 100644 index 0000000000..d2006ca9d2 --- /dev/null +++ b/test/core/TestSplitRollupQuery.java @@ -0,0 +1,329 @@ +// This file is part of OpenTSDB. +// Copyright (C) 2015-2017 The OpenTSDB Authors. +// +// This program is free software: you can redistribute it and/or modify it +// under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 2.1 of the License, or (at your +// option) any later version. This program is distributed in the hope that it +// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty +// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. You should have received a copy +// of the GNU Lesser General Public License along with this program. If not, +// see . +package net.opentsdb.core; + +import com.stumbleupon.async.Deferred; +import net.opentsdb.utils.DateTime; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.reflect.Whitebox; + +import java.util.*; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.*; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({DateTime.class, TsdbQuery.class}) +public class TestSplitRollupQuery extends BaseTsdbTest { + private SplitRollupQuery queryUnderTest; + private TsdbQuery rollupQuery; + + @Before + public void beforeLocal() { + rollupQuery = spy(new TsdbQuery(tsdb)); + queryUnderTest = new SplitRollupQuery(tsdb, rollupQuery, Deferred.fromResult(null)); + } + + @Test + public void setStartTime() { + queryUnderTest.setStartTime(42L); + assertEquals(42000L, queryUnderTest.getStartTime()); + assertEquals(42000L, rollupQuery.getStartTime()); + } + + @Test + public void setStartTimeBeyondOriginalEnd() { + TsdbQuery rawQuery = new TsdbQuery(tsdb); + Whitebox.setInternalState(queryUnderTest, "rawQuery", rawQuery); + + rollupQuery.setEndTime(41L); + queryUnderTest.setStartTime(42L); + + assertEquals(42000L, queryUnderTest.getStartTime()); + } + + @Test + public void setStartTimeWithoutRollupQuery() { + TsdbQuery rawQuery = new TsdbQuery(tsdb); + Whitebox.setInternalState(queryUnderTest, "rollupQuery", (TsdbQuery)null); + Whitebox.setInternalState(queryUnderTest, "rawQuery", rawQuery); + + queryUnderTest.setStartTime(42L); + + assertEquals(42000L, queryUnderTest.getStartTime()); + } + + @Test + public void setEndTime() { + TsdbQuery rawQuery = new TsdbQuery(tsdb); + Whitebox.setInternalState(queryUnderTest, "rawQuery", rawQuery); + + rollupQuery.setStartTime(0); + rollupQuery.setEndTime(21000L); + rawQuery.setStartTime(21000L); + + queryUnderTest.setEndTime(42L); + + assertEquals(42000L, queryUnderTest.getEndTime()); + assertEquals(21000L, rollupQuery.getEndTime()); + assertEquals(42000L, rawQuery.getEndTime()); + } + + @Test(expected = IllegalArgumentException.class) + public void setEndTimeBeforeRawStartTime() { + TsdbQuery rawQuery = new TsdbQuery(tsdb); + rawQuery.setStartTime(DateTime.currentTimeMillis()); + + Whitebox.setInternalState(queryUnderTest, "rawQuery", rawQuery); + + queryUnderTest.setEndTime(42L); + } + + @Test + public void setDeletePassesThroughToBoth() { + TsdbQuery rawQuery = spy(new TsdbQuery(tsdb)); + Whitebox.setInternalState(queryUnderTest, "rawQuery", rawQuery); + + queryUnderTest.setDelete(true); + + verify(rollupQuery).setDelete(true); + verify(rawQuery).setDelete(true); + } + + @Test + public void setTimeSeriesPassesThroughToBoth() { + TsdbQuery rawQuery = spy(new TsdbQuery(tsdb)); + Whitebox.setInternalState(queryUnderTest, "rawQuery", rawQuery); + + RateOptions options = new RateOptions(); + + queryUnderTest.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false, options); + + verify(rollupQuery).setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false, options); + verify(rawQuery).setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false, options); + } + + @Test + public void setTimeSeriesWithoutRateOptionsPassesThroughToBoth() { + TsdbQuery rawQuery = spy(new TsdbQuery(tsdb)); + Whitebox.setInternalState(queryUnderTest, "rawQuery", rawQuery); + + queryUnderTest.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false); + + verify(rollupQuery).setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false); + verify(rawQuery).setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false); + } + + @Test + public void setTimeSeriesWithTSUIDsPassesThroughToBoth() { + TsdbQuery rawQuery = spy(new TsdbQuery(tsdb)); + Whitebox.setInternalState(queryUnderTest, "rawQuery", rawQuery); + + List tsuids = Arrays.asList("000001000001000001", "000001000001000002"); + RateOptions options = new RateOptions(); + + queryUnderTest.setTimeSeries(tsuids, Aggregators.SUM, false, options); + + verify(rollupQuery).setTimeSeries(tsuids, Aggregators.SUM, false, options); + verify(rawQuery).setTimeSeries(tsuids, Aggregators.SUM, false, options); + } + + @Test + public void setTimeSeriesWithTSUIDsWithoutRateOptionsPassesThroughToBoth() { + TsdbQuery rawQuery = spy(new TsdbQuery(tsdb)); + Whitebox.setInternalState(queryUnderTest, "rawQuery", rawQuery); + + List tsuids = Arrays.asList("000001000001000001", "000001000001000002"); + + queryUnderTest.setTimeSeries(tsuids, Aggregators.SUM, false); + + verify(rollupQuery).setTimeSeries(tsuids, Aggregators.SUM, false); + verify(rawQuery).setTimeSeries(tsuids, Aggregators.SUM, false); + } + + @Test + public void downsamplePassesThroughToBoth() { + TsdbQuery rawQuery = spy(new TsdbQuery(tsdb)); + Whitebox.setInternalState(queryUnderTest, "rawQuery", rawQuery); + + queryUnderTest.downsample(42L, Aggregators.SUM, FillPolicy.ZERO); + + verify(rollupQuery).downsample(42L, Aggregators.SUM, FillPolicy.ZERO); + verify(rawQuery).downsample(42L, Aggregators.SUM, FillPolicy.ZERO); + } + + @Test + public void downsampleWithoutFillPolicyPassesThroughToBoth() { + TsdbQuery rawQuery = spy(new TsdbQuery(tsdb)); + Whitebox.setInternalState(queryUnderTest, "rawQuery", rawQuery); + + queryUnderTest.downsample(42L, Aggregators.SUM); + + verify(rollupQuery).downsample(42L, Aggregators.SUM); + verify(rawQuery).downsample(42L, Aggregators.SUM); + } + + @Test(expected = UnsupportedOperationException.class) + public void configureFromQueryThrowsIfForcedRaw() { + queryUnderTest.configureFromQuery(null, 0, true); + } + + @Test + public void configureFromQuerySplitsRollupQuery() { + mockEnableRollupQuerySplitting(); + doReturn(Deferred.fromResult(null)).when(rollupQuery).split(any(), anyInt(), any()); + + assertNull(Whitebox.getInternalState(queryUnderTest, "rawQuery")); + + rollupQuery.setStartTime(0); + queryUnderTest.configureFromQuery(null, 0, false); + + verify(rollupQuery).split(eq(null), eq(0), anyObject()); + assertNotNull(Whitebox.getInternalState(queryUnderTest, "rawQuery")); + } + + @Test + public void configureFromQuerySplitsRollupQueryWithRawOnlyQuery() { + mockEnableRollupQuerySplitting(); + doReturn(true).when(rollupQuery).needsSplitting(); + doReturn(Deferred.fromResult(null)).when(rollupQuery).split(any(), anyInt(), any()); + + rollupQuery.setStartTime(DateTime.currentTimeMillis()); + + assertNull(Whitebox.getInternalState(queryUnderTest, "rawQuery")); + + queryUnderTest.configureFromQuery(null, 0, false); + + verify(rollupQuery).split(eq(null), eq(0), anyObject()); + assertNotNull(Whitebox.getInternalState(queryUnderTest, "rawQuery")); + assertNull(Whitebox.getInternalState(queryUnderTest, "rollupQuery")); + } + + @Test + public void setPercentiles() { + TsdbQuery rawQuery = spy(new TsdbQuery(tsdb)); + Whitebox.setInternalState(queryUnderTest, "rawQuery", rawQuery); + + List percentiles = Arrays.asList(50f, 75f, 99f, 99.9f); + + queryUnderTest.setPercentiles(percentiles); + + verify(rollupQuery).setPercentiles(percentiles); + verify(rawQuery).setPercentiles(percentiles); + } + + @Test + public void run() { + TsdbQuery rawQuery = spy(new TsdbQuery(tsdb)); + Whitebox.setInternalState(queryUnderTest, "rawQuery", rawQuery); + + doReturn(Deferred.fromResult(new DataPoints[0])).when(rollupQuery).runAsync(); + doReturn(Deferred.fromResult(new DataPoints[0])).when(rawQuery).runAsync(); + + DataPoints[] actualPoints = queryUnderTest.run(); + + verify(rollupQuery).runAsync(); + verify(rawQuery).runAsync(); + + assertEquals(0, actualPoints.length); + } + + @Test + public void runHistogram() { + TsdbQuery rawQuery = spy(new TsdbQuery(tsdb)); + Whitebox.setInternalState(queryUnderTest, "rawQuery", rawQuery); + + doReturn(true).when(rollupQuery).isHistogramQuery(); + doReturn(Deferred.fromResult(new DataPoints[0])).when(rollupQuery).runHistogramAsync(); + doReturn(true).when(rawQuery).isHistogramQuery(); + doReturn(Deferred.fromResult(new DataPoints[0])).when(rawQuery).runHistogramAsync(); + + DataPoints[] actualPoints = queryUnderTest.runHistogram(); + + verify(rollupQuery).runHistogramAsync(); + verify(rawQuery).runHistogramAsync(); + + assertEquals(0, actualPoints.length); + } + + @Test + public void runAsyncMergesResults() { + TsdbQuery rawQuery = spy(new TsdbQuery(tsdb)); + Whitebox.setInternalState(queryUnderTest, "rawQuery", rawQuery); + + rollupQuery.setStartTime(0); + rollupQuery.setEndTime(21); + rawQuery.setStartTime(21); + rawQuery.setEndTime(42); + + DataPoints[] rollupDataPoints = new DataPoints[] { + makeSpanGroup("group1"), + makeSpanGroup("group2"), + makeSpanGroup("group3"), + }; + + DataPoints[] rawDataPoints = new DataPoints[] { + makeSpanGroup("group2"), + makeSpanGroup("group3"), + makeSpanGroup("group4"), + makeSpanGroup("group5"), + }; + + doReturn(Deferred.fromResult(rollupDataPoints)).when(rollupQuery).runAsync(); + doReturn(Deferred.fromResult(rawDataPoints)).when(rawQuery).runAsync(); + + DataPoints[] actualPoints = queryUnderTest.run(); + + verify(rollupQuery).runAsync(); + verify(rawQuery).runAsync(); + + List actualGroups = new ArrayList<>(actualPoints.length); + for (DataPoints dataPoints : actualPoints) { + actualGroups.add(new String(((SplitRollupSpanGroup) dataPoints).group())); + } + Collections.sort(actualGroups); + + assertEquals(Arrays.asList("group1", "group2", "group3", "group4", "group5"), actualGroups); + } + + private SpanGroup makeSpanGroup(String group) { + + return new SpanGroup( + tsdb, + 0, + 42, + new ArrayList<>(), + false, + new RateOptions(), + Aggregators.SUM, + DownsamplingSpecification.NO_DOWNSAMPLER, + 0, + 42, + 0, + null, + group.getBytes() + ); + } + + private void mockEnableRollupQuerySplitting() { + Whitebox.setInternalState(tsdb, "rollups_split_queries", true); + Whitebox.setInternalState(rollupQuery, "rollup_query", makeRollupQuery()); + } +} diff --git a/test/core/TestSplitRollupSpanGroup.java b/test/core/TestSplitRollupSpanGroup.java new file mode 100644 index 0000000000..30ee198a0c --- /dev/null +++ b/test/core/TestSplitRollupSpanGroup.java @@ -0,0 +1,194 @@ +// This file is part of OpenTSDB. +// Copyright (C) 2015 The OpenTSDB Authors. +// +// This program is free software: you can redistribute it and/or modify it +// under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 2.1 of the License, or (at your +// option) any later version. This program is distributed in the hope that it +// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty +// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. You should have received a copy +// of the GNU Lesser General Public License along with this program. If not, +// see . +package net.opentsdb.core; + +import net.opentsdb.rollup.RollupSpan; +import net.opentsdb.utils.Config; +import org.hbase.async.Bytes; +import org.hbase.async.HBaseClient; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.reflect.Whitebox; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({TSDB.class, HBaseClient.class, Config.class, SpanGroup.class, + Span.class, RollupSpan.class}) +public class TestSplitRollupSpanGroup { + private final static long START_TS = 1356998400L; + private final static long SPLIT_TS = 1356998500L; + private final static long END_TS = 1356998600L; + + private TSDB tsdb; + private SpanGroup rollupSpanGroup; + private SpanGroup rawSpanGroup; + + @Before + public void before() { + rawSpanGroup = PowerMockito.spy(new SpanGroup(tsdb, START_TS, SPLIT_TS, null, false, Aggregators.SUM, 0, null)); + rollupSpanGroup = PowerMockito.spy(new SpanGroup(tsdb, SPLIT_TS, END_TS, null, false, Aggregators.SUM, 0, null)); + + doAnswer(new SeekableViewAnswer(START_TS, 100, 1, true)).when(rollupSpanGroup).iterator(); + doAnswer(new SeekableViewAnswer(SPLIT_TS, 100, 2, true)).when(rawSpanGroup).iterator(); + + tsdb = PowerMockito.mock(TSDB.class); + } + + @Test + public void testConstructorFiltersNullSpanGroups() { + SplitRollupSpanGroup group = new SplitRollupSpanGroup(null, rawSpanGroup); + final ArrayList actual = Whitebox.getInternalState(group, "spanGroups"); + assertEquals(1, actual.size()); + } + + @Test(expected = IllegalArgumentException.class) + public void testConstructorThrowsWhenAllNull() { + new SplitRollupSpanGroup(null, null); + } + + @Test + public void testMetricName() { + when(rollupSpanGroup.metricName()).thenReturn("metric name"); + assertEquals("metric name", (new SplitRollupSpanGroup(rollupSpanGroup, rawSpanGroup)).metricName()); + verifyZeroInteractions(rawSpanGroup); + } + + @Test + public void testMetricUID() { + when(rollupSpanGroup.metricUID()).thenReturn(new byte[]{0, 0, 1}); + assertArrayEquals(new byte[]{0, 0, 1}, (new SplitRollupSpanGroup(rollupSpanGroup, rawSpanGroup)).metricUID()); + verifyZeroInteractions(rawSpanGroup); + } + + @Test + public void testSize() { + when(rollupSpanGroup.size()).thenReturn(5); + when(rawSpanGroup.size()).thenReturn(7); + assertEquals(12, (new SplitRollupSpanGroup(rollupSpanGroup, rawSpanGroup)).size()); + } + + @Test + public void testAggregatedSize() { + when(rollupSpanGroup.aggregatedSize()).thenReturn(5); + when(rawSpanGroup.aggregatedSize()).thenReturn(7); + assertEquals(12, (new SplitRollupSpanGroup(rollupSpanGroup, rawSpanGroup)).aggregatedSize()); + } + + @Test + public void testGetTagUids() { + final Bytes.ByteMap uids1 = new Bytes.ByteMap<>(); + uids1.put(new byte[]{0, 0, 1}, new byte[]{0, 0, 2}); + final Bytes.ByteMap uids2 = new Bytes.ByteMap<>(); + uids2.put(new byte[]{0, 0, 3}, new byte[]{0, 0, 4}); + + when(rollupSpanGroup.getTagUids()).thenReturn(uids1); + when(rawSpanGroup.getTagUids()).thenReturn(uids2); + + Bytes.ByteMap actual = (new SplitRollupSpanGroup(rollupSpanGroup, rawSpanGroup)).getTagUids(); + + assertEquals(2, actual.size()); + assertArrayEquals(new byte[]{0, 0, 1}, actual.firstKey()); + assertArrayEquals(new byte[]{0, 0, 2}, actual.firstEntry().getValue()); + assertArrayEquals(new byte[]{0, 0, 3}, actual.lastKey()); + assertArrayEquals(new byte[]{0, 0, 4}, actual.lastEntry().getValue()); + } + + @Test + public void testGetAggregatedTagUids() { + final List uids1 = new ArrayList<>(); + uids1.add(new byte[]{0, 0, 1}); + final List uids2 = new ArrayList<>(); + uids2.add(new byte[]{0, 0, 2}); + + when(rollupSpanGroup.getAggregatedTagUids()).thenReturn(uids1); + when(rawSpanGroup.getAggregatedTagUids()).thenReturn(uids2); + + List actual = (new SplitRollupSpanGroup(rollupSpanGroup, rawSpanGroup)).getAggregatedTagUids(); + + assertEquals(2, actual.size()); + assertArrayEquals(new byte[]{0, 0, 1}, actual.get(0)); + assertArrayEquals(new byte[]{0, 0, 2}, actual.get(1)); + } + + @Test + public void testIterator() { + SeekableView iterator = (new SplitRollupSpanGroup(rollupSpanGroup, rawSpanGroup)).iterator(); + int size = 0; + while (iterator.hasNext()) { + size++; + iterator.next(); + } + assertEquals(3, size); + } + + @Test + public void testTimestamp() { + SplitRollupSpanGroup group = new SplitRollupSpanGroup(rollupSpanGroup, rawSpanGroup); + assertEquals(START_TS, group.timestamp(0)); + assertEquals(SPLIT_TS, group.timestamp(1)); + assertEquals(END_TS, group.timestamp(2)); + } + + @Test + public void testIsInteger() { + assertTrue(new SplitRollupSpanGroup(rollupSpanGroup).isInteger(0)); + } + + @Test + public void testLongValue() { + assertEquals(0L, new SplitRollupSpanGroup(rollupSpanGroup).longValue(0)); + } + + @Test + public void testDoubleValue() { + doAnswer(new SeekableViewAnswer(START_TS, 100, 1, false)).when(rollupSpanGroup).iterator(); + assertEquals(0, new SplitRollupSpanGroup(rollupSpanGroup).doubleValue(0), 0.0); + } + + @Test + public void testGroup() { + when(rollupSpanGroup.metricName()).thenReturn("group name"); + assertEquals("group name", (new SplitRollupSpanGroup(rollupSpanGroup, rawSpanGroup)).metricName()); + verifyZeroInteractions(rawSpanGroup); + } + + static class SeekableViewAnswer implements Answer { + private final long timestamp; + private final int samplePeriod; + private final int numPoints; + private final boolean isInteger; + + SeekableViewAnswer(long startTimestamp, int samplePeriod, int numPoints, boolean isInteger) { + this.timestamp = startTimestamp; + this.samplePeriod = samplePeriod; + this.numPoints = numPoints; + this.isInteger = isInteger; + } + + @Override + public SeekableView answer(InvocationOnMock ignored) { + return SeekableViewsForTest.generator(timestamp, samplePeriod, numPoints, isInteger); + } + } +} diff --git a/test/core/TestTSDBAddAggregatePoint.java b/test/core/TestTSDBAddAggregatePoint.java index d1c1cfd2bf..60478b88f6 100644 --- a/test/core/TestTSDBAddAggregatePoint.java +++ b/test/core/TestTSDBAddAggregatePoint.java @@ -84,6 +84,7 @@ public void beforeLocal() throws Exception { Whitebox.setInternalState(tsdb, "default_interval", rollup_config.getRollupInterval("1m")); Whitebox.setInternalState(tsdb, "rollups_block_derived", true); + Whitebox.setInternalState(tsdb, "rollups_split_queries", false); Whitebox.setInternalState(tsdb, "agg_tag_key", config.getString("tsd.rollups.agg_tag_key")); Whitebox.setInternalState(tsdb, "raw_agg_tag_value", diff --git a/test/core/TestTSDBAddAggregatePointSalted.java b/test/core/TestTSDBAddAggregatePointSalted.java index 7bc7821af7..6b3642c8cc 100644 --- a/test/core/TestTSDBAddAggregatePointSalted.java +++ b/test/core/TestTSDBAddAggregatePointSalted.java @@ -85,6 +85,7 @@ public void beforeLocal() throws Exception { Whitebox.setInternalState(tsdb, "default_interval", rollup_config.getRollupInterval("1m")); Whitebox.setInternalState(tsdb, "rollups_block_derived", true); + Whitebox.setInternalState(tsdb, "rollups_split_queries", false); Whitebox.setInternalState(tsdb, "agg_tag_key", config.getString("tsd.rollups.agg_tag_key")); Whitebox.setInternalState(tsdb, "raw_agg_tag_value", diff --git a/test/core/TestTSQuery.java b/test/core/TestTSQuery.java index d528eaaf56..40f6622e84 100644 --- a/test/core/TestTSQuery.java +++ b/test/core/TestTSQuery.java @@ -17,12 +17,13 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.when; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; import java.util.ArrayList; import java.util.HashMap; +import com.stumbleupon.async.Deferred; import net.opentsdb.utils.DateTime; import org.junit.Test; @@ -32,7 +33,7 @@ import org.powermock.modules.junit4.PowerMockRunner; @RunWith(PowerMockRunner.class) -@PrepareForTest({ TSQuery.class, DateTime.class }) +@PrepareForTest({ TSQuery.class, TsdbQuery.class, TSDB.class, SplitRollupQuery.class, DateTime.class }) public final class TestTSQuery { @Test @@ -633,6 +634,43 @@ public void testEqualsSame() { TSQuery sub1 = getMetricForValidate(); assertTrue(sub1.equals(sub1)); } + + @Test + public void testSplitsEligibleRollupQuery() throws Exception { + final TSQuery queryUnderTest = getMetricForValidate(); + + TSDB tsdb = PowerMockito.mock(TSDB.class); + TsdbQuery mockTsdbQuery = PowerMockito.mock(TsdbQuery.class); + when(mockTsdbQuery.configureFromQuery(eq(queryUnderTest), anyInt())).thenReturn(Deferred.fromResult(null)); + when(mockTsdbQuery.needsSplitting()).thenReturn(true); + when(tsdb.newQuery()).thenReturn(mockTsdbQuery); + + SplitRollupQuery mockSplitQuery = PowerMockito.mock(SplitRollupQuery.class); + when(mockSplitQuery.configureFromQuery(eq(queryUnderTest), anyInt())).thenReturn(Deferred.fromResult(null)); + + PowerMockito.whenNew(SplitRollupQuery.class).withAnyArguments().thenReturn(mockSplitQuery); + + queryUnderTest.buildQueriesAsync(tsdb); + + verify(mockSplitQuery).configureFromQuery(queryUnderTest, 0); + } + + @Test + public void testDoesNotSplitIneligibleRollupQuery() { + final TSQuery queryUnderTest = getMetricForValidate(); + + TSDB tsdb = PowerMockito.mock(TSDB.class); + TsdbQuery mockTsdbQuery = PowerMockito.mock(TsdbQuery.class); + when(mockTsdbQuery.configureFromQuery(eq(queryUnderTest), anyInt())).thenReturn(Deferred.fromResult(null)); + when(mockTsdbQuery.needsSplitting()).thenReturn(false); + when(tsdb.newQuery()).thenReturn(mockTsdbQuery); + + SplitRollupQuery mockedSplitQuery = PowerMockito.mock(SplitRollupQuery.class); + + queryUnderTest.buildQueriesAsync(tsdb); + + verifyZeroInteractions(mockedSplitQuery); + } /** * Sets up an object with good, common values for testing the validation diff --git a/test/core/TestTsdbQuery.java b/test/core/TestTsdbQuery.java index 366e9951b2..e564e62560 100644 --- a/test/core/TestTsdbQuery.java +++ b/test/core/TestTsdbQuery.java @@ -12,22 +12,17 @@ // see . package net.opentsdb.core; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import java.util.ArrayList; import java.util.Collections; import java.util.List; +import com.stumbleupon.async.Deferred; import net.opentsdb.core.TsdbQuery.ForTesting; import net.opentsdb.query.QueryLimitOverride; import net.opentsdb.query.filter.TagVFilter; import net.opentsdb.query.filter.TagVWildcardFilter; +import net.opentsdb.rollup.RollupInterval; +import net.opentsdb.rollup.RollupQuery; import net.opentsdb.storage.MockBase; import net.opentsdb.uid.NoSuchUniqueName; import net.opentsdb.utils.DateTime; @@ -42,6 +37,13 @@ import com.stumbleupon.async.DeferredGroupException; +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.doReturn; +import static org.powermock.api.mockito.PowerMockito.spy; + /** * This class is for unit testing the TsdbQuery class. Pretty much making sure * the various ctors and methods function as expected. For actually running the @@ -49,8 +51,11 @@ * {@link TestTsdbQueryQueries} */ @RunWith(PowerMockRunner.class) -@PrepareForTest({ DateTime.class }) +@PrepareForTest({ DateTime.class, TsdbQuery.class }) public final class TestTsdbQuery extends BaseTsdbTest { + + private static final long ONE_DAY_MS = 24 * 60 * 60 * 1000; + private TsdbQuery query = null; @Before @@ -356,6 +361,26 @@ public void configureFromQueryWithGroupByAndRegularFilters() throws Exception { assertNotNull(ForTesting.getRateOptions(query)); } + @Test + public void configureFromQueryWithForceRaw() throws Exception { + setDataPointStorage(); + mockEnableRollupQuerySplitting(); + + final TSQuery ts_query = getTSQuery(TsdbQuery.ROLLUP_USAGE.ROLLUP_NOFALLBACK); + ts_query.validateAndSetQuery(); + query = spy(new TsdbQuery(tsdb)); + query.configureFromQuery(ts_query, 0, true).joinUninterruptibly(); + + assertFalse(query.isRollupQuery()); + verify(query, never()).transformDownSamplerToRollupQuery(any(), any()); + + assertArrayEquals(METRIC_BYTES, ForTesting.getMetric(query)); + assertEquals(1, ForTesting.getFilters(query).size()); + assertArrayEquals(TAGK_BYTES, ForTesting.getGroupBys(query).get(0)); + assertEquals(1, ForTesting.getGroupBys(query).size()); + assertNotNull(ForTesting.getRateOptions(query)); + } + @Test (expected = IllegalArgumentException.class) public void configureFromQueryNullSubs() throws Exception { final TSQuery ts_query = new TSQuery(); @@ -566,21 +591,180 @@ public void scannerException() throws Exception { } } + @Test + public void needsSplittingReturnsFalseIfDisabled() { + Whitebox.setInternalState(tsdb, "rollups_split_queries", false); + assertFalse(query.needsSplitting()); + } + + @Test + public void needsSplittingReturnsFalseIfNotARollupQuery() { + Whitebox.setInternalState(tsdb, "rollups_split_queries", true); + Whitebox.setInternalState(query, "rollup_query", (RollupQuery) null); + assertFalse(query.needsSplitting()); + } + + @Test + public void needsSplittingReturnsFalseIfNoSLAConfigured() { + Whitebox.setInternalState(tsdb, "rollups_split_queries", true); + RollupInterval oneHourWithDelay = RollupInterval.builder() + .setTable("fake-rollup-table") + .setPreAggregationTable("fake-preagg-table") + .setInterval("1h") + .setRowSpan("1d") + .setDelaySla(null) + .build(); + RollupQuery rollup_query = new RollupQuery( + oneHourWithDelay, + Aggregators.SUM, + 3600000, + Aggregators.SUM + ); + Whitebox.setInternalState(query, "rollup_query", rollup_query); + + assertTrue(query.isRollupQuery()); + + assertFalse(query.needsSplitting()); + } + + @Test + public void needsSplittingReturnsFalseIfNotInBlackoutPeriod() { + mockSystemTime(1356998400000L); + mockEnableRollupQuerySplitting(); + + query.setStartTime(0); + query.setEndTime(1); + + assertTrue(query.isRollupQuery()); + + assertFalse(query.needsSplitting()); + } + + @Test + public void needsSplittingReturnsFalseIfQueryEndsWithLastRollupTimestamp() { + mockSystemTime(1356998400000L); + mockEnableRollupQuerySplitting(); + + query.setStartTime(0); + query.setEndTime(query.getRollupQuery().getLastRollupTimestampSeconds() * 1000L); + + assertTrue(query.isRollupQuery()); + + assertFalse(query.needsSplitting()); + } + + @Test + public void needsSplittingReturnsTrueIfQueryStartsWithLastRollupTimestamp() { + long mockNowTimestamp = 1356998400000L; + mockSystemTime(mockNowTimestamp); mockEnableRollupQuerySplitting(); + + query.setStartTime(query.getRollupQuery().getLastRollupTimestampSeconds() * 1000L); + + assertTrue(query.isRollupQuery()); + + assertTrue(query.needsSplitting()); + } + + @Test + public void needsSplittingReturnsTrueIfInBlackoutPeriod() { + long mockNowTimestamp = 1356998400000L; + mockSystemTime(mockNowTimestamp); + mockEnableRollupQuerySplitting(); + + query.setStartTime(0L); + query.setEndTime(mockNowTimestamp); + + assertTrue(query.isRollupQuery()); + + assertTrue(query.needsSplitting()); + } + + @Test + public void needsSplittingReturnsTrueIfStartAndEndInBlackoutPeriod() { + long mockNowTimestamp = 1356998400000L; + mockSystemTime(mockNowTimestamp); + mockEnableRollupQuerySplitting(); + + int oneHour = 60 * 60 * 1000; + + query.setStartTime(mockNowTimestamp - oneHour); + query.setEndTime(mockNowTimestamp); + + assertTrue(query.isRollupQuery()); + + assertTrue(query.needsSplitting()); + } + + @Test + public void split() { + long mockSystemTime = 1356998400000L; + mockSystemTime(mockSystemTime); + mockEnableRollupQuerySplitting(); + + TSQuery tsQuery = getTSQuery(); + TsdbQuery rawQuery = spy(new TsdbQuery(tsdb)); + + query.setStartTime(mockSystemTime - 7 * ONE_DAY_MS); + + doReturn(Deferred.fromResult(null)).when(rawQuery).configureFromQuery(eq(tsQuery), eq(0), eq(true)); + + query.split(tsQuery, 0, rawQuery); + + verify(rawQuery).configureFromQuery(eq(tsQuery), eq(0), eq(true)); + + assertEquals(mockSystemTime - 7 * ONE_DAY_MS, query.getStartTime()); + assertEquals(mockSystemTime - 2 * ONE_DAY_MS, query.getEndTime()); + assertEquals(mockSystemTime - 2 * ONE_DAY_MS, rawQuery.getStartTime()); + assertEquals(mockSystemTime, rawQuery.getEndTime()); + } + + @Test(expected = IllegalStateException.class) + public void splitThrowsIfNotSplittable() { + Whitebox.setInternalState(tsdb, "rollups_split_queries", false); + + query.split(getTSQuery(), 0, new TsdbQuery(tsdb)); + } + /** @return a simple TSQuery object for testing */ private TSQuery getTSQuery() { + return getTSQuery(null); + } + + private TSQuery getTSQuery(TsdbQuery.ROLLUP_USAGE rollupUsage) { final TSQuery ts_query = new TSQuery(); ts_query.setStart("1356998400"); + final ArrayList sub_queries = new ArrayList(1); + sub_queries.add(getSubQuery(rollupUsage)); + + ts_query.setQueries(sub_queries); + return ts_query; + } + + private TSSubQuery getSubQuery(TsdbQuery.ROLLUP_USAGE rollupUsage) { final TSSubQuery sub_query = new TSSubQuery(); sub_query.setMetric(METRIC_STRING); sub_query.setAggregator("sum"); sub_query.setTags(tags); - final ArrayList sub_queries = new ArrayList(1); - sub_queries.add(sub_query); + if (rollupUsage != null) { + sub_query.setRollupUsage(rollupUsage.name()); + } - ts_query.setQueries(sub_queries); - return ts_query; + return sub_query; + } + + private void mockSystemTime(long newTimestamp) { + PowerMockito.mockStatic(DateTime.class); + PowerMockito.when(DateTime.currentTimeMillis()).thenReturn(newTimestamp); + PowerMockito.when(DateTime.getDurationUnits(anyString())).thenCallRealMethod(); + PowerMockito.when(DateTime.getDurationInterval(anyString())).thenCallRealMethod(); + PowerMockito.when(DateTime.parseDuration(anyString())).thenCallRealMethod(); + } + + private void mockEnableRollupQuerySplitting() { + Whitebox.setInternalState(tsdb, "rollups_split_queries", true); + Whitebox.setInternalState(query, "rollup_query", makeRollupQuery()); } } diff --git a/test/query/TestQueryUtil.java b/test/query/TestQueryUtil.java index ab04c38568..ca0d12f25d 100644 --- a/test/query/TestQueryUtil.java +++ b/test/query/TestQueryUtil.java @@ -12,6 +12,8 @@ // see . package net.opentsdb.query; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -19,6 +21,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import net.opentsdb.core.Query; +import net.opentsdb.utils.DateTime; import org.hbase.async.Bytes.ByteMap; import org.hbase.async.FilterList; import org.hbase.async.KeyRegexpFilter; @@ -145,4 +149,20 @@ public void setDataTableScanFilterEnableBoth() throws Exception { verify(scanner, times(1)).setStartKey(any(byte[].class)); verify(scanner, times(1)).setStopKey(any(byte[].class)); } + + @Test + public void timestampComparison() { + long now = DateTime.currentTimeMillis() / 1000L; + assertFalse(QueryUtil.isTimestampAfter(now*1000, now+1)); + assertFalse(QueryUtil.isTimestampAfter(now-1, now*1000L)); + assertFalse(QueryUtil.isTimestampAfter(now-1, now)); + assertFalse(QueryUtil.isTimestampAfter((now-1)*1000L, now*1000L)); + + assertTrue(QueryUtil.isTimestampAfter(now+1, now*1000L)); + assertTrue(QueryUtil.isTimestampAfter(now*1000L, now-1)); + assertTrue(QueryUtil.isTimestampAfter(now, now-1)); + assertTrue(QueryUtil.isTimestampAfter(now*1000L, (now-1)*1000L)); + + assertFalse(QueryUtil.isTimestampAfter(now, now)); + } } diff --git a/test/rollup/TestRollupConfig.java b/test/rollup/TestRollupConfig.java index 0aef49e861..612c28f010 100644 --- a/test/rollup/TestRollupConfig.java +++ b/test/rollup/TestRollupConfig.java @@ -43,13 +43,16 @@ public class TestRollupConfig { private final static String tsdb_table = "tsdb"; private final static String rollup_table = "tsdb-rollup-10m"; private final static String preagg_table = "tsdb-rollup-agg-10m"; + private final static String rollup_table_1h = "tsdb-rollup-1h"; + private final static String preagg_table_1h = "tsdb-rollup-agg-1h"; private TSDB tsdb; private HBaseClient client; private RollupConfig.Builder builder; private RollupInterval raw; private RollupInterval tenmin; - + private RollupInterval oneHourWithDelay; + @Before public void before() throws Exception { tsdb = PowerMockito.mock(TSDB.class); @@ -70,26 +73,38 @@ public void before() throws Exception { .setInterval("10m") .setRowSpan("1d") .build(); - + + oneHourWithDelay = RollupInterval.builder() + .setTable(rollup_table_1h) + .setPreAggregationTable(preagg_table_1h) + .setInterval("1h") + .setRowSpan("1d") + .setDelaySla("2d") + .build(); + builder = RollupConfig.builder() .addAggregationId("Sum", 0) .addAggregationId("Max", 1) .addInterval(raw) - .addInterval(tenmin); + .addInterval(tenmin) + .addInterval(oneHourWithDelay); } @Test public void ctor() throws Exception { RollupConfig config = builder.build(); - assertEquals(2, config.forward_intervals.size()); + assertEquals(3, config.forward_intervals.size()); assertSame(raw, config.forward_intervals.get("1m")); assertSame(tenmin, config.forward_intervals.get("10m")); - - assertEquals(3, config.reverse_intervals.size()); + assertSame(oneHourWithDelay, config.forward_intervals.get("1h")); + + assertEquals(5, config.reverse_intervals.size()); assertSame(raw, config.reverse_intervals.get(tsdb_table)); assertSame(tenmin, config.reverse_intervals.get(rollup_table)); assertSame(tenmin, config.reverse_intervals.get(preagg_table)); - + assertSame(oneHourWithDelay, config.reverse_intervals.get(rollup_table_1h)); + assertSame(oneHourWithDelay, config.reverse_intervals.get(preagg_table_1h)); + assertEquals(2, config.aggregations_to_ids.size()); assertEquals(2, config.ids_to_aggregations.size()); @@ -102,7 +117,8 @@ public void ctor() throws Exception { // missing aggregations builder = RollupConfig.builder() .addInterval(raw) - .addInterval(tenmin); + .addInterval(tenmin) + .addInterval(oneHourWithDelay); try { builder.build(); fail("Expected IllegalArgumentException"); @@ -113,7 +129,8 @@ public void ctor() throws Exception { .addAggregationId("Sum", 1) .addAggregationId("Max", 1) .addInterval(raw) - .addInterval(tenmin); + .addInterval(tenmin) + .addInterval(oneHourWithDelay); try { builder.build(); fail("Expected IllegalArgumentException"); @@ -124,7 +141,8 @@ public void ctor() throws Exception { .addAggregationId("Sum", 0) .addAggregationId("Max", 128) .addInterval(raw) - .addInterval(tenmin); + .addInterval(tenmin) + .addInterval(oneHourWithDelay); try { builder.build(); fail("Expected IllegalArgumentException"); @@ -175,7 +193,8 @@ public void getRollupIntervalString() throws Exception { assertSame(raw, config.getRollupInterval("1m")); assertSame(tenmin, config.getRollupInterval("10m")); - + assertSame(oneHourWithDelay, config.getRollupInterval("1h")); + try { config.getRollupInterval("5m"); fail("Expected NoSuchRollupForIntervalException"); @@ -199,6 +218,8 @@ public void getRollupIntervalForTable() throws Exception { assertSame(raw, config.getRollupIntervalForTable(tsdb_table)); assertSame(tenmin, config.getRollupIntervalForTable(rollup_table)); assertSame(tenmin, config.getRollupIntervalForTable(preagg_table)); + assertSame(oneHourWithDelay, config.getRollupIntervalForTable(rollup_table_1h)); + assertSame(oneHourWithDelay, config.getRollupIntervalForTable(preagg_table_1h)); try { config.getRollupIntervalForTable("nosuchtable"); @@ -270,5 +291,7 @@ public Deferred answer(InvocationOnMock invocation) verify(client, times(2)).ensureTableExists(tsdb_table.getBytes()); verify(client, times(1)).ensureTableExists(rollup_table.getBytes()); verify(client, times(1)).ensureTableExists(preagg_table.getBytes()); + verify(client, times(1)).ensureTableExists(rollup_table_1h.getBytes()); + verify(client, times(1)).ensureTableExists(preagg_table_1h.getBytes()); } } diff --git a/test/rollup/TestRollupInterval.java b/test/rollup/TestRollupInterval.java index 61dd252a8d..d82674d61c 100644 --- a/test/rollup/TestRollupInterval.java +++ b/test/rollup/TestRollupInterval.java @@ -31,7 +31,7 @@ public class TestRollupInterval { private final static byte[] agg_table = preagg_table.getBytes(CHARSET); @Test - public void ctor1SecondHour() throws Exception { + public void ctor1SecondHourNoSla() throws Exception { final RollupInterval interval = RollupInterval.builder() .setTable(rollup_table) .setPreAggregationTable(preagg_table) @@ -47,16 +47,18 @@ public void ctor1SecondHour() throws Exception { assertEquals(preagg_table, interval.getPreAggregationTable()); assertEquals(0, Bytes.memcmp(table, interval.getTemporalTable())); assertEquals(0, Bytes.memcmp(agg_table, interval.getGroupbyTable())); + assertEquals(0, interval.getMaximumLag()); } // test odd boundaries @Test - public void ctor7SecondHour() throws Exception { + public void ctor7SecondHourTwoHoursDelay() throws Exception { final RollupInterval interval = RollupInterval.builder() .setTable(rollup_table) .setPreAggregationTable(preagg_table) .setInterval("7s") .setRowSpan("1h") + .setDelaySla("2h") .build(); assertEquals('h', interval.getUnits()); assertEquals("7s", interval.getInterval()); @@ -67,6 +69,7 @@ public void ctor7SecondHour() throws Exception { assertEquals(preagg_table, interval.getPreAggregationTable()); assertEquals(0, Bytes.memcmp(table, interval.getTemporalTable())); assertEquals(0, Bytes.memcmp(agg_table, interval.getGroupbyTable())); + assertEquals(7200, interval.getMaximumLag()); } @Test @@ -404,7 +407,7 @@ public void ctorUnknownSpan() throws Exception { .build(); } - @Test (expected = NullPointerException.class) + @Test (expected = IllegalArgumentException.class) public void ctorNullInterval() throws Exception { RollupInterval.builder() .setTable(rollup_table) @@ -414,7 +417,7 @@ public void ctorNullInterval() throws Exception { .build(); } - @Test (expected = StringIndexOutOfBoundsException.class) + @Test (expected = IllegalArgumentException.class) public void ctorEmptyInterval() throws Exception { RollupInterval.builder() .setTable(rollup_table) diff --git a/test/rollup/TestRollupQuery.java b/test/rollup/TestRollupQuery.java new file mode 100644 index 0000000000..31c0232dae --- /dev/null +++ b/test/rollup/TestRollupQuery.java @@ -0,0 +1,75 @@ +// This file is part of OpenTSDB. +// Copyright (C) 2015-2017 The OpenTSDB Authors. +// +// This program is free software: you can redistribute it and/or modify it +// under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 2.1 of the License, or (at your +// option) any later version. This program is distributed in the hope that it +// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty +// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. You should have received a copy +// of the GNU Lesser General Public License along with this program. If not, +// see . +package net.opentsdb.rollup; + +import net.opentsdb.core.Aggregators; +import net.opentsdb.utils.DateTime; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import static org.junit.Assert.*; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ + DateTime.class +}) +public class TestRollupQuery { + + private static final long MOCK_TIMESTAMP = 1554117071000L; + private static final int ONE_HOUR_SECONDS = 60 * 60; + private static final int ONE_DAY_SECONDS = 24 * ONE_HOUR_SECONDS; + private static final int TWO_DAYS_SECONDS = 2 * ONE_DAY_SECONDS; + + private RollupQuery query; + + @Before + public void before() { + final RollupInterval oneHourWithDelay = RollupInterval.builder() + .setTable("fake-rollup-table") + .setPreAggregationTable("fake-preagg-table") + .setInterval("1h") + .setRowSpan("1d") + .setDelaySla("2d") + .build(); + query = new RollupQuery( + oneHourWithDelay, + Aggregators.SUM, + 3600000, + Aggregators.SUM + ); + PowerMockito.mockStatic(DateTime.class); + PowerMockito.when(DateTime.currentTimeMillis()).thenReturn(MOCK_TIMESTAMP); + } + + + @Test + public void testGetLastRollupTimestamp() { + long nowSeconds = MOCK_TIMESTAMP / 1000; + long twoDaysAgo = nowSeconds - TWO_DAYS_SECONDS; + + assertEquals(twoDaysAgo, query.getLastRollupTimestampSeconds()); + } + + @Test + public void testIsInBlackoutPeriod() { + long oneHourAgo = MOCK_TIMESTAMP - ONE_HOUR_SECONDS * 1000; + assertTrue(query.isInBlackoutPeriod(oneHourAgo)); + + long threeDaysAgo = MOCK_TIMESTAMP - 3 * ONE_DAY_SECONDS * 1000; + assertFalse(query.isInBlackoutPeriod(threeDaysAgo)); + } +} diff --git a/test/tsd/TestRollupRpc.java b/test/tsd/TestRollupRpc.java index 23f5bf319f..ea7158f6dc 100644 --- a/test/tsd/TestRollupRpc.java +++ b/test/tsd/TestRollupRpc.java @@ -100,7 +100,8 @@ public void beforeLocal() throws Exception { storage.addTable("tsdb-rollup-agg-1h".getBytes(), families); storage.addTable("tsdb-agg".getBytes(), families); Whitebox.setInternalState(tsdb, "rollups_block_derived", true); - Whitebox.setInternalState(tsdb, "agg_tag_key", + Whitebox.setInternalState(tsdb, "rollups_split_queries", false); + Whitebox.setInternalState(tsdb, "agg_tag_key", config.getString("tsd.rollups.agg_tag_key")); Whitebox.setInternalState(tsdb, "raw_agg_tag_value", config.getString("tsd.rollups.raw_agg_tag_value")); @@ -850,5 +851,4 @@ public void httpUnknownInterval() throws Exception { validateCounters(0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0); validateSEH(false); } - -} \ No newline at end of file +} diff --git a/test/utils/TestDateTime.java b/test/utils/TestDateTime.java index 0f42843507..cee9866487 100644 --- a/test/utils/TestDateTime.java +++ b/test/utils/TestDateTime.java @@ -414,6 +414,11 @@ public void getDurationUnitsNull() { public void getDurationUnitsEmpty() { DateTime.getDurationUnits(""); } + + @Test (expected = IllegalArgumentException.class) + public void getDurationIsNull() { + DateTime.getDurationUnits(null); + } @Test public void getDurationInterval() {