diff --git a/wiki-spark/scala/src/main/scala/pageRank/Improved.scala b/wiki-spark/scala/src/main/scala/pageRank/Improved.scala
index c71a482..5eeab5f 100644
--- a/wiki-spark/scala/src/main/scala/pageRank/Improved.scala
+++ b/wiki-spark/scala/src/main/scala/pageRank/Improved.scala
@@ -1,14 +1,17 @@
 package pageRank
 
 import org.apache.spark.SparkConf
-
 import org.apache.spark.SparkContext
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.Text
 
 import pageRank.dataTypes.WikipediaRevision
 import pageRank.dataTypes.WikipediaRevision
+
 import org.apache.spark.api.java.JavaPairRDD
 import java.util.ArrayList
 import utils.ISO8601
@@ -19,14 +22,29 @@ import org.apache.spark.rdd.RDD
 
 object Improved {
   def main(args: Array[String]) {
+    if (args.length < 3) {
+      System.err.println("MrPageRankII usage: PageRank <inputFile> <outputFile> <iterations> [<ISO8601 timeLimit>]")
+      System.exit(1)
+    }
+
+    // arguments handling
     val inputFile = args(0)
-    val outputFile = args(1)
-    val iterations = args(2).toInt
-    val timeLimit = if (args.length > 3) ISO8601.toTimeMS(args(3)) else new Date().getTime
-
-    val conf = new SparkConf().setAppName("Original Page Rank")
+    val outputFile = new Path(args(1))
+    val iterations = iterToInt(args(2))
+    val timeLimit = if (args.length >= 4) datetimeToISO8601(args(3)) else new Date().getTime
+
+    // create the configuration object and register the application
+    val conf = new SparkConf().setAppName("Mighty-WikiPageRank_II")
+    // obtain a Spark context from the configuration object
     val sc = new SparkContext(conf)
-    sc.hadoopConfiguration.set("textinputformat.record.delimiter", "\n\n")
+
+    // Hadoop configuration - clean environment.
+    // Obtain the filesystem configuration details from the Hadoop cluster.
+    val fsys = FileSystem.get(sc.hadoopConfiguration)
+    // Recursively delete the results (output) directory if it already exists.
+    fsys.delete(outputFile, true)
+    // Set the input format record delimiter for the Wikipedia HDFS files.
+    sc.hadoopConfiguration.set("textinputformat.record.delimiter", "\n\n")
 
     // All revisions, filtered initially by date
     val revisions: RDD[WikipediaRevision] = sc.newAPIHadoopFile(inputFile, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
@@ -101,7 +119,46 @@ object Improved {
       ranks = contribs.reduceByKey((a, b) => a + b).mapValues(v => 0.15 + v * 0.85);
     }
 
-    ranks.sortBy(f=>f._2, false).map(prs => prs._1 + " " + prs._2).saveAsTextFile(outputFile)
-
+    //ranks.repartition(40).sortBy(f=>f._2, false).map(prs => prs._1 + " " + prs._2).saveAsTextFile(args(2))
+    ranks.map(prs => prs._1 + " " + prs._2).saveAsTextFile(outputFile.toString)
+
+    // Close the SparkContext object, releasing its resources.
+    sc.stop()
   }
+
+  /**
+   * Handles the iterations - args(2) - integer conversion.
+   * If the string representation cannot be converted, the default value of 10 is returned.
+   * No exception is raised, since an automatic adjustment is applied so that the
+   * PageRank calculation can continue.
+   * @param iter string representing the number of iterations.
+   * @return the number of iterations as an integer
+   */
+  def iterToInt(iter: String): Int = {
+    try {
+      iter.toInt
+    } catch {
+      case e: Exception => 10
+    }
+  }
+
+  /**
+   * Handles the PageRank timeLimit - args(3) - date conversion.
+   * If the string representation cannot be converted, the current date is returned.
+   * No exception is raised, since an automatic adjustment is applied so that the
+   * PageRank calculation can continue.
+   * @param date string representing an ISO8601-formatted date.
+   * @return the search date in milliseconds since the epoch
+   */
+  def datetimeToISO8601(date: String): Long = {
+    val format = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'")
+    // The literal 'Z' denotes UTC, so parse in UTC rather than in the JVM's default time zone.
+    format.setTimeZone(java.util.TimeZone.getTimeZone("UTC"))
+    val now = new java.util.Date()
+    try {
+      format.parse(date).getTime
+    } catch {
+      case e: Exception => now.getTime
+    }
+  }
 }
\ No newline at end of file
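
Reviewer note: the mechanism this job relies on is the "textinputformat.record.delimiter" setting, which makes Hadoop's TextInputFormat hand each blank-line-separated block to Spark as a single record (here, one Wikipedia revision). Below is a minimal, self-contained sketch of just that behaviour, not part of the change itself; the object name DelimiterDemo, the local[*] master, and the revisions.txt input path are illustrative placeholders.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object DelimiterDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("DelimiterDemo").setMaster("local[*]"))
    // Each blank-line-separated block becomes one record, mirroring the job above.
    sc.hadoopConfiguration.set("textinputformat.record.delimiter", "\n\n")
    val records = sc.newAPIHadoopFile("revisions.txt", classOf[TextInputFormat],
        classOf[LongWritable], classOf[Text])
      .map(pair => pair._2.toString) // copy out of Hadoop's reused Text object immediately
    records.take(2).foreach(r => println("--- record ---\n" + r))
    sc.stop()
  }
}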