Skip to content

Commit

Permalink
GH-2604 fix handling of uncommitted changes in RDF* triple retrieval
Browse files Browse the repository at this point in the history
  • Loading branch information
abrokenjester committed Nov 7, 2020
1 parent e7ce210 commit de3c949
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.eclipse.rdf4j.IsolationLevels;
Expand All @@ -25,6 +26,7 @@
import org.eclipse.rdf4j.model.ModelFactory;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Triple;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.query.algebra.StatementPattern;
import org.eclipse.rdf4j.query.algebra.Var;
Expand Down Expand Up @@ -433,6 +435,48 @@ synchronized Iterable<Statement> getApprovedStatements(Resource subj, IRI pred,
}
}

/**
 * Collects the RDF* triples that occur as the subject or as the object of a statement in {@code approved} and
 * that match the supplied pattern.
 * <p>
 * Matches found in subject position are listed first, followed by matches found in object position. The result
 * is not de-duplicated: a triple is reported once for every statement position in which it occurs.
 *
 * @param subj subject the triple must have, or {@code null} to accept any subject
 * @param pred predicate the triple must have, or {@code null} to accept any predicate
 * @param obj  object the triple must have, or {@code null} to accept any object
 * @return the matching triples; an empty list when {@code approved} is {@code null}
 */
synchronized Iterable<Triple> getApprovedTriples(Resource subj, IRI pred, Value obj) {
	if (approved == null) {
		return Collections.emptyList();
	}

	// Single definition of the pattern test, shared by the subject and the object scan so the two cannot
	// drift apart. A null component acts as a wildcard.
	java.util.function.Predicate<Triple> matchesPattern = t -> (subj == null || subj.equals(t.getSubject()))
			&& (pred == null || pred.equals(t.getPredicate()))
			&& (obj == null || obj.equals(t.getObject()));

	// TODO none of this is particularly well thought-out in terms of performance, but we are aiming
	// for functionally complete first.
	// Sequential streams on purpose: this method holds the object's monitor, and fanning the scan out to
	// the common fork-join pool while holding it gives no measured benefit.
	Stream<Triple> approvedSubjectTriples = approved.stream()
			.map(st -> st.getSubject())
			.filter(Triple.class::isInstance)
			.map(Triple.class::cast)
			.filter(matchesPattern);

	Stream<Triple> approvedObjectTriples = approved.stream()
			.map(st -> st.getObject())
			.filter(Triple.class::isInstance)
			.map(Triple.class::cast)
			.filter(matchesPattern);

	return Stream.concat(approvedSubjectTriples, approvedObjectTriples).collect(Collectors.toList());
}

synchronized void removeApproved(Statement next) {
if (approved != null) {
approved.remove(next);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
import org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.CloseableIteratorIteration;
import org.eclipse.rdf4j.common.iteration.DistinctIteration;
import org.eclipse.rdf4j.common.iteration.EmptyIteration;
import org.eclipse.rdf4j.common.iteration.FilterIteration;
import org.eclipse.rdf4j.common.iteration.IteratorIteration;
import org.eclipse.rdf4j.common.iteration.UnionIteration;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Namespace;
import org.eclipse.rdf4j.model.Resource;
Expand Down Expand Up @@ -287,37 +290,43 @@ public CloseableIteration<? extends Statement, SailException> getStatements(Reso
}
}

@SuppressWarnings("unchecked")
@Override
public CloseableIteration<? extends Triple, SailException> getTriples(Resource subj, IRI pred, Value obj)
throws SailException {

CloseableIteration<? extends Triple, SailException> iter;
if (changes.isStatementCleared()) {
// nothing in the backing source is relevant, but we may still need to return approved data
// from the changeset
iter = null;
} else {
iter = derivedFrom.getTriples(subj, pred, obj);
}

if (iter == null) {
return new EmptyIteration<>();
if (changes.hasDeprecated() && iter != null) {
iter = triplesDifference(iter, triple -> isDeprecated(triple, changes.getDeprecatedStatements()));
}

if (changes.hasApproved()) {
if (iter != null) {
// merge newly approved triples in the changeset with data from the backing source
return new DistinctIteration<>(new UnionIteration<>(
iter,
new IteratorIteration<Triple, SailException>(
changes.getApprovedTriples(subj, pred, obj).iterator())
));
}

// nothing relevant in the backing source, just return all matching approved triples from the changeset
Iterator<Triple> i = changes.getApprovedTriples(subj, pred, obj).iterator();
return new CloseableIteratorIteration<>(i);
}

if (iter != null) {
return iter;
}
return iter; // TODO we will need to figure out a way to handle transaction isolation with deprecated and
// approved data
// Model deprecated = changes.getDeprecated();
// if (deprecated != null && iter != null) {
// iter = difference(iter, deprecated.));
// }
// Model approved = changes.getApproved();
// if (approved != null && iter != null) {
// return new DistinctModelReducingUnionIteration(iter, approved, (m) -> m.filter(subj, pred, obj, contexts));
//
// } else if (approved != null) {
// Iterator<Statement> i = approved.filter(subj, pred, obj, contexts).iterator();
// return new CloseableIteratorIteration<>(i);
// } else if (iter != null) {
// return iter;
// } else {
// return new EmptyIteration<>();
// }
return new EmptyIteration<>();
}

private CloseableIteration<? extends Statement, SailException> difference(
Expand All @@ -331,4 +340,38 @@ protected boolean accept(Statement stmt) {
};
}

/**
 * Wraps a triple iteration in a filter that accepts a triple only when {@code excluded} returns
 * {@code false} for it.
 *
 * @param result   the iteration to filter
 * @param excluded test applied to each triple; a {@code true} outcome rejects the triple
 * @return a {@link FilterIteration} over {@code result}
 */
private CloseableIteration<? extends Triple, SailException> triplesDifference(
		CloseableIteration<? extends Triple, SailException> result, Function<Triple, Boolean> excluded) {
	return new FilterIteration<Triple, SailException>(result) {

		@Override
		protected boolean accept(Triple candidate) {
			if (excluded.apply(candidate)) {
				return false;
			}
			return true;
		}
	};
}

/**
 * Decides whether the changeset deprecates a triple. The triple counts as deprecated when every statement in
 * the backing dataset ({@code derivedFrom}) that has the triple as its subject or as its object is contained
 * in {@code deprecatedStatements}.
 *
 * @param triple               the RDF* triple to check
 * @param deprecatedStatements the statements deprecated by the changeset
 * @return {@code false} as soon as a statement involving the triple is found that is not deprecated,
 *         {@code true} otherwise
 */
private boolean isDeprecated(Triple triple, List<Statement> deprecatedStatements) {
	// statements that use the triple in subject position
	try (CloseableIteration<? extends Statement, SailException> asSubject = derivedFrom
			.getStatements(triple, null, null)) {
		if (!allContainedIn(asSubject, deprecatedStatements)) {
			return false;
		}
	}
	// statements that use the triple in object position
	try (CloseableIteration<? extends Statement, SailException> asObject = derivedFrom
			.getStatements(null, null, triple)) {
		return allContainedIn(asObject, deprecatedStatements);
	}
}

/**
 * Drains the iteration until a statement is found that is missing from {@code deprecated}. Closing the
 * iteration is left to the caller.
 */
private boolean allContainedIn(CloseableIteration<? extends Statement, SailException> statements,
		List<Statement> deprecated) {
	while (statements.hasNext()) {
		if (!deprecated.contains(statements.next())) {
			return false;
		}
	}
	return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.eclipse.rdf4j.sail.base;

import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.DistinctIteration;
import org.eclipse.rdf4j.common.iteration.ExceptionConvertingIteration;
import org.eclipse.rdf4j.common.iteration.Iteration;
import org.eclipse.rdf4j.model.IRI;
Expand Down Expand Up @@ -85,8 +86,10 @@ protected QueryEvaluationException convert(Exception e) {
public CloseableIteration<? extends Triple, QueryEvaluationException> getRdfStarTriples(Resource subj, IRI pred,
Value obj) throws QueryEvaluationException {
try {
return new TriplesIteration(dataset.getTriples(subj, pred, obj));
} catch (SailException e) { // TODO is this necessary?
// In contrast to statement retrieval (which gets de-duplicated later on when handling things like
// projections and conversions) we need to make sure we de-duplicate the RDF* triples here.
return new DistinctIteration<>(new TriplesIteration(dataset.getTriples(subj, pred, obj)));
} catch (SailException e) {
throw new QueryEvaluationException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ private CloseableIteration<MemTriple, SailException> createTripleIterator(Resour
return new EmptyIteration<>();
}

// TODO there is no separate index for Trples, so for now we iterate over all statements to find matches.
// TODO there is no separate index for Triples, so for now we iterate over all statements to find matches.
return new MemTripleIterator<>(statements, memSubj, memPred, memObj, snapshot);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.eclipse.rdf4j.model.vocabulary.RDF;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryResults;
import org.eclipse.rdf4j.query.TupleQuery;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -169,22 +170,22 @@ public void testSparqlStarUpdate() {
public void testRdfStarAddAndRetrieveSparql() throws InterruptedException {

Triple insertedTriple = vf.createTriple(RDF.SUBJECT, RDF.PREDICATE, RDF.OBJECT);

Literal literal = vf.createLiteral("I am a triple ;-D");

testCon.begin();
testCon.add(insertedTriple, RDF.TYPE, literal);

TupleQuery query = testCon.prepareTupleQuery(
"SELECT * WHERE { << <http://www.w3.org/1999/02/22-rdf-syntax-ns#subject> <http://www.w3.org/1999/02/22-rdf-syntax-ns#predicate> <http://www.w3.org/1999/02/22-rdf-syntax-ns#object> >> ?a ?b}");

assertTrue(testCon.prepareBooleanQuery("ASK { ?t a 'I am a triple ;-D'}").evaluate());
assertEquals(1, testCon.prepareTupleQuery(
"SELECT * WHERE { << <http://www.w3.org/1999/02/22-rdf-syntax-ns#subject> <http://www.w3.org/1999/02/22-rdf-syntax-ns#predicate> <http://www.w3.org/1999/02/22-rdf-syntax-ns#object> >> ?a ?b}")
.evaluate()
.stream()
.count());
assertEquals(1, query.evaluate().stream().count());
testCon.commit();
}

@Test
public void testRdfStarAddAndRetrieveSparqlSeperateTransaction() throws InterruptedException {
public void testRdfStarAddAndRetrieveSparqlSeparateTransaction() throws InterruptedException {

Triple insertedTriple = vf.createTriple(RDF.SUBJECT, RDF.PREDICATE, RDF.OBJECT);
Literal literal = vf.createLiteral("I am a triple ;-D");
Expand Down

0 comments on commit de3c949

Please sign in to comment.