diff --git a/core/src/main/scala/ox/channels/SourceOps.scala b/core/src/main/scala/ox/channels/SourceOps.scala index 6494193b..9272518d 100644 --- a/core/src/main/scala/ox/channels/SourceOps.scala +++ b/core/src/main/scala/ox/channels/SourceOps.scala @@ -2,7 +2,7 @@ package ox.channels import ox.* -import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue, CountDownLatch, LinkedBlockingQueue, Semaphore} +import java.util.concurrent.{CountDownLatch, Semaphore} import scala.collection.mutable import scala.concurrent.duration.FiniteDuration @@ -34,6 +34,57 @@ trait SourceOps[+T] { this: Source[T] => } c2 + /** Intersperses stream with provided element. + * + * @param inject + * An element to be injected between the stream elements. + * @return + * A source, onto which elements will be injected. + * @example + * {{{ + * Source.empty[String].intersperse(", ") // () + * Source.fromValues("foo").intersperse(", ") // ("foo") + * Source.fromValues("foo", "bar").intersperse(", ") // ("foo", ", ", "bar") + * }}} + */ + def intersperse[T](inject: T)(using Ox, StageCapacity): Source[T] = + intersperse(None, inject, None) + + /** Intersperses stream with start, end and provided element. + * + * @param start + * An element to be prepended to the stream. + * @param inject + * An element to be injected between the stream elements. + * @param end + * An element to be appended to the end of the stream. + * @return + * A source, onto which elements will be injected. + * @example + * {{{ + * Source.empty[String].intersperse("[", ", ", "]") // ("[", "]") + * Source.fromValues("foo").intersperse("[", ", ", "]") // ("[", "foo", "]") + * Source.fromValues("foo", "bar").intersperse("[", ", ", "]") // ("[", "foo", ", ", "bar", "]") + * }}} + */ + def intersperse[T](start: T, inject: T, end: T)(using Ox, StageCapacity): Source[T] = + intersperse(Some(start), inject, Some(end)) + + private def intersperse[T](start: Option[T], inject: T, end: Option[T])(using Ox, StageCapacity): Source[T] = + val c = StageCapacity.newChannel[T] + forkDaemon { + var firstEmitted = false + start.foreach(c.send(_)) + repeatWhile { + receive() match + case ChannelClosed.Done => end.foreach(c.send(_)); c.done(); false + case ChannelClosed.Error(e) => c.error(e); false + case v: T @unchecked if !firstEmitted => firstEmitted = true; c.send(v).isValue + case v: T @unchecked => c.send(inject); c.send(v).isValue + } + } + c + /** Applies the given mapping function `f` to each element received from this source, and sends the results to the returned channel. At * most `parallelism` invocations of `f` are run in parallel. * diff --git a/core/src/test/scala/ox/channels/SourceOpsIntersperseTest.scala b/core/src/test/scala/ox/channels/SourceOpsIntersperseTest.scala new file mode 100644 index 00000000..d6d8d3fb --- /dev/null +++ b/core/src/test/scala/ox/channels/SourceOpsIntersperseTest.scala @@ -0,0 +1,107 @@ +package ox.channels + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* + +class SourceOpsIntersperseTest extends AnyFlatSpec with Matchers { + behavior of "Source.intersperse" + + it should "intersperse with inject only over an empty source" in { + scoped { + val c = Channel[String]() + fork { + c.done() + } + + val s = c.intersperse(", ") + + s.receive() shouldBe ChannelClosed.Done + } + } + + it should "intersperse with inject only over a source with one element" in { + scoped { + val c = Channel[String]() + fork { + c.send("foo") + c.done() + } + + val s = c.intersperse(", ") + + s.receive() shouldBe "foo" + s.receive() shouldBe ChannelClosed.Done + } + } + + it should "intersperse with inject only over a source with multiple elements" in { + scoped { + val c = Channel[String]() + fork { + c.send("foo") + c.send("bar") + c.done() + } + + val s = c.intersperse(", ") + + s.receive() shouldBe "foo" + s.receive() shouldBe ", " + s.receive() shouldBe "bar" + s.receive() shouldBe ChannelClosed.Done + } + } + + it should "intersperse with start, inject and end over an empty source" in { + scoped { + val c = Channel[String]() + fork { + c.done() + } + + val s = c.intersperse("[", ", ", "]") + + s.receive() shouldBe "[" + s.receive() shouldBe "]" + s.receive() shouldBe ChannelClosed.Done + } + } + + it should "intersperse with start, inject and end over a source with one element" in { + scoped { + val c = Channel[String]() + fork { + c.send("foo") + c.done() + } + + val s = c.intersperse("[", ", ", "]") + + s.receive() shouldBe "[" + s.receive() shouldBe "foo" + s.receive() shouldBe "]" + s.receive() shouldBe ChannelClosed.Done + } + } + + it should "intersperse with start, inject and end over a source with multiple elements" in { + scoped { + val c = Channel[String]() + fork { + c.send("foo") + c.send("bar") + c.done() + } + + val s = c.intersperse("[", ", ", "]") + + s.receive() shouldBe "[" + s.receive() shouldBe "foo" + s.receive() shouldBe ", " + s.receive() shouldBe "bar" + s.receive() shouldBe "]" + s.receive() shouldBe ChannelClosed.Done + } + } +}