From ea8a9c37bd18b353c77fdb17c3db4e00ef7e8eff Mon Sep 17 00:00:00 2001 From: Jacek Centkowski Date: Mon, 16 Oct 2023 17:44:25 +0200 Subject: [PATCH] feat: implement `drop` function Drop `n` elements from the source and emit subsequent elements (if any left) e.g.: Source.empty[Int].drop(1).toList // List() Source.fromValues(1, 2, 3).drop(1).toList // List(2 ,3) Source.fromValues(1).drop(2).toList // List() --- .../main/scala/ox/channels/SourceOps.scala | 20 +++++++++++++++- .../scala/ox/channels/SourceOpsDropTest.scala | 24 +++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) create mode 100644 core/src/test/scala/ox/channels/SourceOpsDropTest.scala diff --git a/core/src/main/scala/ox/channels/SourceOps.scala b/core/src/main/scala/ox/channels/SourceOps.scala index 6494193b..6984efc4 100644 --- a/core/src/main/scala/ox/channels/SourceOps.scala +++ b/core/src/main/scala/ox/channels/SourceOps.scala @@ -2,7 +2,7 @@ package ox.channels import ox.* -import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue, CountDownLatch, LinkedBlockingQueue, Semaphore} +import java.util.concurrent.{CountDownLatch, Semaphore} import scala.collection.mutable import scala.concurrent.duration.FiniteDuration @@ -141,6 +141,24 @@ trait SourceOps[+T] { this: Source[T] => def take(n: Int)(using Ox, StageCapacity): Source[T] = transform(_.take(n)) + /** Drops `n` elements from the source and emits any subsequent elements. + * + * @param n + * Number of elements to be dropped. + * @example + * {{{ + * import ox.* + * import ox.channels.Source + * + * scoped { + * Source.empty[Int].drop(1).toList // List() + * Source.fromValues(1, 2, 3).drop(1).toList // List(2 ,3) + * Source.fromValues(1).drop(2).toList // List() + * } + * }}} + */ + def drop(n: Int)(using Ox, StageCapacity): Source[T] = transform(_.drop(n)) + def filter(f: T => Boolean)(using Ox, StageCapacity): Source[T] = transform(_.filter(f)) def transform[U](f: Iterator[T] => Iterator[U])(using Ox, StageCapacity): Source[U] = diff --git a/core/src/test/scala/ox/channels/SourceOpsDropTest.scala b/core/src/test/scala/ox/channels/SourceOpsDropTest.scala new file mode 100644 index 00000000..d41b0ee2 --- /dev/null +++ b/core/src/test/scala/ox/channels/SourceOpsDropTest.scala @@ -0,0 +1,24 @@ +package ox.channels + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* + +class SourceOpsDropTest extends AnyFlatSpec with Matchers { + behavior of "Source.drop" + + it should "not drop from the empty source" in scoped { + val s = Source.empty[Int] + s.drop(1).toList shouldBe List.empty + } + + it should "drop elements from the source" in scoped { + val s = Source.fromValues(1, 2, 3) + s.drop(2).toList shouldBe List(3) + } + + it should "return empty source when more elements than source length was dropped" in scoped { + val s = Source.fromValues(1, 2) + s.drop(3).toList shouldBe List.empty + } +}