Skip to content

Commit 18e018d

Browse files
committed
GH-4784 interrupt thread that is owning a connection before closing if we are closing from a different thread
1 parent 6bc981a commit 18e018d

9 files changed

Lines changed: 334 additions & 54 deletions

File tree

core/repository/sail/src/main/java/org/eclipse/rdf4j/repository/sail/SailRepositoryConnection.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -349,11 +349,19 @@ public RepositoryResult<Statement> getStatements(Resource subj, IRI pred, Value
349349
Resource... contexts) throws RepositoryException {
350350
Objects.requireNonNull(contexts,
351351
"contexts argument may not be null; either the value should be cast to Resource or an empty array should be supplied");
352-
352+
CloseableIteration<? extends Statement, SailException> statements = null;
353353
try {
354-
return createRepositoryResult(sailConnection.getStatements(subj, pred, obj, includeInferred, contexts));
355-
} catch (SailException e) {
356-
throw new RepositoryException("Unable to get statements from Sail", e);
354+
statements = sailConnection.getStatements(subj, pred, obj, includeInferred, contexts);
355+
return createRepositoryResult(statements);
356+
} catch (Throwable t) {
357+
if (statements != null) {
358+
statements.close();
359+
}
360+
if (t instanceof SailException) {
361+
throw new RepositoryException("Unable to get statements from Sail", t);
362+
} else {
363+
throw t;
364+
}
357365
}
358366
}
359367

core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSail.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,36 @@ public void shutDown() throws SailException {
222222
activeConnectionsCopy = new IdentityHashMap<>(activeConnections);
223223
}
224224

225+
// Interrupt any threads that are still using a connection, in case they are waiting for a lock
226+
for (Map.Entry<SailConnection, Throwable> entry : activeConnectionsCopy.entrySet()) {
227+
try {
228+
SailConnection con = entry.getKey();
229+
230+
if (con instanceof AbstractSailConnection) {
231+
AbstractSailConnection sailCon = (AbstractSailConnection) con;
232+
if (sailCon.getOwner() != Thread.currentThread()) {
233+
sailCon.getOwner().interrupt();
234+
sailCon.getOwner().join(1000);
235+
if (sailCon.getOwner().isAlive()) {
236+
logger.error(
237+
"Closing active connection due to shut down and interrupted the owning thread of the connection {} but thread is still alive after 1000 ms!",
238+
sailCon.getOwner());
239+
}
240+
}
241+
}
242+
243+
} catch (Throwable e) {
244+
if (e instanceof InterruptedException) {
245+
throw new SailException(e);
246+
} else if (e instanceof AssertionError) {
247+
// ignore assertions errors
248+
} else if (e instanceof Error) {
249+
throw (Error) e;
250+
}
251+
// ignore all other exceptions
252+
}
253+
}
254+
225255
// Forcefully close any connections that are still open
226256
for (Map.Entry<SailConnection, Throwable> entry : activeConnectionsCopy.entrySet()) {
227257
SailConnection con = entry.getKey();

core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java

Lines changed: 109 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@
1212

1313
import java.lang.invoke.MethodHandles;
1414
import java.lang.invoke.VarHandle;
15+
import java.time.Duration;
1516
import java.util.Collection;
1617
import java.util.Collections;
1718
import java.util.IdentityHashMap;
1819
import java.util.LinkedList;
1920
import java.util.Map;
2021
import java.util.concurrent.ConcurrentHashMap;
2122
import java.util.concurrent.atomic.LongAdder;
23+
import java.util.concurrent.locks.LockSupport;
2224
import java.util.concurrent.locks.ReentrantLock;
2325
import java.util.concurrent.locks.ReentrantReadWriteLock;
2426

@@ -98,15 +100,7 @@ public abstract class AbstractSailConnection implements SailConnection {
98100
private boolean isOpen = true;
99101
private static final VarHandle IS_OPEN;
100102

101-
static {
102-
try {
103-
IS_OPEN = MethodHandles.lookup()
104-
.in(AbstractSailConnection.class)
105-
.findVarHandle(AbstractSailConnection.class, "isOpen", boolean.class);
106-
} catch (ReflectiveOperationException e) {
107-
throw new Error(e);
108-
}
109-
}
103+
private Thread owner;
110104

111105
/**
112106
* Lock used to prevent concurrent calls to update methods like addStatement, clear, commit, etc. within a
@@ -161,6 +155,7 @@ public AbstractSailConnection(AbstractSail sailBase) {
161155
} else {
162156
activeIterationsDebug = Collections.emptyMap();
163157
}
158+
owner = Thread.currentThread();
164159
}
165160

166161
/*---------*
@@ -252,6 +247,8 @@ public boolean isActive() throws UnknownSailTransactionStateException {
252247

253248
@Override
254249
public final void close() throws SailException {
250+
Thread deadlockPreventionThread = startDeadlockPreventionThread();
251+
255252
// obtain an exclusive lock so that any further operations on this
256253
// connection (including those from any concurrent threads) are blocked.
257254
if (!IS_OPEN.compareAndSet(this, true, false)) {
@@ -268,7 +265,7 @@ public final void close() throws SailException {
268265
if (sumDone == sumBlocking) {
269266
break;
270267
} else {
271-
Thread.onSpinWait();
268+
LockSupport.parkNanos(Duration.ofMillis(10).toNanos());
272269
}
273270
}
274271

@@ -297,13 +294,56 @@ public final void close() throws SailException {
297294
sailBase.connectionClosed(this);
298295
}
299296
} finally {
300-
if (useConnectionLock) {
301-
connectionLock.writeLock().unlock();
297+
try {
298+
if (deadlockPreventionThread != null) {
299+
deadlockPreventionThread.interrupt();
300+
}
301+
} finally {
302+
if (useConnectionLock) {
303+
connectionLock.writeLock().unlock();
304+
}
302305
}
306+
303307
}
304308

305309
}
306310

311+
/**
312+
* If the current thread is not the owner, starts a thread to handle potential deadlocks by interrupting the owner.
313+
*
314+
* @return The started deadlock prevention thread or null if the current thread is the owner.
315+
*/
316+
private Thread startDeadlockPreventionThread() {
317+
Thread deadlockPreventionThread = null;
318+
319+
if (Thread.currentThread() != owner) {
320+
if (logger.isInfoEnabled()) {
321+
logger.info(
322+
"Closing connection from a different thread than the one that opened it. Connections should not be shared between threads. Opened by "
323+
+ owner + " closed by " + Thread.currentThread(),
324+
new Throwable("Throwable used for stacktrace"));
325+
}
326+
deadlockPreventionThread = new Thread(() -> {
327+
try {
328+
Thread.sleep(sailBase.connectionTimeOut / 2);
329+
330+
owner.interrupt();
331+
owner.join(1000);
332+
if (owner.isAlive()) {
333+
logger.error("Interrupted thread {} but thread is still alive after 1000 ms!", owner);
334+
}
335+
336+
} catch (InterruptedException ignored) {
337+
}
338+
339+
});
340+
deadlockPreventionThread.setDaemon(true);
341+
deadlockPreventionThread.start();
342+
343+
}
344+
return deadlockPreventionThread;
345+
}
346+
307347
@Override
308348
public final CloseableIteration<? extends BindingSet, QueryEvaluationException> evaluate(TupleExpr tupleExpr,
309349
Dataset dataset, BindingSet bindings, boolean includeInferred) throws SailException {
@@ -876,6 +916,16 @@ protected Lock getTransactionLock() throws SailException {
876916
return new JavaLock(updateLock);
877917
}
878918

919+
/**
920+
* This is for internal use only. It returns the thread that opened this connection.
921+
*
922+
* @return the thread that opened this connection.
923+
*/
924+
@InternalUseOnly
925+
public Thread getOwner() {
926+
return owner;
927+
}
928+
879929
/**
880930
* Registers an iteration as active by wrapping it in a {@link SailBaseIteration} object and adding it to the list
881931
* of active iterations.
@@ -959,41 +1009,51 @@ protected AbstractSail getSailBase() {
9591009
}
9601010

9611011
private void forceCloseActiveOperations() throws SailException {
962-
for (int i = 0; i < 10 && isActiveOperation() && !debugEnabled; i++) {
963-
System.gc();
964-
try {
965-
Thread.sleep(1);
966-
} catch (InterruptedException e) {
967-
Thread.currentThread().interrupt();
1012+
Thread deadlockPreventionThread = startDeadlockPreventionThread();
1013+
try {
1014+
for (int i = 0; i < 10 && isActiveOperation() && !debugEnabled; i++) {
1015+
System.gc();
1016+
try {
1017+
Thread.sleep(1);
1018+
} catch (InterruptedException e) {
1019+
Thread.currentThread().interrupt();
1020+
throw new SailException(e);
1021+
}
9681022
}
969-
}
9701023

971-
if (debugEnabled) {
972-
973-
var activeIterationsCopy = new IdentityHashMap<>(activeIterationsDebug);
974-
activeIterationsDebug.clear();
975-
976-
if (!activeIterationsCopy.isEmpty()) {
977-
for (var entry : activeIterationsCopy.entrySet()) {
978-
try {
979-
logger.warn("Unclosed iteration", entry.getValue());
980-
entry.getKey().close();
981-
} catch (Exception e) {
982-
if (e instanceof InterruptedException) {
983-
Thread.currentThread().interrupt();
1024+
if (debugEnabled) {
1025+
1026+
var activeIterationsCopy = new IdentityHashMap<>(activeIterationsDebug);
1027+
activeIterationsDebug.clear();
1028+
1029+
if (!activeIterationsCopy.isEmpty()) {
1030+
for (var entry : activeIterationsCopy.entrySet()) {
1031+
try {
1032+
logger.warn("Unclosed iteration", entry.getValue());
1033+
entry.getKey().close();
1034+
} catch (Exception e) {
1035+
if (e instanceof InterruptedException) {
1036+
Thread.currentThread().interrupt();
1037+
throw new SailException(e);
1038+
}
1039+
logger.warn("Exception occurred while closing unclosed iterations.", e);
9841040
}
985-
logger.warn("Exception occurred while closing unclosed iterations.", e);
9861041
}
987-
}
9881042

989-
var entry = activeIterationsCopy.entrySet().stream().findAny().orElseThrow();
1043+
var entry = activeIterationsCopy.entrySet().stream().findAny().orElseThrow();
9901044

991-
throw new SailException(
992-
"Connection closed before all iterations were closed: " + entry.getKey().toString(),
993-
entry.getValue());
994-
}
1045+
throw new SailException(
1046+
"Connection closed before all iterations were closed: " + entry.getKey().toString(),
1047+
entry.getValue());
1048+
}
9951049

1050+
}
1051+
} finally {
1052+
if (deadlockPreventionThread != null) {
1053+
deadlockPreventionThread.interrupt();
1054+
}
9961055
}
1056+
9971057
}
9981058

9991059
/**
@@ -1138,4 +1198,14 @@ public synchronized void release() {
11381198
}
11391199
}
11401200
}
1201+
1202+
static {
1203+
try {
1204+
IS_OPEN = MethodHandles.lookup()
1205+
.in(AbstractSailConnection.class)
1206+
.findVarHandle(AbstractSailConnection.class, "isOpen", boolean.class);
1207+
} catch (ReflectiveOperationException e) {
1208+
throw new Error(e);
1209+
}
1210+
}
11411211
}

core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/SailWrapper.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,4 +150,5 @@ public Supplier<CollectionFactory> getCollectionFactory() {
150150
verifyBaseSailSet();
151151
return baseSail.getCollectionFactory();
152152
}
153+
153154
}

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
import java.io.File;
1414
import java.io.IOException;
15+
import java.time.Duration;
1516
import java.util.ArrayList;
1617
import java.util.List;
1718
import java.util.Map;
@@ -20,6 +21,7 @@
2021
import java.util.concurrent.ExecutorService;
2122
import java.util.concurrent.Executors;
2223
import java.util.concurrent.atomic.AtomicBoolean;
24+
import java.util.concurrent.locks.LockSupport;
2325
import java.util.concurrent.locks.ReentrantLock;
2426

2527
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
@@ -236,7 +238,8 @@ public void close() throws SailException {
236238
running.set(false);
237239
tripleStoreExecutor.shutdown();
238240
while (!tripleStoreExecutor.isTerminated()) {
239-
Thread.yield();
241+
tripleStoreExecutor.shutdownNow();
242+
LockSupport.parkNanos(Duration.ofMillis(100).toNanos());
240243
}
241244
tripleStore.close();
242245
}
@@ -561,6 +564,9 @@ private void startTransaction(boolean preferThreading) throws SailException {
561564
op.execute();
562565
}
563566
} else {
567+
if (Thread.interrupted()) {
568+
throw new InterruptedException();
569+
}
564570
Thread.yield();
565571
}
566572
}

0 commit comments

Comments
 (0)