diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index fb45bc8a2d7..f63bc187833 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -73,6 +73,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6787](https://github.com/apache/incubator-seata/pull/6787)] upgrade elliptic to 6.5.7 - [[#6783](https://github.com/apache/incubator-seata/pull/6783)] rename the server naming/v1 api to vgroup/v1 - [[#6793](https://github.com/apache/incubator-seata/pull/6793)] fix npmjs conflicts +- [[#6793](https://github.com/apache/incubator-seata/pull/6795)] optimize the initialization logic for server meta - [[#6794](https://github.com/apache/incubator-seata/pull/6794)] optimize NacosMockTest UT case diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index d4064797752..177e1417f23 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -75,6 +75,7 @@ - [[#6783](https://github.com/apache/incubator-seata/pull/6783)] 将server事务分组修改接口改为/vgroup/v1 - [[#6793](https://github.com/apache/incubator-seata/pull/6793)] 修复 npmjs 依赖冲突问题 - [[#6794](https://github.com/apache/incubator-seata/pull/6794)] 优化 NacosMockTest 单测问题 +- [[#6793](https://github.com/apache/incubator-seata/pull/6795)] 独立server的meta信息初始化逻辑 ### refactor: diff --git a/common/src/main/java/org/apache/seata/common/metadata/Node.java b/common/src/main/java/org/apache/seata/common/metadata/Node.java index 9d105a5581d..8a4a75f60c1 100644 --- a/common/src/main/java/org/apache/seata/common/metadata/Node.java +++ b/common/src/main/java/org/apache/seata/common/metadata/Node.java @@ -16,6 +16,9 @@ */ package org.apache.seata.common.metadata; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -121,28 +124,12 @@ public boolean equals(Object o) { // convert to String - public String toJsonString() { - StringBuilder sb = new StringBuilder(); - sb.append("{"); - sb.append("\"controlEndpoint\": ").append(control.toString()).append(", "); - sb.append("\"transactionEndpoint\": ").append(transaction.toString()).append(", "); - sb.append("\"weight\": ").append(weight).append(", "); - sb.append("\"healthy\": ").append(healthy).append(", "); - sb.append("\"timeStamp\": ").append(timeStamp).append(", "); - sb.append("\"metadata\": {"); - - // handle metadata k-v map - int i = 0; - for (Map.Entry entry : metadata.entrySet()) { - if (i > 0) { - sb.append(", "); - } - sb.append("\"").append(entry.getKey()).append("\": \"").append(entry.getValue()).append("\""); - i++; + public String toJsonString(ObjectMapper objectMapper) { + try { + return objectMapper.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); } - - sb.append("}}"); - return sb.toString(); } public static class Endpoint { diff --git a/common/src/main/java/org/apache/seata/common/metadata/namingserver/Instance.java b/common/src/main/java/org/apache/seata/common/metadata/namingserver/Instance.java index 4692692c8c4..3159dc4a429 100644 --- a/common/src/main/java/org/apache/seata/common/metadata/namingserver/Instance.java +++ b/common/src/main/java/org/apache/seata/common/metadata/namingserver/Instance.java @@ -42,7 +42,6 @@ public class Instance { private ClusterRole role = ClusterRole.MEMBER; private Map metadata = new HashMap<>(); - private Instance() { } @@ -162,9 +161,7 @@ public boolean equals(Object o) { } - // Recursively convert metadata to JSON - public String toJsonString() { - ObjectMapper objectMapper = new ObjectMapper(); + public String toJsonString(ObjectMapper objectMapper) { try { return objectMapper.writeValueAsString(this); } catch (JsonProcessingException e) { diff --git a/common/src/main/java/org/apache/seata/common/metadata/namingserver/NamingServerNode.java b/common/src/main/java/org/apache/seata/common/metadata/namingserver/NamingServerNode.java index 459dd66b944..507bd943b58 100644 --- a/common/src/main/java/org/apache/seata/common/metadata/namingserver/NamingServerNode.java +++ b/common/src/main/java/org/apache/seata/common/metadata/namingserver/NamingServerNode.java @@ -16,8 +16,6 @@ */ package org.apache.seata.common.metadata.namingserver; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.seata.common.metadata.Node; import java.util.Objects; @@ -85,16 +83,6 @@ public boolean isChanged(Object obj) { return otherNode.term > term; } - // convert to String - public String toJsonString() { - ObjectMapper objectMapper = new ObjectMapper(); - try { - return objectMapper.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } - public void setWeight(double weight) { this.weight = weight; } diff --git a/common/src/test/java/org/apache/seata/common/metadata/namingserver/InstanceTest.java b/common/src/test/java/org/apache/seata/common/metadata/namingserver/InstanceTest.java index 989d3cc0190..77ba5f4bd4c 100644 --- a/common/src/test/java/org/apache/seata/common/metadata/namingserver/InstanceTest.java +++ b/common/src/test/java/org/apache/seata/common/metadata/namingserver/InstanceTest.java @@ -27,6 +27,7 @@ import static org.junit.jupiter.api.Assertions.*; class InstanceTest { + private final ObjectMapper objectMapper = new ObjectMapper(); @Test void toJsonString() throws JsonProcessingException { @@ -39,7 +40,6 @@ void toJsonString() throws JsonProcessingException { instance.setMetadata(map); instance.setControl(new Node.Endpoint("1.1.1.1",888)); instance.setTransaction(new Node.Endpoint("2.2.2.2",999)); - System.out.println(instance.toJsonString()); - assertEquals(instance.toJsonString(),objectMapper.writeValueAsString(instance)); + assertEquals(instance.toJsonString(objectMapper),objectMapper.writeValueAsString(instance)); } } \ No newline at end of file diff --git a/common/src/test/java/org/apache/seata/common/metadata/namingserver/NamingServerNodeTest.java b/common/src/test/java/org/apache/seata/common/metadata/namingserver/NamingServerNodeTest.java index 8ddeadfaef2..2b70cd26ba0 100644 --- a/common/src/test/java/org/apache/seata/common/metadata/namingserver/NamingServerNodeTest.java +++ b/common/src/test/java/org/apache/seata/common/metadata/namingserver/NamingServerNodeTest.java @@ -28,6 +28,7 @@ import static org.junit.jupiter.api.Assertions.*; class NamingServerNodeTest { + private ObjectMapper objectMapper = new ObjectMapper(); @Test void toJsonString() throws JsonProcessingException { @@ -39,8 +40,7 @@ void toJsonString() throws JsonProcessingException { node.setGroup("group"); node.setControl(new Node.Endpoint("1.1.1.1",888)); node.setTransaction(new Node.Endpoint("2.2.2.2",999)); - System.out.println(node.toJsonString()); - assertEquals(node.toJsonString(),objectMapper.writeValueAsString(node)); + assertEquals(node.toJsonString(objectMapper),objectMapper.writeValueAsString(node)); } @Test diff --git a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java index a479fe69d15..901c4913681 100644 --- a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java +++ b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java @@ -165,7 +165,7 @@ public void doRegister(Instance instance, List urlList) { String namespace = instance.getNamespace(); String clusterName = instance.getClusterName(); String unit = instance.getUnit(); - String jsonBody = instance.toJsonString(); + String jsonBody = instance.toJsonString(OBJECT_MAPPER); String params = "namespace=" + namespace + "&clusterName=" + clusterName + "&unit=" + unit; url += params; Map header = new HashMap<>(); @@ -206,7 +206,7 @@ public void unregister(InetSocketAddress address) { for (String urlSuffix : getNamingAddrs()) { String url = HTTP_PREFIX + urlSuffix + "/naming/v1/unregister?"; String unit = instance.getUnit(); - String jsonBody = instance.toJsonString(); + String jsonBody = instance.toJsonString(OBJECT_MAPPER); String params = "unit=" + unit; params = params + "&clusterName=" + instance.getClusterName(); params = params + "&namespace=" + instance.getNamespace(); diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java b/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java index 971b768e40c..766c6c1d455 100644 --- a/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java +++ b/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java @@ -125,12 +125,6 @@ public Cluster getClusterByUnits(Set unitNames) { } public boolean registerInstance(NamingServerNode instance, String unitName) { - // refresh node weight - Object weightValue = instance.getMetadata().get("weight"); - if (weightValue != null) { - instance.setWeight(Double.parseDouble(String.valueOf(weightValue))); - instance.getMetadata().remove("weight"); - } Unit currentUnit = unitData.computeIfAbsent(unitName, value -> { Unit unit = new Unit(); List instances = new CopyOnWriteArrayList<>(); diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java b/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java index bd89851b32e..84a3cbcfc2a 100644 --- a/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java +++ b/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java @@ -96,7 +96,7 @@ public NamingManager() { @PostConstruct public void init() { this.vGroupMap = Caffeine.newBuilder() - .expireAfterAccess(heartbeatTimeThreshold, TimeUnit.MILLISECONDS) // expired time + .expireAfterAccess(heartbeatTimeThreshold + 1000, TimeUnit.MILLISECONDS) // expired time .maximumSize(Integer.MAX_VALUE) .removalListener(new RemovalListener() { diff --git a/server/src/main/java/org/apache/seata/server/Server.java b/server/src/main/java/org/apache/seata/server/Server.java index 4f4de537cca..c699ef037b1 100644 --- a/server/src/main/java/org/apache/seata/server/Server.java +++ b/server/src/main/java/org/apache/seata/server/Server.java @@ -16,19 +16,13 @@ */ package org.apache.seata.server; -import java.util.Objects; -import java.util.UUID; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.annotation.Resource; import org.apache.seata.common.XID; import org.apache.seata.common.holder.ObjectHolder; -import org.apache.seata.common.metadata.Node; -import org.apache.seata.common.metadata.namingserver.Instance; import org.apache.seata.common.thread.NamedThreadFactory; import org.apache.seata.common.util.NetUtil; import org.apache.seata.common.util.StringUtils; @@ -37,28 +31,19 @@ import org.apache.seata.core.rpc.netty.NettyRemotingServer; import org.apache.seata.core.rpc.netty.NettyServerConfig; import org.apache.seata.server.coordinator.DefaultCoordinator; +import org.apache.seata.server.instance.ServerInstance; import org.apache.seata.server.lock.LockerManagerFactory; import org.apache.seata.server.metrics.MetricsManager; import org.apache.seata.server.session.SessionHolder; -import org.apache.seata.server.store.StoreConfig; -import org.apache.seata.server.store.VGroupMappingStoreManager; -import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryNamingServerProperties; -import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.context.ApplicationListener; -import org.springframework.core.env.ConfigurableEnvironment; -import org.springframework.core.env.EnumerablePropertySource; -import org.springframework.core.env.PropertySource; import org.springframework.stereotype.Component; import org.springframework.web.context.support.GenericWebApplicationContext; -import static org.apache.seata.common.ConfigurationKeys.META_PREFIX; -import static org.apache.seata.common.ConfigurationKeys.NAMING_SERVER; import static org.apache.seata.common.Constants.OBJECT_KEY_SPRING_APPLICATION_CONTEXT; -import static org.apache.seata.common.Constants.OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT; import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGEX_SPLIT_CHAR; import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGISTRY_PREFERED_NETWORKS; @@ -69,66 +54,8 @@ public class Server { private static final Logger LOGGER = LoggerFactory.getLogger(Server.class); - protected static volatile ScheduledExecutorService EXECUTOR_SERVICE; - - @Resource - RegistryNamingServerProperties registryNamingServerProperties; - @Resource - RegistryProperties registryProperties; - - public void metadataInit() { - VGroupMappingStoreManager vGroupMappingStoreManager = SessionHolder.getRootVGroupMappingManager(); - if (StringUtils.equals(registryProperties.getType(), NAMING_SERVER)) { - EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("scheduledExcuter", 1, true)); - ConfigurableEnvironment environment = (ConfigurableEnvironment) ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT); - - // load node properties - Instance instance = Instance.getInstance(); - // load namespace - String namespace = registryNamingServerProperties.getNamespace(); - instance.setNamespace(namespace); - // load cluster name - String clusterName = registryNamingServerProperties.getCluster(); - instance.setClusterName(clusterName); - - // load cluster type - String clusterType = String.valueOf(StoreConfig.getSessionMode()); - instance.addMetadata("cluster-type", "raft".equals(clusterType) ? clusterType : "default"); - - // load unit name - instance.setUnit(String.valueOf(UUID.randomUUID())); - - instance.setTerm(System.currentTimeMillis()); - - // load node Endpoint - instance.setControl(new Node.Endpoint(NetUtil.getLocalIp(), Integer.parseInt(Objects.requireNonNull(environment.getProperty("server.port"))), "http")); - - // load metadata - for (PropertySource propertySource : environment.getPropertySources()) { - if (propertySource instanceof EnumerablePropertySource) { - EnumerablePropertySource enumerablePropertySource = (EnumerablePropertySource) propertySource; - for (String propertyName : enumerablePropertySource.getPropertyNames()) { - if (propertyName.startsWith(META_PREFIX)) { - instance.addMetadata(propertyName.substring(META_PREFIX.length()), enumerablePropertySource.getProperty(propertyName)); - } - } - } - } - // load vgroup mapping relationship - instance.addMetadata("vGroup", vGroupMappingStoreManager.loadVGroups()); - - EXECUTOR_SERVICE.scheduleAtFixedRate(() -> { - try { - vGroupMappingStoreManager.notifyMapping(); - } catch (Exception e) { - LOGGER.error("Naming server register Exception", e); - } - }, registryNamingServerProperties.getHeartbeatPeriod(), registryNamingServerProperties.getHeartbeatPeriod(), TimeUnit.MILLISECONDS); - ServerRunner.addDisposable(EXECUTOR_SERVICE::shutdown); - } - } - + ServerInstance serverInstance; /** * The entry point of application. @@ -179,7 +106,7 @@ public void start(String[] args) { coordinator.init(); nettyRemotingServer.setHandler(coordinator); - metadataInit(); + serverInstance.serverInstanceInit(); // let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028 ServerRunner.addDisposable(coordinator); nettyRemotingServer.init(); diff --git a/server/src/main/java/org/apache/seata/server/instance/ServerInstance.java b/server/src/main/java/org/apache/seata/server/instance/ServerInstance.java new file mode 100644 index 00000000000..e88aaf03669 --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/instance/ServerInstance.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.instance; + +import org.apache.seata.common.holder.ObjectHolder; +import org.apache.seata.common.metadata.Node; +import org.apache.seata.common.metadata.namingserver.Instance; +import org.apache.seata.common.thread.NamedThreadFactory; +import org.apache.seata.common.util.NetUtil; +import org.apache.seata.common.util.StringUtils; +import org.apache.seata.server.Server; +import org.apache.seata.server.ServerRunner; +import org.apache.seata.server.session.SessionHolder; +import org.apache.seata.server.store.StoreConfig; +import org.apache.seata.server.store.VGroupMappingStoreManager; +import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryNamingServerProperties; +import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.core.env.EnumerablePropertySource; +import org.springframework.core.env.PropertySource; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static org.apache.seata.common.ConfigurationKeys.META_PREFIX; +import static org.apache.seata.common.ConfigurationKeys.NAMING_SERVER; +import static org.apache.seata.common.Constants.OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT; + + +@Component("serverInstance") +public class ServerInstance { + @Resource + private RegistryProperties registryProperties; + + protected static volatile ScheduledExecutorService EXECUTOR_SERVICE; + + @Resource + private RegistryNamingServerProperties registryNamingServerProperties; + + private static final Logger LOGGER = LoggerFactory.getLogger(Server.class); + + public void serverInstanceInit() { + VGroupMappingStoreManager vGroupMappingStoreManager = SessionHolder.getRootVGroupMappingManager(); + if (StringUtils.equals(registryProperties.getType(), NAMING_SERVER)) { + EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("scheduledExcuter", 1, true)); + ConfigurableEnvironment environment = (ConfigurableEnvironment) ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT); + + // load node properties + Instance instance = Instance.getInstance(); + // load namespace + String namespace = registryNamingServerProperties.getNamespace(); + instance.setNamespace(namespace); + // load cluster name + String clusterName = registryNamingServerProperties.getCluster(); + instance.setClusterName(clusterName); + + // load cluster type + String clusterType = String.valueOf(StoreConfig.getSessionMode()); + instance.addMetadata("cluster-type", "raft".equals(clusterType) ? clusterType : "default"); + + // load unit name + instance.setUnit(String.valueOf(UUID.randomUUID())); + + instance.setTerm(System.currentTimeMillis()); + + // load node Endpoint + instance.setControl(new Node.Endpoint(NetUtil.getLocalIp(), Integer.parseInt(Objects.requireNonNull(environment.getProperty("server.port"))), "http")); + + // load metadata + for (PropertySource propertySource : environment.getPropertySources()) { + if (propertySource instanceof EnumerablePropertySource) { + EnumerablePropertySource enumerablePropertySource = (EnumerablePropertySource) propertySource; + for (String propertyName : enumerablePropertySource.getPropertyNames()) { + if (propertyName.startsWith(META_PREFIX)) { + instance.addMetadata(propertyName.substring(META_PREFIX.length()), enumerablePropertySource.getProperty(propertyName)); + } + } + } + } + // load vgroup mapping relationship + instance.addMetadata("vGroup", vGroupMappingStoreManager.loadVGroups()); + + EXECUTOR_SERVICE.scheduleAtFixedRate(() -> { + try { + vGroupMappingStoreManager.notifyMapping(); + } catch (Exception e) { + LOGGER.error("Naming server register Exception", e); + } + }, registryNamingServerProperties.getHeartbeatPeriod(), registryNamingServerProperties.getHeartbeatPeriod(), TimeUnit.MILLISECONDS); + ServerRunner.addDisposable(EXECUTOR_SERVICE::shutdown); + } + } +}