Skip to content

Commit 6d7c848

Browse files
committed
GH-4950 LMDB: fix transaction handling for index creation and reindexing
1 parent b663a61 commit 6d7c848

8 files changed

Lines changed: 265 additions & 301 deletions

File tree

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbUtil.java

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,23 +21,18 @@
2121
import static org.lwjgl.util.lmdb.LMDB.MDB_RDONLY;
2222
import static org.lwjgl.util.lmdb.LMDB.MDB_SUCCESS;
2323
import static org.lwjgl.util.lmdb.LMDB.mdb_dbi_open;
24-
import static org.lwjgl.util.lmdb.LMDB.mdb_set_compare;
2524
import static org.lwjgl.util.lmdb.LMDB.mdb_strerror;
2625
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_abort;
2726
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_begin;
2827
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_commit;
2928

3029
import java.io.IOException;
31-
import java.nio.ByteBuffer;
3230
import java.nio.IntBuffer;
33-
import java.util.Comparator;
3431

3532
import org.lwjgl.PointerBuffer;
3633
import org.lwjgl.system.MemoryStack;
3734
import org.lwjgl.system.MemoryUtil;
3835
import org.lwjgl.system.Pointer;
39-
import org.lwjgl.util.lmdb.MDBCmpFuncI;
40-
import org.lwjgl.util.lmdb.MDBVal;
4136
import org.slf4j.Logger;
4237
import org.slf4j.LoggerFactory;
4338

@@ -121,22 +116,16 @@ static <T> T transaction(long env, Transaction<T> transaction) throws IOExceptio
121116
return ret;
122117
}
123118

124-
static int openDatabase(long env, String name, int flags, Comparator<ByteBuffer> comparator) throws IOException {
125-
return transaction(env, (stack, txn) -> {
126-
IntBuffer ip = stack.mallocInt(1);
119+
static int openDatabase(long env, String name, int flags) throws IOException {
120+
return transaction(env, (stack, txn) -> openDatabaseWithTxn(txn, name, flags));
121+
}
127122

123+
static int openDatabaseWithTxn(long txn, String name, int flags) throws IOException {
124+
try (MemoryStack stack = stackPush()) {
125+
IntBuffer ip = stack.mallocInt(1);
128126
E(mdb_dbi_open(txn, name, flags, ip));
129-
int dbi = ip.get(0);
130-
if (comparator != null) {
131-
MDBCmpFuncI cmp = (a, b) -> {
132-
MDBVal aVal = MDBVal.create(a);
133-
MDBVal bVal = MDBVal.create(b);
134-
return comparator.compare(aVal.mv_data(), bVal.mv_data());
135-
};
136-
mdb_set_compare(txn, dbi, cmp);
137-
}
138-
return dbi;
139-
});
127+
return ip.get(0);
128+
}
140129
}
141130

142131
/**
@@ -173,7 +162,6 @@ static boolean requiresResize(long mapSize, long pageSize, long txn, long requir
173162
if (percentageUsed > PERCENTAGE_FULL_TRIGGERS_RESIZE) {
174163
return true;
175164
}
176-
177165
return mapSize - nextPageNo * pageSize < Math.max(requiredSize, MIN_FREE_SPACE);
178166
}
179167

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/PersistentSetFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ class PersistentSetFactory<T extends Serializable> {
7474

7575
dbDir = Files.createTempDirectory(cacheDir.toPath(), "set");
7676
E(mdb_env_open(env, dbDir.toAbsolutePath().toString(), flags, 0664));
77-
this.defaultDbi = openDatabase(env, null, MDB_CREATE, null);
77+
this.defaultDbi = openDatabase(env, null, MDB_CREATE);
7878

7979
MDBStat stat = MDBStat.malloc(stack);
8080
readTransaction(env, (stack2, txn) -> {
@@ -132,7 +132,7 @@ void ensureResize() throws IOException, InterruptedException {
132132

133133
PersistentSet<T> createSet(String name, Function<T, byte[]> writeFunc, Function<ByteBuffer, T> readFunc)
134134
throws IOException {
135-
int dbi = openDatabase(env, name, MDB_CREATE, null);
135+
int dbi = openDatabase(env, name, MDB_CREATE);
136136
return new PersistentSet<>(this, dbi) {
137137
@Override
138138
protected byte[] write(T element) throws IOException {

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java

Lines changed: 96 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
package org.eclipse.rdf4j.sail.lmdb;
1313

1414
import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.E;
15-
import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.openDatabase;
15+
import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.openDatabaseWithTxn;
1616
import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.readTransaction;
1717
import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.transaction;
1818
import static org.eclipse.rdf4j.sail.lmdb.Varint.readQuadUnsigned;
@@ -209,38 +209,50 @@ class TripleStore implements Closeable {
209209

210210
txnManager = new TxnManager(env, Mode.RESET);
211211

212-
String indexSpecStr = config.getTripleIndexes();
213-
if (!properties.isLoaded()) {
214-
// newly created lmdb store
215-
Set<String> indexSpecs = parseIndexSpecList(indexSpecStr);
212+
try {
213+
String indexSpecStr = config.getTripleIndexes();
214+
if (!properties.isLoaded()) {
215+
// newly created lmdb store
216+
Set<String> indexSpecs = parseIndexSpecList(indexSpecStr);
217+
218+
if (indexSpecs.isEmpty()) {
219+
logger.debug("No indexes specified, using default indexes: {}", DEFAULT_INDEXES);
220+
indexSpecStr = DEFAULT_INDEXES;
221+
indexSpecs = parseIndexSpecList(indexSpecStr);
222+
}
216223

217-
if (indexSpecs.isEmpty()) {
218-
logger.debug("No indexes specified, using default indexes: {}", DEFAULT_INDEXES);
219-
indexSpecStr = DEFAULT_INDEXES;
220-
indexSpecs = parseIndexSpecList(indexSpecStr);
224+
startTransaction();
225+
initIndexes(indexSpecs);
226+
endTransaction(true);
227+
initializePageAndMapSize(config.getTripleDBSize());
228+
} else {
229+
// Initialize existing indexes
230+
Set<String> indexSpecs = getIndexSpecs();
231+
startTransaction();
232+
initIndexes(indexSpecs);
233+
endTransaction(true);
234+
initializePageAndMapSize(config.getTripleDBSize());
235+
236+
// Compare the existing indexes with the requested indexes
237+
Set<String> reqIndexSpecs = parseIndexSpecList(indexSpecStr);
238+
if (reqIndexSpecs.isEmpty()) {
239+
// No indexes specified, use the existing ones
240+
indexSpecStr = properties.getTripleIndexes();
241+
} else if (!reqIndexSpecs.equals(indexSpecs)) {
242+
// Set of indexes needs to be changed
243+
startTransaction();
244+
reindex(indexSpecs, reqIndexSpecs);
245+
endTransaction(true);
246+
}
221247
}
222248

223-
initIndexes(indexSpecs, config.getTripleDBSize());
224-
} else {
225-
// Initialize existing indexes
226-
Set<String> indexSpecs = getIndexSpecs();
227-
initIndexes(indexSpecs, config.getTripleDBSize());
228-
229-
// Compare the existing indexes with the requested indexes
230-
Set<String> reqIndexSpecs = parseIndexSpecList(indexSpecStr);
231-
232-
if (reqIndexSpecs.isEmpty()) {
233-
// No indexes specified, use the existing ones
234-
indexSpecStr = properties.getTripleIndexes();
235-
} else if (!reqIndexSpecs.equals(indexSpecs)) {
236-
// Set of indexes needs to be changed
237-
reindex(indexSpecs, reqIndexSpecs);
249+
if (!indexSpecStr.equals(properties.getTripleIndexes())) {
250+
// Store up-to-date properties
251+
properties.setTripleIndexes(indexSpecStr);
238252
}
239-
}
240-
241-
if (!indexSpecStr.equals(properties.getTripleIndexes())) {
242-
// Store up-to-date properties
243-
properties.setTripleIndexes(indexSpecStr);
253+
} catch (IOException | SailException e) {
254+
endTransaction(false);
255+
throw e;
244256
}
245257
}
246258

@@ -293,16 +305,18 @@ private Set<String> parseIndexSpecList(String indexSpecStr) throws SailException
293305
return indexes;
294306
}
295307

296-
private void initIndexes(Set<String> indexSpecs, long tripleDbSize) throws IOException {
297-
for (String fieldSeq : orderIndexSpecs(indexSpecs)) {
308+
private void initIndexes(Set<String> indexSpecs) throws IOException {
309+
for (String fieldSeq : indexSpecs) {
298310
logger.trace("Initializing index '{}'...", fieldSeq);
299311
indexes.add(new TripleIndex(fieldSeq));
300312
}
313+
}
301314

315+
private void initializePageAndMapSize(long tripleDbSize) throws IOException {
302316
// initialize page size and set map size for env
303317
readTransaction(env, (stack, txn) -> {
304318
MDBStat stat = MDBStat.malloc(stack);
305-
TripleIndex mainIndex = indexes.get(0);
319+
TripleIndex mainIndex = indexes.getFirst();
306320
mdb_stat(txn, mainIndex.getDB(true), stat);
307321

308322
boolean isEmpty = stat.ms_entries() == 0;
@@ -498,8 +512,7 @@ private void reindex(Set<String> currentIndexSpecs, Set<String> newIndexSpecs) t
498512
if (!addedIndexSpecs.isEmpty()) {
499513
TripleIndex sourceIndex = indexes.get(0);
500514
for (boolean explicit : new boolean[] { true, false }) {
501-
transaction(env, (stack, txn) -> {
502-
515+
try (MemoryStack stack = stackPush()) {
503516
MDBVal keyValue = MDBVal.calloc(stack);
504517
ByteBuffer keyBuf = stack.malloc(MAX_KEY_LENGTH);
505518
keyValue.mv_data(keyBuf);
@@ -511,7 +524,7 @@ private void reindex(Set<String> currentIndexSpecs, Set<String> newIndexSpecs) t
511524
RecordIterator[] sourceIter = { null };
512525
try {
513526
sourceIter[0] = new LmdbRecordIterator(sourceIndex, false, -1, -1, -1, -1,
514-
explicit, txnManager.createTxn(txn));
527+
explicit, txnManager.createReadTxn());
515528

516529
RecordIterator it = sourceIter[0];
517530
long[] quad;
@@ -521,7 +534,34 @@ private void reindex(Set<String> currentIndexSpecs, Set<String> newIndexSpecs) t
521534
quad[CONTEXT_IDX]);
522535
keyBuf.flip();
523536

524-
E(mdb_put(txn, addedIndex.getDB(explicit), keyValue, dataValue, 0));
537+
if (requiresResize()) {
538+
endTransaction(true);
539+
540+
// the lock is just a safety measure if reindex is somehow called outside of the
541+
// constructor
542+
StampedLongAdderLockManager lockManager = txnManager.lockManager();
543+
long readStamp;
544+
try {
545+
readStamp = lockManager.readLock();
546+
} catch (InterruptedException e) {
547+
throw new SailException(e);
548+
}
549+
try {
550+
txnManager.deactivate();
551+
mapSize = LmdbUtil.autoGrowMapSize(mapSize, pageSize, 0);
552+
E(mdb_env_set_mapsize(env, mapSize));
553+
logger.debug("resized map to {}", mapSize);
554+
} finally {
555+
try {
556+
txnManager.activate();
557+
} finally {
558+
lockManager.unlockRead(readStamp);
559+
}
560+
}
561+
startTransaction();
562+
}
563+
564+
E(mdb_put(writeTxn, addedIndex.getDB(explicit), keyValue, dataValue, 0));
525565
}
526566
} finally {
527567
if (sourceIter[0] != null) {
@@ -531,9 +571,7 @@ private void reindex(Set<String> currentIndexSpecs, Set<String> newIndexSpecs) t
531571

532572
currentIndexes.put(fieldSeq, addedIndex);
533573
}
534-
535-
return null;
536-
});
574+
}
537575
}
538576

539577
logger.debug("New index(es) initialized");
@@ -544,19 +582,16 @@ private void reindex(Set<String> currentIndexSpecs, Set<String> newIndexSpecs) t
544582
removedIndexSpecs.removeAll(newIndexSpecs);
545583

546584
List<Throwable> removedIndexExceptions = new ArrayList<>();
547-
transaction(env, (stack, txn) -> {
548-
// Delete files for removed indexes
549-
for (String fieldSeq : removedIndexSpecs) {
550-
try {
551-
TripleIndex removedIndex = currentIndexes.remove(fieldSeq);
552-
removedIndex.destroy(txn);
553-
logger.debug("Deleted file(s) for removed {} index", fieldSeq);
554-
} catch (Throwable e) {
555-
removedIndexExceptions.add(e);
556-
}
585+
// Delete files for removed indexes
586+
for (String fieldSeq : removedIndexSpecs) {
587+
try {
588+
TripleIndex removedIndex = currentIndexes.remove(fieldSeq);
589+
removedIndex.destroy(writeTxn);
590+
logger.debug("Deleted file(s) for removed {} index", fieldSeq);
591+
} catch (Throwable e) {
592+
removedIndexExceptions.add(e);
557593
}
558-
return null;
559-
});
594+
}
560595

561596
if (!removedIndexExceptions.isEmpty()) {
562597
throw new IOException(removedIndexExceptions.get(0));
@@ -1051,7 +1086,6 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean
10511086

10521087
if (stAdded) {
10531088
for (int i = 1; i < indexes.size(); i++) {
1054-
10551089
TripleIndex index = indexes.get(i);
10561090
keyBuf.clear();
10571091
index.toKey(keyBuf, subj, pred, obj, context);
@@ -1669,8 +1703,8 @@ public TripleIndex(String fieldSeq) throws IOException {
16691703
this.leadingFieldValueAccessor = this.fieldValueAccessors[0];
16701704
this.indexMap = getIndexes(this.fieldSeq);
16711705
// open database and use native sort order without comparator
1672-
dbiExplicit = openDatabase(env, fieldSeq, MDB_CREATE, null);
1673-
dbiInferred = openDatabase(env, fieldSeq + "-inf", MDB_CREATE, null);
1706+
dbiExplicit = openDatabaseWithTxn(writeTxn, fieldSeq, MDB_CREATE);
1707+
dbiInferred = openDatabaseWithTxn(writeTxn, fieldSeq + "-inf", MDB_CREATE);
16741708
}
16751709

16761710
public char[] getFieldSeq() {
@@ -1708,24 +1742,14 @@ protected int[] getIndexes(char[] fieldSeq) {
17081742
int[] indexes = new int[fieldSeq.length];
17091743
for (int i = 0; i < fieldSeq.length; i++) {
17101744
char field = fieldSeq[i];
1711-
int fieldIdx;
1712-
switch (field) {
1713-
case 's':
1714-
fieldIdx = SUBJ_IDX;
1715-
break;
1716-
case 'p':
1717-
fieldIdx = PRED_IDX;
1718-
break;
1719-
case 'o':
1720-
fieldIdx = OBJ_IDX;
1721-
break;
1722-
case 'c':
1723-
fieldIdx = CONTEXT_IDX;
1724-
break;
1725-
default:
1726-
throw new IllegalArgumentException(
1727-
"invalid character '" + field + "' in field sequence: " + new String(fieldSeq));
1728-
}
1745+
int fieldIdx = switch (field) {
1746+
case 's' -> SUBJ_IDX;
1747+
case 'p' -> PRED_IDX;
1748+
case 'o' -> OBJ_IDX;
1749+
case 'c' -> CONTEXT_IDX;
1750+
default -> throw new IllegalArgumentException(
1751+
"invalid character '" + field + "' in field sequence: " + new String(fieldSeq));
1752+
};
17291753
indexes[i] = fieldIdx;
17301754
}
17311755
return indexes;

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TxnRecordCache.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ public TxnRecordCache(File cacheDir) throws IOException {
8181

8282
dbDir = Files.createTempDirectory(cacheDir.toPath(), "txncache");
8383
E(mdb_env_open(env, dbDir.toAbsolutePath().toString(), flags, 0664));
84-
dbiExplicit = openDatabase(env, "quads", MDB_CREATE, null);
85-
dbiInferred = openDatabase(env, "quads-inf", MDB_CREATE, null);
84+
dbiExplicit = openDatabase(env, "quads", MDB_CREATE);
85+
dbiInferred = openDatabase(env, "quads-inf", MDB_CREATE);
8686

8787
MDBStat stat = MDBStat.malloc(stack);
8888
readTransaction(env, (stack2, txn) -> {

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStore.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ private void open() throws IOException {
373373
E(mdb_env_open(env, dir.getAbsolutePath(), flags, 0664));
374374

375375
// open main database
376-
dbi = openDatabase(env, null, MDB_CREATE, null);
376+
dbi = openDatabase(env, null, MDB_CREATE);
377377

378378
// initialize page size and set map size for env
379379
readTransaction(env, (stack, txn) -> {
@@ -404,11 +404,11 @@ private void open() throws IOException {
404404
});
405405

406406
// open unused IDs database
407-
unusedDbi = openDatabase(env, "unused_ids", MDB_CREATE, null);
407+
unusedDbi = openDatabase(env, "unused_ids", MDB_CREATE);
408408
// open free IDs database
409-
freeDbi = openDatabase(env, "free_ids", MDB_CREATE, null);
409+
freeDbi = openDatabase(env, "free_ids", MDB_CREATE);
410410
// open ref_counts database
411-
refCountsDbi = openDatabase(env, "ref_counts", MDB_CREATE, null);
411+
refCountsDbi = openDatabase(env, "ref_counts", MDB_CREATE);
412412

413413
// check if free IDs are available
414414
readTransaction(env, (stack, txn) -> {

0 commit comments

Comments
 (0)