Skip to content

Commit b784f07

Browse files
committed
GH-4954 LMDB: Fix GC and record counting for deletions
Ensures that GC is also working if triple store needs to grow and that the correct count is returned for statement removals.
1 parent 460329b commit b784f07

4 files changed

Lines changed: 72 additions & 45 deletions

File tree

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -500,13 +500,11 @@ public void flush() throws SailException {
500500
}
501501
if (activeTxn) {
502502
if (!multiThreadingActive) {
503+
tripleStore.commit();
503504
filterUsedIdsInTripleStore();
504505
}
505506
handleRemovedIdsInValueStore();
506507
valueStore.commit();
507-
if (!multiThreadingActive) {
508-
tripleStore.commit();
509-
}
510508
// do not set flag to false until _after_ commit is successfully completed.
511509
storeTxnStarted.set(false);
512510
}
@@ -604,9 +602,9 @@ private void startTransaction(boolean preferThreading) throws SailException {
604602
Operation op = opQueue.remove();
605603
if (op != null) {
606604
if (op == COMMIT_TRANSACTION) {
605+
tripleStore.commit();
607606
filterUsedIdsInTripleStore();
608607

609-
tripleStore.commit();
610608
nextTransactionAsync = false;
611609
asyncTransactionFinished = true;
612610
break;
@@ -712,23 +710,18 @@ private void addStatement(Resource subj, IRI pred, Value obj, boolean explicit,
712710

713711
private long removeStatements(long subj, long pred, long obj, boolean explicit, long[] contexts)
714712
throws IOException {
715-
long removeCount = 0;
713+
long[] removeCount = { 0 };
716714
for (long contextId : contexts) {
717-
final Map<Long, Long> perContextCounts = new HashMap<>();
718715
tripleStore.removeTriplesByContext(subj, pred, obj, contextId, explicit, quad -> {
719-
perContextCounts.merge(quad[3], 1L, (c, one) -> c + one);
716+
removeCount[0]++;
720717
for (long id : quad) {
721718
if (id != 0L) {
722719
unusedIds.add(id);
723720
}
724721
}
725722
});
726-
727-
for (Entry<Long, Long> entry : perContextCounts.entrySet()) {
728-
removeCount += entry.getValue();
729-
}
730723
}
731-
return removeCount;
724+
return removeCount[0];
732725
}
733726

734727
private long removeStatements(Resource subj, IRI pred, Value obj, boolean explicit, Resource... contexts)

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbUtil.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,11 @@ final class LmdbUtil {
5151
private LmdbUtil() {
5252
}
5353

54-
static void E(int rc) throws IOException {
54+
static int E(int rc) throws IOException {
5555
if (rc != MDB_SUCCESS && rc != MDB_NOTFOUND) {
5656
throw new IOException(mdb_strerror(rc));
5757
}
58+
return rc;
5859
}
5960

6061
static <T> T readTransaction(long env, Transaction<T> transaction) throws IOException {

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java

Lines changed: 41 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,17 @@
2020
import static org.lwjgl.system.MemoryUtil.NULL;
2121
import static org.lwjgl.util.lmdb.LMDB.MDB_CREATE;
2222
import static org.lwjgl.util.lmdb.LMDB.MDB_FIRST;
23+
import static org.lwjgl.util.lmdb.LMDB.MDB_KEYEXIST;
2324
import static org.lwjgl.util.lmdb.LMDB.MDB_LAST;
2425
import static org.lwjgl.util.lmdb.LMDB.MDB_NEXT;
2526
import static org.lwjgl.util.lmdb.LMDB.MDB_NOMETASYNC;
27+
import static org.lwjgl.util.lmdb.LMDB.MDB_NOOVERWRITE;
2628
import static org.lwjgl.util.lmdb.LMDB.MDB_NOSYNC;
2729
import static org.lwjgl.util.lmdb.LMDB.MDB_NOTFOUND;
2830
import static org.lwjgl.util.lmdb.LMDB.MDB_NOTLS;
2931
import static org.lwjgl.util.lmdb.LMDB.MDB_PREV;
3032
import static org.lwjgl.util.lmdb.LMDB.MDB_SET_RANGE;
33+
import static org.lwjgl.util.lmdb.LMDB.MDB_SUCCESS;
3134
import static org.lwjgl.util.lmdb.LMDB.mdb_cmp;
3235
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_close;
3336
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_get;
@@ -47,6 +50,7 @@
4750
import static org.lwjgl.util.lmdb.LMDB.mdb_put;
4851
import static org.lwjgl.util.lmdb.LMDB.mdb_set_compare;
4952
import static org.lwjgl.util.lmdb.LMDB.mdb_stat;
53+
import static org.lwjgl.util.lmdb.LMDB.mdb_strerror;
5054
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_abort;
5155
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_begin;
5256
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_commit;
@@ -92,7 +96,7 @@
9296
import org.slf4j.LoggerFactory;
9397

9498
/**
95-
* LMDB-based indexed storage and retrieval of RDF statements. TripleStore stores statements in the form of four integer
99+
* LMDB-based indexed storage and retrieval of RDF statements. TripleStore stores statements in the form of four long
96100
* IDs. Each ID represent an RDF value that is stored in a {@link ValueStore}. The four IDs refer to the statement's
97101
* subject, predicate, object and context. The ID <tt>0</tt> is used to represent the "null" context and doesn't map to
98102
* an actual RDF value.
@@ -539,7 +543,7 @@ protected void bucketStart(double fraction, long[] lowerValues, long[] upperValu
539543
* @throws IOException
540544
*/
541545
protected void filterUsedIds(Collection<Long> ids) throws IOException {
542-
try (MemoryStack stack = stackPush()) {
546+
readTransaction(env, (stack, txn) -> {
543547
MDBVal maxKey = MDBVal.malloc(stack);
544548
ByteBuffer maxKeyBuf = stack.malloc(TripleStore.MAX_KEY_LENGTH);
545549
MDBVal keyData = MDBVal.malloc(stack);
@@ -559,7 +563,7 @@ protected void filterUsedIds(Collection<Long> ids) throws IOException {
559563
keyBuf.clear();
560564
Varint.writeUnsigned(keyBuf, id);
561565
keyData.mv_data(keyBuf.flip());
562-
if (mdb_get(writeTxn, contextsDbi, keyData, valueData) == 0) {
566+
if (mdb_get(txn, contextsDbi, keyData, valueData) == 0) {
563567
it.remove();
564568
}
565569
}
@@ -580,7 +584,7 @@ protected void filterUsedIds(Collection<Long> ids) throws IOException {
580584

581585
long cursor = 0;
582586
try {
583-
E(mdb_cursor_open(writeTxn, dbi, pp));
587+
E(mdb_cursor_open(txn, dbi, pp));
584588
cursor = pp.get(0);
585589

586590
if (fullScan) {
@@ -626,7 +630,7 @@ protected void filterUsedIds(Collection<Long> ids) throws IOException {
626630
int rc = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE);
627631
boolean exists = false;
628632
while (!exists && rc == 0) {
629-
if (mdb_cmp(writeTxn, dbi, keyData, maxKey) > 0) {
633+
if (mdb_cmp(txn, dbi, keyData, maxKey) > 0) {
630634
// id was not found
631635
break;
632636
} else if (!matcher.matches(keyData.mv_data())) {
@@ -649,7 +653,8 @@ protected void filterUsedIds(Collection<Long> ids) throws IOException {
649653
}
650654
}
651655
}
652-
}
656+
return null;
657+
});
653658
}
654659

655660
protected double cardinality(long subj, long pred, long obj, long context) throws IOException {
@@ -842,6 +847,7 @@ private boolean requiresResize() {
842847

843848
public boolean storeTriple(long subj, long pred, long obj, long context, boolean explicit) throws IOException {
844849
TripleIndex mainIndex = indexes.get(0);
850+
boolean stAdded;
845851
try (MemoryStack stack = MemoryStack.stackPush()) {
846852
MDBVal keyVal = MDBVal.malloc(stack);
847853
// use calloc to get an empty data value
@@ -851,33 +857,34 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean
851857
keyBuf.flip();
852858
keyVal.mv_data(keyBuf);
853859

854-
boolean foundExplicit = mdb_get(writeTxn, mainIndex.getDB(true), keyVal, dataVal) == 0;
855-
boolean foundImplicit = !foundExplicit && mdb_get(writeTxn, mainIndex.getDB(false), keyVal, dataVal) == 0;
856-
857-
boolean stAdded = !(foundExplicit || foundImplicit);
858-
if (stAdded || explicit && foundImplicit) {
859-
if (recordCache == null) {
860-
if (requiresResize()) {
861-
// map is full, resize required
862-
recordCache = new TxnRecordCache(dir);
863-
logger.debug("resize of map size {} required while adding - initialize record cache", mapSize);
864-
}
860+
if (recordCache == null) {
861+
if (requiresResize()) {
862+
// map is full, resize required
863+
recordCache = new TxnRecordCache(dir);
864+
logger.debug("resize of map size {} required while adding - initialize record cache", mapSize);
865865
}
866-
if (recordCache != null) {
867-
long quad[] = new long[] { subj, pred, obj, context };
868-
if (explicit && foundImplicit) {
869-
// remove implicit statement
870-
recordCache.removeRecord(quad, false);
871-
}
872-
// put record in cache and return immediately
873-
return recordCache.storeRecord(quad, explicit);
866+
}
867+
if (recordCache != null) {
868+
long quad[] = new long[] { subj, pred, obj, context };
869+
if (explicit) {
870+
// remove implicit statement
871+
recordCache.removeRecord(quad, false);
874872
}
873+
// put record in cache and return immediately
874+
return recordCache.storeRecord(quad, explicit);
875+
}
875876

876-
if (explicit && foundImplicit) {
877-
E(mdb_del(writeTxn, mainIndex.getDB(false), keyVal, dataVal));
878-
}
879-
E(mdb_put(writeTxn, mainIndex.getDB(explicit), keyVal, dataVal, 0));
877+
int rc = mdb_put(writeTxn, mainIndex.getDB(explicit), keyVal, dataVal, MDB_NOOVERWRITE);
878+
if (rc != MDB_SUCCESS && rc != MDB_KEYEXIST) {
879+
throw new IOException(mdb_strerror(rc));
880+
}
881+
stAdded = rc == MDB_SUCCESS;
882+
boolean foundImplicit = false;
883+
if (explicit && stAdded) {
884+
foundImplicit = E(mdb_del(writeTxn, mainIndex.getDB(false), keyVal, dataVal)) == MDB_SUCCESS;
885+
}
880886

887+
if (stAdded) {
881888
for (int i = 1; i < indexes.size(); i++) {
882889
TripleIndex index = indexes.get(i);
883890
keyBuf.clear();
@@ -887,7 +894,7 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean
887894
// update buffer positions in MDBVal
888895
keyVal.mv_data(keyBuf);
889896

890-
if (explicit && foundImplicit) {
897+
if (foundImplicit) {
891898
E(mdb_del(writeTxn, mainIndex.getDB(false), keyVal, dataVal));
892899
}
893900
E(mdb_put(writeTxn, index.getDB(explicit), keyVal, dataVal, 0));
@@ -897,9 +904,9 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean
897904
incrementContext(stack, context);
898905
}
899906
}
900-
901-
return stAdded;
902907
}
908+
909+
return stAdded;
903910
}
904911

905912
private void incrementContext(MemoryStack stack, long context) throws IOException {
@@ -991,6 +998,7 @@ public void removeTriples(RecordIterator it, boolean explicit, Consumer<long[]>
991998
}
992999
if (recordCache != null) {
9931000
recordCache.removeRecord(quad, explicit);
1001+
handler.accept(quad);
9941002
continue;
9951003
}
9961004

@@ -1005,6 +1013,7 @@ public void removeTriples(RecordIterator it, boolean explicit, Consumer<long[]>
10051013
}
10061014

10071015
decrementContext(stack, quad[CONTEXT_IDX]);
1016+
handler.accept(quad);
10081017
}
10091018
} finally {
10101019
it.close();

core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/TripleStoreTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,13 @@
1111
package org.eclipse.rdf4j.sail.lmdb;
1212

1313
import static org.junit.Assert.assertEquals;
14+
import static org.junit.Assert.assertTrue;
1415

1516
import java.io.File;
17+
import java.util.Arrays;
18+
import java.util.HashSet;
19+
import java.util.Set;
20+
import java.util.stream.Collectors;
1621

1722
import org.eclipse.rdf4j.sail.lmdb.TxnManager.Txn;
1823
import org.eclipse.rdf4j.sail.lmdb.config.LmdbStoreConfig;
@@ -65,6 +70,25 @@ public void testInferredStmts() throws Exception {
6570
}
6671
}
6772

73+
@Test
74+
public void testGc() throws Exception {
75+
tripleStore.startTransaction();
76+
tripleStore.storeTriple(1, 2, 3, 1, true);
77+
tripleStore.storeTriple(1, 2, 4, 1, true);
78+
tripleStore.storeTriple(1, 2, 5, 1, true);
79+
tripleStore.storeTriple(1, 6, 7, 1, true);
80+
tripleStore.storeTriple(1, 6, 7, 8, true);
81+
Set<Long> removed = new HashSet<>();
82+
tripleStore.removeTriplesByContext(1, 6, -1, -1, true, quad -> {
83+
for (Long c : quad) {
84+
removed.add(c);
85+
}
86+
});
87+
tripleStore.commit();
88+
tripleStore.filterUsedIds(removed);
89+
assertEquals(Arrays.asList(6L, 7L, 8L), removed.stream().sorted().collect(Collectors.toList()));
90+
}
91+
6892
@AfterEach
6993
public void after() throws Exception {
7094
tripleStore.close();

0 commit comments

Comments
 (0)