From 2e941a351ede6bae72d10142e9ba415a57c2ec18 Mon Sep 17 00:00:00 2001 From: Yinan Li Date: Wed, 18 Mar 2015 11:25:52 -0700 Subject: [PATCH] Make mappers run WorkUnits (tasks) concurrently The solution is not to use Hadoop's MultiThreadedMapper, but instead overwriting the Mapper.run() method to run the WorkUnits in the TaskExecutor. Now the map() method is only responsible for de-serializing and collecting WorkUnits, but not running them. Signed-off-by: Yinan Li --- .../runtime/mapreduce/MRJobLauncher.java | 38 +++++++++++++++++-- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/gobblin-runtime/src/main/java/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/gobblin/runtime/mapreduce/MRJobLauncher.java index 4019332c605..dbc9a53b43e 100644 --- a/gobblin-runtime/src/main/java/gobblin/runtime/mapreduce/MRJobLauncher.java +++ b/gobblin-runtime/src/main/java/gobblin/runtime/mapreduce/MRJobLauncher.java @@ -422,7 +422,16 @@ private void countersToMetrics(Counters counters, JobMetrics metrics) { } /** - * The mapper class that runs a given {@link Task}. + * The mapper class that runs assigned {@link WorkUnit}s. + * + *

+ * The {@link #map} method de-serializes a {@link WorkUnit} (maybe a {@link MultiWorkUnit}) + * from each input file and add the {@link WorkUnit} (or a list of {@link WorkUnit}s if it + * is a {@link MultiWorkUnit} to the list of {@link WorkUnit}s to run. The {@link #run} method + * actually runs the list of {@link WorkUnit}s in the {@link TaskExecutor}. This allows the + * {@link WorkUnit}s to be run in parallel if the {@link TaskExecutor} is configured to have + * more than one thread in its thread pool. + *

*/ public static class TaskRunner extends Mapper { @@ -432,6 +441,9 @@ public static class TaskRunner extends Mapper workUnits = Lists.newArrayList(); + @Override protected void setup(Context context) { try { @@ -454,6 +466,23 @@ protected void setup(Context context) { } } + @Override + public void run(Context context) + throws IOException, InterruptedException { + this.setup(context); + + try { + // De-serialize and collect the list of WorkUnits to run + while (context.nextKeyValue()) { + this.map(context.getCurrentKey(), context.getCurrentValue(), context); + } + // Actually run the list of WorkUnits + runWorkUnits(this.workUnits); + } finally { + this.cleanup(context); + } + } + @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { @@ -477,8 +506,11 @@ public void map(LongWritable key, Text value, Context context) closer.close(); } - runWorkUnits( - workUnit instanceof MultiWorkUnit ? ((MultiWorkUnit) workUnit).getWorkUnits() : Lists.newArrayList(workUnit)); + if (workUnit instanceof MultiWorkUnit) { + this.workUnits.addAll(((MultiWorkUnit) workUnit).getWorkUnits()); + } else { + this.workUnits.add(workUnit); + } } @Override