Skip to content

Commit

Permalink
Merge pull request #78 from sh0hei/refactor/explicit-type-annotation
Browse files Browse the repository at this point in the history
Explicit type annotation
  • Loading branch information
sh0hei authored Jun 19, 2021
2 parents 13dda1b + 6a60729 commit 5159482
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 22 deletions.
24 changes: 13 additions & 11 deletions src/main/scala/sangria/streaming/monix.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,50 +4,52 @@ import scala.language.higherKinds
import _root_.monix.execution.Scheduler
import _root_.monix.reactive._
import _root_.monix.eval.Task
import _root_.monix.execution.CancelableFuture
import cats.effect.ExitCase

import scala.concurrent.Future

object monix {
class ObservableSubscriptionStream(implicit scheduler: Scheduler)
extends SubscriptionStream[Observable] {
def supported[T[_]](other: SubscriptionStream[T]) =
def supported[T[_]](other: SubscriptionStream[T]): Boolean =
other.isInstanceOf[ObservableSubscriptionStream]

def map[A, B](source: Observable[A])(fn: A => B) = source.map(fn)
def map[A, B](source: Observable[A])(fn: A => B): Observable[B] = source.map(fn)

def singleFuture[T](value: Future[T]) =
def singleFuture[T](value: Future[T]): Observable[T] =
Observable.fromFuture(value)

def single[T](value: T) = Observable(value)
def single[T](value: T): Observable[T] = Observable(value)

def mapFuture[A, B](source: Observable[A])(fn: A => Future[B]) =
def mapFuture[A, B](source: Observable[A])(fn: A => Future[B]): Observable[B] =
source.mergeMap(a => Observable.fromFuture(fn(a)))

def first[T](s: Observable[T]) =
def first[T](s: Observable[T]): CancelableFuture[T] =
s.firstOrElseL(
throw new IllegalStateException(
"Promise was not completed - observable haven't produced any elements."))
.runToFuture

def failed[T](e: Throwable) = Observable.raiseError(e)
def failed[T](e: Throwable): Observable[Nothing] = Observable.raiseError(e)

def onComplete[Ctx, Res](result: Observable[Res])(op: => Unit) =
def onComplete[Ctx, Res](result: Observable[Res])(op: => Unit): Observable[Res] =
result.guaranteeCase {
case ExitCase.Error(e) => Task(op)
case _ => Task(op)
}

def flatMapFuture[Ctx, Res, T](future: Future[T])(resultFn: T => Observable[Res]) =
def flatMapFuture[Ctx, Res, T](future: Future[T])(
resultFn: T => Observable[Res]): Observable[Res] =
Observable.fromFuture(future).mergeMap(resultFn)

def merge[T](streams: Vector[Observable[T]]) =
def merge[T](streams: Vector[Observable[T]]): Observable[T] =
if (streams.nonEmpty)
Observable(streams: _*).merge
else
throw new IllegalStateException("No streams produced!")

def recover[T](stream: Observable[T])(fn: Throwable => T) =
def recover[T](stream: Observable[T])(fn: Throwable => T): Observable[T] =
stream.onErrorRecover { case e => fn(e) }
}

Expand Down
22 changes: 11 additions & 11 deletions src/test/scala/sangria/streaming/MonixIntegrationSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class MonixIntegrationSpec extends AnyWordSpec with Matchers {
}

"mapFuture" in {
res(impl.mapFuture(Observable(1, 2, 10))(x Future.successful(x + 1))) should be(
res(impl.mapFuture(Observable(1, 2, 10))(x => Future.successful(x + 1))) should be(
List(2, 3, 11))
}

Expand Down Expand Up @@ -82,17 +82,17 @@ class MonixIntegrationSpec extends AnyWordSpec with Matchers {

"flatMapFuture" in {
res(
impl.flatMapFuture(Future.successful(1))(i
impl.flatMapFuture(Future.successful(1))(i =>
Observable(i.toString, (i + 1).toString))) should be(List("1", "2"))
}

"recover" in {
val obs = Observable(1, 2, 3, 4).map { i
val obs = Observable(1, 2, 3, 4).map { i =>
if (i == 3) throw new IllegalStateException("foo")
else i
}

res(impl.recover(obs)(_ 100)) should be(List(1, 2, 100))
res(impl.recover(obs)(_ => 100)) should be(List(1, 2, 100))
}

"merge" in {
Expand All @@ -102,13 +102,13 @@ class MonixIntegrationSpec extends AnyWordSpec with Matchers {

val result = res(impl.merge(Vector(obs1, obs2, obs3)))

result should (have(size(6))
result should have(size(6))
.and(contain(1))
.and(contain(2))
.and(contain(3))
.and(contain(4))
.and(contain(100))
.and(contain(200)))
.and(contain(200))
}

"merge 2" in {
Expand All @@ -117,29 +117,29 @@ class MonixIntegrationSpec extends AnyWordSpec with Matchers {

val result = res(impl.merge(Vector(obs1, obs2)))

result should (have(size(4))
result should have(size(4))
.and(contain(1))
.and(contain(2))
.and(contain(100))
.and(contain(200)))
.and(contain(200))
}

"merge 1" in {
val obs1 = Observable(1, 2)

val result = res(impl.merge(Vector(obs1)))

result should (have(size(2)).and(contain(1)).and(contain(2)))
result should have(size(2)).and(contain(1)).and(contain(2))
}

"merge throws exception on empty" in {
an[IllegalStateException] should be thrownBy impl.merge(Vector.empty)
}
}

def res[T](obs: Observable[T]) =
def res[T](obs: Observable[T]): List[T] =
Await.result(obs.toListL.runToFuture, 2 seconds)

def res[T](f: Future[T]) =
def res[T](f: Future[T]): T =
Await.result(f, 2 seconds)
}

0 comments on commit 5159482

Please sign in to comment.