Skip to content

Commit

Permalink
Refactor statistics collect job (#31989)
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangML authored Jul 5, 2024
1 parent 340fa42 commit e572389
Show file tree
Hide file tree
Showing 10 changed files with 266 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Broadcast routing engine for database instance.
*/
@RequiredArgsConstructor
public class BroadcastInstanceBroadcastRoutingEngine implements BroadcastRouteEngine {
public final class BroadcastInstanceBroadcastRoutingEngine implements BroadcastRouteEngine {

private final ResourceMetaData resourceMetaData;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ public enum GlobalLockNames {

PREPARE("prepare_%s"),

GLOBAL_LOCK("global_clock"),

// Lock guarding the statistics collect job so only one instance refreshes at a time.
STATISTICS("statistics");

// Pattern of the lock name; %s placeholders are filled in by callers.
private final String lockName;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,12 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationPropertyKey;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereDatabaseData;
import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData;
import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereSchemaData;
import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
import org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereRowDataSwapper;
import org.apache.shardingsphere.mode.lock.GlobalLockContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.persist.pojo.AlteredShardingSphereSchemaData;

import java.util.ArrayList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.shardingsphere.mode.manager.cluster.lock.GlobalLockPersistService;
import org.apache.shardingsphere.mode.metadata.refresher.ShardingSphereStatisticsRefreshEngine;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.spi.PersistRepository;

/**
* Statistics collect job.
Expand All @@ -55,107 +39,9 @@ public final class StatisticsCollectJob implements SimpleJob {

/**
 * Collects statistics for every database registered in the metadata, then compares the
 * collected snapshot against the current one and persists any changed table rows.
 *
 * @param shardingContext sharding context supplied by the ElasticJob trigger
 */
@Override
public void execute(final ShardingContext shardingContext) {
    try {
        if (contextManager.getMetaDataContexts().getMetaData().getTemporaryProps().getValue(TemporaryConfigurationPropertyKey.PROXY_META_DATA_COLLECTOR_ENABLED)) {
            ShardingSphereMetaData currentMetaData = contextManager.getMetaDataContexts().getMetaData();
            ShardingSphereStatistics currentStatistics = contextManager.getMetaDataContexts().getStatistics();
            ShardingSphereStatistics collectedStatistics = new ShardingSphereStatistics();
            for (Entry<String, ShardingSphereDatabaseData> entry : currentStatistics.getDatabaseData().entrySet()) {
                // Skip databases that have been dropped since the statistics were recorded.
                if (currentMetaData.containsDatabase(entry.getKey())) {
                    collectForDatabase(entry.getKey(), entry.getValue(), currentMetaData.getDatabases(), collectedStatistics);
                }
            }
            compareUpdateAndSendEvent(currentStatistics, collectedStatistics, currentMetaData.getDatabases());
        }
        // CHECKSTYLE:OFF
    } catch (final Exception ex) {
        // CHECKSTYLE:ON
        // Best-effort job: log and wait for the next trigger instead of failing the scheduler.
        log.error("Collect data error", ex);
    }
}

/**
 * Collects statistics for every schema of one database that still exists in the metadata.
 *
 * @param databaseName database name (map lookup is done on its lower-cased form)
 * @param databaseData previously recorded statistics of the database
 * @param databases all databases keyed by lower-cased name
 * @param statistics target container for newly collected statistics
 */
private void collectForDatabase(final String databaseName, final ShardingSphereDatabaseData databaseData,
                                final Map<String, ShardingSphereDatabase> databases, final ShardingSphereStatistics statistics) {
    ShardingSphereDatabase database = databases.get(databaseName.toLowerCase());
    for (Entry<String, ShardingSphereSchemaData> entry : databaseData.getSchemaData().entrySet()) {
        // Skip schemas that have been dropped since the statistics were recorded.
        if (database.containsSchema(entry.getKey())) {
            collectForSchema(databaseName, entry.getKey(), entry.getValue(), databases, statistics);
        }
    }
}

/**
 * Collects statistics for every table of one schema that still exists in the metadata.
 *
 * @param databaseName database name (map lookup is done on its lower-cased form)
 * @param schemaName schema name
 * @param schemaData previously recorded statistics of the schema
 * @param databases all databases keyed by lower-cased name
 * @param statistics target container for newly collected statistics
 */
private void collectForSchema(final String databaseName, final String schemaName, final ShardingSphereSchemaData schemaData,
                              final Map<String, ShardingSphereDatabase> databases, final ShardingSphereStatistics statistics) {
    schemaData.getTableData().forEach((key, value) -> {
        // Fix: always look the database up by its lower-cased name. The containsTable guard used
        // `databaseName.toLowerCase()` while the getTable lookup used the raw name, which throws
        // NPE for mixed-case database names.
        ShardingSphereSchema schema = databases.get(databaseName.toLowerCase()).getSchema(schemaName);
        if (schema.containsTable(key)) {
            collectForTable(databaseName, schemaName, schema.getTable(key), databases, statistics);
        }
    });
}

/**
 * Collects fresh statistics rows for one table via its SPI collector, if one is registered.
 * A collector failure is logged and the table is simply skipped for this round.
 *
 * @param databaseName database name
 * @param schemaName schema name
 * @param table table metadata to collect for
 * @param databases all databases keyed by lower-cased name
 * @param statistics target container for newly collected statistics
 */
private void collectForTable(final String databaseName, final String schemaName, final ShardingSphereTable table,
                             final Map<String, ShardingSphereDatabase> databases, final ShardingSphereStatistics statistics) {
    Optional<ShardingSphereStatisticsCollector> statisticsCollector = TypedSPILoader.findService(ShardingSphereStatisticsCollector.class, table.getName());
    if (!statisticsCollector.isPresent()) {
        // No collector registered for this table name: nothing to do.
        return;
    }
    Optional<ShardingSphereTableData> tableData = Optional.empty();
    try {
        tableData = statisticsCollector.get().collect(databaseName, table, databases, contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData());
        // CHECKSTYLE:OFF
    } catch (final Exception ex) {
        // CHECKSTYLE:ON
        log.error(String.format("Collect %s.%s.%s data failed", databaseName, schemaName, table.getName()), ex);
    }
    if (tableData.isPresent()) {
        statistics.getDatabaseData().computeIfAbsent(databaseName.toLowerCase(), unused -> new ShardingSphereDatabaseData())
                .getSchemaData().computeIfAbsent(schemaName, unused -> new ShardingSphereSchemaData())
                .getTableData().put(table.getName().toLowerCase(), tableData.get());
    }
}

/**
 * Walks every database of the freshly collected statistics and diffs it against the current
 * statistics, persisting any changes found.
 *
 * @param statistics current statistics snapshot
 * @param changedStatistics freshly collected statistics
 * @param databases all databases keyed by lower-cased name
 */
private void compareUpdateAndSendEvent(final ShardingSphereStatistics statistics, final ShardingSphereStatistics changedStatistics,
                                       final Map<String, ShardingSphereDatabase> databases) {
    for (Entry<String, ShardingSphereDatabaseData> entry : changedStatistics.getDatabaseData().entrySet()) {
        compareUpdateAndSendEventForDatabase(entry.getKey(), statistics.getDatabaseData().get(entry.getKey()), entry.getValue(), statistics,
                databases.get(entry.getKey().toLowerCase()));
    }
}

/**
 * Diffs all schemas of one database between current and freshly collected statistics.
 *
 * @param databaseName database name
 * @param databaseData current statistics of the database
 * @param changedDatabaseData freshly collected statistics of the database
 * @param statistics current statistics snapshot to update in place
 * @param database database metadata
 */
private void compareUpdateAndSendEventForDatabase(final String databaseName, final ShardingSphereDatabaseData databaseData, final ShardingSphereDatabaseData changedDatabaseData,
                                                  final ShardingSphereStatistics statistics, final ShardingSphereDatabase database) {
    for (Entry<String, ShardingSphereSchemaData> entry : changedDatabaseData.getSchemaData().entrySet()) {
        compareUpdateAndSendEventForSchema(databaseName, entry.getKey(), databaseData.getSchemaData().get(entry.getKey()), entry.getValue(), statistics,
                database.getSchema(entry.getKey()));
    }
}

/**
 * Diffs all tables of one schema between current and freshly collected statistics.
 *
 * @param databaseName database name
 * @param schemaName schema name
 * @param schemaData current statistics of the schema
 * @param changedSchemaData freshly collected statistics of the schema
 * @param statistics current statistics snapshot to update in place
 * @param schema schema metadata
 */
private void compareUpdateAndSendEventForSchema(final String databaseName, final String schemaName, final ShardingSphereSchemaData schemaData,
                                                final ShardingSphereSchemaData changedSchemaData, final ShardingSphereStatistics statistics, final ShardingSphereSchema schema) {
    for (Entry<String, ShardingSphereTableData> entry : changedSchemaData.getTableData().entrySet()) {
        compareUpdateAndSendEventForTable(databaseName, schemaName, schemaData.getTableData().get(entry.getKey()), entry.getValue(), statistics,
                schema.getTable(entry.getKey()));
    }
}

/**
 * If one table's collected statistics differ from the current snapshot, updates the snapshot
 * in place and persists the row-level delta.
 *
 * @param databaseName database name
 * @param schemaName schema name
 * @param tableData current statistics of the table
 * @param changedTableData freshly collected statistics of the table
 * @param statistics current statistics snapshot to update in place
 * @param table table metadata
 */
private void compareUpdateAndSendEventForTable(final String databaseName, final String schemaName, final ShardingSphereTableData tableData,
                                               final ShardingSphereTableData changedTableData, final ShardingSphereStatistics statistics, final ShardingSphereTable table) {
    if (!tableData.equals(changedTableData)) {
        statistics.getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().put(changedTableData.getName().toLowerCase(), changedTableData);
        contextManager.getPersistServiceFacade().persist(getShardingSphereSchemaDataAlteredPOJO(databaseName, schemaName, tableData, changedTableData, table));
    }
}

private AlteredShardingSphereSchemaData getShardingSphereSchemaDataAlteredPOJO(final String databaseName, final String schemaName, final ShardingSphereTableData tableData,
final ShardingSphereTableData changedTableData, final ShardingSphereTable table) {
AlteredShardingSphereSchemaData result = new AlteredShardingSphereSchemaData(databaseName, schemaName, tableData.getName());
Map<String, ShardingSphereRowData> tableDataMap = tableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey, Function.identity()));
Map<String, ShardingSphereRowData> changedTableDataMap = changedTableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey, Function.identity()));
YamlShardingSphereRowDataSwapper swapper = new YamlShardingSphereRowDataSwapper(new ArrayList<>(table.getColumnValues()));
for (Entry<String, ShardingSphereRowData> entry : changedTableDataMap.entrySet()) {
if (!tableDataMap.containsKey(entry.getKey())) {
result.getAddedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
} else if (!tableDataMap.get(entry.getKey()).equals(entry.getValue())) {
result.getUpdatedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
}
}
for (Entry<String, ShardingSphereRowData> entry : tableDataMap.entrySet()) {
if (!changedTableDataMap.containsKey(entry.getKey())) {
result.getDeletedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
}
PersistRepository repository = contextManager.getPersistServiceFacade().getRepository();
if (repository instanceof ClusterPersistRepository) {
new ShardingSphereStatisticsRefreshEngine(contextManager, new GlobalLockContext(new GlobalLockPersistService((ClusterPersistRepository) repository))).refresh();
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;

import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand Down Expand Up @@ -69,11 +70,32 @@ private static void start(final ContextManager contextManager) {
/**
 * Creates and initializes the ZooKeeper registry center the collect job runs against.
 * (The scraped diff kept both the pre- and post-image assignment of {@code result};
 * this keeps the post-image that delegates to {@code getZookeeperConfiguration}.)
 *
 * @param modeConfig mode configuration whose repository must be a cluster repository
 * @return initialized registry center
 */
private static CoordinatorRegistryCenter createRegistryCenter(final ModeConfiguration modeConfig) {
    ClusterPersistRepositoryConfiguration repositoryConfig = (ClusterPersistRepositoryConfiguration) modeConfig.getRepository();
    String namespace = String.join("/", repositoryConfig.getNamespace(), ShardingSphereDataNode.getJobPath());
    CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(getZookeeperConfiguration(repositoryConfig, namespace));
    result.init();
    return result;
}

/**
 * Builds the ElasticJob ZooKeeper configuration from the cluster repository properties,
 * applying the same defaults the ShardingSphere mode repository uses.
 *
 * @param repositoryConfig cluster persist repository configuration carrying the props
 * @param namespace ZooKeeper namespace for the job
 * @return ZooKeeper configuration for the ElasticJob registry center
 */
private static ZookeeperConfiguration getZookeeperConfiguration(final ClusterPersistRepositoryConfiguration repositoryConfig, final String namespace) {
    // TODO Merge registry center code in ElasticJob and ShardingSphere mode; Use SPI to load impl
    ZookeeperConfiguration result = new ZookeeperConfiguration(repositoryConfig.getServerLists(), namespace);
    Properties props = repositoryConfig.getProps();
    int retryIntervalMilliseconds = getIntProperty(props, "retryIntervalMilliseconds", 500);
    int maxRetries = getIntProperty(props, "maxRetries", 3);
    result.setBaseSleepTimeMilliseconds(retryIntervalMilliseconds);
    result.setMaxRetries(maxRetries);
    result.setMaxSleepTimeMilliseconds(retryIntervalMilliseconds * maxRetries);
    int timeToLiveSeconds = getIntProperty(props, "timeToLiveSeconds", 60);
    // 0 means "keep the client default" for both timeouts below.
    if (0 != timeToLiveSeconds) {
        result.setSessionTimeoutMilliseconds(timeToLiveSeconds * 1000);
    }
    int operationTimeoutMilliseconds = getIntProperty(props, "operationTimeoutMilliseconds", 500);
    if (0 != operationTimeoutMilliseconds) {
        result.setConnectionTimeoutMilliseconds(operationTimeoutMilliseconds);
    }
    // getProperty returns null when absent, which leaves the digest unset.
    result.setDigest(props.getProperty("digest"));
    return result;
}

/**
 * Reads an int-valued property, tolerating both Integer and String values.
 * The original `(int) props.get(key)` cast threw ClassCastException when the
 * value arrived as a String from a config file.
 *
 * @param props properties to read from
 * @param key property key
 * @param defaultValue value to use when the key is absent
 * @return parsed int value
 */
private static int getIntProperty(final Properties props, final String key, final int defaultValue) {
    Object value = props.get(key);
    return null == value ? defaultValue : Integer.parseInt(value.toString());
}

/**
 * Builds the ElasticJob configuration for the statistics collect job:
 * single shard, fixed cron schedule, overwriting any existing registration.
 *
 * @return job configuration
 */
private static JobConfiguration createJobConfiguration() {
    return JobConfiguration.newBuilder(JOB_NAME, 1)
            .cron(CRON_EXPRESSION)
            .overwrite(true)
            .build();
}
Expand Down
Loading

0 comments on commit e572389

Please sign in to comment.