Skip to content

Commit

Permalink
Code review, buidler.
Browse files Browse the repository at this point in the history
  • Loading branch information
cpwright committed Sep 19, 2024
1 parent 22a2975 commit 2a7f6c0
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1091,7 +1091,8 @@ void doRefilter(
unmatchedRows.insert(refilterRequestedRowset);
refilterRequestedRowset = null;
}
final WhereListener.ListenerFilterExecution filterExecution = listener.makeRefilterExecution(unmatchedRows);
final WhereListener.ListenerFilterExecution filterExecution =
listener.makeRefilterExecution(unmatchedRows);
filterExecution.scheduleCompletion((adds, unusedMods) -> {
final WritableRowSet newMapping = adds.writableCast();
// add back what we previously matched, but for modifications and removals
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.InstrumentedTableUpdateListenerAdapter;
import io.deephaven.engine.table.impl.ListenerRecorder;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
Expand Down Expand Up @@ -41,6 +40,107 @@
public class TimeSeriesFilter
extends WhereFilterLivenessArtifactImpl
implements NotificationQueue.Dependency {
/**
* A builder to create a TimeSeriesFilter.
*/
public static class Builder {
private String columnName;
private long periodNanos;
private boolean invert;
private Clock clock;

private Builder() {
// use newBuilder
}

/**
* Set the column name to filter on.
*
* @param columnName the column name to filter on, required
* @return this builder
*/
public Builder columnName(String columnName) {
this.columnName = columnName;
return this;
}

/**
* Set an optional Clock to use for this filter. When not specified, the clock is determined by
* {@link DateTimeUtils#currentClock()}.
*
* @param clock the clock to use for the filter
* @return this builder
*/
public Builder clock(Clock clock) {
this.clock = clock;
return this;
}

/**
* Set the period for this filter.
*
* @param period the period as a string for the filter
* @return this builder
*/
public Builder period(final String period) {
return period(DateTimeUtils.parseDurationNanos(period));
}

/**
* Set the period for this filter.
*
* @param period the period as a Duration for the filter
* @return this builder
*/
public Builder period(Duration period) {
return period(period.toNanos());
}

/**
* Set the period for this filter.
*
* @param period the period in nanoseconds for the filter
* @return this builder
*/
public Builder period(long period) {
this.periodNanos = period;
return this;
}

/**
* Set whether this filter should be inverted. An inverted filter accepts only rows that have null timestamps or
* timestamps older than the period.
*
* @param invert true if the filter should be inverted.
* @return this builder
*/
public Builder invert(boolean invert) {
this.invert = invert;
return this;
}

/**
* Create the TimeSeriesFilter described by this builder.
*
* @return a TimeSeriesFilter using the options from this builder.
*/
public TimeSeriesFilter build() {
if (columnName == null) {
throw new IllegalArgumentException("Column name is required");
}
return new TimeSeriesFilter(columnName, periodNanos, invert, clock);
}
}

/**
* Create a builder for a time series filter.
*
* @return a Builder object to configure a TimeSeriesFilter
*/
public static Builder newBuilder() {
return new Builder();
}

private final String columnName;
private final long periodNanos;
private final boolean invert;
Expand All @@ -54,8 +154,8 @@ public class TimeSeriesFilter
private Runnable refreshFunctionForUnitTests;

/**
* The listener is responsible for listening to the WindowCheck result, updating our RowSet that contains the
* rows in our window, and then notifying the WhereListener that we are requesting recomputation.
* The listener is responsible for listening to the WindowCheck result, updating our RowSet that contains the rows
* in our window, and then notifying the WhereListener that we are requesting recomputation.
*/
private TimeSeriesFilterWindowListener windowListener;

Expand All @@ -79,62 +179,19 @@ public TimeSeriesFilter(final String columnName,
*/
public TimeSeriesFilter(final String columnName,
final long periodNanos) {
this(columnName, periodNanos, false);
}

// TODO: USE A BUILDER FOR THE CONSTRUCTOR

/**
* Create a TimeSeriesFilter on the given column for the given period in nanoseconds.
*
* <p>
* The filter may be <i>inverted</i>, meaning that instead of including recent rows within the window it only
* includes rows outside the window or that are null.
* </p>
*
* @param columnName the name of the timestamp column
* @param periodNanos the duration of the window in nanoseconds
* @param invert true if only rows outside the window should be included in the result
*/
public TimeSeriesFilter(final String columnName,
final long periodNanos,
final boolean invert) {
this(columnName, periodNanos, invert, null);
this(columnName, periodNanos, false, null);
}

/**
* Create a TimeSeriesFilter on the given column for the given period in nanoseconds.
*
* <p>
* The filter may be <i>inverted</i>, meaning that instead of including recent rows within the window it only
* includes rows outside the window.
* </p>
*
* @param columnName the name of the timestamp column
* @param period the duration of the window as parsed by {@link DateTimeUtils#parseDurationNanos(String)}.
* @param invert true if only rows outside the window should be included in the result
*/
public TimeSeriesFilter(final String columnName,
final String period,
final boolean invert) {
this(columnName, DateTimeUtils.parseDurationNanos(period), invert, null);
}

/**
* Create a TimeSeriesFilter on the given column for the given period in nanoseconds.
*
* <p>
* The filter may be <i>inverted</i>, meaning that instead of including recent rows within the window it only
* includes rows outside the window.
* </p>
*
* @param columnName the name of the timestamp column
* @param periodNanos the duration of the window in nanoseconds
* @param invert true if only rows outside the window should be included in the result
* @param invert true if only rows outside the window (or with a null timestamp) should be included in the result
* @param clock the Clock to use as a time source for this filter, when null the clock supplied by
* {@link DateTimeUtils#currentClock()} is used.
*/
public TimeSeriesFilter(final String columnName,
private TimeSeriesFilter(final String columnName,
final long periodNanos,
final boolean invert,
@Nullable final Clock clock) {
Expand Down Expand Up @@ -236,7 +293,8 @@ private class TimeSeriesFilterWindowListener extends InstrumentedTableUpdateList
// the column source containing the window; which we match on
private final ColumnSource<Object> windowColumnSource;

protected TimeSeriesFilterWindowListener(String listenerDescription, QueryTable tableWithWindow, final String windowSourceName) {
protected TimeSeriesFilterWindowListener(String listenerDescription, QueryTable tableWithWindow,
final String windowSourceName) {
super(listenerDescription, tableWithWindow, false);
this.windowColumnSet = tableWithWindow.newModifiedColumnSet(windowSourceName);
this.windowColumnSource = tableWithWindow.getColumnSource(windowSourceName);
Expand Down Expand Up @@ -293,7 +351,8 @@ public SafeCloseable beginOperation(@NotNull Table sourceTable) {
final QueryTable tableWithWindow = (QueryTable) pair.first;
refreshFunctionForUnitTests = pair.second;

windowListener = new TimeSeriesFilterWindowListener("TimeSeriesFilter(" + columnName + ", " + Duration.ofNanos(periodNanos) + ", " + invert + ")",
windowListener = new TimeSeriesFilterWindowListener(
"TimeSeriesFilter(" + columnName + ", " + Duration.ofNanos(periodNanos) + ", " + invert + ")",
tableWithWindow, windowSourceName);
tableWithWindow.addUpdateListener(windowListener);
manage(windowListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ interface RecomputeListener {

/**
* Notify that something about the filters has changed such that the following rows of the source table should
* be re-evaluated. The rowSet ownership is not taken by requestRecompute.
* be re-evaluated. The rowSet ownership is not taken by requestRecompute.
*/
void requestRecompute(RowSet rowSet);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.util.TestClock;
import io.deephaven.engine.testutil.*;
import io.deephaven.engine.testutil.generator.DateGenerator;
import io.deephaven.engine.testutil.generator.IntGenerator;
import io.deephaven.engine.util.TableTools;
import io.deephaven.engine.testutil.testcase.RefreshingTableTestCase;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.util.TestClock;
import io.deephaven.time.DateTimeUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -49,7 +49,7 @@ public void testSimple() {
final TestClock testClock = new TestClock().setMillis(startTime);

final TimeSeriesFilter timeSeriesFilter =
new TimeSeriesFilter("Timestamp", DateTimeUtils.parseDurationNanos("PT00:00:05"), false, testClock);
TimeSeriesFilter.newBuilder().columnName("Timestamp").period("PT00:00:05").clock(testClock).build();
Table filtered = source.where(timeSeriesFilter);

TableTools.show(filtered);
Expand Down Expand Up @@ -97,7 +97,8 @@ public void testInverted() {
final TestClock testClock = new TestClock().setMillis(startTime);

final TimeSeriesFilter timeSeriesFilter =
new TimeSeriesFilter("Timestamp", DateTimeUtils.parseDurationNanos("PT00:00:05"), true, testClock);
TimeSeriesFilter.newBuilder().columnName("Timestamp").period("PT00:00:05").clock(testClock).invert(true)
.build();
Table filtered = source.where(timeSeriesFilter);

TableTools.show(filtered);
Expand Down Expand Up @@ -147,9 +148,11 @@ public void testIncremental() throws ParseException {
final TestClock testClock = new TestClock().setMillis(startDate.getTime());

final TimeSeriesFilter inclusionFilter =
new TimeSeriesFilter("Date", DateTimeUtils.parseDurationNanos("PT01:00:00"), false, testClock);
TimeSeriesFilter.newBuilder().columnName("Date").period("PT01:00:00").clock(testClock).invert(false)
.build();
final TimeSeriesFilter exclusionFilter =
new TimeSeriesFilter("Date", DateTimeUtils.parseDurationNanos("PT01:00:00"), true, testClock);
TimeSeriesFilter.newBuilder().columnName("Date").period("PT01:00:00").clock(testClock).invert(true)
.build();
final ArrayList<WeakReference<TimeSeriesFilter>> filtersToRefresh = new ArrayList<>();

final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();
Expand Down Expand Up @@ -184,9 +187,11 @@ public void testIncremental2() throws ParseException {
final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();

final TimeSeriesFilter inclusionFilter =
new TimeSeriesFilter("Date", DateTimeUtils.parseDurationNanos("PT01:00:00"), false, testClock);
TimeSeriesFilter.newBuilder().columnName("Date").period("PT01:00:00").clock(testClock).invert(false)
.build();
final TimeSeriesFilter exclusionFilter =
new TimeSeriesFilter("Date", DateTimeUtils.parseDurationNanos("PT01:00:00"), true, testClock);
TimeSeriesFilter.newBuilder().columnName("Date").period("PT01:00:00").clock(testClock).invert(true)
.build();
final ArrayList<WeakReference<TimeSeriesFilter>> filtersToRefresh = new ArrayList<>();

EvalNugget[] en = makeNuggets(table, inclusionFilter, filtersToRefresh, updateGraph, exclusionFilter);
Expand Down Expand Up @@ -303,9 +308,11 @@ public void testFilterSequence() {
final CountingFilter cfe2 = new CountingFilter();

final TimeSeriesFilter inclusionFilter =
new TimeSeriesFilter("Timestamp", DateTimeUtils.parseDurationNanos("PT01:00:00"), false, testClock);
TimeSeriesFilter.newBuilder().columnName("Timestamp").period("PT01:00:00").clock(testClock)
.invert(false).build();
final TimeSeriesFilter exclusionFilter =
new TimeSeriesFilter("Timestamp", DateTimeUtils.parseDurationNanos("PT01:00:00"), true, testClock);
TimeSeriesFilter.newBuilder().columnName("Timestamp").period("PT01:00:00").clock(testClock).invert(true)
.build();

final Table inclusion = source.where(Filter.and(cfi1, inclusionFilter, cfi2));
final Table exclusion = source.where(Filter.and(cfe1, exclusionFilter, cfe2));
Expand Down

This file was deleted.

Loading

0 comments on commit 2a7f6c0

Please sign in to comment.