Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*******************************************************************************
* Copyright (c) 2025 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.common.concurrent.locks;

import java.lang.invoke.VarHandle;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.StampedLock;

/**
* A lightweight read/write lock manager that avoids allocating per-lock objects. Readers are tracked using two
* {@link LongAdder LongAdders} while writers rely on a {@link StampedLock}. The read-lock method returns a constant
* stamp ({@link #READ_LOCK_STAMP}) and writers receive the stamp produced by the underlying {@link StampedLock}.
*/
public class StampedLongAdderLockManager {

/**
* Stamp returned to callers holding a read lock. Passing any other value to {@link #unlockRead(long)} is considered
* an illegal monitor state.
*/
public static final long READ_LOCK_STAMP = Long.MIN_VALUE;

private final StampedLock stampedLock = new StampedLock();
private final LongAdder readersLocked = new LongAdder();
private final LongAdder readersUnlocked = new LongAdder();

// milliseconds to wait when trying to acquire the write lock interruptibly
private final int tryWriteLockMillis;

// Number of spin attempts before temporarily releasing the write lock when readers are active.
private final int writePreference;

public StampedLongAdderLockManager() {
this(1, 100);
}

public StampedLongAdderLockManager(int writePreference, int tryWriteLockMillis) {
this.writePreference = Math.max(1, writePreference);
this.tryWriteLockMillis = Math.max(1, tryWriteLockMillis);
}

public boolean isWriterActive() {
return stampedLock.isWriteLocked();
}

public boolean isReaderActive() {
return readersUnlocked.sum() != readersLocked.sum();
}

public void waitForActiveWriter() throws InterruptedException {
while (stampedLock.isWriteLocked() && !isReaderActive()) {
spinWait();
}
}

public void waitForActiveReaders() throws InterruptedException {
while (isReaderActive()) {
spinWait();
}
}

public long readLock() throws InterruptedException {
readersLocked.increment();
while (stampedLock.isWriteLocked()) {
try {
spinWaitAtReadLock();
} catch (InterruptedException e) {
readersUnlocked.increment();
throw e;
}
}
return READ_LOCK_STAMP;
}

public long tryReadLock() {
readersLocked.increment();
if (!stampedLock.isWriteLocked()) {
return READ_LOCK_STAMP;
}
readersUnlocked.increment();
return 0L;
}

public void unlockRead(long stamp) {
if (stamp != READ_LOCK_STAMP) {
throw new IllegalMonitorStateException("Trying to release a stamp that is not a read lock");
}

VarHandle.acquireFence();
readersUnlocked.increment();
}

public long writeLock() throws InterruptedException {
long writeStamp = writeLockInterruptibly();
boolean lockAcquired = false;

try {
int attempts = 0;
do {
if (Thread.interrupted()) {
throw new InterruptedException();
}

if (!hasActiveReaders()) {
lockAcquired = true;
break;
}

if (attempts++ > writePreference) {
attempts = 0;

stampedLock.unlockWrite(writeStamp);
writeStamp = 0;

yieldWait();

writeStamp = writeLockInterruptibly();
} else {
spinWait();
}

} while (!lockAcquired);
} finally {
if (!lockAcquired && writeStamp != 0) {
stampedLock.unlockWrite(writeStamp);
}
}

VarHandle.releaseFence();
return writeStamp;
}

public long tryWriteLock() {
long writeStamp = stampedLock.tryWriteLock();
if (writeStamp == 0) {
return 0L;
}

if (!hasActiveReaders()) {
VarHandle.releaseFence();
return writeStamp;
}

stampedLock.unlockWrite(writeStamp);
return 0L;
}

public void unlockWrite(long stamp) {
if (stamp == 0) {
throw new IllegalMonitorStateException("Trying to release a write lock that is not locked");
}
stampedLock.unlockWrite(stamp);
}

private boolean hasActiveReaders() {
return readersUnlocked.sum() != readersLocked.sum();
}

private long writeLockInterruptibly() throws InterruptedException {
long writeStamp;
do {
if (Thread.interrupted()) {
throw new InterruptedException();
}
writeStamp = stampedLock.tryWriteLock(tryWriteLockMillis, TimeUnit.MILLISECONDS);
} while (writeStamp == 0);
return writeStamp;
}

private void spinWait() throws InterruptedException {
Thread.onSpinWait();
if (Thread.interrupted()) {
throw new InterruptedException();
}
}

private void spinWaitAtReadLock() throws InterruptedException {
Thread.onSpinWait();
if (Thread.interrupted()) {
throw new InterruptedException();
}
}

private void yieldWait() throws InterruptedException {
Thread.yield();
if (Thread.interrupted()) {
throw new InterruptedException();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*******************************************************************************
* Copyright (c) 2025 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.common.concurrent.locks;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.jupiter.api.Test;

class StampedLongAdderLockManagerTest {

@Test
void writeLockWaitsForReaders() throws Exception {
StampedLongAdderLockManager manager = new StampedLongAdderLockManager();
long readStamp = manager.readLock();
assertTrue(manager.isReaderActive());

ExecutorService executor = Executors.newSingleThreadExecutor();
try {
CountDownLatch attemptingWrite = new CountDownLatch(1);
AtomicBoolean acquiredWrite = new AtomicBoolean(false);

Future<Long> writeFuture = executor.submit(() -> {
attemptingWrite.countDown();
long stamp = manager.writeLock();
acquiredWrite.set(true);
return stamp;
});

assertTrue(attemptingWrite.await(500, TimeUnit.MILLISECONDS), "write attempt did not start in time");
TimeUnit.MILLISECONDS.sleep(100);
assertFalse(acquiredWrite.get(), "write lock acquired while read lock active");

manager.unlockRead(readStamp);
long writeStamp = writeFuture.get(2, TimeUnit.SECONDS);
assertTrue(acquiredWrite.get());
assertTrue(manager.isWriterActive());
manager.unlockWrite(writeStamp);
assertFalse(manager.isWriterActive());
} finally {
executor.shutdownNow();
executor.awaitTermination(1, TimeUnit.SECONDS);
}
}

@Test
void readLockWaitsForWriters() throws Exception {
StampedLongAdderLockManager manager = new StampedLongAdderLockManager();
long writeStamp = manager.writeLock();
assertTrue(manager.isWriterActive());

ExecutorService executor = Executors.newSingleThreadExecutor();
try {
CountDownLatch attemptingRead = new CountDownLatch(1);
AtomicBoolean acquiredRead = new AtomicBoolean(false);

Future<Long> readFuture = executor.submit(() -> {
attemptingRead.countDown();
long stamp = manager.readLock();
acquiredRead.set(true);
return stamp;
});

assertTrue(attemptingRead.await(500, TimeUnit.MILLISECONDS), "read attempt did not start in time");
TimeUnit.MILLISECONDS.sleep(100);
assertFalse(acquiredRead.get(), "read lock acquired while write lock active");

manager.unlockWrite(writeStamp);
long readStamp = readFuture.get(2, TimeUnit.SECONDS);
assertTrue(acquiredRead.get());
assertEquals(StampedLongAdderLockManager.READ_LOCK_STAMP, readStamp);
assertTrue(manager.isReaderActive());
manager.unlockRead(readStamp);
assertFalse(manager.isReaderActive());
} finally {
executor.shutdownNow();
executor.awaitTermination(1, TimeUnit.SECONDS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.locks.StampedLock;

import org.eclipse.rdf4j.common.concurrent.locks.StampedLongAdderLockManager;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.lmdb.TxnManager.Txn;
import org.lwjgl.PointerBuffer;
Expand Down Expand Up @@ -61,20 +61,25 @@ class LmdbContextIdIterator implements Closeable {

private boolean fetchNext = false;

private final StampedLock txnLock;
private final StampedLongAdderLockManager txnLockManager;

private final Thread ownerThread = Thread.currentThread();

LmdbContextIdIterator(Pool pool, int dbi, Txn txnRef) throws IOException {
this.pool = pool;
LmdbContextIdIterator(int dbi, Txn txnRef) throws IOException {
this.pool = Pool.get();
this.keyData = pool.getVal();
this.valueData = pool.getVal();

this.dbi = dbi;
this.txnRef = txnRef;
this.txnLock = txnRef.lock();
this.txnLockManager = txnRef.lockManager();

long stamp = txnLock.readLock();
long readStamp;
try {
readStamp = txnLockManager.readLock();
} catch (InterruptedException e) {
throw new SailException(e);
}
try {
this.txnRefVersion = txnRef.version();
this.txn = txnRef.get();
Expand All @@ -85,12 +90,17 @@ class LmdbContextIdIterator implements Closeable {
cursor = pp.get(0);
}
} finally {
txnLock.unlockRead(stamp);
txnLockManager.unlockRead(readStamp);
}
}

public long[] next() {
long stamp = txnLock.readLock();
long readStamp;
try {
readStamp = txnLockManager.readLock();
} catch (InterruptedException e) {
throw new SailException(e);
}
try {
if (txnRefVersion != txnRef.version()) {
// cursor must be renewed
Expand Down Expand Up @@ -143,17 +153,21 @@ public long[] next() {
} catch (IOException e) {
throw new SailException(e);
} finally {
txnLock.unlockRead(stamp);
txnLockManager.unlockRead(readStamp);
}
}

private void closeInternal(boolean maybeCalledAsync) {
if (!closed) {
long stamp;
long writeStamp = 0L;
boolean writeLocked = false;
if (maybeCalledAsync && ownerThread != Thread.currentThread()) {
stamp = txnLock.writeLock();
} else {
stamp = 0;
try {
writeStamp = txnLockManager.writeLock();
writeLocked = true;
} catch (InterruptedException e) {
throw new SailException(e);
}
}
try {
if (!closed) {
Expand All @@ -166,8 +180,8 @@ private void closeInternal(boolean maybeCalledAsync) {
}
} finally {
closed = true;
if (stamp != 0) {
txnLock.unlockWrite(stamp);
if (writeLocked) {
txnLockManager.unlockWrite(writeStamp);
}
}
}
Expand Down
Loading
Loading