
Commit

Merge branch 'release/0.10.0'
Jonathan Coveney committed May 13, 2014
2 parents f1a85cc + 8ebdb25 commit dd104f2
Showing 27 changed files with 654 additions and 45 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -1,3 +1,7 @@
+.cache
+.project
+.settings
+.classpath
 *.swp
 BUILD
 target/
2 changes: 1 addition & 1 deletion .travis.yml
@@ -3,7 +3,7 @@ scala:
   - 2.10.3
   - 2.9.3
 script:
-  - "sbt -Duser.name=$USER.$RANDOM -Dlog4j.configuration=file://$TRAVIS_BUILD_DIR/project/travis-log4j.properties ++$TRAVIS_SCALA_VERSION assembly"
+  - "./sbt -Duser.name=$USER.$RANDOM -Dlog4j.configuration=file://$TRAVIS_BUILD_DIR/project/travis-log4j.properties ++$TRAVIS_SCALA_VERSION assembly"
   - "scripts/test_tutorials.sh"
 jdk:
   - oraclejdk7
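Note that the CI script now invokes the `./sbt` wrapper checked into the repository rather than whatever `sbt` is on the PATH, presumably so Travis runs the same pinned launcher as local builds.
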
7 changes: 7 additions & 0 deletions CHANGES.md
@@ -1,5 +1,12 @@
 # Scalding #
 
+### Version 0.10.0 ###
+* Upgrade cascading to 2.5.4, cascading jdbc to 2.5.2
+* Adding an hdfs mode for the Scalding REPL
+* Added implementation of PartitionSource with tests
+* Add helper methods to KeyedList and TypedPipe
+* Add addTrap to TypedPipe
+
 ### Version 0.9.0 ###
 * Add join operations to TypedPipe that do not require grouping beforehand
 * Fixed bug in size estimation of diagonal matrices
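Of the new `TypedPipe` methods, `addTrap` is the most visible. A minimal sketch of how it might be used, assuming it takes a `Source with TypedSink` for the pipe's element type (the paths and the parse step are hypothetical):

```scala
// Tuples that throw in later steps are diverted to the trap sink
// instead of failing the whole flow (hypothetical paths).
val parsed: TypedPipe[Int] =
  TypedPipe.from(TextLine("data/input"))
    .addTrap(TypedTsv[String]("data/bad-records"))
    .map { line => line.trim.toInt } // a malformed line lands in the trap
```
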
2 changes: 1 addition & 1 deletion README.md
@@ -4,7 +4,7 @@ Scalding is a Scala library that makes it easy to specify Hadoop MapReduce jobs.
 
 ![Scalding Logo](https://raw.github.com/twitter/scalding/develop/logo/scalding.png)
 
-Current version: `0.9.1`
+Current version: `0.10.0`
 
 ## Word Count
 
HBaseTapCollector.java
@@ -34,7 +34,7 @@
  * {@link cascading.tuple.TupleEntrySchemeCollector} that writes tuples to the
  * resource managed by a particular {@link HBaseTap} instance.
  */
-public class HBaseTapCollector extends TupleEntrySchemeCollector implements OutputCollector {
+public class HBaseTapCollector extends TupleEntrySchemeCollector<JobConf, TupleEntrySchemeCollector> implements OutputCollector {
   /** Field LOG */
   private static final Logger LOG = LoggerFactory.getLogger(HBaseTapCollector.class);
   /** Field conf */
@@ -50,7 +50,7 @@ public class HBaseTapCollector extends TupleEntrySchemeCollector implements Outp
 
   /**
    * Constructor TapCollector creates a new TapCollector instance.
-   * 
+   *
    * @param flowProcess
    * of type FlowProcess
    * @param tap
@@ -101,7 +101,7 @@ public void close() {
   /**
    * Method collect writes the given values to the {@link Tap} this instance
    * encapsulates.
-   * 
+   *
    * @param writableComparable
    * of type WritableComparable
    * @param writable
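Parameterizing the raw `TupleEntrySchemeCollector` supertype here looks like fallout from the cascading 2.5.4 upgrade in this release: cascading's `TupleEntrySchemeCollector` is generic in its configuration and output types, so the raw form no longer compiles cleanly.
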
14 changes: 10 additions & 4 deletions project/Build.scala
@@ -10,6 +10,8 @@ import com.typesafe.tools.mima.plugin.MimaKeys._
 import scala.collection.JavaConverters._
 
 object ScaldingBuild extends Build {
+  val printDependencyClasspath = taskKey[Unit]("Prints location of the dependencies")
+
   val sharedSettings = Project.defaultSettings ++ assemblySettings ++ Seq(
     organization := "com.twitter",
 
@@ -22,7 +24,6 @@
 
     javacOptions in doc := Seq("-source", "1.6"),
 
-
     libraryDependencies ++= Seq(
       "org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
       "org.scala-tools.testing" %% "specs" % "1.6.9" % "test",
@@ -37,6 +38,11 @@
       "Twitter Maven" at "http://maven.twttr.com"
     ),
 
+    printDependencyClasspath := {
+      val cp = (dependencyClasspath in Compile).value
+      cp.foreach(f => println(s"${f.metadata.get(moduleID.key)} => ${f.data}"))
+    },
+
     parallelExecution in Test := false,
 
     scalacOptions ++= Seq("-unchecked", "-deprecation"),
@@ -151,7 +157,7 @@ object ScaldingBuild extends Build {
     Some(subProj)
       .filterNot(unreleasedModules.contains(_))
       .map {
-        s => "com.twitter" % ("scalding-" + s + "_2.9.2") % "0.8.5"
+        s => "com.twitter" % ("scalding-" + s + "_2.9.2") % "0.10.0"
       }
 
   def module(name: String) = {
@@ -167,10 +173,10 @@
   lazy val scaldingDate = module("date")
 
   lazy val cascadingVersion =
-    System.getenv.asScala.getOrElse("SCALDING_CASCADING_VERSION", "2.5.2")
+    System.getenv.asScala.getOrElse("SCALDING_CASCADING_VERSION", "2.5.4")
 
   lazy val cascadingJDBCVersion =
-    System.getenv.asScala.getOrElse("SCALDING_CASCADING_JDBC_VERSION", "2.5.1")
+    System.getenv.asScala.getOrElse("SCALDING_CASCADING_JDBC_VERSION", "2.5.2")
 
   val hadoopVersion = "1.1.2"
   val algebirdVersion = "0.5.0"
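With the task defined and wired into `sharedSettings` as above, `sbt printDependencyClasspath` should print one `moduleID => jar path` line per resolved dependency, a quick way to confirm that the cascading 2.5.4 and cascading-jdbc 2.5.2 defaults in this file actually win dependency resolution.
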
12 changes: 7 additions & 5 deletions scalding-args/src/main/scala/com/twitter/scalding/Args.scala
@@ -15,6 +15,8 @@ limitations under the License.
 */
 package com.twitter.scalding
 
+case class ArgsException(message: String) extends RuntimeException(message)
+
 /**
 * The args class does a simple command line parsing. The rules are:
 * keys start with one or more "-". Each key has zero or more values
@@ -94,7 +96,7 @@ class Args(val m : Map[String,List[String]]) extends java.io.Serializable {
    */
   def required(position: Int) : String = positional match {
     case l if l.size > position => l(position)
-    case _ => sys.error("Please provide " + (position + 1) + " positional arguments")
+    case _ => throw ArgsException("Please provide " + (position + 1) + " positional arguments")
   }
 
   /**
@@ -121,9 +123,9 @@ class Args(val m : Map[String,List[String]]) extends java.io.Serializable {
    * If there is more than one value, you get an exception
    */
   def required(key : String) : String = list(key) match {
-    case List() => sys.error("Please provide a value for --" + key)
+    case List() => throw ArgsException("Please provide a value for --" + key)
     case List(a) => a
-    case _ => sys.error("Please only provide a single value for --" + key)
+    case _ => throw ArgsException("Please only provide a single value for --" + key)
   }
 
   def toList : List[String] = {
@@ -147,7 +149,7 @@
    */
   def restrictTo(acceptedArgs: Set[String]) : Unit = {
     val invalidArgs = m.keySet.filter(!_.startsWith("scalding.")) -- (acceptedArgs + "" + "tool.graph" + "hdfs" + "local")
-    if (!invalidArgs.isEmpty) sys.error("Invalid args: " + invalidArgs.map("--" + _).mkString(", "))
+    if (!invalidArgs.isEmpty) throw ArgsException("Invalid args: " + invalidArgs.map("--" + _).mkString(", "))
   }
 
   // TODO: if there are spaces in the keys or values, this will not round-trip
@@ -160,6 +162,6 @@
   def optional(key : String) : Option[String] = list(key) match {
     case List() => None
     case List(a) => Some(a)
-    case _ => sys.error("Please provide at most one value for --" + key)
+    case _ => throw ArgsException("Please provide at most one value for --" + key)
   }
 }
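Since `ArgsException` is a case class, argument errors are now pattern-matchable instead of arriving as the bare `RuntimeException` that `sys.error` threw. A minimal sketch, inside a Job where `args` is in scope (the flag name is hypothetical):

```scala
// Turn a missing --input flag into a usage message and a clean exit
// instead of an unhandled stack trace (hypothetical flag name).
val input: String =
  try {
    args.required("input")
  } catch {
    case ArgsException(msg) =>
      println("usage error: " + msg)
      sys.exit(1)
  }
```
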
3 changes: 2 additions & 1 deletion scalding-avro/README.md
@@ -4,7 +4,8 @@ https://github.com/ScaleUnlimited/cascading.avro .
 In some case Kryo (the default serializer used by Scalding) doesn't work well with Avro objects. If you run in to
 serialization errors, or if you want to preempt and trouble, you should add the following to your Job class:
 ```scala
-override def ioSerializations = super.ioSerializations :+ "cascading.avro.serialization.AvroSpecificRecordSerialization"
+override def ioSerializations =
+  super.ioSerializations :+ classOf[cascading.avro.serialization.AvroSpecificRecordSerialization[_]]
 ```
 
 This will use cascading.avro's Avro SpecificRecord serialization for Avro objects in place of the Kryo serialization.
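The switch from a class-name string to `classOf[...]` appears to track the earlier API change that made `Job.ioSerializations` a list of `Class` objects rather than fully qualified class-name strings, so the old string form shown in this README no longer compiled.
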
2 changes: 1 addition & 1 deletion scalding-core/src/main/scala/com/twitter/package.scala
@@ -33,7 +33,7 @@ package object scalding {
   /**
    * Make sure this is in sync with version.sbt
    */
-  val scaldingVersion: String = "0.9.1"
+  val scaldingVersion: String = "0.10.0"
 
   object RichPathFilter {
     implicit def toRichPathFilter(f: PathFilter) = new RichPathFilter(f)
@@ -49,11 +49,11 @@ abstract class SchemedSource extends Source {
 
   /** The scheme to use if the source is local. */
   def localScheme: Scheme[Properties, InputStream, OutputStream, _, _] =
-    sys.error("Cascading local mode not supported for: " + toString)
+    throw ModeException("Cascading local mode not supported for: " + toString)
 
   /** The scheme to use if the source is on hdfs. */
   def hdfsScheme: Scheme[JobConf,RecordReader[_,_],OutputCollector[_,_],_,_] =
-    sys.error("Cascading Hadoop mode not supported for: " + toString)
+    throw ModeException("Cascading Hadoop mode not supported for: " + toString)
 
   // The mode to use for output taps determining how conflicts with existing output are handled.
   val sinkMode: SinkMode = SinkMode.REPLACE
@@ -68,7 +68,7 @@ case class IterableSource[+T](@transient iter: Iterable[T], inFields : Fields =
       case Test(_) => new MemoryTap[InputStream,OutputStream](new NullScheme(fields, fields), asBuffer)
       case Hdfs(_, _) => hdfsTap
       case HadoopTest(_,_) => hdfsTap
-      case _ => sys.error("Unsupported mode for IterableSource: " + mode.toString)
+      case _ => throw ModeException("Unsupported mode for IterableSource: " + mode.toString)
     }
   }
 }
4 changes: 2 additions & 2 deletions scalding-core/src/main/scala/com/twitter/scalding/Job.scala
@@ -99,7 +99,7 @@ class Job(val args : Args) extends FieldConversions with java.io.Serializable {
    * Note that Mappable is a subclass of Source, and Mappable already
    * has mapTo and flatMapTo BUT WITHOUT incoming fields used (see
    * the Mappable trait). This creates some confusion when using these methods
-   * (this is an unfortuate mistake in our design that was not noticed until later).
+   * (this is an unfortunate mistake in our design that was not noticed until later).
    * To remove ambiguity, explicitly call .read on any Source that you begin
    * operating with a mapTo/flatMapTo.
    */
@@ -113,7 +113,7 @@ class Job(val args : Args) extends FieldConversions with java.io.Serializable {
     (implicit set: TupleSetter[T], conv : TupleConverter[T]): RichPipe =
       RichPipe(toPipe(iter)(set, conv))
 
-  // Override this if you want change how the mapred.job.name is written in Hadoop
+  // Override this if you want to change how the mapred.job.name is written in Hadoop
   def name : String = getClass.getName
 
   //This is the FlowDef used by all Sources this job creates
11 changes: 11 additions & 0 deletions scalding-core/src/main/scala/com/twitter/scalding/JobTest.scala
@@ -1,6 +1,7 @@
 package com.twitter.scalding
 
 import scala.collection.mutable.{Buffer, ListBuffer}
+import scala.collection.JavaConverters._
 import scala.annotation.tailrec
 import cascading.tuple.Tuple
 import cascading.tuple.TupleEntry
@@ -99,6 +100,9 @@ class JobTest(cons : (Args) => Job) {
     this
   }
 
+  def typedSink[A](s: Source with TypedSink[A])(op: Buffer[A] => Unit)(implicit conv: TupleConverter[A]) =
+    sink[A](s)(op)
+
   // Used to pass an assertion about a counter defined by the given group and name.
   // If this test is checking for multiple jobs chained by next, this only checks
   // for the counters in the final job's FlowStat.
@@ -169,6 +173,13 @@
   @tailrec
   private final def runJob(job : Job, runNext : Boolean) : Unit = {
 
+    // create cascading 3.0 planner trace files during tests
+    if (System.getenv.asScala.getOrElse("SCALDING_CASCADING3_DEBUG", "0") == "1") {
+      System.setProperty("cascading.planner.plan.path", "target/test/cascading/traceplan/" + job.name)
+      System.setProperty("cascading.planner.plan.transforms.path", "target/test/cascading/traceplan/" + job.name + "/transform")
+      System.setProperty("cascading.planner.stats.path", "target/test/cascading/traceplan/" + job.name + "/stats")
+    }
+
     job.run
     // Make sure to clean the state:
     job.clear
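Two of these JobTest additions are user-facing. `typedSink` lets a test assert on a `TypedSink`'s output with the sink's own element type, since it just delegates to `sink` as shown above. A minimal sketch, assuming a hypothetical `WordCountJob` that reads `TextLine("inFile")` and writes `TypedTsv[(String, Long)]("outFile")`:

```scala
// The sink buffer is already typed as (String, Long), so no manual
// tuple unpacking is needed (job and paths are hypothetical).
JobTest(new WordCountJob(_))
  .arg("input", "inFile")
  .arg("output", "outFile")
  .source(TextLine("inFile"), List((0, "hack hack hack")))
  .typedSink(TypedTsv[(String, Long)]("outFile")) { outBuf =>
    assert(outBuf.toMap == Map("hack" -> 3L))
  }
  .run
  .finish
```

Separately, setting `SCALDING_CASCADING3_DEBUG=1` in the environment before running tests makes `runJob` dump cascading planner trace files under `target/test/cascading/traceplan/<job name>/`.
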
4 changes: 3 additions & 1 deletion scalding-core/src/main/scala/com/twitter/scalding/Mode.scala
@@ -39,6 +39,8 @@ import scala.collection.mutable.{Map => MMap}
 import scala.collection.mutable.{Set => MSet}
 import scala.collection.mutable.{Iterable => MIterable}
 
+case class ModeException(message: String) extends RuntimeException(message)
+
 object Mode {
   /** This is a Args and a Mode together. It is used purely as
    * a work-around for the fact that Job only accepts an Args object,
@@ -71,7 +73,7 @@ object Mode {
     else if (args.boolean("hdfs"))
       Hdfs(strictSources, config)
     else
-      sys.error("[ERROR] Mode must be one of --local or --hdfs, you provided neither")
+      throw ArgsException("[ERROR] Mode must be one of --local or --hdfs, you provided neither")
   }
 }
 
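For job launchers this means a missing `--local`/`--hdfs` flag now surfaces as a catchable `ArgsException` rather than a bare `RuntimeException`, so wrappers around `com.twitter.scalding.Tool` can distinguish usage errors from genuine failures.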