Skip to content

Commit 2fcabe5

Browse files
Ostrzycielkenwenzel
authored andcommitted
GH-5433 Use RWLockManager in LmdbStore
1 parent cbc8e00 commit 2fcabe5

7 files changed

Lines changed: 186 additions & 53 deletions

File tree

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbContextIdIterator.java

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import java.nio.ByteBuffer;
2626
import java.util.concurrent.locks.StampedLock;
2727

28+
import org.eclipse.rdf4j.common.concurrent.locks.Lock;
29+
import org.eclipse.rdf4j.common.concurrent.locks.ReadWriteLockManager;
2830
import org.eclipse.rdf4j.sail.SailException;
2931
import org.eclipse.rdf4j.sail.lmdb.TxnManager.Txn;
3032
import org.lwjgl.PointerBuffer;
@@ -61,7 +63,7 @@ class LmdbContextIdIterator implements Closeable {
6163

6264
private boolean fetchNext = false;
6365

64-
private final StampedLock txnLock;
66+
private final ReadWriteLockManager txnLockManager;
6567

6668
private final Thread ownerThread = Thread.currentThread();
6769

@@ -72,9 +74,14 @@ class LmdbContextIdIterator implements Closeable {
7274

7375
this.dbi = dbi;
7476
this.txnRef = txnRef;
75-
this.txnLock = txnRef.lock();
77+
this.txnLockManager = txnRef.lockManager();
7678

77-
long stamp = txnLock.readLock();
79+
Lock lock;
80+
try {
81+
lock = txnLockManager.getReadLock();
82+
} catch (InterruptedException e) {
83+
throw new SailException(e);
84+
}
7885
try {
7986
this.txnRefVersion = txnRef.version();
8087
this.txn = txnRef.get();
@@ -85,12 +92,17 @@ class LmdbContextIdIterator implements Closeable {
8592
cursor = pp.get(0);
8693
}
8794
} finally {
88-
txnLock.unlockRead(stamp);
95+
lock.release();
8996
}
9097
}
9198

9299
public long[] next() {
93-
long stamp = txnLock.readLock();
100+
Lock lock;
101+
try {
102+
lock = txnLockManager.getReadLock();
103+
} catch (InterruptedException e) {
104+
throw new SailException(e);
105+
}
94106
try {
95107
if (txnRefVersion != txnRef.version()) {
96108
// cursor must be renewed
@@ -143,17 +155,21 @@ public long[] next() {
143155
} catch (IOException e) {
144156
throw new SailException(e);
145157
} finally {
146-
txnLock.unlockRead(stamp);
158+
lock.release();
147159
}
148160
}
149161

150162
private void closeInternal(boolean maybeCalledAsync) {
151163
if (!closed) {
152-
long stamp;
164+
Lock lock;
153165
if (maybeCalledAsync && ownerThread != Thread.currentThread()) {
154-
stamp = txnLock.writeLock();
166+
try {
167+
lock = txnLockManager.getWriteLock();
168+
} catch (InterruptedException e) {
169+
throw new SailException(e);
170+
}
155171
} else {
156-
stamp = 0;
172+
lock = null;
157173
}
158174
try {
159175
if (!closed) {
@@ -166,8 +182,8 @@ private void closeInternal(boolean maybeCalledAsync) {
166182
}
167183
} finally {
168184
closed = true;
169-
if (stamp != 0) {
170-
txnLock.unlockWrite(stamp);
185+
if (lock != null) {
186+
lock.release();
171187
}
172188
}
173189
}

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbRecordIterator.java

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
import java.nio.ByteBuffer;
2727
import java.util.concurrent.locks.StampedLock;
2828

29+
import org.eclipse.rdf4j.common.concurrent.locks.Lock;
30+
import org.eclipse.rdf4j.common.concurrent.locks.ReadWriteLockManager;
31+
import org.eclipse.rdf4j.sail.SailException;
2932
import org.eclipse.rdf4j.sail.lmdb.TripleStore.TripleIndex;
3033
import org.eclipse.rdf4j.sail.lmdb.TxnManager.Txn;
3134
import org.eclipse.rdf4j.sail.lmdb.Varint.GroupMatcher;
@@ -74,7 +77,7 @@ class LmdbRecordIterator implements RecordIterator {
7477

7578
private boolean fetchNext = false;
7679

77-
private final StampedLock txnLock;
80+
private final ReadWriteLockManager txnLockManager;
7881

7982
private final Thread ownerThread = Thread.currentThread();
8083

@@ -107,9 +110,14 @@ class LmdbRecordIterator implements RecordIterator {
107110
}
108111
this.dbi = index.getDB(explicit);
109112
this.txnRef = txnRef;
110-
this.txnLock = txnRef.lock();
113+
this.txnLockManager = txnRef.lockManager();
111114

112-
long stamp = txnLock.readLock();
115+
Lock lock;
116+
try {
117+
lock = txnLockManager.getReadLock();
118+
} catch (InterruptedException e) {
119+
throw new SailException(e);
120+
}
113121
try {
114122
this.txnRefVersion = txnRef.version();
115123
this.txn = txnRef.get();
@@ -120,13 +128,18 @@ class LmdbRecordIterator implements RecordIterator {
120128
cursor = pp.get(0);
121129
}
122130
} finally {
123-
txnLock.unlockRead(stamp);
131+
lock.release();
124132
}
125133
}
126134

127135
@Override
128136
public long[] next() {
129-
long stamp = txnLock.readLock();
137+
Lock lock;
138+
try {
139+
lock = txnLockManager.getReadLock();
140+
} catch (InterruptedException e) {
141+
throw new SailException(e);
142+
}
130143
try {
131144
if (closed) {
132145
log.debug("Calling next() on an LmdbRecordIterator that is already closed, returning null");
@@ -191,17 +204,21 @@ public long[] next() {
191204
closeInternal(false);
192205
return null;
193206
} finally {
194-
txnLock.unlockRead(stamp);
207+
lock.release();
195208
}
196209
}
197210

198211
private void closeInternal(boolean maybeCalledAsync) {
199212
if (!closed) {
200-
long stamp;
213+
Lock lock;
201214
if (maybeCalledAsync && ownerThread != Thread.currentThread()) {
202-
stamp = txnLock.writeLock();
215+
try {
216+
lock = txnLockManager.getWriteLock();
217+
} catch (InterruptedException e) {
218+
throw new SailException(e);
219+
}
203220
} else {
204-
stamp = 0;
221+
lock = null;
205222
}
206223
try {
207224
if (!closed) {
@@ -218,8 +235,8 @@ private void closeInternal(boolean maybeCalledAsync) {
218235
}
219236
} finally {
220237
closed = true;
221-
if (stamp != 0) {
222-
txnLock.unlockWrite(stamp);
238+
if (lock != null) {
239+
lock.release();
223240
}
224241
}
225242
}

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/PersistentSet.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,10 @@
3939
import java.util.AbstractSet;
4040
import java.util.Iterator;
4141
import java.util.NoSuchElementException;
42-
import java.util.concurrent.locks.StampedLock;
4342

43+
import org.eclipse.rdf4j.common.concurrent.locks.Lock;
44+
import org.eclipse.rdf4j.common.concurrent.locks.ReadWriteLockManager;
45+
import org.eclipse.rdf4j.sail.SailException;
4446
import org.eclipse.rdf4j.sail.lmdb.TxnManager.Txn;
4547
import org.lwjgl.PointerBuffer;
4648
import org.lwjgl.system.MemoryStack;
@@ -112,7 +114,7 @@ public boolean remove(Object element) {
112114
}
113115
}
114116

115-
private synchronized boolean update(Object element, boolean add) throws IOException {
117+
private synchronized boolean update(Object element, boolean add) throws IOException, InterruptedException {
116118
try (MemoryStack stack = MemoryStack.stackPush()) {
117119
if (factory.writeTxn == 0) {
118120
// start a write transaction
@@ -189,7 +191,7 @@ private class ElementIterator implements Iterator<T> {
189191
private final MDBVal valueData = MDBVal.malloc();
190192
private final long cursor;
191193

192-
private final StampedLock txnLock;
194+
private final ReadWriteLockManager txnLockManager;
193195
private Txn txnRef;
194196
private long txnRefVersion;
195197

@@ -199,9 +201,9 @@ private class ElementIterator implements Iterator<T> {
199201
private ElementIterator(int dbi) {
200202
try {
201203
this.txnRef = factory.txnManager.createReadTxn();
202-
this.txnLock = txnRef.lock();
204+
this.txnLockManager = txnRef.lockManager();
203205

204-
long stamp = txnLock.readLock();
206+
Lock lock = txnLockManager.getReadLock();
205207
try {
206208
this.txnRefVersion = txnRef.version();
207209

@@ -211,7 +213,7 @@ private ElementIterator(int dbi) {
211213
cursor = pp.get(0);
212214
}
213215
} finally {
214-
txnLock.unlockRead(stamp);
216+
lock.release();
215217
}
216218
} catch (Exception e) {
217219
throw new RuntimeException(e);
@@ -243,8 +245,8 @@ public T next() {
243245
return current;
244246
}
245247

246-
private T computeNext() throws IOException {
247-
long stamp = txnLock.readLock();
248+
private T computeNext() throws IOException, InterruptedException {
249+
Lock lock = txnLockManager.getReadLock();
248250
try {
249251
if (txnRefVersion != txnRef.version()) {
250252
// cursor must be renewed
@@ -267,21 +269,26 @@ private T computeNext() throws IOException {
267269
close();
268270
return null;
269271
} finally {
270-
txnLock.unlockRead(stamp);
272+
lock.release();
271273
}
272274
}
273275

274276
public void close() {
275277
if (txnRef != null) {
276278
keyData.close();
277279
valueData.close();
278-
long stamp = txnLock.readLock();
280+
Lock lock;
281+
try {
282+
lock = txnLockManager.getReadLock();
283+
} catch (InterruptedException e) {
284+
throw new SailException(e);
285+
}
279286
try {
280287
mdb_cursor_close(cursor);
281288
txnRef.close();
282289
txnRef = null;
283290
} finally {
284-
txnLock.unlockRead(stamp);
291+
lock.release();
285292
}
286293
}
287294
}

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/PersistentSetFactory.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,11 @@
3636
import java.nio.ByteBuffer;
3737
import java.nio.file.Files;
3838
import java.nio.file.Path;
39-
import java.util.concurrent.locks.StampedLock;
4039
import java.util.function.Function;
4140

4241
import org.apache.commons.io.FileUtils;
42+
import org.eclipse.rdf4j.common.concurrent.locks.Lock;
43+
import org.eclipse.rdf4j.common.concurrent.locks.ReadWriteLockManager;
4344
import org.eclipse.rdf4j.sail.lmdb.TxnManager.Mode;
4445
import org.lwjgl.PointerBuffer;
4546
import org.lwjgl.system.MemoryStack;
@@ -106,10 +107,10 @@ synchronized void commit() throws IOException {
106107
}
107108
}
108109

109-
void ensureResize() throws IOException {
110+
void ensureResize() throws IOException, InterruptedException {
110111
if (LmdbUtil.requiresResize(mapSize, pageSize, writeTxn, 0)) {
111-
StampedLock lock = txnManager.lock();
112-
long stamp = lock.writeLock();
112+
ReadWriteLockManager lockManager = txnManager.lockManager();
113+
Lock lock = lockManager.getWriteLock();
113114
try {
114115
txnManager.deactivate();
115116

@@ -124,7 +125,7 @@ void ensureResize() throws IOException {
124125
try {
125126
txnManager.activate();
126127
} finally {
127-
lock.unlockWrite(stamp);
128+
lock.release();
128129
}
129130
}
130131
}

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@
7878
import java.util.concurrent.locks.StampedLock;
7979
import java.util.function.Consumer;
8080

81+
import org.eclipse.rdf4j.common.concurrent.locks.Lock;
82+
import org.eclipse.rdf4j.common.concurrent.locks.ReadWriteLockManager;
8183
import org.eclipse.rdf4j.sail.SailException;
8284
import org.eclipse.rdf4j.sail.lmdb.TxnManager.Mode;
8385
import org.eclipse.rdf4j.sail.lmdb.TxnManager.Txn;
@@ -1081,8 +1083,13 @@ void endTransaction(boolean commit) throws IOException {
10811083
try {
10821084
E(mdb_txn_commit(writeTxn));
10831085
if (recordCache != null) {
1084-
StampedLock lock = txnManager.lock();
1085-
long stamp = lock.writeLock();
1086+
ReadWriteLockManager lockManager = txnManager.lockManager();
1087+
Lock lock;
1088+
try {
1089+
lock = lockManager.getReadLock();
1090+
} catch (InterruptedException e) {
1091+
throw new SailException(e);
1092+
}
10861093
try {
10871094
txnManager.deactivate();
10881095
mapSize = LmdbUtil.autoGrowMapSize(mapSize, pageSize, 0);
@@ -1102,7 +1109,7 @@ void endTransaction(boolean commit) throws IOException {
11021109
try {
11031110
txnManager.activate();
11041111
} finally {
1105-
lock.unlockWrite(stamp);
1112+
lock.release();
11061113
}
11071114
}
11081115
} else {

0 commit comments

Comments
 (0)