From 384d1b520c09fb600146160cd8459a42f7bc756b Mon Sep 17 00:00:00 2001 From: Frens Jan Rumph Date: Mon, 28 Oct 2024 08:02:37 +0100 Subject: [PATCH] GH-5159 close child of GroupIterator --- .../evaluation/iterator/GroupIterator.java | 70 +++++++++++-------- .../iterator/GroupIteratorTest.java | 70 ++++++++++++++----- 2 files changed, 96 insertions(+), 44 deletions(-) diff --git a/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/GroupIterator.java b/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/GroupIterator.java index a860d9430f9..7f7b290771c 100644 --- a/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/GroupIterator.java +++ b/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/GroupIterator.java @@ -30,6 +30,7 @@ import org.eclipse.rdf4j.collection.factory.api.CollectionFactory; import org.eclipse.rdf4j.collection.factory.impl.DefaultCollectionFactory; import org.eclipse.rdf4j.common.iteration.AbstractCloseableIteratorIteration; +import org.eclipse.rdf4j.common.iteration.CloseableIteration; import org.eclipse.rdf4j.common.transaction.QueryEvaluationMode; import org.eclipse.rdf4j.model.Literal; import org.eclipse.rdf4j.model.Value; @@ -89,6 +90,9 @@ public class GroupIterator extends AbstractCloseableIteratorIteration argumentsIter; + private final ValueFactory vf; private final CollectionFactory cf; @@ -129,7 +133,13 @@ public GroupIterator(EvaluationStrategy strategy, Group group, BindingSet parent @Override public void handleClose() throws QueryEvaluationException { - cf.close(); + try { + cf.close(); + } finally { + var iter = argumentsIter; + if (iter != null) + iter.close(); + } } @Override @@ -256,42 +266,46 @@ private BiConsumer makeBindSolution( private Collection buildEntries(List> aggregates) throws QueryEvaluationException { - try (var iter = arguments.evaluate(parentBindings)) { + // store the arguments' iterator so it can be closed while building entries + this.argumentsIter = arguments.evaluate(parentBindings); + try (var iter = argumentsIter) { + if (!iter.hasNext()) { + return emptySolutionSpecialCase(aggregates); + } + List> getValues = group.getGroupBindingNames() .stream() .map(n -> context.getValue(n)) .collect(Collectors.toList()); - if (!iter.hasNext()) { - return emptySolutionSpecialCase(aggregates); - } else { - // TODO: this is an in memory map with no backing into any disk form. - // Fixing this requires separating the computation of the aggregates and their - // distinct sets if needed from the intermediary values. - - Map entries = cf.createGroupByMap(); - // Make an optimized hash function valid during this query evaluation step. - ToIntFunction hashMaker = cf.hashOfBindingSetFuntion(getValues); - while (iter.hasNext()) { - BindingSet sol = iter.next(); - // The binding set key will be constant - BindingSetKey key = cf.createBindingSetKey(sol, getValues, hashMaker); - Entry entry = entries.get(key); - if (entry == null) { - List collectors = makeCollectors(aggregates); - List> predicates = new ArrayList<>(aggregates.size()); - for (AggregatePredicateCollectorSupplier a : aggregates) { - predicates.add(a.makePotentialDistinctTest.get()); - } - - entry = new Entry(sol, collectors, predicates); - entries.put(key, entry); + // TODO: this is an in memory map with no backing into any disk form. + // Fixing this requires separating the computation of the aggregates and their + // distinct sets if needed from the intermediary values. + + Map entries = cf.createGroupByMap(); + // Make an optimized hash function valid during this query evaluation step. + ToIntFunction hashMaker = cf.hashOfBindingSetFuntion(getValues); + while (!isClosed() && iter.hasNext()) { + BindingSet sol = iter.next(); + // The binding set key will be constant + BindingSetKey key = cf.createBindingSetKey(sol, getValues, hashMaker); + Entry entry = entries.get(key); + if (entry == null) { + List collectors = makeCollectors(aggregates); + List> predicates = new ArrayList<>(aggregates.size()); + for (AggregatePredicateCollectorSupplier a : aggregates) { + predicates.add(a.makePotentialDistinctTest.get()); } - entry.addSolution(sol, aggregates); + entry = new Entry(sol, collectors, predicates); + entries.put(key, entry); } - return entries.values(); + + entry.addSolution(sol, aggregates); } + return entries.values(); + } finally { + this.argumentsIter = null; } } diff --git a/core/queryalgebra/evaluation/src/test/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/GroupIteratorTest.java b/core/queryalgebra/evaluation/src/test/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/GroupIteratorTest.java index 23fa0211849..bdcb5d6abbe 100644 --- a/core/queryalgebra/evaluation/src/test/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/GroupIteratorTest.java +++ b/core/queryalgebra/evaluation/src/test/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/GroupIteratorTest.java @@ -13,13 +13,19 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.Date; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import java.util.function.Predicate; +import org.eclipse.rdf4j.common.iteration.LookAheadIteration; import org.eclipse.rdf4j.model.Literal; import org.eclipse.rdf4j.model.Value; import org.eclipse.rdf4j.model.ValueFactory; @@ -29,22 +35,8 @@ import org.eclipse.rdf4j.model.vocabulary.XSD; import org.eclipse.rdf4j.query.BindingSet; import org.eclipse.rdf4j.query.QueryEvaluationException; -import org.eclipse.rdf4j.query.algebra.AggregateFunctionCall; -import org.eclipse.rdf4j.query.algebra.Avg; -import org.eclipse.rdf4j.query.algebra.BindingSetAssignment; -import org.eclipse.rdf4j.query.algebra.Count; -import org.eclipse.rdf4j.query.algebra.Group; -import org.eclipse.rdf4j.query.algebra.GroupConcat; -import org.eclipse.rdf4j.query.algebra.GroupElem; -import org.eclipse.rdf4j.query.algebra.MathExpr; -import org.eclipse.rdf4j.query.algebra.Max; -import org.eclipse.rdf4j.query.algebra.Min; -import org.eclipse.rdf4j.query.algebra.Sample; -import org.eclipse.rdf4j.query.algebra.Sum; -import org.eclipse.rdf4j.query.algebra.Var; -import org.eclipse.rdf4j.query.algebra.evaluation.EvaluationStrategy; -import org.eclipse.rdf4j.query.algebra.evaluation.QueryBindingSet; -import org.eclipse.rdf4j.query.algebra.evaluation.ValueExprEvaluationException; +import org.eclipse.rdf4j.query.algebra.*; +import org.eclipse.rdf4j.query.algebra.evaluation.*; import org.eclipse.rdf4j.query.algebra.evaluation.impl.QueryEvaluationContext; import org.eclipse.rdf4j.query.algebra.evaluation.impl.StrictEvaluationStrategy; import org.eclipse.rdf4j.query.algebra.evaluation.util.MathUtil; @@ -260,6 +252,52 @@ public void testCustomAggregateFunction_WrongIri() throws QueryEvaluationExcepti } } + @Test + public void testGroupIteratorClose() throws QueryEvaluationException, InterruptedException { + // Lock which is already locked to block the thread driving the iteration + Lock lock = new ReentrantLock(); + lock.lock(); + // Latch to rendezvous on with the iterating thread + CountDownLatch iterating = new CountDownLatch(1); + // Latch to record whether the iteration under GroupIterator was closed + CountDownLatch closed = new CountDownLatch(1); + + EvaluationStrategy evaluator = new StrictEvaluationStrategy(null, null) { + @Override + protected QueryEvaluationStep prepare(EmptySet emptySet, QueryEvaluationContext context) + throws QueryEvaluationException { + return bindings -> new LookAheadIteration<>() { + @Override + protected BindingSet getNextElement() { + iterating.countDown(); // signal to test thread iteration started + lock.lock(); // block iterating thread + return null; + } + + @Override + protected void handleClose() { + closed.countDown(); + } + }; + } + }; + + Group group = new Group(new EmptySet()); + GroupIterator groupIterator = new GroupIterator(evaluator, group, EmptyBindingSet.getInstance(), context); + + Thread iteratorThread = new Thread(groupIterator::next, "GroupIteratorTest#testGroupIteratorClose"); + try { + iteratorThread.start(); + assertThat(iterating.await(5, TimeUnit.SECONDS)).isTrue(); + groupIterator.close(); + assertThat(closed.await(5, TimeUnit.SECONDS)).isTrue(); + } finally { + lock.unlock(); + iteratorThread.join(Duration.ofSeconds(5).toMillis()); + assertThat(iteratorThread.isAlive()).isFalse(); + } + } + /** * Dummy collector to verify custom aggregate functions */