Skip to content

Commit

Permalink
feat: Support graceful Shutdown AbstractSagaListener (#141)
Browse files Browse the repository at this point in the history
* refactor: abstract saga listener를 graceful하게 종료한다

* build: version to 0.4.3 -> 0.4.4-beta-1
  • Loading branch information
devxb authored Jun 22, 2024
1 parent 31b0393 commit de3a1a2
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 5 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<br>

![version 0.4.3](https://img.shields.io/badge/version-0.4.3-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
![version 0.4.4](https://img.shields.io/badge/version-0.4.4--beta--1-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
![redis--stream](https://img.shields.io/badge/-redis--stream-da2020?style=flat-square&logo=Redis&logoColor=white)

**TPS(6,000)** on my Macbook air m2(default options). _[link](#Test1-TPS)_
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ kotlin.code.style=official

### Project ###
group=org.rooftopmsa
version=0.4.3
version=0.4.4-beta-1
compatibility=17

### Sonarcloud ###
Expand Down
20 changes: 18 additions & 2 deletions src/main/kotlin/org/rooftop/netx/engine/AbstractSagaListener.kt
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package org.rooftop.netx.engine

import jakarta.annotation.PreDestroy
import org.rooftop.netx.engine.core.Saga
import org.rooftop.netx.engine.logging.info
import org.rooftop.netx.engine.logging.warningOnError
import reactor.core.Disposable
import reactor.core.publisher.BufferOverflowStrategy
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
Expand All @@ -13,8 +15,11 @@ internal abstract class AbstractSagaListener(
private val sagaDispatcher: AbstractSagaDispatcher,
) {

private lateinit var disposable: Disposable
private var isShutdown = false

fun subscribeStream() {
receive()
disposable = receive()
.publishOn(Schedulers.boundedElastic())
.onBackpressureBuffer(backpressureSize, BufferOverflowStrategy.DROP_LATEST)
.doOnNext {
Expand All @@ -33,7 +38,18 @@ internal abstract class AbstractSagaListener(

private fun <T> Flux<T>.restartWhenTerminated(): Flux<T> {
return this.doAfterTerminate {
subscribeStream()
if (isShutdown.not()) {
subscribeStream()
}
}
}

@PreDestroy
private fun shutdownGracefully() {
disposable.dispose()
shutdownCascade()
info("Shutdown SagaListenerSupporter gracefully")
}

protected abstract fun shutdownCascade()
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import kotlin.time.toJavaDuration
internal class RedisStreamSagaListener(
backpressureSize: Int,
sagaDispatcher: AbstractSagaDispatcher,
connectionFactory: ReactiveRedisConnectionFactory,
private val connectionFactory: ReactiveRedisConnectionFactory,
private val nodeGroup: String,
private val nodeName: String,
private val reactiveRedisTemplate: ReactiveRedisTemplate<String, Saga>,
Expand Down Expand Up @@ -61,6 +61,10 @@ internal class RedisStreamSagaListener(
.flatMapMany { Flux.just(it) }
}

override fun shutdownCascade() {
connectionFactory.reactiveConnection.close()
}

private companion object {
private const val STREAM_KEY = "NETX_STREAM"
}
Expand Down

0 comments on commit de3a1a2

Please sign in to comment.