From 512c476d243574b1aac2beec8b0b3bea92a97f1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patrik=20H=C3=B6rlin?= Date: Wed, 24 Mar 2021 18:04:25 +0100 Subject: [PATCH] Support custom subscription name for Pub/Sub health check (#330) Fixes: #236. Co-authored-by: Mike Eltsufin --- docs/src/main/asciidoc/pubsub.adoc | 17 ++ .../pubsub/health/PubSubHealthIndicator.java | 143 +++++++++++++-- ...ubSubHealthIndicatorAutoConfiguration.java | 21 +++ .../PubSubHealthIndicatorProperties.java | 67 +++++++ .../pubsub/PubSubHealthIndicatorTests.java | 81 --------- ...HealthIndicatorAutoConfigurationTests.java | 159 +++++++++++++++- .../health/PubSubHealthIndicatorTests.java | 169 ++++++++++++++++++ 7 files changed, 555 insertions(+), 102 deletions(-) create mode 100644 spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/health/PubSubHealthIndicatorProperties.java delete mode 100644 spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/PubSubHealthIndicatorTests.java create mode 100644 spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/health/PubSubHealthIndicatorTests.java diff --git a/docs/src/main/asciidoc/pubsub.adoc b/docs/src/main/asciidoc/pubsub.adoc index 5da00338bf..8ca6cc5fa6 100644 --- a/docs/src/main/asciidoc/pubsub.adoc +++ b/docs/src/main/asciidoc/pubsub.adoc @@ -138,6 +138,23 @@ The `pubsub` indicator will then roll up to the overall application status visib NOTE: If your application already has actuator and Cloud Pub/Sub starters, this health indicator is enabled by default. To disable the Cloud Pub/Sub indicator, set `management.health.pubsub.enabled` to `false`. +The health indicator validates the connection to Pub/Sub by pulling messages from a Pub/Sub subscription. + +If no subscription has been specified via `spring.cloud.gcp.pubsub.health.subscription`, it will pull messages from a random subscription that is expected not to exist. +It will signal "up" if it is able to connect to GCP Pub/Sub APIs, i.e. the pull results in a response of `NOT_FOUND` or `PERMISSION_DENIED`. + +If a custom subscription has been specified, this health indicator will signal "up" if messages are successfully pulled and (optionally) acknowledged, or when a successful pull is performed but no messages are returned from Pub/Sub. +Note that messages pulled from the subscription will not be acknowledged, unless you set the `spring.cloud.gcp.pubsub.health.acknowledge-messages` option to `true`. +So, take care not to configure a subscription that has a business impact, or instead leave the custom subscription out completely. + +|=== +| Name | Description | Required | Default value +| `management.health.pubsub.enabled` | Whether to enable the Pub/Sub health indicator | No | `true` with Spring Boot Actuator, `false` otherwise +| `spring.cloud.gcp.pubsub.health.subscription` | Subscription to health check against by pulling a message | No | Random non-existent +| `spring.cloud.gcp.pubsub.health.timeout-millis` | Milliseconds to wait for response from Pub/Sub before timing out | No | `1000` +| `spring.cloud.gcp.pubsub.health.acknowledge-messages` | Whether to acknowledge messages pulled from the optionally specified subscription | No | `false` +|=== + === Pub/Sub Operations & Template `PubSubOperations` is an abstraction that allows Spring users to use Google Cloud Pub/Sub without depending on any Google Cloud Pub/Sub API semantics. diff --git a/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/health/PubSubHealthIndicator.java b/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/health/PubSubHealthIndicator.java index 3e65a1f910..47befa46c4 100644 --- a/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/health/PubSubHealthIndicator.java +++ b/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/health/PubSubHealthIndicator.java @@ -16,55 +16,172 @@ package com.google.cloud.spring.autoconfigure.pubsub.health; +import java.util.List; import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.StatusCode.Code; import com.google.cloud.spring.pubsub.core.PubSubTemplate; +import com.google.cloud.spring.pubsub.support.AcknowledgeablePubsubMessage; +import org.springframework.beans.factory.BeanInitializationException; import org.springframework.boot.actuate.health.AbstractHealthIndicator; import org.springframework.boot.actuate.health.Health; import org.springframework.util.Assert; +import org.springframework.util.StringUtils; +import org.springframework.util.concurrent.ListenableFuture; /** - * Default implemenation of + * Default implementation of * {@link org.springframework.boot.actuate.health.HealthIndicator} for Pub/Sub. Validates - * if connection is successful by looking for a random generated subscription name that - * won't be found. If no subscription is found we know the client is able to connect to - * GCP Pub/Sub APIs. + * if connection is successful by pulling messages from the pubSubTemplate using + * {@link PubSubTemplate#pullAsync(String, Integer, Boolean)}. + * + *

If a custom subscription has been specified, this health indicator will signal "up" + * if messages are successfully pulled and (optionally) acknowledged or if a + * successful pull is performed but no messages are returned from Pub/Sub.

+ * + *

If no subscription has been specified, this health indicator will pull messages from a random subscription + * that is expected not to exist. It will signal "up" if it is able to connect to GCP Pub/Sub APIs, + * i.e. the pull results in a response of {@link StatusCode.Code#NOT_FOUND} or + * {@link StatusCode.Code#PERMISSION_DENIED}.

+ * + *

Note that messages pulled from the subscription will not be acknowledged, unless you + * set the {@code acknowledgeMessages} option to "true". However, take care not to configure + * a subscription that has a business impact, or leave the custom subscription out completely. * * @author Vinicius Carvalho + * @author Patrik Hörlin * * @since 1.2.2 */ public class PubSubHealthIndicator extends AbstractHealthIndicator { + /** + * Template used when performing health check calls. + */ private final PubSubTemplate pubSubTemplate; - public PubSubHealthIndicator(PubSubTemplate pubSubTemplate) { + /** + * Indicates whether a user subscription has been configured. + */ + private final boolean specifiedSubscription; + + /** + * Subscription used when health checking. + */ + private final String subscription; + + /** + * Timeout when performing health check. + */ + private final long timeoutMillis; + + /** + * Whether pulled messages should be acknowledged. + */ + private final boolean acknowledgeMessages; + + public PubSubHealthIndicator(PubSubTemplate pubSubTemplate, String healthCheckSubscription, long timeoutMillis, boolean acknowledgeMessages) { super("Failed to connect to Pub/Sub APIs. Check your credentials and verify you have proper access to the service."); - Assert.notNull(pubSubTemplate, "PubSubTemplate can't be null"); + Assert.notNull(pubSubTemplate, "pubSubTemplate can't be null"); this.pubSubTemplate = pubSubTemplate; + this.specifiedSubscription = StringUtils.hasText(healthCheckSubscription); + if (this.specifiedSubscription) { + this.subscription = healthCheckSubscription; + } + else { + this.subscription = "spring-cloud-gcp-healthcheck-" + UUID.randomUUID().toString(); + } + this.timeoutMillis = timeoutMillis; + this.acknowledgeMessages = acknowledgeMessages; + } + + void validateHealthCheck() { + doHealthCheck( + () -> { }, + this::validationFailed, + this::validationFailed); } @Override protected void doHealthCheck(Health.Builder builder) { + doHealthCheck( + builder::up, + builder::down, + e -> builder.withException(e).unknown()); + } + + private void doHealthCheck(Runnable up, Consumer down, Consumer unknown) { try { - this.pubSubTemplate.pull("subscription-" + UUID.randomUUID().toString(), 1, true); + pullMessage(); + up.run(); } - catch (ApiException aex) { - Code errorCode = aex.getStatusCode().getCode(); - if (errorCode == StatusCode.Code.NOT_FOUND || errorCode == Code.PERMISSION_DENIED) { - builder.up(); + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + unknown.accept(e); + } + catch (ExecutionException e) { + if (isHealthyException(e)) { + // ignore expected exceptions + up.run(); } else { - builder.withException(aex).down(); + down.accept(e); } } + catch (TimeoutException e) { + unknown.accept(e); + } catch (Exception e) { - builder.withException(e).down(); + down.accept(e); + } + } + + private void pullMessage() throws InterruptedException, ExecutionException, TimeoutException { + ListenableFuture> future = pubSubTemplate.pullAsync(this.subscription, 1, true); + List messages = future.get(timeoutMillis, TimeUnit.MILLISECONDS); + if (this.acknowledgeMessages) { + messages.forEach(AcknowledgeablePubsubMessage::ack); } } + boolean isHealthyException(ExecutionException e) { + return !this.specifiedSubscription && isHealthyResponseForUnspecifiedSubscription(e); + } + + private boolean isHealthyResponseForUnspecifiedSubscription(ExecutionException e) { + Throwable t = e.getCause(); + if (t instanceof ApiException) { + ApiException aex = (ApiException) t; + Code errorCode = aex.getStatusCode().getCode(); + return errorCode == StatusCode.Code.NOT_FOUND || errorCode == Code.PERMISSION_DENIED; + } + return false; + } + + private void validationFailed(Throwable e) { + throw new BeanInitializationException("Validation of health indicator failed", e); + } + + boolean isSpecifiedSubscription() { + return specifiedSubscription; + } + + String getSubscription() { + return subscription; + } + + long getTimeoutMillis() { + return timeoutMillis; + } + + boolean isAcknowledgeMessages() { + return acknowledgeMessages; + } } diff --git a/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/health/PubSubHealthIndicatorAutoConfiguration.java b/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/health/PubSubHealthIndicatorAutoConfiguration.java index f97346b524..39291f35f9 100644 --- a/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/health/PubSubHealthIndicatorAutoConfiguration.java +++ b/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/health/PubSubHealthIndicatorAutoConfiguration.java @@ -31,8 +31,10 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.util.Assert; /** * {@link HealthContributorAutoConfiguration Auto-configuration} for @@ -40,6 +42,7 @@ * * @author Vinicius Carvalho * @author Elena Felder + * @author Patrik Hörlin * * @since 1.2.2 */ @@ -49,13 +52,31 @@ @ConditionalOnEnabledHealthIndicator("pubsub") @AutoConfigureBefore(HealthContributorAutoConfiguration.class) @AutoConfigureAfter(GcpPubSubAutoConfiguration.class) +@EnableConfigurationProperties(PubSubHealthIndicatorProperties.class) public class PubSubHealthIndicatorAutoConfiguration extends CompositeHealthContributorConfiguration { + private PubSubHealthIndicatorProperties pubSubHealthProperties; + + public PubSubHealthIndicatorAutoConfiguration(PubSubHealthIndicatorProperties pubSubHealthProperties) { + this.pubSubHealthProperties = pubSubHealthProperties; + } + @Bean @ConditionalOnMissingBean(name = { "pubSubHealthIndicator", "pubSubHealthContributor"}) public HealthContributor pubSubHealthContributor(Map pubSubTemplates) { + Assert.notNull(pubSubTemplates, "pubSubTemplates must be provided"); return createContributor(pubSubTemplates); } + @Override + protected PubSubHealthIndicator createIndicator(PubSubTemplate pubSubTemplate) { + PubSubHealthIndicator indicator = new PubSubHealthIndicator( + pubSubTemplate, + this.pubSubHealthProperties.getSubscription(), + this.pubSubHealthProperties.getTimeoutMillis(), + this.pubSubHealthProperties.isAcknowledgeMessages()); + indicator.validateHealthCheck(); + return indicator; + } } diff --git a/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/health/PubSubHealthIndicatorProperties.java b/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/health/PubSubHealthIndicatorProperties.java new file mode 100644 index 0000000000..340c007c81 --- /dev/null +++ b/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/health/PubSubHealthIndicatorProperties.java @@ -0,0 +1,67 @@ +/* + * Copyright 2017-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spring.autoconfigure.pubsub.health; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * Properties for Pub/Sub Health Indicator. + * + * @author Patrik Hörlin + */ +@ConfigurationProperties("spring.cloud.gcp.pubsub.health") +public class PubSubHealthIndicatorProperties { + + /** + * Subscription to health check against by pulling a message. + */ + private String subscription; + + /** + * Milliseconds to wait for response from Pub/Sub before timing out. + */ + private Long timeoutMillis = 1000L; + + /** + * Whether to acknowledge messages pulled from {@link #subscription}. + */ + private boolean acknowledgeMessages = false; + + public String getSubscription() { + return subscription; + } + + public void setSubscription(String subscription) { + this.subscription = subscription; + } + + public Long getTimeoutMillis() { + return timeoutMillis; + } + + public void setTimeoutMillis(Long timeoutMillis) { + this.timeoutMillis = timeoutMillis; + } + + public boolean isAcknowledgeMessages() { + return acknowledgeMessages; + } + + public void setAcknowledgeMessages(boolean acknowledgeMessages) { + this.acknowledgeMessages = acknowledgeMessages; + } +} diff --git a/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/PubSubHealthIndicatorTests.java b/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/PubSubHealthIndicatorTests.java deleted file mode 100644 index 79d08300a1..0000000000 --- a/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/PubSubHealthIndicatorTests.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright 2019-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.spring.autoconfigure.pubsub; - -import com.google.api.gax.grpc.GrpcStatusCode; -import com.google.api.gax.rpc.ApiException; -import com.google.cloud.spring.autoconfigure.pubsub.health.PubSubHealthIndicator; -import com.google.cloud.spring.pubsub.core.PubSubTemplate; -import io.grpc.Status.Code; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -import org.springframework.boot.actuate.health.Status; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.when; - -/** - * Tests for the PubSub Health Indicator. - * - * @author Vinicius Carvalho - */ -@RunWith(MockitoJUnitRunner.class) -public class PubSubHealthIndicatorTests { - - @Mock - private PubSubTemplate pubSubTemplate; - - @Test - public void healthUpFor404() throws Exception { - when(pubSubTemplate.pull(anyString(), anyInt(), anyBoolean())).thenThrow(new ApiException( - new IllegalStateException("Illegal State"), GrpcStatusCode.of(io.grpc.Status.Code.NOT_FOUND), false)); - PubSubHealthIndicator healthIndicator = new PubSubHealthIndicator(pubSubTemplate); - assertThat(healthIndicator.health().getStatus()).isEqualTo(Status.UP); - } - - @Test - public void healthUpFor403() throws Exception { - when(pubSubTemplate.pull(anyString(), anyInt(), anyBoolean())).thenThrow(new ApiException( - new IllegalStateException("Illegal State"), GrpcStatusCode.of(Code.PERMISSION_DENIED), false)); - PubSubHealthIndicator healthIndicator = new PubSubHealthIndicator(pubSubTemplate); - assertThat(healthIndicator.health().getStatus()).isEqualTo(Status.UP); - } - - @Test - public void healthDown() { - when(pubSubTemplate.pull(anyString(), anyInt(), anyBoolean())) - .thenThrow(new ApiException(new IllegalStateException("Illegal State"), - GrpcStatusCode.of(io.grpc.Status.Code.INVALID_ARGUMENT), false)); - PubSubHealthIndicator healthIndicator = new PubSubHealthIndicator(pubSubTemplate); - assertThat(healthIndicator.health().getStatus()).isEqualTo(Status.DOWN); - } - - @Test - public void healthDownGenericException() { - when(pubSubTemplate.pull(anyString(), anyInt(), anyBoolean())) - .thenThrow(new IllegalStateException("Illegal State")); - PubSubHealthIndicator healthIndicator = new PubSubHealthIndicator(pubSubTemplate); - assertThat(healthIndicator.health().getStatus()).isEqualTo(Status.DOWN); - } - -} diff --git a/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/health/PubSubHealthIndicatorAutoConfigurationTests.java b/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/health/PubSubHealthIndicatorAutoConfigurationTests.java index 211392a2cb..53f743610d 100644 --- a/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/health/PubSubHealthIndicatorAutoConfigurationTests.java +++ b/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/health/PubSubHealthIndicatorAutoConfigurationTests.java @@ -16,67 +16,211 @@ package com.google.cloud.spring.autoconfigure.pubsub.health; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.api.gax.rpc.ApiException; import com.google.auth.Credentials; import com.google.cloud.spring.autoconfigure.pubsub.GcpPubSubAutoConfiguration; import com.google.cloud.spring.core.GcpProjectIdProvider; import com.google.cloud.spring.pubsub.core.PubSubTemplate; +import com.google.cloud.spring.pubsub.support.AcknowledgeablePubsubMessage; import org.junit.Test; +import org.springframework.beans.factory.BeanInitializationException; import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.boot.actuate.health.CompositeHealthContributor; -import org.springframework.boot.actuate.health.NamedContributor; import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.util.concurrent.ListenableFuture; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Tests for Pub/Sub Health Indicator autoconfiguration. * * @author Elena Felder + * @author Patrik Hörlin */ public class PubSubHealthIndicatorAutoConfigurationTests { + private static final Pattern UUID_PATTERN = + Pattern.compile("spring-cloud-gcp-healthcheck-[a-f0-9]{8}(-[a-f0-9]{4}){4}[a-f0-9]{8}"); + private ApplicationContextRunner baseContextRunner = new ApplicationContextRunner() .withConfiguration(AutoConfigurations.of(PubSubHealthIndicatorAutoConfiguration.class, GcpPubSubAutoConfiguration.class)) .withBean(GcpProjectIdProvider.class, () -> () -> "fake project") .withBean(CredentialsProvider.class, () -> () -> mock(Credentials.class)); + @SuppressWarnings("unchecked") @Test - public void healthIndicatorPresent() { + public void healthIndicatorPresent_defaults() throws Exception { + PubSubTemplate mockPubSubTemplate = mock(PubSubTemplate.class); + ListenableFuture> future = mock(ListenableFuture.class); + + when(future.get(anyLong(), any())).thenReturn(Collections.emptyList()); + when(mockPubSubTemplate.pullAsync(anyString(), anyInt(), anyBoolean())).thenReturn(future); + this.baseContextRunner - .withPropertyValues("management.health.pubsub.enabled=true") + .withBean("pubSubTemplate", PubSubTemplate.class, () -> mockPubSubTemplate) .run(ctx -> { PubSubHealthIndicator healthIndicator = ctx.getBean(PubSubHealthIndicator.class); assertThat(healthIndicator).isNotNull(); + assertThat(healthIndicator.getSubscription()).matches(UUID_PATTERN); + assertThat(healthIndicator.getTimeoutMillis()).isEqualTo(1000); + assertThat(healthIndicator.isAcknowledgeMessages()).isFalse(); + assertThat(healthIndicator.isSpecifiedSubscription()).isFalse(); + verify(mockPubSubTemplate).pullAsync(healthIndicator.getSubscription(), 1, true); + verify(future).get(healthIndicator.getTimeoutMillis(), TimeUnit.MILLISECONDS); }); } + @SuppressWarnings("unchecked") @Test - public void compositeHealthIndicatorPresentMultiplePubSubTemplate() { + public void healthIndicatorPresent_customConfig() throws Exception { + PubSubTemplate mockPubSubTemplate = mock(PubSubTemplate.class); + ListenableFuture> future = mock(ListenableFuture.class); + + when(future.get(anyLong(), any())).thenReturn(Collections.emptyList()); + when(mockPubSubTemplate.pullAsync(anyString(), anyInt(), anyBoolean())).thenReturn(future); + + this.baseContextRunner + .withBean("pubSubTemplate", PubSubTemplate.class, () -> mockPubSubTemplate) + .withPropertyValues( + "management.health.pubsub.enabled=true", + "spring.cloud.gcp.pubsub.health.subscription=test", + "spring.cloud.gcp.pubsub.health.timeout-millis=1500", + "spring.cloud.gcp.pubsub.health.acknowledgeMessages=true") + .run(ctx -> { + PubSubHealthIndicator healthIndicator = ctx.getBean(PubSubHealthIndicator.class); + assertThat(healthIndicator).isNotNull(); + assertThat(healthIndicator.getSubscription()).isEqualTo("test"); + assertThat(healthIndicator.getTimeoutMillis()).isEqualTo(1500); + assertThat(healthIndicator.isAcknowledgeMessages()).isTrue(); + assertThat(healthIndicator.isSpecifiedSubscription()).isTrue(); + verify(mockPubSubTemplate).pullAsync(healthIndicator.getSubscription(), 1, true); + verify(future).get(healthIndicator.getTimeoutMillis(), TimeUnit.MILLISECONDS); + }); + } + + @SuppressWarnings("unchecked") + @Test + public void compositeHealthIndicatorPresentMultiplePubSubTemplate() throws Exception { PubSubTemplate mockPubSubTemplate1 = mock(PubSubTemplate.class); PubSubTemplate mockPubSubTemplate2 = mock(PubSubTemplate.class); + ListenableFuture> future = mock(ListenableFuture.class); + + when(future.get(anyLong(), any())).thenReturn(Collections.emptyList()); + when(mockPubSubTemplate1.pullAsync(anyString(), anyInt(), anyBoolean())).thenReturn(future); + when(mockPubSubTemplate2.pullAsync(anyString(), anyInt(), anyBoolean())).thenReturn(future); this.baseContextRunner .withBean("pubSubTemplate1", PubSubTemplate.class, () -> mockPubSubTemplate1) .withBean("pubSubTemplate2", PubSubTemplate.class, () -> mockPubSubTemplate2) - .withPropertyValues("management.health.pubsub.enabled=true") + .withPropertyValues( + "management.health.pubsub.enabled=true", + "spring.cloud.gcp.pubsub.health.subscription=test", + "spring.cloud.gcp.pubsub.health.timeout-millis=1500", + "spring.cloud.gcp.pubsub.health.acknowledgeMessages=true") .run(ctx -> { assertThatThrownBy(() -> ctx.getBean(PubSubHealthIndicator.class)) .isInstanceOf(NoSuchBeanDefinitionException.class); CompositeHealthContributor healthContributor = ctx.getBean("pubSubHealthContributor", CompositeHealthContributor.class); assertThat(healthContributor).isNotNull(); assertThat(healthContributor.stream()).hasSize(2); - healthContributor.stream().forEach(System.out::println); - assertThat(healthContributor.stream().map(c -> ((NamedContributor) c).getName())) + assertThat(healthContributor.stream().map(c -> c.getName())) .containsExactlyInAnyOrder("pubSubTemplate1", "pubSubTemplate2"); }); } + @SuppressWarnings("unchecked") + @Test + public void apiExceptionWhenValidating_userSubscriptionSpecified_healthAutoConfigurationFails() throws Exception { + PubSubHealthIndicatorProperties properties = new PubSubHealthIndicatorProperties(); + PubSubHealthIndicatorAutoConfiguration p = new PubSubHealthIndicatorAutoConfiguration(properties); + properties.setSubscription("test"); + + PubSubTemplate mockPubSubTemplate = mock(PubSubTemplate.class); + ListenableFuture> future = mock(ListenableFuture.class); + Exception e = new ApiException(new IllegalStateException("Illegal State"), GrpcStatusCode.of(io.grpc.Status.Code.NOT_FOUND), false); + + when(mockPubSubTemplate.pullAsync(anyString(), anyInt(), anyBoolean())).thenReturn(future); + doThrow(new ExecutionException(e)).when(future).get(anyLong(), any()); + + Map pubSubTemplates = Collections.singletonMap("pubSubTemplate", mockPubSubTemplate); + assertThatThrownBy(() -> p.pubSubHealthContributor(pubSubTemplates)) + .isInstanceOf(BeanInitializationException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void apiExceptionWhenValidating_userSubscriptionNotSpecified_healthAutoConfigurationSucceeds() throws Exception { + PubSubHealthIndicatorProperties properties = new PubSubHealthIndicatorProperties(); + PubSubHealthIndicatorAutoConfiguration p = new PubSubHealthIndicatorAutoConfiguration(properties); + + PubSubTemplate mockPubSubTemplate = mock(PubSubTemplate.class); + ListenableFuture> future = mock(ListenableFuture.class); + Exception e = new ApiException(new IllegalStateException("Illegal State"), GrpcStatusCode.of(io.grpc.Status.Code.NOT_FOUND), false); + + when(mockPubSubTemplate.pullAsync(anyString(), anyInt(), anyBoolean())).thenReturn(future); + doThrow(new ExecutionException(e)).when(future).get(anyLong(), any()); + + Map pubSubTemplates = Collections.singletonMap("pubSubTemplate", mockPubSubTemplate); + assertThatCode(() -> p.pubSubHealthContributor(pubSubTemplates)).doesNotThrowAnyException(); + } + + @SuppressWarnings("unchecked") + @Test + public void runtimeExceptionWhenValidating_healthAutoConfigurationFails() throws Exception { + PubSubHealthIndicatorProperties properties = new PubSubHealthIndicatorProperties(); + PubSubHealthIndicatorAutoConfiguration p = new PubSubHealthIndicatorAutoConfiguration(properties); + + PubSubTemplate mockPubSubTemplate = mock(PubSubTemplate.class); + ListenableFuture> future = mock(ListenableFuture.class); + Exception e = new RuntimeException("Runtime exception"); + + when(mockPubSubTemplate.pullAsync(anyString(), anyInt(), anyBoolean())).thenReturn(future); + doThrow(e).when(future).get(anyLong(), any()); + + Map pubSubTemplates = Collections.singletonMap("pubSubTemplate", mockPubSubTemplate); + assertThatThrownBy(() -> p.pubSubHealthContributor(pubSubTemplates)).isInstanceOf(BeanInitializationException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void interruptedExceptionWhenValidating_healthAutoConfigurationFails() throws Exception { + PubSubHealthIndicatorProperties properties = new PubSubHealthIndicatorProperties(); + PubSubHealthIndicatorAutoConfiguration p = new PubSubHealthIndicatorAutoConfiguration(properties); + + PubSubTemplate mockPubSubTemplate = mock(PubSubTemplate.class); + ListenableFuture> future = mock(ListenableFuture.class); + InterruptedException e = new InterruptedException("interrupted"); + + when(mockPubSubTemplate.pullAsync(anyString(), anyInt(), anyBoolean())).thenReturn(future); + doThrow(e).when(future).get(anyLong(), any()); + + Map pubSubTemplates = Collections.singletonMap("pubSubTemplate", mockPubSubTemplate); + assertThatThrownBy(() -> p.pubSubHealthContributor(pubSubTemplates)).isInstanceOf(BeanInitializationException.class); + } + @Test public void healthCheckConfigurationBacksOffWhenHealthIndicatorBeanPresent() { PubSubHealthIndicator userHealthIndicator = mock(PubSubHealthIndicator.class); @@ -103,5 +247,4 @@ public void healthIndicatorDisabledWhenPubSubTurnedOff() { assertThat(ctx.getBeansOfType(PubSubHealthIndicator.class)).isEmpty(); }); } - } diff --git a/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/health/PubSubHealthIndicatorTests.java b/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/health/PubSubHealthIndicatorTests.java new file mode 100644 index 0000000000..0fda2a41d9 --- /dev/null +++ b/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/health/PubSubHealthIndicatorTests.java @@ -0,0 +1,169 @@ +/* + * Copyright 2019-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spring.autoconfigure.pubsub.health; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.spring.pubsub.core.PubSubTemplate; +import com.google.cloud.spring.pubsub.support.AcknowledgeablePubsubMessage; +import io.grpc.Status.Code; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import org.springframework.beans.factory.BeanInitializationException; +import org.springframework.boot.actuate.health.Status; +import org.springframework.util.concurrent.ListenableFuture; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests for the Pub/Sub Health Indicator. + * + * @author Vinicius Carvalho + * @author Patrik Hörlin + */ +@ExtendWith(MockitoExtension.class) +class PubSubHealthIndicatorTests { + + @Mock + private PubSubTemplate pubSubTemplate; + + @Mock + ListenableFuture> future; + + @Test + void healthUp_customSubscription() throws Exception { + when(future.get(anyLong(), any())).thenReturn(Collections.emptyList()); + when(pubSubTemplate.pullAsync(anyString(), anyInt(), anyBoolean())).thenReturn(future); + + PubSubHealthIndicator healthIndicator = new PubSubHealthIndicator(pubSubTemplate, "test", 1000, true); + assertThat(healthIndicator.health().getStatus()).isEqualTo(Status.UP); + } + + @Test + void acknowledgeEnabled() throws Exception { + AcknowledgeablePubsubMessage msg = mock(AcknowledgeablePubsubMessage.class); + when(future.get(anyLong(), any())).thenReturn(Arrays.asList(msg)); + when(pubSubTemplate.pullAsync(anyString(), anyInt(), anyBoolean())).thenReturn(future); + + PubSubHealthIndicator healthIndicator = new PubSubHealthIndicator(pubSubTemplate, "test", 1000, true); + healthIndicator.health(); + verify(msg).ack(); + } + + @Test + void acknowledgeDisabled() throws Exception { + AcknowledgeablePubsubMessage msg = mock(AcknowledgeablePubsubMessage.class); + when(future.get(anyLong(), any())).thenReturn(Arrays.asList(msg)); + when(pubSubTemplate.pullAsync(anyString(), anyInt(), anyBoolean())).thenReturn(future); + + PubSubHealthIndicator healthIndicator = new PubSubHealthIndicator(pubSubTemplate, "test", 1000, false); + healthIndicator.health(); + verify(msg, never()).ack(); + } + + void testHealth(Exception e, String customSubscription, Status expectedStatus) throws Exception { + when(pubSubTemplate.pullAsync(anyString(), anyInt(), anyBoolean())).thenReturn(future); + doThrow(e).when(future).get(anyLong(), any()); + + PubSubHealthIndicator healthIndicator = new PubSubHealthIndicator(pubSubTemplate, customSubscription, 1000, true); + assertThat(healthIndicator.health().getStatus()).isEqualTo(expectedStatus); + } + + @ParameterizedTest + @ValueSource(strings = {"NOT_FOUND", "PERMISSION_DENIED"}) + void randomSubscription_expectedError(String code) throws Exception { + Exception e = new ExecutionException( + new ApiException(null, GrpcStatusCode.of(io.grpc.Status.Code.valueOf(code)), false)); + testHealth(e, null, Status.UP); + } + + @Test + void randomSubscription_unexpectedErrorCode() throws Exception { + Exception e = new ExecutionException( + new ApiException(null, GrpcStatusCode.of(Code.ABORTED), false)); + testHealth(e, null, Status.DOWN); + } + + @Test + void randomSubscription_NotApiExcpetion() throws Exception { + ExecutionException e = new ExecutionException("Exception", new IllegalArgumentException()); + testHealth(e, null, Status.DOWN); + } + + @ParameterizedTest + @ValueSource(strings = {"NOT_FOUND", "PERMISSION_DENIED"}) + void customSubscription_ApiException(String code) throws Exception { + Exception e = new ExecutionException( + new ApiException(null, GrpcStatusCode.of(io.grpc.Status.Code.valueOf(code)), false)); + testHealth(e, "testSubscription", Status.DOWN); + } + + @Test + void customSubscription_ExecutionException_NotApiException() throws Exception { + ExecutionException e = new ExecutionException("Exception", new IllegalArgumentException()); + testHealth(e, "testSubscription", Status.DOWN); + } + + @Test + void customSubscription_InterruptedException() throws Exception { + Exception e = new InterruptedException("Interrupted"); + testHealth(e, "testSubscription", Status.UNKNOWN); + } + + @Test + void customSubscription_TimeoutException() throws Exception { + Exception e = new TimeoutException("Timed out waiting for result"); + testHealth(e, "testSubscription", Status.UNKNOWN); + } + + @Test + void customSubscription_RuntimeException() throws Exception { + Exception e = new RuntimeException("Runtime error"); + testHealth(e, "testSubscription", Status.DOWN); + } + + @Test + void validateHealth() throws Exception { + doThrow(new RuntimeException()).when(future).get(anyLong(), any()); + when(pubSubTemplate.pullAsync(anyString(), anyInt(), anyBoolean())).thenReturn(future); + + PubSubHealthIndicator healthIndicator = new PubSubHealthIndicator(pubSubTemplate, "test", 1000, true); + assertThatThrownBy(() -> healthIndicator.validateHealthCheck()).isInstanceOf(BeanInitializationException.class); + } +}