Skip to content

Commit b23fc6f

Browse files
committed
working on new ID based join iterator
1 parent e30c859 commit b23fc6f

4 files changed

Lines changed: 220 additions & 51 deletions

File tree

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/join/LmdbIdMergeJoinIterator.java

Lines changed: 164 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -10,59 +10,106 @@
1010
*******************************************************************************/
1111
package org.eclipse.rdf4j.sail.lmdb.join;
1212

13-
import org.eclipse.rdf4j.common.iteration.LookAheadIteration;
14-
import org.eclipse.rdf4j.query.BindingSet;
15-
import org.eclipse.rdf4j.query.MutableBindingSet;
13+
import java.util.Arrays;
14+
import java.util.Collections;
15+
import java.util.HashSet;
16+
import java.util.Set;
17+
import java.util.concurrent.atomic.AtomicInteger;
18+
1619
import org.eclipse.rdf4j.query.QueryEvaluationException;
17-
import org.eclipse.rdf4j.query.algebra.evaluation.impl.QueryEvaluationContext;
20+
import org.eclipse.rdf4j.sail.lmdb.IdBindingInfo;
1821
import org.eclipse.rdf4j.sail.lmdb.RecordIterator;
19-
import org.eclipse.rdf4j.sail.lmdb.ValueStore;
22+
import org.eclipse.rdf4j.sail.lmdb.TripleStore;
2023
import org.eclipse.rdf4j.sail.lmdb.model.LmdbValue;
2124

22-
public class LmdbIdMergeJoinIterator extends LookAheadIteration<BindingSet> {
25+
/**
26+
* Merge join iterator that operates entirely on LMDB ID records. Materialization to
27+
* {@link org.eclipse.rdf4j.query.BindingSet} happens in a separate {@link LmdbIdFinalBindingSetIteration}.
28+
*/
29+
public class LmdbIdMergeJoinIterator implements RecordIterator {
2330

2431
private final PeekMarkRecordIterator leftIterator;
2532
private final PeekMarkRecordIterator rightIterator;
26-
private final LmdbIdJoinIterator.PatternInfo leftInfo;
27-
private final LmdbIdJoinIterator.PatternInfo rightInfo;
2833
private final String mergeVariable;
29-
private final QueryEvaluationContext context;
30-
private final BindingSet initialBindings;
31-
private final ValueStore valueStore;
34+
private final IdBindingInfo bindingInfo;
35+
private final Set<String> sharedVariables;
36+
private final int mergeIndex;
37+
private final int bindingSize;
38+
private static final int COLUMN_COUNT = TripleStore.CONTEXT_IDX + 1;
3239

3340
private long[] currentLeftRecord;
34-
private MutableBindingSet currentLeftBinding;
3541
private long currentLeftKey;
3642
private boolean hasCurrentLeftKey;
3743

3844
private long[] leftPeekRecord;
3945
private long leftPeekKey;
4046
private boolean hasLeftPeekKey;
4147
private int currentLeftValueAndPeekEquals = -1;
48+
private boolean closed;
49+
private final int[] leftColumnBindingIndex;
50+
private final int[] rightColumnBindingIndex;
51+
private static final boolean DEBUG = Boolean.getBoolean("rdf4j.lmdb.mergeJoinDebug");
52+
private final AtomicInteger debugCounter = DEBUG ? new AtomicInteger() : null;
4253

4354
public LmdbIdMergeJoinIterator(RecordIterator leftIterator, RecordIterator rightIterator,
4455
LmdbIdJoinIterator.PatternInfo leftInfo, LmdbIdJoinIterator.PatternInfo rightInfo, String mergeVariable,
45-
QueryEvaluationContext context, BindingSet initialBindings, ValueStore valueStore) {
46-
this.leftIterator = new PeekMarkRecordIterator(leftIterator);
47-
this.rightIterator = new PeekMarkRecordIterator(rightIterator);
48-
this.leftInfo = leftInfo;
49-
this.rightInfo = rightInfo;
50-
this.mergeVariable = mergeVariable;
51-
this.context = context;
52-
this.initialBindings = initialBindings;
53-
this.valueStore = valueStore;
54-
56+
IdBindingInfo bindingInfo) {
5557
if (mergeVariable == null || mergeVariable.isEmpty()) {
5658
throw new IllegalArgumentException("Merge variable must be provided for LMDB merge join");
5759
}
60+
if (bindingInfo == null) {
61+
throw new IllegalArgumentException("Binding info must be provided for LMDB merge join");
62+
}
5863
if (leftInfo.getRecordIndex(mergeVariable) < 0 || rightInfo.getRecordIndex(mergeVariable) < 0) {
5964
throw new IllegalArgumentException("Merge variable " + mergeVariable
6065
+ " must be present in both join operands for LMDB merge join");
6166
}
67+
68+
this.leftIterator = new PeekMarkRecordIterator(leftIterator);
69+
this.rightIterator = new PeekMarkRecordIterator(rightIterator);
70+
this.mergeVariable = mergeVariable;
71+
this.bindingInfo = bindingInfo;
72+
73+
int idx = bindingInfo.getIndex(mergeVariable);
74+
if (idx < 0) {
75+
throw new IllegalArgumentException(
76+
"Merge variable " + mergeVariable + " is not tracked in the binding info for LMDB merge join");
77+
}
78+
this.mergeIndex = idx;
79+
this.bindingSize = bindingInfo.size();
80+
81+
Set<String> shared = new HashSet<>(leftInfo.getVariableNames());
82+
shared.retainAll(rightInfo.getVariableNames());
83+
this.sharedVariables = Collections.unmodifiableSet(shared);
84+
this.leftColumnBindingIndex = computeColumnBindingMap(leftInfo, bindingInfo);
85+
this.rightColumnBindingIndex = computeColumnBindingMap(rightInfo, bindingInfo);
86+
if (DEBUG) {
87+
System.out.println("DEBUG bindingSize=" + bindingSize + " mergeVar=" + mergeVariable + " shared="
88+
+ shared + " bindingVars=" + bindingInfo.getVariableNames());
89+
}
6290
}
6391

6492
@Override
65-
protected BindingSet getNextElement() throws QueryEvaluationException {
93+
public long[] next() {
94+
if (closed) {
95+
return null;
96+
}
97+
try {
98+
long[] nextRecord = computeNextElement();
99+
if (nextRecord == null) {
100+
close();
101+
}
102+
return nextRecord;
103+
} catch (RuntimeException e) {
104+
close();
105+
throw e;
106+
}
107+
}
108+
109+
private long[] computeNextElement() throws QueryEvaluationException {
110+
if (closed) {
111+
return null;
112+
}
66113
if (!ensureCurrentLeft()) {
67114
return null;
68115
}
@@ -84,9 +131,9 @@ protected BindingSet getNextElement() throws QueryEvaluationException {
84131
return null;
85132
}
86133

87-
int compare = compare(currentLeftKey, key(rightInfo, rightPeek));
134+
int compare = compare(currentLeftKey, key(rightPeek));
88135
if (compare == 0) {
89-
BindingSet result = equal();
136+
long[] result = equal();
90137
if (result != null) {
91138
return result;
92139
}
@@ -104,35 +151,39 @@ protected BindingSet getNextElement() throws QueryEvaluationException {
104151
}
105152
}
106153

107-
private boolean ensureCurrentLeft() throws QueryEvaluationException {
154+
private boolean ensureCurrentLeft() {
108155
if (currentLeftRecord != null) {
109156
return true;
110157
}
111158
return advanceLeft();
112159
}
113160

114-
private boolean advanceLeft() throws QueryEvaluationException {
161+
private boolean advanceLeft() {
115162
leftPeekRecord = null;
116163
hasLeftPeekKey = false;
117164
currentLeftValueAndPeekEquals = -1;
118165

119166
while (leftIterator.hasNext()) {
120167
long[] candidate = leftIterator.next();
121-
// Defer left materialization; keep only the ID record.
168+
if (candidate == null) {
169+
continue;
170+
}
171+
if (DEBUG && debugCounter.getAndIncrement() < 5) {
172+
System.out.println("DEBUG advanceLeft len=" + candidate.length + " record="
173+
+ Arrays.toString(candidate));
174+
}
122175
currentLeftRecord = candidate;
123-
currentLeftBinding = null;
124-
currentLeftKey = key(leftInfo, candidate);
176+
currentLeftKey = key(candidate);
125177
hasCurrentLeftKey = true;
126178
return true;
127179
}
128180

129181
currentLeftRecord = null;
130-
currentLeftBinding = null;
131182
hasCurrentLeftKey = false;
132183
return false;
133184
}
134185

135-
private boolean lessThan() throws QueryEvaluationException {
186+
private boolean lessThan() {
136187
long previous = hasCurrentLeftKey ? currentLeftKey : Long.MIN_VALUE;
137188
if (!advanceLeft()) {
138189
return false;
@@ -147,10 +198,10 @@ private boolean lessThan() throws QueryEvaluationException {
147198
return true;
148199
}
149200

150-
private BindingSet equal() throws QueryEvaluationException {
201+
private long[] equal() {
151202
while (rightIterator.hasNext()) {
152203
if (rightIterator.isResettable()) {
153-
BindingSet result = joinWithCurrentLeft(rightIterator.next());
204+
long[] result = joinWithCurrentLeft(rightIterator.next());
154205
if (result != null) {
155206
return result;
156207
}
@@ -159,7 +210,7 @@ private BindingSet equal() throws QueryEvaluationException {
159210
if (currentLeftValueAndPeekEquals == 0 && !rightIterator.isMarked()) {
160211
rightIterator.mark();
161212
}
162-
BindingSet result = joinWithCurrentLeft(rightIterator.next());
213+
long[] result = joinWithCurrentLeft(rightIterator.next());
163214
if (result != null) {
164215
return result;
165216
}
@@ -168,11 +219,11 @@ private BindingSet equal() throws QueryEvaluationException {
168219
return null;
169220
}
170221

171-
private void doLeftPeek() throws QueryEvaluationException {
222+
private void doLeftPeek() {
172223
if (leftPeekRecord == null) {
173224
leftPeekRecord = leftIterator.peek();
174225
if (leftPeekRecord != null) {
175-
leftPeekKey = key(leftInfo, leftPeekRecord);
226+
leftPeekKey = key(leftPeekRecord);
176227
hasLeftPeekKey = true;
177228
} else {
178229
hasLeftPeekKey = false;
@@ -186,23 +237,75 @@ private void doLeftPeek() throws QueryEvaluationException {
186237
}
187238
}
188239

189-
private BindingSet joinWithCurrentLeft(long[] rightRecord) throws QueryEvaluationException {
190-
MutableBindingSet result = context.createBindingSet(initialBindings);
191-
if (!leftInfo.applyRecord(currentLeftRecord, result, valueStore)) {
240+
private long[] joinWithCurrentLeft(long[] rightRecord) {
241+
if (rightRecord == null) {
192242
return null;
193243
}
194-
if (!rightInfo.applyRecord(rightRecord, result, valueStore)) {
244+
245+
long[] combined = new long[bindingSize];
246+
Arrays.fill(combined, LmdbValue.UNKNOWN_ID);
247+
248+
if (!mergeRecordInto(combined, currentLeftRecord, leftColumnBindingIndex)) {
195249
return null;
196250
}
197-
return result;
251+
if (!mergeRecordInto(combined, rightRecord, rightColumnBindingIndex)) {
252+
return null;
253+
}
254+
if (DEBUG && debugCounter.get() < 50) {
255+
System.out.println("DEBUG combined=" + Arrays.toString(combined));
256+
}
257+
return combined;
258+
}
259+
260+
private static int[] computeColumnBindingMap(LmdbIdJoinIterator.PatternInfo patternInfo, IdBindingInfo bindingInfo) {
261+
int[] map = new int[COLUMN_COUNT];
262+
Arrays.fill(map, -1);
263+
for (String var : patternInfo.getVariableNames()) {
264+
int bindingIdx = bindingInfo.getIndex(var);
265+
if (bindingIdx < 0) {
266+
continue;
267+
}
268+
int mask = patternInfo.getPositionsMask(var);
269+
for (int pos = 0; pos < COLUMN_COUNT; pos++) {
270+
if (((mask >> pos) & 1) != 0) {
271+
map[pos] = bindingIdx;
272+
}
273+
}
274+
}
275+
return map;
276+
}
277+
278+
private boolean mergeRecordInto(long[] target, long[] source, int[] colMap) {
279+
if (source == null) {
280+
return true;
281+
}
282+
for (int col = 0; col < colMap.length; col++) {
283+
int bindingIdx = colMap[col];
284+
if (bindingIdx < 0 || bindingIdx >= target.length) {
285+
continue;
286+
}
287+
long value = col < source.length ? source[col] : LmdbValue.UNKNOWN_ID;
288+
if (col == TripleStore.CONTEXT_IDX && value == 0L) {
289+
value = LmdbValue.UNKNOWN_ID;
290+
}
291+
long existing = target[bindingIdx];
292+
if (existing == LmdbValue.UNKNOWN_ID) {
293+
if (value != LmdbValue.UNKNOWN_ID) {
294+
target[bindingIdx] = value;
295+
}
296+
} else if (value != LmdbValue.UNKNOWN_ID && existing != value) {
297+
return false;
298+
}
299+
}
300+
return true;
198301
}
199302

200303
private int compare(long left, long right) {
201304
return Long.compare(left, right);
202305
}
203306

204-
private long key(LmdbIdJoinIterator.PatternInfo info, long[] record) throws QueryEvaluationException {
205-
long id = info.getId(record, mergeVariable);
307+
private long key(long[] record) {
308+
long id = valueAt(record, mergeIndex);
206309
if (id == LmdbValue.UNKNOWN_ID) {
207310
throw new QueryEvaluationException(
208311
"Merge variable " + mergeVariable + " is unbound in the current record; cannot perform merge join");
@@ -211,11 +314,27 @@ private long key(LmdbIdJoinIterator.PatternInfo info, long[] record) throws Quer
211314
}
212315

213316
@Override
214-
protected void handleClose() throws QueryEvaluationException {
317+
public void close() {
318+
if (closed) {
319+
return;
320+
}
321+
closed = true;
322+
currentLeftRecord = null;
323+
hasCurrentLeftKey = false;
324+
leftPeekRecord = null;
325+
hasLeftPeekKey = false;
326+
currentLeftValueAndPeekEquals = -1;
215327
try {
216328
leftIterator.close();
217329
} finally {
218330
rightIterator.close();
219331
}
220332
}
333+
334+
private static long valueAt(long[] record, int index) {
335+
if (index < 0 || index >= record.length) {
336+
return LmdbValue.UNKNOWN_ID;
337+
}
338+
return record[index];
339+
}
221340
}

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/join/LmdbIdMergeJoinQueryEvaluationStep.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
import java.io.IOException;
1414
import java.util.Arrays;
15+
import java.util.Collections;
1516
import java.util.Optional;
1617

1718
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
@@ -151,8 +152,10 @@ public CloseableIteration<BindingSet> evaluate(BindingSet bindings) {
151152
}
152153

153154
join.setAlgorithm(LmdbIdMergeJoinIterator.class.getSimpleName());
154-
return new LmdbIdMergeJoinIterator(leftIterator, rightIterator, leftInfo, rightInfo, mergeVariable, context,
155-
bindings, valueStore);
155+
RecordIterator mergeIterator = new LmdbIdMergeJoinIterator(leftIterator, rightIterator, leftInfo, rightInfo,
156+
mergeVariable, bindingInfo);
157+
return new LmdbIdFinalBindingSetIteration(mergeIterator, bindingInfo, context, bindings, valueStore,
158+
Collections.emptyMap());
156159
} catch (QueryEvaluationException e) {
157160
throw e;
158161
}

core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/QueryBenchmarkTest.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ public void groupByQuery() {
156156
try (var stream = connection.prepareTupleQuery(query1).evaluate().stream()) {
157157
count = stream.count();
158158
}
159-
System.out.println(count);
159+
assertEquals(5, count);
160160
}
161161
}
162162

@@ -181,7 +181,7 @@ public void distinctPredicatesQuery() {
181181
try (var stream = connection.prepareTupleQuery(query_distinct_predicates).evaluate().stream()) {
182182
count = stream.count();
183183
}
184-
System.out.println(count);
184+
assertEquals(55, count);
185185
}
186186
}
187187

@@ -229,7 +229,21 @@ public void long_chain() {
229229
try (var stream = connection.prepareTupleQuery(long_chain).evaluate().stream()) {
230230
count = stream.count();
231231
}
232-
// assertEquals(???, count);
232+
assertEquals(0, count);
233+
}
234+
}
235+
236+
@Test
237+
@Timeout(30)
238+
public void long_chain_debugPreview() {
239+
try (SailRepositoryConnection connection = repository.getConnection()) {
240+
TupleQuery tupleQuery = connection.prepareTupleQuery(long_chain);
241+
try (TupleQueryResult result = tupleQuery.evaluate()) {
242+
for (int i = 0; result.hasNext() && i < 5; i++) {
243+
BindingSet bs = result.next();
244+
System.out.println("DEBUG long_chain #" + i + ": " + bs);
245+
}
246+
}
233247
}
234248
}
235249

0 commit comments

Comments
 (0)