Skip to content

Commit

Permalink
Handle errors in Source.statefulMap
Browse files Browse the repository at this point in the history
  • Loading branch information
rucek committed Oct 10, 2023
1 parent 2990190 commit 2251539
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 5 deletions.
18 changes: 13 additions & 5 deletions core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -287,16 +287,24 @@ trait SourceOps[+T] { this: Source[T] =>
repeatWhile {
receive() match
case ChannelClosed.Done =>
onComplete(state).foreach(c.send)
c.done()
try
onComplete(state).foreach(c.send)
c.done()
catch
case t: Throwable => c.error(t)
false
case ChannelClosed.Error(r) =>
c.error(r)
false
case t: T @unchecked =>
val (nextState, result) = f(state, t)
state = nextState
result.map(c.send(_).isValue).getOrElse(true)
try
val (nextState, result) = f(state, t)
state = nextState
result.map(c.send(_).isValue).getOrElse(true)
catch
case t: Throwable =>
c.error(t)
false
}
}
c
Expand Down
34 changes: 34 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsStatefulMapTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,38 @@ class SourceOpsStatefulMapTest extends AnyFlatSpec with Matchers {

s.toList shouldBe List(1, 2, 3, 4, 5)
}

it should "propagate errors in the mapping function" in scoped {
// given
val c = Source.fromValues("a", "b", "c")

// when
val s = c.statefulMap(() => 0) { (index, element) =>
if (index < 2) (index + 1, Some(element))
else throw new RuntimeException("boom")
}

// then
s.receive() shouldBe "a"
s.receive() shouldBe "b"
s.receive() should matchPattern {
case ChannelClosed.Error(Some(reason)) if reason.getMessage == "boom" =>
}
}

it should "propagate errors in the completion callback" in scoped {
// given
val c = Source.fromValues("a", "b", "c")

// when
val s = c.statefulMap(() => 0)((index, element) => (index + 1, Some(element)), _ => throw new RuntimeException("boom"))

// then
s.receive() shouldBe "a"
s.receive() shouldBe "b"
s.receive() shouldBe "c"
s.receive() should matchPattern {
case ChannelClosed.Error(Some(reason)) if reason.getMessage == "boom" =>
}
}
}

0 comments on commit 2251539

Please sign in to comment.