Skip to content

Commit

Permalink
feat: implement takeWhile function
Browse files Browse the repository at this point in the history
Sends elements to the returned channel until predicate is satisfied.
Note that if the predicate fails then subsequent elements are not longer
taken even if they could still satisfy it. Example:

  Source.empty[Int].takeWhile(_ > 3).toList          // List()
  Source.fromValues(1, 2, 3).takeWhile(_ < 3).toList // List(1, 2)
  Source.fromValues(3, 2, 1).takeWhile(_ < 3).toList // List()
  • Loading branch information
geminicaprograms committed Oct 17, 2023
1 parent 161557a commit f9bd0b1
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 1 deletion.
21 changes: 20 additions & 1 deletion core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package ox.channels

import ox.*

import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue, CountDownLatch, LinkedBlockingQueue, Semaphore}
import java.util.concurrent.{CountDownLatch, Semaphore}
import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration

Expand Down Expand Up @@ -141,6 +141,25 @@ trait SourceOps[+T] { this: Source[T] =>

def take(n: Int)(using Ox, StageCapacity): Source[T] = transform(_.take(n))

/** Sends elements to the returned channel until predicate `f` is satisfied (returns `true`). Note that when the predicate `f` is not
* satisfied (returns `false`), subsequent elements are dropped even if they could still satisfy it.
*
* @param f
* A predicate function.
* @example
* {{{
* import ox.*
* import ox.channels.Source
*
* scoped {
* Source.empty[Int].takeWhile(_ > 3).toList // List()
* Source.fromValues(1, 2, 3).takeWhile(_ < 3).toList // List(1, 2)
* Source.fromValues(3, 2, 1).takeWhile(_ < 3).toList // List()
* }
* }}}
*/
def takeWhile(f: T => Boolean)(using Ox, StageCapacity): Source[T] = transform(_.takeWhile(f))

def filter(f: T => Boolean)(using Ox, StageCapacity): Source[T] = transform(_.filter(f))

def transform[U](f: Iterator[T] => Iterator[U])(using Ox, StageCapacity): Source[U] =
Expand Down
24 changes: 24 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsTakeWhileTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package ox.channels

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

class SourceOpsTakeWhileTest extends AnyFlatSpec with Matchers {
behavior of "Source.takeWhile"

it should "not take from the empty source" in scoped {
val s = Source.empty[Int]
s.takeWhile(_ < 3).toList shouldBe List.empty
}

it should "take as long as predicate is satisfied" in scoped {
val s = Source.fromValues(1, 2, 3)
s.takeWhile(_ < 3).toList shouldBe List(1, 2)
}

it should "not take if predicate fails for first or more elements" in scoped {
val s = Source.fromValues(3, 2, 1)
s.takeWhile(_ < 3).toList shouldBe List()
}
}

0 comments on commit f9bd0b1

Please sign in to comment.