Skip to content

Commit

Permalink
feat: implement takeWhile function
Browse files Browse the repository at this point in the history
Sends elements to the returned channel until predicate is satisfied.
Note that if the predicate fails then subsequent elements are not longer
taken even if they could still satisfy it. Example:

  Source.empty[Int].takeWhile(_ > 3).toList          // List()
  Source.fromValues(1, 2, 3).takeWhile(_ < 3).toList // List(1, 2)
  Source.fromValues(3, 2, 1).takeWhile(_ < 3).toList // List()
  • Loading branch information
geminicaprograms authored Oct 17, 2023
1 parent 23be5b5 commit fe50670
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 2 deletions.
22 changes: 20 additions & 2 deletions core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ package ox.channels
import ox.*

import java.util.concurrent.{CountDownLatch, Semaphore}
import scala.collection.mutable
import scala.collection.{IterableOnce, mutable}
import scala.concurrent.duration.FiniteDuration
import scala.collection.IterableOnce

trait SourceOps[+T] { this: Source[T] =>
// view ops (lazy)
Expand Down Expand Up @@ -203,6 +202,25 @@ trait SourceOps[+T] { this: Source[T] =>

def take(n: Int)(using Ox, StageCapacity): Source[T] = transform(_.take(n))

/** Sends elements to the returned channel until predicate `f` is satisfied (returns `true`). Note that when the predicate `f` is not
* satisfied (returns `false`), subsequent elements are dropped even if they could still satisfy it.
*
* @param f
* A predicate function.
* @example
* {{{
* import ox.*
* import ox.channels.Source
*
* scoped {
* Source.empty[Int].takeWhile(_ > 3).toList // List()
* Source.fromValues(1, 2, 3).takeWhile(_ < 3).toList // List(1, 2)
* Source.fromValues(3, 2, 1).takeWhile(_ < 3).toList // List()
* }
* }}}
*/
def takeWhile(f: T => Boolean)(using Ox, StageCapacity): Source[T] = transform(_.takeWhile(f))

/** Drops `n` elements from this source and forwards subsequent elements to the returned channel.
*
* @param n
Expand Down
24 changes: 24 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsTakeWhileTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package ox.channels

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

class SourceOpsTakeWhileTest extends AnyFlatSpec with Matchers {
behavior of "Source.takeWhile"

it should "not take from the empty source" in supervised {
val s = Source.empty[Int]
s.takeWhile(_ < 3).toList shouldBe List.empty
}

it should "take as long as predicate is satisfied" in supervised {
val s = Source.fromValues(1, 2, 3)
s.takeWhile(_ < 3).toList shouldBe List(1, 2)
}

it should "not take if predicate fails for first or more elements" in supervised {
val s = Source.fromValues(3, 2, 1)
s.takeWhile(_ < 3).toList shouldBe List()
}
}

0 comments on commit fe50670

Please sign in to comment.