Skip to content

Commit

Permalink
feat(kv): zset version1 encode support async write (#348)
Browse files Browse the repository at this point in the history
  • Loading branch information
caojiajun committed Nov 8, 2024
1 parent 6aea084 commit f685ffc
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,8 @@ protected final void submitAsyncWriteTask(int slot, Result result, Runnable runn
throw e;
}
}

protected final void submitAsyncWriteTask(int slot, Runnable runnable) {
asyncWriteExecutor.submit(slot, runnable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,11 @@ private Reply zaddVersion1(int slot, KeyMeta keyMeta, byte[] key, byte[] cacheKe
if (result.isKvWriteDelayEnable()) {
submitAsyncWriteTask(slot, result, () -> kvClient.batchPut(slot, list));
} else {
kvClient.batchPut(slot, list);
if (cacheConfig.isZSetVersion1KvWriteAsyncEnable()) {
submitAsyncWriteTask(slot, () -> kvClient.batchPut(slot, list));
} else {
kvClient.batchPut(slot, list);
}
}
List<CompletableFuture<Reply>> futureList = null;
if (!memberIndexCacheWriteCommands.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,11 @@ protected final Reply zremVersion1(int slot, KeyMeta keyMeta, byte[] key, byte[]
});
} else {
if (!deleteSubKeys.isEmpty()) {
kvClient.batchDelete(slot, deleteSubKeys.toArray(new byte[0][0]));
if (cacheConfig.isZSetVersion1KvWriteAsyncEnable()) {
submitAsyncWriteTask(slot, () -> kvClient.batchDelete(slot, deleteSubKeys.toArray(new byte[0][0])));
} else {
kvClient.batchDelete(slot, deleteSubKeys.toArray(new byte[0][0]));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class CacheConfig {
private final SetLRUCache setLRUCache;
private final ZSetIndexLRUCache zSetIndexLRUCache;

private boolean zsetVersion1KvWriteAsyncEnable;

public CacheConfig(String namespace) {
this.namespace = namespace;
initCacheConfig();
Expand Down Expand Up @@ -117,6 +119,11 @@ private void initCacheConfig() {
}
logger.info("kv.set.local.cache.enable = {}, namespace = {}", setLocalCacheEnable, namespace);
}
boolean zsetVersion1KvWriteAsyncEnable = RedisKvConf.getBoolean(namespace, "kv.zset.version1.kv.async.write.enable", false);
if ((this.zsetVersion1KvWriteAsyncEnable && !zsetVersion1KvWriteAsyncEnable) || (!this.zsetVersion1KvWriteAsyncEnable && zsetVersion1KvWriteAsyncEnable)) {
this.zsetVersion1KvWriteAsyncEnable = zsetVersion1KvWriteAsyncEnable;
logger.info("kv.zset.version1.kv.async.write.enable = {}, namespace = {}",zsetVersion1KvWriteAsyncEnable, namespace);
}
}

public String getNamespace() {
Expand All @@ -139,6 +146,10 @@ public boolean isSetLocalCacheEnable() {
return setLocalCacheEnable;
}

public boolean isZSetVersion1KvWriteAsyncEnable() {
return zsetVersion1KvWriteAsyncEnable;
}

public KeyMetaLRUCache getKeyMetaLRUCache() {
return keyMetaLRUCache;
}
Expand Down

0 comments on commit f685ffc

Please sign in to comment.