A simple TeraSort implementation based on the Hadoop MapReduce framework.
Shuffle: the phase that moves map output to the reduce tasks.
Partition: decide which reduce task each output record should be sent to, based on the key, the value, or the number of reduce tasks (here, based on the key and the number of reduce tasks).
Sample + partition
Sample: get the split points according to the number of reduce tasks.
```java
InputFormat inputFormat = new TextInputFormat();
List<InputSplit> inputSplitList = inputFormat.getSplits(job);
int splitSize = inputSplitList.size();
// sampleNum (defined elsewhere) is the total number of records to sample.
int samplePerPartition = sampleNum / splitSize;
```
Note that inputSplitList.size() will be 1 if the file is smaller than the HDFS block size (64 MB or 128 MB).
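If a small test file should still produce several splits, you can cap the split size before calling getSplits. A minimal sketch; the 1 MB cap is just an example value:

```java
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Cap each input split at 1 MB so that even a file below the
// HDFS block size is divided into multiple splits.
FileInputFormat.setMaxInputSplitSize(job, 1024L * 1024L);
```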
Sample the first samplePerPartition records of each split.
```java
List<Integer> sampleList = new ArrayList<>();
for (InputSplit split : inputSplitList) {
    TaskAttemptContext context = new TaskAttemptContextImpl(
            job.getConfiguration(), new TaskAttemptID());
    RecordReader<Object, Text> reader =
            inputFormat.createRecordReader(split, context);
    Text text;
    reader.initialize(split, context);
    int count = 1;
    // Take the first samplePerPartition records of this split.
    while (reader.nextKeyValue()) {
        if (count > samplePerPartition) {
            break;
        }
        text = reader.getCurrentValue();
        sampleList.add(Integer.parseInt(text.toString()));
        count++;
    }
    reader.close();
}
```
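The sort that the next step assumes is a single call on the collected samples:

```java
import java.util.Collections;

// Sort the sampled keys ascending so that evenly spaced
// elements can serve as range boundaries.
Collections.sort(sampleList);
```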
After sorting the samples, pick the split points and write them to the file system in preparation for partitioning.
```java
FileSystem fs = FileSystem.get(job.getConfiguration());
DataOutputStream writer = fs.create(SPLIT_SAMPLE_PATH, true);
int stepLength = sampleList.size() / job.getNumReduceTasks();
int n = 0;
// Take every stepLength-th sample, yielding numReduceTasks - 1 split points.
for (int i = stepLength; i < sampleList.size(); i += stepLength) {
    n++;
    if (n >= job.getNumReduceTasks()) {
        break;
    }
    // Text.write serializes a length prefix followed by the UTF-8 bytes.
    new Text(sampleList.get(i) + "\r\n").write(writer);
}
writer.close();
```
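The partitioner below loads these points back through TeraSortSampler.getSplitPoints, whose body is not listed in this README. A minimal sketch of what it might look like, assuming the Text.write serialization used above and the same SPLIT_SAMPLE_PATH:

```java
// Hypothetical sketch, not the project's actual code.
public static int[] getSplitPoints(Configuration conf, int reduceNum) {
    int[] points = new int[reduceNum - 1];
    try {
        FileSystem fs = FileSystem.get(conf);
        try (FSDataInputStream in = fs.open(SPLIT_SAMPLE_PATH)) {
            Text text = new Text();
            for (int i = 0; i < points.length; i++) {
                text.readFields(in); // mirrors Text.write(writer) above
                points[i] = Integer.parseInt(text.toString().trim());
            }
        }
    } catch (IOException e) {
        throw new RuntimeException("failed to read split points", e);
    }
    return points;
}
```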
Partition: assign each record to a partition according to the split points (there are numReduceTasks - 1 of them). For example, with split points 100 and 200 and three reducers, keys below 100 go to reducer 0, keys in [100, 200) go to reducer 1, and the rest go to reducer 2.
```java
@Override
public int getPartition(IntWritable key, NullWritable value, int reduceNum) {
    if (splitPoints == null) {
        splitPoints = TeraSortSampler.getSplitPoints(conf, reduceNum);
    }
    // just for print
    System.out.println("Key:" + key);
    // Return the index of the first split point greater than the key;
    // keys beyond every split point fall into the last partition.
    int index = splitPoints.length;
    for (int i = 0; i < splitPoints.length; i++) {
        if (key.get() < splitPoints[i]) {
            index = i;
            break;
        }
    }
    return index;
}
```
Run DataGenerator to generate test data in the input directory.
Result
Run TeraSort; it will write several files to the output directory, as many as the number of reduce tasks you choose.
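The driver wires the sampler, partitioner, mapper, and reducer together. The project's actual driver is not listed in this README; a minimal sketch, where TeraSortMapper, TeraSortReducer, and TeraSortPartitioner are assumed class names and the reduce count is an example value:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TeraSort {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "TeraSort");
        job.setJarByClass(TeraSort.class);
        job.setMapperClass(TeraSortMapper.class);        // assumed name
        job.setReducerClass(TeraSortReducer.class);      // assumed name
        job.setPartitionerClass(TeraSortPartitioner.class); // assumed name
        job.setNumReduceTasks(3);                        // example value
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```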
You can run it in two ways: from the shell or from an IDE.
Just execute
```bash
hdfs dfs -mkdir /user/happylrd/TeraSort
hdfs dfs -put /home/happylrd/MyCode/HadoopProjects/TeraSort/data/input/ /user/happylrd/TeraSort
```
Meanwhile, you can query HDFS for validation.
```bash
hdfs dfs -ls -R | grep TeraSort
```
Just execute
```bash
hadoop jar /home/happylrd/MyCode/HadoopProjects/TeraSort/out/artifacts/terasort/terasort.jar io.happylrd.terasort.TeraSort /user/happylrd/TeraSort/input /user/happylrd/TeraSort/output
```
Result
Query for validation
Everything goes well.
Just execute
```bash
hdfs dfs -get /user/happylrd/TeraSort/output /home/happylrd/MyCode/HadoopProjects/TeraSort/data/
```
Result
Set org.apache.hadoop.util.RunJar as the Main class.
Configure program arguments.
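The program arguments mirror the shell invocation above: the jar path, the main class, then the input and output directories.

```
/home/happylrd/MyCode/HadoopProjects/TeraSort/out/artifacts/terasort/terasort.jar io.happylrd.terasort.TeraSort /user/happylrd/TeraSort/input /user/happylrd/TeraSort/output
```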
Result
Run merge.sh to merge the output files that the reducers produce.
The shell script is as follows:
```bash
#!/usr/bin/env bash
cd ../data/output
cat part-* > ../mergeResult/all-result.txt
```
Result
Run TopKImpl to get the top k values.
Result
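TopKImpl's own code is not listed in this README. One common approach is a bounded min-heap over the merged result; a minimal sketch, assuming one integer per line in all-result.txt:

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.PriorityQueue;

public class TopK {
    public static void main(String[] args) throws IOException {
        int k = Integer.parseInt(args[1]);
        // Min-heap of size k: the root is the smallest of the
        // k largest values seen so far.
        PriorityQueue<Integer> heap = new PriorityQueue<>(k);
        try (BufferedReader reader = new BufferedReader(new FileReader(args[0]))) {
            String line;
            while ((line = reader.readLine()) != null) {
                line = line.trim();
                if (line.isEmpty()) {
                    continue;
                }
                int value = Integer.parseInt(line);
                if (heap.size() < k) {
                    heap.offer(value);
                } else if (value > heap.peek()) {
                    heap.poll();
                    heap.offer(value);
                }
            }
        }
        // Prints the top k values in ascending order.
        while (!heap.isEmpty()) {
            System.out.println(heap.poll());
        }
    }
}
```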
Development environment
- Ubuntu 16.04
- IntelliJ IDEA
- Hadoop
- MapReduce
- HDFS
A detailed explanation of the shuffle, partition, and combiner process in Map/Reduce
Thanks to Kubi Code, you saved me a lot of time.
Meanwhile, I found one error in your code, which I guess was probably just a slip of the hand.
You can add break; to let three or more reduce tasks work fine; in essence, it makes the partition step work correctly. Without the break, index keeps being overwritten by every later split point that is also greater than the key, so records end up in a later partition than they should (the bug is invisible with only one split point, i.e. two reducers).
The modified code is as follows:

```java
int index = splitPoints.length;
for (int i = 0; i < splitPoints.length; i++) {
    if (key.get() < splitPoints[i]) {
        index = i;
        break;
    }
}
return index;
```
Copyright © 2017 happylrd