From feff7f1755e278664e9e0ba8de50451edfee0059 Mon Sep 17 00:00:00 2001 From: johannes karoff Date: Fri, 10 Nov 2023 14:13:50 +0100 Subject: [PATCH] make reactive state variables lazy (#271) --- README.md | 104 +- .../src/main/scala/colibri/Cancelable.scala | 3 +- .../src/main/scala/colibri/Observable.scala | 5 +- colibri/src/main/scala/colibri/Observer.scala | 6 +- colibri/src/main/scala/colibri/Subject.scala | 8 + .../colibri/reactive/OwnedPlatform.scala | 9 - .../colibri/reactive/OwnerPlatform.scala | 20 +- .../scala-2/colibri/reactive/RxPlatform.scala | 2 +- .../reactive/internal/MacroUtils.scala | 53 +- .../colibri/reactive/OwnedPlatform.scala | 8 - .../colibri/reactive/OwnerPlatform.scala | 2 - .../scala-3/colibri/reactive/RxPlatform.scala | 2 +- .../main/scala/colibri/reactive/Owned.scala | 12 - .../main/scala/colibri/reactive/Owner.scala | 73 +- .../scala/colibri/reactive/Reactive.scala | 428 +++++-- .../scala/colibri/reactive/implicits.scala | 16 - .../src/test/scala/colibri/ReactiveSpec.scala | 1073 +++++++++++++---- 17 files changed, 1275 insertions(+), 549 deletions(-) delete mode 100644 reactive/src/main/scala-2/colibri/reactive/OwnedPlatform.scala delete mode 100644 reactive/src/main/scala-3/colibri/reactive/OwnedPlatform.scala delete mode 100644 reactive/src/main/scala/colibri/reactive/Owned.scala delete mode 100644 reactive/src/main/scala/colibri/reactive/implicits.scala diff --git a/README.md b/README.md index 489a86cc..e2e97f04 100644 --- a/README.md +++ b/README.md @@ -14,16 +14,16 @@ This library includes: Reactive core library with typeclasses: ```scala -libraryDependencies += "com.github.cornerman" %%% "colibri" % "0.5.0" +libraryDependencies += "com.github.cornerman" %%% "colibri" % "0.8.0" ``` ```scala import colibri._ ``` -Reactive variables with hot distinct observables (a bit like scala-rx): +Reactive variables with lazy, distinct, shared state variables (a bit like scala-rx, but lazy): ```scala -libraryDependencies += "com.github.cornerman" %%% "colibri-reactive" % "0.5.0" +libraryDependencies += "com.github.cornerman" %%% "colibri-reactive" % "0.8.0" ``` ```scala @@ -32,7 +32,7 @@ import colibri.reactive._ For jsdom-based operations in the browser (`EventObservable`, `Storage`): ```scala -libraryDependencies += "com.github.cornerman" %%% "colibri-jsdom" % "0.5.0" +libraryDependencies += "com.github.cornerman" %%% "colibri-jsdom" % "0.8.0" ``` ```scala @@ -41,7 +41,7 @@ import colibri.jsdom._ For scala.rx support (only Scala 2.x): ```scala -libraryDependencies += "com.github.cornerman" %%% "colibri-rx" % "0.5.0" +libraryDependencies += "com.github.cornerman" %%% "colibri-rx" % "0.8.0" ``` ```scala @@ -50,7 +50,7 @@ import colibri.ext.rx._ For airstream support: ```scala -libraryDependencies += "com.github.cornerman" %%% "colibri-airstream" % "0.5.0" +libraryDependencies += "com.github.cornerman" %%% "colibri-airstream" % "0.8.0" ``` ```scala @@ -59,7 +59,7 @@ import colibri.ext.airstream._ For zio support: ```scala -libraryDependencies += "com.github.cornerman" %%% "colibri-zio" % "0.5.0" +libraryDependencies += "com.github.cornerman" %%% "colibri-zio" % "0.8.0" ``` ```scala @@ -68,7 +68,7 @@ import colibri.ext.zio._ For fs2 support (`Source` only): ```scala -libraryDependencies += "com.github.cornerman" %%% "colibri-fs2" % "0.5.0" +libraryDependencies += "com.github.cornerman" %%% "colibri-fs2" % "0.8.0" ``` ```scala @@ -146,9 +146,15 @@ You can convert any `Source` into an `Observable` with `Observable.lift(source)` ## Reactive variables -The module `colibri-reactive` exposes reactive variables. This is hot, distinct observables that always have a value. These reactive variables are meant for managing state - opposed to managing events which is a perfect fit for lazy `Observable` in the core `colibri` library. +The module `colibri-reactive` exposes reactive variables. This is lazy, distinct shared state variables (internally using observables) that always have a value. These reactive variables are meant for managing state - opposed to managing events which is a perfect fit for lazy `Observable` in the core `colibri` library. -This module behaves very similar to scala-rx - just built on top of colibri Observables for seamless integration and powerful operators. It is not entirely glitch-free because invalid state can appear in operators like map or foreach, but you always have a consistent state in `now()` and it reduces the number of intermediate triggers or glitches. You can become completely glitch-free by converting back to observable and using `dropSyncGlitches` which will introduce an async boundary (micro-task). +This module behaves similar to scala-rx - though variables are not hot and it is built on top of colibri Observables for seamless integration and powerful operators. + +The whole thing is not entirely glitch-free, as invalid state can appear in operators like map or foreach. But you always have a consistent state in `now()` and it reduces the number of intermediate triggers or glitches. You can become completely glitch-free by converting back to observable and using `dropSyncGlitches` which will introduce an async boundary (micro-task). + +A state variable is of type `Var[A] extends Rx[A] with RxWriter[A]`. + +The laziness of variables means that the current value is only tracked if anyone subscribes to the `Rx[A]`. So an Rx does not compute anything on its own. You can still always call `now()` on it - if it is currently not subscribed, it will lazily calculate the current value. Example: @@ -156,8 +162,6 @@ Example: import colibri.reactive._ -import colibri.owner.unsafeImplicits._ // dangerous. This never cancels subscriptions. See below! - val variable = Var(1) val variable2 = Var("Test") @@ -165,60 +169,76 @@ val rx = Rx { s"${variable()} - ${variable2()}" } -rx.foreach(println(_)) +val cancelable = rx.unsafeForeach(println(_)) println(variable.now()) // 1 println(variable2.now()) // "Test" println(rx.now()) // "1 - Test" -variable.set(2) +variable.set(2) // println("2 - Test") println(variable.now()) // 2 println(variable2.now()) // "Test" println(rx.now()) // "2 - Test" -variable2.set("Foo") +variable2.set("Foo") // println("2 - Foo") println(variable.now()) // 2 println(variable2.now()) // "Foo" println(rx.now()) // "2 - Foo" -``` +cancelable.unsafeCancel() -If you want to work with reactive variables (hot observable), then someone need to cleanup the subscriptions. We call this concept an `Owner`. We use an *unsafe* owner in the above example. It actually never cleans up. It should only ever be used in your main method or for global state. +println(variable.now()) // 2 +println(variable2.now()) // "Foo" +println(rx.now()) // "2 - Foo" -You can even work without ever using the unsafe owner or having to pass it implictly. You can use `Owned` blocks instead. Inside an `Owned` block, you will have to return a type that has a `SubscriptionOwner` instance. Example: +variable.set(3) // no println -```scala +// now calculates new value lazily +println(variable.now()) // 3 +println(variable2.now()) // "Foo" +println(rx.now()) // "3 - Foo" +``` -import colibri._ +Apart from `Rx` which always has an initial value, there is `RxLater` (and `VarLater`) which will eventually have a value (both extend RxState which extends RxSource). It also meant for representing state just without an initial state. It is lazy, distinct and has shared execution just like `Rx`. + +``` import colibri.reactive._ -import cats.effect.SyncIO -sealed trait Modifier -object Modifier { - case class ReactiveModifier(rx: Rx[String]) extends Modifier - case class SubscriptionModifier(subscription: () => Cancelable) extends Modifier - case class CombineModifier(modifierA: Modifier, modifierB: Modifier) extends Modifier +val variable = VarLater[Int]() - implicit object subcriptionOwner extends SubscriptionOwner[Modifier] { - def own(owner: Modifier)(subscription: () => Cancelable): Modifier = CombineModifier(owner, SubscriptionModifier(subscription)) - } -} +val stream1 = RxLater.empty +val stream2 = RxLater.future(Future.successful(1)).map(_ + 1) -val component: SyncIO[Modifier] = Owned { - val variable = Var(1) - val mapped = rx.map(_ + 1) +val cancelable = variable.unsafeForeach(println(_)) +val cancelable1 = stream1.unsafeForeach(println(_)) +val cancelable2 = stream2.unsafeForeach(println(_)) - val rx = Rx { - "Hallo: ${mapped()}" - } +println(variable.toRx.now()) // None +println(stream1.toRx.now()) // None +println(stream2.toRx.now()) // Some(2) - ReactiveModifier(rx) -} +variable.set(13) + +println(variable.toRx.now()) // Some(13) +``` + +There also exist `RxEvent` and `VarEvent`, which are event observables with shared execution. That is they behave like `Rx` and `Var` such that transformations are only applied once and not per subscription. But `RxEvent` and `VarEvent` are not distinct and have no current value. They should be used for event streams. + +``` +import colibri.reactive._ + +val variable = VarEvent[Int]() + +val stream = RxEvent.empty + +val mapped = RxEvent.merge(variable.tap(println(_)).map(_ + 1), stream) + +val cancelable = mapped.unsafeForeach(println(_)) ``` -For example, [Outwatch](https://github.com/outwatch/outwatch) supports `Owned`: +[Outwatch](https://github.com/outwatch/outwatch) works perfectly with Rx (or RxLater, RxEvent which all extend RxSource) - just like Observable. ```scala @@ -227,7 +247,7 @@ import outwatch.dsl._ import colibri.reactive._ import cats.effect.SyncIO -val component: SyncIO[VModifier] = Owned { +val component: VModifier = { val variable = Var(1) val mapped = rx.map(_ + 1) @@ -241,9 +261,9 @@ val component: SyncIO[VModifier] = Owned { ### Memory management -Every subscription that is created inside of colibri-reactive methods is owned by an implicit `Owner`. For example `map` or `foreach` take an implicit `Owner`. As long as the `Owner` is cancelled when it is not needed anymore, all subscriptions will be cleaned up. The exception is the `Owner.unsafeGlobal` that never cleans up and is meant for global state. +The same principles as for Observables hold. Any cancelable that is returned from the API needs to be handled by the the caller. Best practice: use subscribe/foreach as seldomly as possible - only in selected spots or within a library. -If you are working with `Outwatch`, you can just use `Owned`-blocks returning `VModifier` and everything is handled automatically for you. No memory leaks. +If you are working with `Outwatch`, you can just use `Rx` without ever subscribing yourself. Then all memory management is handled for you automatically. No memory leaks. ## Information diff --git a/colibri/src/main/scala/colibri/Cancelable.scala b/colibri/src/main/scala/colibri/Cancelable.scala index adc9e074..977b8896 100644 --- a/colibri/src/main/scala/colibri/Cancelable.scala +++ b/colibri/src/main/scala/colibri/Cancelable.scala @@ -46,7 +46,8 @@ object Cancelable { def unsafeAdd(subscription: () => Cancelable): Unit = if (buffer != null) { val cancelable = subscription() - buffer.push(cancelable) + if (buffer == null) cancelable.unsafeCancel() + else buffer.push(cancelable) () } diff --git a/colibri/src/main/scala/colibri/Observable.scala b/colibri/src/main/scala/colibri/Observable.scala index 3c3a3773..45242b39 100644 --- a/colibri/src/main/scala/colibri/Observable.scala +++ b/colibri/src/main/scala/colibri/Observable.scala @@ -432,7 +432,6 @@ object Observable { def unsafeSubscribe(sink2: Observer[A]): Cancelable = source.unsafeSubscribe(Observer.combine(sink, sink2)) } - @deprecated("Use via instead", "0.7.8") def to(sink: Observer[A]): Observable[Unit] = via(sink).void @deprecated("Use tap instead", "0.7.8") @@ -512,9 +511,9 @@ object Observable { @deprecated("Use scan0 instead", "0.7.8") def scan0ToList: Observable[List[A]] = scan0(List.empty[A])((list, x) => x :: list) - def scan0[B](seed: B)(f: (B, A) => B): Observable[B] = scan(seed)(f).prepend(seed) + def scan0[B](seed: => B)(f: (B, A) => B): Observable[B] = scan(seed)(f).prependEval(seed) - def scan[B](seed: B)(f: (B, A) => B): Observable[B] = new Observable[B] { + def scan[B](seed: => B)(f: (B, A) => B): Observable[B] = new Observable[B] { def unsafeSubscribe(sink: Observer[B]): Cancelable = source.unsafeSubscribe(sink.contrascan(seed)(f)) } diff --git a/colibri/src/main/scala/colibri/Observer.scala b/colibri/src/main/scala/colibri/Observer.scala index 5c1c5989..daa6ca40 100644 --- a/colibri/src/main/scala/colibri/Observer.scala +++ b/colibri/src/main/scala/colibri/Observer.scala @@ -154,9 +154,9 @@ object Observer { def unsafeOnError(error: Throwable): Unit = sink.unsafeOnError(error) } - def contraflattenIterable[B]: Observer[Iterable[A]] = contramapIterable(identity) - def contraflattenEither[B]: Observer[Either[Throwable, A]] = contramapEither(identity) - def contraflattenOption[B]: Observer[Option[A]] = contramapFilter(identity) + def contraflattenIterable: Observer[Iterable[A]] = contramapIterable(identity) + def contraflattenEither: Observer[Either[Throwable, A]] = contramapEither(identity) + def contraflattenOption: Observer[Option[A]] = contramapFilter(identity) // TODO return effect def contrascan[B](seed: A)(f: (A, B) => A): Observer[B] = new Observer[B] { diff --git a/colibri/src/main/scala/colibri/Subject.scala b/colibri/src/main/scala/colibri/Subject.scala index a8e3cba7..330ce243 100644 --- a/colibri/src/main/scala/colibri/Subject.scala +++ b/colibri/src/main/scala/colibri/Subject.scala @@ -13,6 +13,10 @@ final class ReplayLatestSubject[A] extends Observer[A] with Observable.MaybeValu @inline def now(): Option[A] = current + def unsafeResetState(): Unit = { + current = None + } + def unsafeOnNext(value: A): Unit = { current = Some(value) state.unsafeOnNext(value) @@ -37,6 +41,10 @@ final class ReplayAllSubject[A] extends Observer[A] with Observable[A] { @inline def now(): Seq[A] = current.toSeq + def unsafeResetState(): Unit = { + current.clear() + } + def unsafeOnNext(value: A): Unit = { current += value state.unsafeOnNext(value) diff --git a/reactive/src/main/scala-2/colibri/reactive/OwnedPlatform.scala b/reactive/src/main/scala-2/colibri/reactive/OwnedPlatform.scala deleted file mode 100644 index 891830a2..00000000 --- a/reactive/src/main/scala-2/colibri/reactive/OwnedPlatform.scala +++ /dev/null @@ -1,9 +0,0 @@ -package colibri.reactive - -import colibri.reactive.internal.MacroUtils -import colibri.effect.SyncEmbed -import colibri.SubscriptionOwner - -trait OwnedPlatform { - def apply[R: SubscriptionOwner: SyncEmbed](f: R): R = macro MacroUtils.ownedImpl[R] -} diff --git a/reactive/src/main/scala-2/colibri/reactive/OwnerPlatform.scala b/reactive/src/main/scala-2/colibri/reactive/OwnerPlatform.scala index ab9ad212..db505c9b 100644 --- a/reactive/src/main/scala-2/colibri/reactive/OwnerPlatform.scala +++ b/reactive/src/main/scala-2/colibri/reactive/OwnerPlatform.scala @@ -2,26 +2,14 @@ package colibri.reactive import colibri.{Observable, Cancelable} -trait OwnerPlatform { - @annotation.compileTimeOnly( - "No implicit Owner is available here! Wrap inside `Owned { }`, or provide an implicit `Owner`, or `import Owner.unsafeImplicits._` (dangerous).", - ) - implicit object compileTimeMock extends Owner { - def unsafeSubscribe(): Cancelable = ??? - def unsafeOwn(subscription: () => Cancelable): Unit = ??? - def cancelable: Cancelable = ??? - } -} - trait LiveOwnerPlatform { @annotation.compileTimeOnly( "No implicit LiveOwner is available here! Wrap inside `Rx { }`, or provide an implicit `LiveOwner`.", ) implicit object compileTimeMock extends LiveOwner { - def unsafeSubscribe(): Cancelable = ??? - def unsafeOwn(subscription: () => Cancelable): Unit = ??? - def cancelable: Cancelable = ??? - def unsafeLive[A](rx: Rx[A]): A = ??? - def liveObservable: Observable[Any] = ??? + def cancelable: Cancelable = ??? + def unsafeNow[A](rx: Rx[A]): A = ??? + def unsafeLive[A](rx: Rx[A]): A = ??? + def liveObservable: Observable[Any] = ??? } } diff --git a/reactive/src/main/scala-2/colibri/reactive/RxPlatform.scala b/reactive/src/main/scala-2/colibri/reactive/RxPlatform.scala index 8aa1c5e2..bea3f090 100644 --- a/reactive/src/main/scala-2/colibri/reactive/RxPlatform.scala +++ b/reactive/src/main/scala-2/colibri/reactive/RxPlatform.scala @@ -3,5 +3,5 @@ package colibri.reactive import colibri.reactive.internal.MacroUtils trait RxPlatform { - def apply[R](f: R)(implicit owner: Owner): Rx[R] = macro MacroUtils.rxImpl[R] + def apply[R](f: R): Rx[R] = macro MacroUtils.rxImpl[R] } diff --git a/reactive/src/main/scala-2/colibri/reactive/internal/MacroUtils.scala b/reactive/src/main/scala-2/colibri/reactive/internal/MacroUtils.scala index 480029ce..e5d22fda 100644 --- a/reactive/src/main/scala-2/colibri/reactive/internal/MacroUtils.scala +++ b/reactive/src/main/scala-2/colibri/reactive/internal/MacroUtils.scala @@ -1,30 +1,23 @@ package colibri.reactive.internal -import colibri.reactive.{Rx, Owner, LiveOwner} -import colibri.effect.SyncEmbed -import colibri.SubscriptionOwner +import colibri.reactive.{Rx, NowOwner, LiveOwner} import scala.reflect.macros._ // Inspired by scala.rx object MacroUtils { - private val ownerName = "colibriOwner" - private val liveOwnerName = "colibriLiveOwner" - - def injectOwner[T](c: blackbox.Context)(src: c.Tree, newOwner: c.universe.TermName, exceptOwner: c.Type): c.Tree = { + def injectOwner[T](c: blackbox.Context)(src: c.Tree, newOwner: c.universe.TermName, replaces: Set[c.Type]): c.Tree = { import c.universe._ - val implicitOwnerAtCaller = c.inferImplicitValue(typeOf[Owner], silent = false) - val implicitLiveOwnerAtCaller = c.inferImplicitValue(typeOf[LiveOwner], silent = false) + val implicitOwnerAtCallers = replaces.map(c.inferImplicitValue(_, silent = false)) object transformer extends c.universe.Transformer { override def transform(tree: c.Tree): c.Tree = { val shouldReplaceOwner = tree != null && tree.isTerm && - (tree.tpe =:= implicitOwnerAtCaller.tpe || tree.tpe =:= implicitLiveOwnerAtCaller.tpe) && - tree.tpe <:< typeOf[Owner] && - !(tree.tpe =:= typeOf[Nothing]) && - !(tree.tpe <:< exceptOwner) + implicitOwnerAtCallers.exists(_.tpe =:= tree.tpe) && + tree.tpe <:< typeOf[NowOwner] && + !(tree.tpe =:= typeOf[Nothing]) if (shouldReplaceOwner) q"$newOwner" else super.transform(tree) @@ -33,37 +26,23 @@ object MacroUtils { transformer.transform(src) } - def ownedImpl[R]( - c: blackbox.Context, - )(f: c.Expr[R])(subscriptionOwner: c.Expr[SubscriptionOwner[R]], syncEmbed: c.Expr[SyncEmbed[R]]): c.Expr[R] = { + def rxImpl[R](c: blackbox.Context)(f: c.Expr[R]): c.Expr[Rx[R]] = { import c.universe._ - val newOwner = c.freshName(TermName(ownerName)) + val newOwner = c.freshName(TermName("colibriLiveOwner")) - val newTree = c.untypecheck(injectOwner(c)(f.tree, newOwner, typeOf[LiveOwner])) - - val tree = q""" - _root_.colibri.reactive.Owned.function { ($newOwner: _root_.colibri.reactive.Owner) => - $newTree - }($subscriptionOwner, $syncEmbed) - """ - - // println(tree) - - c.Expr(tree) - } - - def rxImpl[R](c: blackbox.Context)(f: c.Expr[R])(owner: c.Expr[Owner]): c.Expr[Rx[R]] = { - import c.universe._ - - val newOwner = c.freshName(TermName(liveOwnerName)) - - val newTree = c.untypecheck(injectOwner(c)(f.tree, newOwner, typeOf[Nothing])) + val newTree = c.untypecheck( + injectOwner(c)( + f.tree, + newOwner, + replaces = Set(typeOf[LiveOwner], typeOf[NowOwner]), + ), + ) val tree = q""" _root_.colibri.reactive.Rx.function { ($newOwner: _root_.colibri.reactive.LiveOwner) => $newTree - }($owner) + } """ // println(tree) diff --git a/reactive/src/main/scala-3/colibri/reactive/OwnedPlatform.scala b/reactive/src/main/scala-3/colibri/reactive/OwnedPlatform.scala deleted file mode 100644 index 31a4914d..00000000 --- a/reactive/src/main/scala-3/colibri/reactive/OwnedPlatform.scala +++ /dev/null @@ -1,8 +0,0 @@ -package colibri.reactive - -import colibri.effect.SyncEmbed -import colibri.SubscriptionOwner - -trait OwnedPlatform { - def apply[R: SubscriptionOwner: SyncEmbed](f: Owner ?=> R): R = Owned.function(implicit owner => f) -} diff --git a/reactive/src/main/scala-3/colibri/reactive/OwnerPlatform.scala b/reactive/src/main/scala-3/colibri/reactive/OwnerPlatform.scala index 3c8f4ef2..0248f72f 100644 --- a/reactive/src/main/scala-3/colibri/reactive/OwnerPlatform.scala +++ b/reactive/src/main/scala-3/colibri/reactive/OwnerPlatform.scala @@ -1,5 +1,3 @@ package colibri.reactive -trait OwnerPlatform - trait LiveOwnerPlatform diff --git a/reactive/src/main/scala-3/colibri/reactive/RxPlatform.scala b/reactive/src/main/scala-3/colibri/reactive/RxPlatform.scala index 3fc09170..5f97dbed 100644 --- a/reactive/src/main/scala-3/colibri/reactive/RxPlatform.scala +++ b/reactive/src/main/scala-3/colibri/reactive/RxPlatform.scala @@ -1,5 +1,5 @@ package colibri.reactive trait RxPlatform { - def apply[R](f: LiveOwner ?=> R)(implicit owner: Owner): Rx[R] = Rx.function(implicit owner => f) + def apply[R](f: LiveOwner ?=> R): Rx[R] = Rx.function(implicit owner => f) } diff --git a/reactive/src/main/scala/colibri/reactive/Owned.scala b/reactive/src/main/scala/colibri/reactive/Owned.scala deleted file mode 100644 index 2ee2f8b6..00000000 --- a/reactive/src/main/scala/colibri/reactive/Owned.scala +++ /dev/null @@ -1,12 +0,0 @@ -package colibri.reactive - -import colibri.effect.SyncEmbed -import colibri.SubscriptionOwner - -object Owned extends OwnedPlatform { - def function[R: SubscriptionOwner: SyncEmbed](f: Owner => R): R = SyncEmbed[R].delay { - val owner = Owner.unsafeHotRef() - val result = f(owner) - SubscriptionOwner[R].own(result)(owner.unsafeSubscribe) - } -} diff --git a/reactive/src/main/scala/colibri/reactive/Owner.scala b/reactive/src/main/scala/colibri/reactive/Owner.scala index b7bea9a7..1973055d 100644 --- a/reactive/src/main/scala/colibri/reactive/Owner.scala +++ b/reactive/src/main/scala/colibri/reactive/Owner.scala @@ -2,68 +2,47 @@ package colibri.reactive import colibri._ -@annotation.implicitNotFound( - "No implicit Owner is available here! Wrap inside `Owned { }`, or provide an implicit `Owner`, or `import Owner.unsafeImplicits._` (dangerous).", -) -trait Owner { - def cancelable: Cancelable - def unsafeSubscribe(): Cancelable - def unsafeOwn(subscription: () => Cancelable): Unit +trait NowOwner { + def unsafeNow[A](rx: Rx[A]): A } -object Owner extends OwnerPlatform { - def unsafeHotRef(): Owner = new Owner { - val refCountBuilder = Cancelable.refCountBuilder() - var initialRef = refCountBuilder.ref() - - def cancelable: Cancelable = refCountBuilder - - def unsafeSubscribe(): Cancelable = if (initialRef == null) { - refCountBuilder.ref() - } else { - val result = initialRef - initialRef = null - result +object NowOwner { + implicit object global extends NowOwner { + def unsafeNow[A](rx: Rx[A]): A = { + val cancelable = rx.unsafeSubscribe() + try (rx.nowIfSubscribed()) + finally (cancelable.unsafeCancel()) } - - def unsafeOwn(subscription: () => Cancelable): Unit = refCountBuilder.unsafeAdd(subscription) - } - - object unsafeGlobal extends Owner { - def cancelable: Cancelable = Cancelable.empty - def unsafeSubscribe(): Cancelable = Cancelable.empty - def unsafeOwn(subscription: () => Cancelable): Unit = { - subscription() - () - } - } - - object unsafeImplicits { - implicit def unsafeGlobalOwner: Owner = unsafeGlobal } } @annotation.implicitNotFound( "No implicit LiveOwner is available here! Wrap inside `Rx { }`, or provide an implicit `LiveOwner`.", ) -trait LiveOwner extends Owner { +trait LiveOwner extends NowOwner { + def cancelable: Cancelable + def liveObservable: Observable[Any] + def unsafeLive[A](rx: Rx[A]): A } object LiveOwner extends LiveOwnerPlatform { - def unsafeHotRef()(implicit parentOwner: Owner): LiveOwner = new LiveOwner { - val owner: Owner = Owner.unsafeHotRef() - parentOwner.unsafeOwn(() => owner.unsafeSubscribe()) + def unsafeHotRef(): LiveOwner = new LiveOwner { + private val ref = Cancelable.builder() - val liveObservableArray = new scala.scalajs.js.Array[Observable[Any]]() - val liveObservable: Observable[Any] = Observable.mergeIterable(liveObservableArray) + private val subject = Subject.publish[Any]() - def unsafeLive[A](rx: Rx[A]): A = { - liveObservableArray.push(rx.observable) - rx.now() + val cancelable: Cancelable = ref + + val liveObservable: Observable[Any] = subject + + def unsafeNow[A](rx: Rx[A]): A = { + ref.unsafeAdd(() => rx.observable.unsafeSubscribe()) + rx.nowIfSubscribed() } - def unsafeSubscribe(): Cancelable = owner.unsafeSubscribe() - def unsafeOwn(subscription: () => Cancelable): Unit = owner.unsafeOwn(subscription) - def cancelable: Cancelable = owner.cancelable + def unsafeLive[A](rx: Rx[A]): A = { + ref.unsafeAdd(() => rx.observable.unsafeSubscribe(subject)) + rx.nowIfSubscribed() + } } } diff --git a/reactive/src/main/scala/colibri/reactive/Reactive.scala b/reactive/src/main/scala/colibri/reactive/Reactive.scala index e78acbf1..72397ba6 100644 --- a/reactive/src/main/scala/colibri/reactive/Reactive.scala +++ b/reactive/src/main/scala/colibri/reactive/Reactive.scala @@ -1,90 +1,217 @@ package colibri.reactive import cats.Monoid +import cats.effect.SyncIO import colibri._ import colibri.effect._ import monocle.{Iso, Lens, Prism} import scala.concurrent.Future import scala.reflect.ClassTag +import scala.util.control.NonFatal -trait Rx[+A] { +object RxMissingNowException + extends Exception( + "Missing current value inside an RxState (Rx or RxLater). Make sure, the RxState has active subscriptions when calling nowIfSubscribed.", + ) + +trait RxSourceSelf[+Self[+X] <: RxSource[X], +SelfSync[+X] <: RxSource[X], +A] { def observable: Observable[A] - def now(): A - final def apply()(implicit liveOwner: LiveOwner): A = liveOwner.unsafeLive(this) + def selfRxSync: SelfSync[A] + def transformRx[B](f: Observable[A] => Observable[B]): Self[B] + def transformRxSync[B](f: Observable[A] => Observable[B]): SelfSync[B] + + final def subscribe: SyncIO[Cancelable] = observable.subscribeSyncIO + + final def unsafeSubscribe(): Cancelable = observable.unsafeSubscribe() + final def unsafeSubscribe(writer: RxWriter[A]): Cancelable = observable.unsafeSubscribe(writer.observer) + final def unsafeForeach(f: A => Unit): Cancelable = observable.unsafeForeach(f) + + final def hot: SyncIO[SelfSync[A]] = SyncIO(unsafeHot()) + final def unsafeHot(): SelfSync[A] = { + val _ = unsafeSubscribe() + selfRxSync + } + + final def map[B](f: A => B): SelfSync[B] = transformRxSync(_.map(f)) + final def tap(f: A => Unit): SelfSync[A] = transformRxSync(_.tap(f)) + final def void: SelfSync[Unit] = map(_ => ()) - final def map[B](f: A => B)(implicit owner: Owner): Rx[B] = transformRxSync(_.map(f)) - final def mapEither[B](f: A => Either[Throwable, B])(implicit owner: Owner): Rx[B] = transformRxSync(_.mapEither(f)) - final def tap(f: A => Unit)(implicit owner: Owner): Rx[A] = transformRxSync(_.tap(f)) + final def collect[B](f: PartialFunction[A, B]): Self[B] = transformRx(_.collect(f)) + final def mapEither[B](f: A => Either[Throwable, B]): Self[B] = transformRx(_.mapEither(f)) + final def drop(n: Int): Self[A] = transformRx(_.drop(n)) - final def collect[B](f: PartialFunction[A, B])(seed: => B)(implicit owner: Owner): Rx[B] = transformRx(_.collect(f))(seed) + final def mapSyncEffect[F[_]: RunSyncEffect, B](f: A => F[B]): SelfSync[B] = transformRxSync(_.mapEffect(f)) + final def mapEffect[F[_]: RunEffect, B](f: A => F[B]): Self[B] = transformRx(_.mapEffect(f)) + final def mapFuture[B](f: A => Future[B]): Self[B] = transformRx(_.mapFuture(f)) - final def mapSyncEffect[F[_]: RunSyncEffect, B](f: A => F[B])(implicit owner: Owner): Rx[B] = transformRxSync(_.mapEffect(f)) - final def mapEffect[F[_]: RunEffect, B](f: A => F[B])(seed: => B)(implicit owner: Owner): Rx[B] = transformRx(_.mapEffect(f))(seed) - final def mapFuture[B](f: A => Future[B])(seed: => B)(implicit owner: Owner): Rx[B] = transformRx(_.mapFuture(f))(seed) + final def as[B](value: B): SelfSync[B] = transformRxSync(_.as(value)) + final def asEval[B](value: => B): SelfSync[B] = transformRxSync(_.asEval(value)) - final def as[B](value: B)(implicit owner: Owner): Rx[B] = transformRxSync(_.as(value)) - final def asEval[B](value: => B)(implicit owner: Owner): Rx[B] = transformRxSync(_.asEval(value)) + final def asSyncEffect[F[_]: RunSyncEffect, B](value: F[B]): SelfSync[B] = transformRxSync(_.asEffect(value)) + final def asEffect[F[_]: RunEffect, B](value: F[B]): Self[B] = transformRx(_.asEffect(value)) + final def asFuture[B](value: => Future[B]): Self[B] = transformRx(_.asFuture(value)) - final def asSyncEffect[F[_]: RunSyncEffect, B](value: F[B])(implicit owner: Owner): Rx[B] = transformRxSync(_.asEffect(value)) - final def asEffect[F[_]: RunEffect, B](value: F[B])(seed: => B)(implicit owner: Owner): Rx[B] = transformRx(_.asEffect(value))(seed) - final def asFuture[B](value: => Future[B])(seed: => B)(implicit owner: Owner): Rx[B] = transformRx(_.asFuture(value))(seed) + final def via(writer: RxWriter[A]): SelfSync[A] = transformRxSync(_.via(writer.observer)) + final def to(writer: RxWriter[A]): SelfSync[Unit] = transformRxSync(_.to(writer.observer)) - final def switchMap[B](f: A => Rx[B])(implicit owner: Owner): Rx[B] = transformRxSync(_.switchMap(f andThen (_.observable))) - final def mergeMap[B](f: A => Rx[B])(implicit owner: Owner): Rx[B] = transformRxSync(_.mergeMap(f andThen (_.observable))) + final def switchMap[B](f: A => RxSource[B]): Self[B] = transformRx(_.switchMap(f andThen (_.observable))) + final def mergeMap[B](f: A => RxSource[B]): Self[B] = transformRx(_.mergeMap(f andThen (_.observable))) + final def concatMap[B](f: A => RxSource[B]): Self[B] = transformRx(_.concatMap(f andThen (_.observable))) - final def foreach(f: A => Unit)(implicit owner: Owner): Unit = owner.unsafeOwn(() => observable.unsafeForeach(f)) - final def foreachLater(f: A => Unit)(implicit owner: Owner): Unit = owner.unsafeOwn(() => observable.tail.unsafeForeach(f)) + final def combineLatestMap[B, R](sourceB: RxSource[B])(f: (A, B) => R): Self[R] = + transformRx(_.combineLatestMap(sourceB.observable)(f)) + final def combineLatest[B](sourceB: RxSource[B]): Self[(A, B)] = + transformRx(_.combineLatest(sourceB.observable)) - final def transformRx[B](f: Observable[A] => Observable[B])(seed: => B)(implicit owner: Owner): Rx[B] = Rx.observable(f(observable))(seed) - final def transformRxSync[B](f: Observable[A] => Observable[B])(implicit owner: Owner): Rx[B] = Rx.observableSync(f(observable)) + final def withLatestMap[B, R](sourceB: RxSource[B])(f: (A, B) => R): Self[R] = + transformRx(_.withLatestMap(sourceB.observable)(f)) + final def withLatest[B](sourceB: RxSource[B]): Self[(A, B)] = + transformRx(_.withLatest(sourceB.observable)) } -object Rx extends RxPlatform { - def function[R](f: LiveOwner => R)(implicit owner: Owner): Rx[R] = { - val subject = Subject.behavior[Any](()) +object RxSourceSelf { + @inline implicit final class RxSourceOps[Self[+X] <: RxSource[X], SelfSync[+X] <: RxSource[X], A]( + val self: RxSourceSelf[Self, SelfSync, A], + ) extends AnyVal { + def scan[B](seed: => B)(f: (B, A) => B): SelfSync[B] = self.transformRxSync(_.scan(seed)(f)) - val observable = subject.switchMap { _ => - val liveOwner = LiveOwner.unsafeHotRef() - val result = f(liveOwner) - Observable[R](result) - .subscribing(liveOwner.liveObservable.dropSyncAll.head.via(subject)) - .tapCancel(liveOwner.cancelable.unsafeCancel) + def filter(f: A => Boolean): Self[A] = self.transformRx(_.filter(f)) + } + + @inline implicit final class RxSourceBooleanOps[Self[+X] <: RxSource[X], SelfSync[+X] <: RxSource[X]]( + private val self: RxSourceSelf[Self, SelfSync, Boolean], + ) extends AnyVal { + @inline def toggle[A](ifTrue: => A, ifFalse: => A): SelfSync[A] = self.map { + case true => ifTrue + case false => ifFalse } - Rx.observableSync(observable) + @inline def toggle[A: Monoid](ifTrue: => A): SelfSync[A] = toggle(ifTrue, Monoid[A].empty) + + @inline def negated: SelfSync[Boolean] = self.map(x => !x) } +} - def const[A](value: A): Rx[A] = new RxConst(value) +trait RxSource[+A] extends RxSourceSelf[RxSource, RxSource, A] + +object RxSource { + implicit object source extends Source[RxSource] { + def unsafeSubscribe[A](source: RxSource[A])(sink: Observer[A]): Cancelable = source.observable.unsafeSubscribe(sink) + } +} + +trait RxEvent[+A] extends RxSource[A] with RxSourceSelf[RxEvent, RxEvent, A] { + final override def selfRxSync: RxEvent[A] = this + final override def transformRx[B](f: Observable[A] => Observable[B]): RxEvent[B] = RxEvent.observable(f(observable)) + final override def transformRxSync[B](f: Observable[A] => Observable[B]): RxEvent[B] = RxEvent.observable(f(observable)) +} + +object RxEvent extends RxPlatform { + private val _empty: RxEvent[Nothing] = observableUnshared(Observable.empty) + @inline def empty[A]: RxEvent[A] = _empty - def observable[A](observable: Observable[A])(seed: => A)(implicit owner: Owner): Rx[A] = observableSync(observable.prependEval(seed)) + @inline def apply[A](values: A*): RxEvent[A] = iterable(values) - def observableSync[A](observable: Observable[A])(implicit owner: Owner): Rx[A] = new RxObservableSync(observable) + def iterable[A](values: Iterable[A]): RxEvent[A] = observableUnshared(Observable.fromIterable(values)) - @inline implicit final class RxOps[A](private val self: Rx[A]) extends AnyVal { - def scan(f: (A, A) => A)(implicit owner: Owner): Rx[A] = scan(self.now())(f) + def future[A](future: => Future[A]): RxEvent[A] = observable(Observable.fromFuture(future)) + def effect[F[_]: RunEffect, A](effect: F[A]): RxEvent[A] = observable(Observable.fromEffect(effect)) - def scan[B](seed: B)(f: (B, A) => B)(implicit owner: Owner): Rx[B] = self.transformRxSync(_.scan0(seed)(f)) + def merge[A](rxs: RxEvent[A]*): RxEvent[A] = observableUnshared(Observable.mergeIterable(rxs.map(_.observable))) + def switch[A](rxs: RxEvent[A]*): RxEvent[A] = observableUnshared(Observable.switchIterable(rxs.map(_.observable))) + def concat[A](rxs: RxEvent[A]*): RxEvent[A] = observableUnshared(Observable.concatIterable(rxs.map(_.observable))) - def filter(f: A => Boolean)(seed: => A)(implicit owner: Owner): Rx[A] = self.transformRx(_.filter(f))(seed) + def observable[A](observable: Observable[A]): RxEvent[A] = new RxEventObservableShared(observable) + private def observableUnshared[A](observable: Observable[A]): RxEvent[A] = new RxEventObservableUnshared(observable) + + @inline implicit final class RxEventOps[A](private val self: RxEvent[A]) extends AnyVal { + def toRxLater: RxLater[A] = RxLater.observable(self.observable) + + def toRx: Rx[Option[A]] = Rx.observableSeed(self.observable.map[Option[A]](Some.apply))(None) + def toRx(seed: => A): Rx[A] = Rx.observableSeed(self.observable)(seed) } +} - @inline implicit class RxBooleanOps(private val source: Rx[Boolean]) extends AnyVal { - @inline def toggle[A](ifTrue: => A, ifFalse: A)(implicit owner: Owner): Rx[A] = source.map { - case true => ifTrue - case false => ifFalse - } +trait RxState[+A] extends RxSource[A] with RxSourceSelf[RxLater, RxState, A] { + final override def transformRx[B](f: Observable[A] => Observable[B]): RxLater[B] = RxLater.observable(f(observable)) +} - @inline def toggle[A: Monoid](ifTrue: => A)(implicit owner: Owner): Rx[A] = toggle(ifTrue, Monoid[A].empty) +trait RxLater[+A] extends RxState[A] with RxSourceSelf[RxLater, RxLater, A] { + type SelfSync[+X] = RxLater[X] - @inline def negated(implicit owner: Owner): Rx[Boolean] = source.map(x => !x) + final override def selfRxSync: RxLater[A] = this + final override def transformRxSync[B](f: Observable[A] => Observable[B]): RxLater[B] = RxLater.observable(f(observable)) +} + +object RxLater { + @inline def empty[A]: RxLater[A] = RxLaterEmpty + + def future[A](future: => Future[A]): RxLater[A] = observable(Observable.fromFuture(future)) + def effect[F[_]: RunEffect, A](effect: F[A]): RxLater[A] = observable(Observable.fromEffect(effect)) + + def observable[A](observable: Observable[A]): RxLater[A] = new RxLaterObservable(observable) + + def rx[A](rx: Rx[A]): RxLater[A] = new RxLaterWrap(rx) + + @inline implicit final class RxLaterOps[A](private val self: RxLater[A]) extends AnyVal { + def toRxEvent: RxEvent[A] = RxEvent.observable(self.observable) + + def toRx: Rx[Option[A]] = Rx.observableSeed(self.observable.map[Option[A]](Some.apply))(None) + def toRx(seed: => A): Rx[A] = Rx.observableSeed(self.observable)(seed) } +} + +trait Rx[+A] extends RxState[A] with RxSourceSelf[RxLater, Rx, A] { + type SelfSync[+X] = Rx[X] + + def apply()(implicit owner: LiveOwner): A + def now()(implicit owner: NowOwner): A + + def nowIfSubscribedOption(): Option[A] + + final def nowIfSubscribed(): A = nowIfSubscribedOption().getOrElse(throw RxMissingNowException) + + final override def selfRxSync: Rx[A] = this + final override def transformRxSync[B](f: Observable[A] => Observable[B]): Rx[B] = Rx.observableSync(f(observable)) +} + +object Rx extends RxPlatform { + def function[R](f: LiveOwner => R): Rx[R] = { + val subject = Subject.behavior[Any](()) + + val observable = subject.switchMap { _ => + val owner = LiveOwner.unsafeHotRef() + try { + val result = f(owner) + Observable[R](result) + .subscribing(owner.liveObservable.dropSyncAll.head.via(subject)) + .tapCancel(owner.cancelable.unsafeCancel) + } catch { + case NonFatal(t) => + owner.cancelable.unsafeCancel() + Observable.raiseError(t) + case t: Throwable => + owner.cancelable.unsafeCancel() + throw t + } + } - implicit object source extends Source[Rx] { - def unsafeSubscribe[A](source: Rx[A])(sink: Observer[A]): Cancelable = source.observable.unsafeSubscribe(sink) + Rx.observableSync(observable) } + def const[A](value: A): Rx[A] = new RxConst(value) + + def observableSeed[A](observable: Observable[A])(seed: => A): Rx[A] = observableSync(observable.prependEval(seed)) + + def observableSync[A](observable: Observable[A]): Rx[A] = new RxSyncObservable(observable) + + @inline implicit final class RxLaterOps[A](private val self: Rx[A]) extends AnyVal { + def toRxEvent: RxEvent[A] = RxEvent.observable(self.observable) + + def toRxLater: RxLater[A] = RxLater.rx(self) + } } trait RxWriter[-A] { @@ -119,57 +246,85 @@ object RxWriter { } } -trait Var[A] extends Rx[A] with RxWriter[A] { - final def update(f: PartialFunction[A, A]) = { +trait VarSource[A] extends RxWriter[A] with RxSource[A] + +trait VarEvent[A] extends VarSource[A] with RxEvent[A] { + final def transformVar[A2](f: RxWriter[A] => RxWriter[A2])(g: RxEvent[A] => RxEvent[A2]): VarEvent[A2] = VarEvent.create(f(this), g(this)) + final def transformVarRead(g: RxEvent[A] => RxEvent[A]): VarEvent[A] = VarEvent.create(this, g(this)) + final def transformVarWrite(f: RxWriter[A] => RxWriter[A]): VarEvent[A] = VarEvent.create(f(this), this) +} + +object VarEvent { + def apply[A](): VarEvent[A] = new VarEventSubject + + def subject[A](read: Subject[A]): VarEvent[A] = create(RxWriter.observer(read), RxEvent.observable(read)) + + def create[A](write: RxWriter[A], read: RxEvent[A]): VarEvent[A] = new VarEventCreate(write, read) +} + +trait VarState[A] extends VarSource[A] with RxState[A] + +trait VarLater[A] extends VarState[A] with RxLater[A] { + final def transformVar[A2](f: RxWriter[A] => RxWriter[A2])(g: RxLater[A] => RxLater[A2]): VarLater[A2] = + VarLater.createStateless(f(this), g(this)) + final def transformVarRead(g: RxLater[A] => RxLater[A]): VarLater[A] = VarLater.createStateless(this, g(this)) + final def transformVarWrite(f: RxWriter[A] => RxWriter[A]): VarLater[A] = VarLater.createStateless(f(this), this) +} + +object VarLater { + def apply[A](): VarLater[A] = new VarLaterSubject + + def subject[A](read: Subject[A]): VarLater[A] = createStateless(RxWriter.observer(read), RxLater.observable(read)) + + def createStateful[A](write: RxWriter[A], read: RxLater[A]): VarLater[A] = new VarLaterCreateStateful(write, read) + def createStateless[A](write: RxWriter[A], read: RxLater[A]): VarLater[A] = new VarLaterCreateStateless(write, read) +} + +trait Var[A] extends VarState[A] with Rx[A] { + final def update(f: PartialFunction[A, A])(implicit owner: NowOwner) = { val value = this.now() this.set(f.applyOrElse(value, (_: A) => value)) } - final def transformVar[A2](f: RxWriter[A] => RxWriter[A2])(g: Rx[A] => Rx[A2]): Var[A2] = Var.combine(g(this), f(this)) - final def transformVarRx(g: Rx[A] => Rx[A]): Var[A] = Var.combine(g(this), this) - final def transformVarRxWriter(f: RxWriter[A] => RxWriter[A]): Var[A] = Var.combine(this, f(this)) + final def transformVar[A2](f: RxWriter[A] => RxWriter[A2])(g: Rx[A] => Rx[A2]): Var[A2] = Var.createStateless(f(this), g(this)) + final def transformVarRead(g: Rx[A] => Rx[A]): Var[A] = Var.createStateless(this, g(this)) + final def transformVarWrite(f: RxWriter[A] => RxWriter[A]): Var[A] = Var.createStateless(f(this), this) - final def imap[A2](f: A2 => A)(g: A => A2)(implicit owner: Owner): Var[A2] = transformVar(_.contramap(f))(_.map(g)) - final def lens[B](read: A => B)(write: (A, B) => A)(implicit owner: Owner): Var[B] = + final def imap[A2](f: A2 => A)(g: A => A2): Var[A2] = transformVar(_.contramap(f))(_.map(g)) + final def lens[B](read: A => B)(write: (A, B) => A)(implicit owner: NowOwner): Var[B] = transformVar(_.contramap(write(now(), _)))(_.map(read)) - final def prismInit[A2](f: A2 => A)(g: A => Option[A2])(initial: A2)(implicit owner: Owner): Var[A2] = - transformVar(_.contramap(f))(rx => Rx.observableSync(rx.observable.mapFilter(g).prepend(initial))) - - final def prism[A2](f: A2 => A)(g: A => Option[A2])(implicit owner: Owner): Option[Var[A2]] = - g(now()).map(prismInit(f)(g)(_)) - - final def subType[A2 <: A: ClassTag](implicit owner: Owner): Option[Var[A2]] = prism[A2]((x: A2) => x) { - case a: A2 => Some(a) - case _ => None - } + final def prism[A2](f: A2 => A)(g: A => Option[A2])(seed: => A2): Var[A2] = + transformVar(_.contramap(f))(rx => Rx.observableSync(rx.observable.mapFilter(g).prependEval(seed))) - final def subTypeInit[A2 <: A: ClassTag](initial: A2)(implicit owner: Owner): Var[A2] = prismInit[A2]((x: A2) => x) { + final def subType[A2 <: A: ClassTag](seed: => A2): Var[A2] = prism[A2]((x: A2) => x) { case a: A2 => Some(a) case _ => None - }(initial) + }(seed) - final def imapO[B](optic: Iso[A, B])(implicit owner: Owner): Var[B] = imap(optic.reverseGet(_))(optic.get(_)) - final def lensO[B](optic: Lens[A, B])(implicit owner: Owner): Var[B] = lens(optic.get(_))((base, zoomed) => optic.replace(zoomed)(base)) - final def prismO[B](optic: Prism[A, B])(implicit owner: Owner): Option[Var[B]] = prism(optic.reverseGet(_))(optic.getOption(_)) - final def prismInitO[B](optic: Prism[A, B])(initial: B)(implicit owner: Owner): Var[B] = - prismInit(optic.reverseGet(_))(optic.getOption(_))(initial) + final def imapO[B](optic: Iso[A, B]): Var[B] = imap(optic.reverseGet(_))(optic.get(_)) + final def lensO[B](optic: Lens[A, B]): Var[B] = lens(optic.get(_))((base, zoomed) => optic.replace(zoomed)(base)) + final def prismO[B](optic: Prism[A, B])(seed: => B): Var[B] = + prism(optic.reverseGet(_))(optic.getOption(_))(seed) } object Var { def apply[A](seed: A): Var[A] = new VarSubject(seed) - def combine[A](read: Rx[A], write: RxWriter[A]): Var[A] = new VarCombine(read, write) + def subjectSync[A](read: Subject[A]): Var[A] = createStateless(RxWriter.observer(read), Rx.observableSync(read)) + + def createStateful[A](write: RxWriter[A], read: Rx[A]): Var[A] = new VarCreateStateful(write, read) + def createStateless[A](write: RxWriter[A], read: Rx[A]): Var[A] = new VarCreateStateless(write, read) @inline implicit class SeqVarOperations[A](rxvar: Var[Seq[A]]) { - def sequence(implicit owner: Owner): Rx[Seq[Var[A]]] = Rx.observableSync(new Observable[Seq[Var[A]]] { + def sequence: Rx[Seq[Var[A]]] = Rx.observableSync(new Observable[Seq[Var[A]]] { def unsafeSubscribe(sink: Observer[Seq[Var[A]]]): Cancelable = { rxvar.observable.unsafeSubscribe( Observer.create( { seq => sink.unsafeOnNext(seq.zipWithIndex.map { case (a, idx) => - val observer = new Observer[A] { + val observer = new Observer[A] { def unsafeOnNext(value: A): Unit = { rxvar.set(seq.updated(idx, value)) } @@ -178,16 +333,7 @@ object Var { sink.unsafeOnError(error) } } - val observable = new Observable.Value[A] { - def now(): A = a - - def unsafeSubscribe(sink: Observer[A]): Cancelable = { - sink.unsafeOnNext(a) - Cancelable.empty - } - - } - Var.combine(Rx.observable(observable)(seed = a), RxWriter.observer(observer)) + Var.createStateless(RxWriter.observer(observer), Rx.const(a)) }) }, sink.unsafeOnError, @@ -198,7 +344,7 @@ object Var { } @inline implicit class OptionVarOperations[A](rxvar: Var[Option[A]]) { - def sequence(implicit owner: Owner): Rx[Option[Var[A]]] = Rx.observableSync(new Observable[Option[Var[A]]] { + def sequence: Rx[Option[Var[A]]] = Rx.observableSync(new Observable[Option[Var[A]]] { def unsafeSubscribe(outerSink: Observer[Option[Var[A]]]): Cancelable = { var cache = Option.empty[Var[A]] @@ -234,32 +380,120 @@ object Var { } } +// RxEvent + +private final class RxEventObservableShared[A](inner: Observable[A]) extends RxEvent[A] { + val observable: Observable[A] = inner.publish.refCount +} + +private final class RxEventObservableUnshared[A](val observable: Observable[A]) extends RxEvent[A] + +// Rx + private final class RxConst[A](value: A) extends Rx[A] { + private lazy val someValue = Some(value) + val observable: Observable[A] = Observable.pure(value) - def now(): A = value + + def apply()(implicit owner: LiveOwner): A = value + def now()(implicit owner: NowOwner): A = value + def nowIfSubscribedOption(): Option[A] = someValue +} + +private final class RxSyncObservable[A](inner: Observable[A]) extends Rx[A] { + private val state = Subject.replayLatest[A]() + + val observable: Observable[A] = + inner.dropUntilSyncLatest.distinctOnEquals.tapCancel(state.unsafeResetState).multicast(state).refCount + + def apply()(implicit owner: LiveOwner) = owner.unsafeLive(this) + def now()(implicit owner: NowOwner) = owner.unsafeNow(this) + def nowIfSubscribedOption() = state.now() +} + +// RxLater + +private object RxLaterEmpty extends RxLater[Nothing] { + def observable = Observable.empty } -private final class RxObservableSync[A](inner: Observable[A])(implicit owner: Owner) extends Rx[A] { - private val state = new ReplayLatestSubject[A]() +private final class RxLaterWrap[A](state: Rx[A]) extends RxLater[A] { + def observable: Observable[A] = state.observable +} - val observable: Observable[A] = inner.dropUntilSyncLatest.distinctOnEquals.multicast(state).refCount - owner.unsafeOwn(() => observable.unsafeSubscribe()) +private final class RxLaterObservable[A](inner: Observable[A]) extends RxLater[A] { + private val state = Subject.replayLatest[A]() - def now(): A = state.now().get + val observable: Observable[A] = + inner.dropUntilSyncLatest.distinctOnEquals.tapCancel(state.unsafeResetState).multicast(state).refCount } +// RxWriter + private final class RxWriterObserver[A](val observer: Observer[A]) extends RxWriter[A] -private final class VarSubject[A](seed: A) extends Var[A] { - private val state = new BehaviorSubject[A](seed) +// VarEvent + +private final class VarEventSubject[A] extends VarEvent[A] { + private val state = Subject.publish[A]() + + def observable: Observable[A] = state + def observer: Observer[A] = state +} + +private final class VarEventCreate[A](innerWrite: RxWriter[A], innerRead: RxEvent[A]) extends VarEvent[A] { + def observable = innerRead.observable + def observer = innerWrite.observer +} + +private final class VarLaterSubject[A] extends VarLater[A] { + private val state = Subject.replayLatest[A]() + + val observable: Observable[A] = state.distinctOnEquals + def observer: Observer[A] = state +} + +private final class VarLaterCreateStateless[A](innerWrite: RxWriter[A], innerRead: RxLater[A]) extends VarLater[A] { + def observable = innerRead.observable + def observer = innerWrite.observer +} + +private final class VarLaterCreateStateful[A](innerWrite: RxWriter[A], innerRead: RxLater[A]) extends VarLater[A] { + private val state = Subject.replayLatest[A]() + + val observable = innerRead.observable.subscribing(state.via(innerWrite.observer)).multicast(state).refCount + def observer = state +} + +// Var + +private final class VarSubject[A](seed: A) extends Var[A] { + private val state = Subject.behavior[A](seed) val observable: Observable[A] = state.distinctOnEquals - val observer: Observer[A] = state + def observer: Observer[A] = state - def now(): A = state.now() + def apply()(implicit owner: LiveOwner) = owner.unsafeLive(this) + def now()(implicit owner: NowOwner) = state.now() + def nowIfSubscribedOption() = Some(state.now()) } -private final class VarCombine[A](innerRead: Rx[A], innerWrite: RxWriter[A]) extends Var[A] { - def now() = innerRead.now() + +private final class VarCreateStateless[A](innerWrite: RxWriter[A], innerRead: Rx[A]) extends Var[A] { val observable = innerRead.observable val observer = innerWrite.observer + + def apply()(implicit owner: LiveOwner) = innerRead() + def now()(implicit owner: NowOwner) = innerRead.now() + def nowIfSubscribedOption() = innerRead.nowIfSubscribedOption() +} + +private final class VarCreateStateful[A](innerWrite: RxWriter[A], innerRead: Rx[A]) extends Var[A] { + private val state = Subject.replayLatest[A]() + + val observable = innerRead.observable.subscribing(state.via(innerWrite.observer)).multicast(state).refCount + def observer = state + + def apply()(implicit owner: LiveOwner) = owner.unsafeLive(this) + def now()(implicit owner: NowOwner) = owner.unsafeNow(this) + def nowIfSubscribedOption() = state.now() } diff --git a/reactive/src/main/scala/colibri/reactive/implicits.scala b/reactive/src/main/scala/colibri/reactive/implicits.scala deleted file mode 100644 index 8583c406..00000000 --- a/reactive/src/main/scala/colibri/reactive/implicits.scala +++ /dev/null @@ -1,16 +0,0 @@ -package colibri.reactive - -import colibri.{Observer, Observable} - -object implicits { - @inline class ObservableRxOps[A](private val self: Observable[A]) extends AnyVal { - def foreachOwned(f: A => Unit)(implicit owner: Owner): Unit = - subscribeOwned(Observer.foreach(f)) - - def subscribeOwned(rxWriter: RxWriter[A])(implicit owner: Owner): Unit = - subscribeOwned(rxWriter.observer) - - def subscribeOwned(observer: Observer[A])(implicit owner: Owner): Unit = - owner.unsafeOwn(() => self.unsafeSubscribe(observer)) - } -} diff --git a/reactive/src/test/scala/colibri/ReactiveSpec.scala b/reactive/src/test/scala/colibri/ReactiveSpec.scala index 7002d2a4..f7e8760c 100644 --- a/reactive/src/test/scala/colibri/ReactiveSpec.scala +++ b/reactive/src/test/scala/colibri/ReactiveSpec.scala @@ -1,34 +1,25 @@ package colibri.reactive -import colibri._ import cats.implicits._ -import cats.effect.SyncIO import monocle.macros.{GenLens, GenPrism} import org.scalatest.matchers.should.Matchers import org.scalatest.flatspec.AsyncFlatSpec class ReactiveSpec extends AsyncFlatSpec with Matchers { - implicit def unsafeSubscriptionOwner[T]: SubscriptionOwner[SyncIO[T]] = new SubscriptionOwner[SyncIO[T]] { - def own(owner: SyncIO[T])(subscription: () => Cancelable): SyncIO[T] = - owner.flatTap(_ => SyncIO(subscription()).void) - } - - "Rx" should "map with proper subscription lifetime" in Owned(SyncIO { + "Rx" should "map with proper subscription lifetime" in { var mapped = List.empty[Int] var received1 = List.empty[Int] var received2 = List.empty[Int] - val owner = implicitly[Owner] - val variable = Var(1) val stream = variable.map { x => mapped ::= x; x } - mapped shouldBe List(1) + mapped shouldBe List.empty received1 shouldBe List.empty received2 shouldBe List.empty - stream.foreach(received1 ::= _) + val cancelR1 = stream.unsafeForeach(received1 ::= _) mapped shouldBe List(1) received1 shouldBe List(1) @@ -40,7 +31,7 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { received1 shouldBe List(2, 1) received2 shouldBe List.empty - stream.foreach(received2 ::= _) + val cancelR2 = stream.unsafeForeach(received2 ::= _) mapped shouldBe List(2, 1) received1 shouldBe List(2, 1) @@ -52,19 +43,14 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { received1 shouldBe List(3, 2, 1) received2 shouldBe List(3, 2) - val cancel = owner.unsafeSubscribe() - - mapped shouldBe List(3, 2, 1) - received1 shouldBe List(3, 2, 1) - received2 shouldBe List(3, 2) - variable.set(4) mapped shouldBe List(4, 3, 2, 1) received1 shouldBe List(4, 3, 2, 1) received2 shouldBe List(4, 3, 2) - cancel.unsafeCancel() + cancelR1.unsafeCancel() + cancelR2.unsafeCancel() mapped shouldBe List(4, 3, 2, 1) received1 shouldBe List(4, 3, 2, 1) @@ -76,7 +62,8 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { received1 shouldBe List(4, 3, 2, 1) received2 shouldBe List(4, 3, 2) - val cancel2 = owner.unsafeSubscribe() + val cancelR1b = stream.unsafeForeach(received1 ::= _) + val cancelR2b = stream.unsafeForeach(received2 ::= _) mapped shouldBe List(5, 4, 3, 2, 1) received1 shouldBe List(5, 4, 3, 2, 1) @@ -88,7 +75,7 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { received1 shouldBe List(6, 5, 4, 3, 2, 1) received2 shouldBe List(6, 5, 4, 3, 2) - val cancel3 = owner.unsafeSubscribe() + val cancelX = stream.unsafeSubscribe() mapped shouldBe List(6, 5, 4, 3, 2, 1) received1 shouldBe List(6, 5, 4, 3, 2, 1) @@ -100,19 +87,13 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { received1 shouldBe List(7, 6, 5, 4, 3, 2, 1) received2 shouldBe List(7, 6, 5, 4, 3, 2) - cancel2.unsafeCancel() - - mapped shouldBe List(7, 6, 5, 4, 3, 2, 1) - received1 shouldBe List(7, 6, 5, 4, 3, 2, 1) - received2 shouldBe List(7, 6, 5, 4, 3, 2) - variable.set(8) mapped shouldBe List(8, 7, 6, 5, 4, 3, 2, 1) received1 shouldBe List(8, 7, 6, 5, 4, 3, 2, 1) received2 shouldBe List(8, 7, 6, 5, 4, 3, 2) - cancel3.unsafeCancel() + cancelR2b.unsafeCancel() mapped shouldBe List(8, 7, 6, 5, 4, 3, 2, 1) received1 shouldBe List(8, 7, 6, 5, 4, 3, 2, 1) @@ -120,12 +101,36 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { variable.set(9) - mapped shouldBe List(8, 7, 6, 5, 4, 3, 2, 1) - received1 shouldBe List(8, 7, 6, 5, 4, 3, 2, 1) + mapped shouldBe List(9, 8, 7, 6, 5, 4, 3, 2, 1) + received1 shouldBe List(9, 8, 7, 6, 5, 4, 3, 2, 1) received2 shouldBe List(8, 7, 6, 5, 4, 3, 2) - }).unsafeRunSync() - it should "nested owners" in Owned(SyncIO { + cancelR1b.unsafeCancel() + + mapped shouldBe List(9, 8, 7, 6, 5, 4, 3, 2, 1) + received1 shouldBe List(9, 8, 7, 6, 5, 4, 3, 2, 1) + received2 shouldBe List(8, 7, 6, 5, 4, 3, 2) + + variable.set(10) + + mapped shouldBe List(10, 9, 8, 7, 6, 5, 4, 3, 2, 1) + received1 shouldBe List(9, 8, 7, 6, 5, 4, 3, 2, 1) + received2 shouldBe List(8, 7, 6, 5, 4, 3, 2) + + cancelX.unsafeCancel() + + mapped shouldBe List(10, 9, 8, 7, 6, 5, 4, 3, 2, 1) + received1 shouldBe List(9, 8, 7, 6, 5, 4, 3, 2, 1) + received2 shouldBe List(8, 7, 6, 5, 4, 3, 2) + + variable.set(11) + + mapped shouldBe List(10, 9, 8, 7, 6, 5, 4, 3, 2, 1) + received1 shouldBe List(9, 8, 7, 6, 5, 4, 3, 2, 1) + received2 shouldBe List(8, 7, 6, 5, 4, 3, 2) + } + + it should "nested rx" in { var received1 = List.empty[Int] var innerRx = List.empty[Int] var outerRx = List.empty[Int] @@ -133,7 +138,7 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { val variable = Var(1) val variable2 = Var(2) - def test(x: Int)(implicit owner: Owner) = Rx { + def test(x: Int) = Rx { innerRx ::= x variable2() * x } @@ -143,13 +148,13 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { outerRx ::= curr val result = test(curr) result() - } + }.unsafeHot() innerRx shouldBe List(1) outerRx shouldBe List(1) received1 shouldBe List.empty - rx.foreach(received1 ::= _) + rx.unsafeForeach(received1 ::= _) innerRx shouldBe List(1) outerRx shouldBe List(1) @@ -178,70 +183,64 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { innerRx shouldBe List(3, 3, 3, 2, 1, 1, 1) // TODO: triggering too often outerRx shouldBe List(3, 3, 2, 1, 1) received1 shouldBe List(12, 9, 6, 3, 2) - }).unsafeRunSync() - - it should "nested owners 2" in Owned - .function(ownedOwner => - SyncIO { - var received1 = List.empty[Int] - var innerRx = List.empty[Int] - var outerRx = List.empty[Int] + } - val variable = Var(1) - val variable2 = Var(2) + it should "nested rx 2" in { + var received1 = List.empty[Int] + var innerRx = List.empty[Int] + var outerRx = List.empty[Int] - implicit val owner: Owner = ownedOwner + val variable = Var(1) + val variable2 = Var(2) - def test(x: Int)(implicit owner: Owner) = Rx { - innerRx ::= x - variable2() * x - } + def test(x: Int) = Rx { + innerRx ::= x + variable2() * x + } - val rx = Rx { - val curr = variable() - outerRx ::= curr - val result = test(curr) - result() - } + val rx = Rx { + val curr = variable() + outerRx ::= curr + val result = test(curr) + result() + } - innerRx shouldBe List(1) - outerRx shouldBe List(1) - received1 shouldBe List.empty + innerRx shouldBe List.empty + outerRx shouldBe List.empty + received1 shouldBe List.empty - rx.foreach(received1 ::= _) + rx.unsafeForeach(received1 ::= _) - innerRx shouldBe List(1) - outerRx shouldBe List(1) - received1 shouldBe List(2) + innerRx shouldBe List(1) + outerRx shouldBe List(1) + received1 shouldBe List(2) - variable2.set(3) + variable2.set(3) - innerRx shouldBe List(1, 1, 1) // TODO: triggering too often - outerRx shouldBe List(1, 1) - received1 shouldBe List(3, 2) + innerRx shouldBe List(1, 1, 1) // TODO: triggering too often + outerRx shouldBe List(1, 1) + received1 shouldBe List(3, 2) - variable.set(2) + variable.set(2) - innerRx shouldBe List(2, 1, 1, 1) - outerRx shouldBe List(2, 1, 1) - received1 shouldBe List(6, 3, 2) + innerRx shouldBe List(2, 1, 1, 1) + outerRx shouldBe List(2, 1, 1) + received1 shouldBe List(6, 3, 2) - variable.set(3) + variable.set(3) - innerRx shouldBe List(3, 2, 1, 1, 1) - outerRx shouldBe List(3, 2, 1, 1) - received1 shouldBe List(9, 6, 3, 2) + innerRx shouldBe List(3, 2, 1, 1, 1) + outerRx shouldBe List(3, 2, 1, 1) + received1 shouldBe List(9, 6, 3, 2) - variable2.set(4) + variable2.set(4) - innerRx shouldBe List(3, 3, 3, 2, 1, 1, 1) // TODO: triggering too often - outerRx shouldBe List(3, 3, 2, 1, 1) - received1 shouldBe List(12, 9, 6, 3, 2) - }, - ) - .unsafeRunSync() + innerRx shouldBe List(3, 3, 3, 2, 1, 1, 1) // TODO: triggering too often + outerRx shouldBe List(3, 3, 2, 1, 1) + received1 shouldBe List(12, 9, 6, 3, 2) + } - it should "sequence with nesting" in Owned(SyncIO { + it should "sequence with nesting" in { var received1 = List.empty[Int] var mapped = List.empty[Boolean] @@ -269,10 +268,10 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { } } - mapped shouldBe List(false) + mapped shouldBe List.empty received1 shouldBe List.empty - stream.foreach(received1 ::= _) + stream.unsafeForeach(received1 ::= _) mapped shouldBe List(false) received1 shouldBe List(-1) @@ -285,20 +284,20 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { variable.set(Some(2)) mapped shouldBe List(true, false) - received1 shouldBe List(1, 12, 2, -1) - }).unsafeRunSync() + received1 shouldBe List(1, 2, -1) + } - it should "be distinct" in Owned(SyncIO { + it should "be distinct" in { var mapped = List.empty[Int] var received1 = List.empty[Boolean] val variable = Var(1) val stream = variable.map { x => mapped ::= x; x % 2 == 0 } - mapped shouldBe List(1) + mapped shouldBe List.empty received1 shouldBe List.empty - stream.foreach(received1 ::= _) + stream.unsafeForeach(received1 ::= _) mapped shouldBe List(1) received1 shouldBe List(false) @@ -322,9 +321,9 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { mapped shouldBe List(5, 4, 2, 1) received1 shouldBe List(false, true, false) - }).unsafeRunSync() + } - it should "work without glitches in chain" in Owned(SyncIO { + it should "work without glitches in chain" in { var liveCounter = 0 var mapped = List.empty[Int] var received1 = List.empty[Boolean] @@ -334,7 +333,7 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { val stream = variable.map { x => mapped ::= x; x % 2 == 0 } - stream.foreach(received1 ::= _) + stream.unsafeForeach(received1 ::= _) mapped shouldBe List(1) received1 shouldBe List(false) @@ -344,12 +343,12 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { s"${variable()}: ${stream()}" } - rx.foreach(receivedRx ::= _) + rx.unsafeForeach(receivedRx ::= _) mapped shouldBe List(1) received1 shouldBe List(false) receivedRx shouldBe List("1: false") - rx.now() shouldBe "1: false" + rx.nowIfSubscribed() shouldBe "1: false" liveCounter shouldBe 1 variable.set(2) @@ -357,7 +356,7 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { mapped shouldBe List(2, 1) received1 shouldBe List(true, false) receivedRx shouldBe List("2: true", "1: false") - rx.now() shouldBe "2: true" + rx.nowIfSubscribed() shouldBe "2: true" liveCounter shouldBe 2 variable.set(2) @@ -365,7 +364,7 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { mapped shouldBe List(2, 1) received1 shouldBe List(true, false) receivedRx shouldBe List("2: true", "1: false") - rx.now() shouldBe "2: true" + rx.nowIfSubscribed() shouldBe "2: true" liveCounter shouldBe 2 variable.set(4) @@ -373,7 +372,7 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { mapped shouldBe List(4, 2, 1) received1 shouldBe List(true, false) receivedRx shouldBe List("4: true", "2: true", "1: false") - rx.now() shouldBe "4: true" + rx.nowIfSubscribed() shouldBe "4: true" liveCounter shouldBe 3 variable.set(5) @@ -381,11 +380,11 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { mapped shouldBe List(5, 4, 2, 1) received1 shouldBe List(false, true, false) receivedRx shouldBe List("5: false", "4: true", "2: true", "1: false") - rx.now() shouldBe "5: false" + rx.nowIfSubscribed() shouldBe "5: false" liveCounter shouldBe 4 - }).unsafeRunSync() + } - it should "work nested" in Owned(SyncIO { + it should "work nested" in { var liveCounter = 0 var liveCounter2 = 0 @@ -401,32 +400,32 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { } variable() + nested() - } + }.unsafeHot() - rx.now() shouldBe 3 + rx.nowIfSubscribed() shouldBe 3 liveCounter shouldBe 1 liveCounter2 shouldBe 1 variable.set(2) - rx.now() shouldBe 4 + rx.nowIfSubscribed() shouldBe 4 liveCounter shouldBe 2 liveCounter2 shouldBe 2 variable2.set(4) - rx.now() shouldBe 6 + rx.nowIfSubscribed() shouldBe 6 liveCounter shouldBe 3 liveCounter2 shouldBe 4 // TODO: why do we jump to 4 calculations here instead of 3? variable.set(3) - rx.now() shouldBe 7 + rx.nowIfSubscribed() shouldBe 7 liveCounter shouldBe 4 liveCounter2 shouldBe 5 - }).unsafeRunSync() + } - it should "work with now" in Owned(SyncIO { + it should "work with now" in { var liveCounter = 0 val variable = Var(1) @@ -436,92 +435,85 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { val rx = Rx { liveCounter += 1 s"${variable()}, ${variable2()}, ${variable3.now()}" - } + }.unsafeHot() - rx.now() shouldBe "1, 2, 3" + rx.nowIfSubscribed() shouldBe "1, 2, 3" liveCounter shouldBe 1 variable.set(2) - rx.now() shouldBe "2, 2, 3" + rx.nowIfSubscribed() shouldBe "2, 2, 3" liveCounter shouldBe 2 variable.set(2) - rx.now() shouldBe "2, 2, 3" + rx.nowIfSubscribed() shouldBe "2, 2, 3" liveCounter shouldBe 2 variable2.set(10) - rx.now() shouldBe "2, 10, 3" + rx.nowIfSubscribed() shouldBe "2, 10, 3" liveCounter shouldBe 3 variable3.set(5) - rx.now() shouldBe "2, 10, 3" + rx.nowIfSubscribed() shouldBe "2, 10, 3" liveCounter shouldBe 3 variable2.set(100) - rx.now() shouldBe "2, 100, 5" + rx.nowIfSubscribed() shouldBe "2, 100, 5" liveCounter shouldBe 4 - }).unsafeRunSync() + } - it should "work with multi nesting" in Owned(SyncIO { + it should "work with multi nesting" in { var liveCounter = 0 val variable = Var(1) val variable2 = Var(2) val variable3 = Var(3) - val rx = Owned(SyncIO { - Owned(SyncIO { + val rx = Rx { + liveCounter += 1 + Rx { Rx { - liveCounter += 1 - - Owned(SyncIO { - Rx { - Rx { - s"${variable()}, ${variable2()}, ${variable3.now()}" - } - }(implicitly)() - }).unsafeRunSync()() + s"${variable()}, ${variable2()}, ${variable3.now()}" } - }).unsafeRunSync() - }).unsafeRunSync() + }.apply().apply() + }.unsafeHot() - rx.now() shouldBe "1, 2, 3" + rx.nowIfSubscribed() shouldBe "1, 2, 3" liveCounter shouldBe 1 variable.set(2) - rx.now() shouldBe "2, 2, 3" + rx.nowIfSubscribed() shouldBe "2, 2, 3" liveCounter shouldBe 2 variable.set(2) - rx.now() shouldBe "2, 2, 3" + rx.nowIfSubscribed() shouldBe "2, 2, 3" liveCounter shouldBe 2 variable2.set(10) - rx.now() shouldBe "2, 10, 3" + rx.nowIfSubscribed() shouldBe "2, 10, 3" liveCounter shouldBe 3 variable3.set(5) - rx.now() shouldBe "2, 10, 3" + rx.nowIfSubscribed() shouldBe "2, 10, 3" liveCounter shouldBe 3 variable2.set(100) - rx.now() shouldBe "2, 100, 5" + rx.nowIfSubscribed() shouldBe "2, 100, 5" liveCounter shouldBe 4 - }).unsafeRunSync() + } - it should "diamond" in Owned(SyncIO { + it should "diamond" in { var liveCounter = 0 var mapped1 = List.empty[Int] var mapped2 = List.empty[Int] @@ -534,8 +526,8 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { val stream1 = variable.map { x => mapped1 ::= x; x % 2 == 0 } val stream2 = variable.map { x => mapped2 ::= x; x % 2 == 0 } - stream1.foreach(received1 ::= _) - stream2.foreach(received2 ::= _) + stream1.unsafeForeach(received1 ::= _) + stream2.unsafeForeach(received2 ::= _) mapped1 shouldBe List(1) mapped2 shouldBe List(1) @@ -547,14 +539,14 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { s"${stream1()}:${stream2()}" } - rx.foreach(receivedRx ::= _) + rx.unsafeForeach(receivedRx ::= _) mapped1 shouldBe List(1) mapped2 shouldBe List(1) received1 shouldBe List(false) received2 shouldBe List(false) receivedRx shouldBe List("false:false") - rx.now() shouldBe "false:false" + rx.nowIfSubscribed() shouldBe "false:false" liveCounter shouldBe 1 variable.set(2) @@ -564,7 +556,7 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { received1 shouldBe List(true, false) received2 shouldBe List(true, false) receivedRx shouldBe List("true:true", "true:false", "false:false") // glitch - rx.now() shouldBe "true:true" + rx.nowIfSubscribed() shouldBe "true:true" liveCounter shouldBe 3 variable.set(2) @@ -574,7 +566,7 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { received1 shouldBe List(true, false) received2 shouldBe List(true, false) receivedRx shouldBe List("true:true", "true:false", "false:false") - rx.now() shouldBe "true:true" + rx.nowIfSubscribed() shouldBe "true:true" liveCounter shouldBe 3 variable.set(4) @@ -584,7 +576,7 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { received1 shouldBe List(true, false) received2 shouldBe List(true, false) receivedRx shouldBe List("true:true", "true:false", "false:false") - rx.now() shouldBe "true:true" + rx.nowIfSubscribed() shouldBe "true:true" liveCounter shouldBe 3 variable.set(5) @@ -594,179 +586,292 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { received1 shouldBe List(false, true, false) received2 shouldBe List(false, true, false) receivedRx shouldBe List("false:false", "false:true", "true:true", "true:false", "false:false") // glitch - rx.now() shouldBe "false:false" + rx.nowIfSubscribed() shouldBe "false:false" liveCounter shouldBe 5 - }).unsafeRunSync() + } - it should "collect" in Owned(SyncIO { - val variable = Var[Option[Int]](Some(1)) - val collected = variable.collect { case Some(x) => x }(0) + it should "collect initial some" in { var collectedStates = Vector.empty[Int] - collected.foreach(collectedStates :+= _) + val variable = Var[Option[Int]](Some(1)) + val collected = variable.collect { case Some(x) => x }.toRx(0) + collected.unsafeForeach(collectedStates :+= _) + + collected.now() shouldBe 1 collectedStates shouldBe Vector(1) variable.set(None) + collected.now() shouldBe 1 collectedStates shouldBe Vector(1) variable.set(Some(17)) + collected.now() shouldBe 17 collectedStates shouldBe Vector(1, 17) + } + + it should "collect initial none" in { + var collectedStates = Vector.empty[Int] - }).unsafeRunSync() + val variable = Var[Option[Int]](None) + val collected = variable.collect { case Some(x) => x }.toRx(0) - it should "collect initial none" in Owned(SyncIO { - val variable = Var[Option[Int]](None) - val collected = variable.collect { case Some(x) => x }(0) + collected.unsafeForeach(collectedStates :+= _) + + collected.now() shouldBe 0 + collectedStates shouldBe Vector(0) + + variable.set(None) + collected.now() shouldBe 0 + collectedStates shouldBe Vector(0) + + variable.set(Some(17)) + collected.now() shouldBe 17 + collectedStates shouldBe Vector(0, 17) + } + + it should "collect later initial none" in { + var tapStates = Vector.empty[Int] var collectedStates = Vector.empty[Int] - collected.foreach(collectedStates :+= _) + val variable = Var[Option[Int]](None) + val collected = variable.collect { case Some(x) => x }.tap(tapStates :+= _) + + val cancelable = collected.unsafeForeach(collectedStates :+= _) + + collectedStates shouldBe Vector.empty + tapStates shouldBe Vector.empty + variable.set(Some(0)) collectedStates shouldBe Vector(0) + tapStates shouldBe Vector(0) + + collected.toRx.now() shouldBe Some(0) + collectedStates shouldBe Vector(0) + tapStates shouldBe Vector(0) variable.set(None) collectedStates shouldBe Vector(0) + tapStates shouldBe Vector(0) + + collected.toRx.now() shouldBe Some(0) + collectedStates shouldBe Vector(0) + tapStates shouldBe Vector(0) variable.set(Some(17)) collectedStates shouldBe Vector(0, 17) + tapStates shouldBe Vector(0, 17) - }).unsafeRunSync() + collected.toRx.now() shouldBe Some(17) + collectedStates shouldBe Vector(0, 17) + tapStates shouldBe Vector(0, 17) + + cancelable.unsafeCancel() + + variable.set(Some(18)) + collectedStates shouldBe Vector(0, 17) + tapStates shouldBe Vector(0, 17) - it should "sequence on Var[Seq[T]]" in Owned(SyncIO { + collected.toRx.now() shouldBe Some(18) + collectedStates shouldBe Vector(0, 17) + tapStates shouldBe Vector(0, 17, 18) + } + + it should "sequence on Var[Seq[T]]" in { { // inner.set on seed value val variable = Var[Seq[Int]](Seq(1)) - val sequence: Rx[Seq[Var[Int]]] = variable.sequence + val sequence: Rx[Seq[Var[Int]]] = variable.sequence.unsafeHot() - variable.now() shouldBe Seq(1) - sequence.now().map(_.now()) shouldBe Seq(1) + variable.nowIfSubscribed() shouldBe Seq(1) + sequence.nowIfSubscribed().map(_.nowIfSubscribed()) shouldBe Seq(1) - sequence.now()(0).set(2) - variable.now() shouldBe Seq(2) + sequence.nowIfSubscribed().apply(0).set(2) + variable.nowIfSubscribed() shouldBe Seq(2) } { // inner.set on value after seed val variable = Var[Seq[Int]](Seq.empty) - val sequence: Rx[Seq[Var[Int]]] = variable.sequence + val sequence: Rx[Seq[Var[Int]]] = variable.sequence.unsafeHot() - variable.now() shouldBe Seq.empty - sequence.now().map(_.now()) shouldBe Seq.empty + variable.nowIfSubscribed() shouldBe Seq.empty + sequence.nowIfSubscribed().map(_.nowIfSubscribed()) shouldBe Seq.empty variable.set(Seq(1)) - sequence.now().map(_.now()) shouldBe Seq(1) + sequence.nowIfSubscribed().map(_.nowIfSubscribed()) shouldBe Seq(1) - sequence.now()(0).set(2) - variable.now() shouldBe Seq(2) + sequence.nowIfSubscribed().apply(0).set(2) + variable.nowIfSubscribed() shouldBe Seq(2) } - }).unsafeRunSync() + } - it should "sequence on Var[Option[T]]" in Owned(SyncIO { + it should "sequence on Var[Option[T]]" in { { // inner.set on seed value val variable = Var[Option[Int]](Some(1)) - val sequence: Rx[Option[Var[Int]]] = variable.sequence + val sequence: Rx[Option[Var[Int]]] = variable.sequence.unsafeHot() - variable.now() shouldBe Some(1) - sequence.now().map(_.now()) shouldBe Some(1) + variable.nowIfSubscribed() shouldBe Some(1) + sequence.nowIfSubscribed().map(_.nowIfSubscribed()) shouldBe Some(1) - sequence.now().get.set(2) - variable.now() shouldBe Some(2) + sequence.nowIfSubscribed().get.set(2) + variable.nowIfSubscribed() shouldBe Some(2) } { // inner.set on value after seed val variable = Var[Option[Int]](Option.empty) - val sequence: Rx[Option[Var[Int]]] = variable.sequence + val sequence: Rx[Option[Var[Int]]] = variable.sequence.unsafeHot() - variable.now() shouldBe None - sequence.now().map(_.now()) shouldBe None + variable.nowIfSubscribed() shouldBe None + sequence.nowIfSubscribed().map(_.nowIfSubscribed()) shouldBe None variable.set(Option(1)) - sequence.now().map(_.now()) shouldBe Option(1) + sequence.nowIfSubscribed().map(_.nowIfSubscribed()) shouldBe Option(1) - sequence.now().get.set(2) - variable.now() shouldBe Option(2) + sequence.nowIfSubscribed().get.set(2) + variable.nowIfSubscribed() shouldBe Option(2) variable.set(None) - sequence.now().map(_.now()) shouldBe None + sequence.nowIfSubscribed().map(_.nowIfSubscribed()) shouldBe None } { // inner.set on seed value val variable = Var[Option[Int]](Some(1)) - val sequence: Rx[Option[Var[Int]]] = variable.sequence + val sequence: Rx[Option[Var[Int]]] = variable.sequence.unsafeHot() var outerTriggered = 0 var innerTriggered = 0 - sequence.foreach(_ => outerTriggered += 1) - sequence.now().foreach(_.foreach(_ => innerTriggered += 1)) + sequence.unsafeForeach(_ => outerTriggered += 1) + sequence.nowIfSubscribed().foreach(_.unsafeForeach(_ => innerTriggered += 1)) - variable.now() shouldBe Some(1) - sequence.now().map(_.now()) shouldBe Some(1) + variable.nowIfSubscribed() shouldBe Some(1) + sequence.nowIfSubscribed().map(_.nowIfSubscribed()) shouldBe Some(1) outerTriggered shouldBe 1 innerTriggered shouldBe 1 - val varRefA = sequence.now().get + val varRefA = sequence.nowIfSubscribed().get variable.set(Some(2)) - variable.now() shouldBe Some(2) - sequence.now().map(_.now()) shouldBe Some(2) + variable.nowIfSubscribed() shouldBe Some(2) + sequence.nowIfSubscribed().map(_.nowIfSubscribed()) shouldBe Some(2) outerTriggered shouldBe 1 innerTriggered shouldBe 2 - val varRefB = sequence.now().get + val varRefB = sequence.nowIfSubscribed().get assert(varRefA eq varRefB) } - }).unsafeRunSync() + } + + it should "transform stateful" in { + val original: Var[Int] = Var(1) + val encoded: Var[String] = Var.createStateful( + original.contramapIterable(str => str.toIntOption), + original.map(num => num.toString), + ) + + encoded.unsafeSubscribe() + + original.nowIfSubscribed() shouldBe 1 + encoded.nowIfSubscribed() shouldBe "1" + + original.set(2) + + original.nowIfSubscribed() shouldBe 2 + encoded.nowIfSubscribed() shouldBe "2" + + encoded.set("3") + + original.nowIfSubscribed() shouldBe 3 + encoded.nowIfSubscribed() shouldBe "3" + + encoded.set("nope") + + original.nowIfSubscribed() shouldBe 3 + encoded.nowIfSubscribed() shouldBe "nope" + } + + it should "transform stateless" in { + val original: Var[Int] = Var(1) + val encoded: Var[String] = Var.createStateless( + original.contramapIterable(str => str.toIntOption), + original.map(num => num.toString), + ) + + encoded.unsafeSubscribe() - it should "lens" in Owned(SyncIO { + original.nowIfSubscribed() shouldBe 1 + encoded.nowIfSubscribed() shouldBe "1" + + original.set(2) + + original.nowIfSubscribed() shouldBe 2 + encoded.nowIfSubscribed() shouldBe "2" + + encoded.set("3") + + original.nowIfSubscribed() shouldBe 3 + encoded.nowIfSubscribed() shouldBe "3" + + encoded.set("nope") + + original.nowIfSubscribed() shouldBe 3 + encoded.nowIfSubscribed() shouldBe "3" + } + + it should "lens" in { val a: Var[(Int, String)] = Var((0, "Wurst")) val b: Var[String] = a.lens(_._2)((a, b) => a.copy(_2 = b)) val c: Rx[String] = b.map(_ + "q") - a.now() shouldBe ((0, "Wurst")) - b.now() shouldBe "Wurst" - c.now() shouldBe "Wurstq" + a.unsafeSubscribe() + b.unsafeSubscribe() + c.unsafeSubscribe() + + a.nowIfSubscribed() shouldBe ((0, "Wurst")) + b.nowIfSubscribed() shouldBe "Wurst" + c.nowIfSubscribed() shouldBe "Wurstq" a.set((1, "hoho")) - a.now() shouldBe ((1, "hoho")) - b.now() shouldBe "hoho" - c.now() shouldBe "hohoq" + a.nowIfSubscribed() shouldBe ((1, "hoho")) + b.nowIfSubscribed() shouldBe "hoho" + c.nowIfSubscribed() shouldBe "hohoq" b.set("Voodoo") - a.now() shouldBe ((1, "Voodoo")) - b.now() shouldBe "Voodoo" - c.now() shouldBe "Voodooq" + a.nowIfSubscribed() shouldBe ((1, "Voodoo")) + b.nowIfSubscribed() shouldBe "Voodoo" + c.nowIfSubscribed() shouldBe "Voodooq" a.set((3, "genau")) - a.now() shouldBe ((3, "genau")) - b.now() shouldBe "genau" - c.now() shouldBe "genauq" + a.nowIfSubscribed() shouldBe ((3, "genau")) + b.nowIfSubscribed() shouldBe "genau" + c.nowIfSubscribed() shouldBe "genauq" b.set("Schwein") - a.now() shouldBe ((3, "Schwein")) - b.now() shouldBe "Schwein" - c.now() shouldBe "Schweinq" - }).unsafeRunSync() + a.nowIfSubscribed() shouldBe ((3, "Schwein")) + b.nowIfSubscribed() shouldBe "Schwein" + c.nowIfSubscribed() shouldBe "Schweinq" + } it should "lens with monocle" in { case class Company(name: String, zipcode: Int) case class Employee(name: String, company: Company) - Owned(SyncIO { - val employee = Var(Employee("jules", Company("wules", 7))) - val zipcode = employee.lensO(GenLens[Employee](_.company.zipcode)) + val employee = Var(Employee("jules", Company("wules", 7))) + val zipcode = employee.lensO(GenLens[Employee](_.company.zipcode)) - employee.now() shouldBe Employee("jules", Company("wules", 7)) - zipcode.now() shouldBe 7 + zipcode.unsafeSubscribe() - zipcode.set(8) - employee.now() shouldBe Employee("jules", Company("wules", 8)) - zipcode.now() shouldBe 8 + employee.nowIfSubscribed() shouldBe Employee("jules", Company("wules", 7)) + zipcode.nowIfSubscribed() shouldBe 7 - employee.set(Employee("gula", Company("bori", 6))) - employee.now() shouldBe Employee("gula", Company("bori", 6)) - zipcode.now() shouldBe 6 - }).unsafeRunSync() + zipcode.set(8) + employee.nowIfSubscribed() shouldBe Employee("jules", Company("wules", 8)) + zipcode.nowIfSubscribed() shouldBe 8 + + employee.set(Employee("gula", Company("bori", 6))) + employee.nowIfSubscribed() shouldBe Employee("gula", Company("bori", 6)) + zipcode.nowIfSubscribed() shouldBe 6 } it should "optics operations" in { @@ -774,48 +879,508 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { case class EventA(i: Int) extends Event case class EventB(s: String) extends Event - Owned(SyncIO { - val eventVar: Var[Event] = Var[Event](EventA(0)) - val eventNotAVar: Var[Event] = Var[Event](EventB("")) + val eventVar: Var[Event] = Var[Event](EventA(0)) + val eventNotVar: Var[Event] = Var[Event](EventB("")) + + val eventAVar = eventVar.prismO(GenPrism[Event, EventA])(null) + val eventAVar2 = eventVar.subType[EventA](null) + val eventNotAVar = eventNotVar.prismO(GenPrism[Event, EventA])(null) + + eventAVar.unsafeSubscribe() + eventAVar2.unsafeSubscribe() + eventNotAVar.unsafeSubscribe() + + eventVar.nowIfSubscribed() shouldBe EventA(0) + eventAVar.nowIfSubscribed() shouldBe EventA(0) + eventAVar2.nowIfSubscribed() shouldBe EventA(0) + eventNotAVar.nowIfSubscribed() shouldBe null + + eventAVar.set(EventA(1)) + + eventVar.nowIfSubscribed() shouldBe EventA(1) + eventAVar.nowIfSubscribed() shouldBe EventA(1) + eventAVar2.nowIfSubscribed() shouldBe EventA(1) + + eventVar.set(EventB("he")) + + eventVar.nowIfSubscribed() shouldBe EventB("he") + eventAVar.nowIfSubscribed() shouldBe EventA(1) + eventAVar2.nowIfSubscribed() shouldBe EventA(1) + + eventAVar.set(EventA(2)) + + eventVar.nowIfSubscribed() shouldBe EventA(2) + eventAVar.nowIfSubscribed() shouldBe EventA(2) + eventAVar2.nowIfSubscribed() shouldBe EventA(2) + + eventVar.set(EventA(3)) + + eventVar.nowIfSubscribed() shouldBe EventA(3) + eventAVar.nowIfSubscribed() shouldBe EventA(3) + eventAVar2.nowIfSubscribed() shouldBe EventA(3) + } + + it should "map and now()" in { + val variable = Var(1) + val mapped = variable.map(_ + 1) + + variable.nowIfSubscribedOption() shouldBe Some(1) + variable.now() shouldBe 1 + variable.nowIfSubscribedOption() shouldBe Some(1) + mapped.nowIfSubscribedOption() shouldBe None + mapped.now() shouldBe 2 + mapped.nowIfSubscribedOption() shouldBe None + + variable.set(2) + + variable.nowIfSubscribedOption() shouldBe Some(2) + variable.now() shouldBe 2 + variable.nowIfSubscribedOption() shouldBe Some(2) + mapped.nowIfSubscribedOption() shouldBe None + mapped.now() shouldBe 3 + mapped.nowIfSubscribedOption() shouldBe None + } + + it should "drop" in { + var triggers1 = List.empty[Int] + val variable1 = Var(1) + val variable1Logged = variable1.drop(1).tap(triggers1 ::= _).toRx + + triggers1 shouldBe List.empty + variable1Logged.nowIfSubscribedOption() shouldBe None + triggers1 shouldBe List.empty + + val cancelable = variable1Logged.unsafeSubscribe() + + triggers1 shouldBe List.empty + variable1Logged.nowIfSubscribedOption() shouldBe Some(None) + triggers1 shouldBe List.empty + + variable1.set(2) + + triggers1 shouldBe List(2) + variable1Logged.nowIfSubscribedOption() shouldBe Some(Some(2)) + triggers1 shouldBe List(2) + + variable1.set(3) + + triggers1 shouldBe List(3, 2) + variable1Logged.nowIfSubscribedOption() shouldBe Some(Some(3)) + triggers1 shouldBe List(3, 2) + + cancelable.unsafeCancel() + + variable1.set(4) + + triggers1 shouldBe List(3, 2) + variable1Logged.nowIfSubscribedOption() shouldBe None + triggers1 shouldBe List(3, 2) + + variable1Logged.now() shouldBe None + triggers1 shouldBe List(3, 2) + + variable1.set(5) + + triggers1 shouldBe List(3, 2) + variable1Logged.nowIfSubscribedOption() shouldBe None + triggers1 shouldBe List(3, 2) + + variable1Logged.now() shouldBe None + triggers1 shouldBe List(3, 2) + } + + it should "subscribe and now on rx with lazy subscriptions" in { + var triggers1 = List.empty[Int] + var triggerRxCount = 0 + + val variable1 = Var(1) + val variable1Logged = variable1.tap(triggers1 ::= _) + + val mapped = Rx { + triggerRxCount += 1 + variable1Logged() + 1 + } + + val cancelable = mapped.unsafeSubscribe() + + triggers1 shouldBe List(1) + triggerRxCount shouldBe 1 + + mapped.nowIfSubscribedOption() shouldBe Some(2) + mapped.now() shouldBe 2 + mapped.nowIfSubscribedOption() shouldBe Some(2) + + variable1.set(2) + + triggers1 shouldBe List(2, 1) + triggerRxCount shouldBe 2 + + mapped.nowIfSubscribedOption() shouldBe Some(3) + mapped.now() shouldBe 3 + mapped.nowIfSubscribedOption() shouldBe Some(3) + + cancelable.unsafeCancel() + + mapped.nowIfSubscribedOption() shouldBe None + mapped.now() shouldBe 3 + mapped.nowIfSubscribedOption() shouldBe None + + triggers1 shouldBe List(2, 2, 1) + triggerRxCount shouldBe 3 + } + + it should "combine rx and rxlater in Rx" in { + var triggers1 = List.empty[Int] + var triggers2 = List.empty[Int] + var triggerRxCount = 0 + + val variable1 = VarLater[Int]() + val variable2 = Var(1) + val variable1Logged = variable1.tap(triggers1 ::= _) + val variable2Logged = variable2.tap(triggers2 ::= _) + + val mapped = Rx { + triggerRxCount += 1 + variable1Logged.toRx().getOrElse(0) + variable2Logged() + } + + triggers1 shouldBe List.empty + triggers2 shouldBe List.empty + triggerRxCount shouldBe 0 + + mapped.nowIfSubscribedOption() shouldBe None + mapped.now() shouldBe 1 + mapped.nowIfSubscribedOption() shouldBe None + + triggers1 shouldBe List.empty + triggers2 shouldBe List(1) + triggerRxCount shouldBe 1 + + mapped.nowIfSubscribedOption() shouldBe None + mapped.now() shouldBe 1 + mapped.nowIfSubscribedOption() shouldBe None + + variable1.set(10) + + triggers1 shouldBe List.empty + triggers2 shouldBe List(1, 1) + triggerRxCount shouldBe 2 + + mapped.nowIfSubscribedOption() shouldBe None + mapped.now() shouldBe 11 + mapped.nowIfSubscribedOption() shouldBe None + + triggers1 shouldBe List(10) + triggers2 shouldBe List(1, 1, 1) + triggerRxCount shouldBe 3 + + val cancelable = mapped.unsafeSubscribe() + + triggers1 shouldBe List(10, 10) + triggers2 shouldBe List(1, 1, 1, 1) + triggerRxCount shouldBe 4 + + mapped.nowIfSubscribedOption() shouldBe Some(11) + mapped.now() shouldBe 11 + mapped.nowIfSubscribedOption() shouldBe Some(11) + + variable1.set(20) + + triggers1 shouldBe List(20, 10, 10) + triggers2 shouldBe List(1, 1, 1, 1) + triggerRxCount shouldBe 5 + + mapped.nowIfSubscribedOption() shouldBe Some(21) + mapped.now() shouldBe 21 + mapped.nowIfSubscribedOption() shouldBe Some(21) + + variable2.set(2) - val eventAVarOption: Option[Var[EventA]] = eventVar.prismO(GenPrism[Event, EventA]) - val eventAVarOption2: Option[Var[EventA]] = eventVar.subType[EventA] - val eventNotAVarOption: Option[Var[EventA]] = eventNotAVar.prismO(GenPrism[Event, EventA]) + triggers1 shouldBe List(20, 10, 10) + triggers2 shouldBe List(2, 1, 1, 1, 1) + triggerRxCount shouldBe 6 - eventAVarOption.isDefined shouldBe true - eventAVarOption2.isDefined shouldBe true - eventNotAVarOption.isDefined shouldBe false + mapped.nowIfSubscribedOption() shouldBe Some(22) + mapped.now() shouldBe 22 + mapped.nowIfSubscribedOption() shouldBe Some(22) + + variable2.set(3) + + mapped.nowIfSubscribedOption() shouldBe Some(23) + mapped.now() shouldBe 23 + mapped.nowIfSubscribedOption() shouldBe Some(23) + + triggers1 shouldBe List(20, 10, 10) + triggers2 shouldBe List(3, 2, 1, 1, 1, 1) + triggerRxCount shouldBe 7 + + variable1.set(30) + + mapped.nowIfSubscribedOption() shouldBe Some(33) + mapped.now() shouldBe 33 + mapped.nowIfSubscribedOption() shouldBe Some(33) + + triggers1 shouldBe List(30, 20, 10, 10) + triggers2 shouldBe List(3, 2, 1, 1, 1, 1) + triggerRxCount shouldBe 8 + + cancelable.unsafeCancel() + + mapped.nowIfSubscribedOption() shouldBe None + mapped.now() shouldBe 33 + mapped.nowIfSubscribedOption() shouldBe None + + triggers1 shouldBe List(30, 30, 20, 10, 10) + triggers2 shouldBe List(3, 3, 2, 1, 1, 1, 1) + triggerRxCount shouldBe 9 + } + + it should "now() in Rx, and owners with lazy subscriptions" in { + var triggers1 = List.empty[Int] + var triggers2 = List.empty[Int] + var triggerRxCount = 0 + + val variable1 = Var(1) + val variable2 = Var(1) + val variable1Logged = variable1.tap(triggers1 ::= _) + val variable2Logged = variable2.tap(triggers2 ::= _) + + // use mock locally + implicitly[NowOwner].isInstanceOf[LiveOwner] shouldBe false + val mapped = Rx { + // use mock locally + implicitly[NowOwner].isInstanceOf[LiveOwner] shouldBe true + + triggerRxCount += 1 + variable1Logged() + variable2Logged.now() + } - val eventAVar = eventAVarOption.get - val eventAVar2 = eventAVarOption2.get + triggers1 shouldBe List.empty + triggers2 shouldBe List.empty + triggerRxCount shouldBe 0 - eventVar.now() shouldBe EventA(0) - eventAVar.now() shouldBe EventA(0) - eventAVar2.now() shouldBe EventA(0) + mapped.nowIfSubscribedOption() shouldBe None + mapped.now() shouldBe 2 + mapped.nowIfSubscribedOption() shouldBe None - eventAVar.set(EventA(1)) + triggers1 shouldBe List(1) + triggers2 shouldBe List(1) + triggerRxCount shouldBe 1 - eventVar.now() shouldBe EventA(1) - eventAVar.now() shouldBe EventA(1) - eventAVar2.now() shouldBe EventA(1) + mapped.nowIfSubscribedOption() shouldBe None + mapped.now() shouldBe 2 + mapped.nowIfSubscribedOption() shouldBe None - eventVar.set(EventB("he")) + triggers1 shouldBe List(1, 1) + triggers2 shouldBe List(1, 1) + triggerRxCount shouldBe 2 - eventVar.now() shouldBe EventB("he") - eventAVar.now() shouldBe EventA(1) - eventAVar2.now() shouldBe EventA(1) + val cancelable = mapped.unsafeSubscribe() - eventAVar.set(EventA(2)) + triggers1 shouldBe List(1, 1, 1) + triggers2 shouldBe List(1, 1, 1) + triggerRxCount shouldBe 3 - eventVar.now() shouldBe EventA(2) - eventAVar.now() shouldBe EventA(2) - eventAVar2.now() shouldBe EventA(2) + mapped.nowIfSubscribedOption() shouldBe Some(2) + mapped.now() shouldBe 2 + mapped.nowIfSubscribedOption() shouldBe Some(2) - eventVar.set(EventA(3)) + variable1.set(2) + + triggers1 shouldBe List(2, 1, 1, 1) + triggers2 shouldBe List(1, 1, 1) + triggerRxCount shouldBe 4 + + mapped.nowIfSubscribedOption() shouldBe Some(3) + mapped.now() shouldBe 3 + mapped.nowIfSubscribedOption() shouldBe Some(3) + + triggers1 shouldBe List(2, 1, 1, 1) + triggers2 shouldBe List(1, 1, 1) + triggerRxCount shouldBe 4 + + variable2.set(10) + + mapped.nowIfSubscribedOption() shouldBe Some(3) + mapped.now() shouldBe 3 + mapped.nowIfSubscribedOption() shouldBe Some(3) + + triggers1 shouldBe List(2, 1, 1, 1) + triggers2 shouldBe List(10, 1, 1, 1) + triggerRxCount shouldBe 4 + + variable1.set(3) + + mapped.nowIfSubscribedOption() shouldBe Some(13) + mapped.now() shouldBe 13 + mapped.nowIfSubscribedOption() shouldBe Some(13) + + triggers1 shouldBe List(3, 2, 1, 1, 1) + triggers2 shouldBe List(10, 1, 1, 1) + triggerRxCount shouldBe 5 + + cancelable.unsafeCancel() + + mapped.nowIfSubscribedOption() shouldBe None + mapped.now() shouldBe 13 + mapped.nowIfSubscribedOption() shouldBe None + + triggers1 shouldBe List(3, 3, 2, 1, 1, 1) + triggers2 shouldBe List(10, 10, 1, 1, 1) + triggerRxCount shouldBe 6 + } + + it should "start and stop" in { + var triggers1 = List.empty[Int] + var results1 = List.empty[Int] + + val variable1 = Var(1) + + val cancelable1 = variable1.tap(triggers1 ::= _).unsafeForeach(results1 ::= _) + + triggers1 shouldBe List(1) + results1 shouldBe List(1) + + variable1.set(2) + + triggers1 shouldBe List(2, 1) + results1 shouldBe List(2, 1) + + cancelable1.unsafeCancel() + variable1.set(3) + + triggers1 shouldBe List(2, 1) + results1 shouldBe List(2, 1) + + val cancelable1b = variable1.tap(triggers1 ::= _).unsafeForeach(results1 ::= _) + + triggers1 shouldBe List(3, 2, 1) + results1 shouldBe List(3, 2, 1) + + variable1.set(4) + + triggers1 shouldBe List(4, 3, 2, 1) + results1 shouldBe List(4, 3, 2, 1) + + cancelable1b.unsafeCancel() + variable1.set(5) + + triggers1 shouldBe List(4, 3, 2, 1) + results1 shouldBe List(4, 3, 2, 1) + } + + "RxEvent" should "combine, start and stop" in { + var triggers1 = List.empty[Int] + var triggers2 = List.empty[Int] + var triggerRxCount = 0 + var results1 = List.empty[Int] + var results2 = List.empty[Int] + + val variable1 = VarEvent[Int]() + val rx2 = RxEvent(1, 2) + val variable1Logged = variable1.tap(triggers1 ::= _) + val rx2Logged = rx2.tap(triggers2 ::= _) + + // use mock locally + val mapped = variable1Logged.combineLatestMap(rx2Logged) { (a, b) => + triggerRxCount += 1 + a + b + } - eventVar.now() shouldBe EventA(3) - eventAVar.now() shouldBe EventA(3) - eventAVar2.now() shouldBe EventA(3) - }).unsafeRunSync() + triggers1 shouldBe List.empty + triggers2 shouldBe List.empty + triggerRxCount shouldBe 0 + results1 shouldBe List.empty + results2 shouldBe List.empty + + val cancelable1 = mapped.unsafeForeach(results1 ::= _) + + triggers1 shouldBe List.empty + triggers2 shouldBe List(2, 1) + triggerRxCount shouldBe 0 + results1 shouldBe List.empty + results2 shouldBe List.empty + + variable1.set(100) + + triggers1 shouldBe List(100) + triggers2 shouldBe List(2, 1) + triggerRxCount shouldBe 1 + results1 shouldBe List(102) + results2 shouldBe List.empty + + variable1.set(200) + + triggers1 shouldBe List(200, 100) + triggers2 shouldBe List(2, 1) + triggerRxCount shouldBe 2 + results1 shouldBe List(202, 102) + results2 shouldBe List.empty + + val cancelable2 = mapped.unsafeForeach(results2 ::= _) + + triggers1 shouldBe List(200, 100) + triggers2 shouldBe List(2, 1) + triggerRxCount shouldBe 2 + results1 shouldBe List(202, 102) + results2 shouldBe List.empty + + variable1.set(300) + + triggers1 shouldBe List(300, 200, 100) + triggers2 shouldBe List(2, 1) + triggerRxCount shouldBe 3 + results1 shouldBe List(302, 202, 102) + results2 shouldBe List(302) + + cancelable1.unsafeCancel() + + triggers1 shouldBe List(300, 200, 100) + triggers2 shouldBe List(2, 1) + triggerRxCount shouldBe 3 + results1 shouldBe List(302, 202, 102) + results2 shouldBe List(302) + + variable1.set(400) + + triggers1 shouldBe List(400, 300, 200, 100) + triggers2 shouldBe List(2, 1) + triggerRxCount shouldBe 4 + results1 shouldBe List(302, 202, 102) + results2 shouldBe List(402, 302) + + cancelable2.unsafeCancel() + variable1.set(500) + + triggers1 shouldBe List(400, 300, 200, 100) + triggers2 shouldBe List(2, 1) + triggerRxCount shouldBe 4 + results1 shouldBe List(302, 202, 102) + results2 shouldBe List(402, 302) + + val cancelable1b = mapped.unsafeForeach(results1 ::= _) + + triggers1 shouldBe List(400, 300, 200, 100) + triggers2 shouldBe List(2, 1, 2, 1) + triggerRxCount shouldBe 4 + results1 shouldBe List(302, 202, 102) + results2 shouldBe List(402, 302) + + variable1.set(1000) + + triggers1 shouldBe List(1000, 400, 300, 200, 100) + triggers2 shouldBe List(2, 1, 2, 1) + triggerRxCount shouldBe 5 + results1 shouldBe List(1002, 302, 202, 102) + results2 shouldBe List(402, 302) + + cancelable1b.unsafeCancel() + variable1.set(2000) + + triggers1 shouldBe List(1000, 400, 300, 200, 100) + triggers2 shouldBe List(2, 1, 2, 1) + triggerRxCount shouldBe 5 + results1 shouldBe List(1002, 302, 202, 102) + results2 shouldBe List(402, 302) } }