diff --git a/colibri/src/main/scala/colibri/Subject.scala b/colibri/src/main/scala/colibri/Subject.scala index a8e3cba7..330ce243 100644 --- a/colibri/src/main/scala/colibri/Subject.scala +++ b/colibri/src/main/scala/colibri/Subject.scala @@ -13,6 +13,10 @@ final class ReplayLatestSubject[A] extends Observer[A] with Observable.MaybeValu @inline def now(): Option[A] = current + def unsafeResetState(): Unit = { + current = None + } + def unsafeOnNext(value: A): Unit = { current = Some(value) state.unsafeOnNext(value) @@ -37,6 +41,10 @@ final class ReplayAllSubject[A] extends Observer[A] with Observable[A] { @inline def now(): Seq[A] = current.toSeq + def unsafeResetState(): Unit = { + current.clear() + } + def unsafeOnNext(value: A): Unit = { current += value state.unsafeOnNext(value) diff --git a/reactive/src/main/scala-2/colibri/reactive/LiveOwnerPlatform.scala b/reactive/src/main/scala-2/colibri/reactive/OwnerPlatform.scala similarity index 58% rename from reactive/src/main/scala-2/colibri/reactive/LiveOwnerPlatform.scala rename to reactive/src/main/scala-2/colibri/reactive/OwnerPlatform.scala index db505c9b..74d57ff7 100644 --- a/reactive/src/main/scala-2/colibri/reactive/LiveOwnerPlatform.scala +++ b/reactive/src/main/scala-2/colibri/reactive/OwnerPlatform.scala @@ -2,6 +2,16 @@ package colibri.reactive import colibri.{Observable, Cancelable} +trait NowOwnerPlatform { + @annotation.compileTimeOnly( + "No implicit NowOwner is available here! Wrap inside `Rx { }` or `Rx.pull { }`, or provide an implicit `NowOwner`.", + ) + implicit object compileTimeMock extends NowOwner { + def cancelable: Cancelable = ??? + def unsafeNow[A](rx: Rx[A]): A = ??? + } +} + trait LiveOwnerPlatform { @annotation.compileTimeOnly( "No implicit LiveOwner is available here! Wrap inside `Rx { }`, or provide an implicit `LiveOwner`.", diff --git a/reactive/src/main/scala-2/colibri/reactive/RxPlatform.scala b/reactive/src/main/scala-2/colibri/reactive/RxPlatform.scala index bea3f090..ddfac484 100644 --- a/reactive/src/main/scala-2/colibri/reactive/RxPlatform.scala +++ b/reactive/src/main/scala-2/colibri/reactive/RxPlatform.scala @@ -1,7 +1,10 @@ package colibri.reactive +import cats.effect.SyncIO import colibri.reactive.internal.MacroUtils trait RxPlatform { - def apply[R](f: R): Rx[R] = macro MacroUtils.rxImpl[R] + def apply[R](f: R): Rx[R] = macro MacroUtils.rxImpl[R] + def pull[R](f: R): SyncIO[R] = macro MacroUtils.rxPullImpl[R] + def unsafePull[R](f: R): R = macro MacroUtils.rxUnsafePullImpl[R] } diff --git a/reactive/src/main/scala-2/colibri/reactive/internal/MacroUtils.scala b/reactive/src/main/scala-2/colibri/reactive/internal/MacroUtils.scala index cab7942a..16797f66 100644 --- a/reactive/src/main/scala-2/colibri/reactive/internal/MacroUtils.scala +++ b/reactive/src/main/scala-2/colibri/reactive/internal/MacroUtils.scala @@ -1,24 +1,23 @@ package colibri.reactive.internal -import colibri.reactive.{Rx, LiveOwner} +import cats.effect.SyncIO +import colibri.reactive.{Rx, LiveOwner, NowOwner} import scala.reflect.macros._ // Inspired by scala.rx object MacroUtils { - private val liveOwnerName = "colibriLiveOwner" - - def injectOwner[T](c: blackbox.Context)(src: c.Tree, newOwner: c.universe.TermName): c.Tree = { + def injectOwner[T](c: blackbox.Context)(src: c.Tree, newOwner: c.universe.TermName, target: c.Type, replaces: Set[c.Type]): c.Tree = { import c.universe._ - val implicitLiveOwnerAtCaller = c.inferImplicitValue(typeOf[LiveOwner], silent = false) + val implicitOwnerAtCallers = replaces.map(c.inferImplicitValue(_, silent = false)) object transformer extends c.universe.Transformer { override def transform(tree: c.Tree): c.Tree = { val shouldReplaceOwner = tree != null && tree.isTerm && - tree.tpe =:= implicitLiveOwnerAtCaller.tpe && - tree.tpe <:< typeOf[LiveOwner] && + implicitOwnerAtCallers.exists(_.tpe =:= tree.tpe) && + tree.tpe <:< target && !(tree.tpe =:= typeOf[Nothing]) if (shouldReplaceOwner) q"$newOwner" @@ -31,9 +30,14 @@ object MacroUtils { def rxImpl[R](c: blackbox.Context)(f: c.Expr[R]): c.Expr[Rx[R]] = { import c.universe._ - val newOwner = c.freshName(TermName(liveOwnerName)) + val newOwner = c.freshName(TermName("colibriLiveOwner")) - val newTree = c.untypecheck(injectOwner(c)(f.tree, newOwner)) + val newTree = c.untypecheck(injectOwner(c)( + f.tree, + newOwner, + typeOf[NowOwner], + replaces = Set(typeOf[NowOwner], typeOf[LiveOwner]) + )) val tree = q""" _root_.colibri.reactive.Rx.function { ($newOwner: _root_.colibri.reactive.LiveOwner) => @@ -45,4 +49,50 @@ object MacroUtils { c.Expr(tree) } + + def rxPullImpl[R](c: blackbox.Context)(f: c.Expr[R]): c.Expr[SyncIO[R]] = { + import c.universe._ + + val newOwner = c.freshName(TermName("colibriNowOwner")) + + val newTree = c.untypecheck(injectOwner(c)( + f.tree, + newOwner, + typeOf[NowOwner], + replaces = Set(typeOf[NowOwner]) + )) + + val tree = q""" + _root_.colibri.reactive.Rx.functionPull { ($newOwner: _root_.colibri.reactive.NowOwner) => + $newTree + } + """ + + // println(tree) + + c.Expr(tree) + } + + def rxUnsafePullImpl[R](c: blackbox.Context)(f: c.Expr[R]): c.Expr[R] = { + import c.universe._ + + val newOwner = c.freshName(TermName("colibriNowOwner")) + + val newTree = c.untypecheck(injectOwner(c)( + f.tree, + newOwner, + typeOf[NowOwner], + replaces = Set(typeOf[NowOwner]) + )) + + val tree = q""" + _root_.colibri.reactive.Rx.functionUnsafePull { ($newOwner: _root_.colibri.reactive.NowOwner) => + $newTree + } + """ + + // println(tree) + + c.Expr(tree) + } } diff --git a/reactive/src/main/scala-3/colibri/reactive/LiveOwnerPlatform.scala b/reactive/src/main/scala-3/colibri/reactive/OwnerPlatform.scala similarity index 68% rename from reactive/src/main/scala-3/colibri/reactive/LiveOwnerPlatform.scala rename to reactive/src/main/scala-3/colibri/reactive/OwnerPlatform.scala index 0248f72f..4c8ac23f 100644 --- a/reactive/src/main/scala-3/colibri/reactive/LiveOwnerPlatform.scala +++ b/reactive/src/main/scala-3/colibri/reactive/OwnerPlatform.scala @@ -1,3 +1,4 @@ package colibri.reactive +trait NowOwnerPlatform trait LiveOwnerPlatform diff --git a/reactive/src/main/scala-3/colibri/reactive/RxPlatform.scala b/reactive/src/main/scala-3/colibri/reactive/RxPlatform.scala index 5f97dbed..a3ff09bf 100644 --- a/reactive/src/main/scala-3/colibri/reactive/RxPlatform.scala +++ b/reactive/src/main/scala-3/colibri/reactive/RxPlatform.scala @@ -1,5 +1,9 @@ package colibri.reactive +import cats.effect.SyncIO + trait RxPlatform { - def apply[R](f: LiveOwner ?=> R): Rx[R] = Rx.function(implicit owner => f) + def apply[R](f: LiveOwner ?=> R): Rx[R] = Rx.function(implicit owner => f) + def pull[R](f: NowOwner ?=> R): SyncIO[R] = Rx.functionPull(implicit owner => f) + def unsafePull[R](f: NowOwner ?=> R): R = Rx.functionUnsafePull(implicit owner => f) } diff --git a/reactive/src/main/scala/colibri/reactive/LiveOwner.scala b/reactive/src/main/scala/colibri/reactive/Owner.scala similarity index 68% rename from reactive/src/main/scala/colibri/reactive/LiveOwner.scala rename to reactive/src/main/scala/colibri/reactive/Owner.scala index 2772deed..01f48534 100644 --- a/reactive/src/main/scala/colibri/reactive/LiveOwner.scala +++ b/reactive/src/main/scala/colibri/reactive/Owner.scala @@ -2,19 +2,28 @@ package colibri.reactive import colibri._ +@annotation.implicitNotFound( + "No implicit NowOwner is available here! Wrap inside `Rx { }` or `Rx.pull { }`, or provide an implicit `NowOwner`.", +) +trait NowOwner { + def unsafeNow[A](rx: Rx[A]): A + + def cancelable: Cancelable +} +object NowOwner extends NowOwnerPlatform { + def unsafeHotRef(): NowOwner = LiveOwner.unsafeHotRef() +} + @annotation.implicitNotFound( "No implicit LiveOwner is available here! Wrap inside `Rx { }`, or provide an implicit `LiveOwner`.", ) -trait LiveOwner { +trait LiveOwner extends NowOwner { def liveObservable: Observable[Any] def unsafeLive[A](rx: Rx[A]): A - def unsafeNow[A](rx: Rx[A]): A - - def cancelable: Cancelable } object LiveOwner extends LiveOwnerPlatform { - def create(): LiveOwner = new LiveOwner { + def unsafeHotRef(): LiveOwner = new LiveOwner { private val ref = Cancelable.builder() private val subject = Subject.publish[Any]() diff --git a/reactive/src/main/scala/colibri/reactive/Reactive.scala b/reactive/src/main/scala/colibri/reactive/Reactive.scala index 18448f02..1fc31038 100644 --- a/reactive/src/main/scala/colibri/reactive/Reactive.scala +++ b/reactive/src/main/scala/colibri/reactive/Reactive.scala @@ -1,12 +1,14 @@ package colibri.reactive import cats.Monoid +import cats.effect.SyncIO import colibri._ import colibri.effect._ import monocle.{Iso, Lens, Prism} import scala.concurrent.Future import scala.reflect.ClassTag +import scala.util.control.NonFatal object RxMissingNowException extends Exception("Missing current value inside an Rx. Make sure, the Rx has active subscriptions when calling nowGet.") @@ -14,11 +16,6 @@ trait Rx[+A] { def observable: Observable[A] def nowOption(): Option[A] - final def nowGet(): A = nowOption().getOrElse(throw RxMissingNowException) - - final def apply()(implicit liveOwner: LiveOwner): A = liveOwner.unsafeLive(this) - final def now()(implicit liveOwner: LiveOwner): A = liveOwner.unsafeNow(this) - final def map[B](f: A => B): Rx[B] = transformRxSync(_.map(f)) final def mapEither[B](f: A => Either[Throwable, B]): Rx[B] = transformRxSync(_.mapEither(f)) final def tap(f: A => Unit): Rx[A] = transformRxSync(_.tap(f)) @@ -37,22 +34,37 @@ trait Rx[+A] { final def asEffect[F[_]: RunEffect, B](value: F[B])(seed: => B): Rx[B] = transformRx(_.asEffect(value))(seed) final def asFuture[B](value: => Future[B])(seed: => B): Rx[B] = transformRx(_.asFuture(value))(seed) + final def via(writer: RxWriter[A]): Rx[A] = transformRxSync(_.via(writer.observer)) + final def switchMap[B](f: A => Rx[B]): Rx[B] = transformRxSync(_.switchMap(f andThen (_.observable))) final def mergeMap[B](f: A => Rx[B]): Rx[B] = transformRxSync(_.mergeMap(f andThen (_.observable))) final def transformRx[B](f: Observable[A] => Observable[B])(seed: => B): Rx[B] = Rx.observable(f(observable))(seed) final def transformRxSync[B](f: Observable[A] => Observable[B]): Rx[B] = Rx.observableSync(f(observable)) - final def unsafeSubscribe(writer: RxWriter[A]): Cancelable = observable.unsafeSubscribe(writer.observer) - final def unsafeSubscribe(observer: Observer[A]): Cancelable = observable.unsafeSubscribe(observer) - final def unsafeSubscribe(): Cancelable = observable.unsafeSubscribe() - final def unsafeForeach(f: A => Unit): Cancelable = observable.unsafeForeach(f) - final def unsafeForeachLater(f: A => Unit): Cancelable = observable.tail.unsafeForeach(f) + final def nowGet(): A = nowOption().getOrElse(throw RxMissingNowException) + + final def apply()(implicit owner: LiveOwner): A = owner.unsafeLive(this) + final def now()(implicit owner: NowOwner): A = owner.unsafeNow(this) + + final def pullNow: SyncIO[A] = Rx.functionPull(now()(_)) + + final def unsafePullNow(): A = Rx.functionUnsafePull(now()(_)) + + final def hot: SyncIO[Rx[A]] = SyncIO(unsafeHot()) final def unsafeHot(): Rx[A] = { val _ = unsafeSubscribe() this } + + final def subscribe: SyncIO[Cancelable] = observable.subscribeSyncIO + + final def unsafeSubscribe(): Cancelable = observable.unsafeSubscribe() + final def unsafeSubscribe(writer: RxWriter[A]): Cancelable = observable.unsafeSubscribe(writer.observer) + + final def unsafeForeach(f: A => Unit): Cancelable = observable.unsafeForeach(f) + final def unsafeForeachLater(f: A => Unit): Cancelable = observable.tail.unsafeForeach(f) } object Rx extends RxPlatform { @@ -60,16 +72,36 @@ object Rx extends RxPlatform { val subject = Subject.behavior[Any](()) val observable = subject.switchMap { _ => - val liveOwner = LiveOwner.create() - val result = f(liveOwner) - Observable[R](result) - .subscribing(liveOwner.liveObservable.dropSyncAll.head.via(subject)) - .tapCancel(liveOwner.cancelable.unsafeCancel) + val owner = LiveOwner.unsafeHotRef() + try { + val result = f(owner) + Observable[R](result) + .subscribing(owner.liveObservable.dropSyncAll.head.via(subject)) + .tapCancel(owner.cancelable.unsafeCancel) + } catch { + case NonFatal(t) => + owner.cancelable.unsafeCancel() + Observable.raiseError(t) + case t: Throwable => + owner.cancelable.unsafeCancel() + throw t + } } Rx.observableSync(observable) } + def functionPull[R](f: NowOwner => R): SyncIO[R] = SyncIO(functionUnsafePull(f)) + + def functionUnsafePull[R](f: NowOwner => R): R = { + val owner = NowOwner.unsafeHotRef() + try { + f(owner) + } finally { + owner.cancelable.unsafeCancel() + } + } + def const[A](value: A): Rx[A] = new RxConst(value) def observable[A](observable: Observable[A])(seed: => A): Rx[A] = observableSync(observable.prependEval(seed)) @@ -133,7 +165,13 @@ object RxWriter { } trait Var[A] extends Rx[A] with RxWriter[A] { - final def updateGet(f: A => A) = this.set(f(this.nowGet())) + final def updateGet(f: A => A): Unit = this.set(f(this.nowGet())) + + final def update(f: A => A)(implicit owner: NowOwner): Unit = this.set(f(this.now())) + + final def pullUpdate(f: A => A): SyncIO[Unit] = SyncIO(unsafePullUpdate(f)) + + final def unsafePullUpdate(f: A => A): Unit = this.set(f(unsafePullNow())) final def transformVar[A2](f: RxWriter[A] => RxWriter[A2])(g: Rx[A] => Rx[A2]): Var[A2] = Var.combine(g(this), f(this)) final def transformVarRx(g: Rx[A] => Rx[A]): Var[A] = Var.combine(g(this), this) @@ -234,7 +272,7 @@ private final class RxConst[A](value: A) extends Rx[A] { private final class RxObservableSync[A](inner: Observable[A]) extends Rx[A] { private val state = new ReplayLatestSubject[A]() - val observable: Observable[A] = inner.dropUntilSyncLatest.distinctOnEquals.multicast(state).refCount + val observable: Observable[A] = inner.dropUntilSyncLatest.distinctOnEquals.tapCancel(state.unsafeResetState).multicast(state).refCount def nowOption() = state.now() } diff --git a/reactive/src/test/scala/colibri/ReactiveSpec.scala b/reactive/src/test/scala/colibri/ReactiveSpec.scala index ae1f6b24..06b05c82 100644 --- a/reactive/src/test/scala/colibri/ReactiveSpec.scala +++ b/reactive/src/test/scala/colibri/ReactiveSpec.scala @@ -66,67 +66,67 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { val cancelR2b = stream.unsafeForeach(received2 ::= _) mapped shouldBe List(5, 4, 3, 2, 1) - received1 shouldBe List(5, 4, 4, 3, 2, 1) + received1 shouldBe List(5, 4, 3, 2, 1) received2 shouldBe List(5, 4, 3, 2) variable.set(6) mapped shouldBe List(6, 5, 4, 3, 2, 1) - received1 shouldBe List(6, 5, 4, 4, 3, 2, 1) + received1 shouldBe List(6, 5, 4, 3, 2, 1) received2 shouldBe List(6, 5, 4, 3, 2) val cancelX = stream.unsafeSubscribe() mapped shouldBe List(6, 5, 4, 3, 2, 1) - received1 shouldBe List(6, 5, 4, 4, 3, 2, 1) + received1 shouldBe List(6, 5, 4, 3, 2, 1) received2 shouldBe List(6, 5, 4, 3, 2) variable.set(7) mapped shouldBe List(7, 6, 5, 4, 3, 2, 1) - received1 shouldBe List(7, 6, 5, 4, 4, 3, 2, 1) + received1 shouldBe List(7, 6, 5, 4, 3, 2, 1) received2 shouldBe List(7, 6, 5, 4, 3, 2) variable.set(8) mapped shouldBe List(8, 7, 6, 5, 4, 3, 2, 1) - received1 shouldBe List(8, 7, 6, 5, 4, 4, 3, 2, 1) + received1 shouldBe List(8, 7, 6, 5, 4, 3, 2, 1) received2 shouldBe List(8, 7, 6, 5, 4, 3, 2) cancelR2b.unsafeCancel() mapped shouldBe List(8, 7, 6, 5, 4, 3, 2, 1) - received1 shouldBe List(8, 7, 6, 5, 4, 4, 3, 2, 1) + received1 shouldBe List(8, 7, 6, 5, 4, 3, 2, 1) received2 shouldBe List(8, 7, 6, 5, 4, 3, 2) variable.set(9) mapped shouldBe List(9, 8, 7, 6, 5, 4, 3, 2, 1) - received1 shouldBe List(9, 8, 7, 6, 5, 4, 4, 3, 2, 1) + received1 shouldBe List(9, 8, 7, 6, 5, 4, 3, 2, 1) received2 shouldBe List(8, 7, 6, 5, 4, 3, 2) cancelR1b.unsafeCancel() mapped shouldBe List(9, 8, 7, 6, 5, 4, 3, 2, 1) - received1 shouldBe List(9, 8, 7, 6, 5, 4, 4, 3, 2, 1) + received1 shouldBe List(9, 8, 7, 6, 5, 4, 3, 2, 1) received2 shouldBe List(8, 7, 6, 5, 4, 3, 2) variable.set(10) mapped shouldBe List(10, 9, 8, 7, 6, 5, 4, 3, 2, 1) - received1 shouldBe List(9, 8, 7, 6, 5, 4, 4, 3, 2, 1) + received1 shouldBe List(9, 8, 7, 6, 5, 4, 3, 2, 1) received2 shouldBe List(8, 7, 6, 5, 4, 3, 2) cancelX.unsafeCancel() mapped shouldBe List(10, 9, 8, 7, 6, 5, 4, 3, 2, 1) - received1 shouldBe List(9, 8, 7, 6, 5, 4, 4, 3, 2, 1) + received1 shouldBe List(9, 8, 7, 6, 5, 4, 3, 2, 1) received2 shouldBe List(8, 7, 6, 5, 4, 3, 2) variable.set(11) mapped shouldBe List(10, 9, 8, 7, 6, 5, 4, 3, 2, 1) - received1 shouldBe List(9, 8, 7, 6, 5, 4, 4, 3, 2, 1) + received1 shouldBe List(9, 8, 7, 6, 5, 4, 3, 2, 1) received2 shouldBe List(8, 7, 6, 5, 4, 3, 2) }