Skip to content

Commit

Permalink
GH-4784 interrupt threads using connections when forcefully closing (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
hmottestad authored Oct 2, 2023
2 parents 6bc981a + 475aa5d commit 0410820
Show file tree
Hide file tree
Showing 25 changed files with 589 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -352,50 +352,58 @@ public TupleExpr optimize(TupleExpr expr, EvaluationStatistics evaluationStatist
public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(TupleExpr expr, BindingSet bindings)
throws QueryEvaluationException {

CloseableIteration<BindingSet, QueryEvaluationException> ret;
CloseableIteration<BindingSet, QueryEvaluationException> ret = null;

if (expr instanceof StatementPattern) {
ret = evaluate((StatementPattern) expr, bindings);
} else if (expr instanceof UnaryTupleOperator) {
ret = evaluate((UnaryTupleOperator) expr, bindings);
} else if (expr instanceof BinaryTupleOperator) {
ret = evaluate((BinaryTupleOperator) expr, bindings);
} else if (expr instanceof SingletonSet) {
ret = evaluate((SingletonSet) expr, bindings);
} else if (expr instanceof EmptySet) {
ret = evaluate((EmptySet) expr, bindings);
} else if (expr instanceof ZeroLengthPath) {
ret = evaluate((ZeroLengthPath) expr, bindings);
} else if (expr instanceof ArbitraryLengthPath) {
ret = evaluate((ArbitraryLengthPath) expr, bindings);
} else if (expr instanceof BindingSetAssignment) {
ret = evaluate((BindingSetAssignment) expr, bindings);
} else if (expr instanceof TripleRef) {
ret = evaluate((TripleRef) expr, bindings);
} else if (expr instanceof TupleFunctionCall) {
if (getQueryEvaluationMode().compareTo(QueryEvaluationMode.STANDARD) < 0) {
throw new QueryEvaluationException(
"Tuple function call not supported in query evaluation mode " + getQueryEvaluationMode());
try {
if (expr instanceof StatementPattern) {
ret = evaluate((StatementPattern) expr, bindings);
} else if (expr instanceof UnaryTupleOperator) {
ret = evaluate((UnaryTupleOperator) expr, bindings);
} else if (expr instanceof BinaryTupleOperator) {
ret = evaluate((BinaryTupleOperator) expr, bindings);
} else if (expr instanceof SingletonSet) {
ret = evaluate((SingletonSet) expr, bindings);
} else if (expr instanceof EmptySet) {
ret = evaluate((EmptySet) expr, bindings);
} else if (expr instanceof ZeroLengthPath) {
ret = evaluate((ZeroLengthPath) expr, bindings);
} else if (expr instanceof ArbitraryLengthPath) {
ret = evaluate((ArbitraryLengthPath) expr, bindings);
} else if (expr instanceof BindingSetAssignment) {
ret = evaluate((BindingSetAssignment) expr, bindings);
} else if (expr instanceof TripleRef) {
ret = evaluate((TripleRef) expr, bindings);
} else if (expr instanceof TupleFunctionCall) {
if (getQueryEvaluationMode().compareTo(QueryEvaluationMode.STANDARD) < 0) {
throw new QueryEvaluationException(
"Tuple function call not supported in query evaluation mode " + getQueryEvaluationMode());
}
return evaluate(expr, bindings);
} else if (expr == null) {
throw new IllegalArgumentException("expr must not be null");
} else {
throw new QueryEvaluationException("Unsupported tuple expr type: " + expr.getClass());
}
return evaluate((TupleFunctionCall) expr, bindings);
} else if (expr == null) {
throw new IllegalArgumentException("expr must not be null");
} else {
throw new QueryEvaluationException("Unsupported tuple expr type: " + expr.getClass());
}

if (trackTime) {
// set resultsSizeActual to at least be 0 so we can track iterations that don't procude anything
expr.setTotalTimeNanosActual(Math.max(0, expr.getTotalTimeNanosActual()));
ret = new TimedIterator(ret, expr);
}
if (trackTime) {
// set resultsSizeActual to at least be 0 so we can track iterations that don't procude anything
expr.setTotalTimeNanosActual(Math.max(0, expr.getTotalTimeNanosActual()));
ret = new TimedIterator(ret, expr);
}

if (trackResultSize) {
// set resultsSizeActual to at least be 0 so we can track iterations that don't procude anything
expr.setResultSizeActual(Math.max(0, expr.getResultSizeActual()));
ret = new ResultSizeCountingIterator(ret, expr);
if (trackResultSize) {
// set resultsSizeActual to at least be 0 so we can track iterations that don't procude anything
expr.setResultSizeActual(Math.max(0, expr.getResultSizeActual()));
ret = new ResultSizeCountingIterator(ret, expr);
}
return ret;

} catch (Throwable t) {
if (ret != null) {
ret.close();
}
throw t;
}
return ret;
}

@Override
Expand Down Expand Up @@ -708,9 +716,18 @@ protected QueryEvaluationStep prepare(DescribeOperator node, QueryEvaluationCont

@Override
public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bs) {
return new DescribeIteration(child.evaluate(bs), DefaultEvaluationStrategy.this,
node.getBindingNames(),
bs);
CloseableIteration<BindingSet, QueryEvaluationException> evaluate = null;

try {
evaluate = child.evaluate(bs);
return new DescribeIteration(evaluate, DefaultEvaluationStrategy.this, node.getBindingNames(), bs);
} catch (Throwable t) {
if (evaluate != null) {
evaluate.close();
}
throw t;
}

}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,4 +220,21 @@ protected CloseableIteration<BindingSet, QueryEvaluationException> createNextIte
return strategy.evaluate(pattern, parentBindings);
}

@Override
protected void handleClose() throws QueryEvaluationException {
try {
super.handleClose();

} finally {
try {
if (currentDescribeExprIter != null)
currentDescribeExprIter.close();
} finally {
if (sourceIter instanceof CloseableIteration) {
((CloseableIteration<?, QueryEvaluationException>) sourceIter).close();
}
}

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -349,11 +349,19 @@ public RepositoryResult<Statement> getStatements(Resource subj, IRI pred, Value
Resource... contexts) throws RepositoryException {
Objects.requireNonNull(contexts,
"contexts argument may not be null; either the value should be cast to Resource or an empty array should be supplied");

CloseableIteration<? extends Statement, SailException> statements = null;
try {
return createRepositoryResult(sailConnection.getStatements(subj, pred, obj, includeInferred, contexts));
} catch (SailException e) {
throw new RepositoryException("Unable to get statements from Sail", e);
statements = sailConnection.getStatements(subj, pred, obj, includeInferred, contexts);
return createRepositoryResult(statements);
} catch (Throwable t) {
if (statements != null) {
statements.close();
}
if (t instanceof SailException) {
throw new RepositoryException("Unable to get statements from Sail", t);
} else {
throw t;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,38 @@ public void shutDown() throws SailException {
activeConnectionsCopy = new IdentityHashMap<>(activeConnections);
}

// Interrupt any threads that are still using a connection, in case they are waiting for a lock
for (Map.Entry<SailConnection, Throwable> entry : activeConnectionsCopy.entrySet()) {
try {
SailConnection con = entry.getKey();

if (con instanceof AbstractSailConnection) {
AbstractSailConnection sailCon = (AbstractSailConnection) con;
Thread owner = sailCon.getOwner();
if (owner != Thread.currentThread()) {
owner.interrupt();
// wait up to 1 second for the owner thread to die
owner.join(1000);
if (owner.isAlive()) {
logger.error(
"Closing active connection due to shut down and interrupted the owning thread of the connection {} but thread is still alive after 1000 ms!",
owner);
}
}
}

} catch (Throwable e) {
if (e instanceof InterruptedException) {
throw new SailException(e);
} else if (e instanceof AssertionError) {
// ignore assertions errors
} else if (e instanceof Error) {
throw (Error) e;
}
// ignore all other exceptions
}
}

// Forcefully close any connections that are still open
for (Map.Entry<SailConnection, Throwable> entry : activeConnectionsCopy.entrySet()) {
SailConnection con = entry.getKey();
Expand Down
Loading

0 comments on commit 0410820

Please sign in to comment.