Skip to content

Commit 353965c

Browse files
committed
Working on new ID-based join iterator
1 parent d09591a commit 353965c

8 files changed

Lines changed: 204 additions & 21 deletions

File tree

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/IdAccessor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import java.util.Set;
1414

1515
import org.eclipse.rdf4j.common.annotation.InternalUseOnly;
16+
import org.eclipse.rdf4j.query.MutableBindingSet;
17+
import org.eclipse.rdf4j.query.QueryEvaluationException;
1618

1719
/**
1820
* Read-only accessor for variable ID lookups against a record long[] produced by a RecordIterator.
@@ -28,4 +30,7 @@ public interface IdAccessor {
2830
* should return {@code -1} when the variable is not part of the record.
2931
*/
3032
int getRecordIndex(String varName);
33+
34+
boolean applyRecord(long[] record, MutableBindingSet target, ValueStore valueStore)
35+
throws QueryEvaluationException;
3136
}

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/IdBindingInfo.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ public long getId(long[] record, String varName) {
101101
return record[i];
102102
}
103103

104+
@Override
104105
public boolean applyRecord(long[] record, MutableBindingSet target, ValueStore valueStore)
105106
throws QueryEvaluationException {
106107
// Fast path: pre-resolved getters/setters from the evaluation context

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/join/LmdbIdJoinIterator.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,8 @@ public long getId(long[] record, String varName) {
131131
return record[indices[0]];
132132
}
133133

134-
boolean applyRecord(long[] record, MutableBindingSet target, ValueStore valueStore)
134+
@Override
135+
public boolean applyRecord(long[] record, MutableBindingSet target, ValueStore valueStore)
135136
throws QueryEvaluationException {
136137
for (Map.Entry<String, int[]> entry : indexByVar.entrySet()) {
137138
String name = entry.getKey();
@@ -220,8 +221,8 @@ private Value resolveValue(long id, int position, ValueStore valueStore) throws
220221

221222
private final RecordIterator leftIterator;
222223
private final RecordIteratorFactory rightFactory;
223-
private final PatternInfo leftInfo;
224-
private final PatternInfo rightInfo;
224+
private final IdAccessor leftInfo;
225+
private final IdAccessor rightInfo;
225226
private final Set<String> sharedVariables;
226227
private final QueryEvaluationContext context;
227228
private final BindingSet initialBindings;
@@ -231,8 +232,8 @@ private Value resolveValue(long id, int position, ValueStore valueStore) throws
231232
private long[] currentLeftRecord;
232233
private BindingSet currentLeftBinding;
233234

234-
LmdbIdJoinIterator(RecordIterator leftIterator, RecordIteratorFactory rightFactory, PatternInfo leftInfo,
235-
PatternInfo rightInfo, Set<String> sharedVariables, QueryEvaluationContext context,
235+
LmdbIdJoinIterator(RecordIterator leftIterator, RecordIteratorFactory rightFactory, IdAccessor leftInfo,
236+
IdAccessor rightInfo, Set<String> sharedVariables, QueryEvaluationContext context,
236237
BindingSet initialBindings, ValueStore valueStore) {
237238
this.leftIterator = leftIterator;
238239
this.rightFactory = rightFactory;

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/join/LmdbIdJoinQueryEvaluationStep.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,8 +210,8 @@ public CloseableIteration<BindingSet> evaluate(BindingSet bindings) {
210210
return dataset.getRecordIterator(snapshot, subjIdx, predIdx, objIdx, ctxIdx, patternIds);
211211
};
212212

213-
return new LmdbIdJoinIterator(leftIterator, rightFactory, leftInfo, rightInfo, sharedVariables, context,
214-
bindings, valueStore);
213+
return new LmdbIdJoinIterator(leftIterator, rightFactory, leftInfo, bindingInfo, sharedVariables,
214+
context, bindings, valueStore);
215215
}
216216

217217
// Default: materialize right side via BindingSet and use LmdbIdJoinIterator

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/join/LmdbIdMergeJoinIterator.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ public class LmdbIdMergeJoinIterator implements RecordIterator {
3333
private final String mergeVariable;
3434
private final IdBindingInfo bindingInfo;
3535
private final Set<String> sharedVariables;
36-
private final int mergeIndex;
3736
private final int bindingSize;
3837
private static final int COLUMN_COUNT = TripleStore.CONTEXT_IDX + 1;
3938

@@ -48,6 +47,8 @@ public class LmdbIdMergeJoinIterator implements RecordIterator {
4847
private boolean closed;
4948
private final int[] leftColumnBindingIndex;
5049
private final int[] rightColumnBindingIndex;
50+
private final int leftMergeColumn;
51+
private final int rightMergeColumn;
5152
private static final boolean DEBUG = Boolean.getBoolean("rdf4j.lmdb.mergeJoinDebug");
5253
private final AtomicInteger debugCounter = DEBUG ? new AtomicInteger() : null;
5354

@@ -65,6 +66,8 @@ public LmdbIdMergeJoinIterator(RecordIterator leftIterator, RecordIterator right
6566
+ " must be present in both join operands for LMDB merge join");
6667
}
6768

69+
this.leftMergeColumn = leftInfo.getRecordIndex(mergeVariable);
70+
this.rightMergeColumn = rightInfo.getRecordIndex(mergeVariable);
6871
this.leftIterator = new PeekMarkRecordIterator(leftIterator);
6972
this.rightIterator = new PeekMarkRecordIterator(rightIterator);
7073
this.mergeVariable = mergeVariable;
@@ -75,7 +78,6 @@ public LmdbIdMergeJoinIterator(RecordIterator leftIterator, RecordIterator right
7578
throw new IllegalArgumentException(
7679
"Merge variable " + mergeVariable + " is not tracked in the binding info for LMDB merge join");
7780
}
78-
this.mergeIndex = idx;
7981
this.bindingSize = bindingInfo.size();
8082

8183
Set<String> shared = new HashSet<>(leftInfo.getVariableNames());
@@ -131,7 +133,7 @@ private long[] computeNextElement() throws QueryEvaluationException {
131133
return null;
132134
}
133135

134-
int compare = compare(currentLeftKey, key(rightPeek));
136+
int compare = compare(currentLeftKey, key(rightPeek, rightMergeColumn));
135137
if (compare == 0) {
136138
long[] result = equal();
137139
if (result != null) {
@@ -173,7 +175,7 @@ private boolean advanceLeft() {
173175
+ Arrays.toString(candidate));
174176
}
175177
currentLeftRecord = candidate;
176-
currentLeftKey = key(candidate);
178+
currentLeftKey = key(candidate, leftMergeColumn);
177179
hasCurrentLeftKey = true;
178180
return true;
179181
}
@@ -223,7 +225,7 @@ private void doLeftPeek() {
223225
if (leftPeekRecord == null) {
224226
leftPeekRecord = leftIterator.peek();
225227
if (leftPeekRecord != null) {
226-
leftPeekKey = key(leftPeekRecord);
228+
leftPeekKey = key(leftPeekRecord, leftMergeColumn);
227229
hasLeftPeekKey = true;
228230
} else {
229231
hasLeftPeekKey = false;
@@ -257,7 +259,8 @@ private long[] joinWithCurrentLeft(long[] rightRecord) {
257259
return combined;
258260
}
259261

260-
private static int[] computeColumnBindingMap(LmdbIdJoinIterator.PatternInfo patternInfo, IdBindingInfo bindingInfo) {
262+
private static int[] computeColumnBindingMap(LmdbIdJoinIterator.PatternInfo patternInfo,
263+
IdBindingInfo bindingInfo) {
261264
int[] map = new int[COLUMN_COUNT];
262265
Arrays.fill(map, -1);
263266
for (String var : patternInfo.getVariableNames()) {
@@ -304,8 +307,8 @@ private int compare(long left, long right) {
304307
return Long.compare(left, right);
305308
}
306309

307-
private long key(long[] record) {
308-
long id = valueAt(record, mergeIndex);
310+
private long key(long[] record, int columnIndex) {
311+
long id = record[columnIndex];
309312
if (id == LmdbValue.UNKNOWN_ID) {
310313
throw new QueryEvaluationException(
311314
"Merge variable " + mergeVariable + " is unbound in the current record; cannot perform merge join");
@@ -331,10 +334,4 @@ public void close() {
331334
}
332335
}
333336

334-
private static long valueAt(long[] record, int index) {
335-
if (index < 0 || index >= record.length) {
336-
return LmdbValue.UNKNOWN_ID;
337-
}
338-
return record[index];
339-
}
340337
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2025 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
******************************************************************************/
11+
package org.eclipse.rdf4j.sail.lmdb.join;
12+
13+
import java.util.Arrays;
14+
15+
import org.eclipse.rdf4j.sail.lmdb.RecordIterator;
16+
17+
public final class LmdbIdOrderVerifier {
18+
19+
private LmdbIdOrderVerifier() {
20+
}
21+
22+
public static RecordIterator wrap(String label, int mergeIndex, RecordIterator delegate) {
23+
if (label == null || label.isEmpty()) {
24+
throw new IllegalArgumentException("Label must be provided");
25+
}
26+
if (mergeIndex < 0) {
27+
throw new IllegalArgumentException("Merge index must be non-negative");
28+
}
29+
return new RecordIterator() {
30+
private long position = 0;
31+
private long previous = Long.MIN_VALUE;
32+
private boolean first = true;
33+
34+
@Override
35+
public long[] next() {
36+
long[] record = delegate.next();
37+
if (record == null) {
38+
return null;
39+
}
40+
if (mergeIndex >= record.length) {
41+
throw new AssertionError(String.format(
42+
"%s iterator produced record #%d shorter than merge index %d: %s",
43+
label, position, mergeIndex, Arrays.toString(record)));
44+
}
45+
long current = record[mergeIndex];
46+
if (!first && current < previous) {
47+
throw new AssertionError(String.format(
48+
"%s iterator out of order at position %d: previous=%d current=%d (record=%s)",
49+
label, position, previous, current, Arrays.toString(record)));
50+
}
51+
first = false;
52+
previous = current;
53+
long[] copy = Arrays.copyOf(record, record.length);
54+
position++;
55+
return copy;
56+
}
57+
58+
@Override
59+
public void close() {
60+
delegate.close();
61+
}
62+
};
63+
}
64+
}

core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/LmdbIdJoinEvaluationTest.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.eclipse.rdf4j.model.IRI;
2727
import org.eclipse.rdf4j.model.ValueFactory;
2828
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
29+
import org.eclipse.rdf4j.model.vocabulary.RDF;
30+
import org.eclipse.rdf4j.model.vocabulary.RDFS;
2931
import org.eclipse.rdf4j.query.BindingSet;
3032
import org.eclipse.rdf4j.query.QueryEvaluationException;
3133
import org.eclipse.rdf4j.query.QueryLanguage;
@@ -42,6 +44,7 @@
4244
import org.eclipse.rdf4j.query.algebra.evaluation.impl.QueryEvaluationContext;
4345
import org.eclipse.rdf4j.query.explanation.Explanation;
4446
import org.eclipse.rdf4j.query.impl.EmptyBindingSet;
47+
import org.eclipse.rdf4j.query.impl.MapBindingSet;
4548
import org.eclipse.rdf4j.query.parser.ParsedTupleQuery;
4649
import org.eclipse.rdf4j.query.parser.QueryParserUtil;
4750
import org.eclipse.rdf4j.repository.RepositoryConnection;
@@ -247,6 +250,100 @@ public void mergeJoinUsesArrayDatasetApi(@TempDir Path tempDir) throws Exception
247250
}
248251
}
249252

253+
@Test
254+
public void mergeJoinRespectsQueryBindings(@TempDir Path tempDir) throws Exception {
255+
LmdbStore store = new LmdbStore(tempDir.toFile());
256+
SailRepository repository = new SailRepository(store);
257+
repository.init();
258+
259+
ValueFactory vf = SimpleValueFactory.getInstance();
260+
IRI painter = vf.createIRI(NS, "Painter");
261+
IRI painting = vf.createIRI(NS, "Painting");
262+
IRI paints = vf.createIRI(NS, "paints");
263+
IRI picasso = vf.createIRI(NS, "picasso");
264+
IRI guernica = vf.createIRI(NS, "guernica");
265+
266+
try (RepositoryConnection conn = repository.getConnection()) {
267+
conn.add(painter, RDF.TYPE, RDFS.CLASS);
268+
conn.add(painting, RDF.TYPE, RDFS.CLASS);
269+
conn.add(picasso, RDF.TYPE, painter);
270+
conn.add(guernica, RDF.TYPE, painting);
271+
conn.add(picasso, paints, guernica);
272+
}
273+
274+
String query = "SELECT ?X WHERE {\n" +
275+
" ?X a ?Y .\n" +
276+
" ?Y a <http://www.w3.org/2000/01/rdf-schema#Class> .\n" +
277+
"}";
278+
279+
ParsedTupleQuery parsed = QueryParserUtil.parseTupleQuery(QueryLanguage.SPARQL, query, null);
280+
TupleExpr tupleExpr = parsed.getTupleExpr();
281+
TupleExpr joinExpr = unwrap(tupleExpr);
282+
assertThat(joinExpr).isInstanceOf(Join.class);
283+
Join join = (Join) joinExpr;
284+
285+
StatementPattern left = (StatementPattern) join.getLeftArg();
286+
StatementPattern right = (StatementPattern) join.getRightArg();
287+
Var joinVar = left.getObjectVar();
288+
join.setOrder(joinVar);
289+
left.setOrder(joinVar);
290+
right.setOrder(right.getSubjectVar());
291+
join.setMergeJoin(true);
292+
293+
SailSource branch = store.getBackingStore().getExplicitSailSource();
294+
SailDataset dataset = branch.dataset(IsolationLevels.SNAPSHOT_READ);
295+
296+
try {
297+
SailDatasetTripleSource tripleSource = new SailDatasetTripleSource(repository.getValueFactory(), dataset);
298+
EvaluationStrategyFactory factory = store.getEvaluationStrategyFactory();
299+
EvaluationStrategy strategy = factory.createEvaluationStrategy(null, tripleSource,
300+
store.getBackingStore().getEvaluationStatistics());
301+
LmdbEvaluationDataset lmdbDataset = (LmdbEvaluationDataset) dataset;
302+
RecordingDataset recordingDataset = new RecordingDataset(lmdbDataset);
303+
QueryEvaluationContext context = new LmdbQueryEvaluationContext(null,
304+
tripleSource.getValueFactory(), tripleSource.getComparator(), recordingDataset,
305+
lmdbDataset.getValueStore());
306+
307+
QueryEvaluationStep step = strategy.precompile(join, context);
308+
309+
MapBindingSet bindings = new MapBindingSet(2);
310+
311+
try (CloseableIteration<BindingSet> iter = step.evaluate(bindings)) {
312+
List<? extends BindingSet> results = Iterations.asList(iter);
313+
assertThat(results).hasSize(2);
314+
assertThat(results).extracting(bs -> bs.getValue("X")).containsExactlyInAnyOrder(picasso, guernica);
315+
}
316+
317+
bindings.addBinding("Y", painter);
318+
try (CloseableIteration<BindingSet> iter = step.evaluate(bindings)) {
319+
List<? extends BindingSet> results = Iterations.asList(iter);
320+
assertThat(results).hasSize(1);
321+
assertThat(results.get(0).getValue("X")).isEqualTo(picasso);
322+
}
323+
324+
bindings.addBinding("Z", painting);
325+
try (CloseableIteration<BindingSet> iter = step.evaluate(bindings)) {
326+
List<? extends BindingSet> results = Iterations.asList(iter);
327+
assertThat(results).hasSize(1);
328+
assertThat(results.get(0).getValue("X")).isEqualTo(picasso);
329+
}
330+
331+
bindings.removeBinding("Y");
332+
try (CloseableIteration<BindingSet> iter = step.evaluate(bindings)) {
333+
List<? extends BindingSet> results = Iterations.asList(iter);
334+
assertThat(results).hasSize(2);
335+
assertThat(results).extracting(bs -> bs.getValue("X")).containsExactlyInAnyOrder(picasso, guernica);
336+
}
337+
338+
assertThat(recordingDataset.wasArrayOrderedApiUsed()).isTrue();
339+
assertThat(join.getAlgorithmName()).isEqualTo("LmdbIdMergeJoinIterator");
340+
} finally {
341+
dataset.close();
342+
branch.close();
343+
repository.shutDown();
344+
}
345+
}
346+
250347
@Test
251348
public void mergeJoinFallsBackWhenOrderUnsupported(@TempDir Path tempDir) throws Exception {
252349
LmdbStore store = new LmdbStore(tempDir.toFile());

core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/join/LmdbIdMergeJoinIteratorTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
package org.eclipse.rdf4j.sail.lmdb.join;
1212

1313
import static org.assertj.core.api.Assertions.assertThat;
14+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
1415

1516
import java.util.Arrays;
1617

@@ -91,6 +92,23 @@ void nextReturnsNullWhenNoMatches() throws Exception {
9192
}
9293
}
9394

95+
@Test
96+
void orderVerifierDetectsDisorder() {
97+
RecordIterator unordered = new ArrayRecordIterator(
98+
new long[] { 1L, LmdbValue.UNKNOWN_ID, LmdbValue.UNKNOWN_ID, 0L },
99+
new long[] { 0L, LmdbValue.UNKNOWN_ID, LmdbValue.UNKNOWN_ID, 0L });
100+
101+
RecordIterator verifying = LmdbIdOrderVerifier.wrap("left", 0, unordered);
102+
103+
assertThatThrownBy(() -> {
104+
while (verifying.next() != null) {
105+
// consume iterator
106+
}
107+
})
108+
.isInstanceOf(AssertionError.class)
109+
.hasMessageContaining("left");
110+
}
111+
94112
private static final class ArrayRecordIterator implements RecordIterator {
95113
private final long[][] data;
96114
private int index;

0 commit comments

Comments
 (0)