
acknowledge message #119

Open
mantovani opened this issue Nov 1, 2017 · 6 comments

@mantovani

I couldn't find how to acknowledge a message. Any suggestions?

channel.basicAck(...)
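
For reference, this is the call from the RabbitMQ Java client; a minimal standalone sketch outside Spark (the queue name is a placeholder):

    import com.rabbitmq.client.{ConnectionFactory, GetResponse}

    val factory = new ConnectionFactory()
    factory.setHost("localhost")
    val connection = factory.newConnection()
    val channel = connection.createChannel()

    // autoAck = false: the broker holds the message until an explicit ack
    val response: GetResponse = channel.basicGet("my-queue", false)
    if (response != null) {
      // ... process response.getBody ...
      // multiple = false: ack only this deliveryTag
      channel.basicAck(response.getEnvelope.getDeliveryTag, false)
    }

The open question is where the equivalent call fits into the Spark integration.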

@kaveh-hariri

I'm attempting to do the same thing; see issue #117.
I think we are going to have to update the library itself. It may be easier to do with the receiver approach than with the distributed approach.

I've only looked at it for a few hours, but here is what I think needs to be done (a rough sketch follows the list):

  1. Create a new ack type which requires an ack, but does not send an ack in processDelivery.
  2. Save the deliveryTags.
  3. Keep the channel open.
  4. Create a method to send the acks to the open channel after you have processed your data.
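
Roughly what I have in mind, as a sketch (all names are hypothetical; the real library code will differ):

    import com.rabbitmq.client.Channel
    import scala.collection.mutable

    class DeferredAckConsumer(val channel: Channel) {
      // Step 2: the tags we still owe an ack for (real code would need to
      // synchronize this with the consumer thread)
      private val pendingTags = mutable.Queue[Long]()

      // Step 1: require an ack, but only record the tag here
      def processDelivery(deliveryTag: Long, body: Array[Byte]): Unit = {
        pendingTags.enqueue(deliveryTag)
        // hand the body off to Spark; no basicAck yet
      }

      // Step 4: call this after the data has been processed downstream.
      // Step 3 is implicit: `channel` must still be open at this point.
      def ackPending(): Unit =
        pendingTags.dequeueAll(_ => true).foreach(channel.basicAck(_, false))
    }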

But let's wait to hear from some of the contributors as they may have an easier route.

@mantovani
Author

mantovani commented Nov 1, 2017

This is a feature for a minimum level of reliability.

In my case, I'm sending messages to Kafka using the asynchronous batch feature; after the batch is successfully committed on the broker, it sends the ack to RabbitMQ. This works through a callback implementation:

broker.send("id", callback)

For Spark Streaming, the callback needs to be serializable. RabbitMQ doesn't have a feature that lets us send an arbitrary batch of deliveryTags to the server at once; we can only ack all tags up to a given one, and that only has meaning in the context of a single channel.
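
Roughly what I mean, as a sketch (ackToRabbit is a placeholder for whatever ends up calling basicAck; in Spark, whatever closes over it also has to be serializable):

    import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

    def sendWithAck(producer: KafkaProducer[String, String],
                    record: ProducerRecord[String, String],
                    deliveryTag: Long,
                    ackToRabbit: Long => Unit): Unit =
      producer.send(record, new Callback {
        override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
          // ack in RabbitMQ only once Kafka has confirmed the write
          if (exception == null) ackToRabbit(deliveryTag)
        }
      })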

See the section "Design Patterns for using foreachRDD" in https://spark.apache.org/docs/latest/streaming-programming-guide.html

We could use a RabbitMQ Channel object to achieve the desired design; this Channel object could be created beforehand and used when the stream is created. I'm not sure whether RabbitMQ deliveryTags are tied to a specific Channel object or not. If they are, we can create a UUID and map it to the Channel object and deliveryTag (i.e. UUID -> (Channel, DeliveryTag)). The one thing I haven't looked at yet is whether the Channel object is serializable.
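
A sketch of the mapping idea, with every name made up for illustration. Two caveats worth stating: AMQP delivery tags are scoped to the channel they arrived on, so an ack must go down that same channel, and a live Channel wraps a socket, so it will not serialize.

    import java.util.UUID
    import com.rabbitmq.client.Channel
    import scala.collection.concurrent.TrieMap

    object ChannelRegistry {
      // Must live in the same JVM as the channels (the receiver's JVM in
      // the receiver approach), since a live Channel cannot be serialized
      // and shipped to executors.
      private val pending = TrieMap[UUID, (Channel, Long)]()

      def register(channel: Channel, deliveryTag: Long): UUID = {
        val id = UUID.randomUUID()
        pending.put(id, (channel, deliveryTag))
        id
      }

      // The ack goes down the same channel the message arrived on
      def ack(id: UUID): Unit =
        pending.remove(id).foreach { case (ch, tag) => ch.basicAck(tag, false) }
    }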

@kaveh-hariri

I was able to achieve this with the receiver approach. Basically you need to comment out the basicAck in the "processDelivery" method.
[screenshot omitted]

Next, make the channel available via the Consumer object:
[screenshots omitted]

And finally, ack the messages after processing the data:
[screenshot omitted]
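
A rough reconstruction of what the screenshots showed, based on the description above (names here are guesses at the shape, not the library's actual API):

    import com.rabbitmq.client.Channel
    import org.apache.spark.streaming.dstream.DStream

    // `Delivery` stands in for whatever record type carries the tag
    case class Delivery(deliveryTag: Long, body: Array[Byte])

    def ingestThenAck(stream: DStream[Delivery], channel: Channel): Unit =
      stream.foreachRDD { rdd =>
        // ... ingestion stage runs first ...
        val tags = rdd.map(_.deliveryTag).collect()  // only the tags, small
        // multiple = true acks every outstanding tag up to and including
        // the highest one in a single call; assumes this code runs in the
        // JVM where `channel` lives
        if (tags.nonEmpty) channel.basicAck(tags.max, true)
      }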

@fjavierjimenez
Contributor

The distributed approach is not reliable at all. It sends the ack right after consuming the message from the queue, before doing anything else. If you don't want to lose any messages, you have to use the receiver approach.

@kaveh-hariri by removing the ack in the receiver you are delegating the responsibility for sending the ack to the Spark developer. In your case it works, but you are creating an extra job just to send the acks for the messages processed by the previous job. For example, if you set the property spark.streaming.concurrentJobs to 2, it will not work correctly, because both jobs will run concurrently.

I think the proper way to make sure you don't lose any messages is to set the property spark.streaming.receiver.writeAheadLog.enable. With this property, Spark will save each message it consumes before the ack is sent. Obviously, this solution will decrease performance, because Spark will have to save every message to your HDFS/S3/local filesystem.
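
For reference, enabling it looks like this (standard Spark Streaming configuration; the checkpoint path is a placeholder):

    // The WAL property is read from the Spark conf; the log itself is
    // written under the checkpoint directory, so that directory should be
    // on a fault-tolerant filesystem (HDFS/S3) for real zero data loss.
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("rabbitmq-reliable")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(60))
    ssc.checkpoint("hdfs:///tmp/rabbitmq-checkpoint")  // placeholder path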

@kaveh-hariri

kaveh-hariri commented Nov 22, 2017

@fjavierjimenez Hi, yes, you need the ack job to run after the ingestion job is complete, so you need to leave the concurrentJobs setting at its default of 1. From my understanding that setting is experimental anyway.

I'm currently running this job in a production environment with a 1-minute window. There are 3 distinct stages to the job: a repartition of the data, an ingestion, and an ack. The ack stage, even for 100k messages (roughly 35 KB per message), takes 0.5 seconds or less, so even though it's a separate stage, it's not taking much time or resources.

Regarding the write ahead log, two questions:

  1. How are we sure that the ack is sent only after the data has been written to the write-ahead log? Looking at the receiver approach, the message is acked immediately after it is stored in memory. I'm not sure there would still be zero data loss if the job were killed while consuming.
  2. How do we make sure that the next time the job is started, it reads the data from the write-ahead logs?

edit: from Databricks:
https://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html
[screenshot of the relevant passage omitted]

Basically, it does not look like write-ahead logs with the receiver approach as-is will ensure zero data loss. According to the above, since the ack is sent when the data is stored in memory, the buffered data will be lost even with write-ahead logs.
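
For what it's worth, the fix the Spark custom receiver guide implies is to make the receiver "reliable": call the blocking, multi-record store(...) and ack only after it returns, since that variant does not return until Spark has stored the batch (and written it to the WAL when enabled). A sketch, with illustrative names only:

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver
    import scala.collection.mutable.ArrayBuffer
    import com.rabbitmq.client.Channel

    class ReliableRabbitReceiver(/* connection params */)
        extends Receiver[Array[Byte]](StorageLevel.MEMORY_AND_DISK_SER) {

      def onStart(): Unit = { /* open connection/channel, start consuming */ }
      def onStop(): Unit  = { /* close channel and connection */ }

      // Called from the consuming thread with a batch of deliveries
      private def storeAndAck(bodies: ArrayBuffer[Array[Byte]],
                              lastTag: Long,
                              channel: Channel): Unit = {
        store(bodies)                   // blocks until Spark has stored the batch
        channel.basicAck(lastTag, true) // only now is it safe to ack
      }
    }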

@niloo-sh

@kaveh-hariri, at the last stage I get an error saying that "getOrElse" cannot be applied to a row of the RDD. Do you have a solution? And my next question: is the Consumer's "basicAck" method parallelizable?
