Skip to content

Commit

Permalink
fix: logic in health service and function manager
Browse files Browse the repository at this point in the history
  • Loading branch information
Lambert-Rao committed Apr 5, 2024
1 parent c18cad7 commit 752aa15
Show file tree
Hide file tree
Showing 18 changed files with 207 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,6 @@ public class RuntimeMetadata extends MetadataConfig {
private Long startTimeStamp;

private String clusterName;

private String clusterRegistryAddress;
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ public ClusterEntity getClusterByName(String name) {
return INSTANCE.clusterNameMap.get(name);
}

public ClusterEntity getClusterByRegistryAddress(String registryAddress) {
for (ClusterEntity clusterEntity : INSTANCE.clusterIdMap.values()) {
if (clusterEntity.getRegistryAddress().equals(registryAddress)) {
return clusterEntity;
}
}
return null;
}

public List<ClusterEntity> getClusters() {
return new ArrayList<>(INSTANCE.clusterIdMap.values());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,18 @@ public void createExecutor(HealthDataService dataService, CheckResultCache cache
}

public void executeAll() {
healthExecutor.startExecute();
try {

healthExecutor.startExecute();

checkServiceMap.forEach((type, subMap) -> {
subMap.forEach((typeId, healthCheckService) -> {
healthExecutor.execute(healthCheckService);
checkServiceMap.forEach((type, subMap) -> {
subMap.forEach((typeId, healthCheckService) -> {
healthExecutor.execute(healthCheckService);
});
});
});
} catch (Exception e) {
log.error("execute health check failed", e);
}

healthExecutor.endExecute();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,11 @@ public void run() {
}

public void run(Boolean toDbOn, Boolean toServiceOn) {
metaDataServiceWrapperMap.forEach((cacheId, metaDataServiceWrapper) -> handlers(cacheId, metaDataServiceWrapper, toDbOn, toServiceOn));
try {
metaDataServiceWrapperMap.forEach((cacheId, metaDataServiceWrapper) -> handlers(cacheId, metaDataServiceWrapper, toDbOn, toServiceOn));
} catch (Exception e) {
log.error("metadata manager run error", e);
}
}

public void handlers(Long cacheId, MetadataServiceWrapper metaDataServiceWrapper, Boolean toDbOn, Boolean toServiceOn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ public class RuntimeMetadataHandlerToDbImpl implements MetadataHandler<RuntimeMe

@Override
public void addMetadata(RuntimeMetadata meta) {
ClusterEntity cluster = ClusterCache.getINSTANCE().getClusterByName(meta.getClusterName());
ClusterEntity cluster = ClusterCache.getINSTANCE().getClusterByRegistryAddress(meta.getClusterRegistryAddress());
if (Objects.isNull(cluster)) {
log.info("new cluster detected syncing runtime, adding cluster to db, cluster:{}", meta.getClusterName());
ClusterEntity clusterEntity = new ClusterEntity(meta.getClusterName(), "", "", "", "", "", "", "", 0, 0, 1, null, null, 0);
ClusterEntity clusterEntity = new ClusterEntity(meta.getClusterName(), cluster.getRegistryAddress(), "", "", "", "", "", "", 0, 0, 1,
null, null, 0);
clusterService.addCluster(clusterEntity);
} else {
cluster.setName(meta.getClusterName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.eventmesh.dashboard.common.model.remoting.runtime.GetRuntimeRequest;
import org.apache.eventmesh.dashboard.console.cache.ClusterCache;
import org.apache.eventmesh.dashboard.console.function.metadata.syncservice.SyncDataService;
import org.apache.eventmesh.dashboard.core.meta.runtime.NacosRuntimeCore;
import org.apache.eventmesh.dashboard.service.remoting.MetaRemotingService;

import java.util.ArrayList;
Expand All @@ -33,15 +34,13 @@

import org.springframework.stereotype.Service;

import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Service
public class RuntimeSyncFromClusterService implements SyncDataService<RuntimeMetadata> {

@Setter
private MetaRemotingService metaRemotingService;
private final MetaRemotingService metaRemotingService = new NacosRuntimeCore();

@Override
public List<RuntimeMetadata> getData() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,22 +65,35 @@ List<HealthCheckResultEntity> selectByClusterIdAndCreateTimeRange(@Param("cluste

@Insert({
"<script>",
" <choose>",
" <when test='list.size() > 0'>",
" INSERT INTO health_check_result(type, type_id, cluster_id, state, result_desc) VALUES ",
" <if test='list.size() > 0'>",
" INSERT INTO health_check_result(type, type_id, cluster_id, state, result_desc)",
" VALUES ",
" <foreach collection='list' item='healthCheckResultEntity' index='index' separator=','>",
" (#{healthCheckResultEntity.type}, #{healthCheckResultEntity.typeId}, #{healthCheckResultEntity.clusterId},",
" #{healthCheckResultEntity.state}, #{healthCheckResultEntity.resultDesc})",
" </foreach>",
" </when>",
" <otherwise>",
" SELECT 1 FROM DUAL WHERE FALSE",
" </otherwise>",
" </choose>",
" ON DUPLICATE KEY UPDATE",
" state = VALUES(state),",
" result_desc = VALUES(result_desc)",
" </if>",
"</script>"
})
void batchInsert(List<HealthCheckResultEntity> healthCheckResultEntityList);

@Insert({
"<script>",
" <if test='list.size() > 0'>",
" INSERT INTO health_check_result(type, type_id, cluster_id, state, result_desc)",
" VALUES ",
" <foreach collection='list' item='healthCheckResultEntity' index='index' separator=','>",
" (#{healthCheckResultEntity.type}, #{healthCheckResultEntity.typeId}, #{healthCheckResultEntity.clusterId},",
" 4, #{healthCheckResultEntity.resultDesc})",
" </foreach>",
" </if>",
"</script>"
})
void insertNewChecks(List<HealthCheckResultEntity> healthCheckResultEntityList);

@Update("UPDATE health_check_result SET state = #{state}, result_desc = #{resultDesc} WHERE id = #{id}")
void update(HealthCheckResultEntity healthCheckResultEntity);

Expand All @@ -96,10 +109,13 @@ List<HealthCheckResultEntity> selectByClusterIdAndCreateTimeRange(@Param("cluste
@Select({
"<script>",
" SELECT * FROM health_check_result",
" WHERE (cluster_id, type, type_id, state) IN",
" <foreach collection='list' item='item' open='(' separator=',' close=')'>",
" (#{item.clusterId}, #{item.type}, #{item.typeId}, 2)",
" </foreach>",
" <if test='list != null and list.size() > 0'>",
" WHERE (cluster_id, type, type_id) IN",
" <foreach collection='list' item='item' open='(' separator=',' close=')'>",
" (#{item.clusterId}, #{item.type}, #{item.typeId})",
" </foreach>",
" AND (state = 2 OR state = 4)",
" </if>",
" ORDER BY create_time DESC",
"</script>"
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ public interface RuntimeMapper {
"<script>",
" INSERT INTO runtime (cluster_id, host, storage_cluster_id, port, jmx_port, start_timestamp, rack, status, endpoint_map) VALUES",
" <foreach collection='list' item='c' index='index' separator=','>",
" (#{c.clusterId},#{c.host},#{c.storageClusterId},#{c.port},#{c.jmxPort},#{c.startTimestamp},#{c.rack},#{c.status},#{c.endpointMap})",
" (#{c.clusterId},#{c.host},#{c.storageClusterId},#{c.port},#{c.jmxPort},NOW(),#{c.rack},#{c.status},#{c.endpointMap})",
" </foreach>",
"</script>"})
@Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id")
void batchInsert(List<RuntimeEntity> runtimeEntities);

@Insert("INSERT INTO runtime (cluster_id, host, storage_cluster_id, port, jmx_port, start_timestamp, rack, status, "
+ "endpoint_map) VALUES(#{clusterId},#{host},#{storageClusterId},#{port},#{jmxPort},#{startTimestamp},#{rack},#{status},#{endpointMap})"
+ "endpoint_map) VALUES(#{clusterId},#{host},#{storageClusterId},#{port},#{jmxPort},NOW(),#{rack},#{status},#{endpointMap})"
+ " ON DUPLICATE KEY UPDATE status=1,start_timestamp = now()")
@Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id")
void addRuntime(RuntimeEntity runtimeEntity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ public interface HealthDataService {

void batchInsertHealthCheckResult(List<HealthCheckResultEntity> healthCheckResultEntityList);

/**
* New check results have state 4: SDK client not created or connected
*/
void batchInsertNewCheckResult(List<HealthCheckResultEntity> healthCheckResultEntityList);

List<HealthCheckResultEntity> selectAll();

List<HealthCheckResultEntity> queryHealthCheckResultByClusterIdAndTypeAndTypeId(HealthCheckResultEntity entity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,20 @@ public HealthCheckResultEntity insertHealthCheckResult(HealthCheckResultEntity h

@Override
public void batchInsertHealthCheckResult(List<HealthCheckResultEntity> healthCheckResultEntityList) {
if (healthCheckResultEntityList.isEmpty()) {
return;
}
healthCheckResultMapper.batchInsert(healthCheckResultEntityList);
}

@Override
public void batchInsertNewCheckResult(List<HealthCheckResultEntity> healthCheckResultEntityList) {
if (healthCheckResultEntityList.isEmpty()) {
return;
}
healthCheckResultMapper.insertNewChecks(healthCheckResultEntityList);
}

@Override
public List<HealthCheckResultEntity> selectAll() {
return healthCheckResultMapper.selectAll();
Expand All @@ -71,17 +82,21 @@ public void batchUpdateCheckResult(List<HealthCheckResultEntity> healthCheckResu

@Override
public void batchUpdateCheckResultByClusterIdAndTypeAndTypeId(List<HealthCheckResultEntity> healthCheckResultEntityList) {
List<HealthCheckResultEntity> idsNeedToBeUpdate = healthCheckResultMapper.getIdsNeedToBeUpdateByClusterIdAndTypeAndTypeId(
if (healthCheckResultEntityList.isEmpty()) {
return;
}
List<HealthCheckResultEntity> entitiesNeedToBeUpdate = healthCheckResultMapper.getIdsNeedToBeUpdateByClusterIdAndTypeAndTypeId(
healthCheckResultEntityList);
idsNeedToBeUpdate.forEach(entity -> {
entitiesNeedToBeUpdate.forEach(entity -> {
healthCheckResultEntityList.forEach(updateEntity -> {
if (entity.getClusterId().equals(updateEntity.getClusterId()) && entity.getType().equals(updateEntity.getType())
&& entity.getTypeId().equals(updateEntity.getTypeId())) {
updateEntity.setId(entity.getId());
entity.setState(updateEntity.getState());
entity.setResultDesc(updateEntity.getResultDesc());
}
});
});
healthCheckResultMapper.batchUpdate(healthCheckResultEntityList);
healthCheckResultMapper.batchUpdate(entitiesNeedToBeUpdate);
}

@Autowired
Expand Down
Loading

0 comments on commit 752aa15

Please sign in to comment.