Skip to content

Commit

Permalink
NestedIterator provides native support for a seek method (#2684)
Browse files Browse the repository at this point in the history
* NestedIterator provides native support for a seek method

* Update ArrayIterator

* Update class level documentation for SeekableIterator interface

* Add test to document NestedQueryIterator behavior when multiple nests exist

---------

Co-authored-by: alerman <awlerma@uwe.nsa.gov>
  • Loading branch information
apmoriarty and alerman authored Jan 15, 2025
1 parent 0c28a11 commit e57c855
Show file tree
Hide file tree
Showing 22 changed files with 231 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,8 @@ public Iterator<Entry<T,Document>> iterator() {
}

public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
Iterable<? extends NestedIterator<?>> leaves = tree.leaves();
for (NestedIterator<?> leaf : leaves) {
if (leaf instanceof SeekableIterator) {
((SeekableIterator) leaf).seek(range, columnFamilies, inclusive);
}
for (NestedIterator<?> leaf : tree.leaves()) {
leaf.seek(range, columnFamilies, inclusive);
}
seenSeek = true;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package datawave.query.iterator;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;

import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;

import datawave.query.attributes.Document;

/**
*
* A stub for the NestedIterator, functionally equivalent to {@link Collections#emptyIterator()}
*/
public class EmptyTreeIterable implements NestedIterator<Key> {

Expand All @@ -22,14 +25,19 @@ public Key move(Key minimum) {
return null;
}

@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
// no-op
}

@Override
public Collection<NestedIterator<Key>> leaves() {
return Collections.EMPTY_SET;
return Collections.emptySet();
}

@Override
public Collection<NestedIterator<Key>> children() {
return Collections.EMPTY_SET;
return Collections.emptySet();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
import datawave.query.attributes.Document;

/**
*
* This iterator supports a full table scan over the event column
*/
public class EventDataScanNestedIterator implements NestedIterator<Key>, SeekableIterator {
public class EventDataScanNestedIterator implements NestedIterator<Key> {
private static final Logger log = Logger.getLogger(EventDataScanNestedIterator.class);
protected SortedKeyValueIterator<Key,Value> source;
protected Key topKey = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.Collection;
import java.util.Collections;

import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
Expand Down Expand Up @@ -78,6 +79,11 @@ public Key move(Key minimum) {
return next();
}

@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
source.seek(range, columnFamilies, inclusive);
}

@Override
public Collection<NestedIterator<Key>> leaves() {
return Collections.emptySet();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package datawave.query.iterator;

import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;

import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.iterators.IteratorEnvironment;

import datawave.query.attributes.Document;
Expand Down Expand Up @@ -33,6 +36,20 @@ public interface NestedIterator<T> extends Iterator<T> {
*/
T move(T minimum);

/**
* Hook to allow issuing a seek to the underlying source iterator(s)
*
* @param range
* the seek range
* @param columnFamilies
* the column families
* @param inclusive
* true if range is inclusive
* @throws IOException
* for issues with reads
*/
void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException;

/**
* Returns a reference to all of the leaf nodes at or below <code>this</code>. This is useful when we need to call <code>seek</code> on leaf nodes that are
* <code>SortedKeyValueIterators</code>.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package datawave.query.iterator;

import java.io.IOException;
import java.util.Collection;
import java.util.Queue;

import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Range;
import org.apache.log4j.Logger;

Expand Down Expand Up @@ -81,23 +83,40 @@ public T next() {
@Override
public void remove() {
currentNest.remove();

}

@Override
public void initialize() {
if (null == currentNest) {
popNextNest();
} else
} else {
currentNest.initialize();

}
}

@Override
public T move(T minimum) {
return currentNest.move(minimum);
}

/**
* Seeks the current nest using the provided range. Note: if the range is beyond the current nest it is up to the caller to advance to the next nest via a
* call to {@link #hasNext()}
*
* @param range
* the seek range
* @param columnFamilies
* the column families
* @param inclusive
* true if range is inclusive
* @throws IOException
* if the underlying source has a problem
*/
@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
currentNest.seek(range, columnFamilies, inclusive);
}

@Override
public Collection<NestedIterator<T>> leaves() {
return currentNest.leaves();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@
import datawave.query.iterator.profile.QuerySpanCollector;
import datawave.query.iterator.profile.SourceTrackingIterator;
import datawave.query.jexl.DatawaveJexlContext;
import datawave.query.jexl.JexlASTHelper;
import datawave.query.jexl.StatefulArithmetic;
import datawave.query.jexl.functions.FieldIndexAggregator;
import datawave.query.jexl.functions.IdentityAggregator;
Expand Down Expand Up @@ -636,7 +635,7 @@ protected NestedIterator<Key> buildDocumentIterator(Range documentRange, Range s
}

// Seek() the boolean logic stuff
((SeekableIterator) docIter).seek(range, columnFamilies, inclusive);
docIter.seek(range, columnFamilies, inclusive);

// now lets start off the nested iterator
docIter.initialize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
import org.apache.accumulo.core.data.Range;

/**
*
* See {@link NestedIterator#seek(Range, Collection, boolean)} for examples of how this interface was previously used.
*/
@Deprecated(forRemoval = true, since = "7.13.0")
public interface SeekableIterator {
/**
* @see org.apache.accumulo.core.iterators.SortedKeyValueIterator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
import datawave.query.attributes.Document;

/**
*
* This class is a delegate/base class now that the SeekableIterator was merged into the NestedIterator interface
*/
public class SeekableNestedIterator<T> implements NestedIterator<T>, SeekableIterator {
public class SeekableNestedIterator<T> implements NestedIterator<T> {
private static final Logger log = Logger.getLogger(SeekableNestedIterator.class);
private NestedIterator<T> source;
private final NestedIterator<T> source;
protected Range totalRange = null;
protected Collection<ByteSequence> columnFamilies = null;
protected boolean inclusive = false;
Expand All @@ -30,16 +30,7 @@ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean i
this.totalRange = range;
this.columnFamilies = columnFamilies;
this.inclusive = inclusive;
if (source instanceof SeekableIterator) {
((SeekableIterator) source).seek(range, columnFamilies, inclusive);
} else {
Iterable<? extends NestedIterator<?>> leaves = source.leaves();
for (NestedIterator<?> leaf : leaves) {
if (leaf instanceof SeekableIterator) {
((SeekableIterator) leaf).seek(range, columnFamilies, inclusive);
}
}
}
source.seek(range, columnFamilies, inclusive);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@
import datawave.query.exceptions.DatawaveFatalQueryException;
import datawave.query.exceptions.QueryIteratorYieldingException;
import datawave.query.iterator.NestedIterator;
import datawave.query.iterator.SeekableIterator;
import datawave.query.iterator.Util;
import datawave.query.iterator.Util.Transformer;

/**
* Performs a merge join of the child iterators. It is expected that all child iterators return values in sorted order.
*/
public class AndIterator<T extends Comparable<T>> implements NestedIterator<T>, SeekableIterator {
public class AndIterator<T extends Comparable<T>> implements NestedIterator<T> {
// temporary stores of uninitialized streams of iterators
private List<NestedIterator<T>> includes, excludes, contextIncludes, contextExcludes;

Expand Down Expand Up @@ -258,49 +257,44 @@ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean i
while (include.hasNext()) {
NestedIterator<T> child = include.next();
try {
for (NestedIterator<T> itr : child.leaves()) {
if (itr instanceof SeekableIterator) {
try {
((SeekableIterator) itr).seek(range, columnFamilies, inclusive);
} catch (IterationInterruptedException e2) {
// throw IterationInterrupted exceptions as-is with no modifications so the QueryIterator can handle it
throw e2;
} catch (Exception e2) {
if (itr.isNonEventField()) {
// dropping a non-event term from the query means that the accuracy of the query
// cannot be guaranteed. Thus, a fatal exception.
log.error("Lookup of a non-event field failed, failing query");
throw new DatawaveFatalQueryException("Lookup of non-event field failed", e2);
}
// otherwise we can safely drop this term from the intersection as the field will get re-introduced
// to the context when the event is aggregated
// Note: even though the precision of the query is affected the accuracy is not. i.e., documents that
// would have been defeated at the field index will now be defeated at evaluation time
throw e2;
}
try {
child.seek(range, columnFamilies, inclusive);
} catch (IterationInterruptedException e2) {
// throw IterationInterrupted exceptions as-is with no modifications so the QueryIterator can handle it
throw e2;
} catch (Exception e2) {
if (child.isNonEventField()) {
// dropping a non-event term from the query means that the accuracy of the query
// cannot be guaranteed. Thus, a fatal exception.
log.error("Lookup of a non-event field failed, failing query");
throw new DatawaveFatalQueryException("Lookup of non-event field failed", e2);
}
// otherwise we can safely drop this term from the intersection as the field will get re-introduced
// to the context when the event is aggregated
// Note: even though the precision of the query is affected the accuracy is not. i.e., documents that
// would have been defeated at the field index will now be defeated at evaluation time
throw e2;
}
} catch (QueryIteratorYieldingException qye) {
throw qye;
} catch (IterationInterruptedException iie) {
throw iie;
} catch (Exception e) {
include.remove();
if (includes.isEmpty() || e instanceof DatawaveFatalQueryException || e instanceof IterationInterruptedException) {
if (includes.isEmpty() || e instanceof DatawaveFatalQueryException) {
throw e;
} else {
log.warn("Lookup of event field failed, precision of query reduced.");
}
}
}
Iterator<NestedIterator<T>> exclude = excludes.iterator();
while (exclude.hasNext()) {
NestedIterator<T> child = exclude.next();
for (NestedIterator<T> itr : child.leaves()) {
if (itr instanceof SeekableIterator) {
((SeekableIterator) itr).seek(range, columnFamilies, inclusive);
}
}

for (NestedIterator<T> contextInclude : contextIncludes) {
contextInclude.seek(range, columnFamilies, inclusive);
}

for (NestedIterator<T> exclude : excludes) {
exclude.seek(range, columnFamilies, inclusive);
}

if (isInitialized()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
package datawave.query.iterator.logic;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;

import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.iteratorsImpl.system.SortedMapIterator;

import datawave.query.attributes.Document;
import datawave.query.iterator.NestedIterator;

/**
* A leaf node in an nested iterator tree. This is supposed to be a sample iterator that returns data from a sorted array.
*
*
* A leaf node in a nested iterator tree. This is supposed to be a sample iterator that returns data from a sorted array.
* <p>
* This class is deprecated. A suitable replacement is an {@link IndexIteratorBridge} using a {@link SortedMapIterator}.
*
* @param <T>
* the type of the array iterator
*/
@Deprecated(since = "7.13.0")
public class ArrayIterator<T extends Comparable<T>> implements NestedIterator<T> {
private static final Document doc = new Document();

Expand Down Expand Up @@ -58,6 +64,11 @@ public T move(T minimum) {
}
}

@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
// no-op
}

public Collection<NestedIterator<T>> leaves() {
Collection<NestedIterator<T>> c = new LinkedList<>();
c.add(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,11 @@
import datawave.query.attributes.Document;
import datawave.query.iterator.DocumentIterator;
import datawave.query.iterator.NestedIterator;
import datawave.query.iterator.SeekableIterator;

/**
* Wraps an Accumulo iterator with a NestedIterator interface. This bridges the gap between an IndexIterator and a NestedIterator.
*
*
*
*/
public class IndexIteratorBridge implements SeekableIterator, NestedIterator<Key>, Comparable<IndexIteratorBridge> {
public class IndexIteratorBridge implements NestedIterator<Key>, Comparable<IndexIteratorBridge> {
private static final Logger log = Logger.getLogger(IndexIteratorBridge.class);

/*
Expand Down
Loading

0 comments on commit e57c855

Please sign in to comment.