Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-5159 close child of GroupIterator #5160

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.eclipse.rdf4j.collection.factory.api.CollectionFactory;
import org.eclipse.rdf4j.collection.factory.impl.DefaultCollectionFactory;
import org.eclipse.rdf4j.common.iteration.AbstractCloseableIteratorIteration;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.transaction.QueryEvaluationMode;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.Value;
Expand Down Expand Up @@ -89,6 +90,9 @@ public class GroupIterator extends AbstractCloseableIteratorIteration<BindingSet

private final QueryEvaluationStep arguments;

// The arguments' iteration, kept here while entries are being built so that it can be closed prematurely
private volatile CloseableIteration<BindingSet> argumentsIter;

private final ValueFactory vf;

private final CollectionFactory cf;
Expand Down Expand Up @@ -129,7 +133,13 @@ public GroupIterator(EvaluationStrategy strategy, Group group, BindingSet parent

/**
 * Releases the resources held by this iterator: the collection factory and, if {@code buildEntries} is still
 * consuming it, the arguments iteration stored in {@code argumentsIter}.
 * <p>
 * The factory is closed exactly once, inside the {@code try}, so that a failure while closing it cannot
 * prevent the arguments iteration from being closed as well.
 *
 * @throws QueryEvaluationException if releasing one of the underlying resources fails
 */
@Override
public void handleClose() throws QueryEvaluationException {
    try {
        cf.close();
    } finally {
        // Read the volatile field once so the null check and close() act on the same reference,
        // even if buildEntries clears the field concurrently.
        var iter = argumentsIter;
        if (iter != null) {
            iter.close();
        }
    }
}

@Override
Expand Down Expand Up @@ -256,42 +266,46 @@ private BiConsumer<Entry, MutableBindingSet> makeBindSolution(

/**
 * Evaluates {@code arguments} against {@code parentBindings} and groups the resulting solutions: one
 * {@code Entry} is created per distinct key derived from the group binding names, and every solution is added
 * to the entry of its group.
 * <p>
 * While the solutions are consumed, the arguments iteration is kept in {@code argumentsIter}; the field is
 * reset to {@code null} in the {@code finally} block once the work is done.
 * <p>
 * NOTE(review): this span comes from a diff view without +/- markers, so pre-change and post-change lines
 * appear interleaved below — confirm against the merged file before relying on the exact statement order.
 *
 * @param aggregates the aggregate suppliers used to create the collectors and distinct-predicates of each
 *                   new entry
 * @return the entries built for all groups, or the result of {@code emptySolutionSpecialCase} when the
 *         arguments iteration yields no solution
 * @throws QueryEvaluationException declared for failures while evaluating or consuming the arguments
 */
private Collection<Entry> buildEntries(List<AggregatePredicateCollectorSupplier<?, ?>> aggregates)
throws QueryEvaluationException {
// NOTE(review): looks like the pre-change line that the next two statements replace — confirm
try (var iter = arguments.evaluate(parentBindings)) {
// store the arguments' iterator so it can be closed while building entries
this.argumentsIter = arguments.evaluate(parentBindings);
try (var iter = argumentsIter) {
// nothing to group: delegate to the empty-input special case
if (!iter.hasNext()) {
return emptySolutionSpecialCase(aggregates);
}

// one value-extractor per group binding name, resolved through the evaluation context
List<Function<BindingSet, Value>> getValues = group.getGroupBindingNames()
.stream()
.map(n -> context.getValue(n))
.collect(Collectors.toList());

// NOTE(review): this if/else and the loop inside it look like the pre-change version of the code
// that follows further down — confirm
if (!iter.hasNext()) {
return emptySolutionSpecialCase(aggregates);
} else {
// TODO: this is an in memory map with no backing into any disk form.
// Fixing this requires separating the computation of the aggregates and their
// distinct sets if needed from the intermediary values.

Map<BindingSetKey, Entry> entries = cf.createGroupByMap();
// Make an optimized hash function valid during this query evaluation step.
ToIntFunction<BindingSet> hashMaker = cf.hashOfBindingSetFuntion(getValues);
while (iter.hasNext()) {
BindingSet sol = iter.next();
// The binding set key will be constant
BindingSetKey key = cf.createBindingSetKey(sol, getValues, hashMaker);
Entry entry = entries.get(key);
if (entry == null) {
List<AggregateCollector> collectors = makeCollectors(aggregates);
List<Predicate<?>> predicates = new ArrayList<>(aggregates.size());
for (AggregatePredicateCollectorSupplier<?, ?> a : aggregates) {
predicates.add(a.makePotentialDistinctTest.get());
}

entry = new Entry(sol, collectors, predicates);
entries.put(key, entry);
// TODO: this is an in memory map with no backing into any disk form.
// Fixing this requires separating the computation of the aggregates and their
// distinct sets if needed from the intermediary values.

Map<BindingSetKey, Entry> entries = cf.createGroupByMap();
// Make an optimized hash function valid during this query evaluation step.
ToIntFunction<BindingSet> hashMaker = cf.hashOfBindingSetFuntion(getValues);
// stop consuming solutions as soon as this iterator has been closed
while (!isClosed() && iter.hasNext()) {
BindingSet sol = iter.next();
// The binding set key will be constant
BindingSetKey key = cf.createBindingSetKey(sol, getValues, hashMaker);
Entry entry = entries.get(key);
// first solution of this group: create its entry with fresh collectors and distinct-predicates
if (entry == null) {
List<AggregateCollector> collectors = makeCollectors(aggregates);
List<Predicate<?>> predicates = new ArrayList<>(aggregates.size());
for (AggregatePredicateCollectorSupplier<?, ?> a : aggregates) {
predicates.add(a.makePotentialDistinctTest.get());
}

entry.addSolution(sol, aggregates);
entry = new Entry(sol, collectors, predicates);
entries.put(key, entry);
}
return entries.values();

entry.addSolution(sol, aggregates);
}
return entries.values();
} finally {
// the iteration is no longer in use here, so drop the reference read by handleClose
this.argumentsIter = null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,19 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Predicate;

import org.eclipse.rdf4j.common.iteration.LookAheadIteration;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.ValueFactory;
Expand All @@ -29,22 +35,8 @@
import org.eclipse.rdf4j.model.vocabulary.XSD;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.AggregateFunctionCall;
import org.eclipse.rdf4j.query.algebra.Avg;
import org.eclipse.rdf4j.query.algebra.BindingSetAssignment;
import org.eclipse.rdf4j.query.algebra.Count;
import org.eclipse.rdf4j.query.algebra.Group;
import org.eclipse.rdf4j.query.algebra.GroupConcat;
import org.eclipse.rdf4j.query.algebra.GroupElem;
import org.eclipse.rdf4j.query.algebra.MathExpr;
import org.eclipse.rdf4j.query.algebra.Max;
import org.eclipse.rdf4j.query.algebra.Min;
import org.eclipse.rdf4j.query.algebra.Sample;
import org.eclipse.rdf4j.query.algebra.Sum;
import org.eclipse.rdf4j.query.algebra.Var;
import org.eclipse.rdf4j.query.algebra.evaluation.EvaluationStrategy;
import org.eclipse.rdf4j.query.algebra.evaluation.QueryBindingSet;
import org.eclipse.rdf4j.query.algebra.evaluation.ValueExprEvaluationException;
import org.eclipse.rdf4j.query.algebra.*;
import org.eclipse.rdf4j.query.algebra.evaluation.*;
import org.eclipse.rdf4j.query.algebra.evaluation.impl.QueryEvaluationContext;
import org.eclipse.rdf4j.query.algebra.evaluation.impl.StrictEvaluationStrategy;
import org.eclipse.rdf4j.query.algebra.evaluation.util.MathUtil;
Expand Down Expand Up @@ -260,6 +252,52 @@ public void testCustomAggregateFunction_WrongIri() throws QueryEvaluationExcepti
}
}

/**
 * Verifies that closing a {@link GroupIterator} also closes the child iteration it is consuming, even while
 * another thread is blocked inside that child iteration.
 * <p>
 * The child iteration is a stub whose {@code getNextElement()} signals that iteration has started and then
 * blocks on a lock held by the test thread. The test closes the group iterator while the worker is blocked and
 * expects the stub's {@code handleClose()} to run.
 */
@Test
public void testGroupIteratorClose() throws QueryEvaluationException, InterruptedException {
    // Lock which is already held by the test thread, used to block the thread driving the iteration
    Lock lock = new ReentrantLock();
    lock.lock();
    // Latch to rendezvous on with the iterating thread
    CountDownLatch iterating = new CountDownLatch(1);
    // Latch to record whether the iteration under GroupIterator was closed
    CountDownLatch closed = new CountDownLatch(1);

    EvaluationStrategy evaluator = new StrictEvaluationStrategy(null, null) {
        @Override
        protected QueryEvaluationStep prepare(EmptySet emptySet, QueryEvaluationContext context)
                throws QueryEvaluationException {
            return bindings -> new LookAheadIteration<>() {
                @Override
                protected BindingSet getNextElement() {
                    iterating.countDown(); // signal to test thread iteration started
                    lock.lock(); // block iterating thread until the test thread releases the lock
                    try {
                        return null; // the stub never produces a solution
                    } finally {
                        // release again so the iterating thread does not end while still owning the lock
                        lock.unlock();
                    }
                }

                @Override
                protected void handleClose() {
                    closed.countDown();
                }
            };
        }
    };

    Group group = new Group(new EmptySet());
    GroupIterator groupIterator = new GroupIterator(evaluator, group, EmptyBindingSet.getInstance(), context);

    Thread iteratorThread = new Thread(groupIterator::next, "GroupIteratorTest#testGroupIteratorClose");
    try {
        iteratorThread.start();
        // wait until the worker has entered the child iteration
        assertThat(iterating.await(5, TimeUnit.SECONDS)).isTrue();
        groupIterator.close();
        // closing the group iterator must have reached the child iteration
        assertThat(closed.await(5, TimeUnit.SECONDS)).isTrue();
    } finally {
        // always unblock the worker so it can finish, whether or not the assertions passed
        lock.unlock();
        iteratorThread.join(Duration.ofSeconds(5).toMillis());
        assertThat(iteratorThread.isAlive()).isFalse();
    }
}

/**
* Dummy collector to verify custom aggregate functions
*/
Expand Down
Loading