Skip to content

Commit

Permalink
removing version changes
Browse files Browse the repository at this point in the history
  • Loading branch information
arjun4084346 committed Nov 21, 2024
1 parent 2a1238c commit 59bb408
Show file tree
Hide file tree
Showing 60 changed files with 255 additions and 431 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ void updateLagMonitor(
if (version == null) {
// During version deletion, the version will be deleted from ZK prior to servers perform resource deletion.
// It's valid to have null version when trying to remove lag monitor for the deleted resource.
version = new VersionImpl(storeName, storeVersion, "", Utils.getRealTimeTopicName(store));
version = new VersionImpl(storeName, storeVersion, "");
}
lagMonFunction.accept(version, getPartition());
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,9 @@ void setUp() {
ReadStrategy.ANY_OF_ONLINE,
OfflinePushStrategy.WAIT_ALL_REPLICAS,
1);
version1 =
new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 5, store.getRealTimeTopicName());
version1 = new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 5);
store.addVersion(version1);
version2 =
new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 3, store.getRealTimeTopicName());
version2 = new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 3);
store.addVersion(version2);
store.setCurrentVersion(version1.getNumber());
when(backend.getStoreRepository().getStoreOrThrow(store.getName())).thenReturn(store);
Expand Down Expand Up @@ -233,8 +231,7 @@ void testSubscribeWithoutCurrentVersion() throws Exception {

@Test
void testSubscribeBootstrapVersion() throws Exception {
Version version3 =
new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 15, store.getRealTimeTopicName());
Version version3 = new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 15);
store.addVersion(version3);
store.setCurrentVersion(version2.getNumber());
backend.handleStoreChanged(storeBackend);
Expand Down Expand Up @@ -276,14 +273,12 @@ void testFutureVersionFailure() throws Exception {
verify(ingestionBackend, times(1)).removeStorageEngine(eq(version2.kafkaTopicName()));

// Simulate new version push and subsequent ingestion failure.
Version version3 =
new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 15, store.getRealTimeTopicName());
Version version3 = new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 15);
store.addVersion(version3);
backend.handleStoreChanged(storeBackend);

// Simulate new version push while faulty future version is being ingested.
Version version4 =
new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 20, store.getRealTimeTopicName());
Version version4 = new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 20);
store.addVersion(version4);
backend.handleStoreChanged(storeBackend);

Expand Down Expand Up @@ -314,8 +309,7 @@ void testFutureVersionFailure() throws Exception {
});

// Simulate new version push and subsequent ingestion failure.
Version version5 =
new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 30, store.getRealTimeTopicName());
Version version5 = new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 30);
store.addVersion(version5);
backend.handleStoreChanged(storeBackend);
versionMap.get(version5.kafkaTopicName()).completePartitionExceptionally(partition, new Exception());
Expand Down Expand Up @@ -388,8 +382,7 @@ void testRollbackAndRollForward() {
backend.handleStoreChanged(storeBackend);
versionMap.get(version2.kafkaTopicName()).completePartition(partition);

Version version3 =
new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 3, store.getRealTimeTopicName());
Version version3 = new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 3);
store.addVersion(version3);
backend.handleStoreChanged(storeBackend);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void testMaybeReportIncrementalPushStatus() {
doReturn(partitionToBatchReportEOIPEnabled).when(versionBackend).getPartitionToBatchReportEOIPEnabled();
doReturn(partitionToPendingReportIncrementalPushList).when(versionBackend)
.getPartitionToPendingReportIncrementalPushList();
Version version = new VersionImpl("test_store", 1, "dummy", "test_store" + Version.REAL_TIME_TOPIC_SUFFIX);
Version version = new VersionImpl("test_store", 1, "dummy");
doReturn(version).when(versionBackend).getVersion();
doCallRealMethod().when(versionBackend).maybeReportIncrementalPushStatus(anyInt(), anyString(), any(), any());
Consumer<String> mockConsumer = mock(Consumer.class);
Expand Down Expand Up @@ -82,7 +82,7 @@ public void testMaybeReportBatchEOIPStatus() {
doReturn(partitionToPendingReportIncrementalPushList).when(versionBackend)
.getPartitionToPendingReportIncrementalPushList();
doCallRealMethod().when(versionBackend).maybeReportBatchEOIPStatus(anyInt(), any());
Version version = new VersionImpl("test_store", 1, "dummy", "test_store" + Version.REAL_TIME_TOPIC_SUFFIX);
Version version = new VersionImpl("test_store", 1, "dummy");
doReturn(version).when(versionBackend).getVersion();
Consumer<String> mockConsumer = mock(Consumer.class);
versionBackend.maybeReportBatchEOIPStatus(0, mockConsumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public void setUp() {

metadataRepository = mock(ThinClientMetaStoreBasedRepository.class);
Store store = mock(Store.class);
Version mockVersion = new VersionImpl(storeName, 1, "foo", store.getRealTimeTopicName());
Version mockVersion = new VersionImpl(storeName, 1, "foo");
when(store.getCurrentVersion()).thenReturn(1);
when(store.getCompressionStrategy()).thenReturn(CompressionStrategy.NO_OP);
when(metadataRepository.getStore(anyString())).thenReturn(store);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void testConsumeBeforeAndAfterImage() throws ExecutionException, Interrup

ThinClientMetaStoreBasedRepository mockRepository = mock(ThinClientMetaStoreBasedRepository.class);
Store store = mock(Store.class);
Version mockVersion = new VersionImpl(storeName, 1, "foo", store.getRealTimeTopicName());
Version mockVersion = new VersionImpl(storeName, 1, "foo");
Mockito.when(store.getCurrentVersion()).thenReturn(1);
Mockito.when(store.getCompressionStrategy()).thenReturn(CompressionStrategy.NO_OP);
Mockito.when(mockRepository.getStore(anyString())).thenReturn(store);
Expand Down Expand Up @@ -217,7 +217,7 @@ public void testAfterImageConsumerSeek() throws ExecutionException, InterruptedE

ThinClientMetaStoreBasedRepository mockRepository = mock(ThinClientMetaStoreBasedRepository.class);
Store store = mock(Store.class);
Version mockVersion = new VersionImpl(storeName, 1, "foo", store.getRealTimeTopicName());
Version mockVersion = new VersionImpl(storeName, 1, "foo");
Mockito.when(store.getCurrentVersion()).thenReturn(1);
Mockito.when(store.getCompressionStrategy()).thenReturn(CompressionStrategy.NO_OP);
Mockito.when(mockRepository.getStore(anyString())).thenReturn(store);
Expand Down Expand Up @@ -292,7 +292,7 @@ public void testConsumeAfterImage() throws ExecutionException, InterruptedExcept

ThinClientMetaStoreBasedRepository mockRepository = mock(ThinClientMetaStoreBasedRepository.class);
Store store = mock(Store.class);
Version mockVersion = new VersionImpl(storeName, 1, "foo", store.getRealTimeTopicName());
Version mockVersion = new VersionImpl(storeName, 1, "foo");
Mockito.when(store.getCurrentVersion()).thenReturn(1);
Mockito.when(store.getCompressionStrategy()).thenReturn(CompressionStrategy.NO_OP);
Mockito.when(mockRepository.getStore(anyString())).thenReturn(store);
Expand Down Expand Up @@ -371,7 +371,7 @@ public void testConsumeAfterImageWithCompaction() throws ExecutionException, Int

ThinClientMetaStoreBasedRepository mockRepository = mock(ThinClientMetaStoreBasedRepository.class);
Store store = mock(Store.class);
Version mockVersion = new VersionImpl(storeName, 1, "foo", store.getRealTimeTopicName());
Version mockVersion = new VersionImpl(storeName, 1, "foo");
Mockito.when(store.getCurrentVersion()).thenReturn(1);
Mockito.when(store.getCompressionStrategy()).thenReturn(CompressionStrategy.NO_OP);
Mockito.when(mockRepository.getStore(anyString())).thenReturn(store);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ protected LeaderFollowerIngestionProgressNotifier getNotifier() {
@Test
public void testOnBecomeFollowerFromOffline() throws Exception {
// if the resource is not the current serving version, latch is not placed.
Version version = new VersionImpl("mockStore.getName()", 2, "", mockStore.getRealTimeTopicName());
Version version = new VersionImpl("mockStore.getName()", 2, "");
when(mockStore.getVersion(Mockito.anyInt())).thenReturn(version);
when(mockStore.getCurrentVersion()).thenReturn(2);
testStateModel.onBecomeStandbyFromOffline(mockMessage, mockContext);
Expand Down Expand Up @@ -106,7 +106,7 @@ public void testGracefulDropForCurrentVersionResource() {

@Test
public void testRemoveCVStateWhenBecomeOfflineFromStandby() {
Version version = new VersionImpl("mockStore.getName()", 2, "", mockStore.getRealTimeTopicName());
Version version = new VersionImpl("mockStore.getName()", 2, "");
when(mockStore.getVersion(Mockito.anyInt())).thenReturn(version);
when(mockStore.getCurrentVersion()).thenReturn(2);
when(mockIngestionBackend.stopConsumption(any(VeniceStoreVersionConfig.class), eq(testPartition)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ public void testisReadyToServeAnnouncedWithRTLag() {
OfflinePushStrategy.WAIT_ALL_REPLICAS,
1);
store.setHybridStoreConfig(hybridStoreConfig);
Version mockVersion = new VersionImpl(STORE_NAME, 1, PUSH_JOB_ID, store.getRealTimeTopicName());
Version mockVersion = new VersionImpl(STORE_NAME, 1, PUSH_JOB_ID);
mockVersion.setHybridStoreConfig(hybridStoreConfig);
store.setVersions(Collections.singletonList(mockVersion));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.locks.ResourceAutoClosableLockManager;
import io.tehuti.metrics.MetricsRepository;
Expand Down Expand Up @@ -289,9 +288,8 @@ public void testGetIngestingTopicsNotWithOnlineVersion() {
ReadStrategy.ANY_OF_ONLINE,
OfflinePushStrategy.WAIT_ALL_REPLICAS,
1);
mockStore.addVersion(new VersionImpl(storeName, 1, "test-job-id", mockStore.getRealTimeTopicName()));
toBeDeletedStore
.addVersion(new VersionImpl(deletedStoreName, 1, "test-job-id", Utils.getRealTimeTopicName(toBeDeletedStore)));
mockStore.addVersion(new VersionImpl(storeName, 1, "test-job-id"));
toBeDeletedStore.addVersion(new VersionImpl(deletedStoreName, 1, "test-job-id"));
doReturn(mockStore).when(mockMetadataRepo).getStore(storeName);
doReturn(toBeDeletedStore).when(mockMetadataRepo).getStore(deletedStoreName);
doReturn(mockStore).when(mockMetadataRepo).getStoreOrThrow(storeName);
Expand All @@ -311,7 +309,7 @@ public void testGetIngestingTopicsNotWithOnlineVersion() {
kafkaStoreIngestionService.getIngestingTopicsWithVersionStatusNotOnline().size(),
0,
"Expecting an empty set since all ingesting topics have version status of ONLINE");
mockStore.addVersion(new VersionImpl(storeName, 2, "test-job-id", Utils.getRealTimeTopicName(mockStore)));
mockStore.addVersion(new VersionImpl(storeName, 2, "test-job-id"));
doReturn(new Pair<>(mockStore, mockStore.getVersion(2))).when(mockMetadataRepo)
.waitVersion(eq(storeName), eq(2), any());
kafkaStoreIngestionService.startConsumption(new VeniceStoreVersionConfig(topic2, veniceProperties), 0);
Expand Down Expand Up @@ -374,7 +372,7 @@ public void testCloseStoreIngestionTask() {
AbstractStorageEngine storageEngine1 = mock(AbstractStorageEngine.class);
Mockito.when(mockStorageEngineRepository.getLocalStorageEngine(topicName)).thenReturn(storageEngine1);

mockStore.addVersion(new VersionImpl(storeName, 1, "test-job-id", mockStore.getRealTimeTopicName()));
mockStore.addVersion(new VersionImpl(storeName, 1, "test-job-id"));
doReturn(mockStore).when(mockMetadataRepo).getStore(storeName);
doReturn(mockStore).when(mockMetadataRepo).getStoreOrThrow(storeName);
doReturn(new Pair<>(mockStore, mockStore.getVersion(1))).when(mockMetadataRepo)
Expand Down Expand Up @@ -435,7 +433,7 @@ public void testStoreIngestionTaskShutdownLastPartition(boolean isIsolatedIngest
AbstractStorageEngine storageEngine1 = mock(AbstractStorageEngine.class);
Mockito.when(mockStorageEngineRepository.getLocalStorageEngine(topicName)).thenReturn(storageEngine1);

mockStore.addVersion(new VersionImpl(storeName, 1, "test-job-id", mockStore.getRealTimeTopicName()));
mockStore.addVersion(new VersionImpl(storeName, 1, "test-job-id"));
doReturn(mockStore).when(mockMetadataRepo).getStore(storeName);
doReturn(mockStore).when(mockMetadataRepo).getStoreOrThrow(storeName);
doReturn(new Pair<>(mockStore, mockStore.getVersion(1))).when(mockMetadataRepo)
Expand Down
Loading

0 comments on commit 59bb408

Please sign in to comment.