From 8ac69976e3945da5554c441f0d3a133344683eca Mon Sep 17 00:00:00 2001 From: johannes karoff Date: Sat, 22 Jun 2024 21:55:35 +0200 Subject: [PATCH] add Observable#bufferTimed --- .../src/main/scala/colibri/Observable.scala | 41 ++++++++++++++++--- .../test/scala/colibri/ObservableSpec.scala | 39 ++++++++++++++++++ 2 files changed, 75 insertions(+), 5 deletions(-) diff --git a/colibri/src/main/scala/colibri/Observable.scala b/colibri/src/main/scala/colibri/Observable.scala index 43b12495..7b6e97d3 100644 --- a/colibri/src/main/scala/colibri/Observable.scala +++ b/colibri/src/main/scala/colibri/Observable.scala @@ -498,12 +498,12 @@ object Observable { def parMapFuture[B](f: A => Future[B]): Observable[B] = parMapEffect(a => IO.fromFuture(IO(f(a)))) - def discard: Observable[Nothing] = Observable.empty.subscribing(source) - def void: Observable[Unit] = map(_ => ()) - def as[B](value: B): Observable[B] = map(_ => value) - def asEval[B](value: => B): Observable[B] = map(_ => value) + def discard: Observable[Nothing] = Observable.empty.subscribing(source) + def void: Observable[Unit] = map(_ => ()) + def as[B](value: B): Observable[B] = map(_ => value) + def asEval[B](value: => B): Observable[B] = map(_ => value) def asEffect[F[_]: RunEffect, B](value: => F[B]): Observable[B] = mapEffect(_ => value) - def asFuture[B](value: => Future[B]): Observable[B] = mapFuture(_ => value) + def asFuture[B](value: => Future[B]): Observable[B] = mapFuture(_ => value) def mapFilter[B](f: A => Option[B]): Observable[B] = new Observable[B] { def unsafeSubscribe(sink: Observer[B]): Cancelable = source.unsafeSubscribe(sink.contramapFilter(f)) @@ -1316,6 +1316,37 @@ object Observable { } } + @inline def bufferTimed[Col[_]](duration: FiniteDuration)(implicit factory: Factory[A, Col[A]]): Observable[Col[A]] = bufferTimedMillis( + duration.toMillis.toInt, + ) + + def bufferTimedMillis[Col[_]](duration: Int)(implicit factory: Factory[A, Col[A]]): Observable[Col[A]] = new Observable[Col[A]] { + def unsafeSubscribe(sink: Observer[Col[A]]): Cancelable = { + var isCancel = false + var builder = factory.newBuilder + + def send(): Unit = { + sink.unsafeOnNext(builder.result()) + builder = factory.newBuilder + } + + val intervalId = timers.setInterval(duration.toDouble) { if (!isCancel) send() } + + Cancelable.composite( + Cancelable { () => + isCancel = true + timers.clearInterval(intervalId) + }, + source.unsafeSubscribe( + Observer.createUnrecovered( + value => builder += value, + sink.unsafeOnError, + ), + ), + ) + } + } + def evalOn(ec: ExecutionContext): Observable[A] = new Observable[A] { def unsafeSubscribe(sink: Observer[A]): Cancelable = { var isCancel = false diff --git a/colibri/src/test/scala/colibri/ObservableSpec.scala b/colibri/src/test/scala/colibri/ObservableSpec.scala index 65ce27ec..5d120785 100644 --- a/colibri/src/test/scala/colibri/ObservableSpec.scala +++ b/colibri/src/test/scala/colibri/ObservableSpec.scala @@ -1931,4 +1931,43 @@ class ObservableSpec extends AsyncFlatSpec with Matchers { test.unsafeToFuture() } + + it should "bufferTimed" in { + var received = List.empty[Int] + var errors = 0 + val subject = Subject.behavior[Int](0) + val stream = subject.bufferTimedMillis[Vector](100) + + import scala.concurrent.duration._ + + val cancelable = stream.unsafeSubscribe( + Observer.create[Vector[Int]]( + received ++= _, + _ => errors += 1, + ), + ) + cancelable.isEmpty() shouldBe false + received shouldBe List() + + val test = for { + _ <- subject.onNextIO(1) + _ = received shouldBe List() + _ <- IO.sleep(50.millis) + _ <- subject.onNextIO(2) + _ = received shouldBe List() + _ <- IO.sleep(50.millis) + _ <- subject.onNextIO(3) + _ = received shouldBe List(0, 1, 2) + _ <- subject.onNextIO(4) + _ = received shouldBe List(0, 1, 2) + _ <- IO.sleep(200.millis) + _ <- subject.onNextIO(5) + _ = received shouldBe List(0, 1, 2, 3, 4) + _ <- IO.sleep(100.millis) + _ = received shouldBe List(0, 1, 2, 3, 4, 5) + _ = errors shouldBe 0 + } yield succeed + + test.unsafeToFuture() + } }