Skip to content

Commit

Permalink
Merge branch 'release/0.9.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
isnotinvain committed Apr 3, 2014
2 parents 7106617 + 7851d27 commit f1a85cc
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 8 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Scalding is a Scala library that makes it easy to specify Hadoop MapReduce jobs.

![Scalding Logo](https://raw.github.com/twitter/scalding/develop/logo/scalding.png)

Current version: `0.9.0`
Current version: `0.9.1`

## Word Count

Expand Down
2 changes: 1 addition & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ object ScaldingBuild extends Build {
//previousArtifact := Some("com.twitter" % "scalding-parquet_2.9.2" % "0.1.0"),
previousArtifact := None,
libraryDependencies ++= Seq(
"com.twitter" % "parquet-cascading" % "1.3.2",
"com.twitter" % "parquet-cascading" % "1.4.0",
"org.slf4j" % "slf4j-api" % slf4jVersion,
"org.apache.hadoop" % "hadoop-core" % hadoopVersion % "provided",
"org.slf4j" % "slf4j-log4j12" % slf4jVersion % "test",
Expand Down
2 changes: 1 addition & 1 deletion scalding-core/src/main/scala/com/twitter/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ package object scalding {
/**
* Make sure this is in sync with version.sbt
*/
val scaldingVersion: String = "0.9.0"
val scaldingVersion: String = "0.9.1"

object RichPathFilter {
implicit def toRichPathFilter(f: PathFilter) = new RichPathFilter(f)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,31 +130,72 @@ case class IdentityReduce[K, V1](
extends ReduceStep[K, V1]
with Grouped[K, V1] {


override def withSortOrdering[U >: V1](so: Ordering[U]): IdentityValueSortedReduce[K, V1] =
IdentityValueSortedReduce[K, V1](keyOrdering, mapped, so, reducers)

override def withReducers(red: Int): IdentityReduce[K, V1] =
copy(reducers = Some(red))

override def filterKeys(fn: K => Boolean) =
IteratorMappedReduce(keyOrdering, mapped.filterKeys(fn), {(_, iter: Iterator[V1]) => iter}, reducers)
UnsortedIdentityReduce(keyOrdering, mapped.filterKeys(fn), reducers)

override def mapGroup[V3](fn: (K, Iterator[V1]) => Iterator[V3]) =
IteratorMappedReduce(keyOrdering, mapped, fn, reducers)

// It would be nice to return IdentityReduce here, but
// the type constraints prevent it currently
override def mapValues[V2](fn: V1 => V2) =
IteratorMappedReduce(keyOrdering, mapped.mapValues(fn), {(k, iter:Iterator[V2]) => iter}, reducers)
UnsortedIdentityReduce(keyOrdering, mapped.mapValues(fn), reducers)

// This is not correct in the type-system, but would be nice to encode
//override def mapValues[V3](fn: V1 => V3) = IdentityReduce(keyOrdering, mapped.mapValues(fn), reducers)

override def sum[U >: V1](implicit sg: Semigroup[U]) = {
// there is no sort, mapValueStream or force to reducers:
val upipe: TypedPipe[(K, U)] = mapped // use covariance to set the type
IdentityReduce(keyOrdering, upipe.sumByLocalKeys, reducers).sumLeft
UnsortedIdentityReduce(keyOrdering, upipe.sumByLocalKeys, reducers).sumLeft
}

override lazy val toTypedPipe = reducers match {
case None => mapped // free case
case Some(reds) =>
// This is wierd, but it is sometimes used to force a partition
val reducedPipe = groupOp { _.reducers(reds) }
TypedPipe.from(reducedPipe, Grouped.kvFields)(tuple2Converter[K,V1])
}

/** This is just an identity that casts the result to V1 */
override def joinFunction = { (k, iter, empties) =>
assert(empties.isEmpty, "this join function should never be called with non-empty right-most")
iter.map(_.getObject(Grouped.ValuePosition).asInstanceOf[V1])
}
}

case class UnsortedIdentityReduce[K, V1](
override val keyOrdering: Ordering[K],
override val mapped: TypedPipe[(K, V1)],
override val reducers: Option[Int])
extends ReduceStep[K, V1]
with UnsortedGrouped[K, V1] {

override def withReducers(red: Int): UnsortedIdentityReduce[K, V1] =
copy(reducers = Some(red))

override def filterKeys(fn: K => Boolean) =
UnsortedIdentityReduce(keyOrdering, mapped.filterKeys(fn), reducers)

override def mapGroup[V3](fn: (K, Iterator[V1]) => Iterator[V3]) =
IteratorMappedReduce(keyOrdering, mapped, fn, reducers)

// It would be nice to return IdentityReduce here, but
// the type constraints prevent it currently
override def mapValues[V2](fn: V1 => V2) =
UnsortedIdentityReduce(keyOrdering, mapped.mapValues(fn), reducers)

override def sum[U >: V1](implicit sg: Semigroup[U]) = {
// there is no sort, mapValueStream or force to reducers:
val upipe: TypedPipe[(K, U)] = mapped // use covariance to set the type
UnsortedIdentityReduce(keyOrdering, upipe.sumByLocalKeys, reducers).sumLeft
}

override lazy val toTypedPipe = reducers match {
Expand Down
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@

version in ThisBuild := "0.9.0"
version in ThisBuild := "0.9.1"

0 comments on commit f1a85cc

Please sign in to comment.