Skip to content

Commit

Permalink
Merge branch 'master' into var-prism
Browse files Browse the repository at this point in the history
  • Loading branch information
cornerman authored Nov 17, 2023
2 parents c5dfa9c + 225513f commit f14e2c8
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 20 deletions.
3 changes: 3 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@

# Scala Steward: Reformat with scalafmt 3.7.0
e41ea66e8238a2e1e924f3845d923c5911a8f29a

# Scala Steward: Reformat with scalafmt 3.7.17
8bde3f01a9da6f1e81c9c73378c777fa85393280
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
runner.dialect = scala213
version = "3.7.16"
version = "3.7.17"
maxColumn = 140
trailingCommas = always
align.preset = most
4 changes: 2 additions & 2 deletions colibri/src/main/scala/colibri/Connectable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ final class Connectable[+T] private (val value: T, val unsafeConnect: () => Canc
new Connectable(connectable.value, () => Cancelable.composite(unsafeConnect(), connectable.unsafeConnect()))
}
}
object Connectable {
object Connectable {
def apply[T](value: T, unsafeConnect: () => Cancelable) = {
val cancelable = Cancelable.refCount(unsafeConnect)
new Connectable(value, cancelable.ref)
}

@inline implicit class ConnectableObservableOperations[A](val source: Connectable[Observable[A]]) extends AnyVal {
def refCount: Observable[A] = new Observable[A] {
def refCount: Observable[A] = new Observable[A] {
def unsafeSubscribe(sink: Observer[A]): Cancelable = Cancelable.composite(source.value.unsafeSubscribe(sink), source.unsafeConnect())
}
def unsafeHot(): Observable[A] = {
Expand Down
32 changes: 16 additions & 16 deletions colibri/src/main/scala/colibri/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ object Observable {
}
}

def fromEffect[F[_]: RunEffect, A](effect: F[A]): Observable[A] = new Observable[A] {
def fromEffect[F[_]: RunEffect, A](effect: F[A]): Observable[A] = new Observable[A] {
def unsafeSubscribe(sink: Observer[A]): Cancelable =
RunEffect[F].unsafeRunSyncOrAsyncCancelable[A](effect)(_.fold(sink.unsafeOnError, sink.unsafeOnNext))
}
Expand Down Expand Up @@ -214,7 +214,7 @@ object Observable {
IO.fromFuture(IO(value5)),
)

def concatEffect[F[_]: RunEffect, T](effect: F[T], source: Observable[T]): Observable[T] = new Observable[T] {
def concatEffect[F[_]: RunEffect, T](effect: F[T], source: Observable[T]): Observable[T] = new Observable[T] {
def unsafeSubscribe(sink: Observer[T]): Cancelable = {
val consecutive = Cancelable.consecutive()
consecutive.unsafeAdd(() =>
Expand All @@ -228,7 +228,7 @@ object Observable {
consecutive
}
}
def concatFuture[T](value: => Future[T], source: Observable[T]): Observable[T] =
def concatFuture[T](value: => Future[T], source: Observable[T]): Observable[T] =
concatEffect(IO.fromFuture(IO.pure(value)), source)

@inline def merge[A](sources: Observable[A]*): Observable[A] = mergeIterable(sources)
Expand Down Expand Up @@ -575,7 +575,7 @@ object Observable {
def tapFailedEffect[F[_]: RunEffect: Applicative](f: Throwable => F[Unit]): Observable[A] =
attempt.tapEffect(_.swap.traverseTap(f).void).flattenEither

def tapSubscribe(f: () => Cancelable): Observable[A] = new Observable[A] {
def tapSubscribe(f: () => Cancelable): Observable[A] = new Observable[A] {
def unsafeSubscribe(sink: Observer[A]): Cancelable = {
val cancelable = f()
Cancelable.composite(
Expand All @@ -596,7 +596,7 @@ object Observable {
}
}

def tap(f: A => Unit): Observable[A] = new Observable[A] {
def tap(f: A => Unit): Observable[A] = new Observable[A] {
def unsafeSubscribe(sink: Observer[A]): Cancelable = {
source.unsafeSubscribe(sink.doOnNext { value =>
f(value)
Expand Down Expand Up @@ -733,7 +733,7 @@ object Observable {
}
}

def singleMapEffect[F[_]: RunEffect, B](f: A => F[B]): Observable[B] = new Observable[B] {
def singleMapEffect[F[_]: RunEffect, B](f: A => F[B]): Observable[B] = new Observable[B] {
def unsafeSubscribe(sink: Observer[B]): Cancelable = {
val single = Cancelable.singleOrDrop()

Expand Down Expand Up @@ -763,7 +763,7 @@ object Observable {
}
}

@inline def singleMapFuture[B](f: A => Future[B]): Observable[B] =
@inline def singleMapFuture[B](f: A => Future[B]): Observable[B] =
singleMapEffect(v => IO.fromFuture(IO(f(v))))

@inline def flatMap[B](f: A => Observable[B]): Observable[B] = concatMap(f)
Expand Down Expand Up @@ -1329,20 +1329,20 @@ object Observable {
def unsafeSubscribe(sink: Observer[B]): Cancelable = source.unsafeSubscribe(transform(sink))
}

@inline def publish: Connectable[Observable[A]] = multicast(Subject.publish[A]())
@inline def replayLatest: Connectable[Observable[A]] = multicast(Subject.replayLatest[A]())
@inline def replayAll: Connectable[Observable[A]] = multicast(Subject.replayAll[A]())
@inline def publish: Connectable[Observable[A]] = multicast(Subject.publish[A]())
@inline def replayLatest: Connectable[Observable[A]] = multicast(Subject.replayLatest[A]())
@inline def replayAll: Connectable[Observable[A]] = multicast(Subject.replayAll[A]())
@inline def behavior(seed: A): Connectable[Observable[A]] = multicast(Subject.behavior(seed))

@inline def publishShare: Observable[A] = publish.refCount
@inline def replayLatestShare: Observable[A] = replayLatest.refCount
@inline def replayAllShare: Observable[A] = replayAll.refCount
@inline def publishShare: Observable[A] = publish.refCount
@inline def replayLatestShare: Observable[A] = replayLatest.refCount
@inline def replayAllShare: Observable[A] = replayAll.refCount
@inline def behaviorShare(seed: A): Observable[A] = behavior(seed).refCount

@inline def publishSelector[B](f: Observable[A] => Observable[B]): Observable[B] = transformSource(s => f(s.publish.refCount))
@inline def replayLatestSelector[B](f: Observable[A] => Observable[B]): Observable[B] =
@inline def publishSelector[B](f: Observable[A] => Observable[B]): Observable[B] = transformSource(s => f(s.publish.refCount))
@inline def replayLatestSelector[B](f: Observable[A] => Observable[B]): Observable[B] =
transformSource(s => f(s.replayLatest.refCount))
@inline def replayAllSelector[B](f: Observable[A] => Observable[B]): Observable[B] =
@inline def replayAllSelector[B](f: Observable[A] => Observable[B]): Observable[B] =
transformSource(s => f(s.replayAll.refCount))
@inline def behaviorSelector[B](value: A)(f: Observable[A] => Observable[B]): Observable[B] =
transformSource(s => f(s.behavior(value).refCount))
Expand Down
2 changes: 1 addition & 1 deletion zio/src/test/scala/colibri/ObservableSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class ObservableSpec extends AsyncFlatSpec with Matchers {
import scala.concurrent.duration._

val test = for {
//TODO: why does it need an actual delay?
// TODO: why does it need an actual delay?
_ <- IO.sleep(FiniteDuration.apply(1, TimeUnit.SECONDS))

_ = received shouldBe List(15, 10, 6, 3, 1, 0)
Expand Down

0 comments on commit f14e2c8

Please sign in to comment.