Skip to content

Commit 1e16f58

Browse files
committed
GH-2998 improve memory overflow performance of the LmdbStore
1 parent 1932cff commit 1e16f58

6 files changed

Lines changed: 226 additions & 53 deletions

File tree

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,7 @@
1616
import java.nio.ByteOrder;
1717
import java.util.ArrayList;
1818
import java.util.Comparator;
19-
import java.util.HashMap;
2019
import java.util.List;
21-
import java.util.Map;
22-
import java.util.Map.Entry;
2320
import java.util.Objects;
2421
import java.util.Set;
2522
import java.util.concurrent.ExecutorService;
@@ -75,7 +72,7 @@ class LmdbSailStore implements SailStore {
7572
private volatile boolean asyncTransactionFinished;
7673
private volatile boolean nextTransactionAsync;
7774

78-
private final boolean enableMultiThreading = true;
75+
boolean enableMultiThreading = true;
7976

8077
private PersistentSetFactory<Long> setFactory;
8178
private PersistentSet<Long> unusedIds, nextUnusedIds;
@@ -573,6 +570,51 @@ public void approve(Resource subj, IRI pred, Value obj, Resource ctx) throws Sai
573570
addStatement(subj, pred, obj, explicit, ctx);
574571
}
575572

573+
@Override
574+
public void approveAll(Set<Statement> approved, Set<Resource> approvedContexts) {
575+
576+
sinkStoreAccessLock.lock();
577+
try {
578+
startTransaction(true);
579+
580+
for (Statement statement : approved) {
581+
Resource subj = statement.getSubject();
582+
IRI pred = statement.getPredicate();
583+
Value obj = statement.getObject();
584+
Resource context = statement.getContext();
585+
586+
AddQuadOperation q = new AddQuadOperation();
587+
q.s = valueStore.storeValue(subj);
588+
q.p = valueStore.storeValue(pred);
589+
q.o = valueStore.storeValue(obj);
590+
q.c = context == null ? 0 : valueStore.storeValue(context);
591+
q.context = context;
592+
q.explicit = explicit;
593+
594+
if (multiThreadingActive) {
595+
while (!opQueue.add(q)) {
596+
if (tripleStoreException != null) {
597+
throw wrapTripleStoreException();
598+
}
599+
}
600+
601+
} else {
602+
q.execute();
603+
}
604+
605+
}
606+
} catch (IOException e) {
607+
rollback();
608+
throw new SailException(e);
609+
} catch (RuntimeException e) {
610+
rollback();
611+
logger.error("Encountered an unexpected problem while trying to add a statement", e);
612+
throw e;
613+
} finally {
614+
sinkStoreAccessLock.unlock();
615+
}
616+
}
617+
576618
@Override
577619
public void deprecate(Statement statement) throws SailException {
578620
removeStatements(statement.getSubject(), statement.getPredicate(), statement.getObject(), explicit,

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStore.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -253,9 +253,11 @@ protected void initializeInternal() throws SailException {
253253
backingStore = new LmdbSailStore(dataDir, config);
254254
this.store = new SnapshotSailStore(backingStore, () -> new MemoryOverflowModel() {
255255
// Supplies the store that receives the model's statements once it overflows from memory:
// a second LmdbSailStore opened on the given directory with the same config.
@Override
256-
protected SailStore createSailStore(File dataDir) throws IOException, SailException {
256+
protected LmdbSailStore createSailStore(File dataDir) throws IOException, SailException {
257257
// Model can't fit into memory, use another LmdbSailStore to store delta
258-
return new LmdbSailStore(dataDir, config);
258+
LmdbSailStore lmdbSailStore = new LmdbSailStore(dataDir, config);
259+
// Clear the multi-threading flag on the overflow store before handing it out.
// NOTE(review): presumably the flag is only consulted after construction — confirm in LmdbSailStore.
lmdbSailStore.enableMultiThreading = false;
260+
return lmdbSailStore;
259261
}
260262
}) {
261263

@@ -406,4 +408,5 @@ private boolean upgradeStore(File dataDir, String version) throws SailException
406408
public Supplier<CollectionFactory> getCollectionFactory() {
// Each call to the returned supplier builds a new MapDb3CollectionFactory, passing it the value
// getIterationCacheSyncThreshold() reports at that moment.
407409
return () -> new MapDb3CollectionFactory(getIterationCacheSyncThreshold());
408410
}
411+
409412
}

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/MemoryOverflowModel.java

Lines changed: 120 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
*
99
* SPDX-License-Identifier: BSD-3-Clause
1010
*******************************************************************************/
11+
1112
package org.eclipse.rdf4j.sail.lmdb;
1213

1314
import java.io.File;
@@ -16,6 +17,7 @@
1617
import java.io.ObjectOutputStream;
1718
import java.nio.file.Files;
1819
import java.util.Collection;
20+
import java.util.HashSet;
1921
import java.util.Iterator;
2022
import java.util.Optional;
2123
import java.util.Set;
@@ -32,7 +34,6 @@
3234
import org.eclipse.rdf4j.model.impl.LinkedHashModel;
3335
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
3436
import org.eclipse.rdf4j.sail.SailException;
35-
import org.eclipse.rdf4j.sail.base.SailStore;
3637
import org.slf4j.Logger;
3738
import org.slf4j.LoggerFactory;
3839

@@ -47,21 +48,24 @@ abstract class MemoryOverflowModel extends AbstractModel {
4748

4849
private static final Runtime RUNTIME = Runtime.getRuntime();
4950

50-
private static final int LARGE_BLOCK = 10000;
51+
private static final int LARGE_BLOCK = 5 * 1024;
52+
53+
private static volatile boolean overflow;
5154

5255
// To reduce the chance of OOM we will always overflow once we get close to running out of memory even if we think
53-
// we have space for one more block. The limit is currently set at 32 MB
54-
private static final int MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING = 32 * 1024 * 1024;
56+
// we have space for one more block. The limit is currently set at 32 MB for small heaps and 128 MB for large heaps.
57+
private static final int MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING = RUNTIME.maxMemory() >= 1024 ? 128 * 1024 * 1024
58+
: 32 * 1024 * 1024;
5559

5660
final Logger logger = LoggerFactory.getLogger(MemoryOverflowModel.class);
5761

58-
private LinkedHashModel memory;
62+
private volatile LinkedHashModel memory;
5963

60-
transient File dataDir;
64+
private transient File dataDir;
6165

62-
transient SailStore store;
66+
private transient LmdbSailStore store;
6367

64-
transient SailSourceModel disk;
68+
private transient volatile SailSourceModel disk;
6569

6670
private long baseline = 0;
6771

@@ -70,7 +74,7 @@ abstract class MemoryOverflowModel extends AbstractModel {
7074
SimpleValueFactory vf = SimpleValueFactory.getInstance();
7175

7276
// Creates an empty model that starts out purely in memory (disk is not set here).
public MemoryOverflowModel() {
73-
memory = new LinkedHashModel(LARGE_BLOCK);
77+
// The in-memory model is constructed with twice LARGE_BLOCK as its size argument
// (presumably an initial-capacity hint — confirm against LinkedHashModel).
memory = new LinkedHashModel(LARGE_BLOCK * 2);
7478
}
7579

7680
public MemoryOverflowModel(Model model) {
@@ -137,6 +141,33 @@ public boolean add(Statement st) {
137141
return getDelegate().add(st);
138142
}
139143

144+
@Override
145+
public boolean addAll(Collection<? extends Statement> c) {
146+
checkMemoryOverflow();
147+
if (disk != null || c.size() <= 1024) {
148+
return getDelegate().addAll(c);
149+
} else {
150+
boolean ret = false;
151+
HashSet<Statement> buffer = new HashSet<>();
152+
for (Statement st : c) {
153+
buffer.add(st);
154+
if (buffer.size() >= 1024) {
155+
ret |= getDelegate().addAll(buffer);
156+
buffer.clear();
157+
innerCheckMemoryOverflow();
158+
}
159+
}
160+
if (!buffer.isEmpty()) {
161+
ret |= getDelegate().addAll(buffer);
162+
buffer.clear();
163+
}
164+
165+
return ret;
166+
167+
}
168+
169+
}
170+
140171
@Override
141172
public boolean remove(Resource subj, IRI pred, Value obj, Resource... contexts) {
142173
return getDelegate().remove(subj, pred, obj, contexts);
@@ -191,13 +222,27 @@ public synchronized void removeTermIteration(Iterator<Statement> iter, Resource
191222
}
192223
}
193224

194-
protected abstract SailStore createSailStore(File dataDir) throws IOException, SailException;
225+
/**
 * Creates the store that holds this model's statements once they are moved out of memory. It is called by
 * {@code overflowToDisk()} with a freshly created temporary directory.
 *
 * @param dataDir directory the new store should use
 * @return the store backing the on-disk model
 * @throws IOException   if the store cannot be created
 * @throws SailException if the store cannot be created
 */
protected abstract LmdbSailStore createSailStore(File dataDir) throws IOException, SailException;
195226

196-
synchronized Model getDelegate() {
197-
if (disk == null) {
227+
/**
 * Returns the model that currently holds the statements: the in-memory model while it is set, otherwise the
 * disk-backed model.
 * <p>
 * Both fields are volatile and are first read without taking a lock. Only when both reads come back
 * {@code null} is the check repeated while synchronized on this instance.
 *
 * @return the in-memory model if present, else the disk model
 * @throws IllegalStateException if neither model is set even under the lock
 */
private Model getDelegate() {
228+
var memory = this.memory;
229+
if (memory != null) {
198230
return memory;
231+
} else {
232+
var disk = this.disk;
233+
if (disk != null) {
234+
return disk;
235+
}
236+
// overflowToDisk() is synchronized on this instance and clears memory before it assigns disk,
// so both fields can be observed as null while it runs; re-read them under the same lock.
synchronized (this) {
237+
if (this.memory != null) {
238+
return this.memory;
239+
}
240+
if (this.disk != null) {
241+
return this.disk;
242+
}
243+
throw new IllegalStateException("MemoryOverflowModel is in an inconsistent state");
244+
}
199245
}
200-
return disk;
201246
}
202247

203248
private void writeObject(ObjectOutputStream s) throws IOException {
@@ -227,51 +272,84 @@ private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundEx
227272
}
228273
}
229274

230-
private synchronized void checkMemoryOverflow() {
231-
if (disk == null) {
232-
int size = size();
233-
if (size >= LARGE_BLOCK && size % LARGE_BLOCK == 0) {
234-
// maximum heap size the JVM can allocate
235-
long maxMemory = RUNTIME.maxMemory();
275+
/**
 * Decides whether the heap check in {@code innerCheckMemoryOverflow()} should run now.
 * <p>
 * Nothing happens once the delegate is the disk model. Otherwise the heap check runs when the static
 * {@code overflow} flag is set, and again whenever {@code size() + 1} is a non-zero multiple of
 * {@code LARGE_BLOCK}.
 */
private void checkMemoryOverflow() {
276+
if (disk == getDelegate()) {
277+
return;
278+
}
236279

237-
// total currently allocated JVM memory
238-
long totalMemory = RUNTIME.totalMemory();
280+
// overflow is static, so it may have been set by any instance of this class.
if (overflow) {
281+
innerCheckMemoryOverflow();
282+
}
283+
// presumably the +1 accounts for the statement about to be added — TODO confirm
int size = size() + 1;
284+
if (size >= LARGE_BLOCK && size % LARGE_BLOCK == 0) {
285+
innerCheckMemoryOverflow();
286+
}
239287

240-
// amount of memory free in the currently allocated JVM memory
241-
long freeMemory = RUNTIME.freeMemory();
288+
}
242289

243-
// estimated memory used
244-
long used = totalMemory - freeMemory;
290+
/**
 * Samples the JVM heap figures and moves the model to disk when too little memory is left to allocate.
 * <p>
 * Nothing happens once the delegate is the disk model. On the first call ({@code baseline == 0}) only the
 * baseline is recorded. On later calls the growth since the previous baseline updates {@code maxBlockSize},
 * and {@code overflowToDisk()} followed by {@code System.gc()} is invoked when either
 * <ul>
 * <li>the static {@code overflow} flag is set and the allocatable memory is below twice
 * {@code MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING}, or</li>
 * <li>the allocatable memory is below {@code MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING}, or below the smaller of
 * 15% of the max heap and {@code maxBlockSize}.</li>
 * </ul>
 * If neither condition holds, the {@code overflow} flag is cleared. The current usage is stored as the new
 * baseline at the end of every call that gets past the initial disk check.
 */
private void innerCheckMemoryOverflow() {
291+
if (disk == getDelegate()) {
292+
return;
293+
}
245294

246-
// amount of memory the JVM can still allocate from the OS (upper boundary is the max heap)
247-
long freeToAllocateMemory = maxMemory - used;
295+
// maximum heap size the JVM can allocate
296+
long maxMemory = RUNTIME.maxMemory();
248297

249-
if (baseline > 0) {
250-
long blockSize = used - baseline;
251-
if (blockSize > maxBlockSize) {
252-
maxBlockSize = blockSize;
253-
}
298+
// total currently allocated JVM memory
299+
long totalMemory = RUNTIME.totalMemory();
254300

255-
// Sync if either the estimated size of the next block is larger than remaining memory, or
256-
// if less than 15% of the heap is still free (this last condition to avoid GC overhead limit)
257-
if (freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING ||
258-
freeToAllocateMemory < Math.min(0.15 * maxMemory, maxBlockSize)) {
259-
logger.debug("syncing at {} triples. max block size: {}", size, maxBlockSize);
260-
overflowToDisk();
261-
}
301+
// amount of memory free in the currently allocated JVM memory
302+
long freeMemory = RUNTIME.freeMemory();
303+
304+
// estimated memory used
305+
long used = totalMemory - freeMemory;
306+
307+
// amount of memory the JVM can still allocate from the OS (upper boundary is the max heap)
308+
long freeToAllocateMemory = maxMemory - used;
309+
310+
if (baseline > 0) {
311+
long blockSize = used - baseline;
312+
if (blockSize > maxBlockSize) {
313+
maxBlockSize = blockSize;
314+
}
315+
if (overflow && freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING * 2) {
316+
// While the shared overflow flag is set, require twice the usual free memory to stay in memory,
// i.e. this model overflows earlier than it otherwise would.
317+
logger.debug("syncing at {} triples. max block size: {}", size(), maxBlockSize);
318+
overflowToDisk();
319+
System.gc();
320+
} else if (freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING ||
321+
freeToAllocateMemory < Math.min(0.15 * maxMemory, maxBlockSize)) {
322+
// Sync if either the estimated size of the next block is larger than remaining memory, or
323+
// if less than 15% of the heap is still free (this last condition to avoid GC overhead limit)
324+
325+
logger.debug("syncing at {} triples. max block size: {}", size(), maxBlockSize);
326+
overflowToDisk();
327+
System.gc();
328+
} else {
329+
// Enough memory is available again: reset the shared flag.
if (overflow) {
330+
overflow = false;
262331
}
263-
baseline = used;
264332
}
265333
}
334+
baseline = used;
266335
}
267336

268337
private synchronized void overflowToDisk() {
338+
overflow = true;
339+
if (memory == null) {
340+
assert disk != null;
341+
return;
342+
}
343+
269344
try {
345+
LinkedHashModel memory = this.memory;
346+
this.memory = null;
347+
270348
assert disk == null;
271349
dataDir = Files.createTempDirectory("model").toFile();
272350
logger.debug("memory overflow using temp directory {}", dataDir);
273351
store = createSailStore(dataDir);
274-
disk = new SailSourceModel(store) {
352+
disk = new SailSourceModel(store, memory) {
275353

276354
@Override
277355
protected void finalize() throws Throwable {
@@ -291,8 +369,7 @@ protected void finalize() throws Throwable {
291369
super.finalize();
292370
}
293371
};
294-
disk.addAll(memory);
295-
memory = new LinkedHashModel(memory.getNamespaces(), LARGE_BLOCK);
372+
296373
logger.debug("overflow synced to disk");
297374
} catch (IOException | SailException e) {
298375
String path = dataDir != null ? dataDir.getAbsolutePath() : "(unknown)";

0 commit comments

Comments
 (0)