Skip to content

Commit 592b04a

Browse files
committed
GH-4891 LMDB store: Round trip to triple store to correctly gc any kind of value.
1 parent 540a948 commit 592b04a

7 files changed

Lines changed: 333 additions & 174 deletions

File tree

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbContextIterator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ protected void handleClose() throws SailException {
5757
contextIdIt.close();
5858
}
5959

60-
protected SailException causeIOException(IOException e) {
60+
private SailException causeIOException(IOException e) {
6161
return new SailException(e);
6262
}
6363
}

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.TimeUnit;
2828
import java.util.concurrent.atomic.AtomicBoolean;
2929
import java.util.concurrent.locks.ReentrantLock;
30+
import java.util.function.Function;
3031

3132
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
3233
import org.eclipse.rdf4j.common.iteration.CloseableIteratorIteration;
@@ -76,7 +77,8 @@ class LmdbSailStore implements SailStore {
7677

7778
private final boolean enableMultiThreading = true;
7879

79-
private final PersistentSet<Long> unusedIds;
80+
private PersistentSetFactory<Long> setFactory;
81+
private PersistentSet<Long> unusedIds, nextUnusedIds;
8082

8183
/**
8284
* A fast non-blocking circular buffer backed by an array.
@@ -179,19 +181,15 @@ abstract static class StatefulOperation implements Operation {
179181
* Creates a new {@link LmdbSailStore}.
180182
*/
181183
public LmdbSailStore(File dataDir, LmdbStoreConfig config) throws IOException, SailException {
182-
this.unusedIds = new PersistentSet<>(dataDir) {
183-
@Override
184-
protected byte[] write(Long element) {
185-
ByteBuffer bb = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN);
186-
bb.putLong(element);
187-
return bb.array();
188-
}
189-
190-
@Override
191-
protected Long read(ByteBuffer buffer) {
192-
return buffer.order(ByteOrder.BIG_ENDIAN).getLong();
193-
}
184+
this.setFactory = new PersistentSetFactory<>(dataDir);
185+
Function<Long, byte[]> encode = element -> {
186+
ByteBuffer bb = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN);
187+
bb.putLong(element);
188+
return bb.array();
194189
};
190+
Function<ByteBuffer, Long> decode = buffer -> buffer.order(ByteOrder.BIG_ENDIAN).getLong();
191+
this.unusedIds = setFactory.createSet("unusedIds", encode, decode);
192+
this.nextUnusedIds = setFactory.createSet("nextUnusedIds", encode, decode);
195193
boolean initialized = false;
196194
try {
197195
namespaceStore = new NamespaceStore(dataDir);
@@ -266,12 +264,11 @@ public void close() throws SailException {
266264
} finally {
267265
tripleStore.close();
268266
}
269-
270267
}
271-
272268
} finally {
273-
if (unusedIds != null) {
274-
unusedIds.close();
269+
if (setFactory != null) {
270+
setFactory.close();
271+
setFactory = null;
275272
}
276273
}
277274
}
@@ -460,8 +457,17 @@ protected void filterUsedIdsInTripleStore() throws IOException {
460457

461458
protected void handleRemovedIdsInValueStore() throws IOException {
462459
if (!unusedIds.isEmpty()) {
463-
valueStore.gcIds(unusedIds);
464-
unusedIds.clear();
460+
do {
461+
valueStore.gcIds(unusedIds, nextUnusedIds);
462+
unusedIds.clear();
463+
if (!nextUnusedIds.isEmpty()) {
464+
// swap sets
465+
PersistentSet<Long> ids = unusedIds;
466+
unusedIds = nextUnusedIds;
467+
nextUnusedIds = ids;
468+
filterUsedIdsInTripleStore();
469+
}
470+
} while (!unusedIds.isEmpty());
465471
}
466472
}
467473

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStatementIterator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ protected void handleClose() throws SailException {
8383
recordIt.close();
8484
}
8585

86-
protected SailException causeIOException(IOException e) {
86+
private SailException causeIOException(IOException e) {
8787
return new SailException(e);
8888
}
8989
}

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/PersistentSet.java

Lines changed: 24 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (c) 2023 Eclipse RDF4J contributors.
2+
* Copyright (c) 2024 Eclipse RDF4J contributors.
33
*
44
* All rights reserved. This program and the accompanying materials
55
* are made available under the terms of the Eclipse Distribution License v1.0
@@ -11,16 +11,9 @@
1111
package org.eclipse.rdf4j.sail.lmdb;
1212

1313
import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.E;
14-
import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.openDatabase;
15-
import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.readTransaction;
16-
import static org.lwjgl.system.MemoryStack.stackPush;
1714
import static org.lwjgl.system.MemoryUtil.NULL;
18-
import static org.lwjgl.util.lmdb.LMDB.MDB_CREATE;
1915
import static org.lwjgl.util.lmdb.LMDB.MDB_NEXT;
20-
import static org.lwjgl.util.lmdb.LMDB.MDB_NOMETASYNC;
2116
import static org.lwjgl.util.lmdb.LMDB.MDB_NOOVERWRITE;
22-
import static org.lwjgl.util.lmdb.LMDB.MDB_NOSYNC;
23-
import static org.lwjgl.util.lmdb.LMDB.MDB_NOTLS;
2417
import static org.lwjgl.util.lmdb.LMDB.MDB_SET;
2518
import static org.lwjgl.util.lmdb.LMDB.MDB_SET_RANGE;
2619
import static org.lwjgl.util.lmdb.LMDB.MDB_SUCCESS;
@@ -30,114 +23,52 @@
3023
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_renew;
3124
import static org.lwjgl.util.lmdb.LMDB.mdb_del;
3225
import static org.lwjgl.util.lmdb.LMDB.mdb_drop;
33-
import static org.lwjgl.util.lmdb.LMDB.mdb_env_close;
34-
import static org.lwjgl.util.lmdb.LMDB.mdb_env_create;
35-
import static org.lwjgl.util.lmdb.LMDB.mdb_env_open;
36-
import static org.lwjgl.util.lmdb.LMDB.mdb_env_set_mapsize;
37-
import static org.lwjgl.util.lmdb.LMDB.mdb_env_set_maxdbs;
3826
import static org.lwjgl.util.lmdb.LMDB.mdb_put;
39-
import static org.lwjgl.util.lmdb.LMDB.mdb_stat;
4027
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_abort;
4128
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_begin;
42-
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_commit;
4329

4430
import java.io.ByteArrayOutputStream;
45-
import java.io.File;
4631
import java.io.IOException;
4732
import java.io.InputStream;
4833
import java.io.ObjectInputStream;
4934
import java.io.ObjectOutputStream;
5035
import java.io.Serializable;
5136
import java.nio.ByteBuffer;
52-
import java.nio.file.Files;
53-
import java.nio.file.Path;
5437
import java.util.AbstractSet;
5538
import java.util.Iterator;
5639
import java.util.NoSuchElementException;
5740
import java.util.concurrent.locks.StampedLock;
5841

59-
import org.apache.commons.io.FileUtils;
60-
import org.eclipse.rdf4j.sail.lmdb.TxnManager.Mode;
6142
import org.eclipse.rdf4j.sail.lmdb.TxnManager.Txn;
6243
import org.lwjgl.PointerBuffer;
6344
import org.lwjgl.system.MemoryStack;
64-
import org.lwjgl.util.lmdb.MDBStat;
6545
import org.lwjgl.util.lmdb.MDBVal;
6646

6747
/**
6848
* An LMDB-based persistent set.
6949
*/
7050
class PersistentSet<T extends Serializable> extends AbstractSet<T> {
7151

72-
private final Path dbDir;
73-
private final long env;
52+
private PersistentSetFactory<T> factory;
7453
private final int dbi;
75-
private TxnManager txnManager;
76-
private long writeTxn;
77-
private PointerBuffer writeTxnPp = PointerBuffer.allocateDirect(1);
78-
private long mapSize = 1048576; // 1 MiB
79-
private long pageSize;
80-
8154
private int size;
8255

83-
public PersistentSet(File cacheDir) throws IOException {
84-
try (MemoryStack stack = stackPush()) {
85-
PointerBuffer pp = stack.mallocPointer(1);
86-
E(mdb_env_create(pp));
87-
env = pp.get(0);
88-
89-
txnManager = new TxnManager(env, Mode.ABORT);
90-
91-
E(mdb_env_set_maxdbs(env, 2));
92-
E(mdb_env_set_mapsize(env, mapSize));
93-
94-
int flags = MDB_NOTLS | MDB_NOSYNC | MDB_NOMETASYNC;
95-
96-
dbDir = Files.createTempDirectory(cacheDir.toPath(), "set");
97-
E(mdb_env_open(env, dbDir.toAbsolutePath().toString(), flags, 0664));
98-
dbi = openDatabase(env, "elements", MDB_CREATE, null);
99-
100-
MDBStat stat = MDBStat.malloc(stack);
101-
readTransaction(env, (stack2, txn) -> {
102-
E(mdb_stat(txn, dbi, stat));
103-
pageSize = stat.ms_psize();
104-
return null;
105-
});
106-
}
107-
}
108-
109-
public synchronized void close() throws IOException {
110-
if (writeTxn != 0) {
111-
mdb_txn_abort(writeTxn);
112-
writeTxn = 0;
113-
}
114-
115-
// We don't need to free the pointer because it was allocated
116-
// by java.nio.ByteBuffer, which will handle freeing for us.
117-
// writeTxnPp.free();
118-
119-
mdb_env_close(env);
120-
FileUtils.deleteDirectory(dbDir.toFile());
121-
}
122-
123-
protected synchronized void commit() throws IOException {
124-
if (writeTxn != 0) {
125-
E(mdb_txn_commit(writeTxn));
126-
writeTxn = 0;
127-
}
56+
public PersistentSet(PersistentSetFactory<T> factory, int dbi) {
57+
this.factory = factory;
58+
this.dbi = dbi;
12859
}
12960

13061
public synchronized void clear() {
131-
if (writeTxn != 0) {
132-
mdb_txn_abort(writeTxn);
133-
writeTxn = 0;
62+
if (factory.writeTxn != 0) {
63+
mdb_txn_abort(factory.writeTxn);
64+
factory.writeTxn = 0;
13465
}
13566
try {
13667
// start a write transaction
137-
E(mdb_txn_begin(env, NULL, 0, writeTxnPp));
138-
writeTxn = writeTxnPp.get(0);
139-
mdb_drop(writeTxn, dbi, false);
140-
commit();
68+
E(mdb_txn_begin(factory.env, NULL, 0, factory.writeTxnPp));
69+
factory.writeTxn = factory.writeTxnPp.get(0);
70+
mdb_drop(factory.writeTxn, dbi, false);
71+
factory.commit();
14172
} catch (Exception e) {
14273
throw new RuntimeException(e);
14374
}
@@ -147,7 +78,7 @@ public synchronized void clear() {
14778
@Override
14879
public Iterator<T> iterator() {
14980
try {
150-
commit();
81+
factory.commit();
15182
} catch (Exception e) {
15283
throw new RuntimeException(e);
15384
}
@@ -175,34 +106,14 @@ public boolean remove(Object element) {
175106
}
176107
}
177108

178-
protected synchronized boolean update(Object element, boolean add) throws IOException {
109+
private synchronized boolean update(Object element, boolean add) throws IOException {
179110
try (MemoryStack stack = MemoryStack.stackPush()) {
180-
if (writeTxn == 0) {
111+
if (factory.writeTxn == 0) {
181112
// start a write transaction
182-
E(mdb_txn_begin(env, NULL, 0, writeTxnPp));
183-
writeTxn = writeTxnPp.get(0);
184-
}
185-
if (LmdbUtil.requiresResize(mapSize, pageSize, writeTxn, 0)) {
186-
StampedLock lock = txnManager.lock();
187-
long stamp = lock.writeLock();
188-
try {
189-
txnManager.deactivate();
190-
191-
// resize map
192-
E(mdb_txn_commit(writeTxn));
193-
mapSize = LmdbUtil.autoGrowMapSize(mapSize, pageSize, 0);
194-
E(mdb_env_set_mapsize(env, mapSize));
195-
196-
E(mdb_txn_begin(env, NULL, 0, writeTxnPp));
197-
writeTxn = writeTxnPp.get(0);
198-
} finally {
199-
try {
200-
txnManager.activate();
201-
} finally {
202-
lock.unlockWrite(stamp);
203-
}
204-
}
113+
E(mdb_txn_begin(factory.env, NULL, 0, factory.writeTxnPp));
114+
factory.writeTxn = factory.writeTxnPp.get(0);
205115
}
116+
factory.ensureResize();
206117

207118
MDBVal keyVal = MDBVal.malloc(stack);
208119
// use calloc to get an empty data value
@@ -215,13 +126,13 @@ protected synchronized boolean update(Object element, boolean add) throws IOExce
215126
keyVal.mv_data(keyBuf);
216127

217128
if (add) {
218-
if (mdb_put(writeTxn, dbi, keyVal, dataVal, MDB_NOOVERWRITE) == MDB_SUCCESS) {
129+
if (mdb_put(factory.writeTxn, dbi, keyVal, dataVal, MDB_NOOVERWRITE) == MDB_SUCCESS) {
219130
size++;
220131
return true;
221132
}
222133
} else {
223134
// delete element
224-
if (mdb_del(writeTxn, dbi, keyVal, dataVal) == MDB_SUCCESS) {
135+
if (mdb_del(factory.writeTxn, dbi, keyVal, dataVal) == MDB_SUCCESS) {
225136
size--;
226137
return true;
227138
}
@@ -246,7 +157,7 @@ protected T read(ByteBuffer buffer) throws IOException {
246157
}
247158
}
248159

249-
protected class ElementIterator implements Iterator<T> {
160+
private class ElementIterator implements Iterator<T> {
250161

251162
private final MDBVal keyData = MDBVal.malloc();
252163
private final MDBVal valueData = MDBVal.malloc();
@@ -259,9 +170,9 @@ protected class ElementIterator implements Iterator<T> {
259170
private T next;
260171
private T current;
261172

262-
protected ElementIterator(int dbi) {
173+
private ElementIterator(int dbi) {
263174
try {
264-
this.txnRef = txnManager.createReadTxn();
175+
this.txnRef = factory.txnManager.createReadTxn();
265176
this.txnLock = txnRef.lock();
266177

267178
long stamp = txnLock.readLock();
@@ -306,7 +217,7 @@ public T next() {
306217
return current;
307218
}
308219

309-
public T computeNext() throws IOException {
220+
private T computeNext() throws IOException {
310221
long stamp = txnLock.readLock();
311222
try {
312223
if (txnRefVersion != txnRef.version()) {

0 commit comments

Comments
 (0)