Skip to content

Commit

Permalink
Add StaticDataSourceRule (#30376)
Browse files Browse the repository at this point in the history
* Refactor ReadwriteSplittingDataSourceMapperRuleTest

* Add StaticDataSourceRule
  • Loading branch information
terrymanu authored Mar 2, 2024
1 parent eb4fdd7 commit 71de1b2
Show file tree
Hide file tree
Showing 12 changed files with 149 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,30 @@

package org.apache.shardingsphere.readwritesplitting.rule;

import com.google.common.base.Preconditions;
import lombok.Getter;
import org.apache.shardingsphere.infra.algorithm.core.config.AlgorithmConfiguration;
import org.apache.shardingsphere.infra.algorithm.load.balancer.core.LoadBalanceAlgorithm;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.expr.core.InlineExpressionParserFactory;
import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
import org.apache.shardingsphere.infra.rule.event.DataSourceStatusChangedEvent;
import org.apache.shardingsphere.infra.rule.identifier.scope.DatabaseRule;
import org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
import org.apache.shardingsphere.infra.rule.identifier.type.StorageConnectorReusableRule;
import org.apache.shardingsphere.infra.rule.identifier.type.datasource.DataSourceMapperContainedRule;
import org.apache.shardingsphere.infra.rule.identifier.type.datasource.DataSourceMapperRule;
import org.apache.shardingsphere.infra.rule.identifier.type.datasource.StaticDataSourceContainedRule;
import org.apache.shardingsphere.infra.rule.identifier.type.exportable.ExportableRule;
import org.apache.shardingsphere.infra.rule.identifier.type.exportable.constant.ExportableConstants;
import org.apache.shardingsphere.infra.rule.identifier.type.exportable.constant.ExportableItemConstants;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
import org.apache.shardingsphere.mode.event.storage.StorageNodeDataSourceChangedEvent;
import org.apache.shardingsphere.mode.event.storage.StorageNodeDataSourceDeletedEvent;
import org.apache.shardingsphere.readwritesplitting.api.ReadwriteSplittingRuleConfiguration;
import org.apache.shardingsphere.readwritesplitting.api.rule.ReadwriteSplittingDataSourceRuleConfiguration;
import org.apache.shardingsphere.readwritesplitting.exception.rule.InvalidInlineExpressionDataSourceNameException;
import org.apache.shardingsphere.readwritesplitting.group.type.StaticReadwriteSplittingGroup;

import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Collectors;

Expand All @@ -57,8 +49,6 @@
*/
public final class ReadwriteSplittingRule implements DatabaseRule, DataSourceMapperContainedRule, StaticDataSourceContainedRule, ExportableRule, StorageConnectorReusableRule {

private final String databaseName;

@Getter
private final ReadwriteSplittingRuleConfiguration configuration;

Expand All @@ -67,18 +57,18 @@ public final class ReadwriteSplittingRule implements DatabaseRule, DataSourceMap
@Getter
private final Map<String, ReadwriteSplittingDataSourceRule> dataSourceRules;

private final InstanceContext instanceContext;

@Getter
private final DataSourceMapperRule dataSourceMapperRule;

@Getter
private final ReadwriteSplittingStaticDataSourceRule staticDataSourceRule;

public ReadwriteSplittingRule(final String databaseName, final ReadwriteSplittingRuleConfiguration ruleConfig, final InstanceContext instanceContext) {
this.databaseName = databaseName;
configuration = ruleConfig;
this.instanceContext = instanceContext;
loadBalancers = createLoadBalancers(ruleConfig);
dataSourceRules = createDataSourceRules(ruleConfig);
dataSourceMapperRule = new ReadwriteSplittingDataSourceMapperRule(dataSourceRules.values());
staticDataSourceRule = new ReadwriteSplittingStaticDataSourceRule(databaseName, dataSourceRules, instanceContext);
}

private Map<String, LoadBalanceAlgorithm> createLoadBalancers(final ReadwriteSplittingRuleConfiguration ruleConfig) {
Expand Down Expand Up @@ -151,46 +141,6 @@ public Optional<ReadwriteSplittingDataSourceRule> findDataSourceRule(final Strin
return Optional.ofNullable(dataSourceRules.get(dataSourceName));
}

@Override
public Map<String, Collection<String>> getDataSourceMapper() {
Map<String, Collection<String>> result = new HashMap<>();
for (Entry<String, ReadwriteSplittingDataSourceRule> entry : dataSourceRules.entrySet()) {
result.put(entry.getValue().getName(), entry.getValue().getReadwriteSplittingGroup().getAllDataSources());
}
return result;
}

@Override
public void updateStatus(final DataSourceStatusChangedEvent event) {
StorageNodeDataSourceChangedEvent dataSourceEvent = (StorageNodeDataSourceChangedEvent) event;
QualifiedDatabase qualifiedDatabase = dataSourceEvent.getQualifiedDatabase();
ReadwriteSplittingDataSourceRule dataSourceRule = dataSourceRules.get(qualifiedDatabase.getGroupName());
Preconditions.checkNotNull(dataSourceRule, "Can not find readwrite-splitting data source rule in database `%s`", qualifiedDatabase.getDatabaseName());
if (DataSourceState.DISABLED == dataSourceEvent.getDataSource().getStatus()) {
dataSourceRule.disableDataSource(dataSourceEvent.getQualifiedDatabase().getDataSourceName());
} else {
dataSourceRule.enableDataSource(dataSourceEvent.getQualifiedDatabase().getDataSourceName());
}
}

@Override
public void cleanStorageNodeDataSource(final String groupName) {
Preconditions.checkNotNull(dataSourceRules.get(groupName), String.format("`%s` group name not exist in database `%s`", groupName, databaseName));
deleteStorageNodeDataSources(dataSourceRules.get(groupName));
}

private void deleteStorageNodeDataSources(final ReadwriteSplittingDataSourceRule rule) {
rule.getReadwriteSplittingGroup().getReadDataSources()
.forEach(each -> instanceContext.getEventBusContext().post(new StorageNodeDataSourceDeletedEvent(new QualifiedDatabase(databaseName, rule.getName(), each))));
}

@Override
public void cleanStorageNodeDataSources() {
for (Entry<String, ReadwriteSplittingDataSourceRule> entry : dataSourceRules.entrySet()) {
deleteStorageNodeDataSources(entry.getValue());
}
}

@Override
public Map<String, Object> getExportData() {
Map<String, Object> result = new HashMap<>(2, 1F);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.readwritesplitting.rule;

import com.google.common.base.Preconditions;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
import org.apache.shardingsphere.infra.rule.event.DataSourceStatusChangedEvent;
import org.apache.shardingsphere.infra.rule.identifier.type.datasource.StaticDataSourceRule;
import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
import org.apache.shardingsphere.mode.event.storage.StorageNodeDataSourceChangedEvent;
import org.apache.shardingsphere.mode.event.storage.StorageNodeDataSourceDeletedEvent;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

/**
 * Readwrite-splitting static data source rule.
 *
 * <p>Implements the static data source operations that were previously inlined in
 * {@code ReadwriteSplittingRule}: exposing the write/read data source mapper, reacting to
 * storage-node status change events, and cleaning storage-node registrations for read
 * data sources.</p>
 */
@RequiredArgsConstructor
public final class ReadwriteSplittingStaticDataSourceRule implements StaticDataSourceRule {
    
    private final String databaseName;
    
    private final Map<String, ReadwriteSplittingDataSourceRule> dataSourceRules;
    
    private final InstanceContext instanceContext;
    
    /**
     * Get data source mapper: logical rule name -> all physical data sources in its group.
     *
     * @return data source mapper
     */
    @Override
    public Map<String, Collection<String>> getDataSourceMapper() {
        Map<String, Collection<String>> result = new HashMap<>(dataSourceRules.size(), 1F);
        for (ReadwriteSplittingDataSourceRule each : dataSourceRules.values()) {
            result.put(each.getName(), each.getReadwriteSplittingGroup().getAllDataSources());
        }
        return result;
    }
    
    /**
     * Enable or disable a read data source according to a storage node status change event.
     *
     * @param event data source status changed event, expected to be a {@link StorageNodeDataSourceChangedEvent}
     */
    @Override
    public void updateStatus(final DataSourceStatusChangedEvent event) {
        StorageNodeDataSourceChangedEvent dataSourceEvent = (StorageNodeDataSourceChangedEvent) event;
        QualifiedDatabase qualifiedDatabase = dataSourceEvent.getQualifiedDatabase();
        ReadwriteSplittingDataSourceRule dataSourceRule = dataSourceRules.get(qualifiedDatabase.getGroupName());
        Preconditions.checkNotNull(dataSourceRule, "Can not find readwrite-splitting data source rule in database `%s`", qualifiedDatabase.getDatabaseName());
        // Reuse the already-extracted qualified database instead of re-fetching it from the event.
        if (DataSourceState.DISABLED == dataSourceEvent.getDataSource().getStatus()) {
            dataSourceRule.disableDataSource(qualifiedDatabase.getDataSourceName());
        } else {
            dataSourceRule.enableDataSource(qualifiedDatabase.getDataSourceName());
        }
    }
    
    /**
     * Clean storage node data sources of a single readwrite-splitting group.
     *
     * @param groupName readwrite-splitting group name; must exist in this database
     */
    @Override
    public void cleanStorageNodeDataSource(final String groupName) {
        ReadwriteSplittingDataSourceRule dataSourceRule = dataSourceRules.get(groupName);
        // Lazy message formatting: checkNotNull only formats the template when the check fails.
        Preconditions.checkNotNull(dataSourceRule, "`%s` group name not exist in database `%s`", groupName, databaseName);
        deleteStorageNodeDataSources(dataSourceRule);
    }
    
    /**
     * Post a deletion event for every read data source of the given rule's group.
     */
    private void deleteStorageNodeDataSources(final ReadwriteSplittingDataSourceRule rule) {
        rule.getReadwriteSplittingGroup().getReadDataSources()
                .forEach(each -> instanceContext.getEventBusContext().post(new StorageNodeDataSourceDeletedEvent(new QualifiedDatabase(databaseName, rule.getName(), each))));
    }
    
    /**
     * Clean storage node data sources of every readwrite-splitting group in this database.
     */
    @Override
    public void cleanStorageNodeDataSources() {
        for (ReadwriteSplittingDataSourceRule each : dataSourceRules.values()) {
            deleteStorageNodeDataSources(each);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,26 +69,26 @@ private void assertDataSourceRule(final ReadwriteSplittingDataSourceRule actual)
@Test
void assertUpdateRuleStatusWithNotExistDataSource() {
ReadwriteSplittingRule readwriteSplittingRule = createReadwriteSplittingRule();
readwriteSplittingRule.updateStatus(new StorageNodeDataSourceChangedEvent(new QualifiedDatabase("readwrite_splitting_db.readwrite.read_ds"),
readwriteSplittingRule.getStaticDataSourceRule().updateStatus(new StorageNodeDataSourceChangedEvent(new QualifiedDatabase("readwrite_splitting_db.readwrite.read_ds"),
new StorageNodeDataSource(StorageNodeRole.MEMBER, DataSourceState.DISABLED)));
assertThat(readwriteSplittingRule.getSingleDataSourceRule().getDisabledDataSourceNames(), is(Collections.singleton("read_ds")));
}

@Test
void assertUpdateRuleStatus() {
ReadwriteSplittingRule readwriteSplittingRule = createReadwriteSplittingRule();
readwriteSplittingRule.updateStatus(new StorageNodeDataSourceChangedEvent(new QualifiedDatabase("readwrite_splitting_db.readwrite.read_ds_0"),
readwriteSplittingRule.getStaticDataSourceRule().updateStatus(new StorageNodeDataSourceChangedEvent(new QualifiedDatabase("readwrite_splitting_db.readwrite.read_ds_0"),
new StorageNodeDataSource(StorageNodeRole.MEMBER, DataSourceState.DISABLED)));
assertThat(readwriteSplittingRule.getSingleDataSourceRule().getDisabledDataSourceNames(), is(Collections.singleton("read_ds_0")));
}

@Test
void assertUpdateRuleStatusWithEnable() {
ReadwriteSplittingRule readwriteSplittingRule = createReadwriteSplittingRule();
readwriteSplittingRule.updateStatus(new StorageNodeDataSourceChangedEvent(new QualifiedDatabase("readwrite_splitting_db.readwrite.read_ds_0"),
readwriteSplittingRule.getStaticDataSourceRule().updateStatus(new StorageNodeDataSourceChangedEvent(new QualifiedDatabase("readwrite_splitting_db.readwrite.read_ds_0"),
new StorageNodeDataSource(StorageNodeRole.MEMBER, DataSourceState.DISABLED)));
assertThat(readwriteSplittingRule.getSingleDataSourceRule().getDisabledDataSourceNames(), is(Collections.singleton("read_ds_0")));
readwriteSplittingRule.updateStatus(new StorageNodeDataSourceChangedEvent(new QualifiedDatabase("readwrite_splitting_db.readwrite.read_ds_0"),
readwriteSplittingRule.getStaticDataSourceRule().updateStatus(new StorageNodeDataSourceChangedEvent(new QualifiedDatabase("readwrite_splitting_db.readwrite.read_ds_0"),
new StorageNodeDataSource(StorageNodeRole.MEMBER, DataSourceState.ENABLED)));
assertThat(readwriteSplittingRule.getSingleDataSourceRule().getDisabledDataSourceNames(), is(Collections.emptySet()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.rule.identifier.type.datanode.DataNodeContainedRule;
import org.apache.shardingsphere.infra.rule.identifier.type.datasource.DataSourceMapperContainedRule;
import org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
import org.apache.shardingsphere.infra.rule.identifier.type.datasource.StaticDataSourceContainedRule;
import org.apache.shardingsphere.readwritesplitting.api.ReadwriteSplittingRuleConfiguration;
import org.apache.shardingsphere.readwritesplitting.api.rule.ReadwriteSplittingDataSourceRuleConfiguration;
import org.apache.shardingsphere.readwritesplitting.distsql.statement.DropReadwriteSplittingRuleStatement;
Expand Down Expand Up @@ -139,7 +139,8 @@ public boolean hasAnyOneToBeDropped(final DropReadwriteSplittingRuleStatement sq

@Override
public void operate(final DropReadwriteSplittingRuleStatement sqlStatement, final ShardingSphereDatabase database) {
database.getRuleMetaData().findSingleRule(StaticDataSourceContainedRule.class).ifPresent(optional -> sqlStatement.getNames().forEach(optional::cleanStorageNodeDataSource));
database.getRuleMetaData().findSingleRule(StaticDataSourceContainedRule.class)
.ifPresent(optional -> sqlStatement.getNames().forEach(groupName -> optional.getStaticDataSourceRule().cleanStorageNodeDataSource(groupName)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.shardingsphere.infra.metadata.database.resource.ResourceMetaData;
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
import org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
import org.apache.shardingsphere.infra.rule.identifier.type.datasource.StaticDataSourceContainedRule;

import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -111,7 +111,7 @@ private void cleanResources(final ShardingSphereDatabase database) {
String databaseName = database.getName();
globalRuleMetaData.findRules(ResourceHeldRule.class).forEach(each -> each.closeStaleResource(databaseName));
database.getRuleMetaData().findRules(ResourceHeldRule.class).forEach(each -> each.closeStaleResource(databaseName));
database.getRuleMetaData().findRules(StaticDataSourceContainedRule.class).forEach(StaticDataSourceContainedRule::cleanStorageNodeDataSources);
database.getRuleMetaData().findRules(StaticDataSourceContainedRule.class).forEach(each -> each.getStaticDataSourceRule().cleanStorageNodeDataSources());
Optional.ofNullable(database.getResourceMetaData())
.ifPresent(optional -> optional.getStorageUnits().values().forEach(each -> new DataSourcePoolDestroyer(each.getDataSource()).asyncDestroy()));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.infra.rule.identifier.type.datasource;

import org.apache.shardingsphere.infra.rule.ShardingSphereRule;

/**
 * Static data source contained rule.
 *
 * <p>Marker capability interface: a {@link ShardingSphereRule} implementing it owns a
 * {@code StaticDataSourceRule} that handles data source mapping, status updates and
 * storage node cleanup on the rule's behalf.</p>
 */
public interface StaticDataSourceContainedRule extends ShardingSphereRule {
    
    /**
     * Get static data source rule.
     *
     * @return static data source rule owned by this rule
     */
    StaticDataSourceRule getStaticDataSourceRule();
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,17 @@
* limitations under the License.
*/

package org.apache.shardingsphere.infra.rule.identifier.type;
package org.apache.shardingsphere.infra.rule.identifier.type.datasource;

import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.rule.event.DataSourceStatusChangedEvent;

import java.util.Collection;
import java.util.Map;

/**
* Static data source contained rule.
* Static data source rule.
*/
public interface StaticDataSourceContainedRule extends ShardingSphereRule {
public interface StaticDataSourceRule {

/**
* Get data source mapper.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.shardingsphere.infra.instance.mode.ModeContextManager;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.version.MetaDataVersion;
import org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
import org.apache.shardingsphere.infra.rule.identifier.type.datasource.StaticDataSourceContainedRule;
import org.apache.shardingsphere.infra.yaml.config.swapper.rule.NewYamlRuleConfigurationSwapperEngine;
import org.apache.shardingsphere.mode.manager.ContextManager;

Expand All @@ -54,8 +54,10 @@ public Collection<MetaDataVersion> operate(final DatabaseRuleDefinitionStatement
ModeContextManager modeContextManager = contextManager.getInstanceContext().getModeContextManager();
RuleConfiguration toBeDroppedRuleConfig = executor.buildToBeDroppedRuleConfiguration(sqlStatement);
if (sqlStatement instanceof StaticDataSourceContainedRuleAwareStatement) {

database.getRuleMetaData().findSingleRule(StaticDataSourceContainedRule.class)
.ifPresent(optional -> ((StaticDataSourceContainedRuleAwareStatement) sqlStatement).getNames().forEach(optional::cleanStorageNodeDataSource));
.ifPresent(optional -> ((StaticDataSourceContainedRuleAwareStatement) sqlStatement).getNames()
.forEach(groupName -> optional.getStaticDataSourceRule().cleanStorageNodeDataSource(groupName)));
// TODO refactor to new metadata refresh way
}
modeContextManager.removeRuleConfigurationItem(database.getName(), toBeDroppedRuleConfig);
Expand Down
Loading

0 comments on commit 71de1b2

Please sign in to comment.