Skip to content

Commit

Permalink
feat(proxy): health check support remove not active upstreams (#327)
Browse files Browse the repository at this point in the history
  • Loading branch information
caojiajun committed Oct 8, 2024
1 parent b48e6c2 commit 5b32abf
Showing 1 changed file with 31 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@
import com.netease.nim.camellia.redis.proxy.upstream.IUpstreamClient;
import com.netease.nim.camellia.redis.proxy.upstream.UpstreamRedisClientFactory;
import com.netease.nim.camellia.redis.proxy.util.ExecutorUtils;
import com.netease.nim.camellia.redis.proxy.util.TimeCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

Expand All @@ -22,6 +25,7 @@ public class ScheduledResourceChecker implements ResourceSelector.ResourceChecke

private final ConcurrentHashMap<String, IUpstreamClient> clientCache = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Boolean> validCache = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Long> lastCheckValidTime = new ConcurrentHashMap<>();

private final UpstreamRedisClientFactory factory;

Expand All @@ -33,13 +37,30 @@ public ScheduledResourceChecker(UpstreamRedisClientFactory factory) {
}

private void schedule() {
for (Map.Entry<String, IUpstreamClient> entry : clientCache.entrySet()) {
String url = entry.getKey();
boolean valid = entry.getValue().isValid();
if (!valid) {
logger.warn("IUpstreamClient with resource = {} not valid", url);
try {
long notActiveThresholdMs = ProxyDynamicConf.getInt("check.redis.resource.valid.not.active.threshold.seconds", 300) * 1000L;
Set<String> notActiveClients = new HashSet<>();
for (Map.Entry<String, IUpstreamClient> entry : clientCache.entrySet()) {
String url = entry.getKey();
boolean valid = entry.getValue().isValid();
if (!valid) {
logger.warn("IUpstreamClient with resource = {} not valid", url);
}
validCache.put(url, valid);
Long lastCheckTime = lastCheckValidTime.get(url);
if (lastCheckTime != null && TimeCache.currentMillis - lastCheckTime > notActiveThresholdMs) {
notActiveClients.add(url);
}
}
validCache.put(url, valid);
if (!notActiveClients.isEmpty()) {
for (String url : notActiveClients) {
clientCache.remove(url);
validCache.remove(url);
lastCheckValidTime.remove(url);
}
}
} catch (Exception e) {
logger.error("ScheduledResourceChecker error", e);
}
}

Expand All @@ -51,11 +72,14 @@ public void addResource(Resource resource) {
boolean valid = client.isValid();
logger.info("addResource to ScheduledResourceChecker, resource = {}, valid = {}", url, valid);
validCache.put(url, valid);
lastCheckValidTime.put(url, TimeCache.currentMillis);
}

@Override
public boolean checkValid(Resource resource) {
Boolean valid = validCache.get(resource.getUrl());
String url = resource.getUrl();
Boolean valid = validCache.get(url);
lastCheckValidTime.put(url, TimeCache.currentMillis);
return valid == null || valid;
}
}

0 comments on commit 5b32abf

Please sign in to comment.