Skip to content

Commit

Permalink
up-to-date pull-accessors for now and update
Browse files Browse the repository at this point in the history
  • Loading branch information
cornerman committed Dec 16, 2022
1 parent d88e569 commit 93f7829
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 44 deletions.
8 changes: 8 additions & 0 deletions colibri/src/main/scala/colibri/Subject.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ final class ReplayLatestSubject[A] extends Observer[A] with Observable.MaybeValu

@inline def now(): Option[A] = current

def unsafeResetState(): Unit = {
current = None
}

def unsafeOnNext(value: A): Unit = {
current = Some(value)
state.unsafeOnNext(value)
Expand All @@ -37,6 +41,10 @@ final class ReplayAllSubject[A] extends Observer[A] with Observable[A] {

@inline def now(): Seq[A] = current.toSeq

def unsafeResetState(): Unit = {
current.clear()
}

def unsafeOnNext(value: A): Unit = {
current += value
state.unsafeOnNext(value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@ package colibri.reactive

import colibri.{Observable, Cancelable}

trait NowOwnerPlatform {
@annotation.compileTimeOnly(
"No implicit NowOwner is available here! Wrap inside `Rx { <code> }` or `Rx.pull { <code> }`, or provide an implicit `NowOwner`.",
)
implicit object compileTimeMock extends NowOwner {
def cancelable: Cancelable = ???
def unsafeNow[A](rx: Rx[A]): A = ???
}
}

trait LiveOwnerPlatform {
@annotation.compileTimeOnly(
"No implicit LiveOwner is available here! Wrap inside `Rx { <code> }`, or provide an implicit `LiveOwner`.",
Expand Down
5 changes: 4 additions & 1 deletion reactive/src/main/scala-2/colibri/reactive/RxPlatform.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package colibri.reactive

import cats.effect.SyncIO
import colibri.reactive.internal.MacroUtils

trait RxPlatform {
def apply[R](f: R): Rx[R] = macro MacroUtils.rxImpl[R]
def apply[R](f: R): Rx[R] = macro MacroUtils.rxImpl[R]
def pull[R](f: R): SyncIO[R] = macro MacroUtils.rxPullImpl[R]
def unsafePull[R](f: R): R = macro MacroUtils.rxUnsafePullImpl[R]
}
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
package colibri.reactive.internal

import colibri.reactive.{Rx, LiveOwner}
import cats.effect.SyncIO
import colibri.reactive.{Rx, LiveOwner, NowOwner}
import scala.reflect.macros._

// Inspired by scala.rx
object MacroUtils {

private val liveOwnerName = "colibriLiveOwner"

def injectOwner[T](c: blackbox.Context)(src: c.Tree, newOwner: c.universe.TermName): c.Tree = {
def injectOwner[T](c: blackbox.Context)(src: c.Tree, newOwner: c.universe.TermName, target: c.Type, replaces: Set[c.Type]): c.Tree = {
import c.universe._

val implicitLiveOwnerAtCaller = c.inferImplicitValue(typeOf[LiveOwner], silent = false)
val implicitOwnerAtCallers = replaces.map(c.inferImplicitValue(_, silent = false))

object transformer extends c.universe.Transformer {
override def transform(tree: c.Tree): c.Tree = {
val shouldReplaceOwner = tree != null &&
tree.isTerm &&
tree.tpe =:= implicitLiveOwnerAtCaller.tpe &&
tree.tpe <:< typeOf[LiveOwner] &&
implicitOwnerAtCallers.exists(_.tpe =:= tree.tpe) &&
tree.tpe <:< target &&
!(tree.tpe =:= typeOf[Nothing])

if (shouldReplaceOwner) q"$newOwner"
Expand All @@ -31,9 +30,14 @@ object MacroUtils {
def rxImpl[R](c: blackbox.Context)(f: c.Expr[R]): c.Expr[Rx[R]] = {
import c.universe._

val newOwner = c.freshName(TermName(liveOwnerName))
val newOwner = c.freshName(TermName("colibriLiveOwner"))

val newTree = c.untypecheck(injectOwner(c)(f.tree, newOwner))
val newTree = c.untypecheck(injectOwner(c)(
f.tree,
newOwner,
typeOf[NowOwner],
replaces = Set(typeOf[NowOwner], typeOf[LiveOwner])
))

val tree = q"""
_root_.colibri.reactive.Rx.function { ($newOwner: _root_.colibri.reactive.LiveOwner) =>
Expand All @@ -45,4 +49,50 @@ object MacroUtils {

c.Expr(tree)
}

def rxPullImpl[R](c: blackbox.Context)(f: c.Expr[R]): c.Expr[SyncIO[R]] = {
import c.universe._

val newOwner = c.freshName(TermName("colibriNowOwner"))

val newTree = c.untypecheck(injectOwner(c)(
f.tree,
newOwner,
typeOf[NowOwner],
replaces = Set(typeOf[NowOwner])
))

val tree = q"""
_root_.colibri.reactive.Rx.functionPull { ($newOwner: _root_.colibri.reactive.NowOwner) =>
$newTree
}
"""

// println(tree)

c.Expr(tree)
}

def rxUnsafePullImpl[R](c: blackbox.Context)(f: c.Expr[R]): c.Expr[R] = {
import c.universe._

val newOwner = c.freshName(TermName("colibriNowOwner"))

val newTree = c.untypecheck(injectOwner(c)(
f.tree,
newOwner,
typeOf[NowOwner],
replaces = Set(typeOf[NowOwner])
))

val tree = q"""
_root_.colibri.reactive.Rx.functionUnsafePull { ($newOwner: _root_.colibri.reactive.NowOwner) =>
$newTree
}
"""

// println(tree)

c.Expr(tree)
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
package colibri.reactive

trait NowOwnerPlatform
trait LiveOwnerPlatform
6 changes: 5 additions & 1 deletion reactive/src/main/scala-3/colibri/reactive/RxPlatform.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package colibri.reactive

import cats.effect.SyncIO

trait RxPlatform {
def apply[R](f: LiveOwner ?=> R): Rx[R] = Rx.function(implicit owner => f)
def apply[R](f: LiveOwner ?=> R): Rx[R] = Rx.function(implicit owner => f)
def pull[R](f: NowOwner ?=> R): SyncIO[R] = Rx.functionPull(implicit owner => f)
def unsafePull[R](f: NowOwner ?=> R): R = Rx.functionUnsafePull(implicit owner => f)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,28 @@ package colibri.reactive

import colibri._

@annotation.implicitNotFound(
"No implicit NowOwner is available here! Wrap inside `Rx { <code> }` or `Rx.pull { <code> }`, or provide an implicit `NowOwner`.",
)
trait NowOwner {
def unsafeNow[A](rx: Rx[A]): A

def cancelable: Cancelable
}
object NowOwner extends NowOwnerPlatform {
def unsafeHotRef(): NowOwner = LiveOwner.unsafeHotRef()
}

@annotation.implicitNotFound(
"No implicit LiveOwner is available here! Wrap inside `Rx { <code> }`, or provide an implicit `LiveOwner`.",
)
trait LiveOwner {
trait LiveOwner extends NowOwner {
def liveObservable: Observable[Any]

def unsafeLive[A](rx: Rx[A]): A
def unsafeNow[A](rx: Rx[A]): A

def cancelable: Cancelable
}
object LiveOwner extends LiveOwnerPlatform {
def create(): LiveOwner = new LiveOwner {
def unsafeHotRef(): LiveOwner = new LiveOwner {
private val ref = Cancelable.builder()

private val subject = Subject.publish[Any]()
Expand Down
72 changes: 55 additions & 17 deletions reactive/src/main/scala/colibri/reactive/Reactive.scala
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
package colibri.reactive

import cats.Monoid
import cats.effect.SyncIO
import colibri._
import colibri.effect._
import monocle.{Iso, Lens, Prism}

import scala.concurrent.Future
import scala.reflect.ClassTag
import scala.util.control.NonFatal

object RxMissingNowException extends Exception("Missing current value inside an Rx. Make sure, the Rx has active subscriptions when calling nowGet.")

trait Rx[+A] {
def observable: Observable[A]
def nowOption(): Option[A]

final def nowGet(): A = nowOption().getOrElse(throw RxMissingNowException)

final def apply()(implicit liveOwner: LiveOwner): A = liveOwner.unsafeLive(this)
final def now()(implicit liveOwner: LiveOwner): A = liveOwner.unsafeNow(this)

final def map[B](f: A => B): Rx[B] = transformRxSync(_.map(f))
final def mapEither[B](f: A => Either[Throwable, B]): Rx[B] = transformRxSync(_.mapEither(f))
final def tap(f: A => Unit): Rx[A] = transformRxSync(_.tap(f))
Expand All @@ -37,39 +34,74 @@ trait Rx[+A] {
final def asEffect[F[_]: RunEffect, B](value: F[B])(seed: => B): Rx[B] = transformRx(_.asEffect(value))(seed)
final def asFuture[B](value: => Future[B])(seed: => B): Rx[B] = transformRx(_.asFuture(value))(seed)

final def via(writer: RxWriter[A]): Rx[A] = transformRxSync(_.via(writer.observer))

final def switchMap[B](f: A => Rx[B]): Rx[B] = transformRxSync(_.switchMap(f andThen (_.observable)))
final def mergeMap[B](f: A => Rx[B]): Rx[B] = transformRxSync(_.mergeMap(f andThen (_.observable)))

final def transformRx[B](f: Observable[A] => Observable[B])(seed: => B): Rx[B] = Rx.observable(f(observable))(seed)
final def transformRxSync[B](f: Observable[A] => Observable[B]): Rx[B] = Rx.observableSync(f(observable))

final def unsafeSubscribe(writer: RxWriter[A]): Cancelable = observable.unsafeSubscribe(writer.observer)
final def unsafeSubscribe(observer: Observer[A]): Cancelable = observable.unsafeSubscribe(observer)
final def unsafeSubscribe(): Cancelable = observable.unsafeSubscribe()
final def unsafeForeach(f: A => Unit): Cancelable = observable.unsafeForeach(f)
final def unsafeForeachLater(f: A => Unit): Cancelable = observable.tail.unsafeForeach(f)
final def nowGet(): A = nowOption().getOrElse(throw RxMissingNowException)

final def apply()(implicit owner: LiveOwner): A = owner.unsafeLive(this)
final def now()(implicit owner: NowOwner): A = owner.unsafeNow(this)

final def pullNow: SyncIO[A] = Rx.functionPull(now()(_))

final def unsafePullNow(): A = Rx.functionUnsafePull(now()(_))

final def hot: SyncIO[Rx[A]] = SyncIO(unsafeHot())

final def unsafeHot(): Rx[A] = {
val _ = unsafeSubscribe()
this
}

final def subscribe: SyncIO[Cancelable] = observable.subscribeSyncIO

final def unsafeSubscribe(): Cancelable = observable.unsafeSubscribe()
final def unsafeSubscribe(writer: RxWriter[A]): Cancelable = observable.unsafeSubscribe(writer.observer)

final def unsafeForeach(f: A => Unit): Cancelable = observable.unsafeForeach(f)
final def unsafeForeachLater(f: A => Unit): Cancelable = observable.tail.unsafeForeach(f)
}

object Rx extends RxPlatform {
def function[R](f: LiveOwner => R): Rx[R] = {
val subject = Subject.behavior[Any](())

val observable = subject.switchMap { _ =>
val liveOwner = LiveOwner.create()
val result = f(liveOwner)
Observable[R](result)
.subscribing(liveOwner.liveObservable.dropSyncAll.head.via(subject))
.tapCancel(liveOwner.cancelable.unsafeCancel)
val owner = LiveOwner.unsafeHotRef()
try {
val result = f(owner)
Observable[R](result)
.subscribing(owner.liveObservable.dropSyncAll.head.via(subject))
.tapCancel(owner.cancelable.unsafeCancel)
} catch {
case NonFatal(t) =>
owner.cancelable.unsafeCancel()
Observable.raiseError(t)
case t: Throwable =>
owner.cancelable.unsafeCancel()
throw t
}
}

Rx.observableSync(observable)
}

def functionPull[R](f: NowOwner => R): SyncIO[R] = SyncIO(functionUnsafePull(f))

def functionUnsafePull[R](f: NowOwner => R): R = {
val owner = NowOwner.unsafeHotRef()
try {
f(owner)
} finally {
owner.cancelable.unsafeCancel()
}
}

def const[A](value: A): Rx[A] = new RxConst(value)

def observable[A](observable: Observable[A])(seed: => A): Rx[A] = observableSync(observable.prependEval(seed))
Expand Down Expand Up @@ -133,7 +165,13 @@ object RxWriter {
}

trait Var[A] extends Rx[A] with RxWriter[A] {
final def updateGet(f: A => A) = this.set(f(this.nowGet()))
final def updateGet(f: A => A): Unit = this.set(f(this.nowGet()))

final def update(f: A => A)(implicit owner: NowOwner): Unit = this.set(f(this.now()))

final def pullUpdate(f: A => A): SyncIO[Unit] = SyncIO(unsafePullUpdate(f))

final def unsafePullUpdate(f: A => A): Unit = this.set(f(unsafePullNow()))

final def transformVar[A2](f: RxWriter[A] => RxWriter[A2])(g: Rx[A] => Rx[A2]): Var[A2] = Var.combine(g(this), f(this))
final def transformVarRx(g: Rx[A] => Rx[A]): Var[A] = Var.combine(g(this), this)
Expand Down Expand Up @@ -234,7 +272,7 @@ private final class RxConst[A](value: A) extends Rx[A] {
private final class RxObservableSync[A](inner: Observable[A]) extends Rx[A] {
private val state = new ReplayLatestSubject[A]()

val observable: Observable[A] = inner.dropUntilSyncLatest.distinctOnEquals.multicast(state).refCount
val observable: Observable[A] = inner.dropUntilSyncLatest.distinctOnEquals.tapCancel(state.unsafeResetState).multicast(state).refCount

def nowOption() = state.now()
}
Expand Down
Loading

0 comments on commit 93f7829

Please sign in to comment.