diff --git a/CHANGES.md b/CHANGES.md index fd151b14d8..009c314d30 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,8 @@ # Scalding # +### Version 0.11.2 ### +* hadoop.tmp.dir for snapshot in config + ### Version 0.11.1 ### * Fixes bad release portion where code wasn't updated for new scalding version number. * use cascading-jdbc 2.5.3 for table exists fix and cascading 2.5.5: https://github.com/twitter/scalding/pull/951 diff --git a/scalding-core/src/main/scala/com/twitter/package.scala b/scalding-core/src/main/scala/com/twitter/package.scala index 2caf86eb8b..817fd1b999 100644 --- a/scalding-core/src/main/scala/com/twitter/package.scala +++ b/scalding-core/src/main/scala/com/twitter/package.scala @@ -33,7 +33,7 @@ package object scalding { /** * Make sure this is in sync with version.sbt */ - val scaldingVersion: String = "0.11.1" + val scaldingVersion: String = "0.11.2" object RichPathFilter { implicit def toRichPathFilter(f: PathFilter) = new RichPathFilter(f) diff --git a/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala b/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala index 37f524fcaa..d3a46e8e41 100644 --- a/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala +++ b/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala @@ -49,45 +49,47 @@ object ReplImplicits extends FieldConversions { fd } + def replConfig = { + val conf = Config.default ++ { + mode match { + case h: HadoopMode => Config.fromHadoop(h.jobConf) + case _ => Config.empty + } + } + // Create a jar to hold compiled code for this REPL session in addition to + // "tempjars" which can be passed in from the command line, allowing code + // in the repl to be distributed for the Hadoop job to run. + val replCodeJar = ScaldingShell.createReplCodeJar() + val tmpJarsConfig: Map[String, String] = + replCodeJar match { + case Some(jar) => + Map("tmpjars" -> { + // Use tmpjars already in the configuration. + conf.get("tmpjars").map(_ + ",").getOrElse("") + // And a jar of code compiled by the REPL. + .concat("file://" + jar.getAbsolutePath) + }) + case None => + // No need to add the tmpjars to the configuration + Map() + } + + conf ++ tmpJarsConfig + } + /** * Runs this pipe as a Scalding job. * * Automatically cleans up the flowDef to include only sources upstream from tails. */ - def run(implicit fd: FlowDef, md: Mode): Option[JobStats] = { - - def config = { - val conf = Config.default - - // Create a jar to hold compiled code for this REPL session in addition to - // "tempjars" which can be passed in from the command line, allowing code - // in the repl to be distributed for the Hadoop job to run. - val replCodeJar = ScaldingShell.createReplCodeJar() - val tmpJarsConfig: Map[String, String] = - replCodeJar match { - case Some(jar) => - Map("tmpjars" -> { - // Use tmpjars already in the configuration. - conf.get("tmpjars").map(_ + ",").getOrElse("") - // And a jar of code compiled by the REPL. - .concat("file://" + jar.getAbsolutePath) - }) - case None => - // No need to add the tmpjars to the configuration - Map() - } - - conf ++ tmpJarsConfig - } - - ExecutionContext.newContext(config)(fd, md).waitFor match { + def run(implicit fd: FlowDef, md: Mode): Option[JobStats] = + ExecutionContext.newContext(replConfig)(fd, md).waitFor match { case Success(stats) => Some(stats) case Failure(e) => println("Flow execution failed!") e.printStackTrace() None } - } /** * Converts a Cascading Pipe to a Scalding RichPipe. This method permits implicit conversions from diff --git a/scalding-repl/src/main/scala/com/twitter/scalding/ShellPipe.scala b/scalding-repl/src/main/scala/com/twitter/scalding/ShellPipe.scala index 12305e168c..b9171835d6 100644 --- a/scalding-repl/src/main/scala/com/twitter/scalding/ShellPipe.scala +++ b/scalding-repl/src/main/scala/com/twitter/scalding/ShellPipe.scala @@ -61,7 +61,12 @@ class ShellTypedPipe[T](pipe: TypedPipe[T]) { case _: HadoopMode => // come up with unique temporary filename // TODO: refactor into TemporarySequenceFile class - val tmpSeq = "/tmp/scalding-repl/snapshot-" + UUID.randomUUID + ".seq" + val conf = replConfig + val tmpDir = conf.get("hadoop.tmp.dir") + .orElse(conf.get("cascading.tmp.dir")) + .getOrElse("/tmp") + + val tmpSeq = tmpDir + "/scalding-repl/snapshot-" + java.util.UUID.randomUUID + ".seq" val dest = TypedSequenceFile[T](tmpSeq) dest.writeFrom(p)(localFlow, md) run(localFlow, md) diff --git a/version.sbt b/version.sbt index 70a4e0c55f..798d1a1e01 100644 --- a/version.sbt +++ b/version.sbt @@ -1,2 +1,2 @@ -version in ThisBuild := "0.11.1" +version in ThisBuild := "0.11.2"