From 9faed1ebbe5c790ea0c141e6306e025d4ac52b99 Mon Sep 17 00:00:00 2001 From: Manjunath Davanam Date: Tue, 27 Jul 2021 10:46:25 +0530 Subject: [PATCH 1/8] Revert "#SB-25287 fix : Fixed CourseBatch status update issue" --- .../audit/CourseBatchStatusUpdaterJob.scala | 26 ++++++------------- .../course_batch_status_updater_temp.csv | 8 +++--- .../course_batch_status_updater_temp2.csv | 8 +++--- .../audit/TestCourseBatchStatusUpdater.scala | 2 +- .../report/TestCourseConsumptionModel.scala | 3 ++- 5 files changed, 19 insertions(+), 28 deletions(-) diff --git a/data-products/src/main/scala/org/sunbird/analytics/audit/CourseBatchStatusUpdaterJob.scala b/data-products/src/main/scala/org/sunbird/analytics/audit/CourseBatchStatusUpdaterJob.scala index 1b8785639..6ea7d245d 100644 --- a/data-products/src/main/scala/org/sunbird/analytics/audit/CourseBatchStatusUpdaterJob.scala +++ b/data-products/src/main/scala/org/sunbird/analytics/audit/CourseBatchStatusUpdaterJob.scala @@ -12,11 +12,9 @@ import org.ekstep.analytics.framework.{FrameworkContext, IJob, JobConfig} import org.sunbird.analytics.job.report.BaseReportsJob import org.apache.spark.sql.cassandra._ import org.sunbird.analytics.exhaust.collection.UDFUtils + import java.text.SimpleDateFormat import java.util.{Calendar, Date, TimeZone} - -import org.apache.spark - import scala.collection.immutable.List case class CourseBatchStatusMetrics(unStarted: Long, inProgress: Long, completed: Long) @@ -61,7 +59,7 @@ object CourseBatchStatusUpdaterJob extends optional.Application with IJob with B res } - def updateBatchStatus(updaterConfig: JobConfig, collectionBatchDF: DataFrame)(implicit sc: SparkContext, spark: SparkSession): CourseBatchStatusMetrics = { + def updateBatchStatus(updaterConfig: JobConfig, collectionBatchDF: DataFrame)(implicit sc: SparkContext): CourseBatchStatusMetrics = { val currentDate = getDateFormat().format(new Date) val computedDF = collectionBatchDF.withColumn("updated_status", when(unix_timestamp(lit(currentDate), "yyyy-MM-dd").gt(unix_timestamp(col("enddate"), "yyyy-MM-dd")), 2).otherwise( @@ -82,11 +80,10 @@ object CourseBatchStatusUpdaterJob extends optional.Application with IJob with B } def getCollectionBatchDF(fetchData: (SparkSession, Map[String, String], String, StructType) => DataFrame)(implicit spark: SparkSession): DataFrame = { - val convertDate = spark.udf.register("convertDate", convertDateFn) fetchData(spark, collectionBatchDBSettings, cassandraFormat, new StructType()) - .withColumn("startdate", UDFUtils.getLatestValue(convertDate(col("start_date")), col("startdate"))) - .withColumn("enddate", UDFUtils.getLatestValue(convertDate(col("end_date")), col("enddate"))) - .withColumn("enrollmentenddate", UDFUtils.getLatestValue(convertDate(col("enrollment_enddate")), col("enrollmentenddate"))) + .withColumn("startdate", UDFUtils.getLatestValue(col("start_date"), col("startdate"))) + .withColumn("enddate", UDFUtils.getLatestValue(col("end_date"), col("enddate"))) + .withColumn("enrollmentenddate", UDFUtils.getLatestValue(col("enrollment_enddate"), col("enrollmentenddate"))) .select("courseid", "batchid", "startdate", "name", "enddate", "enrollmentenddate", "enrollmenttype", "createdfor", "status") } @@ -162,18 +159,11 @@ object CourseBatchStatusUpdaterJob extends optional.Application with IJob with B dateFormatter.setTimeZone(TimeZone.getTimeZone("IST")) dateFormatter } - - def formatDate(date: String) = { - Option(date).map(x => { - getDateFormat().format(getDateFormat().parse(x)) - }).orNull - } - def convertDateFn : String => String = (date: String) => { + def formatDate(date: String): String = { Option(date).map(x => { - val utcDateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") - utcDateFormatter.setTimeZone(TimeZone.getTimeZone("UTC")) - getDateFormat().format(utcDateFormatter.parse(x)) + val dateFormatter = getDateFormat() + dateFormatter.format(dateFormatter.parse(x)) }).orNull } diff --git a/data-products/src/test/resources/course-batch-status-updater/course_batch_status_updater_temp.csv b/data-products/src/test/resources/course-batch-status-updater/course_batch_status_updater_temp.csv index f8cfb174d..c2ad42129 100644 --- a/data-products/src/test/resources/course-batch-status-updater/course_batch_status_updater_temp.csv +++ b/data-products/src/test/resources/course-batch-status-updater/course_batch_status_updater_temp.csv @@ -1,5 +1,5 @@ courseid,batchid,cert_templates,createdby,createddate,createdfor,description,enddate,end_date,enrollmentenddate,enrollment_enddate,enrollmenttype,mentors,name,startdate,start_date,status,updateddate -do_112636984058314752121,0130320389509939204,,af73c71d-c4e8-4e3b-9010-864cfed2a9a9,2020-06-30 07:50:04:062+0000,,,,2030-06-30 18:29:59.000000+0000,2020-06-07,2020-06-07 18:29:59.000000+0000,open,,TEST 3,2021-04-28,2021-04-28 00:00:00.000000+0000,0, -do_1130293726460805121168,0130293763489873929,,9ad90eb4-b8d2-4e99-805f-58cdd1c0163f,2020-06-26 13:28:16:221+0000,,,,2030-06-30 18:29:59.000000+0000,,,open,,Test,,2020-05-26 00:00:00.000000+0000,1, -do_1130314965721088001129,01303150537737011211,,95e4942d-cbe8-477d-aebd-ad8e6de4bfc8,2020-06-29 13:34:54:235+0000,,,,2019-06-30 18:29:59.000000+0000,,,open,,Test course order prad - 1,,2020-05-29 00:00:00.000000+0000,1, -do_1130264512015646721166,0130271096968396800,,,2020-05-23 08:37:49:915+0000,,,2019-05-30,2019-05-30 18:29:59.000000+0000,,,open,,course-2,2020-05-23,2020-05-25 00:00:00.000000+0000,1, \ No newline at end of file +do_112636984058314752121,0130320389509939204,,af73c71d-c4e8-4e3b-9010-864cfed2a9a9,2020-06-30 07:50:04:062+0000,,,2030-06-30 00:00:00.000000+0000 ,,2020-06-07,2020-06-07 00:00:00.000000+0000,open,,TEST 3,2021-04-28 00:00:00,2021-04-28 00:00:00.000000+0000,0, +do_1130293726460805121168,0130293763489873929,,9ad90eb4-b8d2-4e99-805f-58cdd1c0163f,2020-06-26 13:28:16:221+0000,,,,2030-06-30,,,open,,Test,,2020-05-26,1, +do_1130314965721088001129,01303150537737011211,,95e4942d-cbe8-477d-aebd-ad8e6de4bfc8,2020-06-29 13:34:54:235+0000,,,,2019-06-30,,,open,,Test course order prad - 1,,2020-05-29,1, +do_1130264512015646721166,0130271096968396800,,,2020-05-23 08:37:49:915+0000,,,2019-05-30,2019-05-30,,,open,,course-2,2020-05-23,2020-05-25,1, \ No newline at end of file diff --git a/data-products/src/test/resources/course-batch-status-updater/course_batch_status_updater_temp2.csv b/data-products/src/test/resources/course-batch-status-updater/course_batch_status_updater_temp2.csv index eb44184cd..d673836d4 100644 --- a/data-products/src/test/resources/course-batch-status-updater/course_batch_status_updater_temp2.csv +++ b/data-products/src/test/resources/course-batch-status-updater/course_batch_status_updater_temp2.csv @@ -1,5 +1,5 @@ courseid,batchid,cert_templates,createdby,createddate,createdfor,description,enddate,end_date,enrollmentenddate,enrollment_enddate,enrollmenttype,mentors,name,startdate,start_date,status,updateddate -do_112636984058314752121,0130320389509939204,,af73c71d-c4e8-4e3b-9010-864cfed2a9a9,2020-06-30 07:50:04:062+0000,,,2030-06-30,,2020-06-07,,open,,TEST 3,2030-05-26,,0, -do_1130293726460805121168,0130293763489873929,,9ad90eb4-b8d2-4e99-805f-58cdd1c0163f,2020-06-26 13:28:16:221+0000,,,,2030-06-30 18:29:59.000000+0000,,,open,,Test,,2020-05-26 00:00:00.000000+0000,1, -do_1130314965721088001129,01303150537737011211,,95e4942d-cbe8-477d-aebd-ad8e6de4bfc8,2020-06-29 13:34:54:235+0000,,,,2030-06-30 18:29:59.000000+0000,,,open,,Test course order prad - 1,,2020-05-26 00:00:00.000000+0000,1, -do_1130264512015646721166,0130271096968396800,,,2020-05-23 08:37:49:915+0000,,,2030-06-30,2030-06-30 18:29:59.000000+0000,,,open,,course-2,2020-05-26,2020-05-26 00:00:00.000000+0000,1, \ No newline at end of file +do_112636984058314752121,0130320389509939204,,af73c71d-c4e8-4e3b-9010-864cfed2a9a9,2020-06-30 07:50:04:062+0000,,,2030-06-30,,2020-06-07,2020-06-07,open,,TEST 3,2030-05-26,,0, +do_1130293726460805121168,0130293763489873929,,9ad90eb4-b8d2-4e99-805f-58cdd1c0163f,2020-06-26 13:28:16:221+0000,,,,2030-06-30,,,open,,Test,,2020-05-26,1, +do_1130314965721088001129,01303150537737011211,,95e4942d-cbe8-477d-aebd-ad8e6de4bfc8,2020-06-29 13:34:54:235+0000,,,,2030-06-30,,,open,,Test course order prad - 1,,2020-05-26,1, +do_1130264512015646721166,0130271096968396800,,,2020-05-23 08:37:49:915+0000,,,2030-06-30,2030-06-30,,,open,,course-2,2020-05-26,2020-05-26,1, \ No newline at end of file diff --git a/data-products/src/test/scala/org/sunbird/analytics/audit/TestCourseBatchStatusUpdater.scala b/data-products/src/test/scala/org/sunbird/analytics/audit/TestCourseBatchStatusUpdater.scala index f4b61e5a3..6190de96f 100644 --- a/data-products/src/test/scala/org/sunbird/analytics/audit/TestCourseBatchStatusUpdater.scala +++ b/data-products/src/test/scala/org/sunbird/analytics/audit/TestCourseBatchStatusUpdater.scala @@ -126,7 +126,7 @@ class TestCourseBatchStatusUpdater extends BaseReportSpec with MockFactory { .returning(courseBatchDF.withColumn("cert_templates", lit(null).cast(MapType(StringType, MapType(StringType, StringType))))) val resultDf = CourseBatchStatusUpdaterJob.getCollectionBatchDF(reporterMock.fetchData) val resultDateValue = resultDf.select("startdate").collect().map(_ (0)).toList - resultDateValue(0) should be("2021-04-28") + resultDateValue(0) should be("2021-04-28 00:00:00.000000+0000") resultDateValue(1) should be("2020-05-26") resultDateValue(2) should be("2020-05-29") resultDateValue(3) should be("2020-05-25") diff --git a/data-products/src/test/scala/org/sunbird/analytics/model/report/TestCourseConsumptionModel.scala b/data-products/src/test/scala/org/sunbird/analytics/model/report/TestCourseConsumptionModel.scala index 9d02e4da6..bd4837e03 100644 --- a/data-products/src/test/scala/org/sunbird/analytics/model/report/TestCourseConsumptionModel.scala +++ b/data-products/src/test/scala/org/sunbird/analytics/model/report/TestCourseConsumptionModel.scala @@ -1,6 +1,7 @@ package org.sunbird.analytics.model.report import java.time.{ZoneOffset, ZonedDateTime} + import cats.syntax.either._ import ing.wbaa.druid._ import ing.wbaa.druid.client.DruidClient @@ -10,7 +11,7 @@ import org.apache.spark.SparkContext import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession} import org.ekstep.analytics.framework._ import org.ekstep.analytics.framework.util.{HTTPClient, JSONUtils} -import org.ekstep.analytics.model.ReportConfig +import org.ekstep.analytics.model.{ReportConfig} import org.scalamock.scalatest.MockFactory import org.sunbird.cloud.storage.BaseStorageService From c52b91dcc6fda17a66d788f5b4cf210232c5cd7c Mon Sep 17 00:00:00 2001 From: Harikumar Palemkota Date: Fri, 22 Oct 2021 17:13:42 +0530 Subject: [PATCH 2/8] SB-26472 filtering active user consent into the reports --- .../job/report/StateAdminReportJob.scala | 24 ++++++++++++++++--- .../resources/reports/user_self_test_data.cql | 4 ++++ 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala b/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala index 38369fb68..a68054834 100644 --- a/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala +++ b/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala @@ -1,5 +1,6 @@ package org.sunbird.analytics.job.report +import com.datastax.spark.connector.SomeColumns import org.apache.spark.SparkContext import org.apache.spark.sql.functions.{col, lit, when, _} import org.apache.spark.sql.{DataFrame, _} @@ -56,10 +57,16 @@ object StateAdminReportJob extends optional.Application with IJob with StateAdmi import sparkSession.implicits._ val userSelfDeclaredEncoder = Encoders.product[UserSelfDeclared].schema //loading user_declarations table details based on declared values and location details and appending org-external-id if present - val userSelfDeclaredDataDF = loadData(sparkSession, Map("table" -> "user_declarations", "keyspace" -> sunbirdKeyspace), Some(userSelfDeclaredEncoder)) - val userSelfDeclaredUserInfoDataDF = userSelfDeclaredDataDF.select(col("*"), col("userinfo").getItem("declared-email").as("declared-email"), col("userinfo").getItem("declared-phone").as("declared-phone"), + var userSelfDeclaredDataDF = loadData(sparkSession, Map("table" -> "user_declarations", "keyspace" -> sunbirdKeyspace), Some(userSelfDeclaredEncoder)) + val userConsentDataDF = loadData(sparkSession, Map("table" -> "user_declarations", "keyspace" -> sunbirdKeyspace), Some(userSelfDeclaredEncoder)) + val activeConsentDF = userConsentDataDF.where(col("status") === "ACTIVE" && col("object_type") === "organisation") + val activeSelfDeclaredDF = userSelfDeclaredDataDF.join(activeConsentDF, userSelfDeclaredDataDF.col("organisationid") === activeConsentDF.col("consumer_id")). + select(userSelfDeclaredDataDF.col("*")) + //userSelfDeclaredDataDF = userSelfDeclaredDataDF.where(col("userid")==="fd4051d5-cb62-4b3b-8672-baa4a2c90559") + val userSelfDeclaredUserInfoDataDF = activeSelfDeclaredDF.select(col("*"), col("userinfo").getItem("declared-email").as("declared-email"), col("userinfo").getItem("declared-phone").as("declared-phone"), col("userinfo").getItem("declared-school-name").as("declared-school-name"), col("userinfo").getItem("declared-school-udise-code").as("declared-school-udise-code"),col("userinfo").getItem("declared-ext-id").as("declared-ext-id")).drop("userinfo"); - val locationDF = locationData() + userSelfDeclaredUserInfoDataDF.show(10,false) + val locationDF = locationData() //to-do later check if externalid is necessary not-null check is necessary val orgExternalIdDf = loadOrganisationData().select("externalid","channel", "id","orgName").filter(col("channel").isNotNull) val userSelfDeclaredExtIdDF = userSelfDeclaredUserInfoDataDF.join(orgExternalIdDf, userSelfDeclaredUserInfoDataDF.col("orgid") === orgExternalIdDf.col("id"), "leftouter"). @@ -151,6 +158,8 @@ object StateAdminReportJob extends optional.Application with IJob with StateAdmi } private def saveUserSelfDeclaredExternalInfo(userExternalDecryptData: DataFrame, userDenormLocationDF: DataFrame): DataFrame ={ + import com.datastax.spark.connector.cql.CassandraConnectorConf + import com.datastax.spark.connector.{SomeColumns, toRDDFunctions} var userDenormLocationDFWithCluster : DataFrame = null; if(!userDenormLocationDF.columns.contains("cluster")) { if(!userDenormLocationDF.columns.contains("block")) { @@ -181,6 +190,9 @@ object StateAdminReportJob extends optional.Application with IJob with StateAdmi col("userroororg").as("Root Org of user"), col("channel").as("provider")) resultDf.saveToBlobStore(storageConfig, "csv", "declared_user_detail", Option(Map("header" -> "true")), Option(Seq("provider"))) + resultDf.select("course_id", "batch_id", "user_id", "content_id", "attempt_id").rdd.deleteFromCassandra(AppConf.getConfig("sunbird.courses.keyspace"), "assessment_aggregator", keyColumns = SomeColumns("course_id", "batch_id", "user_id", "content_id", "attempt_id")) + + resultDf } @@ -229,3 +241,9 @@ object StateAdminReportJob extends optional.Application with IJob with StateAdmi } + +object StateAdminReportJobTest { + def main(args: Array[String]): Unit = { + StateAdminReportJob.main("""{"model":"Test"}""") + } +} \ No newline at end of file diff --git a/data-products/src/test/resources/reports/user_self_test_data.cql b/data-products/src/test/resources/reports/user_self_test_data.cql index 7357b2092..3e4302fc2 100644 --- a/data-products/src/test/resources/reports/user_self_test_data.cql +++ b/data-products/src/test/resources/reports/user_self_test_data.cql @@ -27,3 +27,7 @@ INSERT INTO sunbird.organisation (id,channel,hashtagid,istenant,orglocation,orgn INSERT INTO sunbird.user_declarations(userid, orgid, persona, errortype, status, userinfo) VALUES ('56c2d9a3-fae9-4341-9862-4eeeead2e9a1', '012500530695766016224', 'teacher', null, 'PENDING', {'declared-email': 'PEhQxQlaMdJEXOzShY0NAiKg4LqC2xUDE4InNodhG/fJMhq69iAPzseEdYAlMPWegxJaAnH+tJwc\nZuqPxJCtJkiGfwlCUEj5B41z4/RjH/7XowwzRVZXH0jth3IW4Ik8TQtMGOn7lhkDdxs1iV8l8A==', 'declared-phone': '1wsQrmy8Q1T4gFa+MOJsirdQC2yhyJsm2Rgj229s2b5Hk/JLNNnHMz6ywhgzYpgcQ6QILjcTLl7z\n7s4aRbsrWw==', 'declared-school-name': 'mgm21', 'declared-school-udise-code': '190923'}); INSERT INTO sunbird.user_declarations(userid, orgid, persona, errortype, status, userinfo) VALUES ('8eaa1621-ac15-42a4-9e26-9c846963f331', '0127426598044057605836', 'teacher', null, 'PENDING', {'declared-email': 'PEhQxQlaMdJEXOzShY0NAiKg4LqC2xUDE4InNodhG/fJMhq69iAPzseEdYAlMPWegxJaAnH+tJwc\nZuqPxJCtJkiGfwlCUEj5B41z4/RjH/7XowwzRVZXH0jth3IW4Ik8TQtMGOn7lhkDdxs1iV8l8A==', 'declared-phone': '1wsQrmy8Q1T4gFa+MOJsirdQC2yhyJsm2Rgj229s2b5Hk/JLNNnHMz6ywhgzYpgcQ6QILjcTLl7z\n7s4aRbsrWw==', 'declared-school-name': 'mgm21', 'declared-school-udise-code': 'orgext2'}); +INSERT INTO sunbird.user_declarations(userid, orgid, persona, errortype, status, userinfo) VALUES ('7faa1621-bc14-32c4-9e26-4b846963g339', '01268935407534080010344', 'teacher', null, 'PENDING', {'declared-email': 'PEhQxQlaMdJEXOzShY0NAiKg4LqC2xUDE4InNodhG/fJMhq69iAPzseEdYAlMPWegxJaAnH+tJwc\nZuqPxJCtJkiGfwlCUEj5B41z4/RjH/7XowwzRVZXH0jth3IW4Ik8TQtMGOn7lhkDdxs1iV8l8A==', 'declared-phone': '1wsQrmy8Q1T4gFa+MOJsirdQC2yhyJsm2Rgj229s2b5Hk/JLNNnHMz6ywhgzYpgcQ6QILjcTLl7z\n7s4aRbsrWw==', 'declared-school-name': 'mgm21', 'declared-school-udise-code': 'orgext3'}); +INSERT INTO sunbird.user_consent (user_id, consumer_id, object_id, consent_data ,consumer_type, id ,object_type, status) VALUES ('56c2d9a3-fae9-4341-9862-4eeeead2e9a1','012500530695766016224','012500530695766016224','','','','organisation','ACTIVE'); +INSERT INTO sunbird.user_consent (user_id, consumer_id, object_id, consent_data ,consumer_type, id ,object_type, status) VALUES ('8eaa1621-ac15-42a4-9e26-9c846963f331','0127426598044057605836','0127426598044057605836','','','','organisation','ACTIVE'); +INSERT INTO sunbird.user_consent (user_id, consumer_id, object_id, consent_data ,consumer_type, id ,object_type, status) VALUES ('7faa1621-bc14-32c4-9e26-4b846963g339','01268935407534080010344','01268935407534080010344','','','','organisation','REVOKED'); \ No newline at end of file From 5ec9578249e6e978bddae0598e7ad5c2cfb6a1ce Mon Sep 17 00:00:00 2001 From: Harikumar Palemkota Date: Mon, 25 Oct 2021 11:16:12 +0530 Subject: [PATCH 3/8] SB-26472 removed unnecessary code --- .../analytics/job/report/StateAdminReportJob.scala | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala b/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala index a68054834..8ef974561 100644 --- a/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala +++ b/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala @@ -57,15 +57,14 @@ object StateAdminReportJob extends optional.Application with IJob with StateAdmi import sparkSession.implicits._ val userSelfDeclaredEncoder = Encoders.product[UserSelfDeclared].schema //loading user_declarations table details based on declared values and location details and appending org-external-id if present - var userSelfDeclaredDataDF = loadData(sparkSession, Map("table" -> "user_declarations", "keyspace" -> sunbirdKeyspace), Some(userSelfDeclaredEncoder)) - val userConsentDataDF = loadData(sparkSession, Map("table" -> "user_declarations", "keyspace" -> sunbirdKeyspace), Some(userSelfDeclaredEncoder)) + val userSelfDeclaredDataDF = loadData(sparkSession, Map("table" -> "user_declarations", "keyspace" -> sunbirdKeyspace), Some(userSelfDeclaredEncoder)) + val userConsentDataDF = loadData(sparkSession, Map("table" -> "user_dconsent", "keyspace" -> sunbirdKeyspace), Some(userSelfDeclaredEncoder)) val activeConsentDF = userConsentDataDF.where(col("status") === "ACTIVE" && col("object_type") === "organisation") val activeSelfDeclaredDF = userSelfDeclaredDataDF.join(activeConsentDF, userSelfDeclaredDataDF.col("organisationid") === activeConsentDF.col("consumer_id")). select(userSelfDeclaredDataDF.col("*")) //userSelfDeclaredDataDF = userSelfDeclaredDataDF.where(col("userid")==="fd4051d5-cb62-4b3b-8672-baa4a2c90559") val userSelfDeclaredUserInfoDataDF = activeSelfDeclaredDF.select(col("*"), col("userinfo").getItem("declared-email").as("declared-email"), col("userinfo").getItem("declared-phone").as("declared-phone"), col("userinfo").getItem("declared-school-name").as("declared-school-name"), col("userinfo").getItem("declared-school-udise-code").as("declared-school-udise-code"),col("userinfo").getItem("declared-ext-id").as("declared-ext-id")).drop("userinfo"); - userSelfDeclaredUserInfoDataDF.show(10,false) val locationDF = locationData() //to-do later check if externalid is necessary not-null check is necessary val orgExternalIdDf = loadOrganisationData().select("externalid","channel", "id","orgName").filter(col("channel").isNotNull) @@ -158,8 +157,6 @@ object StateAdminReportJob extends optional.Application with IJob with StateAdmi } private def saveUserSelfDeclaredExternalInfo(userExternalDecryptData: DataFrame, userDenormLocationDF: DataFrame): DataFrame ={ - import com.datastax.spark.connector.cql.CassandraConnectorConf - import com.datastax.spark.connector.{SomeColumns, toRDDFunctions} var userDenormLocationDFWithCluster : DataFrame = null; if(!userDenormLocationDF.columns.contains("cluster")) { if(!userDenormLocationDF.columns.contains("block")) { @@ -190,9 +187,6 @@ object StateAdminReportJob extends optional.Application with IJob with StateAdmi col("userroororg").as("Root Org of user"), col("channel").as("provider")) resultDf.saveToBlobStore(storageConfig, "csv", "declared_user_detail", Option(Map("header" -> "true")), Option(Seq("provider"))) - resultDf.select("course_id", "batch_id", "user_id", "content_id", "attempt_id").rdd.deleteFromCassandra(AppConf.getConfig("sunbird.courses.keyspace"), "assessment_aggregator", keyColumns = SomeColumns("course_id", "batch_id", "user_id", "content_id", "attempt_id")) - - resultDf } @@ -238,8 +232,6 @@ object StateAdminReportJob extends optional.Application with IJob with StateAdmi } val addUserSubType = udf[String, String](profileSubTypeFunction) - - } object StateAdminReportJobTest { From 861e65b4c159e4dc55eafd4e201813324687149b Mon Sep 17 00:00:00 2001 From: Harikumar Palemkota Date: Mon, 25 Oct 2021 11:16:27 +0530 Subject: [PATCH 4/8] SB-26472 removed unnecessary code --- .../sunbird/analytics/job/report/StateAdminReportJob.scala | 6 ------ 1 file changed, 6 deletions(-) diff --git a/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala b/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala index 8ef974561..7182da12c 100644 --- a/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala +++ b/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala @@ -232,10 +232,4 @@ object StateAdminReportJob extends optional.Application with IJob with StateAdmi } val addUserSubType = udf[String, String](profileSubTypeFunction) -} - -object StateAdminReportJobTest { - def main(args: Array[String]): Unit = { - StateAdminReportJob.main("""{"model":"Test"}""") - } } \ No newline at end of file From e14513e14cad491752681cdd1603e169fc4279b5 Mon Sep 17 00:00:00 2001 From: Harikumar Palemkota Date: Mon, 25 Oct 2021 11:20:51 +0530 Subject: [PATCH 5/8] SB-26472 convert to lower case and filtering --- .../sunbird/analytics/job/report/StateAdminReportJob.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala b/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala index 7182da12c..22fcf2524 100644 --- a/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala +++ b/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala @@ -1,6 +1,5 @@ package org.sunbird.analytics.job.report -import com.datastax.spark.connector.SomeColumns import org.apache.spark.SparkContext import org.apache.spark.sql.functions.{col, lit, when, _} import org.apache.spark.sql.{DataFrame, _} @@ -59,10 +58,9 @@ object StateAdminReportJob extends optional.Application with IJob with StateAdmi //loading user_declarations table details based on declared values and location details and appending org-external-id if present val userSelfDeclaredDataDF = loadData(sparkSession, Map("table" -> "user_declarations", "keyspace" -> sunbirdKeyspace), Some(userSelfDeclaredEncoder)) val userConsentDataDF = loadData(sparkSession, Map("table" -> "user_dconsent", "keyspace" -> sunbirdKeyspace), Some(userSelfDeclaredEncoder)) - val activeConsentDF = userConsentDataDF.where(col("status") === "ACTIVE" && col("object_type") === "organisation") + val activeConsentDF = userConsentDataDF.where(col("status") === "ACTIVE" && lower(col("object_type")) === "organisation") val activeSelfDeclaredDF = userSelfDeclaredDataDF.join(activeConsentDF, userSelfDeclaredDataDF.col("organisationid") === activeConsentDF.col("consumer_id")). select(userSelfDeclaredDataDF.col("*")) - //userSelfDeclaredDataDF = userSelfDeclaredDataDF.where(col("userid")==="fd4051d5-cb62-4b3b-8672-baa4a2c90559") val userSelfDeclaredUserInfoDataDF = activeSelfDeclaredDF.select(col("*"), col("userinfo").getItem("declared-email").as("declared-email"), col("userinfo").getItem("declared-phone").as("declared-phone"), col("userinfo").getItem("declared-school-name").as("declared-school-name"), col("userinfo").getItem("declared-school-udise-code").as("declared-school-udise-code"),col("userinfo").getItem("declared-ext-id").as("declared-ext-id")).drop("userinfo"); val locationDF = locationData() From 1345413415afa7c726bd5f5a8a3cce70e5ddf052 Mon Sep 17 00:00:00 2001 From: Harikumar Palemkota Date: Mon, 25 Oct 2021 13:05:40 +0530 Subject: [PATCH 6/8] SB-26472 test-case failed rectified-table name --- .../org/sunbird/analytics/job/report/StateAdminReportJob.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala b/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala index 22fcf2524..e08b883ce 100644 --- a/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala +++ b/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala @@ -57,7 +57,7 @@ object StateAdminReportJob extends optional.Application with IJob with StateAdmi val userSelfDeclaredEncoder = Encoders.product[UserSelfDeclared].schema //loading user_declarations table details based on declared values and location details and appending org-external-id if present val userSelfDeclaredDataDF = loadData(sparkSession, Map("table" -> "user_declarations", "keyspace" -> sunbirdKeyspace), Some(userSelfDeclaredEncoder)) - val userConsentDataDF = loadData(sparkSession, Map("table" -> "user_dconsent", "keyspace" -> sunbirdKeyspace), Some(userSelfDeclaredEncoder)) + val userConsentDataDF = loadData(sparkSession, Map("table" -> "user_consent", "keyspace" -> sunbirdKeyspace), Some(userSelfDeclaredEncoder)) val activeConsentDF = userConsentDataDF.where(col("status") === "ACTIVE" && lower(col("object_type")) === "organisation") val activeSelfDeclaredDF = userSelfDeclaredDataDF.join(activeConsentDF, userSelfDeclaredDataDF.col("organisationid") === activeConsentDF.col("consumer_id")). select(userSelfDeclaredDataDF.col("*")) From 462e78c2fb54b971a04aaa1dc548e6ae0db0c9df Mon Sep 17 00:00:00 2001 From: Harikumar Palemkota Date: Mon, 25 Oct 2021 13:55:46 +0530 Subject: [PATCH 7/8] SB-26472 test-case failed code modified --- .../sunbird/analytics/job/report/StateAdminReportJob.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala b/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala index e08b883ce..9c6f4cf07 100644 --- a/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala +++ b/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala @@ -57,9 +57,9 @@ object StateAdminReportJob extends optional.Application with IJob with StateAdmi val userSelfDeclaredEncoder = Encoders.product[UserSelfDeclared].schema //loading user_declarations table details based on declared values and location details and appending org-external-id if present val userSelfDeclaredDataDF = loadData(sparkSession, Map("table" -> "user_declarations", "keyspace" -> sunbirdKeyspace), Some(userSelfDeclaredEncoder)) - val userConsentDataDF = loadData(sparkSession, Map("table" -> "user_consent", "keyspace" -> sunbirdKeyspace), Some(userSelfDeclaredEncoder)) + val userConsentDataDF = loadData(sparkSession, Map("table" -> "user_consent", "keyspace" -> sunbirdKeyspace)) val activeConsentDF = userConsentDataDF.where(col("status") === "ACTIVE" && lower(col("object_type")) === "organisation") - val activeSelfDeclaredDF = userSelfDeclaredDataDF.join(activeConsentDF, userSelfDeclaredDataDF.col("organisationid") === activeConsentDF.col("consumer_id")). + val activeSelfDeclaredDF = userSelfDeclaredDataDF.join(activeConsentDF, userSelfDeclaredDataDF.col("orgid") === activeConsentDF.col("consumer_id")). select(userSelfDeclaredDataDF.col("*")) val userSelfDeclaredUserInfoDataDF = activeSelfDeclaredDF.select(col("*"), col("userinfo").getItem("declared-email").as("declared-email"), col("userinfo").getItem("declared-phone").as("declared-phone"), col("userinfo").getItem("declared-school-name").as("declared-school-name"), col("userinfo").getItem("declared-school-udise-code").as("declared-school-udise-code"),col("userinfo").getItem("declared-ext-id").as("declared-ext-id")).drop("userinfo"); From db5b9d2249450cb263b5ce113457f2b16be134d5 Mon Sep 17 00:00:00 2001 From: Harikumar Palemkota Date: Wed, 27 Oct 2021 15:46:20 +0530 Subject: [PATCH 8/8] SB-27386 filtering common rows in consent and user_declarations --- .../analytics/job/report/StateAdminReportJob.scala | 14 +++++++++----- .../test/resources/reports/user_self_test_data.cql | 3 ++- .../report/TestStateSelfUserExternalIDJob.scala | 1 + 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala b/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala index 9c6f4cf07..70ac3c693 100644 --- a/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala +++ b/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala @@ -54,23 +54,25 @@ object StateAdminReportJob extends optional.Application with IJob with StateAdmi // $COVERAGE-ON$ Enabling scoverage for other methods def generateExternalIdReport() (implicit sparkSession: SparkSession, fc: FrameworkContext) = { import sparkSession.implicits._ + val DECLARED_EMAIL: String = "declared-email" + val DECLARED_PHONE: String = "declared-phone" val userSelfDeclaredEncoder = Encoders.product[UserSelfDeclared].schema //loading user_declarations table details based on declared values and location details and appending org-external-id if present val userSelfDeclaredDataDF = loadData(sparkSession, Map("table" -> "user_declarations", "keyspace" -> sunbirdKeyspace), Some(userSelfDeclaredEncoder)) val userConsentDataDF = loadData(sparkSession, Map("table" -> "user_consent", "keyspace" -> sunbirdKeyspace)) val activeConsentDF = userConsentDataDF.where(col("status") === "ACTIVE" && lower(col("object_type")) === "organisation") - val activeSelfDeclaredDF = userSelfDeclaredDataDF.join(activeConsentDF, userSelfDeclaredDataDF.col("orgid") === activeConsentDF.col("consumer_id")). + val activeSelfDeclaredDF = userSelfDeclaredDataDF.join(activeConsentDF, userSelfDeclaredDataDF.col("orgid") === activeConsentDF.col("consumer_id"), "left_semi"). select(userSelfDeclaredDataDF.col("*")) - val userSelfDeclaredUserInfoDataDF = activeSelfDeclaredDF.select(col("*"), col("userinfo").getItem("declared-email").as("declared-email"), col("userinfo").getItem("declared-phone").as("declared-phone"), + val userSelfDeclaredUserInfoDataDF = activeSelfDeclaredDF.select(col("*"), col("userinfo").getItem(DECLARED_EMAIL).as(DECLARED_EMAIL), col("userinfo").getItem(DECLARED_PHONE).as(DECLARED_PHONE), col("userinfo").getItem("declared-school-name").as("declared-school-name"), col("userinfo").getItem("declared-school-udise-code").as("declared-school-udise-code"),col("userinfo").getItem("declared-ext-id").as("declared-ext-id")).drop("userinfo"); - val locationDF = locationData() + val locationDF = locationData() //to-do later check if externalid is necessary not-null check is necessary val orgExternalIdDf = loadOrganisationData().select("externalid","channel", "id","orgName").filter(col("channel").isNotNull) val userSelfDeclaredExtIdDF = userSelfDeclaredUserInfoDataDF.join(orgExternalIdDf, userSelfDeclaredUserInfoDataDF.col("orgid") === orgExternalIdDf.col("id"), "leftouter"). select(userSelfDeclaredUserInfoDataDF.col("*"), orgExternalIdDf.col("*")) //decrypting email and phone values - val userDecrpytedDataDF = decryptPhoneEmailInDF(userSelfDeclaredExtIdDF, "declared-email", "declared-phone") + val userDecrpytedDataDF = decryptPhoneEmailInDF(userSelfDeclaredExtIdDF, DECLARED_EMAIL, DECLARED_PHONE) //appending decrypted values to the user-external-identifier dataframe val userExternalDecryptData = userSelfDeclaredExtIdDF.join(userDecrpytedDataDF, userSelfDeclaredExtIdDF.col("userid") === userDecrpytedDataDF.col("userid"), "left_outer"). select(userSelfDeclaredExtIdDF.col("*"), userDecrpytedDataDF.col("decrypted-email"), userDecrpytedDataDF.col("decrypted-phone")) @@ -230,4 +232,6 @@ object StateAdminReportJob extends optional.Application with IJob with StateAdmi } val addUserSubType = udf[String, String](profileSubTypeFunction) -} \ No newline at end of file + + +} diff --git a/data-products/src/test/resources/reports/user_self_test_data.cql b/data-products/src/test/resources/reports/user_self_test_data.cql index 3e4302fc2..9c22fa0eb 100644 --- a/data-products/src/test/resources/reports/user_self_test_data.cql +++ b/data-products/src/test/resources/reports/user_self_test_data.cql @@ -30,4 +30,5 @@ INSERT INTO sunbird.user_declarations(userid, orgid, persona, errortype, status, INSERT INTO sunbird.user_declarations(userid, orgid, persona, errortype, status, userinfo) VALUES ('7faa1621-bc14-32c4-9e26-4b846963g339', '01268935407534080010344', 'teacher', null, 'PENDING', {'declared-email': 'PEhQxQlaMdJEXOzShY0NAiKg4LqC2xUDE4InNodhG/fJMhq69iAPzseEdYAlMPWegxJaAnH+tJwc\nZuqPxJCtJkiGfwlCUEj5B41z4/RjH/7XowwzRVZXH0jth3IW4Ik8TQtMGOn7lhkDdxs1iV8l8A==', 'declared-phone': '1wsQrmy8Q1T4gFa+MOJsirdQC2yhyJsm2Rgj229s2b5Hk/JLNNnHMz6ywhgzYpgcQ6QILjcTLl7z\n7s4aRbsrWw==', 'declared-school-name': 'mgm21', 'declared-school-udise-code': 'orgext3'}); INSERT INTO sunbird.user_consent (user_id, consumer_id, object_id, consent_data ,consumer_type, id ,object_type, status) VALUES ('56c2d9a3-fae9-4341-9862-4eeeead2e9a1','012500530695766016224','012500530695766016224','','','','organisation','ACTIVE'); INSERT INTO sunbird.user_consent (user_id, consumer_id, object_id, consent_data ,consumer_type, id ,object_type, status) VALUES ('8eaa1621-ac15-42a4-9e26-9c846963f331','0127426598044057605836','0127426598044057605836','','','','organisation','ACTIVE'); -INSERT INTO sunbird.user_consent (user_id, consumer_id, object_id, consent_data ,consumer_type, id ,object_type, status) VALUES ('7faa1621-bc14-32c4-9e26-4b846963g339','01268935407534080010344','01268935407534080010344','','','','organisation','REVOKED'); \ No newline at end of file +INSERT INTO sunbird.user_consent (user_id, consumer_id, object_id, consent_data ,consumer_type, id ,object_type, status) VALUES ('7faa1621-bc14-32c4-9e26-4b846963g339','01268935407534080010344','01268935407534080010344','','','','organisation','REVOKED'); +INSERT INTO sunbird.user_consent (user_id, consumer_id, object_id, consent_data ,consumer_type, id ,object_type, status) VALUES ('6d8269de-c6ca-4106-9b63-305382d46175','0127426598044057605831','0127426598044057605836','','','','organisation','ACTIVE'); \ No newline at end of file diff --git a/data-products/src/test/scala/org/sunbird/analytics/job/report/TestStateSelfUserExternalIDJob.scala b/data-products/src/test/scala/org/sunbird/analytics/job/report/TestStateSelfUserExternalIDJob.scala index 325b81efd..95d6e83f6 100644 --- a/data-products/src/test/scala/org/sunbird/analytics/job/report/TestStateSelfUserExternalIDJob.scala +++ b/data-products/src/test/scala/org/sunbird/analytics/job/report/TestStateSelfUserExternalIDJob.scala @@ -32,6 +32,7 @@ class TestStateSelfUserExternalIDJob extends BaseReportSpec with MockFactory { implicit val fc = new FrameworkContext() val tempDir = AppConf.getConfig("admin.metrics.temp.dir") val reportDF = StateAdminReportJob.generateExternalIdReport()(spark, fc) + assert(reportDF.count() === (2)); assert(reportDF.columns.contains("Diksha UUID") === true) assert(reportDF.columns.contains("Name") === true) assert(reportDF.columns.contains("State") === true)