From 7008074cdd3bbde91f4fa4db547f2dabf9f49263 Mon Sep 17 00:00:00 2001
From: Satya Prakash
Date: Sun, 19 May 2013 20:30:05 +0530
Subject: [PATCH 1/3] dropTake - drop first drop elements and then take next
 num elements

---
 core/src/main/scala/spark/RDD.scala      | 29 ++++++++++++++++++++++++
 core/src/test/scala/spark/RDDSuite.scala |  2 ++
 2 files changed, 31 insertions(+)

diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index dde131696f..612ce14788 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -713,6 +713,35 @@ abstract class RDD[T: ClassManifest](
     return buf.toArray
   }
 
+  /**
+   * Drop the first drop elements and then take next num elements of the RDD. This currently scans the partitions *one by one*, so
+   * it will be slow if a lot of partitions are required. In that case, use dropCollect(drop) to get the
+   * whole RDD instead.
+   */
+  def dropTake(drop: Int, num: Int): Array[T] = {
+    if (num == 0) {
+      return new Array[T](0)
+    }
+    val buf = new ArrayBuffer[T]
+    var p = 0
+    var dropped = 0
+    while (buf.size < num && p < partitions.size) {
+      val left = num - buf.size
+      val res = sc.runJob(this, (it: Iterator[T]) => {
+        while ((drop - dropped) > 0 && it.hasNext) {
+          it.next()
+          dropped += 1
+        }
+        it.take(left).toArray
+      }, Array(p), true)
+      buf ++= res(0)
+      if (buf.size == num)
+        return buf.toArray
+      p += 1
+    }
+    return buf.toArray
+  }
+
   /**
    * Return the first element in this RDD.
    */
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index a761dd77c5..9a925e06f4 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -26,6 +26,8 @@ class RDDSuite extends FunSuite with LocalSparkContext {
     assert(nums.union(nums).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
     assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4)))
     assert(nums.collect({ case i if i >= 3 => i.toString }).collect().toList === List("3", "4"))
+    assert(nums.take(2).toList === List(1, 2))
+    assert(nums.dropTake(1,2).toList === List(2, 3))
     assert(nums.keyBy(_.toString).collect().toList === List(("1", 1), ("2", 2), ("3", 3), ("4", 4)))
     val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
     assert(partitionSums.collect().toList === List(3, 7))

From 14023065199ce57f5eb31374bd4bdbe4356f7137 Mon Sep 17 00:00:00 2001
From: Satya Prakash
Date: Sun, 19 May 2013 21:53:44 +0530
Subject: [PATCH 2/3] Drop count is now accumulated from how many elements the
 slaves could drop

---
 core/src/main/scala/spark/RDD.scala | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 612ce14788..905a3df9b1 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -724,15 +724,22 @@ abstract class RDD[T: ClassManifest](
     }
     val buf = new ArrayBuffer[T]
     var p = 0
-    var dropped = 0
+    var dropped = sc.accumulator(0)
     while (buf.size < num && p < partitions.size) {
       val left = num - buf.size
+      val accDropped = dropped.value
+      //still in driver
       val res = sc.runJob(this, (it: Iterator[T]) => {
-        while ((drop - dropped) > 0 && it.hasNext) {
+        var leftToDrop = drop - accDropped
+        while (leftToDrop > 0 && it.hasNext) {
           it.next()
-          dropped += 1
+          leftToDrop -= 1
         }
-        it.take(left).toArray
+        //accumulate all that have been dropped here
+        dropped += drop - leftToDrop
+        //if still left to drop then don't take
+        val taken = if (leftToDrop > 0) it.take(0) else it.take(left)
+        taken.toArray
       }, Array(p), true)
       buf ++= res(0)
       if (buf.size == num)

From d35f435f100e17a5589a83a6262449ee296c1b47 Mon Sep 17 00:00:00 2001
From: Satya Prakash
Date: Mon, 20 May 2013 17:50:41 +0530
Subject: [PATCH 3/3] More tests and corrections

---
 core/src/main/scala/spark/RDD.scala      |  8 ++++----
 core/src/test/scala/spark/RDDSuite.scala | 11 ++++++++++-
 2 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 905a3df9b1..bfb9a7bf47 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -715,8 +715,8 @@ abstract class RDD[T: ClassManifest](
 
   /**
    * Drop the first drop elements and then take next num elements of the RDD. This currently scans the partitions *one by one*, so
-   * it will be slow if a lot of partitions are required. In that case, use dropCollect(drop) to get the
-   * whole RDD instead.
+   * it will be slow if a lot of partitions are required. In that case, use collect().drop(drop) to get the
+   * whole RDD instead and drop the required drop elements.
    */
   def dropTake(drop: Int, num: Int): Array[T] = {
     if (num == 0) {
@@ -727,8 +727,8 @@ abstract class RDD[T: ClassManifest](
     var dropped = sc.accumulator(0)
     while (buf.size < num && p < partitions.size) {
       val left = num - buf.size
+      //read dropped so far from accumulator
       val accDropped = dropped.value
-      //still in driver
       val res = sc.runJob(this, (it: Iterator[T]) => {
         var leftToDrop = drop - accDropped
         while (leftToDrop > 0 && it.hasNext) {
@@ -736,7 +736,7 @@ abstract class RDD[T: ClassManifest](
           it.next()
           leftToDrop -= 1
         }
         //accumulate all that have been dropped here
-        dropped += drop - leftToDrop
+        dropped += (drop - accDropped) - leftToDrop
         //if still left to drop then don't take
         val taken = if (leftToDrop > 0) it.take(0) else it.take(left)
         taken.toArray
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 9a925e06f4..37eec0431d 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -27,7 +27,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
     assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4)))
     assert(nums.collect({ case i if i >= 3 => i.toString }).collect().toList === List("3", "4"))
     assert(nums.take(2).toList === List(1, 2))
-    assert(nums.dropTake(1,2).toList === List(2, 3))
     assert(nums.keyBy(_.toString).collect().toList === List(("1", 1), ("2", 2), ("3", 3), ("4", 4)))
     val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
     assert(partitionSums.collect().toList === List(3, 7))
@@ -45,6 +44,16 @@ class RDDSuite extends FunSuite with LocalSparkContext {
     intercept[UnsupportedOperationException] {
       nums.filter(_ > 5).reduce(_ + _)
     }
+    val sixteen = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16), 4)
+    //drop none
+    assert(sixteen.dropTake(0, 2).toList === List(1, 2))
+    //drop only from first partition
+    assert(sixteen.dropTake(2, 2).toList === List(3, 4))
+    //drop(4+2) all 4 from first and 2 from second partition. take 2.
+    assert(sixteen.dropTake(6, 2).toList === List(7, 8))
+    //drop(4+4+2) all 4 from first and second and 2 from third partition and take 6 values.
+    //The take should spill over to the next partition
+    assert(sixteen.dropTake(10, 6).toList === List(11, 12, 13, 14, 15, 16))
   }
 
   test("SparkContext.union") {
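
Note on the final version: dropped has to be a Spark accumulator rather than a
plain var, because the closure passed to sc.runJob executes on the slaves, so
increments made there to a captured driver-side var would never propagate back
to the driver. The driver instead snapshots dropped.value into accDropped
before launching each per-partition job, and each job adds back only what it
actually dropped from its own partition.

For comparison, the same drop-then-take semantics can be sketched from the
driver using only the existing public API. The helper below is hypothetical
and not part of these patches; it fetches the first drop + num elements with
take and discards the prefix locally, so unlike dropTake it ships the dropped
elements to the driver and is only reasonable when drop + num is small:

    // Hypothetical driver-side equivalent of rdd.dropTake(drop, num), built
    // only on the public RDD.take API; not part of the patches above.
    // take(drop + num) pulls at most drop + num elements to the driver,
    // and the local Array.drop then discards the unwanted prefix.
    def dropTakeNaive[T](rdd: spark.RDD[T], drop: Int, num: Int): Array[T] =
      rdd.take(drop + num).drop(drop)

    // On the four-partition RDD from the test suite, this should agree with
    // the patched method, e.g. dropTakeNaive(sixteen, 6, 2) gives Array(7, 8).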