diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DashboardSyncModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DashboardSyncModel.scala
index d259db5f..b9ffa80d 100644
--- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DashboardSyncModel.scala
+++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DashboardSyncModel.scala
@@ -10,8 +10,9 @@ import org.ekstep.analytics.dashboard.DataUtil._
 import org.ekstep.analytics.framework._
 import org.joda.time.DateTime
 import org.apache.spark.sql.functions.col
-
 import java.text.SimpleDateFormat
+import java.time.{LocalDateTime, ZoneOffset}
+import java.time.format.DateTimeFormatter
 import java.time.LocalDate
 
 /**
@@ -106,11 +107,15 @@ object DashboardSyncModel extends AbsDashboardModel {
 
     Redis.dispatchDataFrame[String]("dashboard_top_10_learners_on_kp_by_user_org", resultDF, "userOrgID", "top_learners")
 
+    val (hierarchy1DF, cbpDetailsWithCompDF, cbpDetailsDF,
+      cbpDetailsWithRatingDF) = contentDataFrames(orgDF, Seq("Course", "Program", "Blended Program", "Curated Program"))
+    val cbpCompletionWithDetailsDF = allCourseProgramCompletionWithDetailsDataFrame(userCourseProgramCompletionDF, cbpDetailsDF, userOrgDF)
+
     // update redis data for learner home page
-    updateLearnerHomePageData(orgDF, userOrgDF, userCourseProgramCompletionDF)
+    updateLearnerHomePageData(orgDF, userOrgDF, userCourseProgramCompletionDF, cbpCompletionWithDetailsDF, cbpDetailsWithRatingDF)
 
     // update redis data for dashboards
-    dashboardRedisUpdates(orgRoleCount, userDF, allCourseProgramDetailsWithRatingDF, allCourseProgramCompletionWithDetailsDF, allCourseProgramCompetencyDF)
+    dashboardRedisUpdates(orgRoleCount, userDF, allCourseProgramDetailsWithRatingDF, allCourseProgramCompletionWithDetailsDF, allCourseProgramCompetencyDF, cbpCompletionWithDetailsDF)
 
     // update cbp top 10 reviews
     cbpTop10Reviews(allCourseProgramDetailsWithRatingDF)
@@ -119,7 +124,7 @@ object DashboardSyncModel extends AbsDashboardModel {
   }
 
   def dashboardRedisUpdates(orgRoleCount: DataFrame, userDF: DataFrame, allCourseProgramDetailsWithRatingDF: DataFrame,
-                            allCourseProgramCompletionWithDetailsDF: DataFrame, allCourseProgramCompetencyDF: DataFrame)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = {
+                            allCourseProgramCompletionWithDetailsDF: DataFrame, allCourseProgramCompetencyDF: DataFrame, cbpCompletionWithDetailsDF: DataFrame)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = {
     import spark.implicits._
     // new redis updates - start
     // MDO onboarded, with atleast one MDO_ADMIN/MDO_LEADER
@@ -351,9 +356,61 @@ object DashboardSyncModel extends AbsDashboardModel {
     val competencyCountByCBPDF = courseIDsByCBPConcatenatedDF.withColumnRenamed("courseOrgID", "courseOrgID").withColumnRenamed("sortedCourseIDs", "courseIDs")
     Redis.dispatchDataFrame[Long]("dashboard_competencies_count_by_course_org", competencyCountByCBPDF, "courseOrgID", "courseIDs")
 
+    // national learning week metrics
+    val nationalLearningWeekStartString = conf.nationalLearningWeekStart
+    val nationalLearningWeekEndString = conf.nationalLearningWeekEnd
+    val zoneOffset = ZoneOffset.ofHoursMinutes(5, 30)
+    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
+    // Parse the strings to LocalDateTime
+    val nationalLearningWeekStartDateTime = LocalDateTime.parse(nationalLearningWeekStartString, formatter)
+    val nationalLearningWeekEndDateTime = LocalDateTime.parse(nationalLearningWeekEndString, formatter)
+    val nationalLearningWeekStartOffsetDateTime = nationalLearningWeekStartDateTime.atOffset(zoneOffset)
+    val nationalLearningWeekEndOffsetDateTime = nationalLearningWeekEndDateTime.atOffset(zoneOffset)
+    // Convert OffsetDateTime to epoch seconds
+    val nationalLearningWeekStartDateTimeEpoch = nationalLearningWeekStartOffsetDateTime.toEpochSecond
+    val nationalLearningWeekEndDateTimeEpoch = nationalLearningWeekEndOffsetDateTime.toEpochSecond
+
+    /* total enrolments that week across all types of content */
+    val enrolmentContentNLWDF = liveRetiredContentEnrolmentDF.filter($"courseEnrolledTimestamp".isNotNull && $"courseEnrolledTimestamp" =!= "" && $"courseEnrolledTimestamp".between(nationalLearningWeekStartDateTimeEpoch, nationalLearningWeekEndDateTimeEpoch))
+    val enrolmentContentNLWCountDF = enrolmentContentNLWDF.agg(count("*").alias("count"))
+    val enrolmentContentNLWCount = enrolmentContentNLWCountDF.select("count").first().getLong(0)
+    Redis.update("dashboard_content_enrolment_nlw_count", enrolmentContentNLWCount.toString)
+
+
+    /* total certificates issued yesterday across all types of content */
+    val certificateDateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXX")
+
+    // Calculate start and end of the previous day as OffsetDateTime
+    val previousDayStart = currentDate.minusDays(1).atStartOfDay().atOffset(zoneOffset)
+    val previousDayEnd = currentDate.atStartOfDay().minusSeconds(1).atOffset(zoneOffset)
+    val previousDayStartString = previousDayStart.format(certificateDateTimeFormatter)
+    val previousDayEndString = previousDayEnd.format(certificateDateTimeFormatter)
+    val certificateGeneratedYdayDF = liveRetiredContentEnrolmentDF.filter($"certificateGeneratedOn".isNotNull && $"certificateGeneratedOn" =!= "" && $"certificateGeneratedOn".between(previousDayStartString, previousDayEndString))
+    val certificateGeneratedYdayCountDF = certificateGeneratedYdayDF.agg(count("*").alias("count"))
+    val certificateGeneratedYdayCount = certificateGeneratedYdayCountDF.select("count").first().getLong(0)
+    Redis.update("dashboard_content_certificates_generated_yday_nlw_count", certificateGeneratedYdayCount.toString)
+
+
+    /* total certificates issued that week across all types of content */
+    val nationalLearningWeekStartDateTimeString = nationalLearningWeekStartOffsetDateTime.format(certificateDateTimeFormatter)
+    val nationalLearningWeekEndDateTimeString = nationalLearningWeekEndOffsetDateTime.format(certificateDateTimeFormatter)
+    val certificateGeneratedInNLWDF = liveRetiredContentEnrolmentDF.filter($"certificateGeneratedOn".isNotNull && $"certificateGeneratedOn" =!= "" && $"certificateGeneratedOn".between(nationalLearningWeekStartDateTimeString, nationalLearningWeekEndDateTimeString))
+    val certificateGeneratedInNLWCountDF = certificateGeneratedInNLWDF.agg(count("*").alias("count"), countDistinct("userID").alias("uniqueUserCount"))
+    val certificateGeneratedInNLWCount = certificateGeneratedInNLWCountDF.select("count").first().getLong(0)
+    Redis.update("dashboard_content_certificates_generated_nlw_count", certificateGeneratedInNLWCount.toString)
+
+    /* total number of events published that week */
+    // Redis.update("dashboard_events_published_nlw_count", eventsPublishedInNLWCount.toString)
+    /* certificates issued by user that week across all types of content*/
+    val certificateGeneratedInNLWByUserDF = certificateGeneratedInNLWDF.groupBy("userID").agg(count("*").alias("count"))
+    Redis.dispatchDataFrame[String]("dashboard_content_certificates_issued_nlw_by_user", certificateGeneratedInNLWByUserDF, "userID", "count")
+
+    /* learning hours by user that week across all types of content*/
+    val enrolmentContentDurationNLWByUserDF = cbpCompletionWithDetailsDF.filter("userID IS NOT NULL AND TRIM(userID) != ''").groupBy("userID").agg(sum(expr("(completionPercentage / 100) * courseDuration")).alias("totalLearningSeconds"))
+      .withColumn("totalLearningHours", bround(col("totalLearningSeconds") / 3600, 2)).select("userID", "totalLearningHours")
+    Redis.dispatchDataFrame[String]("dashboard_content_learning_hours_nlw_by_user", enrolmentContentDurationNLWByUserDF, "userID", "totalLearningHours")
 
     // get the count for each courseID
     val topContentCountDF = liveCourseProgramExcludingModeratedCompletedDF.groupBy("courseID").agg(count("*").alias("count"))
-    val liveCourseProgramExcludingModeratedCompletedWithCountDF = liveCourseProgramExcludingModeratedCompletedDF.join(topContentCountDF, "courseID")
 
     val topCoursesByCBPDF = liveCourseProgramExcludingModeratedCompletedDF
@@ -602,11 +659,7 @@ object DashboardSyncModel extends AbsDashboardModel {
     averageMonthlyActiveUserCount
   }
 
-  def updateLearnerHomePageData(orgDF: DataFrame, userOrgDF: DataFrame, userCourseProgramCompletionDF: DataFrame)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = {
-
-    val (hierarchyDF, cbpDetailsWithCompDF, cbpDetailsDF,
-      cbpDetailsWithRatingDF) = contentDataFrames(orgDF, Seq("Course", "Program", "Blended Program", "Curated Program"))
-    val cbpCompletionWithDetailsDF = allCourseProgramCompletionWithDetailsDataFrame(userCourseProgramCompletionDF, cbpDetailsDF, userOrgDF)
+  def updateLearnerHomePageData(orgDF: DataFrame, userOrgDF: DataFrame, userCourseProgramCompletionDF: DataFrame, cbpCompletionWithDetailsDF: DataFrame, cbpDetailsWithRatingDF: DataFrame)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = {
 
     //We only want the date here as the intent is to run this part of the script only once a day. The competency metrics
     // script may run a second time if we run into issues and this function should be skipped in that case.
diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DataUtil.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DataUtil.scala
index 78dc8caa..b7b62ada 100644
--- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DataUtil.scala
+++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DataUtil.scala
@@ -61,8 +61,7 @@ object DataUtil extends Serializable {
     StructField("verifiedKarmayogi", BooleanType, nullable = true),
     StructField("mandatoryFieldsExists", BooleanType, nullable = true),
     StructField("profileImageUrl", StringType, nullable = true),
-    StructField("personalDetails", personalDetailsSchema, nullable = true),
-    StructField("profileStatus", StringType, nullable = true)
+    StructField("personalDetails", personalDetailsSchema, nullable = true)
   )
   if (competencies) {
     fields.append(StructField("competencies", ArrayType(profileCompetencySchema), nullable = true))
@@ -115,6 +114,16 @@ object DataUtil extends Serializable {
     StructField("competencyStatus", StringType, nullable = true)
   ))
 
+  val enrolmentCountByUserSchema: StructType = StructType(Seq(
+    StructField("userID", StringType),
+    StructField("count", StringType)
+  ))
+
+  val learningHoursByUserSchema: StructType = StructType(Seq(
+    StructField("userID", StringType),
+    StructField("totalLearningHours", StringType)
+  ))
+
   /* schema definitions for content hierarchy table */
   def makeHierarchyChildSchema(children: Boolean = false): StructType = {
     val fields = ListBuffer(
@@ -398,7 +407,6 @@ object DataUtil extends Serializable {
       .withColumn("userVerified", when(col("profileDetails.verifiedKarmayogi").isNull, false).otherwise(col("profileDetails.verifiedKarmayogi")))
      .withColumn("userMandatoryFieldsExists", col("profileDetails.mandatoryFieldsExists"))
      .withColumn("userProfileImgUrl", col("profileDetails.profileImageUrl"))
-      .withColumn("userProfileStatus", col("profileDetails.profileStatus"))
       .withColumn("userPhoneVerified", expr("LOWER(personalDetails.phoneVerified) = 'true'"))
       .withColumn("fullName", concat_ws(" ", col("firstName"), col("lastName")))
 
@@ -458,11 +466,6 @@ object DataUtil extends Serializable {
     roleDF
   }
 
-  def orgCompleteHierarchyDataFrame()(implicit spark: SparkSession, conf: DashboardConfig): DataFrame = {
-    val orgCompleteHierarchyDF = cache.load("orgCompleteHierarchy")
-    orgCompleteHierarchyDF
-  }
-
   /**
    *
    * @param userOrgDF DataFrame(userID, firstName, lastName, maskedEmail, userStatus, userOrgID, userOrgName, userOrgStatus)
@@ -601,7 +604,6 @@ object DataUtil extends Serializable {
     // now that error handling is done, proceed with business as usual
     df = df
       .withColumn("courseOrgID", explode_outer(col("createdFor")))
-      .withColumn("contentLanguage", explode_outer(col("language")))
       .select(
         col("identifier").alias("courseID"),
         col("primaryCategory").alias("category"),
@@ -616,8 +618,7 @@ object DataUtil extends Serializable {
         col("courseOrgID"),
         col("competencies_v5.competencyAreaId"),
         col("competencies_v5.competencyThemeId"),
-        col("competencies_v5.competencySubThemeId"),
-        col("contentLanguage")
+        col("competencies_v5.competencySubThemeId")
       )
 
 
@@ -1028,7 +1029,7 @@ object DataUtil extends Serializable {
 
   def userCourseProgramCompletionDataFrame(extraCols: Seq[String] = Seq(), datesAsLong: Boolean = false)(implicit spark: SparkSession, conf: DashboardConfig): DataFrame = {
     val selectCols = Seq("userID", "courseID", "batchID", "courseProgress", "dbCompletionStatus", "courseCompletedTimestamp",
-      "courseEnrolledTimestamp", "lastContentAccessTimestamp", "issuedCertificateCount","issuedCertificateCountPerContent", "firstCompletedOn", "certificateGeneratedOn", "certificateID") ++ extraCols
+      "courseEnrolledTimestamp", "lastContentAccessTimestamp", "issuedCertificateCount", "firstCompletedOn", "certificateGeneratedOn", "certificateID") ++ extraCols
 
     var df = cache.load("enrolment")
       .where(expr("active=true"))
@@ -1036,10 +1037,9 @@ object DataUtil extends Serializable {
       .withColumn("courseEnrolledTimestamp", col("enrolled_date"))
       .withColumn("lastContentAccessTimestamp", col("lastcontentaccesstime"))
      .withColumn("issuedCertificateCount", size(col("issued_certificates")))
-      .withColumn("issuedCertificateCountPerContent", when(size(col("issued_certificates")) > 0, lit(1)).otherwise( lit(0)))
-      .withColumn("certificateGeneratedOn", when(col("issued_certificates").isNull, "").otherwise( col("issued_certificates")(size(col("issued_certificates")) - 1).getItem("lastIssuedOn")))
-      .withColumn("firstCompletedOn", when(col("issued_certificates").isNull, "").otherwise(when(size(col("issued_certificates")) > 0, col("issued_certificates")(0).getItem("lastIssuedOn")).otherwise("")))
-      .withColumn("certificateID", when(col("issued_certificates").isNull, "").otherwise( col("issued_certificates")(size(col("issued_certificates")) - 1).getItem("identifier")))
+      .withColumn("certificateGeneratedOn", when(col("issued_certificates").isNull, "").otherwise( col("issued_certificates")(0).getItem("lastIssuedOn")))
+      .withColumn("firstCompletedOn", when(col("issued_certificates").isNull, "").otherwise(when(size(col("issued_certificates")) > 0, col("issued_certificates")(size(col("issued_certificates")) - 1).getItem("lastIssuedOn")).otherwise("")))
+      .withColumn("certificateID", when(col("issued_certificates").isNull, "").otherwise( col("issued_certificates")(0).getItem("identifier")))
       .withColumnRenamed("userid", "userID")
       .withColumnRenamed("courseid", "courseID")
       .withColumnRenamed("batchid", "batchID")
@@ -1548,17 +1548,11 @@ object DataUtil extends Serializable {
   /**
    * Reading user_karma_points data
    */
   def userKarmaPointsDataFrame()(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): DataFrame = {
-    val df = cache.load("userKarmaPoints")
+    val df = cassandraTableAsDataFrame(conf.cassandraUserKeyspace, conf.cassandraKarmaPointsTable)
     show(df, "Karma Points data")
     df
   }
-  /**
-   * Reading old assessment details
-   */
-  def oldAssessmentDetailsDataframe()(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): DataFrame = {
-    val df = cache.load("oldAssessmentDetails").withColumnRenamed("user_id", "userID").withColumnRenamed("parent_source_id", "courseID")
-    df
-  }
+
   /* telemetry data frames */
   def loggedInMobileUserDataFrame()(implicit spark: SparkSession, conf: DashboardConfig): DataFrame = {
@@ -1614,10 +1608,11 @@ object DataUtil extends Serializable {
   }
 
   def npsUpgradedTriggerC1DataFrame()(implicit spark: SparkSession, conf: DashboardConfig): DataFrame = {
-    val query = """SELECT userID as userid FROM \"nps-upgraded-users-data\" where __time >= CURRENT_TIMESTAMP - INTERVAL '15' DAY"""
+    val query = """SELECT userID as userid FROM \"nps-upgraded-users-data\" where __time >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR"""
     var df = druidDFOption(query, conf.sparkDruidRouterHost, limit = 1000000).orNull
     if(df == null) return emptySchemaDataFrame(Schema.npsUserIds)
     df = df.na.drop(Seq("userid"))
+    show(df, "user data")
     df
   }
 
@@ -1670,7 +1665,6 @@ object DataUtil extends Serializable {
     ratingsDF
   }
 
-
   def userFeedFromCassandraDataFrame()(implicit spark: SparkSession, conf: DashboardConfig): DataFrame = {
     var df = cassandraTableAsDataFrame(conf.cassandraUserFeedKeyspace, conf.cassandraUserFeedTable)
       .select(col("userid").alias("userid"))
@@ -1769,15 +1763,6 @@ object DataUtil extends Serializable {
    org.sunbird.cloud.storage.factory.StorageConfig and org.ekstep.analytics.framework.StorageConfig
    */
 
-//  def syncReports(reportTempPath: String, reportPath: String)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = {
-//    println(s"REPORT: Syncing reports from ${reportTempPath} to ${conf.store}://${conf.container}/${reportPath} ...")
-//    val storageService = StorageUtil.getStorageService(conf)
-//    // upload files to - {store}://{container}/{reportPath}/
-//    val storageConfig = new StorageConfig(conf.store, conf.container, reportTempPath)
-//    storageService.upload(storageConfig.container, reportTempPath, s"${reportPath}/", Some(true), Some(0), Some(3), None)
-//    storageService.closeContext()
-//    println(s"REPORT: Finished syncing reports from ${reportTempPath} to ${conf.store}://${conf.container}/${reportPath}")
-//  }
   def syncReports(reportTempPath: String, reportPath: String)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = {
     println(s"REPORT: Syncing reports from ${reportTempPath} to ${conf.store}://${conf.container}/${reportPath} ...")
     val storageService = StorageUtil.getStorageService(conf)
@@ -1901,68 +1886,6 @@ object DataUtil extends Serializable {
     show(df, "ratings")
     df
   }
 
-  def processOrgsL3(df: DataFrame, userOrgDF: DataFrame, orgHierarchyCompleteDF: DataFrame): DataFrame = {
-    val organisationDF = df.dropDuplicates()
-    val sumDF = organisationDF.withColumn("allIDs", lit(null).cast("string")).select(col("organisationID").alias("ministryID"), col("allIDs"))
-    sumDF
-
-  }
-
-  def processDepartmentL2(df: DataFrame, userOrgDF: DataFrame, orgHierarchyCompleteDF: DataFrame): DataFrame = {
-    val organisationDF = df
-      .join(orgHierarchyCompleteDF, df("departmentMapID") === orgHierarchyCompleteDF("l2mapid"), "left")
-      .select(df("departmentID"), col("sborgid").alias("organisationID")).dropDuplicates()
-    val sumDF = organisationDF
-      .groupBy("departmentID")
-      .agg(
-        concat_ws(",", collect_set(when(col("organisationID").isNotNull, col("organisationID")))).alias("orgIDs")
-      )
-      .withColumn("associatedIds", concat_ws(",", col("orgIDs")))
-      .withColumn("allIDs", concat_ws(",", col("departmentID"), col("associatedIds")))
-      .select(col("departmentID").alias("ministryID"), col("allIDs"))
-    sumDF
-
-  }
-
-  def processMinistryL1(df: DataFrame, userOrgDF: DataFrame, orgHierarchyCompleteDF: DataFrame): DataFrame = {
-    println("Processing Ministry L1 DataFrame:")
-    val departmentAndMapIDsDF = df
-      .join(orgHierarchyCompleteDF, df("ministryMapID") === orgHierarchyCompleteDF("l1mapid"), "left")
-      .select(df("ministryID"), col("sborgid").alias("departmentID"), col("mapid").alias("departmentMapID"))
-
-    // Join with orgHierarchyCompleteDF to get the organisationDF
-    val organisationDF = departmentAndMapIDsDF
-      .join(orgHierarchyCompleteDF, departmentAndMapIDsDF("departmentMapID") === orgHierarchyCompleteDF("l2mapid"), "left")
-      .select(departmentAndMapIDsDF("ministryID"), departmentAndMapIDsDF("departmentID"),col("sborgid").alias("organisationID")).dropDuplicates()
-    val sumDF = organisationDF
-      .groupBy("ministryID")
-      .agg(
-        concat_ws(",", collect_set(when(col("departmentID").isNotNull, col("departmentID")))).alias("departmentIDs"),
-        concat_ws(",", collect_set(when(col("organisationID").isNotNull, col("organisationID")))).alias("orgIDs")
-      )
-      .withColumn("associatedIds", concat_ws(",", col("departmentIDs"), col("orgIDs")))
-      .withColumn("allIDs", concat_ws(",", col("ministryID"), col("associatedIds")))
-      .select(col("ministryID"), col("allIDs"))
-    sumDF
-  }
-
-  def getDetailedHierarchy(userOrgDF: DataFrame)(implicit spark: SparkSession, conf: DashboardConfig): DataFrame = {
-    var orgHierarchyCompleteDF = orgCompleteHierarchyDataFrame()
-    var distinctMdoIDsDF = userOrgDF.select("userOrgID").distinct()
-    val joinedDF = orgHierarchyCompleteDF.join(distinctMdoIDsDF, orgHierarchyCompleteDF("sborgid") === distinctMdoIDsDF("userOrgID"), "inner")
-    println("The number of distinct orgs in orgHierarchy is: "+joinedDF.count())
-    val ministryL1DF = joinedDF.filter(col("l1mapid").isNull && col("l2mapid").isNull && col("l3mapid").isNull).select(col("sborgid").alias("ministryID"), col("mapid").alias("ministryMapID"))
-    val ministryOrgDF = processMinistryL1(ministryL1DF, userOrgDF, orgHierarchyCompleteDF)
-    val departmentL2DF = joinedDF.filter(col("l2mapid").isNull && col("l1mapid").isNotNull || col("l3mapid").isNotNull).select(col("sborgid").alias("departmentID"), col("mapid").alias("departmentMapID"))
-    val deptOrgDF = processDepartmentL2(departmentL2DF, userOrgDF, orgHierarchyCompleteDF)
-    val orgsL3DF = joinedDF.filter((col("l3mapid").isNull) && col("l2mapid").isNotNull && col("l1mapid").isNotNull).select(col("sborgid").alias("organisationID"))
-    val orgsDF = processOrgsL3(orgsL3DF, userOrgDF, orgHierarchyCompleteDF)
-    val combinedMinistryMetricsDF = ministryOrgDF.union(deptOrgDF).union(orgsDF)
-    val updatedDF = combinedMinistryMetricsDF.withColumn("allIDs", when(col("allIDs").isNull || trim(col("allIDs")) === "", col("ministryID")).otherwise(col("allIDs")))
-    show(updatedDF, "This will be the final hierarchy")
-    updatedDF
-  }
-
-
 }
+
diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/ministrymetrics/MinistryMetricsModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/ministrymetrics/MinistryMetricsModel.scala
index b451904e..530c5416 100644
--- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/ministrymetrics/MinistryMetricsModel.scala
+++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/ministrymetrics/MinistryMetricsModel.scala
@@ -70,7 +70,7 @@ object MinistryMetricsModel extends AbsDashboardModel {
     show(finalResultDF, "finalresult")
 
     val finaldf2 = finalResultDF
-      .withColumn("learningSumValue", col("learningSumValue").cast("int"))
+      .withColumn("learningSumValue", coalesce(col("learningSumValue").cast("int"), lit(0)))
       .withColumn("loginSumValue", coalesce(col("loginSumValue").cast("int"), lit(0)))
       .withColumn("certSumValue", coalesce(col("certSumValue").cast("int"), lit(0)))
       .withColumn("enrolmentSumValue", coalesce(col("enrolmentSumValue").cast("int"),lit(0)))
@@ -121,7 +121,7 @@ object MinistryMetricsModel extends AbsDashboardModel {
     show(finalResultDF, "finalresult")
 
     val finaldf2 = finalResultDF
-      .withColumn("learningSumValue", col("learningSumValue").cast("int"))
+      .withColumn("learningSumValue", coalesce(col("learningSumValue").cast("int"), lit(0)))
      .withColumn("loginSumValue", coalesce(col("loginSumValue").cast("int"), lit(0)))
      .withColumn("certSumValue", coalesce(col("certSumValue").cast("int"), lit(0)))
      .withColumn("enrolmentSumValue", coalesce(col("enrolmentSumValue").cast("int"),lit(0)))
@@ -182,7 +182,7 @@ object MinistryMetricsModel extends AbsDashboardModel {
     show(finalResultDF, "finalresult")
 
     val finaldf2 = finalResultDF
-      .withColumn("learningSumValue", col("learningSumValue").cast("int"))
+      .withColumn("learningSumValue", coalesce(col("learningSumValue").cast("int"), lit(0)))
       .withColumn("loginSumValue", coalesce(col("loginSumValue").cast("int"), lit(0)))
       .withColumn("certSumValue", coalesce(col("certSumValue").cast("int"), lit(0)))
       .withColumn("enrolmentSumValue", coalesce(col("enrolmentSumValue").cast("int"),lit(0)))
diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/nationallearningweek/NationLearningWeekModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/nationallearningweek/NationalLearningWeekModel.scala
similarity index 100%
rename from batch-models/src/main/scala/org/ekstep/analytics/dashboard/nationallearningweek/NationLearningWeekModel.scala
rename to batch-models/src/main/scala/org/ekstep/analytics/dashboard/nationallearningweek/NationalLearningWeekModel.scala