Skip to content

Commit 980cddd

Browse files
committed
working on new ID based join iterator
1 parent 2b5354d commit 980cddd

3 files changed

Lines changed: 113 additions & 53 deletions

File tree

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbDupRecordIterator.java

Lines changed: 83 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -54,26 +54,26 @@ RecordIterator get(long[] quadReuse, ByteBuffer minKeyBuf, ByteBuffer maxKeyBuf,
5454
/** Size in bytes of one (v3,v4) tuple. */
5555
private static final int DUP_PAIR_BYTES = Long.BYTES * 2;
5656

57-
private final Pool pool;
58-
private final DupIndex index;
59-
private final int dupDbi;
60-
private final long cursor;
57+
private final Pool pool = Pool.get();
58+
private DupIndex index;
59+
private int dupDbi;
60+
private long cursor;
6161

62-
private final Txn txnRef;
62+
private Txn txnRef;
6363
private long txn; // refreshed on txn version changes
6464
private long txnRefVersion;
65-
private final StampedLongAdderLockManager txnLockManager;
65+
private StampedLongAdderLockManager txnLockManager;
6666
private final Thread ownerThread = Thread.currentThread();
6767

68-
private final MDBVal keyData;
69-
private final MDBVal valueData;
68+
private MDBVal keyData;
69+
private MDBVal valueData;
7070

7171
/** Reused output buffer required by the RecordIterator API. */
72-
private final long[] quad;
72+
private long[] quad;
7373

7474
/** Scalars defining the prefix to scan (subject, predicate). */
75-
private final long prefixSubj;
76-
private final long prefixPred;
75+
private long prefixSubj;
76+
private long prefixPred;
7777

7878
private ByteBuffer prefixKeyBuf;
7979

@@ -83,33 +83,67 @@ RecordIterator get(long[] quadReuse, ByteBuffer minKeyBuf, ByteBuffer maxKeyBuf,
8383
private int dupLimit;
8484

8585
private int lastResult;
86-
private boolean closed = false;
86+
private boolean closed = true;
8787

88-
private final RecordIterator fallback;
89-
private final FallbackSupplier fallbackSupplier;
88+
private RecordIterator fallback;
89+
private FallbackSupplier fallbackSupplier;
9090

9191
LmdbDupRecordIterator(DupIndex index, long subj, long pred,
9292
boolean explicit, Txn txnRef, FallbackSupplier fallbackSupplier) throws IOException {
93+
initialize(index, subj, pred, explicit, txnRef, null, fallbackSupplier);
94+
}
95+
96+
LmdbDupRecordIterator(DupIndex index, long subj, long pred,
97+
boolean explicit, Txn txnRef, long[] quadReuse, FallbackSupplier fallbackSupplier) throws IOException {
98+
initialize(index, subj, pred, explicit, txnRef, quadReuse, fallbackSupplier);
99+
}
100+
101+
void initialize(DupIndex index, long subj, long pred, boolean explicit, Txn txnRef, long[] quadReuse,
102+
FallbackSupplier fallbackSupplier) throws IOException {
103+
if (!closed || fallback != null) {
104+
throw new IllegalStateException("Cannot initialize LMDB dup iterator while it is open");
105+
}
106+
93107
this.index = index;
108+
this.dupDbi = index.getDupDB(explicit);
109+
this.txnRef = txnRef;
110+
this.txnLockManager = txnRef.lockManager();
111+
this.fallbackSupplier = fallbackSupplier;
112+
this.fallback = null;
113+
114+
this.prefixSubj = subj;
115+
this.prefixPred = pred;
94116

95-
// Output buffer (s,p are constant for the life of this iterator)
96-
this.quad = new long[4];
117+
if (quadReuse != null && quadReuse.length >= 4) {
118+
this.quad = quadReuse;
119+
} else if (this.quad == null || this.quad.length < 4) {
120+
this.quad = new long[4];
121+
}
97122
this.quad[0] = subj;
98123
this.quad[1] = pred;
99124
this.quad[2] = -1L;
100125
this.quad[3] = -1L;
101126

102-
this.prefixSubj = subj;
103-
this.prefixPred = pred;
127+
if (this.keyData == null) {
128+
this.keyData = pool.getVal();
129+
}
130+
if (this.valueData == null) {
131+
this.valueData = pool.getVal();
132+
}
104133

105-
this.fallbackSupplier = fallbackSupplier;
134+
if (this.prefixKeyBuf == null) {
135+
this.prefixKeyBuf = pool.getKeyBuffer();
136+
}
137+
prefixKeyBuf.clear();
138+
Varint.writeUnsigned(prefixKeyBuf, prefixSubj);
139+
Varint.writeUnsigned(prefixKeyBuf, prefixPred);
140+
prefixKeyBuf.flip();
106141

107-
this.pool = Pool.get();
108-
this.keyData = pool.getVal();
109-
this.valueData = pool.getVal();
110-
this.dupDbi = index.getDupDB(explicit);
111-
this.txnRef = txnRef;
112-
this.txnLockManager = txnRef.lockManager();
142+
this.dupBuf = null;
143+
this.dupPos = 0;
144+
this.dupLimit = 0;
145+
this.lastResult = MDB_SUCCESS;
146+
this.closed = false;
113147

114148
RecordIterator fallbackIterator = null;
115149

@@ -125,13 +159,6 @@ RecordIterator get(long[] quadReuse, ByteBuffer minKeyBuf, ByteBuffer maxKeyBuf,
125159

126160
cursor = openCursor(txn, dupDbi, txnRef.isReadOnly());
127161

128-
prefixKeyBuf = pool.getKeyBuffer();
129-
prefixKeyBuf.clear();
130-
Varint.writeUnsigned(prefixKeyBuf, prefixSubj);
131-
Varint.writeUnsigned(prefixKeyBuf, prefixPred);
132-
// index.toDupKeyPrefix(prefixKeyBuf, subj, pred, 0, 0);
133-
prefixKeyBuf.flip();
134-
135162
boolean positioned = positionOnPrefix();
136163
if (positioned) {
137164
positioned = primeDuplicateBlock();
@@ -340,22 +367,35 @@ private void closeInternal(boolean maybeCalledAsync) {
340367
}
341368
try {
342369
if (!closed) {
343-
if (txnRef.isReadOnly()) {
344-
pool.freeCursor(dupDbi, index, cursor);
345-
} else {
346-
mdb_cursor_close(cursor);
370+
if (cursor != 0L && txnRef != null) {
371+
if (txnRef.isReadOnly()) {
372+
pool.freeCursor(dupDbi, index, cursor);
373+
} else {
374+
mdb_cursor_close(cursor);
375+
}
376+
}
377+
cursor = 0L;
378+
if (keyData != null) {
379+
pool.free(keyData);
380+
keyData = null;
381+
}
382+
if (valueData != null) {
383+
pool.free(valueData);
384+
valueData = null;
347385
}
348-
pool.free(keyData);
349-
pool.free(valueData);
350386
if (prefixKeyBuf != null) {
351387
pool.free(prefixKeyBuf);
388+
prefixKeyBuf = null;
352389
}
353390
}
354391
} finally {
355392
closed = true;
356393
if (writeLocked) {
357394
txnLockManager.unlockWrite(writeStamp);
358395
}
396+
txnRef = null;
397+
dupBuf = null;
398+
dupPos = dupLimit = 0;
359399
}
360400
}
361401
}
@@ -364,9 +404,14 @@ private void closeInternal(boolean maybeCalledAsync) {
364404
public void close() {
365405
if (fallback != null) {
366406
fallback.close();
407+
fallback = null;
408+
fallbackSupplier = null;
367409
} else {
368410
closeInternal(true);
411+
fallbackSupplier = null;
369412
}
413+
txnLockManager = null;
414+
txnRef = null;
370415
}
371416

372417
private long openCursor(long txn, int dbi, boolean tryReuse) throws IOException {

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1377,9 +1377,8 @@ public RecordIterator getRecordIterator(StatementPattern pattern, BindingSet bin
13771377

13781378
ByteBuffer minKeyBuf = keyBuffers != null ? keyBuffers.minKey() : null;
13791379
ByteBuffer maxKeyBuf = keyBuffers != null ? keyBuffers.maxKey() : null;
1380-
LmdbRecordIterator reuse = (iteratorReuse instanceof LmdbRecordIterator)
1381-
? (LmdbRecordIterator) iteratorReuse
1382-
: null;
1380+
RecordIterator reuse = (iteratorReuse instanceof LmdbRecordIterator
1381+
|| iteratorReuse instanceof LmdbDupRecordIterator) ? iteratorReuse : null;
13831382
return tripleStore.getTriples(txn, subjID, predID, objID, contextID, explicit, minKeyBuf, maxKeyBuf,
13841383
null, reuse);
13851384
} catch (IOException e) {
@@ -1438,8 +1437,10 @@ public RecordIterator getRecordIterator(long[] binding, int subjIndex, int predI
14381437
BindingProjectingIterator projectingReuse = iteratorReuse instanceof BindingProjectingIterator
14391438
? (BindingProjectingIterator) iteratorReuse
14401439
: null;
1441-
LmdbRecordIterator baseReuse = projectingReuse != null ? projectingReuse.getBase()
1442-
: (iteratorReuse instanceof LmdbRecordIterator ? (LmdbRecordIterator) iteratorReuse : null);
1440+
RecordIterator baseReuse = projectingReuse != null ? projectingReuse.getReusableBase()
1441+
: (iteratorReuse instanceof LmdbRecordIterator || iteratorReuse instanceof LmdbDupRecordIterator
1442+
? iteratorReuse
1443+
: null);
14431444

14441445
RecordIterator raw = tripleStore.getTriples(txn, subjQuery, predQuery, objQuery, ctxQuery, explicit,
14451446
minKeyBuf, maxKeyBuf, quadReuse, baseReuse);
@@ -1476,8 +1477,11 @@ public void close() {
14761477

14771478
BindingProjectingIterator result = projectingReuse != null ? projectingReuse
14781479
: new BindingProjectingIterator();
1479-
result.configure(raw, raw instanceof LmdbRecordIterator ? (LmdbRecordIterator) raw : null, binding,
1480-
subjIndex, predIndex, objIndex, ctxIndex, reuse);
1480+
RecordIterator reusableBase = (raw instanceof LmdbRecordIterator
1481+
|| raw instanceof LmdbDupRecordIterator)
1482+
? raw
1483+
: null;
1484+
result.configure(raw, reusableBase, binding, subjIndex, predIndex, objIndex, ctxIndex, reuse);
14811485
return result;
14821486
} catch (IOException e) {
14831487
throw new QueryEvaluationException("Unable to create LMDB record iterator", e);
@@ -1486,19 +1490,19 @@ public void close() {
14861490

14871491
private final class BindingProjectingIterator implements RecordIterator {
14881492
private RecordIterator base;
1489-
private LmdbRecordIterator lmdbBase;
1493+
private RecordIterator reusableBase;
14901494
private long[] binding;
14911495
private int subjIndex;
14921496
private int predIndex;
14931497
private int objIndex;
14941498
private int ctxIndex;
14951499
private long[] scratch;
14961500

1497-
void configure(RecordIterator base, LmdbRecordIterator lmdbBase, long[] binding, int subjIndex,
1501+
void configure(RecordIterator base, RecordIterator reusableBase, long[] binding, int subjIndex,
14981502
int predIndex, int objIndex,
14991503
int ctxIndex, long[] bindingReuse) {
15001504
this.base = base;
1501-
this.lmdbBase = lmdbBase;
1505+
this.reusableBase = reusableBase;
15021506
this.binding = binding;
15031507
this.subjIndex = subjIndex;
15041508
this.predIndex = predIndex;
@@ -1515,8 +1519,8 @@ void configure(RecordIterator base, LmdbRecordIterator lmdbBase, long[] binding,
15151519
}
15161520
}
15171521

1518-
LmdbRecordIterator getBase() {
1519-
return lmdbBase;
1522+
RecordIterator getReusableBase() {
1523+
return reusableBase;
15201524
}
15211525

15221526
@Override
@@ -1582,6 +1586,7 @@ public void close() {
15821586
if (base != null) {
15831587
base.close();
15841588
base = null;
1589+
reusableBase = null;
15851590
}
15861591
}
15871592
}

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -613,7 +613,7 @@ public RecordIterator getTriples(Txn txn, long subj, long pred, long obj, long c
613613
}
614614

615615
public RecordIterator getTriples(Txn txn, long subj, long pred, long obj, long context, boolean explicit,
616-
ByteBuffer minKeyBuf, ByteBuffer maxKeyBuf, long[] quadReuse, LmdbRecordIterator iteratorReuse)
616+
ByteBuffer minKeyBuf, ByteBuffer maxKeyBuf, long[] quadReuse, RecordIterator iteratorReuse)
617617
throws IOException {
618618
TripleIndex index = getBestIndex(subj, pred, obj, context);
619619
// System.out.println("get triples: " + Arrays.asList(subj, pred, obj,context));
@@ -627,15 +627,25 @@ public RecordIterator getTriples(Txn txn, long subj, long pred, long obj, long c
627627
return new LmdbRecordIterator(index, null, doRangeSearch, subj, pred, obj, context, explicit, txn, quad,
628628
minBuf, maxBuf);
629629
};
630+
LmdbRecordIterator recordReuse = iteratorReuse instanceof LmdbRecordIterator
631+
? (LmdbRecordIterator) iteratorReuse
632+
: null;
633+
LmdbDupRecordIterator dupReuse = iteratorReuse instanceof LmdbDupRecordIterator
634+
? (LmdbDupRecordIterator) iteratorReuse
635+
: null;
630636
if (dupsortRead && subjectPredicateIndex != null && subj >= 0 && pred >= 0 && obj == -1 && context == -1) {
631637
assert context == -1 && obj == -1 : "subject-predicate index can only be used for (s,p,?,?) patterns";
632638
// Use SP dup iterator, but union with the standard iterator to guard against any edge cases
633639
// in SP storage/retrieval; de-duplicate at the record level.
634-
return new LmdbDupRecordIterator(subjectPredicateIndex, subj, pred, explicit, txn,
640+
if (dupReuse != null) {
641+
dupReuse.initialize(subjectPredicateIndex, subj, pred, explicit, txn, quadReuse, fallbackSupplier);
642+
return dupReuse;
643+
}
644+
return new LmdbDupRecordIterator(subjectPredicateIndex, subj, pred, explicit, txn, quadReuse,
635645
fallbackSupplier);
636646
}
637647
return getTriplesUsingIndex(txn, subj, pred, obj, context, explicit, index, doRangeSearch, fallbackSupplier,
638-
minKeyBuf, maxKeyBuf, quadReuse, iteratorReuse);
648+
minKeyBuf, maxKeyBuf, quadReuse, recordReuse);
639649
}
640650

641651
boolean hasTriples(boolean explicit) throws IOException {

0 commit comments

Comments
 (0)