Skip to content

Commit d4e13c8

Browse files
committed
GH-4891 LMDB store: Fix computation of next ID and add reference counting for GC
Correctly load the next ID to avoid collisions with already assigned IDs. Use reference counting for data types of literals and namespaces of URIs to ensure referenced values are not GCed and are fully deleted if not referenced anymore.
1 parent 440dc0e commit d4e13c8

4 files changed

Lines changed: 197 additions & 37 deletions

File tree

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ContextStore.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,11 @@ void increment(Resource context) {
130130
* @param context the context identifier.
131131
* @param amount the number by which to decrease the size
132132
*/
133-
void decrementBy(Resource context, long amount) {
134-
contextInfoMap.computeIfPresent(context, (c, size) -> size <= amount ? null : size - amount);
133+
boolean decrementBy(Resource context, long amount) {
134+
boolean removed = contextInfoMap.computeIfPresent(context,
135+
(c, size) -> size <= amount ? null : size - amount) == null;
135136
contentsChanged = true;
137+
return removed;
136138
}
137139

138140
@Override

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -740,7 +740,11 @@ private long removeStatements(long subj, long pred, long obj, boolean explicit,
740740
Long entryContextId = entry.getKey();
741741
if (entryContextId > 0) {
742742
Resource modifiedContext = (Resource) valueStore.getValue(entryContextId);
743-
contextStore.decrementBy(modifiedContext, entry.getValue());
743+
boolean contextRemoved = contextStore.decrementBy(modifiedContext, entry.getValue());
744+
if (contextRemoved) {
745+
unusedIds.add(entryContextId);
746+
}
747+
744748
}
745749
removeCount += entry.getValue();
746750
}

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -891,6 +891,7 @@ public void removeTriples(RecordIterator it, boolean explicit, Consumer<long[]>
891891
}
892892
if (recordCache != null) {
893893
recordCache.removeRecord(quad, explicit);
894+
handler.accept(quad);
894895
continue;
895896
}
896897

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStore.java

Lines changed: 187 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import static org.lwjgl.system.MemoryUtil.NULL;
1717
import static org.lwjgl.util.lmdb.LMDB.MDB_CREATE;
1818
import static org.lwjgl.util.lmdb.LMDB.MDB_FIRST;
19+
import static org.lwjgl.util.lmdb.LMDB.MDB_LAST;
1920
import static org.lwjgl.util.lmdb.LMDB.MDB_NEXT;
2021
import static org.lwjgl.util.lmdb.LMDB.MDB_NOMETASYNC;
2122
import static org.lwjgl.util.lmdb.LMDB.MDB_NOSYNC;
@@ -48,6 +49,7 @@
4849
import java.io.IOException;
4950
import java.nio.ByteBuffer;
5051
import java.nio.charset.StandardCharsets;
52+
import java.util.ArrayList;
5153
import java.util.Arrays;
5254
import java.util.Collection;
5355
import java.util.Collections;
@@ -148,6 +150,8 @@ class ValueStore extends AbstractValueFactory {
148150
private int unusedDbi;
149151
// database with free IDs
150152
private int freeDbi;
153+
// database with internal reference counts for IRIs and namespaces
154+
private int refCountsDbi;
151155
private long writeTxn;
152156
private final boolean forceSync;
153157
private final boolean autoGrow;
@@ -171,7 +175,7 @@ class ValueStore extends AbstractValueFactory {
171175
/**
172176
* The next ID that is associated with a stored value
173177
*/
174-
private long nextId;
178+
private long nextId = 1;
175179
private boolean freeIdsAvailable;
176180

177181
private volatile long nextValueEvictionTime = 0;
@@ -198,28 +202,76 @@ class ValueStore extends AbstractValueFactory {
198202
// read maximum id from store
199203
readTransaction(env, (stack, txn) -> {
200204
long cursor = 0;
205+
PointerBuffer pp = stack.mallocPointer(1);
206+
207+
for (int lookupDbi : new int[] { dbi, freeDbi }) {
208+
try {
209+
E(mdb_cursor_open(txn, lookupDbi, pp));
210+
cursor = pp.get(0);
211+
212+
MDBVal keyData = MDBVal.calloc(stack);
213+
// set cursor after max ID
214+
keyData.mv_data(stack.bytes(new byte[] { ID_KEY, (byte) 0xFF }));
215+
MDBVal valueData = MDBVal.calloc(stack);
216+
int rc = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE);
217+
if (rc != 0) {
218+
// directly go to last value
219+
rc = mdb_cursor_get(cursor, keyData, valueData, MDB_LAST);
220+
} else {
221+
// go to previous value of selected key
222+
rc = mdb_cursor_get(cursor, keyData, valueData, MDB_PREV);
223+
}
224+
if (rc == 0 && keyData.mv_data().get(0) == ID_KEY) {
225+
// remove lower 2 type bits
226+
nextId = Math.max(nextId, (data2id(keyData.mv_data()) >> 2) + 1);
227+
}
228+
} finally {
229+
if (cursor != 0) {
230+
mdb_cursor_close(cursor);
231+
}
232+
}
233+
}
234+
return null;
235+
});
236+
237+
if (logger.isDebugEnabled()) {
238+
// trigger deletion of values marked for GC
239+
startTransaction();
240+
commit();
241+
// print current values in store
242+
logValues();
243+
}
244+
}
245+
246+
private void logValues() throws IOException {
247+
readTransaction(env, (stack, txn) -> {
248+
long cursor = 0;
249+
PointerBuffer pp = stack.mallocPointer(1);
250+
201251
try {
202-
PointerBuffer pp = stack.mallocPointer(1);
203252
E(mdb_cursor_open(txn, dbi, pp));
204253
cursor = pp.get(0);
205254

206255
MDBVal keyData = MDBVal.calloc(stack);
207256
// set cursor to min key
208-
keyData.mv_data(stack.bytes(new byte[] { ID_KEY, (byte) 0xFF }));
257+
keyData.mv_data(stack.bytes(new byte[] { ID_KEY }));
209258
MDBVal valueData = MDBVal.calloc(stack);
210-
if (mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE) == 0 &&
211-
mdb_cursor_get(cursor, keyData, valueData, MDB_PREV) == 0) {
212-
// remove lower 2 type bits
213-
nextId = (data2id(keyData.mv_data()) >> 2) + 1;
214-
} else {
215-
nextId = 1;
259+
int rc = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE);
260+
while (rc == 0 && keyData.mv_data().get(0) == ID_KEY) {
261+
long id = data2id(keyData.mv_data());
262+
try {
263+
logger.debug("id {} has value {}", id, getValue(id));
264+
} catch (IllegalArgumentException e) {
265+
logger.debug("id {} has namespace value {}", id, getNamespace(id));
266+
}
267+
rc = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT);
216268
}
217-
return null;
218269
} finally {
219270
if (cursor != 0) {
220271
mdb_cursor_close(cursor);
221272
}
222273
}
274+
return null;
223275
});
224276
}
225277

@@ -278,6 +330,8 @@ private void open() throws IOException {
278330
unusedDbi = openDatabase(env, "unused_ids", MDB_CREATE, null);
279331
// open free IDs database
280332
freeDbi = openDatabase(env, "free_ids", MDB_CREATE, null);
333+
// open ref_counts database
334+
refCountsDbi = openDatabase(env, "ref_counts", MDB_CREATE, null);
281335

282336
// check if free IDs are available
283337
readTransaction(env, (stack, txn) -> {
@@ -532,6 +586,60 @@ private void resizeMap(long txn, long requiredSize) throws IOException {
532586
}
533587
}
534588

589+
private void incrementRefCount(MemoryStack stack, long writeTxn, byte[] data) throws IOException {
590+
// literals have a datatype id and URIs have a namespace id
591+
if (data[0] == LITERAL_VALUE || data[0] == URI_VALUE) {
592+
try {
593+
stack.push();
594+
ByteBuffer bb = ByteBuffer.wrap(data);
595+
// skip type marker
596+
int idLength = Varint.firstToLength(bb.get(1));
597+
MDBVal idVal = MDBVal.calloc(stack);
598+
MDBVal dataVal = MDBVal.calloc(stack);
599+
idVal.mv_data(idBuffer(stack).put(ID_KEY).put(data, 1, idLength).flip());
600+
long newCount = 1;
601+
if (mdb_get(writeTxn, refCountsDbi, idVal, dataVal) == 0) {
602+
// update count
603+
newCount = Varint.readUnsigned(dataVal.mv_data()) + 1;
604+
}
605+
// write count
606+
ByteBuffer countBb = stack.malloc(Varint.calcLengthUnsigned(newCount));
607+
Varint.writeUnsigned(countBb, newCount);
608+
dataVal.mv_data(countBb.flip());
609+
E(mdb_put(writeTxn, refCountsDbi, idVal, dataVal, 0));
610+
} finally {
611+
stack.pop();
612+
}
613+
}
614+
}
615+
616+
private boolean decrementRefCount(MemoryStack stack, long writeTxn, ByteBuffer idBb) throws IOException {
617+
try {
618+
stack.push();
619+
620+
MDBVal idVal = MDBVal.calloc(stack);
621+
idVal.mv_data(idBb);
622+
MDBVal dataVal = MDBVal.calloc(stack);
623+
if (mdb_get(writeTxn, refCountsDbi, idVal, dataVal) == 0) {
624+
// update count
625+
long newCount = Varint.readUnsigned(dataVal.mv_data()) - 1;
626+
if (newCount <= 0) {
627+
E(mdb_del(writeTxn, refCountsDbi, idVal, null));
628+
return true;
629+
} else {
630+
// write count
631+
ByteBuffer countBb = stack.malloc(Varint.calcLengthUnsigned(newCount));
632+
Varint.writeUnsigned(countBb, newCount);
633+
dataVal.mv_data(countBb.flip());
634+
mdb_put(writeTxn, refCountsDbi, idVal, dataVal, 0);
635+
}
636+
}
637+
return false;
638+
} finally {
639+
stack.pop();
640+
}
641+
}
642+
535643
private long findId(byte[] data, boolean create) throws IOException {
536644
Long id = readTransaction(env, (stack, txn) -> {
537645
if (data.length <= MAX_KEY_SIZE) {
@@ -553,6 +661,9 @@ private long findId(byte[] data, boolean create) throws IOException {
553661

554662
E(mdb_put(writeTxn, dbi, dataVal, idVal, 0));
555663
E(mdb_put(writeTxn, dbi, idVal, dataVal, 0));
664+
665+
// update ref count if necessary
666+
incrementRefCount(stack2, writeTxn, data);
556667
return null;
557668
});
558669
return newId;
@@ -597,6 +708,9 @@ private long findId(byte[] data, boolean create) throws IOException {
597708
// store mapping of ID -> data
598709
E(mdb_put(writeTxn, dbi, idVal, dataVal, MDB_RESERVE));
599710
dataVal.mv_data().put(data);
711+
712+
// update ref count if necessary
713+
incrementRefCount(stack2, writeTxn, data);
600714
return null;
601715
});
602716
return newId;
@@ -783,50 +897,89 @@ public long getId(Value value, boolean create) throws IOException {
783897

784898
public void gcIds(Collection<Long> ids) throws IOException {
785899
if (!ids.isEmpty()) {
786-
resizeMap(writeTxn, 2 * ids.size() * (1 + Long.BYTES + 2 + Long.BYTES));
787-
788-
writeTransaction((stack, writeTxn) -> {
789-
MDBVal revIdVal = MDBVal.calloc(stack);
790-
MDBVal dataVal = MDBVal.calloc(stack);
791-
792-
ByteBuffer revIdBb = stack.malloc(1 + Long.BYTES + 2 + Long.BYTES);
793-
Varint.writeUnsigned(revIdBb, revision.getRevisionId());
794-
int revLength = revIdBb.position();
795-
for (Long id : ids) {
796-
revIdBb.position(revLength).limit(revIdBb.capacity());
797-
revIdVal.mv_data(id2data(revIdBb, id).flip());
798-
E(mdb_put(writeTxn, unusedDbi, revIdVal, dataVal, 0));
900+
// contains IDs for data types and namespaces which are freed by garbage collecting literals and URIs
901+
Collection<Long> nextIds = new ArrayList<>();
902+
do {
903+
if (!nextIds.isEmpty()) {
904+
ids = nextIds;
905+
nextIds = new ArrayList<>();
799906
}
800907

801-
deleteValueToIdMappings(stack, writeTxn, ids);
908+
resizeMap(writeTxn, 2 * ids.size() * (1 + Long.BYTES + 2 + Long.BYTES));
909+
910+
final Collection<Long> finalIds = ids;
911+
final Collection<Long> finalNextIds = nextIds;
912+
writeTransaction((stack, writeTxn) -> {
913+
MDBVal revIdVal = MDBVal.calloc(stack);
914+
MDBVal idVal = MDBVal.calloc(stack);
915+
MDBVal dataVal = MDBVal.calloc(stack);
916+
917+
ByteBuffer revIdBb = stack.malloc(1 + Long.BYTES + 2 + Long.BYTES);
918+
Varint.writeUnsigned(revIdBb, revision.getRevisionId());
919+
int revLength = revIdBb.position();
920+
for (Long id : finalIds) {
921+
revIdBb.position(revLength).limit(revIdBb.capacity());
922+
revIdVal.mv_data(id2data(revIdBb, id).flip());
923+
// check if id has internal references and therefore cannot be deleted
924+
idVal.mv_data(revIdBb.slice().position(revLength));
925+
if (mdb_get(writeTxn, refCountsDbi, idVal, dataVal) == 0) {
926+
continue;
927+
}
928+
// mark id as unused
929+
E(mdb_put(writeTxn, unusedDbi, revIdVal, dataVal, 0));
930+
}
802931

803-
invalidateRevisionOnCommit = true;
804-
if (nextValueEvictionTime < 0) {
805-
nextValueEvictionTime = System.currentTimeMillis() + VALUE_EVICTION_INTERVAL;
806-
}
807-
return null;
808-
});
932+
deleteValueToIdMappings(stack, writeTxn, finalIds, finalNextIds);
933+
934+
invalidateRevisionOnCommit = true;
935+
if (nextValueEvictionTime < 0) {
936+
nextValueEvictionTime = System.currentTimeMillis() + VALUE_EVICTION_INTERVAL;
937+
}
938+
return null;
939+
});
940+
} while (!nextIds.isEmpty());
809941
}
810942
}
811943

812-
protected void deleteValueToIdMappings(MemoryStack stack, long txn, Collection<Long> ids) throws IOException {
944+
protected void deleteValueToIdMappings(MemoryStack stack, long txn, Collection<Long> ids, Collection<Long> newGcIds)
945+
throws IOException {
813946
int maxHashKeyLength = 2 + 2 * Long.BYTES + 2;
814947
ByteBuffer hashBb = stack.malloc(maxHashKeyLength);
815948
MDBVal idVal = MDBVal.calloc(stack);
816949
ByteBuffer idBb = idBuffer(stack);
817950
MDBVal hashVal = MDBVal.calloc(stack);
818951
MDBVal dataVal = MDBVal.calloc(stack);
952+
MDBVal ignoreVal = MDBVal.calloc(stack);
953+
954+
ByteBuffer refIdBb = idBuffer(stack);
819955

820956
long valuesCursor = 0;
821957
try {
822958
for (Long id : ids) {
823959
idVal.mv_data(id2data(idBb.clear(), id).flip());
824-
if (mdb_get(txn, dbi, idVal, dataVal) == 0) {
960+
// id must not have a reference count and must have an existing value
961+
if (mdb_get(writeTxn, refCountsDbi, idVal, ignoreVal) != 0 &&
962+
mdb_get(txn, dbi, idVal, dataVal) == 0) {
825963
ByteBuffer dataBuffer = dataVal.mv_data();
826-
int dataLength = dataVal.mv_data().remaining();
964+
965+
// update ref count if literal or URI namespace is removed
966+
if (dataBuffer.get(0) == LITERAL_VALUE || dataBuffer.get(0) == URI_VALUE) {
967+
refIdBb.clear()
968+
.put(ID_KEY)
969+
.put(dataBuffer.slice()
970+
.position(1)
971+
.limit(1 + Varint.firstToLength(dataBuffer.get(1))))
972+
.flip();
973+
if (decrementRefCount(stack, txn, refIdBb)) {
974+
// add ID for GC
975+
newGcIds.add(Varint.readUnsigned(refIdBb, 1));
976+
}
977+
}
978+
979+
int dataLength = dataBuffer.remaining();
827980
if (dataLength > MAX_KEY_SIZE) {
828981
byte[] data = new byte[dataLength];
829-
dataVal.mv_data().get(data);
982+
dataBuffer.get(data);
830983
long dataHash = hash(data);
831984

832985
hashBb.clear();

0 commit comments

Comments
 (0)