Skip to content

Commit

Permalink
GH-5159 close child of GroupIterator (#5160)
Browse files Browse the repository at this point in the history
  • Loading branch information
hmottestad authored Nov 3, 2024
2 parents 755a631 + 384d1b5 commit e25913a
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.eclipse.rdf4j.collection.factory.api.CollectionFactory;
import org.eclipse.rdf4j.collection.factory.impl.DefaultCollectionFactory;
import org.eclipse.rdf4j.common.iteration.AbstractCloseableIteratorIteration;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.transaction.QueryEvaluationMode;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.Value;
Expand Down Expand Up @@ -89,6 +90,9 @@ public class GroupIterator extends AbstractCloseableIteratorIteration<BindingSet

private final QueryEvaluationStep arguments;

// The iteration of the arguments, stored while building entries for allowing premature closing
private volatile CloseableIteration<BindingSet> argumentsIter;

private final ValueFactory vf;

private final CollectionFactory cf;
Expand Down Expand Up @@ -129,7 +133,13 @@ public GroupIterator(EvaluationStrategy strategy, Group group, BindingSet parent

@Override
public void handleClose() throws QueryEvaluationException {
cf.close();
try {
cf.close();
} finally {
var iter = argumentsIter;
if (iter != null)
iter.close();
}
}

@Override
Expand Down Expand Up @@ -256,42 +266,46 @@ private BiConsumer<Entry, MutableBindingSet> makeBindSolution(

private Collection<Entry> buildEntries(List<AggregatePredicateCollectorSupplier<?, ?>> aggregates)
throws QueryEvaluationException {
try (var iter = arguments.evaluate(parentBindings)) {
// store the arguments' iterator so it can be closed while building entries
this.argumentsIter = arguments.evaluate(parentBindings);
try (var iter = argumentsIter) {
if (!iter.hasNext()) {
return emptySolutionSpecialCase(aggregates);
}

List<Function<BindingSet, Value>> getValues = group.getGroupBindingNames()
.stream()
.map(n -> context.getValue(n))
.collect(Collectors.toList());

if (!iter.hasNext()) {
return emptySolutionSpecialCase(aggregates);
} else {
// TODO: this is an in memory map with no backing into any disk form.
// Fixing this requires separating the computation of the aggregates and their
// distinct sets if needed from the intermediary values.

Map<BindingSetKey, Entry> entries = cf.createGroupByMap();
// Make an optimized hash function valid during this query evaluation step.
ToIntFunction<BindingSet> hashMaker = cf.hashOfBindingSetFuntion(getValues);
while (iter.hasNext()) {
BindingSet sol = iter.next();
// The binding set key will be constant
BindingSetKey key = cf.createBindingSetKey(sol, getValues, hashMaker);
Entry entry = entries.get(key);
if (entry == null) {
List<AggregateCollector> collectors = makeCollectors(aggregates);
List<Predicate<?>> predicates = new ArrayList<>(aggregates.size());
for (AggregatePredicateCollectorSupplier<?, ?> a : aggregates) {
predicates.add(a.makePotentialDistinctTest.get());
}

entry = new Entry(sol, collectors, predicates);
entries.put(key, entry);
// TODO: this is an in memory map with no backing into any disk form.
// Fixing this requires separating the computation of the aggregates and their
// distinct sets if needed from the intermediary values.

Map<BindingSetKey, Entry> entries = cf.createGroupByMap();
// Make an optimized hash function valid during this query evaluation step.
ToIntFunction<BindingSet> hashMaker = cf.hashOfBindingSetFuntion(getValues);
while (!isClosed() && iter.hasNext()) {
BindingSet sol = iter.next();
// The binding set key will be constant
BindingSetKey key = cf.createBindingSetKey(sol, getValues, hashMaker);
Entry entry = entries.get(key);
if (entry == null) {
List<AggregateCollector> collectors = makeCollectors(aggregates);
List<Predicate<?>> predicates = new ArrayList<>(aggregates.size());
for (AggregatePredicateCollectorSupplier<?, ?> a : aggregates) {
predicates.add(a.makePotentialDistinctTest.get());
}

entry.addSolution(sol, aggregates);
entry = new Entry(sol, collectors, predicates);
entries.put(key, entry);
}
return entries.values();

entry.addSolution(sol, aggregates);
}
return entries.values();
} finally {
this.argumentsIter = null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,19 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Predicate;

import org.eclipse.rdf4j.common.iteration.LookAheadIteration;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.ValueFactory;
Expand All @@ -29,22 +35,8 @@
import org.eclipse.rdf4j.model.vocabulary.XSD;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.AggregateFunctionCall;
import org.eclipse.rdf4j.query.algebra.Avg;
import org.eclipse.rdf4j.query.algebra.BindingSetAssignment;
import org.eclipse.rdf4j.query.algebra.Count;
import org.eclipse.rdf4j.query.algebra.Group;
import org.eclipse.rdf4j.query.algebra.GroupConcat;
import org.eclipse.rdf4j.query.algebra.GroupElem;
import org.eclipse.rdf4j.query.algebra.MathExpr;
import org.eclipse.rdf4j.query.algebra.Max;
import org.eclipse.rdf4j.query.algebra.Min;
import org.eclipse.rdf4j.query.algebra.Sample;
import org.eclipse.rdf4j.query.algebra.Sum;
import org.eclipse.rdf4j.query.algebra.Var;
import org.eclipse.rdf4j.query.algebra.evaluation.EvaluationStrategy;
import org.eclipse.rdf4j.query.algebra.evaluation.QueryBindingSet;
import org.eclipse.rdf4j.query.algebra.evaluation.ValueExprEvaluationException;
import org.eclipse.rdf4j.query.algebra.*;
import org.eclipse.rdf4j.query.algebra.evaluation.*;
import org.eclipse.rdf4j.query.algebra.evaluation.impl.QueryEvaluationContext;
import org.eclipse.rdf4j.query.algebra.evaluation.impl.StrictEvaluationStrategy;
import org.eclipse.rdf4j.query.algebra.evaluation.util.MathUtil;
Expand Down Expand Up @@ -260,6 +252,52 @@ public void testCustomAggregateFunction_WrongIri() throws QueryEvaluationExcepti
}
}

@Test
public void testGroupIteratorClose() throws QueryEvaluationException, InterruptedException {
// Lock which is already locked to block the thread driving the iteration
Lock lock = new ReentrantLock();
lock.lock();
// Latch to rendezvous on with the iterating thread
CountDownLatch iterating = new CountDownLatch(1);
// Latch to record whether the iteration under GroupIterator was closed
CountDownLatch closed = new CountDownLatch(1);

EvaluationStrategy evaluator = new StrictEvaluationStrategy(null, null) {
@Override
protected QueryEvaluationStep prepare(EmptySet emptySet, QueryEvaluationContext context)
throws QueryEvaluationException {
return bindings -> new LookAheadIteration<>() {
@Override
protected BindingSet getNextElement() {
iterating.countDown(); // signal to test thread iteration started
lock.lock(); // block iterating thread
return null;
}

@Override
protected void handleClose() {
closed.countDown();
}
};
}
};

Group group = new Group(new EmptySet());
GroupIterator groupIterator = new GroupIterator(evaluator, group, EmptyBindingSet.getInstance(), context);

Thread iteratorThread = new Thread(groupIterator::next, "GroupIteratorTest#testGroupIteratorClose");
try {
iteratorThread.start();
assertThat(iterating.await(5, TimeUnit.SECONDS)).isTrue();
groupIterator.close();
assertThat(closed.await(5, TimeUnit.SECONDS)).isTrue();
} finally {
lock.unlock();
iteratorThread.join(Duration.ofSeconds(5).toMillis());
assertThat(iteratorThread.isAlive()).isFalse();
}
}

/**
* Dummy collector to verify custom aggregate functions
*/
Expand Down

0 comments on commit e25913a

Please sign in to comment.