Skip to content
This repository has been archived by the owner on Jan 19, 2022. It is now read-only.

Commit

Permalink
Make Pub/Sub integration tests less flaky (#2546)
Browse files Browse the repository at this point in the history
* Make PubTemplateIntegrationTests less flaky

* Make ReactiveReceiverApplicationTest less flaky
  • Loading branch information
meltsufin authored and Travis Tomsu committed Oct 9, 2020
1 parent e91787a commit 0b39de5
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import com.fasterxml.jackson.annotation.JsonCreator;
Expand All @@ -44,6 +45,7 @@
import com.google.pubsub.v1.PubsubMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.junit.BeforeClass;
import org.junit.Test;
Expand Down Expand Up @@ -99,7 +101,15 @@ public void testCreatePublishPullNextAndDelete() {
headers.put("cactuar", "tonberry");
headers.put("fujin", "raijin");
pubSubTemplate.publish(topicName, "tatatatata", headers).get();
PubsubMessage pubsubMessage = pubSubTemplate.pullNext(subscriptionName);

// get message
AtomicReference<PubsubMessage> pubsubMessageRef = new AtomicReference<>();
Awaitility.await().atMost(30, TimeUnit.SECONDS).until(
() -> {
pubsubMessageRef.set(pubSubTemplate.pullNext(subscriptionName));
return pubsubMessageRef.get() != null;
});
PubsubMessage pubsubMessage = pubsubMessageRef.get();

assertThat(pubsubMessage).isNotNull();
assertThat(pubsubMessage.getData()).isEqualTo(ByteString.copyFromUtf8("tatatatata"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
Expand Down Expand Up @@ -60,6 +61,9 @@ public class ReactiveReceiverApplicationIntegrationTest {
@Autowired
private TestRestTemplate testRestTemplate;

@Autowired
private PubSubTemplate pubSubTemplate;

@BeforeClass
public static void prepare() {
assumeThat("PUB/SUB-sample integration tests disabled. Use '-Dit.pubsub=true' to enable them.",
Expand All @@ -68,6 +72,11 @@ public static void prepare() {

@Test
public void testSample() throws UnsupportedEncodingException {
// clear any old messages
System.out.println("Cleared " +
pubSubTemplate.pullAndAck("exampleSubscription", 1000, true).size() +
" old messages");

String messagePostingUrl = UriComponentsBuilder.newInstance()
.scheme("http")
.host("localhost")
Expand All @@ -92,7 +101,7 @@ public void testSample() throws UnsupportedEncodingException {
.retrieve()
.bodyToFlux(String.class)
.limitRequest(2)
.collectList().block(Duration.ofSeconds(10));
.collectList().block(Duration.ofSeconds(20));

assertThat(streamedMessages).containsExactlyInAnyOrder("reactive test msg 0", "reactive test msg 1");
}
Expand Down

0 comments on commit 0b39de5

Please sign in to comment.