Skip to content

Commit

Permalink
Merge pull request #9 from FsherinP/exhaust-changes
Browse files Browse the repository at this point in the history
added exhaust fix to 16
  • Loading branch information
varshamahuli97 authored Aug 30, 2024
2 parents dd6618b + 4e55132 commit 241523f
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package org.ekstep.analytics.dashboard.exhaust
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
import org.ekstep.analytics.dashboard.DashboardUtil._
import org.ekstep.analytics.dashboard.DataUtil.Schema
import org.ekstep.analytics.dashboard.{AbsDashboardModel, DashboardConfig}
import org.ekstep.analytics.framework._

Expand All @@ -30,8 +32,63 @@ object DataExhaustModel extends AbsDashboardModel {
cache.write(batchDF, "batch")

val userAssessmentDF = cassandraTableAsDataFrame(conf.cassandraUserKeyspace, conf.cassandraUserAssessmentTable)
show(userAssessmentDF, "userAssessmentDF")
cache.write(userAssessmentDF, "userAssessment")
.select(
col("assessmentid").alias("assessChildID"),
col("starttime").alias("assessStartTime"),
col("endtime").alias("assessEndTime"),
col("status").alias("assessUserStatus"),
col("userid").alias("userID"),
col("assessmentreadresponse"),
col("submitassessmentresponse"),
col("submitassessmentrequest")
)
.na.fill("{}", Seq("submitassessmentresponse", "submitassessmentrequest"))
.withColumn("readResponse", from_json(col("assessmentreadresponse"), Schema.assessmentReadResponseSchema))
.withColumn("submitRequest", from_json(col("submitassessmentrequest"), Schema.submitAssessmentRequestSchema))
.withColumn("submitResponse", from_json(col("submitassessmentresponse"), Schema.submitAssessmentResponseSchema))
.withColumn("assessStartTimestamp", col("assessStartTime"))
.withColumn("assessEndTimestamp", col("assessEndTime"))

val assessWithSchema = userAssessmentDF.select(
col("assessChildID"),
col("assessStartTimestamp"),
col("assessEndTimestamp"),
col("assessUserStatus"),
col("userID"),

col("readResponse.totalQuestions").alias("assessTotalQuestions"),
col("readResponse.maxQuestions").alias("assessMaxQuestions"),
col("readResponse.expectedDuration").alias("assessExpectedDuration"),
col("readResponse.version").alias("assessVersion"),
col("readResponse.maxAssessmentRetakeAttempts").alias("assessMaxRetakeAttempts"),
col("readResponse.status").alias("assessReadStatus"),
col("readResponse.primaryCategory").alias("assessPrimaryCategory"),

col("submitRequest.batchId").alias("assessBatchID"),
col("submitRequest.courseId").alias("courseID"),
col("submitRequest.isAssessment").cast(IntegerType).alias("assessIsAssessment"),
col("submitRequest.timeLimit").alias("assessTimeLimit"),

col("submitResponse.result").alias("assessResult"),
col("submitResponse.total").alias("assessTotal"),
col("submitResponse.blank").alias("assessBlank"),
col("submitResponse.correct").alias("assessCorrect"),
col("submitResponse.incorrect").alias("assessIncorrect"),
col("submitResponse.pass").cast(IntegerType).alias("assessPass"),
col("submitResponse.overallResult").alias("assessOverallResult"),
col("submitResponse.passPercentage").alias("assessPassPercentage")
)

val finalAssessmentDF = assessWithSchema.select(
col("assessChildID"),col("assessUserStatus"),col("userID"),col("assessMaxQuestions"),col("assessExpectedDuration"),col("assessPrimaryCategory"),
col("assessBlank"),col("assessCorrect"),col("assessIncorrect"),
col("assessPass"),col("assessOverallResult"),col("assessPassPercentage"), col("courseID"),
col("assessTotalQuestions"), col("assessVersion"), col("assessMaxRetakeAttempts"), col("assessReadStatus"), col("assessBatchID"), col("assessIsAssessment"), col("assessTimeLimit"),
col("assessResult"), col("assessTotal"),col("assessStartTimestamp"),
col("assessEndTimestamp")
)
show(finalAssessmentDF, "userAssessmentDF")
cache.write(finalAssessmentDF, "userAssessment")

val hierarchyDF = cassandraTableAsDataFrame(conf.cassandraHierarchyStoreKeyspace, conf.cassandraContentHierarchyTable)
show(hierarchyDF, "hierarchyDF")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ object UserAssessmentModel extends AbsDashboardModel {
kafkaDispatch(withTimestamp(assessWithDetailsDF, timestamp), conf.assessmentTopic)

val assessChildrenDF = assessmentChildrenDataFrame(assessWithHierarchyDF)
val userAssessmentDF = userAssessmentDataFrame()
val userAssessmentDF = cache.load("userAssessment")
val userAssessChildrenDF = userAssessmentChildrenDataFrame(userAssessmentDF, assessChildrenDF)
val userAssessChildrenDetailsDF = userAssessmentChildrenDetailsDataFrame(userAssessChildrenDF, assessWithDetailsDF,
allCourseProgramDetailsWithRatingDF, userOrgDF)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ object CourseBasedAssessmentModel extends AbsDashboardModel {
val assessWithDetailsDF = assessWithHierarchyDF.drop("children")

val assessChildrenDF = assessmentChildrenDataFrame(assessWithHierarchyDF)
val userAssessmentDF = userAssessmentDataFrame().filter(col("assessUserStatus") === "SUBMITTED")
val userAssessmentDF = cache.load("userAssessment").filter(col("assessUserStatus") === "SUBMITTED")
.withColumn("assessStartTime", col("assessStartTimestamp").cast("long"))
.withColumn("assessEndTime", col("assessEndTimestamp").cast("long"))
val userAssessChildrenDF = userAssessmentChildrenDataFrame(userAssessmentDF, assessChildrenDF)
val userAssessChildrenDetailsDF = userAssessmentChildrenDetailsDataFrame(userAssessChildrenDF, assessWithDetailsDF,
allCourseProgramDetailsWithRatingDF, userOrgDF)
Expand Down

0 comments on commit 241523f

Please sign in to comment.