Skip to content

Commit ccf6250

Browse files
authored
GH-4954 LMDB: Fix GC and record counting for deletions (#4955)
2 parents ceb8156 + 25a3df5 commit ccf6250

4 files changed

Lines changed: 77 additions & 51 deletions

File tree

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -500,13 +500,11 @@ public void flush() throws SailException {
500500
}
501501
if (activeTxn) {
502502
if (!multiThreadingActive) {
503+
tripleStore.commit();
503504
filterUsedIdsInTripleStore();
504505
}
505506
handleRemovedIdsInValueStore();
506507
valueStore.commit();
507-
if (!multiThreadingActive) {
508-
tripleStore.commit();
509-
}
510508
// do not set flag to false until _after_ commit is successfully completed.
511509
storeTxnStarted.set(false);
512510
}
@@ -604,9 +602,9 @@ private void startTransaction(boolean preferThreading) throws SailException {
604602
Operation op = opQueue.remove();
605603
if (op != null) {
606604
if (op == COMMIT_TRANSACTION) {
605+
tripleStore.commit();
607606
filterUsedIdsInTripleStore();
608607

609-
tripleStore.commit();
610608
nextTransactionAsync = false;
611609
asyncTransactionFinished = true;
612610
break;
@@ -712,23 +710,18 @@ private void addStatement(Resource subj, IRI pred, Value obj, boolean explicit,
712710

713711
private long removeStatements(long subj, long pred, long obj, boolean explicit, long[] contexts)
714712
throws IOException {
715-
long removeCount = 0;
713+
long[] removeCount = { 0 };
716714
for (long contextId : contexts) {
717-
final Map<Long, Long> perContextCounts = new HashMap<>();
718715
tripleStore.removeTriplesByContext(subj, pred, obj, contextId, explicit, quad -> {
719-
perContextCounts.merge(quad[3], 1L, (c, one) -> c + one);
716+
removeCount[0]++;
720717
for (long id : quad) {
721718
if (id != 0L) {
722719
unusedIds.add(id);
723720
}
724721
}
725722
});
726-
727-
for (Entry<Long, Long> entry : perContextCounts.entrySet()) {
728-
removeCount += entry.getValue();
729-
}
730723
}
731-
return removeCount;
724+
return removeCount[0];
732725
}
733726

734727
private long removeStatements(Resource subj, IRI pred, Value obj, boolean explicit, Resource... contexts)

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbUtil.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,11 @@ final class LmdbUtil {
5151
private LmdbUtil() {
5252
}
5353

54-
static void E(int rc) throws IOException {
54+
static int E(int rc) throws IOException {
5555
if (rc != MDB_SUCCESS && rc != MDB_NOTFOUND) {
5656
throw new IOException(mdb_strerror(rc));
5757
}
58+
return rc;
5859
}
5960

6061
static <T> T readTransaction(long env, Transaction<T> transaction) throws IOException {

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java

Lines changed: 46 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,22 @@
1616
import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.transaction;
1717
import static org.eclipse.rdf4j.sail.lmdb.Varint.readListUnsigned;
1818
import static org.eclipse.rdf4j.sail.lmdb.Varint.writeListUnsigned;
19+
import static org.eclipse.rdf4j.sail.lmdb.Varint.writeUnsigned;
1920
import static org.lwjgl.system.MemoryStack.stackPush;
2021
import static org.lwjgl.system.MemoryUtil.NULL;
2122
import static org.lwjgl.util.lmdb.LMDB.MDB_CREATE;
2223
import static org.lwjgl.util.lmdb.LMDB.MDB_FIRST;
24+
import static org.lwjgl.util.lmdb.LMDB.MDB_KEYEXIST;
2325
import static org.lwjgl.util.lmdb.LMDB.MDB_LAST;
2426
import static org.lwjgl.util.lmdb.LMDB.MDB_NEXT;
2527
import static org.lwjgl.util.lmdb.LMDB.MDB_NOMETASYNC;
28+
import static org.lwjgl.util.lmdb.LMDB.MDB_NOOVERWRITE;
2629
import static org.lwjgl.util.lmdb.LMDB.MDB_NOSYNC;
2730
import static org.lwjgl.util.lmdb.LMDB.MDB_NOTFOUND;
2831
import static org.lwjgl.util.lmdb.LMDB.MDB_NOTLS;
2932
import static org.lwjgl.util.lmdb.LMDB.MDB_PREV;
3033
import static org.lwjgl.util.lmdb.LMDB.MDB_SET_RANGE;
34+
import static org.lwjgl.util.lmdb.LMDB.MDB_SUCCESS;
3135
import static org.lwjgl.util.lmdb.LMDB.mdb_cmp;
3236
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_close;
3337
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_get;
@@ -47,6 +51,7 @@
4751
import static org.lwjgl.util.lmdb.LMDB.mdb_put;
4852
import static org.lwjgl.util.lmdb.LMDB.mdb_set_compare;
4953
import static org.lwjgl.util.lmdb.LMDB.mdb_stat;
54+
import static org.lwjgl.util.lmdb.LMDB.mdb_strerror;
5055
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_abort;
5156
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_begin;
5257
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_commit;
@@ -92,7 +97,7 @@
9297
import org.slf4j.LoggerFactory;
9398

9499
/**
95-
* LMDB-based indexed storage and retrieval of RDF statements. TripleStore stores statements in the form of four integer
100+
* LMDB-based indexed storage and retrieval of RDF statements. TripleStore stores statements in the form of four long
96101
* IDs. Each ID represent an RDF value that is stored in a {@link ValueStore}. The four IDs refer to the statement's
97102
* subject, predicate, object and context. The ID <tt>0</tt> is used to represent the "null" context and doesn't map to
98103
* an actual RDF value.
@@ -539,7 +544,7 @@ protected void bucketStart(double fraction, long[] lowerValues, long[] upperValu
539544
* @throws IOException
540545
*/
541546
protected void filterUsedIds(Collection<Long> ids) throws IOException {
542-
try (MemoryStack stack = stackPush()) {
547+
readTransaction(env, (stack, txn) -> {
543548
MDBVal maxKey = MDBVal.malloc(stack);
544549
ByteBuffer maxKeyBuf = stack.malloc(TripleStore.MAX_KEY_LENGTH);
545550
MDBVal keyData = MDBVal.malloc(stack);
@@ -559,7 +564,7 @@ protected void filterUsedIds(Collection<Long> ids) throws IOException {
559564
keyBuf.clear();
560565
Varint.writeUnsigned(keyBuf, id);
561566
keyData.mv_data(keyBuf.flip());
562-
if (mdb_get(writeTxn, contextsDbi, keyData, valueData) == 0) {
567+
if (mdb_get(txn, contextsDbi, keyData, valueData) == 0) {
563568
it.remove();
564569
}
565570
}
@@ -580,7 +585,7 @@ protected void filterUsedIds(Collection<Long> ids) throws IOException {
580585

581586
long cursor = 0;
582587
try {
583-
E(mdb_cursor_open(writeTxn, dbi, pp));
588+
E(mdb_cursor_open(txn, dbi, pp));
584589
cursor = pp.get(0);
585590

586591
if (fullScan) {
@@ -626,7 +631,7 @@ protected void filterUsedIds(Collection<Long> ids) throws IOException {
626631
int rc = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE);
627632
boolean exists = false;
628633
while (!exists && rc == 0) {
629-
if (mdb_cmp(writeTxn, dbi, keyData, maxKey) > 0) {
634+
if (mdb_cmp(txn, dbi, keyData, maxKey) > 0) {
630635
// id was not found
631636
break;
632637
} else if (!matcher.matches(keyData.mv_data())) {
@@ -649,7 +654,8 @@ protected void filterUsedIds(Collection<Long> ids) throws IOException {
649654
}
650655
}
651656
}
652-
}
657+
return null;
658+
});
653659
}
654660

655661
protected double cardinality(long subj, long pred, long obj, long context) throws IOException {
@@ -842,6 +848,7 @@ private boolean requiresResize() {
842848

843849
public boolean storeTriple(long subj, long pred, long obj, long context, boolean explicit) throws IOException {
844850
TripleIndex mainIndex = indexes.get(0);
851+
boolean stAdded;
845852
try (MemoryStack stack = MemoryStack.stackPush()) {
846853
MDBVal keyVal = MDBVal.malloc(stack);
847854
// use calloc to get an empty data value
@@ -851,33 +858,34 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean
851858
keyBuf.flip();
852859
keyVal.mv_data(keyBuf);
853860

854-
boolean foundExplicit = mdb_get(writeTxn, mainIndex.getDB(true), keyVal, dataVal) == 0;
855-
boolean foundImplicit = !foundExplicit && mdb_get(writeTxn, mainIndex.getDB(false), keyVal, dataVal) == 0;
856-
857-
boolean stAdded = !(foundExplicit || foundImplicit);
858-
if (stAdded || explicit && foundImplicit) {
859-
if (recordCache == null) {
860-
if (requiresResize()) {
861-
// map is full, resize required
862-
recordCache = new TxnRecordCache(dir);
863-
logger.debug("resize of map size {} required while adding - initialize record cache", mapSize);
864-
}
861+
if (recordCache == null) {
862+
if (requiresResize()) {
863+
// map is full, resize required
864+
recordCache = new TxnRecordCache(dir);
865+
logger.debug("resize of map size {} required while adding - initialize record cache", mapSize);
865866
}
866-
if (recordCache != null) {
867-
long quad[] = new long[] { subj, pred, obj, context };
868-
if (explicit && foundImplicit) {
869-
// remove implicit statement
870-
recordCache.removeRecord(quad, false);
871-
}
872-
// put record in cache and return immediately
873-
return recordCache.storeRecord(quad, explicit);
867+
}
868+
if (recordCache != null) {
869+
long quad[] = new long[] { subj, pred, obj, context };
870+
if (explicit) {
871+
// remove implicit statement
872+
recordCache.removeRecord(quad, false);
874873
}
874+
// put record in cache and return immediately
875+
return recordCache.storeRecord(quad, explicit);
876+
}
875877

876-
if (explicit && foundImplicit) {
877-
E(mdb_del(writeTxn, mainIndex.getDB(false), keyVal, dataVal));
878-
}
879-
E(mdb_put(writeTxn, mainIndex.getDB(explicit), keyVal, dataVal, 0));
878+
int rc = mdb_put(writeTxn, mainIndex.getDB(explicit), keyVal, dataVal, MDB_NOOVERWRITE);
879+
if (rc != MDB_SUCCESS && rc != MDB_KEYEXIST) {
880+
throw new IOException(mdb_strerror(rc));
881+
}
882+
stAdded = rc == MDB_SUCCESS;
883+
boolean foundImplicit = false;
884+
if (explicit && stAdded) {
885+
foundImplicit = E(mdb_del(writeTxn, mainIndex.getDB(false), keyVal, dataVal)) == MDB_SUCCESS;
886+
}
880887

888+
if (stAdded) {
881889
for (int i = 1; i < indexes.size(); i++) {
882890
TripleIndex index = indexes.get(i);
883891
keyBuf.clear();
@@ -887,7 +895,7 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean
887895
// update buffer positions in MDBVal
888896
keyVal.mv_data(keyBuf);
889897

890-
if (explicit && foundImplicit) {
898+
if (foundImplicit) {
891899
E(mdb_del(writeTxn, mainIndex.getDB(false), keyVal, dataVal));
892900
}
893901
E(mdb_put(writeTxn, index.getDB(explicit), keyVal, dataVal, 0));
@@ -897,9 +905,9 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean
897905
incrementContext(stack, context);
898906
}
899907
}
900-
901-
return stAdded;
902908
}
909+
910+
return stAdded;
903911
}
904912

905913
private void incrementContext(MemoryStack stack, long context) throws IOException {
@@ -991,6 +999,7 @@ public void removeTriples(RecordIterator it, boolean explicit, Consumer<long[]>
991999
}
9921000
if (recordCache != null) {
9931001
recordCache.removeRecord(quad, explicit);
1002+
handler.accept(quad);
9941003
continue;
9951004
}
9961005

@@ -1005,6 +1014,7 @@ public void removeTriples(RecordIterator it, boolean explicit, Consumer<long[]>
10051014
}
10061015

10071016
decrementContext(stack, quad[CONTEXT_IDX]);
1017+
handler.accept(quad);
10081018
}
10091019
} finally {
10101020
it.close();
@@ -1284,24 +1294,22 @@ GroupMatcher createMatcher(long subj, long pred, long obj, long context) {
12841294
}
12851295

12861296
void toKey(ByteBuffer bb, long subj, long pred, long obj, long context) {
1287-
long[] values = new long[4];
12881297
for (int i = 0; i < fieldSeq.length; i++) {
12891298
switch (fieldSeq[i]) {
12901299
case 's':
1291-
values[i] = subj;
1300+
writeUnsigned(bb, subj);
12921301
break;
12931302
case 'p':
1294-
values[i] = pred;
1303+
writeUnsigned(bb, pred);
12951304
break;
12961305
case 'o':
1297-
values[i] = obj;
1306+
writeUnsigned(bb, obj);
12981307
break;
12991308
case 'c':
1300-
values[i] = context;
1309+
writeUnsigned(bb, context);
13011310
break;
13021311
}
13031312
}
1304-
writeListUnsigned(bb, values);
13051313
}
13061314

13071315
void keyToQuad(ByteBuffer key, long[] quad) {

core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/TripleStoreTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,13 @@
1111
package org.eclipse.rdf4j.sail.lmdb;
1212

1313
import static org.junit.Assert.assertEquals;
14+
import static org.junit.Assert.assertTrue;
1415

1516
import java.io.File;
17+
import java.util.Arrays;
18+
import java.util.HashSet;
19+
import java.util.Set;
20+
import java.util.stream.Collectors;
1621

1722
import org.eclipse.rdf4j.sail.lmdb.TxnManager.Txn;
1823
import org.eclipse.rdf4j.sail.lmdb.config.LmdbStoreConfig;
@@ -65,6 +70,25 @@ public void testInferredStmts() throws Exception {
6570
}
6671
}
6772

73+
@Test
74+
public void testGc() throws Exception {
75+
tripleStore.startTransaction();
76+
tripleStore.storeTriple(1, 2, 3, 1, true);
77+
tripleStore.storeTriple(1, 2, 4, 1, true);
78+
tripleStore.storeTriple(1, 2, 5, 1, true);
79+
tripleStore.storeTriple(1, 6, 7, 1, true);
80+
tripleStore.storeTriple(1, 6, 7, 8, true);
81+
Set<Long> removed = new HashSet<>();
82+
tripleStore.removeTriplesByContext(1, 6, -1, -1, true, quad -> {
83+
for (Long c : quad) {
84+
removed.add(c);
85+
}
86+
});
87+
tripleStore.commit();
88+
tripleStore.filterUsedIds(removed);
89+
assertEquals(Arrays.asList(6L, 7L, 8L), removed.stream().sorted().collect(Collectors.toList()));
90+
}
91+
6892
@AfterEach
6993
public void after() throws Exception {
7094
tripleStore.close();

0 commit comments

Comments
 (0)