Skip to content

Commit 1932cff

Browse files
committed
GH-2998 improve memory overflow performance of the NativeStore
1 parent 7aaffd1 commit 1932cff

7 files changed

Lines changed: 304 additions & 93 deletions

File tree

core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/MemoryOverflowModel.java

Lines changed: 112 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.io.ObjectOutputStream;
1717
import java.nio.file.Files;
1818
import java.util.Collection;
19+
import java.util.HashSet;
1920
import java.util.Iterator;
2021
import java.util.Optional;
2122
import java.util.Set;
@@ -48,11 +49,14 @@ abstract class MemoryOverflowModel extends AbstractModel {
4849

4950
private static final Runtime RUNTIME = Runtime.getRuntime();
5051

51-
private static final int LARGE_BLOCK = 10000;
52+
private static final int LARGE_BLOCK = 1024 * 5;
53+
54+
private static volatile boolean overflow;
5255

5356
// To reduce the chance of OOM we will always overflow once we get close to running out of memory even if we think
54-
// we have space for one more block. The limit is currently set at 32 MB
55-
private static final int MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING = 32 * 1024 * 1024;
57+
// we have space for one more block. The limit is currently set at 32 MB for small heaps and 128 MB for large heaps.
58+
private static final int MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING = RUNTIME.maxMemory() >= 1024 ? 128 * 1024 * 1024
59+
: 32 * 1024 * 1024;
5660

5761
final Logger logger = LoggerFactory.getLogger(MemoryOverflowModel.class);
5862

@@ -71,7 +75,7 @@ abstract class MemoryOverflowModel extends AbstractModel {
7175
SimpleValueFactory vf = SimpleValueFactory.getInstance();
7276

7377
public MemoryOverflowModel() {
74-
memory = new LinkedHashModel(LARGE_BLOCK);
78+
memory = new LinkedHashModel(LARGE_BLOCK * 2);
7579
}
7680

7781
public MemoryOverflowModel(Model model) {
@@ -138,6 +142,33 @@ public boolean add(Statement st) {
138142
return getDelegate().add(st);
139143
}
140144

145+
@Override
146+
public boolean addAll(Collection<? extends Statement> c) {
147+
checkMemoryOverflow();
148+
if (disk != null || c.size() <= 1024) {
149+
return getDelegate().addAll(c);
150+
} else {
151+
boolean ret = false;
152+
HashSet<Statement> buffer = new HashSet<>();
153+
for (Statement st : c) {
154+
buffer.add(st);
155+
if (buffer.size() >= 1024) {
156+
ret |= getDelegate().addAll(buffer);
157+
buffer.clear();
158+
innerCheckMemoryOverflow();
159+
}
160+
}
161+
if (!buffer.isEmpty()) {
162+
ret |= getDelegate().addAll(buffer);
163+
buffer.clear();
164+
}
165+
166+
return ret;
167+
168+
}
169+
170+
}
171+
141172
@Override
142173
public boolean remove(Resource subj, IRI pred, Value obj, Resource... contexts) {
143174
return getDelegate().remove(subj, pred, obj, contexts);
@@ -195,13 +226,23 @@ public synchronized void removeTermIteration(Iterator<Statement> iter, Resource
195226
protected abstract SailStore createSailStore(File dataDir) throws IOException, SailException;
196227

197228
private Model getDelegate() {
198-
LinkedHashModel memory = this.memory;
229+
var memory = this.memory;
199230
if (memory != null) {
200231
return memory;
201232
} else {
202-
synchronized (this) {
233+
var disk = this.disk;
234+
if (disk != null) {
203235
return disk;
204236
}
237+
synchronized (this) {
238+
if (this.memory != null) {
239+
return this.memory;
240+
}
241+
if (this.disk != null) {
242+
return this.disk;
243+
}
244+
throw new IllegalStateException("MemoryOverflowModel is in an inconsistent state");
245+
}
205246
}
206247
}
207248

@@ -232,45 +273,76 @@ private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundEx
232273
}
233274
}
234275

235-
private synchronized void checkMemoryOverflow() {
236-
if (disk == null) {
237-
int size = size();
238-
if (size >= LARGE_BLOCK && size % LARGE_BLOCK == 0) {
239-
// maximum heap size the JVM can allocate
240-
long maxMemory = RUNTIME.maxMemory();
241-
242-
// total currently allocated JVM memory
243-
long totalMemory = RUNTIME.totalMemory();
244-
245-
// amount of memory free in the currently allocated JVM memory
246-
long freeMemory = RUNTIME.freeMemory();
247-
248-
// estimated memory used
249-
long used = totalMemory - freeMemory;
250-
251-
// amount of memory the JVM can still allocate from the OS (upper boundary is the max heap)
252-
long freeToAllocateMemory = maxMemory - used;
253-
254-
if (baseline > 0) {
255-
long blockSize = used - baseline;
256-
if (blockSize > maxBlockSize) {
257-
maxBlockSize = blockSize;
258-
}
259-
260-
// Sync if either the estimated size of the next block is larger than remaining memory, or
261-
// if less than 15% of the heap is still free (this last condition to avoid GC overhead limit)
262-
if (freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING ||
263-
freeToAllocateMemory < Math.min(0.15 * maxMemory, maxBlockSize)) {
264-
logger.debug("syncing at {} triples. max block size: {}", size, maxBlockSize);
265-
overflowToDisk();
266-
}
276+
private void checkMemoryOverflow() {
277+
if (disk == getDelegate()) {
278+
return;
279+
}
280+
281+
if (overflow) {
282+
innerCheckMemoryOverflow();
283+
}
284+
int size = size() + 1;
285+
if (size >= LARGE_BLOCK && size % LARGE_BLOCK == 0) {
286+
innerCheckMemoryOverflow();
287+
}
288+
289+
}
290+
291+
private void innerCheckMemoryOverflow() {
292+
if (disk == getDelegate()) {
293+
return;
294+
}
295+
296+
// maximum heap size the JVM can allocate
297+
long maxMemory = RUNTIME.maxMemory();
298+
299+
// total currently allocated JVM memory
300+
long totalMemory = RUNTIME.totalMemory();
301+
302+
// amount of memory free in the currently allocated JVM memory
303+
long freeMemory = RUNTIME.freeMemory();
304+
305+
// estimated memory used
306+
long used = totalMemory - freeMemory;
307+
308+
// amount of memory the JVM can still allocate from the OS (upper boundary is the max heap)
309+
long freeToAllocateMemory = maxMemory - used;
310+
311+
if (baseline > 0) {
312+
long blockSize = used - baseline;
313+
if (blockSize > maxBlockSize) {
314+
maxBlockSize = blockSize;
315+
}
316+
if (overflow && freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING * 2) {
317+
// stricter memory requirements to not overflow if other models are overflowing
318+
logger.debug("syncing at {} triples. max block size: {}", size(), maxBlockSize);
319+
overflowToDisk();
320+
System.gc();
321+
} else if (freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING ||
322+
freeToAllocateMemory < Math.min(0.15 * maxMemory, maxBlockSize)) {
323+
// Sync if either the estimated size of the next block is larger than remaining memory, or
324+
// if less than 15% of the heap is still free (this last condition to avoid GC overhead limit)
325+
326+
logger.debug("syncing at {} triples. max block size: {}", size(), maxBlockSize);
327+
overflowToDisk();
328+
System.gc();
329+
} else {
330+
if (overflow) {
331+
overflow = false;
267332
}
268-
baseline = used;
269333
}
270334
}
335+
baseline = used;
271336
}
272337

273338
private synchronized void overflowToDisk() {
339+
overflow = true;
340+
341+
if (memory == null) {
342+
assert disk != null;
343+
return;
344+
}
345+
274346
try {
275347
LinkedHashModel memory = this.memory;
276348
this.memory = null;
@@ -279,8 +351,7 @@ private synchronized void overflowToDisk() {
279351
dataDir = Files.createTempDirectory("model").toFile();
280352
logger.debug("memory overflow using temp directory {}", dataDir);
281353
store = createSailStore(dataDir);
282-
disk = new SailSourceModel(store);
283-
disk.addAll(memory);
354+
this.disk = new SailSourceModel(store, memory);
284355
logger.debug("overflow synced to disk");
285356
} catch (IOException | SailException e) {
286357
String path = dataDir != null ? dataDir.getAbsolutePath() : "(unknown)";

core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeSailStore.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,10 @@ CloseableIteration<? extends Statement> createStatementIterator(Resource subj, I
316316
return tripleStore.cardinality(subjID, predID, objID, contextID);
317317
}
318318

319+
public void disableTxnStatus() {
320+
this.tripleStore.disableTxnStatus();
321+
}
322+
319323
private final class NativeSailSource extends BackingSailSource {
320324

321325
private final boolean explicit;
@@ -444,6 +448,44 @@ public void approve(Resource subj, IRI pred, Value obj, Resource ctx) throws Sai
444448
addStatement(subj, pred, obj, explicit, ctx);
445449
}
446450

451+
@Override
452+
public void approveAll(Set<Statement> approved, Set<Resource> approvedContexts) {
453+
sinkStoreAccessLock.lock();
454+
startTriplestoreTransaction();
455+
456+
try {
457+
for (Statement statement : approved) {
458+
Resource subj = statement.getSubject();
459+
IRI pred = statement.getPredicate();
460+
Value obj = statement.getObject();
461+
Resource context = statement.getContext();
462+
463+
int subjID = valueStore.storeValue(subj);
464+
int predID = valueStore.storeValue(pred);
465+
int objID = valueStore.storeValue(obj);
466+
467+
int contextID = 0;
468+
if (context != null) {
469+
contextID = valueStore.storeValue(context);
470+
}
471+
472+
boolean wasNew = tripleStore.storeTriple(subjID, predID, objID, contextID, explicit);
473+
if (wasNew && context != null) {
474+
contextStore.increment(context);
475+
}
476+
477+
}
478+
} catch (IOException e) {
479+
throw new SailException(e);
480+
} catch (RuntimeException e) {
481+
logger.error("Encountered an unexpected problem while trying to add a statement", e);
482+
throw e;
483+
} finally {
484+
sinkStoreAccessLock.unlock();
485+
}
486+
487+
}
488+
447489
@Override
448490
public void deprecate(Statement statement) throws SailException {
449491
removeStatements(statement.getSubject(), statement.getPredicate(), statement.getObject(), explicit,

core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeStore.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ private static final class OverFlowStoreCleaner implements Runnable {
8181

8282
private OverFlowStoreCleaner(NativeSailStore nativeSailStore, File dataDir) {
8383
this.nativeSailStore = nativeSailStore;
84+
nativeSailStore.disableTxnStatus();
8485
this.dataDir = dataDir;
8586
}
8687

0 commit comments

Comments
 (0)