Skip to content

Commit

Permalink
Merge pull request #2608 from eclipse/GH-2604-rdfstar-transaction-bug
Browse files Browse the repository at this point in the history
GH-2604 rdfstar transaction bug
  • Loading branch information
hmottestad authored Nov 7, 2020
2 parents 6fe2494 + de3c949 commit ed8fb47
Show file tree
Hide file tree
Showing 12 changed files with 183 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.eclipse.rdf4j.IsolationLevels;
Expand All @@ -25,6 +26,7 @@
import org.eclipse.rdf4j.model.ModelFactory;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Triple;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.query.algebra.StatementPattern;
import org.eclipse.rdf4j.query.algebra.Var;
Expand Down Expand Up @@ -433,6 +435,48 @@ synchronized Iterable<Statement> getApprovedStatements(Resource subj, IRI pred,
}
}

/**
 * Collects the RDF* triples that appear in subject or object position of the approved statements and that
 * match the supplied pattern.
 *
 * @param subj subject the triple must have, or {@code null} to accept any subject
 * @param pred predicate the triple must have, or {@code null} to accept any predicate
 * @param obj  object the triple must have, or {@code null} to accept any object
 * @return the matching triples, those found in subject position first, followed by those found in object
 *         position; an empty list when there are no approved statements. The result is not de-duplicated.
 */
synchronized Iterable<Triple> getApprovedTriples(Resource subj, IRI pred, Value obj) {
    if (approved == null) {
        return Collections.emptyList();
    }

    // TODO none of this is particularly well thought-out in terms of performance, but we are aiming
    // for functionally complete first.
    Stream<Triple> fromSubjectPosition = approved.parallelStream()
            .map(Statement::getSubject)
            .filter(Triple.class::isInstance)
            .map(Triple.class::cast)
            .filter(candidate -> tripleMatchesPattern(candidate, subj, pred, obj));

    Stream<Triple> fromObjectPosition = approved.parallelStream()
            .map(Statement::getObject)
            .filter(Triple.class::isInstance)
            .map(Triple.class::cast)
            .filter(candidate -> tripleMatchesPattern(candidate, subj, pred, obj));

    return Stream.concat(fromSubjectPosition, fromObjectPosition).collect(Collectors.toList());
}

/**
 * Tests a triple against a subject/predicate/object pattern in which {@code null} is a wildcard.
 */
private static boolean tripleMatchesPattern(Triple candidate, Resource subj, IRI pred, Value obj) {
    boolean subjectOk = subj == null || subj.equals(candidate.getSubject());
    boolean predicateOk = pred == null || pred.equals(candidate.getPredicate());
    boolean objectOk = obj == null || obj.equals(candidate.getObject());
    return subjectOk && predicateOk && objectOk;
}

synchronized void removeApproved(Statement next) {
if (approved != null) {
approved.remove(next);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
import org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.CloseableIteratorIteration;
import org.eclipse.rdf4j.common.iteration.DistinctIteration;
import org.eclipse.rdf4j.common.iteration.EmptyIteration;
import org.eclipse.rdf4j.common.iteration.FilterIteration;
import org.eclipse.rdf4j.common.iteration.IteratorIteration;
import org.eclipse.rdf4j.common.iteration.UnionIteration;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Namespace;
import org.eclipse.rdf4j.model.Resource;
Expand Down Expand Up @@ -287,37 +290,43 @@ public CloseableIteration<? extends Statement, SailException> getStatements(Reso
}
}

/**
 * Returns the RDF* triples matching the given subject/predicate/object pattern, combining the backing
 * source {@code derivedFrom} with the pending {@code changes}.
 * <p>
 * The backing source is not queried when {@code changes.isStatementCleared()} is true. When the changeset
 * has deprecated statements, triples from the backing source are filtered through
 * {@code triplesDifference} using {@code isDeprecated}. When the changeset has approved statements, the
 * matching approved triples are either unioned with the backing iteration (wrapped in a
 * {@code DistinctIteration}) or returned on their own if there is no backing iteration.
 * <p>
 * NOTE(review): this span appears to come from a rendered diff and interleaves lines of the previous
 * implementation with the new one; confirm against the actual file before relying on the exact statement
 * order.
 *
 * @param subj subject to match, passed through to {@code derivedFrom.getTriples} and
 *             {@code changes.getApprovedTriples}
 * @param pred predicate to match, passed through likewise
 * @param obj  object to match, passed through likewise
 * @return an iteration over the matching triples; an {@code EmptyIteration} when nothing applies
 * @throws SailException declared by this method's signature
 */
@SuppressWarnings("unchecked")
@Override
public CloseableIteration<? extends Triple, SailException> getTriples(Resource subj, IRI pred, Value obj)
throws SailException {

CloseableIteration<? extends Triple, SailException> iter;
if (changes.isStatementCleared()) {
// nothing in the backing source is relevant, but we may still need to return approved data
// from the changeset
iter = null;
} else {
iter = derivedFrom.getTriples(subj, pred, obj);
}

// NOTE(review): the next two lines look like the removed side of the diff (the old early return) - confirm.
if (iter == null) {
return new EmptyIteration<>();
// drop triples from the backing source that isDeprecated reports as deprecated by the changeset
if (changes.hasDeprecated() && iter != null) {
iter = triplesDifference(iter, triple -> isDeprecated(triple, changes.getDeprecatedStatements()));
}

if (changes.hasApproved()) {
if (iter != null) {
// merge newly approved triples in the changeset with data from the backing source
return new DistinctIteration<>(new UnionIteration<>(
iter,
new IteratorIteration<Triple, SailException>(
changes.getApprovedTriples(subj, pred, obj).iterator())
));
}

// nothing relevant in the backing source, just return all matching approved triples from the changeset
Iterator<Triple> i = changes.getApprovedTriples(subj, pred, obj).iterator();
return new CloseableIteratorIteration<>(i);
}

// no approved data: hand back whatever remains of the backing iteration, if any
if (iter != null) {
return iter;
}
// NOTE(review): the following return and the commented-out code look like the removed side of the diff - confirm.
return iter; // TODO we will need to figure out a way to handle transaction isolation with deprecated and
// approved data
// Model deprecated = changes.getDeprecated();
// if (deprecated != null && iter != null) {
// iter = difference(iter, deprecated.));
// }
// Model approved = changes.getApproved();
// if (approved != null && iter != null) {
// return new DistinctModelReducingUnionIteration(iter, approved, (m) -> m.filter(subj, pred, obj, contexts));
//
// } else if (approved != null) {
// Iterator<Statement> i = approved.filter(subj, pred, obj, contexts).iterator();
// return new CloseableIteratorIteration<>(i);
// } else if (iter != null) {
// return iter;
// } else {
// return new EmptyIteration<>();
// }
return new EmptyIteration<>();
}

private CloseableIteration<? extends Statement, SailException> difference(
Expand All @@ -331,4 +340,38 @@ protected boolean accept(Statement stmt) {
};
}

/**
 * Wraps an iteration of triples so that every triple for which {@code excluded} yields {@code true} is
 * left out.
 *
 * @param result   the iteration to filter
 * @param excluded decides, per triple, whether it must be dropped from the result
 * @return a filtering view over {@code result}
 */
private CloseableIteration<? extends Triple, SailException> triplesDifference(
        CloseableIteration<? extends Triple, SailException> result, Function<Triple, Boolean> excluded) {
    return new FilterIteration<Triple, SailException>(result) {

        @Override
        protected boolean accept(Triple candidate) {
            // keep the candidate only when the exclusion check says "no"
            boolean dropIt = excluded.apply(candidate);
            return !dropIt;
        }
    };
}

/**
 * Decides whether a triple counts as deprecated: it does when every statement in the backing dataset that
 * uses the triple as subject or as object is contained in {@code deprecatedStatements}.
 *
 * @param triple               the RDF* triple to check
 * @param deprecatedStatements the statements deprecated by the changeset
 * @return {@code false} as soon as one statement involving the triple is not deprecated, {@code true}
 *         otherwise
 */
private boolean isDeprecated(Triple triple, List<Statement> deprecatedStatements) {
    // statements with the triple in subject position are checked first
    try (CloseableIteration<? extends Statement, SailException> asSubject = derivedFrom
            .getStatements(triple, null, null)) {
        if (!containsOnlyDeprecated(asSubject, deprecatedStatements)) {
            return false;
        }
    }
    // the object-position lookup is only made when the subject-position check passed
    try (CloseableIteration<? extends Statement, SailException> asObject = derivedFrom
            .getStatements(null, null, triple)) {
        return containsOnlyDeprecated(asObject, deprecatedStatements);
    }
}

/**
 * Consumes the iteration until a statement is found that is not in {@code deprecatedStatements}.
 */
private boolean containsOnlyDeprecated(CloseableIteration<? extends Statement, SailException> statements,
        List<Statement> deprecatedStatements) {
    while (statements.hasNext()) {
        Statement current = statements.next();
        if (!deprecatedStatements.contains(current)) {
            return false;
        }
    }
    return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.eclipse.rdf4j.sail.base;

import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.DistinctIteration;
import org.eclipse.rdf4j.common.iteration.ExceptionConvertingIteration;
import org.eclipse.rdf4j.common.iteration.Iteration;
import org.eclipse.rdf4j.model.IRI;
Expand Down Expand Up @@ -85,8 +86,10 @@ protected QueryEvaluationException convert(Exception e) {
/**
 * Retrieves the RDF* triples matching the given pattern from {@code dataset} and exposes them as a
 * {@code TriplesIteration}; a {@code SailException} raised by the lookup is rethrown wrapped in a
 * {@code QueryEvaluationException}.
 * <p>
 * NOTE(review): this span appears to come from a rendered diff and contains both the previous
 * {@code return}/{@code catch} pair and the new one that adds {@code DistinctIteration}; confirm against
 * the actual file.
 *
 * @param subj subject passed through to {@code dataset.getTriples}
 * @param pred predicate passed through to {@code dataset.getTriples}
 * @param obj  object passed through to {@code dataset.getTriples}
 * @return an iteration over the matching triples
 * @throws QueryEvaluationException if the underlying dataset throws a {@code SailException}
 */
public CloseableIteration<? extends Triple, QueryEvaluationException> getRdfStarTriples(Resource subj, IRI pred,
Value obj) throws QueryEvaluationException {
try {
return new TriplesIteration(dataset.getTriples(subj, pred, obj));
} catch (SailException e) { // TODO is this necessary?
// In contrast to statement retrieval (which gets de-duplicated later on when handling things like
// projections and conversions) we need to make sure we de-duplicate the RDF* triples here.
return new DistinctIteration<>(new TriplesIteration(dataset.getTriples(subj, pred, obj)));
} catch (SailException e) {
throw new QueryEvaluationException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,5 @@ protected Repository createRepository() {
return new SailRepository(
new ElasticsearchStore(clientPool, "index1"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ protected Repository createRepository() throws IOException {
sail.addMember(new SailRepository(new MemoryStore()));
return new SailRepository(sail);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ protected Repository createRepository()
ResourceUtil.getString("/custom-query-inferencing/rule.rq"),
ResourceUtil.getString("/custom-query-inferencing/match.rq")));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ private CloseableIteration<MemTriple, SailException> createTripleIterator(Resour
return new EmptyIteration<>();
}

// TODO there is no separate index for Trples, so for now we iterate over all statements to find matches.
// TODO there is no separate index for Triples, so for now we iterate over all statements to find matches.
return new MemTripleIterator<>(statements, memSubj, memPred, memObj, snapshot);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ public MemoryStoreConnectionTest(IsolationLevel level) {
protected Repository createRepository() {
return new SailRepository(new MemoryStore());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnectionTest;
import org.eclipse.rdf4j.repository.sail.SailRepository;
import org.junit.Ignore;
import org.junit.Test;

public class NativeStoreConnectionTest extends RepositoryConnectionTest {
Expand Down Expand Up @@ -74,4 +75,5 @@ public void testSES715() throws Exception {

testCon2.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ protected Repository createRepository()
return new SailRepository(
new SpinSail(new SchemaCachingRDFSInferencer(new DedupingInferencer(new MemoryStore()), false)));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
package org.eclipse.rdf4j.repository;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.List;
Expand All @@ -22,6 +24,7 @@
import org.eclipse.rdf4j.model.vocabulary.RDF;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryResults;
import org.eclipse.rdf4j.query.TupleQuery;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -163,5 +166,59 @@ public void testSparqlStarUpdate() {
assertThat(testCon.hasStatement(bob, FOAF.AGE, vf.createLiteral(23), false));
}

/**
 * Adds a statement whose subject is an RDF* triple and checks, inside the same still-open transaction,
 * that SPARQL can see both the statement and the embedded triple.
 */
@Test
public void testRdfStarAddAndRetrieveSparql() throws InterruptedException {
    final String selectByEmbeddedTriple = "SELECT * WHERE { << <http://www.w3.org/1999/02/22-rdf-syntax-ns#subject> <http://www.w3.org/1999/02/22-rdf-syntax-ns#predicate> <http://www.w3.org/1999/02/22-rdf-syntax-ns#object> >> ?a ?b}";
    final String askByType = "ASK { ?t a 'I am a triple ;-D'}";

    Literal description = vf.createLiteral("I am a triple ;-D");
    Triple embedded = vf.createTriple(RDF.SUBJECT, RDF.PREDICATE, RDF.OBJECT);

    testCon.begin();
    testCon.add(embedded, RDF.TYPE, description);

    // the SELECT is prepared before the ASK runs, but evaluated after it
    TupleQuery select = testCon.prepareTupleQuery(selectByEmbeddedTriple);

    boolean statementVisible = testCon.prepareBooleanQuery(askByType).evaluate();
    assertTrue(statementVisible);

    long matchingRows = select.evaluate().stream().count();
    assertEquals(1, matchingRows);

    testCon.commit();
}

/**
 * Adds a statement whose subject is an RDF* triple, commits it, and then checks from a second transaction
 * that SPARQL can see both the statement and the embedded triple.
 */
@Test
public void testRdfStarAddAndRetrieveSparqlSeparateTransaction() throws InterruptedException {
    final String askByType = "ASK { ?t a 'I am a triple ;-D'}";
    final String selectByEmbeddedTriple = "SELECT * WHERE { << <http://www.w3.org/1999/02/22-rdf-syntax-ns#subject> <http://www.w3.org/1999/02/22-rdf-syntax-ns#predicate> <http://www.w3.org/1999/02/22-rdf-syntax-ns#object> >> ?a ?b}";

    Literal description = vf.createLiteral("I am a triple ;-D");
    Triple embedded = vf.createTriple(RDF.SUBJECT, RDF.PREDICATE, RDF.OBJECT);

    // first transaction: write
    testCon.begin();
    testCon.add(embedded, RDF.TYPE, description);
    testCon.commit();

    // second transaction: read back
    testCon.begin();
    boolean statementVisible = testCon.prepareBooleanQuery(askByType).evaluate();
    assertTrue(statementVisible);

    long matchingRows = testCon.prepareTupleQuery(selectByEmbeddedTriple).evaluate().stream().count();
    assertEquals(1, matchingRows);
    testCon.commit();
}

/**
 * Adds a statement whose subject is an RDF* triple and checks, inside the same still-open transaction,
 * that it can be retrieved through the statement API, both by predicate/object and by an equal but
 * separately created triple used as subject.
 */
@Test
public void testRdfStarAddAndRetrieve() throws InterruptedException {
    Literal description = vf.createLiteral("I am a triple ;-D");
    Triple embedded = vf.createTriple(RDF.SUBJECT, RDF.PREDICATE, RDF.OBJECT);
    // a second instance with the same components, to verify lookup does not rely on object identity
    Triple equalButDistinct = vf.createTriple(RDF.SUBJECT, RDF.PREDICATE, RDF.OBJECT);

    testCon.begin();
    testCon.add(embedded, RDF.TYPE, description);

    long foundByTypeAndObject = testCon.getStatements(null, RDF.TYPE, description, false).stream().count();
    assertEquals(1, foundByTypeAndObject);

    long foundBySubjectTriple = testCon.getStatements(equalButDistinct, null, null, false).stream().count();
    assertEquals(1, foundBySubjectTriple);

    testCon.commit();
}

protected abstract Repository createRepository();
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.eclipse.rdf4j.model.Model;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Triple;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.LinkedHashModel;
Expand Down Expand Up @@ -86,10 +87,14 @@
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
public abstract class RepositoryConnectionTest {

private final Logger logger = LoggerFactory.getLogger(RepositoryConnectionTest.class);

@BeforeClass
public static void setUpClass() throws Exception {
// Turn off debugging for this test, as the cleanup processes are working correctly,
Expand Down Expand Up @@ -1784,4 +1789,5 @@ private int getTotalStatementCount(RepositoryConnection connection) throws Repos
iter.close();
}
}

}

0 comments on commit ed8fb47

Please sign in to comment.