Kotlin multiplatform implementation of Reactive Extensions.
If you have any questions or feedback, you are welcome to join the Kotlin Slack channel: #reaktive
There are a number of modules published to Maven Central:
- reaktive - the main Reaktive library (multiplatform)
- reaktive-annotations - collection of annotations (multiplatform)
- reaktive-testing - testing utilities (multiplatform)
- utils - some utilities like Clock, AtomicReference, Lock, etc. (multiplatform)
- coroutines-interop - Kotlin coroutines interoperability helpers (multiplatform)
- rxjava2-interop - RxJava v2 interoperability helpers (JVM and Android)
- rxjava3-interop - RxJava v3 interoperability helpers (JVM and Android)
kotlin {
    sourceSets {
        commonMain {
            dependencies {
                implementation 'com.badoo.reaktive:reaktive:<version>'
                implementation 'com.badoo.reaktive:reaktive-annotations:<version>'
                implementation 'com.badoo.reaktive:coroutines-interop:<version>' // For interop with coroutines
                implementation 'com.badoo.reaktive:rxjava2-interop:<version>' // For interop with RxJava v2 (JVM and Android only)
                implementation 'com.badoo.reaktive:rxjava3-interop:<version>' // For interop with RxJava v3 (JVM and Android only)
            }
        }
        commonTest {
            dependencies {
                implementation 'com.badoo.reaktive:reaktive-testing:<version>'
            }
        }
    }
}
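For builds that use the Gradle Kotlin DSL, the same setup might look roughly like the sketch below. It assumes a build.gradle.kts file in which the Kotlin Multiplatform plugin is already applied and the targets are declared; `<version>` is left as a placeholder, as above. The RxJava interop modules are left out because they are listed as JVM and Android only, so they would presumably belong in a JVM or Android source set rather than in commonMain.

```kotlin
// build.gradle.kts (sketch; assumes the Kotlin Multiplatform plugin is applied
// and the targets are declared elsewhere in this file)
kotlin {
    sourceSets {
        val commonMain by getting {
            dependencies {
                implementation("com.badoo.reaktive:reaktive:<version>")
                implementation("com.badoo.reaktive:reaktive-annotations:<version>")
                implementation("com.badoo.reaktive:coroutines-interop:<version>") // For interop with coroutines
            }
        }
        val commonTest by getting {
            dependencies {
                implementation("com.badoo.reaktive:reaktive-testing:<version>")
            }
        }
    }
}
```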
- Multiplatform: JVM, Android, iOS, macOS, watchOS, tvOS, JavaScript, Linux X64
- Schedulers support:
  - computationScheduler - fixed thread pool with a size equal to the number of cores
  - ioScheduler - unbounded thread pool with caching policy
  - newThreadScheduler - creates a new thread for each unit of work
  - singleScheduler - executes tasks on a single shared background thread
  - trampolineScheduler - queues tasks and executes them on one of the participating threads
  - mainScheduler - executes tasks on the main thread
- True multithreading for Kotlin/Native (since v2.0 only the new memory model is supported)
- Supported sources: Observable, Maybe, Single, Completable
- Subjects: PublishSubject, BehaviorSubject, ReplaySubject, UnicastSubject
- Interoperability with Kotlin Coroutines
  - Convert suspend functions to/from Single, Maybe and Completable
  - Convert Flow to/from Observable
  - Convert CoroutineContext to Scheduler
  - Convert Scheduler to CoroutineDispatcher
- Interoperability with RxJava2 and RxJava3
  - Conversion of sources and schedulers between Reaktive and RxJava
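To make the difference between two of the listed subjects concrete, here is a minimal sketch. It assumes Reaktive 2.x on a single thread and uses the PublishSubject and BehaviorSubject factory functions; the helper name collectFromSubjects is hypothetical and exists only for illustration.

```kotlin
import com.badoo.reaktive.observable.subscribe
import com.badoo.reaktive.subject.behavior.BehaviorSubject
import com.badoo.reaktive.subject.publish.PublishSubject

// Hypothetical helper (not part of Reaktive): returns the values received
// by a late subscriber of each subject.
fun collectFromSubjects(): Pair<List<Int>, List<Int>> {
    val fromPublish = mutableListOf<Int>()
    val fromBehavior = mutableListOf<Int>()

    // PublishSubject delivers only the values emitted after subscription
    val publish = PublishSubject<Int>()
    publish.onNext(1) // No subscribers yet, so this value is dropped
    publish.subscribe(onNext = { fromPublish += it })
    publish.onNext(2)

    // BehaviorSubject holds the latest value and replays it to new subscribers
    val behavior = BehaviorSubject(0)
    behavior.onNext(1) // Replaces the initial value 0
    behavior.subscribe(onNext = { fromBehavior += it }) // Receives 1 right away
    behavior.onNext(2)

    return fromPublish to fromBehavior
}

fun main() {
    val (fromPublish, fromBehavior) = collectFromSubjects()
    println("PublishSubject subscriber received: $fromPublish")
    println("BehaviorSubject subscriber received: $fromBehavior")
}
```

Everything here runs synchronously on the calling thread, so no scheduler is involved. On Kotlin/Native with Reaktive 1.x the captured mutable lists would be subject to the freezing rules of the old memory model.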
Since version 2.x, Reaktive only works with the new memory model.
Reaktive 1.x and the old (strict) memory model
The old (strict) Kotlin/Native memory model and its concurrency rules are restrictive: in general, shared mutable state between threads is not allowed. Since Reaktive supports multithreading in Kotlin/Native, please read the Kotlin/Native documentation on this memory model before using it.
Object detachment is relatively difficult to achieve and is very error-prone when the objects are created from outside and are not fully managed by the library. This is why Reaktive prefers frozen state. Here are some hints:
- Any callback (and any captured objects) submitted to a Scheduler will be frozen
- subscribeOn freezes both its upstream source and downstream observer; all the Disposables (upstream's and downstream's) are frozen as well; the values (including errors) are not frozen by the operator
- observeOn freezes only its downstream observer and all the values (including errors) passed through it, plus all the Disposables; the upstream source is not frozen by the operator
- Other operators that use a scheduler (like debounce, timer, delay, etc.) behave the same as observeOn in most cases
Sometimes freezing is not acceptable, e.g. we might want to load some data in the background and then update the UI. Obviously, the UI cannot be frozen. With Reaktive, this can be achieved in two ways:
Use the threadLocal operator:

val values = mutableListOf<Any>()
var isFinished = false

observable<Any> { emitter ->
    // Background job
}
    .subscribeOn(ioScheduler)
    .observeOn(mainScheduler)
    .threadLocal()
    .doOnBeforeNext { values += it } // Callback is not frozen, we can update the mutable list
    .doOnBeforeFinally { isFinished = true } // Callback is not frozen, we can change the flag
    .subscribe()
Set the isThreadLocal flag to true in the subscribe operator:

val values = mutableListOf<Any>()
var isComplete = false

observable<Any> { emitter ->
    // Background job
}
    .subscribeOn(ioScheduler)
    .observeOn(mainScheduler)
    .subscribe(
        isThreadLocal = true,
        onNext = { values += it }, // Callback is not frozen, we can update the mutable list
        onComplete = { isComplete = true } // Callback is not frozen, we can change the flag
    )
In both cases, the subscription (the subscribe call) must be performed on the Main thread.
Interoperability with Kotlin coroutines is provided by the coroutines-interop module. Please be aware of some known problems with multi-threaded coroutines on Kotlin/Native.
// Convert Flow to/from Observable
val flow: Flow<Int> = observableOf(1, 2, 3).asFlow()
val observable: Observable<Int> = flowOf(1, 2, 3).asObservable()

// Wrap a suspend function into a Single
fun doSomething() {
    singleFromCoroutine { getSomething() }
        .subscribe { println(it) }
}

suspend fun getSomething(): String {
    delay(1.seconds)
    return "something"
}

// Convert between CoroutineDispatcher and Scheduler
val defaultScheduler = Dispatchers.Default.asScheduler()
val computationDispatcher = computationScheduler.asCoroutineDispatcher()
Reaktive provides an easy way to manage subscriptions: DisposableScope.
Take a look at the following examples:
val scope = disposableScope {
    observable.subscribeScoped(...) // Subscription will be disposed when the scope is disposed

    doOnDispose {
        // Will be called when the scope is disposed
    }

    someDisposable.scope() // `someDisposable` will be disposed when the scope is disposed
}

// At some point later
scope.dispose()

class MyPresenter(
    private val view: MyView,
    private val longRunningAction: Completable
) : DisposableScope by DisposableScope() {

    init {
        doOnDispose {
            // Will be called when the presenter is disposed
        }
    }

    fun load() {
        view.showProgressBar()

        // Subscription will be disposed when the presenter is disposed
        longRunningAction.subscribeScoped(onComplete = view::hideProgressBar)
    }
}

class MyActivity : AppCompatActivity(), DisposableScope by DisposableScope() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        MyPresenter(...).scope()
    }

    override fun onDestroy() {
        dispose()

        super.onDestroy()
    }
}
Please see the corresponding documentation page: Reaktive and Swift interoperability.
Reaktive provides a Plugin API, similar to RxJava plugins. The Plugin API provides a way to decorate Reaktive sources. A plugin should implement the ReaktivePlugin interface; it can be registered using the registerReaktivePlugin function and unregistered using the unregisterReaktivePlugin function.
object MyPlugin : ReaktivePlugin {
    override fun <T> onAssembleObservable(observable: Observable<T>): Observable<T> =
        object : Observable<T> {
            // Captured at assembly time, so it records where the source was created
            private val traceException = TraceException()

            override fun subscribe(observer: ObservableObserver<T>) {
                observable.subscribe(
                    object : ObservableObserver<T> by observer {
                        override fun onError(error: Throwable) {
                            observer.onError(error, traceException)
                        }
                    }
                )
            }
        }

    override fun <T> onAssembleSingle(single: Single<T>): Single<T> =
        TODO("Similar to onAssembleObservable")

    override fun <T> onAssembleMaybe(maybe: Maybe<T>): Maybe<T> =
        TODO("Similar to onAssembleObservable")

    override fun onAssembleCompletable(completable: Completable): Completable =
        TODO("Similar to onAssembleObservable")

    // Attaches the assembly trace to the error (once) before passing it downstream
    private fun ErrorCallback.onError(error: Throwable, traceException: TraceException) {
        if (error.suppressedExceptions.lastOrNull() !is TraceException) {
            error.addSuppressed(traceException)
        }
        onError(error)
    }

    private class TraceException : Exception()
}
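Registering and unregistering a plugin could look like the sketch below. CountingPlugin and runWithPlugin are hypothetical names used only for illustration; the plugin passes every source through unchanged and merely counts assembled Observables. The sketch assumes Reaktive 2.x and that all four onAssemble methods must be implemented. The Plugin API may be marked as experimental in your version, in which case an opt-in might be needed.

```kotlin
import com.badoo.reaktive.completable.Completable
import com.badoo.reaktive.maybe.Maybe
import com.badoo.reaktive.observable.Observable
import com.badoo.reaktive.observable.observableOf
import com.badoo.reaktive.observable.subscribe
import com.badoo.reaktive.plugin.ReaktivePlugin
import com.badoo.reaktive.plugin.registerReaktivePlugin
import com.badoo.reaktive.plugin.unregisterReaktivePlugin
import com.badoo.reaktive.single.Single

// Hypothetical plugin for illustration: counts assembled Observables
// and returns every source unchanged.
object CountingPlugin : ReaktivePlugin {
    var assembledObservables = 0
        private set

    override fun <T> onAssembleObservable(observable: Observable<T>): Observable<T> {
        assembledObservables++
        return observable
    }

    override fun <T> onAssembleSingle(single: Single<T>): Single<T> = single

    override fun <T> onAssembleMaybe(maybe: Maybe<T>): Maybe<T> = maybe

    override fun onAssembleCompletable(completable: Completable): Completable = completable
}

// Hypothetical helper: registers the plugin, builds and subscribes to one
// Observable, then unregisters the plugin and returns the counter.
fun runWithPlugin(): Int {
    registerReaktivePlugin(CountingPlugin)
    try {
        observableOf(1, 2, 3).subscribe(onNext = { println(it) })
    } finally {
        // Unregister even if the subscription throws
        unregisterReaktivePlugin(CountingPlugin)
    }
    return CountingPlugin.assembledObservables
}

fun main() {
    println("Observables assembled while registered: ${runWithPlugin()}")
}
```

Unregistering in a finally block keeps the plugin from leaking into unrelated code, which matters most in tests where plugins are installed temporarily.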