Commit

Homero adjustments
homero-merino committed Mar 7, 2018
1 parent ef44277 commit 914e3e9
Showing 1 changed file with 66 additions and 9 deletions.
75 changes: 66 additions & 9 deletions wiki-spark/scala/src/main/scala/pageRank/Improved.scala
@@ -1,14 +1,17 @@
package pageRank

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text

import pageRank.dataTypes.WikipediaRevision

import org.apache.spark.api.java.JavaPairRDD
import java.util.ArrayList
import java.util.Date
import utils.ISO8601
@@ -19,14 +22,30 @@ import org.apache.spark.rdd.RDD
object Improved {

def main(args: Array[String]) {
if (args.length < 3) {
System.err.println("MrPageRankII usage: PageRank <String input_filepath> <String output_filepath> <Int iterations> [Datetime yyyy-MM-dd'T'HH:mm:ss'Z', optional]")
System.exit(1)
}

// arguments handling
val inputFile = args(0)
val outputFile = new Path(args(1))
val iterations = iterToInt(args(2))
val timeLimit = if (args.length == 4) datetimeToISO8601(args(3)) else new Date().getTime
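// A hypothetical invocation (paths and values are illustrative, not from the repo):
//   PageRank hdfs:///wiki/revisions hdfs:///wiki/pagerank-out 10 2017-12-31T23:59:59Z
// reads the revision dump, runs 10 iterations, and ignores revisions made after
// the supplied cutoff timestamp.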


// create a configuration object and register the application
val conf = new SparkConf().setAppName("Mighty-WikiPageRank_II")
// obtain the Spark context from the configuration object
val sc = new SparkContext(conf)

// Hadoop configuration - clean environment
// Obtain filesystem details from the Hadoop cluster configuration
val fsys = FileSystem.get(sc.hadoopConfiguration)
// Recursively delete the results (output) directory if it exists
fsys.delete(outputFile, true)
// Set the input format record delimiter for Wikipedia's HDFS files
sc.hadoopConfiguration.set("textinputformat.record.delimiter", "\n\n")
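// With this delimiter, each RDD record read below is an entire blank-line-separated
// revision block rather than a single physical line of the input file.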

// All revisions, filtered initially by date
val revisions: RDD[WikipediaRevision] = sc.newAPIHadoopFile(inputFile, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
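// newAPIHadoopFile yields (LongWritable offset, Text record) pairs; converting each
// Text record into a WikipediaRevision and applying the date filter continues in
// the lines collapsed below.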
@@ -101,7 +120,45 @@ object Improved
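// Damped PageRank update: rank = (1 - d) + d * sum(contributions), with d = 0.85.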
ranks = contribs.reduceByKey((a, b) => a + b).mapValues(v => 0.15 + v * 0.85)
}

//ranks.repartition(40).sortBy(f => f._2, false).map(prs => prs._1 + " " + prs._2).saveAsTextFile(outputFile.toString)
ranks.map(prs => prs._1 + " " + prs._2).saveAsTextFile(outputFile.toString)
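// Each output line has the form "<page> <rank>"; the commented-out variant above
// would additionally sort pages by descending rank before writing.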

// Close the SparkContext object, releasing its resources.
sc.stop()
}

/**
 * Handles the iterations - args(2) integer conversion.
 * If the string representation cannot be converted, the default value 10 is returned.
 * No exception is raised, since an automatic adjustment is applied so the
 * PageRank calculation can continue.
 * @param iter string representing the number of iterations.
 * @return the number of iterations as an Int
 */
def iterToInt(iter: String): Int = {
try {
iter.toInt
} catch {
case _: Exception => 10
}
}
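// Example of the fallback semantics (a sketch):
//   iterToInt("25")   // 25
//   iterToInt("lots") // 10 (the default)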

/**
 * Handles the PageRank timeLimit - args(3) date conversion.
 * If the string representation cannot be parsed, the default value is the current time.
 * No exception is raised, since an automatic adjustment is applied so the
 * PageRank calculation can continue.
 * @param date string representing an ISO8601-formatted date.
 * @return the search cutoff time in epoch milliseconds
 */
def datetimeToISO8601(date: String): Long = {
val format = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'")
// the quoted 'Z' is a literal, so parse the timestamp explicitly as UTC
format.setTimeZone(java.util.TimeZone.getTimeZone("UTC"))
try {
format.parse(date).getTime
} catch {
case _: Exception => new Date().getTime
}
}
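// Example of the parse-or-default behavior (a sketch; values in epoch milliseconds):
//   datetimeToISO8601("2018-01-01T00:00:00Z") // 1514764800000L (parsed as UTC)
//   datetimeToISO8601("not-a-date")           // the current time in ms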
}
