From 52a18cb7b0a440db860497581645cdba275cc978 Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Sat, 19 Aug 2023 10:32:20 -0500 Subject: [PATCH] PulsarAdministration accepts multiple customizers (#433) See #432 --- .../pulsar/core/PulsarAdministration.java | 34 ++- .../PulsarAdministrationIntegrationTests.java | 254 ++++++++++++++++++ .../core/PulsarAdministrationTests.java | 241 +++-------------- 3 files changed, 317 insertions(+), 212 deletions(-) create mode 100644 spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarAdministrationIntegrationTests.java diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarAdministration.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarAdministration.java index 63d6dc9c..f6d77e1a 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarAdministration.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarAdministration.java @@ -18,6 +18,7 @@ import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -26,6 +27,7 @@ import java.util.stream.Collectors; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.PulsarClientException; @@ -54,7 +56,10 @@ public class PulsarAdministration private ApplicationContext applicationContext; @Nullable - private final PulsarAdminBuilderCustomizer adminCustomizer; + private final List adminCustomizers; + + @Nullable + private PulsarAdminBuilder adminBuilder; /** * Construct a default instance using the specified service url. @@ -70,7 +75,16 @@ public PulsarAdministration(String serviceHttpUrl) { * default admin builder without modifications */ public PulsarAdministration(@Nullable PulsarAdminBuilderCustomizer adminCustomizer) { - this.adminCustomizer = adminCustomizer; + this(adminCustomizer != null ? List.of(adminCustomizer) : Collections.emptyList()); + } + + /** + * Construct an instance with the specified customizations. + * @param adminCustomizers the customizers to apply to the builder or null to use the + * default admin builder without modifications + */ + public PulsarAdministration(List adminCustomizers) { + this.adminCustomizers = adminCustomizers; } @Override @@ -91,11 +105,19 @@ private void initialize() { } public PulsarAdmin createAdminClient() throws PulsarClientException { - var adminBuilder = PulsarAdmin.builder(); - if (this.adminCustomizer != null) { - this.adminCustomizer.customize(adminBuilder); + if (this.adminBuilder == null) { + this.adminBuilder = PulsarAdmin.builder(); } - return adminBuilder.build(); + this.adminCustomizers.forEach((adminCustomizer) -> adminCustomizer.customize(this.adminBuilder)); + return this.adminBuilder.build(); + } + + /** + * Sets the admin builder to use when creating the client - only visible for testing. + * @param adminBuilder the admin builder to use + */ + void setAdminBuilder(PulsarAdminBuilder adminBuilder) { + this.adminBuilder = adminBuilder; } @Override diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarAdministrationIntegrationTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarAdministrationIntegrationTests.java new file mode 100644 index 00000000..6f9647e9 --- /dev/null +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarAdministrationIntegrationTests.java @@ -0,0 +1,254 @@ +/* + * Copyright 2022-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.core; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalStateException; + +import java.util.Collections; +import java.util.List; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.PulsarClientException; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.pulsar.test.support.PulsarTestContainerSupport; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +/** + * Lightweight integration tests for {@link PulsarAdministration}. + * + * @author Alexander Preuß + * @author Chris Bono + * @author Kirill Merkushev + */ +@ExtendWith(SpringExtension.class) +@ContextConfiguration +@SuppressWarnings("JUnitMalformedDeclaration") +public class PulsarAdministrationIntegrationTests implements PulsarTestContainerSupport { + + private static final String NAMESPACE = "public/default"; + + @Autowired + private PulsarAdmin pulsarAdminClient; + + @Autowired + private PulsarAdministration pulsarAdministration; + + private void assertThatTopicsExist(List expected) throws PulsarAdminException { + assertThatTopicsExistIn(expected, NAMESPACE); + } + + private void assertThatTopicsExistIn(List expectedTopics, String namespace) + throws PulsarAdminException { + List expectedFullyQualifiedTopicNames = expectedTopics.stream().mapMulti((topic, consumer) -> { + if (topic.isPartitioned()) { + for (int i = 0; i < topic.numberOfPartitions(); i++) { + consumer.accept(topic.getFullyQualifiedTopicName() + "-partition-" + i); + } + } + else { + consumer.accept(topic.getFullyQualifiedTopicName()); + } + + }).toList(); + assertThat(pulsarAdminClient.topics().getList(namespace)).containsAll(expectedFullyQualifiedTopicNames); + } + + @Configuration(proxyBeanMethods = false) + static class AdminConfiguration { + + @Bean + PulsarAdmin pulsarAdminClient() throws PulsarClientException { + return PulsarAdmin.builder().serviceHttpUrl(PulsarTestContainerSupport.getHttpServiceUrl()).build(); + } + + @Bean + PulsarAdministration pulsarAdministration() { + return new PulsarAdministration(PulsarTestContainerSupport.getHttpServiceUrl()); + } + + } + + @Nested + @ContextConfiguration(classes = CreateMissingTopicsTests.CreateMissingTopicsConfig.class) + class CreateMissingTopicsTests { + + @Test + void topicsExist(@Autowired ObjectProvider expectedTopics) throws Exception { + assertThatTopicsExist(expectedTopics.stream().toList()); + } + + @Configuration(proxyBeanMethods = false) + static class CreateMissingTopicsConfig { + + @Bean + PulsarTopic nonPartitionedTopic() { + return PulsarTopic.builder("cmt-non-partitioned-1").build(); + } + + @Bean + PulsarTopic nonPartitionedTopic2() { + return PulsarTopic.builder("cmt-non-partitioned-2").build(); + } + + @Bean + PulsarTopic partitionedTopic() { + return PulsarTopic.builder("cmt-partitioned-1").numberOfPartitions(4).build(); + } + + } + + } + + @Nested + @ContextConfiguration(classes = CreateMissingTopicsInSeparateNamespacesTests.CreateMissingTopicsConfig.class) + class CreateMissingTopicsInSeparateNamespacesTests { + + @Test + void topicsExist(@Autowired PulsarTopic partitionedGreenTopic, @Autowired PulsarTopic partitionedBlueTopic) + throws PulsarAdminException { + assertThatTopicsExistIn(Collections.singletonList(partitionedGreenTopic), + CreateMissingTopicsConfig.PUBLIC_GREEN_NAMESPACE); + assertThatTopicsExistIn(Collections.singletonList(partitionedBlueTopic), + CreateMissingTopicsConfig.PUBLIC_BLUE_NAMESPACE); + } + + @Configuration(proxyBeanMethods = false) + static class CreateMissingTopicsConfig { + + public static final String PUBLIC_GREEN_NAMESPACE = "public/green"; + + public static final String PUBLIC_BLUE_NAMESPACE = "public/blue"; + + static { + try (var pulsarAdmin = PulsarAdmin.builder() + .serviceHttpUrl(PulsarTestContainerSupport.getHttpServiceUrl()) + .build()) { + pulsarAdmin.namespaces().createNamespace(PUBLIC_GREEN_NAMESPACE); + pulsarAdmin.namespaces().createNamespace(PUBLIC_BLUE_NAMESPACE); + } + catch (PulsarClientException | PulsarAdminException e) { + throw new RuntimeException(e); + } + } + + @Bean + PulsarTopic partitionedGreenTopic() { + return PulsarTopic.builder("persistent://%s/partitioned-1".formatted(PUBLIC_GREEN_NAMESPACE)) + .numberOfPartitions(2) + .build(); + } + + @Bean + PulsarTopic partitionedBlueTopic() { + return PulsarTopic.builder("persistent://%s/partitioned-1".formatted(PUBLIC_BLUE_NAMESPACE)) + .numberOfPartitions(2) + .build(); + } + + } + + } + + @Nested + @ContextConfiguration(classes = IncrementPartitionCountTests.IncrementPartitionCountConfig.class) + class IncrementPartitionCountTests { + + @Test + void topicsExist(@Autowired ObjectProvider expectedTopics) throws Exception { + assertThatTopicsExist(expectedTopics.stream().toList()); + PulsarTopic biggerTopic = PulsarTopic.builder("ipc-partitioned-1").numberOfPartitions(4).build(); + pulsarAdministration.createOrModifyTopics(biggerTopic); + assertThatTopicsExist(Collections.singletonList(biggerTopic)); + } + + @Configuration(proxyBeanMethods = false) + static class IncrementPartitionCountConfig { + + @Bean + PulsarTopic smallerTopic() { + return PulsarTopic.builder("ipc-partitioned-1").numberOfPartitions(1).build(); + } + + } + + } + + @Nested + @ContextConfiguration(classes = DecrementPartitionCountTests.DecrementPartitionCountConfig.class) + class DecrementPartitionCountTests { + + @Test + void topicModificationThrows(@Autowired ObjectProvider expectedTopics) throws Exception { + assertThatTopicsExist(expectedTopics.stream().toList()); + PulsarTopic smallerTopic = PulsarTopic.builder("dpc-partitioned-1").numberOfPartitions(4).build(); + assertThatIllegalStateException().isThrownBy(() -> pulsarAdministration.createOrModifyTopics(smallerTopic)) + .withMessage( + "Topic 'persistent://public/default/dpc-partitioned-1' found w/ 8 partitions but can't shrink to 4 - needs to be deleted first"); + + } + + @Configuration(proxyBeanMethods = false) + static class DecrementPartitionCountConfig { + + @Bean + PulsarTopic biggerTopic() { + return PulsarTopic.builder("dpc-partitioned-1").numberOfPartitions(8).build(); + } + + } + + } + + @Nested + @ContextConfiguration + class ConflictingTopicsTests { + + @Test + void unpartitionedTopicAlreadyExists() { + var unpartitionedTopic = PulsarTopic.builder("ctt-foo").numberOfPartitions(0).build(); + var partitionedTopic = PulsarTopic.builder("ctt-foo").numberOfPartitions(3).build(); + pulsarAdministration.createOrModifyTopics(unpartitionedTopic); + assertThatIllegalStateException() + .isThrownBy(() -> pulsarAdministration.createOrModifyTopics(partitionedTopic)) + .withMessage( + "Topic 'persistent://public/default/ctt-foo' already exists un-partitioned - needs to be deleted first"); + } + + @Test + void partitionedTopicAlreadyExists() { + var unpartitionedTopic = PulsarTopic.builder("ctt-bar").numberOfPartitions(0).build(); + var partitionedTopic = PulsarTopic.builder("ctt-bar").numberOfPartitions(3).build(); + pulsarAdministration.createOrModifyTopics(partitionedTopic); + assertThatIllegalStateException() + .isThrownBy(() -> pulsarAdministration.createOrModifyTopics(unpartitionedTopic)) + .withMessage( + "Topic 'persistent://public/default/ctt-bar' already exists partitioned - needs to be deleted first"); + } + + } + +} diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarAdministrationTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarAdministrationTests.java index 5303bea7..cd825d30 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarAdministrationTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarAdministrationTests.java @@ -16,237 +16,66 @@ package org.springframework.pulsar.core; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatIllegalStateException; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; -import java.util.Collections; import java.util.List; -import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.api.PulsarClientException; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; - -import org.springframework.beans.factory.ObjectProvider; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.pulsar.test.support.PulsarTestContainerSupport; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit.jupiter.SpringExtension; +import org.mockito.InOrder; /** - * Tests for {@link PulsarAdministration}. + * Unit tests for {@link PulsarAdministration}. * - * @author Alexander Preuß * @author Chris Bono - * @author Kirill Merkushev */ -@ExtendWith(SpringExtension.class) -@ContextConfiguration -@SuppressWarnings("JUnitMalformedDeclaration") -public class PulsarAdministrationTests implements PulsarTestContainerSupport { - - private static final String NAMESPACE = "public/default"; - - @Autowired - private PulsarAdmin pulsarAdminClient; - - @Autowired - private PulsarAdministration pulsarAdministration; - - private void assertThatTopicsExist(List expected) throws PulsarAdminException { - assertThatTopicsExistIn(expected, NAMESPACE); - } - - private void assertThatTopicsExistIn(List expectedTopics, String namespace) - throws PulsarAdminException { - List expectedFullyQualifiedTopicNames = expectedTopics.stream().mapMulti((topic, consumer) -> { - if (topic.isPartitioned()) { - for (int i = 0; i < topic.numberOfPartitions(); i++) { - consumer.accept(topic.getFullyQualifiedTopicName() + "-partition-" + i); - } - } - else { - consumer.accept(topic.getFullyQualifiedTopicName()); - } - - }).toList(); - assertThat(pulsarAdminClient.topics().getList(namespace)).containsAll(expectedFullyQualifiedTopicNames); - } - - @Configuration(proxyBeanMethods = false) - static class AdminConfiguration { - - @Bean - PulsarAdmin pulsarAdminClient() throws PulsarClientException { - return PulsarAdmin.builder().serviceHttpUrl(PulsarTestContainerSupport.getHttpServiceUrl()).build(); - } - - @Bean - PulsarAdministration pulsarAdministration() { - return new PulsarAdministration(PulsarTestContainerSupport.getHttpServiceUrl()); - } - - } - - @Nested - @ContextConfiguration(classes = CreateMissingTopicsTests.CreateMissingTopicsConfig.class) - class CreateMissingTopicsTests { - - @Test - void topicsExist(@Autowired ObjectProvider expectedTopics) throws Exception { - assertThatTopicsExist(expectedTopics.stream().toList()); - } - - @Configuration(proxyBeanMethods = false) - static class CreateMissingTopicsConfig { - - @Bean - PulsarTopic nonPartitionedTopic() { - return PulsarTopic.builder("cmt-non-partitioned-1").build(); - } - - @Bean - PulsarTopic nonPartitionedTopic2() { - return PulsarTopic.builder("cmt-non-partitioned-2").build(); - } - - @Bean - PulsarTopic partitionedTopic() { - return PulsarTopic.builder("cmt-partitioned-1").numberOfPartitions(4).build(); - } - - } - - } - - @Nested - @ContextConfiguration(classes = CreateMissingTopicsInSeparateNamespacesTests.CreateMissingTopicsConfig.class) - class CreateMissingTopicsInSeparateNamespacesTests { - - @Test - void topicsExist(@Autowired PulsarTopic partitionedGreenTopic, @Autowired PulsarTopic partitionedBlueTopic) - throws PulsarAdminException { - assertThatTopicsExistIn(Collections.singletonList(partitionedGreenTopic), - CreateMissingTopicsConfig.PUBLIC_GREEN_NAMESPACE); - assertThatTopicsExistIn(Collections.singletonList(partitionedBlueTopic), - CreateMissingTopicsConfig.PUBLIC_BLUE_NAMESPACE); - } - - @Configuration(proxyBeanMethods = false) - static class CreateMissingTopicsConfig { - - public static final String PUBLIC_GREEN_NAMESPACE = "public/green"; - - public static final String PUBLIC_BLUE_NAMESPACE = "public/blue"; - - static { - try (var pulsarAdmin = PulsarAdmin.builder() - .serviceHttpUrl(PulsarTestContainerSupport.getHttpServiceUrl()) - .build()) { - pulsarAdmin.namespaces().createNamespace(PUBLIC_GREEN_NAMESPACE); - pulsarAdmin.namespaces().createNamespace(PUBLIC_BLUE_NAMESPACE); - } - catch (PulsarClientException | PulsarAdminException e) { - throw new RuntimeException(e); - } - } +public class PulsarAdministrationTests { - @Bean - PulsarTopic partitionedGreenTopic() { - return PulsarTopic.builder("persistent://%s/partitioned-1".formatted(PUBLIC_GREEN_NAMESPACE)) - .numberOfPartitions(2) - .build(); - } - - @Bean - PulsarTopic partitionedBlueTopic() { - return PulsarTopic.builder("persistent://%s/partitioned-1".formatted(PUBLIC_BLUE_NAMESPACE)) - .numberOfPartitions(2) - .build(); - } - - } - - } + private PulsarAdminBuilder adminBuilder = mock(PulsarAdminBuilder.class); @Nested - @ContextConfiguration(classes = IncrementPartitionCountTests.IncrementPartitionCountConfig.class) - class IncrementPartitionCountTests { + class CustomizerTests { @Test - void topicsExist(@Autowired ObjectProvider expectedTopics) throws Exception { - assertThatTopicsExist(expectedTopics.stream().toList()); - PulsarTopic biggerTopic = PulsarTopic.builder("ipc-partitioned-1").numberOfPartitions(4).build(); - pulsarAdministration.createOrModifyTopics(biggerTopic); - assertThatTopicsExist(Collections.singletonList(biggerTopic)); - } - - @Configuration(proxyBeanMethods = false) - static class IncrementPartitionCountConfig { - - @Bean - PulsarTopic smallerTopic() { - return PulsarTopic.builder("ipc-partitioned-1").numberOfPartitions(1).build(); - } - + void createdWithServiceUrlOnly() throws PulsarClientException { + var admin = new PulsarAdministration("pulsar://foo:5150"); + admin.setAdminBuilder(adminBuilder); + admin.createAdminClient(); + verify(adminBuilder).build(); } - } - - @Nested - @ContextConfiguration(classes = DecrementPartitionCountTests.DecrementPartitionCountConfig.class) - class DecrementPartitionCountTests { - @Test - void topicModificationThrows(@Autowired ObjectProvider expectedTopics) throws Exception { - assertThatTopicsExist(expectedTopics.stream().toList()); - PulsarTopic smallerTopic = PulsarTopic.builder("dpc-partitioned-1").numberOfPartitions(4).build(); - assertThatIllegalStateException().isThrownBy(() -> pulsarAdministration.createOrModifyTopics(smallerTopic)) - .withMessage( - "Topic 'persistent://public/default/dpc-partitioned-1' found w/ 8 partitions but can't shrink to 4 - needs to be deleted first"); - + void createdWithNullCustomizer() throws PulsarClientException { + PulsarAdminBuilderCustomizer customizer = null; + var admin = new PulsarAdministration(customizer); + admin.setAdminBuilder(adminBuilder); + admin.createAdminClient(); + verify(adminBuilder).build(); } - @Configuration(proxyBeanMethods = false) - static class DecrementPartitionCountConfig { - - @Bean - PulsarTopic biggerTopic() { - return PulsarTopic.builder("dpc-partitioned-1").numberOfPartitions(8).build(); - } - - } - - } - - @Nested - @ContextConfiguration - class ConflictingTopicsTests { - @Test - void unpartitionedTopicAlreadyExists() { - var unpartitionedTopic = PulsarTopic.builder("ctt-foo").numberOfPartitions(0).build(); - var partitionedTopic = PulsarTopic.builder("ctt-foo").numberOfPartitions(3).build(); - pulsarAdministration.createOrModifyTopics(unpartitionedTopic); - assertThatIllegalStateException() - .isThrownBy(() -> pulsarAdministration.createOrModifyTopics(partitionedTopic)) - .withMessage( - "Topic 'persistent://public/default/ctt-foo' already exists un-partitioned - needs to be deleted first"); + void createdWithSingleCustomizer() throws PulsarClientException { + var customizer = mock(PulsarAdminBuilderCustomizer.class); + var admin = new PulsarAdministration(customizer); + admin.setAdminBuilder(adminBuilder); + admin.createAdminClient(); + verify(customizer).customize(adminBuilder); } @Test - void partitionedTopicAlreadyExists() { - var unpartitionedTopic = PulsarTopic.builder("ctt-bar").numberOfPartitions(0).build(); - var partitionedTopic = PulsarTopic.builder("ctt-bar").numberOfPartitions(3).build(); - pulsarAdministration.createOrModifyTopics(partitionedTopic); - assertThatIllegalStateException() - .isThrownBy(() -> pulsarAdministration.createOrModifyTopics(unpartitionedTopic)) - .withMessage( - "Topic 'persistent://public/default/ctt-bar' already exists partitioned - needs to be deleted first"); + void createdWithMultipleCustomizers() throws PulsarClientException { + var customizer1 = mock(PulsarAdminBuilderCustomizer.class); + var customizer2 = mock(PulsarAdminBuilderCustomizer.class); + var admin = new PulsarAdministration(List.of(customizer2, customizer1)); + admin.setAdminBuilder(adminBuilder); + admin.createAdminClient(); + InOrder inOrder = inOrder(customizer1, customizer2); + inOrder.verify(customizer2).customize(adminBuilder); + inOrder.verify(customizer1).customize(adminBuilder); } }