Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[compat][common][controller] Add store and version configs for target region swap and wait time #1340

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1167,6 +1167,9 @@ static UpdateStoreQueryParams getUpdateStoreQueryParams(CommandLine cmd) {
p -> params.setNearlineProducerCompressionEnabled(p),
argSet);
integerParam(cmd, Arg.NEARLINE_PRODUCER_COUNT_PER_WRITER, p -> params.setNearlineProducerCountPerWriter(p), argSet);
genericParam(cmd, Arg.TARGET_SWAP_REGION, s -> s, p -> params.setTargetRegionSwap(p), argSet);
integerParam(cmd, Arg.TARGET_SWAP_REGION_WAIT_TIME, p -> params.setTargetRegionSwapWaitTime(p), argSet);
booleanParam(cmd, Arg.DAVINCI_HEARTBEAT_REPORTED, p -> params.setIsDavinciHeartbeatReported(p), argSet);

/**
* {@link Arg#REPLICATE_ALL_CONFIGS} doesn't require parameters; once specified, it means true.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,15 @@ public enum Arg {
"How many producers will be used to write nearline workload in Server"
), INSTANCES("instances", "in", true, "Input list of helix ids of nodes to check if they can removed or not"),
TO_BE_STOPPED_NODES("to-be-stopped-nodes", "tbsn", true, "List of helix ids of nodes assumed to be stopped"),
LAG_FILTER_ENABLED("lag-filter-enabled", "lfe", true, "Enable heartbeat lag filter for a heartbeat request");
LAG_FILTER_ENABLED("lag-filter-enabled", "lfe", true, "Enable heartbeat lag filter for a heartbeat request"),
TARGET_SWAP_REGION("target-region-swap", "trs", true, "Region to swap current version during target colo push"),
TARGET_SWAP_REGION_WAIT_TIME(
"target-region-swap-wait-time", "trswt", true,
"How long to wait in minutes before swapping to the new version in a target colo push"
),
DAVINCI_HEARTBEAT_REPORTED(
"dvc-heartbeat-reported", "dvchb", true, "Flag to indicate whether DVC is bootstrapping and sending heartbeats"
);

private final String argName;
private final String first;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.linkedin.venice.Arg.CLUSTER_SRC;
import static com.linkedin.venice.Arg.COMPRESSION_STRATEGY;
import static com.linkedin.venice.Arg.DATETIME;
import static com.linkedin.venice.Arg.DAVINCI_HEARTBEAT_REPORTED;
import static com.linkedin.venice.Arg.DEBUG;
import static com.linkedin.venice.Arg.DERIVED_SCHEMA;
import static com.linkedin.venice.Arg.DERIVED_SCHEMA_ID;
Expand Down Expand Up @@ -123,6 +124,8 @@
import static com.linkedin.venice.Arg.STORE_TYPE;
import static com.linkedin.venice.Arg.STORE_VIEW_CONFIGS;
import static com.linkedin.venice.Arg.SYSTEM_STORE_TYPE;
import static com.linkedin.venice.Arg.TARGET_SWAP_REGION;
import static com.linkedin.venice.Arg.TARGET_SWAP_REGION_WAIT_TIME;
import static com.linkedin.venice.Arg.TO_BE_STOPPED_NODES;
import static com.linkedin.venice.Arg.UNUSED_SCHEMA_DELETION_ENABLED;
import static com.linkedin.venice.Arg.URL;
Expand Down Expand Up @@ -274,7 +277,8 @@ public enum Command {
STORAGE_PERSONA, STORE_VIEW_CONFIGS, LATEST_SUPERSET_SCHEMA_ID, MIN_COMPACTION_LAG_SECONDS,
MAX_COMPACTION_LAG_SECONDS, MAX_RECORD_SIZE_BYTES, MAX_NEARLINE_RECORD_SIZE_BYTES,
UNUSED_SCHEMA_DELETION_ENABLED, BLOB_TRANSFER_ENABLED, SEPARATE_REALTIME_TOPIC_ENABLED,
NEARLINE_PRODUCER_COMPRESSION_ENABLED, NEARLINE_PRODUCER_COUNT_PER_WRITER }
NEARLINE_PRODUCER_COMPRESSION_ENABLED, NEARLINE_PRODUCER_COUNT_PER_WRITER, TARGET_SWAP_REGION,
TARGET_SWAP_REGION_WAIT_TIME, DAVINCI_HEARTBEAT_REPORTED }
),
UPDATE_CLUSTER_CONFIG(
"update-cluster-config", "Update live cluster configs", new Arg[] { URL, CLUSTER },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,10 @@ public static void recover(
.setMaxCompactionLagSeconds(deletedStore.getMaxCompactionLagSeconds())
.setMaxRecordSizeBytes(deletedStore.getMaxRecordSizeBytes())
.setMaxNearlineRecordSizeBytes(deletedStore.getMaxNearlineRecordSizeBytes())
.setBlobTransferEnabled(deletedStore.isBlobTransferEnabled());
.setBlobTransferEnabled(deletedStore.isBlobTransferEnabled())
.setTargetRegionSwap(deletedStore.getTargetSwapRegion())
.setTargetRegionSwapWaitTime(deletedStore.getTargetSwapRegionWaitTime())
.setIsDavinciHeartbeatReported(deletedStore.getIsDavinciHeartbeatReported());
System.out.println(
"Updating store: " + storeName + " in cluster: " + recoverCluster + " with params: "
+ updateParams.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public void testAdminToolRequiresSSLConfigFile() {
public void testAdminUpdateStoreArg() throws ParseException, IOException {
final String K1 = "k1", V1 = "v1", K2 = "k2", V2 = "v2", K3 = "k3", V3 = "v3";
String[] args = { "--update-store", "--url", "http://localhost:7036", "--cluster", "test-cluster", "--store",
"testStore", "--rmd-chunking-enabled", "true", "--blob-transfer-enabled", "true", "--partitioner-params",
"testStore", "--rmd-chunking-enabled", "true", "--blob-transfer-enabled", "true", "--target-region-swap",
"prod", "--target-region-swap-wait-time", "100", "--partitioner-params",
"{\"" + K1 + "\":\"" + V1 + "\",\"" + K2 + "\":\"" + V2 + "\",\"" + K3 + "\":\"" + V3 + "\"}" };

CommandLine commandLine = AdminTool.getCommandLine(args);
Expand All @@ -83,6 +84,10 @@ public void testAdminUpdateStoreArg() throws ParseException, IOException {
Assert.assertTrue(params.getRmdChunkingEnabled().get());
Assert.assertTrue(params.getBlobTransferEnabled().isPresent());
Assert.assertTrue(params.getBlobTransferEnabled().get());
Assert.assertTrue(params.getTargetSwapRegion().isPresent());
Assert.assertEquals(params.getTargetSwapRegion().get(), "prod");
Assert.assertTrue(params.getTargetRegionSwapWaitTime().isPresent());
Assert.assertEquals(params.getTargetRegionSwapWaitTime(), Optional.of(100));
Optional<Map<String, String>> partitionerParams = params.getPartitionerParams();
Assert.assertTrue(partitionerParams.isPresent());
Map<String, String> partitionerParamsMap = partitionerParams.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,4 +242,7 @@ public class ControllerApiConstants {

public static final String NEARLINE_PRODUCER_COMPRESSION_ENABLED = "nearline_producer_compression_enabled";
public static final String NEARLINE_PRODUCER_COUNT_PER_WRITER = "nearline_producer_count_per_writer";
public static final String TARGET_SWAP_REGION = "target_swap_region";
public static final String TARGET_SWAP_REGION_WAIT_TIME = "target_swap_region_wait_time";
public static final String IS_DAVINCI_HEARTBEAT_REPORTED = "is_davinci_heartbeat_reported";
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.HYBRID_STORE_DISK_QUOTA_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.HYBRID_STORE_OVERHEAD_BYPASS;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.INCREMENTAL_PUSH_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.IS_DAVINCI_HEARTBEAT_REPORTED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.LARGEST_USED_VERSION_NUMBER;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.LATEST_SUPERSET_SCHEMA_ID;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.MAX_COMPACTION_LAG_SECONDS;
Expand Down Expand Up @@ -61,6 +62,8 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VIEW_CLASS;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VIEW_NAME;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VIEW_PARAMS;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.TARGET_SWAP_REGION;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.TARGET_SWAP_REGION_WAIT_TIME;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.TIME_LAG_TO_GO_ONLINE;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.UNUSED_SCHEMA_DELETION_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.UPDATED_CONFIGS_LIST;
Expand Down Expand Up @@ -141,6 +144,8 @@ public UpdateStoreQueryParams(StoreInfo srcStore, boolean storeMigrating) {
.setBlobTransferEnabled(srcStore.isBlobTransferEnabled())
.setMaxRecordSizeBytes(srcStore.getMaxRecordSizeBytes())
.setMaxNearlineRecordSizeBytes(srcStore.getMaxNearlineRecordSizeBytes())
.setTargetRegionSwap(srcStore.getTargetRegionSwap())
.setTargetRegionSwapWaitTime(srcStore.getTargetRegionSwapWaitTime())
// TODO: This needs probably some refinement, but since we only support one kind of view type today, this is
// still easy to parse
.setStoreViews(
Expand Down Expand Up @@ -738,6 +743,30 @@ public Optional<Integer> getNearlineProducerCountPerWriter() {
return getInteger(NEARLINE_PRODUCER_COUNT_PER_WRITER);
}

public UpdateStoreQueryParams setTargetRegionSwap(String targetRegion) {
return putString(TARGET_SWAP_REGION, targetRegion);
}

public Optional<String> getTargetSwapRegion() {
return getString(TARGET_SWAP_REGION);
}

public UpdateStoreQueryParams setTargetRegionSwapWaitTime(int waitTime) {
return putInteger(TARGET_SWAP_REGION_WAIT_TIME, waitTime);
}

public Optional<Integer> getTargetRegionSwapWaitTime() {
return getInteger(TARGET_SWAP_REGION_WAIT_TIME);
}

public UpdateStoreQueryParams setIsDavinciHeartbeatReported(boolean isReported) {
return putBoolean(IS_DAVINCI_HEARTBEAT_REPORTED, isReported);
}

public Optional<Boolean> getIsDavinciHeartbeatReported() {
return getBoolean(IS_DAVINCI_HEARTBEAT_REPORTED);
}

// ***************** above this line are getters and setters *****************
private UpdateStoreQueryParams putInteger(String name, int value) {
return (UpdateStoreQueryParams) add(name, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,12 @@ private void addVersion(Version version, boolean checkDisableWrite, boolean isCl

version.setUseVersionLevelIncrementalPushEnabled(true);

version.setTargetSwapRegion(getTargetSwapRegion());

version.setTargetSwapRegionWaitTime(getTargetSwapRegionWaitTime());

version.setIsDavinciHeartbeatReported(getIsDavinciHeartbeatReported());

HybridStoreConfig hybridStoreConfig = getHybridStoreConfig();
if (hybridStoreConfig != null) {
version.setHybridStoreConfig(hybridStoreConfig.clone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,36 @@ public void setDataRecoveryVersionConfig(DataRecoveryVersionConfig dataRecoveryV
throw new UnsupportedOperationException();
}

@Override
public String getTargetSwapRegion() {
return delegate.getTargetSwapRegion();
}

@Override
public int getTargetSwapRegionWaitTime() {
return delegate.getTargetSwapRegionWaitTime();
}

@Override
public void setTargetSwapRegion(String targetRegion) {
throw new UnsupportedOperationException();
}

@Override
public void setTargetSwapRegionWaitTime(int waitTime) {
throw new UnsupportedOperationException();
}

@Override
public void setIsDavinciHeartbeatReported(boolean isReported) {
throw new UnsupportedOperationException();
}

@Override
public boolean getIsDavinciHeartbeatReported() {
return delegate.getIsDavinciHeartbeatReported();
}

@Override
public void setRepushSourceVersion(int version) {
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -1437,6 +1467,36 @@ public void setNearlineProducerCountPerWriter(int producerCnt) {
throw new UnsupportedOperationException();
}

@Override
public String getTargetSwapRegion() {
return delegate.getTargetSwapRegion();
}

@Override
public int getTargetSwapRegionWaitTime() {
return delegate.getTargetSwapRegionWaitTime();
}

@Override
public void setTargetSwapRegion(String targetRegion) {
throw new UnsupportedOperationException();
}

@Override
public void setTargetSwapRegionWaitTime(int waitTime) {
throw new UnsupportedOperationException();
}

@Override
public void setIsDavinciHeartbeatReported(boolean isReported) {
throw new UnsupportedOperationException();
}

@Override
public boolean getIsDavinciHeartbeatReported() {
return delegate.getIsDavinciHeartbeatReported();
}

@Override
public String toString() {
return this.delegate.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,4 +329,16 @@ static boolean isSystemStore(String storeName) {
int getNearlineProducerCountPerWriter();

void setNearlineProducerCountPerWriter(int producerCnt);

String getTargetSwapRegion();

int getTargetSwapRegionWaitTime();

void setTargetSwapRegion(String targetRegion);

void setTargetSwapRegionWaitTime(int waitTime);

void setIsDavinciHeartbeatReported(boolean isReported);

boolean getIsDavinciHeartbeatReported();
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ public static StoreInfo fromStore(Store store) {
storeInfo.setBlobTransferEnabled(store.isBlobTransferEnabled());
storeInfo.setNearlineProducerCompressionEnabled(store.isNearlineProducerCompressionEnabled());
storeInfo.setNearlineProducerCountPerWriter(store.getNearlineProducerCountPerWriter());
storeInfo.setTargetRegionSwap(store.getTargetSwapRegion());
storeInfo.setTargetRegionSwapWaitTime(store.getTargetSwapRegionWaitTime());
storeInfo.setIsDavinciHeartbeatReported(store.getIsDavinciHeartbeatReported());
return storeInfo;
}

Expand Down Expand Up @@ -326,6 +329,9 @@ public static StoreInfo fromStore(Store store) {

private boolean nearlineProducerCompressionEnabled;
private int nearlineProducerCountPerWriter;
private String targetRegionSwap;
private int targetRegionSwapWaitTime;
private boolean isDavinciHeartbeatReported;

public StoreInfo() {
}
Expand Down Expand Up @@ -831,4 +837,28 @@ public int getNearlineProducerCountPerWriter() {
public void setNearlineProducerCountPerWriter(int nearlineProducerCountPerWriter) {
this.nearlineProducerCountPerWriter = nearlineProducerCountPerWriter;
}

public String getTargetRegionSwap() {
return this.targetRegionSwap;
}

public void setTargetRegionSwap(String targetRegion) {
this.targetRegionSwap = targetRegion;
}

public int getTargetRegionSwapWaitTime() {
return this.targetRegionSwapWaitTime;
}

public void setTargetRegionSwapWaitTime(int waitTime) {
this.targetRegionSwapWaitTime = waitTime;
}

public void setIsDavinciHeartbeatReported(boolean isReported) {
this.isDavinciHeartbeatReported = isReported;
}

public boolean getIsDavinciHeartbeatReported() {
return this.isDavinciHeartbeatReported;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,36 @@ public void setNearlineProducerCountPerWriter(int producerCnt) {
throwUnsupportedOperationException("setNearlineProducerCountPerWriter");
}

@Override
public int getTargetSwapRegionWaitTime() {
return zkSharedStore.getTargetSwapRegionWaitTime();
}

@Override
public String getTargetSwapRegion() {
return zkSharedStore.getTargetSwapRegion();
}

@Override
public void setTargetSwapRegion(String targetRegion) {
throw new UnsupportedOperationException();
}

@Override
public void setTargetSwapRegionWaitTime(int waitTime) {
throw new UnsupportedOperationException();
}

@Override
public void setIsDavinciHeartbeatReported(boolean isReported) {
throw new UnsupportedOperationException();
}

@Override
public boolean getIsDavinciHeartbeatReported() {
return zkSharedStore.getIsDavinciHeartbeatReported();
}

@Override
public Store cloneStore() {
return new SystemStore(zkSharedStore.cloneStore(), systemStoreType, veniceStore.cloneStore());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,18 @@ default void setLeaderFollowerModelEnabled(boolean leaderFollowerModelEnabled) {

void setDataRecoveryVersionConfig(DataRecoveryVersionConfig dataRecoveryVersionConfig);

void setTargetSwapRegion(String targetRegion);

String getTargetSwapRegion();

void setTargetSwapRegionWaitTime(int waitTime);

int getTargetSwapRegionWaitTime();

void setIsDavinciHeartbeatReported(boolean isReported);

boolean getIsDavinciHeartbeatReported();

/**
* Get the replication metadata version id.
* @deprecated
Expand Down
Loading
Loading