From f7c38669dd4d1bc9943f44f1ee966b4197298799 Mon Sep 17 00:00:00 2001 From: funkye Date: Sat, 17 Aug 2024 14:54:19 +0800 Subject: [PATCH] bugfix: the bug where multiple nodes cannot be retrieved from the naming server (#6757) --- changes/en-us/2.x.md | 3 + changes/zh-cn/2.x.md | 4 +- .../seata/common/ConfigurationKeys.java | 4 +- .../apache/seata/common/metadata/Cluster.java | 13 +- .../NamingserverRegistryServiceImpl.java | 5 +- .../controller/NamingController.java | 27 ++- .../namingserver/entity/pojo/ClusterData.java | 16 +- .../namingserver/manager/NamingManager.java | 200 ++++++++++-------- .../namingserver/NamingControllerTest.java | 131 ++++++++++-- script/config-center/config.txt | 1 + .../server/store/StoreDBProperties.java | 11 + .../db/store/VGroupMappingDataBaseDAO.java | 4 +- .../store/RedisVGroupMappingStoreManager.java | 16 +- .../main/resources/application.example.yml | 1 + 14 files changed, 303 insertions(+), 133 deletions(-) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 230b811e940..0db8d1db87e 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -7,6 +7,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6226](https://github.com/apache/incubator-seata/pull/6226)] multi-version seata protocol support - [[#6537](https://github.com/apache/incubator-seata/pull/6537)] support Namingserver - [[#6538](https://github.com/apache/incubator-seata/pull/6538)] Integration of naming server on the Seata server side + ### bugfix: - [[#6592](https://github.com/apache/incubator-seata/pull/6592)] fix @Async annotation not working in ClusterWatcherManager - [[#6624](https://github.com/apache/incubator-seata/pull/6624)] fix Alibaba Dubbo convert error @@ -22,6 +23,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6711](https://github.com/apache/incubator-seata/pull/6711)] fix dameng rollback info un compress fail - [[#6714](https://github.com/apache/incubator-seata/pull/6714)] fix dameng delete undo fail - [[#6701](https://github.com/apache/incubator-seata/pull/6728)] fix support serialization for dm.jdbc.driver.DmdbTimestamp +- [[#6757](https://github.com/apache/incubator-seata/pull/6757)] the bug where multiple nodes cannot be retrieved from the naming server ### optimize: @@ -54,6 +56,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6745](https://github.com/apache/incubator-seata/pull/6745)] fix node-gyp build error on arm64 and macos - [[#6748](https://github.com/apache/incubator-seata/pull/6748)] optimize ConsistentHashLoadBalance Algorithm - [[#6747](https://github.com/apache/incubator-seata/pull/6747)] optimize fastjson deserialization +- [[#6755](https://github.com/apache/incubator-seata/pull/6755)] optimize namingserver code logic ### refactor: diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 205feebb489..f00ecdb5edd 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -24,6 +24,8 @@ - [[#6711](https://github.com/apache/incubator-seata/pull/6711)] 修复达梦数据库的getRollbackInfo没有解压缩的问题 - [[#6714](https://github.com/apache/incubator-seata/pull/6714)] 修复达梦数据库的delete sql回滚失败的问题 - [[#6701](https://github.com/apache/incubator-seata/pull/6728)] 修复达梦数据库的对dm.jdbc.driver.DmdbTimestamp的支持 +- [[#6757](https://github.com/apache/incubator-seata/pull/6757)] 修复client通过namingserver只能获取到一个tc节点的bug + ### optimize: - [[#6499](https://github.com/apache/incubator-seata/pull/6499)] 拆分 committing 和 rollbacking 状态的任务线程池 @@ -55,7 +57,7 @@ - [[#6745](https://github.com/apache/incubator-seata/pull/6745)] 修复 node-gyp 在 arm64 和 macos 构建失败问题 - [[#6748](https://github.com/apache/incubator-seata/pull/6748)] 优化 ConsistentHashLoadBalance 算法 - [[#6747](https://github.com/apache/incubator-seata/pull/6747)] 优化 fastjson 反序列化 - +- [[#6755](https://github.com/apache/incubator-seata/pull/6755)] 优化namingserver代码逻辑 ### refactor: diff --git a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java index 26017ff753c..fc3f9c9402e 100644 --- a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java +++ b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java @@ -1033,9 +1033,9 @@ public interface ConfigurationKeys { String REGISTRY_NAMINGSERVER_CLUSTER = NAMINGSERVER_REGISTRY_PREFIX + "cluster"; /** - * The constant MAPPING_TABLE_NAME + * The constant VGROUP_TABLE_NAME */ - String MAPPING_TABLE_NAME = STORE_DB_PREFIX + "mapping-table"; + String VGROUP_TABLE_NAME = STORE_DB_PREFIX + FILE_CONFIG_SPLIT_CHAR + "vgroup-table"; /** * The constant NAMESPACE_KEY diff --git a/common/src/main/java/org/apache/seata/common/metadata/Cluster.java b/common/src/main/java/org/apache/seata/common/metadata/Cluster.java index 2dcfec0fd64..c32d3616dd5 100644 --- a/common/src/main/java/org/apache/seata/common/metadata/Cluster.java +++ b/common/src/main/java/org/apache/seata/common/metadata/Cluster.java @@ -16,11 +16,12 @@ */ package org.apache.seata.common.metadata; -import org.apache.seata.common.metadata.namingserver.Unit; - import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import org.apache.seata.common.metadata.namingserver.Unit; + public class Cluster { private String clusterName; private String clusterType; @@ -54,6 +55,14 @@ public void setUnitData(List unitData) { this.unitData = unitData; } + public void appendUnits(Collection unitData) { + this.unitData.addAll(unitData); + } + + public void appendUnit(Unit unitData) { + this.unitData.add(unitData); + } + } diff --git a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java index d62b3e75bf5..b8c988eb310 100644 --- a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java +++ b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java @@ -152,7 +152,6 @@ static NamingserverRegistryServiceImpl getInstance() { @Override public void register(InetSocketAddress address) throws Exception { - unregister(address); NetUtil.validAddress(address); Instance instance = Instance.getInstance(); instance.setTransaction(new Node.Endpoint(address.getAddress().getHostAddress(), address.getPort(), "netty")); @@ -207,7 +206,7 @@ public void doRegister(Instance instance, List urlList) { } public boolean doHealthCheck(String url) { - url = HTTP_PREFIX + url + "/naming/v1/health"; + url = HTTP_PREFIX + url + "/health"; Map header = new HashMap<>(); header.put(HTTP.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType()); try (CloseableHttpResponse response = HttpClientUtil.doGet(url, null, header, 3000)) { @@ -233,6 +232,8 @@ public void unregister(InetSocketAddress address) { String unit = instance.getUnit(); String jsonBody = instance.toJsonString(); String params = "unit=" + unit; + params = params + "&cluster=" + instance.getClusterName(); + params = params + "&namespace=" + instance.getNamespace(); url += params; Map header = new HashMap<>(); header.put(HTTP.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType()); diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java b/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java index e94af26e0ac..3e1de73f965 100644 --- a/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java +++ b/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java @@ -71,10 +71,10 @@ public Result registerInstance(@RequestParam String namespace, } @PostMapping("/unregister") - public Result unregisterInstance(@RequestParam String unit, - @RequestBody NamingServerNode registerBody) { + public Result unregisterInstance(@RequestParam String namespace, @RequestParam String clusterName, + @RequestParam String unit, @RequestBody NamingServerNode registerBody) { Result result = new Result<>(); - boolean isSuccess = namingManager.unregisterInstance(unit, registerBody); + boolean isSuccess = namingManager.unregisterInstance(namespace, clusterName, unit, registerBody); if (isSuccess) { result.setMessage("node has unregistered successfully!"); } else { @@ -95,22 +95,29 @@ public MetaResponse discovery(@RequestParam String vGroup, @RequestParam String clusterWatcherManager.getTermByvGroup(vGroup)); } + @PostMapping("/addGroup") + public Result addGroup(@RequestParam String namespace, + @RequestParam String clusterName, + @RequestParam String unitName, + @RequestParam String vGroup) { + + Result addGroupResult = namingManager.createGroup(namespace, vGroup, clusterName, unitName); + if (!addGroupResult.isSuccess()) { + return addGroupResult; + } + return new Result<>("200", "change vGroup " + vGroup + "to cluster " + clusterName + " successfully!"); + } + @PostMapping("/changeGroup") public Result changeGroup(@RequestParam String namespace, @RequestParam String clusterName, @RequestParam String unitName, @RequestParam String vGroup) { - Result addGroupResult = namingManager.addGroup(namespace, vGroup, clusterName, unitName); + Result addGroupResult = namingManager.changeGroup(namespace, vGroup, clusterName, unitName); if (!addGroupResult.isSuccess()) { return addGroupResult; } - // remove vGroup in old cluster - Result removeGroupResult = namingManager.removeGroup(namespace, vGroup, unitName); - if (!removeGroupResult.isSuccess()) { - return removeGroupResult; - } - namingManager.changeGroup(namespace, clusterName, unitName, vGroup); return new Result<>("200", "change vGroup " + vGroup + "to cluster " + clusterName + " successfully!"); } diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java b/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java index ead20f4abfc..250f8509388 100644 --- a/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java +++ b/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.locks.Lock; @@ -34,7 +35,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.CollectionUtils; -import org.springframework.util.StringUtils; public class ClusterData { private static final Logger LOGGER = LoggerFactory.getLogger(ClusterData.class); @@ -107,16 +107,18 @@ public List getInstanceList() { } - public Cluster getClusterByUnit(String unitName) { + public Cluster getClusterByUnits(Set unitNames) { Cluster clusterResponse = new Cluster(); clusterResponse.setClusterName(clusterName); clusterResponse.setClusterType(clusterType); - if (!StringUtils.hasLength(unitName)) { - clusterResponse.setUnitData(new ArrayList<>(unitData.values())); + if (CollectionUtils.isEmpty(unitNames)) { + clusterResponse.appendUnits(unitData.values()); } else { - List unitList = new ArrayList<>(); - Optional.ofNullable(unitData.get(unitName)).ifPresent(unitList::add); - clusterResponse.setUnitData(unitList); + for (String unitName : unitNames) { + List unitList = new ArrayList<>(); + Optional.ofNullable(unitData.get(unitName)).ifPresent(unitList::add); + clusterResponse.appendUnits(unitList); + } } return clusterResponse; diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java b/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java index 72889b8d136..5e892674b43 100644 --- a/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java +++ b/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java @@ -16,7 +16,23 @@ */ package org.apache.seata.namingserver.manager; -import org.apache.commons.lang3.tuple.Pair; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.PostConstruct; + import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.entity.ContentType; import org.apache.http.protocol.HTTP; @@ -40,21 +56,6 @@ import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import javax.annotation.PostConstruct; - import static org.apache.seata.common.NamingServerConstants.CONSTANT_GROUP; @@ -62,9 +63,10 @@ public class NamingManager { private static final Logger LOGGER = LoggerFactory.getLogger(NamingManager.class); private final ConcurrentMap instanceLiveTable; - private final ConcurrentMap>> vGroupMap; - private final ConcurrentMap> namespaceClusterDataMap; + private final ConcurrentMap/* unitName */>>> vGroupMap; + private final ConcurrentMap> namespaceClusterDataMap; @Value("${heartbeat.threshold:90000}") private int heartbeatTimeThreshold; @@ -109,21 +111,23 @@ public List monitorCluster(String namespace) { LOGGER.warn("no cluster in namespace:" + namespace); } - for (Map.Entry>> entry : vGroupMap.entrySet()) { + for (Map.Entry>>> entry : vGroupMap + .entrySet()) { String vGroup = entry.getKey(); - Map> namespaceMap = entry.getValue(); - Pair pair = namespaceMap.get(namespace); - String clusterName = pair.getKey(); - ClusterVO clusterVO = clusterVOHashMap.get(clusterName); - if (clusterVO != null) { - clusterVO.addMapping(vGroup); - } + ConcurrentMap>> namespaceMap = entry.getValue(); + ConcurrentMap> pair = namespaceMap.get(namespace); + pair.keySet().stream().findFirst().ifPresent(clusterName -> { + ClusterVO clusterVO = clusterVOHashMap.get(clusterName); + if (clusterVO != null) { + clusterVO.addMapping(vGroup); + } + }); } return new ArrayList<>(clusterVOHashMap.values()); } - public Result addGroup(String namespace, String vGroup, String clusterName, String unitName) { + public Result createGroup(String namespace, String vGroup, String clusterName, String unitName) { // add vGroup in new cluster List nodeList = getInstances(namespace, clusterName); if (nodeList == null || nodeList.size() == 0) { @@ -151,11 +155,11 @@ public Result addGroup(String namespace, String vGroup, String clusterNa return new Result<>("500", "add vGroup in new cluster failed"); } } - changeGroup(namespace,clusterName,unitName,vGroup); + addGroup(namespace,clusterName,unitName,vGroup); return new Result<>("200", "add vGroup successfully!"); } - public Result removeGroup(String namespace, String vGroup, String unitName) { + public Result removeGroup(String namespace, String clusterName,String vGroup, String unitName) { List clusterList = getClusterListByVgroup(vGroup, namespace); for (Cluster cluster : clusterList) { if (cluster.getUnitData() != null && cluster.getUnitData().size() > 0) { @@ -189,13 +193,13 @@ public Result removeGroup(String namespace, String vGroup, String unitNa return new Result<>("200", "remove group in old cluster successfully!"); } - public void changeGroup(String namespace, String clusterName, String unitName, String vGroup) { + public void addGroup(String namespace, String clusterName, String unitName, String vGroup) { try { - Pair pair = Pair.of(clusterName, unitName); - ConcurrentMap> stringPairHashMap = new ConcurrentHashMap<>(); - stringPairHashMap.put(namespace, pair); - if (!vGroupMap.containsKey(vGroup) || !vGroupMap.get(vGroup).equals(stringPairHashMap)) { - vGroupMap.put(vGroup, stringPairHashMap); + Set units = vGroupMap.computeIfAbsent(vGroup, k -> new ConcurrentHashMap<>()) + .computeIfAbsent(namespace, k -> new ConcurrentHashMap<>()) + .computeIfAbsent(clusterName, k -> ConcurrentHashMap.newKeySet()); + if (!units.contains(unitName)) { + units.add(unitName); applicationContext.publishEvent(new ClusterChangeEvent(this, vGroup, System.currentTimeMillis())); } } catch (Exception e) { @@ -203,22 +207,18 @@ public void changeGroup(String namespace, String clusterName, String unitName, S } } - public void notifyClusterChange(String namespace, String clusterName, String unitName,long term) { - for (Map.Entry>> entry : vGroupMap.entrySet()) { + public void notifyClusterChange(String namespace, String clusterName, String unitName, long term) { + for (Map.Entry>>> entry : vGroupMap + .entrySet()) { String vGroup = entry.getKey(); - Map> namespaceMap = entry.getValue(); - - // Iterating through an internal HashMap - for (Map.Entry> innerEntry : namespaceMap.entrySet()) { - - Pair pair = innerEntry.getValue(); - String clusterName1 = pair.getKey(); - if (StringUtils.equals(clusterName1,clusterName)) { - applicationContext.publishEvent(new ClusterChangeEvent(this, vGroup, term)); - } - } + Map>> namespaceMap = entry.getValue(); + Optional.ofNullable(namespaceMap.get(namespace)).flatMap(pair -> Optional.ofNullable(pair.get(clusterName))) + .ifPresent(unitSet -> { + if (StringUtils.isBlank(unitName) || unitSet.contains(unitName)) { + applicationContext.publishEvent(new ClusterChangeEvent(this, vGroup, term)); + } + }); } - } public boolean registerInstance(NamingServerNode node, String namespace, String clusterName, String unitName) { @@ -229,16 +229,16 @@ public boolean registerInstance(NamingServerNode node, String namespace, String // add instance in cluster // create cluster when there is no cluster in clusterDataHashMap ClusterData clusterData = clusterDataHashMap.computeIfAbsent(clusterName, - key -> new ClusterData(clusterName, (String)node.getMetadata().remove("cluster-type"))); + key -> new ClusterData(clusterName, (String)node.getMetadata().get("cluster-type"))); // if extended metadata includes vgroup mapping relationship, add it in clusterData - Optional.ofNullable(node.getMetadata().remove(CONSTANT_GROUP)).ifPresent(mappingObj -> { + Optional.ofNullable(node.getMetadata().get(CONSTANT_GROUP)).ifPresent(mappingObj -> { if (mappingObj instanceof Map) { - Map vGroups = (Map) mappingObj; + Map vGroups = (Map) mappingObj; vGroups.forEach((k, v) -> { // In non-raft mode, a unit is one-to-one with a node, and the unitName is stored on the node. // In raft mode, the unitName is equal to the raft-group, so the node's unitName cannot be used. - changeGroup(namespace, clusterName, v == null ? unitName : (String)v, k); + addGroup(namespace, clusterName, v == null ? unitName : (String)v, k); }); } }); @@ -257,19 +257,21 @@ public boolean registerInstance(NamingServerNode node, String namespace, String return true; } - public boolean unregisterInstance(String unitName, NamingServerNode node) { + public boolean unregisterInstance(String namespace, String clusterName, String unitName, NamingServerNode node) { try { - for (String namespace : namespaceClusterDataMap.keySet()) { - Map clusterMap = namespaceClusterDataMap.get(namespace); - if (clusterMap != null) { - clusterMap.forEach((clusterName, clusterData) -> { - if (clusterData.getUnitData() != null && clusterData.getUnitData().containsKey(unitName)) { - clusterData.removeInstance(node, unitName); - notifyClusterChange(namespace, clusterName, unitName, node.getTerm()); - instanceLiveTable.remove(new InetSocketAddress(node.getTransaction().getHost(), - node.getTransaction().getPort())); - } - }); + Map clusterMap = namespaceClusterDataMap.get(namespace); + if (clusterMap != null) { + ClusterData clusterData = clusterMap.get(clusterName); + if (clusterData.getUnitData() != null && clusterData.getUnitData().containsKey(unitName)) { + clusterData.removeInstance(node, unitName); + Object vgroupMap = node.getMetadata().get(CONSTANT_GROUP); + if (vgroupMap instanceof Map) { + ((Map)vgroupMap).forEach((group, realUnitName) -> vGroupMap.get(group) + .get(namespace).get(clusterName).remove(realUnitName)); + } + notifyClusterChange(namespace, clusterName, unitName, node.getTerm()); + instanceLiveTable.remove( + new InetSocketAddress(node.getTransaction().getHost(), node.getTransaction().getPort())); } } } catch (Exception e) { @@ -281,22 +283,23 @@ public boolean unregisterInstance(String unitName, NamingServerNode node) { public List getClusterListByVgroup(String vGroup, String namespace) { // find the cluster where the transaction group is located - ConcurrentMap> map = - vGroupMap.get(vGroup); - if (!CollectionUtils.isEmpty(map)) { - Pair clusterUnitPair = map.get(namespace); - if (clusterUnitPair != null) { - String clusterName = clusterUnitPair.getKey(); - String unitName = clusterUnitPair.getValue(); - List clusterList = new ArrayList<>(); - Optional.ofNullable(namespaceClusterDataMap.get(namespace)) - .flatMap(clusterDataMap -> Optional.ofNullable(clusterDataMap.get(clusterName))).ifPresent(data -> { - clusterList.add(data.getClusterByUnit(unitName)); - }); - return clusterList; + ConcurrentMap/* unitName */>> vgroupNamespaceMap = + vGroupMap.get(vGroup); + List clusterList = new ArrayList<>(); + if (!CollectionUtils.isEmpty(vgroupNamespaceMap)) { + ConcurrentMap> clusterUnitPair = vgroupNamespaceMap.get(namespace); + ConcurrentMap clusterDataMap = namespaceClusterDataMap.get(namespace); + if (clusterUnitPair != null && !CollectionUtils.isEmpty(clusterDataMap)) { + clusterUnitPair.forEach((clusterName, unitNameSet) -> { + ClusterData clusterData = clusterDataMap.get(clusterName); + if (clusterData != null) { + clusterList.add(clusterData.getClusterByUnits(unitNameSet)); + } + }); } } - return Collections.emptyList(); + return clusterList; } public List getInstances(String namespace, String clusterName) { @@ -313,8 +316,8 @@ public void instanceHeartBeatCheck() { for (String namespace : namespaceClusterDataMap.keySet()) { for (ClusterData clusterData : namespaceClusterDataMap.get(namespace).values()) { for (Unit unit : clusterData.getUnitData().values()) { - List removeList = new ArrayList<>(); - for (Node instance : unit.getNamingInstanceList()) { + List removeList = new ArrayList<>(); + for (NamingServerNode instance : unit.getNamingInstanceList()) { InetSocketAddress inetSocketAddress = new InetSocketAddress(instance.getTransaction().getHost(), instance.getTransaction().getPort()); long lastHeatBeatTimeStamp = instanceLiveTable.getOrDefault(inetSocketAddress, (long)0); @@ -325,8 +328,17 @@ public void instanceHeartBeatCheck() { } if (!CollectionUtils.isEmpty(removeList)) { unit.getNamingInstanceList().removeAll(removeList); - for (Node instance : removeList) { + for (NamingServerNode instance : removeList) { clusterData.removeInstance(instance, unit.getUnitName()); + Object vgoupMap = instance.getMetadata().get(CONSTANT_GROUP); + if (vgoupMap instanceof Map) { + ((Map)vgoupMap).forEach((group, unitName) -> { + Set units = + vGroupMap.get(group).get(namespace).get(clusterData.getClusterName()); + units.remove(unitName); + }); + } + LOGGER.warn("{} instance has gone offline", instance.getTransaction().getHost() + ":" + instance.getTransaction().getPort()); } @@ -337,4 +349,26 @@ public void instanceHeartBeatCheck() { } } + public Result changeGroup(String namespace, String vGroup, String clusterName, String unitName) { + ConcurrentMap>> namespaceMap = + new ConcurrentHashMap<>(vGroupMap.get(vGroup)); + createGroup(namespace, vGroup, clusterName, unitName); + AtomicReference> result = new AtomicReference<>(); + namespaceMap.forEach((currentNamespace, clusterMap) -> clusterMap.forEach((currentCluster, unitSet) -> { + for (String currentUnitName : unitSet) { + if (StringUtils.isBlank(unitName)) { + if (StringUtils.equalsIgnoreCase(clusterName, currentCluster)) { + continue; + } + result.set(removeGroup(currentNamespace, clusterName, vGroup, unitName)); + } else { + if (!StringUtils.equalsIgnoreCase(unitName, currentUnitName)) { + result.set(removeGroup(currentNamespace, clusterName, vGroup, unitName)); + } + } + } + })); + return Optional.ofNullable(result.get()).orElseGet(() -> new Result<>("200", "change vGroup successfully!")); + } + } diff --git a/namingserver/src/test/java/org/apache/seata/namingserver/NamingControllerTest.java b/namingserver/src/test/java/org/apache/seata/namingserver/NamingControllerTest.java index 200019da367..a84350cfd70 100644 --- a/namingserver/src/test/java/org/apache/seata/namingserver/NamingControllerTest.java +++ b/namingserver/src/test/java/org/apache/seata/namingserver/NamingControllerTest.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.TimeUnit; import static org.apache.seata.common.NamingServerConstants.CONSTANT_GROUP; @@ -56,14 +57,14 @@ class NamingControllerTest { @Test void mockRegister() { String clusterName = "cluster1"; - String namespace = "public"; + String namespace = "public1"; String unitName = String.valueOf(UUID.randomUUID()); NamingServerNode node = new NamingServerNode(); node.setTransaction(new Node.Endpoint("127.0.0.1", 8091, "netty")); node.setControl(new Node.Endpoint("127.0.0.1", 7091, "http")); Map meatadata = node.getMetadata(); Map vGroups = new HashMap<>(); - vGroups.put("vgroup1",null); + vGroups.put("vgroup1",unitName); meatadata.put(CONSTANT_GROUP, vGroups); namingController.registerInstance(namespace, clusterName, unitName, node); String vGroup = "vgroup1"; @@ -81,30 +82,31 @@ void mockRegister() { Node node1 = unit.getNamingInstanceList().get(0); assertEquals("127.0.0.1", node1.getTransaction().getHost()); assertEquals(8091, node1.getTransaction().getPort()); - namingController.unregisterInstance(unitName, node); + namingController.unregisterInstance(namespace, clusterName, unitName, node); } @Test void mockUnregisterGracefully() { String clusterName = "cluster1"; - String namespace = "public"; + String namespace = "public2"; String unitName = String.valueOf(UUID.randomUUID()); NamingServerNode node = new NamingServerNode(); node.setTransaction(new Node.Endpoint("127.0.0.1", 8091, "netty")); node.setControl(new Node.Endpoint("127.0.0.1", 7091, "http")); Map meatadata = node.getMetadata(); Map vGroups = new HashMap<>(); - vGroups.put("vgroup1",null); + vGroups.put("vgroup1",unitName); meatadata.put(CONSTANT_GROUP, vGroups); namingController.registerInstance(namespace, clusterName, unitName, node); NamingServerNode node2 = new NamingServerNode(); - node2.setTransaction(new Node.Endpoint("127.0.0.1", 8091, "netty")); - node2.setControl(new Node.Endpoint("127.0.0.1", 7091, "http")); + node2.setTransaction(new Node.Endpoint("127.0.0.1", 8092, "netty")); + node2.setControl(new Node.Endpoint("127.0.0.1", 7092, "http")); Map meatadata2 = node2.getMetadata(); Map vGroups2 = new HashMap<>(); - vGroups2.put("vgroup2",null); + String unitName2 = UUID.randomUUID().toString(); + vGroups2.put("vgroup2",unitName2); meatadata2.put(CONSTANT_GROUP, vGroups2); - namingController.registerInstance(namespace, "cluster2", UUID.randomUUID().toString(), node2); + namingController.registerInstance(namespace, "cluster2", unitName2, node2); String vGroup = "vgroup1"; MetaResponse metaResponse = namingController.discovery(vGroup, namespace); assertNotNull(metaResponse); @@ -119,26 +121,24 @@ void mockUnregisterGracefully() { Node node1 = unit.getNamingInstanceList().get(0); assertEquals("127.0.0.1", node1.getTransaction().getHost()); assertEquals(8091, node1.getTransaction().getPort()); - namingController.unregisterInstance(unitName, node); + namingController.unregisterInstance(namespace, clusterName, unitName, node); metaResponse = namingController.discovery(vGroup, namespace); assertNotNull(metaResponse); assertNotNull(metaResponse.getClusterList()); - assertEquals(1, metaResponse.getClusterList().size()); - cluster = metaResponse.getClusterList().get(0); - assertEquals(0, cluster.getUnitData().size()); + assertEquals(0, metaResponse.getClusterList().get(0).getUnitData().size()); } @Test void mockUnregisterUngracefully() throws InterruptedException { String clusterName = "cluster1"; - String namespace = "public"; + String namespace = "public3"; String unitName = String.valueOf(UUID.randomUUID()); NamingServerNode node = new NamingServerNode(); node.setTransaction(new Node.Endpoint("127.0.0.1", 8091, "netty")); node.setControl(new Node.Endpoint("127.0.0.1", 7091, "http")); Map meatadata = node.getMetadata(); Map vGroups = new HashMap<>(); - vGroups.put("vgroup1",null); + vGroups.put("vgroup1",unitName); meatadata.put(CONSTANT_GROUP, vGroups); namingController.registerInstance(namespace, clusterName, unitName, node); String vGroup = "vgroup1"; @@ -161,9 +161,108 @@ void mockUnregisterUngracefully() throws InterruptedException { metaResponse = namingController.discovery(vGroup, namespace); assertNotNull(metaResponse); assertNotNull(metaResponse.getClusterList()); + assertEquals(0, metaResponse.getClusterList().get(0).getUnitData().size()); + } + + @Test + void mockDiscoveryMultiNode() { + String clusterName = "cluster1"; + String namespace = "public4"; + String unitName = String.valueOf(UUID.randomUUID()); + NamingServerNode node = new NamingServerNode(); + node.setTransaction(new Node.Endpoint("127.0.0.1", 8091, "netty")); + node.setControl(new Node.Endpoint("127.0.0.1", 7091, "http")); + Map meatadata = node.getMetadata(); + Map vGroups = new HashMap<>(); + vGroups.put("vgroup1",unitName); + meatadata.put(CONSTANT_GROUP, vGroups); + NamingServerNode node2 = new NamingServerNode(); + String unitName2 = String.valueOf(UUID.randomUUID()); + node2.setTransaction(new Node.Endpoint("127.0.0.1", 8092, "netty")); + node2.setControl(new Node.Endpoint("127.0.0.1", 7092, "http")); + vGroups = new HashMap<>(); + vGroups.put("vgroup1",unitName2); + node2.getMetadata().put(CONSTANT_GROUP, vGroups); + namingController.registerInstance(namespace, clusterName, unitName, node); + namingController.registerInstance(namespace, clusterName, unitName2, node2); + String vGroup = "vgroup1"; + //namingController.changeGroup(namespace, clusterName, vGroup, vGroup); + MetaResponse metaResponse = namingController.discovery(vGroup, namespace); + assertNotNull(metaResponse); + assertNotNull(metaResponse.getClusterList()); + assertEquals(1, metaResponse.getClusterList().size()); + Cluster cluster = metaResponse.getClusterList().get(0); + assertNotNull(cluster.getUnitData()); + assertEquals(2, cluster.getUnitData().size()); + Unit unit = cluster.getUnitData().get(0); + assertNotNull(unit.getNamingInstanceList()); + assertEquals(1, unit.getNamingInstanceList().size()); + namingController.unregisterInstance(namespace, clusterName, unitName, node); + metaResponse = namingController.discovery(vGroup, namespace); + assertNotNull(metaResponse); + assertNotNull(metaResponse.getClusterList()); assertEquals(1, metaResponse.getClusterList().size()); cluster = metaResponse.getClusterList().get(0); - assertEquals(0, cluster.getUnitData().size()); + assertNotNull(cluster.getUnitData()); + assertEquals(1, cluster.getUnitData().size()); + unit = cluster.getUnitData().get(0); + assertNotNull(unit.getNamingInstanceList()); + } + + @Test + void mockHeartbeat() throws InterruptedException { + String clusterName = "cluster1"; + String namespace = "public5"; + String unitName = String.valueOf(UUID.randomUUID()); + NamingServerNode node = new NamingServerNode(); + node.setTransaction(new Node.Endpoint("127.0.0.1", 8091, "netty")); + node.setControl(new Node.Endpoint("127.0.0.1", 7091, "http")); + Map meatadata = node.getMetadata(); + Map vGroups = new HashMap<>(); + vGroups.put("vgroup1",unitName); + meatadata.put(CONSTANT_GROUP, vGroups); + namingController.registerInstance(namespace, clusterName, unitName, node); + NamingServerNode node2 = new NamingServerNode(); + node2.setTransaction(new Node.Endpoint("127.0.0.1", 8092, "netty")); + node2.setControl(new Node.Endpoint("127.0.0.1", 7092, "http")); + Map meatadata2 = node2.getMetadata(); + Map vGroups2 = new HashMap<>(); + String unitName2 = UUID.randomUUID().toString(); + vGroups2.put("vgroup1",unitName2); + meatadata2.put(CONSTANT_GROUP, vGroups2); + namingController.registerInstance(namespace, clusterName, unitName2, node2); + String vGroup = "vgroup1"; + Thread thread = new Thread(()->{ + for (int i = 0; i < 5; i++) { + try { + TimeUnit.SECONDS.sleep(5); + namingController.registerInstance(namespace, clusterName, unitName, node); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); + thread.start(); + MetaResponse metaResponse = namingController.discovery(vGroup, namespace); + assertNotNull(metaResponse); + assertNotNull(metaResponse.getClusterList()); + assertEquals(1, metaResponse.getClusterList().size()); + Cluster cluster = metaResponse.getClusterList().get(0); + assertNotNull(cluster.getUnitData()); + assertEquals(2, cluster.getUnitData().size()); + Unit unit = cluster.getUnitData().get(0); + assertNotNull(unit.getNamingInstanceList()); + assertEquals(1, unit.getNamingInstanceList().size()); + int timeGap = threshold + period; + Thread.sleep(timeGap); + metaResponse = namingController.discovery(vGroup, namespace); + assertNotNull(metaResponse); + assertNotNull(metaResponse.getClusterList()); + assertEquals(1, metaResponse.getClusterList().get(0).getUnitData().size()); + unit = metaResponse.getClusterList().get(0).getUnitData().get(0); + Node node1 = unit.getNamingInstanceList().get(0); + assertEquals("127.0.0.1", node1.getTransaction().getHost()); + assertEquals(8091, node1.getTransaction().getPort()); } } \ No newline at end of file diff --git a/script/config-center/config.txt b/script/config-center/config.txt index b5914058d22..82b39c83173 100644 --- a/script/config-center/config.txt +++ b/script/config-center/config.txt @@ -112,6 +112,7 @@ store.db.maxConn=30 store.db.globalTable=global_table store.db.branchTable=branch_table store.db.distributedLockTable=distributed_lock +store.db.vgroupTable=vgroup-table store.db.queryLimit=100 store.db.lockTable=lock_table store.db.maxWait=5000 diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/store/StoreDBProperties.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/store/StoreDBProperties.java index 761f8fe1772..5904d8d5d3b 100644 --- a/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/store/StoreDBProperties.java +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/store/StoreDBProperties.java @@ -40,6 +40,7 @@ public class StoreDBProperties { private String branchTable = "branch_table"; private String lockTable = "lock_table"; private String distributedLockTable = "distributed_lock"; + private String vgroupTable = "vgroup_table"; private Integer queryLimit = DEFAULT_QUERY_LIMIT; private Long maxWait = 5000L; @@ -167,4 +168,14 @@ public StoreDBProperties setMaxWait(Long maxWait) { this.maxWait = maxWait; return this; } + + public String getVgroupTable() { + return vgroupTable; + } + + public StoreDBProperties setVgroupTable(String vgroupTable) { + this.vgroupTable = vgroupTable; + return this; + } + } diff --git a/server/src/main/java/org/apache/seata/server/storage/db/store/VGroupMappingDataBaseDAO.java b/server/src/main/java/org/apache/seata/server/storage/db/store/VGroupMappingDataBaseDAO.java index 81d227eb809..85c8faa13e6 100644 --- a/server/src/main/java/org/apache/seata/server/storage/db/store/VGroupMappingDataBaseDAO.java +++ b/server/src/main/java/org/apache/seata/server/storage/db/store/VGroupMappingDataBaseDAO.java @@ -34,7 +34,7 @@ import java.util.ArrayList; import java.util.List; -import static org.apache.seata.common.ConfigurationKeys.MAPPING_TABLE_NAME; +import static org.apache.seata.common.ConfigurationKeys.VGROUP_TABLE_NAME; import static org.apache.seata.common.ConfigurationKeys.REGISTRY_NAMINGSERVER_CLUSTER; import static org.apache.seata.common.NamingServerConstants.DEFAULT_VGROUP_MAPPING; @@ -50,7 +50,7 @@ public class VGroupMappingDataBaseDAO { public VGroupMappingDataBaseDAO(DataSource vGroupMappingDataSource) { this.vGroupMappingDataSource = vGroupMappingDataSource; - this.vMapping = CONFIG.getConfig(MAPPING_TABLE_NAME, DEFAULT_VGROUP_MAPPING); + this.vMapping = CONFIG.getConfig(VGROUP_TABLE_NAME, DEFAULT_VGROUP_MAPPING); } public boolean insertMappingDO(MappingDO mappingDO) { diff --git a/server/src/main/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManager.java b/server/src/main/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManager.java index 7b4129cb5d4..470f39cde43 100644 --- a/server/src/main/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManager.java +++ b/server/src/main/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManager.java @@ -31,12 +31,12 @@ @LoadLevel(name = "redis") public class RedisVGroupMappingStoreManager implements VGroupMappingStoreManager { - private static final String REDIS_SPLIT_KEY = ":"; + private static final String REDIS_PREFIX = "SEATA_NAMINGSERVER_NAMESPACE_"; @Override public boolean addVGroup(MappingDO mappingDO) { String vGroup = mappingDO.getVGroup(); - String namespace = mappingDO.getNamespace(); + String namespace = REDIS_PREFIX + mappingDO.getNamespace(); String clusterName = mappingDO.getCluster(); try (Jedis jedis = JedisPooledFactory.getJedisInstance()) { jedis.hset(namespace, vGroup, clusterName); @@ -49,7 +49,7 @@ public boolean addVGroup(MappingDO mappingDO) { @Override public boolean removeVGroup(String vGroup) { Instance instance = Instance.getInstance(); - String namespace = instance.getNamespace(); + String namespace = REDIS_PREFIX + instance.getNamespace(); try (Jedis jedis = JedisPooledFactory.getJedisInstance()) { jedis.hdel(namespace, vGroup); return true; @@ -61,16 +61,16 @@ public boolean removeVGroup(String vGroup) { @Override public HashMap loadVGroups() { Instance instance = Instance.getInstance(); - String namespace = instance.getNamespace(); + String namespace = REDIS_PREFIX + instance.getNamespace(); String clusterName = instance.getClusterName(); try (Jedis jedis = JedisPooledFactory.getJedisInstance()) { Map mappingKeyMap = jedis.hgetAll(namespace); HashMap result = new HashMap<>(); - for (Map.Entry entry : mappingKeyMap.entrySet()) { - if (StringUtils.equals(clusterName, entry.getValue())) { - result.put(entry.getKey(), null); + mappingKeyMap.forEach((vgroup,clusterNameValue) -> { + if (StringUtils.equals(clusterName, clusterNameValue)) { + result.put(vgroup, null); } - } + }); return result; } catch (Exception ex) { throw new RedisException(ex); diff --git a/server/src/main/resources/application.example.yml b/server/src/main/resources/application.example.yml index 86f0625a58e..0a7897ab930 100644 --- a/server/src/main/resources/application.example.yml +++ b/server/src/main/resources/application.example.yml @@ -186,6 +186,7 @@ seata: branch-table: branch_table lock-table: lock_table distributed-lock-table: distributed_lock + vgroup-table: vgroup_table query-limit: 1000 max-wait: 5000 redis: