Skip to content

Commit 249486d

Browse files
authored
GH-5787 Fix SHACL shutdown interrupt race (#5788)
2 parents 8971d42 + 2148a4e commit 249486d

2 files changed

Lines changed: 104 additions & 2 deletions

File tree

core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ConnectionHelper.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,15 @@
1111

1212
package org.eclipse.rdf4j.sail.shacl;
1313

14+
import java.util.NoSuchElementException;
15+
1416
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
1517
import org.eclipse.rdf4j.model.IRI;
1618
import org.eclipse.rdf4j.model.Resource;
1719
import org.eclipse.rdf4j.model.Statement;
1820
import org.eclipse.rdf4j.model.Value;
1921
import org.eclipse.rdf4j.repository.RepositoryResult;
22+
import org.eclipse.rdf4j.sail.InterruptedSailException;
2023
import org.eclipse.rdf4j.sail.SailConnection;
2124
import org.eclipse.rdf4j.sail.SailException;
2225

@@ -55,8 +58,23 @@ static void transferStatements(SailConnection from, TransferStatement transfer)
5558
try (CloseableIteration<? extends Statement> statements = from
5659
.getStatements(null, null, null, false)) {
5760

58-
while (statements.hasNext()) {
59-
Statement next = statements.next();
61+
while (true) {
62+
throwIfInterrupted();
63+
boolean hasNext = statements.hasNext();
64+
throwIfInterrupted();
65+
if (!hasNext) {
66+
break;
67+
}
68+
69+
Statement next;
70+
try {
71+
next = statements.next();
72+
} catch (NoSuchElementException e) {
73+
if (Thread.currentThread().isInterrupted()) {
74+
throw new InterruptedSailException("Thread was interrupted while transferring statements.", e);
75+
}
76+
throw e;
77+
}
6078

6179
transfer.transfer(next.getSubject(), next.getPredicate(),
6280
next.getObject(), next.getContext());
@@ -66,6 +84,12 @@ static void transferStatements(SailConnection from, TransferStatement transfer)
6684

6785
}
6886

87+
private static void throwIfInterrupted() {
88+
if (Thread.currentThread().isInterrupted()) {
89+
throw new InterruptedSailException("Thread was interrupted while transferring statements.");
90+
}
91+
}
92+
6993
static boolean isEmpty(SailConnection connection) {
7094
return !connection.hasStatement(null, null, null, false);
7195
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2026 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
// Some portions generated by Codex
12+
13+
package org.eclipse.rdf4j.sail.shacl;
14+
15+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
16+
import static org.mockito.ArgumentMatchers.anyBoolean;
17+
import static org.mockito.ArgumentMatchers.isNull;
18+
import static org.mockito.Mockito.doReturn;
19+
import static org.mockito.Mockito.mock;
20+
import static org.mockito.Mockito.never;
21+
import static org.mockito.Mockito.verify;
22+
import static org.mockito.Mockito.when;
23+
24+
import java.util.NoSuchElementException;
25+
26+
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
27+
import org.eclipse.rdf4j.model.Statement;
28+
import org.eclipse.rdf4j.sail.InterruptedSailException;
29+
import org.eclipse.rdf4j.sail.SailConnection;
30+
import org.junit.jupiter.api.Test;
31+
32+
class ConnectionHelperTest {
33+
34+
@Test
35+
void transferStatementsConvertsInterruptedIterationToInterruptedSailException() {
36+
SailConnection connection = mock(SailConnection.class);
37+
@SuppressWarnings("unchecked")
38+
CloseableIteration<? extends Statement> statements = mock(CloseableIteration.class);
39+
doReturn(statements).when(connection).getStatements(isNull(), isNull(), isNull(), anyBoolean());
40+
when(statements.hasNext()).thenReturn(true);
41+
when(statements.next()).thenAnswer(invocation -> {
42+
Thread.currentThread().interrupt();
43+
throw new NoSuchElementException("The iteration has been interrupted.");
44+
});
45+
46+
try {
47+
assertThatExceptionOfType(InterruptedSailException.class)
48+
.isThrownBy(() -> ConnectionHelper.transferStatements(connection, (subject, predicate, object,
49+
context) -> {
50+
throw new AssertionError("No statement should be transferred after interrupt");
51+
}))
52+
.withCauseInstanceOf(NoSuchElementException.class);
53+
verify(statements).close();
54+
} finally {
55+
Thread.interrupted();
56+
}
57+
}
58+
59+
@Test
60+
void transferStatementsKeepsNonInterruptNoSuchElementException() {
61+
SailConnection connection = mock(SailConnection.class);
62+
@SuppressWarnings("unchecked")
63+
CloseableIteration<? extends Statement> statements = mock(CloseableIteration.class);
64+
NoSuchElementException exception = new NoSuchElementException("empty");
65+
doReturn(statements).when(connection).getStatements(isNull(), isNull(), isNull(), anyBoolean());
66+
when(statements.hasNext()).thenReturn(true);
67+
when(statements.next()).thenThrow(exception);
68+
69+
assertThatExceptionOfType(NoSuchElementException.class)
70+
.isThrownBy(() -> ConnectionHelper.transferStatements(connection, (subject, predicate, object,
71+
context) -> {
72+
throw new AssertionError("No statement should be transferred");
73+
}))
74+
.isSameAs(exception);
75+
verify(statements).close();
76+
verify(statements, never()).remove();
77+
}
78+
}

0 commit comments

Comments
 (0)