This repository has been archived by the owner on Jan 19, 2022. It is now read-only.

[PubSub] Messages stuck in the queue and not being pulled by listener #1005

Closed
ramzimaalej opened this issue Sep 10, 2018 · 53 comments
Labels
pubsub GCP PubSub

Comments

@ramzimaalej

I have a Spring Boot application (v2.0.3.RELEASE) that uses spring-cloud-pubsub (1.0.0.RELEASE) and is deployed on GKE. I noticed that after some period of inactivity the listener no longer receives messages from Pub/Sub, and messages get stuck in the queue for a long time (sometimes more than an hour). I tried increasing the parallel-pull-count, but no luck. As soon as I deploy the service the messages get acknowledged, and then they start piling up in the queue again. I have researched this problem, and some people think that the connection is dropped by GKE due to inactivity. I have also tried spring-cloud-stream-pubsub but got the same behavior. Any ideas on how to get around this?


@ramzimaalej ramzimaalej changed the title Messages stuck in the queue and not being pulled by listener [PubSub] Messages stuck in the queue and not being pulled by listener Sep 10, 2018
@meltsufin
Contributor

meltsufin commented Sep 10, 2018

Are you using our spring-cloud-gcp-starter-pubsub? I think you might be experiencing the automatic ack deadline extension that the Pub/Sub client library applies. Our starter sets it to 0s, but the default in the client library is 60 seconds. What might happen is that your application starts up and receives many messages into a local queue but is unable to process them. The client library, in the meantime, extends the ack deadline for all those unprocessed messages, even though your app is not processing them for some reason. Furthermore, since the ack deadline is extended, they do not go back on the queue on the Pub/Sub server to be redelivered to other nodes (or to the same node if it got restarted). See spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period in our docs, and the message flow control section in the GCP docs. I hope this helps.

@ChengyuanZhao ChengyuanZhao added the pubsub GCP PubSub label Sep 10, 2018
@ramzimaalej
Author

I have noticed this issue with spring-cloud-gcp-pubsub-stream-binder and spring-cloud-gcp-starter-pubsub as well. Actually, the app processes the message fast, and this behavior is exhibited after some idle period. I will look at message flow control and get back to you with more details.

@meltsufin
Contributor

Is it possible that the processed messages are just not being acknowledged? I believe we have auto-acking on in the Stream Binder though.

@ramzimaalej
Author

Auto-acking is working fine in the stream binder, and we are acking the messages manually with pubsub-starter. I'm going to try to just listen and ack the messages to see if this happens as well. After some reading, I found that there was an issue with long-lived connections in GKE where connections were closed due to a TCP keep-alive timeout. But there was a fix for that; more information can be found here: googleapis/nodejs-pubsub#11

@danvalencia

I've been having the same issue. The app is on Spring Boot 2.0.4.RELEASE and spring-cloud-gcp 1.0.0.RELEASE, also on GKE, and I have 4 pods running, on a 3 node K8S cluster.

The behavior I'm seeing is that on startup it processes old messages, and new ones as well, with minimal latency. However, after some time, it just stops processing messages. I can see that the messages are sitting there (sometimes for an hour or more) in the queue via gcloud alpha pubsub subscriptions pull command. For some reason they're not being picked up.

I'm using the Spring Integration channel adapter. I tried removing Spring Integration and using only spring-cloud-gcp-starter-pubsub directly, but I still see the same issue. I'm doing manual ACKing.

The only odd thing I see is the c.g.c.p.v.StreamingSubscriberConnection : stream closed with retryable exception; will reconnect message. I've seen this before though, and I know it's probably just excessive DEBUG logging. Otherwise the logs are silent, and it's hard to understand what's going on.

@danvalencia

Another thing I'd like to point out is that we have any number of short lived python processes that communicate via pubsub, and we haven't seen problems in those, so it definitely seems like an issue with long lived processes.

@dinvlad

dinvlad commented Sep 11, 2018

@danvalencia per your post in GCP Slack, we observe exactly the same behavior with "bare-bones" Spring and google-cloud-java. So the good news is that it may not be an issue with spring-cloud-gcp itself; the bad news is that it's still largely unresolved. We've communicated with GCP support but so far haven't arrived at any solutions, only workarounds (e.g. restarting the subscriber every 5 minutes).

What I'd suggest (if possible) is to use google-cloud-java library directly and compare.

EDIT: just to reiterate what was mentioned above, the issue is not with ACKs, because we don't even receive the messages in the first place. Only restarting the subscriber regularly helps resolve this.
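The periodic-restart workaround mentioned above can be sketched roughly as follows. This is only an illustration using JDK classes: `RestartableSubscriber` and `PeriodicSubscriberRestarter` are hypothetical names, not types from google-cloud-java or spring-cloud-gcp, and a real implementation would wrap whatever subscriber object the application actually creates.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical stand-in for a Pub/Sub subscriber; not a real library type. */
interface RestartableSubscriber {
  void start();

  void stop();
}

/** Replaces the running subscriber with a fresh one at a fixed interval. */
final class PeriodicSubscriberRestarter {
  private final Supplier<RestartableSubscriber> factory;
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();
  private RestartableSubscriber current;

  PeriodicSubscriberRestarter(Supplier<RestartableSubscriber> factory) {
    this.factory = factory;
  }

  /** Starts the first subscriber and schedules restarts, e.g. every 5 minutes. */
  synchronized void start(long interval, TimeUnit unit) {
    current = factory.get();
    current.start();
    scheduler.scheduleWithFixedDelay(this::restartOnce, interval, interval, unit);
  }

  /** Stops the current subscriber (if any) and starts a replacement. */
  synchronized void restartOnce() {
    if (current != null) {
      current.stop();
    }
    current = factory.get();
    current.start();
  }

  /** Cancels future restarts and stops the current subscriber. */
  synchronized void shutdown() {
    scheduler.shutdownNow();
    if (current != null) {
      current.stop();
      current = null;
    }
  }
}
```

With a factory that builds a new subscriber on each call, `start(5, TimeUnit.MINUTES)` would mirror the five-minute interval described above. It is a workaround rather than a fix, since messages can still wait up to one interval before a stuck connection is replaced.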

@ramzimaalej
Author

@meltsufin I have changed the app to just acknowledge the messages without doing any processing, and I witnessed the same behavior again. I now strongly believe that it's not linked to ack deadline extension. As @dinvlad said, we are not even receiving the messages. I will try google-cloud-java and see if the same behavior happens. By contrast, I have a data-flow app that reads from a topic, and everything works perfectly. I will post more updates about the use of the native libraries.

@meltsufin
Contributor

Thanks for the additional information. We will investigate this further and get back to you.

@kir-titievsky

@ramzimaalej This might be a server side issue. Can you please submit a case in the support portal with the details (project number, subscription name, rough time range)? If you don't have a support plan, try cloud-pubsub@google.com

@ramzimaalej
Author

@kir-titievsky I contacted cloud-pubsub@google.com 6 days ago, but I have not heard from them yet. I'm on the startup surge program, and I'm not sure I have access to the support portal. The issue has been happening consistently for the past week. Let me know if you need additional information. Thanks a lot for jumping in on this!

@qlodhi-clearlabs

qlodhi-clearlabs commented Sep 11, 2018

We're facing the same issue as well! I have just put together a "bare-bones" app with Spring Integration connecting directly to the google-cloud-pubsub lib, bypassing spring-gcp-pubsub, and so far the messages are being picked up. I will give it more time before saying it might be a spring-gcp-pubsub issue, but with the previous setup (Spring Integration, spring-gcp-pubsub, google-cloud-pubsub) the issue appeared much faster (and so far, an hour in with the new setup, no issue).

I've also contacted Google and they'll get back to us if it's a server side issue.

Our env: "org.springframework.cloud:spring-cloud-gcp-starter-pubsub:1.0.0.RELEASE",
"org.springframework.integration:spring-integration-core:5.0.7.RELEASE",
"com.google.cloud:google-cloud-pubsub:1.42.0"

and I was thinking of upgrading to 1.1.0.M1 but that's older than current release.

PS: We're also doing manual acks, and from what I see in the logs it doesn't look like messages are being lost. I've seen it process a message fine and then just stop picking up the next messages after some idle time. There might be a network disconnection during that time, and it would be worth finding out what's causing that between GKE and Pub/Sub, but the client should be able to recover from those, since such issues can still happen in regular scenarios.

@meltsufin
Contributor

Can anyone share a simple app that reproduces the issue?

@qlodhi-clearlabs

qlodhi-clearlabs commented Sep 12, 2018

I just created this basic app to recreate this scenario following https://spring.io/guides/gs/messaging-gcp-pubsub/ but with newer versions as given below.

I suspect something might be wrong with the config so if we can eliminate this possibility then we can look further before spending time in deeper layers.

I have deployed this on GKE connecting to two subscriptions on a single topic. It received messages from both subscriptions fine and then it received only on single subscription because I accidentally had this app also running locally where it was picking the other messages (as it should). But when I stopped local app, this GKE deployed app didn't pick messages. I will try a clean test without local app also running but those should have been independent connections anyway and this deployed version should start picking up all messages when other client was stopped. When I restarted this version it started picking up fine and I'll try to recreate the original scenario where it stops after a while. Meanwhile, here's the config and if this looks right and I can recreate the original issue, I'll share the app.

UPDATE: After restart it stopped picking messages from both subscriptions.

(I'm really not sure about this PublishSubscribeChannel getting hooked up with multiple PubsubInboundAdapters.)

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.AckMode;
import org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter;
import org.springframework.cloud.gcp.pubsub.support.GcpPubSubHeaders;
import org.springframework.cloud.gcp.pubsub.support.PublisherFactory;
import org.springframework.cloud.gcp.pubsub.support.SubscriberFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import com.google.cloud.pubsub.v1.AckReplyConsumer;

@SpringBootApplication
@EnableIntegration
public class PubsubTestApplication {

  public static void main(String[] args) {
    SpringApplication.run(PubsubTestApplication.class, args);
  }

  @Bean
  public PubSubInboundChannelAdapter messageChannelAdapter(@Qualifier("pubsubInputChannel") MessageChannel inputChannel,
      PubSubTemplate pubSubTemplate, PublisherFactory publisherFactory, SubscriberFactory subscriberFactory) {
    
    PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate,
        "subscription-1");
    adapter.setOutputChannel(inputChannel);
    adapter.setAckMode(AckMode.MANUAL);

    return adapter;
  }


  @Bean
  public PubSubInboundChannelAdapter messageChannelAdapterA(@Qualifier("pubsubInputChannel") MessageChannel inputChannel,
      PubSubTemplate pubSubTemplate) {
    
    PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate,
        "subscription-2");
    adapter.setOutputChannel(inputChannel);
    adapter.setAckMode(AckMode.MANUAL);

    return adapter;
  }
  
  @Bean
  public MessageChannel pubsubInputChannel() {
    return new PublishSubscribeChannel();
  }

  @Bean
  @ServiceActivator(inputChannel = "pubsubInputChannel")
  public MessageHandler messageReceiver() {
    return message -> {
      System.out.println("Message arrived! Payload: " + new String((byte[])message.getPayload()));
      AckReplyConsumer consumer =
          (AckReplyConsumer) message.getHeaders().get(GcpPubSubHeaders.ACKNOWLEDGEMENT);
      consumer.ack();
    };
  }
}

Versions:

compile("org.springframework.cloud:spring-cloud-gcp-starter-pubsub:1.0.0.RELEASE")
compile("org.springframework.integration:spring-integration-core:5.0.7.RELEASE")
compile("com.google.cloud:google-cloud-pubsub:1.43.0")

Spring Boot 2.0.4.RELEASE

Also, I'm using single-threaded subscriber (need it that way for now). I've also tried prototype beans for pubsubTemplate and publisherFactory and subscriberFactory.

spring:
  cloud:
    gcp:
      pubsub:
        subscriber:
          executor-threads: 1

@qlodhi-clearlabs

qlodhi-clearlabs commented Sep 12, 2018

I could recreate it with single subscription and 2 subscribing threads. What might be interesting is that when I use gcloud cmd line to pull messages (without acking them), it circles between two messages on the subscription and sometimes even shows 0 messages available only to circle back and show other messages again in next commands. Maybe that's default/expected behavior of pubsub?

Following is the new simple config that still runs into this issue. Meanwhile, my other app bypassing spring-gcp-pubsub (and using SI with google-cloud-pubsub jar) is working fine for several hours.

  @Bean
  public PubSubInboundChannelAdapter messageChannelAdapter(@Qualifier("pubsubInputChannel") MessageChannel inputChannel,
      PubSubTemplate pubSubTemplate) {
    
    PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate,
        "subscription-1");
    adapter.setOutputChannel(inputChannel);
    adapter.setAckMode(AckMode.MANUAL);

    return adapter;
  }
  
  @Bean
  public MessageChannel pubsubInputChannel() {
    return new DirectChannel();
  }

  @Bean
  @ServiceActivator(inputChannel = "pubsubInputChannel")
  public MessageHandler messageReceiver() {
    return message -> {
      System.out.println(Thread.currentThread().getName() + " - Message arrived! Payload: " + new String((byte[])message.getPayload()));
      AckReplyConsumer consumer =
          (AckReplyConsumer) message.getHeaders().get(GcpPubSubHeaders.ACKNOWLEDGEMENT);
      consumer.ack();
    };
  }

Following are the last logs where it received message "abc" successfully and the next message was sent ~15 minutes later that has not been picked up so far.

2018-09-12 01:40:39.120 DEBUG 1 --- [pool-2-thread-2] .i.h.ReplyProducingMessageHandlerWrapper : pubsubTestApplication.messageReceiver.serviceActivator.handler received message: GenericMessage [payload=byte[3], headers={gcp_pubsub_acknowledgement=com.google.cloud.pubsub.v1.MessageDispatcher$3@69fb8fc3, id=ce8c8aa2-16f0-4ed0-b351-7e327587b1a3, timestamp=1536716439119}]
 
pool-2-thread-2 - Message arrived! Payload: abc
 
2018-09-12 01:40:39.121 DEBUG 1 --- [pool-2-thread-2] .i.h.ReplyProducingMessageHandlerWrapper : handler 'pubsubTestApplication.messageReceiver.serviceActivator.handler' produced no reply for request Message: GenericMessage [payload=byte[3], headers={gcp_pubsub_acknowledgement=com.google.cloud.pubsub.v1.MessageDispatcher$3@69fb8fc3, id=ce8c8aa2-16f0-4ed0-b351-7e327587b1a3, timestamp=1536716439119}]
 
2018-09-12 01:40:39.122 DEBUG 1 --- [pool-2-thread-2] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'pubsubInputChannel', message: GenericMessage [payload=byte[3], headers={gcp_pubsub_acknowledgement=com.google.cloud.pubsub.v1.MessageDispatcher$3@69fb8fc3, id=ce8c8aa2-16f0-4ed0-b351-7e327587b1a3, timestamp=1536716439119}]
 
2018-09-12 01:40:39.144 DEBUG 1 --- [          Gax-3] io.grpc.internal.ClientCallImpl          : Call timeout set to '59999580915' ns, due to context deadline. Explicit call timeout was not set.
 
2018-09-12 01:40:39.146 DEBUG 1 --- [          Gax-3] io.grpc.internal.ClientCallImpl          : Call timeout set to '59996978187' ns, due to context deadline. Explicit call timeout was not set.
 
2018-09-12 01:40:39.147 DEBUG 1 --- [-worker-ELG-1-2] i.g.n.s.i.grpc.netty.NettyClientHandler  : [id: 0x65f081bb, L:/10.44.3.228:41826 - R:pubsub.googleapis.com/74.125.197.95:443] OUTBOUND HEADERS: streamId=29 headers=GrpcHttp2OutboundHeaders[:authority: pubsub.googleapis.com:443, :path: /google.pubsub.v1.Subscriber/ModifyAckDeadline, :method: POST, :scheme: https, content-type: application/grpc, te: trailers, user-agent: Spring/1.0.0.RELEASE spring-cloud-gcp-pubsub/1.0.0.RELEASE grpc-java-netty/1.13.1, x-goog-api-client: gl-java/1.8.0_181 gapic/1.43.0 gax/1.30.0 grpc/1.13.1, grpc-accept-encoding: gzip, authorization: Bearer <removed>, grpc-trace-bin: , grpc-timeout: 59998616u] streamDependency=0 weight=16 exclusive=false padding=0 endStream=false
 
2018-09-12 01:40:39.148 DEBUG 1 --- [-worker-ELG-1-2] i.g.n.s.i.grpc.netty.NettyClientHandler  : [id: 0x65f081bb, L:/10.44.3.228:41826 - R:pubsub.googleapis.com/74.125.197.95:443] OUTBOUND HEADERS: streamId=31 headers=GrpcHttp2OutboundHeaders[:authority: pubsub.googleapis.com:443, :path: /google.pubsub.v1.Subscriber/Acknowledge, :method: POST, :scheme: https, content-type: application/grpc, te: trailers, user-agent: Spring/1.0.0.RELEASE spring-cloud-gcp-pubsub/1.0.0.RELEASE grpc-java-netty/1.13.1, x-goog-api-client: gl-java/1.8.0_181 gapic/1.43.0 gax/1.30.0 grpc/1.13.1, grpc-accept-encoding: gzip, authorization: Bearer <removed>, grpc-trace-bin: , grpc-timeout: 59995877u] streamDependency=0 weight=16 exclusive=false padding=0 endStream=false
 
2018-09-12 01:40:39.148 DEBUG 1 --- [-worker-ELG-1-2] i.g.n.s.i.grpc.netty.NettyClientHandler  : [id: 0x65f081bb, L:/10.44.3.228:41826 - R:pubsub.googleapis.com/74.125.197.95:443] OUTBOUND DATA: streamId=29 padding=0 endStream=true length=253 bytes=00000000f80a5670726f6a656374732f636c656172766965772d6465762f737562736372697074696f6e732f746573745f72756e5f6d616e6167656d656e745f...
 
2018-09-12 01:40:39.152 DEBUG 1 --- [-worker-ELG-1-2] i.g.n.s.i.grpc.netty.NettyClientHandler  : [id: 0x65f081bb, L:/10.44.3.228:41826 - R:pubsub.googleapis.com/74.125.197.95:443] OUTBOUND DATA: streamId=31 padding=0 endStream=true length=251 bytes=00000000f60a5670726f6a656374732f636c656172766965772d6465762f737562736372697074696f6e732f746573745f72756e5f6d616e6167656d656e745f...
 
2018-09-12 01:40:39.268 DEBUG 1 --- [-worker-ELG-1-2] i.g.n.s.i.grpc.netty.NettyClientHandler  : [id: 0x65f081bb, L:/10.44.3.228:41826 - R:pubsub.googleapis.com/74.125.197.95:443] INBOUND HEADERS: streamId=29 headers=GrpcHttp2ResponseHeaders[:status: 200, content-disposition: attachment, content-type: application/grpc, date: Wed, 12 Sep 2018 01:40:39 GMT] padding=0 endStream=false
 
2018-09-12 01:40:39.271 DEBUG 1 --- [-worker-ELG-1-2] i.g.n.s.i.grpc.netty.NettyClientHandler  : [id: 0x65f081bb, L:/10.44.3.228:41826 - R:pubsub.googleapis.com/74.125.197.95:443] INBOUND DATA: streamId=29 padding=0 endStream=false length=5 bytes=0000000000
 
2018-09-12 01:40:39.271 DEBUG 1 --- [-worker-ELG-1-2] i.g.n.s.i.grpc.netty.NettyClientHandler  : [id: 0x65f081bb, L:/10.44.3.228:41826 - R:pubsub.googleapis.com/74.125.197.95:443] INBOUND HEADERS: streamId=29 headers=GrpcHttp2ResponseHeaders[grpc-status: 0, content-disposition: attachment] padding=0 endStream=true
 
2018-09-12 01:40:39.272 DEBUG 1 --- [-worker-ELG-1-2] i.g.n.s.i.grpc.netty.NettyClientHandler  : [id: 0x65f081bb, L:/10.44.3.228:41826 - R:pubsub.googleapis.com/74.125.197.95:443] INBOUND PING: ack=false bytes=18
 
2018-09-12 01:40:39.272 DEBUG 1 --- [-worker-ELG-1-2] i.g.n.s.i.grpc.netty.NettyClientHandler  : [id: 0x65f081bb, L:/10.44.3.228:41826 - R:pubsub.googleapis.com/74.125.197.95:443] OUTBOUND PING: ack=true bytes=18
 
2018-09-12 01:40:39.273 DEBUG 1 --- [-worker-ELG-1-2] i.g.n.s.i.grpc.netty.NettyClientHandler  : [id: 0x65f081bb, L:/10.44.3.228:41826 - R:pubsub.googleapis.com/74.125.197.95:443] INBOUND HEADERS: streamId=31 headers=GrpcHttp2ResponseHeaders[:status: 200, content-disposition: attachment, content-type: application/grpc, date: Wed, 12 Sep 2018 01:40:39 GMT] padding=0 endStream=false
 
2018-09-12 01:40:39.274 DEBUG 1 --- [-worker-ELG-1-2] i.g.n.s.i.grpc.netty.NettyClientHandler  : [id: 0x65f081bb, L:/10.44.3.228:41826 - R:pubsub.googleapis.com/74.125.197.95:443] INBOUND DATA: streamId=31 padding=0 endStream=false length=5 bytes=0000000000
 
2018-09-12 01:40:39.274 DEBUG 1 --- [-worker-ELG-1-2] i.g.n.s.i.grpc.netty.NettyClientHandler  : [id: 0x65f081bb, L:/10.44.3.228:41826 - R:pubsub.googleapis.com/74.125.197.95:443] INBOUND HEADERS: streamId=31 headers=GrpcHttp2ResponseHeaders[grpc-status: 0, content-disposition: attachment] padding=0 endStream=true

@dhoard
Contributor

dhoard commented Sep 12, 2018

As a different data point, we saw a similar issue months ago using the google-cloud-pubsub library (without any Spring code) running on GKE. For our use case, if messages were constantly being published, we saw no issues ... but if there was a period of no messages being published, we would get into the same "disconnected" state and not recover.

Google support provided us with two solutions ...

  1. publish "keep-alive" messages every minute

or

  2. run the code on a Compute node instance (VM) so we could tune the TCP/IP stack to detect socket timeouts by firewalls between our client and the Pub / Sub service.

Because of this issue, along with the way the google-cloud-pubsub library extends all outstanding messages, we chose to write our own subscriber client using a polling pull (non-streaming), which resolved the issue for us running on GKE.
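For readers unfamiliar with the polling-pull approach, the loop might look roughly like the sketch below. It is not the client described in this comment: `PullClient`, `PulledMessage` and `PollingSubscriber` are hypothetical names, and a real `PullClient` would have to wrap the synchronous pull and acknowledge calls of whichever Pub/Sub client is in use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical message shape: an ack id plus a decoded payload. */
final class PulledMessage {
  final String ackId;
  final String payload;

  PulledMessage(String ackId, String payload) {
    this.ackId = ackId;
    this.payload = payload;
  }
}

/** Hypothetical synchronous client; not a real library interface. */
interface PullClient {
  List<PulledMessage> pull(int maxMessages);

  void acknowledge(List<String> ackIds);
}

/** Pulls messages in batches on the caller's thread instead of using a stream. */
final class PollingSubscriber {
  private final PullClient client;
  private final Consumer<String> handler;
  private final int batchSize;

  PollingSubscriber(PullClient client, Consumer<String> handler, int batchSize) {
    this.client = client;
    this.handler = handler;
    this.batchSize = batchSize;
  }

  /** Pulls one batch and acks only the messages handled without an exception. */
  int pollOnce() {
    List<PulledMessage> batch = client.pull(batchSize);
    List<String> ackIds = new ArrayList<>();
    for (PulledMessage message : batch) {
      try {
        handler.accept(message.payload);
        ackIds.add(message.ackId);
      } catch (RuntimeException e) {
        // Left unacked on purpose so the service can redeliver it later.
      }
    }
    if (!ackIds.isEmpty()) {
      client.acknowledge(ackIds);
    }
    return ackIds.size();
  }

  /** Polls until the thread is interrupted, sleeping after batches with nothing acked. */
  void run(long idleSleepMillis) throws InterruptedException {
    while (!Thread.currentThread().isInterrupted()) {
      if (pollOnce() == 0) {
        Thread.sleep(idleSleepMillis);
      }
    }
  }
}
```

Because every pull is a separate request, a silently dropped connection surfaces as a failed or timed-out call on the next iteration rather than as a stream that never delivers anything. The trade-off, as noted later in the thread, is that thread management, timeouts and scaling become the application's responsibility.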

@qlodhi-clearlabs

qlodhi-clearlabs commented Sep 12, 2018

I looked into that as well and went as far as creating a client thread using pull method (from pubsubTemplate). But decided against it since the maintenance of the thread (timeout, scalability etc.) would become our responsibility and we'll be reinventing just another client (that's what spring-gcp-pubsub is for). I believe that solution was also not recovering well if network was manually disconnected during the pull call or something similar, can't remember specific scenario/issue.

I'm really surprised they don't have keep-alive functionality built-in to google-cloud-pubsub and I didn't really find an easy way (through searching online) to tweak grpc params (like setting ack-deadline or timing out a streaming connection sooner so it can detect a dead connection) for these libs. There must be a way but just didn't have enough time to hunt for those things when a working system breaks down in PROD.

Since it was working fine until a few weeks ago, we know this setup should work, so we didn't want to invest too much time in writing a custom client.
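One application-level way to notice a dead streaming connection sooner, without touching gRPC parameters, is to track when the last message arrived. The sketch below is an assumption-laden illustration, not something the libraries provide: `IdleWatchdog` is a hypothetical helper, and it only makes sense when some traffic is expected regularly (for example the keep-alive messages mentioned earlier in the thread), because a quiet topic is otherwise indistinguishable from a broken connection.

```java
import java.util.function.LongSupplier;

/** Hypothetical helper that flags a subscriber as stale after a quiet period. */
final class IdleWatchdog {
  private final LongSupplier clockMillis;
  private final long maxIdleMillis;
  private volatile long lastSeenMillis;

  /** The clock is injected so the idle logic can be exercised without waiting. */
  IdleWatchdog(LongSupplier clockMillis, long maxIdleMillis) {
    this.clockMillis = clockMillis;
    this.maxIdleMillis = maxIdleMillis;
    this.lastSeenMillis = clockMillis.getAsLong();
  }

  /** Call from the message receiver callback for every message received. */
  void recordMessage() {
    lastSeenMillis = clockMillis.getAsLong();
  }

  /** True when more than maxIdleMillis have passed since the last message. */
  boolean isStale() {
    return clockMillis.getAsLong() - lastSeenMillis > maxIdleMillis;
  }
}
```

In production the clock would typically be `System::currentTimeMillis`, and a scheduled task could check `isStale()` and recreate the subscriber when it returns true.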

This is what currently works fine for us using google-cloud-pubsub without running into this connection issue (the underlying disconnectivity might still be there probably due to GKE/Pubsub issues but I guess this method is failing and reconnecting differently, so it recovers fine):

  @Autowired
  public void messageChannelAdapter() {

    pubSubConfig.getSubscriptionConfig().stream().forEach(subscription -> {
      
      log.info("Creating messageChannelAdapter for subscription: {}", subscription.getName());
      
      String subscriptionId = subscription.getName();
      ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(PROJECT_ID, subscriptionId);
      // Create a subscriber bound to the asynchronous message receiver and wait until it is running.
      Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageReceiver()).build();
      subscriber.startAsync().awaitRunning();

      // Run Spring's bean initialization callbacks and post-processors on the subscriber.
      String beanName = subscriber.toString();
      applicationContext.getAutowireCapableBeanFactory().initializeBean(subscriber, beanName);
     });
  }

public MessageReceiver messageReceiver() {
    
    return (message, consumer) -> {

      String messageStr = null;
      
      try {
        
        messageStr = message.getData().toStringUtf8();

        log.info("Payload: {}", message);

      } finally {
        log.info("-=- Acking message: {}", messageStr);
        consumer.ack();
        log.info("-=- Message Acked");
      }
    }; 
  }

@dinvlad

dinvlad commented Sep 12, 2018

@qlodhi-clearlabs strange, this last method worked fine for us previously, but that's what we've been having issues with recently. Perhaps the Google team fixed it, if it was a server-side issue?

@dhoard
Contributor

dhoard commented Sep 12, 2018

@qlodhi-clearlabs I agree ... you shouldn't write your own client. (We did out of necessity.)

Since the Spring PubSubSubscriberTemplate and DefaultSubscriberFactory use the Subscriber in google-cloud-gcp ... this is most likely a core google-cloud-gcp or Pub / Sub server issue.

If you can publish keep-alive messages to the topic / ignore them on the consumer side, it would be a good data point to see if it's truly a socket timeout / disconnect type of issue at the TCP/IP layer. We actually found the keep-alive messages useful to track end to end latency.
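A keep-alive scheme of this kind could be sketched as below. The `keep-alive:` payload prefix, the `KeepAlive` class and the `publisher` hook are all assumptions made for illustration; the thread does not say how the keep-alive messages were actually encoded or published.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical keep-alive payload helper; the prefix is an assumed convention. */
final class KeepAlive {
  static final String PREFIX = "keep-alive:";

  private KeepAlive() {}

  /** Builds a keep-alive payload carrying the publish time in epoch millis. */
  static String encode(long publishedAtMillis) {
    return PREFIX + publishedAtMillis;
  }

  /** Lets the consumer recognize keep-alives so it can skip business processing. */
  static boolean isKeepAlive(String payload) {
    return payload != null && payload.startsWith(PREFIX);
  }

  /** End-to-end latency derived from the timestamp embedded in the payload. */
  static long latencyMillis(String payload, long receivedAtMillis) {
    if (!isKeepAlive(payload)) {
      throw new IllegalArgumentException("not a keep-alive payload");
    }
    long publishedAt = Long.parseLong(payload.substring(PREFIX.length()));
    return receivedAtMillis - publishedAt;
  }

  /** Publishes a keep-alive through the given hook at a fixed rate. */
  static ScheduledFuture<?> schedule(
      ScheduledExecutorService scheduler,
      Consumer<String> publisher,
      long period,
      TimeUnit unit) {
    return scheduler.scheduleAtFixedRate(
        () -> publisher.accept(encode(System.currentTimeMillis())), 0, period, unit);
  }
}
```

On the consumer side, the receiver would ack any payload for which `isKeepAlive` returns true and optionally log `latencyMillis(payload, System.currentTimeMillis())`. That latency figure is only meaningful if publisher and consumer clocks are reasonably synchronized.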

@Andy2003
Contributor

We encountered the same problem after upgrading to the new version:

  • spring-boot 2.0.4.RELEASE
  • spring-cloud-gcp 1.0.0.RELEASE

Previously, we used a custom build of spring-cloud-gcp based on version 1.0.0.0.M2 for several months. We have never encountered a delay in retrieving messages.

@meltsufin
Contributor

The version of cloud-java-pubsub is probably where things changed. Can this behavior be reproduced with just the Google Cloud Java Pub/Sub client library? If so, the bug should be filed there.

@qlodhi-clearlabs

qlodhi-clearlabs commented Sep 12, 2018

We've noticed this with various versions of cloud-java-pubsub (0.47.0-beta, 1.35.0 and now 1.42.0), but in our experience only when spring-gcp-pubsub was also in the mix. It may not be a bug in spring-gcp-pubsub, and the underlying cause is probably network (dis)connectivity, but different combinations of these jars behave differently when a disconnection happens. I wanted to share this for those who might be stuck on this issue, as we were until yesterday; for now, at least, we have messages flowing smoothly. I would love to switch back to spring-gcp-pubsub. Is there anything spring-gcp-pubsub should do differently in this scenario? (Right now it just gets stuck, as observed.)

@meltsufin
Contributor

I'm not sure what our Spring integration with Pub/Sub could do. We're really just delegating the connection management and message retrieval to the cloud-java-pubsub library.

@ramzimaalej
Author

ramzimaalej commented Sep 12, 2018

@qlodhi-clearlabs which version of google-cloud-pubsub works fine for you? I would like to give it a try and see how it behaves.

@qlodhi-clearlabs

@ramzimaalej what works for me is direct instantiation of Subscriber from google-cloud-pubsub 1.43.0 jar (I hope that's what you meant when you say cloud-java-pubsub, or is that different?).

The config is given above (with subscriber.startAsync().awaitRunning(); in the code). I've also added an Executor since then to make it single-threaded subscriber as per our requirements.

When I put the same jar integrated with spring-gcp-pubsub and it runs into disconnectivity, it just gets stuck. With the direct connection, it must be doing something differently, so it's able to recover. @meltsufin that's the part that's a bit baffling, if spring-gcp-pubsub is also just creating a subscriber then both scenarios should have the same results, but apparently, something is different.

@ramzimaalej
Author

Yes, I meant google-cloud-pubsub. Thank you! I will keep you posted.

@dinvlad

dinvlad commented Sep 12, 2018

And we're using "bare-bones" Spring with google-cloud-pubsub. Perhaps there's some interaction with Spring @Service that suspends background threads now?

@danvalencia

I was able to reproduce this issue as well, spring boot app (2.0.4.RELEASE) with spring-cloud-gcp (1.0.0.RELEASE) on GKE. Here's the code: https://gist.github.com/danvalencia/6cb0235ec01952bbc54fdc91b106be70

@ramzimaalej
Author

@meltsufin I have used google-cloud-pubsub instead of spring-cloud-gcp-starter-pubsub and the application was picking up messages from the queue instantly. I have not noticed any delay. It's been running for an hour so far. I will post more updates later.

@artembilan
Contributor

Thank you everyone involved here!

Since we now know that it works with google-cloud-pubsub but not with spring-cloud-gcp-starter-pubsub, let's try to find what the difference is.
The PubSubInboundChannelAdapter uses a PubSubSubscriberOperations to call this:

this.subscriber = this.pubSubSubscriberOperations.subscribeAndConvert(
				this.subscriptionName, this::consumeMessage, this.payloadType);

That one does this:

Subscriber subscriber =
		this.subscriberFactory.createSubscriber(subscription,
						(message, ackReplyConsumer) -> messageConsumer.accept(
								new ConvertedPushedAcknowledgeablePubsubMessage<T>(
										ProjectSubscriptionName.of(this.subscriberFactory.getProjectId(), subscription),
										message,
										this.getMessageConverter().fromPubSubMessage(message, payloadType),
										ackReplyConsumer)));
subscriber.startAsync();

Where the subscriberFactory goes with this:

public Subscriber createSubscriber(String subscriptionName, MessageReceiver receiver) {
		Subscriber.Builder subscriberBuilder = Subscriber.newBuilder(
				ProjectSubscriptionName.of(this.projectId, subscriptionName), receiver);

		if (this.channelProvider != null) {
			subscriberBuilder.setChannelProvider(this.channelProvider);
		}

		if (this.executorProvider != null) {
			subscriberBuilder.setExecutorProvider(this.executorProvider);
		}

		if (this.credentialsProvider != null) {
			subscriberBuilder.setCredentialsProvider(this.credentialsProvider);
		}

		if (this.headerProvider != null) {
			subscriberBuilder.setHeaderProvider(this.headerProvider);
		}

		if (this.systemExecutorProvider != null) {
			subscriberBuilder.setSystemExecutorProvider(this.systemExecutorProvider);
		}

		if (this.flowControlSettings != null) {
			subscriberBuilder.setFlowControlSettings(this.flowControlSettings);
		}

		if (this.maxAckExtensionPeriod != null) {
			subscriberBuilder.setMaxAckExtensionPeriod(this.maxAckExtensionPeriod);
		}

		if (this.parallelPullCount != null) {
			subscriberBuilder.setParallelPullCount(this.parallelPullCount);
		}

		return subscriberBuilder.build();
	}

All those options for the subscriberBuilder are auto-configured in the GcpPubSubAutoConfiguration: https://github.com/spring-cloud/spring-cloud-gcp/blob/master/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java

Please, give it a shot to analyze that class for the options which are different from those you use in case of plain google-cloud-pubsub.

@meltsufin
Contributor

@ramzimaalej Can you clarify which version of google-cloud-pubsub is being used in each case? Thanks!

@Andy2003
Contributor

Just for information: since this morning approx. 5:30 CEST until now (the last 11 hours) we have not seen any further unpolled messages.

@qlodhi-clearlabs

So, Google came back saying that the issue we faced "was due to an internal problem" and we should continue to do things as we were and report any issues if it appears again. Our original code (using spring-gcp-pubsub) is now receiving messages after running for ~12 hours so it appears that underlying problem is fixed on pubsub side. But for now, we will move to using google-cloud-pubsub directly as we've already migrated half of our environments in this process of troubleshooting.

It is worrisome that the code base using spring-gcp-pubsub got stuck while the direct subscriber using google-cloud-pubsub continued to work even when there was an underlying problem, because that problem can appear again at any time, out of our control. @artembilan good suggestion, I'll continue to dig more in that direction.

@meltsufin
Contributor

I'm glad to hear it's now resolved!
Since the issue was inconsistent, it might just be bad luck that spring-cloud-gcp-pubsub got implicated in this. In any case, I'll close this issue because it doesn't seem to be caused by any code in this project.

@ramzimaalej
Author

@meltsufin I used version 1.43.0 of google-cloud-pubsub. With Spring Boot 2.0.3.RELEASE, the google-cloud-pubsub version used is 1.35.0.

@meltsufin
Contributor

@ramzimaalej Thanks, we try to always stay on the latest of google-cloud-* libs.

@qlodhi-clearlabs

@meltsufin it was indeed an underlying issue with the pubsub env, but obviously those using spring-gcp-pubsub were unluckier than those using google-cloud-pubsub, because code using spring-gcp-pubsub just got stuck. And since there are no guarantees that this issue won't happen again, I'm probably not moving back to spring-gcp-pubsub until it is figured out why the codebase using it just stopped processing messages while the codebase not using it continued to get messages, even when there was an underlying issue.

I'd like to get back to using spring-gcp-pubsub ASAP because of the benefits of the wrapper (tweaking params via config instead of writing new code, e.g. for scalability). I'll report here if I can find the difference in the next few days, but for now, more monitoring on pubsub :)

Thanks a lot for your valuable insights, everyone!

@dinvlad

dinvlad commented Sep 13, 2018

it was indeed an underlying issue with pubsub env

@qlodhi-clearlabs could you please clarify what the issue was? Was it on Google's side? Thanks

@qlodhi-clearlabs

@dinvlad yes, as it turned out, the underlying cause was some server-side issue on Google Pub/Sub. They just said "internal problem". I've asked for a little more detail so that we can determine client-side behavior for that scenario, but I think that's all we are going to get: an "internal problem".

And I'm not sure how to categorize this. Is it that the server doesn't respond to a streamingPull request, or that it gets stuck and doesn't respond with some other expected message when there is no data, or something else? I just do not know right now.

@drag0s

drag0s commented Sep 18, 2018

@ramzimaalej Was the problem fixed for you? I'm experiencing the same issue with GAE and the Node.js library.

@ramzimaalej
Author

ramzimaalej commented Sep 19, 2018

@drag0s It was an internal issue on Google's side. I advise you to contact them. Yes, the problem is fixed now.

@danneville

Is anyone experiencing these issues again? They seem to have come back for us

@qlodhi-clearlabs

qlodhi-clearlabs commented Oct 18, 2018

Is anyone experiencing these issues again? They seem to have come back for us

We briefly switched back to the spring-gcp configuration a few days ago, and the issue started appearing again. I still haven't had time to dig into whether it's something with our config or an underlying Pub/Sub issue, but switching back to using google-cloud-pubsub directly works fine for us.

@elefeint pinned this issue Dec 18, 2018
@ChengyuanZhao unpinned this issue Dec 19, 2018
@piotrturski

piotrturski commented May 28, 2019

Same here. Using google-cloud-pubsub with default settings works, while using spring-gcp-pubsub with default settings does not. Spring's version stops receiving messages (streaming pull) within minutes under higher traffic. I don't have time to dig into which defaults are broken.
It happens even with a single instance of the app, so it's not related to flow control. Also, 90% of messages are processed in under 0.5s, and the longest takes less than 2s, so it's not about ack extension.
Maybe channels?

'springCloudVersion', 'Greenwich.SR1'
'org.springframework.boot' version '2.1.4.RELEASE'
google-cloud:1.59.0 and 1.74.0 (same behavior)

@meltsufin
Contributor

We'd appreciate example code that can reliably reproduce the problem so we can debug further.
Also, please try synchronous pulling.

@qlodhi-clearlabs

Same here. Using google-cloud-pubsub with default settings works, while using spring-gcp-pubsub with default settings does not. Spring's version stops receiving messages (streaming pull) within minutes under higher traffic. I don't have time to dig into which defaults are broken.
It happens even with a single instance of the app, so it's not related to flow control. Also, 90% of messages are processed in under 0.5s, and the longest takes less than 2s, so it's not about ack extension.
Maybe channels?

'springCloudVersion', 'Greenwich.SR1'
'org.springframework.boot' version '2.1.4.RELEASE'
google-cloud:1.59.0 and 1.74.0 (same behavior)

Sorry to hear you're having issues.
We've switched to a newer version of spring-cloud-gcp-pubsub since the last time we had issues with it, and it has now been working fine for a couple of months. I can't say whether it was an underlying issue with Pub/Sub or with the previous version of spring-cloud-gcp-pubsub.

These are our current versions, which are working fine:

spring-cloud-gcp-pubsub:1.1.0.RELEASE
google-cloud-pubsub:1.59.0
spring-boot:2.1.3.RELEASE

@meltsufin
Contributor

@qlodhi-clearlabs I think we found the underlying issue and it has to do with the keep-alive setting. See the PR above.

@gredwhite

@qlodhi-clearlabs I think we found the underlying issue and it has to do with the keep-alive setting. See the PR above.

Which PR are you referring to?

I'm experiencing the same issue in my Spring Boot application.

I use springBootVersion = "2.2.0.RELEASE"
spring-cloud-gcp-starter-pubsub-1.1.3.RELEASE.jar
com.google.cloud:google-cloud-pubsub:1.98.0

In my case the application (library code) accepts the message, but inside com.google.cloud.pubsub.v1.MessageDispatcher#processOutstandingMessage the line executor.execute(deliverMessageTask); doesn't lead to deliverMessageTask being executed, for some reason.
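As general background only, and not a diagnosis of this report: one ordinary way for `executor.execute(task)` to return without the task ever running is a saturated thread pool, where every worker thread is blocked and new tasks just sit in the queue. The sketch below uses only the JDK. The class and method names are hypothetical and have nothing to do with the Pub/Sub client's internals.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of any Pub/Sub library.
class ExecutorStarvationDemo {

    /**
     * Submits one task that blocks a worker thread, then a second task,
     * and reports whether the second task ran within waitMillis.
     */
    static boolean queuedTaskRan(int poolSize, long waitMillis) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch secondRan = new CountDownLatch(1);
        try {
            // The first task occupies a worker thread until it is released.
            executor.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // execute() returns immediately; with no free worker the task is only queued.
            executor.execute(secondRan::countDown);
            return secondRan.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Unblock the first task so the pool can drain and shut down.
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("pool of 1: " + queuedTaskRan(1, 300));
        System.out.println("pool of 2: " + queuedTaskRan(2, 2000));
    }
}
```

With a pool of one thread the second task cannot start until the first is released, so the method returns false. With two threads it should run almost immediately. A thread dump of the subscriber's executor threads is one way to check whether something similar is happening in a real application.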

@meltsufin
Contributor

I was referring to #1384
Please try version 1.2.0.RC1. That fix didn't go into the 1.1.x branch.
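Once on 1.2.x, the keep-alive interval can reportedly be tuned through configuration. This is a sketch, assuming the property name `spring.cloud.gcp.pubsub.keepAliveIntervalMinutes` from the Spring Cloud GCP reference documentation. Both the name and the default shown are from memory, and whether the property was introduced by #1384 is not confirmed here, so verify against the version you run.

```properties
# Assumed property name; interval, in minutes, between gRPC keep-alive pings.
spring.cloud.gcp.pubsub.keepAliveIntervalMinutes=5
```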

@brachipa

brachipa commented Oct 11, 2020

Still happens in 1.2.5.RELEASE.
It may be only once a month, but it still happens. A restart resolves it, but I can't rely on that.

@elefeint
Contributor

@brachipa Could you open a new issue describing your environment, and any relevant log data? It's somewhat unlikely to be the same exact issue; we'd have to re-investigate the cause.

@qlodhi-clearlabs

Still happens in 1.2.5.RELEASE.
It may be only once a month, but it still happens. A restart resolves it, but I can't rely on that.

Yeah, we noticed this again a couple of weeks ago - undetected loss of connection, reconnected after 1.5 hours in one instance. Haven't noticed it since then, maybe it has happened again but we didn't really monitor it to be sure. We're in the process of upgrading libs.

@elefeint
Contributor

@qlodhi-clearlabs If the connection reestablished without a restart, then the client library has done the right thing. If you observe messages getting stuck requiring a restart, though, could you comment on #2552 (the new issue @brachipa created)?
