Releases: JaidenAshmore/java-dynamic-sqs-listener
7.0.0
Release with version bumps, a refactor of the Spring modules and Micronaut support.
Micronaut support!
Big thanks to lucjross: we have added support for Micronaut.
Usage
- Include the Micronaut core dependency with Maven:
<dependencies>
    <dependency>
        <groupId>com.jashmore</groupId>
        <artifactId>java-dynamic-sqs-listener-micronaut-core</artifactId>
        <version>${sqs.listener.version}</version>
    </dependency>
</dependencies>
Or with Gradle:
dependencies {
    implementation("com.jashmore:java-dynamic-sqs-listener-micronaut-core:${sqs.listener.version}")
}
- Also, include the Micronaut annotation processor with Maven:
<pluginManagement>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>${maven.compiler.version}</version>
            <configuration>
                <annotationProcessorPaths>
                    <annotationProcessorPath>
                        <groupId>com.jashmore</groupId>
                        <artifactId>java-dynamic-sqs-listener-micronaut-inject-java</artifactId>
                        <version>${sqs.listener.version}</version>
                    </annotationProcessorPath>
                </annotationProcessorPaths>
            </configuration>
        </plugin>
    </plugins>
</pluginManagement>
Or with Gradle:
dependencies {
    annotationProcessor("com.jashmore:java-dynamic-sqs-listener-micronaut-inject-java:${sqs.listener.version}")
}
Micronaut will use this at compile time to transform usages of core listener annotations to enable method processors to register annotated methods as message listeners.
- In one of your beans, attach a @QueueListener or other supported annotation to a method indicating that it should process messages from a queue.
@Singleton
public class MyMessageListener {
    // The queue here can point to your SQS server, e.g. a
    // local SQS server or one on AWS
    @QueueListener("${insert.queue.url.here}")
    public void processMessage(@Payload final String payload) {
        // process the message payload here
    }
}
This will use any configured SqsAsyncClient in the application context for connecting to the queue; otherwise a default will be provided that will look for AWS credentials/region from multiple areas, like the environment variables.
See the README.md for more details on how to configure
Version Bumps
Spring Boot 3.1.2 to 3.3.4
Jackson 2.15.2 to 2.18.0
slf4j api 2.0.7 to 2.0.16
ktor 2.3.2 to 2.3.12
See #409
Spring API Changes
The Spring package was split between spring-api and spring-core, but when we wanted to integrate Micronaut support we saw that a lot of the infrastructure for Spring is relevant to other libraries. This change pulls much of the common Spring code out into the root api/core packages. It should only impact users that have extended the libraries, and would mostly require small changes to align.
The annotations for Spring have moved from com.jashmore.sqs.spring.container.* to com.jashmore.sqs.annotations.core.* as the annotation logic has been pulled into the core library.
See #411 for more details about all of the changes.
6.0.0
Version upgrade release! This bumps all the dependencies to allow for the library to work for Spring Boot 3. Upgrading this library to 6.0.0 would involve making sure the corresponding dependencies are at a later version:
- Java 17 (Spring Boot 3 requires it so this library has just been bumped to 17)
- Spring Boot 2.5.14 to 3.1.2
- Jackson 2.12.7 to 2.15.2
- slf4j api 1.7.36 to 2.0.7
- Spring Cloud 2.2.3.RELEASE to 2.2.8.RELEASE
- ktor 2.2.4 to 2.3.2
5.0.0
This is a small major release that upgrades some of the dependencies, such as Spring Boot, as well as removing the default message visibility for the Spring Queue Listener annotations.
Enhancements
Upgrade dependencies [GH-367]
See #368 for the list of dependency changes.
Spring SQS Listeners should not default to 30s visibility timeout [GH-365]
Changes the Spring Queue Listener annotations to use the Visibility Timeout set on the AWS SQS queue instead of defaulting to 30 seconds. This should prevent unexpected re-processing of messages when it wasn't realised that there was a default. This was also applied to the Kotlin DSL.
Upgrade Steps
If you wish to maintain the existing functionality you will need to change your queue annotations from:
@QueueListener("${insert.queue.url.here}")
to be
@QueueListener(value = "${insert.queue.url.here}", messageVisibilityTimeoutInSeconds = 30)
4.4.0
Enhancements
Move Prefetching/Batching listener logic into core library [GH-345]
Moved the construction of Spring's @PrefetchingQueueListener and @QueueListener into the core library as the PrefetchingMessageListenerContainer and BatchingMessageListenerContainer respectively. This should simplify the usage of the core library, as well as making it easier to provide custom construction of these containers in the wrapping libraries, like the Spring or Ktor frameworks.
Attach MessageProcessingDecorators to individual Spring Message listeners [GH-343]
Currently it is only possible to attach a MessageProcessingDecorator to a core message listener by including it as a Spring bean. This has the disadvantage that it will be applied to all message listeners, whereas it can be desirable to apply it to only a subset of listeners. This can be achieved by implementing a MessageProcessingDecoratorFactory, which will have the opportunity to wrap a message listener, e.g. by looking for an annotation.
Example
- Implement the factory, for example one that looks for the MyAnnotation annotation:
public class MyDecoratorFactory implements MessageProcessingDecoratorFactory<MyDecorator> {
    @Override
    public Optional<MyDecorator> buildDecorator(
        SqsAsyncClient sqsAsyncClient,
        QueueProperties queueProperties,
        String identifier,
        Object bean,
        Method method
    ) {
        return AnnotationUtils
            .findMethodAnnotation(method, MyAnnotation.class)
            .map(annotation -> new MyDecoratorProperties(annotation.value()))
            .map(properties -> new MyDecorator(properties));
    }
}
- Add it as a bean in the Spring Context:
@Configuration
class MyConfiguration {
    @Bean
    MyDecoratorFactory myDecoratorFactory() {
        return new MyDecoratorFactory();
    }
}
- Apply the annotation to your message listener:
@QueueListener("${insert.queue.url.here}")
@MyAnnotation("someValue")
public void processMessage(@Payload final String payload) {
    // process the message payload here
}
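The annotation lookup that such a factory performs can be pictured with plain Java reflection. The following is only a minimal sketch, not the library's implementation: `AnnotationLookupSketch`, its nested `MyAnnotation` and `Listeners` types and the `findValue` helper are hypothetical names, and it uses `Method.getAnnotation` from the JDK rather than the library's `AnnotationUtils`.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Optional;

class AnnotationLookupSketch {

    // Hypothetical marker annotation; it must be retained at runtime to be visible via reflection.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface MyAnnotation {
        String value();
    }

    // Hypothetical listener class with one annotated and one plain method.
    static class Listeners {
        @MyAnnotation("someValue")
        public void decorated(String payload) {}

        public void plain(String payload) {}
    }

    // Returns the annotation value when present, otherwise empty (meaning: do not decorate).
    static Optional<String> findValue(Method method) {
        return Optional.ofNullable(method.getAnnotation(MyAnnotation.class)).map(MyAnnotation::value);
    }

    public static void main(String[] args) {
        // Print the lookup result for every method declared on the listener class.
        Arrays
            .stream(Listeners.class.getDeclaredMethods())
            .forEach(method -> System.out.println(method.getName() + " -> " + findValue(method)));
    }
}
```

Returning an empty `Optional` for methods without the annotation mirrors the factory contract shown above, where only matching listeners receive a decorator.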
Support dynamic configuration of Spring Message Listener containers during runtime [GH-339]
Adds the ability to more easily override the values for a message listener annotation by providing a custom annotation parser bean. A use case for this is to provide custom logic for calculating the concurrency level, allowing the message listener to dynamically change its concurrency rate.
Available annotation parsers
- QueueListenerParser: used for the @QueueListener message listener.
- PrefetchingQueueListenerParser: used for the @PrefetchingQueueListener message listener.
- FifoQueueListenerParser: used for the @FifoQueueListener message listener.
Example
- Provide a custom annotation parser bean, for example the PrefetchingQueueListenerParser which is used to parse the @PrefetchingQueueListener annotation.
public class CustomPrefetchingQueueListenerParser extends PrefetchingQueueListenerParser {
    private static final Random random = new Random();
    private final LoadingCache<Boolean, Integer> cachedConcurrencyLevel = CacheBuilder
        .newBuilder()
        .expireAfterWrite(10, TimeUnit.SECONDS)
        .build(CacheLoader.from(() -> random.nextInt(10)));

    public CustomPrefetchingQueueListenerParser(Environment environment) {
        super(environment);
    }

    @Override
    protected Supplier<Integer> concurrencySupplier(PrefetchingQueueListener annotation) {
        return () -> cachedConcurrencyLevel.getUnchecked(true);
    }
}
In the example above we are using a Guava cache to make sure that the concurrency level is cached for a certain time period before it switches values.
- Include this parser as a bean. This will replace the existing implementation and will be used for all @PrefetchingQueueListener message listeners.
@Configuration
class MyConfiguration {
    @Bean
    public PrefetchingQueueListenerParser customParser(final Environment environment) {
        return new CustomPrefetchingQueueListenerParser(environment);
    }
}
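If you would rather not depend on Guava for the cached concurrency level, the same idea can be approximated with the JDK alone. This is a minimal sketch under that assumption: `CachedSupplierSketch` and `CachingSupplier` are hypothetical names that are not part of this library, and the injectable clock exists only so that expiry can be exercised deterministically.

```java
import java.time.Duration;
import java.util.Random;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

class CachedSupplierSketch {

    // Wraps a delegate supplier and recomputes its value at most once per time-to-live window.
    static final class CachingSupplier<T> implements Supplier<T> {
        private final Supplier<T> delegate;
        private final long ttlNanos;
        private final LongSupplier nanoClock; // injectable so expiry can be tested without sleeping
        private T value;
        private long loadedAt;
        private boolean loaded;

        CachingSupplier(Supplier<T> delegate, Duration ttl, LongSupplier nanoClock) {
            this.delegate = delegate;
            this.ttlNanos = ttl.toNanos();
            this.nanoClock = nanoClock;
        }

        @Override
        public synchronized T get() {
            long now = nanoClock.getAsLong();
            // Reload on first use, or once the cached value is at least ttl old.
            if (!loaded || now - loadedAt >= ttlNanos) {
                value = delegate.get();
                loadedAt = now;
                loaded = true;
            }
            return value;
        }
    }

    public static void main(String[] args) {
        Random random = new Random();
        // Same shape as the Guava example: a random concurrency level held for 10 seconds.
        Supplier<Integer> concurrency = new CachingSupplier<>(
            () -> 1 + random.nextInt(10),
            Duration.ofSeconds(10),
            System::nanoTime
        );
        System.out.println("concurrency level: " + concurrency.get());
        System.out.println("still cached:      " + concurrency.get());
    }
}
```

A supplier like this could be returned from a `concurrencySupplier` override in place of the Guava-backed lambda.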
Upgrade steps
This will not break consumers of this library, but if you are building your own aspects of this library manually you may need to modify some of the constructors. For example, the PrefetchingMessageListenerContainerFactory now takes a PrefetchingQueueListenerParser instead of the Spring Environment.
Support for dynamically increasing visibility timeout for long-running message processing [GH-320]
Allows long-running messages to be processed, with the visibility timeout of the message being automatically extended when it is about to reach its limit. If processing takes longer than a configured maximum duration without completing, it will be forcibly stopped by interrupting the thread.
Note: this only works with synchronous processing of messages, e.g. listeners that do not return a CompletableFuture. Asynchronous message listeners will be supported at a later point.
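To make the mechanism described above concrete, here is a minimal JDK-only sketch of the idea. It is not the library's implementation: `VisibilityExtenderSketch`, `process` and `sleepQuietly` are hypothetical names, the `extendVisibility` callback stands in for whatever call would extend the message visibility on SQS, and the visibility timeout is assumed to be larger than the buffer.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class VisibilityExtenderSketch {

    // Runs the task on the calling thread. While it runs, extendVisibility is invoked every
    // (visibilityTimeout - buffer), and the thread is interrupted once maxDuration has elapsed.
    // Returns true if the task finished before maxDuration, false if it was interrupted.
    static boolean process(
        Runnable task,
        Runnable extendVisibility,
        Duration visibilityTimeout,
        Duration buffer,
        Duration maxDuration
    ) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        Thread worker = Thread.currentThread();
        Object lock = new Object();
        boolean[] finished = { false };
        boolean[] timedOut = { false };

        // Assumes visibilityTimeout > buffer, otherwise the period would not be positive.
        long periodMs = visibilityTimeout.minus(buffer).toMillis();
        scheduler.scheduleAtFixedRate(extendVisibility, periodMs, periodMs, TimeUnit.MILLISECONDS);
        scheduler.schedule(
            () -> {
                synchronized (lock) {
                    // Only interrupt while the task is still running.
                    if (!finished[0]) {
                        timedOut[0] = true;
                        worker.interrupt();
                    }
                }
            },
            maxDuration.toMillis(),
            TimeUnit.MILLISECONDS
        );

        try {
            task.run();
        } finally {
            synchronized (lock) {
                finished[0] = true;
            }
            scheduler.shutdownNow();
            if (timedOut[0]) {
                Thread.interrupted(); // clear the flag so it does not leak to the caller
            }
        }
        return !timedOut[0];
    }

    // Helper for the demo: sleeps and simply stops when interrupted.
    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // interrupted: stop processing
        }
    }

    public static void main(String[] args) {
        AtomicInteger extensions = new AtomicInteger();
        boolean completed = process(
            () -> sleepQuietly(350),
            extensions::incrementAndGet,
            Duration.ofMillis(150),
            Duration.ofMillis(50),
            Duration.ofSeconds(2)
        );
        System.out.println("completed=" + completed + ", extensions=" + extensions.get());
    }
}
```

The sketch also shows why the feature is limited to synchronous listeners: interrupting the processing thread only works when the listener runs on that thread.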
Usage
Core
- Add the AutoVisibilityExtenderMessageProcessingDecorator to a DecoratingMessageProcessor wrapping your main MessageProcessor like the LambdaMessageProcessor.
final MessageProcessingDecorator autoVisibilityExtender = new AutoVisibilityExtenderMessageProcessingDecorator(
    sqsAsyncClient,
    queueProperties,
    new AutoVisibilityExtenderMessageProcessingDecoratorProperties() {
        @Override
        public Duration visibilityTimeout() {
            return Duration.ofMinutes(1);
        }

        @Override
        public Duration maxDuration() {
            return Duration.ofMinutes(5);
        }

        @Override
        public Duration bufferDuration() {
            return Duration.ofSeconds(30);
        }
    }
);
MessageProcessor processor = new DecoratingMessageProcessor(
    "listener-identifier",
    que...
4.3.0
Enhancements
FIFO SQS support using a FifoMessageListenerContainer [GH-313]
Adds the ability to consume FIFO SQS queues, guaranteeing that messages in the same message group are processed in order and never at the same time. For more details about configuring this message listener, take a look at the FifoMessageListenerContainerProperties.
Usage
Java
public class Main {
    public static void main(String[] args) throws InterruptedException {
        final SqsAsyncClient sqsAsyncClient = SqsAsyncClient.create(); // or your own custom client
        final QueueProperties queueProperties = QueueProperties.builder().queueUrl("${insert.queue.url.here}").build();
        final MessageListenerContainer container = new FifoMessageListenerContainer(
            queueProperties,
            sqsAsyncClient,
            () ->
                new LambdaMessageProcessor(
                    sqsAsyncClient,
                    queueProperties,
                    message -> {
                        // process the message here
                    }
                ),
            ImmutableFifoMessageListenerContainerProperties.builder().identifier("listener-identifier").concurrencyLevel(10).build()
        );
        container.start();
        Runtime.getRuntime().addShutdownHook(new Thread(container::stop));
        Thread.currentThread().join();
    }
}
Spring Boot
@Component
class MessageListeners {
    @FifoQueueListener(value = "${insert.queue.url.here}", concurrencyLevel = 10)
    public void fifoListener(@Payload final String body) {
        // process message here
    }
}
Kotlin DSL/Ktor
fifoMessageListener("identifier", sqsAsyncClient, "${insert.queue.url.here}") {
    concurrencyLevel = { 10 }
    processor = lambdaProcessor {
        method { message ->
            // process the message payload here
        }
    }
}
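The ordering guarantee described for FIFO queues (in order within a message group, concurrent across groups) can be illustrated without the library. The following is a minimal sketch and not how FifoMessageListenerContainer is implemented: `FifoGroupOrderingSketch` and `submit` are hypothetical names, messages are plain strings, and a failed message is simply skipped rather than retried as SQS would do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class FifoGroupOrderingSketch {

    // Tail of the processing chain for each message group (never cleaned up in this sketch).
    private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();
    private final ExecutorService executor;

    FifoGroupOrderingSketch(ExecutorService executor) {
        this.executor = executor;
    }

    // Queues the message behind the previous message of the same group. Different groups
    // have independent chains, so they can run concurrently on the executor.
    CompletableFuture<Void> submit(String groupId, String body, Consumer<String> processor) {
        return tails.compute(
            groupId,
            (id, tail) -> {
                CompletableFuture<Void> previous = tail == null ? CompletableFuture.completedFuture(null) : tail;
                // exceptionally(...) keeps the chain alive if an earlier message failed.
                return previous.exceptionally(error -> null).thenRunAsync(() -> processor.accept(body), executor);
            }
        );
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        FifoGroupOrderingSketch sketch = new FifoGroupOrderingSketch(pool);
        List<String> processed = new CopyOnWriteArrayList<>();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            pending.add(sketch.submit("group-a", "a" + i, processed::add));
            pending.add(sketch.submit("group-b", "b" + i, processed::add));
        }
        pending.forEach(CompletableFuture::join);
        pool.shutdown();
        // Interleaving between groups may vary, but a0, a1, a2 and b0, b1, b2 each stay in order.
        System.out.println(processed);
    }
}
```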
Version Bumps
Core
org.immutables:value-annotations +2.8.8
com.fasterxml.jackson.core:jackson-databind 2.11.1 -> 2.11.2
software.amazon.awssdk:sqs 2.13.66 -> 2.14.22
Spring
org.springframework.boot:spring-boot-dependencies 2.3.2.RELEASE -> 2.3.4.RELEASE
AWS Xray Extension
com.amazonaws:aws-xray-recorder-sdk-core 2.6.1 -> 2.7.1
Brave Extension
io.zipkin.brave:brave 5.12.4 -> 5.12.6
Spring Cloud Schema Registry Extension
org.springframework.cloud:spring-cloud-schema-registry-client 1.0.7.RELEASE -> 1.0.8.RELEASE
Ktor
io.ktor:ktor-server-core 1.3.2 -> 1.4.0
4.2.0
Enhancements
Set value of isAutoStartup for DefaultMessageListenerContainerCoordinator [GH-273]
Improves the DefaultMessageListenerContainerCoordinator by allowing the auto startup of the containers to be configured. For more information see the Spring - How to prevent containers starting on startup guide.
Adds a getContainers method to the MessageListenerContainerCoordinator to allow for smarter configuration of the containers during runtime.
Remove the dependency management for the modules in favour of explicit versioning [GH-270]
Updates how the dependencies of the library are managed, resulting in the removal of the <dependencyManagement> / constraints for each module in favour of explicitly setting the versions. The reason for this was that a module like the api had a dependency management section setting versions for libraries like Jackson or Avro, which that module does not care about.
4.1.0
Enhancements
Add ability to create a message processor using lambdas [GH-255]
The CoreMessageProcessor required the use of the Java reflection API to indicate the message listener method. This works great for Spring Boot applications where you want to run a method on a bean in the application, but it doesn't work well for simpler applications like a basic Java app or simpler web application frameworks like Ktor. This adds the ability to use a lambda/the Java functional API for defining the message listener method instead.
Examples
Synchronous Lambda consuming the message
new LambdaMessageProcessor(sqsAsyncClient, queueProperties, (message) -> log.info("processing message"));
Asynchronous Lambda consuming the message
new AsyncLambdaMessageProcessor(sqsAsyncClient, queueProperties, (message) -> {
    log.info("processing message");
    return CompletableFuture.runAsync(() -> {
        // do asynchronous processing here
    });
});
Asynchronous Lambda using an acknowledge field
new AsyncLambdaMessageProcessor(sqsAsyncClient, queueProperties, (message, acknowledge) -> {
    log.info("processing message");
    return CompletableFuture.runAsync(() -> {
        // do asynchronous processing here
        acknowledge.acknowledgeSuccessful();
    });
});
Core Kotlin Dsl Example
val container = coreMessageListener("identifier", sqsAsyncClient, queueUrl) {
    processor = lambdaProcessor {
        method { message ->
            log.info("Message: {}", message.body())
        }
    }
    // ...other configuration
}
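The motivation for the lambda processors, avoiding a reflective method lookup, can be seen in a small JDK-only comparison. This is only an illustrative sketch: `ProcessorStylesSketch`, `MyListener`, `viaReflection` and `viaLambda` are hypothetical names, and a plain `Consumer<String>` stands in for the library's processor types.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ProcessorStylesSketch {

    // Hypothetical listener that records every message body it receives.
    static class MyListener {
        final List<String> received = new ArrayList<>();

        public void listen(String body) {
            received.add(body);
        }
    }

    // Reflection style: the method is located by name and parameter types, which is only
    // checked at runtime and fails if the method is renamed.
    static Consumer<String> viaReflection(Object bean, String methodName) {
        final Method method;
        try {
            method = bean.getClass().getMethod(methodName, String.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such listener method: " + methodName, e);
        }
        return body -> {
            try {
                method.invoke(bean, body);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        };
    }

    // Lambda style: the compiler checks the listener, and no bean or Method object is needed.
    static Consumer<String> viaLambda(Consumer<String> listener) {
        return listener;
    }

    public static void main(String[] args) {
        MyListener listener = new MyListener();
        viaReflection(listener, "listen").accept("from reflection");
        viaLambda(listener::listen).accept("from lambda");
        System.out.println(listener.received);
    }
}
```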
Add Batching and Prefetching Kotlin DSL container builders [GH-259]
The Spring Starter for the library provided some core annotations for listening to messages to simplify the configuration of a message listener by providing defaults for most of the configuration; see the @QueueListener and @PrefetchingQueueListener annotations. This adds these two types of message listeners into the Core Kotlin DSL so that consumers can more easily configure new message listeners.
Examples
val container = batchingMessageListener("identifier", sqsAsyncClient, queueUrl) {
    concurrencyLevel = { 10 }
    batchSize = { 5 }
    batchingPeriod = { Duration.ofSeconds(5) }
    processor = lambdaProcessor {
        method { message ->
            log.info("Message: {}", message.body())
        }
    }
}
or
val container = prefetchingMessageListener("identifier", sqsAsyncClient, queueUrl) {
    concurrencyLevel = { 2 }
    desiredPrefetchedMessages = 5
    maxPrefetchedMessages = 10
    processor = lambdaProcessor {
        method { message ->
            log.info("Message: {}", message.body())
        }
    }
}
Add Ktor Integration [GH-254]
Added a Ktor integration so that you can easily set up message listeners in a Ktor application. This utilises the Core Kotlin DSL to configure the message listeners and will take the containers and hook them into the lifecycle of the Ktor server.
Example
val server = embeddedServer(Netty, 8080) {
    prefetchingMessageListener("prefetching-listener", sqsAsyncClient, queueUrl) {
        concurrencyLevel = { 2 }
        desiredPrefetchedMessages = 5
        maxPrefetchedMessages = 10
        processor = lambdaProcessor {
            method { message -> log.info("Message: {}", message.body()) }
        }
    }
}
See the Ktor Example for an example application that can be used.
4.0.0
Enhancements
New MessageProcessingDecorators functionality
Added support to wrap the message processing logic via a MessageProcessingDecorator. This allows the addition of functionality like tracing, metrics or any other functionality.
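As a rough picture of what wrapping the message processing logic means, here is a minimal decorator-pattern sketch in plain Java. It does not reproduce the library's MessageProcessingDecorator interface, whose actual shape may differ: `DecoratorSketch`, `Processor`, `Decorator`, `recording` and `decorateAll` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DecoratorSketch {

    // Hypothetical stand-in for something that processes a message.
    interface Processor {
        void process(String message);
    }

    // Hypothetical decorator: wraps a processor and returns a new one with extra behaviour.
    interface Decorator {
        Processor decorate(Processor delegate);
    }

    // A decorator that records when processing starts and ends, e.g. for tracing or metrics.
    static Decorator recording(String name, List<String> events) {
        return delegate ->
            message -> {
                events.add(name + ":before");
                try {
                    delegate.process(message);
                } finally {
                    events.add(name + ":after");
                }
            };
    }

    // Applies the decorators so that the first one in the list ends up outermost.
    static Processor decorateAll(Processor processor, List<Decorator> decorators) {
        Processor result = processor;
        for (int i = decorators.size() - 1; i >= 0; i--) {
            result = decorators.get(i).decorate(result);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        Processor processor = decorateAll(
            message -> events.add("handled:" + message),
            Arrays.asList(recording("tracing", events), recording("metrics", events))
        );
        processor.process("hello");
        // prints [tracing:before, metrics:before, handled:hello, metrics:after, tracing:after]
        System.out.println(events);
    }
}
```

The listener itself stays unaware of the wrappers, which is what lets tracing or metrics be added without touching the processing code.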
Xray Support [GH-150]
Added Support for Xray Tracing allowing you to trace messages being processed by your message listener. See Core - How to add Xray Tracing or Spring - How to add Xray Tracing for guides on how to integrate this. You can use the AWS Xray Spring Example to see an example of a Spring service with this configured.
Brave Support [GH-159]
Added support for Brave Tracing allowing you to trace messages being processed by your message listener. See Core - How to add Brave Tracing or Spring - How to Add Brave Tracing for guides on how to integrate this. You can use the Spring Sleuth Example to see an example of a Spring service with this configured.
Added a Core Kotlin DSL [GH-216]
You can now use the core module using a Kotlin DSL reducing the verbosity in configuring a message listener. See Core - Kotlin DSL for guides on how to integrate this. You can use the Core Kotlin Example to see an example of a Java program using the DSL.
Example
val container = coreMessageListener("core-example-container", sqsAsyncClient, queueUrl) {
    broker = concurrentBroker {
        concurrencyLevel = cached(Duration.ofSeconds(10)) {
            Random.nextInt(concurrencyLimit)
        }
        concurrencyPollingRate = { concurrencyLevelPeriod }
        errorBackoffTime = { Duration.ofMillis(500) }
    }
    retriever = prefetchingMessageRetriever {
        desiredPrefetchedMessages = 10
        maxPrefetchedMessages = 20
    }
    processor = coreProcessor {
        argumentResolverService = coreArgumentResolverService(objectMapper)
        bean = MessageListener()
        method = MessageListener::class.java.getMethod("listen", Request::class.java, String::class.java)
        decorators {
            add(BraveMessageProcessingDecorator(tracing))
        }
    }
    resolver = batchingResolver {
        bufferingSizeLimit = { 1 }
        bufferingTime = { Duration.ofSeconds(5) }
    }
}
Remove usage of Guava [GH-166]
Guava was being used for some helper functions and it didn't seem like it was worth the extra dependency for the little gain that was provided for it. Therefore, Guava has been completely removed from the library and any necessary functions were reproduced in this library.
Improve component properties and make them consistent [GH-231]
The component properties now use a Duration type instead of integers/longs. For example, instead of a backoffTimeInMs long property, it would be backoffTime as a Duration.
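When migrating a value that used to be supplied in milliseconds, the conversion is a single JDK call. A minimal sketch, in which `DurationPropertySketch` and `fromLegacyMillis` are hypothetical names and not part of the library:

```java
import java.time.Duration;

class DurationPropertySketch {

    // Hypothetical helper: converts a legacy millisecond value into the Duration now expected.
    static Duration fromLegacyMillis(long backoffTimeInMs) {
        return Duration.ofMillis(backoffTimeInMs);
    }

    public static void main(String[] args) {
        Duration backoffTime = fromLegacyMillis(500L);
        System.out.println(backoffTime); // prints PT0.5S
    }
}
```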
Major version bumps [GH-162]
See build.gradle.kts for the latest versions.
Bug Fixes
- [GH-223]: Starting and then immediately stopping the CoreMessageListenerContainer can result in hung threads.
- [GH-217]: Fixed a problem with Spring Auto Configuration where this library would replace the default ObjectMapper provided by Spring Boot Web.
Developer Changes
- [GH-144]: convert the project to gradle
- [GH-155]: removed the JUnit Rules/Extensions for doing integration tests as they were not providing a lot of value
- [GH-180] [GH-230] [GH-177]: update all of the documentation to be a little simpler
- [GH-163]: added markdown linting checks to make sure markdown is consistent and links don't break
- [GH-170]: simplify the directory naming, e.g. java-dynamic-sqs-listener-core to be core
- [GH-153]: change import structure to not prioritise Google Imports
- [GH-154]: fix exceptions in Spring Integration tests when shutting down SQS queues
3.1.1
This patch release fixes bugs and improves logging.
Bug fixes
- [GH-151]: Fixed a bug where the CoreMessageListenerContainer was not shutting down the MessageBroker ExecutorService, leaving an ExecutorService waiting for tasks that would never be submitted. Also adds more debug logging for the CoreMessageListenerContainer to help debug shutdown problems. 1e806ae
3.1.0
This release adds the ability to deserialise messages that have been serialised by a schema tool like Avro. It uses the Spring Cloud Schema Registry Client to obtain the schema of the published message from the Spring Cloud Schema Registry so that the message can be properly deserialised.
Core Changes
- [GH-142]: Updated most of the dependencies to the latest versions, Spring Boot, AWS SDK, etc. 17fd8a6
Extensions
- [GH-140]: Added a Spring Cloud Schema Registry Extension with an Avro based default implementation. See Docs - How to Version Payload Schemas using Spring Cloud Schema Registry for more information df64a93