Skip to content

Commit

Permalink
add rt topic name in store config
Browse files Browse the repository at this point in the history
fix serialization
fix tests
fix documentation
move methods from Version to Utils
  • Loading branch information
arjun4084346 committed Nov 21, 2024
1 parent 730a597 commit 2a1238c
Show file tree
Hide file tree
Showing 104 changed files with 1,144 additions and 347 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ void updateLagMonitor(
if (version == null) {
// During version deletion, the version will be deleted from ZK prior to servers perform resource deletion.
// It's valid to have null version when trying to remove lag monitor for the deleted resource.
version = new VersionImpl(storeName, storeVersion, "");
version = new VersionImpl(storeName, storeVersion, "", Utils.getRealTimeTopicName(store));
}
lagMonFunction.accept(version, getPartition());
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ public StoreIngestionTask(
this.versionTopic = pubSubTopicRepository.getTopic(kafkaVersionTopic);
this.storeName = versionTopic.getStoreName();
this.isUserSystemStore = VeniceSystemStoreUtils.isUserSystemStore(storeName);
this.realTimeTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName));
this.realTimeTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(store));
this.versionNumber = Version.parseVersionFromKafkaTopicName(kafkaVersionTopic);
this.consumerActionsQueue = new PriorityBlockingQueue<>(CONSUMER_ACTION_QUEUE_INIT_CAPACITY);
this.partitionToPendingConsumerActionCountMap = new VeniceConcurrentHashMap<>();
Expand Down Expand Up @@ -3845,7 +3845,7 @@ private void waitUntilValueSchemaAvailable(int schemaId) throws InterruptedExcep
// cluster these metastore writes could be spiky
if (metaStoreWriter != null && !VeniceSystemStoreType.META_STORE.isSystemStore(storeName)) {
String metaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName);
PubSubTopic metaStoreRT = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(metaStoreName));
PubSubTopic metaStoreRT = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(metaStoreName));
if (getTopicManager(localKafkaServer).containsTopicWithRetries(metaStoreRT, 5)) {
metaStoreWriter.writeInUseValueSchema(storeName, versionNumber, schemaId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,11 @@ void setUp() {
ReadStrategy.ANY_OF_ONLINE,
OfflinePushStrategy.WAIT_ALL_REPLICAS,
1);
version1 = new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 5);
version1 =
new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 5, store.getRealTimeTopicName());
store.addVersion(version1);
version2 = new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 3);
version2 =
new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 3, store.getRealTimeTopicName());
store.addVersion(version2);
store.setCurrentVersion(version1.getNumber());
when(backend.getStoreRepository().getStoreOrThrow(store.getName())).thenReturn(store);
Expand Down Expand Up @@ -231,7 +233,8 @@ void testSubscribeWithoutCurrentVersion() throws Exception {

@Test
void testSubscribeBootstrapVersion() throws Exception {
Version version3 = new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 15);
Version version3 =
new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 15, store.getRealTimeTopicName());
store.addVersion(version3);
store.setCurrentVersion(version2.getNumber());
backend.handleStoreChanged(storeBackend);
Expand Down Expand Up @@ -273,12 +276,14 @@ void testFutureVersionFailure() throws Exception {
verify(ingestionBackend, times(1)).removeStorageEngine(eq(version2.kafkaTopicName()));

// Simulate new version push and subsequent ingestion failure.
Version version3 = new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 15);
Version version3 =
new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 15, store.getRealTimeTopicName());
store.addVersion(version3);
backend.handleStoreChanged(storeBackend);

// Simulate new version push while faulty future version is being ingested.
Version version4 = new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 20);
Version version4 =
new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 20, store.getRealTimeTopicName());
store.addVersion(version4);
backend.handleStoreChanged(storeBackend);

Expand Down Expand Up @@ -309,7 +314,8 @@ void testFutureVersionFailure() throws Exception {
});

// Simulate new version push and subsequent ingestion failure.
Version version5 = new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 30);
Version version5 =
new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 30, store.getRealTimeTopicName());
store.addVersion(version5);
backend.handleStoreChanged(storeBackend);
versionMap.get(version5.kafkaTopicName()).completePartitionExceptionally(partition, new Exception());
Expand Down Expand Up @@ -382,7 +388,8 @@ void testRollbackAndRollForward() {
backend.handleStoreChanged(storeBackend);
versionMap.get(version2.kafkaTopicName()).completePartition(partition);

Version version3 = new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 3);
Version version3 =
new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 3, store.getRealTimeTopicName());
store.addVersion(version3);
backend.handleStoreChanged(storeBackend);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void testMaybeReportIncrementalPushStatus() {
doReturn(partitionToBatchReportEOIPEnabled).when(versionBackend).getPartitionToBatchReportEOIPEnabled();
doReturn(partitionToPendingReportIncrementalPushList).when(versionBackend)
.getPartitionToPendingReportIncrementalPushList();
Version version = new VersionImpl("test_store", 1, "dummy");
Version version = new VersionImpl("test_store", 1, "dummy", "test_store" + Version.REAL_TIME_TOPIC_SUFFIX);
doReturn(version).when(versionBackend).getVersion();
doCallRealMethod().when(versionBackend).maybeReportIncrementalPushStatus(anyInt(), anyString(), any(), any());
Consumer<String> mockConsumer = mock(Consumer.class);
Expand Down Expand Up @@ -82,7 +82,7 @@ public void testMaybeReportBatchEOIPStatus() {
doReturn(partitionToPendingReportIncrementalPushList).when(versionBackend)
.getPartitionToPendingReportIncrementalPushList();
doCallRealMethod().when(versionBackend).maybeReportBatchEOIPStatus(anyInt(), any());
Version version = new VersionImpl("test_store", 1, "dummy");
Version version = new VersionImpl("test_store", 1, "dummy", "test_store" + Version.REAL_TIME_TOPIC_SUFFIX);
doReturn(version).when(versionBackend).getVersion();
Consumer<String> mockConsumer = mock(Consumer.class);
versionBackend.maybeReportBatchEOIPStatus(0, mockConsumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public void setUp() {

metadataRepository = mock(ThinClientMetaStoreBasedRepository.class);
Store store = mock(Store.class);
Version mockVersion = new VersionImpl(storeName, 1, "foo");
Version mockVersion = new VersionImpl(storeName, 1, "foo", store.getRealTimeTopicName());
when(store.getCurrentVersion()).thenReturn(1);
when(store.getCompressionStrategy()).thenReturn(CompressionStrategy.NO_OP);
when(metadataRepository.getStore(anyString())).thenReturn(store);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void testConsumeBeforeAndAfterImage() throws ExecutionException, Interrup

ThinClientMetaStoreBasedRepository mockRepository = mock(ThinClientMetaStoreBasedRepository.class);
Store store = mock(Store.class);
Version mockVersion = new VersionImpl(storeName, 1, "foo");
Version mockVersion = new VersionImpl(storeName, 1, "foo", store.getRealTimeTopicName());
Mockito.when(store.getCurrentVersion()).thenReturn(1);
Mockito.when(store.getCompressionStrategy()).thenReturn(CompressionStrategy.NO_OP);
Mockito.when(mockRepository.getStore(anyString())).thenReturn(store);
Expand Down Expand Up @@ -217,7 +217,7 @@ public void testAfterImageConsumerSeek() throws ExecutionException, InterruptedE

ThinClientMetaStoreBasedRepository mockRepository = mock(ThinClientMetaStoreBasedRepository.class);
Store store = mock(Store.class);
Version mockVersion = new VersionImpl(storeName, 1, "foo");
Version mockVersion = new VersionImpl(storeName, 1, "foo", store.getRealTimeTopicName());
Mockito.when(store.getCurrentVersion()).thenReturn(1);
Mockito.when(store.getCompressionStrategy()).thenReturn(CompressionStrategy.NO_OP);
Mockito.when(mockRepository.getStore(anyString())).thenReturn(store);
Expand Down Expand Up @@ -292,7 +292,7 @@ public void testConsumeAfterImage() throws ExecutionException, InterruptedExcept

ThinClientMetaStoreBasedRepository mockRepository = mock(ThinClientMetaStoreBasedRepository.class);
Store store = mock(Store.class);
Version mockVersion = new VersionImpl(storeName, 1, "foo");
Version mockVersion = new VersionImpl(storeName, 1, "foo", store.getRealTimeTopicName());
Mockito.when(store.getCurrentVersion()).thenReturn(1);
Mockito.when(store.getCompressionStrategy()).thenReturn(CompressionStrategy.NO_OP);
Mockito.when(mockRepository.getStore(anyString())).thenReturn(store);
Expand Down Expand Up @@ -371,7 +371,7 @@ public void testConsumeAfterImageWithCompaction() throws ExecutionException, Int

ThinClientMetaStoreBasedRepository mockRepository = mock(ThinClientMetaStoreBasedRepository.class);
Store store = mock(Store.class);
Version mockVersion = new VersionImpl(storeName, 1, "foo");
Version mockVersion = new VersionImpl(storeName, 1, "foo", store.getRealTimeTopicName());
Mockito.when(store.getCurrentVersion()).thenReturn(1);
Mockito.when(store.getCompressionStrategy()).thenReturn(CompressionStrategy.NO_OP);
Mockito.when(mockRepository.getStore(anyString())).thenReturn(store);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ protected LeaderFollowerIngestionProgressNotifier getNotifier() {
@Test
public void testOnBecomeFollowerFromOffline() throws Exception {
// if the resource is not the current serving version, latch is not placed.
Version version = new VersionImpl("mockStore.getName()", 2, "");
Version version = new VersionImpl("mockStore.getName()", 2, "", mockStore.getRealTimeTopicName());
when(mockStore.getVersion(Mockito.anyInt())).thenReturn(version);
when(mockStore.getCurrentVersion()).thenReturn(2);
testStateModel.onBecomeStandbyFromOffline(mockMessage, mockContext);
Expand Down Expand Up @@ -106,7 +106,7 @@ public void testGracefulDropForCurrentVersionResource() {

@Test
public void testRemoveCVStateWhenBecomeOfflineFromStandby() {
Version version = new VersionImpl("mockStore.getName()", 2, "");
Version version = new VersionImpl("mockStore.getName()", 2, "", mockStore.getRealTimeTopicName());
when(mockStore.getVersion(Mockito.anyInt())).thenReturn(version);
when(mockStore.getCurrentVersion()).thenReturn(2);
when(mockIngestionBackend.stopConsumption(any(VeniceStoreVersionConfig.class), eq(testPartition)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,6 @@ public void testisReadyToServeAnnouncedWithRTLag() {
100L,
DataReplicationPolicy.NON_AGGREGATE,
BufferReplayPolicy.REWIND_FROM_EOP);
Version mockVersion = new VersionImpl(STORE_NAME, 1, PUSH_JOB_ID);
mockVersion.setHybridStoreConfig(hybridStoreConfig);

StorageService storageService = mock(StorageService.class);
Store store = new ZKStore(
Expand All @@ -243,6 +241,8 @@ public void testisReadyToServeAnnouncedWithRTLag() {
OfflinePushStrategy.WAIT_ALL_REPLICAS,
1);
store.setHybridStoreConfig(hybridStoreConfig);
Version mockVersion = new VersionImpl(STORE_NAME, 1, PUSH_JOB_ID, store.getRealTimeTopicName());
mockVersion.setHybridStoreConfig(hybridStoreConfig);
store.setVersions(Collections.singletonList(mockVersion));

Properties kafkaConsumerProperties = new Properties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ public void testKafkaConsumerServiceResubscriptionConcurrency() throws Exception
KafkaConsumerServiceDelegator delegator =
new KafkaConsumerServiceDelegator(mockConfig, consumerServiceBuilder, isAAWCStoreFunc);
PubSubTopicPartition realTimeTopicPartition =
new PubSubTopicPartitionImpl(TOPIC_REPOSITORY.getTopic(Version.composeRealTimeTopic(storeName)), 0);
new PubSubTopicPartitionImpl(TOPIC_REPOSITORY.getTopic(storeName + Version.REAL_TIME_TOPIC_SUFFIX), 0);

CountDownLatch countDownLatch = new CountDownLatch(1);
List<Thread> infiniteSubUnSubThreads = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.locks.ResourceAutoClosableLockManager;
import io.tehuti.metrics.MetricsRepository;
Expand Down Expand Up @@ -288,8 +289,9 @@ public void testGetIngestingTopicsNotWithOnlineVersion() {
ReadStrategy.ANY_OF_ONLINE,
OfflinePushStrategy.WAIT_ALL_REPLICAS,
1);
mockStore.addVersion(new VersionImpl(storeName, 1, "test-job-id"));
toBeDeletedStore.addVersion(new VersionImpl(deletedStoreName, 1, "test-job-id"));
mockStore.addVersion(new VersionImpl(storeName, 1, "test-job-id", mockStore.getRealTimeTopicName()));
toBeDeletedStore
.addVersion(new VersionImpl(deletedStoreName, 1, "test-job-id", Utils.getRealTimeTopicName(toBeDeletedStore)));
doReturn(mockStore).when(mockMetadataRepo).getStore(storeName);
doReturn(toBeDeletedStore).when(mockMetadataRepo).getStore(deletedStoreName);
doReturn(mockStore).when(mockMetadataRepo).getStoreOrThrow(storeName);
Expand All @@ -309,7 +311,7 @@ public void testGetIngestingTopicsNotWithOnlineVersion() {
kafkaStoreIngestionService.getIngestingTopicsWithVersionStatusNotOnline().size(),
0,
"Expecting an empty set since all ingesting topics have version status of ONLINE");
mockStore.addVersion(new VersionImpl(storeName, 2, "test-job-id"));
mockStore.addVersion(new VersionImpl(storeName, 2, "test-job-id", Utils.getRealTimeTopicName(mockStore)));
doReturn(new Pair<>(mockStore, mockStore.getVersion(2))).when(mockMetadataRepo)
.waitVersion(eq(storeName), eq(2), any());
kafkaStoreIngestionService.startConsumption(new VeniceStoreVersionConfig(topic2, veniceProperties), 0);
Expand Down Expand Up @@ -372,7 +374,7 @@ public void testCloseStoreIngestionTask() {
AbstractStorageEngine storageEngine1 = mock(AbstractStorageEngine.class);
Mockito.when(mockStorageEngineRepository.getLocalStorageEngine(topicName)).thenReturn(storageEngine1);

mockStore.addVersion(new VersionImpl(storeName, 1, "test-job-id"));
mockStore.addVersion(new VersionImpl(storeName, 1, "test-job-id", mockStore.getRealTimeTopicName()));
doReturn(mockStore).when(mockMetadataRepo).getStore(storeName);
doReturn(mockStore).when(mockMetadataRepo).getStoreOrThrow(storeName);
doReturn(new Pair<>(mockStore, mockStore.getVersion(1))).when(mockMetadataRepo)
Expand Down Expand Up @@ -433,7 +435,7 @@ public void testStoreIngestionTaskShutdownLastPartition(boolean isIsolatedIngest
AbstractStorageEngine storageEngine1 = mock(AbstractStorageEngine.class);
Mockito.when(mockStorageEngineRepository.getLocalStorageEngine(topicName)).thenReturn(storageEngine1);

mockStore.addVersion(new VersionImpl(storeName, 1, "test-job-id"));
mockStore.addVersion(new VersionImpl(storeName, 1, "test-job-id", mockStore.getRealTimeTopicName()));
doReturn(mockStore).when(mockMetadataRepo).getStore(storeName);
doReturn(mockStore).when(mockMetadataRepo).getStoreOrThrow(storeName);
doReturn(new Pair<>(mockStore, mockStore.getVersion(1))).when(mockMetadataRepo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void testReportIfCatchUpBaseTopicOffsetRouteWillNotMakePushTimeout() {
* {@link StoreIngestionTask#reportIfCatchUpVersionTopicOffset(PartitionConsumptionState)}
*/
doReturn(true).when(mockOffsetRecord).isEndOfPushReceived();
doReturn(Version.composeRealTimeTopic(storeName)).when(mockOffsetRecord).getLeaderTopic();
doReturn(Utils.getRealTimeTopicName(mockStore)).when(mockOffsetRecord).getLeaderTopic();
/**
* Return 0 as the max offset for VT and 1 as the overall consume progress, so reportIfCatchUpVersionTopicOffset()
* will determine that base topic is caught up.
Expand Down
Loading

0 comments on commit 2a1238c

Please sign in to comment.