diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs index d509cf20..acc79ad1 100644 --- a/.git-blame-ignore-revs +++ b/.git-blame-ignore-revs @@ -9,3 +9,6 @@ # Scala Steward: Reformat with scalafmt 3.7.0 e41ea66e8238a2e1e924f3845d923c5911a8f29a + +# Scala Steward: Reformat with scalafmt 3.7.17 +8bde3f01a9da6f1e81c9c73378c777fa85393280 diff --git a/.scalafmt.conf b/.scalafmt.conf index 15f7ad01..2a7bec65 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -1,5 +1,5 @@ runner.dialect = scala213 -version = "3.7.16" +version = "3.7.17" maxColumn = 140 trailingCommas = always align.preset = most diff --git a/colibri/src/main/scala/colibri/Connectable.scala b/colibri/src/main/scala/colibri/Connectable.scala index 03859e4f..61cf706f 100644 --- a/colibri/src/main/scala/colibri/Connectable.scala +++ b/colibri/src/main/scala/colibri/Connectable.scala @@ -7,14 +7,14 @@ final class Connectable[+T] private (val value: T, val unsafeConnect: () => Canc new Connectable(connectable.value, () => Cancelable.composite(unsafeConnect(), connectable.unsafeConnect())) } } -object Connectable { +object Connectable { def apply[T](value: T, unsafeConnect: () => Cancelable) = { val cancelable = Cancelable.refCount(unsafeConnect) new Connectable(value, cancelable.ref) } @inline implicit class ConnectableObservableOperations[A](val source: Connectable[Observable[A]]) extends AnyVal { - def refCount: Observable[A] = new Observable[A] { + def refCount: Observable[A] = new Observable[A] { def unsafeSubscribe(sink: Observer[A]): Cancelable = Cancelable.composite(source.value.unsafeSubscribe(sink), source.unsafeConnect()) } def unsafeHot(): Observable[A] = { diff --git a/colibri/src/main/scala/colibri/Observable.scala b/colibri/src/main/scala/colibri/Observable.scala index 762fcaab..c5262057 100644 --- a/colibri/src/main/scala/colibri/Observable.scala +++ b/colibri/src/main/scala/colibri/Observable.scala @@ -156,7 +156,7 @@ object Observable { } } - def fromEffect[F[_]: RunEffect, A](effect: F[A]): Observable[A] = new Observable[A] { + def fromEffect[F[_]: RunEffect, A](effect: F[A]): Observable[A] = new Observable[A] { def unsafeSubscribe(sink: Observer[A]): Cancelable = RunEffect[F].unsafeRunSyncOrAsyncCancelable[A](effect)(_.fold(sink.unsafeOnError, sink.unsafeOnNext)) } @@ -214,7 +214,7 @@ object Observable { IO.fromFuture(IO(value5)), ) - def concatEffect[F[_]: RunEffect, T](effect: F[T], source: Observable[T]): Observable[T] = new Observable[T] { + def concatEffect[F[_]: RunEffect, T](effect: F[T], source: Observable[T]): Observable[T] = new Observable[T] { def unsafeSubscribe(sink: Observer[T]): Cancelable = { val consecutive = Cancelable.consecutive() consecutive.unsafeAdd(() => @@ -228,7 +228,7 @@ object Observable { consecutive } } - def concatFuture[T](value: => Future[T], source: Observable[T]): Observable[T] = + def concatFuture[T](value: => Future[T], source: Observable[T]): Observable[T] = concatEffect(IO.fromFuture(IO.pure(value)), source) @inline def merge[A](sources: Observable[A]*): Observable[A] = mergeIterable(sources) @@ -575,7 +575,7 @@ object Observable { def tapFailedEffect[F[_]: RunEffect: Applicative](f: Throwable => F[Unit]): Observable[A] = attempt.tapEffect(_.swap.traverseTap(f).void).flattenEither - def tapSubscribe(f: () => Cancelable): Observable[A] = new Observable[A] { + def tapSubscribe(f: () => Cancelable): Observable[A] = new Observable[A] { def unsafeSubscribe(sink: Observer[A]): Cancelable = { val cancelable = f() Cancelable.composite( @@ -596,7 +596,7 @@ object Observable { } } - def tap(f: A => Unit): Observable[A] = new Observable[A] { + def tap(f: A => Unit): Observable[A] = new Observable[A] { def unsafeSubscribe(sink: Observer[A]): Cancelable = { source.unsafeSubscribe(sink.doOnNext { value => f(value) @@ -733,7 +733,7 @@ object Observable { } } - def singleMapEffect[F[_]: RunEffect, B](f: A => F[B]): Observable[B] = new Observable[B] { + def singleMapEffect[F[_]: RunEffect, B](f: A => F[B]): Observable[B] = new Observable[B] { def unsafeSubscribe(sink: Observer[B]): Cancelable = { val single = Cancelable.singleOrDrop() @@ -763,7 +763,7 @@ object Observable { } } - @inline def singleMapFuture[B](f: A => Future[B]): Observable[B] = + @inline def singleMapFuture[B](f: A => Future[B]): Observable[B] = singleMapEffect(v => IO.fromFuture(IO(f(v)))) @inline def flatMap[B](f: A => Observable[B]): Observable[B] = concatMap(f) @@ -1329,20 +1329,20 @@ object Observable { def unsafeSubscribe(sink: Observer[B]): Cancelable = source.unsafeSubscribe(transform(sink)) } - @inline def publish: Connectable[Observable[A]] = multicast(Subject.publish[A]()) - @inline def replayLatest: Connectable[Observable[A]] = multicast(Subject.replayLatest[A]()) - @inline def replayAll: Connectable[Observable[A]] = multicast(Subject.replayAll[A]()) + @inline def publish: Connectable[Observable[A]] = multicast(Subject.publish[A]()) + @inline def replayLatest: Connectable[Observable[A]] = multicast(Subject.replayLatest[A]()) + @inline def replayAll: Connectable[Observable[A]] = multicast(Subject.replayAll[A]()) @inline def behavior(seed: A): Connectable[Observable[A]] = multicast(Subject.behavior(seed)) - @inline def publishShare: Observable[A] = publish.refCount - @inline def replayLatestShare: Observable[A] = replayLatest.refCount - @inline def replayAllShare: Observable[A] = replayAll.refCount + @inline def publishShare: Observable[A] = publish.refCount + @inline def replayLatestShare: Observable[A] = replayLatest.refCount + @inline def replayAllShare: Observable[A] = replayAll.refCount @inline def behaviorShare(seed: A): Observable[A] = behavior(seed).refCount - @inline def publishSelector[B](f: Observable[A] => Observable[B]): Observable[B] = transformSource(s => f(s.publish.refCount)) - @inline def replayLatestSelector[B](f: Observable[A] => Observable[B]): Observable[B] = + @inline def publishSelector[B](f: Observable[A] => Observable[B]): Observable[B] = transformSource(s => f(s.publish.refCount)) + @inline def replayLatestSelector[B](f: Observable[A] => Observable[B]): Observable[B] = transformSource(s => f(s.replayLatest.refCount)) - @inline def replayAllSelector[B](f: Observable[A] => Observable[B]): Observable[B] = + @inline def replayAllSelector[B](f: Observable[A] => Observable[B]): Observable[B] = transformSource(s => f(s.replayAll.refCount)) @inline def behaviorSelector[B](value: A)(f: Observable[A] => Observable[B]): Observable[B] = transformSource(s => f(s.behavior(value).refCount)) diff --git a/zio/src/test/scala/colibri/ObservableSpec.scala b/zio/src/test/scala/colibri/ObservableSpec.scala index b5155c81..e97b88e1 100644 --- a/zio/src/test/scala/colibri/ObservableSpec.scala +++ b/zio/src/test/scala/colibri/ObservableSpec.scala @@ -94,7 +94,7 @@ class ObservableSpec extends AsyncFlatSpec with Matchers { import scala.concurrent.duration._ val test = for { - //TODO: why does it need an actual delay? + // TODO: why does it need an actual delay? _ <- IO.sleep(FiniteDuration.apply(1, TimeUnit.SECONDS)) _ = received shouldBe List(15, 10, 6, 3, 1, 0)