Skip to content

Commit 9cb0c52

Browse files
committed
better handling of errors and resizing in LMDB
1 parent 6a618c5 commit 9cb0c52

9 files changed

Lines changed: 139 additions & 96 deletions

File tree

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbContextIdIterator.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import static org.lwjgl.util.lmdb.LMDB.MDB_NEXT;
1515
import static org.lwjgl.util.lmdb.LMDB.MDB_SET;
1616
import static org.lwjgl.util.lmdb.LMDB.MDB_SET_RANGE;
17+
import static org.lwjgl.util.lmdb.LMDB.MDB_SUCCESS;
1718
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_close;
1819
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_get;
1920
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_open;
@@ -24,6 +25,7 @@
2425
import java.nio.ByteBuffer;
2526
import java.util.concurrent.locks.StampedLock;
2627

28+
import org.eclipse.rdf4j.sail.SailException;
2729
import org.eclipse.rdf4j.sail.lmdb.TxnManager.Txn;
2830
import org.lwjgl.PointerBuffer;
2931
import org.lwjgl.system.MemoryStack;
@@ -92,7 +94,7 @@ public long[] next() {
9294
try {
9395
if (txnRefVersion != txnRef.version()) {
9496
// cursor must be renewed
95-
mdb_cursor_renew(txn, cursor);
97+
E(mdb_cursor_renew(txn, cursor));
9698
if (fetchNext) {
9799
// cursor must be positioned on last item, reuse minKeyBuf if available
98100
if (minKeyBuf == null) {
@@ -102,10 +104,10 @@ public long[] next() {
102104
Varint.writeUnsigned(minKeyBuf, record[0]);
103105
minKeyBuf.flip();
104106
keyData.mv_data(minKeyBuf);
105-
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_SET);
107+
lastResult = E(mdb_cursor_get(cursor, keyData, valueData, MDB_SET));
106108
if (lastResult != 0) {
107109
// use MDB_SET_RANGE if key was deleted
108-
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE);
110+
lastResult = E(mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE));
109111
}
110112
if (lastResult != 0) {
111113
closeInternal(false);
@@ -117,27 +119,29 @@ public long[] next() {
117119
}
118120

119121
if (fetchNext) {
120-
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT);
122+
lastResult = E(mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT));
121123
fetchNext = false;
122124
} else {
123125
if (minKeyBuf != null) {
124126
// set cursor to min key
125127
keyData.mv_data(minKeyBuf);
126-
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE);
128+
lastResult = E(mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE));
127129
} else {
128130
// set cursor to first item
129-
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT);
131+
lastResult = E(mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT));
130132
}
131133
}
132134

133-
while (lastResult == 0) {
135+
while (lastResult == MDB_SUCCESS) {
134136
record[0] = Varint.readUnsigned(keyData.mv_data());
135137
// fetch next value
136138
fetchNext = true;
137139
return record;
138140
}
139141
closeInternal(false);
140142
return null;
143+
} catch (IOException e) {
144+
throw new SailException(e);
141145
} finally {
142146
txnLock.unlockRead(stamp);
143147
}

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbRecordIterator.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import static org.lwjgl.util.lmdb.LMDB.MDB_NOTFOUND;
1616
import static org.lwjgl.util.lmdb.LMDB.MDB_SET;
1717
import static org.lwjgl.util.lmdb.LMDB.MDB_SET_RANGE;
18+
import static org.lwjgl.util.lmdb.LMDB.MDB_SUCCESS;
1819
import static org.lwjgl.util.lmdb.LMDB.mdb_cmp;
1920
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_close;
2021
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_get;
@@ -25,6 +26,7 @@
2526
import java.nio.ByteBuffer;
2627
import java.util.concurrent.locks.StampedLock;
2728

29+
import org.eclipse.rdf4j.sail.SailException;
2830
import org.eclipse.rdf4j.sail.lmdb.TripleStore.TripleIndex;
2931
import org.eclipse.rdf4j.sail.lmdb.TxnManager.Txn;
3032
import org.eclipse.rdf4j.sail.lmdb.Varint.GroupMatcher;
@@ -136,10 +138,10 @@ public long[] next() {
136138
index.toKey(minKeyBuf, quad[0], quad[1], quad[2], quad[3]);
137139
minKeyBuf.flip();
138140
keyData.mv_data(minKeyBuf);
139-
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_SET);
141+
lastResult = E(mdb_cursor_get(cursor, keyData, valueData, MDB_SET));
140142
if (lastResult != 0) {
141143
// use MDB_SET_RANGE if key was deleted
142-
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE);
144+
lastResult = E(mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE));
143145
}
144146
if (lastResult != 0) {
145147
closeInternal(false);
@@ -151,26 +153,26 @@ public long[] next() {
151153
}
152154

153155
if (fetchNext) {
154-
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT);
156+
lastResult = E(mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT));
155157
fetchNext = false;
156158
} else {
157159
if (minKeyBuf != null) {
158160
// set cursor to min key
159161
keyData.mv_data(minKeyBuf);
160-
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE);
162+
lastResult = E(mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE));
161163
} else {
162164
// set cursor to first item
163-
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT);
165+
lastResult = E(mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT));
164166
}
165167
}
166168

167-
while (lastResult == 0) {
169+
while (lastResult == MDB_SUCCESS) {
168170
// if (maxKey != null && TripleStore.COMPARATOR.compare(keyData.mv_data(), maxKey.mv_data()) > 0) {
169171
if (maxKey != null && mdb_cmp(txn, dbi, keyData, maxKey) > 0) {
170172
lastResult = MDB_NOTFOUND;
171173
} else if (groupMatcher != null && !groupMatcher.matches(keyData.mv_data())) {
172174
// value doesn't match search key/mask, fetch next value
173-
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT);
175+
lastResult = E(mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT));
174176
} else {
175177
// Matching value found
176178
index.keyToQuad(keyData.mv_data(), quad);
@@ -181,6 +183,8 @@ public long[] next() {
181183
}
182184
closeInternal(false);
183185
return null;
186+
} catch (IOException e) {
187+
throw new SailException(e);
184188
} finally {
185189
txnLock.unlockRead(stamp);
186190
}

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -596,6 +596,7 @@ public void approveAll(Set<Statement> approved, Set<Resource> approvedContexts)
596596
if (tripleStoreException != null) {
597597
throw wrapTripleStoreException();
598598
}
599+
Thread.yield();
599600
}
600601

601602
} else {
@@ -704,7 +705,7 @@ private void startTransaction(boolean preferThreading) throws SailException {
704705
} else {
705706
tripleStore.startTransaction();
706707
}
707-
valueStore.startTransaction();
708+
valueStore.startTransaction(true);
708709
} catch (Exception e) {
709710
storeTxnStarted.set(false);
710711
throw new SailException(e);

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ final class LmdbUtil {
5454
* Percentage free space in an LMDB db before automatically resizing the map. Default is 80%.
5555
*/
5656
@SuppressWarnings("StaticNonFinalField")
57-
public static int PERCENTAGE_FULL_TRIGGERS_RESIZE = 80;
57+
public static int PERCENTAGE_FULL_TRIGGERS_RESIZE = 80;
5858

5959
private LmdbUtil() {
6060
}

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/PersistentSet.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ private synchronized boolean update(Object element, boolean add) throws IOExcept
126126
keyVal.mv_data(keyBuf);
127127

128128
if (add) {
129-
if (mdb_put(factory.writeTxn, dbi, keyVal, dataVal, MDB_NOOVERWRITE) == MDB_SUCCESS) {
129+
if (E(mdb_put(factory.writeTxn, dbi, keyVal, dataVal, MDB_NOOVERWRITE)) == MDB_SUCCESS) {
130130
size++;
131131
return true;
132132
}
@@ -226,16 +226,16 @@ private T computeNext() throws IOException {
226226

227227
try (MemoryStack stack = MemoryStack.stackPush()) {
228228
keyData.mv_data(stack.bytes(write(current)));
229-
if (mdb_cursor_get(cursor, keyData, valueData, MDB_SET) != 0) {
229+
if (mdb_cursor_get(cursor, keyData, valueData, MDB_SET) != MDB_SUCCESS) {
230230
// use MDB_SET_RANGE if key was deleted
231-
if (mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE) == 0) {
231+
if (mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE) == MDB_SUCCESS) {
232232
return read(keyData.mv_data());
233233
}
234234
}
235235
}
236236
}
237237

238-
if (mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT) == 0) {
238+
if (mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT) == MDB_SUCCESS) {
239239
return read(keyData.mv_data());
240240
}
241241
close();

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,7 @@ protected void filterUsedIds(Collection<Long> ids) throws IOException {
561561
keyBuf.clear();
562562
Varint.writeUnsigned(keyBuf, id);
563563
keyData.mv_data(keyBuf.flip());
564-
if (mdb_get(txn, contextsDbi, keyData, valueData) == 0) {
564+
if (E(mdb_get(txn, contextsDbi, keyData, valueData)) == MDB_SUCCESS) {
565565
it.remove();
566566
}
567567
}
@@ -587,15 +587,15 @@ protected void filterUsedIds(Collection<Long> ids) throws IOException {
587587

588588
if (fullScan) {
589589
long[] quad = new long[4];
590-
int rc = mdb_cursor_get(cursor, keyData, valueData, MDB_FIRST);
591-
while (rc == 0 && !ids.isEmpty()) {
590+
int rc = E(mdb_cursor_get(cursor, keyData, valueData, MDB_FIRST));
591+
while (rc == MDB_SUCCESS && !ids.isEmpty()) {
592592
index.keyToQuad(keyData.mv_data(), quad);
593593
ids.remove(quad[0]);
594594
ids.remove(quad[1]);
595595
ids.remove(quad[2]);
596596
ids.remove(quad[3]);
597597

598-
rc = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT);
598+
rc = E(mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT));
599599
}
600600
} else {
601601
for (Iterator<Long> it = ids.iterator(); it.hasNext();) {
@@ -625,15 +625,15 @@ protected void filterUsedIds(Collection<Long> ids) throws IOException {
625625

626626
// set cursor to min key
627627
keyData.mv_data(keyBuf);
628-
int rc = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE);
628+
int rc = E(mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE));
629629
boolean exists = false;
630630
while (!exists && rc == 0) {
631631
if (mdb_cmp(txn, dbi, keyData, maxKey) > 0) {
632632
// id was not found
633633
break;
634634
} else if (!matcher.matches(keyData.mv_data())) {
635635
// value doesn't match search key/mask, fetch next value
636-
rc = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT);
636+
rc = E(mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT));
637637
} else {
638638
exists = true;
639639
}
@@ -708,7 +708,7 @@ protected double cardinality(long subj, long pred, long obj, long context) throw
708708

709709
// set cursor to min key
710710
keyData.mv_data(keyBuf);
711-
int rc = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE);
711+
int rc = E(mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE));
712712
if (rc != 0 || mdb_cmp(txn, dbi, keyData, maxKey) >= 0) {
713713
break;
714714
} else {
@@ -717,13 +717,13 @@ protected double cardinality(long subj, long pred, long obj, long context) throw
717717

718718
// set cursor to max key
719719
keyData.mv_data(maxKeyBuf);
720-
rc = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE);
720+
rc = E(mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE));
721721
if (rc != 0) {
722722
// directly go to last value
723-
rc = mdb_cursor_get(cursor, keyData, valueData, MDB_LAST);
723+
rc = E(mdb_cursor_get(cursor, keyData, valueData, MDB_LAST));
724724
} else {
725725
// go to previous value of selected key
726-
rc = mdb_cursor_get(cursor, keyData, valueData, MDB_PREV);
726+
rc = E(mdb_cursor_get(cursor, keyData, valueData, MDB_PREV));
727727
}
728728
if (rc == 0) {
729729
Varint.readListUnsigned(keyData.mv_data(), s.maxValues);
@@ -747,8 +747,8 @@ protected double cardinality(long subj, long pred, long obj, long context) throw
747747
keyData.mv_data(keyBuf);
748748

749749
int currentSamplesCount = 0;
750-
rc = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE);
751-
while (rc == 0 && currentSamplesCount < s.MAX_SAMPLES_PER_BUCKET) {
750+
rc = E(mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE));
751+
while (rc == MDB_SUCCESS && currentSamplesCount < s.MAX_SAMPLES_PER_BUCKET) {
752752
if (mdb_cmp(txn, dbi, keyData, maxKey) >= 0) {
753753
endOfRange = true;
754754
break;
@@ -776,7 +776,7 @@ protected double cardinality(long subj, long pred, long obj, long context) throw
776776
}
777777
}
778778
}
779-
rc = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT);
779+
rc = E(mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT));
780780
if (rc != 0) {
781781
// no more elements are available
782782
endOfRange = true;
@@ -862,6 +862,7 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean
862862
logger.debug("resize of map size {} required while adding - initialize record cache", mapSize);
863863
}
864864
}
865+
865866
if (recordCache != null) {
866867
long quad[] = new long[] { subj, pred, obj, context };
867868
if (explicit) {
@@ -872,7 +873,7 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean
872873
return recordCache.storeRecord(quad, explicit);
873874
}
874875

875-
int rc = mdb_put(writeTxn, mainIndex.getDB(explicit), keyVal, dataVal, MDB_NOOVERWRITE);
876+
int rc = E(mdb_put(writeTxn, mainIndex.getDB(explicit), keyVal, dataVal, MDB_NOOVERWRITE));
876877
if (rc != MDB_SUCCESS && rc != MDB_KEYEXIST) {
877878
throw new IOException(mdb_strerror(rc));
878879
}
@@ -884,6 +885,7 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean
884885

885886
if (stAdded) {
886887
for (int i = 1; i < indexes.size(); i++) {
888+
887889
TripleIndex index = indexes.get(i);
888890
keyBuf.clear();
889891
index.toKey(keyBuf, subj, pred, obj, context);
@@ -918,7 +920,7 @@ private void incrementContext(MemoryStack stack, long context) throws IOExceptio
918920
idVal.mv_data(bb);
919921
MDBVal dataVal = MDBVal.calloc(stack);
920922
long newCount = 1;
921-
if (mdb_get(writeTxn, contextsDbi, idVal, dataVal) == 0) {
923+
if (E(mdb_get(writeTxn, contextsDbi, idVal, dataVal)) == MDB_SUCCESS) {
922924
// update count
923925
newCount = Varint.readUnsigned(dataVal.mv_data()) + 1;
924926
}
@@ -942,7 +944,7 @@ private boolean decrementContext(MemoryStack stack, long context) throws IOExcep
942944
bb.flip();
943945
idVal.mv_data(bb);
944946
MDBVal dataVal = MDBVal.calloc(stack);
945-
if (mdb_get(writeTxn, contextsDbi, idVal, dataVal) == 0) {
947+
if (E(mdb_get(writeTxn, contextsDbi, idVal, dataVal)) == MDB_SUCCESS) {
946948
// update count
947949
long newCount = Varint.readUnsigned(dataVal.mv_data()) - 1;
948950
if (newCount <= 0) {
@@ -953,7 +955,7 @@ private boolean decrementContext(MemoryStack stack, long context) throws IOExcep
953955
ByteBuffer countBb = stack.malloc(Varint.calcLengthUnsigned(newCount));
954956
Varint.writeUnsigned(countBb, newCount);
955957
dataVal.mv_data(countBb.flip());
956-
mdb_put(writeTxn, contextsDbi, idVal, dataVal, 0);
958+
E(mdb_put(writeTxn, contextsDbi, idVal, dataVal, 0));
957959
}
958960
}
959961
return false;
@@ -980,7 +982,7 @@ public void removeTriplesByContext(long subj, long pred, long obj, long context,
980982
}
981983

982984
public void removeTriples(RecordIterator it, boolean explicit, Consumer<long[]> handler) throws IOException {
983-
try (MemoryStack stack = MemoryStack.stackPush()) {
985+
try (it; MemoryStack stack = MemoryStack.stackPush()) {
984986
MDBVal keyValue = MDBVal.callocStack(stack);
985987
ByteBuffer keyBuf = stack.malloc(MAX_KEY_LENGTH);
986988

@@ -1013,8 +1015,6 @@ public void removeTriples(RecordIterator it, boolean explicit, Consumer<long[]>
10131015
decrementContext(stack, quad[CONTEXT_IDX]);
10141016
handler.accept(quad);
10151017
}
1016-
} finally {
1017-
it.close();
10181018
}
10191019
}
10201020

0 commit comments

Comments
 (0)