
Calling Scalding from inside your application


Starting in scalding 0.12, there is a clear API for doing this: Execution[T], which describes a set of map/reduce operations that, when executed, return a Future[T]. Below is an example.

import com.twitter.scalding._
import org.apache.hadoop.mapred.JobConf

val job: Execution[Unit] =
  TypedPipe.from(TextLine("input"))
    .flatMap(_.split("\\s+"))      // one token per whitespace-separated word
    .map { word => (word, 1L) }
    .sumByKey                      // sum the counts for each word
    .writeExecution(TypedTsv("output"))

// Now we run it in Local mode
val u: Unit = job.waitFor(Config.default, Local(true))

// Or for Hadoop:
val jobConf = new JobConf
val uOnHadoop: Unit = job.waitFor(Config.hadoopWithDefaults(jobConf), Hdfs(jobConf, true))
// If you want to be asynchronous, use run instead of waitFor and get a Future in return.
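For example, here is a minimal asynchronous sketch, assuming the job value above and an implicit scala.concurrent ExecutionContext in scope (which run requires):

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// run returns immediately; the Future completes when the underlying flow finishes
val f: Future[Unit] = job.run(Config.default, Local(true))
// Block only at the point where you actually need the result.
Await.result(f, Duration.Inf)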

For testing, or for cases where you have aggregated the data down to a manageable size, .toIterableExecution on TypedPipe is very useful:

val job: Execution[Iterable[(String, Long)]] =
  TypedPipe.from(TextLine("input"))
    .flatMap(_.split("\\s+"))
    .map { word => (word, 1L) }
    .sumByKey
    .toIterableExecution          // materializes the result on the submitting machine
// Now we run it in Local mode
val counts: Map[String, Long] = job.waitFor(Config.default, Local(true)).toMap
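Because Executions are plain values, they compose before anything runs. Here is a hedged sketch of combining two pipelines with zip and flatMap (the output paths are hypothetical):

val words: Execution[Unit] =
  TypedPipe.from(TextLine("input"))
    .flatMap(_.split("\\s+"))
    .map { word => (word, 1L) }
    .sumByKey
    .writeExecution(TypedTsv("words"))

val lines: Execution[Unit] =
  TypedPipe.from(TextLine("input"))
    .map { line => (line, 1L) }
    .sumByKey
    .writeExecution(TypedTsv("lines"))

// zip schedules both pipelines as one composite Execution
val both: Execution[(Unit, Unit)] = words.zip(lines)

// flatMap sequences them: lines starts only after words has finished
val sequenced: Execution[Unit] = words.flatMap { _ => lines }

// Nothing has run yet; submit once at the end
both.waitFor(Config.default, Local(true))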

Running Existing Jobs Inside A Library

We recommend the approach above for building composable jobs with Executions, but if you have an existing Job you can run that too:

Working example:

WordCountJob.scala

import com.twitter.scalding._

class WordCountJob(args: Args) extends Job(args) {
  TextLine(args("input"))
    .read
    .flatMap('line -> 'word) { line: String => line.split("\\s+") } // one tuple per word
    .groupBy('word) { _.size }                                      // count occurrences of each word
    .write(Tsv(args("output")))
}

Runner.scala

import com.twitter.scalding.{ Args, Hdfs, Mode }
import org.apache.hadoop.conf.Configuration

object Runner extends App {
  val hadoopConfiguration: Configuration = new Configuration
  hadoopConfiguration.set("mapred.job.tracker", "hadoop-master:8021")
  hadoopConfiguration.set("fs.defaultFS", "hdfs://hadoop-master:8020")

  val hdfsMode = Hdfs(strict = true, hadoopConfiguration)
  // Embed the mode in the Args so the Job picks it up on construction.
  val arguments = Mode.putMode(hdfsMode, Args("--input in.txt --output counts.tsv"))

  // Now create the job after the mode is set up properly.
  val job: WordCountJob = new WordCountJob(arguments)
  val flow = job.buildFlow
  flow.complete() // blocks until the flow finishes
}
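Alternatively, you can drive an existing Job programmatically through Hadoop's ToolRunner and scalding's Tool class, the same entry point the hadoop jar command uses. A sketch, assuming the job class above lives at the hypothetical com.example.WordCountJob:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.util.ToolRunner

object ToolRunnerExample extends App {
  // First argument is the Job class name, then the mode flag, then the job's own args.
  ToolRunner.run(
    new Configuration,
    new com.twitter.scalding.Tool,
    Array("com.example.WordCountJob", "--hdfs", "--input", "in.txt", "--output", "counts.tsv"))
}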

You can then run your app on any server that has access to the Hadoop cluster.
