Skip to content

Commit

Permalink
GH-5159 close child of GroupIterator
Browse files Browse the repository at this point in the history
  • Loading branch information
frensjan committed Oct 28, 2024
1 parent 755a631 commit 9f6e7e9
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.eclipse.rdf4j.collection.factory.api.CollectionFactory;
import org.eclipse.rdf4j.collection.factory.impl.DefaultCollectionFactory;
import org.eclipse.rdf4j.common.iteration.AbstractCloseableIteratorIteration;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.transaction.QueryEvaluationMode;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.Value;
Expand Down Expand Up @@ -89,6 +90,9 @@ public class GroupIterator extends AbstractCloseableIteratorIteration<BindingSet

private final QueryEvaluationStep arguments;

// The iteration of the arguments, stored while building entries for allowing premature closing
private volatile CloseableIteration<BindingSet> argumentsIter;

private final ValueFactory vf;

private final CollectionFactory cf;
Expand Down Expand Up @@ -129,7 +133,11 @@ public GroupIterator(EvaluationStrategy strategy, Group group, BindingSet parent

@Override
public void handleClose() throws QueryEvaluationException {
cf.close();
try {
cf.close();
} finally {
argumentsIter.close();
}
}

@Override
Expand Down Expand Up @@ -256,7 +264,9 @@ private BiConsumer<Entry, MutableBindingSet> makeBindSolution(

private Collection<Entry> buildEntries(List<AggregatePredicateCollectorSupplier<?, ?>> aggregates)
throws QueryEvaluationException {
try (var iter = arguments.evaluate(parentBindings)) {
// store the arguments' iterator so it can be closed while building entries
this.argumentsIter = arguments.evaluate(parentBindings);
try (var iter = argumentsIter) {
List<Function<BindingSet, Value>> getValues = group.getGroupBindingNames()
.stream()
.map(n -> context.getValue(n))
Expand Down Expand Up @@ -292,6 +302,8 @@ private Collection<Entry> buildEntries(List<AggregatePredicateCollectorSupplier<
}
return entries.values();
}
} finally {
this.argumentsIter = null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Predicate;

import org.eclipse.rdf4j.common.iteration.LookAheadIteration;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.ValueFactory;
Expand All @@ -29,22 +34,8 @@
import org.eclipse.rdf4j.model.vocabulary.XSD;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.AggregateFunctionCall;
import org.eclipse.rdf4j.query.algebra.Avg;
import org.eclipse.rdf4j.query.algebra.BindingSetAssignment;
import org.eclipse.rdf4j.query.algebra.Count;
import org.eclipse.rdf4j.query.algebra.Group;
import org.eclipse.rdf4j.query.algebra.GroupConcat;
import org.eclipse.rdf4j.query.algebra.GroupElem;
import org.eclipse.rdf4j.query.algebra.MathExpr;
import org.eclipse.rdf4j.query.algebra.Max;
import org.eclipse.rdf4j.query.algebra.Min;
import org.eclipse.rdf4j.query.algebra.Sample;
import org.eclipse.rdf4j.query.algebra.Sum;
import org.eclipse.rdf4j.query.algebra.Var;
import org.eclipse.rdf4j.query.algebra.evaluation.EvaluationStrategy;
import org.eclipse.rdf4j.query.algebra.evaluation.QueryBindingSet;
import org.eclipse.rdf4j.query.algebra.evaluation.ValueExprEvaluationException;
import org.eclipse.rdf4j.query.algebra.*;
import org.eclipse.rdf4j.query.algebra.evaluation.*;
import org.eclipse.rdf4j.query.algebra.evaluation.impl.QueryEvaluationContext;
import org.eclipse.rdf4j.query.algebra.evaluation.impl.StrictEvaluationStrategy;
import org.eclipse.rdf4j.query.algebra.evaluation.util.MathUtil;
Expand Down Expand Up @@ -260,6 +251,46 @@ public void testCustomAggregateFunction_WrongIri() throws QueryEvaluationExcepti
}
}

@Test
public void testGroupIteratorClose() throws QueryEvaluationException, InterruptedException {
// Lock which is already locked to block the thread driving the iteration
Lock lock = new ReentrantLock();
lock.lock();
// Latch to rendezvous on with the iterating thread
CountDownLatch iterating = new CountDownLatch(1);
// Latch to record whether the iteration under GroupIterator was closed
CountDownLatch closed = new CountDownLatch(1);

EvaluationStrategy evaluator = new StrictEvaluationStrategy(null, null) {
@Override
protected QueryEvaluationStep prepare(EmptySet emptySet, QueryEvaluationContext context)
throws QueryEvaluationException {
return bindings -> new LookAheadIteration<>() {
@Override
protected BindingSet getNextElement() {
iterating.countDown(); // signal to test thread iteration started
lock.lock(); // block iterating thread
return null;
}

@Override
protected void handleClose() {
closed.countDown();
}
};
}
};

Group group = new Group(new EmptySet());
GroupIterator groupIterator = new GroupIterator(evaluator, group, EmptyBindingSet.getInstance(), context);

Thread iteratorThread = new Thread(groupIterator::next);
iteratorThread.start();
assertThat(iterating.await(5, TimeUnit.SECONDS)).isTrue();
groupIterator.close();
assertThat(closed.await(5, TimeUnit.SECONDS)).isTrue();
}

/**
* Dummy collector to verify custom aggregate functions
*/
Expand Down

0 comments on commit 9f6e7e9

Please sign in to comment.