Skip to content

Commit

Permalink
Send invalidations only if needed
Browse files Browse the repository at this point in the history
  • Loading branch information
jnidzwetzki committed Jun 18, 2021
1 parent 3298072 commit 01798ee
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class ContinuousQueryExecutionState {
/**
* The stream tuple that are matched in the last query execution
*/
protected final Set<String> containedTupleKeys;
protected final Set<String> containedStreamKeys;

/**
* The stream tuples and their join partners that are used in the last execution
Expand All @@ -40,35 +40,35 @@ public class ContinuousQueryExecutionState {
protected final Set<String> joinPartnersForCurrentKey;

public ContinuousQueryExecutionState() {
this.containedTupleKeys = new HashSet<>();
this.containedStreamKeys = new HashSet<>();
this.containedJoinedKeys = new HashMap<>();
this.joinPartnersForCurrentKey = new HashSet<>();
}

/**
* Was the stream key contained in the last query
* Was the stream key contained in the last range query
* @param key
* @return
*/
public boolean wasStreamKeyContainedInLastQuery(final String key) {
return containedTupleKeys.contains(key);
public boolean wasStreamKeyContainedInLastRangeQuery(final String key) {
return containedStreamKeys.contains(key);
}

/**
* Remove the given key from the state
* @param key
* @return
*/
public boolean removeStreamKeyFromState(final String key) {
return containedTupleKeys.remove(key);
public boolean removeStreamKeyFromRangeState(final String key) {
return containedStreamKeys.remove(key);
}

/**
* Add the given key to the state
* @param key
*/
public void addStreamKeyToState(final String key) {
containedTupleKeys.add(key);
containedStreamKeys.add(key);
}

/**
Expand All @@ -84,7 +84,7 @@ public void addJoinCandidateForCurrentKey(final String key) {
* @param streamKey
* @return
*/
public Set<String> clearStateAndGetMissingJoinpartners(final String streamKey) {
public Set<String> commitStateAndGetMissingJoinpartners(final String streamKey) {

final Set<String> oldJoinPartners = containedJoinedKeys.getOrDefault(streamKey, new HashSet<>());

Expand Down Expand Up @@ -113,20 +113,39 @@ public void clearJoinPartnerState() {
public Map<String, Set<String>> getContainedJoinedKeys() {
return containedJoinedKeys;
}

/**
* Remove the stream key from join state
* @param streamKey
* @return
*/
public Set<String> removeStreamKeyFromJoinState(final String streamKey) {
return containedJoinedKeys.remove(streamKey);
}

/**
* Was the stream key contained in the last join query
* @param key
* @return
*/
public boolean wasStreamKeyContainedInLastJoinQuery(final String key) {
return containedJoinedKeys.containsKey(key);
}

/**
* Get the contained range query keys
* @return
*/
public Set<String> getContainedTupleKeys() {
return containedTupleKeys;
return containedStreamKeys;
}

/**
* Merge the given state into the local one
* @param resultState
*/
public void merge(final Set<String> rangeQueryState, final Map<String, Set<String>> joinQueryState) {
containedTupleKeys.addAll(rangeQueryState);
containedStreamKeys.addAll(rangeQueryState);
containedJoinedKeys.putAll(joinQueryState);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ protected void handleNonMatch(final Tuple streamTuple) {

final String streamKey = streamTuple.getKey();

if(continuousQueryState.wasStreamKeyContainedInLastQuery(streamKey)) {
if(continuousQueryState.wasStreamKeyContainedInLastRangeQuery(streamKey)) {
logger.debug("Key {} was contained in last execution, sending invalidation tuple", streamKey);
generateInvalidationTuple(streamTuple, continuousQueryState, streamKey);
}
Expand All @@ -140,8 +140,9 @@ protected void handleInvalidationTuple(final Tuple streamTuple) {
final String streamKey = streamTuple.getKey();

// Invalidate range query results
if(continuousQueryState.wasStreamKeyContainedInLastQuery(streamKey)) {
if(continuousQueryState.wasStreamKeyContainedInLastRangeQuery(streamKey)) {
generateInvalidationTuple(streamTuple, continuousQueryState, streamKey);
continuousQueryState.removeStreamKeyFromRangeState(streamKey);
}
}

Expand All @@ -154,7 +155,7 @@ protected void handleInvalidationTuple(final Tuple streamTuple) {
private void generateInvalidationTuple(final Tuple streamTuple,
final ContinuousQueryExecutionState continuousQueryState, final String streamKey) {

continuousQueryState.removeStreamKeyFromState(streamKey);
continuousQueryState.removeStreamKeyFromRangeState(streamKey);

final long versionTimestamp = streamTuple.getVersionTimestamp();
final InvalidationTuple tuple = new InvalidationTuple(streamKey, versionTimestamp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,16 @@ protected void handleJoinMatchFinal(final Tuple streamTuple) {
}

@Override
protected void handleInvalidationTuple(final Tuple streamTuple) {
protected void handleInvalidationTuple(final Tuple streamTuple) {
final ContinuousQueryExecutionState continuousQueryState = continuousClientQuery.getContinuousQueryState();
final String streamKey = streamTuple.getKey();

continuousClientQuery.getContinuousQueryState().clearJoinPartnerState();
generateInvalidationTuplesForStreamKey(streamTuple);

if(continuousQueryState.wasStreamKeyContainedInLastJoinQuery(streamKey)) {
generateInvalidationTuplesForStreamKey(streamTuple);
continuousQueryState.removeStreamKeyFromJoinState(streamKey);
}
}

/**
Expand All @@ -248,7 +255,7 @@ private void generateInvalidationTuplesForStreamKey(final Tuple streamTuple) {
final List<String> tables = Arrays.asList(queryPlan.getStreamTable(), queryPlan.getJoinTable());

// Invalidate join query results
final Set<String> joinPartners = continuousQueryState.clearStateAndGetMissingJoinpartners(streamKey);
final Set<String> joinPartners = continuousQueryState.commitStateAndGetMissingJoinpartners(streamKey);

for(final String joinPartner : joinPartners) {
final long versionTimestamp = streamTuple.getVersionTimestamp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,40 +29,50 @@ public class TestContinuousQueryExecutionState {
public void testStreamState() {
final ContinuousQueryExecutionState state = new ContinuousQueryExecutionState();

Assert.assertFalse(state.wasStreamKeyContainedInLastQuery("abc"));
Assert.assertFalse(state.wasStreamKeyContainedInLastRangeQuery("abc"));
state.addStreamKeyToState("abc");
Assert.assertTrue(state.wasStreamKeyContainedInLastQuery("abc"));
Assert.assertTrue(state.wasStreamKeyContainedInLastRangeQuery("abc"));

Assert.assertTrue(state.removeStreamKeyFromState("abc"));
Assert.assertFalse(state.removeStreamKeyFromState("abc"));
Assert.assertFalse(state.wasStreamKeyContainedInLastQuery("abc"));
Assert.assertTrue(state.removeStreamKeyFromRangeState("abc"));
Assert.assertFalse(state.removeStreamKeyFromRangeState("abc"));
Assert.assertFalse(state.wasStreamKeyContainedInLastRangeQuery("abc"));
}

@Test(timeout = 60_000)
public void testJoinState() {
final ContinuousQueryExecutionState state = new ContinuousQueryExecutionState();

Assert.assertFalse(state.wasStreamKeyContainedInLastJoinQuery("stream"));

state.clearJoinPartnerState();
state.addJoinCandidateForCurrentKey("abc");
state.addJoinCandidateForCurrentKey("def");

final Set<String> missingPartners1 = state.clearStateAndGetMissingJoinpartners("stream");
final Set<String> missingPartners1 = state.commitStateAndGetMissingJoinpartners("stream");
Assert.assertTrue(missingPartners1.isEmpty());

final Set<String> missingPartners2 = state.clearStateAndGetMissingJoinpartners("stream");
Assert.assertTrue(state.wasStreamKeyContainedInLastJoinQuery("stream"));

final Set<String> missingPartners2 = state.commitStateAndGetMissingJoinpartners("stream");
Assert.assertEquals(2, missingPartners2.size());
Assert.assertTrue(missingPartners2.contains("abc"));
Assert.assertTrue(missingPartners2.contains("def"));

state.clearJoinPartnerState();
state.addJoinCandidateForCurrentKey("abc");
state.addJoinCandidateForCurrentKey("def");
final Set<String> missingPartners3 = state.clearStateAndGetMissingJoinpartners("stream");
final Set<String> missingPartners3 = state.commitStateAndGetMissingJoinpartners("stream");
Assert.assertTrue(missingPartners3.isEmpty());

state.addJoinCandidateForCurrentKey("def");
final Set<String> missingPartners4 = state.clearStateAndGetMissingJoinpartners("stream");
final Set<String> missingPartners4 = state.commitStateAndGetMissingJoinpartners("stream");
Assert.assertEquals(1, missingPartners4.size());
Assert.assertTrue(missingPartners4.contains("abc"));

// Clear
state.removeStreamKeyFromJoinState("stream");
Assert.assertFalse(state.wasStreamKeyContainedInLastJoinQuery("stream"));
final Set<String> missingPartners5 = state.commitStateAndGetMissingJoinpartners("stream");
Assert.assertTrue(missingPartners5.isEmpty());

}
}

0 comments on commit 01798ee

Please sign in to comment.