Skip to content

Commit

Permalink
add rt topic name in store config
Browse files Browse the repository at this point in the history
  • Loading branch information
arjun4084346 committed Nov 13, 2024
1 parent af40771 commit 6cedc00
Show file tree
Hide file tree
Showing 67 changed files with 827 additions and 191 deletions.
1 change: 1 addition & 0 deletions clients/da-vinci-client/classHash-1.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-281922223
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ public StoreIngestionTask(
this.versionTopic = pubSubTopicRepository.getTopic(kafkaVersionTopic);
this.storeName = versionTopic.getStoreName();
this.isUserSystemStore = VeniceSystemStoreUtils.isUserSystemStore(storeName);
this.realTimeTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName));
this.realTimeTopic = pubSubTopicRepository.getTopic(Version.getRealTimeTopicName(store));
this.versionNumber = Version.parseVersionFromKafkaTopicName(kafkaVersionTopic);
this.consumerActionsQueue = new PriorityBlockingQueue<>(CONSUMER_ACTION_QUEUE_INIT_CAPACITY);
this.partitionToPendingConsumerActionCountMap = new VeniceConcurrentHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,11 @@ void setUp() {
ReadStrategy.ANY_OF_ONLINE,
OfflinePushStrategy.WAIT_ALL_REPLICAS,
1);
version1 = new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 5);
version1 =
new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 5, store.getRealTimeTopicName());
store.addVersion(version1);
version2 = new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 3);
version2 =
new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 3, store.getRealTimeTopicName());
store.addVersion(version2);
store.setCurrentVersion(version1.getNumber());
when(backend.getStoreRepository().getStoreOrThrow(store.getName())).thenReturn(store);
Expand Down Expand Up @@ -231,7 +233,8 @@ void testSubscribeWithoutCurrentVersion() throws Exception {

@Test
void testSubscribeBootstrapVersion() throws Exception {
Version version3 = new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 15);
Version version3 =
new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 15, store.getRealTimeTopicName());
store.addVersion(version3);
store.setCurrentVersion(version2.getNumber());
backend.handleStoreChanged(storeBackend);
Expand Down Expand Up @@ -273,12 +276,14 @@ void testFutureVersionFailure() throws Exception {
verify(ingestionBackend, times(1)).removeStorageEngine(eq(version2.kafkaTopicName()));

// Simulate new version push and subsequent ingestion failure.
Version version3 = new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 15);
Version version3 =
new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 15, store.getRealTimeTopicName());
store.addVersion(version3);
backend.handleStoreChanged(storeBackend);

// Simulate new version push while faulty future version is being ingested.
Version version4 = new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 20);
Version version4 =
new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 20, store.getRealTimeTopicName());
store.addVersion(version4);
backend.handleStoreChanged(storeBackend);

Expand Down Expand Up @@ -309,7 +314,8 @@ void testFutureVersionFailure() throws Exception {
});

// Simulate new version push and subsequent ingestion failure.
Version version5 = new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 30);
Version version5 =
new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 30, store.getRealTimeTopicName());
store.addVersion(version5);
backend.handleStoreChanged(storeBackend);
versionMap.get(version5.kafkaTopicName()).completePartitionExceptionally(partition, new Exception());
Expand Down Expand Up @@ -382,7 +388,8 @@ void testRollbackAndRollForward() {
backend.handleStoreChanged(storeBackend);
versionMap.get(version2.kafkaTopicName()).completePartition(partition);

Version version3 = new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 3);
Version version3 =
new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 3, store.getRealTimeTopicName());
store.addVersion(version3);
backend.handleStoreChanged(storeBackend);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ public void testKafkaConsumerServiceResubscriptionConcurrency() throws Exception
KafkaConsumerServiceDelegator delegator =
new KafkaConsumerServiceDelegator(mockConfig, consumerServiceBuilder, isAAWCStoreFunc);
PubSubTopicPartition realTimeTopicPartition =
new PubSubTopicPartitionImpl(TOPIC_REPOSITORY.getTopic(Version.composeRealTimeTopic(storeName)), 0);
new PubSubTopicPartitionImpl(TOPIC_REPOSITORY.getTopic(storeName + Version.REAL_TIME_TOPIC_SUFFIX), 0);

CountDownLatch countDownLatch = new CountDownLatch(1);
List<Thread> infiniteSubUnSubThreads = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public void testReportIfCatchUpBaseTopicOffsetRouteWillNotMakePushTimeout() {
* {@link StoreIngestionTask#reportIfCatchUpVersionTopicOffset(PartitionConsumptionState)}
*/
doReturn(true).when(mockOffsetRecord).isEndOfPushReceived();
doReturn(Version.composeRealTimeTopic(storeName)).when(mockOffsetRecord).getLeaderTopic();
doReturn(Version.getRealTimeTopicName(mockStore)).when(mockOffsetRecord).getLeaderTopic();
/**
* Return 0 as the max offset for VT and 1 as the overall consume progress, so reportIfCatchUpVersionTopicOffset()
* will determine that base topic is caught up.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionImpl;
import com.linkedin.venice.meta.VersionStatus;
Expand Down Expand Up @@ -375,6 +376,7 @@ public static Object[][] sortedInputAndAAConfigProvider() {
private StorePartitionDataReceiver remoteConsumedDataReceiver;

private static String storeNameWithoutVersionInfo;
private StoreInfo storeInfo;
private String topic;
private PubSubTopic pubSubTopic;
private PubSubTopicPartition fooTopicPartition;
Expand Down Expand Up @@ -499,6 +501,8 @@ public void methodSetUp() throws Exception {
localVeniceWriter = getVeniceWriter(new MockInMemoryProducerAdapter(inMemoryLocalKafkaBroker));

mockStorageEngineRepository = mock(StorageEngineRepository.class);
storeInfo = mock(StoreInfo.class);
doReturn(storeNameWithoutVersionInfo + Version.REAL_TIME_TOPIC_SUFFIX).when(storeInfo).getRealTimeTopicName();

mockLogNotifier = mock(LogNotifier.class);
mockNotifierProgress = new ArrayList<>();
Expand Down Expand Up @@ -904,7 +908,9 @@ private MockStoreVersionConfigs setupStoreAndVersionMocks(
VeniceStoreVersionConfig storeConfig = getDefaultMockVeniceStoreVersionConfig(storeVersionConfigOverride);

Store mockStore = mock(Store.class);
Version version = new VersionImpl(storeNameWithoutVersionInfo, 1, "1", partitionCount);
doReturn(storeNameWithoutVersionInfo + Version.REAL_TIME_TOPIC_SUFFIX).when(mockStore).getRealTimeTopicName();
Version version =
new VersionImpl(storeNameWithoutVersionInfo, 1, "1", partitionCount, mockStore.getRealTimeTopicName());

doReturn(storeNameWithoutVersionInfo).when(mockStore).getName();

Expand Down Expand Up @@ -1570,8 +1576,10 @@ public void testReadyToServePartition(AAConfig aaConfig) throws Exception {

runTest(Utils.setOf(PARTITION_FOO), () -> {
Store mockStore = mock(Store.class);
String storeName = "storeName";
doReturn(storeName + Version.REAL_TIME_TOPIC_SUFFIX).when(mockStore).getRealTimeTopicName();
doReturn(true).when(mockStore).isHybrid();
doReturn(new VersionImpl("storeName", 1)).when(mockStore).getVersion(1);
doReturn(new VersionImpl("storeName", 1, mockStore.getRealTimeTopicName())).when(mockStore).getVersion(1);
doReturn(storeNameWithoutVersionInfo).when(mockStore).getName();
doReturn(mockStore).when(mockMetadataRepo).getStoreOrThrow(storeNameWithoutVersionInfo);
}, () -> {
Expand Down Expand Up @@ -2447,7 +2455,7 @@ public void testDelayedTransitionToOnlineInHybridMode(AAConfig aaConfig) throws

localVeniceWriter.broadcastTopicSwitch(
Collections.singletonList(inMemoryLocalKafkaBroker.getKafkaBootstrapServer()),
Version.composeRealTimeTopic(storeNameWithoutVersionInfo),
Version.getRealTimeTopicName(storeInfo),
System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(10),
Collections.emptyMap());
for (int partition: ALL_PARTITIONS) {
Expand Down Expand Up @@ -2585,12 +2593,14 @@ public void testSchemaCacheWarming(AAConfig aaConfig) throws Exception {
// Records order are: StartOfSeg, StartOfPush, data, EndOfPush, EndOfSeg
runTest(new RandomPollStrategy(), Utils.setOf(PARTITION_FOO), () -> {
Store mockStore = mock(Store.class);
String storeName = "storeName";
doReturn(storeName + Version.REAL_TIME_TOPIC_SUFFIX).when(mockStore).getRealTimeTopicName();
doReturn(storeNameWithoutVersionInfo).when(mockStore).getName();
doReturn(true).when(mockStore).isReadComputationEnabled();
doReturn(true).when(mockSchemaRepo).hasValueSchema(storeNameWithoutVersionInfo, EXISTING_SCHEMA_ID);
doReturn(mockStore).when(mockMetadataRepo).getStoreOrThrow(storeNameWithoutVersionInfo);
doReturn(schemaEntry).when(mockSchemaRepo).getValueSchema(anyString(), anyInt());
doReturn(new VersionImpl("storeName", 1)).when(mockStore).getVersion(1);
doReturn(new VersionImpl(storeName, 1, mockStore.getRealTimeTopicName())).when(mockStore).getVersion(1);
}, () -> waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
verify(mockLogNotifier, atLeastOnce()).started(topic, PARTITION_FOO);
// since notifier reporting happens before offset update, it actually reports previous offsets
Expand Down Expand Up @@ -2808,7 +2818,7 @@ public void testRecordsCanBeThrottledPerRegion() throws ExecutionException, Inte
kafkaUrlToRecordsThrottler.put(inMemoryLocalKafkaBroker.getKafkaBootstrapServer(), localThrottler);
kafkaUrlToRecordsThrottler.put(inMemoryRemoteKafkaBroker.getKafkaBootstrapServer(), remoteThrottler);

String rtTopic = Version.composeRealTimeTopic(storeNameWithoutVersionInfo);
String rtTopic = Version.getRealTimeTopicName(storeInfo);
PubSubTopic rtPubSubTopic = pubSubTopicRepository.getTopic(rtTopic);
PubSubTopicPartition fooRtPartition = new PubSubTopicPartitionImpl(rtPubSubTopic, PARTITION_FOO);

Expand Down Expand Up @@ -2935,7 +2945,7 @@ public void testIsReadyToServe(NodeType nodeType, AAConfig aaConfig, DataReplica
false,
Optional.empty(),
null);
String rtTopicName = Version.composeRealTimeTopic(mockStore.getName());
String rtTopicName = Version.getRealTimeTopicName(mockStore);
PubSubTopic rtTopic = pubSubTopicRepository.getTopic(rtTopicName);
TopicSwitch topicSwitchWithSourceRealTimeTopic = new TopicSwitch();
topicSwitchWithSourceRealTimeTopic.sourceKafkaServers = new ArrayList<>();
Expand Down Expand Up @@ -3160,7 +3170,7 @@ public void testActiveActiveStoreIsReadyToServe(HybridConfig hybridConfig, NodeT
assertNotNull(activeActiveStoreIngestionTask.getIngestionBatchProcessor().getLockManager());
}

String rtTopicName = Version.composeRealTimeTopic(mockStore.getName());
String rtTopicName = Version.getRealTimeTopicName(mockStore);
PubSubTopic rtTopic = pubSubTopicRepository.getTopic(rtTopicName);
TopicSwitch topicSwitchWithMultipleSourceKafkaServers = new TopicSwitch();
topicSwitchWithMultipleSourceKafkaServers.sourceKafkaServers = new ArrayList<>();
Expand Down Expand Up @@ -3550,7 +3560,7 @@ public void testProcessTopicSwitch(NodeType nodeType) {
TopicSwitch topicSwitchWithRemoteRealTimeTopic = new TopicSwitch();
topicSwitchWithRemoteRealTimeTopic.sourceKafkaServers = new ArrayList<>();
topicSwitchWithRemoteRealTimeTopic.sourceKafkaServers.add(inMemoryRemoteKafkaBroker.getKafkaBootstrapServer());
topicSwitchWithRemoteRealTimeTopic.sourceTopicName = Version.composeRealTimeTopic(mockStore.getName());
topicSwitchWithRemoteRealTimeTopic.sourceTopicName = Version.getRealTimeTopicName(mockStore);
topicSwitchWithRemoteRealTimeTopic.rewindStartTimestamp = System.currentTimeMillis();
ControlMessage controlMessage = new ControlMessage();
controlMessage.controlMessageUnion = topicSwitchWithRemoteRealTimeTopic;
Expand Down Expand Up @@ -3809,11 +3819,11 @@ private Consumer<PubSubTopicPartitionOffset> getObserver(

@Test
public void testResubscribeAfterRoleChange() throws Exception {
PubSubTopic realTimeTopic =
pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeNameWithoutVersionInfo));
String realTimeTopicName = storeNameWithoutVersionInfo + Version.REAL_TIME_TOPIC_SUFFIX;
PubSubTopic realTimeTopic = pubSubTopicRepository.getTopic(realTimeTopicName);
// Prepare both local and remote real-time topics
inMemoryLocalKafkaBroker.createTopic(Version.composeRealTimeTopic(storeNameWithoutVersionInfo), PARTITION_COUNT);
inMemoryRemoteKafkaBroker.createTopic(Version.composeRealTimeTopic(storeNameWithoutVersionInfo), PARTITION_COUNT);
inMemoryLocalKafkaBroker.createTopic(realTimeTopicName, PARTITION_COUNT);
inMemoryRemoteKafkaBroker.createTopic(realTimeTopicName, PARTITION_COUNT);
mockStorageMetadataService = new InMemoryStorageMetadataService();

AbstractStoragePartition mockStoragePartition = mock(AbstractStoragePartition.class);
Expand All @@ -3826,12 +3836,10 @@ public void testResubscribeAfterRoleChange() throws Exception {
.getReplicationMetadata(ByteBuffer.wrap(deleteKeyFoo));

VeniceWriter vtWriter = getVeniceWriter(topic, new MockInMemoryProducerAdapter(inMemoryLocalKafkaBroker));
VeniceWriter localRtWriter = getVeniceWriter(
Version.composeRealTimeTopic(storeNameWithoutVersionInfo),
new MockInMemoryProducerAdapter(inMemoryLocalKafkaBroker));
VeniceWriter remoteRtWriter = getVeniceWriter(
Version.composeRealTimeTopic(storeNameWithoutVersionInfo),
new MockInMemoryProducerAdapter(inMemoryRemoteKafkaBroker));
VeniceWriter localRtWriter =
getVeniceWriter(realTimeTopicName, new MockInMemoryProducerAdapter(inMemoryLocalKafkaBroker));
VeniceWriter remoteRtWriter =
getVeniceWriter(realTimeTopicName, new MockInMemoryProducerAdapter(inMemoryRemoteKafkaBroker));
HybridStoreConfig hybridStoreConfig = new HybridStoreConfigImpl(
100,
100,
Expand Down Expand Up @@ -3886,7 +3894,7 @@ public void testResubscribeAfterRoleChange() throws Exception {
vtWriter.broadcastEndOfPush(new HashMap<>());
vtWriter.broadcastTopicSwitch(
kafkaBootstrapServers,
Version.composeRealTimeTopic(storeNameWithoutVersionInfo),
realTimeTopicName,
System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(10),
new HashMap<>());
storeIngestionTaskUnderTest.promoteToLeader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.linkedin.venice.exceptions.VeniceNoStoreException;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionImpl;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand All @@ -24,7 +25,8 @@ public void testMetadataBasedTopicExistenceChecker() {

ReadOnlyStoreRepository repository = mock(ReadOnlyStoreRepository.class);
Store store = mock(Store.class);
doReturn(new VersionImpl("existingTopic", 123)).when(store).getVersion(123);
doReturn(new VersionImpl("existingTopic", 123, "existingTopic" + Version.REAL_TIME_TOPIC_SUFFIX)).when(store)
.getVersion(123);
doReturn(store).when(repository).getStoreOrThrow("existingTopic");
doThrow(new VeniceNoStoreException(nontExitingTopic1)).when(repository).getStoreOrThrow("non-existingTopic");
doReturn(true).when(store).isHybrid();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ private List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> prepareAdminPu

@Test
public void testAdminToolConsumption() {
String topic = Version.composeRealTimeTopic(STORE_NAME);
ControllerClient controllerClient = mock(ControllerClient.class);
SchemaResponse schemaResponse = mock(SchemaResponse.class);
when(schemaResponse.getSchemaStr()).thenReturn(SCHEMA_STRING);
Expand All @@ -122,6 +121,8 @@ public void testAdminToolConsumption() {
when(storeInfo.getPartitionCount()).thenReturn(2);
when(controllerClient.getStore(STORE_NAME)).thenReturn(storeResponse);
when(storeResponse.getStore()).thenReturn(storeInfo);
when(storeInfo.getRealTimeTopicName()).thenReturn(STORE_NAME + Version.REAL_TIME_TOPIC_SUFFIX);
String topic = storeInfo.getRealTimeTopicName();

int assignedPartition = 0;
long startOffset = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,7 @@ private void configureMockTransportClient(
versionCreationResponse.setPartitionerClass(partitionerConfig.getPartitionerClass());
versionCreationResponse.setPartitionerParams(partitionerConfig.getPartitionerParams());
versionCreationResponse.setKafkaBootstrapServers("localhost:9092");
versionCreationResponse.setKafkaTopic(Version.composeRealTimeTopic(storeName));
versionCreationResponse.setKafkaTopic(Version.getRealTimeTopicName(store));
versionCreationResponse.setEnableSSL(false);

return getTransportClientFuture(MAPPER.writeValueAsBytes(versionCreationResponse), delayInResponseMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public PushJobHeartbeatSender createHeartbeatSender(
StoreInfo storeInfo = heartBeatStoreResponse.getStore();
PartitionerConfig partitionerConfig = storeInfo.getPartitionerConfig();
int partitionNum = storeInfo.getPartitionCount();
String heartbeatKafkaTopicName = Version.composeRealTimeTopic(heartbeatStoreName);
String heartbeatKafkaTopicName = Version.getRealTimeTopicName(storeInfo);
VeniceWriter<byte[], byte[], byte[]> veniceWriter = getVeniceWriter(
heartbeatKafkaTopicName,
partitionerConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public void testHeartbeatSenderCreation() {
StoreResponse storeResponse = mock(StoreResponse.class);
StoreInfo storeInfo = mock(StoreInfo.class);
PartitionerConfig partitionerConfig = new PartitionerConfigImpl();
doReturn(Version.composeRealTimeTopic(heartbeatStoreName)).when(storeInfo).getRealTimeTopicName();
doReturn(1).when(storeInfo).getPartitionCount();
doReturn(partitionerConfig).when(storeInfo).getPartitionerConfig();
doReturn(storeInfo).when(storeResponse).getStore();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ private AbstractAvroStoreClient getMockStoreClient(boolean updateEnabled, int de
versionCreationResponse.setPartitionerClass(partitionerConfig.getPartitionerClass());
versionCreationResponse.setPartitionerParams(partitionerConfig.getPartitionerParams());
versionCreationResponse.setKafkaBootstrapServers("localhost:9092");
versionCreationResponse.setKafkaTopic(Version.composeRealTimeTopic(storeName));
versionCreationResponse.setKafkaTopic(Version.getRealTimeTopicName(store));
versionCreationResponse.setEnableSSL(false);

CompletableFuture<byte[]> requestTopicFuture = mock(CompletableFuture.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,16 @@ public void setRmdVersionId(int replicationMetadataVersionId) {
throw new UnsupportedOperationException();
}

@Override
public String getRealTimeTopicName() {
return this.delegate.getRealTimeTopicName();
}

@Override
public void setRealTimeTopicName(String realTimeTopicName) {
throw new UnsupportedOperationException();
}

@Override
public Version cloneVersion() {
return this.delegate.cloneVersion();
Expand Down Expand Up @@ -1422,6 +1432,16 @@ public void setNearlineProducerCountPerWriter(int producerCnt) {
throw new UnsupportedOperationException();
}

@Override
public String getRealTimeTopicName() {
return delegate.getRealTimeTopicName();
}

@Override
public void setRealTimeTopicName(String realTimeTopicName) {
throw new UnsupportedOperationException();
}

@Override
public String toString() {
return this.delegate.toString();
Expand Down
Loading

0 comments on commit 6cedc00

Please sign in to comment.