Skip to content

Commit ee1f2ce

Browse files
authored
GH-5347 make the ShaclSail more robust during shutdown() (#5348)
1 parent 60d1586 commit ee1f2ce

57 files changed

Lines changed: 1642 additions & 205 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

compliance/repository/src/test/java/org/eclipse/rdf4j/repository/sparql/SPARQLStoreConnectionTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,4 +524,12 @@ public void testRemoveStatementsFromContextSingleTransaction(IsolationLevel leve
524524
public void testClearStatementsFromContextSingleTransaction(IsolationLevel level) throws Exception {
525525
super.testClearStatementsFromContextSingleTransaction(level);
526526
}
527+
528+
@ParameterizedTest
529+
@MethodSource("parameters")
530+
@Disabled("relies on pending updates being visible in own connection")
531+
@Override
532+
public void testRollbackAfterInterrupt(IsolationLevel level) {
533+
super.testRollbackAfterInterrupt(level);
534+
}
527535
}

core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/FilterIteration.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,17 +102,18 @@ private void findNextElement() {
102102
}
103103
}
104104

105-
// We know that nextElement has to be null and that wrappedIter.hasNext() must be true, based on the code
106-
// above. To be sure that these invariants don't change we also assert them below.
107-
assert nextElement == null && wrappedIter.hasNext();
108-
109105
do {
110106
E result;
111107
if (Thread.currentThread().isInterrupted()) {
112108
close();
113109
return;
114110
}
115111
try {
112+
// We know that nextElement has to be null and that wrappedIter.hasNext() must be true, based on the
113+
// code
114+
// above. To be sure that these invariants don't change we also assert them below.
115+
assert nextElement == null && wrappedIter.hasNext();
116+
116117
result = wrappedIter.next();
117118
} catch (NoSuchElementException e) {
118119
close();

core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/LookAheadIteration.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,16 @@
1313

1414
import java.util.NoSuchElementException;
1515

16+
import org.slf4j.Logger;
17+
import org.slf4j.LoggerFactory;
18+
1619
/**
1720
* An Iteration that looks one element ahead, if necessary, to handle calls to {@link #hasNext}. This is a convenient
1821
* super class for Iterations that have no easy way to tell if there are any more results, but still should implement
1922
* the <var>java.util.Iteration</var> interface.
2023
*/
2124
public abstract class LookAheadIteration<E> extends AbstractCloseableIteration<E> {
25+
private static final Logger log = LoggerFactory.getLogger(LookAheadIteration.class);
2226

2327
/*-----------*
2428
* Variables *
@@ -50,7 +54,14 @@ public final boolean hasNext() {
5054
return false;
5155
}
5256

53-
return lookAhead() != null;
57+
try {
58+
return lookAhead() != null;
59+
} catch (NoSuchElementException logged) {
60+
// The lookAhead() method shouldn't throw a NoSuchElementException since it should return null when there
61+
// are no more elements.
62+
log.trace("LookAheadIteration threw NoSuchElementException:", logged);
63+
return false;
64+
}
5465
}
5566

5667
@Override

core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/impl/DefaultEvaluationStrategy.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ protected static CloseableIteration<BindingSet> evaluate(TupleFunction func,
217217
throws QueryEvaluationException {
218218
final CloseableIteration<? extends List<? extends Value>> iter = func
219219
.evaluate(valueFactory, argValues);
220-
return new LookAheadIteration<BindingSet>() {
220+
return new LookAheadIteration<>() {
221221

222222
@Override
223223
public BindingSet getNextElement() throws QueryEvaluationException {
@@ -712,7 +712,7 @@ protected QueryEvaluationStep prepare(Distinct node, QueryEvaluationContext cont
712712
final CollectionFactory cf = this.getCollectionFactory().get();
713713
return bindings -> {
714714
final CloseableIteration<BindingSet> evaluate = child.evaluate(bindings);
715-
return new DistinctIteration<BindingSet>(evaluate, cf.createSetOfBindingSets()) {
715+
return new DistinctIteration<>(evaluate, cf.createSetOfBindingSets()) {
716716

717717
@Override
718718
protected void handleClose() throws QueryEvaluationException {

core/repository/sail/src/main/java/org/eclipse/rdf4j/repository/sail/SailRepositoryConnection.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,10 +231,17 @@ public void commit() throws RepositoryException {
231231

232232
@Override
233233
public void rollback() throws RepositoryException {
234+
// We still want to attempt to rollback the transaction even if the thread is interrupted. We restore the
235+
// interrupted status when we are done.
236+
boolean interrupted = Thread.interrupted();
234237
try {
235238
sailConnection.rollback();
236239
} catch (SailException e) {
237240
throw new RepositoryException(e);
241+
} finally {
242+
if (interrupted) {
243+
Thread.currentThread().interrupt();
244+
}
238245
}
239246
}
240247

core/sail/api/src/main/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManager.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,22 @@ public ExclusiveReentrantLockManager(boolean trackLocks) {
4545
}
4646

4747
public ExclusiveReentrantLockManager(boolean trackLocks, int collectionFrequency) {
48+
this(trackLocks, collectionFrequency, "ExclusiveReentrantLockManager");
49+
}
50+
51+
public ExclusiveReentrantLockManager(String lockName) {
52+
this(false, LockMonitoring.INITIAL_WAIT_TO_COLLECT, lockName);
53+
}
54+
55+
public ExclusiveReentrantLockManager(boolean trackLocks, int collectionFrequency, String lockName) {
4856

4957
this.waitToCollect = collectionFrequency;
5058

5159
if (trackLocks || Properties.lockTrackingEnabled()) {
5260

5361
lockMonitoring = new LockTracking(
5462
true,
55-
"ExclusiveReentrantLockManager",
63+
lockName,
5664
LoggerFactory.getLogger(this.getClass()),
5765
waitToCollect,
5866
Lock.ExtendedSupplier.wrap(this::getExclusiveLockInner, this::tryExclusiveLockInner)
@@ -61,7 +69,7 @@ public ExclusiveReentrantLockManager(boolean trackLocks, int collectionFrequency
6169
} else {
6270
lockMonitoring = new LockCleaner(
6371
false,
64-
"ExclusiveReentrantLockManager",
72+
lockName,
6573
LoggerFactory.getLogger(this.getClass()),
6674
Lock.ExtendedSupplier.wrap(this::getExclusiveLockInner, this::tryExclusiveLockInner)
6775
);
@@ -101,6 +109,16 @@ private Lock getExclusiveLockInner() throws InterruptedException {
101109
return lock;
102110
} else {
103111
lockMonitoring.runCleanup();
112+
lock = tryExclusiveLockInner();
113+
if (lock != null) {
114+
return lock;
115+
}
116+
117+
Thread thread = owner.get();
118+
if (thread != null && !thread.isAlive()) {
119+
logger.debug("Lock is held by dead thread: " + thread);
120+
}
121+
104122
owner.wait(waitToCollect);
105123
}
106124
} while (true);
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2025 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
******************************************************************************/
11+
12+
package org.eclipse.rdf4j.sail;
13+
14+
public class InterruptedSailException extends SailException {
15+
16+
public InterruptedSailException(String s) {
17+
super(s);
18+
assert Thread.currentThread().isInterrupted();
19+
}
20+
21+
public InterruptedSailException() {
22+
super();
23+
assert Thread.currentThread().isInterrupted();
24+
}
25+
26+
public InterruptedSailException(Throwable t) {
27+
super(t);
28+
assert Thread.currentThread().isInterrupted();
29+
}
30+
31+
public InterruptedSailException(String msg, Throwable t) {
32+
super(msg, t);
33+
assert Thread.currentThread().isInterrupted();
34+
}
35+
}

core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSail.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
public abstract class AbstractSail implements Sail {
4141

4242
/**
43-
* Default connection timeout on shutdown: 20,000 milliseconds.
43+
* Default connection timeout on shutdown: 20,000 milliseconds (20 seconds).
4444
*/
4545
protected final static long DEFAULT_CONNECTION_TIMEOUT = 20000L;
4646

@@ -227,17 +227,19 @@ public void shutDown() throws SailException {
227227
try {
228228
SailConnection con = entry.getKey();
229229

230-
if (con instanceof AbstractSailConnection) {
231-
AbstractSailConnection sailCon = (AbstractSailConnection) con;
232-
Thread owner = sailCon.getOwner();
233-
if (owner != Thread.currentThread()) {
234-
owner.interrupt();
235-
// wait up to 1 second for the owner thread to die
236-
owner.join(1000);
237-
if (owner.isAlive()) {
238-
logger.error(
239-
"Closing active connection due to shut down and interrupted the owning thread of the connection {} but thread is still alive after 1000 ms!",
240-
owner);
230+
if (con.isOpen()) {
231+
if (con instanceof AbstractSailConnection) {
232+
AbstractSailConnection sailCon = (AbstractSailConnection) con;
233+
Thread owner = sailCon.getOwner();
234+
if (owner != Thread.currentThread()) {
235+
owner.interrupt();
236+
// wait up to 1 second for the owner thread to die
237+
owner.join(1000);
238+
if (owner.isAlive() && con.isOpen()) {
239+
logger.error(
240+
"Closing active connection due to shut down and interrupted the owning thread of the connection {} but thread is still alive after 1000 ms!",
241+
owner);
242+
}
241243
}
242244
}
243245
}
@@ -257,6 +259,9 @@ public void shutDown() throws SailException {
257259
// Forcefully close any connections that are still open
258260
for (Map.Entry<SailConnection, Throwable> entry : activeConnectionsCopy.entrySet()) {
259261
SailConnection con = entry.getKey();
262+
if (!con.isOpen()) {
263+
continue;
264+
}
260265
Throwable stackTrace = entry.getValue();
261266

262267
if (stackTrace == null) {

0 commit comments

Comments
 (0)