Skip to content

Commit

Permalink
GH-5159 close child of GroupIterator
Browse files Browse the repository at this point in the history
  • Loading branch information
frensjan committed Nov 2, 2024
1 parent 755a631 commit c58c287
Show file tree
Hide file tree
Showing 2 changed files with 305 additions and 254 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.eclipse.rdf4j.collection.factory.api.CollectionFactory;
import org.eclipse.rdf4j.collection.factory.impl.DefaultCollectionFactory;
import org.eclipse.rdf4j.common.iteration.AbstractCloseableIteratorIteration;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.transaction.QueryEvaluationMode;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.Value;
Expand Down Expand Up @@ -89,6 +90,9 @@ public class GroupIterator extends AbstractCloseableIteratorIteration<BindingSet

private final QueryEvaluationStep arguments;

// The iteration of the arguments, stored while building entries for allowing premature closing
private volatile CloseableIteration<BindingSet> argumentsIter;

private final ValueFactory vf;

private final CollectionFactory cf;
Expand Down Expand Up @@ -129,7 +133,12 @@ public GroupIterator(EvaluationStrategy strategy, Group group, BindingSet parent

@Override
public void handleClose() throws QueryEvaluationException {
cf.close();
try {
cf.close();
} finally {
var iter = argumentsIter;
if (iter != null) iter.close();
}
}

@Override
Expand Down Expand Up @@ -256,42 +265,46 @@ private BiConsumer<Entry, MutableBindingSet> makeBindSolution(

private Collection<Entry> buildEntries(List<AggregatePredicateCollectorSupplier<?, ?>> aggregates)
throws QueryEvaluationException {
try (var iter = arguments.evaluate(parentBindings)) {
// store the arguments' iterator so it can be closed while building entries
this.argumentsIter = arguments.evaluate(parentBindings);
try (var iter = argumentsIter) {
if (!iter.hasNext()) {
return emptySolutionSpecialCase(aggregates);
}

List<Function<BindingSet, Value>> getValues = group.getGroupBindingNames()
.stream()
.map(n -> context.getValue(n))
.collect(Collectors.toList());

if (!iter.hasNext()) {
return emptySolutionSpecialCase(aggregates);
} else {
// TODO: this is an in memory map with no backing into any disk form.
// Fixing this requires separating the computation of the aggregates and their
// distinct sets if needed from the intermediary values.

Map<BindingSetKey, Entry> entries = cf.createGroupByMap();
// Make an optimized hash function valid during this query evaluation step.
ToIntFunction<BindingSet> hashMaker = cf.hashOfBindingSetFuntion(getValues);
while (iter.hasNext()) {
BindingSet sol = iter.next();
// The binding set key will be constant
BindingSetKey key = cf.createBindingSetKey(sol, getValues, hashMaker);
Entry entry = entries.get(key);
if (entry == null) {
List<AggregateCollector> collectors = makeCollectors(aggregates);
List<Predicate<?>> predicates = new ArrayList<>(aggregates.size());
for (AggregatePredicateCollectorSupplier<?, ?> a : aggregates) {
predicates.add(a.makePotentialDistinctTest.get());
}

entry = new Entry(sol, collectors, predicates);
entries.put(key, entry);
// TODO: this is an in memory map with no backing into any disk form.
// Fixing this requires separating the computation of the aggregates and their
// distinct sets if needed from the intermediary values.

Map<BindingSetKey, Entry> entries = cf.createGroupByMap();
// Make an optimized hash function valid during this query evaluation step.
ToIntFunction<BindingSet> hashMaker = cf.hashOfBindingSetFuntion(getValues);
while (!isClosed() && iter.hasNext()) {
BindingSet sol = iter.next();
// The binding set key will be constant
BindingSetKey key = cf.createBindingSetKey(sol, getValues, hashMaker);
Entry entry = entries.get(key);
if (entry == null) {
List<AggregateCollector> collectors = makeCollectors(aggregates);
List<Predicate<?>> predicates = new ArrayList<>(aggregates.size());
for (AggregatePredicateCollectorSupplier<?, ?> a : aggregates) {
predicates.add(a.makePotentialDistinctTest.get());
}

entry.addSolution(sol, aggregates);
entry = new Entry(sol, collectors, predicates);
entries.put(key, entry);
}
return entries.values();

entry.addSolution(sol, aggregates);
}
return entries.values();
} finally {
this.argumentsIter = null;
}
}

Expand Down
Loading

0 comments on commit c58c287

Please sign in to comment.