diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/cluster/provider/ConsensusProxyClusterModeProvider.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/cluster/provider/ConsensusProxyClusterModeProvider.java index cfe20c694..3a15fd9c9 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/cluster/provider/ConsensusProxyClusterModeProvider.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/cluster/provider/ConsensusProxyClusterModeProvider.java @@ -35,6 +35,7 @@ public class ConsensusProxyClusterModeProvider extends AbstractProxyClusterModeP private static final Logger logger = LoggerFactory.getLogger(ConsensusProxyClusterModeProvider.class); private final ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock initLock = new ReentrantLock(); private ConsensusLeaderSelector leaderSelector; private volatile ProxyNode leader; private volatile ProxyClusterSlotMap slotMap; @@ -65,16 +66,46 @@ public void init() { } //add leader change listener addLeaderChangeListener(); - //init + //init leader or follower + initLeaderOrFollower(); + //heartbeat to follower if current node is leader + startHeartbeatToSlave(); + //heartbeat to leader if current node is follower + startHeartbeatToMaster(); + } + + private void addLeaderChangeListener() { + //add leader change listener + leaderSelector.addConsensusLeaderChangeListener(() -> { + initLock.lock(); + try { + leader = leaderSelector.getLeader(); + initLeaderOrFollower(); + } finally { + initLock.unlock(); + } + }); + //schedule for reveal all the details + schedule.scheduleAtFixedRate(() -> { + initLock.lock(); + try { + ProxyNode newLeader = leaderSelector.getLeader(); + if (newLeader != null && leader != newLeader) { + leader = newLeader; + initLeaderOrFollower(); + } + } finally { + initLock.unlock(); + } + }, 10, 10, TimeUnit.SECONDS); + } + + private void initLeaderOrFollower() { if (currentNodeLeader()) { initLeader(); } else { initFollower(); } - //heartbeat to follower if current node is leader - startHeartbeatToSlave(); - //heartbeat to leader if current node is follower - startHeartbeatToMaster(); } private void initLeader() { @@ -122,24 +153,6 @@ private void initFollower() { updateSlotMap(this.slotMap, newSlotMap, "initFollower", false); } - private void sleep(long ms) { - try { - TimeUnit.MILLISECONDS.sleep(ms); - } catch (InterruptedException e) { - logger.error(e.toString(), e); - } - } - - private void addLeaderChangeListener() { - leaderSelector.addConsensusLeaderChangeListener(() -> { - leader = leaderSelector.getLeader(); - if (currentNodeLeader()) { - initLeader(); - } else { - initFollower(); - } - }); - } private void startHeartbeatToSlave() { int intervalSeconds = ClusterModeConfig.clusterModeHeartbeatIntervalSeconds(); @@ -154,7 +167,7 @@ private void startHeartbeatToMaster() { private ProxyClusterSlotMap getSlotMapFromLeader() { Reply reply; try { - if (leader.equals(current())) { + if (currentNodeLeader()) { return ProxyClusterSlotMapUtils.localSlotMap(current(), leaderSelector.getSlotMap()); } reply = sendCmd(leader, ClusterModeCmd.send_get_slot_map_from_leader, "{}"); @@ -220,7 +233,7 @@ public Reply proxyHeartbeat(Command command) { return new ErrorReply("ERR target not follower"); } if (cmd == ClusterModeCmd.send_heartbeat_to_follower) { - return followerReceiveLeaderHeartbeat(source, data); + return followerReceiveLeaderHeartbeat(data); } if (cmd == ClusterModeCmd.send_slot_map_to_follower) { return followerReceiveNewSlotMap(source, data); @@ -259,23 +272,27 @@ private Reply followerReceiveNewSlotMap(ProxyNode leader, JSONObject data) { return StatusReply.OK; } - private Reply followerReceiveLeaderHeartbeat(ProxyNode leader, JSONObject data) { + private Reply followerReceiveLeaderHeartbeat(JSONObject data) { String md5 = data.getString("md5"); if (md5 != null && slotMap != null && !slotMap.getMd5().equals(md5)) { - try { - executor.submit(() -> { - ProxyClusterSlotMap newSlotMap = getSlotMapFromLeader(); - updateSlotMap(slotMap, newSlotMap, "heartbeat-md5-check|leader=" + leader, false); - }); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } + updateSlotMapFromMaster(); } JSONObject json = new JSONObject(); json.put("status", ClusterModeStatus.getStatus().getValue()); return new BulkReply(Utils.stringToBytes(json.toJSONString())); } + private void updateSlotMapFromMaster() { + try { + executor.submit(() -> { + ProxyClusterSlotMap newSlotMap = getSlotMapFromLeader(); + updateSlotMap(slotMap, newSlotMap, "heartbeat-md5-check|leader=" + leader, false); + }); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } + //=====leader===== private Reply leaderReceiveSlaveHeartbeat(ProxyNode follower, JSONObject data) { @@ -283,7 +300,9 @@ private Reply leaderReceiveSlaveHeartbeat(ProxyNode follower, JSONObject data) { if (logger.isDebugEnabled()) { logger.debug("leader receive follower heartbeat, follower = {}, data = {}", follower, data); } - return StatusReply.OK; + JSONObject response = new JSONObject(); + response.put("md5", slotMap.getMd5()); + return new BulkReply(Utils.stringToBytes(response.toJSONString())); } //====heartbeat===== @@ -343,6 +362,13 @@ private void sendHeartbeatToLeader0() { if (reply instanceof ErrorReply) { logger.error("send heartbeat to leader error, leader = {}, error = {}", targetLeader, ((ErrorReply) reply).getError()); } + if (reply instanceof BulkReply) { + JSONObject json = JSONObject.parseObject(Utils.bytesToString(((BulkReply) reply).getRaw())); + String md5 = json.getString("md5"); + if (slotMap == null || !slotMap.getMd5().equals(md5)) { + updateSlotMapFromMaster(); + } + } } catch (Exception e) { logger.error("sendHeartbeatToLeader0 error", e); } @@ -453,4 +479,12 @@ private void updateSlotMap(ProxyClusterSlotMap oldSlotMap, ProxyClusterSlotMap n } } + private void sleep(long ms) { + try { + TimeUnit.MILLISECONDS.sleep(ms); + } catch (InterruptedException e) { + logger.error(e.toString(), e); + } + } + }