Skip to content

Commit

Permalink
Make mappers run WorkUnits (tasks) concurrently
Browse files Browse the repository at this point in the history
The solution is not to use Hadoop's MultiThreadedMapper, but instead overwriting the Mapper.run() method to run the WorkUnits in the TaskExecutor. Now the map() method is only responsible for de-serializing and collecting WorkUnits, but not running them.

Signed-off-by: Yinan Li <liyinan926@gmail.com>
  • Loading branch information
liyinan926 committed Mar 18, 2015
1 parent beab089 commit 2e941a3
Showing 1 changed file with 35 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,16 @@ private void countersToMetrics(Counters counters, JobMetrics metrics) {
}

/**
* The mapper class that runs a given {@link Task}.
* The mapper class that runs assigned {@link WorkUnit}s.
*
* <p>
* The {@link #map} method de-serializes a {@link WorkUnit} (maybe a {@link MultiWorkUnit})
* from each input file and add the {@link WorkUnit} (or a list of {@link WorkUnit}s if it
* is a {@link MultiWorkUnit} to the list of {@link WorkUnit}s to run. The {@link #run} method
* actually runs the list of {@link WorkUnit}s in the {@link TaskExecutor}. This allows the
* {@link WorkUnit}s to be run in parallel if the {@link TaskExecutor} is configured to have
* more than one thread in its thread pool.
* </p>
*/
public static class TaskRunner extends Mapper<LongWritable, Text, NullWritable, NullWritable> {

Expand All @@ -432,6 +441,9 @@ public static class TaskRunner extends Mapper<LongWritable, Text, NullWritable,
private TaskStateTracker taskStateTracker;
private ServiceManager serviceManager;

// A list of WorkUnits (flattened for MultiWorkUnits) to be run by this mapper
private final List<WorkUnit> workUnits = Lists.newArrayList();

@Override
protected void setup(Context context) {
try {
Expand All @@ -454,6 +466,23 @@ protected void setup(Context context) {
}
}

@Override
public void run(Context context)
throws IOException, InterruptedException {
this.setup(context);

try {
// De-serialize and collect the list of WorkUnits to run
while (context.nextKeyValue()) {
this.map(context.getCurrentKey(), context.getCurrentValue(), context);
}
// Actually run the list of WorkUnits
runWorkUnits(this.workUnits);
} finally {
this.cleanup(context);
}
}

@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
Expand All @@ -477,8 +506,11 @@ public void map(LongWritable key, Text value, Context context)
closer.close();
}

runWorkUnits(
workUnit instanceof MultiWorkUnit ? ((MultiWorkUnit) workUnit).getWorkUnits() : Lists.newArrayList(workUnit));
if (workUnit instanceof MultiWorkUnit) {
this.workUnits.addAll(((MultiWorkUnit) workUnit).getWorkUnits());
} else {
this.workUnits.add(workUnit);
}
}

@Override
Expand Down

0 comments on commit 2e941a3

Please sign in to comment.