diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java index ca660f11b1..947036000a 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java @@ -1167,6 +1167,9 @@ static UpdateStoreQueryParams getUpdateStoreQueryParams(CommandLine cmd) { p -> params.setNearlineProducerCompressionEnabled(p), argSet); integerParam(cmd, Arg.NEARLINE_PRODUCER_COUNT_PER_WRITER, p -> params.setNearlineProducerCountPerWriter(p), argSet); + genericParam(cmd, Arg.TARGET_SWAP_REGION, s -> s, p -> params.setTargetRegionSwap(p), argSet); + integerParam(cmd, Arg.TARGET_SWAP_REGION_WAIT_TIME, p -> params.setTargetRegionSwapWaitTime(p), argSet); + booleanParam(cmd, Arg.DAVINCI_HEARTBEAT_REPORTED, p -> params.setIsDavinciHeartbeatReported(p), argSet); /** * {@link Arg#REPLICATE_ALL_CONFIGS} doesn't require parameters; once specified, it means true. 
diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java index b2acfc8620..cda3837b6b 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java @@ -286,7 +286,15 @@ public enum Arg { "How many producers will be used to write nearline workload in Server" ), INSTANCES("instances", "in", true, "Input list of helix ids of nodes to check if they can removed or not"), TO_BE_STOPPED_NODES("to-be-stopped-nodes", "tbsn", true, "List of helix ids of nodes assumed to be stopped"), - LAG_FILTER_ENABLED("lag-filter-enabled", "lfe", true, "Enable heartbeat lag filter for a heartbeat request"); + LAG_FILTER_ENABLED("lag-filter-enabled", "lfe", true, "Enable heartbeat lag filter for a heartbeat request"), + TARGET_SWAP_REGION("target-region-swap", "trs", true, "Region to swap current version during target colo push"), + TARGET_SWAP_REGION_WAIT_TIME( + "target-region-swap-wait-time", "trswt", true, + "How long to wait in minutes before swapping to the new version in a target colo push" + ), + DAVINCI_HEARTBEAT_REPORTED( + "dvc-heartbeat-reported", "dvchb", true, "Flag to indicate whether DVC is bootstrapping and sending heartbeats" + ); private final String argName; private final String first; diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java index 51512e6053..f91974069e 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java @@ -21,6 +21,7 @@ import static com.linkedin.venice.Arg.CLUSTER_SRC; import static com.linkedin.venice.Arg.COMPRESSION_STRATEGY; import static com.linkedin.venice.Arg.DATETIME; +import static com.linkedin.venice.Arg.DAVINCI_HEARTBEAT_REPORTED; import 
static com.linkedin.venice.Arg.DEBUG; import static com.linkedin.venice.Arg.DERIVED_SCHEMA; import static com.linkedin.venice.Arg.DERIVED_SCHEMA_ID; @@ -123,6 +124,8 @@ import static com.linkedin.venice.Arg.STORE_TYPE; import static com.linkedin.venice.Arg.STORE_VIEW_CONFIGS; import static com.linkedin.venice.Arg.SYSTEM_STORE_TYPE; +import static com.linkedin.venice.Arg.TARGET_SWAP_REGION; +import static com.linkedin.venice.Arg.TARGET_SWAP_REGION_WAIT_TIME; import static com.linkedin.venice.Arg.TO_BE_STOPPED_NODES; import static com.linkedin.venice.Arg.UNUSED_SCHEMA_DELETION_ENABLED; import static com.linkedin.venice.Arg.URL; @@ -274,7 +277,8 @@ public enum Command { STORAGE_PERSONA, STORE_VIEW_CONFIGS, LATEST_SUPERSET_SCHEMA_ID, MIN_COMPACTION_LAG_SECONDS, MAX_COMPACTION_LAG_SECONDS, MAX_RECORD_SIZE_BYTES, MAX_NEARLINE_RECORD_SIZE_BYTES, UNUSED_SCHEMA_DELETION_ENABLED, BLOB_TRANSFER_ENABLED, SEPARATE_REALTIME_TOPIC_ENABLED, - NEARLINE_PRODUCER_COMPRESSION_ENABLED, NEARLINE_PRODUCER_COUNT_PER_WRITER } + NEARLINE_PRODUCER_COMPRESSION_ENABLED, NEARLINE_PRODUCER_COUNT_PER_WRITER, TARGET_SWAP_REGION, + TARGET_SWAP_REGION_WAIT_TIME, DAVINCI_HEARTBEAT_REPORTED } ), UPDATE_CLUSTER_CONFIG( "update-cluster-config", "Update live cluster configs", new Arg[] { URL, CLUSTER }, diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/RecoverStoreMetadata.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/RecoverStoreMetadata.java index 753f604170..4d62d1c3ff 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/RecoverStoreMetadata.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/RecoverStoreMetadata.java @@ -257,7 +257,10 @@ public static void recover( .setMaxCompactionLagSeconds(deletedStore.getMaxCompactionLagSeconds()) .setMaxRecordSizeBytes(deletedStore.getMaxRecordSizeBytes()) .setMaxNearlineRecordSizeBytes(deletedStore.getMaxNearlineRecordSizeBytes()) - 
.setBlobTransferEnabled(deletedStore.isBlobTransferEnabled()); + .setBlobTransferEnabled(deletedStore.isBlobTransferEnabled()) + .setTargetRegionSwap(deletedStore.getTargetSwapRegion()) + .setTargetRegionSwapWaitTime(deletedStore.getTargetSwapRegionWaitTime()) + .setIsDavinciHeartbeatReported(deletedStore.getIsDavinciHeartbeatReported()); System.out.println( "Updating store: " + storeName + " in cluster: " + recoverCluster + " with params: " + updateParams.toString()); diff --git a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java index 3613f398b5..fef519f4cb 100644 --- a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java +++ b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java @@ -74,7 +74,8 @@ public void testAdminToolRequiresSSLConfigFile() { public void testAdminUpdateStoreArg() throws ParseException, IOException { final String K1 = "k1", V1 = "v1", K2 = "k2", V2 = "v2", K3 = "k3", V3 = "v3"; String[] args = { "--update-store", "--url", "http://localhost:7036", "--cluster", "test-cluster", "--store", - "testStore", "--rmd-chunking-enabled", "true", "--blob-transfer-enabled", "true", "--partitioner-params", + "testStore", "--rmd-chunking-enabled", "true", "--blob-transfer-enabled", "true", "--target-region-swap", + "prod", "--target-region-swap-wait-time", "100", "--partitioner-params", "{\"" + K1 + "\":\"" + V1 + "\",\"" + K2 + "\":\"" + V2 + "\",\"" + K3 + "\":\"" + V3 + "\"}" }; CommandLine commandLine = AdminTool.getCommandLine(args); @@ -83,6 +84,10 @@ public void testAdminUpdateStoreArg() throws ParseException, IOException { Assert.assertTrue(params.getRmdChunkingEnabled().get()); Assert.assertTrue(params.getBlobTransferEnabled().isPresent()); Assert.assertTrue(params.getBlobTransferEnabled().get()); + Assert.assertTrue(params.getTargetSwapRegion().isPresent()); + 
Assert.assertEquals(params.getTargetSwapRegion().get(), "prod"); + Assert.assertTrue(params.getTargetRegionSwapWaitTime().isPresent()); + Assert.assertEquals(params.getTargetRegionSwapWaitTime(), Optional.of(100)); Optional<Map<String, String>> partitionerParams = params.getPartitionerParams(); Assert.assertTrue(partitionerParams.isPresent()); Map<String, String> partitionerParamsMap = partitionerParams.get(); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java index f2a7aec035..f36b8c013a 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java @@ -242,4 +242,7 @@ public class ControllerApiConstants { public static final String NEARLINE_PRODUCER_COMPRESSION_ENABLED = "nearline_producer_compression_enabled"; public static final String NEARLINE_PRODUCER_COUNT_PER_WRITER = "nearline_producer_count_per_writer"; + public static final String TARGET_SWAP_REGION = "target_swap_region"; + public static final String TARGET_SWAP_REGION_WAIT_TIME = "target_swap_region_wait_time"; + public static final String IS_DAVINCI_HEARTBEAT_REPORTED = "is_davinci_heartbeat_reported"; } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParams.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParams.java index f5e6fd7b28..c4f3db6069 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParams.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParams.java @@ -24,6 +24,7 @@ import static com.linkedin.venice.controllerapi.ControllerApiConstants.HYBRID_STORE_DISK_QUOTA_ENABLED; import static
com.linkedin.venice.controllerapi.ControllerApiConstants.HYBRID_STORE_OVERHEAD_BYPASS; import static com.linkedin.venice.controllerapi.ControllerApiConstants.INCREMENTAL_PUSH_ENABLED; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.IS_DAVINCI_HEARTBEAT_REPORTED; import static com.linkedin.venice.controllerapi.ControllerApiConstants.LARGEST_USED_VERSION_NUMBER; import static com.linkedin.venice.controllerapi.ControllerApiConstants.LATEST_SUPERSET_SCHEMA_ID; import static com.linkedin.venice.controllerapi.ControllerApiConstants.MAX_COMPACTION_LAG_SECONDS; @@ -61,6 +62,8 @@ import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VIEW_CLASS; import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VIEW_NAME; import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VIEW_PARAMS; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.TARGET_SWAP_REGION; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.TARGET_SWAP_REGION_WAIT_TIME; import static com.linkedin.venice.controllerapi.ControllerApiConstants.TIME_LAG_TO_GO_ONLINE; import static com.linkedin.venice.controllerapi.ControllerApiConstants.UNUSED_SCHEMA_DELETION_ENABLED; import static com.linkedin.venice.controllerapi.ControllerApiConstants.UPDATED_CONFIGS_LIST; @@ -141,6 +144,8 @@ public UpdateStoreQueryParams(StoreInfo srcStore, boolean storeMigrating) { .setBlobTransferEnabled(srcStore.isBlobTransferEnabled()) .setMaxRecordSizeBytes(srcStore.getMaxRecordSizeBytes()) .setMaxNearlineRecordSizeBytes(srcStore.getMaxNearlineRecordSizeBytes()) + .setTargetRegionSwap(srcStore.getTargetRegionSwap()) + .setTargetRegionSwapWaitTime(srcStore.getTargetRegionSwapWaitTime()) // TODO: This needs probably some refinement, but since we only support one kind of view type today, this is // still easy to parse .setStoreViews( @@ -738,6 +743,30 @@ public Optional getNearlineProducerCountPerWriter() { return 
getInteger(NEARLINE_PRODUCER_COUNT_PER_WRITER); } + public UpdateStoreQueryParams setTargetRegionSwap(String targetRegion) { + return putString(TARGET_SWAP_REGION, targetRegion); + } + + public Optional<String> getTargetSwapRegion() { + return getString(TARGET_SWAP_REGION); + } + + public UpdateStoreQueryParams setTargetRegionSwapWaitTime(int waitTime) { + return putInteger(TARGET_SWAP_REGION_WAIT_TIME, waitTime); + } + + public Optional<Integer> getTargetRegionSwapWaitTime() { + return getInteger(TARGET_SWAP_REGION_WAIT_TIME); + } + + public UpdateStoreQueryParams setIsDavinciHeartbeatReported(boolean isReported) { + return putBoolean(IS_DAVINCI_HEARTBEAT_REPORTED, isReported); + } + + public Optional<Boolean> getIsDavinciHeartbeatReported() { + return getBoolean(IS_DAVINCI_HEARTBEAT_REPORTED); + } + // ***************** above this line are getters and setters ***************** private UpdateStoreQueryParams putInteger(String name, int value) { return (UpdateStoreQueryParams) add(name, value); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java index 62c318899d..9c4e56a508 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java @@ -158,6 +158,12 @@ private void addVersion(Version version, boolean checkDisableWrite, boolean isCl version.setUseVersionLevelIncrementalPushEnabled(true); + version.setTargetSwapRegion(getTargetSwapRegion()); + + version.setTargetSwapRegionWaitTime(getTargetSwapRegionWaitTime()); + + version.setIsDavinciHeartbeatReported(getIsDavinciHeartbeatReported()); + HybridStoreConfig hybridStoreConfig = getHybridStoreConfig(); if (hybridStoreConfig != null) { version.setHybridStoreConfig(hybridStoreConfig.clone()); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java
b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java index 8095f71991..5271e87915 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java @@ -594,6 +594,36 @@ public void setDataRecoveryVersionConfig(DataRecoveryVersionConfig dataRecoveryV throw new UnsupportedOperationException(); } + @Override + public String getTargetSwapRegion() { + return delegate.getTargetSwapRegion(); + } + + @Override + public int getTargetSwapRegionWaitTime() { + return delegate.getTargetSwapRegionWaitTime(); + } + + @Override + public void setTargetSwapRegion(String targetRegion) { + throw new UnsupportedOperationException(); + } + + @Override + public void setTargetSwapRegionWaitTime(int waitTime) { + throw new UnsupportedOperationException(); + } + + @Override + public void setIsDavinciHeartbeatReported(boolean isReported) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean getIsDavinciHeartbeatReported() { + return delegate.getIsDavinciHeartbeatReported(); + } + @Override public void setRepushSourceVersion(int version) { throw new UnsupportedOperationException(); @@ -1437,6 +1467,36 @@ public void setNearlineProducerCountPerWriter(int producerCnt) { throw new UnsupportedOperationException(); } + @Override + public String getTargetSwapRegion() { + return delegate.getTargetSwapRegion(); + } + + @Override + public int getTargetSwapRegionWaitTime() { + return delegate.getTargetSwapRegionWaitTime(); + } + + @Override + public void setTargetSwapRegion(String targetRegion) { + throw new UnsupportedOperationException(); + } + + @Override + public void setTargetSwapRegionWaitTime(int waitTime) { + throw new UnsupportedOperationException(); + } + + @Override + public void setIsDavinciHeartbeatReported(boolean isReported) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean 
getIsDavinciHeartbeatReported() { + return delegate.getIsDavinciHeartbeatReported(); + } + @Override public String toString() { return this.delegate.toString(); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java index c8ae16bc49..8b0c5b0710 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java @@ -329,4 +329,16 @@ static boolean isSystemStore(String storeName) { int getNearlineProducerCountPerWriter(); void setNearlineProducerCountPerWriter(int producerCnt); + + String getTargetSwapRegion(); + + int getTargetSwapRegionWaitTime(); + + void setTargetSwapRegion(String targetRegion); + + void setTargetSwapRegionWaitTime(int waitTime); + + void setIsDavinciHeartbeatReported(boolean isReported); + + boolean getIsDavinciHeartbeatReported(); } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/StoreInfo.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/StoreInfo.java index 5a11157e17..2c27f3082a 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/StoreInfo.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/StoreInfo.java @@ -76,6 +76,9 @@ public static StoreInfo fromStore(Store store) { storeInfo.setBlobTransferEnabled(store.isBlobTransferEnabled()); storeInfo.setNearlineProducerCompressionEnabled(store.isNearlineProducerCompressionEnabled()); storeInfo.setNearlineProducerCountPerWriter(store.getNearlineProducerCountPerWriter()); + storeInfo.setTargetRegionSwap(store.getTargetSwapRegion()); + storeInfo.setTargetRegionSwapWaitTime(store.getTargetSwapRegionWaitTime()); + storeInfo.setIsDavinciHeartbeatReported(store.getIsDavinciHeartbeatReported()); return storeInfo; } @@ -326,6 +329,9 @@ public static StoreInfo fromStore(Store store) { private boolean 
nearlineProducerCompressionEnabled; private int nearlineProducerCountPerWriter; + private String targetRegionSwap; + private int targetRegionSwapWaitTime; + private boolean isDavinciHeartbeatReported; public StoreInfo() { } @@ -831,4 +837,28 @@ public int getNearlineProducerCountPerWriter() { public void setNearlineProducerCountPerWriter(int nearlineProducerCountPerWriter) { this.nearlineProducerCountPerWriter = nearlineProducerCountPerWriter; } + + public String getTargetRegionSwap() { + return this.targetRegionSwap; + } + + public void setTargetRegionSwap(String targetRegion) { + this.targetRegionSwap = targetRegion; + } + + public int getTargetRegionSwapWaitTime() { + return this.targetRegionSwapWaitTime; + } + + public void setTargetRegionSwapWaitTime(int waitTime) { + this.targetRegionSwapWaitTime = waitTime; + } + + public void setIsDavinciHeartbeatReported(boolean isReported) { + this.isDavinciHeartbeatReported = isReported; + } + + public boolean getIsDavinciHeartbeatReported() { + return this.isDavinciHeartbeatReported; + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java index 74f12fa9b8..8e2e2091a4 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java @@ -699,6 +699,36 @@ public void setNearlineProducerCountPerWriter(int producerCnt) { throwUnsupportedOperationException("setNearlineProducerCountPerWriter"); } + @Override + public int getTargetSwapRegionWaitTime() { + return zkSharedStore.getTargetSwapRegionWaitTime(); + } + + @Override + public String getTargetSwapRegion() { + return zkSharedStore.getTargetSwapRegion(); + } + + @Override + public void setTargetSwapRegion(String targetRegion) { + throw new UnsupportedOperationException(); + } + + @Override + public void setTargetSwapRegionWaitTime(int waitTime) 
{ + throw new UnsupportedOperationException(); + } + + @Override + public void setIsDavinciHeartbeatReported(boolean isReported) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean getIsDavinciHeartbeatReported() { + return zkSharedStore.getIsDavinciHeartbeatReported(); + } + @Override public Store cloneStore() { return new SystemStore(zkSharedStore.cloneStore(), systemStoreType, veniceStore.cloneStore()); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java index 785b1bf7fb..406422e8ed 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java @@ -196,6 +196,18 @@ default void setLeaderFollowerModelEnabled(boolean leaderFollowerModelEnabled) { void setDataRecoveryVersionConfig(DataRecoveryVersionConfig dataRecoveryVersionConfig); + void setTargetSwapRegion(String targetRegion); + + String getTargetSwapRegion(); + + void setTargetSwapRegionWaitTime(int waitTime); + + int getTargetSwapRegionWaitTime(); + + void setIsDavinciHeartbeatReported(boolean isReported); + + boolean getIsDavinciHeartbeatReported(); + /** * Get the replication metadata version id. 
* @deprecated diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/VersionImpl.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/VersionImpl.java index 9e79a5f709..37a4a3c638 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/VersionImpl.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/VersionImpl.java @@ -395,6 +395,36 @@ public void setRmdVersionId(int replicationMetadataVersionId) { this.storeVersion.timestampMetadataVersionId = replicationMetadataVersionId; } + @Override + public void setTargetSwapRegion(String targetRegion) { + this.storeVersion.targetSwapRegion = targetRegion; + } + + @Override + public String getTargetSwapRegion() { + return this.storeVersion.targetSwapRegion.toString(); + } + + @Override + public void setTargetSwapRegionWaitTime(int waitTime) { + this.storeVersion.targetSwapRegionWaitTime = waitTime; + } + + @Override + public int getTargetSwapRegionWaitTime() { + return this.storeVersion.targetSwapRegionWaitTime; + } + + @Override + public void setIsDavinciHeartbeatReported(boolean isReported) { + this.storeVersion.isDaVinciHeartBeatReported = isReported; + } + + @Override + public boolean getIsDavinciHeartbeatReported() { + return this.storeVersion.isDaVinciHeartBeatReported; + } + @Override public StoreVersion dataModel() { return this.storeVersion; @@ -475,6 +505,8 @@ public Version cloneVersion() { clonedVersion.setRepushSourceVersion(getRepushSourceVersion()); clonedVersion.setViewConfigs(getViewConfigs()); clonedVersion.setBlobTransferEnabled(isBlobTransferEnabled()); + clonedVersion.setTargetSwapRegion(getTargetSwapRegion()); + clonedVersion.setTargetSwapRegionWaitTime(getTargetSwapRegionWaitTime()); return clonedVersion; } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ZKStore.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ZKStore.java index 48c4fb810f..1a05c0a807 100644 --- 
a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ZKStore.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ZKStore.java @@ -230,6 +230,9 @@ public ZKStore(Store store) { setBlobTransferEnabled(store.isBlobTransferEnabled()); setNearlineProducerCompressionEnabled(store.isNearlineProducerCompressionEnabled()); setNearlineProducerCountPerWriter(store.getNearlineProducerCountPerWriter()); + setTargetSwapRegion(store.getTargetSwapRegion()); + setTargetSwapRegionWaitTime(store.getTargetSwapRegionWaitTime()); + setIsDavinciHeartbeatReported(store.getIsDavinciHeartbeatReported()); for (Version storeVersion: store.getVersions()) { forceAddVersion(storeVersion.cloneVersion(), true); @@ -925,6 +928,36 @@ public void setNearlineProducerCountPerWriter(int producerCnt) { this.storeProperties.nearlineProducerCountPerWriter = producerCnt; } + @Override + public int getTargetSwapRegionWaitTime() { + return this.storeProperties.targetSwapRegionWaitTime; + } + + @Override + public String getTargetSwapRegion() { + return this.storeProperties.targetSwapRegion.toString(); + } + + @Override + public void setTargetSwapRegion(String targetRegion) { + this.storeProperties.targetSwapRegion = targetRegion; + } + + @Override + public void setTargetSwapRegionWaitTime(int waitTime) { + this.storeProperties.targetSwapRegionWaitTime = waitTime; + } + + @Override + public void setIsDavinciHeartbeatReported(boolean isReported) { + this.storeProperties.isDaVinciHeartBeatReported = isReported; + } + + @Override + public boolean getIsDavinciHeartbeatReported() { + return this.storeProperties.isDaVinciHeartBeatReported; + } + /** * Set all of PUSHED version to ONLINE once store is enabled to write. 
*/ diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java index b0a59a3148..00c4cefebf 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java @@ -72,7 +72,7 @@ public enum AvroProtocolDefinition { * * TODO: Move AdminOperation to venice-common module so that we can properly reference it here. */ - ADMIN_OPERATION(83, SpecificData.get().getSchema(ByteBuffer.class), "AdminOperation"), + ADMIN_OPERATION(84, SpecificData.get().getSchema(ByteBuffer.class), "AdminOperation"), /** * Single chunk of a large multi-chunk value. Just a bunch of bytes. @@ -143,7 +143,7 @@ public enum AvroProtocolDefinition { /** * Value schema for metadata system store. */ - METADATA_SYSTEM_SCHEMA_STORE(26, StoreMetaValue.class), + METADATA_SYSTEM_SCHEMA_STORE(27, StoreMetaValue.class), /** * Key schema for push status system store. 
diff --git a/internal/venice-common/src/main/resources/avro/StoreMetaValue/v27/StoreMetaValue.avsc b/internal/venice-common/src/main/resources/avro/StoreMetaValue/v27/StoreMetaValue.avsc new file mode 100644 index 0000000000..2cd0a6f72c --- /dev/null +++ b/internal/venice-common/src/main/resources/avro/StoreMetaValue/v27/StoreMetaValue.avsc @@ -0,0 +1,408 @@ +{ + "name": "StoreMetaValue", + "namespace": "com.linkedin.venice.systemstore.schemas", + "type": "record", + "fields": [ + { + "name": "timestamp", + "doc": "Timestamp when the value or a partial update for the value was generated by the writer (Venice Controller/Venice Server).", + "type": "long", + "default": 0 + }, + { + "name": "storeProperties", + "type": [ + "null", + { + "name": "StoreProperties", + "doc": "This type contains all the store configs and the corresponding versions", + "type": "record", + "fields": [ + {"name": "name", "type": "string", "doc": "Store name."}, + {"name": "owner", "type": "string", "doc": "Owner of this store."}, + {"name": "createdTime", "type": "long", "doc": "Timestamp when this store was created."}, + {"name": "currentVersion", "type": "int", "default": 0, "doc": "The number of version which is used currently."}, + {"name": "partitionCount", "type": "int", "default": 0, "doc": "Default partition count for all of versions in this store. 
Once first version become online, the number will be assigned."}, + {"name": "lowWatermark", "type": "long", "default": 0, "doc": "EOIP control message timestamp of the most recent incremental push that has been marked successful"}, + {"name": "enableWrites", "type": "boolean", "default": true, "doc": "If a store is disabled from writing, new version can not be created for it."}, + {"name": "enableReads", "type": "boolean", "default": true, "doc": "If a store is disabled from being read, none of versions under this store could serve read requests."}, + {"name": "storageQuotaInByte", "type": "long", "default": 21474836480, "doc": "Maximum capacity a store version is able to have, and default is 20GB"}, + {"name": "persistenceType", "type": "int", "default": 2, "doc": "Type of persistence storage engine, and default is 'ROCKS_DB'"}, + {"name": "routingStrategy", "type": "int", "default": 0, "doc": "How to route the key to partition, and default is 'CONSISTENT_HASH'"}, + {"name": "readStrategy", "type": "int", "default": 0, "doc": "How to read data from multiple replications, and default is 'ANY_OF_ONLINE'"}, + {"name": "offlinePushStrategy", "type": "int", "default": 1, "doc": "When doing off-line push, how to decide the data is ready to serve, and default is 'WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION'"}, + {"name": "largestUsedVersionNumber", "type": "int", "default": 0, "doc": "The largest version number ever used before for this store."}, + {"name": "readQuotaInCU", "type": "long", "default": 0, "doc": "Quota for read request hit this store. Measurement is capacity unit."}, + { + "name": "hybridConfig", + "doc": "Properties related to Hybrid Store behavior. 
If absent (null), then the store is not hybrid.", + "type": [ + "null", + { + "name": "StoreHybridConfig", + "type": "record", + "fields": [ + {"name": "rewindTimeInSeconds", "type": "long"}, + {"name": "offsetLagThresholdToGoOnline", "type": "long"}, + {"name": "producerTimestampLagThresholdToGoOnlineInSeconds", "type": "long"}, + {"name": "dataReplicationPolicy", "type": "int", "default": 0, "doc": "Real-time Samza job data replication policy, and default is 'NON_AGGREGATE'"}, + { + "name": "bufferReplayPolicy", + "type": "int", + "doc": "Policy that will be used during buffer replay. rewindTimeInSeconds defines the delta. 0 => REWIND_FROM_EOP (replay from 'EOP - rewindTimeInSeconds'), 1 => REWIND_FROM_SOP (replay from 'SOP - rewindTimeInSeconds')", + "default": 0 + }, + {"name": "realTimeTopicName", "type": "string", "default": "", "doc": "Name of the real time topic this store/version uses"} + ] + } + ], + "default": null + }, + { + "name": "views", + "doc": "A map of views which describe and configure a downstream view of a venice store. Keys in this map are for convenience of managing configs.", + "type": { + "type":"map", + "values": { + "name": "StoreViewConfig", + "type": "record", + "doc": "A configuration for a particular view. This config should inform Venice leaders how to transform and transmit data to destination views.", + "fields": [ + { + "name": "viewClassName", + "type": "string", + "doc": "This informs what kind of view we are materializing. This then informs what kind of parameters are passed to parse this input. 
This is expected to be a fully formed class path name for materialization.", + "default": "" + }, + { + "name": "viewParameters", + "doc": "Optional parameters to be passed to the given view config.", + "type": ["null", + { + "type": "map", + "java-key-class": "java.lang.String", + "avro.java.string": "String", + "values": { "type": "string", "avro.java.string": "String" } + } + ], + "default": null + } + ] + } + }, + "default": {} + }, + {"name": "accessControlled", "type": "boolean", "default": true, "doc": "Store-level ACL switch. When disabled, Venice Router should accept every request."}, + {"name": "compressionStrategy", "type": "int", "default": 0, "doc": "Strategy used to compress/decompress Record's value, and default is 'NO_OP'"}, + {"name": "clientDecompressionEnabled", "type": "boolean", "default": true, "doc": "le/Disable client-side record decompression (default: true)"}, + {"name": "chunkingEnabled", "type": "boolean", "default": false, "doc": "Whether current store supports large value (typically more than 1MB). By default, the chunking feature is disabled."}, + {"name": "rmdChunkingEnabled", "type": "boolean", "default": false, "doc": "Whether current store supports large replication metadata (typically more than 1MB). By default, the chunking feature is disabled."}, + {"name": "batchGetLimit", "type": "int", "default": -1, "doc": "Batch get key number limit, and Venice will use cluster-level config if it is not positive."}, + {"name": "numVersionsToPreserve", "type": "int", "default": 0, "doc": "How many versions this store preserve at most. 
By default it's 0 means we use the cluster level config to determine how many version is preserved."}, + {"name": "incrementalPushEnabled", "type": "boolean", "default": false, "doc": "Flag to see if the store supports incremental push or not"}, + {"name": "separateRealTimeTopicEnabled", "type": "boolean", "default": false, "doc": "Flag to see if the store supports separate real-time topic for incremental push."}, + {"name": "migrating", "type": "boolean", "default": false, "doc": "Whether or not the store is in the process of migration."}, + {"name": "writeComputationEnabled", "type": "boolean", "default": false, "doc": "Whether or not write-path computation feature is enabled for this store."}, + {"name": "readComputationEnabled", "type": "boolean", "default": false, "doc": "Whether read-path computation is enabled for this store."}, + {"name": "bootstrapToOnlineTimeoutInHours", "type": "int", "default": 24, "doc": "Maximum number of hours allowed for the store to transition from bootstrap to online state."}, + {"name": "leaderFollowerModelEnabled", "type": "boolean", "default": false, "doc": "Whether or not to use leader follower state transition model for upcoming version."}, + {"name": "nativeReplicationEnabled", "type": "boolean", "default": false, "doc": "Whether or not native should be enabled for this store. Will only successfully apply if leaderFollowerModelEnabled is also true either in this update or a previous version of the store."}, + {"name": "replicationMetadataVersionID", "type": "int", "default": -1, "doc": "RMD (Replication metadata) version ID on the store-level. 
Default -1 means NOT_SET and the cluster-level RMD version ID should be used for stores."}, + {"name": "pushStreamSourceAddress", "type": "string", "default": "", "doc": "Address to the kafka broker which holds the source of truth topic for this store version."}, + {"name": "backupStrategy", "type": "int", "default": 1, "doc": "Strategies to store backup versions, and default is 'DELETE_ON_NEW_PUSH_START'"}, + {"name": "schemaAutoRegisteFromPushJobEnabled", "type": "boolean", "default": false, "doc": "Whether or not value schema auto registration enabled from push job for this store."}, + {"name": "latestSuperSetValueSchemaId", "type": "int", "default": -1, "doc": "For read compute stores with auto super-set schema enabled, stores the latest super-set value schema ID."}, + {"name": "hybridStoreDiskQuotaEnabled", "type": "boolean", "default": false, "doc": "Whether or not storage disk quota is enabled for a hybrid store. This store config cannot be enabled until the routers and servers in the corresponding cluster are upgraded to the right version: 0.2.249 or above for routers and servers."}, + {"name": "storeMetadataSystemStoreEnabled", "type": "boolean", "default": false, "doc": "Whether or not the store metadata system store is enabled for this store."}, + { + "name": "etlConfig", + "doc": "Properties related to ETL Store behavior.", + "type": [ + "null", + { + "name": "StoreETLConfig", + "type": "record", + "fields": [ + {"name": "etledUserProxyAccount", "type": "string", "doc": "If enabled regular ETL or future version ETL, this account name is part of path for where the ETLed snapshots will go. 
for example, for user account veniceetl001, snapshots will be published to HDFS /jobs/veniceetl001/storeName."}, + {"name": "regularVersionETLEnabled", "type": "boolean", "doc": "Whether or not enable regular version ETL for this store."}, + {"name": "futureVersionETLEnabled", "type": "boolean", "doc": "Whether or not enable future version ETL - the version that might come online in future - for this store."} + ] + } + ], + "default": null + }, + { + "name": "partitionerConfig", + "doc": "", + "type": [ + "null", + { + "name": "StorePartitionerConfig", + "type": "record", + "fields": [ + {"name": "partitionerClass", "type": "string"}, + {"name": "partitionerParams", "type": {"type": "map", "values": "string"}}, + {"name": "amplificationFactor", "type": "int"} + ] + } + ], + "default": null + }, + {"name": "incrementalPushPolicy", "type": "int", "default": 0, "doc": "Incremental Push Policy to reconcile with real time pushes, and default is 'PUSH_TO_VERSION_TOPIC'"}, + {"name": "latestVersionPromoteToCurrentTimestamp", "type": "long", "default": -1, "doc": "This is used to track the time when a new version is promoted to current version. For now, it is mostly to decide whether a backup version can be removed or not based on retention. For the existing store before this code change, it will be set to be current timestamp."}, + {"name": "backupVersionRetentionMs", "type": "long", "default": -1, "doc": "Backup retention time, and if it is not set (-1), Venice Controller will use the default configured retention. 
{@link com.linkedin.venice.ConfigKeys#CONTROLLER_BACKUP_VERSION_DEFAULT_RETENTION_MS}."}, + {"name": "replicationFactor", "type": "int", "default": 3, "doc": "The number of replica each store version will keep."}, + {"name": "migrationDuplicateStore", "type": "boolean", "default": false, "doc": "Whether or not the store is a duplicate store in the process of migration."}, + {"name": "nativeReplicationSourceFabric", "type": "string", "default": "", "doc": "The source fabric name to be used in native replication. Remote consumption will happen from kafka in this fabric."}, + {"name": "daVinciPushStatusStoreEnabled", "type": "boolean", "default": false, "doc": "Whether or not davinci push status store is enabled."}, + {"name": "storeMetaSystemStoreEnabled", "type": "boolean", "default": false, "doc": "Whether or not the store meta system store is enabled for this store."}, + {"name": "activeActiveReplicationEnabled", "type": "boolean", "default": false, "doc": "Whether or not active/active replication is enabled for hybrid stores; eventually this config will replace native replication flag, when all stores are on A/A"}, + {"name": "applyTargetVersionFilterForIncPush", "type": "boolean", "default": false, "doc": "Whether or not the target version field in Kafka messages will be used in increment push to RT policy"}, + {"name": "minCompactionLagSeconds", "type": "long", "default": -1, "doc": "Store level min compaction lag config and if not specified, it will use the global config for version topics"}, + {"name": "maxCompactionLagSeconds", "type": "long", "default": -1, "doc": "Store level max compaction lag config and if not specified, 'max.compaction.lag.ms' config won't be setup in the corresponding version topics"}, + {"name": "maxRecordSizeBytes", "type": "int", "default": -1, "doc": "Store-level max record size in bytes. 
If not specified (-1), the controller config 'default.max.record.size.bytes' (100MB default) will be backfilled"}, + {"name": "maxNearlineRecordSizeBytes", "type": "int", "default": -1, "doc": "Store-level max record size in bytes for nearline jobs with partial updates. If not specified (-1), the server config 'default.max.record.size.bytes' (100MB default) will be backfilled. This may converge with maxRecordSizeBytes in the future"}, + {"name": "unusedSchemaDeletionEnabled", "type": "boolean", "default": false, "doc": "Store level config to indicate whether unused schema deletion is enabled or not."}, + { + "name": "versions", + "doc": "List of non-retired versions. It's currently sorted and there is code run under the assumption that the last element in the list is the largest. Check out {VeniceHelixAdmin#getIncrementalPushVersion}, and please make it in mind if you want to change this logic", + "type": { + "type": "array", + "items": { + "name": "StoreVersion", + "type": "record", + "doc": "Type describes all the version attributes", + "fields": [ + {"name": "storeName", "type": "string", "doc": "Name of the store which this version belong to."}, + {"name": "number", "type": "int", "doc": "Version number."}, + {"name": "createdTime", "type": "long", "doc": "Time when this version was created."}, + {"name": "status", "type": "int", "default": 1, "doc": "Status of version, and default is 'STARTED'"}, + {"name": "pushJobId", "type": "string", "default": ""}, + {"name": "compressionStrategy", "type": "int", "default": 0, "doc": "strategies used to compress/decompress Record's value, and default is 'NO_OP'"}, + {"name": "leaderFollowerModelEnabled", "type": "boolean", "default": false, "doc": "Whether or not to use leader follower state transition."}, + {"name": "nativeReplicationEnabled", "type": "boolean", "default": false, "doc": "Whether or not native replication is enabled."}, + {"name": "pushStreamSourceAddress", "type": "string", "default": "", "doc": "Address 
to the kafka broker which holds the source of truth topic for this store version."}, + {"name": "bufferReplayEnabledForHybrid", "type": "boolean", "default": true, "doc": "Whether or not to enable buffer replay for hybrid."}, + {"name": "chunkingEnabled", "type": "boolean", "default": false, "doc": "Whether or not large values are supported (via chunking)."}, + {"name": "rmdChunkingEnabled", "type": "boolean", "default": false, "doc": "Whether or not large replication metadata are supported (via chunking)."}, + {"name": "pushType", "type": "int", "default": 0, "doc": "Producer type for this version, and default is 'BATCH'"}, + {"name": "partitionCount", "type": "int", "default": 0, "doc": "Partition count of this version."}, + { + "name": "partitionerConfig", + "type": [ + "null", + "com.linkedin.venice.systemstore.schemas.StorePartitionerConfig" + ], + "default": null, + "doc": "Config for custom partitioning." + }, + {"name": "incrementalPushPolicy", "type": "int", "default": 0, "doc": "Incremental Push Policy to reconcile with real time pushes, and default is 'PUSH_TO_VERSION_TOPIC'"}, + {"name": "replicationFactor", "type": "int", "default": 3, "doc": "The number of replica this store version is keeping."}, + {"name": "nativeReplicationSourceFabric", "type": "string", "default": "", "doc": "The source fabric name to be used in native replication. 
Remote consumption will happen from kafka in this fabric."}, + {"name": "incrementalPushEnabled", "type": "boolean", "default": false, "doc": "Flag to see if the store supports incremental push or not"}, + {"name": "separateRealTimeTopicEnabled", "type": "boolean", "default": false, "doc": "Flag to see if the store supports separate real-time topic for incremental push."}, + {"name": "blobTransferEnabled", "type": "boolean", "default": false, "doc": "Flag to indicate if the blob transfer is allowed or not"}, + {"name": "useVersionLevelIncrementalPushEnabled", "type": "boolean", "default": false, "doc": "Flag to see if incrementalPushEnabled config at StoreVersion should be used. This is needed during migration of this config from Store level to Version level. We can deprecate this field later."}, + { + "name": "hybridConfig", + "type": [ + "null", + "com.linkedin.venice.systemstore.schemas.StoreHybridConfig" + ], + "default": null, + "doc": "Properties related to Hybrid Store behavior. If absent (null), then the store is not hybrid." + }, + {"name": "useVersionLevelHybridConfig", "type": "boolean", "default": false, "doc": "Flag to see if hybridConfig at StoreVersion should be used. This is needed during migration of this config from Store level to Version level. 
We can deprecate this field later."}, + {"name": "activeActiveReplicationEnabled", "type": "boolean", "default": false, "doc": "Whether or not active/active replication is enabled for hybrid stores; eventually this config will replace native replication flag, when all stores are on A/A"}, + {"name": "timestampMetadataVersionId", "type": "int", "default": -1, "doc": "The A/A timestamp metadata schema version ID that will be used to deserialize metadataPayload."}, + { + "name": "dataRecoveryConfig", + "type": [ + "null", + { + "name": "DataRecoveryConfig", + "type": "record", + "fields": [ + {"name": "dataRecoverySourceFabric", "type": "string", "doc": "The fabric name to be used as the source for data recovery."}, + {"name": "isDataRecoveryComplete", "type": "boolean", "doc": "Whether or not data recovery is complete."}, + {"name": "dataRecoverySourceVersionNumber", "type": "int", "default": 0, "doc": "The store version number to be used as the source for data recovery."} + ] + } + ], + "default": null, + "doc": "Properties related to data recovery mode behavior for this version. If absent (null), then the version never went through data recovery." + }, + {"name": "deferVersionSwap", "type": "boolean", "default": false, "doc": "flag that informs venice controller to defer marking this version as the serving version after instances report ready to serve. 
This version must be marked manually as the current version in order to serve traffic from it."}, + { + "name": "views", + "doc": "A list of views which describe and configure a downstream view of a venice store.", + "type": { + "type": "map", + "java-key-class": "java.lang.String", + "avro.java.string": "String", + "values": "com.linkedin.venice.systemstore.schemas.StoreViewConfig" + }, + "default": {} + }, + {"name": "repushSourceVersion", "type": "int", "default": -1, "doc": "For store version created from repush, indicates the source store version its created from."}, + {"name": "targetSwapRegion", "type": "string", "default": "", "doc": "Controls what region to swap in the current version during target colo push"}, + {"name": "targetSwapRegionWaitTime", "type": "int", "default": 60, "doc": "Controls how long to wait in minutes before swapping the version on the regions"}, + {"name": "isDaVinciHeartBeatReported", "type": "boolean", "default": false, "doc": "Flag to indicate whether DVC is bootstrapping and sending heartbeats"} + ] + } + }, + "default": [] + }, + { + "name": "systemStores", + "doc": "This field is used to maintain a mapping between each type of system store and the corresponding distinct properties", + "type": { + "type": "map", + "values": { + "name": "SystemStoreProperties", + "type": "record", + "doc": "This type describes all the distinct properties", + "fields": [ + {"name": "largestUsedVersionNumber", "type": "int", "default": 0}, + {"name": "currentVersion", "type": "int", "default": 0}, + {"name": "latestVersionPromoteToCurrentTimestamp", "type": "long", "default": -1}, + {"name": "versions", "type": {"type": "array", "items": "com.linkedin.venice.systemstore.schemas.StoreVersion"}, "default": []} + ] + } + }, + "default": {} + }, + {"name": "storageNodeReadQuotaEnabled", "type": "boolean", "default": false, "doc": "Controls the storage node read quota enforcement for the given Venice store"}, + {"name": "blobTransferEnabled", "type": 
"boolean", "default": false, "doc": "Flag to indicate if the blob transfer is allowed or not"}, + {"name": "nearlineProducerCompressionEnabled", "type": "boolean", "default": true, "doc": "Flag to control whether the producer in Server for near-line workload will enable compression or not"}, + {"name": "nearlineProducerCountPerWriter", "type": "int", "default": 1, "doc": "How many producers will be used for the nearline producer in Server to improve producing throughput"}, + {"name": "targetSwapRegion", "type": "string", "default": "", "doc": "Controls what region to swap in the current version during target colo push"}, + {"name": "targetSwapRegionWaitTime", "type": "int", "default": 60, "doc": "Controls how long to wait in minutes before swapping the version on the regions"}, + {"name": "isDaVinciHeartBeatReported", "type": "boolean", "default": false, "doc": "Flag to indicate whether DVC is bootstrapping and sending heartbeats"} + ] + } + ], + "default": null + }, + { + "name": "storeKeySchemas", + "doc": "", + "type": [ + "null", + { + "name": "StoreKeySchemas", + "doc": "This type describes the key schemas of the store", + "type": "record", + "fields": [ + { + "name": "keySchemaMap", + "doc": "A string to string map representing the mapping from id to key schema.", + "type": { + "type": "map", + "values": "string" + } + } + ] + } + ], + "default": null + }, + { + "name": "storeValueSchemas", + "doc": "", + "type": [ + "null", + { + "name": "StoreValueSchemas", + "doc": "This type describes the value schemas of the store.", + "type": "record", + "fields": [ + { + "name": "valueSchemaMap", + "doc": "A string to string map representing the mapping from schema id to value schema string. 
The value could be an empty string indicating the value schema is stored in another field.", + "type": { + "type": "map", + "values": "string" + } + } + ] + } + ], + "default": null + }, + { + "name": "storeValueSchema", + "doc": "", + "type": [ + "null", + { + "name": "StoreValueSchema", + "doc": "This type describes a single version of the value schema of the store.", + "type": "record", + "fields": [ + { + "name": "valueSchema", + "doc": "Store value schema string.", + "type": "string", + "default": "" + } + ] + } + ], + "default": null + }, + { + "name": "storeReplicaStatuses", + "doc": "This field describes the replica statuses per version per partition, and the mapping is 'host_port' -> 'replica status'", + "type": [ + "null", + { + "type": "map", + "values": { + "name": "StoreReplicaStatus", + "type": "record", + "doc": "This structure will contain all kinds of info related to one replica", + "fields": [ + {"name": "status", "type": "int", "doc": "replica status"} + ] + } + } + ], + "default": null + }, + { + "name": "storeValueSchemaIdsWrittenPerStoreVersion", + "doc": "This field described the set of value schemas id written by a store version.", + "type": [ + "null", + { + "name": "StoreValueSchemaIdsWrittenPerStoreVersion", + "doc": "This type describes value schema IDs written by the store version.", + "type": "array", + "items": "int" + } + ], + "default": null + }, + { + "name": "storeClusterConfig", + "doc": "This is the Zk's StoreConfig equivalent which contains various Venice cluster information", + "type": [ + "null", + { + "name": "StoreClusterConfig", + "doc": "This type describes the various Venice cluster information for a store", + "type": "record", + "fields": [ + {"name": "cluster", "type": "string", "default": "", "doc": "The Venice cluster of the store."}, + {"name": "deleting", "type": "boolean", "default": false, "doc": "Is the store undergoing deletion."}, + {"name": "migrationDestCluster", "type": ["null", "string"], "default": null, 
"doc": "The destination cluster for store migration"}, + {"name": "migrationSrcCluster", "type": ["null", "string"], "default": null, "doc": "The source cluster for store migration"}, + {"name": "storeName", "type": "string", "default": "", "doc": "The name of the store"} + ] + } + ], + "default": null + } + ] +} diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/VeniceParentHelixAdminTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/VeniceParentHelixAdminTest.java index c95cfe579f..e944095f73 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/VeniceParentHelixAdminTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/VeniceParentHelixAdminTest.java @@ -903,6 +903,7 @@ private void testUpdateConfigs(ControllerClient parentControllerClient, Controll testUpdateMaxRecordSize(parentControllerClient, childControllerClient); testUpdateBlobTransfer(parentControllerClient, childControllerClient); testUpdateNearlineProducerConfig(parentControllerClient, childControllerClient); + testUpdateTargetSwapRegion(parentControllerClient, childControllerClient); } /** @@ -940,6 +941,23 @@ private void testUpdateConfig( }); } + private void testUpdateTargetSwapRegion(ControllerClient parentClient, ControllerClient childClient) { + final String region = "prod"; + final int waitTime = 100; + final boolean isDavinci = false; + Consumer paramsConsumer = params -> { + params.setTargetRegionSwap(region); + params.setTargetRegionSwapWaitTime(waitTime); + params.setIsDavinciHeartbeatReported(isDavinci); + }; + Consumer responseConsumer = response -> { + Assert.assertEquals(response.getStore().getTargetRegionSwap(), region); + Assert.assertEquals(response.getStore().getTargetRegionSwapWaitTime(), waitTime); + Assert.assertEquals(response.getStore().getIsDavinciHeartbeatReported(), isDavinci); + }; + 
testUpdateConfig(parentClient, childClient, paramsConsumer, responseConsumer); + } + private void testUpdateCompactionLag(ControllerClient parentClient, ControllerClient childClient) { final long expectedMinCompactionLagSeconds = 100; final long expectedMaxCompactionLagSeconds = 200; diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index c426c45091..8631be7452 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -4820,6 +4820,9 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto Optional blobTransferEnabled = params.getBlobTransferEnabled(); Optional nearlineProducerCompressionEnabled = params.getNearlineProducerCompressionEnabled(); Optional nearlineProducerCountPerWriter = params.getNearlineProducerCountPerWriter(); + Optional targetSwapRegion = params.getTargetSwapRegion(); + Optional targetSwapRegionWaitTime = params.getTargetRegionSwapWaitTime(); + Optional isDavinciHeartbeatReported = params.getIsDavinciHeartbeatReported(); final Optional newHybridStoreConfig; if (hybridRewindSeconds.isPresent() || hybridOffsetLagThreshold.isPresent() || hybridTimeLagThreshold.isPresent() @@ -5114,6 +5117,21 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto return store; })); + targetSwapRegion.ifPresent(aString -> storeMetadataUpdate(clusterName, storeName, store -> { + store.setTargetSwapRegion((aString)); + return store; + })); + + targetSwapRegionWaitTime.ifPresent(aInt -> storeMetadataUpdate(clusterName, storeName, store -> { + store.setTargetSwapRegionWaitTime(aInt); + return store; + })); + + isDavinciHeartbeatReported.ifPresent(aBool -> storeMetadataUpdate(clusterName, storeName, store -> { + 
store.setIsDavinciHeartbeatReported(aBool); + return store; + })); + LOGGER.info("Finished updating store: {} in cluster: {}", storeName, clusterName); } catch (VeniceException e) { LOGGER.error( diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index 38eb8f9bba..09ebbdab6d 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -25,6 +25,7 @@ import static com.linkedin.venice.controllerapi.ControllerApiConstants.FUTURE_VERSION_ETL_ENABLED; import static com.linkedin.venice.controllerapi.ControllerApiConstants.HYBRID_STORE_DISK_QUOTA_ENABLED; import static com.linkedin.venice.controllerapi.ControllerApiConstants.INCREMENTAL_PUSH_ENABLED; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.IS_DAVINCI_HEARTBEAT_REPORTED; import static com.linkedin.venice.controllerapi.ControllerApiConstants.LARGEST_USED_VERSION_NUMBER; import static com.linkedin.venice.controllerapi.ControllerApiConstants.LATEST_SUPERSET_SCHEMA_ID; import static com.linkedin.venice.controllerapi.ControllerApiConstants.MAX_COMPACTION_LAG_SECONDS; @@ -57,6 +58,8 @@ import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORAGE_QUOTA_IN_BYTE; import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_MIGRATION; import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VIEW; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.TARGET_SWAP_REGION; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.TARGET_SWAP_REGION_WAIT_TIME; import static com.linkedin.venice.controllerapi.ControllerApiConstants.TIME_LAG_TO_GO_ONLINE; import static 
com.linkedin.venice.controllerapi.ControllerApiConstants.UNUSED_SCHEMA_DELETION_ENABLED; import static com.linkedin.venice.controllerapi.ControllerApiConstants.VERSION; @@ -2625,6 +2628,18 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa .map(addToUpdatedConfigList(updatedConfigsList, NEARLINE_PRODUCER_COUNT_PER_WRITER)) .orElseGet(currStore::getNearlineProducerCountPerWriter); + setStore.targetSwapRegion = params.getTargetSwapRegion() + .map(addToUpdatedConfigList(updatedConfigsList, TARGET_SWAP_REGION)) + .orElseGet(currStore::getTargetSwapRegion); + + setStore.targetSwapRegionWaitTime = params.getTargetRegionSwapWaitTime() + .map(addToUpdatedConfigList(updatedConfigsList, TARGET_SWAP_REGION_WAIT_TIME)) + .orElseGet((currStore::getTargetSwapRegionWaitTime)); + + setStore.isDaVinciHeartBeatReported = params.getIsDavinciHeartbeatReported() + .map(addToUpdatedConfigList(updatedConfigsList, IS_DAVINCI_HEARTBEAT_REPORTED)) + .orElseGet((currStore::getIsDavinciHeartbeatReported)); + // Check whether the passed param is valid or not if (latestSupersetSchemaId.isPresent()) { if (latestSupersetSchemaId.get() != SchemaData.INVALID_VALUE_SCHEMA_ID) { diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java index 4ffa4d1b4c..65422002e3 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java @@ -500,7 +500,13 @@ private void handleSetStore(UpdateStore message) { .setBlobTransferEnabled(message.blobTransferEnabled) .setUnusedSchemaDeletionEnabled(message.unusedSchemaDeletionEnabled) .setNearlineProducerCompressionEnabled(message.nearlineProducerCompressionEnabled) - 
.setNearlineProducerCountPerWriter(message.nearlineProducerCountPerWriter); + .setNearlineProducerCountPerWriter(message.nearlineProducerCountPerWriter) + .setTargetRegionSwapWaitTime(message.targetSwapRegionWaitTime) + .setIsDavinciHeartbeatReported(message.isDaVinciHeartBeatReported); + + if (message.targetSwapRegion != null) { + params.setTargetRegionSwap(message.getTargetSwapRegion().toString()); + } if (message.ETLStoreConfig != null) { params.setRegularVersionETLEnabled(message.ETLStoreConfig.regularVersionETLEnabled) @@ -642,6 +648,7 @@ private void handleAddVersion(AddVersion message) { storeName, clusterName, versionNumber); + if (isParentController) { if (checkPreConditionForReplicateAddVersion(clusterName, storeName)) { // Parent controller mirrors new version to src or dest cluster if the store is migrating diff --git a/services/venice-controller/src/main/resources/avro/AdminOperation/v84/AdminOperation.avsc b/services/venice-controller/src/main/resources/avro/AdminOperation/v84/AdminOperation.avsc new file mode 100644 index 0000000000..cb6ce72209 --- /dev/null +++ b/services/venice-controller/src/main/resources/avro/AdminOperation/v84/AdminOperation.avsc @@ -0,0 +1,1165 @@ +{ + "name": "AdminOperation", + "namespace": "com.linkedin.venice.controller.kafka.protocol.admin", + "type": "record", + "fields": [ + { + "name": "operationType", + "doc": "0 => StoreCreation, 1 => ValueSchemaCreation, 2 => PauseStore, 3 => ResumeStore, 4 => KillOfflinePushJob, 5 => DisableStoreRead, 6 => EnableStoreRead, 7=> DeleteAllVersions, 8=> SetStoreOwner, 9=> SetStorePartitionCount, 10=> SetStoreCurrentVersion, 11=> UpdateStore, 12=> DeleteStore, 13=> DeleteOldVersion, 14=> MigrateStore, 15=> AbortMigration, 16=>AddVersion, 17=> DerivedSchemaCreation, 18=>SupersetSchemaCreation, 19=>EnableNativeReplicationForCluster, 20=>MetadataSchemaCreation, 21=>EnableActiveActiveReplicationForCluster, 25=>CreatePersona, 26=>DeletePersona, 27=>UpdatePersona, 28=>RollbackCurrentVersion, 
29=>RollforwardCurrentVersion", + "type": "int" + }, { + "name": "executionId", + "doc": "ID of a command execution which is used to query the status of this command.", + "type": "long", + "default": 0 + }, { + "name": "payloadUnion", + "doc": "This contains the main payload of the admin operation", + "type": [ + { + "name": "StoreCreation", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "owner", + "type": "string" + }, + { + "name": "keySchema", + "type": { + "type": "record", + "name": "SchemaMeta", + "fields": [ + {"name": "schemaType", "type": "int", "doc": "0 => Avro-1.4, and we can add more if necessary"}, + {"name": "definition", "type": "string"} + ] + } + }, + { + "name": "valueSchema", + "type": "SchemaMeta" + } + ] + }, + { + "name": "ValueSchemaCreation", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "schema", + "type": "SchemaMeta" + }, + { + "name": "schemaId", + "type": "int" + }, + { + "name": "doUpdateSupersetSchemaID", + "type": "boolean", + "doc": "Whether this superset schema ID should be updated to be the value schema ID for this store.", + "default": false + } + ] + }, + { + "name": "PauseStore", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + } + ] + }, + { + "name": "ResumeStore", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + } + ] + }, + { + "name": "KillOfflinePushJob", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "kafkaTopic", + "type": "string" + } + ] + }, + { + "name": "DisableStoreRead", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", 
+ "type": "string" + } + ] + }, + { + "name": "EnableStoreRead", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + } + ] + }, + { + "name": "DeleteAllVersions", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + } + ] + }, + { + "name": "SetStoreOwner", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "owner", + "type": "string" + } + ] + }, + { + "name": "SetStorePartitionCount", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "partitionNum", + "type": "int" + } + ] + }, + { + "name": "SetStoreCurrentVersion", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "currentVersion", + "type": "int" + } + ] + }, + { + "name": "UpdateStore", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "owner", + "type": "string" + }, + { + "name": "partitionNum", + "type": "int" + }, + { + "name": "currentVersion", + "type": "int" + }, + { + "name": "enableReads", + "type": "boolean" + }, + { + "name": "enableWrites", + "type": "boolean" + }, + { + "name": "storageQuotaInByte", + "type": "long", + "default": 21474836480 + }, + { + "name": "readQuotaInCU", + "type": "long", + "default": 1800 + }, + { + "name": "hybridStoreConfig", + "type": [ + "null", + { + "name": "HybridStoreConfigRecord", + "type": "record", + "fields": [ + { + "name": "rewindTimeInSeconds", + "type": "long" + }, + { + "name": "offsetLagThresholdToGoOnline", + "type": "long" + }, + { + "name": "producerTimestampLagThresholdToGoOnlineInSeconds", 
+ "type": "long", + "default": -1 + }, + { + "name": "dataReplicationPolicy", + "doc": "Real-time Samza job data replication policy. Using int because Avro Enums are not evolvable 0 => NON_AGGREGATE, 1 => AGGREGATE, 2 => NONE, 3 => ACTIVE_ACTIVE", + "type": "int", + "default": 0 + }, + { + "name": "bufferReplayPolicy", + "type": "int", + "doc": "Policy that will be used during buffer replay. rewindTimeInSeconds defines the delta. 0 => REWIND_FROM_EOP (replay from 'EOP - rewindTimeInSeconds'), 1 => REWIND_FROM_SOP (replay from 'SOP - rewindTimeInSeconds')", + "default": 0 + }, + {"name": "realTimeTopicName", "type": "string", "default": "", "doc": "Name of the real time topic this store/version uses"} + ] + } + ], + "default": null + }, + { + "name": "accessControlled", + "type": "boolean", + "default": false + }, + { + "name": "compressionStrategy", + "doc": "Using int because Avro Enums are not evolvable", + "type": "int", + "default": 0 + }, + { + "name": "chunkingEnabled", + "type": "boolean", + "default": false + }, + { + "name": "rmdChunkingEnabled", + "type": "boolean", + "default": false + }, + { + "name": "singleGetRouterCacheEnabled", + "aliases": ["routerCacheEnabled"], + "type": "boolean", + "default": false + }, + { + "name": "batchGetRouterCacheEnabled", + "type": "boolean", + "default": false + }, + { + "name": "batchGetLimit", + "doc": "The max key number allowed in batch get request, and Venice will use cluster-level config if the limit (not positive) is not valid", + "type": "int", + "default": -1 + }, + { + "name": "numVersionsToPreserve", + "doc": "The max number of versions the store should preserve. 
Venice will use cluster-level config if the number is 0 here.", + "type": "int", + "default": 0 + }, + { + "name": "incrementalPushEnabled", + "doc": "a flag to see if the store supports incremental push or not", + "type": "boolean", + "default": false + }, + { + "name": "separateRealTimeTopicEnabled", + "doc": "Flag to see if the store supports separate real-time topic for incremental push.", + "type": "boolean", + "default": false + }, + { + "name": "isMigrating", + "doc": "Whether or not the store is in the process of migration", + "type": "boolean", + "default": false + }, + { + "name": "writeComputationEnabled", + "doc": "Whether write-path computation feature is enabled for this store", + "type": "boolean", + "default": false + }, + { + "name": "replicationMetadataVersionID", + "doc": "RMD (Replication metadata) version ID on the store-level. Default -1 means NOT_SET and the cluster-level RMD version ID should be used for stores.", + "type": "int", + "default": -1 + }, + { + "name": "readComputationEnabled", + "doc": "Whether read-path computation feature is enabled for this store", + "type": "boolean", + "default": false + }, + { + "name": "bootstrapToOnlineTimeoutInHours", + "doc": "Maximum number of hours allowed for the store to transition from bootstrap to online state", + "type": "int", + "default": 24 + }, + { + "name": "leaderFollowerModelEnabled", + "doc": "Whether or not to use leader follower state transition model for upcoming version", + "type": "boolean", + "default": false + }, + { + "name": "backupStrategy", + "doc": "Strategies to store backup versions.", + "type": "int", + "default": 0 + }, + { + "name": "clientDecompressionEnabled", + "type": "boolean", + "default": true + }, + { + "name": "schemaAutoRegisterFromPushJobEnabled", + "type": "boolean", + "default": false + }, + { + "name": "hybridStoreOverheadBypass", + "type": "boolean", + "default": false + }, + { + "name": "hybridStoreDiskQuotaEnabled", + "doc": "Whether or not to enable 
disk storage quota for a hybrid store", + "type": "boolean", + "default": false + }, + { + "name": "ETLStoreConfig", + "type": [ + "null", + { + "name": "ETLStoreConfigRecord", + "type": "record", + "fields": [ + { + "name": "etledUserProxyAccount", + "type": ["null", "string"] + }, + { + "name": "regularVersionETLEnabled", + "type": "boolean" + }, + { + "name": "futureVersionETLEnabled", + "type": "boolean" + } + ] + } + ], + "default": null + }, + { + "name": "partitionerConfig", + "type": [ + "null", + { + "name": "PartitionerConfigRecord", + "type": "record", + "fields": [ + { + "name": "partitionerClass", + "type": "string" + }, + { + "name": "partitionerParams", + "type": { + "type": "map", + "values": "string" + } + }, + { + "name": "amplificationFactor", + "type": "int" + } + ] + } + ], + "default": null + }, + { + "name": "nativeReplicationEnabled", + "type": "boolean", + "default": false + }, + { + "name": "pushStreamSourceAddress", + "type": ["null", "string"], + "default": null + }, + { + "name": "largestUsedVersionNumber", + "type": ["null", "int"], + "default": null + }, + { + "name": "incrementalPushPolicy", + "doc": "Incremental Push Policy to reconcile with real time pushes. 
Using int because Avro Enums are not evolvable 0 => PUSH_TO_VERSION_TOPIC, 1 => INCREMENTAL_PUSH_SAME_AS_REAL_TIME", + "type": "int", + "default": 0 + }, + { + "name": "backupVersionRetentionMs", + "type": "long", + "doc": "Backup version retention time after a new version is promoted to the current version, if not specified, Venice will use the configured retention as the default policy", + "default": -1 + }, + { + "name": "replicationFactor", + "doc": "number of replica each store version will have", + "type": "int", + "default": 3 + }, + { + "name": "migrationDuplicateStore", + "doc": "Whether or not the store is a duplicate store in the process of migration", + "type": "boolean", + "default": false + }, + { + "name": "nativeReplicationSourceFabric", + "doc": "The source fabric to be used when the store is running in Native Replication mode.", + "type": ["null", "string"], + "default": null + }, + { + "name": "activeActiveReplicationEnabled", + "doc": "A command option to enable/disable Active/Active replication feature for a store", + "type": "boolean", + "default": false + }, + { + "name": "disableMetaStore", + "doc": "An UpdateStore command option to disable the companion meta system store", + "type": "boolean", + "default": false + }, + { + "name": "disableDavinciPushStatusStore", + "doc": "An UpdateStore command option to disable the companion davinci push status store", + "type": "boolean", + "default": false + }, + { + "name": "applyTargetVersionFilterForIncPush", + "doc": "An UpdateStore command option to enable/disable applying the target version filter for incremental pushes", + "type": "boolean", + "default": false + }, + { + "name": "updatedConfigsList", + "doc": "The list that contains all updated configs by the UpdateStore command. 
Most of the fields in UpdateStore are not optional, and changing those fields to Optional (Union) is not a backward compatible change, so we have to add an addition array field to record all updated configs in parent controller.", + "type": { + "type": "array", + "items": "string" + }, + "default": [] + }, + { + "name": "replicateAllConfigs", + "doc": "A flag to indicate whether all store configs in parent cluster will be replicated to child clusters; true by default, so that existing UpdateStore messages in Admin topic will behave the same as before.", + "type": "boolean", + "default": true + }, + { + "name": "regionsFilter", + "doc": "A list of regions that will be impacted by the UpdateStore command", + "type": ["null", "string"], + "default": null + }, + { + "name": "storagePersona", + "doc": "The name of the StoragePersona to add to the store", + "type": ["null", "string"], + "default": null + }, + { + "name": "views", + "doc": "A map of views which describe and configure a downstream view of a venice store. Keys in this map are for convenience of managing configs.", + "type": ["null", + { + "type":"map", + "java-key-class": "java.lang.String", + "avro.java.string": "String", + "values": { + "name": "StoreViewConfigRecord", + "type": "record", + "doc": "A configuration for a particular view. This config should inform Venice leaders how to transform and transmit data to destination views.", + "fields": [ + { + "name": "viewClassName", + "type": "string", + "doc": "This informs what kind of view we are materializing. This then informs what kind of parameters are passed to parse this input. 
This is expected to be a fully formed class path name for materialization.", + "default": "" + }, + { + "name": "viewParameters", + "doc": "Optional parameters to be passed to the given view config.", + "type": ["null", + { + "type": "map", + "java-key-class": "java.lang.String", + "avro.java.string": "String", + "values": { "type": "string", "avro.java.string": "String" } + } + ], + "default": null + } + ] + } + }], + "default": null + }, + { + "name": "latestSuperSetValueSchemaId", + "doc": "The schema id for the latest superset schema", + "type" : "int", + "default": -1 + }, + { + "name": "storageNodeReadQuotaEnabled", + "doc": "Whether storage node read quota is enabled for this store", + "type": "boolean", + "default": false + }, + { + "name": "minCompactionLagSeconds", + "doc": "Store-level version topic min compaction lag", + "type": "long", + "default": -1 + }, + { + "name": "maxCompactionLagSeconds", + "doc": "Store-level version topic max compaction lag", + "type": "long", + "default": -1 + }, + { + "name": "maxRecordSizeBytes", + "doc": "Store-level maximum size of any record in bytes for batch push jobs", + "type": "int", + "default": -1 + }, + { + "name": "maxNearlineRecordSizeBytes", + "doc": "Store-level maximum size of any record in bytes for nearline jobs with partial updates", + "type": "int", + "default": -1 + }, + { + "name": "unusedSchemaDeletionEnabled", + "doc": "Whether unused schema deletion is enabled or not.", + "type": "boolean", + "default": false + }, + { + "name": "blobTransferEnabled", + "doc": "Flag to indicate if the blob transfer is allowed or not", + "type": "boolean", + "default": false + }, + { + "name": "nearlineProducerCompressionEnabled", + "doc": "Flag to control whether the producer in Server for nearline workload will enable compression or not", + "type": "boolean", + "default": true + }, + { + "name": "nearlineProducerCountPerWriter", + "doc": "How many producers will be used for the nearline producer in Server to 
improve producing throughput", + "type": "int", + "default": 1 + }, + { + "name": "targetSwapRegion", + "doc": "Controls what region to swap in the current version during target colo push", + "type": ["null","string"], + "default": null + }, + { + "name": "targetSwapRegionWaitTime", + "doc": "Controls how long to wait in minutes before swapping the version on the regions", + "type": "int", + "default": 60 + }, + { + "name": "isDaVinciHeartBeatReported", + "doc": "Flag to indicate whether DVC is bootstrapping and sending heartbeats", + "type": "boolean", + "default": false + } + ] + }, + { + "name": "DeleteStore", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "largestUsedVersionNumber", + "type": "int" + } + ] + }, + { + "name": "DeleteOldVersion", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "versionNum", + "type": "int" + } + ] + }, + { + "name": "MigrateStore", + "type": "record", + "fields": [ + { + "name": "srcClusterName", + "type": "string" + }, + { + "name": "destClusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + } + ] + }, + { + "name": "AbortMigration", + "type": "record", + "fields": [ + { + "name": "srcClusterName", + "type": "string" + }, + { + "name": "destClusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + } + ] + }, + { + "name": "AddVersion", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "pushJobId", + "type": "string" + }, + { + "name": "versionNum", + "type": "int" + }, + { + "name": "numberOfPartitions", + "type": "int" + }, + { + "name": "pushType", + "doc": "The push type of the new version, 0 => BATCH, 1 => STREAM_REPROCESSING. 
Previous add version messages will default to BATCH and this is a safe because they were created when BATCH was the only version type", + "type": "int", + "default": 0 + }, + { + "name": "pushStreamSourceAddress", + "type": ["null", "string"], + "default": null + }, + { + "name": "rewindTimeInSecondsOverride", + "doc": "The overridable rewind time config for this specific version of a hybrid store, and if it is not specified, the new version will use the store-level rewind time config", + "type": "long", + "default": -1 + }, + { + "name": "timestampMetadataVersionId", + "doc": "The A/A metadata schema version ID that will be used to deserialize metadataPayload.", + "type": "int", + "default": -1 + }, + { + "name": "versionSwapDeferred", + "doc": "Indicates if swapping this version to current version after push completion should be initiated or not", + "type": "boolean", + "default": false + }, + { + "name": "targetedRegions", + "doc": "The list of regions that is separated by comma for targeted region push. 
If set, this admin message should only be consumed by the targeted regions", + "type": [ + "null", + { + "type": "array", + "items": "string" + } + ], + "default": null + }, + { + "name": "repushSourceVersion", + "doc": "Indicates the source version from which a repush version is created", + "type": "int", + "default": -1 + } + ] + }, + { + "name": "DerivedSchemaCreation", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "schema", + "type": "SchemaMeta" + }, + { + "name": "valueSchemaId", + "type": "int" + }, + { + "name": "derivedSchemaId", + "type": "int" + } + ] + }, + { + "name": "SupersetSchemaCreation", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "valueSchema", + "type": "SchemaMeta" + }, + { + "name": "valueSchemaId", + "type": "int" + }, + { + "name": "supersetSchema", + "type": "SchemaMeta" + }, + { + "name": "supersetSchemaId", + "type": "int" + } + ] + }, + { + "name": "ConfigureNativeReplicationForCluster", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeType", + "type": "string" + }, + { + "name": "enabled", + "type": "boolean" + }, + { + "name": "nativeReplicationSourceRegion", + "doc": "The source region to be used when the store is running in Native Replication mode.", + "type": ["null", "string"], + "default": null + }, + { + "name": "regionsFilter", + "type": ["null", "string"], + "default": null + } + ] + }, + { + "name": "MetadataSchemaCreation", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "valueSchemaId", + "type": "int" + }, + { + "name": "metadataSchema", + "type": "SchemaMeta" + }, + { + "name": "timestampMetadataVersionId", + "type": "int", + "aliases": 
["metadataVersionId"], + "default": -1 + } + ] + }, + { + "name": "ConfigureActiveActiveReplicationForCluster", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeType", + "type": "string" + }, + { + "name": "enabled", + "type": "boolean" + }, + { + "name": "regionsFilter", + "type": ["null", "string"], + "default": null + } + ] + }, { + "name": "ConfigureIncrementalPushForCluster", + "doc": "A command to migrate all incremental push stores in a cluster to a specific incremental push policy.", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "incrementalPushPolicyToFilter", + "doc": "If this batch update command is trying to configure existing incremental push store type, their incremental push policy should also match this filter before the batch update command applies any change to them. Default value is -1, meaning there is no filter.", + "type": "int", + "default": -1 + }, + { + "name": "incrementalPushPolicyToApply", + "doc": "This field will determine what incremental push policy will be applied to the selected stores. 
Default value is 1, which is the INCREMENTAL_PUSH_SAME_AS_REAL_TIME policy", + "type": "int", + "default": 1 + }, + { + "name": "regionsFilter", + "type": ["null", "string"], + "default": null + } + ] + }, { + "name": "MetaSystemStoreAutoCreationValidation", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + } + ] + }, { + "name": "PushStatusSystemStoreAutoCreationValidation", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + } + ] + }, { + "name": "CreateStoragePersona", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "name", + "type": "string" + }, + { + "name": "quotaNumber", + "type": "long" + }, + { + "name": "storesToEnforce", + "type": { + "type": "array", + "items": "string", + "default": [] + } + }, + { + "name": "owners", + "type": { + "type": "array", + "items": "string", + "default": [] + } + } + ] + }, { + "name": "DeleteStoragePersona", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "name", + "type": "string" + } + ] + }, { + "name": "UpdateStoragePersona", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, { + "name": "name", + "type": "string" + }, { + "name": "quotaNumber", + "type": ["null","long"], + "default": null + }, { + "name": "storesToEnforce", + "type": [ + "null", + { + "type": "array", + "items": "string" + } + ], + "default": null + }, { + "name": "owners", + "type": [ + "null", + { + "type": "array", + "items": "string" + } + ], + "default": null + } + ] + }, + { + "name": "DeleteUnusedValueSchemas", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "schemaIds", + "type": { + "type": "array", + "items": "int", + "default": [] + 
} + } + ] + }, + { + "name": "RollbackCurrentVersion", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "regionsFilter", + "doc": "A list of regions that will be impacted by the RollbackCurrentVersion command", + "type": ["null", "string"], + "default": null + } + ] + }, + { + "name": "RollForwardCurrentVersion", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "regionsFilter", + "doc": "A list of regions that will be impacted by the RollForwardCurrentVersion command", + "type": ["null", "string"], + "default": null + } + ] + } + ] + } + ] +} diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java index b8a8a2fc90..7bd62f33a3 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java @@ -1893,6 +1893,32 @@ public void testUpdateStoreNativeReplicationSourceFabric() { "Native replication source fabric does not match after updating the store!"); } + @Test(description = "Test that update store sets target region swap configs correctly") + public void testUpdateStoreTargetSwapRegion() { + String storeName = Utils.getUniqueString("testUpdateStore"); + Store store = TestUtils.createTestStore(storeName, "test", System.currentTimeMillis()); + doReturn(store).when(internalAdmin).getStore(clusterName, storeName); + + doReturn(CompletableFuture.completedFuture(new SimplePubSubProduceResultImpl(topicName, partitionId, 1, -1))) + .when(veniceWriter) + .put(any(), any(), anyInt()); + + when(zkClient.readData(zkMetadataNodePath, 
null)).thenReturn(null) + .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + + UpdateStoreQueryParams updateStoreQueryParams = new UpdateStoreQueryParams().setTargetRegionSwap("prod") + .setTargetRegionSwapWaitTime(100) + .setIsDavinciHeartbeatReported(false); + parentAdmin.initStorageCluster(clusterName); + parentAdmin.updateStore(clusterName, storeName, updateStoreQueryParams); + + AdminOperation adminMessage = verifyAndGetSingleAdminOperation(); + UpdateStore updateStore = (UpdateStore) adminMessage.payloadUnion; + Assert.assertEquals(updateStore.targetSwapRegion.toString(), "prod"); + Assert.assertEquals(updateStore.targetSwapRegionWaitTime, 100); + Assert.assertEquals(updateStore.isDaVinciHeartBeatReported, false); + } + @Test public void testDisableHybridConfigWhenActiveActiveOrIncPushConfigIsEnabled() { String storeName = Utils.getUniqueString("testUpdateStore");