Issue #SB-24793: Response Exhaust V2 job to support assessment blob data #451

Open · wants to merge 5 commits into base: release-4.4.0
Changes from 1 commit
@@ -0,0 +1,110 @@
package org.sunbird.analytics.exhaust.collection

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf, explode_outer, round}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.ekstep.analytics.framework.conf.AppConf
import org.ekstep.analytics.framework.util.{JSONUtils, JobLogger}
import org.ekstep.analytics.framework.{FrameworkContext, JobConfig}

object ResponseExhaustJobV2 extends optional.Application with BaseCollectionExhaustJob {

override def getClassName = "org.sunbird.analytics.exhaust.collection.ResponseExhaustJobV2"
override def jobName() = "ResponseExhaustJobV2";
override def jobId() = "response-exhaust";
override def getReportPath() = "response-exhaust/";
override def getReportKey() = "response";
private val partitionCols = List("batch_id", "year", "week_of_year")

private val persistedDF:scala.collection.mutable.ListBuffer[DataFrame] = scala.collection.mutable.ListBuffer[DataFrame]();

private val assessmentAggDBSettings = Map("table" -> "assessment_aggregator", "keyspace" -> AppConf.getConfig("sunbird.courses.keyspace"), "cluster" -> "LMSCluster");

private val filterColumns = Seq("courseid", "collectionName", "batchid", "batchName", "userid", "content_id", "contentname", "attempt_id", "last_attempted_on", "questionid",
"questiontype", "questiontitle", "questiondescription", "questionduration", "questionscore", "questionmaxscore", "questionoption", "questionresponse");
private val columnsOrder = List("Collection Id", "Collection Name", "Batch Id", "Batch Name", "User UUID", "QuestionSet Id", "QuestionSet Title", "Attempt Id", "Attempted On",
"Question Id", "Question Type", "Question Title", "Question Description", "Question Duration", "Question Score", "Question Max Score", "Question Options", "Question Response");
val columnMapping = Map("courseid" -> "Collection Id", "collectionName" -> "Collection Name", "batchid" -> "Batch Id", "batchName" -> "Batch Name", "userid" -> "User UUID",
"content_id" -> "QuestionSet Id", "contentname" -> "QuestionSet Title", "attempt_id" -> "Attempt Id", "last_attempted_on" -> "Attempted On", "questionid" -> "Question Id",
"questiontype" -> "Question Type", "questiontitle" -> "Question Title", "questiondescription" -> "Question Description", "questionduration" -> "Question Duration",
"questionscore" -> "Question Score", "questionmaxscore" -> "Question Max Score", "questionoption" -> "Question Options", "questionresponse" -> "Question Response")

case class AssessmentData(user_id: String, course_id: String, batch_id: String, content_id: String, attempt_id: String, created_on: Option[String], grand_total: Option[String], last_attempted_on: Option[String], question: List[Question], total_max_score: Option[Double], total_score: Option[Double], updated_on: Option[String]) extends scala.Product with scala.Serializable
case class Question(id: String, assess_ts: String, max_score: Double, score: Double, `type`: String, title: String, resvalues: List[Map[String, String]], params: List[Map[String, String]], description: String, duration: Double) extends scala.Product with scala.Serializable

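// Builds the response exhaust report for a single collection batch: fetches the assessment data,
// enriches it with content names from the search API, and maps the columns to the final report layout.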
override def processBatch(userEnrolmentDF: DataFrame, collectionBatch: CollectionBatch)(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): DataFrame = {
val assessmentDF = getAssessmentDF(userEnrolmentDF, collectionBatch).persist();
persistedDF.append(assessmentDF);
val contentIds = assessmentDF.select("content_id").dropDuplicates().collect().map(f => f.get(0));
val contentDF = searchContent(Map("request" -> Map("filters" -> Map("identifier" -> contentIds)))).withColumnRenamed("collectionName", "contentname").select("identifier", "contentname");
val reportDF = assessmentDF.join(contentDF, assessmentDF("content_id") === contentDF("identifier"), "left_outer").drop("identifier").select(filterColumns.head, filterColumns.tail: _*);
organizeDF(reportDF, columnMapping, columnsOrder);
}

override def unpersistDFs() {
persistedDF.foreach(f => f.unpersist(true))
}

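// Loads the assessment_aggregator records from Cassandra, joins them with the archived assessment blob
// data and the user enrolment details for the batch, and flattens the question array into per-question columns.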
def getAssessmentDF(userEnrolmentDF: DataFrame, batch: CollectionBatch)(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): DataFrame = {

val userEnrolmentDataDF = userEnrolmentDF
.select(
col("userid"),
col("courseid"),
col("collectionName"),
col("batchName"),
col("batchid"))

val batchid = userEnrolmentDataDF.select("batchid").distinct().collect().head.getString(0)

val assessBlobData = getAssessmentBlobDF(batchid, config)

val assessAggregateData = loadData(assessmentAggDBSettings, cassandraFormat, new StructType())

val joinedDF = assessAggregateData.join(assessBlobData, Seq("batch_id", "course_id", "user_id"), "left_outer")
.select(assessAggregateData.col("*"))

joinedDF.join(userEnrolmentDataDF, joinedDF.col("user_id") === userEnrolmentDataDF.col("userid") && joinedDF.col("course_id") === userEnrolmentDataDF.col("courseid"), "inner")
.select(userEnrolmentDataDF.col("*"), joinedDF.col("question"), col("content_id"), col("attempt_id"), col("last_attempted_on"))
.withColumn("questiondata",explode_outer(col("question")))
.withColumn("questionid" , col("questiondata.id"))
.withColumn("questiontype", col("questiondata.type"))
.withColumn("questiontitle", col("questiondata.title"))
.withColumn("questiondescription", col("questiondata.description"))
.withColumn("questionduration", round(col("questiondata.duration")))
.withColumn("questionscore", col("questiondata.score"))
.withColumn("questionmaxscore", col("questiondata.max_score"))
.withColumn("questionresponse", UDFUtils.toJSON(col("questiondata.resvalues")))
.withColumn("questionoption", UDFUtils.toJSON(col("questiondata.params")))
.drop("question", "questiondata", "question_data")
}

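// Reads the archived assessment data for a batch as CSV from the store configured under
// assessmentFetcherConfig (local path, S3 or Azure blob storage) and parses the serialized question column.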
def getAssessmentBlobDF(batchid: String, config: JobConfig)(implicit spark: SparkSession): DataFrame = {
val azureFetcherConfig = config.modelParams.get("assessmentFetcherConfig").asInstanceOf[Map[String, AnyRef]]

val store = azureFetcherConfig("store")
val format:String = azureFetcherConfig.getOrElse("format", "csv").asInstanceOf[String]
Collaborator: If multiple archive files have been created and the data has been duplicated for a batch, how are we handling it here? If not, could you please handle this scenario?
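A minimal sketch of one way to handle that scenario (not part of this change; it assumes the archived CSV carries the assessment_aggregator key columns batch_id, course_id, user_id, content_id and attempt_id) would be to deduplicate the blob rows on the attempt key before they are joined in getAssessmentDF, for example:

// hypothetical dedup step: keep one row per attempt even if a batch was archived more than once
val dedupedBlobData = assessBlobData.dropDuplicates(Seq("batch_id", "course_id", "user_id", "content_id", "attempt_id"))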

val url = store match {
case "local" =>
val filePath = azureFetcherConfig("filePath").asInstanceOf[String]
filePath + s"${batchid}-*.${format}"
case "s3" | "azure" =>
val bucket = azureFetcherConfig("container").asInstanceOf[String]
val key = AppConf.getConfig("azure_storage_key")
val filePath = azureFetcherConfig.getOrElse("filePath", "data-archival/").asInstanceOf[String]
val file = s"${filePath}${batchid}-*.${format}"
s"wasb://$bucket@$key.blob.core.windows.net/$file."
}
JobLogger.log(s"Fetching data from $store for batchid: $batchid")

val assessAggData = spark.read.format("csv")
.option("header","true")
.load(url)

assessAggData.withColumn("question", extractFromArrayStringFun(col("question")))
}

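// UDF that deserializes the JSON-encoded question array from the archived CSV into a typed
// List[Question] so it can be exploded into per-question rows.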
def extractFromArrayStringFun: UserDefinedFunction =
udf { str: String => JSONUtils.deserialize[List[Question]](str) }
}
@@ -0,0 +1,121 @@
package org.sunbird.analytics.exhaust

import org.apache.spark.sql.{Encoders, SparkSession}
import org.ekstep.analytics.framework.conf.AppConf
import org.ekstep.analytics.framework.util.{HadoopFileUtil, JSONUtils}
import org.ekstep.analytics.framework.{FrameworkContext, JobConfig}
import org.joda.time.DateTimeZone
import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
import org.scalamock.scalatest.MockFactory
import org.sunbird.analytics.exhaust.collection.ResponseExhaustJobV2
import org.sunbird.analytics.job.report.BaseReportSpec
import org.sunbird.analytics.util.{EmbeddedCassandra, EmbeddedPostgresql, RedisConnect}
import redis.clients.jedis.Jedis
import redis.embedded.RedisServer

import scala.collection.JavaConverters._

class TestResponseExhaustJobV2 extends BaseReportSpec with MockFactory with BaseReportsJob {

val jobRequestTable = "job_request"
implicit var spark: SparkSession = _
var redisServer: RedisServer = _
var jedis: Jedis = _
val outputLocation = AppConf.getConfig("collection.exhaust.store.prefix")

override def beforeAll(): Unit = {
super.beforeAll()
spark = getSparkSession();

redisServer = new RedisServer(6341)
redisServer.start()
setupRedisData()
EmbeddedCassandra.loadData("src/test/resources/exhaust/report_data.cql") // Load test data in embedded cassandra server
EmbeddedPostgresql.start()
EmbeddedPostgresql.createJobRequestTable()
}

override def afterAll() : Unit = {
super.afterAll()
new HadoopFileUtil().delete(spark.sparkContext.hadoopConfiguration, outputLocation)
redisServer.stop()
spark.close()

EmbeddedCassandra.close()
EmbeddedPostgresql.close()
}

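// Seeds the embedded Redis instance with the user profile records the job looks up while building the report.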
def setupRedisData(): Unit = {
val redisConnect = new RedisConnect("localhost", 6341)
val jedis = redisConnect.getConnection(0, 100000)
jedis.hmset("user:user-001", JSONUtils.deserialize[java.util.Map[String, String]]("""{"firstname": "Manju", "userid": "user-001", "state": "Karnataka", "district": "bengaluru", "userchannel": "sunbird-dev", "rootorgid": "01250894314817126443", "email": "manju@ilimi.in", "usersignintype": "Validated"};"""))
jedis.hmset("user:user-002", JSONUtils.deserialize[java.util.Map[String, String]]("""{"firstname": "Mahesh", "userid": "user-002", "state": "Andhra Pradesh", "district": "bengaluru", "userchannel": "sunbird-dev", "rootorgid": "01285019302823526477", "email": "mahesh@ilimi.in", "usersignintype": "Validated"};"""))
jedis.hmset("user:user-003", JSONUtils.deserialize[java.util.Map[String, String]]("""{"firstname": "Sowmya", "userid": "user-003", "state": "Karnataka", "district": "bengaluru", "userchannel": "sunbird-dev", "rootorgid": "01250894314817126443", "email": "sowmya@ilimi.in", "usersignintype": "Validated"};"""))
jedis.hmset("user:user-004", JSONUtils.deserialize[java.util.Map[String, String]]("""{"firstname": "Utkarsha", "userid": "user-004", "state": "Delhi", "district": "babarpur", "userchannel": "sunbird-dev", "rootorgid": "01250894314817126443", "email": "utkarsha@ilimi.in", "usersignintype": "Validated"};"""))
jedis.hmset("user:user-005", JSONUtils.deserialize[java.util.Map[String, String]]("""{"firstname": "Isha", "userid": "user-005", "state": "MP", "district": "Jhansi", "userchannel": "sunbird-dev", "rootorgid": "01250894314817126443", "email": "isha@ilimi.in", "usersignintype": "Validated"};"""))
jedis.hmset("user:user-006", JSONUtils.deserialize[java.util.Map[String, String]]("""{"firstname": "Revathi", "userid": "user-006", "state": "Andhra Pradesh", "district": "babarpur", "userchannel": "sunbird-dev", "rootorgid": "01250894314817126443", "email": "revathi@ilimi.in", "usersignintype": "Validated"};"""))
jedis.hmset("user:user-007", JSONUtils.deserialize[java.util.Map[String, String]]("""{"firstname": "Sunil", "userid": "user-007", "state": "Karnataka", "district": "bengaluru", "userchannel": "sunbird-dev", "rootorgid": "0126391644091351040", "email": "sunil@ilimi.in", "usersignintype": "Validated"};"""))
jedis.hmset("user:user-008", JSONUtils.deserialize[java.util.Map[String, String]]("""{"firstname": "Anoop", "userid": "user-008", "state": "Karnataka", "district": "bengaluru", "userchannel": "sunbird-dev", "rootorgid": "0130107621805015045", "email": "anoop@ilimi.in", "usersignintype": "Validated"};"""))
jedis.hmset("user:user-009", JSONUtils.deserialize[java.util.Map[String, String]]("""{"firstname": "Kartheek", "userid": "user-009", "state": "Karnataka", "district": "bengaluru", "userchannel": "sunbird-dev", "rootorgid": "01285019302823526477", "email": "kartheekp@ilimi.in", "usersignintype": "Validated"};"""))
jedis.hmset("user:user-010", JSONUtils.deserialize[java.util.Map[String, String]]("""{"firstname": "Anand", "userid": "user-010", "state": "Tamil Nadu", "district": "Chennai", "userchannel": "sunbird-dev", "rootorgid": "0130107621805015045", "email": "anandp@ilimi.in", "usersignintype": "Validated"};"""))
jedis.close()
}

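// Runs the V2 job end to end against embedded Cassandra, Postgres and Redis with a local assessment
// archive, then validates the generated CSV report and the updated job_request row.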
"TestResponseExhaustJob" should "generate final output as csv and zip files" in {
EmbeddedPostgresql.execute(s"TRUNCATE $jobRequestTable")
EmbeddedPostgresql.execute("INSERT INTO job_request (tag, request_id, job_id, status, request_data, requested_by, requested_channel, dt_job_submitted, download_urls, dt_file_created, dt_job_completed, execution_time, err_message ,iteration) VALUES ('do_1131350140968632321230_batch-001:01250894314817126443', '37564CF8F134EE7532F125651B51D17F', 'response-exhaust', 'SUBMITTED', '{\"batchId\": \"batch-001\"}', 'user-002', 'b00bc992ef25f1a9a8d63291e20efc8d', '2020-10-19 05:58:18.666', '{}', NULL, NULL, 0, '' ,0);")

implicit val fc = new FrameworkContext()
val strConfig = """{"search":{"type":"none"},"model":"org.sunbird.analytics.exhaust.collection.ResponseExhaustJobV2","modelParams":{"store":"local","mode":"OnDemand","batchFilters":["TPD"],"assessmentFetcherConfig":{"store":"local","filePath":"src/test/resources/exhaust/data-archival/"},"searchFilter":{},"sparkElasticsearchConnectionHost":"localhost","sparkRedisConnectionHost":"localhost","sparkUserDbRedisIndex":"0","sparkUserDbRedisPort":6341,"sparkCassandraConnectionHost":"localhost","fromDate":"","toDate":"","storageContainer":""},"parallelization":8,"appName":"ResponseExhaustJob Exhaust"}"""
val jobConfig = JSONUtils.deserialize[JobConfig](strConfig)
implicit val config = jobConfig

ResponseExhaustJobV2.execute()

val outputDir = "response-exhaust"
val batch1 = "batch-001"
val requestId = "37564CF8F134EE7532F125651B51D17F"
val filePath = ResponseExhaustJobV2.getFilePath(batch1, requestId)
val jobName = ResponseExhaustJobV2.jobName()

implicit val responseExhaustEncoder = Encoders.product[ResponseExhaustReport]
val batch1Results = spark.read.format("csv").option("header", "true")
.load(s"$outputLocation/$filePath.csv").as[ResponseExhaustReport].collectAsList().asScala
batch1Results.size should be (6)

val user1Result = batch1Results.filter(f => f.`User UUID`.equals("user-001"))
user1Result.foreach(f => println("user1data: " + JSONUtils.serialize(f)))
user1Result.map(f => f.`Collection Id`).toList should contain atLeastOneElementOf List("do_1130928636168192001667")
user1Result.map(f => f.`Batch Id`).toList should contain atLeastOneElementOf List("BatchId_batch-001")
user1Result.map(f => f.`User UUID`).toList should contain atLeastOneElementOf List("user-001")
user1Result.map(f => f.`Attempt Id`).toList should contain atLeastOneElementOf List("attempat-001")
user1Result.map(f => f.`QuestionSet Id`).toList should contain atLeastOneElementOf List("do_1128870328040161281204", "do_112876961957437440179")
user1Result.map(f => f.`QuestionSet Title`).toList should contain atLeastOneElementOf List("SelfAssess for course", "Assessment score report using summary plugin")
user1Result.map(f => f.`Question Id`).toList should contain theSameElementsAs List("do_213019475454476288155", "do_213019970118279168165", "do_213019972814823424168", "do_2130256513760624641171")
user1Result.map(f => f.`Question Type`).toList should contain theSameElementsAs List("mcq", "mcq", "mtf", "mcq")
user1Result.map(f => f.`Question Title`).toList should contain atLeastOneElementOf List("testQuestiontextandformula", "test with formula")
user1Result.map(f => f.`Question Description`).toList should contain atLeastOneElementOf List("testQuestiontextandformula")
user1Result.map(f => f.`Question Duration`).toList should contain theSameElementsAs List("1", "1", "2", "12")
user1Result.map(f => f.`Question Score`).toList should contain theSameElementsAs List("1.0", "1.0", "0.33", "10.0")
user1Result.map(f => f.`Question Max Score`).toList should contain theSameElementsAs List("1.0", "1.0", "1.0", "10.0")
user1Result.map(f => f.`Question Options`).head should be ("""[{'1':'{'text':'A=pi r^2'}'},{'2':'{'text':'no'}'},{'answer':'{'correct':['1']}'}]""")
user1Result.map(f => f.`Question Response`).head should be ("""[{'1':'{'text':'A=pi r^2'}'}]""")

val pResponse = EmbeddedPostgresql.executeQuery("SELECT * FROM job_request WHERE job_id='response-exhaust'")

while(pResponse.next()) {
pResponse.getString("err_message") should be ("")
pResponse.getString("dt_job_submitted") should be ("2020-10-19 05:58:18.666")
pResponse.getString("download_urls") should be (s"{reports/response-exhaust/$requestId/batch-001_response_${getDate()}.zip}")
pResponse.getString("dt_file_created") should be (null)
pResponse.getString("iteration") should be ("0")
}

}

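// Date suffix (IST) used in the generated report file name.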
def getDate(): String = {
val dateFormat: DateTimeFormatter = DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.forOffsetHoursMinutes(5, 30));
dateFormat.print(System.currentTimeMillis());
}

}