diff --git a/README.md b/README.md
index a77d6a2..1ce7673 100644
--- a/README.md
+++ b/README.md
@@ -4,7 +4,7 @@
-![version 0.4.3](https://img.shields.io/badge/version-0.4.3-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
+![version 0.4.4](https://img.shields.io/badge/version-0.4.4--beta--1-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
![redis--stream](https://img.shields.io/badge/-redis--stream-da2020?style=flat-square&logo=Redis&logoColor=white)
**TPS(6,000)** on my Macbook air m2(default options). _[link](#Test1-TPS)_
diff --git a/gradle.properties b/gradle.properties
index c2986e0..b92caea 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -2,7 +2,7 @@ kotlin.code.style=official
### Project ###
group=org.rooftopmsa
-version=0.4.3
+version=0.4.4-beta-1
compatibility=17
### Sonarcloud ###
diff --git a/src/main/kotlin/org/rooftop/netx/engine/AbstractSagaListener.kt b/src/main/kotlin/org/rooftop/netx/engine/AbstractSagaListener.kt
index b038e0a..34aa93d 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/AbstractSagaListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/AbstractSagaListener.kt
@@ -1,8 +1,10 @@
package org.rooftop.netx.engine
+import jakarta.annotation.PreDestroy
import org.rooftop.netx.engine.core.Saga
import org.rooftop.netx.engine.logging.info
import org.rooftop.netx.engine.logging.warningOnError
+import reactor.core.Disposable
import reactor.core.publisher.BufferOverflowStrategy
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@@ -13,8 +15,11 @@ internal abstract class AbstractSagaListener(
private val sagaDispatcher: AbstractSagaDispatcher,
) {
+ private lateinit var disposable: Disposable
+ private var isShutdown = false
+
fun subscribeStream() {
- receive()
+ disposable = receive()
.publishOn(Schedulers.boundedElastic())
.onBackpressureBuffer(backpressureSize, BufferOverflowStrategy.DROP_LATEST)
.doOnNext {
@@ -33,7 +38,18 @@ internal abstract class AbstractSagaListener(
private fun Flux.restartWhenTerminated(): Flux {
return this.doAfterTerminate {
- subscribeStream()
+ if (isShutdown.not()) {
+ subscribeStream()
+ }
}
}
+
+ @PreDestroy
+ private fun shutdownGracefully() {
+ disposable.dispose()
+ shutdownCascade()
+ info("Shutdown SagaListenerSupporter gracefully")
+ }
+
+ protected abstract fun shutdownCascade()
}
diff --git a/src/main/kotlin/org/rooftop/netx/redis/RedisStreamSagaListener.kt b/src/main/kotlin/org/rooftop/netx/redis/RedisStreamSagaListener.kt
index fc999e6..645c518 100644
--- a/src/main/kotlin/org/rooftop/netx/redis/RedisStreamSagaListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/redis/RedisStreamSagaListener.kt
@@ -20,7 +20,7 @@ import kotlin.time.toJavaDuration
internal class RedisStreamSagaListener(
backpressureSize: Int,
sagaDispatcher: AbstractSagaDispatcher,
- connectionFactory: ReactiveRedisConnectionFactory,
+ private val connectionFactory: ReactiveRedisConnectionFactory,
private val nodeGroup: String,
private val nodeName: String,
private val reactiveRedisTemplate: ReactiveRedisTemplate,
@@ -61,6 +61,10 @@ internal class RedisStreamSagaListener(
.flatMapMany { Flux.just(it) }
}
+ override fun shutdownCascade() {
+ connectionFactory.reactiveConnection.close()
+ }
+
private companion object {
private const val STREAM_KEY = "NETX_STREAM"
}