Skip to content

Commit

Permalink
Support custom subscription name for Pub/Sub health check (GoogleClou…
Browse files Browse the repository at this point in the history
…dPlatform#330)

Fixes: GoogleCloudPlatform#236.

Co-authored-by: Mike Eltsufin <meltsufin@google.com>
  • Loading branch information
2 people authored and GitHub committed Mar 24, 2021
1 parent 99c67f3 commit 512c476
Show file tree
Hide file tree
Showing 7 changed files with 555 additions and 102 deletions.
17 changes: 17 additions & 0 deletions docs/src/main/asciidoc/pubsub.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,23 @@ The `pubsub` indicator will then roll up to the overall application status visib
NOTE: If your application already has actuator and Cloud Pub/Sub starters, this health indicator is enabled by default.
To disable the Cloud Pub/Sub indicator, set `management.health.pubsub.enabled` to `false`.

The health indicator validates the connection to Pub/Sub by pulling messages from a Pub/Sub subscription.

If no subscription has been specified via `spring.cloud.gcp.pubsub.health.subscription`, it will pull messages from a random subscription that is expected not to exist.
It will signal "up" if it is able to connect to GCP Pub/Sub APIs, i.e. the pull results in a response of `NOT_FOUND` or `PERMISSION_DENIED`.

If a custom subscription has been specified, this health indicator will signal "up" if messages are successfully pulled and (optionally) acknowledged, or when a successful pull is performed but no messages are returned from Pub/Sub.
Note that messages pulled from the subscription will not be acknowledged, unless you set the `spring.cloud.gcp.pubsub.health.acknowledge-messages` option to `true`.
So, take care not to configure a subscription that has a business impact, or instead leave the custom subscription out completely.

|===
| Name | Description | Required | Default value
| `management.health.pubsub.enabled` | Whether to enable the Pub/Sub health indicator | No | `true` with Spring Boot Actuator, `false` otherwise
| `spring.cloud.gcp.pubsub.health.subscription` | Subscription to health check against by pulling a message | No | Random non-existent
| `spring.cloud.gcp.pubsub.health.timeout-millis` | Milliseconds to wait for response from Pub/Sub before timing out | No | `1000`
| `spring.cloud.gcp.pubsub.health.acknowledge-messages` | Whether to acknowledge messages pulled from the optionally specified subscription | No | `false`
|===

=== Pub/Sub Operations & Template

`PubSubOperations` is an abstraction that allows Spring users to use Google Cloud Pub/Sub without depending on any Google Cloud Pub/Sub API semantics.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,55 +16,172 @@

package com.google.cloud.spring.autoconfigure.pubsub.health;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.support.AcknowledgeablePubsubMessage;

import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFuture;

/**
* Default implemenation of
* Default implementation of
* {@link org.springframework.boot.actuate.health.HealthIndicator} for Pub/Sub. Validates
* if connection is successful by looking for a random generated subscription name that
* won't be found. If no subscription is found we know the client is able to connect to
* GCP Pub/Sub APIs.
* if connection is successful by pulling messages from the pubSubTemplate using
* {@link PubSubTemplate#pullAsync(String, Integer, Boolean)}.
*
* <p>If a custom subscription has been specified, this health indicator will signal "up"
* if messages are successfully pulled and (optionally) acknowledged <b>or</b> if a
* successful pull is performed but no messages are returned from Pub/Sub.</p>
*
* <p>If no subscription has been specified, this health indicator will pull messages from a random subscription
* that is expected not to exist. It will signal "up" if it is able to connect to GCP Pub/Sub APIs,
* i.e. the pull results in a response of {@link StatusCode.Code#NOT_FOUND} or
* {@link StatusCode.Code#PERMISSION_DENIED}.</p>
*
* <p>Note that messages pulled from the subscription will not be acknowledged, unless you
* set the {@code acknowledgeMessages} option to "true". However, take care not to configure
* a subscription that has a business impact, or leave the custom subscription out completely.
*
* @author Vinicius Carvalho
* @author Patrik Hörlin
*
* @since 1.2.2
*/
public class PubSubHealthIndicator extends AbstractHealthIndicator {

/**
* Template used when performing health check calls.
*/
private final PubSubTemplate pubSubTemplate;

public PubSubHealthIndicator(PubSubTemplate pubSubTemplate) {
/**
* Indicates whether a user subscription has been configured.
*/
private final boolean specifiedSubscription;

/**
* Subscription used when health checking.
*/
private final String subscription;

/**
* Timeout when performing health check.
*/
private final long timeoutMillis;

/**
* Whether pulled messages should be acknowledged.
*/
private final boolean acknowledgeMessages;

public PubSubHealthIndicator(PubSubTemplate pubSubTemplate, String healthCheckSubscription, long timeoutMillis, boolean acknowledgeMessages) {
super("Failed to connect to Pub/Sub APIs. Check your credentials and verify you have proper access to the service.");
Assert.notNull(pubSubTemplate, "PubSubTemplate can't be null");
Assert.notNull(pubSubTemplate, "pubSubTemplate can't be null");
this.pubSubTemplate = pubSubTemplate;
this.specifiedSubscription = StringUtils.hasText(healthCheckSubscription);
if (this.specifiedSubscription) {
this.subscription = healthCheckSubscription;
}
else {
this.subscription = "spring-cloud-gcp-healthcheck-" + UUID.randomUUID().toString();
}
this.timeoutMillis = timeoutMillis;
this.acknowledgeMessages = acknowledgeMessages;
}

void validateHealthCheck() {
doHealthCheck(
() -> { },
this::validationFailed,
this::validationFailed);
}

@Override
protected void doHealthCheck(Health.Builder builder) {
doHealthCheck(
builder::up,
builder::down,
e -> builder.withException(e).unknown());
}

private void doHealthCheck(Runnable up, Consumer<Throwable> down, Consumer<Throwable> unknown) {
try {
this.pubSubTemplate.pull("subscription-" + UUID.randomUUID().toString(), 1, true);
pullMessage();
up.run();
}
catch (ApiException aex) {
Code errorCode = aex.getStatusCode().getCode();
if (errorCode == StatusCode.Code.NOT_FOUND || errorCode == Code.PERMISSION_DENIED) {
builder.up();
catch (InterruptedException e) {
Thread.currentThread().interrupt();
unknown.accept(e);
}
catch (ExecutionException e) {
if (isHealthyException(e)) {
// ignore expected exceptions
up.run();
}
else {
builder.withException(aex).down();
down.accept(e);
}
}
catch (TimeoutException e) {
unknown.accept(e);
}
catch (Exception e) {
builder.withException(e).down();
down.accept(e);
}
}

private void pullMessage() throws InterruptedException, ExecutionException, TimeoutException {
ListenableFuture<List<AcknowledgeablePubsubMessage>> future = pubSubTemplate.pullAsync(this.subscription, 1, true);
List<AcknowledgeablePubsubMessage> messages = future.get(timeoutMillis, TimeUnit.MILLISECONDS);
if (this.acknowledgeMessages) {
messages.forEach(AcknowledgeablePubsubMessage::ack);
}
}

boolean isHealthyException(ExecutionException e) {
return !this.specifiedSubscription && isHealthyResponseForUnspecifiedSubscription(e);
}

private boolean isHealthyResponseForUnspecifiedSubscription(ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof ApiException) {
ApiException aex = (ApiException) t;
Code errorCode = aex.getStatusCode().getCode();
return errorCode == StatusCode.Code.NOT_FOUND || errorCode == Code.PERMISSION_DENIED;
}
return false;
}

private void validationFailed(Throwable e) {
throw new BeanInitializationException("Validation of health indicator failed", e);
}

boolean isSpecifiedSubscription() {
return specifiedSubscription;
}

String getSubscription() {
return subscription;
}

long getTimeoutMillis() {
return timeoutMillis;
}

boolean isAcknowledgeMessages() {
return acknowledgeMessages;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,18 @@
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.Assert;

/**
* {@link HealthContributorAutoConfiguration Auto-configuration} for
* {@link PubSubHealthIndicator}.
*
* @author Vinicius Carvalho
* @author Elena Felder
* @author Patrik Hörlin
*
* @since 1.2.2
*/
Expand All @@ -49,13 +52,31 @@
@ConditionalOnEnabledHealthIndicator("pubsub")
@AutoConfigureBefore(HealthContributorAutoConfiguration.class)
@AutoConfigureAfter(GcpPubSubAutoConfiguration.class)
@EnableConfigurationProperties(PubSubHealthIndicatorProperties.class)
public class PubSubHealthIndicatorAutoConfiguration extends
CompositeHealthContributorConfiguration<PubSubHealthIndicator, PubSubTemplate> {

private PubSubHealthIndicatorProperties pubSubHealthProperties;

public PubSubHealthIndicatorAutoConfiguration(PubSubHealthIndicatorProperties pubSubHealthProperties) {
this.pubSubHealthProperties = pubSubHealthProperties;
}

@Bean
@ConditionalOnMissingBean(name = { "pubSubHealthIndicator", "pubSubHealthContributor"})
public HealthContributor pubSubHealthContributor(Map<String, PubSubTemplate> pubSubTemplates) {
Assert.notNull(pubSubTemplates, "pubSubTemplates must be provided");
return createContributor(pubSubTemplates);
}

@Override
protected PubSubHealthIndicator createIndicator(PubSubTemplate pubSubTemplate) {
PubSubHealthIndicator indicator = new PubSubHealthIndicator(
pubSubTemplate,
this.pubSubHealthProperties.getSubscription(),
this.pubSubHealthProperties.getTimeoutMillis(),
this.pubSubHealthProperties.isAcknowledgeMessages());
indicator.validateHealthCheck();
return indicator;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2017-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spring.autoconfigure.pubsub.health;

import org.springframework.boot.context.properties.ConfigurationProperties;

/**
* Properties for Pub/Sub Health Indicator.
*
* @author Patrik Hörlin
*/
@ConfigurationProperties("spring.cloud.gcp.pubsub.health")
public class PubSubHealthIndicatorProperties {

/**
* Subscription to health check against by pulling a message.
*/
private String subscription;

/**
* Milliseconds to wait for response from Pub/Sub before timing out.
*/
private Long timeoutMillis = 1000L;

/**
* Whether to acknowledge messages pulled from {@link #subscription}.
*/
private boolean acknowledgeMessages = false;

public String getSubscription() {
return subscription;
}

public void setSubscription(String subscription) {
this.subscription = subscription;
}

public Long getTimeoutMillis() {
return timeoutMillis;
}

public void setTimeoutMillis(Long timeoutMillis) {
this.timeoutMillis = timeoutMillis;
}

public boolean isAcknowledgeMessages() {
return acknowledgeMessages;
}

public void setAcknowledgeMessages(boolean acknowledgeMessages) {
this.acknowledgeMessages = acknowledgeMessages;
}
}
Loading

0 comments on commit 512c476

Please sign in to comment.