Calling Scalding from inside your application

P. Oscar Boykin edited this page Jul 7, 2014 · 13 revisions

Starting in Scalding 0.11, there is a clear API for running a job from inside your own application:

// Running in local mode:
import com.twitter.scalding._
import ExecutionContext._ // needed to get FlowDef and Mode implicitly from ExecutionContext

// waitFor blocks until the flow has finished
val (result, tryJobStats) = Execution.waitFor(Config.default, Local(true)) { implicit ec: ExecutionContext =>
  TypedPipe.from(TextLine("input"))
    .flatMap(_.split("\\s+"))
    .map { word => (word, 1L) }
    .sumByKey
    .write(TypedTsv[(String, Long)]("output")) // element type spelled out for the sink
}

// Or for Hadoop:
import org.apache.hadoop.mapred.JobConf

val jobConf = new JobConf
// Hdfs takes the strict flag first, then the Hadoop configuration
val (result, tryJobStats) = Execution.waitFor(Config.hadoopWithDefaults(jobConf), Hdfs(true, jobConf)) { implicit ec: ExecutionContext =>
  TypedPipe.from(TextLine("input"))
    .flatMap(_.split("\\s+"))
    .map { word => (word, 1L) }
    .sumByKey
    .write(TypedTsv[(String, Long)]("output"))
}
// If you want to be asynchronous, use run instead of waitFor and get a Future in return
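
The snippets above bind a value named tryJobStats, and the closing comment says the asynchronous variant hands back a Future. As a rough sketch of how such values might be consumed, the following uses only the Scala standard library. `Stats`, `HandleJobOutcome`, `describe`, and `describeAsync` are hypothetical names introduced for illustration; `Stats` is a stand-in, not Scalding's real job-statistics type.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for the statistics a finished job reports.
final case class Stats(jobName: String, succeeded: Boolean)

object HandleJobOutcome {
  // Blocking style: inspect a Try once the job has finished.
  def describe(outcome: Try[Stats]): String = outcome match {
    case Success(s) if s.succeeded => s"${s.jobName} finished successfully"
    case Success(s)                => s"${s.jobName} finished but reported failure"
    case Failure(e)                => s"job could not run: ${e.getMessage}"
  }

  // Asynchronous style: transform the Future without blocking the caller.
  def describeAsync(outcome: Future[Stats]): Future[String] =
    outcome
      .map(s => describe(Success(s)))
      .recover { case e => describe(Failure(e)) }

  def main(args: Array[String]): Unit = {
    println(describe(Success(Stats("wordcount", succeeded = true))))
    println(describe(Failure(new RuntimeException("missing input"))))
    val async = describeAsync(Future(Stats("wordcount", succeeded = true)))
    // Blocking here only so the demo prints before the JVM exits.
    println(Await.result(async, 5.seconds))
  }
}
```

In an application, the Future returned by `describeAsync` would normally be composed with other work rather than awaited.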

You can also run an existing Job from your own code. A working example:

WordCountJob.scala

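import com.twitter.scalding._

// Fields-API word count: splits each line on whitespace and counts rows per word
class WordCountJob(args: Args) extends Job(args) {
  TextLine(args("input"))
    .read
    .flatMap('line -> 'word) { line: String => line.split("\\s+") }
    .groupBy('word) { _.size }
    .write(Tsv(args("output")))
}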

Runner.scala

import com.twitter.scalding.{Args, Hdfs, Mode}
import org.apache.hadoop.conf.Configuration

object Runner extends App {
  // Point the Hadoop client at the cluster's JobTracker and NameNode.
  val hadoopConfiguration: Configuration = new Configuration
  hadoopConfiguration.set("mapred.job.tracker","hadoop-master:8021")
  hadoopConfiguration.set("fs.defaultFS","hdfs://hadoop-master:8020")

  val hdfsMode = Hdfs(strict = true, hadoopConfiguration)
  val arguments = Mode.putMode(hdfsMode, Args("--input in.txt --output counts.tsv"))

  // Now create the job after the mode is set up properly.
  val job: WordCountJob = new WordCountJob(arguments)
  val flow = job.buildFlow
  // Block until the flow has finished.
  flow.complete()
}

You can then run your App on any server that has access to the Hadoop cluster.
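
To sanity-check the counts the job writes for a small input, the same logic can be reproduced in memory with plain Scala collections. This is only a sketch for comparison, not Scalding code; `LocalWordCount` and `count` are hypothetical names.

```scala
object LocalWordCount {
  // Mirrors the job: split each line on whitespace, then count occurrences per word.
  def count(lines: Iterable[String]): Map[String, Long] =
    lines
      .flatMap(_.split("\\s+"))
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size.toLong) }

  def main(args: Array[String]): Unit = {
    val counts = count(Seq("a b a", "b c"))
    // Print one tab-separated row per word, sorted for stable output.
    counts.toSeq.sortBy(_._1).foreach { case (word, n) => println(s"$word\t$n") }
  }
}
```

Like the job, this sketch does not drop empty tokens, so a line that starts with whitespace contributes an empty-string "word".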
