Skip to content

Commit 0886807

Browse files
committed
mostly functional
1 parent 039a304 commit 0886807

2 files changed

Lines changed: 180 additions & 6 deletions

File tree

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbTxnContext.java

Lines changed: 178 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@
1212
package org.eclipse.rdf4j.sail.lmdb;
1313

1414
import java.io.IOException;
15+
import java.lang.management.LockInfo;
16+
import java.lang.management.ManagementFactory;
17+
import java.lang.management.ThreadInfo;
18+
import java.lang.management.ThreadMXBean;
1519
import java.util.ArrayDeque;
1620
import java.util.ArrayList;
1721
import java.util.Deque;
@@ -22,6 +26,7 @@
2226
import java.util.Map;
2327
import java.util.Objects;
2428
import java.util.Set;
29+
import java.util.concurrent.TimeUnit;
2530
import java.util.concurrent.locks.ReentrantLock;
2631

2732
import org.eclipse.rdf4j.common.transaction.IsolationLevel;
@@ -213,7 +218,20 @@ private void ensureWriteTxnInitialized() throws SailException {
213218
}
214219
bindToCurrentThread();
215220
try {
216-
writeTxnLock.lockInterruptibly();
221+
boolean b = writeTxnLock.tryLock(10, TimeUnit.SECONDS);
222+
int i = 0;
223+
while (!b) {
224+
i++;
225+
System.err.println(Thread.currentThread() + " - LMDB writeTxnLock wait took more than " + i * 10 + " seconds");
226+
// print lock diagnostics
227+
printWriteTxnLockDiagnostics(i * 10, i == 1 || i % 6 == 0);
228+
229+
b = writeTxnLock.tryLock(10, TimeUnit.SECONDS);
230+
if (Thread.currentThread().isInterrupted()) {
231+
throw new InterruptedSailException();
232+
}
233+
}
234+
217235
writeLockHeld = true;
218236
} catch (InterruptedException e) {
219237
Thread.currentThread().interrupt();
@@ -292,7 +310,7 @@ private void checkSerializableConflicts() throws SailException {
292310
return;
293311
}
294312
try (Txn snapshotTxn = acquirePinnedReadTxn();
295-
Txn currentTxn = store.getTripleStore().getTxnManager().createReadTxn()) {
313+
Txn currentTxn = store.getTripleStore().getTxnManager().createReadTxn()) {
296314
store.ensureObservedPatternsUnchanged(snapshotTxn, currentTxn, observedExplicit, true);
297315
store.ensureObservedPatternsUnchanged(snapshotTxn, currentTxn, observedInferred, false);
298316
} catch (IOException e) {
@@ -625,7 +643,7 @@ private void applyPendingTo(Set<Quad> added, Set<Quad> removed, boolean explicit
625643
if (pendingUpdates.isEmpty()) {
626644
return;
627645
}
628-
for (var iterator = pendingUpdates.descendingIterator(); iterator.hasNext();) {
646+
for (var iterator = pendingUpdates.descendingIterator(); iterator.hasNext(); ) {
629647
PendingChanges pending = iterator.next();
630648
pending.applyTo(added, removed, explicit);
631649
}
@@ -635,7 +653,7 @@ private void applyPendingDeprecatedTo(Set<Quad> deprecated, boolean explicit) {
635653
if (pendingUpdates.isEmpty()) {
636654
return;
637655
}
638-
for (var iterator = pendingUpdates.descendingIterator(); iterator.hasNext();) {
656+
for (var iterator = pendingUpdates.descendingIterator(); iterator.hasNext(); ) {
639657
PendingChanges pending = iterator.next();
640658
pending.applyDeprecatedTo(deprecated, explicit);
641659
}
@@ -889,4 +907,160 @@ void apply(NamespaceStore base) {
889907
clear();
890908
}
891909
}
910+
911+
912+
private void printWriteTxnLockDiagnostics(int waitedSeconds, boolean includeThreadDump) {
913+
try {
914+
System.err.println("LMDB writeTxnLock diagnostics after waiting " + waitedSeconds + " seconds");
915+
System.err.println(" lock=" + writeTxnLock);
916+
System.err.println(" isLocked=" + writeTxnLock.isLocked()
917+
+ ", isHeldByCurrentThread=" + writeTxnLock.isHeldByCurrentThread()
918+
+ ", holdCount=" + writeTxnLock.getHoldCount()
919+
+ ", hasQueuedThreads=" + writeTxnLock.hasQueuedThreads()
920+
+ ", queueLength~=" + writeTxnLock.getQueueLength()
921+
+ ", isFair=" + writeTxnLock.isFair());
922+
923+
ThreadMXBean bean = ManagementFactory.getThreadMXBean();
924+
925+
// Deadlock detection can be very useful when a lock wait stretches out.
926+
try {
927+
long[] deadlocked = bean.findDeadlockedThreads();
928+
if (deadlocked != null && deadlocked.length > 0) {
929+
System.err.println(" DEADLOCK DETECTED (thread ids): " + java.util.Arrays.toString(deadlocked));
930+
ThreadInfo[] infos = bean.getThreadInfo(deadlocked, true, true);
931+
if (infos != null) {
932+
for (ThreadInfo info : infos) {
933+
if (info != null) {
934+
printThreadInfo(info, 80);
935+
}
936+
}
937+
}
938+
}
939+
} catch (Throwable t) {
940+
System.err.println(" (Deadlock detection failed: " + t + ")");
941+
}
942+
943+
// Try to identify and dump the owner thread stack if the lock can tell us.
944+
String ownerThreadName = extractOwnerThreadName(writeTxnLock.toString());
945+
if (ownerThreadName != null) {
946+
dumpThreadByName(ownerThreadName, 120);
947+
}
948+
949+
// Full-ish dump is intentionally throttled (e.g., first time and then every ~60s).
950+
if (includeThreadDump) {
951+
System.err.println(" --- Threads waiting on / holding ReentrantLock synchronizers (best effort) ---");
952+
ThreadInfo[] all = bean.dumpAllThreads(true, true);
953+
if (all != null) {
954+
for (ThreadInfo info : all) {
955+
if (info == null) {
956+
continue;
957+
}
958+
if (isReentrantLockRelated(info)
959+
|| (ownerThreadName != null && ownerThreadName.equals(info.getThreadName()))) {
960+
printThreadInfo(info, 60);
961+
}
962+
}
963+
}
964+
}
965+
} catch (Throwable t) {
966+
// Diagnostics must never prevent progress/interrupt handling.
967+
System.err.println("LMDB writeTxnLock diagnostics failed: " + t);
968+
}
969+
}
970+
971+
private static boolean isReentrantLockRelated(ThreadInfo info) {
972+
LockInfo lock = info.getLockInfo();
973+
if (lock != null && lock.getClassName() != null
974+
&& lock.getClassName().startsWith("java.util.concurrent.locks.ReentrantLock")) {
975+
return true;
976+
}
977+
LockInfo[] synchronizers = info.getLockedSynchronizers();
978+
if (synchronizers != null) {
979+
for (LockInfo li : synchronizers) {
980+
if (li != null && li.getClassName() != null
981+
&& li.getClassName().startsWith("java.util.concurrent.locks.ReentrantLock")) {
982+
return true;
983+
}
984+
}
985+
}
986+
return false;
987+
}
988+
989+
private static void printThreadInfo(ThreadInfo info, int maxFrames) {
990+
System.err.println("Thread \"" + info.getThreadName() + "\" id=" + info.getThreadId()
991+
+ " state=" + info.getThreadState());
992+
if (info.getLockInfo() != null) {
993+
System.err.println(" waitingOn=" + info.getLockInfo());
994+
}
995+
if (info.getLockOwnerName() != null) {
996+
System.err.println(" lockOwner=" + info.getLockOwnerName() + " (id=" + info.getLockOwnerId() + ")");
997+
}
998+
StackTraceElement[] stack = info.getStackTrace();
999+
if (stack != null) {
1000+
int limit = Math.min(stack.length, Math.max(0, maxFrames));
1001+
for (int frame = 0; frame < limit; frame++) {
1002+
System.err.println("\tat " + stack[frame]);
1003+
}
1004+
if (stack.length > limit) {
1005+
System.err.println("\t... " + (stack.length - limit) + " more");
1006+
}
1007+
}
1008+
System.err.println();
1009+
}
1010+
1011+
private static String extractOwnerThreadName(String lockString) {
1012+
if (lockString == null) {
1013+
return null;
1014+
}
1015+
int idx = lockString.indexOf("Locked by thread ");
1016+
if (idx < 0) {
1017+
return null;
1018+
}
1019+
int start = idx + "Locked by thread ".length();
1020+
int end = lockString.indexOf(']', start);
1021+
if (end < 0) {
1022+
end = lockString.length();
1023+
}
1024+
String name = lockString.substring(start, end).trim();
1025+
return name.isEmpty() ? null : name;
1026+
}
1027+
1028+
private static void dumpThreadByName(String threadName, int maxFrames) {
1029+
try {
1030+
for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
1031+
Thread t = entry.getKey();
1032+
if (t != null && threadName.equals(t.getName())) {
1033+
System.err.println(" --- Owner thread stack (" + t + ") ---");
1034+
StackTraceElement[] stack = entry.getValue();
1035+
if (stack != null) {
1036+
int limit = Math.min(stack.length, Math.max(0, maxFrames));
1037+
for (int frame = 0; frame < limit; frame++) {
1038+
System.err.println("\tat " + stack[frame]);
1039+
}
1040+
if (stack.length > limit) {
1041+
System.err.println("\t... " + (stack.length - limit) + " more");
1042+
}
1043+
}
1044+
System.err.println();
1045+
return;
1046+
}
1047+
}
1048+
System.err.println(" (Could not find owner thread named: " + threadName + ")");
1049+
1050+
System.err.println("----------------------------------------------------------");
1051+
System.err.println(" Available threads: ");
1052+
1053+
for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
1054+
Thread t = entry.getKey();
1055+
if (t != null) {
1056+
System.err.println(" - " + t.getName() + " (id=" + t.getId() + ")");
1057+
}
1058+
}
1059+
System.err.println("----------------------------------------------------------");
1060+
System.err.println();
1061+
1062+
} catch (Throwable t) {
1063+
System.err.println(" (Owner thread dump failed: " + t + ")");
1064+
}
1065+
}
8921066
}

testsuites/sail/src/main/java/org/eclipse/rdf4j/testsuite/sail/SailConcurrencyTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -555,8 +555,8 @@ public void testConcurrentConnectionsShutdownReadCommitted() throws InterruptedE
555555

556556
}
557557

558-
// @Test
559-
@RepeatedTest(5)
558+
@Test
559+
// @RepeatedTest(5)
560560
@Timeout(value = 5, unit = TimeUnit.MINUTES)
561561
public void testConcurrentConnectionsShutdownAndClose() throws InterruptedException {
562562
System.err.println("Running testConcurrentConnectionsShutdownAndClose");

0 commit comments

Comments
 (0)