From a18cc46d6480ab633c239525fbd2680eddb30928 Mon Sep 17 00:00:00 2001 From: Jacek Centkowski Date: Wed, 11 Oct 2023 15:50:00 +0200 Subject: [PATCH] feat: introduce `intersperce` function The intersperce function behaves as `List.mkString` e.g.: * called with an `inject` element only it produces Source.fromValues("f", "b").intersperce(", ") // ("f", ", ", "b") * callend with `start`, `inject` and `end` it produces Source.fromValues("f", "b").intersperce("[", ", ", "]") // ("[", "f", ", ", "b", "]") --- .../main/scala/ox/channels/SourceOps.scala | 53 ++++++++- .../channels/SourceOpsIntersperseTest.scala | 107 ++++++++++++++++++ 2 files changed, 159 insertions(+), 1 deletion(-) create mode 100644 core/src/test/scala/ox/channels/SourceOpsIntersperseTest.scala diff --git a/core/src/main/scala/ox/channels/SourceOps.scala b/core/src/main/scala/ox/channels/SourceOps.scala index 6494193b..cf7e1369 100644 --- a/core/src/main/scala/ox/channels/SourceOps.scala +++ b/core/src/main/scala/ox/channels/SourceOps.scala @@ -2,7 +2,7 @@ package ox.channels import ox.* -import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue, CountDownLatch, LinkedBlockingQueue, Semaphore} +import java.util.concurrent.{CountDownLatch, Semaphore} import scala.collection.mutable import scala.concurrent.duration.FiniteDuration @@ -34,6 +34,57 @@ trait SourceOps[+T] { this: Source[T] => } c2 + /** Intersperses stream with provided element. + * + * @param inject + * An element to be injected between the stream elements. + * @return + * A source, onto which elements will be injected. + * @example + * {{{ + * Source.empty[String].intersperce(", ") // () + * Source.fromValues("foo").intersperce(", ") // ("foo") + * Source.fromValues("foo", "bar").intersperce(", ") // ("foo", ", ", "bar") + * }}} + */ + def intersperce[T](inject: T)(using Ox, StageCapacity): Source[T] = + intersperce(None, inject, None) + + /** Intersperses stream with start, end and provided element. + * + * @param start + * An element to be prepended to the stream. + * @param inject + * An element to be injected between the stream elements. + * @param end + * An element to be appended to the end of the stream. + * @return + * A source, onto which elements will be injected. + * @example + * {{{ + * Source.empty[String].intersperce("[", ", ", "]") // ("[", "]") + * Source.fromValues("foo").intersperce("[", ", ", "]") // ("[", "foo", "]") + * Source.fromValues("foo", "bar").intersperce("[", ", ", "]") // ("[", "foo", ", ", "bar", "]") + * }}} + */ + def intersperce[T](start: T, inject: T, end: T)(using Ox, StageCapacity): Source[T] = + intersperce(Some(start), inject, Some(end)) + + private def intersperce[T](start: Option[T], inject: T, end: Option[T])(using Ox, StageCapacity): Source[T] = + val c = StageCapacity.newChannel[T] + forkDaemon { + var firstEmitted = false + start.foreach(c.send(_)) + repeatWhile { + receive() match + case ChannelClosed.Done => end.foreach(c.send(_)); c.done(); false + case ChannelClosed.Error(e) => c.error(e); false + case v: T @unchecked if !firstEmitted => firstEmitted = true; c.send(v).isValue + case v: T @unchecked => c.send(inject); c.send(v).isValue + } + } + c + /** Applies the given mapping function `f` to each element received from this source, and sends the results to the returned channel. At * most `parallelism` invocations of `f` are run in parallel. * diff --git a/core/src/test/scala/ox/channels/SourceOpsIntersperseTest.scala b/core/src/test/scala/ox/channels/SourceOpsIntersperseTest.scala new file mode 100644 index 00000000..95aee638 --- /dev/null +++ b/core/src/test/scala/ox/channels/SourceOpsIntersperseTest.scala @@ -0,0 +1,107 @@ +package ox.channels + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* + +class SourceOpsIntersperseTest extends AnyFlatSpec with Matchers { + behavior of "Source.intersperse" + + it should "intersperse with inject only over an empty source" in { + scoped { + val c = Channel[String]() + fork { + c.done() + } + + val s = c.intersperce(", ") + + s.receive() shouldBe ChannelClosed.Done + } + } + + it should "intersperse with inject only over a source with one element" in { + scoped { + val c = Channel[String]() + fork { + c.send("foo") + c.done() + } + + val s = c.intersperce(", ") + + s.receive() shouldBe "foo" + s.receive() shouldBe ChannelClosed.Done + } + } + + it should "intersperse with inject only over a source with multiple elements" in { + scoped { + val c = Channel[String]() + fork { + c.send("foo") + c.send("bar") + c.done() + } + + val s = c.intersperce(", ") + + s.receive() shouldBe "foo" + s.receive() shouldBe ", " + s.receive() shouldBe "bar" + s.receive() shouldBe ChannelClosed.Done + } + } + + it should "intersperse with start, inject and end over an empty source" in { + scoped { + val c = Channel[String]() + fork { + c.done() + } + + val s = c.intersperce("[", ", ", "]") + + s.receive() shouldBe "[" + s.receive() shouldBe "]" + s.receive() shouldBe ChannelClosed.Done + } + } + + it should "intersperse with start, inject and end over a source with one element" in { + scoped { + val c = Channel[String]() + fork { + c.send("foo") + c.done() + } + + val s = c.intersperce("[", ", ", "]") + + s.receive() shouldBe "[" + s.receive() shouldBe "foo" + s.receive() shouldBe "]" + s.receive() shouldBe ChannelClosed.Done + } + } + + it should "intersperse with start, inject and end over a source with multiple elements" in { + scoped { + val c = Channel[String]() + fork { + c.send("foo") + c.send("bar") + c.done() + } + + val s = c.intersperce("[", ", ", "]") + + s.receive() shouldBe "[" + s.receive() shouldBe "foo" + s.receive() shouldBe ", " + s.receive() shouldBe "bar" + s.receive() shouldBe "]" + s.receive() shouldBe ChannelClosed.Done + } + } +}