Skip to content

Commit e25913a

Browse files
authored
GH-5159 close child of GroupIterator (#5160)
2 parents 755a631 + 384d1b5 commit e25913a

2 files changed

Lines changed: 96 additions & 44 deletions

File tree

core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/GroupIterator.java

Lines changed: 42 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.eclipse.rdf4j.collection.factory.api.CollectionFactory;
3131
import org.eclipse.rdf4j.collection.factory.impl.DefaultCollectionFactory;
3232
import org.eclipse.rdf4j.common.iteration.AbstractCloseableIteratorIteration;
33+
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
3334
import org.eclipse.rdf4j.common.transaction.QueryEvaluationMode;
3435
import org.eclipse.rdf4j.model.Literal;
3536
import org.eclipse.rdf4j.model.Value;
@@ -89,6 +90,9 @@ public class GroupIterator extends AbstractCloseableIteratorIteration<BindingSet
8990

9091
private final QueryEvaluationStep arguments;
9192

93+
// The iteration of the arguments, stored while building entries for allowing premature closing
94+
private volatile CloseableIteration<BindingSet> argumentsIter;
95+
9296
private final ValueFactory vf;
9397

9498
private final CollectionFactory cf;
@@ -129,7 +133,13 @@ public GroupIterator(EvaluationStrategy strategy, Group group, BindingSet parent
129133

130134
@Override
131135
public void handleClose() throws QueryEvaluationException {
132-
cf.close();
136+
try {
137+
cf.close();
138+
} finally {
139+
var iter = argumentsIter;
140+
if (iter != null)
141+
iter.close();
142+
}
133143
}
134144

135145
@Override
@@ -256,42 +266,46 @@ private BiConsumer<Entry, MutableBindingSet> makeBindSolution(
256266

257267
private Collection<Entry> buildEntries(List<AggregatePredicateCollectorSupplier<?, ?>> aggregates)
258268
throws QueryEvaluationException {
259-
try (var iter = arguments.evaluate(parentBindings)) {
269+
// store the arguments' iterator so it can be closed while building entries
270+
this.argumentsIter = arguments.evaluate(parentBindings);
271+
try (var iter = argumentsIter) {
272+
if (!iter.hasNext()) {
273+
return emptySolutionSpecialCase(aggregates);
274+
}
275+
260276
List<Function<BindingSet, Value>> getValues = group.getGroupBindingNames()
261277
.stream()
262278
.map(n -> context.getValue(n))
263279
.collect(Collectors.toList());
264280

265-
if (!iter.hasNext()) {
266-
return emptySolutionSpecialCase(aggregates);
267-
} else {
268-
// TODO: this is an in memory map with no backing into any disk form.
269-
// Fixing this requires separating the computation of the aggregates and their
270-
// distinct sets if needed from the intermediary values.
271-
272-
Map<BindingSetKey, Entry> entries = cf.createGroupByMap();
273-
// Make an optimized hash function valid during this query evaluation step.
274-
ToIntFunction<BindingSet> hashMaker = cf.hashOfBindingSetFuntion(getValues);
275-
while (iter.hasNext()) {
276-
BindingSet sol = iter.next();
277-
// The binding set key will be constant
278-
BindingSetKey key = cf.createBindingSetKey(sol, getValues, hashMaker);
279-
Entry entry = entries.get(key);
280-
if (entry == null) {
281-
List<AggregateCollector> collectors = makeCollectors(aggregates);
282-
List<Predicate<?>> predicates = new ArrayList<>(aggregates.size());
283-
for (AggregatePredicateCollectorSupplier<?, ?> a : aggregates) {
284-
predicates.add(a.makePotentialDistinctTest.get());
285-
}
286-
287-
entry = new Entry(sol, collectors, predicates);
288-
entries.put(key, entry);
281+
// TODO: this is an in memory map with no backing into any disk form.
282+
// Fixing this requires separating the computation of the aggregates and their
283+
// distinct sets if needed from the intermediary values.
284+
285+
Map<BindingSetKey, Entry> entries = cf.createGroupByMap();
286+
// Make an optimized hash function valid during this query evaluation step.
287+
ToIntFunction<BindingSet> hashMaker = cf.hashOfBindingSetFuntion(getValues);
288+
while (!isClosed() && iter.hasNext()) {
289+
BindingSet sol = iter.next();
290+
// The binding set key will be constant
291+
BindingSetKey key = cf.createBindingSetKey(sol, getValues, hashMaker);
292+
Entry entry = entries.get(key);
293+
if (entry == null) {
294+
List<AggregateCollector> collectors = makeCollectors(aggregates);
295+
List<Predicate<?>> predicates = new ArrayList<>(aggregates.size());
296+
for (AggregatePredicateCollectorSupplier<?, ?> a : aggregates) {
297+
predicates.add(a.makePotentialDistinctTest.get());
289298
}
290299

291-
entry.addSolution(sol, aggregates);
300+
entry = new Entry(sol, collectors, predicates);
301+
entries.put(key, entry);
292302
}
293-
return entries.values();
303+
304+
entry.addSolution(sol, aggregates);
294305
}
306+
return entries.values();
307+
} finally {
308+
this.argumentsIter = null;
295309
}
296310
}
297311

core/queryalgebra/evaluation/src/test/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/GroupIteratorTest.java

Lines changed: 54 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,19 @@
1313
import static org.assertj.core.api.Assertions.assertThat;
1414
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
1515

16+
import java.time.Duration;
1617
import java.time.Instant;
1718
import java.util.ArrayList;
1819
import java.util.Collections;
1920
import java.util.Date;
21+
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.TimeUnit;
23+
import java.util.concurrent.locks.Lock;
24+
import java.util.concurrent.locks.ReentrantLock;
2025
import java.util.function.Function;
2126
import java.util.function.Predicate;
2227

28+
import org.eclipse.rdf4j.common.iteration.LookAheadIteration;
2329
import org.eclipse.rdf4j.model.Literal;
2430
import org.eclipse.rdf4j.model.Value;
2531
import org.eclipse.rdf4j.model.ValueFactory;
@@ -29,22 +35,8 @@
2935
import org.eclipse.rdf4j.model.vocabulary.XSD;
3036
import org.eclipse.rdf4j.query.BindingSet;
3137
import org.eclipse.rdf4j.query.QueryEvaluationException;
32-
import org.eclipse.rdf4j.query.algebra.AggregateFunctionCall;
33-
import org.eclipse.rdf4j.query.algebra.Avg;
34-
import org.eclipse.rdf4j.query.algebra.BindingSetAssignment;
35-
import org.eclipse.rdf4j.query.algebra.Count;
36-
import org.eclipse.rdf4j.query.algebra.Group;
37-
import org.eclipse.rdf4j.query.algebra.GroupConcat;
38-
import org.eclipse.rdf4j.query.algebra.GroupElem;
39-
import org.eclipse.rdf4j.query.algebra.MathExpr;
40-
import org.eclipse.rdf4j.query.algebra.Max;
41-
import org.eclipse.rdf4j.query.algebra.Min;
42-
import org.eclipse.rdf4j.query.algebra.Sample;
43-
import org.eclipse.rdf4j.query.algebra.Sum;
44-
import org.eclipse.rdf4j.query.algebra.Var;
45-
import org.eclipse.rdf4j.query.algebra.evaluation.EvaluationStrategy;
46-
import org.eclipse.rdf4j.query.algebra.evaluation.QueryBindingSet;
47-
import org.eclipse.rdf4j.query.algebra.evaluation.ValueExprEvaluationException;
38+
import org.eclipse.rdf4j.query.algebra.*;
39+
import org.eclipse.rdf4j.query.algebra.evaluation.*;
4840
import org.eclipse.rdf4j.query.algebra.evaluation.impl.QueryEvaluationContext;
4941
import org.eclipse.rdf4j.query.algebra.evaluation.impl.StrictEvaluationStrategy;
5042
import org.eclipse.rdf4j.query.algebra.evaluation.util.MathUtil;
@@ -260,6 +252,52 @@ public void testCustomAggregateFunction_WrongIri() throws QueryEvaluationExcepti
260252
}
261253
}
262254

255+
@Test
256+
public void testGroupIteratorClose() throws QueryEvaluationException, InterruptedException {
257+
// Lock which is already locked to block the thread driving the iteration
258+
Lock lock = new ReentrantLock();
259+
lock.lock();
260+
// Latch to rendezvous on with the iterating thread
261+
CountDownLatch iterating = new CountDownLatch(1);
262+
// Latch to record whether the iteration under GroupIterator was closed
263+
CountDownLatch closed = new CountDownLatch(1);
264+
265+
EvaluationStrategy evaluator = new StrictEvaluationStrategy(null, null) {
266+
@Override
267+
protected QueryEvaluationStep prepare(EmptySet emptySet, QueryEvaluationContext context)
268+
throws QueryEvaluationException {
269+
return bindings -> new LookAheadIteration<>() {
270+
@Override
271+
protected BindingSet getNextElement() {
272+
iterating.countDown(); // signal to test thread iteration started
273+
lock.lock(); // block iterating thread
274+
return null;
275+
}
276+
277+
@Override
278+
protected void handleClose() {
279+
closed.countDown();
280+
}
281+
};
282+
}
283+
};
284+
285+
Group group = new Group(new EmptySet());
286+
GroupIterator groupIterator = new GroupIterator(evaluator, group, EmptyBindingSet.getInstance(), context);
287+
288+
Thread iteratorThread = new Thread(groupIterator::next, "GroupIteratorTest#testGroupIteratorClose");
289+
try {
290+
iteratorThread.start();
291+
assertThat(iterating.await(5, TimeUnit.SECONDS)).isTrue();
292+
groupIterator.close();
293+
assertThat(closed.await(5, TimeUnit.SECONDS)).isTrue();
294+
} finally {
295+
lock.unlock();
296+
iteratorThread.join(Duration.ofSeconds(5).toMillis());
297+
assertThat(iteratorThread.isAlive()).isFalse();
298+
}
299+
}
300+
263301
/**
264302
* Dummy collector to verify custom aggregate functions
265303
*/

0 commit comments

Comments
 (0)