diff --git a/.gitignore b/.gitignore index d3ad4e7030..8c234da4c4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,7 @@ +.cache +.project +.settings +.classpath *.swp BUILD target/ diff --git a/.travis.yml b/.travis.yml index 05c0228325..4d481dabd8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,7 +3,7 @@ scala: - 2.10.3 - 2.9.3 script: - - "sbt -Duser.name=$USER.$RANDOM -Dlog4j.configuration=file://$TRAVIS_BUILD_DIR/project/travis-log4j.properties ++$TRAVIS_SCALA_VERSION assembly" + - "./sbt -Duser.name=$USER.$RANDOM -Dlog4j.configuration=file://$TRAVIS_BUILD_DIR/project/travis-log4j.properties ++$TRAVIS_SCALA_VERSION assembly" - "scripts/test_tutorials.sh" jdk: - oraclejdk7 diff --git a/CHANGES.md b/CHANGES.md index cd0f5bcbb8..3629492df8 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,12 @@ # Scalding # +### Version 0.10.0 ### +* Upgrade cascading to 2.5.4, cascading jdbc to 2.5.2 +* Adding an hdfs mode for the Scalding REPL +* Added implementation of PartitionSource with tests +* Add helper methods to KeyedList and TypedPipe +* Add addTrap to TypedPipe + ### Version 0.9.0 ### * Add join operations to TypedPipe that do not require grouping beforehand * Fixed bug in size estimation of diagonal matrices diff --git a/README.md b/README.md index 5fd2ca1b3e..62c7f2362e 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ Scalding is a Scala library that makes it easy to specify Hadoop MapReduce jobs. ![Scalding Logo](https://raw.github.com/twitter/scalding/develop/logo/scalding.png) -Current version: `0.9.1` +Current version: `0.10.0` ## Word Count diff --git a/maple/src/main/java/com/twitter/maple/hbase/HBaseTapCollector.java b/maple/src/main/java/com/twitter/maple/hbase/HBaseTapCollector.java index ceb1a39c87..f5ad1ed2dd 100644 --- a/maple/src/main/java/com/twitter/maple/hbase/HBaseTapCollector.java +++ b/maple/src/main/java/com/twitter/maple/hbase/HBaseTapCollector.java @@ -34,7 +34,7 @@ * {@link cascading.tuple.TupleEntrySchemeCollector} that writes tuples to the * resource managed by a particular {@link HBaseTap} instance. */ -public class HBaseTapCollector extends TupleEntrySchemeCollector implements OutputCollector { +public class HBaseTapCollector extends TupleEntrySchemeCollector implements OutputCollector { /** Field LOG */ private static final Logger LOG = LoggerFactory.getLogger(HBaseTapCollector.class); /** Field conf */ @@ -50,7 +50,7 @@ public class HBaseTapCollector extends TupleEntrySchemeCollector implements Outp /** * Constructor TapCollector creates a new TapCollector instance. - * + * * @param flowProcess * of type FlowProcess * @param tap @@ -101,7 +101,7 @@ public void close() { /** * Method collect writes the given values to the {@link Tap} this instance * encapsulates. 
- * + * * @param writableComparable * of type WritableComparable * @param writable diff --git a/project/Build.scala b/project/Build.scala index 48ee95083a..69e55657c2 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -10,6 +10,8 @@ import com.typesafe.tools.mima.plugin.MimaKeys._ import scala.collection.JavaConverters._ object ScaldingBuild extends Build { + val printDependencyClasspath = taskKey[Unit]("Prints location of the dependencies") + val sharedSettings = Project.defaultSettings ++ assemblySettings ++ Seq( organization := "com.twitter", @@ -22,7 +24,6 @@ object ScaldingBuild extends Build { javacOptions in doc := Seq("-source", "1.6"), - libraryDependencies ++= Seq( "org.scalacheck" %% "scalacheck" % "1.10.0" % "test", "org.scala-tools.testing" %% "specs" % "1.6.9" % "test", @@ -37,6 +38,11 @@ object ScaldingBuild extends Build { "Twitter Maven" at "http://maven.twttr.com" ), + printDependencyClasspath := { + val cp = (dependencyClasspath in Compile).value + cp.foreach(f => println(s"${f.metadata.get(moduleID.key)} => ${f.data}")) + }, + parallelExecution in Test := false, scalacOptions ++= Seq("-unchecked", "-deprecation"), @@ -151,7 +157,7 @@ object ScaldingBuild extends Build { Some(subProj) .filterNot(unreleasedModules.contains(_)) .map { - s => "com.twitter" % ("scalding-" + s + "_2.9.2") % "0.8.5" + s => "com.twitter" % ("scalding-" + s + "_2.9.2") % "0.10.0" } def module(name: String) = { @@ -167,10 +173,10 @@ object ScaldingBuild extends Build { lazy val scaldingDate = module("date") lazy val cascadingVersion = - System.getenv.asScala.getOrElse("SCALDING_CASCADING_VERSION", "2.5.2") + System.getenv.asScala.getOrElse("SCALDING_CASCADING_VERSION", "2.5.4") lazy val cascadingJDBCVersion = - System.getenv.asScala.getOrElse("SCALDING_CASCADING_JDBC_VERSION", "2.5.1") + System.getenv.asScala.getOrElse("SCALDING_CASCADING_JDBC_VERSION", "2.5.2") val hadoopVersion = "1.1.2" val algebirdVersion = "0.5.0" diff --git a/scalding-args/src/main/scala/com/twitter/scalding/Args.scala b/scalding-args/src/main/scala/com/twitter/scalding/Args.scala index 82d2c04b4e..e790422c5c 100644 --- a/scalding-args/src/main/scala/com/twitter/scalding/Args.scala +++ b/scalding-args/src/main/scala/com/twitter/scalding/Args.scala @@ -15,6 +15,8 @@ limitations under the License. */ package com.twitter.scalding +case class ArgsException(message: String) extends RuntimeException(message) + /** * The args class does a simple command line parsing. The rules are: * keys start with one or more "-". 
Each key has zero or more values @@ -94,7 +96,7 @@ class Args(val m : Map[String,List[String]]) extends java.io.Serializable { */ def required(position: Int) : String = positional match { case l if l.size > position => l(position) - case _ => sys.error("Please provide " + (position + 1) + " positional arguments") + case _ => throw ArgsException("Please provide " + (position + 1) + " positional arguments") } /** @@ -121,9 +123,9 @@ class Args(val m : Map[String,List[String]]) extends java.io.Serializable { * If there is more than one value, you get an exception */ def required(key : String) : String = list(key) match { - case List() => sys.error("Please provide a value for --" + key) + case List() => throw ArgsException("Please provide a value for --" + key) case List(a) => a - case _ => sys.error("Please only provide a single value for --" + key) + case _ => throw ArgsException("Please only provide a single value for --" + key) } def toList : List[String] = { @@ -147,7 +149,7 @@ class Args(val m : Map[String,List[String]]) extends java.io.Serializable { */ def restrictTo(acceptedArgs: Set[String]) : Unit = { val invalidArgs = m.keySet.filter(!_.startsWith("scalding.")) -- (acceptedArgs + "" + "tool.graph" + "hdfs" + "local") - if (!invalidArgs.isEmpty) sys.error("Invalid args: " + invalidArgs.map("--" + _).mkString(", ")) + if (!invalidArgs.isEmpty) throw ArgsException("Invalid args: " + invalidArgs.map("--" + _).mkString(", ")) } // TODO: if there are spaces in the keys or values, this will not round-trip @@ -160,6 +162,6 @@ class Args(val m : Map[String,List[String]]) extends java.io.Serializable { def optional(key : String) : Option[String] = list(key) match { case List() => None case List(a) => Some(a) - case _ => sys.error("Please provide at most one value for --" + key) + case _ => throw ArgsException("Please provide at most one value for --" + key) } } diff --git a/scalding-avro/README.md b/scalding-avro/README.md index 8a80f8381c..eacffbe7a1 100644 --- a/scalding-avro/README.md +++ b/scalding-avro/README.md @@ -4,7 +4,8 @@ https://github.com/ScaleUnlimited/cascading.avro . In some case Kryo (the default serializer used by Scalding) doesn't work well with Avro objects. If you run in to serialization errors, or if you want to preempt and trouble, you should add the following to your Job class: ```scala -override def ioSerializations = super.ioSerializations :+ "cascading.avro.serialization.AvroSpecificRecordSerialization" +override def ioSerializations = + super.ioSerializations :+ classOf[cascading.avro.serialization.AvroSpecificRecordSerialization[_]] ``` This will use cascading.avro's Avro SpecificRecord serialization for Avro objects in place of the Kryo serialization. 
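For context, a minimal sketch of where the `ioSerializations` override shown in the scalding-avro README above would live; the job class and its body are hypothetical, and only the override line itself is taken from the README change in this patch (note it now appends a `Class` object rather than a `String`):

```scala
import com.twitter.scalding._

// Hypothetical job showing the README snippet above in context.
class MyAvroJob(args: Args) extends Job(args) {
  // Register cascading.avro's SpecificRecord serialization in place of Kryo
  // for Avro objects; the new API takes a Class, not a String.
  override def ioSerializations =
    super.ioSerializations :+ classOf[cascading.avro.serialization.AvroSpecificRecordSerialization[_]]

  // ... read and write Avro-backed sources as usual.
}
```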
diff --git a/scalding-core/src/main/scala/com/twitter/package.scala b/scalding-core/src/main/scala/com/twitter/package.scala index f3e63f5f14..14c1368a0d 100644 --- a/scalding-core/src/main/scala/com/twitter/package.scala +++ b/scalding-core/src/main/scala/com/twitter/package.scala @@ -33,7 +33,7 @@ package object scalding { /** * Make sure this is in sync with version.sbt */ - val scaldingVersion: String = "0.9.1" + val scaldingVersion: String = "0.10.0" object RichPathFilter { implicit def toRichPathFilter(f: PathFilter) = new RichPathFilter(f) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala b/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala index 3da6cd9092..9106c12570 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala @@ -49,11 +49,11 @@ abstract class SchemedSource extends Source { /** The scheme to use if the source is local. */ def localScheme: Scheme[Properties, InputStream, OutputStream, _, _] = - sys.error("Cascading local mode not supported for: " + toString) + throw ModeException("Cascading local mode not supported for: " + toString) /** The scheme to use if the source is on hdfs. */ def hdfsScheme: Scheme[JobConf,RecordReader[_,_],OutputCollector[_,_],_,_] = - sys.error("Cascading Hadoop mode not supported for: " + toString) + throw ModeException("Cascading Hadoop mode not supported for: " + toString) // The mode to use for output taps determining how conflicts with existing output are handled. val sinkMode: SinkMode = SinkMode.REPLACE diff --git a/scalding-core/src/main/scala/com/twitter/scalding/IterableSource.scala b/scalding-core/src/main/scala/com/twitter/scalding/IterableSource.scala index 5a3eeca11b..d815c38f8f 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/IterableSource.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/IterableSource.scala @@ -68,7 +68,7 @@ case class IterableSource[+T](@transient iter: Iterable[T], inFields : Fields = case Test(_) => new MemoryTap[InputStream,OutputStream](new NullScheme(fields, fields), asBuffer) case Hdfs(_, _) => hdfsTap case HadoopTest(_,_) => hdfsTap - case _ => sys.error("Unsupported mode for IterableSource: " + mode.toString) + case _ => throw ModeException("Unsupported mode for IterableSource: " + mode.toString) } } } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Job.scala b/scalding-core/src/main/scala/com/twitter/scalding/Job.scala index a3f6a6ecba..fba3aef7f5 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Job.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Job.scala @@ -99,7 +99,7 @@ class Job(val args : Args) extends FieldConversions with java.io.Serializable { * Note that Mappable is a subclass of Source, and Mappable already * has mapTo and flatMapTo BUT WITHOUT incoming fields used (see * the Mappable trait). This creates some confusion when using these methods - * (this is an unfortuate mistake in our design that was not noticed until later). + * (this is an unfortunate mistake in our design that was not noticed until later). * To remove ambiguity, explicitly call .read on any Source that you begin * operating with a mapTo/flatMapTo. 
*/ @@ -113,7 +113,7 @@ class Job(val args : Args) extends FieldConversions with java.io.Serializable { (implicit set: TupleSetter[T], conv : TupleConverter[T]): RichPipe = RichPipe(toPipe(iter)(set, conv)) - // Override this if you want change how the mapred.job.name is written in Hadoop + // Override this if you want to change how the mapred.job.name is written in Hadoop def name : String = getClass.getName //This is the FlowDef used by all Sources this job creates diff --git a/scalding-core/src/main/scala/com/twitter/scalding/JobTest.scala b/scalding-core/src/main/scala/com/twitter/scalding/JobTest.scala index 4e88cef5ff..e5fba684c1 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/JobTest.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/JobTest.scala @@ -1,6 +1,7 @@ package com.twitter.scalding import scala.collection.mutable.{Buffer, ListBuffer} +import scala.collection.JavaConverters._ import scala.annotation.tailrec import cascading.tuple.Tuple import cascading.tuple.TupleEntry @@ -99,6 +100,9 @@ class JobTest(cons : (Args) => Job) { this } + def typedSink[A](s: Source with TypedSink[A])(op: Buffer[A] => Unit)(implicit conv: TupleConverter[A]) = + sink[A](s)(op) + // Used to pass an assertion about a counter defined by the given group and name. // If this test is checking for multiple jobs chained by next, this only checks // for the counters in the final job's FlowStat. @@ -169,6 +173,13 @@ class JobTest(cons : (Args) => Job) { @tailrec private final def runJob(job : Job, runNext : Boolean) : Unit = { + // create cascading 3.0 planner trace files during tests + if (System.getenv.asScala.getOrElse("SCALDING_CASCADING3_DEBUG", "0") == "1") { + System.setProperty("cascading.planner.plan.path", "target/test/cascading/traceplan/" + job.name) + System.setProperty("cascading.planner.plan.transforms.path", "target/test/cascading/traceplan/" + job.name + "/transform") + System.setProperty("cascading.planner.stats.path", "target/test/cascading/traceplan/" + job.name + "/stats") + } + job.run // Make sure to clean the state: job.clear diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala b/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala index 45adca0eef..7dab7e5259 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala @@ -39,6 +39,8 @@ import scala.collection.mutable.{Map => MMap} import scala.collection.mutable.{Set => MSet} import scala.collection.mutable.{Iterable => MIterable} +case class ModeException(message: String) extends RuntimeException(message) + object Mode { /** This is a Args and a Mode together. 
It is used purely as * a work-around for the fact that Job only accepts an Args object, @@ -71,7 +73,7 @@ object Mode { else if (args.boolean("hdfs")) Hdfs(strictSources, config) else - sys.error("[ERROR] Mode must be one of --local or --hdfs, you provided neither") + throw ArgsException("[ERROR] Mode must be one of --local or --hdfs, you provided neither") } } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/PartitionSource.scala b/scalding-core/src/main/scala/com/twitter/scalding/PartitionSource.scala new file mode 100644 index 0000000000..17a73e472e --- /dev/null +++ b/scalding-core/src/main/scala/com/twitter/scalding/PartitionSource.scala @@ -0,0 +1,173 @@ +/* +Copyright 2014 Snowplow Analytics Ltd + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package com.twitter.scalding + +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapred.RecordReader +import org.apache.hadoop.mapred.OutputCollector + +import cascading.scheme.hadoop.{ TextDelimited => CHTextDelimited } +import cascading.scheme.hadoop.TextLine.Compress +import cascading.scheme.Scheme +import cascading.tap.hadoop.Hfs +import cascading.tap.hadoop.{ PartitionTap => HPartitionTap } +import cascading.tap.local.FileTap +import cascading.tap.local.{ PartitionTap => LPartitionTap } +import cascading.tap.partition.DelimitedPartition +import cascading.tap.partition.Partition +import cascading.tap.SinkMode +import cascading.tap.Tap +import cascading.tuple.Fields + +/** +* This is a base class for partition-based output sources +*/ +abstract class PartitionSource extends SchemedSource { + + // The root path of the partitioned output. + def basePath: String + // The partition. + def partition: Partition = new DelimitedPartition(Fields.ALL, "/") + + /** + * Creates the partition tap. + * + * @param readOrWrite Describes if this source is being read from or written to. + * @param mode The mode of the job. (implicit) + * + * @returns A cascading PartitionTap. + */ + override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { + readOrWrite match { + case Read => throw new InvalidSourceException("Using PartitionSource for input not yet implemented") + case Write => { + mode match { + case Local(_) => { + val localTap = new FileTap(localScheme, basePath, sinkMode) + new LPartitionTap(localTap, partition) + } + case hdfsMode @ Hdfs(_, _) => { + val hfsTap = new Hfs(hdfsScheme, basePath, sinkMode) + new HPartitionTap(hfsTap, partition) + } + case hdfsTest @ HadoopTest(_, _) => { + val hfsTap = new Hfs(hdfsScheme, hdfsTest.getWritePathFor(this), sinkMode) + new HPartitionTap(hfsTap, partition) + } + case _ => TestTapFactory(this, hdfsScheme).createTap(readOrWrite) + } + } + } + } + + /** + * Validates the taps, makes sure there are no nulls in the path. + * + * @param mode The mode of the job. 
+ */ + override def validateTaps(mode: Mode): Unit = { + if (basePath == null) { + throw new InvalidSourceException("basePath cannot be null for PartitionTap") + } + } +} + +/** + * An implementation of TSV output, split over a partition tap. + * + * Similar to TemplateSource, but with addition of tsvFields, to + * let users explicitly specify which fields they want to see in + * the TSV (allows user to discard path fields). + * + * apply assumes user wants a DelimitedPartition (the only + * strategy bundled with Cascading). + * + * @param basePath The root path for the output. + * @param delimiter The path delimiter, defaults to / to create sub-directory bins. + * @param pathFields The set of fields to apply to the path. + * @param writeHeader Flag to indicate that the header should be written to the file. + * @param tsvFields The set of fields to include in the TSV output. + * @param sinkMode How to handle conflicts with existing output. + */ +object PartitionedTsv { + def apply( + basePath: String, + delimiter: String = "/", + pathFields: Fields = Fields.ALL, + writeHeader: Boolean = false, + tsvFields: Fields = Fields.ALL, + sinkMode: SinkMode = SinkMode.REPLACE + ) = new PartitionedTsv(basePath, new DelimitedPartition(pathFields, delimiter), writeHeader, tsvFields, sinkMode) +} + +/** + * An implementation of TSV output, split over a partition tap. + * + * @param basePath The root path for the output. + * @param partition The partitioning strategy to use. + * @param writeHeader Flag to indicate that the header should be written to the file. + * @param sinkMode How to handle conflicts with existing output. + */ +case class PartitionedTsv( + override val basePath: String, + override val partition: Partition, + override val writeHeader: Boolean, + val tsvFields: Fields, + override val sinkMode: SinkMode) + extends PartitionSource with DelimitedScheme { + + override val fields = tsvFields +} + +/** + * An implementation of SequenceFile output, split over a partition tap. + * + * apply assumes user wants a DelimitedPartition (the only + * strategy bundled with Cascading). + * + * @param basePath The root path for the output. + * @param delimiter The path delimiter, defaults to / to create sub-directory bins. + * @param pathFields The set of fields to apply to the path. + * @param sequenceFields The set of fields to use for the sequence file. + * @param sinkMode How to handle conflicts with existing output. + */ +object PartitionedSequenceFile { + def apply( + basePath: String, + delimiter: String = "/", + pathFields: Fields = Fields.ALL, + sequenceFields: Fields = Fields.ALL, + sinkMode: SinkMode = SinkMode.REPLACE + ) = new PartitionedSequenceFile(basePath, new DelimitedPartition(pathFields, delimiter), sequenceFields, sinkMode) +} + +/** + * An implementation of SequenceFile output, split over a partition tap. + * + * @param basePath The root path for the output. + * @param partition The partitioning strategy to use. + * @param sequenceFields The set of fields to use for the sequence file. + * @param sinkMode How to handle conflicts with existing output. 
+ */ +case class PartitionedSequenceFile( + override val basePath: String, + override val partition: Partition, + val sequenceFields: Fields, + override val sinkMode: SinkMode) + extends PartitionSource with SequenceFileScheme { + + override val fields = sequenceFields +} diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala b/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala index 6f5feecd10..7d4f5e95d3 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala @@ -57,7 +57,7 @@ object Stats { // flowStats is used after that. Returns None if neither is defined. def getCounterValue(counter: String, group: String = ScaldingGroup) (implicit cascadingStats: CascadingStats): Long = - cascadingStats.getCounterValue(ScaldingGroup, counter) + cascadingStats.getCounterValue(group, counter) // Returns a map of all custom counter names and their counts. def getAllCustomCounters()(implicit cascadingStats: CascadingStats): Map[String, Long] = { diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Tool.scala b/scalding-core/src/main/scala/com/twitter/scalding/Tool.scala index 265257962e..1e8d0fc654 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Tool.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Tool.scala @@ -42,7 +42,7 @@ class Tool extends hadoop.conf.Configured with hadoop.util.Tool { rootJob.get.apply(args) } else if(args.positional.isEmpty) { - sys.error("Usage: Tool --local|--hdfs [args...]") + throw ArgsException("Usage: Tool --local|--hdfs [args...]") } else { val jobName = args.positional(0) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/source/CodecSource.scala b/scalding-core/src/main/scala/com/twitter/scalding/source/CodecSource.scala index 5c48b9a410..fea51837b3 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/source/CodecSource.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/source/CodecSource.scala @@ -55,7 +55,7 @@ with Mappable[T] { val injectionBox = Externalizer(injection andThen BytesWritableCodec.get) override def converter[U >: T] = TupleConverter.asSuperConverter[T, U](TupleConverter.singleConverter[T]) - override def localPath = sys.error("Local mode not yet supported.") + override def localPath = throw ModeException("Local mode not yet supported.") override def hdfsScheme = HadoopSchemeInstance(new WritableSequenceFile(field, classOf[BytesWritable]).asInstanceOf[Scheme[_, _, _, _, _]]) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/KeyedList.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/KeyedList.scala index 655eba9449..7e0b0e2435 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/KeyedList.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/KeyedList.scala @@ -88,6 +88,12 @@ trait KeyedListLike[K, +T, +This[K,+T] <: KeyedListLike[K,T,This]] def filter(fn: ((K, T)) => Boolean): This[K, T] = mapGroup { (k: K, items: Iterator[T]) => items.filter { t => fn((k, t)) } } + /** flatten the values + * Useful after sortedTake, for instance + */ + def flattenValues[U](implicit ev: T <:< TraversableOnce[U]): This[K, U] = + mapValueStream(_.flatMap { us => us.asInstanceOf[TraversableOnce[U]] }) + /** This is just short hand for mapValueStream(identity), it makes sure the * planner sees that you want to force a shuffle. 
For expert tuning */ diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala index ca56972943..c93d4b11dc 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala @@ -129,13 +129,20 @@ trait TypedPipe[+T] extends Serializable { def aggregate[B,C](agg: Aggregator[T, B, C]): ValuePipe[C] = ComputedValue(groupAll.aggregate(agg).values) + /** Put the items in this into the keys, and unit as the value in a Group + * in some sense, this is the dual of groupAll + */ + @annotation.implicitNotFound(msg = "For asKeys method to work, the type in TypedPipe must have an Ordering.") + def asKeys[U>:T](implicit ord: Ordering[U]): Grouped[U, Unit] = + map((_, ())).group + /** Filter and map. See scala.collection.List.collect. * {@code * collect { case Some(x) => fn(x) } * } */ def collect[U](fn: PartialFunction[T, U]): TypedPipe[U] = - filter(fn.isDefinedAt(_)).map(fn(_)) + filter(fn.isDefinedAt(_)).map(fn) /** Attach a ValuePipe to each element this TypedPipe */ @@ -152,11 +159,8 @@ trait TypedPipe[+T] extends Serializable { /** Returns the set of distinct elements in the TypedPipe */ @annotation.implicitNotFound(msg = "For distinct method to work, the type in TypedPipe must have an Ordering.") - def distinct(implicit ord: Ordering[_ >: T]): TypedPipe[T] = { - // cast because Ordering is not contravariant, but should be (and this cast is safe) - implicit val ordT: Ordering[T] = ord.asInstanceOf[Ordering[T]] - map{ (_, ()) }.group.sum.keys - } + def distinct(implicit ord: Ordering[_ >: T]): TypedPipe[T] = + asKeys(ord.asInstanceOf[Ordering[T]]).sum.keys /** Returns the set of distinct elements identified by a given lambda extractor in the TypedPipe */ @@ -170,7 +174,7 @@ trait TypedPipe[+T] extends Serializable { def plus(a: T, b: T) = b } - val op = map{tup => (fn(tup), tup) }.group.sum + val op = map{tup => (fn(tup), tup) }.sumByKey val reduced = numReducers match { case Some(red) => op.withReducers(red) case None => op @@ -201,7 +205,7 @@ trait TypedPipe[+T] extends Serializable { /** Keep only items that satisfy this predicate */ def filter(f: T => Boolean): TypedPipe[T] = - flatMap { Iterator(_).filter(f) } + flatMap { t => if(f(t)) Iterator(t) else Iterator.empty } /** If T is a (K, V) for some V, then we can use this function to filter. * This is here to match the function in KeyedListLike, where it is optimized @@ -219,6 +223,16 @@ trait TypedPipe[+T] extends Serializable { def flatten[U](implicit ev: T <:< TraversableOnce[U]): TypedPipe[U] = flatMap { _.asInstanceOf[TraversableOnce[U]] } // don't use ev which may not be serializable + /** flatten just the values + * This is more useful on KeyedListLike, but added here to reduce assymmetry in the APIs + */ + def flattenValues[K, U](implicit ev: T <:< (K, TraversableOnce[U])): TypedPipe[(K, U)] = + flatMap { kus => + val (k, us) = kus.asInstanceOf[(K, TraversableOnce[U])] + // don't use ev which may not be serializable + us.map((k, _)) + } + /** Force a materialization of this pipe prior to the next operation. * This is useful if you filter almost everything before a hashJoin, for instance. 
*/ @@ -376,6 +390,15 @@ trait TypedPipe[+T] extends Serializable { serialization: K => Array[Byte], ordering: Ordering[K]): Sketched[K,V] = Sketched(ev(this), reducers, delta, eps, seed) + + // If any errors happen below this line, but before a groupBy, write to a TypedSink + def addTrap[U >: T](trapSink: Source with TypedSink[T])( + implicit flowDef: FlowDef, mode: Mode, conv: TupleConverter[U]): TypedPipe[U] = { + val fields = trapSink.sinkFields + val pipe = RichPipe.assignName(fork.toPipe[T](fields)(trapSink.setter)) + flowDef.addTrap(pipe, trapSink.createTap(Write)) + TypedPipe.from[U](pipe, fields)(conv) + } } diff --git a/scalding-core/src/test/scala/com/twitter/scalding/CoreTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/CoreTest.scala index 2832dfd372..1c16f8e9e5 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/CoreTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/CoreTest.scala @@ -1436,6 +1436,104 @@ class ItsATrapTest extends Specification { } } +object TypedThrowsErrorsJob { + val input = TypedTsv[(String, Int)]("input") + val output = TypedTsv[(String, Int)]("output") + + def trans1(x: (String, Int)) = x match { case (str, int) => (str, int, int) } + val trap1 = TypedTsv[(String, Int, Int)]("trapped1") + + val trap2 = TypedTsv[(String, Int, Int, String)]("trapped2") + def trans2(x: (String, Int, Int)) = x match { case (str, int1, int2) => (str, int1, int2 * int1, str) } + + def trans3(x: (String, Int, Int, String)) = x match { case (str, int, _, _) => (str, int) } +} + +class TypedThrowsErrorsJob(args : Args) extends Job(args) { + import TypedThrowsErrorsJob._ + + TypedPipe.from(input) + .map { trans1(_) } + .addTrap(trap1) + .map { tup => if (tup._2 == 1) throw new Exception("Oh no!") else trans2(tup) } + .addTrap(trap2) + .map { tup => if (tup._2 % 2 == 0) throw new Exception("Oh no!") else trans3(tup) } + .write(output) +} + +object TypedThrowsErrorsJob2 { + val input = TypedTsv[(String, Int)]("input") + val output = TypedTsv[(String, Int)]("output") + val trap = TypedTsv[(String, Int, Int)]("trapped1") + + def trans1(x: (String, Int)) = x match { case (str, int) => (str, int, int) } + def trans2(x: (String, Int, Int)) = x match { case (str, int1, int2) => (str, int1, int2 * int1, str) } + def trans3(x: (String, Int, Int, String)) = x match { case (str, int, _, _) => (str, int) } +} + +class TypedThrowsErrorsJob2(args : Args) extends Job(args) { + import TypedThrowsErrorsJob2._ + + TypedPipe.from(input) + .map { trans1(_) } + .addTrap(trap) + .map { tup => if (tup._2 == 1) throw new Exception("Oh no!") else trans2(tup) } + .map { tup => if (tup._2 % 2 == 0) throw new Exception("Oh no!") else trans3(tup) } + .write(output) +} + +class TypedItsATrapTest extends Specification { + import TDsl._ + + noDetailedDiffs() //Fixes an issue with scala 2.9 + "A Typed AddTrap with many traps" should { + import TypedThrowsErrorsJob._ + + val data = List(("a", 1), ("b", 2), ("c", 3), ("d", 4), ("e", 5)) + + JobTest(new TypedThrowsErrorsJob(_)) + .source(input, data) + .typedSink(output) { outBuf => + "output must contain all odd except first" in { + outBuf.toList.sorted must be_==(List(("c", 3), ("e", 5))) + } + } + .typedSink(trap1) { outBuf => + "trap1 must contain only the first" in { + outBuf.toList.sorted must be_==(List(("a", 1, 1))) + } + } + .typedSink(trap2) { outBuf => + "trap2 must contain the even numbered" in { + outBuf.toList.sorted must be_==(List(("b", 2, 4, "b"), ("d", 4, 16, "d"))) + } + } + .run + .finish + } + + "A Typed 
AddTrap with many erroneous maps" should { + import TypedThrowsErrorsJob2._ + + val data = List(("a", 1), ("b", 2), ("c", 3), ("d", 4), ("e", 5)) + + JobTest(new TypedThrowsErrorsJob2(_)) + .source(input, data) + .typedSink(output) { outBuf => + "output must contain all odd except first" in { + outBuf.toList.sorted must be_==(List(("c", 3), ("e", 5))) + } + } + .typedSink(trap) { outBuf => + "trap must contain the first and the evens" in { + outBuf.toList.sorted must be_==(List(("a", 1, 1), ("b", 2, 2), ("d", 4, 4))) + } + } + .run + .finish + } +} + class GroupAllToListTestJob(args: Args) extends Job(args) { TypedTsv[(Long, String, Double)]("input") .mapTo('a, 'b) { case(id, k, v) => (id, Map(k -> v)) } diff --git a/scalding-core/src/test/scala/com/twitter/scalding/PartitionSourceTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/PartitionSourceTest.scala new file mode 100644 index 0000000000..8ddc178eec --- /dev/null +++ b/scalding-core/src/test/scala/com/twitter/scalding/PartitionSourceTest.scala @@ -0,0 +1,186 @@ +/* +Copyright 2014 Snowplow Analytics Ltd + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package com.twitter.scalding + +import java.io.File +import scala.io.{Source => ScalaSource} + +import org.specs._ + +import cascading.tap.SinkMode +import cascading.tuple.Fields +import cascading.tuple.TupleEntry +import cascading.util.Util +import cascading.tap.partition.Partition +import cascading.tap.partition.DelimitedPartition + +import com.twitter.scalding.{PartitionedTsv => StandardPartitionedTsv, _} + +object PartitionSourceTestHelpers { + import Dsl._ + + class CustomPartition(val partitionFields: Fields) extends Partition { + + def getPartitionFields(): Fields = partitionFields + def getPathDepth(): Int = 1 + + def toPartition(tupleEntry: TupleEntry): String = + "{" + Util.join(tupleEntry.asIterableOf(classOf[String]), "}->{", true) + "}" + + def toTuple(partition: String, tupleEntry: TupleEntry): Unit = + throw new RuntimeException("toTuple for reading not implemented") + } + + // Define once, here, otherwise testMode.getWritePathFor() won't work + val DelimitedPartitionedTsv = StandardPartitionedTsv("base", "/", 'col1) + val CustomPartitionedTsv = StandardPartitionedTsv("base", new CustomPartition('col1, 'col2), false, Fields.ALL, SinkMode.REPLACE) + val PartialPartitionedTsv = StandardPartitionedTsv("base", "/", ('col1, 'col2), false, ('col1, 'col3)) +} + +class DelimitedPartitionTestJob(args: Args) extends Job(args) { + import PartitionSourceTestHelpers._ + try { + Tsv("input", ('col1, 'col2)).read.write(DelimitedPartitionedTsv) + } catch { + case e : Exception => e.printStackTrace() + } +} + +class CustomPartitionTestJob(args: Args) extends Job(args) { + import PartitionSourceTestHelpers._ + try { + Tsv("input", ('col1, 'col2, 'col3)).read.write(CustomPartitionedTsv) + } catch { + case e : Exception => e.printStackTrace() + } +} + +class PartialPartitionTestJob(args: Args) extends Job(args) { + import PartitionSourceTestHelpers._ + + try { + Tsv("input", ('col1, 'col2, 
'col3)).read.write(PartialPartitionedTsv) + } catch { + case e : Exception => e.printStackTrace() + } +} + +class DelimitedPartitionSourceTest extends Specification { + noDetailedDiffs() + import Dsl._ + import PartitionSourceTestHelpers._ + "PartitionedTsv fed a DelimitedPartition" should { + "split output by the delimited path" in { + val input = Seq(("A", 1), ("A", 2), ("B", 3)) + + // Need to save the job to allow, find the temporary directory data was written to + var job: Job = null; + def buildJob(args: Args): Job = { + job = new DelimitedPartitionTestJob(args) + job + } + + JobTest(buildJob(_)) + .source(Tsv("input", ('col1, 'col2)), input) + .runHadoop + .finish + + val testMode = job.mode.asInstanceOf[HadoopTest] + + val directory = new File(testMode.getWritePathFor(DelimitedPartitionedTsv)) + + directory.listFiles().map({ _.getName() }).toSet mustEqual Set("A", "B") + + val aSource = ScalaSource.fromFile(new File(directory, "A/part-00000-00000")) + val bSource = ScalaSource.fromFile(new File(directory, "B/part-00000-00001")) + + aSource.getLines.toList mustEqual Seq("A\t1", "A\t2") + bSource.getLines.toList mustEqual Seq("B\t3") + } + } +} + +class CustomPartitionSourceTest extends Specification { + noDetailedDiffs() + import Dsl._ + import PartitionSourceTestHelpers._ + "PartitionedTsv fed a CustomPartition" should { + "split output by the custom path" in { + val input = Seq(("A", "x", 1), ("A", "x", 2), ("B", "y", 3)) + + // Need to save the job to allow, find the temporary directory data was written to + var job: Job = null; + def buildJob(args: Args): Job = { + job = new CustomPartitionTestJob(args) + job + } + + JobTest(buildJob(_)) + .source(Tsv("input", ('col1, 'col2, 'col3)), input) + .runHadoop + .finish + + val testMode = job.mode.asInstanceOf[HadoopTest] + + val directory = new File(testMode.getWritePathFor(CustomPartitionedTsv)) + + directory.listFiles().map({ _.getName() }).toSet mustEqual Set("{A}->{x}", "{B}->{y}") + + val aSource = ScalaSource.fromFile(new File(directory, "{A}->{x}/part-00000-00000")) + val bSource = ScalaSource.fromFile(new File(directory, "{B}->{y}/part-00000-00001")) + + aSource.getLines.toList mustEqual Seq("A\tx\t1", "A\tx\t2") + bSource.getLines.toList mustEqual Seq("B\ty\t3") + } + } +} + +class PartialPartitionSourceTest extends Specification { + noDetailedDiffs() + import Dsl._ + import PartitionSourceTestHelpers._ + "PartitionedTsv fed a DelimitedPartition and only a subset of fields" should { + "split output by the delimited path, discarding the unwanted fields" in { + + val input = Seq(("A", "x", 1), ("A", "x", 2), ("B", "y", 3)) + + // Need to save the job to allow, find the temporary directory data was written to + var job: Job = null; + def buildJob(args: Args): Job = { + job = new PartialPartitionTestJob(args) + job + } + + JobTest(buildJob(_)) + .source(Tsv("input", ('col1, 'col2, 'col3)), input) + .runHadoop + .finish + + val testMode = job.mode.asInstanceOf[HadoopTest] + + val directory = new File(testMode.getWritePathFor(PartialPartitionedTsv)) + + directory.listFiles().map({ _.getName() }).toSet mustEqual Set("A", "B") + + val aSource = ScalaSource.fromFile(new File(directory, "A/x/part-00000-00000")) + val bSource = ScalaSource.fromFile(new File(directory, "B/y/part-00000-00001")) + + aSource.getLines.toList mustEqual Seq("A\t1", "A\t2") + bSource.getLines.toList mustEqual Seq("B\t3") + } + } +} diff --git a/scalding-repl/README.md b/scalding-repl/README.md index dc03138d8d..e1afefa139 100644 --- a/scalding-repl/README.md 
+++ b/scalding-repl/README.md @@ -4,21 +4,22 @@ The Scalding REPL is an extension of the Scala REPL, with added functionality that allows you to interactively experiment with Scalding flows. If Scalding is installed in SCALDING_HOME, then the REPL is launched via: -`${SCALDING_HOME}/scripts/scald-repl.sh` +`${SCALDING_HOME}/scripts/scald-repl.sh [hadoopGenericOptions] --local|--hdfs` The repl imports com.twitter.scalding._ as well as some REPL specific implicits from com.twitter.scalding.ReplImplicits. Within the REPL, you can define sources and operate on Pipes as you would in a normal Scalding job. -Pipes can be run in local mode by calling the 'run()' method on it. Pipes can -only be run once in the REPL. +Pipes can be run in local or hdfs modes by calling the 'run()' method on it. Pipes can +only be run once in the REPL. To run in hdfs mode, you need to have Hadoop installed, +and available in your path. ## Tutorial0 in REPL Form We will show you how to define and run a Scalding job equivalent to the one defined in tutorial/Tutorial0.scala From the root directory, where Scalding is installed, you can launch the REPL with: -`./scripts/scald-repl.sh` +`./scripts/scald-repl.sh --local` You will see some lovely ASCII art: diff --git a/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingShell.scala b/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingShell.scala index 4ee464cbc5..ba6a0e4141 100644 --- a/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingShell.scala +++ b/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingShell.scala @@ -20,6 +20,9 @@ import java.io.FileOutputStream import java.util.jar.JarEntry import java.util.jar.JarOutputStream +import org.apache.hadoop.util.GenericOptionsParser +import org.apache.hadoop.conf.Configuration + import scala.tools.nsc.{Settings, GenericRunnerCommand, MainGenericRunner} import scala.tools.nsc.interpreter.ILoop import scala.tools.nsc.io.VirtualDirectory @@ -37,6 +40,11 @@ object ScaldingShell extends MainGenericRunner { */ private var scaldingREPL: Option[ILoop] = None + /** + * An instance of the default configuration for the REPL + */ + private val conf: Configuration = new Configuration() + /** * The main entry point for executing the REPL. * @@ -48,14 +56,37 @@ object ScaldingShell extends MainGenericRunner { * @return `true` if execution was successful, `false` otherwise. */ override def process(args: Array[String]): Boolean = { + // Get the mode (hdfs or local), and initialize the configuration + val (mode, jobArgs) = parseModeArgs(args) + // Process command line arguments into a settings object, and use that to start the REPL. - val command = new GenericRunnerCommand(args.toList, (x: String) => errorFn(x)) + // We ignore params we don't care about - hence error function is empty + val command = new GenericRunnerCommand(jobArgs.toList, _ => ()) command.settings.usejavacp.value = true command.settings.classpath.append(System.getProperty("java.class.path")) scaldingREPL = Some(new ScaldingILoop) + ReplImplicits.mode = mode scaldingREPL.get.process(command.settings) } + // This both updates the jobConf with hadoop arguments + // and returns all the non-hadoop arguments. Should be called once if + // you want to process hadoop arguments (like -libjars). + protected def nonHadoopArgsFrom(args : Array[String]) : Array[String] = + (new GenericOptionsParser(conf, args)).getRemainingArgs + + /** + * Sets the mode for this job, updates jobConf with hadoop arguments + * and returns all the non-hadoop arguments. 
+ * + * @param args from the command line. + * @return a Mode for the job (e.g. local, hdfs), and the non-hadoop params + */ + def parseModeArgs(args : Array[String]) : (Mode, Array[String]) = { + val a = nonHadoopArgsFrom(args) + (Mode(Args(a), conf), a) + } + /** * Runs an instance of the shell. * diff --git a/scripts/scald-repl.sh b/scripts/scald-repl.sh index 1c3d92a581..c8f0eb9b7e 100755 --- a/scripts/scald-repl.sh +++ b/scripts/scald-repl.sh @@ -46,9 +46,38 @@ SCALA_VERSION=`cat "${bin}/project/Build.scala" | grep -E '^\s*scalaVersion' | g ## Piggyback off of scald.rb's dependency/cp management CORE_PATH=`${bin}/scripts/scald.rb --print-cp --repl --avro --local job` +if [ $? != 0 ]; then + echo "scalding-core-assembly jar is missing, you probably need to run sbt assembly" + exit 1 +fi + +# figure out mode to decide whether to run using hadoop or not +MODE="" +ARGS=`echo "$@" | tr "[:space:]" "\n"` +for a in $ARGS; +do + if [[ "$a" == "--local" ]] || [[ "$a" == "-local" ]]; then + MODE="local"; break; + elif [[ "$a" == "--hdfs" ]] || [[ "$a" == "-hdfs" ]]; then + MODE="hdfs"; break; + fi +done # launch REPL -java -cp "${CORE_PATH}" -Dscala.usejavacp=true com.twitter.scalding.ScaldingShell -Yrepl-sync +if [[ "$MODE" == "local" ]]; then + java -cp "${CORE_PATH}" -Dscala.usejavacp=true com.twitter.scalding.ScaldingShell "$@" -Yrepl-sync +elif [[ "$MODE" == "hdfs" ]]; then + # get the path for the REPL jar + REPL_JAR=`echo ${CORE_PATH} | tr ':' '\n' | grep scalding-repl` + if [ -z "$REPL_JAR" ]; then + echo "scalding-repl-assembly jar is missing, confirm that it is being built by sbt assembly" + exit 1 + fi + HADOOP_CLASSPATH=${CORE_PATH} hadoop jar $REPL_JAR "$@" -usejavacp +else + echo "Mode must be one of --local or --hdfs, you provided neither" + exit 1 +fi # record the exit status lest it be overwritten: # then reenable echo and propagate the code. diff --git a/scripts/scald.rb b/scripts/scald.rb index 65dd0c01e2..01e2b9e526 100755 --- a/scripts/scald.rb +++ b/scripts/scald.rb @@ -6,6 +6,7 @@ require 'thread' require 'trollop' require 'yaml' +require 'tmpdir' USAGE = < @@ -141,17 +142,45 @@ def maven_filename(jar_filename) def scala_libs(version) if( version.start_with?("2.10") ) - ["scala-library.jar", "scala-reflect.jar"] + ["scala-library", "scala-reflect", "scala-compiler"] else - ["scala-library.jar"] + ["scala-library", "scala-compiler"] end end -SCALA_LIBS=scala_libs(SCALA_VERSION) +def find_dependency(dep, version) + res = %x[./sbt 'set libraryDependencies := Seq("org.scala-lang" % "#{dep}" % "#{version}")' 'printDependencyClasspath'].split("\n") + first = res.find_index { |n| n.include?("#{dep}:#{version}") } + raise "Dependency #{dep}:#{version} not found" unless first + res[first].sub(/.*=> /, "") +end + +def get_dep_location(dep, version) + f = "#{SCALA_LIB_DIR}/#{dep}.jar" + if File.exists?(f) + f + else + f = find_dependency(dep, version) + raise "Unable to find jar library: #{dep}" unless f and File.exists?(f) + f + end +end + +libs = scala_libs(SCALA_VERSION).map { |l| get_dep_location(l, SCALA_VERSION) } +lib_dirs = libs.map { |f| File.dirname(f) } +unless lib_dirs.all? { |l| l == lib_dirs.first } + lib_tmp = Dir.tmpdir+"/temp_scala_home_#{SCALA_VERSION}_#{rand(1000000)}" + FileUtils.mkdir(lib_tmp) + libs.map! 
do |l| + FileUtils.cp(l, lib_tmp) + "#{lib_tmp}/#{File.basename(l)}" + end + SCALA_LIB_DIR = lib_tmp +end -LIBCP=(SCALA_LIBS.map { |j| "#{SCALA_LIB_DIR}/#{j}" }).join(":") +LIBCP= libs.join(":") -COMPILE_CMD="java -cp #{LIBCP}:#{SCALA_LIB_DIR}/scala-compiler.jar -Dscala.home=#{SCALA_LIB_DIR} scala.tools.nsc.Main" +COMPILE_CMD="java -cp #{LIBCP} -Dscala.home=#{SCALA_LIB_DIR} scala.tools.nsc.Main" HOST = OPTS[:host] || CONFIG["host"] diff --git a/version.sbt b/version.sbt index be40d62284..777ec1c87c 100644 --- a/version.sbt +++ b/version.sbt @@ -1,2 +1,2 @@ -version in ThisBuild := "0.9.1" +version in ThisBuild := "0.10.0"
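The typed-API helpers added in this patch (`asKeys` on `TypedPipe` and `flattenValues` on `KeyedListLike`) are not exercised by the new tests above; a minimal usage sketch follows, assuming a job compiled against the 0.10.0 typed API — the job class, input path, and sink names are hypothetical:

```scala
import com.twitter.scalding._
import com.twitter.scalding.typed.TypedPipe

class WordStatsJob(args: Args) extends Job(args) {
  val words: TypedPipe[String] = TypedPipe.from(TypedTsv[String]("words.tsv"))

  // asKeys: each element becomes a key with a Unit value (the dual of groupAll);
  // summing the group and taking the keys yields the distinct words, which is
  // how distinct is now implemented above.
  words.asKeys.sum.keys
    .write(TypedTsv[String]("distinct-words.tsv"))

  // flattenValues: unwrap the Seq produced by sortedTake, giving one
  // (length, word) row per retained word instead of (length, Seq(words)).
  words.groupBy(_.length)
    .sortedTake(3)
    .flattenValues
    .toTypedPipe
    .write(TypedTsv[(Int, String)]("top-words-by-length.tsv"))
}
```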