Skip to content

Commit

Permalink
Don't refresh automatically when waiting for a slow reload (fixed #175)
Browse files Browse the repository at this point in the history
If the load time exceeded the refreshAfterWrite time then a new refresh
is triggered and the previous was ignored. This was due to how the write
timestamp is used, which was set to the current time to delay expiration
and refresh. When the refresh completes it CAS's to the new value only if
no other writes occurred (so as to not stomp explicit writes and reload a
stale read). The slow reload meant that only the last refresh would be
honored, which may not occur for a long time on a busy system.

This also removes a sync vs async difference that I never remembered why
it was in place. Async didn't have this problem due to the timestamp being
pushed out into the distant future. Since I didn't recall why these were
special cased I had kept them as is, hoping the previous me was smarter
than the current. Seems both were pretty dumb.
  • Loading branch information
ben-manes committed Aug 4, 2017
1 parent 6ccd359 commit 26a9838
Show file tree
Hide file tree
Showing 7 changed files with 272 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,7 @@ boolean hasExpired(Node<K, V> node, long now) {
* @param node the entry to evict
* @param cause the reason to evict
* @param now the current time, used only if expiring
* @return if the entry was evicted
*/
@GuardedBy("evictionLock")
@SuppressWarnings({"PMD.CollapsibleIfStatements", "GuardedByChecker"})
Expand Down Expand Up @@ -864,7 +865,7 @@ void refreshIfNeeded(Node<K, V> node, long now) {
K key;
V oldValue;
long oldWriteTime = node.getWriteTime();
long refreshWriteTime = isAsync ? (now + Async.MAXIMUM_EXPIRY) : now;
long refreshWriteTime = (now + Async.MAXIMUM_EXPIRY);
if (((now - oldWriteTime) > refreshAfterWriteNanos())
&& ((key = node.getKey()) != null) && ((oldValue = node.getValue()) != null)
&& node.casWriteTime(oldWriteTime, refreshWriteTime)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;
import java.util.ConcurrentModificationException;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -411,7 +413,9 @@ <K1 extends K, V1 extends V> Weigher<K1, V1> getWeigher(boolean isAsync) {
* {@link WeakReference} (by default, strong references are used).
* <p>
* <b>Warning:</b> when this method is used, the resulting cache will use identity ({@code ==})
* comparison to determine equality of keys.
* comparison to determine equality of keys. Its {@link Cache#asMap} view will therefore
* technically violate the {@link Map} specification (in the same way that {@link IdentityHashMap}
* does).
* <p>
* Entries with keys that have been garbage collected may be counted in
* {@link Cache#estimatedSize()}, but will never be visible to read or write operations; such
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;

Expand All @@ -36,8 +38,10 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import org.testng.annotations.Listeners;
Expand All @@ -60,6 +64,7 @@
import com.github.benmanes.caffeine.cache.testing.CheckNoWriter;
import com.github.benmanes.caffeine.cache.testing.RefreshAfterWrite;
import com.github.benmanes.caffeine.cache.testing.RemovalNotification;
import com.github.benmanes.caffeine.cache.testing.TrackingExecutor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

Expand Down Expand Up @@ -206,6 +211,45 @@ public void get_sameFuture(CacheContext context) {
await().until(() -> cache.synchronous().getIfPresent(key), is(-key));
}

@Test(dataProvider = "caches")
@CacheSpec(implementation = Implementation.Caffeine, population = Population.EMPTY,
refreshAfterWrite = Expire.ONE_MINUTE, executor = CacheExecutor.THREADED)
public void get_slowRefresh(CacheContext context) {
Integer key = context.absentKey();
Integer originalValue = context.absentValue();
AtomicBoolean reloaded = new AtomicBoolean();
AtomicInteger reloading = new AtomicInteger();
ThreadPoolExecutor executor = (ThreadPoolExecutor)
((TrackingExecutor) context.executor()).delegate();
LoadingCache<Integer, Integer> cache = context.build(new CacheLoader<Integer, Integer>() {
@Override public Integer load(Integer key) {
throw new AssertionError();
}
@Override public Integer reload(Integer key, Integer oldValue) {
int count = reloading.incrementAndGet();
await().untilTrue(reloaded);
return count;
}
});

cache.put(key, originalValue);

context.ticker().advance(2, TimeUnit.MINUTES);
assertThat(cache.get(key), is(originalValue));

await().untilAtomic(reloading, is(1));
assertThat(cache.getIfPresent(key), is(originalValue));

context.ticker().advance(2, TimeUnit.MINUTES);
assertThat(cache.get(key), is(originalValue));

reloaded.set(true);
await().until(() -> cache.get(key), is(not(originalValue)));
await().until(executor::getQueue, is(empty()));
assertThat(reloading.get(), is(1));
assertThat(cache.get(key), is(1));
}

@Test(dataProvider = "caches")
@CacheSpec(refreshAfterWrite = Expire.ONE_MINUTE, loader = Loader.NULL)
public void get_null(AsyncLoadingCache<Integer, Integer> cache, CacheContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,10 @@
*/
public final class TrackingExecutor extends ForwardingExecutorService {
private final ExecutorService delegate;
private final AtomicInteger totalTasks;
private final AtomicInteger failures;

public TrackingExecutor(ExecutorService executor) {
delegate = requireNonNull(executor);
totalTasks = new AtomicInteger();
failures = new AtomicInteger();
}

Expand All @@ -55,16 +53,12 @@ public void execute(Runnable command) {
}
}

public int totalTasksCount() {
return totalTasks.get();
}

public int failureCount() {
return failures.get();
}

@Override
protected ExecutorService delegate() {
public ExecutorService delegate() {
return delegate;
}
}
5 changes: 5 additions & 0 deletions config/findbugs/exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
<Method name="accept"/>
<Bug code="NP"/>
</Match>
<Match>
<Class name="com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache$AsyncBulkCompleter"/>
<Method name="lambda$fillProxies$0"/>
<Bug code="RCN"/>
</Match>
<Match>
<Class name="com.github.benmanes.caffeine.cache.BoundedLocalCache"/>
<Method name="performCleanUp"/>
Expand Down
18 changes: 9 additions & 9 deletions gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ ext {
jsr305: '3.0.2',
jsr330: '1',
stream: '2.9.5',
univocityParsers: '2.4.1',
univocityParsers: '2.5.1',
ycsb: '0.12.0',
xz: '1.6',
]
Expand All @@ -51,7 +51,7 @@ ext {
junit: '4.12',
mockito: '2.8.47',
paxExam: '4.11.0',
testng: '6.11',
testng: '6.12',
truth: '0.24',
]
benchmarkVersions = [
Expand All @@ -62,33 +62,33 @@ ext {
ehcache3: '3.3.1',
elasticSearch: '5.4.0',
expiringMap: '0.5.8',
jackrabbit: '1.7.3',
jamm: '0.3.1',
jackrabbit: '1.7.5',
jamm: '0.3.2',
javaObjectLayout: '0.8',
jmh: 1.19,
koloboke: '0.6.8',
ohc: '0.6.1',
rapidoid: '5.3.5',
rapidoid: '5.4.0',
slf4j: '1.7.25',
tcache: '1.0.3',
]
pluginVersions = [
buildscan: '1.8',
buildscanRecipes: '0.2.0',
checkstyle: '8.0',
checkstyle: '8.1',
coveralls: '2.8.1',
coverity: '1.0.10',
errorProne: '0.0.10',
jacoco: '0.7.9',
jmh: '0.4.2',
jmhReport: '0.4.1',
jmhReport: '0.5.0',
nexus: '2.3.1',
pmd: '5.8.1',
propdeps: '0.0.10.RELEASE',
semanticVersioning: '1.1.0',
shadow: '2.0.1',
sonarqube: '2.5',
spotbugs: '1.0',
spotbugs: '1.2',
stats: '0.2.0',
versions: '0.15.0',
]
Expand Down Expand Up @@ -130,7 +130,7 @@ ext {
exclude group: 'org.hamcrest'
},
osgiCompile: [
'org.apache.felix:org.apache.felix.framework:5.6.4',
'org.apache.felix:org.apache.felix.framework:5.6.6',
"org.ops4j.pax.exam:pax-exam-junit4:${testVersions.paxExam}",
],
osgiRuntime: [
Expand Down
Loading

0 comments on commit 26a9838

Please sign in to comment.