-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPairWise.scala
134 lines (104 loc) · 5.16 KB
/
PairWise.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel, HashingTF, IDF, RegexTokenizer}
import org.apache.spark.sql.functions._
import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, IndexedRow, IndexedRowMatrix, RowMatrix}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.ml.linalg.{Vectors => NewVectors}
import org.apache.spark.rdd.RDD
import scala.collection.mutable
// The program takes one command-line argument, K: the number of most similar member pairs to display.
/**
 * Ranks pairs of members by the cosine similarity of the TF-IDF vectors built from their speeches.
 *
 * Pipeline: read the CSV at [[inputPath]], concatenate the speeches of each `member_name`,
 * tokenize, remove the stop words loaded from [[stopwordsPath]], compute hashed TF-IDF vectors,
 * compute the cosine similarity of every pair of members and print the K most similar pairs.
 */
object PairWise {
  final val inputPath = "file:///home/ozzy/Desktop/bd/dtst.csv"
  final val stopwordsPath = "file:///home/ozzy/Desktop/bd/avoulos-main/aux_files/stopwords_gr.txt"

  /**
   * Parses the first command-line argument as K.
   *
   * @param args the raw command-line arguments
   * @return `Some(k)` when the first argument is a non-negative integer, `None` otherwise
   */
  private def parseTopK(args: Array[String]): Option[Int] =
    args.headOption
      .flatMap(raw => scala.util.Try(raw.trim.toInt).toOption)
      .filter(_ >= 0)

  /**
   * Program entry point.
   *
   * @param args `args(0)` is K, the number of most similar member pairs to show
   */
  def main(args: Array[String]): Unit = {
    // Validate K before starting Spark so that a bad invocation fails immediately
    // instead of after the whole pipeline has run.
    val topK: Int = parseTopK(args).getOrElse {
      System.err.println("Usage: PairWise <K>  (K: non-negative number of most similar member pairs to show)")
      sys.exit(1)
    }

    val ss = SparkSession.builder().appName("pairwise").master("local[*]").getOrCreate()
    try {
      ss.sparkContext.setLogLevel("ERROR")
      import ss.implicits._

      // Load the CSV, keep the two columns of interest, drop rows containing nulls and
      // concatenate all speeches of the same member into one comma-separated string.
      val speechesDF = ss.read.option("header", "true")
        .csv(inputPath)
        .select($"member_name", $"speech")
        .na.drop()
        .groupBy("member_name")
        .agg(concat_ws(",", collect_list("speech")).alias("speeches"))

      // Tokenize the speeches: lower-case, keep tokens of at least 4 characters.
      // NOTE(review): inside the character class, `!-~` is a range covering every printable
      // ASCII character from '!' to '~', so ASCII letters and digits also act as delimiters.
      // Confirm this is intended; the pattern is kept unchanged here.
      val speechesTokenizedDF = new RegexTokenizer()
        .setInputCol("speeches")
        .setOutputCol("speechesTok")
        .setMinTokenLength(4)
        .setToLowercase(true)
        .setPattern("[\\s.,!-~'\";*^%#$@()&<>/ ]")
        .transform(speechesDF)

      // Read the stop words; the last two characters of every line are dropped.
      // NOTE(review): presumably this strips a fixed trailing suffix in the file - confirm.
      val stopwords: Set[String] = ss.sparkContext.textFile(stopwordsPath)
        .map(_.dropRight(2))
        .collect()
        .toSet

      // Remove the stop words from every token list. `Seq[String]` is used as the UDF
      // parameter type because it accepts the array column on every Scala version.
      val removeStopwords = udf((tokens: Seq[String]) => tokens.filterNot(stopwords.contains))
      val cleanDF = speechesTokenizedDF
        .select($"member_name".as("id"), removeStopwords($"speechesTok").as("rFeatures"))
        .cache()

      // TF-IDF: hashed term frequencies (3000 buckets) followed by inverse document frequency.
      val hashingTF = new HashingTF()
        .setNumFeatures(3000)
        .setInputCol("rFeatures")
        .setOutputCol("rawFeatures")
      val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
      val speechesTfIdf = new Pipeline()
        .setStages(Array(hashingTF, idf))
        .fit(cleanDF)
        .transform(cleanDF)
        .select($"id", $"features")

      // Assign every member a 0-based index. The SAME indexed RDD feeds both the similarity
      // matrix and the index -> name lookup, so a matrix index always refers to the member it
      // was computed from. Sorting by the (unique) id makes the numbering reproducible even if
      // the RDD is recomputed; caching avoids recomputing it for the two uses below.
      val indexedMembers: RDD[((String, org.apache.spark.ml.linalg.Vector), Long)] = speechesTfIdf
        .orderBy($"id")
        .rdd
        .map(row => (row.getString(0), row.getAs[org.apache.spark.ml.linalg.Vector](1)))
        .zipWithIndex()
        .cache()

      // index -> member name, collected to the driver
      val indexToName: Map[Long, String] = indexedMembers
        .map { case ((name, _), idx) => idx -> name }
        .collect()
        .toMap

      if (indexToName.isEmpty) {
        // Building the matrix from an empty RDD would fail with an "empty collection" error.
        System.err.println("No members with speeches were found in the input; nothing to compare.")
      } else {
        // One row per member, then transpose so that each COLUMN holds a member's TF-IDF
        // vector, which lets columnSimilarities() compute member-to-member cosine similarity.
        val memberColumns = new IndexedRowMatrix(
          indexedMembers.map { case ((_, features), idx) =>
            IndexedRow(idx, OldVectors.fromML(features.toDense))
          })
          .toBlockMatrix()
          .transpose
          .toIndexedRowMatrix()

        // Entries are (column index, column index, cosine similarity).
        val similaritiesDF = memberColumns.columnSimilarities().entries
          .toDF("S1", "S2", "CosineSim")

        // Map a matrix index back to its member name; indices are Long in MatrixEntry.
        val toName = udf((idx: Long) => indexToName.getOrElse(idx, "null"))
        val finalDF = similaritiesDF.select(
          toName($"S1").as("member_name_1"),
          toName($"S2").as("member_name_2"),
          $"CosineSim")

        // Show the K most similar pairs without truncating the member names.
        finalDF.sort(desc("CosineSim")).show(topK, false)
      }
    } finally {
      // Always release the Spark session, even when a stage of the pipeline fails.
      ss.stop()
    }
  }
}