diff --git a/README.md b/README.md index d1bf650986..5fd2ca1b3e 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ Scalding is a Scala library that makes it easy to specify Hadoop MapReduce jobs. ![Scalding Logo](https://raw.github.com/twitter/scalding/develop/logo/scalding.png) -Current version: `0.9.0` +Current version: `0.9.1` ## Word Count diff --git a/project/Build.scala b/project/Build.scala index 23d120b0a2..48ee95083a 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -247,7 +247,7 @@ object ScaldingBuild extends Build { //previousArtifact := Some("com.twitter" % "scalding-parquet_2.9.2" % "0.1.0"), previousArtifact := None, libraryDependencies ++= Seq( - "com.twitter" % "parquet-cascading" % "1.3.2", + "com.twitter" % "parquet-cascading" % "1.4.0", "org.slf4j" % "slf4j-api" % slf4jVersion, "org.apache.hadoop" % "hadoop-core" % hadoopVersion % "provided", "org.slf4j" % "slf4j-log4j12" % slf4jVersion % "test", diff --git a/scalding-core/src/main/scala/com/twitter/package.scala b/scalding-core/src/main/scala/com/twitter/package.scala index 86d8afc7fa..f3e63f5f14 100644 --- a/scalding-core/src/main/scala/com/twitter/package.scala +++ b/scalding-core/src/main/scala/com/twitter/package.scala @@ -33,7 +33,7 @@ package object scalding { /** * Make sure this is in sync with version.sbt */ - val scaldingVersion: String = "0.9.0" + val scaldingVersion: String = "0.9.1" object RichPathFilter { implicit def toRichPathFilter(f: PathFilter) = new RichPathFilter(f) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/Grouped.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/Grouped.scala index d4befed9be..77ed8be293 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/Grouped.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/Grouped.scala @@ -130,7 +130,6 @@ case class IdentityReduce[K, V1]( extends ReduceStep[K, V1] with Grouped[K, V1] { - override def withSortOrdering[U >: V1](so: 
Ordering[U]): IdentityValueSortedReduce[K, V1] = IdentityValueSortedReduce[K, V1](keyOrdering, mapped, so, reducers) @@ -138,7 +137,7 @@ case class IdentityReduce[K, V1]( copy(reducers = Some(red)) override def filterKeys(fn: K => Boolean) = - IteratorMappedReduce(keyOrdering, mapped.filterKeys(fn), {(_, iter: Iterator[V1]) => iter}, reducers) + UnsortedIdentityReduce(keyOrdering, mapped.filterKeys(fn), reducers) override def mapGroup[V3](fn: (K, Iterator[V1]) => Iterator[V3]) = IteratorMappedReduce(keyOrdering, mapped, fn, reducers) @@ -146,7 +145,7 @@ case class IdentityReduce[K, V1]( // It would be nice to return IdentityReduce here, but // the type constraints prevent it currently override def mapValues[V2](fn: V1 => V2) = - IteratorMappedReduce(keyOrdering, mapped.mapValues(fn), {(k, iter:Iterator[V2]) => iter}, reducers) + UnsortedIdentityReduce(keyOrdering, mapped.mapValues(fn), reducers) // This is not correct in the type-system, but would be nice to encode //override def mapValues[V3](fn: V1 => V3) = IdentityReduce(keyOrdering, mapped.mapValues(fn), reducers) @@ -154,7 +153,49 @@ case class IdentityReduce[K, V1]( override def sum[U >: V1](implicit sg: Semigroup[U]) = { // there is no sort, mapValueStream or force to reducers: val upipe: TypedPipe[(K, U)] = mapped // use covariance to set the type - IdentityReduce(keyOrdering, upipe.sumByLocalKeys, reducers).sumLeft + UnsortedIdentityReduce(keyOrdering, upipe.sumByLocalKeys, reducers).sumLeft + } + + override lazy val toTypedPipe = reducers match { + case None => mapped // free case + case Some(reds) => + // This is weird, but it is sometimes used to force a partition + val reducedPipe = groupOp { _.reducers(reds) } + TypedPipe.from(reducedPipe, Grouped.kvFields)(tuple2Converter[K,V1]) + } + + /** This is just an identity that casts the result to V1 */ + override def joinFunction = { (k, iter, empties) => + assert(empties.isEmpty, "this join function should never be called with non-empty right-most") + 
iter.map(_.getObject(Grouped.ValuePosition).asInstanceOf[V1]) + } +} + +case class UnsortedIdentityReduce[K, V1]( + override val keyOrdering: Ordering[K], + override val mapped: TypedPipe[(K, V1)], + override val reducers: Option[Int]) + extends ReduceStep[K, V1] + with UnsortedGrouped[K, V1] { + + override def withReducers(red: Int): UnsortedIdentityReduce[K, V1] = + copy(reducers = Some(red)) + + override def filterKeys(fn: K => Boolean) = + UnsortedIdentityReduce(keyOrdering, mapped.filterKeys(fn), reducers) + + override def mapGroup[V3](fn: (K, Iterator[V1]) => Iterator[V3]) = + IteratorMappedReduce(keyOrdering, mapped, fn, reducers) + + // It would be nice to return IdentityReduce here, but + // the type constraints prevent it currently + override def mapValues[V2](fn: V1 => V2) = + UnsortedIdentityReduce(keyOrdering, mapped.mapValues(fn), reducers) + + override def sum[U >: V1](implicit sg: Semigroup[U]) = { + // there is no sort, mapValueStream or force to reducers: + val upipe: TypedPipe[(K, U)] = mapped // use covariance to set the type + UnsortedIdentityReduce(keyOrdering, upipe.sumByLocalKeys, reducers).sumLeft } override lazy val toTypedPipe = reducers match { diff --git a/version.sbt b/version.sbt index 6f46242528..be40d62284 100644 --- a/version.sbt +++ b/version.sbt @@ -1,2 +1,2 @@ -version in ThisBuild := "0.9.0" +version in ThisBuild := "0.9.1"