Skip to content

Commit 576122d

Browse files
GH-4899 Fix a bug in the PathIteration due strings not matching between
different constructions. Introduce optimise-able data-structures in the collection factory for ValuePair used in the PathIteration without exposing the type by wrapping making it extend BindingSet.
1 parent 6be83a2 commit 576122d

3 files changed

Lines changed: 210 additions & 24 deletions

File tree

core/collection-factory/api/src/main/java/org/eclipse/rdf4j/collection/factory/api/CollectionFactory.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public default Set<BindingSet> createSetOfBindingSets() {
8484
* @return a set that may be optimised and/or disk based
8585
*/
8686
public Set<BindingSet> createSetOfBindingSets(Supplier<MutableBindingSet> create,
87-
Function<String, Predicate<BindingSet>> getHas, Function<String, Function<BindingSet, Value>> getget,
87+
Function<String, Predicate<BindingSet>> getHas, Function<String, Function<BindingSet, Value>> getGet,
8888
Function<String, BiConsumer<Value, MutableBindingSet>> getSet);
8989

9090
/**
@@ -116,6 +116,25 @@ public Set<BindingSet> createSetOfBindingSets(Supplier<MutableBindingSet> create
116116
*/
117117
public Queue<Value> createValueQueue();
118118

119+
/**
120+
* @return a new queue that may be optimized and may use the functions passed in.
121+
*/
122+
public default Queue<BindingSet> createBindingSetQueue(Supplier<MutableBindingSet> create,
123+
Function<String, Predicate<BindingSet>> getHas, Function<String, Function<BindingSet, Value>> getget,
124+
Function<String, BiConsumer<Value, MutableBindingSet>> getSet) {
125+
return createQueue();
126+
}
127+
128+
/**
129+
* @return a new queue optimized for bindingsets
130+
*/
131+
public default Queue<BindingSet> createBindingSetQueue() {
132+
Function<String, Predicate<BindingSet>> gethas = (n) -> (b) -> b.hasBinding(n);
133+
Function<String, Function<BindingSet, Value>> getget = (n) -> (b) -> b.getValue(n);
134+
Function<String, BiConsumer<Value, MutableBindingSet>> getSet = (n) -> (v, b) -> b.setBinding(n, v);
135+
return createBindingSetQueue(MapBindingSet::new, gethas, getget, getSet);
136+
}
137+
119138
@InternalUseOnly
120139
public <E> Map<BindingSetKey, E> createGroupByMap();
121140

core/collection-factory/mapdb3/src/main/java/org/eclipse/rdf4j/collection/factory/mapdb/MapDb3CollectionFactory.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,22 @@ public Queue<Value> createValueQueue() {
259259
}
260260
}
261261

262+
@Override
263+
public Queue<BindingSet> createBindingSetQueue(Supplier<MutableBindingSet> create,
264+
Function<String, Predicate<BindingSet>> getHas, Function<String, Function<BindingSet, Value>> getget,
265+
Function<String, BiConsumer<Value, MutableBindingSet>> getSet) {
266+
if (iterationCacheSyncThreshold > 0) {
267+
init();
268+
Serializer<BindingSet> s = createBindingSetSerializer(create, getHas, getget, getSet);
269+
Map<Long, BindingSet> m = db.hashMap(Long.toHexString(colectionId++), new SerializerLong(), s).create();
270+
return new MemoryTillSizeXQueue<>(delegate.createBindingSetQueue(create, getHas, getget, getSet), 128,
271+
() -> new MapDb3BackedQueue<>(m));
272+
273+
} else {
274+
return delegate.createBindingSetQueue();
275+
}
276+
}
277+
262278
@Override
263279
public void close() throws RDF4JException {
264280
if (db != null) {

core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/PathIteration.java

Lines changed: 174 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,19 @@
1111
package org.eclipse.rdf4j.query.algebra.evaluation.iterator;
1212

1313
import java.util.HashSet;
14+
import java.util.Iterator;
15+
import java.util.List;
1416
import java.util.Queue;
1517
import java.util.Set;
18+
import java.util.function.BiConsumer;
19+
import java.util.function.Function;
20+
import java.util.function.Predicate;
1621

1722
import org.eclipse.rdf4j.collection.factory.api.CollectionFactory;
1823
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
1924
import org.eclipse.rdf4j.common.iteration.LookAheadIteration;
2025
import org.eclipse.rdf4j.model.Value;
26+
import org.eclipse.rdf4j.query.Binding;
2127
import org.eclipse.rdf4j.query.BindingSet;
2228
import org.eclipse.rdf4j.query.MutableBindingSet;
2329
import org.eclipse.rdf4j.query.QueryEvaluationException;
@@ -29,9 +35,14 @@
2935
import org.eclipse.rdf4j.query.algebra.evaluation.EvaluationStrategy;
3036
import org.eclipse.rdf4j.query.algebra.evaluation.QueryBindingSet;
3137
import org.eclipse.rdf4j.query.algebra.helpers.AbstractQueryModelVisitor;
38+
import org.eclipse.rdf4j.query.impl.SimpleBinding;
3239

3340
public class PathIteration extends LookAheadIteration<BindingSet> {
3441

42+
// Should never be seen by code outside of this iterator
43+
private static final String END = "$end_from_path_iteration";
44+
private static final String START = "$start_from_path_iteration";
45+
3546
/**
3647
*
3748
*/
@@ -53,11 +64,11 @@ public class PathIteration extends LookAheadIteration<BindingSet> {
5364

5465
private final boolean endVarFixed;
5566

56-
private final Queue<ValuePair> valueQueue;
67+
private final Queue<BindingSet> valueQueue;
5768

58-
private final Set<ValuePair> reportedValues;
69+
private final Set<BindingSet> reportedValues;
5970

60-
private final Set<ValuePair> unreportedValues;
71+
private final Set<BindingSet> unreportedValues;
6172

6273
private final TupleExpr pathExpression;
6374

@@ -70,6 +81,10 @@ public class PathIteration extends LookAheadIteration<BindingSet> {
7081
private final Set<String> namedIntermediateJoins = new HashSet<>();
7182

7283
private final CollectionFactory collectionFactory;
84+
private static volatile int PATH_ITERATOR_ID_GENERATOR = 0;
85+
86+
private final int pathIteratorId = PATH_ITERATOR_ID_GENERATOR++;
87+
private final String endVarName = "END_" + JOINVAR_PREFIX + pathIteratorId;
7388

7489
public PathIteration(EvaluationStrategy strategy, Scope scope, Var startVar,
7590
TupleExpr pathExpression, Var endVar, Var contextVar, long minLength, BindingSet bindings)
@@ -89,13 +104,53 @@ public PathIteration(EvaluationStrategy strategy, Scope scope, Var startVar,
89104
this.bindings = bindings;
90105

91106
collectionFactory = strategy.getCollectionFactory().get();
92-
this.reportedValues = collectionFactory.createSet();
93-
this.unreportedValues = collectionFactory.createSet();
94-
this.valueQueue = collectionFactory.createQueue();
107+
108+
// This is all necessary for optimized collections to be usable. This only becomes important on very large
109+
// stores with large intermediary results.
110+
this.reportedValues = collectionFactory.createSetOfBindingSets(ValuePair::new, PathIteration::getHas,
111+
PathIteration::getGet, PathIteration::getSet);
112+
this.unreportedValues = collectionFactory.createSetOfBindingSets(ValuePair::new, PathIteration::getHas,
113+
PathIteration::getGet, PathIteration::getSet);
114+
this.valueQueue = collectionFactory.createBindingSetQueue(ValuePair::new, PathIteration::getHas,
115+
PathIteration::getGet, PathIteration::getSet);
95116

96117
createIteration();
97118
}
98119

120+
private static final BiConsumer<Value, MutableBindingSet> getSet(String s) {
121+
switch (s) {
122+
case START:
123+
return (v, vp) -> ((ValuePair) vp).startValue = v;
124+
case END:
125+
return (v, vp) -> ((ValuePair) vp).endValue = v;
126+
default:
127+
return (v, vp) -> {
128+
};
129+
}
130+
}
131+
132+
private static final Function<BindingSet, Value> getGet(String s) {
133+
switch (s) {
134+
case START:
135+
return (vp) -> ((ValuePair) vp).startValue;
136+
case END:
137+
return (vp) -> ((ValuePair) vp).endValue;
138+
default:
139+
return (vp) -> null;
140+
}
141+
};
142+
143+
private static final Predicate<BindingSet> getHas(String s) {
144+
switch (s) {
145+
case START:
146+
return (vp) -> ((ValuePair) vp).startValue != null;
147+
case END:
148+
return (vp) -> ((ValuePair) vp).endValue != null;
149+
default:
150+
return (vp) -> false;
151+
}
152+
};
153+
99154
@Override
100155
protected BindingSet getNextElement() throws QueryEvaluationException {
101156
again: while (true) {
@@ -183,11 +238,8 @@ protected BindingSet getNextElement() throws QueryEvaluationException {
183238
}
184239
}
185240

186-
// if we're done, throw away the cached lists of values to avoid
187-
// hogging resources
188-
reportedValues.clear();
189-
unreportedValues.clear();
190-
valueQueue.clear();
241+
// We are done but let the close deal with clearing up resources.
242+
// That method knows how to do it in the cheapest way possible.
191243
return null;
192244
}
193245
}
@@ -202,10 +254,10 @@ private ValuePair valuePairFromStartAndEnd(MutableBindingSet nextElement) {
202254

203255
if (startVarFixed && endVarFixed && currentLength > 2) {
204256
v1 = getVarValue(startVar, startVarFixed, nextElement);
205-
v2 = nextElement.getValue("END_" + JOINVAR_PREFIX + this.hashCode());
257+
v2 = nextElement.getValue(endVarName);
206258
} else if (startVarFixed && endVarFixed && currentLength == 2) {
207259
v1 = getVarValue(startVar, startVarFixed, nextElement);
208-
v2 = nextElement.getValue(JOINVAR_PREFIX + (currentLength - 1) + "_" + this.hashCode());
260+
v2 = nextElement.getValue(varNameAtPathLengthOf(currentLength - 1));
209261
} else {
210262
v1 = getVarValue(startVar, startVarFixed, nextElement);
211263
v2 = getVarValue(endVar, endVarFixed, nextElement);
@@ -229,15 +281,15 @@ protected void handleClose() throws QueryEvaluationException {
229281
* @param valueQueue2
230282
* @param vp
231283
*/
232-
protected boolean addToQueue(Queue<ValuePair> valueQueue2, ValuePair vp) throws QueryEvaluationException {
284+
protected boolean addToQueue(Queue<BindingSet> valueQueue2, ValuePair vp) throws QueryEvaluationException {
233285
return valueQueue2.add(vp);
234286
}
235287

236288
/**
237289
* @param valueSet
238290
* @param vp
239291
*/
240-
protected boolean add(Set<ValuePair> valueSet, ValuePair vp) throws QueryEvaluationException {
292+
protected boolean add(Set<BindingSet> valueSet, ValuePair vp) throws QueryEvaluationException {
241293
return valueSet.add(vp);
242294
}
243295

@@ -278,7 +330,7 @@ private void createIteration() throws QueryEvaluationException {
278330
TupleExpr pathExprClone = pathExpression.clone();
279331

280332
if (startVarFixed && endVarFixed) {
281-
String varName = JOINVAR_PREFIX + currentLength + "_" + this.hashCode();
333+
String varName = varNameAtPathLengthOf(currentLength);
282334
Var replacement = createAnonVar(varName, null, true);
283335

284336
VarReplacer replacer = new VarReplacer(endVar, replacement, 0, false);
@@ -288,7 +340,7 @@ private void createIteration() throws QueryEvaluationException {
288340
currentLength++;
289341
} else {
290342

291-
currentVp = valueQueue.poll();
343+
currentVp = (ValuePair) valueQueue.poll();
292344

293345
if (currentVp != null) {
294346

@@ -297,9 +349,9 @@ private void createIteration() throws QueryEvaluationException {
297349
if (startVarFixed && endVarFixed) {
298350

299351
Value v = currentVp.getEndValue();
300-
Var startReplacement = createAnonVar(JOINVAR_PREFIX + currentLength + "_" + this.hashCode(), v,
352+
Var startReplacement = createAnonVar(varNameAtPathLengthOf(currentLength), v,
301353
false);
302-
Var endReplacement = createAnonVar("END_" + JOINVAR_PREFIX + this.hashCode(), null, false);
354+
Var endReplacement = createAnonVar(endVarName, null, false);
303355

304356
VarReplacer replacer = new VarReplacer(startVar, startReplacement, 0, false);
305357
pathExprClone.visit(replacer);
@@ -317,7 +369,7 @@ private void createIteration() throws QueryEvaluationException {
317369
v = currentVp.getStartValue();
318370
}
319371

320-
String varName = JOINVAR_PREFIX + currentLength + "-" + this.hashCode();
372+
String varName = varNameAtPathLengthOf(currentLength);
321373
Var replacement = createAnonVar(varName, v, true);
322374

323375
VarReplacer replacer = new VarReplacer(toBeReplaced, replacement, 0, false);
@@ -333,6 +385,10 @@ private void createIteration() throws QueryEvaluationException {
333385
}
334386
}
335387

388+
private String varNameAtPathLengthOf(long atLength) {
389+
return JOINVAR_PREFIX + atLength + "_" + pathIteratorId;
390+
}
391+
336392
protected boolean isUnbound(Var var, BindingSet bindings) {
337393
if (var == null) {
338394
return false;
@@ -341,11 +397,16 @@ protected boolean isUnbound(Var var, BindingSet bindings) {
341397
}
342398
}
343399

344-
protected static class ValuePair {
400+
protected static class ValuePair implements MutableBindingSet {
401+
private static final long serialVersionUID = 1L;
402+
403+
private Value startValue;
404+
405+
private Value endValue;
345406

346-
private final Value startValue;
407+
public ValuePair() {
347408

348-
private final Value endValue;
409+
}
349410

350411
public ValuePair(Value startValue, Value endValue) {
351412
this.startValue = startValue;
@@ -403,6 +464,96 @@ public boolean equals(Object obj) {
403464
}
404465
return true;
405466
}
467+
468+
@Override
469+
public Iterator<Binding> iterator() {
470+
Binding sb = new SimpleBinding(START, startValue);
471+
Binding eb = new SimpleBinding(END, endValue);
472+
return List.of(sb, eb).iterator();
473+
}
474+
475+
@Override
476+
public Set<String> getBindingNames() {
477+
return Set.of(START, END);
478+
}
479+
480+
@Override
481+
public Binding getBinding(String bindingName) {
482+
switch (bindingName) {
483+
case START:
484+
return new SimpleBinding(START, startValue);
485+
case END:
486+
return new SimpleBinding(END, endValue);
487+
default:
488+
return null;
489+
}
490+
}
491+
492+
@Override
493+
public boolean hasBinding(String bindingName) {
494+
switch (bindingName) {
495+
case START:
496+
return true;
497+
case END:
498+
return false;
499+
default:
500+
return false;
501+
}
502+
}
503+
504+
@Override
505+
public Value getValue(String bindingName) {
506+
switch (bindingName) {
507+
case START:
508+
return startValue;
509+
case END:
510+
return endValue;
511+
default:
512+
return null;
513+
}
514+
}
515+
516+
@Override
517+
public int size() {
518+
return 2;
519+
}
520+
521+
@Override
522+
public void addBinding(Binding binding) {
523+
switch (binding.getName()) {
524+
case START:
525+
startValue = binding.getValue();
526+
break;
527+
case END:
528+
endValue = binding.getValue();
529+
break;
530+
}
531+
}
532+
533+
@Override
534+
public void setBinding(String name, Value value) {
535+
switch (name) {
536+
case START:
537+
startValue = value;
538+
break;
539+
case END:
540+
endValue = value;
541+
break;
542+
}
543+
544+
}
545+
546+
@Override
547+
public void setBinding(Binding binding) {
548+
switch (binding.getName()) {
549+
case START:
550+
startValue = binding.getValue();
551+
break;
552+
case END:
553+
endValue = binding.getValue();
554+
break;
555+
}
556+
}
406557
}
407558

408559
class VarReplacer extends AbstractQueryModelVisitor<QueryEvaluationException> {

0 commit comments

Comments
 (0)