Skip to content

Commit

Permalink
feat(kv): hot-key-calculator support set different config for read/wr…
Browse files Browse the repository at this point in the history
…ite command (#332)
  • Loading branch information
caojiajun committed Oct 12, 2024
1 parent 9e90951 commit c1ee941
Show file tree
Hide file tree
Showing 38 changed files with 67 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.netease.nim.camellia.redis.proxy.cluster.ClusterModeStatus;
import com.netease.nim.camellia.redis.proxy.cluster.ProxyClusterSlotMapUtils;
import com.netease.nim.camellia.redis.proxy.conf.ProxyDynamicConf;
import com.netease.nim.camellia.redis.proxy.enums.RedisCommand;
import com.netease.nim.camellia.redis.proxy.upstream.kv.conf.RedisKvConf;
import com.netease.nim.camellia.redis.proxy.upstream.kv.meta.KeyType;
import com.netease.nim.camellia.tools.utils.BytesKey;
Expand Down Expand Up @@ -62,8 +63,8 @@ private void rebuild() {
this.capacity = capacity;
}

public boolean isHotKey(byte[] key) {
return hotKeyCalculator.isHotKey(key);
public boolean isHotKey(byte[] key, RedisCommand redisCommand) {
return hotKeyCalculator.isHotKey(key, redisCommand);
}

public void putAllForRead(int slot, byte[] cacheKey, RedisHash hash) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import com.netease.nim.camellia.redis.proxy.conf.ProxyDynamicConf;
import com.netease.nim.camellia.redis.proxy.enums.RedisCommand;
import com.netease.nim.camellia.redis.proxy.upstream.kv.conf.RedisKvConf;
import com.netease.nim.camellia.redis.proxy.upstream.kv.meta.KeyType;
import com.netease.nim.camellia.redis.proxy.util.TimeCache;
Expand All @@ -24,6 +25,8 @@ public class HotKeyCalculator {
private final KeyType keyType;
private ConcurrentLinkedHashMap<BytesKey, Node> cache;
private int threshold;
private int readThreshold;
private int writeThreshold;
private int capacity;
private long window;

Expand All @@ -50,25 +53,39 @@ private void rebuild() {
}

int defaultThreshold = RedisKvConf.getInt(namespace, "kv.hot.key.threshold", 2);
int defaultReadThreshold = RedisKvConf.getInt(namespace, "kv.hot.key.read.threshold", -1);
int defaultWriteThreshold = RedisKvConf.getInt(namespace, "kv.hot.key.write.threshold", -1);
int defaultWindow = RedisKvConf.getInt(namespace, "kv.hot.key.time.window", 5000);

int threshold = RedisKvConf.getInt(namespace, keyType + ".kv.hot.key.threshold", defaultThreshold);
int readThreshold = RedisKvConf.getInt(namespace, keyType + ".kv.hot.key.read.threshold", defaultReadThreshold);
int writeThreshold = RedisKvConf.getInt(namespace, keyType + ".kv.hot.key.write.threshold", defaultWriteThreshold);
int window = RedisKvConf.getInt(namespace, keyType + ".kv.hot.key.time.window", defaultWindow);
if (this.threshold != threshold) {
logger.info("kv hot key calculator cache build, namespace = {}, keyType = {}, threshold = {}", namespace, keyType, threshold);

if (this.threshold != threshold || this.readThreshold != readThreshold || this.writeThreshold != writeThreshold) {
logger.info("kv hot key calculator cache config update, namespace = {}, keyType = {}, threshold = {}, read.threshold = {}, write.threshold = {}",
namespace, keyType, threshold, readThreshold, writeThreshold);
this.threshold = threshold;
if (this.threshold < 0) {
this.readThreshold = readThreshold;
this.writeThreshold = writeThreshold;
if (this.threshold < 0 && this.writeThreshold < 0 && this.readThreshold < 0) {
cache.clear();
}
}
if (this.window != window) {
logger.info("kv hot key calculator cache build, namespace = {}, keyType = {}, window = {}", namespace, keyType, window);
logger.info("kv hot key calculator cache config update, namespace = {}, keyType = {}, window = {}", namespace, keyType, window);
this.window = window;
}
}

public boolean isHotKey(byte[] key) {
if (threshold < 0) {
public boolean isHotKey(byte[] key, RedisCommand redisCommand) {
int targetThreshold;
if (redisCommand.getType() == RedisCommand.Type.READ) {
targetThreshold = readThreshold < 0 ? threshold : readThreshold;
} else {
targetThreshold = writeThreshold < 0 ? threshold : writeThreshold;
}
if (targetThreshold < 0) {
return false;
}
BytesKey bytesKey = new BytesKey(key);
Expand All @@ -78,7 +95,7 @@ public boolean isHotKey(byte[] key) {
node.count.set(last / 2);
node.time = TimeCache.currentMillis;
}
return node.count.incrementAndGet() > threshold;
return node.count.incrementAndGet() > targetThreshold;
}

public long estimateSize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.netease.nim.camellia.redis.proxy.cluster.ClusterModeStatus;
import com.netease.nim.camellia.redis.proxy.cluster.ProxyClusterSlotMapUtils;
import com.netease.nim.camellia.redis.proxy.conf.ProxyDynamicConf;
import com.netease.nim.camellia.redis.proxy.enums.RedisCommand;
import com.netease.nim.camellia.redis.proxy.upstream.kv.conf.RedisKvConf;
import com.netease.nim.camellia.redis.proxy.upstream.kv.meta.KeyType;
import com.netease.nim.camellia.tools.utils.BytesKey;
Expand Down Expand Up @@ -62,8 +63,8 @@ private void rebuild() {
this.capacity = capacity;
}

public boolean isHotKey(byte[] key) {
return hotKeyCalculator.isHotKey(key);
public boolean isHotKey(byte[] key, RedisCommand redisCommand) {
return hotKeyCalculator.isHotKey(key, redisCommand);
}

public void putAllForRead(int slot, byte[] cacheKey, RedisSet set) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.netease.nim.camellia.redis.proxy.cluster.ClusterModeStatus;
import com.netease.nim.camellia.redis.proxy.cluster.ProxyClusterSlotMapUtils;
import com.netease.nim.camellia.redis.proxy.conf.ProxyDynamicConf;
import com.netease.nim.camellia.redis.proxy.enums.RedisCommand;
import com.netease.nim.camellia.redis.proxy.upstream.kv.command.zset.utils.ZSetLex;
import com.netease.nim.camellia.redis.proxy.upstream.kv.command.zset.utils.ZSetScore;
import com.netease.nim.camellia.redis.proxy.upstream.kv.conf.RedisKvConf;
Expand Down Expand Up @@ -63,8 +64,8 @@ private void rebuild() {
this.capacity = capacity;
}

public boolean isHotKey(byte[] key) {
return hotKeyCalculator.isHotKey(key);
public boolean isHotKey(byte[] key, RedisCommand redisCommand) {
return hotKeyCalculator.isHotKey(key, redisCommand);
}

public void putZSetForWrite(int slot, byte[] cacheKey, RedisZSet zSet) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ protected Reply execute(int slot, Command command) {
return IntegerReply.REPLY_0;
}
if (deleteMaps == null) {
boolean hotKey = hashLRUCache.isHotKey(key);
boolean hotKey = hashLRUCache.isHotKey(key, redisCommand());
if (hotKey) {
//
type = KvCacheMonitor.Type.kv_store;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ protected Reply execute(int slot, Command command) {
return IntegerReply.parse(hash.hexists(new BytesKey(field)));
}

boolean hotKey = hashLRUCache.isHotKey(key);
boolean hotKey = hashLRUCache.isHotKey(key, redisCommand());

if (hotKey) {
hash = loadLRUCache(slot, keyMeta, key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ protected Reply execute(int slot, Command command) {
return new BulkReply(hash.hget(new BytesKey(field)));
}

boolean hotKey = hashLRUCache.isHotKey(key);
boolean hotKey = hashLRUCache.isHotKey(key, redisCommand());
if (hotKey) {
hash = loadLRUCache(slot, keyMeta, key);
hashLRUCache.putAllForRead(slot, cacheKey, hash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ protected Reply execute(int slot, Command command) {
KvCacheMonitor.localCache(cacheConfig.getNamespace(), redisCommand().strRaw());
return toReply(hash.hgetAll());
}
boolean hotKey = hashLRUCache.isHotKey(key);
boolean hotKey = hashLRUCache.isHotKey(key, redisCommand());
if (hotKey) {
hash = loadLRUCache(slot, keyMeta, key);
hashLRUCache.putAllForRead(slot, cacheKey, hash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ private Reply getSizeFromCache(int slot, KeyMeta keyMeta, byte[] key, byte[] cac
if (checkHotKey) {
EncodeVersion encodeVersion = keyMeta.getEncodeVersion();
if (encodeVersion == EncodeVersion.version_1) {
boolean hotKey = hashLRUCache.isHotKey(key);
boolean hotKey = hashLRUCache.isHotKey(key, redisCommand());
if (hotKey) {
hash = loadLRUCache(slot, keyMeta, key);
hashLRUCache.putAllForRead(slot, cacheKey, hash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ protected Reply execute(int slot, Command command) {
return toReply2(fields, hash.hgetAll());
}

boolean hotKey = hashLRUCache.isHotKey(key);
boolean hotKey = hashLRUCache.isHotKey(key, redisCommand());
if (hotKey) {
hash = loadLRUCache(slot, keyMeta, key);
hashLRUCache.putAllForRead(slot, cacheKey, hash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ protected Reply execute(int slot, Command command) {
} else {
existMapByCache = hashLRUCache.hset(slot, cacheKey, fieldMap);
if (existMapByCache == null) {
boolean hotKey = hashLRUCache.isHotKey(key);
boolean hotKey = hashLRUCache.isHotKey(key, redisCommand());
if (hotKey) {
//
type = KvCacheMonitor.Type.kv_store;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ protected Reply execute(int slot, Command command) {

RedisHash hash = hashLRUCache.getForWrite(slot, cacheKey);
if (hash == null) {
boolean hotKey = hashLRUCache.isHotKey(key);
boolean hotKey = hashLRUCache.isHotKey(key, redisCommand());
if (hotKey) {
//
type = KvCacheMonitor.Type.kv_store;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ protected Reply execute(int slot, Command command) {
KvCacheMonitor.localCache(cacheConfig.getNamespace(), redisCommand().strRaw());
return IntegerReply.parse(hash.hstrlen(new BytesKey(field)));
}
boolean hotKey = hashLRUCache.isHotKey(key);
boolean hotKey = hashLRUCache.isHotKey(key, redisCommand());
if (hotKey) {
hash = loadLRUCache(slot, keyMeta, key);
hashLRUCache.putAllForRead(slot, cacheKey, hash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ protected Reply execute(int slot, Command command) {
KvCacheMonitor.localCache(cacheConfig.getNamespace(), redisCommand().strRaw());
return toReply(hash.hgetAll());
}
boolean hotKey = hashLRUCache.isHotKey(key);
boolean hotKey = hashLRUCache.isHotKey(key, redisCommand());
if (hotKey) {
hash = loadLRUCache(slot, keyMeta, key);
hashLRUCache.putAllForRead(slot, cacheKey, hash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ protected Reply execute(int slot, Command command) {
} else {
Set<BytesKey> existsSet = setLRUCache.sadd(slot, cacheKey, memberSet);
if (existsSet == null) {
boolean hotKey = setLRUCache.isHotKey(key);
boolean hotKey = setLRUCache.isHotKey(key, redisCommand());
if (hotKey) {
//
type = KvCacheMonitor.Type.kv_store;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ protected Reply execute(int slot, Command command) {
}

if (encodeVersion == EncodeVersion.version_1) {
boolean hotKey = setLRUCache.isHotKey(key);
boolean hotKey = setLRUCache.isHotKey(key, redisCommand());
if (hotKey) {
set = loadLRUCache(slot, keyMeta, key);
setLRUCache.putAllForRead(slot, cacheKey, set);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ protected Reply execute(int slot, Command command) {
return IntegerReply.parse(sismeber ? 1 : 0);
}

boolean hotKey = setLRUCache.isHotKey(key);
boolean hotKey = setLRUCache.isHotKey(key, redisCommand());

if (hotKey) {
set = loadLRUCache(slot, keyMeta, key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ protected Reply execute(int slot, Command command) {
return toReply(smismember, members);
}

boolean hotKey = setLRUCache.isHotKey(key);
boolean hotKey = setLRUCache.isHotKey(key, redisCommand());

if (hotKey) {
set = loadLRUCache(slot, keyMeta, key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ protected Reply execute(int slot, Command command) {
}

if (spop == null) {
boolean hotKey = setLRUCache.isHotKey(key);
boolean hotKey = setLRUCache.isHotKey(key, redisCommand());
if (hotKey) {
//
type = KvCacheMonitor.Type.kv_store;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ protected Reply execute(int slot, Command command) {
return toReply(srandmember, batch);
}

boolean hotKey = setLRUCache.isHotKey(key);
boolean hotKey = setLRUCache.isHotKey(key, redisCommand());

if (hotKey) {
set = loadLRUCache(slot, keyMeta, key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ protected Reply execute(int slot, Command command) {
}

if (removedMembers == null) {
boolean hotKey = setLRUCache.isHotKey(key);
boolean hotKey = setLRUCache.isHotKey(key, redisCommand());
if (hotKey) {
//
type = KvCacheMonitor.Type.kv_store;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ protected Reply execute(int slot, Command command) {
} else {
Map<BytesKey, Double> map = zSetLRUCache.zadd(slot, cacheKey, memberMap);
if (map == null) {
boolean hotKey = zSetLRUCache.isHotKey(key);
boolean hotKey = zSetLRUCache.isHotKey(key, redisCommand());
if (hotKey) {
zSet = loadLRUCache(slot, keyMeta, key);
if (zSet != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ protected Reply execute(int slot, Command command) {
return IntegerReply.parse(zcount);
}

boolean hotKey = zSetLRUCache.isHotKey(key);
boolean hotKey = zSetLRUCache.isHotKey(key, redisCommand());

if (hotKey) {
zSet = loadLRUCache(slot, keyMeta, key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ protected Reply execute(int slot, Command command) {
return IntegerReply.parse(zcount);
}

boolean hotKey = zSetLRUCache.isHotKey(key);
boolean hotKey = zSetLRUCache.isHotKey(key, redisCommand());

if (hotKey) {
zSet = loadLRUCache(slot, keyMeta, key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ protected Reply execute(int slot, Command command) {
return toReply(zSet.zmscore(members));
}

boolean hotKey = zSetLRUCache.isHotKey(key);
boolean hotKey = zSetLRUCache.isHotKey(key, redisCommand());

if (hotKey) {
zSet = loadLRUCache(slot, keyMeta, key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ protected Reply execute(int slot, Command command) {
return ZSetTupleUtils.toReply(list, false);
}

boolean hotKey = zSetLRUCache.isHotKey(key);
boolean hotKey = zSetLRUCache.isHotKey(key, redisCommand());

if (hotKey) {
zSet = loadLRUCache(slot, keyMeta, key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ protected Reply execute(int slot, Command command) {
return ZSetTupleUtils.toReply(list, withScores);
}

boolean hotKey = zSetLRUCache.isHotKey(key);
boolean hotKey = zSetLRUCache.isHotKey(key, redisCommand());

if (hotKey) {
zSet = loadLRUCache(slot, keyMeta, key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ protected Reply execute(int slot, Command command) {
return ZSetTupleUtils.toReply(list, withScores);
}

boolean hotKey = zSetLRUCache.isHotKey(key);
boolean hotKey = zSetLRUCache.isHotKey(key, redisCommand());

if (hotKey) {
zSet = loadLRUCache(slot, keyMeta, key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ protected Reply execute(int slot, Command command) {
return toReply(zrank, withScores);
}

boolean hotKey = zSetLRUCache.isHotKey(key);
boolean hotKey = zSetLRUCache.isHotKey(key, redisCommand());

if (hotKey) {
zSet = loadLRUCache(slot, keyMeta, key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ protected Reply execute(int slot, Command command) {
}

if (removedMembers == null) {
boolean hotKey = zSetLRUCache.isHotKey(key);
boolean hotKey = zSetLRUCache.isHotKey(key, redisCommand());
if (hotKey) {
RedisZSet zSet = loadLRUCache(slot, keyMeta, key);
if (zSet != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ protected Reply execute(int slot, Command command) {
}

if (removedMembers == null) {
boolean hotKey = zSetLRUCache.isHotKey(key);
boolean hotKey = zSetLRUCache.isHotKey(key, redisCommand());
if (hotKey) {
RedisZSet zSet = loadLRUCache(slot, keyMeta, key);
if (zSet != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ protected Reply execute(int slot, Command command) {
}

if (removedMap == null) {
boolean hotKey = zSetLRUCache.isHotKey(key);
boolean hotKey = zSetLRUCache.isHotKey(key, redisCommand());
if (hotKey) {
RedisZSet zSet = loadLRUCache(slot, keyMeta, key);
if (zSet != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ protected Reply execute(int slot, Command command) {
}

if (removedMap == null) {
boolean hotKey = zSetLRUCache.isHotKey(key);
boolean hotKey = zSetLRUCache.isHotKey(key, redisCommand());
if (hotKey) {
RedisZSet zSet = loadLRUCache(slot, keyMeta, key);
if (zSet != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ protected Reply execute(int slot, Command command) {
return ZSetTupleUtils.toReply(list, false);
}

boolean hotKey = zSetLRUCache.isHotKey(key);
boolean hotKey = zSetLRUCache.isHotKey(key, redisCommand());

if (hotKey) {
zSet = loadLRUCache(slot, keyMeta, key);
Expand Down
Loading

0 comments on commit c1ee941

Please sign in to comment.