Skip to content

Commit

Permalink
optimize: optimize the initialization logic for server meta (#6795)
Browse files Browse the repository at this point in the history
  • Loading branch information
ggbocoder authored Sep 2, 2024
1 parent 1d426ed commit 77c81d5
Show file tree
Hide file tree
Showing 12 changed files with 135 additions and 126 deletions.
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6787](https://github.com/apache/incubator-seata/pull/6787)] upgrade elliptic to 6.5.7
- [[#6783](https://github.com/apache/incubator-seata/pull/6783)] rename the server naming/v1 api to vgroup/v1
- [[#6793](https://github.com/apache/incubator-seata/pull/6793)] fix npmjs conflicts
- [[#6793](https://github.com/apache/incubator-seata/pull/6795)] optimize the initialization logic for server meta
- [[#6794](https://github.com/apache/incubator-seata/pull/6794)] optimize NacosMockTest UT case


Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
- [[#6783](https://github.com/apache/incubator-seata/pull/6783)] 将server事务分组修改接口改为/vgroup/v1
- [[#6793](https://github.com/apache/incubator-seata/pull/6793)] 修复 npmjs 依赖冲突问题
- [[#6794](https://github.com/apache/incubator-seata/pull/6794)] 优化 NacosMockTest 单测问题
- [[#6793](https://github.com/apache/incubator-seata/pull/6795)] 独立server的meta信息初始化逻辑


### refactor:
Expand Down
29 changes: 8 additions & 21 deletions common/src/main/java/org/apache/seata/common/metadata/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.apache.seata.common.metadata;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -121,28 +124,12 @@ public boolean equals(Object o) {


// convert to String
public String toJsonString() {
StringBuilder sb = new StringBuilder();
sb.append("{");
sb.append("\"controlEndpoint\": ").append(control.toString()).append(", ");
sb.append("\"transactionEndpoint\": ").append(transaction.toString()).append(", ");
sb.append("\"weight\": ").append(weight).append(", ");
sb.append("\"healthy\": ").append(healthy).append(", ");
sb.append("\"timeStamp\": ").append(timeStamp).append(", ");
sb.append("\"metadata\": {");

// handle metadata k-v map
int i = 0;
for (Map.Entry<String, Object> entry : metadata.entrySet()) {
if (i > 0) {
sb.append(", ");
}
sb.append("\"").append(entry.getKey()).append("\": \"").append(entry.getValue()).append("\"");
i++;
public String toJsonString(ObjectMapper objectMapper) {
try {
return objectMapper.writeValueAsString(this);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}

sb.append("}}");
return sb.toString();
}

public static class Endpoint {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public class Instance {
private ClusterRole role = ClusterRole.MEMBER;
private Map<String, Object> metadata = new HashMap<>();


private Instance() {
}

Expand Down Expand Up @@ -162,9 +161,7 @@ public boolean equals(Object o) {
}


// Recursively convert metadata to JSON
public String toJsonString() {
ObjectMapper objectMapper = new ObjectMapper();
public String toJsonString(ObjectMapper objectMapper) {
try {
return objectMapper.writeValueAsString(this);
} catch (JsonProcessingException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package org.apache.seata.common.metadata.namingserver;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seata.common.metadata.Node;

import java.util.Objects;
Expand Down Expand Up @@ -85,16 +83,6 @@ public boolean isChanged(Object obj) {
return otherNode.term > term;
}

// convert to String
public String toJsonString() {
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.writeValueAsString(this);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

public void setWeight(double weight) {
this.weight = weight;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.junit.jupiter.api.Assertions.*;

class InstanceTest {
private final ObjectMapper objectMapper = new ObjectMapper();

@Test
void toJsonString() throws JsonProcessingException {
Expand All @@ -39,7 +40,6 @@ void toJsonString() throws JsonProcessingException {
instance.setMetadata(map);
instance.setControl(new Node.Endpoint("1.1.1.1",888));
instance.setTransaction(new Node.Endpoint("2.2.2.2",999));
System.out.println(instance.toJsonString());
assertEquals(instance.toJsonString(),objectMapper.writeValueAsString(instance));
assertEquals(instance.toJsonString(objectMapper),objectMapper.writeValueAsString(instance));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.junit.jupiter.api.Assertions.*;

class NamingServerNodeTest {
private ObjectMapper objectMapper = new ObjectMapper();

@Test
void toJsonString() throws JsonProcessingException {
Expand All @@ -39,8 +40,7 @@ void toJsonString() throws JsonProcessingException {
node.setGroup("group");
node.setControl(new Node.Endpoint("1.1.1.1",888));
node.setTransaction(new Node.Endpoint("2.2.2.2",999));
System.out.println(node.toJsonString());
assertEquals(node.toJsonString(),objectMapper.writeValueAsString(node));
assertEquals(node.toJsonString(objectMapper),objectMapper.writeValueAsString(node));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public void doRegister(Instance instance, List<String> urlList) {
String namespace = instance.getNamespace();
String clusterName = instance.getClusterName();
String unit = instance.getUnit();
String jsonBody = instance.toJsonString();
String jsonBody = instance.toJsonString(OBJECT_MAPPER);
String params = "namespace=" + namespace + "&clusterName=" + clusterName + "&unit=" + unit;
url += params;
Map<String, String> header = new HashMap<>();
Expand Down Expand Up @@ -206,7 +206,7 @@ public void unregister(InetSocketAddress address) {
for (String urlSuffix : getNamingAddrs()) {
String url = HTTP_PREFIX + urlSuffix + "/naming/v1/unregister?";
String unit = instance.getUnit();
String jsonBody = instance.toJsonString();
String jsonBody = instance.toJsonString(OBJECT_MAPPER);
String params = "unit=" + unit;
params = params + "&clusterName=" + instance.getClusterName();
params = params + "&namespace=" + instance.getNamespace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,6 @@ public Cluster getClusterByUnits(Set<String> unitNames) {
}

public boolean registerInstance(NamingServerNode instance, String unitName) {
// refresh node weight
Object weightValue = instance.getMetadata().get("weight");
if (weightValue != null) {
instance.setWeight(Double.parseDouble(String.valueOf(weightValue)));
instance.getMetadata().remove("weight");
}
Unit currentUnit = unitData.computeIfAbsent(unitName, value -> {
Unit unit = new Unit();
List<NamingServerNode> instances = new CopyOnWriteArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public NamingManager() {
@PostConstruct
public void init() {
this.vGroupMap = Caffeine.newBuilder()
.expireAfterAccess(heartbeatTimeThreshold, TimeUnit.MILLISECONDS) // expired time
.expireAfterAccess(heartbeatTimeThreshold + 1000, TimeUnit.MILLISECONDS) // expired time
.maximumSize(Integer.MAX_VALUE)
.removalListener(new RemovalListener<Object, Object>() {

Expand Down
79 changes: 3 additions & 76 deletions server/src/main/java/org/apache/seata/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,13 @@
*/
package org.apache.seata.server;

import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.annotation.Resource;
import org.apache.seata.common.XID;
import org.apache.seata.common.holder.ObjectHolder;
import org.apache.seata.common.metadata.Node;
import org.apache.seata.common.metadata.namingserver.Instance;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.common.util.StringUtils;
Expand All @@ -37,28 +31,19 @@
import org.apache.seata.core.rpc.netty.NettyRemotingServer;
import org.apache.seata.core.rpc.netty.NettyServerConfig;
import org.apache.seata.server.coordinator.DefaultCoordinator;
import org.apache.seata.server.instance.ServerInstance;
import org.apache.seata.server.lock.LockerManagerFactory;
import org.apache.seata.server.metrics.MetricsManager;
import org.apache.seata.server.session.SessionHolder;
import org.apache.seata.server.store.StoreConfig;
import org.apache.seata.server.store.VGroupMappingStoreManager;
import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryNamingServerProperties;
import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.EnumerablePropertySource;
import org.springframework.core.env.PropertySource;
import org.springframework.stereotype.Component;
import org.springframework.web.context.support.GenericWebApplicationContext;


import static org.apache.seata.common.ConfigurationKeys.META_PREFIX;
import static org.apache.seata.common.ConfigurationKeys.NAMING_SERVER;
import static org.apache.seata.common.Constants.OBJECT_KEY_SPRING_APPLICATION_CONTEXT;
import static org.apache.seata.common.Constants.OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGEX_SPLIT_CHAR;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGISTRY_PREFERED_NETWORKS;

Expand All @@ -69,66 +54,8 @@
public class Server {
private static final Logger LOGGER = LoggerFactory.getLogger(Server.class);

protected static volatile ScheduledExecutorService EXECUTOR_SERVICE;

@Resource
RegistryNamingServerProperties registryNamingServerProperties;

@Resource
RegistryProperties registryProperties;

public void metadataInit() {
VGroupMappingStoreManager vGroupMappingStoreManager = SessionHolder.getRootVGroupMappingManager();
if (StringUtils.equals(registryProperties.getType(), NAMING_SERVER)) {
EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("scheduledExcuter", 1, true));
ConfigurableEnvironment environment = (ConfigurableEnvironment) ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT);

// load node properties
Instance instance = Instance.getInstance();
// load namespace
String namespace = registryNamingServerProperties.getNamespace();
instance.setNamespace(namespace);
// load cluster name
String clusterName = registryNamingServerProperties.getCluster();
instance.setClusterName(clusterName);

// load cluster type
String clusterType = String.valueOf(StoreConfig.getSessionMode());
instance.addMetadata("cluster-type", "raft".equals(clusterType) ? clusterType : "default");

// load unit name
instance.setUnit(String.valueOf(UUID.randomUUID()));

instance.setTerm(System.currentTimeMillis());

// load node Endpoint
instance.setControl(new Node.Endpoint(NetUtil.getLocalIp(), Integer.parseInt(Objects.requireNonNull(environment.getProperty("server.port"))), "http"));

// load metadata
for (PropertySource<?> propertySource : environment.getPropertySources()) {
if (propertySource instanceof EnumerablePropertySource) {
EnumerablePropertySource<?> enumerablePropertySource = (EnumerablePropertySource<?>) propertySource;
for (String propertyName : enumerablePropertySource.getPropertyNames()) {
if (propertyName.startsWith(META_PREFIX)) {
instance.addMetadata(propertyName.substring(META_PREFIX.length()), enumerablePropertySource.getProperty(propertyName));
}
}
}
}
// load vgroup mapping relationship
instance.addMetadata("vGroup", vGroupMappingStoreManager.loadVGroups());

EXECUTOR_SERVICE.scheduleAtFixedRate(() -> {
try {
vGroupMappingStoreManager.notifyMapping();
} catch (Exception e) {
LOGGER.error("Naming server register Exception", e);
}
}, registryNamingServerProperties.getHeartbeatPeriod(), registryNamingServerProperties.getHeartbeatPeriod(), TimeUnit.MILLISECONDS);
ServerRunner.addDisposable(EXECUTOR_SERVICE::shutdown);
}
}

ServerInstance serverInstance;

/**
* The entry point of application.
Expand Down Expand Up @@ -179,7 +106,7 @@ public void start(String[] args) {
coordinator.init();
nettyRemotingServer.setHandler(coordinator);

metadataInit();
serverInstance.serverInstanceInit();
// let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028
ServerRunner.addDisposable(coordinator);
nettyRemotingServer.init();
Expand Down
Loading

0 comments on commit 77c81d5

Please sign in to comment.