diff --git a/bboxdb-server/src/main/java/org/bboxdb/network/server/query/continuous/ContinuousQueryExecutionState.java b/bboxdb-server/src/main/java/org/bboxdb/network/server/query/continuous/ContinuousQueryExecutionState.java index 36ff95aea..0fa3d4f8b 100644 --- a/bboxdb-server/src/main/java/org/bboxdb/network/server/query/continuous/ContinuousQueryExecutionState.java +++ b/bboxdb-server/src/main/java/org/bboxdb/network/server/query/continuous/ContinuousQueryExecutionState.java @@ -27,7 +27,7 @@ public class ContinuousQueryExecutionState { /** * The stream tuple that are matched in the last query execution */ - protected final Set containedTupleKeys; + protected final Set containedStreamKeys; /** * The stream tuples and their join partners that are used in the last execution @@ -40,18 +40,18 @@ public class ContinuousQueryExecutionState { protected final Set joinPartnersForCurrentKey; public ContinuousQueryExecutionState() { - this.containedTupleKeys = new HashSet<>(); + this.containedStreamKeys = new HashSet<>(); this.containedJoinedKeys = new HashMap<>(); this.joinPartnersForCurrentKey = new HashSet<>(); } /** - * Was the stream key contained in the last query + * Was the stream key contained in the last range query * @param key * @return */ - public boolean wasStreamKeyContainedInLastQuery(final String key) { - return containedTupleKeys.contains(key); + public boolean wasStreamKeyContainedInLastRangeQuery(final String key) { + return containedStreamKeys.contains(key); } /** @@ -59,8 +59,8 @@ public boolean wasStreamKeyContainedInLastQuery(final String key) { * @param key * @return */ - public boolean removeStreamKeyFromState(final String key) { - return containedTupleKeys.remove(key); + public boolean removeStreamKeyFromRangeState(final String key) { + return containedStreamKeys.remove(key); } /** @@ -68,7 +68,7 @@ public boolean removeStreamKeyFromState(final String key) { * @param key */ public void addStreamKeyToState(final String key) { - containedTupleKeys.add(key); + containedStreamKeys.add(key); } /** @@ -84,7 +84,7 @@ public void addJoinCandidateForCurrentKey(final String key) { * @param streamKey * @return */ - public Set clearStateAndGetMissingJoinpartners(final String streamKey) { + public Set commitStateAndGetMissingJoinpartners(final String streamKey) { final Set oldJoinPartners = containedJoinedKeys.getOrDefault(streamKey, new HashSet<>()); @@ -113,12 +113,31 @@ public void clearJoinPartnerState() { public Map> getContainedJoinedKeys() { return containedJoinedKeys; } + + /** + * Remove the stream key from join state + * @param streamKey + * @return + */ + public Set removeStreamKeyFromJoinState(final String streamKey) { + return containedJoinedKeys.remove(streamKey); + } + + /** + * Was the stream key contained in the last join query + * @param key + * @return + */ + public boolean wasStreamKeyContainedInLastJoinQuery(final String key) { + return containedJoinedKeys.containsKey(key); + } + /** * Get the contained range query keys * @return */ public Set getContainedTupleKeys() { - return containedTupleKeys; + return containedStreamKeys; } /** @@ -126,7 +145,7 @@ public Set getContainedTupleKeys() { * @param resultState */ public void merge(final Set rangeQueryState, final Map> joinQueryState) { - containedTupleKeys.addAll(rangeQueryState); + containedStreamKeys.addAll(rangeQueryState); containedJoinedKeys.putAll(joinQueryState); } } diff --git a/bboxdb-server/src/main/java/org/bboxdb/network/server/query/continuous/ContinuousRangeQuery.java b/bboxdb-server/src/main/java/org/bboxdb/network/server/query/continuous/ContinuousRangeQuery.java index 312bc842d..218c384d1 100644 --- a/bboxdb-server/src/main/java/org/bboxdb/network/server/query/continuous/ContinuousRangeQuery.java +++ b/bboxdb-server/src/main/java/org/bboxdb/network/server/query/continuous/ContinuousRangeQuery.java @@ -116,7 +116,7 @@ protected void handleNonMatch(final Tuple streamTuple) { final String streamKey = streamTuple.getKey(); - if(continuousQueryState.wasStreamKeyContainedInLastQuery(streamKey)) { + if(continuousQueryState.wasStreamKeyContainedInLastRangeQuery(streamKey)) { logger.debug("Key {} was contained in last execution, sending invalidation tuple", streamKey); generateInvalidationTuple(streamTuple, continuousQueryState, streamKey); } @@ -140,8 +140,9 @@ protected void handleInvalidationTuple(final Tuple streamTuple) { final String streamKey = streamTuple.getKey(); // Invalidate range query results - if(continuousQueryState.wasStreamKeyContainedInLastQuery(streamKey)) { + if(continuousQueryState.wasStreamKeyContainedInLastRangeQuery(streamKey)) { generateInvalidationTuple(streamTuple, continuousQueryState, streamKey); + continuousQueryState.removeStreamKeyFromRangeState(streamKey); } } @@ -154,7 +155,7 @@ protected void handleInvalidationTuple(final Tuple streamTuple) { private void generateInvalidationTuple(final Tuple streamTuple, final ContinuousQueryExecutionState continuousQueryState, final String streamKey) { - continuousQueryState.removeStreamKeyFromState(streamKey); + continuousQueryState.removeStreamKeyFromRangeState(streamKey); final long versionTimestamp = streamTuple.getVersionTimestamp(); final InvalidationTuple tuple = new InvalidationTuple(streamKey, versionTimestamp); diff --git a/bboxdb-server/src/main/java/org/bboxdb/network/server/query/continuous/ContinuousSpatialJoinQuery.java b/bboxdb-server/src/main/java/org/bboxdb/network/server/query/continuous/ContinuousSpatialJoinQuery.java index 0f363233c..d3f3a89e6 100644 --- a/bboxdb-server/src/main/java/org/bboxdb/network/server/query/continuous/ContinuousSpatialJoinQuery.java +++ b/bboxdb-server/src/main/java/org/bboxdb/network/server/query/continuous/ContinuousSpatialJoinQuery.java @@ -229,9 +229,16 @@ protected void handleJoinMatchFinal(final Tuple streamTuple) { } @Override - protected void handleInvalidationTuple(final Tuple streamTuple) { + protected void handleInvalidationTuple(final Tuple streamTuple) { + final ContinuousQueryExecutionState continuousQueryState = continuousClientQuery.getContinuousQueryState(); + final String streamKey = streamTuple.getKey(); + continuousClientQuery.getContinuousQueryState().clearJoinPartnerState(); - generateInvalidationTuplesForStreamKey(streamTuple); + + if(continuousQueryState.wasStreamKeyContainedInLastJoinQuery(streamKey)) { + generateInvalidationTuplesForStreamKey(streamTuple); + continuousQueryState.removeStreamKeyFromJoinState(streamKey); + } } /** @@ -248,7 +255,7 @@ private void generateInvalidationTuplesForStreamKey(final Tuple streamTuple) { final List tables = Arrays.asList(queryPlan.getStreamTable(), queryPlan.getJoinTable()); // Invalidate join query results - final Set joinPartners = continuousQueryState.clearStateAndGetMissingJoinpartners(streamKey); + final Set joinPartners = continuousQueryState.commitStateAndGetMissingJoinpartners(streamKey); for(final String joinPartner : joinPartners) { final long versionTimestamp = streamTuple.getVersionTimestamp(); diff --git a/bboxdb-server/src/test/java/org/bboxdb/test/query/TestContinuousQueryExecutionState.java b/bboxdb-server/src/test/java/org/bboxdb/test/query/TestContinuousQueryExecutionState.java index 3be2b7810..6e705fd16 100644 --- a/bboxdb-server/src/test/java/org/bboxdb/test/query/TestContinuousQueryExecutionState.java +++ b/bboxdb-server/src/test/java/org/bboxdb/test/query/TestContinuousQueryExecutionState.java @@ -29,27 +29,30 @@ public class TestContinuousQueryExecutionState { public void testStreamState() { final ContinuousQueryExecutionState state = new ContinuousQueryExecutionState(); - Assert.assertFalse(state.wasStreamKeyContainedInLastQuery("abc")); + Assert.assertFalse(state.wasStreamKeyContainedInLastRangeQuery("abc")); state.addStreamKeyToState("abc"); - Assert.assertTrue(state.wasStreamKeyContainedInLastQuery("abc")); + Assert.assertTrue(state.wasStreamKeyContainedInLastRangeQuery("abc")); - Assert.assertTrue(state.removeStreamKeyFromState("abc")); - Assert.assertFalse(state.removeStreamKeyFromState("abc")); - Assert.assertFalse(state.wasStreamKeyContainedInLastQuery("abc")); + Assert.assertTrue(state.removeStreamKeyFromRangeState("abc")); + Assert.assertFalse(state.removeStreamKeyFromRangeState("abc")); + Assert.assertFalse(state.wasStreamKeyContainedInLastRangeQuery("abc")); } @Test(timeout = 60_000) public void testJoinState() { final ContinuousQueryExecutionState state = new ContinuousQueryExecutionState(); + Assert.assertFalse(state.wasStreamKeyContainedInLastJoinQuery("stream")); + state.clearJoinPartnerState(); state.addJoinCandidateForCurrentKey("abc"); state.addJoinCandidateForCurrentKey("def"); - final Set missingPartners1 = state.clearStateAndGetMissingJoinpartners("stream"); + final Set missingPartners1 = state.commitStateAndGetMissingJoinpartners("stream"); Assert.assertTrue(missingPartners1.isEmpty()); - - final Set missingPartners2 = state.clearStateAndGetMissingJoinpartners("stream"); + Assert.assertTrue(state.wasStreamKeyContainedInLastJoinQuery("stream")); + + final Set missingPartners2 = state.commitStateAndGetMissingJoinpartners("stream"); Assert.assertEquals(2, missingPartners2.size()); Assert.assertTrue(missingPartners2.contains("abc")); Assert.assertTrue(missingPartners2.contains("def")); @@ -57,12 +60,19 @@ public void testJoinState() { state.clearJoinPartnerState(); state.addJoinCandidateForCurrentKey("abc"); state.addJoinCandidateForCurrentKey("def"); - final Set missingPartners3 = state.clearStateAndGetMissingJoinpartners("stream"); + final Set missingPartners3 = state.commitStateAndGetMissingJoinpartners("stream"); Assert.assertTrue(missingPartners3.isEmpty()); state.addJoinCandidateForCurrentKey("def"); - final Set missingPartners4 = state.clearStateAndGetMissingJoinpartners("stream"); + final Set missingPartners4 = state.commitStateAndGetMissingJoinpartners("stream"); Assert.assertEquals(1, missingPartners4.size()); Assert.assertTrue(missingPartners4.contains("abc")); + + // Clear + state.removeStreamKeyFromJoinState("stream"); + Assert.assertFalse(state.wasStreamKeyContainedInLastJoinQuery("stream")); + final Set missingPartners5 = state.commitStateAndGetMissingJoinpartners("stream"); + Assert.assertTrue(missingPartners5.isEmpty()); + } }