Skip to content

Commit 684169e

Browse files
author
James Leigh
committed
Issue #896: Close input in same thread as parser
Signed-off-by: James Leigh <james.leigh@ontotext.com>
1 parent 0234817 commit 684169e

4 files changed

Lines changed: 61 additions & 102 deletions

File tree

core/query/src/main/java/org/eclipse/rdf4j/query/impl/BackgroundGraphResult.java

Lines changed: 26 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,9 @@
99

1010
import java.io.InputStream;
1111
import java.io.InputStreamReader;
12-
import java.lang.reflect.UndeclaredThrowableException;
1312
import java.nio.charset.Charset;
1413
import java.util.Collections;
1514
import java.util.Map;
16-
import java.util.NoSuchElementException;
1715
import java.util.concurrent.ConcurrentHashMap;
1816
import java.util.concurrent.CountDownLatch;
1917

@@ -45,6 +43,8 @@ public class BackgroundGraphResult extends IterationWrapper<Statement, QueryEval
4543

4644
private final CountDownLatch namespacesReady = new CountDownLatch(1);
4745

46+
private final CountDownLatch finishedParsing = new CountDownLatch(1);
47+
4848
private final Map<String, String> namespaces = new ConcurrentHashMap<String, String>();
4949

5050
private final QueueCursor<Statement> queue;
@@ -64,67 +64,6 @@ public BackgroundGraphResult(QueueCursor<Statement> queue, RDFParser parser, Inp
6464
this.baseURI = baseURI;
6565
}
6666

67-
@Override
68-
public boolean hasNext()
69-
throws QueryEvaluationException
70-
{
71-
if (isClosed()) {
72-
return false;
73-
}
74-
if (Thread.currentThread().isInterrupted()) {
75-
close();
76-
return false;
77-
}
78-
79-
boolean result = queue.hasNext();
80-
if (!result) {
81-
close();
82-
}
83-
return result;
84-
}
85-
86-
@Override
87-
public Statement next()
88-
throws QueryEvaluationException
89-
{
90-
if (isClosed()) {
91-
throw new NoSuchElementException("The iteration has been closed.");
92-
}
93-
if (Thread.currentThread().isInterrupted()) {
94-
close();
95-
throw new NoSuchElementException("The iteration has been closed.");
96-
}
97-
98-
try {
99-
return queue.next();
100-
}
101-
catch (NoSuchElementException e) {
102-
close();
103-
throw e;
104-
}
105-
}
106-
107-
@Override
108-
public void remove()
109-
throws QueryEvaluationException
110-
{
111-
if (isClosed()) {
112-
throw new IllegalStateException("The iteration has been closed.");
113-
}
114-
if (Thread.currentThread().isInterrupted()) {
115-
close();
116-
throw new IllegalStateException("The iteration has been closed.");
117-
}
118-
119-
try {
120-
queue.remove();
121-
}
122-
catch (IllegalStateException e) {
123-
close();
124-
throw e;
125-
}
126-
}
127-
12867
@Override
12968
protected void handleClose()
13069
throws QueryEvaluationException
@@ -135,30 +74,38 @@ protected void handleClose()
13574
finally {
13675
queue.done();
13776
}
77+
try {
78+
finishedParsing.await();
79+
}
80+
catch (InterruptedException e) {
81+
Thread.currentThread().interrupt();
82+
} finally {
83+
queue.checkException();
84+
}
13885
}
13986

14087
@Override
14188
public void run() {
14289
try {
143-
parser.setRDFHandler(this);
144-
if (charset == null) {
145-
parser.parse(in, baseURI);
90+
try {
91+
parser.setRDFHandler(this);
92+
if (charset == null) {
93+
parser.parse(in, baseURI);
94+
}
95+
else {
96+
parser.parse(new InputStreamReader(in, charset), baseURI);
97+
}
98+
} finally {
99+
in.close();
146100
}
147-
else {
148-
parser.parse(new InputStreamReader(in, charset), baseURI);
149-
}
150-
}
151-
catch (RDFHandlerException e) {
152-
// parsing was cancelled or interrupted
153-
close();
154101
}
155102
catch (Exception e) {
156103
queue.toss(e);
157-
close();
158104
}
159105
finally {
160106
queue.done();
161107
namespacesReady.countDown();
108+
finishedParsing.countDown();
162109
}
163110
}
164111

@@ -178,8 +125,9 @@ public Map<String, String> getNamespaces() {
178125
}
179126
catch (InterruptedException e) {
180127
Thread.currentThread().interrupt();
181-
close();
182-
throw new UndeclaredThrowableException(e);
128+
return Collections.emptyMap();
129+
} finally {
130+
queue.checkException();
183131
}
184132
}
185133

@@ -207,11 +155,8 @@ public void handleStatement(Statement st)
207155
}
208156
catch (InterruptedException e) {
209157
Thread.currentThread().interrupt();
210-
close();
211-
throw new RDFHandlerException(e);
212-
}
213-
if (isClosed()) {
214-
throw new RDFHandlerException("Result closed");
158+
queue.toss(e);
159+
queue.done();
215160
}
216161
}
217162

core/queryresultio/api/src/main/java/org/eclipse/rdf4j/query/resultio/helpers/BackgroundTupleResult.java

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
package org.eclipse.rdf4j.query.resultio.helpers;
99

1010
import java.io.InputStream;
11-
import java.lang.reflect.UndeclaredThrowableException;
1211
import java.util.ArrayList;
1312
import java.util.Collections;
1413
import java.util.List;
@@ -42,6 +41,8 @@ public class BackgroundTupleResult extends IteratingTupleQueryResult
4241

4342
private final CountDownLatch bindingNamesReady = new CountDownLatch(1);
4443

44+
private final CountDownLatch finishedParsing = new CountDownLatch(1);
45+
4546
public BackgroundTupleResult(TupleQueryResultParser parser, InputStream in) {
4647
this(new QueueCursor<BindingSet>(10), parser, in);
4748
}
@@ -65,43 +66,47 @@ protected void handleClose()
6566
finally {
6667
queue.done();
6768
}
69+
try {
70+
finishedParsing.await();
71+
}
72+
catch (InterruptedException e) {
73+
Thread.currentThread().interrupt();
74+
} finally {
75+
queue.checkException();
76+
}
6877
}
6978

7079
@Override
7180
public List<String> getBindingNames() {
7281
try {
7382
bindingNamesReady.await();
74-
queue.checkException();
7583
return bindingNames;
7684
}
7785
catch (InterruptedException e) {
7886
Thread.currentThread().interrupt();
79-
close();
80-
throw new UndeclaredThrowableException(e);
81-
}
82-
catch (QueryEvaluationException e) {
83-
close();
84-
throw new UndeclaredThrowableException(e);
87+
return Collections.emptyList();
88+
} finally {
89+
queue.checkException();
8590
}
8691
}
8792

8893
@Override
8994
public void run() {
9095
try {
91-
parser.setQueryResultHandler(this);
92-
parser.parseQueryResult(in);
93-
}
94-
catch (QueryResultHandlerException e) {
95-
// parsing cancelled or interrupted
96-
close();
96+
try {
97+
parser.setQueryResultHandler(this);
98+
parser.parseQueryResult(in);
99+
} finally {
100+
in.close();
101+
}
97102
}
98103
catch (Exception e) {
99104
queue.toss(e);
100-
close();
101105
}
102106
finally {
103107
queue.done();
104108
bindingNamesReady.countDown();
109+
finishedParsing.countDown();
105110
}
106111
}
107112

@@ -122,11 +127,8 @@ public void handleSolution(BindingSet bindingSet)
122127
}
123128
catch (InterruptedException e) {
124129
Thread.currentThread().interrupt();
125-
close();
126-
throw new TupleQueryResultHandlerException(e);
127-
}
128-
if (isClosed()) {
129-
throw new TupleQueryResultHandlerException("Result closed");
130+
queue.toss(e);
131+
queue.done();
130132
}
131133
}
132134

core/util/src/main/java/org/eclipse/rdf4j/common/iteration/IterationWrapper.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ public boolean hasNext()
6060
if (isClosed()) {
6161
return false;
6262
}
63+
else if (Thread.currentThread().isInterrupted()) {
64+
close();
65+
return false;
66+
}
6367
boolean result = wrappedIter.hasNext();
6468
if (!result) {
6569
close();
@@ -79,6 +83,10 @@ public E next()
7983
if (isClosed()) {
8084
throw new NoSuchElementException("The iteration has been closed.");
8185
}
86+
else if (Thread.currentThread().isInterrupted()) {
87+
close();
88+
throw new NoSuchElementException("The iteration has been interrupted.");
89+
}
8290
try {
8391
return wrappedIter.next();
8492
}
@@ -103,6 +111,10 @@ public void remove()
103111
if (isClosed()) {
104112
throw new IllegalStateException("The iteration has been closed.");
105113
}
114+
else if (Thread.currentThread().isInterrupted()) {
115+
close();
116+
throw new IllegalStateException("The iteration has been interrupted.");
117+
}
106118
try {
107119
wrappedIter.remove();
108120
}

core/util/src/main/java/org/eclipse/rdf4j/common/iteration/QueueIteration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,8 @@ public E getNextElement()
142142
}
143143
}
144144
if (isAfterLast(take)) {
145-
checkException();
146145
done(); // put afterLast back for others
146+
checkException();
147147
return null;
148148
}
149149
checkException();

0 commit comments

Comments
 (0)