Skip to content

Commit

Permalink
Do not retry some exceptions (#231)
Browse files Browse the repository at this point in the history
* typo

* do not retry some messages.

* dummy

* add test case

* make log more  verbose to seek attention

* catch throwable as it could be leading to premature exiting.

* status being null, set it to failed.

* javadoc

* handle cause as well

* javadoc update

* doc update

---------

Co-authored-by: Sonu Kumar <sonu@git>
  • Loading branch information
sonus21 and Sonu Kumar authored Jul 8, 2024
1 parent 7f96b9c commit 6baf574
Show file tree
Hide file tree
Showing 32 changed files with 1,129 additions and 860 deletions.
7 changes: 7 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ layout: default

All notable user-facing changes to this project are documented in this file.

## Release [3.2.0] 10-July-2024
### Fixes
* Fixed typo #218

### Feature
* Do not retry some exceptions

## Release [3.1.1] 1-Mar-2024
### Fixes
* Fixed issue for spring boot 3.2 #218
Expand Down
57 changes: 28 additions & 29 deletions docs/callback-and-events.md
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
---
layout: default
title: Callback and Events
title: Callbacks and Events
nav_order: 5
description: Callbacks and Events in Rqueue
permalink: /callback-and-events
---

Rqueue provides different types of callbacks and events.
Rqueue provides various types of callbacks and events for handling message processing and
application events.

Message Processor/Callback
---------------------------
Rqueue supports following message processors. Message processors can be used for different purpose,
some use case could be setting up tracer, creating transaction etc.
## Message Processors/Callbacks

#### Pre Execution Processor
Rqueue supports the following message processors, which can be used for different purposes such as
setting up tracers or managing transactions.

This message processor is called before calling the handler methods, if the method returns `false`
then message handler won't be called.
### Pre Execution Processor

This message processor is invoked before calling the handler methods. If the processor
returns `false`, the message handler will not be called.

```java
class RqueueConfiguration {
Expand All @@ -35,9 +36,9 @@ class RqueueConfiguration {
}
```

#### Discard Execution Processor
### Discard Execution Processor

This message processor would be called whenever a message is discarded due to retry limit exceed.
This message processor is called whenever a message is discarded due to exceeding the retry limit.

```java
class RqueueConfiguration {
Expand All @@ -56,9 +57,9 @@ class RqueueConfiguration {
}
```

#### Dead Letter Queue Processor
### Dead Letter Queue Processor

This message processor would be called whenever a message is moved to dead letter queue.
This message processor is called whenever a message is moved to the dead letter queue.

```java
class RqueueConfiguration {
Expand All @@ -77,9 +78,9 @@ class RqueueConfiguration {
}
```

#### Manual Deletion Processor
### Manual Deletion Processor

This message processor would be called whenever a message is deleted manually.
This message processor is called whenever a message is deleted manually.

```java
class RqueueConfiguration {
Expand All @@ -98,9 +99,9 @@ class RqueueConfiguration {
}
```

#### Post Execution Processor
### Post Execution Processor

This message processor is called on successful consumption of the message.
This message processor is called upon successful consumption of the message.

```java
class RqueueConfiguration {
Expand All @@ -119,20 +120,18 @@ class RqueueConfiguration {
}
```

Events
------------

Rqueue generates two types of **application** events one for the Rqueue container start/shutdown and
one for the task execution status.
## Events

#### Job/Task Execution Event
Rqueue generates two types of application events: one for Rqueue container start/shutdown and
another for task execution status.

On the completion of each task Rqueue generated `RqueueExecutionEvent`, the application can listen
to this event. This event has a job object that can provide all the information this job.
### Job/Task Execution Event

#### Application Bootstrap Event
Upon completion of each task, Rqueue generates `RqueueExecutionEvent`, which the application can
listen to. This event contains a job object providing all relevant information about the job.

Once RqueueListenerContainer is started it will emit one `RqueueBootstrapEvent`, this event is
generated post container shutdown. This event can be used for different purpose like queue
registration, cleaning up some local state etc.
### Application Bootstrap Event

Once the RqueueListenerContainer is started, it emits `RqueueBootstrapEvent`. This event is
generated post container shutdown and can be used for tasks such as queue registration or cleaning
up local states.
154 changes: 90 additions & 64 deletions docs/configuration/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,31 @@ has_children: true
permalink: configuration
---


# Configuration

{: .no_toc }

Rqueue has many configuration settings that can be configured either using application config or code.
Rqueue offers many configuration settings that can be adjusted either through the application
configuration or directly in the code.

{: .fs-6 .fw-300 }

## Table of contents

{: .no_toc .text-delta }

1. TOC
{:toc}
{:toc}

---
Apart from the basic configuration, it can be customized heavily, like number of tasks it would be
executing concurrently. More and more configurations can be provided using
`SimpleRqueueListenerContainerFactory` class. See SimpleRqueueListenerContainerFactory [doc](https://javadoc.io/doc/com.github.sonus21/rqueue-core/latest/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.html) for more configs.
Apart from the basic configuration, Rqueue can be heavily customized, such as adjusting the number
of tasks executed concurrently. Additional configurations can be provided using
the `SimpleRqueueListenerContainerFactory` class. See
SimpleRqueueListenerContainerFactory [doc](https://javadoc.io/doc/com.github.sonus21/rqueue-core/latest/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.html)
for more configs.

```java

@Configuration
public class RqueueConfiguration {
@Bean
Expand All @@ -35,19 +41,22 @@ public class RqueueConfiguration {
```

## Task or Queue Concurrency
By default, the number of task executors are twice the number of queues. A custom or shared task
executor can be configured using factory's `setTaskExecutor` method. It's also possible to provide
queue concurrency using `RqueueListener` annotation's field `concurrency`. The concurrency could be
some positive number like 10, or range 5-10. If queue concurrency is provided then each queue will
use their own task executor to execute consumed messages, otherwise a shared task executor is used
to execute tasks. A global number of workers can be configured using `setMaxNumWorkers` method.
`RqueueListener` annotation also has `batchSize` field, the default values are as,
listener having concurrency set will fetch 10 messages while other 1.

By default, the number of task executors is twice the number of queues. You can configure a custom
or shared task executor using the factory's `setTaskExecutor` method. Additionally, queue
concurrency can be set using the `RqueueListener` annotation's `concurrency` field, which can be a
positive number like 10 or a range like 5-10. If queue concurrency is specified, each queue will use
its own task executor to handle consumed messages; otherwise, a shared task executor is used.

A global number of workers can be configured using the `setMaxNumWorkers` method.
The `RqueueListener` annotation also has a `batchSize` field. By default, listeners with a
concurrency
set will fetch 10 messages, while others will fetch 1.

{: .note}
Increasing batch size has its consequences, if your thread pool size is too low in that case
you would see many processing jobs since task would be rejected by executor unless you've configured
large queueCapacity.
Increasing the batch size has its consequences. If your thread pool size is too low, you may
encounter many processing jobs being rejected by the executor unless you have configured a
large `queueCapacity`.

```java
class RqueueConfiguration {
Expand Down Expand Up @@ -80,9 +89,11 @@ class RqueueConfiguration {
}
```

When a custom executor is provided, then you must set MaxNumWorkers correctly, otherwise thread pool
might be over or under utilised. Over utilization of thread pool is not possible, it will reject new
tasks, this will lead to delay in message consumption, under utilization can be handled as
When a custom executor is provided, it is essential to set `MaxNumWorkers` correctly. Otherwise, the
thread pool might be over- or under-utilized. Over-utilization of the thread pool is not possible,
as it will reject new tasks, leading to delays in message consumption. Under-utilization can be
managed by ensuring proper configuration of the executor and adjusting the `MaxNumWorkers` setting
appropriately.

```
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
Expand All @@ -94,43 +105,51 @@ threadPoolTaskExecutor.afterPropertiesSet();
factory.setTaskExecutor(threadPoolTaskExecutor);
```

In this configuration there are three variables `corePoolSize`, `maxPoolSize` and `queueCapacity`.
In this configuration, there are three key variables: `corePoolSize`, `maxPoolSize`,
and `queueCapacity`.

* `corePoolSize` signifies the lower limit of active threads.
* `maxPoolSize` signifies the upper limit of active threads.
* `queueCapacity` signify even though we have `maxPoolSize` running threads we can have
`queueCapacity` tasks waiting in the queue, that can be dequeue and executed by the existing thread
as soon as the running threads complete the execution.
- `corePoolSize` signifies the lower limit of active threads.
- `maxPoolSize` signifies the upper limit of active threads.
- `queueCapacity` signifies that even if you have `maxPoolSize` running threads, you can
have `queueCapacity` tasks waiting in the queue, which can be dequeued and executed by the
existing threads as soon as the running threads complete their execution.

If you have N queues then you can set maximum number of workers as `(maxPoolSize + queueCapacity - N )`
If you have N queues, you can set the maximum number of workers
as `(maxPoolSize + queueCapacity - N)`.

{: .warning}
Here N threads are provided for polling queues, this is not a correct number when **priority** is
used.
In this context, N threads are allocated for polling queues, but this is not a correct number when *
*priority** is used.

The number of message pollers would be sum of the followings.
The number of message pollers is determined by the sum of the following:

1. Number of unique priority groups.
2. Number of queues whose priority is provided as `"critical=5,high=2"`.
2. Number of queues with specified priorities (e.g., `"critical=5,high=2"`).
3. Number of queues without priority.

If you don't want to go into the maths, then you can set
If you prefer not to delve into the calculations, you can set the following:

* queueCapacity >= 2 * number of queues
* maxPoolSize >= 2 * number of queues
* corePoolSize >= number of queues
- `queueCapacity >= 2 * number of queues`
- `maxPoolSize >= 2 * number of queues`
- `corePoolSize >= number of queues`
-

{: .note}
Whenever you set queue capacity to non-zero then it can create duplicate message problem,
since the polled messages are just waiting to be executed, if visibilityTimeout expires than other
message listener will pull the same message.
Setting a non-zero `queueCapacity` can indeed lead to duplicate message problems. This occurs
because polled messages that are waiting to be executed might have their `visibilityTimeout` expire,
causing another message listener to pull the same message again. This scenario can result in
duplicate processing of messages, which can impact the correctness of your application's logic. To
mitigate this issue, it's crucial to carefully configure `queueCapacity` and `visibilityTimeout`
settings to ensure that messages are processed correctly without duplication.

## Manual start of the container

Whenever container is refreshed or application is started then it is started automatically, it also
comes with a graceful shutdown. Automatic start of the container can be controlled
using `autoStartup` flag, when autoStartup is false then application must call start and stop
methods of container. For further graceful shutdown application should call destroy method as well.
When using a container that starts automatically and offers graceful shutdown, you can control its
automatic startup behavior using the `autoStartup` flag. If `autoStartup` is set to `false`, then
your application needs to manually call the `start` and `stop` methods of the container to control
its lifecycle. Additionally, for a graceful shutdown, you should call the `destroy` method when
appropriate. This gives you finer control over when the container starts and stops within your
application's lifecycle.

```java
class RqueueConfiguration {
Expand Down Expand Up @@ -172,17 +191,15 @@ public class BootstrapController {

## Message converters configuration

Generally any message can be converted to and from without any problems, though it can be customized
by providing an implementation `org.springframework.messaging.converter.MessageConverter`, this
message converter must implement both the methods of `MessageConverter` interface. Implementation
must make sure the return type of method `toMessage` is `Message<String>` while as in the case
of `fromMessage` any object can be returned as well.
To configure the message converter, you can only use application configuration by specifying the
property `rqueue.message.converter.provider.class=com.example.MyMessageConverterProvider`. This
approach allows you to customize message conversion behavior using your own implementation
of `org.springframework.messaging.converter.MessageConverter`. Typically, this customization ensures
that messages can be converted to and from various formats smoothly within your application.

We can configure message converter only using application configuration using property
`rqueue.message.converter.provider.class=com.example.MyMessageConverterProvider`
{: .note}
MyMessageConverterProvider class must implement
`com.github.sonus21.rqueue.converter.MessageConverterProvider` interface.
MyMessageConverterProvider class must
implement `com.github.sonus21.rqueue.converter.MessageConverterProvider` interface.

```java
class MyMessageConverterProvider implements MessageConverterProvider {
Expand All @@ -195,21 +212,30 @@ class MyMessageConverterProvider implements MessageConverterProvider {
}
```

The default implementation is `DefaultMessageConverterProvider`, ths converter
returns `DefaultRqueueMessageConverter`. DefaultRqueueMessageConverter can encode/decode most of the
messages, but it will have problem when message classes are not shared across application. If you do
not want to share class as jar files then you can
use `com.github.sonus21.rqueue.converter.JsonMessageConverter`
or `org.springframework.messaging.converter.MappingJackson2MessageConverter` these converters
produce `JSON` data. Other implementation can be used as well MessagePack, ProtoBuf etc
The default implementation, `DefaultMessageConverterProvider`,
returns `DefaultRqueueMessageConverter`. While `DefaultRqueueMessageConverter` can handle encoding
and decoding for most messages, it may encounter issues when message classes are not shared across
applications. To avoid sharing classes as JAR files, you can opt for converters such
as `com.github.sonus21.rqueue.converter.JsonMessageConverter`
or `org.springframework.messaging.converter.MappingJackson2MessageConverter`. These converters
serialize messages into JSON format, facilitating interoperability without shared class
dependencies.

Additionally, alternatives like MessagePack or ProtoBuf can also be employed based on specific
requirements for message serialization and deserialization. Each of these options provides
flexibility in how messages are encoded and decoded across different systems and applications.

## Additional Configuration

- **rqueue.retry.per.poll** : The number of times, a polled message should be tried before declaring it
dead or putting it back in the simple queue. The default value is `1`, that means a message would
be executed only once and next execution will happen on next poll. While if we increase this
to `N` then the polled message would be tries consecutively N times before it will be made
available for other listeners.
-
- **rqueue.retry.per.poll**: This setting determines how many times a polled message should be
retried before declaring it dead or moving it back into the queue for subsequent retries. The
default value is `1`, meaning a message will be processed once initially, and if it fails, it will
be retried on the next poll. If you increase this value to `N`, the polled message will be retried
consecutively N times before it is considered failed and made available for other listeners to
process.

This configuration allows you to control how many times Rqueue attempts to process a message before
handling it as a failed message, giving you flexibility in managing message retries and error
handling strategies.


Loading

0 comments on commit 6baf574

Please sign in to comment.