Skip to content

Commit

Permalink
Var.sequence: only trigger outer Rx if collection size changed (#275)
Browse files Browse the repository at this point in the history
Co-authored-by: johannes karoff <johannes@karoff.net>
  • Loading branch information
fdietze and cornerman authored Nov 17, 2023
1 parent 7bb56d5 commit 6f87959
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 20 deletions.
65 changes: 45 additions & 20 deletions reactive/src/main/scala/colibri/reactive/Reactive.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import monocle.{Iso, Lens, Prism}

import scala.concurrent.Future
import scala.reflect.ClassTag
import collection.mutable
import scala.util.control.NonFatal
import scala.annotation.unused

Expand Down Expand Up @@ -335,30 +336,54 @@ object Var {
def createStateless[A](write: RxWriter[A], read: Rx[A]): Var[A] = new VarCreateStateless(write, read)

@inline implicit class SeqVarOperations[A](rxvar: Var[Seq[A]]) {
def sequence: Rx[Seq[Var[A]]] = Rx.observableSync(new Observable[Seq[Var[A]]] {

def unsafeSubscribe(sink: Observer[Seq[Var[A]]]): Cancelable = {
rxvar.observable.unsafeSubscribe(
Observer.create(
{ seq =>
sink.unsafeOnNext(seq.zipWithIndex.map { case (a, idx) =>
val observer = new Observer[A] {
def unsafeOnNext(value: A): Unit = {
rxvar.set(seq.updated(idx, value))
}

def unsafeOnError(error: Throwable): Unit = {
sink.unsafeOnError(error)
def sequence: Rx[Seq[Var[A]]] = {
val observable = new Observable[Seq[Var[A]]] {
def unsafeSubscribe(sink: Observer[Seq[Var[A]]]): Cancelable = {
// keep a var for every index of the original sequence
val vars = mutable.ArrayBuffer.empty[Var[A]]

def createIndexVar(idx: Int, seed: A): Var[A] = {
Var[A](seed).transformVarWrite {
_.contramap { newValue =>
rxvar.update(_.updated(idx, newValue))
newValue
}
}
}

rxvar.observable.zipWithIndex.unsafeSubscribe(
Observer.create(
consume = { case (seq, idx) =>
val needsRetrigger = if (seq.size < vars.size) {
vars.remove(seq.size, vars.size - seq.size)
true
} else if (seq.size > vars.size) {
for ((elem, idx) <- seq.zipWithIndex.takeRight(seq.size - vars.size)) {
vars += createIndexVar(idx, seed = elem)
}
true
} else {
idx == 0
}
Var.createStateless(RxWriter.observer(observer), Rx.const(a))
})
},
sink.unsafeOnError,
),
)

assert(seq.size == vars.size)

for ((newValue, elemVar) <- seq.zip(vars)) {
elemVar.set(newValue)
}

if (needsRetrigger) {
sink.unsafeOnNext(vars.toSeq)
}
},
failure = sink.unsafeOnError,
),
)
}
}
})
Rx.observableSync(observable)
}
}

@inline implicit class OptionVarOperations[A](rxvar: Var[Option[A]]) {
Expand Down
38 changes: 38 additions & 0 deletions reactive/src/test/scala/colibri/ReactiveSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,44 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers {
sequence.nowIfSubscribed().apply(0).set(2)
variable.nowIfSubscribed() shouldBe Seq(2)
}

{
// only trigger outer Rx if collection size changed
val variable = Var[Seq[Int]](Seq(1,2))
val sequence: Rx[Seq[Var[Int]]] = variable.sequence
var sequenceTriggered = 0
sequence.unsafeForeach(_ => sequenceTriggered += 1)

sequenceTriggered shouldBe 1
sequence.now().size shouldBe 2

sequence.now().apply(0).set(3)
sequence.now().size shouldBe 2
sequenceTriggered shouldBe 1

variable.set(Seq(3,4))
sequence.now().size shouldBe 2
sequence.now().apply(0).now() shouldBe 3
sequence.now().apply(1).now() shouldBe 4
sequenceTriggered shouldBe 1

variable.set(Seq(3,4,5,6))
sequence.now().size shouldBe 4
sequence.now().apply(0).now() shouldBe 3
sequence.now().apply(1).now() shouldBe 4
sequence.now().apply(2).now() shouldBe 5
sequenceTriggered shouldBe 2

variable.set(Seq(7))
sequence.now().size shouldBe 1
sequence.now().apply(0).now() shouldBe 7
sequenceTriggered shouldBe 3

variable.set(Seq(8))
sequence.now().size shouldBe 1
sequence.now().apply(0).now() shouldBe 8
sequenceTriggered shouldBe 3
}
}

it should "sequence on Var[Option[T]]" in {
Expand Down

0 comments on commit 6f87959

Please sign in to comment.