diff --git a/core/src/main/scala/ox/channels/SourceOps.scala b/core/src/main/scala/ox/channels/SourceOps.scala index 6494193b..5e96558f 100644 --- a/core/src/main/scala/ox/channels/SourceOps.scala +++ b/core/src/main/scala/ox/channels/SourceOps.scala @@ -2,7 +2,7 @@ package ox.channels import ox.* -import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue, CountDownLatch, LinkedBlockingQueue, Semaphore} +import java.util.concurrent.{CountDownLatch, Semaphore} import scala.collection.mutable import scala.concurrent.duration.FiniteDuration @@ -34,6 +34,68 @@ trait SourceOps[+T] { this: Source[T] => } c2 + /** Intersperses stream with provided element. + * + * @param inject + * An element to be injected between the stream elements. + * @return + * A source, onto which elements will be injected. + * @example + * {{{ + * import ox.* + * import ox.channels.Source + * + * scoped { + * Source.empty[String].intersperse(", ").toList // List() + * Source.fromValues("foo").intersperse(", ").toList // List(foo) + * Source.fromValues("foo", "bar").intersperse(", ").toList // List(foo, ", ", bar) + * } + * }}} + */ + def intersperse[U >: T](inject: U)(using Ox, StageCapacity): Source[U] = + intersperse(None, inject, None) + + /** Intersperses stream with start, end and provided element. + * + * @param start + * An element to be prepended to the stream. + * @param inject + * An element to be injected between the stream elements. + * @param end + * An element to be appended to the end of the stream. + * @return + * A source, onto which elements will be injected. + * @example + * {{{ + * import ox.* + * import ox.channels.Source + * + * scoped { + * Source.empty[String].intersperse("[", ", ", "]").toList // List([, ]) + * Source.fromValues("foo").intersperse("[", ", ", "]").toList // List([, foo, ]) + * Source.fromValues("foo", "bar").intersperse("[", ", ", "]").toList // List([, foo, ", ", bar, ]) + * } + * }}} + */ + def intersperse[U >: T](start: U, inject: U, end: U)(using Ox, StageCapacity): Source[U] = + intersperse(Some(start), inject, Some(end)) + + private def intersperse[U >: T](start: Option[U], inject: U, end: Option[U])(using Ox, StageCapacity): Source[U] = + val c = StageCapacity.newChannel[U] + forkDaemon { + if (start.map(c.send(_).isValue).getOrElse(true)) + var firstEmitted = false + repeatWhile { + receive() match + case ChannelClosed.Done => end.foreach(c.send); c.done(); false + case ChannelClosed.Error(e) => c.error(e); false + case v: U @unchecked if !firstEmitted => firstEmitted = true; c.send(v).isValue + case v: U @unchecked => c.send(inject).isValue && c.send(v).isValue + } + else c.done() + } + c + /** Applies the given mapping function `f` to each element received from this source, and sends the results to the returned channel. At * most `parallelism` invocations of `f` are run in parallel. * diff --git a/core/src/test/scala/ox/channels/SourceOpsIntersperseTest.scala b/core/src/test/scala/ox/channels/SourceOpsIntersperseTest.scala new file mode 100644 index 00000000..488b531e --- /dev/null +++ b/core/src/test/scala/ox/channels/SourceOpsIntersperseTest.scala @@ -0,0 +1,81 @@ +package ox.channels + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* + +class SourceOpsIntersperseTest extends AnyFlatSpec with Matchers { + behavior of "Source.intersperse" + + it should "intersperse with inject only over an empty source" in scoped { + val c = Channel[String]() + fork { + c.done() + } + + val s = c.intersperse(", ") + + s.receive() shouldBe ChannelClosed.Done + } + + it should "intersperse with inject only over a source with one element" in scoped { + val c = Channel[String]() + fork { + c.send("foo") + c.done() + } + + val s = c.intersperse(", ") + + s.toList shouldBe List("foo") + } + + it should "intersperse with inject only over a source with multiple elements" in scoped { + val c = Channel[String]() + fork { + c.send("foo") + c.send("bar") + c.done() + } + + val s = c.intersperse(", ") + + s.toList shouldBe List("foo", ", ", "bar") + } + + it should "intersperse with start, inject and end over an empty source" in scoped { + val c = Channel[String]() + fork { + c.done() + } + + val s = c.intersperse("[", ", ", "]") + + s.toList shouldBe List("[", "]") + } + + it should "intersperse with start, inject and end over a source with one element" in scoped { + val c = Channel[String]() + fork { + c.send("foo") + c.done() + } + + val s = c.intersperse("[", ", ", "]") + + s.toList shouldBe List("[", "foo", "]") + } + + it should "intersperse with start, inject and end over a source with multiple elements" in scoped { + val c = Channel[String]() + fork { + c.send("foo") + c.send("bar") + c.done() + } + + val s = c.intersperse("[", ", ", "]") + + s.toList shouldBe List("[", "foo", ", ", "bar", "]") + } +}