Skip to content

Commit

Permalink
Merge branch '2.x' into incr_spring_autoconfigure_unit_test
Browse files Browse the repository at this point in the history
  • Loading branch information
l81893521 authored Aug 16, 2024
2 parents a54c3c8 + fb88638 commit bad4ec0
Show file tree
Hide file tree
Showing 13 changed files with 93 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1035,7 +1035,7 @@ public interface ConfigurationKeys {
/**
* The constant MAPPING_TABLE_NAME
*/
String MAPPING_TABLE_NAME = STORE_DB_PREFIX + FILE_CONFIG_SPLIT_CHAR + "mapping-table";
String MAPPING_TABLE_NAME = STORE_DB_PREFIX + "mapping-table";

/**
* The constant NAMESPACE_KEY
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.seata.common.metadata.namingserver;



import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seata.common.metadata.ClusterRole;
Expand Down Expand Up @@ -133,6 +132,10 @@ public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}

public Map<String, Object> getMetadata() {
return metadata;
}

public void addMetadata(String key, Object value) {
this.metadata.put(key, value);
}
Expand Down Expand Up @@ -182,7 +185,7 @@ public Map<String, String> toMap() {
resultMap.put("weight", String.valueOf(weight));
resultMap.put("healthy", String.valueOf(healthy));
resultMap.put("term", String.valueOf(term));
resultMap.put("timestamp",String.valueOf(timestamp));
resultMap.put("timestamp", String.valueOf(timestamp));
resultMap.put("metadata", mapToJsonString(metadata));

return resultMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,5 @@ public List<WatcherVO> getWatchList() {
.collect(Collectors.toList());
}

@GetMapping("/health")
public String healthCheck() {
return "ok";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,11 @@ public boolean registerInstance(NamingServerNode instance, String unitName) {
// ensure that when adding an instance, the remove side will not delete the unit.
lock.lock();
try {
currentUnit.addInstance(instance);
return currentUnit.addInstance(instance);
} finally {
lock.unlock();
}
return true;

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.event.EventListener;
import org.springframework.http.HttpStatus;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Component;
Expand Down Expand Up @@ -62,7 +63,7 @@ public void init() {
Optional.ofNullable(WATCHERS.remove(group))
.ifPresent(watchers -> watchers.parallelStream().forEach(watcher -> {
if (System.currentTimeMillis() >= watcher.getTimeout()) {
notify(watcher, 304);
notify(watcher, HttpStatus.NOT_MODIFIED.value());
}
if (!watcher.isDone()) {
// Re-register
Expand All @@ -77,7 +78,7 @@ public void init() {
@EventListener
@Async
public void onChangeEvent(ClusterChangeEvent event) {
if (event.getTerm() > 0) {
if (event.getTerm() > 0 || event.getTerm() == -1) {
GROUP_UPDATE_TIME.put(event.getGroup(), event.getTerm());
// Notifications are made of changes in cluster information

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.seata.common.result.Result;
import org.apache.seata.common.util.HttpClientUtil;
import org.apache.seata.common.NamingServerConstants;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.namingserver.listener.ClusterChangeEvent;
import org.apache.seata.namingserver.entity.pojo.ClusterData;
import org.apache.seata.namingserver.entity.vo.monitor.ClusterVO;
Expand Down Expand Up @@ -202,20 +203,18 @@ public void changeGroup(String namespace, String clusterName, String unitName, S
}
}

public void notifyClusterChange(String namespace, String clusterName, String unitName) {
public void notifyClusterChange(String namespace, String clusterName, String unitName,long term) {
for (Map.Entry<String, ConcurrentMap<String, Pair<String, String>>> entry : vGroupMap.entrySet()) {
String vGroup = entry.getKey();
Map<String, Pair<String, String>> namespaceMap = entry.getValue();

// Iterating through an internal HashMap
for (Map.Entry<String, Pair<String, String>> innerEntry : namespaceMap.entrySet()) {
String namespace1 = innerEntry.getKey();

Pair<String, String> pair = innerEntry.getValue();
String clusterName1 = pair.getKey();
String unitName1 = pair.getValue();
if (namespace1.equals(namespace) && clusterName1.equals(clusterName)
&& (unitName1 == null || unitName1.equals(unitName))) {
applicationContext.publishEvent(new ClusterChangeEvent(this, vGroup, System.currentTimeMillis()));
if (StringUtils.equals(clusterName1,clusterName)) {
applicationContext.publishEvent(new ClusterChangeEvent(this, vGroup, term));
}
}
}
Expand All @@ -234,17 +233,19 @@ public boolean registerInstance(NamingServerNode node, String namespace, String

// if extended metadata includes vgroup mapping relationship, add it in clusterData
Optional.ofNullable(node.getMetadata().remove(CONSTANT_GROUP)).ifPresent(mappingObj -> {
if (mappingObj instanceof List) {
List<String> vGroups = (List<String>)mappingObj;
for (String vGroup : vGroups) {
changeGroup(namespace, clusterName, unitName, vGroup);
}
if (mappingObj instanceof Map) {
Map<String, Object> vGroups = (Map) mappingObj;
vGroups.forEach((k, v) -> {
// In non-raft mode, a unit is one-to-one with a node, and the unitName is stored on the node.
// In raft mode, the unitName is equal to the raft-group, so the node's unitName cannot be used.
changeGroup(namespace, clusterName, v == null ? unitName : (String)v, k);
});
}
});

boolean hasChanged = clusterData.registerInstance(node, unitName);
if (hasChanged) {
notifyClusterChange(namespace, clusterName, unitName);
notifyClusterChange(namespace, clusterName, unitName,node.getTerm());
}
instanceLiveTable.put(
new InetSocketAddress(node.getTransaction().getHost(), node.getTransaction().getPort()),
Expand All @@ -256,15 +257,15 @@ public boolean registerInstance(NamingServerNode node, String namespace, String
return true;
}

public boolean unregisterInstance(String unitName, Node node) {
public boolean unregisterInstance(String unitName, NamingServerNode node) {
try {
for (String namespace : namespaceClusterDataMap.keySet()) {
Map<String, ClusterData> clusterMap = namespaceClusterDataMap.get(namespace);
if (clusterMap != null) {
clusterMap.forEach((clusterName, clusterData) -> {
if (clusterData.getUnitData() != null && clusterData.getUnitData().containsKey(unitName)) {
clusterData.removeInstance(node, unitName);
notifyClusterChange(namespace, clusterName, unitName);
notifyClusterChange(namespace, clusterName, unitName, node.getTerm());
instanceLiveTable.remove(new InetSocketAddress(node.getTransaction().getHost(),
node.getTransaction().getPort()));
}
Expand Down Expand Up @@ -329,7 +330,7 @@ public void instanceHeartBeatCheck() {
LOGGER.warn("{} instance has gone offline",
instance.getTransaction().getHost() + ":" + instance.getTransaction().getPort());
}
notifyClusterChange(namespace, clusterData.getClusterName(), unit.getUnitName());
notifyClusterChange(namespace, clusterData.getClusterName(), unit.getUnitName(),-1);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.springframework.test.context.junit4.SpringRunner;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
Expand Down Expand Up @@ -61,8 +62,8 @@ void mockRegister() {
node.setTransaction(new Node.Endpoint("127.0.0.1", 8091, "netty"));
node.setControl(new Node.Endpoint("127.0.0.1", 7091, "http"));
Map<String, Object> meatadata = node.getMetadata();
List<String> vGroups = new ArrayList<>();
vGroups.add("vgroup1");
Map<String,Object> vGroups = new HashMap<>();
vGroups.put("vgroup1",null);
meatadata.put(CONSTANT_GROUP, vGroups);
namingController.registerInstance(namespace, clusterName, unitName, node);
String vGroup = "vgroup1";
Expand Down Expand Up @@ -92,16 +93,16 @@ void mockUnregisterGracefully() {
node.setTransaction(new Node.Endpoint("127.0.0.1", 8091, "netty"));
node.setControl(new Node.Endpoint("127.0.0.1", 7091, "http"));
Map<String, Object> meatadata = node.getMetadata();
List<String> vGroups = new ArrayList<>();
vGroups.add("vgroup1");
Map<String,Object> vGroups = new HashMap<>();
vGroups.put("vgroup1",null);
meatadata.put(CONSTANT_GROUP, vGroups);
namingController.registerInstance(namespace, clusterName, unitName, node);
NamingServerNode node2 = new NamingServerNode();
node2.setTransaction(new Node.Endpoint("127.0.0.1", 8091, "netty"));
node2.setControl(new Node.Endpoint("127.0.0.1", 7091, "http"));
Map<String, Object> meatadata2 = node2.getMetadata();
List<String> vGroups2 = new ArrayList<>();
vGroups2.add("vgroup2");
Map<String,Object> vGroups2 = new HashMap<>();
vGroups2.put("vgroup2",null);
meatadata2.put(CONSTANT_GROUP, vGroups2);
namingController.registerInstance(namespace, "cluster2", UUID.randomUUID().toString(), node2);
String vGroup = "vgroup1";
Expand Down Expand Up @@ -136,8 +137,8 @@ void mockUnregisterUngracefully() throws InterruptedException {
node.setTransaction(new Node.Endpoint("127.0.0.1", 8091, "netty"));
node.setControl(new Node.Endpoint("127.0.0.1", 7091, "http"));
Map<String, Object> meatadata = node.getMetadata();
List<String> vGroups = new ArrayList<>();
vGroups.add("vgroup1");
Map<String,Object> vGroups = new HashMap<>();
vGroups.put("vgroup1",null);
meatadata.put(CONSTANT_GROUP, vGroups);
namingController.registerInstance(namespace, clusterName, unitName, node);
String vGroup = "vgroup1";
Expand All @@ -164,4 +165,5 @@ void mockUnregisterUngracefully() throws InterruptedException {
cluster = metaResponse.getClusterList().get(0);
assertEquals(0, cluster.getUnitData().size());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public Result<?> addVGroup(@RequestParam String vGroup, @RequestParam String uni
mappingDO.setUnit(unit);
mappingDO.setVGroup(vGroup);
boolean rst = vGroupMappingStoreManager.addVGroup(mappingDO);
Instance.getInstance().setTerm(System.currentTimeMillis());
if (!rst) {
result.setCode("500");
result.setMessage("add vGroup failed!");
Expand All @@ -86,6 +87,7 @@ public Result<?> addVGroup(@RequestParam String vGroup, @RequestParam String uni
public Result<?> removeVGroup(@RequestParam String vGroup) {
Result<?> result = new Result<>();
boolean rst = vGroupMappingStoreManager.removeVGroup(vGroup);
Instance.getInstance().setTerm(System.currentTimeMillis());
if (!rst) {
result.setCode("500");
result.setMessage("remove vGroup failed!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class SessionHolder {
/**
* The default vgroup mapping store dir
*/
public static final String DEFAULT_VGROUP_MAPPING_STORE_FILE_DIR = System.getProperty("user.dir");
public static final String DEFAULT_VGROUP_MAPPING_STORE_FILE_DIR = "vgroupStore";

private static VGroupMappingStoreManager ROOT_VGROUP_MAPPING_MANAGER;

Expand Down Expand Up @@ -122,7 +122,8 @@ public static void init(SessionMode sessionMode) {
RaftServerManager.start();
} else {
String vGroupMappingStorePath = CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR,
DEFAULT_VGROUP_MAPPING_STORE_FILE_DIR);
DEFAULT_VGROUP_MAPPING_STORE_FILE_DIR) + separator
+ System.getProperty(SERVER_SERVICE_PORT_CAMEL);
String sessionStorePath =
CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR, DEFAULT_SESSION_STORE_FILE_DIR) + separator
+ System.getProperty(SERVER_SERVICE_PORT_CAMEL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.seata.server.storage.file.store;



import org.apache.seata.common.loader.LoadLevel;
import org.apache.seata.config.Configuration;
import org.apache.seata.config.ConfigurationFactory;
Expand All @@ -32,18 +31,23 @@
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

@LoadLevel(name = "file")
public class FileVGroupMappingStoreManager implements VGroupMappingStoreManager {
private static final Logger LOGGER = LoggerFactory.getLogger(FileVGroupMappingStoreManager.class);

public static final String ROOT_MAPPING_MANAGER_NAME = "vgroup_mapping.json";

private final ReentrantLock writeLock = new ReentrantLock();
private final ReadWriteLock lock = new ReentrantReadWriteLock();

private String storePath;

HashMap<String, Object> vGroupMapping = new HashMap<>();


protected static final Configuration CONFIG = ConfigurationFactory.getInstance();


Expand All @@ -56,29 +60,50 @@ public FileVGroupMappingStoreManager(String mappingStoreFilePath) {

@Override
public boolean addVGroup(MappingDO mappingDO) {
HashMap<String, Object> vGroupMapping = loadVGroups();
vGroupMapping.put(mappingDO.getVGroup(), mappingDO.getUnit());
boolean isSaved = save(vGroupMapping);
if (!isSaved) {
LOGGER.error("add mapping relationship failed!");
Lock writeLock = lock.writeLock();
writeLock.lock();
try {
vGroupMapping.put(mappingDO.getVGroup(), mappingDO.getUnit());
boolean isSaved = save(vGroupMapping);

if (!isSaved) {
LOGGER.error("add mapping relationship failed!");
}
return isSaved;
} finally {
writeLock.unlock();
}
return isSaved;
}

@Override
public boolean removeVGroup(String vGroup) {
HashMap<String, Object> vGroupMapping = loadVGroups();
vGroupMapping.remove(vGroup);
boolean isSaved = save(vGroupMapping);
if (!isSaved) {
LOGGER.error("remove mapping relationship failed!");
Lock writeLock = lock.writeLock();
writeLock.lock();
try {
vGroupMapping.remove(vGroup);
boolean isSaved = save(vGroupMapping);
if (!isSaved) {
LOGGER.error("remove mapping relationship failed!");
}
return isSaved;
} finally {
writeLock.unlock();
}
}

@Override
public HashMap<String, Object> readVGroups() {
Lock readLock = lock.readLock();
readLock.lock();
try {
return vGroupMapping;
} finally {
readLock.unlock();
}
return isSaved;
}

@Override
public HashMap<String, Object> loadVGroups() {
HashMap<String, Object> vGroupMapping = new HashMap<>();
try {
File fileToLoad = new File(storePath);
if (!fileToLoad.exists()) {
Expand Down Expand Up @@ -112,7 +137,6 @@ public HashMap<String, Object> loadVGroups() {


public boolean save(HashMap<String, Object> vGroupMapping) {
writeLock.lock();
try {
ObjectMapper objectMapper = new ObjectMapper();
String jsonMapping = objectMapper.writeValueAsString(vGroupMapping);
Expand All @@ -121,8 +145,6 @@ public boolean save(HashMap<String, Object> vGroupMapping) {
} catch (IOException e) {
LOGGER.error("mapping relationship saved failed! ", e);
return false;
} finally {
writeLock.unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.seata.common.exception.RedisException;
import org.apache.seata.common.loader.LoadLevel;
import org.apache.seata.common.metadata.namingserver.Instance;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.store.MappingDO;
import org.apache.seata.server.storage.redis.JedisPooledFactory;
import org.apache.seata.server.store.VGroupMappingStoreManager;
Expand Down Expand Up @@ -66,7 +67,9 @@ public HashMap<String, Object> loadVGroups() {
Map<String, String> mappingKeyMap = jedis.hgetAll(namespace);
HashMap<String, Object> result = new HashMap<>();
for (Map.Entry<String, String> entry : mappingKeyMap.entrySet()) {
result.put(entry.getKey(), null);
if (StringUtils.equals(clusterName, entry.getValue())) {
result.put(entry.getKey(), null);
}
}
return result;
} catch (Exception ex) {
Expand Down
Loading

0 comments on commit bad4ec0

Please sign in to comment.