Skip to content

Commit

Permalink
Silently publish the JCache event
Browse files Browse the repository at this point in the history
This slight change is to ensure that the `ignoreSynchronous()` does not
ignore other events already recorded. However, we want to ignore any
events occuring on the background executor. Since these types of
evictions are not user facing and may be asynchronous, they are not
required to be blocking calls to user-facing threads.
  • Loading branch information
ben-manes committed Jul 13, 2015
1 parent 6fce9ac commit ba81c36
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 9 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ compile 'com.github.ben-manes.caffeine:caffeine:1.3.0'
compile 'com.github.ben-manes.caffeine:guava:1.3.0'
compile 'com.github.ben-manes.caffeine:jcache:1.3.0'
compile 'com.github.ben-manes.caffeine:tracing-async:1.3.0'
// Transitive requirement (if jars manually managed)
compile 'com.github.ben-manes.caffeine:tracing-api:1.3.0'
```

Snapshots of the development version are available in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void deregister(CacheEntryListenerConfiguration<K, V> configuration) {
* @param value the entry's value
*/
public void publishCreated(Cache<K, V> cache, K key, V value) {
publish(new JCacheEntryEvent<>(cache, EventType.CREATED, key, null, value));
publish(new JCacheEntryEvent<>(cache, EventType.CREATED, key, null, value), false);
}

/**
Expand All @@ -116,7 +116,7 @@ public void publishCreated(Cache<K, V> cache, K key, V value) {
* @param newValue the entry's new value
*/
public void publishUpdated(Cache<K, V> cache, K key, V oldValue, V newValue) {
publish(new JCacheEntryEvent<>(cache, EventType.UPDATED, key, oldValue, newValue));
publish(new JCacheEntryEvent<>(cache, EventType.UPDATED, key, oldValue, newValue), false);
}

/**
Expand All @@ -127,7 +127,19 @@ public void publishUpdated(Cache<K, V> cache, K key, V oldValue, V newValue) {
* @param value the entry's value
*/
public void publishRemoved(Cache<K, V> cache, K key, V value) {
publish(new JCacheEntryEvent<>(cache, EventType.REMOVED, key, null, value));
publish(new JCacheEntryEvent<>(cache, EventType.REMOVED, key, null, value), false);
}

/**
* Publishes a remove event for the entry to all of the interested listeners. This method does
* not register the synchronous listener's future with {@link #awaitSynchronous()}.
*
* @param cache the cache where the entry was removed
* @param key the entry's key
* @param value the entry's value
*/
public void publishRemovedQuietly(Cache<K, V> cache, K key, V value) {
publish(new JCacheEntryEvent<>(cache, EventType.REMOVED, key, null, value), true);
}

/**
Expand All @@ -138,7 +150,19 @@ public void publishRemoved(Cache<K, V> cache, K key, V value) {
* @param value the entry's value
*/
public void publishExpired(Cache<K, V> cache, K key, V value) {
publish(new JCacheEntryEvent<>(cache, EventType.EXPIRED, key, value, null));
publish(new JCacheEntryEvent<>(cache, EventType.EXPIRED, key, value, null), false);
}

/**
* Publishes a expire event for the entry to all of the interested listeners. This method does
* not register the synchronous listener's future with {@link #awaitSynchronous()}.
*
* @param cache the cache where the entry expired
* @param key the entry's key
* @param value the entry's value
*/
public void publishExpiredQuietly(Cache<K, V> cache, K key, V value) {
publish(new JCacheEntryEvent<>(cache, EventType.EXPIRED, key, value, null), true);
}

/**
Expand Down Expand Up @@ -167,7 +191,7 @@ public void ignoreSynchronous() {
}

/** Broadcasts the event to all of the interested listener's dispatch queues. */
private void publish(JCacheEntryEvent<K, V> event) {
private void publish(JCacheEntryEvent<K, V> event, boolean quiet) {
dispatchQueues.keySet().stream()
.filter(registration -> registration.getCacheEntryFilter().evaluate(event))
.filter(registration -> registration.getCacheEntryListener().isCompatible(event))
Expand All @@ -177,7 +201,7 @@ private void publish(JCacheEntryEvent<K, V> event) {
Runnable action = () -> registration.getCacheEntryListener().dispatch(event);
return queue.thenRunAsync(action, exectuor);
});
if ((future != null) && registration.isSynchronous()) {
if ((future != null) && registration.isSynchronous() && !quiet) {
return future;
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,10 @@ public void write(K key, Expirable<V> value) {}
public void delete(K key, Expirable<V> value, RemovalCause cause) {
if (cause.wasEvicted()) {
if (cause == RemovalCause.EXPIRED) {
dispatcher.publishExpired(cache, key, value.get());
dispatcher.publishExpiredQuietly(cache, key, value.get());
} else {
dispatcher.publishRemoved(cache, key, value.get());
dispatcher.publishRemovedQuietly(cache, key, value.get());
}
dispatcher.ignoreSynchronous();
statistics.recordEvictions(1L);
}
}
Expand Down

0 comments on commit ba81c36

Please sign in to comment.