Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update scalafmt-core to 3.7.17 #359

Merged
merged 3 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@

# Scala Steward: Reformat with scalafmt 3.7.0
e41ea66e8238a2e1e924f3845d923c5911a8f29a

# Scala Steward: Reformat with scalafmt 3.7.17
8bde3f01a9da6f1e81c9c73378c777fa85393280
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
runner.dialect = scala213
version = "3.7.16"
version = "3.7.17"
maxColumn = 140
trailingCommas = always
align.preset = most
4 changes: 2 additions & 2 deletions colibri/src/main/scala/colibri/Connectable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ final class Connectable[+T] private (val value: T, val unsafeConnect: () => Canc
new Connectable(connectable.value, () => Cancelable.composite(unsafeConnect(), connectable.unsafeConnect()))
}
}
object Connectable {
object Connectable {
def apply[T](value: T, unsafeConnect: () => Cancelable) = {
val cancelable = Cancelable.refCount(unsafeConnect)
new Connectable(value, cancelable.ref)
}

@inline implicit class ConnectableObservableOperations[A](val source: Connectable[Observable[A]]) extends AnyVal {
def refCount: Observable[A] = new Observable[A] {
def refCount: Observable[A] = new Observable[A] {
def unsafeSubscribe(sink: Observer[A]): Cancelable = Cancelable.composite(source.value.unsafeSubscribe(sink), source.unsafeConnect())
}
def unsafeHot(): Observable[A] = {
Expand Down
32 changes: 16 additions & 16 deletions colibri/src/main/scala/colibri/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ object Observable {
}
}

def fromEffect[F[_]: RunEffect, A](effect: F[A]): Observable[A] = new Observable[A] {
def fromEffect[F[_]: RunEffect, A](effect: F[A]): Observable[A] = new Observable[A] {
def unsafeSubscribe(sink: Observer[A]): Cancelable =
RunEffect[F].unsafeRunSyncOrAsyncCancelable[A](effect)(_.fold(sink.unsafeOnError, sink.unsafeOnNext))
}
Expand Down Expand Up @@ -214,7 +214,7 @@ object Observable {
IO.fromFuture(IO(value5)),
)

def concatEffect[F[_]: RunEffect, T](effect: F[T], source: Observable[T]): Observable[T] = new Observable[T] {
def concatEffect[F[_]: RunEffect, T](effect: F[T], source: Observable[T]): Observable[T] = new Observable[T] {
def unsafeSubscribe(sink: Observer[T]): Cancelable = {
val consecutive = Cancelable.consecutive()
consecutive.unsafeAdd(() =>
Expand All @@ -228,7 +228,7 @@ object Observable {
consecutive
}
}
def concatFuture[T](value: => Future[T], source: Observable[T]): Observable[T] =
def concatFuture[T](value: => Future[T], source: Observable[T]): Observable[T] =
concatEffect(IO.fromFuture(IO.pure(value)), source)

@inline def merge[A](sources: Observable[A]*): Observable[A] = mergeIterable(sources)
Expand Down Expand Up @@ -575,7 +575,7 @@ object Observable {
def tapFailedEffect[F[_]: RunEffect: Applicative](f: Throwable => F[Unit]): Observable[A] =
attempt.tapEffect(_.swap.traverseTap(f).void).flattenEither

def tapSubscribe(f: () => Cancelable): Observable[A] = new Observable[A] {
def tapSubscribe(f: () => Cancelable): Observable[A] = new Observable[A] {
def unsafeSubscribe(sink: Observer[A]): Cancelable = {
val cancelable = f()
Cancelable.composite(
Expand All @@ -596,7 +596,7 @@ object Observable {
}
}

def tap(f: A => Unit): Observable[A] = new Observable[A] {
def tap(f: A => Unit): Observable[A] = new Observable[A] {
def unsafeSubscribe(sink: Observer[A]): Cancelable = {
source.unsafeSubscribe(sink.doOnNext { value =>
f(value)
Expand Down Expand Up @@ -733,7 +733,7 @@ object Observable {
}
}

def singleMapEffect[F[_]: RunEffect, B](f: A => F[B]): Observable[B] = new Observable[B] {
def singleMapEffect[F[_]: RunEffect, B](f: A => F[B]): Observable[B] = new Observable[B] {
def unsafeSubscribe(sink: Observer[B]): Cancelable = {
val single = Cancelable.singleOrDrop()

Expand Down Expand Up @@ -763,7 +763,7 @@ object Observable {
}
}

@inline def singleMapFuture[B](f: A => Future[B]): Observable[B] =
@inline def singleMapFuture[B](f: A => Future[B]): Observable[B] =
singleMapEffect(v => IO.fromFuture(IO(f(v))))

@inline def flatMap[B](f: A => Observable[B]): Observable[B] = concatMap(f)
Expand Down Expand Up @@ -1329,20 +1329,20 @@ object Observable {
def unsafeSubscribe(sink: Observer[B]): Cancelable = source.unsafeSubscribe(transform(sink))
}

@inline def publish: Connectable[Observable[A]] = multicast(Subject.publish[A]())
@inline def replayLatest: Connectable[Observable[A]] = multicast(Subject.replayLatest[A]())
@inline def replayAll: Connectable[Observable[A]] = multicast(Subject.replayAll[A]())
@inline def publish: Connectable[Observable[A]] = multicast(Subject.publish[A]())
@inline def replayLatest: Connectable[Observable[A]] = multicast(Subject.replayLatest[A]())
@inline def replayAll: Connectable[Observable[A]] = multicast(Subject.replayAll[A]())
@inline def behavior(seed: A): Connectable[Observable[A]] = multicast(Subject.behavior(seed))

@inline def publishShare: Observable[A] = publish.refCount
@inline def replayLatestShare: Observable[A] = replayLatest.refCount
@inline def replayAllShare: Observable[A] = replayAll.refCount
@inline def publishShare: Observable[A] = publish.refCount
@inline def replayLatestShare: Observable[A] = replayLatest.refCount
@inline def replayAllShare: Observable[A] = replayAll.refCount
@inline def behaviorShare(seed: A): Observable[A] = behavior(seed).refCount

@inline def publishSelector[B](f: Observable[A] => Observable[B]): Observable[B] = transformSource(s => f(s.publish.refCount))
@inline def replayLatestSelector[B](f: Observable[A] => Observable[B]): Observable[B] =
@inline def publishSelector[B](f: Observable[A] => Observable[B]): Observable[B] = transformSource(s => f(s.publish.refCount))
@inline def replayLatestSelector[B](f: Observable[A] => Observable[B]): Observable[B] =
transformSource(s => f(s.replayLatest.refCount))
@inline def replayAllSelector[B](f: Observable[A] => Observable[B]): Observable[B] =
@inline def replayAllSelector[B](f: Observable[A] => Observable[B]): Observable[B] =
transformSource(s => f(s.replayAll.refCount))
@inline def behaviorSelector[B](value: A)(f: Observable[A] => Observable[B]): Observable[B] =
transformSource(s => f(s.behavior(value).refCount))
Expand Down
2 changes: 1 addition & 1 deletion zio/src/test/scala/colibri/ObservableSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class ObservableSpec extends AsyncFlatSpec with Matchers {
import scala.concurrent.duration._

val test = for {
//TODO: why does it need an actual delay?
// TODO: why does it need an actual delay?
_ <- IO.sleep(FiniteDuration.apply(1, TimeUnit.SECONDS))

_ = received shouldBe List(15, 10, 6, 3, 1, 0)
Expand Down