diff --git a/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisioner.java b/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisioner.java index af50ff99f2..ca3ba52679 100644 --- a/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisioner.java +++ b/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisioner.java @@ -77,6 +77,9 @@ public ConsumerDestination provisionConsumerDestination( // topicName may be either the short or fully-qualified version. String topicShortName = TopicName.isParsableFrom(topicName) ? TopicName.parse(topicName).getTopic() : topicName; + if (autoCreate) { + ensureTopicExists(topicShortName, autoCreate); + } String subscriptionName = null; if (StringUtils.hasText(customName)) { diff --git a/spring-cloud-gcp-pubsub-stream-binder/src/test/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisionerTests.java b/spring-cloud-gcp-pubsub-stream-binder/src/test/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisionerTests.java index a53b719029..64212bcb2a 100644 --- a/spring-cloud-gcp-pubsub-stream-binder/src/test/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisionerTests.java +++ b/spring-cloud-gcp-pubsub-stream-binder/src/test/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisionerTests.java @@ -20,6 +20,7 @@ import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -286,6 +287,33 @@ void testProvisionConsumerDestination_createSubscription() { assertThat(subscription.getTopic()).isEqualTo("topic_A"); } + + @Test + void testProvisionConsumerDestination_createTopic_whenAutoCreateResources_isTrue() { + doReturn(null).when(this.pubSubAdminMock).getTopic("not_yet_created"); + + PubSubConsumerDestination result = + (PubSubConsumerDestination) + this.pubSubChannelProvisioner.provisionConsumerDestination( + "not_yet_created", "group_A", this.extendedConsumerProperties); + + verify(pubSubAdminMock).getTopic("not_yet_created"); + verify(pubSubAdminMock).createTopic("not_yet_created"); + } + + @Test + void testProvisionConsumerDestination_dontCreateTopic_whenAutoCreateResources_isFalse() { + when(this.pubSubConsumerProperties.isAutoCreateResources()).thenReturn(false); + + PubSubConsumerDestination result = + (PubSubConsumerDestination) + this.pubSubChannelProvisioner.provisionConsumerDestination( + "not_yet_created", "group_A", this.extendedConsumerProperties); + + verify(pubSubAdminMock, never()).getTopic("not_yet_created"); + verify(pubSubAdminMock, never()).createTopic("not_yet_created"); + } + @Test void testProvisionProducerDestination_createTopic() { ProducerDestination destination = this.pubSubChannelProvisioner.provisionProducerDestination(