Skip to content

Commit

Permalink
Merge pull request #16 from varshamahuli97/cbrelease-4.8.17
Browse files Browse the repository at this point in the history
Cbrelease 4.8.17
  • Loading branch information
sivaprakash123 authored Sep 21, 2024
2 parents e8a9ca7 + 13d6cd4 commit f4485e4
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import org.ekstep.analytics.dashboard.DataUtil._
import org.ekstep.analytics.framework._
import org.joda.time.DateTime
import org.apache.spark.sql.functions.col

import java.text.SimpleDateFormat
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter
import java.time.LocalDate

/**
Expand Down Expand Up @@ -106,11 +107,15 @@ object DashboardSyncModel extends AbsDashboardModel {

Redis.dispatchDataFrame[String]("dashboard_top_10_learners_on_kp_by_user_org", resultDF, "userOrgID", "top_learners")

val (hierarchy1DF, cbpDetailsWithCompDF, cbpDetailsDF,
cbpDetailsWithRatingDF) = contentDataFrames(orgDF, Seq("Course", "Program", "Blended Program", "Curated Program"))
val cbpCompletionWithDetailsDF = allCourseProgramCompletionWithDetailsDataFrame(userCourseProgramCompletionDF, cbpDetailsDF, userOrgDF)

// update redis data for learner home page
updateLearnerHomePageData(orgDF, userOrgDF, userCourseProgramCompletionDF)
updateLearnerHomePageData(orgDF, userOrgDF, userCourseProgramCompletionDF, cbpCompletionWithDetailsDF, cbpDetailsWithRatingDF)

// update redis data for dashboards
dashboardRedisUpdates(orgRoleCount, userDF, allCourseProgramDetailsWithRatingDF, allCourseProgramCompletionWithDetailsDF, allCourseProgramCompetencyDF)
dashboardRedisUpdates(orgRoleCount, userDF, allCourseProgramDetailsWithRatingDF, allCourseProgramCompletionWithDetailsDF, allCourseProgramCompetencyDF, cbpCompletionWithDetailsDF)

// update cbp top 10 reviews
cbpTop10Reviews(allCourseProgramDetailsWithRatingDF)
Expand All @@ -119,7 +124,7 @@ object DashboardSyncModel extends AbsDashboardModel {
}

def dashboardRedisUpdates(orgRoleCount: DataFrame, userDF: DataFrame, allCourseProgramDetailsWithRatingDF: DataFrame,
allCourseProgramCompletionWithDetailsDF: DataFrame, allCourseProgramCompetencyDF: DataFrame)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = {
allCourseProgramCompletionWithDetailsDF: DataFrame, allCourseProgramCompetencyDF: DataFrame, cbpCompletionWithDetailsDF: DataFrame)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = {
import spark.implicits._
// new redis updates - start
// MDO onboarded, with atleast one MDO_ADMIN/MDO_LEADER
Expand Down Expand Up @@ -351,9 +356,61 @@ object DashboardSyncModel extends AbsDashboardModel {
val competencyCountByCBPDF = courseIDsByCBPConcatenatedDF.withColumnRenamed("courseOrgID", "courseOrgID").withColumnRenamed("sortedCourseIDs", "courseIDs")
Redis.dispatchDataFrame[Long]("dashboard_competencies_count_by_course_org", competencyCountByCBPDF, "courseOrgID", "courseIDs")

// national learning week metrics
val nationalLearningWeekStartString = conf.nationalLearningWeekStart
val nationalLearningWeekEndString = conf.nationalLearningWeekEnd
val zoneOffset = ZoneOffset.ofHoursMinutes(5, 30)
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
// Parse the strings to LocalDateTime
val nationalLearningWeekStartDateTime = LocalDateTime.parse(nationalLearningWeekStartString, formatter)
val nationalLearningWeekEndDateTime = LocalDateTime.parse(nationalLearningWeekEndString, formatter)
val nationalLearningWeekStartOffsetDateTime = nationalLearningWeekStartDateTime.atOffset(zoneOffset)
val nationalLearningWeekEndOffsetDateTime = nationalLearningWeekEndDateTime.atOffset(zoneOffset)
// Convert OffsetDateTime to epoch seconds
val nationalLearningWeekStartDateTimeEpoch = nationalLearningWeekStartOffsetDateTime.toEpochSecond
val nationalLearningWeekEndDateTimeEpoch = nationalLearningWeekEndOffsetDateTime.toEpochSecond

/* total enrolments that week across all types of content */
val enrolmentContentNLWDF = liveRetiredContentEnrolmentDF.filter($"courseEnrolledTimestamp".isNotNull && $"courseEnrolledTimestamp" =!= "" && $"courseEnrolledTimestamp".between(nationalLearningWeekStartDateTimeEpoch, nationalLearningWeekEndDateTimeEpoch))
val enrolmentContentNLWCountDF = enrolmentContentNLWDF.agg(count("*").alias("count"))
val enrolmentContentNLWCount = enrolmentContentNLWCountDF.select("count").first().getLong(0)
Redis.update("dashboard_content_enrolment_nlw_count", enrolmentContentNLWCount.toString)


/* total certificates issued yesterday across all types of content */
val certificateDateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXX")

// Calculate start and end of the previous day as OffsetDateTime
val previousDayStart = currentDate.minusDays(1).atStartOfDay().atOffset(zoneOffset)
val previousDayEnd = currentDate.atStartOfDay().minusSeconds(1).atOffset(zoneOffset)
val previousDayStartString = previousDayStart.format(certificateDateTimeFormatter)
val previousDayEndString = previousDayEnd.format(certificateDateTimeFormatter)
val certificateGeneratedYdayDF = liveRetiredContentEnrolmentDF.filter($"certificateGeneratedOn".isNotNull && $"certificateGeneratedOn" =!= "" && $"certificateGeneratedOn".between(previousDayStartString, previousDayEndString))
val certificateGeneratedYdayCountDF = certificateGeneratedYdayDF.agg(count("*").alias("count"))
val certificateGeneratedYdayCount = certificateGeneratedYdayCountDF.select("count").first().getLong(0)
Redis.update("dashboard_content_certificates_generated_yday_nlw_count", certificateGeneratedYdayCount.toString)


/* total certificates issued that week across all types of content */
val nationalLearningWeekStartDateTimeString = nationalLearningWeekStartOffsetDateTime.format(certificateDateTimeFormatter)
val nationalLearningWeekEndDateTimeString = nationalLearningWeekEndOffsetDateTime.format(certificateDateTimeFormatter)
val certificateGeneratedInNLWDF = liveRetiredContentEnrolmentDF.filter($"certificateGeneratedOn".isNotNull && $"certificateGeneratedOn" =!= "" && $"certificateGeneratedOn".between(nationalLearningWeekStartDateTimeString, nationalLearningWeekEndDateTimeString))
val certificateGeneratedInNLWCountDF = certificateGeneratedInNLWDF.agg(count("*").alias("count"), countDistinct("userID").alias("uniqueUserCount"))
val certificateGeneratedInNLWCount = certificateGeneratedInNLWCountDF.select("count").first().getLong(0)
Redis.update("dashboard_content_certificates_generated_nlw_count", certificateGeneratedInNLWCount.toString)

/* total number of events published that week */
// Redis.update("dashboard_events_published_nlw_count", eventsPublishedInNLWCount.toString)
/* certificates issued by user that week across all types of content*/
val certificateGeneratedInNLWByUserDF = certificateGeneratedInNLWDF.groupBy("userID").agg(count("*").alias("count"))
Redis.dispatchDataFrame[String]("dashboard_content_certificates_issued_nlw_by_user", certificateGeneratedInNLWByUserDF, "userID", "count")

/* learning hours by user that week across all types of content*/
val enrolmentContentDurationNLWByUserDF = cbpCompletionWithDetailsDF.filter("userID IS NOT NULL AND TRIM(userID) != ''").groupBy("userID").agg(sum(expr("(completionPercentage / 100) * courseDuration")).alias("totalLearningSeconds"))
.withColumn("totalLearningHours", bround(col("totalLearningSeconds") / 3600, 2)).select("userID", "totalLearningHours")
Redis.dispatchDataFrame[String]("dashboard_content_learning_hours_nlw_by_user", enrolmentContentDurationNLWByUserDF, "userID", "totalLearningHours")
// get the count for each courseID
val topContentCountDF = liveCourseProgramExcludingModeratedCompletedDF.groupBy("courseID").agg(count("*").alias("count"))

val liveCourseProgramExcludingModeratedCompletedWithCountDF = liveCourseProgramExcludingModeratedCompletedDF.join(topContentCountDF, "courseID")

val topCoursesByCBPDF = liveCourseProgramExcludingModeratedCompletedDF
Expand Down Expand Up @@ -602,11 +659,7 @@ object DashboardSyncModel extends AbsDashboardModel {
averageMonthlyActiveUserCount
}

def updateLearnerHomePageData(orgDF: DataFrame, userOrgDF: DataFrame, userCourseProgramCompletionDF: DataFrame)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = {

val (hierarchyDF, cbpDetailsWithCompDF, cbpDetailsDF,
cbpDetailsWithRatingDF) = contentDataFrames(orgDF, Seq("Course", "Program", "Blended Program", "Curated Program"))
val cbpCompletionWithDetailsDF = allCourseProgramCompletionWithDetailsDataFrame(userCourseProgramCompletionDF, cbpDetailsDF, userOrgDF)
def updateLearnerHomePageData(orgDF: DataFrame, userOrgDF: DataFrame, userCourseProgramCompletionDF: DataFrame, cbpCompletionWithDetailsDF: DataFrame, cbpDetailsWithRatingDF: DataFrame)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = {

//We only want the date here as the intent is to run this part of the script only once a day. The competency metrics
// script may run a second time if we run into issues and this function should be skipped in that case.
Expand Down
Loading

0 comments on commit f4485e4

Please sign in to comment.