Skip to content

Commit

Permalink
issue#301 : Work on tightening the contract for lucene sail components
Browse files Browse the repository at this point in the history
Note, this does not fix the issue which is still visible, by enforces more structural constraints to avoid future issues.

Lock more Lucene methods. The internal Lucene API isn't stable for long term threaded access so external synchronisation is necessary.

Add checks on calls after closure

Signed-off-by: Peter Ansell <p_ansell@yahoo.com>
  • Loading branch information
ansell authored and abrokenjester committed Sep 5, 2016
1 parent 597f58f commit fbf9e13
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@
package org.eclipse.rdf4j.sail.lucene;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;

public abstract class AbstractLuceneIndex extends AbstractSearchIndex {

/**
* keep a lit of old monitors that are still iterating but not closed (open iterators), will be all closed
* on shutdown items are removed from list by ReaderMnitor.endReading() when closing
*/
protected final Collection<AbstractReaderMonitor> oldmonitors = new LinkedList<AbstractReaderMonitor>();
protected final Collection<AbstractReaderMonitor> oldmonitors = new ArrayList<AbstractReaderMonitor>();

protected abstract AbstractReaderMonitor getCurrentMonitor();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,22 @@ protected AbstractReaderMonitor(AbstractLuceneIndex index) {
this.index = index;
}

public int getReadingCount() {
public final int getReadingCount() {
return readingCount.get();
}

/**
*
*/
public void beginReading() {
public final synchronized void beginReading() {
if (closed.get()) {
throw new IllegalStateException("Cannot begin reading as we have been closed.");
}
// We cannot allow any more readers to be open at this stage, as any
// decrements towards zero on readingCount could trigger closure/removal
if (doClose.get()) {
throw new IllegalStateException("Cannot begin reading as we have moved into closing stages.");
}
readingCount.incrementAndGet();
}

Expand All @@ -49,16 +57,17 @@ public void beginReading() {
*
* @throws IOException
*/
public void endReading()
public final synchronized void endReading()
throws IOException
{
if (readingCount.decrementAndGet() == 0 && doClose.get()) {
// when endReading is called on CurrentMonitor and it should be closed,
// close it
close();// close Lucene index remove them self from Lucene index
if (readingCount.decrementAndGet() <= 0 && doClose.get()) {
// when endReading is called on CurrentMonitor and it should be
// closed, close it
close();
// close Lucene index remove them self from Lucene index
synchronized (index.oldmonitors) {
index.oldmonitors.remove(this); // if its not in the list, then this
// is a no-operation
// if its not in the list, then this is a no-operation
index.oldmonitors.remove(this);
}
}
}
Expand All @@ -69,7 +78,7 @@ public void endReading()
* @return <code>true</code> if the close succeeded, <code>false</code> otherwise.
* @throws IOException
*/
public boolean closeWhenPossible()
public final synchronized boolean closeWhenPossible()
throws IOException
{
doClose.set(true);
Expand All @@ -79,10 +88,10 @@ public boolean closeWhenPossible()
return closed.get();
}

public void close()
public final void close()
throws IOException
{
if (!closed.getAndSet(true)) {
if (closed.compareAndSet(false, true)) {
handleClose();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Resource;
Expand Down Expand Up @@ -253,20 +254,22 @@ public class LuceneSail extends NotifyingSailWrapper {
/**
* The LuceneIndex holding the indexed literals.
*/
private SearchIndex luceneIndex;
private volatile SearchIndex luceneIndex;

protected final Properties parameters = new Properties();

private String reindexQuery = "SELECT ?s ?p ?o ?c WHERE {{?s ?p ?o} UNION {GRAPH ?c {?s ?p ?o.}}} ORDER BY ?s";
private volatile String reindexQuery = "SELECT ?s ?p ?o ?c WHERE {{?s ?p ?o} UNION {GRAPH ?c {?s ?p ?o.}}} ORDER BY ?s";

private boolean incompleteQueryFails = true;
private volatile boolean incompleteQueryFails = true;

private Set<IRI> indexedFields;

private Map<IRI, IRI> indexedFieldsMapping;

private IndexableStatementFilter filter = null;

private final AtomicBoolean closed = new AtomicBoolean(false);

public void setLuceneIndex(SearchIndex luceneIndex) {
this.luceneIndex = luceneIndex;
}
Expand All @@ -286,18 +289,22 @@ public NotifyingSailConnection getConnection()
public void shutDown()
throws SailException
{
try {
if (luceneIndex != null) {
luceneIndex.shutDown();
if(closed.compareAndSet(false, true)) {
try {
SearchIndex toShutDownLuceneIndex = luceneIndex;
luceneIndex = null;
if (toShutDownLuceneIndex != null) {
toShutDownLuceneIndex.shutDown();
}
}
catch (IOException e) {
throw new SailException(e);
}
finally {
// ensure that super is also invoked when the LuceneIndex causes an
// IOException
super.shutDown();
}
}
catch (IOException e) {
throw new SailException(e);
}
finally {
// ensure that super is also invoked when the LuceneIndex causes an
// IOException
super.shutDown();
}
}

Expand Down Expand Up @@ -492,27 +499,33 @@ public void registerStatementFilter(IndexableStatementFilter filter) {
}

protected boolean acceptStatementToIndex(Statement s) {
return (filter != null) ? filter.accept(s) : true;
IndexableStatementFilter nextFilter = filter;
return (nextFilter != null) ? nextFilter.accept(s) : true;
}

public Statement mapStatement(Statement statement) {
IRI p = statement.getPredicate();
boolean predicateChanged = false;
if (indexedFieldsMapping != null) {
IRI res = indexedFieldsMapping.get(p);
Map<IRI, IRI> nextIndexedFieldsMapping = indexedFieldsMapping;
if (nextIndexedFieldsMapping != null) {
IRI res = nextIndexedFieldsMapping.get(p);
if (res != null) {
p = res;
predicateChanged = true;
}
}
if (this.indexedFields != null && !this.indexedFields.contains(p))
Set<IRI> nextIndexedFields = indexedFields;
if (nextIndexedFields != null && !nextIndexedFields.contains(p)) {
return null;
}

if (predicateChanged)
if (predicateChanged) {
return getValueFactory().createStatement(statement.getSubject(), p, statement.getObject(),
statement.getContext());
else
}
else {
return statement;
}
}

protected Collection<SearchQueryInterpreter> getSearchQueryInterpreters() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.model.IRI;
Expand Down Expand Up @@ -121,7 +122,7 @@ public void statementRemoved(Statement statement) {
/**
* To remember if the iterator was already closed and only free resources once
*/
private boolean mustclose = false;
private final AtomicBoolean closed = new AtomicBoolean(false);

public LuceneSailConnection(NotifyingSailConnection wrappedConnection, SearchIndex luceneIndex,
LuceneSail sail)
Expand All @@ -148,19 +149,21 @@ public synchronized void addStatement(Resource arg0, IRI arg1, Value arg2, Resou
public void close()
throws SailException
{
// remember if you were closed before, some sloppy programmers
// may call close() twice.
if (mustclose) {
mustclose = false;
if (closed.compareAndSet(false, true)) {
try {
luceneIndex.endReading();
super.close();
}
catch (IOException e) {
logger.warn("could not close IndexReader or IndexSearcher " + e, e);
finally {
try {
luceneIndex.endReading();
}
catch (IOException e) {
logger.warn("could not close IndexReader or IndexSearcher " + e, e);
}
// remember if you were closed before, some sloppy programmers
// may call close() twice.
}
}

super.close();
}

// //////////////////////////////// Methods related to indexing
Expand Down Expand Up @@ -277,8 +280,8 @@ private void clearContexts(Resource... contexts)
// //////////////////////////////// Methods related to querying

@Override
public CloseableIteration<? extends BindingSet, QueryEvaluationException> evaluate(TupleExpr tupleExpr,
Dataset dataset, BindingSet bindings, boolean includeInferred)
public synchronized CloseableIteration<? extends BindingSet, QueryEvaluationException> evaluate(
TupleExpr tupleExpr, Dataset dataset, BindingSet bindings, boolean includeInferred)
throws SailException
{
// Don't modify the original tuple expression
Expand Down Expand Up @@ -319,14 +322,17 @@ private void evaluateLuceneQueries(Collection<SearchQueryEvaluator> queries, Tup
// - multiple different property constraints can be put into the lucene
// query string (escape colons here)

if (closed.get()) {
throw new SailException("Sail has been closed already");
}

// mark that reading is in progress
try {
this.luceneIndex.beginReading();
}
catch (IOException e) {
throw new SailException(e);
}
this.mustclose = true;

// evaluate queries, generate binding sets, and remove queries
for (SearchQueryEvaluator query : queries) {
Expand Down
Loading

0 comments on commit fbf9e13

Please sign in to comment.