Optimization trickkkk!
fferegrino committed Feb 13, 2018
1 parent ed604f8 commit 7ae14b7
Showing 3 changed files with 203 additions and 146 deletions.
4 changes: 2 additions & 2 deletions wiki/src/main/java/mapreduce/input/WikiInputFormat.java
@@ -13,8 +13,8 @@
* @author 2338066f ANTONIO FEREGRINO BOLANOS
* @author 2338067g HOMERO GARCIA MERINO
*/
public class WikiInputFormat extends FileInputFormat<Text, WikiInputValue> {
public RecordReader<Text, WikiInputValue> createRecordReader(InputSplit split, TaskAttemptContext context) {
public class WikiInputFormat extends FileInputFormat<Text, Text> {
public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
return new WikiMultiRecordReader();
}
}
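
With this change the input format hands each record to the mapper as a plain Text value instead of a pre-built WikiInputValue, so parsing and object construction move out of the record reader and into the map phase. As a rough sketch of how the changed types line up in a driver (the driver class, paths, and the reducer are not part of this commit and are assumed here for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import mapreduce.input.WikiInputFormat;
import mapreduce.input.WikiInputValue; // package assumed; not shown in this commit
import mapreduce.mapping.ArticleMapper;

public class WikiDriverSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "wiki-outlinks");
        job.setJarByClass(WikiDriverSketch.class);

        // The record reader now emits the key plus a Text value of the form
        // "revisionId outlink1 outlink2 ..."
        job.setInputFormatClass(WikiInputFormat.class);
        job.setMapperClass(ArticleMapper.class);

        // ArticleMapper rebuilds the WikiInputValue, so the map output types are unchanged
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(WikiInputValue.class);

        // Reducer and final output types omitted; they are not touched by this commit
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}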
291 changes: 157 additions & 134 deletions wiki/src/main/java/mapreduce/input/WikiMultiRecordReader.java
@@ -14,142 +14,165 @@
import java.io.IOException;

/**
* RecordReader class that specifies the required format for the Wikipedia edit history
* tagged multi-line file.
* RecordReader class that specifies the required format for the Wikipedia edit
* history tagged multi-line file.
*
* @author 2338066f ANTONIO FEREGRINO BOLANOS
* @author 2338067g HOMERO GARCIA MERINO
*/
public class WikiMultiRecordReader extends RecordReader<Text, WikiInputValue> {
// Record separator
public class WikiMultiRecordReader extends RecordReader<Text, Text> {
// Record separator
private static final byte[] recordSeparator = "\n\n".getBytes();
private FSDataInputStream fsin;
private long start, end;
private boolean stillInChunk = true;
private StringBuffer sb;
private Text key = new Text();
private WikiInputValue value;


/**
* Initialize method that configures the node's context for reading an input split
* text file (parsed version of the complete Wikipedia edit history).
* @param inputSplit is a logical chunk of data that points to start and end locations
* within physical blocks.
* @param context object contains configuration data to interact with Hadoop's environment.
* @throws IOException when the file or the filesystem can not be reached.
*/
public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException {
sb = new StringBuffer();
FileSplit split = (FileSplit) inputSplit;
Configuration conf = context.getConfiguration();
Path path = split.getPath();
FileSystem fs = path.getFileSystem(conf);

fsin = fs.open(path);
start = split.getStart();
end = split.getStart() + split.getLength();
fsin.seek(start);

if (start != 0) {
readRecord(false);
}
}

/**
* ReadRecord method that scans forward to the next record separator ("\n\n").
* @param withinBlock when true, read bytes are appended to the record buffer;
* when false, bytes are skipped until the next record boundary.
* @return true if more records remain in this split after the current one; false
* at end of file or once the split boundary has been reached.
* @throws IOException when the block cannot be read.
*/
private boolean readRecord(boolean withinBlock) throws IOException {
int i = 0, b;
while (true) {
if ((b = fsin.read()) == -1) { // End of file
return false;
}

if (withinBlock) {
sb.append((char) b);
}

if (b == recordSeparator[i]) {
if (++i == recordSeparator.length) {
return fsin.getPos() < end;
}
} else {
i = 0;
}
}
}

/**
* NextKeyValue method reads the next key/value pair.
* @return true if a key/value pair is read
* @throws IOException if the record reader can not be reached.
*/
public boolean nextKeyValue() throws IOException {
if (!stillInChunk) {
return false;
}

boolean status = readRecord(true);

String[] lines = sb.toString().split("\n");

String[] revisionValues = lines[0].split(" ");
key.set(revisionValues[3]);

value = new WikiInputValue();
String mainLine = lines[3];
if (mainLine.length() > 5) {
String[] outlinks = mainLine.substring(5).split("\\s");
value.setOutlinksNumber(outlinks.length);
value.setOutlinks(String.join(" ", outlinks));
}
value.setRevisionId(Long.parseLong(revisionValues[2]));

// Clear the buffer
sb.setLength(0);

if (!status) {
stillInChunk = false;
}
return true;
}

/**
* GetCurrentKey method retrieves the current key of the processed record.
* @return the current key or null if there is no current key.
* @throws IOException if the record reader can not be reached.
*/
public Text getCurrentKey() {
return key;
}

/**
* GetCurrentValue method retrieves the current value of the processed record.
* @return the read record.
* @throws IOException if the record reader can not be reached.
*/
public WikiInputValue getCurrentValue() {
return value;
}

/**
* GetProgress method retrieves the current progress of the record reader through the data.
* @return a number between 0.0 and 1.0 that is the portion of read data.
* @throws IOException if the record reader can not be reached.
*/
public float getProgress() throws IOException {
return (float) (fsin.getPos() - start) / (end - start);
}

/**
* Close method terminates the record reader object.
* @throws IOException if the record reader can not be closed.
*/
public void close() throws IOException {
fsin.close();
}
private FSDataInputStream fsin;
private long start, end;
private boolean stillInChunk = true;
private StringBuffer sb;
private Text key = new Text();
private Text value;

/**
* Initialize method that configures the node's context for reading an input split
* text file (parsed version of the complete Wikipedia edit history).
*
* @param inputSplit
* is a logical chunk of data that points to start and end locations
* within physical blocks.
* @param context
* object contains configuration data to interact with Hadoop's
* environment.
* @throws IOException
* when the file or the filesystem can not be reached.
*/
public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException {
sb = new StringBuffer();
FileSplit split = (FileSplit) inputSplit;
Configuration conf = context.getConfiguration();
Path path = split.getPath();
FileSystem fs = path.getFileSystem(conf);

fsin = fs.open(path);
start = split.getStart();
end = split.getStart() + split.getLength();
fsin.seek(start);

if (start != 0) {
readRecord(false);
}
}

/**
* ReadRecord method that scans forward to the next record separator ("\n\n").
*
* @param withinBlock
* when true, read bytes are appended to the record buffer; when
* false, bytes are skipped until the next record boundary.
* @return true if more records remain in this split after the current one;
* false at end of file or once the split boundary has been reached.
* @throws IOException
* when the block cannot be read.
*/
private boolean readRecord(boolean withinBlock) throws IOException {
int i = 0, b;
while (true) {
if ((b = fsin.read()) == -1) { // End of file
return false;
}

if (withinBlock) {
sb.append((char) b);
}

if (b == recordSeparator[i]) {
if (++i == recordSeparator.length) {
return fsin.getPos() < end;
}
} else {
i = 0;
}
}
}

/**
* NextKeyValue method reads the next key/value pair.
*
* @return true if a key/value pair is read
* @throws IOException
* if the record reader can not be reached.
*/
public boolean nextKeyValue() throws IOException {
if (!stillInChunk) {
return false;
}

boolean status = readRecord(true);

String[] lines = sb.toString().split("\n");

String[] revisionValues = lines[0].split(" ");
key.set(revisionValues[3]);

long revisionId = Long.parseLong(revisionValues[2]);
String outlinks = null;
String mainLine = lines[3];
if (mainLine.length() > 5) {
outlinks = mainLine.substring(5).trim();
}

if (outlinks != null) {
value = new Text(revisionId + " " + outlinks);
} else {
value = new Text(String.valueOf(revisionId));
}

// Clear the buffer
sb.setLength(0);

if (!status) {
stillInChunk = false;
}
return true;
}

/**
* GetCurrentKey method retrieves the current key of the processed record.
*
* @return the current key or null if there is no current key.
* @throws IOException
* if the record reader can not be reached.
*/
public Text getCurrentKey() {
return key;
}

/**
* GetCurrentValue method retrieves the current value of the processed record.
*
* @return the read record.
* @throws IOException
* if the record reader can not be reached.
*/
public Text getCurrentValue() {
return value;
}

/**
* GetProgress method retrieves the current progress of the record reader
* through the data.
*
* @return a number between 0.0 and 1.0 that is the portion of read data.
* @throws IOException
* if the record reader can not be reached.
*/
public float getProgress() throws IOException {
return (float) (fsin.getPos() - start) / (end - start);
}

/**
* Close method terminates the record reader object.
*
* @throws IOException
* if the record reader can not be closed.
*/
public void close() throws IOException {
fsin.close();
}
}
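
Reading nextKeyValue() back: each record is a blank-line-separated block; the fourth space-separated field of the record's first line becomes the key, the third one the revision id, and everything after the first five characters of the fourth line is the space-separated outlink list. The value is now a single Text of the form "revisionId outlink1 outlink2 ...". A self-contained sketch of that parsing, using a hypothetical sample record in the tagged multi-line layout those indices imply, shows what the reader would emit:

public class RecordParseSketch {
    public static void main(String[] args) {
        // Hypothetical record; only the field positions used below are taken from
        // WikiMultiRecordReader (index 2 = revision id, index 3 = key, line 4 = tagged outlinks)
        String record =
                "REVISION 12 597829 Anarchism 2004-06-15T00:00:00Z SomeUser 42\n"
              + "CATEGORY Political_theories\n"
              + "IMAGE Anarchy-symbol.svg\n"
              + "MAIN Political_philosophy Stateless_society Political_philosophy\n";

        String[] lines = record.split("\n");
        String[] revisionValues = lines[0].split(" ");

        String key = revisionValues[3];
        long revisionId = Long.parseLong(revisionValues[2]);

        String mainLine = lines[3];
        String value;
        if (mainLine.length() > 5) {
            // the 5-character prefix ("MAIN " in this sample) is dropped
            value = revisionId + " " + mainLine.substring(5).trim();
        } else {
            value = String.valueOf(revisionId);
        }

        System.out.println(key + "\t" + value);
        // prints: Anarchism	597829 Political_philosophy Stateless_society Political_philosophy
    }
}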
54 changes: 44 additions & 10 deletions wiki/src/main/java/mapreduce/mapping/ArticleMapper.java
@@ -5,21 +5,55 @@
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;

public class ArticleMapper extends Mapper<Text, WikiInputValue, Text, WikiInputValue> {
public class ArticleMapper extends Mapper<Text, Text, Text, WikiInputValue> {

@Override
protected void map(Text key, WikiInputValue value,
Mapper<Text, WikiInputValue, Text, WikiInputValue>.Context context)
throws IOException, InterruptedException {
// When true, duplicate outlinks within a single revision are removed before counting
private final static boolean FilterDuplicates = true;

context.write(key, value);
@Override
protected void map(Text key, Text value,
Mapper<Text, Text, Text, WikiInputValue>.Context context)
throws IOException, InterruptedException {

String[] contents = value.toString().split(" ", 2);

WikiInputValue outValue = new WikiInputValue();
outValue.setRevisionId(Long.parseLong(contents[0]));

String newOutlinksJoint = "";
int numberOfOutlinks = 0;

if(contents.length == 2) { // Has outlinks
String [] originalOutlinks = contents[1].split("\\s");

if(FilterDuplicates)
{
ArrayList<String> newOutlinks = new ArrayList<>(originalOutlinks.length);
HashSet<String> unique = new HashSet<String>(originalOutlinks.length);

for(String outlink : originalOutlinks ) {
if(!unique.contains(outlink)) {
unique.add(outlink);
newOutlinks.add(outlink);
}
}
numberOfOutlinks = newOutlinks.size();
newOutlinksJoint = String.join(" ", newOutlinks);
}
else {
numberOfOutlinks = originalOutlinks.length;
newOutlinksJoint = String.join(" ", originalOutlinks);
}
}

outValue.setOutlinks(newOutlinksJoint);
outValue.setOutlinksNumber(numberOfOutlinks);

context.write(key, outValue);

context.getCounter(MapperCounters.TOTAL_WIKI_ARTICLES).increment(1);
}

static enum MapperCounters {
TOTAL_WIKI_ARTICLES
}

}
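
The mapper now does the parsing the record reader used to do: it splits the Text value into the revision id and the outlink list and, when FilterDuplicates is on, drops repeated outlinks while keeping the order of first occurrence via the ArrayList/HashSet pair. A LinkedHashSet gives the same behaviour more compactly; the snippet below is an equivalent alternative for illustration, not the code in this commit:

import java.util.LinkedHashSet;
import java.util.Set;

public class DedupSketch {
    public static void main(String[] args) {
        // Hypothetical outlink portion of a mapper input value (everything after the revision id)
        String outlinkPart = "Political_philosophy Stateless_society Political_philosophy";

        // LinkedHashSet keeps insertion order, so first occurrences survive,
        // matching the ArrayList + HashSet combination in ArticleMapper
        Set<String> unique = new LinkedHashSet<>();
        for (String outlink : outlinkPart.split("\\s")) {
            unique.add(outlink); // duplicates are silently ignored
        }

        int numberOfOutlinks = unique.size();
        String newOutlinksJoint = String.join(" ", unique);

        System.out.println(numberOfOutlinks + " -> " + newOutlinksJoint);
        // prints: 2 -> Political_philosophy Stateless_society
    }
}

The map output is identical either way; which form to use is a stylistic choice.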
