diff --git a/core/sail/api/src/main/java/org/eclipse/rdf4j/common/concurrent/locks/StampedLongAdderLockManager.java b/core/sail/api/src/main/java/org/eclipse/rdf4j/common/concurrent/locks/StampedLongAdderLockManager.java new file mode 100644 index 00000000000..9cc709e1bdf --- /dev/null +++ b/core/sail/api/src/main/java/org/eclipse/rdf4j/common/concurrent/locks/StampedLongAdderLockManager.java @@ -0,0 +1,198 @@ +/******************************************************************************* + * Copyright (c) 2025 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.common.concurrent.locks; + +import java.lang.invoke.VarHandle; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.locks.StampedLock; + +/** + * A lightweight read/write lock manager that avoids allocating per-lock objects. Readers are tracked using two + * {@link LongAdder LongAdders} while writers rely on a {@link StampedLock}. The read-lock method returns a constant + * stamp ({@link #READ_LOCK_STAMP}) and writers receive the stamp produced by the underlying {@link StampedLock}. + */ +public class StampedLongAdderLockManager { + + /** + * Stamp returned to callers holding a read lock. Passing any other value to {@link #unlockRead(long)} is considered + * an illegal monitor state. + */ + public static final long READ_LOCK_STAMP = Long.MIN_VALUE; + + private final StampedLock stampedLock = new StampedLock(); + private final LongAdder readersLocked = new LongAdder(); + private final LongAdder readersUnlocked = new LongAdder(); + + // milliseconds to wait when trying to acquire the write lock interruptibly + private final int tryWriteLockMillis; + + // Number of spin attempts before temporarily releasing the write lock when readers are active. + private final int writePreference; + + public StampedLongAdderLockManager() { + this(1, 100); + } + + public StampedLongAdderLockManager(int writePreference, int tryWriteLockMillis) { + this.writePreference = Math.max(1, writePreference); + this.tryWriteLockMillis = Math.max(1, tryWriteLockMillis); + } + + public boolean isWriterActive() { + return stampedLock.isWriteLocked(); + } + + public boolean isReaderActive() { + return readersUnlocked.sum() != readersLocked.sum(); + } + + public void waitForActiveWriter() throws InterruptedException { + while (stampedLock.isWriteLocked() && !isReaderActive()) { + spinWait(); + } + } + + public void waitForActiveReaders() throws InterruptedException { + while (isReaderActive()) { + spinWait(); + } + } + + public long readLock() throws InterruptedException { + readersLocked.increment(); + while (stampedLock.isWriteLocked()) { + try { + spinWaitAtReadLock(); + } catch (InterruptedException e) { + readersUnlocked.increment(); + throw e; + } + } + return READ_LOCK_STAMP; + } + + public long tryReadLock() { + readersLocked.increment(); + if (!stampedLock.isWriteLocked()) { + return READ_LOCK_STAMP; + } + readersUnlocked.increment(); + return 0L; + } + + public void unlockRead(long stamp) { + if (stamp != READ_LOCK_STAMP) { + throw new IllegalMonitorStateException("Trying to release a stamp that is not a read lock"); + } + + VarHandle.acquireFence(); + readersUnlocked.increment(); + } + + public long writeLock() throws InterruptedException { + long writeStamp = writeLockInterruptibly(); + boolean lockAcquired = false; + + try { + int attempts = 0; + do { + if (Thread.interrupted()) { + throw new InterruptedException(); + } + + if (!hasActiveReaders()) { + lockAcquired = true; + break; + } + + if (attempts++ > writePreference) { + attempts = 0; + + stampedLock.unlockWrite(writeStamp); + writeStamp = 0; + + yieldWait(); + + writeStamp = writeLockInterruptibly(); + } else { + spinWait(); + } + + } while (!lockAcquired); + } finally { + if (!lockAcquired && writeStamp != 0) { + stampedLock.unlockWrite(writeStamp); + } + } + + VarHandle.releaseFence(); + return writeStamp; + } + + public long tryWriteLock() { + long writeStamp = stampedLock.tryWriteLock(); + if (writeStamp == 0) { + return 0L; + } + + if (!hasActiveReaders()) { + VarHandle.releaseFence(); + return writeStamp; + } + + stampedLock.unlockWrite(writeStamp); + return 0L; + } + + public void unlockWrite(long stamp) { + if (stamp == 0) { + throw new IllegalMonitorStateException("Trying to release a write lock that is not locked"); + } + stampedLock.unlockWrite(stamp); + } + + private boolean hasActiveReaders() { + return readersUnlocked.sum() != readersLocked.sum(); + } + + private long writeLockInterruptibly() throws InterruptedException { + long writeStamp; + do { + if (Thread.interrupted()) { + throw new InterruptedException(); + } + writeStamp = stampedLock.tryWriteLock(tryWriteLockMillis, TimeUnit.MILLISECONDS); + } while (writeStamp == 0); + return writeStamp; + } + + private void spinWait() throws InterruptedException { + Thread.onSpinWait(); + if (Thread.interrupted()) { + throw new InterruptedException(); + } + } + + private void spinWaitAtReadLock() throws InterruptedException { + Thread.onSpinWait(); + if (Thread.interrupted()) { + throw new InterruptedException(); + } + } + + private void yieldWait() throws InterruptedException { + Thread.yield(); + if (Thread.interrupted()) { + throw new InterruptedException(); + } + } +} diff --git a/core/sail/api/src/test/java/org/eclipse/rdf4j/common/concurrent/locks/StampedLongAdderLockManagerTest.java b/core/sail/api/src/test/java/org/eclipse/rdf4j/common/concurrent/locks/StampedLongAdderLockManagerTest.java new file mode 100644 index 00000000000..d311615edfb --- /dev/null +++ b/core/sail/api/src/test/java/org/eclipse/rdf4j/common/concurrent/locks/StampedLongAdderLockManagerTest.java @@ -0,0 +1,96 @@ +/******************************************************************************* + * Copyright (c) 2025 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.common.concurrent.locks; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.jupiter.api.Test; + +class StampedLongAdderLockManagerTest { + + @Test + void writeLockWaitsForReaders() throws Exception { + StampedLongAdderLockManager manager = new StampedLongAdderLockManager(); + long readStamp = manager.readLock(); + assertTrue(manager.isReaderActive()); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + CountDownLatch attemptingWrite = new CountDownLatch(1); + AtomicBoolean acquiredWrite = new AtomicBoolean(false); + + Future writeFuture = executor.submit(() -> { + attemptingWrite.countDown(); + long stamp = manager.writeLock(); + acquiredWrite.set(true); + return stamp; + }); + + assertTrue(attemptingWrite.await(500, TimeUnit.MILLISECONDS), "write attempt did not start in time"); + TimeUnit.MILLISECONDS.sleep(100); + assertFalse(acquiredWrite.get(), "write lock acquired while read lock active"); + + manager.unlockRead(readStamp); + long writeStamp = writeFuture.get(2, TimeUnit.SECONDS); + assertTrue(acquiredWrite.get()); + assertTrue(manager.isWriterActive()); + manager.unlockWrite(writeStamp); + assertFalse(manager.isWriterActive()); + } finally { + executor.shutdownNow(); + executor.awaitTermination(1, TimeUnit.SECONDS); + } + } + + @Test + void readLockWaitsForWriters() throws Exception { + StampedLongAdderLockManager manager = new StampedLongAdderLockManager(); + long writeStamp = manager.writeLock(); + assertTrue(manager.isWriterActive()); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + CountDownLatch attemptingRead = new CountDownLatch(1); + AtomicBoolean acquiredRead = new AtomicBoolean(false); + + Future readFuture = executor.submit(() -> { + attemptingRead.countDown(); + long stamp = manager.readLock(); + acquiredRead.set(true); + return stamp; + }); + + assertTrue(attemptingRead.await(500, TimeUnit.MILLISECONDS), "read attempt did not start in time"); + TimeUnit.MILLISECONDS.sleep(100); + assertFalse(acquiredRead.get(), "read lock acquired while write lock active"); + + manager.unlockWrite(writeStamp); + long readStamp = readFuture.get(2, TimeUnit.SECONDS); + assertTrue(acquiredRead.get()); + assertEquals(StampedLongAdderLockManager.READ_LOCK_STAMP, readStamp); + assertTrue(manager.isReaderActive()); + manager.unlockRead(readStamp); + assertFalse(manager.isReaderActive()); + } finally { + executor.shutdownNow(); + executor.awaitTermination(1, TimeUnit.SECONDS); + } + } +} diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbContextIdIterator.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbContextIdIterator.java index 1e81df6fd98..5e1a2269994 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbContextIdIterator.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbContextIdIterator.java @@ -23,8 +23,8 @@ import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.concurrent.locks.StampedLock; +import org.eclipse.rdf4j.common.concurrent.locks.StampedLongAdderLockManager; import org.eclipse.rdf4j.sail.SailException; import org.eclipse.rdf4j.sail.lmdb.TxnManager.Txn; import org.lwjgl.PointerBuffer; @@ -61,20 +61,25 @@ class LmdbContextIdIterator implements Closeable { private boolean fetchNext = false; - private final StampedLock txnLock; + private final StampedLongAdderLockManager txnLockManager; private final Thread ownerThread = Thread.currentThread(); - LmdbContextIdIterator(Pool pool, int dbi, Txn txnRef) throws IOException { - this.pool = pool; + LmdbContextIdIterator(int dbi, Txn txnRef) throws IOException { + this.pool = Pool.get(); this.keyData = pool.getVal(); this.valueData = pool.getVal(); this.dbi = dbi; this.txnRef = txnRef; - this.txnLock = txnRef.lock(); + this.txnLockManager = txnRef.lockManager(); - long stamp = txnLock.readLock(); + long readStamp; + try { + readStamp = txnLockManager.readLock(); + } catch (InterruptedException e) { + throw new SailException(e); + } try { this.txnRefVersion = txnRef.version(); this.txn = txnRef.get(); @@ -85,12 +90,17 @@ class LmdbContextIdIterator implements Closeable { cursor = pp.get(0); } } finally { - txnLock.unlockRead(stamp); + txnLockManager.unlockRead(readStamp); } } public long[] next() { - long stamp = txnLock.readLock(); + long readStamp; + try { + readStamp = txnLockManager.readLock(); + } catch (InterruptedException e) { + throw new SailException(e); + } try { if (txnRefVersion != txnRef.version()) { // cursor must be renewed @@ -143,17 +153,21 @@ public long[] next() { } catch (IOException e) { throw new SailException(e); } finally { - txnLock.unlockRead(stamp); + txnLockManager.unlockRead(readStamp); } } private void closeInternal(boolean maybeCalledAsync) { if (!closed) { - long stamp; + long writeStamp = 0L; + boolean writeLocked = false; if (maybeCalledAsync && ownerThread != Thread.currentThread()) { - stamp = txnLock.writeLock(); - } else { - stamp = 0; + try { + writeStamp = txnLockManager.writeLock(); + writeLocked = true; + } catch (InterruptedException e) { + throw new SailException(e); + } } try { if (!closed) { @@ -166,8 +180,8 @@ private void closeInternal(boolean maybeCalledAsync) { } } finally { closed = true; - if (stamp != 0) { - txnLock.unlockWrite(stamp); + if (writeLocked) { + txnLockManager.unlockWrite(writeStamp); } } } diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbRecordIterator.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbRecordIterator.java index 896df84de25..197c68deb5f 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbRecordIterator.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbRecordIterator.java @@ -24,8 +24,9 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.util.concurrent.locks.StampedLock; +import org.eclipse.rdf4j.common.concurrent.locks.StampedLongAdderLockManager; +import org.eclipse.rdf4j.sail.SailException; import org.eclipse.rdf4j.sail.lmdb.TripleStore.TripleIndex; import org.eclipse.rdf4j.sail.lmdb.TxnManager.Txn; import org.eclipse.rdf4j.sail.lmdb.Varint.GroupMatcher; @@ -74,13 +75,13 @@ class LmdbRecordIterator implements RecordIterator { private boolean fetchNext = false; - private final StampedLock txnLock; + private final StampedLongAdderLockManager txnLockManager; private final Thread ownerThread = Thread.currentThread(); - LmdbRecordIterator(Pool pool, TripleIndex index, boolean rangeSearch, long subj, long pred, long obj, + LmdbRecordIterator(TripleIndex index, boolean rangeSearch, long subj, long pred, long obj, long context, boolean explicit, Txn txnRef) throws IOException { - this.pool = pool; + this.pool = Pool.get(); this.keyData = pool.getVal(); this.valueData = pool.getVal(); this.index = index; @@ -107,9 +108,14 @@ class LmdbRecordIterator implements RecordIterator { } this.dbi = index.getDB(explicit); this.txnRef = txnRef; - this.txnLock = txnRef.lock(); + this.txnLockManager = txnRef.lockManager(); - long stamp = txnLock.readLock(); + long readStamp; + try { + readStamp = txnLockManager.readLock(); + } catch (InterruptedException e) { + throw new SailException(e); + } try { this.txnRefVersion = txnRef.version(); this.txn = txnRef.get(); @@ -120,13 +126,18 @@ class LmdbRecordIterator implements RecordIterator { cursor = pp.get(0); } } finally { - txnLock.unlockRead(stamp); + txnLockManager.unlockRead(readStamp); } } @Override public long[] next() { - long stamp = txnLock.readLock(); + long readStamp; + try { + readStamp = txnLockManager.readLock(); + } catch (InterruptedException e) { + throw new SailException(e); + } try { if (closed) { log.debug("Calling next() on an LmdbRecordIterator that is already closed, returning null"); @@ -191,17 +202,21 @@ public long[] next() { closeInternal(false); return null; } finally { - txnLock.unlockRead(stamp); + txnLockManager.unlockRead(readStamp); } } private void closeInternal(boolean maybeCalledAsync) { if (!closed) { - long stamp; + long writeStamp = 0L; + boolean writeLocked = false; if (maybeCalledAsync && ownerThread != Thread.currentThread()) { - stamp = txnLock.writeLock(); - } else { - stamp = 0; + try { + writeStamp = txnLockManager.writeLock(); + writeLocked = true; + } catch (InterruptedException e) { + throw new SailException(e); + } } try { if (!closed) { @@ -218,8 +233,8 @@ private void closeInternal(boolean maybeCalledAsync) { } } finally { closed = true; - if (stamp != 0) { - txnLock.unlockWrite(stamp); + if (writeLocked) { + txnLockManager.unlockWrite(writeStamp); } } } diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStoreConnection.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStoreConnection.java index 8dc3c7019ed..9c0577e655a 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStoreConnection.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStoreConnection.java @@ -200,4 +200,10 @@ public void clearInferred(Resource... contexts) throws SailException { sailChangedEvent.setStatementsRemoved(true); } + @Override + protected void closeInternal() throws SailException { + super.closeInternal(); + // release thread-local pool + Pool.release(); + } } diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/PersistentSet.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/PersistentSet.java index 79d27cf4b7a..b154bc85ffd 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/PersistentSet.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/PersistentSet.java @@ -39,8 +39,9 @@ import java.util.AbstractSet; import java.util.Iterator; import java.util.NoSuchElementException; -import java.util.concurrent.locks.StampedLock; +import org.eclipse.rdf4j.common.concurrent.locks.StampedLongAdderLockManager; +import org.eclipse.rdf4j.sail.SailException; import org.eclipse.rdf4j.sail.lmdb.TxnManager.Txn; import org.lwjgl.PointerBuffer; import org.lwjgl.system.MemoryStack; @@ -112,7 +113,7 @@ public boolean remove(Object element) { } } - private synchronized boolean update(Object element, boolean add) throws IOException { + private synchronized boolean update(Object element, boolean add) throws IOException, InterruptedException { try (MemoryStack stack = MemoryStack.stackPush()) { if (factory.writeTxn == 0) { // start a write transaction @@ -189,7 +190,7 @@ private class ElementIterator implements Iterator { private final MDBVal valueData = MDBVal.malloc(); private final long cursor; - private final StampedLock txnLock; + private final StampedLongAdderLockManager txnLockManager; private Txn txnRef; private long txnRefVersion; @@ -199,9 +200,9 @@ private class ElementIterator implements Iterator { private ElementIterator(int dbi) { try { this.txnRef = factory.txnManager.createReadTxn(); - this.txnLock = txnRef.lock(); + this.txnLockManager = txnRef.lockManager(); - long stamp = txnLock.readLock(); + long readStamp = txnLockManager.readLock(); try { this.txnRefVersion = txnRef.version(); @@ -211,7 +212,7 @@ private ElementIterator(int dbi) { cursor = pp.get(0); } } finally { - txnLock.unlockRead(stamp); + txnLockManager.unlockRead(readStamp); } } catch (Exception e) { throw new RuntimeException(e); @@ -243,8 +244,8 @@ public T next() { return current; } - private T computeNext() throws IOException { - long stamp = txnLock.readLock(); + private T computeNext() throws IOException, InterruptedException { + long readStamp = txnLockManager.readLock(); try { if (txnRefVersion != txnRef.version()) { // cursor must be renewed @@ -267,7 +268,7 @@ private T computeNext() throws IOException { close(); return null; } finally { - txnLock.unlockRead(stamp); + txnLockManager.unlockRead(readStamp); } } @@ -275,13 +276,18 @@ public void close() { if (txnRef != null) { keyData.close(); valueData.close(); - long stamp = txnLock.readLock(); + long readStamp; + try { + readStamp = txnLockManager.readLock(); + } catch (InterruptedException e) { + throw new SailException(e); + } try { mdb_cursor_close(cursor); txnRef.close(); txnRef = null; } finally { - txnLock.unlockRead(stamp); + txnLockManager.unlockRead(readStamp); } } } diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/PersistentSetFactory.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/PersistentSetFactory.java index 2b8e513e4de..a7e04daea94 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/PersistentSetFactory.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/PersistentSetFactory.java @@ -36,10 +36,10 @@ import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Path; -import java.util.concurrent.locks.StampedLock; import java.util.function.Function; import org.apache.commons.io.FileUtils; +import org.eclipse.rdf4j.common.concurrent.locks.StampedLongAdderLockManager; import org.eclipse.rdf4j.sail.lmdb.TxnManager.Mode; import org.lwjgl.PointerBuffer; import org.lwjgl.system.MemoryStack; @@ -106,10 +106,10 @@ synchronized void commit() throws IOException { } } - void ensureResize() throws IOException { + void ensureResize() throws IOException, InterruptedException { if (LmdbUtil.requiresResize(mapSize, pageSize, writeTxn, 0)) { - StampedLock lock = txnManager.lock(); - long stamp = lock.writeLock(); + StampedLongAdderLockManager lockManager = txnManager.lockManager(); + long writeStamp = lockManager.writeLock(); try { txnManager.deactivate(); @@ -124,7 +124,7 @@ void ensureResize() throws IOException { try { txnManager.activate(); } finally { - lock.unlockWrite(stamp); + lockManager.unlockWrite(writeStamp); } } } diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/Pool.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/Pool.java index e62c1142b5d..ebb99525824 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/Pool.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/Pool.java @@ -19,81 +19,85 @@ * A simple pool for {@link MDBVal}, {@link ByteBuffer} and {@link Statistics} instances. */ class Pool { + // thread-local pool instance + private static final ThreadLocal threadlocal = ThreadLocal.withInitial(Pool::new); private final MDBVal[] valPool = new MDBVal[1024]; private final ByteBuffer[] keyPool = new ByteBuffer[1024]; private final Statistics[] statisticsPool = new Statistics[512]; - private volatile int valPoolIndex = -1; - private volatile int keyPoolIndex = -1; - private volatile int statisticsPoolIndex = -1; + private int valPoolIndex = -1; + private int keyPoolIndex = -1; + private int statisticsPoolIndex = -1; - MDBVal getVal() { - synchronized (valPool) { - if (valPoolIndex >= 0) { - return valPool[valPoolIndex--]; - } + final MDBVal getVal() { + if (valPoolIndex >= 0) { + return valPool[valPoolIndex--]; } return MDBVal.malloc(); } - ByteBuffer getKeyBuffer() { - synchronized (keyPool) { - if (keyPoolIndex >= 0) { - ByteBuffer bb = keyPool[keyPoolIndex--]; - bb.clear(); - return bb; - } + final ByteBuffer getKeyBuffer() { + if (keyPoolIndex >= 0) { + ByteBuffer bb = keyPool[keyPoolIndex--]; + bb.clear(); + return bb; } return MemoryUtil.memAlloc(TripleStore.MAX_KEY_LENGTH); } - Statistics getStatistics() { - synchronized (statisticsPool) { - if (statisticsPoolIndex >= 0) { - return statisticsPool[statisticsPoolIndex--]; - } + final Statistics getStatistics() { + if (statisticsPoolIndex >= 0) { + return statisticsPool[statisticsPoolIndex--]; } return new Statistics(); } - void free(MDBVal val) { - synchronized (valPool) { - if (valPoolIndex < valPool.length - 1) { - valPool[++valPoolIndex] = val; - } else { - val.close(); - } + final void free(MDBVal val) { + if (valPoolIndex < valPool.length - 1) { + valPool[++valPoolIndex] = val; + } else { + val.close(); } } - void free(ByteBuffer bb) { - synchronized (keyPool) { - if (keyPoolIndex < keyPool.length - 1) { - keyPool[++keyPoolIndex] = bb; - } else { - MemoryUtil.memFree(bb); - } + final void free(ByteBuffer bb) { + if (keyPoolIndex < keyPool.length - 1) { + keyPool[++keyPoolIndex] = bb; + } else { + MemoryUtil.memFree(bb); } } - void free(Statistics statistics) { - synchronized (statisticsPool) { - if (statisticsPoolIndex < statisticsPool.length - 1) { - statisticsPool[++statisticsPoolIndex] = statistics; - } + final void free(Statistics statistics) { + if (statisticsPoolIndex < statisticsPool.length - 1) { + statisticsPool[++statisticsPoolIndex] = statistics; } } - void close() { - synchronized (valPool) { - while (valPoolIndex >= 0) { - valPool[valPoolIndex--].close(); - } + final void close() { + while (valPoolIndex >= 0) { + valPool[valPoolIndex--].close(); } - synchronized (keyPool) { - while (keyPoolIndex >= 0) { - MemoryUtil.memFree(keyPool[keyPoolIndex--]); - } + while (keyPoolIndex >= 0) { + MemoryUtil.memFree(keyPool[keyPoolIndex--]); } } + + /** + * Get a pool instance for the current thread. + * + * @return a Pool instance + */ + public static Pool get() { + return threadlocal.get(); + } + + /** + * Release the pool instance for the current thread. + */ + public static void release() { + Pool pool = threadlocal.get(); + pool.close(); + threadlocal.remove(); + } } diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java index ef3ab8ebc67..a39762135f1 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java @@ -78,6 +78,7 @@ import java.util.concurrent.locks.StampedLock; import java.util.function.Consumer; +import org.eclipse.rdf4j.common.concurrent.locks.StampedLongAdderLockManager; import org.eclipse.rdf4j.sail.SailException; import org.eclipse.rdf4j.sail.lmdb.TxnManager.Mode; import org.eclipse.rdf4j.sail.lmdb.TxnManager.Txn; @@ -162,7 +163,6 @@ class TripleStore implements Closeable { private long mapSize; private long writeTxn; private final TxnManager txnManager; - private final Pool pool = new Pool(); private TxnRecordCache recordCache = null; @@ -390,7 +390,7 @@ private void reindex(Set currentIndexSpecs, Set newIndexSpecs) t TripleIndex addedIndex = new TripleIndex(fieldSeq); RecordIterator[] sourceIter = { null }; try { - sourceIter[0] = new LmdbRecordIterator(pool, sourceIndex, false, -1, -1, -1, -1, + sourceIter[0] = new LmdbRecordIterator(sourceIndex, false, -1, -1, -1, -1, explicit, txnManager.createTxn(txn)); RecordIterator it = sourceIter[0]; @@ -481,7 +481,7 @@ public void close() throws IOException { * @throws IOException */ public LmdbContextIdIterator getContexts(Txn txn) throws IOException { - return new LmdbContextIdIterator(this.pool, this.contextsDbi, txn); + return new LmdbContextIdIterator(this.contextsDbi, txn); } /** @@ -510,7 +510,7 @@ public RecordIterator getTriples(Txn txn, long subj, long pred, long obj, long c private RecordIterator getTriplesUsingIndex(Txn txn, long subj, long pred, long obj, long context, boolean explicit, TripleIndex index, boolean rangeSearch) throws IOException { - return new LmdbRecordIterator(pool, index, rangeSearch, subj, pred, obj, context, explicit, txn); + return new LmdbRecordIterator(index, rangeSearch, subj, pred, obj, context, explicit, txn); } /** @@ -675,6 +675,7 @@ protected double cardinality(long subj, long pred, long obj, long context) throw } return txnManager.doWith((stack, txn) -> { + Pool pool = Pool.get(); final Statistics s = pool.getStatistics(); try { MDBVal maxKey = MDBVal.malloc(stack); @@ -1081,8 +1082,13 @@ void endTransaction(boolean commit) throws IOException { try { E(mdb_txn_commit(writeTxn)); if (recordCache != null) { - StampedLock lock = txnManager.lock(); - long stamp = lock.writeLock(); + StampedLongAdderLockManager lockManager = txnManager.lockManager(); + long readStamp; + try { + readStamp = lockManager.readLock(); + } catch (InterruptedException e) { + throw new SailException(e); + } try { txnManager.deactivate(); mapSize = LmdbUtil.autoGrowMapSize(mapSize, pageSize, 0); @@ -1102,7 +1108,7 @@ void endTransaction(boolean commit) throws IOException { try { txnManager.activate(); } finally { - lock.unlockWrite(stamp); + lockManager.unlockRead(readStamp); } } } else { @@ -1323,7 +1329,6 @@ public String toString() { void close() { mdb_dbi_close(env, dbiExplicit); mdb_dbi_close(env, dbiInferred); - pool.close(); } void clear(long txn) { diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TxnManager.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TxnManager.java index 70d25a35a71..cf3d486b6fa 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TxnManager.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TxnManager.java @@ -22,8 +22,9 @@ import java.io.Closeable; import java.io.IOException; import java.util.IdentityHashMap; -import java.util.concurrent.locks.StampedLock; +import org.eclipse.rdf4j.common.concurrent.locks.StampedLongAdderLockManager; +import org.eclipse.rdf4j.sail.SailException; import org.eclipse.rdf4j.sail.lmdb.LmdbUtil.Transaction; import org.lwjgl.PointerBuffer; import org.lwjgl.system.MemoryStack; @@ -36,7 +37,7 @@ class TxnManager { private final Mode mode; private final IdentityHashMap active = new IdentityHashMap<>(); private final long[] pool; - private final StampedLock lock = new StampedLock(); + private final StampedLongAdderLockManager lockManager = new StampedLongAdderLockManager(); private final long env; private volatile int poolIndex = -1; @@ -105,26 +106,28 @@ long createReadTxnInternal() throws IOException { } T doWith(Transaction transaction) throws IOException { - long stamp = lock.readLock(); + long readStamp; + try { + readStamp = lockManager.readLock(); + } catch (InterruptedException e) { + throw new SailException(e); + } T ret; try (MemoryStack stack = stackPush()) { try (Txn txn = createReadTxn()) { ret = transaction.exec(stack, txn.get()); } } finally { - lock.unlockRead(stamp); + lockManager.unlockRead(readStamp); } return ret; } /** - * This lock is used to globally block all transactions by using a {@link StampedLock#writeLock()}. This is required - * to block transactions while automatic resizing the memory map. - * - * @return lock for managed transactions + * Exposes the shared lock manager used to coordinate read and write transactions. */ - StampedLock lock() { - return lock; + StampedLongAdderLockManager lockManager() { + return lockManager; } void activate() throws IOException { @@ -170,14 +173,8 @@ long get() { return txn; } - /** - * A {@link StampedLock#readLock()} should be acquired while working with the transaction. This is required to - * block transactions while automatic resizing the memory map. - * - * @return lock for managed transactions - */ - StampedLock lock() { - return lock; + StampedLongAdderLockManager lockManager() { + return lockManager; } private void free(long txn) { diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStore.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStore.java index 75dae449f68..4cfc59a5be0 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStore.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStore.java @@ -57,6 +57,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.StampedLock; @@ -125,7 +126,7 @@ class ValueStore extends AbstractValueFactory { /** * A simple cache containing the [VALUE_CACHE_SIZE] most-recently used values stored by their ID. */ - private final LmdbValue[] valueCache; + private final AtomicReferenceArray valueCache; /** * A simple cache containing the [ID_CACHE_SIZE] most-recently used value-IDs stored by their value. */ @@ -193,7 +194,7 @@ class ValueStore extends AbstractValueFactory { this.mapSize = config.getValueDBSize(); open(); - valueCache = new LmdbValue[config.getValueCacheSize()]; + valueCache = new AtomicReferenceArray<>(config.getValueCacheSize()); valueIDCache = new ConcurrentCache<>(config.getValueIDCacheSize()); namespaceCache = new ConcurrentCache<>(config.getNamespaceCacheSize()); namespaceIDCache = new ConcurrentCache<>(config.getNamespaceIDCacheSize()); @@ -442,7 +443,7 @@ protected byte[] getData(long id) throws IOException { * @return the value object or null if not found */ LmdbValue cachedValue(long id) { - LmdbValue value = valueCache[(int) (id % valueCache.length)]; + LmdbValue value = valueCache.get((int) (id % valueCache.length())); if (value != null && value.getInternalID() == id) { return value; } @@ -459,7 +460,7 @@ LmdbValue cachedValue(long id) { * @return the value object or null if not found */ void cacheValue(long id, LmdbValue value) { - valueCache[(int) (id % valueCache.length)] = value; + valueCache.lazySet((int) (id % valueCache.length()), value); } /** @@ -470,34 +471,28 @@ void cacheValue(long id, LmdbValue value) { * @throws IOException If an I/O error occurred. */ public LmdbValue getLazyValue(long id) throws IOException { - long stamp = revisionLock.readLock(); - try { - // Check value cache - Long cacheID = id; - LmdbValue resultValue = cachedValue(cacheID); - - if (resultValue == null) { - switch ((byte) (id & 0x3)) { - case URI_VALUE: - resultValue = new LmdbIRI(lazyRevision, id); - break; - case LITERAL_VALUE: - resultValue = new LmdbLiteral(lazyRevision, id); - break; - case BNODE_VALUE: - resultValue = new LmdbBNode(lazyRevision, id); - break; - default: - throw new IOException("Unsupported value with type id " + (id & 0x3)); - } - // Store value in cache - cacheValue(cacheID, resultValue); + // Check value cache + LmdbValue resultValue = cachedValue(id); + + if (resultValue == null) { + switch ((byte) (id & 0x3)) { + case URI_VALUE: + resultValue = new LmdbIRI(lazyRevision, id); + break; + case LITERAL_VALUE: + resultValue = new LmdbLiteral(lazyRevision, id); + break; + case BNODE_VALUE: + resultValue = new LmdbBNode(lazyRevision, id); + break; + default: + throw new IOException("Unsupported value with type id " + (id & 0x3)); } - - return resultValue; - } finally { - revisionLock.unlockRead(stamp); + // Store value in cache + cacheValue(id, resultValue); } + + return resultValue; } /** @@ -511,8 +506,7 @@ public LmdbValue getValue(long id) throws IOException { long stamp = revisionLock.readLock(); try { // Check value cache - Long cacheID = id; - LmdbValue resultValue = cachedValue(cacheID); + LmdbValue resultValue = cachedValue(id); if (resultValue == null) { // Value not in cache, fetch it from file @@ -521,7 +515,7 @@ public LmdbValue getValue(long id) throws IOException { if (data != null) { resultValue = data2value(id, data, null); // Store value in cache - cacheValue(cacheID, resultValue); + cacheValue(id, resultValue); } } @@ -1243,7 +1237,9 @@ public void clear() throws IOException { } protected void clearCaches() { - Arrays.fill(valueCache, null); + for (int i = 0; i < valueCache.length(); i++) { + valueCache.set(i, null); + } valueIDCache.clear(); namespaceCache.clear(); namespaceIDCache.clear(); diff --git a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/RecordIteratorBenchmark.java b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/RecordIteratorBenchmark.java new file mode 100644 index 00000000000..b26b86a6279 --- /dev/null +++ b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/RecordIteratorBenchmark.java @@ -0,0 +1,70 @@ +/******************************************************************************* + * Copyright (c) 2025 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ + +package org.eclipse.rdf4j.sail.lmdb; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.io.FileUtils; +import org.assertj.core.util.Files; +import org.eclipse.rdf4j.sail.lmdb.config.LmdbStoreConfig; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +/** + * @author Piotr SowiƄski + */ +@State(Scope.Benchmark) +@Warmup(iterations = 5) +@BenchmarkMode({ Mode.AverageTime }) +@Fork(value = 4, jvmArgs = { "-Xms1G", "-Xmx1G" }) +//@Fork(value = 1, jvmArgs = {"-Xms1G", "-Xmx1G", "-XX:StartFlightRecording=jdk.CPUTimeSample#enabled=true,filename=profile.jfr,method-profiling=max","-XX:FlightRecorderOptions=stackdepth=1024", "-XX:+UnlockDiagnosticVMOptions", "-XX:+DebugNonSafepoints"}) +@Threads(value = 8) +@Measurement(iterations = 10) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class RecordIteratorBenchmark { + + private File dataDir; + private TripleStore tripleStore; + + @Setup(Level.Trial) + public void setup() throws IOException { + dataDir = Files.newTemporaryFolder(); + tripleStore = new TripleStore(dataDir, new LmdbStoreConfig("spoc,posc")); + + final int statements = 1_000_000; + tripleStore.startTransaction(); + for (int i = 0; i < statements; i++) { + tripleStore.storeTriple(i, i + 1, i + 2, 1, true); + } + tripleStore.commit(); + } + + @TearDown(Level.Trial) + public void tearDown() throws IOException { + tripleStore.close(); + FileUtils.deleteDirectory(dataDir); + } + + @Benchmark + public void iterateAll(Blackhole blackhole) throws IOException { + try (TxnManager.Txn txn = tripleStore.getTxnManager().createReadTxn()) { + try (RecordIterator it = tripleStore.getTriples(txn, 0, 0, 0, 1, true)) { + long[] item; + while ((item = it.next()) != null) { + blackhole.consume(item); + } + } + } + } +}