Skip to content

Commit

Permalink
feat: implement drop function
Browse files Browse the repository at this point in the history
Drop `n` elements from the source and emit subsequent elements (if any
left) e.g.:

  Source.empty[Int].drop(1).toList          // List()
  Source.fromValues(1, 2, 3).drop(1).toList // List(2 ,3)
  Source.fromValues(1).drop(2).toList       // List()
  • Loading branch information
geminicaprograms committed Oct 17, 2023
1 parent 161557a commit d738f3d
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 1 deletion.
20 changes: 19 additions & 1 deletion core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package ox.channels

import ox.*

import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue, CountDownLatch, LinkedBlockingQueue, Semaphore}
import java.util.concurrent.{CountDownLatch, Semaphore}
import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration

Expand Down Expand Up @@ -141,6 +141,24 @@ trait SourceOps[+T] { this: Source[T] =>

def take(n: Int)(using Ox, StageCapacity): Source[T] = transform(_.take(n))

/** Drops `n` elements from this source and forwards subsequent elements to the returned channel.
*
* @param n
* Number of elements to be dropped.
* @example
* {{{
* import ox.*
* import ox.channels.Source
*
* scoped {
* Source.empty[Int].drop(1).toList // List()
* Source.fromValues(1, 2, 3).drop(1).toList // List(2 ,3)
* Source.fromValues(1).drop(2).toList // List()
* }
* }}}
*/
def drop(n: Int)(using Ox, StageCapacity): Source[T] = transform(_.drop(n))

def filter(f: T => Boolean)(using Ox, StageCapacity): Source[T] = transform(_.filter(f))

def transform[U](f: Iterator[T] => Iterator[U])(using Ox, StageCapacity): Source[U] =
Expand Down
29 changes: 29 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsDropTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package ox.channels

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

class SourceOpsDropTest extends AnyFlatSpec with Matchers {
behavior of "Source.drop"

it should "not drop from the empty source" in supervised {
val s = Source.empty[Int]
s.drop(1).toList shouldBe List.empty
}

it should "drop elements from the source" in supervised {
val s = Source.fromValues(1, 2, 3)
s.drop(2).toList shouldBe List(3)
}

it should "return empty source when more elements than source length was dropped" in supervised {
val s = Source.fromValues(1, 2)
s.drop(3).toList shouldBe List.empty
}

it should "not drop when 'n == 0'" in supervised {
val s = Source.fromValues(1, 2, 3)
s.drop(0).toList shouldBe List(1, 2, 3)
}
}

0 comments on commit d738f3d

Please sign in to comment.