Skip to content

Commit 0410820

Browse files
authored
GH-4784 interrupt threads using connections when forcefully closing (#4787)
2 parents 6bc981a + 475aa5d commit 0410820

25 files changed

Lines changed: 589 additions & 130 deletions

File tree

core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/impl/DefaultEvaluationStrategy.java

Lines changed: 59 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -352,50 +352,58 @@ public TupleExpr optimize(TupleExpr expr, EvaluationStatistics evaluationStatist
352352
public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(TupleExpr expr, BindingSet bindings)
353353
throws QueryEvaluationException {
354354

355-
CloseableIteration<BindingSet, QueryEvaluationException> ret;
355+
CloseableIteration<BindingSet, QueryEvaluationException> ret = null;
356356

357-
if (expr instanceof StatementPattern) {
358-
ret = evaluate((StatementPattern) expr, bindings);
359-
} else if (expr instanceof UnaryTupleOperator) {
360-
ret = evaluate((UnaryTupleOperator) expr, bindings);
361-
} else if (expr instanceof BinaryTupleOperator) {
362-
ret = evaluate((BinaryTupleOperator) expr, bindings);
363-
} else if (expr instanceof SingletonSet) {
364-
ret = evaluate((SingletonSet) expr, bindings);
365-
} else if (expr instanceof EmptySet) {
366-
ret = evaluate((EmptySet) expr, bindings);
367-
} else if (expr instanceof ZeroLengthPath) {
368-
ret = evaluate((ZeroLengthPath) expr, bindings);
369-
} else if (expr instanceof ArbitraryLengthPath) {
370-
ret = evaluate((ArbitraryLengthPath) expr, bindings);
371-
} else if (expr instanceof BindingSetAssignment) {
372-
ret = evaluate((BindingSetAssignment) expr, bindings);
373-
} else if (expr instanceof TripleRef) {
374-
ret = evaluate((TripleRef) expr, bindings);
375-
} else if (expr instanceof TupleFunctionCall) {
376-
if (getQueryEvaluationMode().compareTo(QueryEvaluationMode.STANDARD) < 0) {
377-
throw new QueryEvaluationException(
378-
"Tuple function call not supported in query evaluation mode " + getQueryEvaluationMode());
357+
try {
358+
if (expr instanceof StatementPattern) {
359+
ret = evaluate((StatementPattern) expr, bindings);
360+
} else if (expr instanceof UnaryTupleOperator) {
361+
ret = evaluate((UnaryTupleOperator) expr, bindings);
362+
} else if (expr instanceof BinaryTupleOperator) {
363+
ret = evaluate((BinaryTupleOperator) expr, bindings);
364+
} else if (expr instanceof SingletonSet) {
365+
ret = evaluate((SingletonSet) expr, bindings);
366+
} else if (expr instanceof EmptySet) {
367+
ret = evaluate((EmptySet) expr, bindings);
368+
} else if (expr instanceof ZeroLengthPath) {
369+
ret = evaluate((ZeroLengthPath) expr, bindings);
370+
} else if (expr instanceof ArbitraryLengthPath) {
371+
ret = evaluate((ArbitraryLengthPath) expr, bindings);
372+
} else if (expr instanceof BindingSetAssignment) {
373+
ret = evaluate((BindingSetAssignment) expr, bindings);
374+
} else if (expr instanceof TripleRef) {
375+
ret = evaluate((TripleRef) expr, bindings);
376+
} else if (expr instanceof TupleFunctionCall) {
377+
if (getQueryEvaluationMode().compareTo(QueryEvaluationMode.STANDARD) < 0) {
378+
throw new QueryEvaluationException(
379+
"Tuple function call not supported in query evaluation mode " + getQueryEvaluationMode());
380+
}
381+
return evaluate(expr, bindings);
382+
} else if (expr == null) {
383+
throw new IllegalArgumentException("expr must not be null");
384+
} else {
385+
throw new QueryEvaluationException("Unsupported tuple expr type: " + expr.getClass());
379386
}
380-
return evaluate((TupleFunctionCall) expr, bindings);
381-
} else if (expr == null) {
382-
throw new IllegalArgumentException("expr must not be null");
383-
} else {
384-
throw new QueryEvaluationException("Unsupported tuple expr type: " + expr.getClass());
385-
}
386387

387-
if (trackTime) {
388-
// set resultsSizeActual to at least be 0 so we can track iterations that don't procude anything
389-
expr.setTotalTimeNanosActual(Math.max(0, expr.getTotalTimeNanosActual()));
390-
ret = new TimedIterator(ret, expr);
391-
}
388+
if (trackTime) {
389+
// set resultsSizeActual to at least be 0 so we can track iterations that don't procude anything
390+
expr.setTotalTimeNanosActual(Math.max(0, expr.getTotalTimeNanosActual()));
391+
ret = new TimedIterator(ret, expr);
392+
}
392393

393-
if (trackResultSize) {
394-
// set resultsSizeActual to at least be 0 so we can track iterations that don't procude anything
395-
expr.setResultSizeActual(Math.max(0, expr.getResultSizeActual()));
396-
ret = new ResultSizeCountingIterator(ret, expr);
394+
if (trackResultSize) {
395+
// set resultsSizeActual to at least be 0 so we can track iterations that don't procude anything
396+
expr.setResultSizeActual(Math.max(0, expr.getResultSizeActual()));
397+
ret = new ResultSizeCountingIterator(ret, expr);
398+
}
399+
return ret;
400+
401+
} catch (Throwable t) {
402+
if (ret != null) {
403+
ret.close();
404+
}
405+
throw t;
397406
}
398-
return ret;
399407
}
400408

401409
@Override
@@ -708,9 +716,18 @@ protected QueryEvaluationStep prepare(DescribeOperator node, QueryEvaluationCont
708716

709717
@Override
710718
public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bs) {
711-
return new DescribeIteration(child.evaluate(bs), DefaultEvaluationStrategy.this,
712-
node.getBindingNames(),
713-
bs);
719+
CloseableIteration<BindingSet, QueryEvaluationException> evaluate = null;
720+
721+
try {
722+
evaluate = child.evaluate(bs);
723+
return new DescribeIteration(evaluate, DefaultEvaluationStrategy.this, node.getBindingNames(), bs);
724+
} catch (Throwable t) {
725+
if (evaluate != null) {
726+
evaluate.close();
727+
}
728+
throw t;
729+
}
730+
714731
}
715732
};
716733
}

core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/DescribeIteration.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,4 +220,21 @@ protected CloseableIteration<BindingSet, QueryEvaluationException> createNextIte
220220
return strategy.evaluate(pattern, parentBindings);
221221
}
222222

223+
@Override
224+
protected void handleClose() throws QueryEvaluationException {
225+
try {
226+
super.handleClose();
227+
228+
} finally {
229+
try {
230+
if (currentDescribeExprIter != null)
231+
currentDescribeExprIter.close();
232+
} finally {
233+
if (sourceIter instanceof CloseableIteration) {
234+
((CloseableIteration<?, QueryEvaluationException>) sourceIter).close();
235+
}
236+
}
237+
238+
}
239+
}
223240
}

core/repository/sail/src/main/java/org/eclipse/rdf4j/repository/sail/SailRepositoryConnection.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -349,11 +349,19 @@ public RepositoryResult<Statement> getStatements(Resource subj, IRI pred, Value
349349
Resource... contexts) throws RepositoryException {
350350
Objects.requireNonNull(contexts,
351351
"contexts argument may not be null; either the value should be cast to Resource or an empty array should be supplied");
352-
352+
CloseableIteration<? extends Statement, SailException> statements = null;
353353
try {
354-
return createRepositoryResult(sailConnection.getStatements(subj, pred, obj, includeInferred, contexts));
355-
} catch (SailException e) {
356-
throw new RepositoryException("Unable to get statements from Sail", e);
354+
statements = sailConnection.getStatements(subj, pred, obj, includeInferred, contexts);
355+
return createRepositoryResult(statements);
356+
} catch (Throwable t) {
357+
if (statements != null) {
358+
statements.close();
359+
}
360+
if (t instanceof SailException) {
361+
throw new RepositoryException("Unable to get statements from Sail", t);
362+
} else {
363+
throw t;
364+
}
357365
}
358366
}
359367

core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSail.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,38 @@ public void shutDown() throws SailException {
222222
activeConnectionsCopy = new IdentityHashMap<>(activeConnections);
223223
}
224224

225+
// Interrupt any threads that are still using a connection, in case they are waiting for a lock
226+
for (Map.Entry<SailConnection, Throwable> entry : activeConnectionsCopy.entrySet()) {
227+
try {
228+
SailConnection con = entry.getKey();
229+
230+
if (con instanceof AbstractSailConnection) {
231+
AbstractSailConnection sailCon = (AbstractSailConnection) con;
232+
Thread owner = sailCon.getOwner();
233+
if (owner != Thread.currentThread()) {
234+
owner.interrupt();
235+
// wait up to 1 second for the owner thread to die
236+
owner.join(1000);
237+
if (owner.isAlive()) {
238+
logger.error(
239+
"Closing active connection due to shut down and interrupted the owning thread of the connection {} but thread is still alive after 1000 ms!",
240+
owner);
241+
}
242+
}
243+
}
244+
245+
} catch (Throwable e) {
246+
if (e instanceof InterruptedException) {
247+
throw new SailException(e);
248+
} else if (e instanceof AssertionError) {
249+
// ignore assertions errors
250+
} else if (e instanceof Error) {
251+
throw (Error) e;
252+
}
253+
// ignore all other exceptions
254+
}
255+
}
256+
225257
// Forcefully close any connections that are still open
226258
for (Map.Entry<SailConnection, Throwable> entry : activeConnectionsCopy.entrySet()) {
227259
SailConnection con = entry.getKey();

0 commit comments

Comments
 (0)