Skip to content

Commit

Permalink
Clear the jdbc EPHEMERAL data and lock which client is not exist
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Nov 23, 2024
1 parent 627c76b commit 17f21c9
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 44 deletions.
2 changes: 1 addition & 1 deletion dolphinscheduler-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql-connector.version}</version>
<scope>test</scope>
<!-- <scope>test</scope>-->
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ public void close() {
log.info("Closing Jdbc Registry...");
// remove the current Ephemeral node, if can connect to jdbc
try (JdbcRegistryClient closed1 = jdbcRegistryClient) {
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().shutdownNow();
} catch (Exception e) {
log.error("Close Jdbc Registry error", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
package org.apache.dolphinscheduler.plugin.registry.jdbc.client;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import lombok.Data;

@ToString
@Getter
@Data
@AllArgsConstructor
public class JdbcRegistryClientIdentify {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DO.JdbcRegistryClientHeartbeat;

import java.util.Date;
import java.util.concurrent.TimeUnit;

import lombok.AllArgsConstructor;
import lombok.Builder;
Expand All @@ -36,7 +37,6 @@ public class JdbcRegistryClientHeartbeatDTO {

private Long id;

// clientName
private String clientName;

private Long lastHeartbeatTime;
Expand Down Expand Up @@ -90,7 +90,7 @@ public JdbcRegistryClientHeartbeatDTO clone() {
public static class ClientConfig {

@Builder.Default
private long sessionTimeout = 60 * 1000L;
private long sessionTimeout = TimeUnit.SECONDS.toMillis(60);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,25 @@ public Optional<JdbcRegistryDataDTO> selectByKey(String key) {
.map(JdbcRegistryDataDTO::fromJdbcRegistryData);
}

public Optional<JdbcRegistryDataDTO> selectByClientId(String key) {
return Optional.ofNullable(jdbcRegistryDataMapper.selectByKey(key))
.map(JdbcRegistryDataDTO::fromJdbcRegistryData);
}

public void deleteEphemeralDateByClientIds(List<Long> clientIds) {
if (CollectionUtils.isEmpty(clientIds)) {
return;
}
jdbcRegistryDataMapper.deleteByClientIds(clientIds, DataType.EPHEMERAL.name());
}

public void deleteEphemeralDateWhichClientIdIsNotIn(List<Long> clientIds) {
if (CollectionUtils.isEmpty(clientIds)) {
return;
}
jdbcRegistryDataMapper.deleteByClientIds(clientIds, DataType.EPHEMERAL.name());
}

public void deleteByKey(String key) {
jdbcRegistryDataMapper.deleteByKey(key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.commons.collections4.CollectionUtils;

import java.util.List;
import java.util.stream.Collectors;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
Expand All @@ -36,6 +37,13 @@ public class JdbcRegistryLockRepository {
@Autowired
private JdbcRegistryLockMapper jdbcRegistryLockMapper;

public List<JdbcRegistryLockDTO> queryAll() {
return jdbcRegistryLockMapper.selectList(null)
.stream()
.map(JdbcRegistryLockDTO::fromJdbcRegistryLock)
.collect(Collectors.toList());
}

public void deleteByClientIds(List<Long> clientIds) {
if (CollectionUtils.isEmpty(clientIds)) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ public interface IJdbcRegistryDataManager {

boolean existKey(String key);

/**
* Get all the {@link JdbcRegistryDataDTO}.
*/
List<JdbcRegistryDataDTO> getAllJdbcRegistryData();

/**
* Get the {@link JdbcRegistryDataDTO} by key.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ public boolean existKey(String key) {
return jdbcRegistryDataRepository.selectByKey(key).isPresent();
}

@Override
public List<JdbcRegistryDataDTO> getAllJdbcRegistryData() {
return jdbcRegistryDataRepository.selectAll();
}

@Override
public Optional<JdbcRegistryDataDTO> getRegistryDataByKey(String key) {
checkNotNull(key);
Expand Down Expand Up @@ -212,7 +217,7 @@ public void deleteJdbcRegistryDataByKey(String key) {
return;
}
jdbcRegistryDataRepository.deleteByKey(key);
JdbcRegistryDataChanceEventDTO registryDataChanceEvent = JdbcRegistryDataChanceEventDTO.builder()
final JdbcRegistryDataChanceEventDTO registryDataChanceEvent = JdbcRegistryDataChanceEventDTO.builder()
.jdbcRegistryData(jdbcRegistryDataOptional.get())
.eventType(JdbcRegistryDataChanceEventDTO.EventType.DELETE)
.createTime(new Date())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@
import org.apache.dolphinscheduler.registry.api.RegistryException;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.time.StopWatch;

import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -100,25 +102,22 @@ public void start() {
// The server is already started or stopped, will not start again.
return;
}
// Purge the previous client to avoid the client is still in the registry.
purgePreviousJdbcRegistryClient();
// Start the Purge thread
// The Purge thread will remove the client from the registry, and remove it's related data and lock.
// Connect to the database, load the data and lock.
purgeDeadJdbcRegistryClient();
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor()
.scheduleWithFixedDelay(this::purgeDeadJdbcRegistryClient,
jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(),
jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(),
TimeUnit.MILLISECONDS);
// The Purge thread will clear the invalidated data
purgeInvalidJdbcRegistryMetadata();
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay(
this::purgeInvalidJdbcRegistryMetadata,
jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(),
jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(),
TimeUnit.MILLISECONDS);
jdbcRegistryDataManager.start();
jdbcRegistryServerState = JdbcRegistryServerState.STARTED;
doTriggerOnConnectedListener();
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor()
.scheduleWithFixedDelay(this::refreshClientsHeartbeat,
0,
jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(),
TimeUnit.MILLISECONDS);
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay(
this::refreshClientsHeartbeat,
0,
jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(),
TimeUnit.MILLISECONDS);
}

@SneakyThrows
Expand All @@ -139,9 +138,8 @@ public void registerClient(IJdbcRegistryClient jdbcRegistryClient) {
.lastHeartbeatTime(System.currentTimeMillis())
.build();

while (jdbcRegistryClientDTOMap.containsKey(jdbcRegistryClientIdentify)) {
log.warn("The client {} is already exist the registry.", jdbcRegistryClientIdentify.getClientId());
Thread.sleep(jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis());
if (jdbcRegistryClientDTOMap.containsKey(jdbcRegistryClientIdentify)) {
throw new IllegalArgumentException("The client is already registered: " + jdbcRegistryClientIdentify);
}
jdbcRegistryClientRepository.insert(registryClientDTO);
jdbcRegistryClients.add(jdbcRegistryClient);
Expand Down Expand Up @@ -260,34 +258,47 @@ public void close() {
jdbcRegistryClientDTOMap.clear();
}

private void purgePreviousJdbcRegistryClient() {
private void purgeInvalidJdbcRegistryMetadata() {
final StopWatch stopWatch = StopWatch.createStarted();
if (jdbcRegistryServerState == JdbcRegistryServerState.STOPPED) {
return;
}
List<Long> previousJdbcRegistryClientIds = jdbcRegistryClientRepository.queryAll()
.stream()
.filter(jdbcRegistryClientHeartbeat -> jdbcRegistryClientHeartbeat.getClientName()
.equals(jdbcRegistryProperties.getJdbcRegistryClientName()))
.map(JdbcRegistryClientHeartbeatDTO::getId)
.collect(Collectors.toList());
doPurgeJdbcRegistryClientInDB(previousJdbcRegistryClientIds);

}

private void purgeDeadJdbcRegistryClient() {
if (jdbcRegistryServerState == JdbcRegistryServerState.STOPPED) {
return;
}
List<Long> deadJdbcRegistryClientIds = jdbcRegistryClientRepository.queryAll()
// remove the client which is already dead from the registry, and remove it's related data and lock.
final List<JdbcRegistryClientHeartbeatDTO> jdbcRegistryClients = jdbcRegistryClientRepository.queryAll();
final List<Long> deadJdbcRegistryClientIds = jdbcRegistryClients
.stream()
.filter(JdbcRegistryClientHeartbeatDTO::isDead)
.map(JdbcRegistryClientHeartbeatDTO::getId)
.collect(Collectors.toList());
doPurgeJdbcRegistryClientInDB(deadJdbcRegistryClientIds);

// remove the data and lock which client is not exist.
final Set<Long> existJdbcRegistryClientIds = jdbcRegistryClients
.stream()
.map(JdbcRegistryClientHeartbeatDTO::getId)
.collect(Collectors.toSet());
jdbcRegistryDataManager.getAllJdbcRegistryData()
.stream()
.filter(jdbcRegistryDataDTO -> !existJdbcRegistryClientIds.contains(jdbcRegistryDataDTO.getClientId()))
.filter(jdbcRegistryDataDTO -> DataType.EPHEMERAL.name().equals(jdbcRegistryDataDTO.getDataType()))
.forEach(jdbcRegistryData -> {
log.info("Remove the JdbcRegistryData: {} which client is not exist in the registry",
jdbcRegistryData);
jdbcRegistryDataManager.deleteJdbcRegistryDataByKey(jdbcRegistryData.getDataKey());
});
jdbcRegistryLockRepository.queryAll()
.stream()
.filter(jdbcRegistryLockDTO -> !existJdbcRegistryClientIds.contains(jdbcRegistryLockDTO.getClientId()))
.forEach(jdbcRegistryLockDTO -> {
log.info("Remove the JdbcRegistryLock: {} which client is not exist in the registry",
jdbcRegistryLockDTO);
jdbcRegistryLockRepository.deleteById(jdbcRegistryLockDTO.getId());
});
stopWatch.stop();
log.debug("Success purge invalid jdbcRegistryMetadata, cost: {} ms", stopWatch.getTime());
}

private void doPurgeJdbcRegistryClientInDB(List<Long> jdbcRegistryClientIds) {
private void doPurgeJdbcRegistryClientInDB(final List<Long> jdbcRegistryClientIds) {
if (CollectionUtils.isEmpty(jdbcRegistryClientIds)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,4 +319,4 @@ spring:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8
username: root
password: root
password: root@123

0 comments on commit 17f21c9

Please sign in to comment.