Skip to content

Commit

Permalink
feat: introduce intersperse function
Browse files Browse the repository at this point in the history
The intersperse function behaves as `List.mkString` e.g.:
* called with an `inject` element only it produces

  Source.fromValues("f", "b").intersperse(", ")  // ("f", ", ", "b")

* called with `start`, `inject` and `end` it produces

  Source.fromValues("f", "b").intersperse("[", ", ", "]")  // ("[", "f", ", ", "b", "]")
  • Loading branch information
geminicaprograms committed Oct 12, 2023
1 parent 161557a commit d77e952
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 1 deletion.
53 changes: 52 additions & 1 deletion core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package ox.channels

import ox.*

import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue, CountDownLatch, LinkedBlockingQueue, Semaphore}
import java.util.concurrent.{CountDownLatch, Semaphore}
import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration

Expand Down Expand Up @@ -34,6 +34,57 @@ trait SourceOps[+T] { this: Source[T] =>
}
c2

/** Intersperses stream with provided element.
*
* @param inject
* An element to be injected between the stream elements.
* @return
* A source, onto which elements will be injected.
* @example
* {{{
* Source.empty[String].intersperse(", ") // ()
* Source.fromValues("foo").intersperse(", ") // ("foo")
* Source.fromValues("foo", "bar").intersperse(", ") // ("foo", ", ", "bar")
* }}}
*/
def intersperse[U >: T](inject: U)(using Ox, StageCapacity): Source[U] =
intersperse(None, inject, None)

/** Intersperses stream with start, end and provided element.
*
* @param start
* An element to be prepended to the stream.
* @param inject
* An element to be injected between the stream elements.
* @param end
* An element to be appended to the end of the stream.
* @return
* A source, onto which elements will be injected.
* @example
* {{{
* Source.empty[String].intersperse("[", ", ", "]") // ("[", "]")
* Source.fromValues("foo").intersperse("[", ", ", "]") // ("[", "foo", "]")
* Source.fromValues("foo", "bar").intersperse("[", ", ", "]") // ("[", "foo", ", ", "bar", "]")
* }}}
*/
def intersperse[U >: T](start: U, inject: U, end: U)(using Ox, StageCapacity): Source[U] =
intersperse(Some(start), inject, Some(end))

private def intersperse[U >: T](start: Option[U], inject: U, end: Option[U])(using Ox, StageCapacity): Source[U] =
val c = StageCapacity.newChannel[U]
forkDaemon {
var firstEmitted = false
start.foreach(c.send(_))
repeatWhile {
receive() match
case ChannelClosed.Done => end.foreach(c.send(_)); c.done(); false
case ChannelClosed.Error(e) => c.error(e); false
case v: U @unchecked if !firstEmitted => firstEmitted = true; c.send(v).isValue
case v: U @unchecked => c.send(inject); c.send(v).isValue
}
}
c

/** Applies the given mapping function `f` to each element received from this source, and sends the results to the returned channel. At
* most `parallelism` invocations of `f` are run in parallel.
*
Expand Down
107 changes: 107 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsIntersperseTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package ox.channels

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

class SourceOpsIntersperseTest extends AnyFlatSpec with Matchers {
behavior of "Source.intersperse"

it should "intersperse with inject only over an empty source" in {
scoped {
val c = Channel[String]()
fork {
c.done()
}

val s = c.intersperse(", ")

s.receive() shouldBe ChannelClosed.Done
}
}

it should "intersperse with inject only over a source with one element" in {
scoped {
val c = Channel[String]()
fork {
c.send("foo")
c.done()
}

val s = c.intersperse(", ")

s.receive() shouldBe "foo"
s.receive() shouldBe ChannelClosed.Done
}
}

it should "intersperse with inject only over a source with multiple elements" in {
scoped {
val c = Channel[String]()
fork {
c.send("foo")
c.send("bar")
c.done()
}

val s = c.intersperse(", ")

s.receive() shouldBe "foo"
s.receive() shouldBe ", "
s.receive() shouldBe "bar"
s.receive() shouldBe ChannelClosed.Done
}
}

it should "intersperse with start, inject and end over an empty source" in {
scoped {
val c = Channel[String]()
fork {
c.done()
}

val s = c.intersperse("[", ", ", "]")

s.receive() shouldBe "["
s.receive() shouldBe "]"
s.receive() shouldBe ChannelClosed.Done
}
}

it should "intersperse with start, inject and end over a source with one element" in {
scoped {
val c = Channel[String]()
fork {
c.send("foo")
c.done()
}

val s = c.intersperse("[", ", ", "]")

s.receive() shouldBe "["
s.receive() shouldBe "foo"
s.receive() shouldBe "]"
s.receive() shouldBe ChannelClosed.Done
}
}

it should "intersperse with start, inject and end over a source with multiple elements" in {
scoped {
val c = Channel[String]()
fork {
c.send("foo")
c.send("bar")
c.done()
}

val s = c.intersperse("[", ", ", "]")

s.receive() shouldBe "["
s.receive() shouldBe "foo"
s.receive() shouldBe ", "
s.receive() shouldBe "bar"
s.receive() shouldBe "]"
s.receive() shouldBe ChannelClosed.Done
}
}
}

0 comments on commit d77e952

Please sign in to comment.