diff --git a/spring-cloud-gcp-autoconfigure/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/it/PubSubTemplateIntegrationTests.java b/spring-cloud-gcp-autoconfigure/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/it/PubSubTemplateIntegrationTests.java index a7a36b13a4..7207d3ec33 100644 --- a/spring-cloud-gcp-autoconfigure/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/it/PubSubTemplateIntegrationTests.java +++ b/spring-cloud-gcp-autoconfigure/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/it/PubSubTemplateIntegrationTests.java @@ -27,6 +27,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import com.fasterxml.jackson.annotation.JsonCreator; @@ -34,6 +35,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.awaitility.Awaitility; import org.awaitility.Duration; import org.junit.BeforeClass; import org.junit.Test; @@ -62,9 +66,12 @@ * @author Chengyuan Zhao * @author Dmitry Solomakha * @author Daniel Zou + * @author Mike Eltsufin */ public class PubSubTemplateIntegrationTests { + private static final Log LOGGER = LogFactory.getLog(PubSubTemplateIntegrationTests.class); + private ApplicationContextRunner contextRunner = new ApplicationContextRunner() .withPropertyValues("spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period=0") .withConfiguration(AutoConfigurations.of(GcpContextAutoConfiguration.class, @@ -94,7 +101,15 @@ public void testCreatePublishPullNextAndDelete() { headers.put("cactuar", "tonberry"); headers.put("fujin", "raijin"); pubSubTemplate.publish(topicName, "tatatatata", headers).get(); - PubsubMessage pubsubMessage = pubSubTemplate.pullNext(subscriptionName); + + // get message + AtomicReference pubsubMessageRef = new AtomicReference<>(); + Awaitility.await().atMost(30, TimeUnit.SECONDS).until( + () -> { + pubsubMessageRef.set(pubSubTemplate.pullNext(subscriptionName)); + return pubsubMessageRef.get() != null; + }); + PubsubMessage pubsubMessage = pubsubMessageRef.get(); assertThat(pubsubMessage.getData()).isEqualTo(ByteString.copyFromUtf8("tatatatata")); assertThat(pubsubMessage.getAttributesCount()).isEqualTo(2); @@ -143,7 +158,8 @@ public void testPullAndAck() { f.get(5, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException ex) { - ex.printStackTrace(); + LOGGER.error(ex); + Thread.currentThread().interrupt(); } });