Skip to content

Commit

Permalink
test: fix PubSubAutoConfigurationIntegrationTests resource ALREADY_EX…
Browse files Browse the repository at this point in the history
…ISTS (#2857)

Fixes: #2831.
  • Loading branch information
meltsufin authored May 8, 2024
1 parent 21168cb commit 99592ac
Showing 1 changed file with 147 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.cloud.spring.pubsub.core.PubSubConfiguration;
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.pubsub.v1.ProjectSubscriptionName;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.jupiter.api.BeforeAll;
Expand All @@ -48,10 +49,15 @@ class PubSubAutoConfigurationIntegrationTests {
private static final Log LOGGER =
LogFactory.getLog(PubSubAutoConfigurationIntegrationTests.class);

private static final String PULL_TOPIC = "test-topic-" + UUID.randomUUID();
private static final String PULL_SUB = "test-sub-1-" + UUID.randomUUID();
private static final String SUBSCRIBE_TOPIC = "test-topic-" + UUID.randomUUID();
private static final String SUBSCRIBE_SUB = "test-sub-2-" + UUID.randomUUID();

private static GcpProjectIdProvider projectIdProvider;

private final String fullSubscriptionNameSub1 = "projects/" + projectIdProvider.getProjectId() + "/subscriptions/test-sub-1";
private final String fullSubscriptionNameSub2 = "projects/" + projectIdProvider.getProjectId() + "/subscriptions/test-sub-2";
private final String fullSubscriptionNameSub1 = "projects/" + projectIdProvider.getProjectId() + "/subscriptions/" + PULL_SUB;
private final String fullSubscriptionNameSub2 = "projects/" + projectIdProvider.getProjectId() + "/subscriptions/" + SUBSCRIBE_SUB;

private final ApplicationContextRunner contextRunner =
new ApplicationContextRunner()
Expand All @@ -68,15 +74,15 @@ class PubSubAutoConfigurationIntegrationTests {
"spring.cloud.gcp.pubsub.subscription.fully-qualified-test-sub-1-with-project-abc.retry.rpc-timeout-multiplier=1",
"spring.cloud.gcp.pubsub.subscription.fully-qualified-test-sub-1-with-project-abc.retry.max-rpc-timeout-seconds=600",
"spring.cloud.gcp.pubsub.subscription.fully-qualified-test-sub-1-with-project-abc.pull-endpoint=northamerica-northeast2-pubsub.googleapis.com:443",
"spring.cloud.gcp.pubsub.subscription.test-sub-2.executor-threads=1",
"spring.cloud.gcp.pubsub.subscription.test-sub-2.max-ack-extension-period=0",
"spring.cloud.gcp.pubsub.subscription.test-sub-2.min-duration-per-ack-extension=1",
"spring.cloud.gcp.pubsub.subscription.test-sub-2.max-duration-per-ack-extension=2",
"spring.cloud.gcp.pubsub.subscription.test-sub-2.parallel-pull-count=1",
"spring.cloud.gcp.pubsub.subscription.test-sub-2.flow-control.max-outstanding-element-Count=1",
"spring.cloud.gcp.pubsub.subscription.test-sub-2.flow-control.max-outstanding-request-Bytes=1",
"spring.cloud.gcp.pubsub.subscription.test-sub-2.flow-control.limit-exceeded-behavior=Ignore",
"spring.cloud.gcp.pubsub.subscription.test-sub-2.pull-endpoint=bad.endpoint")
"spring.cloud.gcp.pubsub.subscription." + SUBSCRIBE_SUB + ".executor-threads=1",
"spring.cloud.gcp.pubsub.subscription." + SUBSCRIBE_SUB + ".max-ack-extension-period=0",
"spring.cloud.gcp.pubsub.subscription." + SUBSCRIBE_SUB + ".min-duration-per-ack-extension=1",
"spring.cloud.gcp.pubsub.subscription." + SUBSCRIBE_SUB + ".max-duration-per-ack-extension=2",
"spring.cloud.gcp.pubsub.subscription." + SUBSCRIBE_SUB + ".parallel-pull-count=1",
"spring.cloud.gcp.pubsub.subscription." + SUBSCRIBE_SUB + ".flow-control.max-outstanding-element-Count=1",
"spring.cloud.gcp.pubsub.subscription." + SUBSCRIBE_SUB + ".flow-control.max-outstanding-request-Bytes=1",
"spring.cloud.gcp.pubsub.subscription." + SUBSCRIBE_SUB + ".flow-control.limit-exceeded-behavior=Ignore",
"spring.cloud.gcp.pubsub.subscription." + SUBSCRIBE_SUB + ".pull-endpoint=bad.endpoint")
.withConfiguration(
AutoConfigurations.of(
GcpContextAutoConfiguration.class, GcpPubSubAutoConfiguration.class));
Expand All @@ -92,74 +98,73 @@ void testPull() {
this.contextRunner.run(
context -> {
PubSubAdmin pubSubAdmin = context.getBean(PubSubAdmin.class);
String topicName = "test-topic";
String subscriptionName = "test-sub-1";
String topicName = PULL_TOPIC;
String subscriptionName = PULL_SUB;

String projectId = projectIdProvider.getProjectId();

if (pubSubAdmin.getTopic(topicName) != null) {
pubSubAdmin.deleteTopic(topicName);
}
if (pubSubAdmin.getSubscription(subscriptionName) != null) {
pubSubAdmin.deleteSubscription(subscriptionName);
}

pubSubAdmin.createTopic(topicName);
pubSubAdmin.createSubscription(subscriptionName, topicName, 10);

PubSubTemplate pubSubTemplate = context.getBean(PubSubTemplate.class);
try {
PubSubTemplate pubSubTemplate = context.getBean(PubSubTemplate.class);

pubSubTemplate.publish(topicName, "message1");
pubSubTemplate.pull(subscriptionName, 4, false);

pubSubTemplate.publish(topicName, "message1");
pubSubTemplate.pull(subscriptionName, 4, false);

// Validate auto-config properties
GcpPubSubProperties gcpPubSubProperties = context.getBean(GcpPubSubProperties.class);
RetrySettings expectedRetrySettings =
RetrySettings.newBuilder()
.setTotalTimeout(Duration.ofSeconds(600L))
.setInitialRetryDelay(Duration.ofSeconds(100L))
.setRetryDelayMultiplier(1.3)
.setMaxRetryDelay(Duration.ofSeconds(600L))
.setMaxAttempts(1)
.setInitialRpcTimeout(Duration.ofSeconds(600L))
.setRpcTimeoutMultiplier(1)
.setMaxRpcTimeout(Duration.ofSeconds(600L))
.build();
PubSubConfiguration.Retry retry =
gcpPubSubProperties.computeSubscriberRetrySettings(
ProjectSubscriptionName.of(projectId, subscriptionName));
assertThat(retry.getTotalTimeoutSeconds()).isEqualTo(600L);
assertThat(retry.getInitialRetryDelaySeconds()).isEqualTo(100L);
assertThat(retry.getRetryDelayMultiplier()).isEqualTo(1.3);
assertThat(retry.getMaxRetryDelaySeconds()).isEqualTo(600L);
assertThat(retry.getMaxAttempts()).isEqualTo(1);
assertThat(retry.getInitialRpcTimeoutSeconds()).isEqualTo(600L);
assertThat(retry.getRpcTimeoutMultiplier()).isEqualTo(1);
assertThat(retry.getMaxRpcTimeoutSeconds()).isEqualTo(600L);
ThreadPoolTaskScheduler scheduler =
(ThreadPoolTaskScheduler) context.getBean("threadPoolScheduler_" + fullSubscriptionNameSub1);
assertThat(scheduler).isNotNull();
assertThat(scheduler.getThreadNamePrefix()).isEqualTo("gcp-pubsub-subscriber-" + fullSubscriptionNameSub1);
assertThat(scheduler.isDaemon()).isTrue();
assertThat(
(ThreadPoolTaskScheduler)
context.getBean("globalPubSubSubscriberThreadPoolScheduler"))
.isNotNull();
assertThat((ExecutorProvider) context.getBean("subscriberExecutorProvider-" + fullSubscriptionNameSub1))
.isNotNull();
assertThat((ExecutorProvider) context.getBean("globalSubscriberExecutorProvider"))
.isNotNull();
assertThat(gcpPubSubProperties.computeRetryableCodes(subscriptionName, projectId))
.isEqualTo(new Code[] {Code.INTERNAL});
assertThat(gcpPubSubProperties.computePullEndpoint(fullSubscriptionNameSub1, projectId))
.isEqualTo("northamerica-northeast2-pubsub.googleapis.com:443");
assertThat(gcpPubSubProperties.computePullEndpoint("test-sub-2", projectId))
.isEqualTo("bad.endpoint");
assertThat((RetrySettings) context.getBean("subscriberRetrySettings-" + fullSubscriptionNameSub1))
.isEqualTo(expectedRetrySettings);

pubSubAdmin.deleteSubscription(subscriptionName);
pubSubAdmin.deleteTopic(topicName);
// Validate auto-config properties
GcpPubSubProperties gcpPubSubProperties = context.getBean(GcpPubSubProperties.class);
RetrySettings expectedRetrySettings =
RetrySettings.newBuilder()
.setTotalTimeout(Duration.ofSeconds(600L))
.setInitialRetryDelay(Duration.ofSeconds(100L))
.setRetryDelayMultiplier(1.3)
.setMaxRetryDelay(Duration.ofSeconds(600L))
.setMaxAttempts(1)
.setInitialRpcTimeout(Duration.ofSeconds(600L))
.setRpcTimeoutMultiplier(1)
.setMaxRpcTimeout(Duration.ofSeconds(600L))
.build();
PubSubConfiguration.Retry retry =
gcpPubSubProperties.computeSubscriberRetrySettings(
ProjectSubscriptionName.of(projectId, subscriptionName));
assertThat(retry.getTotalTimeoutSeconds()).isEqualTo(600L);
assertThat(retry.getInitialRetryDelaySeconds()).isEqualTo(100L);
assertThat(retry.getRetryDelayMultiplier()).isEqualTo(1.3);
assertThat(retry.getMaxRetryDelaySeconds()).isEqualTo(600L);
assertThat(retry.getMaxAttempts()).isEqualTo(1);
assertThat(retry.getInitialRpcTimeoutSeconds()).isEqualTo(600L);
assertThat(retry.getRpcTimeoutMultiplier()).isEqualTo(1);
assertThat(retry.getMaxRpcTimeoutSeconds()).isEqualTo(600L);
ThreadPoolTaskScheduler scheduler =
(ThreadPoolTaskScheduler) context.getBean(
"threadPoolScheduler_" + fullSubscriptionNameSub1);
assertThat(scheduler).isNotNull();
assertThat(scheduler.getThreadNamePrefix()).isEqualTo(
"gcp-pubsub-subscriber-" + fullSubscriptionNameSub1);
assertThat(scheduler.isDaemon()).isTrue();
assertThat(
(ThreadPoolTaskScheduler)
context.getBean("globalPubSubSubscriberThreadPoolScheduler"))
.isNotNull();
assertThat((ExecutorProvider) context.getBean(
"subscriberExecutorProvider-" + fullSubscriptionNameSub1))
.isNotNull();
assertThat((ExecutorProvider) context.getBean("globalSubscriberExecutorProvider"))
.isNotNull();
assertThat(gcpPubSubProperties.computeRetryableCodes(subscriptionName, projectId))
.isEqualTo(new Code[]{Code.INTERNAL});
assertThat(gcpPubSubProperties.computePullEndpoint(fullSubscriptionNameSub1, projectId))
.isEqualTo("northamerica-northeast2-pubsub.googleapis.com:443");
assertThat(gcpPubSubProperties.computePullEndpoint(SUBSCRIBE_SUB, projectId))
.isEqualTo("bad.endpoint");
assertThat((RetrySettings) context.getBean(
"subscriberRetrySettings-" + fullSubscriptionNameSub1))
.isEqualTo(expectedRetrySettings);
} finally {
pubSubAdmin.deleteSubscription(subscriptionName);
pubSubAdmin.deleteTopic(topicName);
}
});
}

Expand All @@ -173,81 +178,81 @@ void testSubscribe() {

String projectId = projectIdProvider.getProjectId();

String topicName = "test-topic";
String subscriptionName = "test-sub-2";
if (pubSubAdmin.getTopic(topicName) != null) {
pubSubAdmin.deleteTopic(topicName);
}
if (pubSubAdmin.getSubscription(subscriptionName) != null) {
pubSubAdmin.deleteSubscription(subscriptionName);
}
String topicName = SUBSCRIBE_TOPIC;
String subscriptionName = SUBSCRIBE_SUB;

assertThat(pubSubAdmin.getTopic(topicName)).isNull();
assertThat(pubSubAdmin.getSubscription(subscriptionName)).isNull();
pubSubAdmin.createTopic(topicName);
pubSubAdmin.createSubscription(subscriptionName, topicName);
pubSubTemplate.publish(topicName, "tatatatata").get();
pubSubTemplate.subscribe(
subscriptionName,
message -> {
LOGGER.info(
"Message received from "
+ subscriptionName
+ " subscription: "
+ message.getPubsubMessage().getData().toStringUtf8());
message.ack();
});

// Validate auto-config properties
GcpPubSubProperties gcpPubSubProperties = context.getBean(GcpPubSubProperties.class);
PubSubConfiguration.FlowControl flowControl =
gcpPubSubProperties.computeSubscriberFlowControlSettings(
ProjectSubscriptionName.of(projectId, subscriptionName));
FlowControlSettings flowControlSettings =
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(1L)
.setMaxOutstandingRequestBytes(1L)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Ignore)
.build();
assertThat(
(FlowControlSettings) context.getBean("subscriberFlowControlSettings-" + fullSubscriptionNameSub2))
.isEqualTo(flowControlSettings);
assertThat(flowControl.getMaxOutstandingElementCount()).isEqualTo(1L);
assertThat(flowControl.getMaxOutstandingRequestBytes()).isEqualTo(1L);
assertThat(flowControl.getLimitExceededBehavior())
.isEqualTo(FlowController.LimitExceededBehavior.Ignore);
assertThat(
gcpPubSubProperties.computeMaxAckExtensionPeriod(
subscriptionName, projectId))
.isZero();
assertThat(
gcpPubSubProperties.computeMinDurationPerAckExtension(
subscriptionName, projectId))
.isEqualTo(1L);
assertThat(
gcpPubSubProperties.computeMaxDurationPerAckExtension(
subscriptionName, projectId))
.isEqualTo(2L);
assertThat(
gcpPubSubProperties.computeParallelPullCount(
subscriptionName, projectId))
.isEqualTo(1);
ThreadPoolTaskScheduler scheduler =
(ThreadPoolTaskScheduler) context.getBean("threadPoolScheduler_" + fullSubscriptionNameSub2);
assertThat(scheduler).isNotNull();
assertThat(scheduler.getThreadNamePrefix()).isEqualTo("gcp-pubsub-subscriber-" + fullSubscriptionNameSub2);
assertThat(scheduler.isDaemon()).isTrue();
assertThat(
(ThreadPoolTaskScheduler)
context.getBean("globalPubSubSubscriberThreadPoolScheduler"))
.isNotNull();
assertThat((ExecutorProvider) context.getBean("subscriberExecutorProvider-" + fullSubscriptionNameSub2))
.isNotNull();
assertThat((ExecutorProvider) context.getBean("globalSubscriberExecutorProvider"))
.isNotNull();

pubSubAdmin.deleteSubscription(subscriptionName);
pubSubAdmin.deleteTopic(topicName);
try {
pubSubTemplate.publish(topicName, "tatatatata").get();
pubSubTemplate.subscribe(
subscriptionName,
message -> {
LOGGER.info(
"Message received from "
+ subscriptionName
+ " subscription: "
+ message.getPubsubMessage().getData().toStringUtf8());
message.ack();
});

// Validate auto-config properties
GcpPubSubProperties gcpPubSubProperties = context.getBean(GcpPubSubProperties.class);
PubSubConfiguration.FlowControl flowControl =
gcpPubSubProperties.computeSubscriberFlowControlSettings(
ProjectSubscriptionName.of(projectId, subscriptionName));
FlowControlSettings flowControlSettings =
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(1L)
.setMaxOutstandingRequestBytes(1L)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Ignore)
.build();
assertThat(
(FlowControlSettings) context.getBean(
"subscriberFlowControlSettings-" + fullSubscriptionNameSub2))
.isEqualTo(flowControlSettings);
assertThat(flowControl.getMaxOutstandingElementCount()).isEqualTo(1L);
assertThat(flowControl.getMaxOutstandingRequestBytes()).isEqualTo(1L);
assertThat(flowControl.getLimitExceededBehavior())
.isEqualTo(FlowController.LimitExceededBehavior.Ignore);
assertThat(
gcpPubSubProperties.computeMaxAckExtensionPeriod(
subscriptionName, projectId))
.isZero();
assertThat(
gcpPubSubProperties.computeMinDurationPerAckExtension(
subscriptionName, projectId))
.isEqualTo(1L);
assertThat(
gcpPubSubProperties.computeMaxDurationPerAckExtension(
subscriptionName, projectId))
.isEqualTo(2L);
assertThat(
gcpPubSubProperties.computeParallelPullCount(
subscriptionName, projectId))
.isEqualTo(1);
ThreadPoolTaskScheduler scheduler =
(ThreadPoolTaskScheduler) context.getBean(
"threadPoolScheduler_" + fullSubscriptionNameSub2);
assertThat(scheduler).isNotNull();
assertThat(scheduler.getThreadNamePrefix()).isEqualTo(
"gcp-pubsub-subscriber-" + fullSubscriptionNameSub2);
assertThat(scheduler.isDaemon()).isTrue();
assertThat(
(ThreadPoolTaskScheduler)
context.getBean("globalPubSubSubscriberThreadPoolScheduler"))
.isNotNull();
assertThat((ExecutorProvider) context.getBean(
"subscriberExecutorProvider-" + fullSubscriptionNameSub2))
.isNotNull();
assertThat((ExecutorProvider) context.getBean("globalSubscriberExecutorProvider"))
.isNotNull();
} finally {
pubSubAdmin.deleteSubscription(subscriptionName);
pubSubAdmin.deleteTopic(topicName);
}
});
}
}

0 comments on commit 99592ac

Please sign in to comment.