Skip to content

Commit

Permalink
make reactive state variables lazy (#271)
Browse files Browse the repository at this point in the history
  • Loading branch information
cornerman authored Nov 10, 2023
1 parent 809bd9d commit feff7f1
Show file tree
Hide file tree
Showing 17 changed files with 1,275 additions and 549 deletions.
104 changes: 62 additions & 42 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ This library includes:

Reactive core library with typeclasses:
```scala
libraryDependencies += "com.github.cornerman" %%% "colibri" % "0.5.0"
libraryDependencies += "com.github.cornerman" %%% "colibri" % "0.8.0"
```

```scala
import colibri._
```

Reactive variables with hot distinct observables (a bit like scala-rx):
Reactive variables with lazy, distinct, shared state variables (a bit like scala-rx, but lazy):
```scala
libraryDependencies += "com.github.cornerman" %%% "colibri-reactive" % "0.5.0"
libraryDependencies += "com.github.cornerman" %%% "colibri-reactive" % "0.8.0"
```

```scala
Expand All @@ -32,7 +32,7 @@ import colibri.reactive._

For jsdom-based operations in the browser (`EventObservable`, `Storage`):
```scala
libraryDependencies += "com.github.cornerman" %%% "colibri-jsdom" % "0.5.0"
libraryDependencies += "com.github.cornerman" %%% "colibri-jsdom" % "0.8.0"
```

```scala
Expand All @@ -41,7 +41,7 @@ import colibri.jsdom._

For scala.rx support (only Scala 2.x):
```scala
libraryDependencies += "com.github.cornerman" %%% "colibri-rx" % "0.5.0"
libraryDependencies += "com.github.cornerman" %%% "colibri-rx" % "0.8.0"
```

```scala
Expand All @@ -50,7 +50,7 @@ import colibri.ext.rx._

For airstream support:
```scala
libraryDependencies += "com.github.cornerman" %%% "colibri-airstream" % "0.5.0"
libraryDependencies += "com.github.cornerman" %%% "colibri-airstream" % "0.8.0"
```

```scala
Expand All @@ -59,7 +59,7 @@ import colibri.ext.airstream._

For zio support:
```scala
libraryDependencies += "com.github.cornerman" %%% "colibri-zio" % "0.5.0"
libraryDependencies += "com.github.cornerman" %%% "colibri-zio" % "0.8.0"
```

```scala
Expand All @@ -68,7 +68,7 @@ import colibri.ext.zio._

For fs2 support (`Source` only):
```scala
libraryDependencies += "com.github.cornerman" %%% "colibri-fs2" % "0.5.0"
libraryDependencies += "com.github.cornerman" %%% "colibri-fs2" % "0.8.0"
```

```scala
Expand Down Expand Up @@ -146,79 +146,99 @@ You can convert any `Source` into an `Observable` with `Observable.lift(source)`

## Reactive variables

The module `colibri-reactive` exposes reactive variables. This is hot, distinct observables that always have a value. These reactive variables are meant for managing state - opposed to managing events which is a perfect fit for lazy `Observable` in the core `colibri` library.
The module `colibri-reactive` exposes reactive variables. This is lazy, distinct shared state variables (internally using observables) that always have a value. These reactive variables are meant for managing state - opposed to managing events which is a perfect fit for lazy `Observable` in the core `colibri` library.

This module behaves very similar to scala-rx - just built on top of colibri Observables for seamless integration and powerful operators. It is not entirely glitch-free because invalid state can appear in operators like map or foreach, but you always have a consistent state in `now()` and it reduces the number of intermediate triggers or glitches. You can become completely glitch-free by converting back to observable and using `dropSyncGlitches` which will introduce an async boundary (micro-task).
This module behaves similar to scala-rx - though variables are not hot and it is built on top of colibri Observables for seamless integration and powerful operators.

The whole thing is not entirely glitch-free, as invalid state can appear in operators like map or foreach. But you always have a consistent state in `now()` and it reduces the number of intermediate triggers or glitches. You can become completely glitch-free by converting back to observable and using `dropSyncGlitches` which will introduce an async boundary (micro-task).

A state variable is of type `Var[A] extends Rx[A] with RxWriter[A]`.

The laziness of variables means that the current value is only tracked if anyone subscribes to the `Rx[A]`. So an Rx does not compute anything on its own. You can still always call `now()` on it - if it is currently not subscribed, it will lazily calculate the current value.

Example:

```scala

import colibri.reactive._

import colibri.owner.unsafeImplicits._ // dangerous. This never cancels subscriptions. See below!

val variable = Var(1)
val variable2 = Var("Test")

val rx = Rx {
s"${variable()} - ${variable2()}"
}

rx.foreach(println(_))
val cancelable = rx.unsafeForeach(println(_))

println(variable.now()) // 1
println(variable2.now()) // "Test"
println(rx.now()) // "1 - Test"

variable.set(2)
variable.set(2) // println("2 - Test")

println(variable.now()) // 2
println(variable2.now()) // "Test"
println(rx.now()) // "2 - Test"

variable2.set("Foo")
variable2.set("Foo") // println("2 - Foo")

println(variable.now()) // 2
println(variable2.now()) // "Foo"
println(rx.now()) // "2 - Foo"

```
cancelable.unsafeCancel()

If you want to work with reactive variables (hot observable), then someone need to cleanup the subscriptions. We call this concept an `Owner`. We use an *unsafe* owner in the above example. It actually never cleans up. It should only ever be used in your main method or for global state.
println(variable.now()) // 2
println(variable2.now()) // "Foo"
println(rx.now()) // "2 - Foo"

You can even work without ever using the unsafe owner or having to pass it implictly. You can use `Owned` blocks instead. Inside an `Owned` block, you will have to return a type that has a `SubscriptionOwner` instance. Example:
variable.set(3) // no println

```scala
// now calculates new value lazily
println(variable.now()) // 3
println(variable2.now()) // "Foo"
println(rx.now()) // "3 - Foo"
```

import colibri._
Apart from `Rx` which always has an initial value, there is `RxLater` (and `VarLater`) which will eventually have a value (both extend RxState which extends RxSource). It also meant for representing state just without an initial state. It is lazy, distinct and has shared execution just like `Rx`.

```
import colibri.reactive._
import cats.effect.SyncIO
sealed trait Modifier
object Modifier {
case class ReactiveModifier(rx: Rx[String]) extends Modifier
case class SubscriptionModifier(subscription: () => Cancelable) extends Modifier
case class CombineModifier(modifierA: Modifier, modifierB: Modifier) extends Modifier
val variable = VarLater[Int]()
implicit object subcriptionOwner extends SubscriptionOwner[Modifier] {
def own(owner: Modifier)(subscription: () => Cancelable): Modifier = CombineModifier(owner, SubscriptionModifier(subscription))
}
}
val stream1 = RxLater.empty
val stream2 = RxLater.future(Future.successful(1)).map(_ + 1)
val component: SyncIO[Modifier] = Owned {
val variable = Var(1)
val mapped = rx.map(_ + 1)
val cancelable = variable.unsafeForeach(println(_))
val cancelable1 = stream1.unsafeForeach(println(_))
val cancelable2 = stream2.unsafeForeach(println(_))
val rx = Rx {
"Hallo: ${mapped()}"
}
println(variable.toRx.now()) // None
println(stream1.toRx.now()) // None
println(stream2.toRx.now()) // Some(2)
ReactiveModifier(rx)
}
variable.set(13)
println(variable.toRx.now()) // Some(13)
```

There also exist `RxEvent` and `VarEvent`, which are event observables with shared execution. That is they behave like `Rx` and `Var` such that transformations are only applied once and not per subscription. But `RxEvent` and `VarEvent` are not distinct and have no current value. They should be used for event streams.

```
import colibri.reactive._
val variable = VarEvent[Int]()
val stream = RxEvent.empty
val mapped = RxEvent.merge(variable.tap(println(_)).map(_ + 1), stream)
val cancelable = mapped.unsafeForeach(println(_))
```

For example, [Outwatch](https://github.com/outwatch/outwatch) supports `Owned`:
[Outwatch](https://github.com/outwatch/outwatch) works perfectly with Rx (or RxLater, RxEvent which all extend RxSource) - just like Observable.

```scala

Expand All @@ -227,7 +247,7 @@ import outwatch.dsl._
import colibri.reactive._
import cats.effect.SyncIO

val component: SyncIO[VModifier] = Owned {
val component: VModifier = {
val variable = Var(1)
val mapped = rx.map(_ + 1)

Expand All @@ -241,9 +261,9 @@ val component: SyncIO[VModifier] = Owned {

### Memory management

Every subscription that is created inside of colibri-reactive methods is owned by an implicit `Owner`. For example `map` or `foreach` take an implicit `Owner`. As long as the `Owner` is cancelled when it is not needed anymore, all subscriptions will be cleaned up. The exception is the `Owner.unsafeGlobal` that never cleans up and is meant for global state.
The same principles as for Observables hold. Any cancelable that is returned from the API needs to be handled by the the caller. Best practice: use subscribe/foreach as seldomly as possible - only in selected spots or within a library.

If you are working with `Outwatch`, you can just use `Owned`-blocks returning `VModifier` and everything is handled automatically for you. No memory leaks.
If you are working with `Outwatch`, you can just use `Rx` without ever subscribing yourself. Then all memory management is handled for you automatically. No memory leaks.

## Information

Expand Down
3 changes: 2 additions & 1 deletion colibri/src/main/scala/colibri/Cancelable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ object Cancelable {

def unsafeAdd(subscription: () => Cancelable): Unit = if (buffer != null) {
val cancelable = subscription()
buffer.push(cancelable)
if (buffer == null) cancelable.unsafeCancel()
else buffer.push(cancelable)
()
}

Expand Down
5 changes: 2 additions & 3 deletions colibri/src/main/scala/colibri/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,6 @@ object Observable {
def unsafeSubscribe(sink2: Observer[A]): Cancelable = source.unsafeSubscribe(Observer.combine(sink, sink2))
}

@deprecated("Use via instead", "0.7.8")
def to(sink: Observer[A]): Observable[Unit] = via(sink).void

@deprecated("Use tap instead", "0.7.8")
Expand Down Expand Up @@ -512,9 +511,9 @@ object Observable {
@deprecated("Use scan0 instead", "0.7.8")
def scan0ToList: Observable[List[A]] = scan0(List.empty[A])((list, x) => x :: list)

def scan0[B](seed: B)(f: (B, A) => B): Observable[B] = scan(seed)(f).prepend(seed)
def scan0[B](seed: => B)(f: (B, A) => B): Observable[B] = scan(seed)(f).prependEval(seed)

def scan[B](seed: B)(f: (B, A) => B): Observable[B] = new Observable[B] {
def scan[B](seed: => B)(f: (B, A) => B): Observable[B] = new Observable[B] {
def unsafeSubscribe(sink: Observer[B]): Cancelable = source.unsafeSubscribe(sink.contrascan(seed)(f))
}

Expand Down
6 changes: 3 additions & 3 deletions colibri/src/main/scala/colibri/Observer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,9 @@ object Observer {
def unsafeOnError(error: Throwable): Unit = sink.unsafeOnError(error)
}

def contraflattenIterable[B]: Observer[Iterable[A]] = contramapIterable(identity)
def contraflattenEither[B]: Observer[Either[Throwable, A]] = contramapEither(identity)
def contraflattenOption[B]: Observer[Option[A]] = contramapFilter(identity)
def contraflattenIterable: Observer[Iterable[A]] = contramapIterable(identity)
def contraflattenEither: Observer[Either[Throwable, A]] = contramapEither(identity)
def contraflattenOption: Observer[Option[A]] = contramapFilter(identity)

// TODO return effect
def contrascan[B](seed: A)(f: (A, B) => A): Observer[B] = new Observer[B] {
Expand Down
8 changes: 8 additions & 0 deletions colibri/src/main/scala/colibri/Subject.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ final class ReplayLatestSubject[A] extends Observer[A] with Observable.MaybeValu

@inline def now(): Option[A] = current

def unsafeResetState(): Unit = {
current = None
}

def unsafeOnNext(value: A): Unit = {
current = Some(value)
state.unsafeOnNext(value)
Expand All @@ -37,6 +41,10 @@ final class ReplayAllSubject[A] extends Observer[A] with Observable[A] {

@inline def now(): Seq[A] = current.toSeq

def unsafeResetState(): Unit = {
current.clear()
}

def unsafeOnNext(value: A): Unit = {
current += value
state.unsafeOnNext(value)
Expand Down

This file was deleted.

20 changes: 4 additions & 16 deletions reactive/src/main/scala-2/colibri/reactive/OwnerPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,14 @@ package colibri.reactive

import colibri.{Observable, Cancelable}

trait OwnerPlatform {
@annotation.compileTimeOnly(
"No implicit Owner is available here! Wrap inside `Owned { <code> }`, or provide an implicit `Owner`, or `import Owner.unsafeImplicits._` (dangerous).",
)
implicit object compileTimeMock extends Owner {
def unsafeSubscribe(): Cancelable = ???
def unsafeOwn(subscription: () => Cancelable): Unit = ???
def cancelable: Cancelable = ???
}
}

trait LiveOwnerPlatform {
@annotation.compileTimeOnly(
"No implicit LiveOwner is available here! Wrap inside `Rx { <code> }`, or provide an implicit `LiveOwner`.",
)
implicit object compileTimeMock extends LiveOwner {
def unsafeSubscribe(): Cancelable = ???
def unsafeOwn(subscription: () => Cancelable): Unit = ???
def cancelable: Cancelable = ???
def unsafeLive[A](rx: Rx[A]): A = ???
def liveObservable: Observable[Any] = ???
def cancelable: Cancelable = ???
def unsafeNow[A](rx: Rx[A]): A = ???
def unsafeLive[A](rx: Rx[A]): A = ???
def liveObservable: Observable[Any] = ???
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ package colibri.reactive
import colibri.reactive.internal.MacroUtils

trait RxPlatform {
def apply[R](f: R)(implicit owner: Owner): Rx[R] = macro MacroUtils.rxImpl[R]
def apply[R](f: R): Rx[R] = macro MacroUtils.rxImpl[R]
}
Loading

0 comments on commit feff7f1

Please sign in to comment.