Skip to content

Commit 9404058

Browse files
committed
GH-4950 LMDB: fix transaction handling for index creation and reindexing
1 parent f137a04 commit 9404058

6 files changed

Lines changed: 116 additions & 107 deletions

File tree

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbUtil.java

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,23 +21,18 @@
2121
import static org.lwjgl.util.lmdb.LMDB.MDB_RDONLY;
2222
import static org.lwjgl.util.lmdb.LMDB.MDB_SUCCESS;
2323
import static org.lwjgl.util.lmdb.LMDB.mdb_dbi_open;
24-
import static org.lwjgl.util.lmdb.LMDB.mdb_set_compare;
2524
import static org.lwjgl.util.lmdb.LMDB.mdb_strerror;
2625
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_abort;
2726
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_begin;
2827
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_commit;
2928

3029
import java.io.IOException;
31-
import java.nio.ByteBuffer;
3230
import java.nio.IntBuffer;
33-
import java.util.Comparator;
3431

3532
import org.lwjgl.PointerBuffer;
3633
import org.lwjgl.system.MemoryStack;
3734
import org.lwjgl.system.MemoryUtil;
3835
import org.lwjgl.system.Pointer;
39-
import org.lwjgl.util.lmdb.MDBCmpFuncI;
40-
import org.lwjgl.util.lmdb.MDBVal;
4136
import org.slf4j.Logger;
4237
import org.slf4j.LoggerFactory;
4338

@@ -121,22 +116,16 @@ static <T> T transaction(long env, Transaction<T> transaction) throws IOExceptio
121116
return ret;
122117
}
123118

124-
static int openDatabase(long env, String name, int flags, Comparator<ByteBuffer> comparator) throws IOException {
125-
return transaction(env, (stack, txn) -> {
126-
IntBuffer ip = stack.mallocInt(1);
119+
static int openDatabase(long env, String name, int flags) throws IOException {
120+
return transaction(env, (stack, txn) -> openDatabaseWithTxn(txn, name, flags));
121+
}
127122

123+
static int openDatabaseWithTxn(long txn, String name, int flags) throws IOException {
124+
try (MemoryStack stack = stackPush()) {
125+
IntBuffer ip = stack.mallocInt(1);
128126
E(mdb_dbi_open(txn, name, flags, ip));
129-
int dbi = ip.get(0);
130-
if (comparator != null) {
131-
MDBCmpFuncI cmp = (a, b) -> {
132-
MDBVal aVal = MDBVal.create(a);
133-
MDBVal bVal = MDBVal.create(b);
134-
return comparator.compare(aVal.mv_data(), bVal.mv_data());
135-
};
136-
mdb_set_compare(txn, dbi, cmp);
137-
}
138-
return dbi;
139-
});
127+
return ip.get(0);
128+
}
140129
}
141130

142131
/**
@@ -173,7 +162,6 @@ static boolean requiresResize(long mapSize, long pageSize, long txn, long requir
173162
if (percentageUsed > PERCENTAGE_FULL_TRIGGERS_RESIZE) {
174163
return true;
175164
}
176-
177165
return mapSize - nextPageNo * pageSize < Math.max(requiredSize, MIN_FREE_SPACE);
178166
}
179167

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/PersistentSetFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ class PersistentSetFactory<T extends Serializable> {
7474

7575
dbDir = Files.createTempDirectory(cacheDir.toPath(), "set");
7676
E(mdb_env_open(env, dbDir.toAbsolutePath().toString(), flags, 0664));
77-
this.defaultDbi = openDatabase(env, null, MDB_CREATE, null);
77+
this.defaultDbi = openDatabase(env, null, MDB_CREATE);
7878

7979
MDBStat stat = MDBStat.malloc(stack);
8080
readTransaction(env, (stack2, txn) -> {
@@ -132,7 +132,7 @@ void ensureResize() throws IOException, InterruptedException {
132132

133133
PersistentSet<T> createSet(String name, Function<T, byte[]> writeFunc, Function<ByteBuffer, T> readFunc)
134134
throws IOException {
135-
int dbi = openDatabase(env, name, MDB_CREATE, null);
135+
int dbi = openDatabase(env, name, MDB_CREATE);
136136
return new PersistentSet<>(this, dbi) {
137137
@Override
138138
protected byte[] write(T element) throws IOException {

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java

Lines changed: 97 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
package org.eclipse.rdf4j.sail.lmdb;
1313

1414
import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.E;
15-
import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.openDatabase;
15+
import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.openDatabaseWithTxn;
1616
import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.readTransaction;
1717
import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.transaction;
1818
import static org.eclipse.rdf4j.sail.lmdb.Varint.readQuadUnsigned;
@@ -200,38 +200,50 @@ class TripleStore implements Closeable {
200200

201201
txnManager = new TxnManager(env, Mode.RESET);
202202

203-
String indexSpecStr = config.getTripleIndexes();
204-
if (!properties.isLoaded()) {
205-
// newly created lmdb store
206-
Set<String> indexSpecs = parseIndexSpecList(indexSpecStr);
203+
try {
204+
String indexSpecStr = config.getTripleIndexes();
205+
if (!properties.isLoaded()) {
206+
// newly created lmdb store
207+
Set<String> indexSpecs = parseIndexSpecList(indexSpecStr);
208+
209+
if (indexSpecs.isEmpty()) {
210+
logger.debug("No indexes specified, using default indexes: {}", DEFAULT_INDEXES);
211+
indexSpecStr = DEFAULT_INDEXES;
212+
indexSpecs = parseIndexSpecList(indexSpecStr);
213+
}
207214

208-
if (indexSpecs.isEmpty()) {
209-
logger.debug("No indexes specified, using default indexes: {}", DEFAULT_INDEXES);
210-
indexSpecStr = DEFAULT_INDEXES;
211-
indexSpecs = parseIndexSpecList(indexSpecStr);
215+
startTransaction();
216+
initIndexes(indexSpecs);
217+
endTransaction(true);
218+
initializePageAndMapSize(config.getTripleDBSize());
219+
} else {
220+
// Initialize existing indexes
221+
Set<String> indexSpecs = getIndexSpecs();
222+
startTransaction();
223+
initIndexes(indexSpecs);
224+
endTransaction(true);
225+
initializePageAndMapSize(config.getTripleDBSize());
226+
227+
// Compare the existing indexes with the requested indexes
228+
Set<String> reqIndexSpecs = parseIndexSpecList(indexSpecStr);
229+
if (reqIndexSpecs.isEmpty()) {
230+
// No indexes specified, use the existing ones
231+
indexSpecStr = properties.getTripleIndexes();
232+
} else if (!reqIndexSpecs.equals(indexSpecs)) {
233+
// Set of indexes needs to be changed
234+
startTransaction();
235+
reindex(indexSpecs, reqIndexSpecs);
236+
endTransaction(true);
237+
}
212238
}
213239

214-
initIndexes(indexSpecs, config.getTripleDBSize());
215-
} else {
216-
// Initialize existing indexes
217-
Set<String> indexSpecs = getIndexSpecs();
218-
initIndexes(indexSpecs, config.getTripleDBSize());
219-
220-
// Compare the existing indexes with the requested indexes
221-
Set<String> reqIndexSpecs = parseIndexSpecList(indexSpecStr);
222-
223-
if (reqIndexSpecs.isEmpty()) {
224-
// No indexes specified, use the existing ones
225-
indexSpecStr = properties.getTripleIndexes();
226-
} else if (!reqIndexSpecs.equals(indexSpecs)) {
227-
// Set of indexes needs to be changed
228-
reindex(indexSpecs, reqIndexSpecs);
240+
if (!indexSpecStr.equals(properties.getTripleIndexes())) {
241+
// Store up-to-date properties
242+
properties.setTripleIndexes(indexSpecStr);
229243
}
230-
}
231-
232-
if (!indexSpecStr.equals(properties.getTripleIndexes())) {
233-
// Store up-to-date properties
234-
properties.setTripleIndexes(indexSpecStr);
244+
} catch (IOException | SailException e) {
245+
endTransaction(false);
246+
throw e;
235247
}
236248
}
237249

@@ -284,16 +296,18 @@ private Set<String> parseIndexSpecList(String indexSpecStr) throws SailException
284296
return indexes;
285297
}
286298

287-
private void initIndexes(Set<String> indexSpecs, long tripleDbSize) throws IOException {
299+
private void initIndexes(Set<String> indexSpecs) throws IOException {
288300
for (String fieldSeq : indexSpecs) {
289301
logger.trace("Initializing index '{}'...", fieldSeq);
290302
indexes.add(new TripleIndex(fieldSeq));
291303
}
304+
}
292305

306+
private void initializePageAndMapSize(long tripleDbSize) throws IOException {
293307
// initialize page size and set map size for env
294308
readTransaction(env, (stack, txn) -> {
295309
MDBStat stat = MDBStat.malloc(stack);
296-
TripleIndex mainIndex = indexes.get(0);
310+
TripleIndex mainIndex = indexes.getFirst();
297311
mdb_stat(txn, mainIndex.getDB(true), stat);
298312

299313
boolean isEmpty = stat.ms_entries() == 0;
@@ -330,7 +344,7 @@ private void reindex(Set<String> currentIndexSpecs, Set<String> newIndexSpecs) t
330344
if (!addedIndexSpecs.isEmpty()) {
331345
TripleIndex sourceIndex = indexes.get(0);
332346
for (boolean explicit : new boolean[] { true, false }) {
333-
transaction(env, (stack, txn) -> {
347+
try (MemoryStack stack = stackPush()) {
334348
MDBVal keyValue = MDBVal.callocStack(stack);
335349
ByteBuffer keyBuf = stack.malloc(MAX_KEY_LENGTH);
336350
keyValue.mv_data(keyBuf);
@@ -342,7 +356,7 @@ private void reindex(Set<String> currentIndexSpecs, Set<String> newIndexSpecs) t
342356
RecordIterator[] sourceIter = { null };
343357
try {
344358
sourceIter[0] = new LmdbRecordIterator(sourceIndex, false, -1, -1, -1, -1,
345-
explicit, txnManager.createTxn(txn));
359+
explicit, txnManager.createReadTxn());
346360

347361
RecordIterator it = sourceIter[0];
348362
long[] quad;
@@ -352,7 +366,34 @@ private void reindex(Set<String> currentIndexSpecs, Set<String> newIndexSpecs) t
352366
quad[CONTEXT_IDX]);
353367
keyBuf.flip();
354368

355-
E(mdb_put(txn, addedIndex.getDB(explicit), keyValue, dataValue, 0));
369+
if (requiresResize()) {
370+
endTransaction(true);
371+
372+
// the lock is just a safety measure if reindex is somehow called outside of the
373+
// constructor
374+
StampedLongAdderLockManager lockManager = txnManager.lockManager();
375+
long readStamp;
376+
try {
377+
readStamp = lockManager.readLock();
378+
} catch (InterruptedException e) {
379+
throw new SailException(e);
380+
}
381+
try {
382+
txnManager.deactivate();
383+
mapSize = LmdbUtil.autoGrowMapSize(mapSize, pageSize, 0);
384+
E(mdb_env_set_mapsize(env, mapSize));
385+
logger.debug("resized map to {}", mapSize);
386+
} finally {
387+
try {
388+
txnManager.activate();
389+
} finally {
390+
lockManager.unlockRead(readStamp);
391+
}
392+
}
393+
startTransaction();
394+
}
395+
396+
E(mdb_put(writeTxn, addedIndex.getDB(explicit), keyValue, dataValue, 0));
356397
}
357398
} finally {
358399
if (sourceIter[0] != null) {
@@ -362,9 +403,7 @@ private void reindex(Set<String> currentIndexSpecs, Set<String> newIndexSpecs) t
362403

363404
currentIndexes.put(fieldSeq, addedIndex);
364405
}
365-
366-
return null;
367-
});
406+
}
368407
}
369408

370409
logger.debug("New index(es) initialized");
@@ -375,19 +414,16 @@ private void reindex(Set<String> currentIndexSpecs, Set<String> newIndexSpecs) t
375414
removedIndexSpecs.removeAll(newIndexSpecs);
376415

377416
List<Throwable> removedIndexExceptions = new ArrayList<>();
378-
transaction(env, (stack, txn) -> {
379-
// Delete files for removed indexes
380-
for (String fieldSeq : removedIndexSpecs) {
381-
try {
382-
TripleIndex removedIndex = currentIndexes.remove(fieldSeq);
383-
removedIndex.destroy(txn);
384-
logger.debug("Deleted file(s) for removed {} index", fieldSeq);
385-
} catch (Throwable e) {
386-
removedIndexExceptions.add(e);
387-
}
417+
// Delete files for removed indexes
418+
for (String fieldSeq : removedIndexSpecs) {
419+
try {
420+
TripleIndex removedIndex = currentIndexes.remove(fieldSeq);
421+
removedIndex.destroy(writeTxn);
422+
logger.debug("Deleted file(s) for removed {} index", fieldSeq);
423+
} catch (Throwable e) {
424+
removedIndexExceptions.add(e);
388425
}
389-
return null;
390-
});
426+
}
391427

392428
if (!removedIndexExceptions.isEmpty()) {
393429
throw new IOException(removedIndexExceptions.get(0));
@@ -849,7 +885,7 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean
849885
}
850886

851887
if (recordCache != null) {
852-
long quad[] = new long[] { subj, pred, obj, context };
888+
long[] quad = new long[] { subj, pred, obj, context };
853889
if (explicit) {
854890
// remove implicit statement
855891
recordCache.removeRecord(quad, false);
@@ -870,7 +906,6 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean
870906

871907
if (stAdded) {
872908
for (int i = 1; i < indexes.size(); i++) {
873-
874909
TripleIndex index = indexes.get(i);
875910
keyBuf.clear();
876911
index.toKey(keyBuf, subj, pred, obj, context);
@@ -885,9 +920,7 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean
885920
E(mdb_put(writeTxn, index.getDB(explicit), keyVal, dataVal, 0));
886921
}
887922

888-
if (stAdded) {
889-
incrementContext(stack, context);
890-
}
923+
incrementContext(stack, context);
891924
}
892925
}
893926

@@ -1143,8 +1176,8 @@ public TripleIndex(String fieldSeq) throws IOException {
11431176
this.matcherFactory = IndexKeyWriters.matcherFactory(fieldSeq);
11441177
this.indexMap = getIndexes(this.fieldSeq);
11451178
// open database and use native sort order without comparator
1146-
dbiExplicit = openDatabase(env, fieldSeq, MDB_CREATE, null);
1147-
dbiInferred = openDatabase(env, fieldSeq + "-inf", MDB_CREATE, null);
1179+
dbiExplicit = openDatabaseWithTxn(writeTxn, fieldSeq, MDB_CREATE);
1180+
dbiInferred = openDatabaseWithTxn(writeTxn, fieldSeq + "-inf", MDB_CREATE);
11481181
}
11491182

11501183
public char[] getFieldSeq() {
@@ -1159,24 +1192,14 @@ protected int[] getIndexes(char[] fieldSeq) {
11591192
int[] indexes = new int[fieldSeq.length];
11601193
for (int i = 0; i < fieldSeq.length; i++) {
11611194
char field = fieldSeq[i];
1162-
int fieldIdx;
1163-
switch (field) {
1164-
case 's':
1165-
fieldIdx = SUBJ_IDX;
1166-
break;
1167-
case 'p':
1168-
fieldIdx = PRED_IDX;
1169-
break;
1170-
case 'o':
1171-
fieldIdx = OBJ_IDX;
1172-
break;
1173-
case 'c':
1174-
fieldIdx = CONTEXT_IDX;
1175-
break;
1176-
default:
1177-
throw new IllegalArgumentException(
1178-
"invalid character '" + field + "' in field sequence: " + new String(fieldSeq));
1179-
}
1195+
int fieldIdx = switch (field) {
1196+
case 's' -> SUBJ_IDX;
1197+
case 'p' -> PRED_IDX;
1198+
case 'o' -> OBJ_IDX;
1199+
case 'c' -> CONTEXT_IDX;
1200+
default -> throw new IllegalArgumentException(
1201+
"invalid character '" + field + "' in field sequence: " + new String(fieldSeq));
1202+
};
11801203
indexes[i] = fieldIdx;
11811204
}
11821205
return indexes;

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TxnRecordCache.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ public TxnRecordCache(File cacheDir) throws IOException {
7777

7878
dbDir = Files.createTempDirectory(cacheDir.toPath(), "txncache");
7979
E(mdb_env_open(env, dbDir.toAbsolutePath().toString(), flags, 0664));
80-
dbiExplicit = openDatabase(env, "quads", MDB_CREATE, null);
81-
dbiInferred = openDatabase(env, "quads-inf", MDB_CREATE, null);
80+
dbiExplicit = openDatabase(env, "quads", MDB_CREATE);
81+
dbiInferred = openDatabase(env, "quads-inf", MDB_CREATE);
8282

8383
MDBStat stat = MDBStat.malloc(stack);
8484
readTransaction(env, (stack2, txn) -> {

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStore.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ private void open() throws IOException {
359359
E(mdb_env_open(env, dir.getAbsolutePath(), flags, 0664));
360360

361361
// open main database
362-
dbi = openDatabase(env, null, MDB_CREATE, null);
362+
dbi = openDatabase(env, null, MDB_CREATE);
363363

364364
// initialize page size and set map size for env
365365
readTransaction(env, (stack, txn) -> {
@@ -390,11 +390,11 @@ private void open() throws IOException {
390390
});
391391

392392
// open unused IDs database
393-
unusedDbi = openDatabase(env, "unused_ids", MDB_CREATE, null);
393+
unusedDbi = openDatabase(env, "unused_ids", MDB_CREATE);
394394
// open free IDs database
395-
freeDbi = openDatabase(env, "free_ids", MDB_CREATE, null);
395+
freeDbi = openDatabase(env, "free_ids", MDB_CREATE);
396396
// open ref_counts database
397-
refCountsDbi = openDatabase(env, "ref_counts", MDB_CREATE, null);
397+
refCountsDbi = openDatabase(env, "ref_counts", MDB_CREATE);
398398

399399
// check if free IDs are available
400400
readTransaction(env, (stack, txn) -> {

0 commit comments

Comments
 (0)