diff --git a/data-products/build.Jenkinsfile b/data-products/build.Jenkinsfile
index e2aa08870..7a8834a4c 100644
--- a/data-products/build.Jenkinsfile
+++ b/data-products/build.Jenkinsfile
@@ -12,14 +12,15 @@ node('build-slave') {
         checkout scm
         commit_hash = sh(script: 'git rev-parse --short HEAD', returnStdout: true).trim()
         branch_name = sh(script: 'git name-rev --name-only HEAD | rev | cut -d "/" -f1| rev', returnStdout: true).trim()
-        artifact_version = branch_name + "_" + commit_hash
-        println(ANSI_BOLD + ANSI_YELLOW + "github_release_tag not specified, using the latest commit hash: " + commit_hash + ANSI_NORMAL)
+        artifact_version = branch_name.split('/')[-1] + "_" + commit_hash
+        println(ANSI_BOLD + ANSI_YELLOW + "artifact version: " + artifact_version + ANSI_NORMAL)
+        println(ANSI_BOLD + ANSI_YELLOW + "github_release_tag not specified, using the latest commit hash: " + commit_hash + ANSI_NORMAL)
     } else {
         def scmVars = checkout scm
-        checkout scm: [$class: 'GitSCM', branches: [[name: "refs/tags/$params.github_release_tag"]], userRemoteConfigs: [[url: scmVars.GIT_URL]]]
-        artifact_version = params.github_release_tag
-        println(ANSI_BOLD + ANSI_YELLOW + "github_release_tag specified, building from github_release_tag: " + params.github_release_tag + ANSI_NORMAL)
+        checkout scm: [$class: 'GitSCM', branches: [[name: "$params.github_release_tag"]], userRemoteConfigs: [[url: scmVars.GIT_URL]]]
+        artifact_version = params.github_release_tag.split('/')[-1]
+        println(ANSI_BOLD + ANSI_YELLOW + "github_release_tag specified, building from github_release_tag: " + params.github_release_tag + ANSI_NORMAL)
     }
     echo "artifact_version: "+ artifact_version
 }
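Note on the versioning change above: artifact_version now keeps only the last path segment of the branch name or release tag. A minimal Scala sketch of the equivalent logic (Groovy's split('/')[-1] becomes .split("/").last in Scala; the ref and hash values below are invented examples, not read from Jenkins):

// Illustration only; not part of the patch.
object ArtifactVersionExample extends App {
  val refName = "refs/tags/release-4.1.0"   // assumed example ref; Jenkins supplies the real branch or tag
  val commitHash = "7a8834a"                // assumed example of `git rev-parse --short HEAD`
  val artifactVersion = refName.split("/").last + "_" + commitHash
  println(artifactVersion)                  // prints "release-4.1.0_7a8834a"
}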
diff --git a/data-products/src/main/resources/data.cql b/data-products/src/main/resources/data.cql
index ec6cb3b94..713c2a338 100644
--- a/data-products/src/main/resources/data.cql
+++ b/data-products/src/main/resources/data.cql
@@ -181,78 +181,29 @@ CREATE KEYSPACE IF NOT EXISTS sunbird WITH replication = {
     'replication_factor': '1'
 };
 
-CREATE TABLE IF NOT EXISTS sunbird.shadow_user (
-    channel text,
-    userextid text,
-    addedby text,
-    claimedon timestamp,
-    claimstatus int,
-    createdon timestamp,
-    email text,
-    name text,
-    orgextid text,
-    phone text,
-    processid text,
-    updatedon timestamp,
-    userid text,
-    userids list<text>,
-    userstatus int,
-    PRIMARY KEY (channel, userextid)
-) WITH CLUSTERING ORDER BY (userextid ASC)
-    AND bloom_filter_fp_chance = 0.01
-    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
-    AND comment = ''
-    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
-    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
-    AND crc_check_chance = 1.0
-    AND dclocal_read_repair_chance = 0.1
-    AND default_time_to_live = 0
-    AND gc_grace_seconds = 864000
-    AND max_index_interval = 2048
-    AND memtable_flush_period_in_ms = 0
-    AND min_index_interval = 128
-    AND read_repair_chance = 0.0
-    AND speculative_retry = '99PERCENTILE';
-
 CREATE TABLE IF NOT EXISTS sunbird.organisation (
     id text PRIMARY KEY,
-    addressid text,
-    approvedby text,
-    approveddate text,
     channel text,
-    communityid text,
     contactdetail text,
     createdby text,
     createddate text,
-    datetime timestamp,
     description text,
     email text,
     externalid text,
     hashtagid text,
-    homeurl text,
-    imgurl text,
-    isapproved boolean,
-    isdefault boolean,
     isrootorg boolean,
+    istenant boolean,
     isssoenabled boolean,
     keys map<text, frozen<list<text>>>,
-    locationid text,
     locationids list<text>,
-    noofmembers int,
-    orgcode text,
     orgname text,
-    orgtype text,
-    orgtypeid text,
-    parentorgid text,
-    preferredlanguage text,
     provider text,
     rootorgid text,
     slug text,
     status int,
-    theme text,
-    thumbnail text,
     updatedby text,
-    updateddate text
+    updateddate text,
+    orglocation text
 ) WITH bloom_filter_fp_chance = 0.01
     AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
     AND comment = ''
@@ -332,47 +283,32 @@ CREATE TABLE IF NOT EXISTS sunbird_courses.course_batch (
 CREATE TABLE IF NOT EXISTS sunbird.user (
     id text PRIMARY KEY,
-    accesscode text,
-    avatar text,
     channel text,
     countrycode text,
     createdby text,
     createddate text,
-    currentlogintime text,
     dob text,
     email text,
     emailverified boolean,
     firstname text,
     flagsvalue int,
     framework map<text, frozen<list<text>>>,
-    gender text,
-    grade list<text>,
     isdeleted boolean,
-    language list<text>,
-    lastlogintime text,
     lastname text,
-    location text,
     locationids list<text>,
     loginid text,
     managedby text,
     maskedemail text,
     maskedphone text,
-    password text,
     phone text,
     phoneverified boolean,
     prevusedemail text,
     prevusedphone text,
-    profilesummary text,
-    profilevisibility map<text, text>,
     recoveryemail text,
     recoveryphone text,
-    registryid text,
     roles list<text>,
     rootorgid text,
     status int,
-    subject list<text>,
-    temppassword text,
-    thumbnail text,
     tncacceptedon timestamp,
     tncacceptedversion text,
     updatedby text,
@@ -382,8 +318,7 @@ CREATE TABLE IF NOT EXISTS sunbird.user (
     usertype text,
     usersubtype text,
     profileusertype text,
-    profilelocation text,
-    webpages list<frozen<map<text, text>>>
+    profilelocation text
 ) WITH bloom_filter_fp_chance = 0.01
     AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
     AND comment = ''
diff --git a/data-products/src/main/scala/org/sunbird/analytics/audit/CourseBatchStatusUpdaterJob.scala b/data-products/src/main/scala/org/sunbird/analytics/audit/CourseBatchStatusUpdaterJob.scala
index 1b8785639..6ea7d245d 100644
--- a/data-products/src/main/scala/org/sunbird/analytics/audit/CourseBatchStatusUpdaterJob.scala
+++ b/data-products/src/main/scala/org/sunbird/analytics/audit/CourseBatchStatusUpdaterJob.scala
@@ -12,11 +12,9 @@ import org.ekstep.analytics.framework.{FrameworkContext, IJob, JobConfig}
 import org.sunbird.analytics.job.report.BaseReportsJob
 import org.apache.spark.sql.cassandra._
 import org.sunbird.analytics.exhaust.collection.UDFUtils
+
 import java.text.SimpleDateFormat
 import java.util.{Calendar, Date, TimeZone}
-
-import org.apache.spark
-
 import scala.collection.immutable.List
 
 case class CourseBatchStatusMetrics(unStarted: Long, inProgress: Long, completed: Long)
@@ -61,7 +59,7 @@ object CourseBatchStatusUpdaterJob extends optional.Application with IJob with B
     res
   }
 
-  def updateBatchStatus(updaterConfig: JobConfig, collectionBatchDF: DataFrame)(implicit sc: SparkContext, spark: SparkSession): CourseBatchStatusMetrics = {
+  def updateBatchStatus(updaterConfig: JobConfig, collectionBatchDF: DataFrame)(implicit sc: SparkContext): CourseBatchStatusMetrics = {
     val currentDate = getDateFormat().format(new Date)
     val computedDF = collectionBatchDF.withColumn("updated_status",
       when(unix_timestamp(lit(currentDate), "yyyy-MM-dd").gt(unix_timestamp(col("enddate"), "yyyy-MM-dd")), 2).otherwise(
@@ -82,11 +80,10 @@ object CourseBatchStatusUpdaterJob extends optional.Application with IJob with B
   }
 
   def getCollectionBatchDF(fetchData: (SparkSession, Map[String, String], String, StructType) => DataFrame)(implicit spark: SparkSession): DataFrame = {
-    val convertDate = spark.udf.register("convertDate", convertDateFn)
     fetchData(spark, collectionBatchDBSettings, cassandraFormat, new
StructType()) - .withColumn("startdate", UDFUtils.getLatestValue(convertDate(col("start_date")), col("startdate"))) - .withColumn("enddate", UDFUtils.getLatestValue(convertDate(col("end_date")), col("enddate"))) - .withColumn("enrollmentenddate", UDFUtils.getLatestValue(convertDate(col("enrollment_enddate")), col("enrollmentenddate"))) + .withColumn("startdate", UDFUtils.getLatestValue(col("start_date"), col("startdate"))) + .withColumn("enddate", UDFUtils.getLatestValue(col("end_date"), col("enddate"))) + .withColumn("enrollmentenddate", UDFUtils.getLatestValue(col("enrollment_enddate"), col("enrollmentenddate"))) .select("courseid", "batchid", "startdate", "name", "enddate", "enrollmentenddate", "enrollmenttype", "createdfor", "status") } @@ -162,18 +159,11 @@ object CourseBatchStatusUpdaterJob extends optional.Application with IJob with B dateFormatter.setTimeZone(TimeZone.getTimeZone("IST")) dateFormatter } - - def formatDate(date: String) = { - Option(date).map(x => { - getDateFormat().format(getDateFormat().parse(x)) - }).orNull - } - def convertDateFn : String => String = (date: String) => { + def formatDate(date: String): String = { Option(date).map(x => { - val utcDateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") - utcDateFormatter.setTimeZone(TimeZone.getTimeZone("UTC")) - getDateFormat().format(utcDateFormatter.parse(x)) + val dateFormatter = getDateFormat() + dateFormatter.format(dateFormatter.parse(x)) }).orNull } diff --git a/data-products/src/main/scala/org/sunbird/analytics/exhaust/collection/BaseCollectionExhaustJob.scala b/data-products/src/main/scala/org/sunbird/analytics/exhaust/collection/BaseCollectionExhaustJob.scala index d9c4a7f97..9e2d1b786 100644 --- a/data-products/src/main/scala/org/sunbird/analytics/exhaust/collection/BaseCollectionExhaustJob.scala +++ b/data-products/src/main/scala/org/sunbird/analytics/exhaust/collection/BaseCollectionExhaustJob.scala @@ -1,31 +1,29 @@ package org.sunbird.analytics.exhaust.collection -import java.security.MessageDigest -import java.util.concurrent.atomic.AtomicInteger import com.datastax.spark.connector.cql.CassandraConnectorConf import org.apache.spark.SparkContext -import org.apache.spark.sql.{DataFrame, Encoders, SQLContext, SparkSession} import org.apache.spark.sql.cassandra._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.StructType -import org.ekstep.analytics.framework.{FrameworkContext, IJob, JobConfig} -import org.ekstep.analytics.framework.Level.INFO +import org.apache.spark.sql._ +import org.ekstep.analytics.framework.Level.{ERROR, INFO} import org.ekstep.analytics.framework.conf.AppConf +import org.ekstep.analytics.framework.dispatcher.KafkaDispatcher +import org.ekstep.analytics.framework.driver.BatchJobDriver.getMetricJson import org.ekstep.analytics.framework.util.DatasetUtil.extensions import org.ekstep.analytics.framework.util.{CommonUtil, JSONUtils, JobLogger, RestUtil} +import org.ekstep.analytics.framework.{FrameworkContext, IJob, JobConfig, StorageConfig} import org.ekstep.analytics.util.Constants -import org.joda.time.{DateTime, DateTimeZone} import org.joda.time.format.{DateTimeFormat, DateTimeFormatter} -import org.sunbird.analytics.exhaust.{BaseReportsJob, JobRequest, OnDemandExhaustJob, RequestStatus} +import org.joda.time.{DateTime, DateTimeZone} +import org.sunbird.analytics.exhaust.{BaseReportsJob, JobRequest, OnDemandExhaustJob} import org.sunbird.analytics.util.DecryptUtil -import scala.collection.immutable.List +import java.security.MessageDigest import 
java.util.concurrent.CompletableFuture -import org.ekstep.analytics.framework.StorageConfig -import org.ekstep.analytics.framework.dispatcher.KafkaDispatcher -import org.ekstep.analytics.framework.driver.BatchJobDriver.getMetricJson - -import scala.collection.mutable.{Buffer, ListBuffer} +import java.util.concurrent.atomic.AtomicInteger +import scala.collection.immutable.List +import scala.collection.mutable.ListBuffer case class UserData(userid: String, state: Option[String] = Option(""), district: Option[String] = Option(""), orgname: Option[String] = Option(""), firstname: Option[String] = Option(""), lastname: Option[String] = Option(""), email: Option[String] = Option(""), @@ -67,10 +65,21 @@ trait BaseCollectionExhaustJob extends BaseReportsJob with IJob with OnDemandExh // generate metric event and push it to kafka topic val metrics = List(Map("id" -> "total-requests", "value" -> res._2.totalRequests), Map("id" -> "success-requests", "value" -> res._2.successRequests), Map("id" -> "failed-requests", "value" -> res._2.failedRequests), Map("id" -> "time-taken-secs", "value" -> Double.box(res._1 / 1000).asInstanceOf[AnyRef])) val metricEvent = getMetricJson(jobName, Option(new DateTime().toString(CommonUtil.dateFormat)), "SUCCESS", metrics) + // $COVERAGE-OFF$ if (AppConf.getConfig("push.metrics.kafka").toBoolean) KafkaDispatcher.dispatch(Array(metricEvent), Map("topic" -> AppConf.getConfig("metric.kafka.topic"), "brokerList" -> AppConf.getConfig("metric.kafka.broker"))) - + // $COVERAGE-ON$ JobLogger.end(s"$jobName completed execution", "SUCCESS", Option(Map("timeTaken" -> res._1, "totalRequests" -> res._2.totalRequests, "successRequests" -> res._2.successRequests, "failedRequests" -> res._2.failedRequests))); + } catch { + case ex: Exception => + JobLogger.log(ex.getMessage, None, ERROR); + JobLogger.end(jobName + " execution failed", "FAILED", Option(Map("model" -> jobName, "statusMsg" -> ex.getMessage))); + // generate metric event and push it to kafka topic in case of failure + val metricEvent = getMetricJson(jobName, Option(new DateTime().toString(CommonUtil.dateFormat)), "FAILED", List()) + // $COVERAGE-OFF$ + if (AppConf.getConfig("push.metrics.kafka").toBoolean) + KafkaDispatcher.dispatch(Array(metricEvent), Map("topic" -> AppConf.getConfig("metric.kafka.topic"), "brokerList" -> AppConf.getConfig("metric.kafka.broker"))) + // $COVERAGE-ON$ } finally { frameworkContext.closeContext(); spark.close() @@ -147,8 +156,8 @@ trait BaseCollectionExhaustJob extends BaseReportsJob with IJob with OnDemandExh JobLogger.log("The Request is processed. 
Pending zipping", Some(Map("requestId" -> request.request_id, "timeTaken" -> res.execution_time, "remainingRequest" -> totalRequests.getAndDecrement())), INFO) res } else { - JobLogger.log("Invalid Request", Some(Map("requestId" -> request.request_id, "remainingRequest" -> totalRequests.getAndDecrement())), INFO) - markRequestAsFailed(request, "Invalid request") + JobLogger.log("Request should have either of batchId, batchFilter, searchFilter or encrption key", Some(Map("requestId" -> request.request_id, "remainingRequest" -> totalRequests.getAndDecrement())), INFO) + markRequestAsFailed(request, "Request should have either of batchId, batchFilter, searchFilter or encrption key") } } else { @@ -189,40 +198,46 @@ trait BaseCollectionExhaustJob extends BaseReportsJob with IJob with OnDemandExh } def checkRequestProcessCriteria(processedCount: Long, processedSize: Long): Boolean = { - if (processedCount < AppConf.getConfig("exhaust.batches.limit").toLong && processedSize < AppConf.getConfig("exhaust.file.size.limit").toLong) + if (processedCount < AppConf.getConfig("exhaust.batches.limit.per.channel").toLong && processedSize < AppConf.getConfig("exhaust.file.size.limit.per.channel").toLong) true else false } def processRequest(request: JobRequest, custodianOrgId: String, userCachedDF: DataFrame, storageConfig: StorageConfig, processedRequests: ListBuffer[ProcessedRequest])(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): JobRequest = { - val completedBatches :ListBuffer[ProcessedRequest]= if(request.processed_batches.getOrElse("[]").equals("[]")) ListBuffer.empty[ProcessedRequest] else { - JSONUtils.deserialize[ListBuffer[ProcessedRequest]](request.processed_batches.get) - } - markRequestAsProcessing(request) - val completedBatchIds = completedBatches.map(f=> f.batchId) - + val batchLimit: Int = AppConf.getConfig("data_exhaust.batch.limit.per.request").toInt val collectionConfig = JSONUtils.deserialize[CollectionConfig](request.request_data); - val collectionBatches = getCollectionBatches(collectionConfig.batchId, collectionConfig.batchFilter, collectionConfig.searchFilter, custodianOrgId, request.requested_channel) - .filter(p=> !completedBatchIds.contains(p.batchId)) - val result = CommonUtil.time(processBatches(userCachedDF, collectionBatches, storageConfig, Some(request.request_id), Some(request.requested_channel), processedRequests.toList)) - - val response = result._2; - val failedBatches = response.filter(p => p.status.equals("FAILED")) - val processingBatches= response.filter(p => p.status.equals("PROCESSING")) - response.filter(p=> p.status.equals("SUCCESS")).foreach(f => completedBatches += ProcessedRequest(request.requested_channel, f.batchId,f.file, f.fileSize)) - if (response.size == 0) { - markRequestAsFailed(request, "No data found") - } else if (failedBatches.size > 0) { - markRequestAsFailed(request, failedBatches.map(f => f.statusMsg).mkString(","), Option(JSONUtils.serialize(completedBatches))) - } else if(processingBatches.size > 0 ){ - markRequestAsSubmitted(request, JSONUtils.serialize(completedBatches)) + val batches = if (collectionConfig.batchId.isDefined) List(collectionConfig.batchId.get) else collectionConfig.batchFilter.getOrElse(List[String]()) + if (batches.length <= batchLimit) { + val completedBatches :ListBuffer[ProcessedRequest]= if(request.processed_batches.getOrElse("[]").equals("[]")) ListBuffer.empty[ProcessedRequest] else { + JSONUtils.deserialize[ListBuffer[ProcessedRequest]](request.processed_batches.get) + } + 
markRequestAsProcessing(request) + val completedBatchIds = completedBatches.map(f=> f.batchId) + + val collectionBatches = getCollectionBatches(collectionConfig.batchId, collectionConfig.batchFilter, collectionConfig.searchFilter, custodianOrgId, request.requested_channel) + .filter(p=> !completedBatchIds.contains(p.batchId)) + val result = CommonUtil.time(processBatches(userCachedDF, collectionBatches, storageConfig, Some(request.request_id), Some(request.requested_channel), processedRequests.toList)) + + val response = result._2; + val failedBatches = response.filter(p => p.status.equals("FAILED")) + val processingBatches= response.filter(p => p.status.equals("PROCESSING")) + response.filter(p=> p.status.equals("SUCCESS")).foreach(f => completedBatches += ProcessedRequest(request.requested_channel, f.batchId,f.file, f.fileSize)) + if (response.size == 0) { + markRequestAsFailed(request, "No data found") + } else if (failedBatches.size > 0) { + markRequestAsFailed(request, failedBatches.map(f => f.statusMsg).mkString(","), Option(JSONUtils.serialize(completedBatches))) + } else if(processingBatches.size > 0 ){ + markRequestAsSubmitted(request, JSONUtils.serialize(completedBatches)) + } else { + request.status = "SUCCESS"; + request.download_urls = Option(completedBatches.map(f => f.filePath).toList); + request.execution_time = Option(result._1); + request.dt_job_completed = Option(System.currentTimeMillis) + request.processed_batches = Option(JSONUtils.serialize(completedBatches)) + request + } } else { - request.status = "SUCCESS"; - request.download_urls = Option(completedBatches.map(f => f.filePath).toList); - request.execution_time = Option(result._1); - request.dt_job_completed = Option(System.currentTimeMillis) - request.processed_batches = Option(JSONUtils.serialize(completedBatches)) - request + markRequestAsFailed(request, s"Number of batches in request exceeded. 
It should be within $batchLimit")
     }
   }
 
@@ -242,7 +257,7 @@ trait BaseCollectionExhaustJob extends BaseReportsJob with IJob with OnDemandExh
     val encoder = Encoders.product[CollectionBatch];
     val collectionBatches = getCollectionBatchDF(false);
     if (batchId.isDefined || batchFilter.isDefined) {
-      val batches = if (batchId.isDefined) collectionBatches.filter(col("batchid") === batchId.get) else collectionBatches.filter(col("batchid").isin(batchFilter.get: _*))
+      val batches = validateBatches(collectionBatches, batchId, batchFilter)
      val collectionIds = batches.select("courseid").dropDuplicates().collect().map(f => f.get(0));
       val collectionDF = searchContent(Map("request" -> Map("filters" -> Map("identifier" -> collectionIds, "status" -> Array("Live", "Unlisted", "Retired")), "fields" -> Array("channel", "identifier", "name", "userConsent"))));
       val joinedDF = batches.join(collectionDF, batches("courseid") === collectionDF("identifier"), "inner");
@@ -262,6 +277,31 @@ trait BaseCollectionExhaustJob extends BaseReportsJob with IJob with OnDemandExh
     }
   }
 
+  /**
+   * Validates the requested batches before processing.
+   * @param collectionBatches, batchId, batchFilter
+   * If batchFilter is defined
+   * Step 1: Filter the duplicate batches from the batchFilter list
+   * Common Step
+   * Step 2: Validate that the batchid is present in the coursebatch table and is not an expired (status=2) batch
+   *
+   * @return Dataset[Row] of valid batchid
+   */
+  def validateBatches(collectionBatches: DataFrame, batchId: Option[String], batchFilter: Option[List[String]]): Dataset[Row] = {
+    if (batchId.isDefined) {
+      collectionBatches.filter(col("batchid") === batchId.get && col("status").notEqual(2))
+    } else {
+      /**
+       * Filter out the duplicate batches from batchFilter
+       * eg: Input: List["batch-001", "batch-002", "batch-001"]
+       * Output: List["batch-001", "batch-002"]
+       */
+      val distinctBatch = batchFilter.get.distinct
+      if (batchFilter.get.size != distinctBatch.size) JobLogger.log("Duplicate Batches are filtered:: TotalDistinctBatches: " + distinctBatch.size)
+      collectionBatches.filter(col("batchid").isin(distinctBatch: _*) && col("status").notEqual(2))
+    }
+  }
+
   def processBatches(userCachedDF: DataFrame, collectionBatches: List[CollectionBatch], storageConfig: StorageConfig, requestId: Option[String], requestChannel: Option[String], processedRequests: List[ProcessedRequest] )(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): List[CollectionBatchResponse] = {
     var processedCount = if(processedRequests.isEmpty) 0 else processedRequests.filter(f => f.channel.equals(requestChannel.getOrElse(""))).size
@@ -402,7 +442,7 @@ trait BaseCollectionExhaustJob extends BaseReportsJob with IJob with OnDemandExh
     val df = loadData(collectionBatchDBSettings, cassandraFormat, new StructType())
       .withColumn("startdate", UDFUtils.getLatestValue(col("start_date"), col("startdate")))
       .withColumn("enddate", UDFUtils.getLatestValue(col("end_date"), col("enddate")))
-      .select("courseid", "batchid", "enddate", "startdate", "name")
+      .select("courseid", "batchid", "enddate", "startdate", "name", "status")
     if (persist) df.persist() else df
   }
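To make the new request-validation rules concrete, here is a small self-contained Spark sketch (illustration only, not project code; the local SparkSession, table rows and batch ids are made-up assumptions). It mirrors what validateBatches and the data_exhaust.batch.limit.per.request check do: duplicate batch ids are collapsed, expired batches (status = 2) are dropped, and a request listing more batches than the limit is rejected.

// Standalone illustration of the batch-validation rules introduced above.
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object ValidateBatchesSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("validate-batches-sketch").getOrCreate()
  import spark.implicits._

  // Stand-in for the course_batch dataframe: status 2 marks an expired batch.
  val collectionBatches: DataFrame = Seq(("batch-001", 1), ("batch-002", 2), ("batch-003", 1)).toDF("batchid", "status")

  val batchLimit = 4                                             // mirrors data_exhaust.batch.limit.per.request
  val batchFilter = List("batch-001", "batch-002", "batch-001")  // duplicate id on purpose

  if (batchFilter.length <= batchLimit) {
    val distinctBatch = batchFilter.distinct                     // step 1: drop duplicate batch ids
    val validBatches = collectionBatches                         // step 2: keep only known, non-expired batches
      .filter(col("batchid").isin(distinctBatch: _*) && col("status").notEqual(2))
    validBatches.show()                                          // only batch-001 survives; batch-002 is expired
  } else {
    println(s"Number of batches in request exceeded. It should be within $batchLimit")
  }
  spark.stop()
}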
diff --git a/data-products/src/main/scala/org/sunbird/analytics/exhaust/collection/UserInfoExhaustJob.scala b/data-products/src/main/scala/org/sunbird/analytics/exhaust/collection/UserInfoExhaustJob.scala
index 36ff264ab..b2abcd505 100644
--- a/data-products/src/main/scala/org/sunbird/analytics/exhaust/collection/UserInfoExhaustJob.scala
+++ b/data-products/src/main/scala/org/sunbird/analytics/exhaust/collection/UserInfoExhaustJob.scala
@@ -56,12 +56,12 @@ object UserInfoExhaustJob extends optional.Application with BaseCollectionExhaus
       userDF.withColumn("consentflag", lit("false"));
     } else {
       val consentDF = getUserConsentDF(collectionBatch);
-      val resultDF = userDF.join(consentDF, Seq("userid"), "left_outer")
+      val resultDF = userDF.join(consentDF, Seq("userid"), "inner")
       // Org level consent - will be updated in 3.4 to read from user_consent table
       resultDF.withColumn("orgconsentflag", when(col("rootorgid") === collectionBatch.requestedOrgId, "true").otherwise("false"))
     }
-    val consentAppliedDF = consentFields.foldLeft(consentDF)((df, column) => df.withColumn(column, when(col("consentflag") === "true", col(column)).otherwise("")));
-    orgDerivedFields.foldLeft(consentAppliedDF)((df, field) => df.withColumn(field, when(col("consentflag") === "true", col(field)).when(col("orgconsentflag") === "true", col(field)).otherwise("")));
+    // Issue #SB-24966: Logic to exclude users whose consentflag is false
+    consentDF.filter(col("consentflag") === "true")
   }
 
   def decryptUserInfo(userDF: DataFrame)(implicit spark: SparkSession): DataFrame = {
diff --git a/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminGeoReportJob.scala b/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminGeoReportJob.scala
index 7fde0daba..b5f006234 100644
--- a/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminGeoReportJob.scala
+++ b/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminGeoReportJob.scala
@@ -15,9 +15,6 @@ import org.ekstep.analytics.framework.Level.{ERROR, INFO}
 case class DistrictSummary(index:Int, districtName: String, blocks: Long, schools: Long)
 case class RootOrgData(rootorgjoinid: String, rootorgchannel: String, rootorgslug: String)
 
-case class SubOrgRow(id: String, isrootorg: Boolean, rootorgid: String, channel: String, status: String, locationid: String, locationids: Seq[String], orgname: String,
-                     explodedlocation: String, locid: String, loccode: String, locname: String, locparentid: String, loctype: String, rootorgjoinid: String, rootorgchannel: String, externalid: String)
-
 object StateAdminGeoReportJob extends optional.Application with IJob with StateAdminReportHelper {
 
   implicit val className: String = "org.ekstep.analytics.job.StateAdminGeoReportJob"
diff --git a/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportHelper.scala b/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportHelper.scala
index 811e15a4b..d9ba699d0 100644
--- a/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportHelper.scala
+++ b/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportHelper.scala
@@ -3,6 +3,7 @@ package org.sunbird.analytics.job.report
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.{DataFrame, Encoders, SparkSession}
+import org.sunbird.analytics.job.report.StateAdminReportJob.locationIdList
 import org.sunbird.cloud.storage.conf.AppConf
 
 trait StateAdminReportHelper extends BaseReportsJob {
@@ -24,13 +25,12 @@ trait StateAdminReportHelper extends BaseReportsJob {
   def generateSubOrgData(organisationDF: DataFrame)(implicit sparkSession: SparkSession) = {
 
     var locationDF = locationData()
-
-    val rootOrgs = organisationDF.select(col("id").as("rootorgjoinid"), col("channel").as("rootorgchannel"), col("slug").as("rootorgslug")).where(col("isrootorg") && col("status").===(1)).collect();
+    val rootOrgs =
organisationDF.select(col("id").as("rootorgjoinid"), col("channel").as("rootorgchannel"), col("slug").as("rootorgslug")).where(col("istenant") && col("status").===(1)).collect(); val rootOrgRDD = sparkSession.sparkContext.parallelize(rootOrgs.toSeq); val rootOrgEncoder = Encoders.product[RootOrgData].schema val rootOrgDF = sparkSession.createDataFrame(rootOrgRDD, rootOrgEncoder); - - val subOrgDF = organisationDF + val orgWithLocationDF = organisationDF.withColumn("locationids", locationIdList(col("orglocation"))) + val subOrgDF = orgWithLocationDF .withColumn("explodedlocation", explode(when(size(col("locationids")).equalTo(0), array(lit(null).cast("string"))) .otherwise(when(col("locationids").isNotNull, col("locationids")) .otherwise(array(lit(null).cast("string")))))) @@ -68,13 +68,13 @@ trait StateAdminReportHelper extends BaseReportsJob { def loadOrganisationData()(implicit sparkSession: SparkSession) = { loadData(sparkSession, Map("table" -> "organisation", "keyspace" -> sunbirdKeyspace), None).select( col("id").as("id"), - col("isrootorg").as("isrootorg"), col("rootorgid").as("rootorgid"), col("channel").as("channel"), col("status").as("status"), col("orgname").as("orgname"), - col("locationids").as("locationids"), col("externalid").as("externalid"), + col("orglocation"), + col("istenant"), col("slug").as("slug")).cache(); } diff --git a/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala b/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala index 85824ad6d..70ac3c693 100644 --- a/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala +++ b/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminReportJob.scala @@ -1,7 +1,6 @@ package org.sunbird.analytics.job.report import org.apache.spark.SparkContext -import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions.{col, lit, when, _} import org.apache.spark.sql.{DataFrame, _} import org.ekstep.analytics.framework.Level.{ERROR, INFO} @@ -12,35 +11,15 @@ import org.ekstep.analytics.framework.FrameworkContext import org.ekstep.analytics.framework.util.JobLogger import org.ekstep.analytics.framework.JobConfig import org.ekstep.analytics.framework.JobContext -import org.ekstep.analytics.framework.StorageConfig -import org.ekstep.analytics.framework.OutputDispatcher import org.ekstep.analytics.framework.dispatcher.ScriptDispatcher import org.sunbird.analytics.util.DecryptUtil import org.ekstep.analytics.framework.util.JSONUtils import scala.collection.mutable.ListBuffer -case class ValidatedUserDistrictSummary(index: Int, districtName: String, blocks: Long, schools: Long, registered: Long) -case class UserStatus(id: Long, status: String) -object UnclaimedStatus extends UserStatus(0, "UNCLAIMED") -object ClaimedStatus extends UserStatus(1, "CLAIMED") -object RejectedStatus extends UserStatus(2, "REJECTED") -object FailedStatus extends UserStatus(3, "FAILED") -object MultiMatchStatus extends UserStatus(4, "MULTIMATCH") -object OrgExtIdMismatch extends UserStatus(5, "ORGEXTIDMISMATCH") -object Eligible extends UserStatus(6, "ELIGIBLE") - -case class ShadowUserData(channel: String, userextid: String, addedby: String, claimedon: java.sql.Timestamp, claimstatus: Int, - createdon: java.sql.Timestamp, email: String, name: String, orgextid: String, processid: String, - phone: String, updatedon: java.sql.Timestamp, userid: String, userids: List[String], userstatus: Int) - case class 
UserSelfDeclared(userid: String, orgid: String, persona: String, errortype: String, status: String, userinfo: Map[String, String]) -// Shadow user summary in the json will have this POJO -case class UserSummary(accounts_validated: Long, accounts_rejected: Long, accounts_unclaimed: Long, accounts_failed: Long) -case class UserProfileLocationType(userid: String, usertype: String, usersubtype: String, locationids: List[String]) - object StateAdminReportJob extends optional.Application with IJob with StateAdminReportHelper { implicit val className: String = "org.ekstep.analytics.job.StateAdminReportJob" @@ -65,9 +44,6 @@ object StateAdminReportJob extends optional.Application with IJob with StateAdmi } private def execute(config: JobConfig)(implicit sparkSession: SparkSession, fc: FrameworkContext) = { - - generateReport(); - JobLogger.end("StateAdminReportJob completed successfully!", "SUCCESS", Option(Map("config" -> config, "model" -> name))) val resultDf = generateExternalIdReport(); JobLogger.end("ExternalIdReportJob completed successfully!", "SUCCESS", Option(Map("config" -> config, "model" -> name))) @@ -78,10 +54,16 @@ object StateAdminReportJob extends optional.Application with IJob with StateAdmi // $COVERAGE-ON$ Enabling scoverage for other methods def generateExternalIdReport() (implicit sparkSession: SparkSession, fc: FrameworkContext) = { import sparkSession.implicits._ + val DECLARED_EMAIL: String = "declared-email" + val DECLARED_PHONE: String = "declared-phone" val userSelfDeclaredEncoder = Encoders.product[UserSelfDeclared].schema //loading user_declarations table details based on declared values and location details and appending org-external-id if present val userSelfDeclaredDataDF = loadData(sparkSession, Map("table" -> "user_declarations", "keyspace" -> sunbirdKeyspace), Some(userSelfDeclaredEncoder)) - val userSelfDeclaredUserInfoDataDF = userSelfDeclaredDataDF.select(col("*"), col("userinfo").getItem("declared-email").as("declared-email"), col("userinfo").getItem("declared-phone").as("declared-phone"), + val userConsentDataDF = loadData(sparkSession, Map("table" -> "user_consent", "keyspace" -> sunbirdKeyspace)) + val activeConsentDF = userConsentDataDF.where(col("status") === "ACTIVE" && lower(col("object_type")) === "organisation") + val activeSelfDeclaredDF = userSelfDeclaredDataDF.join(activeConsentDF, userSelfDeclaredDataDF.col("orgid") === activeConsentDF.col("consumer_id"), "left_semi"). 
+ select(userSelfDeclaredDataDF.col("*")) + val userSelfDeclaredUserInfoDataDF = activeSelfDeclaredDF.select(col("*"), col("userinfo").getItem(DECLARED_EMAIL).as(DECLARED_EMAIL), col("userinfo").getItem(DECLARED_PHONE).as(DECLARED_PHONE), col("userinfo").getItem("declared-school-name").as("declared-school-name"), col("userinfo").getItem("declared-school-udise-code").as("declared-school-udise-code"),col("userinfo").getItem("declared-ext-id").as("declared-ext-id")).drop("userinfo"); val locationDF = locationData() //to-do later check if externalid is necessary not-null check is necessary @@ -90,7 +72,7 @@ object StateAdminReportJob extends optional.Application with IJob with StateAdmi select(userSelfDeclaredUserInfoDataDF.col("*"), orgExternalIdDf.col("*")) //decrypting email and phone values - val userDecrpytedDataDF = decryptPhoneEmailInDF(userSelfDeclaredExtIdDF, "declared-email", "declared-phone") + val userDecrpytedDataDF = decryptPhoneEmailInDF(userSelfDeclaredExtIdDF, DECLARED_EMAIL, DECLARED_PHONE) //appending decrypted values to the user-external-identifier dataframe val userExternalDecryptData = userSelfDeclaredExtIdDF.join(userDecrpytedDataDF, userSelfDeclaredExtIdDF.col("userid") === userDecrpytedDataDF.col("userid"), "left_outer"). select(userSelfDeclaredExtIdDF.col("*"), userDecrpytedDataDF.col("decrypted-email"), userDecrpytedDataDF.col("decrypted-phone")) @@ -175,19 +157,29 @@ object StateAdminReportJob extends optional.Application with IJob with StateAdmi } private def saveUserSelfDeclaredExternalInfo(userExternalDecryptData: DataFrame, userDenormLocationDF: DataFrame): DataFrame ={ - val resultDf = userExternalDecryptData.join(userDenormLocationDF, userExternalDecryptData.col("userid") === userDenormLocationDF.col("userid"), "left_outer"). + var userDenormLocationDFWithCluster : DataFrame = null; + if(!userDenormLocationDF.columns.contains("cluster")) { + if(!userDenormLocationDF.columns.contains("block")) { + userDenormLocationDFWithCluster = userDenormLocationDF.withColumn("cluster", lit("").cast("string")).withColumn("block", lit("").cast("string")) + } else { + userDenormLocationDFWithCluster = userDenormLocationDF.withColumn("cluster", lit("").cast("string")) + } + } else { + userDenormLocationDFWithCluster = userDenormLocationDF + } + val resultDf = userExternalDecryptData.join(userDenormLocationDFWithCluster, userExternalDecryptData.col("userid") === userDenormLocationDFWithCluster.col("userid"), "left_outer"). 
select(col("Name"), userExternalDecryptData.col("userid").as("Diksha UUID"), - when(userDenormLocationDF.col("state").isNotNull, userDenormLocationDF.col("state")).otherwise(lit("")).as("State"), - when(userDenormLocationDF.col("district").isNotNull, userDenormLocationDF.col("district")).otherwise(lit("")).as("District"), - when(userDenormLocationDF.col("block").isNotNull, userDenormLocationDF.col("block")).otherwise(lit("")).as("Block"), - when(userDenormLocationDF.col("cluster").isNotNull, userDenormLocationDF.col("cluster")).otherwise(lit("")).as("Cluster"), + when(userDenormLocationDFWithCluster.col("state").isNotNull, userDenormLocationDFWithCluster.col("state")).otherwise(lit("")).as("State"), + when(userDenormLocationDFWithCluster.col("district").isNotNull, userDenormLocationDFWithCluster.col("district")).otherwise(lit("")).as("District"), + when(userDenormLocationDFWithCluster.col("block").isNotNull, userDenormLocationDFWithCluster.col("block")).otherwise(lit("")).as("Block"), + when(userDenormLocationDFWithCluster.col("cluster").isNotNull, userDenormLocationDFWithCluster.col("cluster")).otherwise(lit("")).as("Cluster"), col("declared-school-name"). as("School Name"), col("declared-school-udise-code").as("School UDISE ID"), col("declared-ext-id").as("State provided ext. ID"), - userDenormLocationDF.col("decrypted-email").as("Profile Email"), - userDenormLocationDF.col("decrypted-phone").as("Profile Phone number"), + userDenormLocationDFWithCluster.col("decrypted-email").as("Profile Email"), + userDenormLocationDFWithCluster.col("decrypted-phone").as("Profile Phone number"), userExternalDecryptData.col("decrypted-phone").as("Org Phone"), userExternalDecryptData.col("decrypted-email").as("Org Email ID"), col("usertype").as("User Type"), @@ -198,207 +190,6 @@ object StateAdminReportJob extends optional.Application with IJob with StateAdmi resultDf } - def generateStateRootSubOrgDF(subOrgDF: DataFrame, claimedShadowDataSummaryDF: DataFrame, claimedShadowUserDF: DataFrame) = { - val rootSubOrg = subOrgDF.where(col("isrootorg") && col("status").equalTo(1)) - val stateUsersDf = rootSubOrg.join(claimedShadowUserDF, rootSubOrg.col("id") === (claimedShadowUserDF.col("rootorgid")),"inner") - .withColumnRenamed("orgname","School name") - .withColumn("District id", lit("")).withColumn("District name", lit( "")).withColumn("Block id", lit("")).withColumn("Block name", lit("")) - .select(col("School name"), col("District id"), col("District name"), col("Block id"), col("Block name"), col("slug"), - col("externalid"),col("userextid"), col("name"), col("userid"), col("usersuborganisationid")).where(col("usersuborganisationid").isNull) - stateUsersDf - } - - def generateReport()(implicit sparkSession: SparkSession, fc: FrameworkContext) = { - - import sparkSession.implicits._ - - val shadowDataEncoder = Encoders.product[ShadowUserData].schema - val shadowUserDF = loadData(sparkSession, Map("table" -> "shadow_user", "keyspace" -> sunbirdKeyspace), Some(shadowDataEncoder)).as[ShadowUserData] - val claimedShadowUserDF = shadowUserDF.where(col("claimstatus")=== ClaimedStatus.id) - val organisationDF = loadOrganisationSlugDF() - val channelSlugDF = getChannelSlugDF(organisationDF) - val shadowUserStatusDF = appendShadowUserStatus(shadowUserDF); - val shadowDataSummary = generateSummaryData(shadowUserStatusDF) - - saveUserSummaryReport(shadowDataSummary, channelSlugDF, storageConfig) - saveUserDetailsReport(shadowUserStatusDF, channelSlugDF, storageConfig) - - 
generateTenantUserReport(organisationDF.toDF()); - } - - //This is validated user summary/detail section in the admin-manage page - def generateTenantUserReport(organisationDF: DataFrame) (implicit sparkSession: SparkSession, fc: FrameworkContext) = { - - val custodianOrgId = getCustodianOrgId(); - //loading active teanant user-details from user and usr_external_identity and "user_organisation - var tenantUserDF = loadData(sparkSession, Map("table" -> "user", "keyspace" -> sunbirdKeyspace), None). - where(col("rootorgid").isNotNull and !col("rootorgid").contains(custodianOrgId) and col("isdeleted").contains(false)) - val userExternalIdentityDF = loadData(sparkSession, Map("table" -> "usr_external_identity", "keyspace" -> sunbirdKeyspace), None) - val tenantUserOrgDF = loadData(sparkSession, Map("table" -> "user_organisation", "keyspace" -> sunbirdKeyspace), None). - where(!col("organisationid").contains(custodianOrgId)) - //teantUserSubOrgDf will contain user records which belong to sub-org(duplicate tenant org-related details is removed) - val teantUserSubOrgDf = tenantUserOrgDF.join(tenantUserDF, tenantUserOrgDF.col("organisationid") === tenantUserDF.col("rootorgid"), "left_anti") - tenantUserDF = tenantUserDF.join(teantUserSubOrgDf, tenantUserDF.col("userid") === teantUserSubOrgDf.col("userid"), "left_outer").select(tenantUserDF.col("*"), tenantUserOrgDF.col("organisationid").as("usersuborganisationid")) - tenantUserDF = tenantUserDF.join(userExternalIdentityDF, tenantUserDF.col("id") === userExternalIdentityDF.col("userid"), "left_outer"). - select(tenantUserDF.col("*"), userExternalIdentityDF.col("originalexternalid").as("userextid"), concat_ws(" ", col("firstname"), col("lastname")).as("name")).drop("firstname", "lastname") - val stateUserDataSummaryDF = tenantUserDF.groupBy("channel").agg(count("rootorgid").as("totalregistered")).na.fill(0) - - // We can directly write to the slug folder - val subOrgDF: DataFrame = generateSubOrgData(organisationDF) - val blockDataWithSlug:DataFrame = generateBlockLevelData(subOrgDF) - val userDistrictSummaryDF = tenantUserDF.join(blockDataWithSlug, blockDataWithSlug.col("School id") === (tenantUserDF.col("usersuborganisationid")),"inner") - val validatedUsersWithDst = userDistrictSummaryDF.groupBy(col("slug"), col("Channels")).agg(countDistinct("District name").as("districts"), - countDistinct("Block id").as("blocks"), countDistinct(tenantUserDF.col("usersuborganisationid")).as("schools"), count("userid").as("subOrgRegistered")) - - val validatedUserDataSummaryDF = stateUserDataSummaryDF.join(validatedUsersWithDst, stateUserDataSummaryDF.col("channel") === validatedUsersWithDst.col("Channels")) - val validatedGeoSummaryDF = validatedUserDataSummaryDF.withColumn("registered", - when(col("totalregistered").isNull, 0).otherwise(col("totalregistered"))).withColumn("rootOrgRegistered", col("registered")-col("subOrgRegistered")).drop("totalregistered", "channel", "Channels") - - saveUserValidatedSummaryReport(validatedGeoSummaryDF, storageConfig) - val stateOrgDf = generateStateRootSubOrgDF(subOrgDF, stateUserDataSummaryDF, tenantUserDF.toDF()); - saveValidatedUserDetailsReport(userDistrictSummaryDF, storageConfig, "validated-user-detail") - saveValidatedUserDetailsReport(stateOrgDf, storageConfig, "validated-user-detail-state") - - val districtUserResult = userDistrictSummaryDF.groupBy(col("slug"), col("District name").as("districtName")). 
- agg(countDistinct("Block id").as("blocks"),countDistinct(col("School id")).as("schools"), count("id").as("registered")) - saveUserDistrictSummary(districtUserResult, storageConfig) - - districtUserResult - } - - def getCustodianOrgId() (implicit sparkSession: SparkSession): String = { - val systemSettingDF = loadData(sparkSession, Map("table" -> "system_settings", "keyspace" -> sunbirdKeyspace)).where(col("id") === "custodianOrgId" && col("field") === "custodianOrgId") - systemSettingDF.select(col("value")).persist().select("value").first().getString(0) - } - - def saveValidatedUserDetailsReport(userDistrictSummaryDF: DataFrame, storageConfig: StorageConfig, reportId: String) : Unit = { - val window = Window.partitionBy("slug").orderBy(asc("District name")) - val userDistrictDetailDF = userDistrictSummaryDF.withColumn("Sl", row_number().over(window)).select( col("Sl"), col("District name"), col("District id").as("District ext. ID"), - col("Block name"), col("Block id").as("Block ext. ID"), col("School name"), col("externalid").as("School ext. ID"), col("name").as("Teacher name"), - col("userextid").as("Teacher ext. ID"), col("userid").as("Teacher Diksha ID"), col("slug")) - userDistrictDetailDF.saveToBlobStore(storageConfig, "csv", reportId, Option(Map("header" -> "true")), Option(Seq("slug"))) - JobLogger.log(s"StateAdminReportJob: ${reportId} report records count = ${userDistrictDetailDF.count()}", None, INFO) - } - - def saveUserDistrictSummary(resultDF: DataFrame, storageConfig: StorageConfig)(implicit spark: SparkSession, fc: FrameworkContext) = { - val window = Window.partitionBy("slug").orderBy(asc("districtName")) - val districtSummaryDF = resultDF.withColumn("index", row_number().over(window)) - dataFrameToJsonFile(districtSummaryDF, "validated-user-summary-district", storageConfig) - JobLogger.log(s"StateAdminReportJob: validated-user-summary-district report records count = ${districtSummaryDF.count()}", None, INFO) - } - - private def getChannelSlugDF(organisationDF: DataFrame)(implicit sparkSession: SparkSession): DataFrame = { - organisationDF.select(col("channel"), col("slug")).where(col("isrootorg") && col("status").===(1)) - } - - def appendShadowUserStatus(shadowUserDF: Dataset[ShadowUserData])(implicit spark: SparkSession): DataFrame = { - import spark.implicits._ - shadowUserDF.withColumn( - "claim_status", - when($"claimstatus" === UnclaimedStatus.id, lit(UnclaimedStatus.status)) - .when($"claimstatus" === ClaimedStatus.id, lit(ClaimedStatus.status)) - .when($"claimstatus" === FailedStatus.id, lit(FailedStatus.status)) - .when($"claimstatus" === RejectedStatus.id, lit(RejectedStatus.status)) - .when($"claimstatus" === MultiMatchStatus.id, lit(MultiMatchStatus.status)) - .when($"claimstatus" === OrgExtIdMismatch.id, lit(OrgExtIdMismatch.status)) - .when($"claimstatus" === Eligible.id, lit(Eligible.status)) - .otherwise(lit(""))) - } - - def generateSummaryData(shadowUserDF: DataFrame)(implicit spark: SparkSession): DataFrame = { - shadowUserDF.groupBy("channel") - .pivot("claim_status").agg(count("claim_status")).na.fill(0) - } - - /** - * Saves the raw data as a .csv. - * Appends /detail to the URL to prevent overwrites. - * Check function definition for the exact column ordering. 
- * @param reportDF - * @param url - */ - def saveUserDetailsReport(reportDF: DataFrame, channelSlugDF: DataFrame, storageConfig: StorageConfig) (implicit spark: SparkSession): Unit = { - import spark.implicits._ - // List of fields available - //channel,userextid,addedby,claimedon,claimstatus,createdon,email,name,orgextid,phone,processid,updatedon,userid,userids,userstatus - val userDetailReport = reportDF.withColumn("shadow_user_status", - when($"claimstatus" === UnclaimedStatus.id, lit(UnclaimedStatus.status)) - .when($"claimstatus" === ClaimedStatus.id, lit(ClaimedStatus.status)) - .when($"claimstatus" === FailedStatus.id, lit(FailedStatus.status)) - .when($"claimstatus" === RejectedStatus.id, lit(RejectedStatus.status)) - .when($"claimstatus" === MultiMatchStatus.id, lit(MultiMatchStatus.status)) - .when($"claimstatus" === OrgExtIdMismatch.id, lit(FailedStatus.status)) - .when($"claimstatus" === Eligible.id, lit(Eligible.status))) - userDetailReport.join(channelSlugDF, reportDF.col("channel") === channelSlugDF.col("channel"), "left_outer").select( - col("slug"), - col("shadow_user_status"), - col("userextid").as("User external id"), - col("userstatus").as("User account status"), - col("userid").as("User id"), - concat_ws(",", col("userids")).as("Matching User ids"), - col("claimedon").as("Claimed on"), - col("orgextid").as("School external id"), - col("claim_status").as("Claimed status"), - col("createdon").as("Created on"), - col("updatedon").as("Last updated on")).filter(col(colName ="shadow_user_status"). - isin(lit(UnclaimedStatus.status),lit(ClaimedStatus.status),lit(Eligible.status),lit(RejectedStatus.status),lit(FailedStatus.status),lit(MultiMatchStatus.status))).filter(col(colName = "slug").isNotNull) - .saveToBlobStore(storageConfig, "csv", "user-detail", Option(Map("header" -> "true")), Option(Seq("shadow_user_status","slug"))) - - JobLogger.log(s"StateAdminReportJob: user-details report records count = ${userDetailReport.count()}", None, INFO) - } - - def saveUserValidatedSummaryReport(reportDF: DataFrame, storageConfig: StorageConfig): Unit = { - reportDF.saveToBlobStore(storageConfig, "json", "validated-user-summary", None, Option(Seq("slug"))) - JobLogger.log(s"StateAdminReportJob: validated-user-summary report records count = ${reportDF.count()}", None, INFO) - } - - def saveUserSummaryReport(reportDF: DataFrame, channelSlugDF: DataFrame, storageConfig: StorageConfig): Unit = { - val dfColumns = reportDF.columns.toSet - - // Get claim status not in the current dataframe to add them. 
- val columns: Seq[String] = Seq( - UnclaimedStatus.status, - ClaimedStatus.status, - RejectedStatus.status, - FailedStatus.status, - MultiMatchStatus.status, - OrgExtIdMismatch.status).filterNot(dfColumns) - val correctedReportDF = columns.foldLeft(reportDF)((acc, col) => { - acc.withColumn(col, lit(0)) - }) - JobLogger.log(s"columns to add in this report $columns") - val totalSummaryDF = correctedReportDF.join(channelSlugDF, correctedReportDF.col("channel") === channelSlugDF.col("channel"), "left_outer").select( - col("slug"), - when(col(UnclaimedStatus.status).isNull, 0).otherwise(col(UnclaimedStatus.status)).as("accounts_unclaimed"), - when(col(ClaimedStatus.status).isNull, 0).otherwise(col(ClaimedStatus.status)).as("accounts_validated"), - when(col(RejectedStatus.status).isNull, 0).otherwise(col(RejectedStatus.status)).as("accounts_rejected"), - when(col(Eligible.status).isNull, 0).otherwise(col(Eligible.status)).as("accounts_eligible"), - when(col(MultiMatchStatus.status).isNull, 0).otherwise(col(MultiMatchStatus.status)).as("accounts_duplicate"), - when(col(FailedStatus.status).isNull, 0).otherwise(col(FailedStatus.status)).as(FailedStatus.status), - when(col(OrgExtIdMismatch.status).isNull, 0).otherwise(col(OrgExtIdMismatch.status)).as(OrgExtIdMismatch.status)) - totalSummaryDF.withColumn( - "accounts_failed", col(FailedStatus.status) + col(OrgExtIdMismatch.status)) - .withColumn("total", col("accounts_failed") + col("accounts_unclaimed") + col("accounts_validated") + col("accounts_rejected") - + col("accounts_eligible") + col("accounts_duplicate")) - .filter(col(colName = "slug").isNotNull) - .saveToBlobStore(storageConfig, "json", "user-summary", None, Option(Seq("slug"))) - JobLogger.log(s"StateAdminReportJob: user-summary report records count = ${correctedReportDF.count()}", None, INFO) - } - - def dataFrameToJsonFile(dataFrame: DataFrame, reportId: String, storageConfig: StorageConfig)(implicit spark: SparkSession, fc: FrameworkContext): Unit = { - - implicit val sc = spark.sparkContext; - - dataFrame.select("slug", "index", "districtName", "blocks", "schools", "registered") - .collect() - .groupBy(f => f.getString(0)).map(f => { - val summary = f._2.map(f => ValidatedUserDistrictSummary(f.getInt(1), f.getString(2), f.getLong(3), f.getLong(4), f.getLong(5))) - val arrDistrictSummary = sc.parallelize(Array(JSONUtils.serialize(summary)), 1) - OutputDispatcher.dispatch(StorageConfig(storageConfig.store, storageConfig.container, storageConfig.fileName + reportId + "/" + f._1 + ".json", storageConfig.accountKey, storageConfig.secretKey), arrDistrictSummary); - }) - - } - def locationIdListFunction(location: String): List[String] = { var locations = new ListBuffer[String]() try { diff --git a/data-products/src/test/resources/application.conf b/data-products/src/test/resources/application.conf index b4d92b1ce..9ff1086f6 100644 --- a/data-products/src/test/resources/application.conf +++ b/data-products/src/test/resources/application.conf @@ -168,9 +168,11 @@ postgres.table.job_request="job_request" druid.report.default.storage="local" druid.report.date.format="yyyy-MM-dd" -exhaust.batches.limit=3 +exhaust.batches.limit.per.channel=3 // file size in bytes exhaust.file.size.limit=100 exhaust.parallel.batch.load.limit = 10 -exhaust.user.parallelism=200 \ No newline at end of file +exhaust.user.parallelism=200 +exhaust.file.size.limit.per.channel=100 +data_exhaust.batch.limit.per.request=4 diff --git 
a/data-products/src/test/resources/course-batch-status-updater/course_batch_status_updater_temp.csv b/data-products/src/test/resources/course-batch-status-updater/course_batch_status_updater_temp.csv index f8cfb174d..c2ad42129 100644 --- a/data-products/src/test/resources/course-batch-status-updater/course_batch_status_updater_temp.csv +++ b/data-products/src/test/resources/course-batch-status-updater/course_batch_status_updater_temp.csv @@ -1,5 +1,5 @@ courseid,batchid,cert_templates,createdby,createddate,createdfor,description,enddate,end_date,enrollmentenddate,enrollment_enddate,enrollmenttype,mentors,name,startdate,start_date,status,updateddate -do_112636984058314752121,0130320389509939204,,af73c71d-c4e8-4e3b-9010-864cfed2a9a9,2020-06-30 07:50:04:062+0000,,,,2030-06-30 18:29:59.000000+0000,2020-06-07,2020-06-07 18:29:59.000000+0000,open,,TEST 3,2021-04-28,2021-04-28 00:00:00.000000+0000,0, -do_1130293726460805121168,0130293763489873929,,9ad90eb4-b8d2-4e99-805f-58cdd1c0163f,2020-06-26 13:28:16:221+0000,,,,2030-06-30 18:29:59.000000+0000,,,open,,Test,,2020-05-26 00:00:00.000000+0000,1, -do_1130314965721088001129,01303150537737011211,,95e4942d-cbe8-477d-aebd-ad8e6de4bfc8,2020-06-29 13:34:54:235+0000,,,,2019-06-30 18:29:59.000000+0000,,,open,,Test course order prad - 1,,2020-05-29 00:00:00.000000+0000,1, -do_1130264512015646721166,0130271096968396800,,,2020-05-23 08:37:49:915+0000,,,2019-05-30,2019-05-30 18:29:59.000000+0000,,,open,,course-2,2020-05-23,2020-05-25 00:00:00.000000+0000,1, \ No newline at end of file +do_112636984058314752121,0130320389509939204,,af73c71d-c4e8-4e3b-9010-864cfed2a9a9,2020-06-30 07:50:04:062+0000,,,2030-06-30 00:00:00.000000+0000 ,,2020-06-07,2020-06-07 00:00:00.000000+0000,open,,TEST 3,2021-04-28 00:00:00,2021-04-28 00:00:00.000000+0000,0, +do_1130293726460805121168,0130293763489873929,,9ad90eb4-b8d2-4e99-805f-58cdd1c0163f,2020-06-26 13:28:16:221+0000,,,,2030-06-30,,,open,,Test,,2020-05-26,1, +do_1130314965721088001129,01303150537737011211,,95e4942d-cbe8-477d-aebd-ad8e6de4bfc8,2020-06-29 13:34:54:235+0000,,,,2019-06-30,,,open,,Test course order prad - 1,,2020-05-29,1, +do_1130264512015646721166,0130271096968396800,,,2020-05-23 08:37:49:915+0000,,,2019-05-30,2019-05-30,,,open,,course-2,2020-05-23,2020-05-25,1, \ No newline at end of file diff --git a/data-products/src/test/resources/course-batch-status-updater/course_batch_status_updater_temp2.csv b/data-products/src/test/resources/course-batch-status-updater/course_batch_status_updater_temp2.csv index eb44184cd..d673836d4 100644 --- a/data-products/src/test/resources/course-batch-status-updater/course_batch_status_updater_temp2.csv +++ b/data-products/src/test/resources/course-batch-status-updater/course_batch_status_updater_temp2.csv @@ -1,5 +1,5 @@ courseid,batchid,cert_templates,createdby,createddate,createdfor,description,enddate,end_date,enrollmentenddate,enrollment_enddate,enrollmenttype,mentors,name,startdate,start_date,status,updateddate -do_112636984058314752121,0130320389509939204,,af73c71d-c4e8-4e3b-9010-864cfed2a9a9,2020-06-30 07:50:04:062+0000,,,2030-06-30,,2020-06-07,,open,,TEST 3,2030-05-26,,0, -do_1130293726460805121168,0130293763489873929,,9ad90eb4-b8d2-4e99-805f-58cdd1c0163f,2020-06-26 13:28:16:221+0000,,,,2030-06-30 18:29:59.000000+0000,,,open,,Test,,2020-05-26 00:00:00.000000+0000,1, -do_1130314965721088001129,01303150537737011211,,95e4942d-cbe8-477d-aebd-ad8e6de4bfc8,2020-06-29 13:34:54:235+0000,,,,2030-06-30 18:29:59.000000+0000,,,open,,Test course order prad - 1,,2020-05-26 
00:00:00.000000+0000,1, -do_1130264512015646721166,0130271096968396800,,,2020-05-23 08:37:49:915+0000,,,2030-06-30,2030-06-30 18:29:59.000000+0000,,,open,,course-2,2020-05-26,2020-05-26 00:00:00.000000+0000,1, \ No newline at end of file +do_112636984058314752121,0130320389509939204,,af73c71d-c4e8-4e3b-9010-864cfed2a9a9,2020-06-30 07:50:04:062+0000,,,2030-06-30,,2020-06-07,2020-06-07,open,,TEST 3,2030-05-26,,0, +do_1130293726460805121168,0130293763489873929,,9ad90eb4-b8d2-4e99-805f-58cdd1c0163f,2020-06-26 13:28:16:221+0000,,,,2030-06-30,,,open,,Test,,2020-05-26,1, +do_1130314965721088001129,01303150537737011211,,95e4942d-cbe8-477d-aebd-ad8e6de4bfc8,2020-06-29 13:34:54:235+0000,,,,2030-06-30,,,open,,Test course order prad - 1,,2020-05-26,1, +do_1130264512015646721166,0130271096968396800,,,2020-05-23 08:37:49:915+0000,,,2030-06-30,2030-06-30,,,open,,course-2,2020-05-26,2020-05-26,1, \ No newline at end of file diff --git a/data-products/src/test/resources/exhaust/report_data.cql b/data-products/src/test/resources/exhaust/report_data.cql index 386e46cd2..a6a909f09 100644 --- a/data-products/src/test/resources/exhaust/report_data.cql +++ b/data-products/src/test/resources/exhaust/report_data.cql @@ -1,7 +1,9 @@ INSERT INTO sunbird_courses.course_batch (courseid, batchid, createdby, createddate, enddate, end_date, enrollmentenddate, name, startdate, start_date, status) VALUES ('do_1130928636168192001667', 'batch-001', 'manju', '2020-10-10', '2020-01-10', '2020-01-10', '2020-01-01', 'Basic Java', '2020-01-20', '2020-01-20', 1 ); INSERT INTO sunbird_courses.course_batch (courseid, batchid, createdby, createddate, enddate, end_date, enrollmentenddate, name, startdate, start_date, status) VALUES ('do_1130293726460805121168', 'batch-002', 'manju', '2020-11-11', '2020-02-10', '2020-02-10', '2020-02-10', 'Basic C++', '2020-02-20', '2020-02-20', 1 ); -INSERT INTO sunbird_courses.course_batch (courseid, batchid, createdby, createddate, enddate, end_date, enrollmentenddate, name, startdate, start_date, status) VALUES ('do_11306040245271756813015', 'batch-003', 'manju', '2020-10-02', '2020-10-01', '2020-10-01', '2020-10-01', 'Basic C++', '2020-10-20', '2020-10-20', 1 ); +INSERT INTO sunbird_courses.course_batch (courseid, batchid, createdby, createddate, enddate, end_date, enrollmentenddate, name, startdate, start_date, status) VALUES ('do_1131975645014835201326', 'batch-003', 'manju', '2020-10-02', '2020-10-01', '2020-10-01', '2020-10-01', 'Basic C++', '2020-10-20', '2020-10-20', 1 ); INSERT INTO sunbird_courses.course_batch (courseid, batchid, createdby, createddate, enddate, end_date, enrollmentenddate, name, startdate, start_date, status) VALUES ('do_112835334818643968148', 'batch-004', 'manju', '2020-11-11', '2020-02-10', '2020-02-10', '2020-02-10', 'Basic C++', '2020-02-20', '2020-10-20', 1 ); +INSERT INTO sunbird_courses.course_batch (courseid, batchid, createdby, createddate, enddate, end_date, enrollmentenddate, name, startdate, start_date, status) VALUES ('do_112835334818643968148', 'batch-005', 'manju', '2020-11-15', '2020-02-12', '2020-02-10', '2020-02-10', 'Basic C++', '2020-02-20', '2020-10-20', 2 ); +INSERT INTO sunbird_courses.course_batch (courseid, batchid, createdby, createddate, enddate, end_date, enrollmentenddate, name, startdate, start_date, status) VALUES ('do_1130928636168192001667', 'batch-006', 'manju', '2020-11-15', '2020-02-12', '2020-02-10', '2020-02-10', 'Basic C++', '2020-02-20', '2020-10-20', 1 ); INSERT INTO sunbird_courses.user_activity_agg (activity_type, activity_id, 
user_id, context_id, agg, agg_last_updated) VALUES('Course', 'do_1130928636168192001667', 'user-001', 'cb:batch-001', {'completedCount': 1}, {'completedCount': '2020-08-17'}); INSERT INTO sunbird_courses.user_activity_agg (activity_type, activity_id, user_id, context_id, agg, agg_last_updated) VALUES('Course', 'do_1130928636168192001667', 'user-002', 'cb:batch-001', {'completedCount': 0}, {'completedCount': '2020-09-17'}); @@ -24,12 +26,16 @@ INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate) VALUES ('user-005', 'do_1130292569979781121111', 'batch-002', True, 10, 0, 1, '2019-11-15 05:41:50:382+0000'); INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate) VALUES ('user-006', 'do_1130292569979781121111', 'batch-002', True, 10, 1, 1, '2019-11-15 05:41:50:382+0000'); INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate) VALUES ('user-007', 'do_1130292569979781121111', 'batch-002', True, 10, 0, 1, '2019-11-15 05:41:50:382+0000'); -INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate) VALUES ('user-008', 'do_11306040245271756813015', 'batch-001', True, 20, 1, 1, '2019-11-15 05:41:50:382+0000'); -INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate) VALUES ('user-009', 'do_11306040245271756813015', 'batch-001', True, 20, 1, 1, '2019-11-15 05:41:50:382+0000'); -INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate) VALUES ('user-010', 'do_11306040245271756813015', 'batch-001', True, 10, 1, 1, '2019-11-15 05:41:50:382+0000'); +INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate) VALUES ('user-008', 'do_1131975645014835201326', 'batch-003', True, 20, 1, 1, '2019-11-15 05:41:50:382+0000'); +INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate) VALUES ('user-009', 'do_1131975645014835201326', 'batch-003', True, 20, 1, 1, '2019-11-15 05:41:50:382+0000'); +INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate) VALUES ('user-010', 'do_1131975645014835201326', 'batch-003', True, 10, 1, 1, '2019-11-15 05:41:50:382+0000'); INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate) VALUES ('user-011', 'do_1130293726460805121168', 'batch-002', True, 10, 1, 1, '2019-11-15 05:41:50:382+0000'); INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate) VALUES ('user-011', 'do_1130293726460805121168', 'batch-002', True, 10, 1, 1, '2019-11-15 05:41:50:382+0000'); INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate) VALUES ('user-014', 'do_112835334818643968148', 'batch-004', True, 10, 1, 1, '2019-11-15 05:41:50:382+0000'); +INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, 
active, completionpercentage, progress, status, enrolleddate, enrolled_date) VALUES ('user-015', 'do_1130928636168192001667', 'batch-006', True, 10, 0, 1, '2019-11-13 05:41:50:382+0000', '2019-11-16 05:41:50'); +INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate, enrolled_date) VALUES ('user-016', 'do_1130928636168192001667', 'batch-006', True, 10, 0, 1, '2019-11-13 05:41:50:382+0000', '2019-11-16 05:41:50'); +INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate, enrolled_date) VALUES ('user-017', 'do_1130928636168192001667', 'batch-006', True, 10, 0, 1, '2019-11-13 05:41:50:382+0000', '2019-11-16 05:41:50'); + INSERT INTO sunbird_courses.assessment_aggregator (course_id, batch_id, user_id, content_id, attempt_id, grand_total, total_max_score, total_score, question, updated_on) VALUES ('do_1130928636168192001667', 'batch-001', 'user-001', 'do_1128870328040161281204', 'attempat-001', '20', 20, 20, [{id: 'do_213019475454476288155', assess_ts: '2020-06-18T18:15:56.490+0000', max_score: 1, score: 1, type: 'mcq', title: 'testQuestiontextandformula', resvalues: [{'1': '{"text":"A=\\\\pi r^2\n"}'}], params: [{'1': '{"text":"A=\\\\pi r^2\n"}'}, {'2': '{"text":"no\n"}'}, {'answer': '{"correct":["1"]}'}], description: 'testQuestiontextandformula', duration: 1.0}, {id: 'do_213019970118279168165', assess_ts: '2020-06-18T18:15:56.490+0000', max_score: 1, score: 1, type: 'mcq', title: 'test with formula', resvalues: [{'1': '{"text":"1\nA=\\\\pi r^2A=\\\\pi r^2\n"}'}], params: [{'1': '{"text":"1\nA=\\\\pi r^2A=\\\\pi r^2\n"}'}, {'2': '{"text":"2\n"}'}, {'answer': '{"correct":["1"]}'}], description: '', duration: 1.0}, {id: 'do_213019972814823424168', assess_ts: '2020-06-18T18:15:56.490+0000', max_score: 1, score: 0.33, type: 'mtf', title: 'Copy of - Match the following:\n\nx=\\frac{-b\\pm\\sqrt{b^2-4ac}}{2a}\nArrange the following equations in correct order.\n', resvalues: [{'lhs': '[{"1":"{\"text\":\"A=\\\\\\\\pi r^2\\n\"}"},{"2":"{\"text\":\"\\\\\\\\frac{4}{3}\\\\\\\\pi r^3\\n\"}"},{"3":"{\"text\":\"a^n\\\\\\\\times a^m=a^{n+m}\\n\"}"}]'}, {'rhs': '[{"1":"{\"text\":\"Volume of sphere\\n\"}"},{"2":"{\"text\":\"Area of Circle\\n\"}"},{"3":"{\"text\":\"Product Rule\\n\"}"}]'}], params: [{'lhs': '[{"1":"{\"text\":\"A=\\\\\\\\pi r^2\\n\"}"},{"2":"{\"text\":\"\\\\\\\\frac{4}{3}\\\\\\\\pi r^3\\n\"}"},{"3":"{\"text\":\"a^n\\\\\\\\times a^m=a^{n+m}\\n\"}"}]'}, {'rhs': '[{"1":"{\"text\":\"Volume of sphere\\n\"}"},{"2":"{\"text\":\"Product Rule\\n\"}"},{"3":"{\"text\":\"Area of Circle\\n\"}"}]'}, {'answer': '{"lhs":["1","2","3"],"rhs":["3","1","2"]}'}], description: '', duration: 2.0}, {id: 'do_2130256513760624641171', assess_ts: '2020-06-18T18:15:56.490+0000', max_score: 10, score: 10, type: 'mcq', title: '2 +2 is..? 
mark ia 10\n', resvalues: [{'1': '{"text":"4\n"}'}], params: [{'1': '{"text":"4\n"}'}, {'2': '{"text":"3\n"}'}, {'3': '{"text":"8\n"}'}, {'4': '{"text":"10\n"}'}, {'answer': '{"correct":["1"]}'}], description: '', duration: 12.0}], toTimeStamp(toDate(now()))); INSERT INTO sunbird_courses.assessment_aggregator (course_id, batch_id, user_id, content_id, attempt_id, grand_total, total_max_score, total_score, updated_on) VALUES ('do_1130928636168192001667', 'batch-001', 'user-002', 'do_1128870328040161281204', 'attempat-001', '10', 10, 10, toTimeStamp(toDate(now()))); @@ -43,16 +49,18 @@ INSERT INTO dev_hierarchy_store.content_hierarchy (identifier,hierarchy) VALUES INSERT INTO dev_hierarchy_store.content_hierarchy (identifier,hierarchy) VALUES ('do_1128870328040161281204', '{"identifier":"do_1128870328040161281204","children":[{"parent":"do_112876961957437440179","identifier":"do_1128870328040161281204","lastStatusChangedOn":"2019-09-19T18:15:56.490+0000","code":"2cb4d698-dc19-4f0c-9990-96f49daff753","visibility":"Parent","description":"Test_TextBookUnit_desc_8305852636","index":1,"mimeType":"application/vnd.ekstep.content-collection","createdOn":"2019-09-19T18:15:56.489+0000","versionKey":"1568916956489","depth":1,"name":"content_3","lastUpdatedOn":"2019-09-19T18:15:56.490+0000","contentType":"TextBookUnit","status":"Draft"}]}'); INSERT INTO dev_hierarchy_store.content_hierarchy (identifier,hierarchy) VALUES ('do_112835334818643968148', '{"identifier":"do_112835334818643968148","children":[{"parent":"do_112876961957437440179","identifier":"do_112835334818643968148","lastStatusChangedOn":"2019-09-19T18:15:56.490+0000","code":"2cb4d698-dc19-4f0c-9990-96f49daff753","visibility":"Parent","description":"Test_TextBookUnit_desc_8305852636","index":1,"mimeType":"application/vnd.ekstep.content-collection","createdOn":"2019-09-19T18:15:56.489+0000","versionKey":"1568916956489","depth":1,"name":"content_1","lastUpdatedOn":"2019-09-19T18:15:56.490+0000","contentType":"TextBookUnit","status":"Draft"}]}'); -INSERT INTO sunbird.user_consent (id,user_id,consumer_id,consumer_type, status, object_id, last_updated_on) VALUES ('user-001', 'user-001', 'consumer_001', 'ORGANISATION', 'active', 'do_1130928636168192001667', '2019-09-19T18:15:56.490+0000'); -INSERT INTO sunbird.user_consent (id,user_id,consumer_id,consumer_type, status, object_id) VALUES ('user-001', 'user-002', 'consumer_001', 'ORGANISATION', 'active', 'do_1130928636168192001667'); -INSERT INTO sunbird.user_consent (id,user_id,consumer_id,consumer_type, status, object_id, last_updated_on) VALUES ('user-001', 'user-003', 'consumer_001', 'ORGANISATION', 'active', 'do_1130928636168192001667', '2019-09-19T18:15:56.490+0000'); -INSERT INTO sunbird.user_consent (id,user_id,consumer_id,consumer_type, status, object_id) VALUES ('user-001', 'user-004', 'consumer_001', 'ORGANISATION', 'active', 'do_1130928636168192001667'); +INSERT INTO sunbird.user_consent (id,user_id,consumer_id,consumer_type, status, object_id, last_updated_on) VALUES ('user-001', 'user-001', 'b00bc992ef25f1a9a8d63291e20efc8d', 'ORGANISATION', 'active', 'do_1130928636168192001667', '2019-09-19T18:15:56.490+0000'); +INSERT INTO sunbird.user_consent (id,user_id,consumer_id,consumer_type, status, object_id) VALUES ('user-001', 'user-002', 'b00bc992ef25f1a9a8d63291e20efc8d', 'ORGANISATION', 'active', 'do_1130928636168192001667'); +INSERT INTO sunbird.user_consent (id,user_id,consumer_id,consumer_type, status, object_id, last_updated_on) VALUES ('user-001', 'user-003', 
'b00bc992ef25f1a9a8d63291e20efc8d', 'ORGANISATION', 'active', 'do_1130928636168192001667', '2019-09-19T18:15:56.490+0000'); +INSERT INTO sunbird.user_consent (id,user_id,consumer_id,consumer_type, status, object_id) VALUES ('user-001', 'user-004', 'b00bc992ef25f1a9a8d63291e20efc8d', 'ORGANISATION', 'active', 'do_1130928636168192001667'); INSERT INTO sunbird.user_consent (id,user_id,consumer_id,consumer_type, status, object_id) VALUES ('user-001', 'user-005', 'consumer_001', 'ORGANISATION', 'active', 'object_01'); INSERT INTO sunbird.user_consent (id,user_id,consumer_id,consumer_type, status, object_id) VALUES ('user-001', 'user-006', 'consumer_001', 'ORGANISATION', 'active', 'object_01'); INSERT INTO sunbird.user_consent (id,user_id,consumer_id,consumer_type, status, object_id) VALUES ('user-001', 'user-007', 'consumer_001', 'ORGANISATION', 'active', 'object_01'); -INSERT INTO sunbird.user_consent (id,user_id,consumer_id,consumer_type, status, object_id) VALUES ('user-001', 'user-008', 'consumer_001', 'ORGANISATION', 'active', 'object_01'); -INSERT INTO sunbird.user_consent (id,user_id,consumer_id,consumer_type, status, object_id) VALUES ('user-001', 'user-009', 'consumer_001', 'ORGANISATION', 'active', 'object_01'); -INSERT INTO sunbird.user_consent (id,user_id,consumer_id,consumer_type, status, object_id) VALUES ('user-001', 'user-010', 'consumer_001', 'ORGANISATION', 'active', 'object_01'); +INSERT INTO sunbird.user_consent (id,user_id,consumer_id,consumer_type, status, object_id, last_updated_on) VALUES ('user-001', 'user-008', '01309282781705830427', 'ORGANISATION', 'active', 'do_1131975645014835201326', '2021-06-14T18:15:56.490+0000'); +INSERT INTO sunbird.user_consent (id,user_id,consumer_id,consumer_type, status, object_id) VALUES ('user-001', 'user-009', '01309282781705830427', 'ORGANISATION', 'active', 'do_1131975645014835201326'); INSERT INTO sunbird.user_consent (id,user_id,consumer_id,consumer_type, status, object_id) VALUES ('user-001', 'user-014', 'consumer_001', 'ORGANISATION', 'active', 'object_01'); +INSERT INTO sunbird.user_consent (id,user_id,consumer_id,consumer_type, status, object_id, last_updated_on) VALUES ('user-001', 'user-015', 'b00bc992ef25f1a9a8d63291e20efc8d', 'ORGANISATION', 'active', 'do_1130928636168192001667', '2019-09-19T18:15:56.490+0000'); +INSERT INTO sunbird.user_consent (id,user_id,consumer_id,consumer_type, status, object_id) VALUES ('user-001', 'user-016', 'b00bc992ef25f1a9a8d63291e20efc8d', 'ORGANISATION', 'active', 'do_1130928636168192001667'); +INSERT INTO sunbird.user_consent (id,user_id,consumer_id,consumer_type, status, object_id, last_updated_on) VALUES ('user-001', 'user-017', 'b00bc992ef25f1a9a8d63291e20efc8d', 'ORGANISATION', 'REVOKED', 'do_1130928636168192001667', '2019-09-19T18:15:56.490+0000'); INSERT INTO sunbird.system_settings (id, field, value) VALUES ('custodianOrgId', 'custodianOrgId', '0130107621805015045'); \ No newline at end of file diff --git a/data-products/src/test/resources/reports/reports_test_data.cql b/data-products/src/test/resources/reports/reports_test_data.cql index b2b7cce7b..511667df3 100644 --- a/data-products/src/test/resources/reports/reports_test_data.cql +++ b/data-products/src/test/resources/reports/reports_test_data.cql @@ -1,18 +1,18 @@ -INSERT INTO sunbird.organisation (id,channel,hashtagid,isrootorg,locationids,orgname,provider,rootorgid,slug,status,externalid) VALUES ('org1','ap','Ext1',true,['1'],'AP','channel-1','0126391644091351040','ApSlug',1,'orgext1'); -INSERT INTO sunbird.organisation 
(id,channel,hashtagid,isrootorg,locationids,orgname,provider,rootorgid,slug,status,externalid) VALUES ('org2','ap','Ext2',false,['1','2'],'MPPS SIMHACHALNAGAR','skap','org1','ApSlug',1,'orgext2'); -INSERT INTO sunbird.organisation (id,channel,hashtagid,isrootorg,locationids,orgname,provider,rootorgid,slug,status,externalid) VALUES ('org3','ap','Ext2',false,['1','2','3'],'Another school','skap','org1','ApSlug',1,'orgext3'); -INSERT INTO sunbird.organisation (id,channel,hashtagid,isrootorg,locationids,orgname,provider,rootorgid,slug,status,externalid) VALUES ('org9','ap','Ext2',false,['1','14'],'PRS HASTINANAGAR','skap','org1','ApSlug',1,'orgext9'); -INSERT INTO sunbird.organisation (id,channel,hashtagid,isrootorg,locationids,orgname,provider,rootorgid,slug,status,externalid) VALUES ('org30','tn','Ext3',true,['5'],'TN','tamilnadutesttenant','01254587086334361650','TnSlug',1,'orgext30'); -INSERT INTO sunbird.organisation (id,channel,hashtagid,isrootorg,locationids,orgname,provider,rootorgid,slug,status,externalid) VALUES ('org4','tn','Ext4',false,['5','6'],'SACRED HEART(B)PS,TIRUVARANGAM','tamilnadutesttenant','org30','TnSlug',1,'orgext4'); -INSERT INTO sunbird.organisation (id,channel,hashtagid,isrootorg,locationids,orgname,provider,rootorgid,slug,status,externalid) VALUES ('org5','tn','Ext5',false,['5','6','7'],'PUPS, Chithamur','tamilnadutesttenant','org30','TnSlug',1,'orgext5'); -INSERT INTO sunbird.organisation (id,channel,hashtagid,isrootorg,locationids,orgname,provider,rootorgid,slug,status,externalid) VALUES ('org6','otherTenant','Ext6',true,['9'],'Other tenant name','sktn','01250894314817126443','OtherSlug',1,'orgext6'); -INSERT INTO sunbird.organisation (id,channel,hashtagid,isrootorg,locationids,orgname,provider,rootorgid,slug,status,externalid) VALUES ('org7','otherTenant','Ext7',false,['9','10','11'],'BALA VEDAMMAL N & P','tamilnadutesttenant','org6','OtherSlug',1,'orgext7'); -INSERT INTO sunbird.organisation (id,channel,hashtagid,isrootorg,locationids,orgname,provider,rootorgid,slug,status,externalid) VALUES ('org8','otherTenant','Ext8',false,['9','12','13'],'BALA VEDAMMALAI N & P','tamilnadutesttenant','org6','OtherSlug',1,'orgext8'); -INSERT INTO sunbird.organisation (id,channel,hashtagid,isrootorg,locationids,orgname,provider,rootorgid,slug,status,externalid) VALUES ('0124858228063600641','otherTenant','Ext8',false,['9','12','13'],'BALA VEDAMMALAI N & P','tamilnadutesttenant','org6','OtherSlug1',1,'orgext8'); -INSERT INTO sunbird.organisation (id,channel,hashtagid,isrootorg,locationids,orgname,provider,rootorgid,slug,status,externalid) VALUES ('01237176018957107221','otherTenant','Ext8',false,['9','12','13'],'BALA VEDAMMALAI N & P','tamilnadutesttenant','org6','OtherSlug2',1,'orgext8'); -INSERT INTO sunbird.organisation (id,channel,hashtagid,isrootorg,locationids,orgname,provider,rootorgid,slug,status,externalid) VALUES ('012306545106337792301','otherTenant','Ext8',false,['9','12','13'],'BALA VEDAMMALAI N & P','tamilnadutesttenant','org6','OtherSlug1',1,'orgext8'); -INSERT INTO sunbird.organisation (id,channel,hashtagid,isrootorg,locationids,orgname,provider,rootorgid,slug,status,externalid) VALUES ('b00bc992ef25f1a9a8d63291e20efc8d1','otherTenant','Ext8',false,['9','12','13'],'BALA VEDAMMALAI N & P','tamilnadutesttenant','org6','DelhiSlug',1,'orgext8'); -INSERT INTO sunbird.organisation (id,channel,hashtagid,isrootorg,locationids,orgname,provider,rootorgid,slug,status,externalid) VALUES ('ORG_001','otherTenant','Ext8',false,['9','12','13'],'BALA VEDAMMALAI N & 
P','tamilnadutesttenant','org6','sunbird',1,'orgext8'); +INSERT INTO sunbird.organisation (id,channel,hashtagid,istenant,orglocation,orgname,provider,rootorgid,slug,status,externalid) VALUES ('org1','ap','Ext1',true,'[{"id":"1", "type":"state"}]','AP','channel-1','0126391644091351040','ApSlug',1,'orgext1'); +INSERT INTO sunbird.organisation (id,channel,hashtagid,istenant,orglocation,orgname,provider,rootorgid,slug,status,externalid) VALUES ('org2','ap','Ext2',false,'[{"id":"1", "type":"state"},{"id":"2", "type":"district"}]','MPPS SIMHACHALNAGAR','skap','org1','ApSlug',1,'orgext2'); +INSERT INTO sunbird.organisation (id,channel,hashtagid,istenant,orglocation,orgname,provider,rootorgid,slug,status,externalid) VALUES ('org3','ap','Ext2',false,'[{"id":"1", "type":"state"},{"id":"2", "type":"district"},{"id":"3", "type":"block"}]','Another school','skap','org1','ApSlug',1,'orgext3'); +INSERT INTO sunbird.organisation (id,channel,hashtagid,istenant,orglocation,orgname,provider,rootorgid,slug,status,externalid) VALUES ('org9','ap','Ext2',false,'[{"id":"1", "type":"state"},{"id":"14", "type":"district"}]','PRS HASTINANAGAR','skap','org1','ApSlug',1,'orgext9'); +INSERT INTO sunbird.organisation (id,channel,hashtagid,istenant,orglocation,orgname,provider,rootorgid,slug,status,externalid) VALUES ('org30','tn','Ext3',true,'[{"id":"5", "type":"state"}]','TN','tamilnadutesttenant','01254587086334361650','TnSlug',1,'orgext30'); +INSERT INTO sunbird.organisation (id,channel,hashtagid,istenant,orglocation,orgname,provider,rootorgid,slug,status,externalid) VALUES ('org4','tn','Ext4',false,'[{"id":"5", "type":"state"},{"id":"6", "type":"district"}]','SACRED HEART(B)PS,TIRUVARANGAM','tamilnadutesttenant','org30','TnSlug',1,'orgext4'); +INSERT INTO sunbird.organisation (id,channel,hashtagid,istenant,orglocation,orgname,provider,rootorgid,slug,status,externalid) VALUES ('org5','tn','Ext5',false,'[{"id":"5", "type":"state"},{"id":"6", "type":"district"},{"id":"7", "type":"block"}]','PUPS, Chithamur','tamilnadutesttenant','org30','TnSlug',1,'orgext5'); +INSERT INTO sunbird.organisation (id,channel,hashtagid,istenant,orglocation,orgname,provider,rootorgid,slug,status,externalid) VALUES ('org6','otherTenant','Ext6',true,'[{"id":"9", "type":"state"}]','Other tenant name','sktn','01250894314817126443','OtherSlug',1,'orgext6'); +INSERT INTO sunbird.organisation (id,channel,hashtagid,istenant,orglocation,orgname,provider,rootorgid,slug,status,externalid) VALUES ('org7','otherTenant','Ext7',false,'[{"id":"9", "type":"state"},{"id":"10", "type":"district"},{"id":"11", "type":"block"}]','BALA VEDAMMAL N & P','tamilnadutesttenant','org6','OtherSlug',1,'orgext7'); +INSERT INTO sunbird.organisation (id,channel,hashtagid,istenant,orglocation,orgname,provider,rootorgid,slug,status,externalid) VALUES ('org8','otherTenant','Ext8',false,'[{"id":"9", "type":"state"},{"id":"12", "type":"district"},{"id":"13", "type":"block"}]','BALA VEDAMMALAI N & P','tamilnadutesttenant','org6','OtherSlug',1,'orgext8'); +INSERT INTO sunbird.organisation (id,channel,hashtagid,istenant,orglocation,orgname,provider,rootorgid,slug,status,externalid) VALUES ('0124858228063600641','otherTenant','Ext8',false,'[{"id":"9", "type":"state"},{"id":"12", "type":"district"},{"id":"13", "type":"block"}]','BALA VEDAMMALAI N & P','tamilnadutesttenant','org6','OtherSlug1',1,'orgext8'); +INSERT INTO sunbird.organisation (id,channel,hashtagid,istenant,orglocation,orgname,provider,rootorgid,slug,status,externalid) VALUES 
('01237176018957107221','otherTenant','Ext8',false,'[{"id":"9", "type":"state"},{"id":"12", "type":"district"},{"id":"13", "type":"block"}]','BALA VEDAMMALAI N & P','tamilnadutesttenant','org6','OtherSlug2',1,'orgext8'); +INSERT INTO sunbird.organisation (id,channel,hashtagid,istenant,orglocation,orgname,provider,rootorgid,slug,status,externalid) VALUES ('012306545106337792301','otherTenant','Ext8',false,'[{"id":"9", "type":"state"},{"id":"12", "type":"district"},{"id":"13", "type":"block"}]','BALA VEDAMMALAI N & P','tamilnadutesttenant','org6','OtherSlug1',1,'orgext8'); +INSERT INTO sunbird.organisation (id,channel,hashtagid,istenant,orglocation,orgname,provider,rootorgid,slug,status,externalid) VALUES ('b00bc992ef25f1a9a8d63291e20efc8d1','otherTenant','Ext8',false,'[{"id":"9", "type":"state"},{"id":"12", "type":"district"},{"id":"13", "type":"block"}]','BALA VEDAMMALAI N & P','tamilnadutesttenant','org6','DelhiSlug',1,'orgext8'); +INSERT INTO sunbird.organisation (id,channel,hashtagid,istenant,orglocation,orgname,provider,rootorgid,slug,status,externalid) VALUES ('ORG_001','otherTenant','Ext8',false,'[{"id":"9", "type":"state"},{"id":"12", "type":"district"},{"id":"13", "type":"block"}]','BALA VEDAMMALAI N & P','tamilnadutesttenant','org6','sunbird',1,'orgext8'); INSERT INTO sunbird.user (id,userid,rootorgid, firstname, lastname, channel,isdeleted) VALUES ('56c2d9a3-fae9-4341-9862-4eeeead2e9a1', '56c2d9a3-fae9-4341-9862-4eeeead2e9a1','org30','user1f','user1l','tn',False); INSERT INTO sunbird.user (id,userid,rootorgid, firstname, lastname, channel,isdeleted) VALUES ('66c2d9a4-gae0-5342-9862-4eeeead2e9a2', '66c2d9a4-gae0-5342-9862-4eeeead2e9a2','org30','user2f','user2l','tn',False); @@ -28,20 +28,6 @@ INSERT INTO sunbird.user (id,userid,rootorgid, firstname, lastname, channel,isde INSERT INTO sunbird.user (id,userid,rootorgid, firstname, lastname, channel,isdeleted) VALUES ('7d8269df-c6ca-5216-9b63-305382d46175', '7d8269df-c6ca-5216-9b63-305382d46175','org6','user12f','user12l','otherTenant',False); INSERT INTO sunbird.user (id,userid,rootorgid, firstname, lastname, channel,isdeleted) VALUES ('6d8296df-c6ca-5216-9b63-405382d46176', '6d8296df-c6ca-5216-9b63-405382d46176','org6','user13f','user13l','otherTenant',False); -INSERT INTO sunbird.shadow_user (channel,userextid,addedby,claimedon,claimstatus,createdon,email,name,orgextid,phone,processid,updatedon,userid,userids,userstatus) VALUES ('tn','tnext1',null,null,0,null,'abc1@123.com','emp1Name','orgext4','1234567890','anyProcessId',null,null,null,0); -INSERT INTO sunbird.shadow_user (channel,userextid,addedby,claimedon,claimstatus,createdon,email,name,orgextid,phone,processid,updatedon,userid,userids,userstatus) VALUES ('tn','tnext2',null,null,4,null,'abc2@123.com','emp2Name','orgext5','1234567809','anyProcessId2',null,null,null,0); -INSERT INTO sunbird.shadow_user (channel,userextid,addedby,claimedon,claimstatus,createdon,email,name,orgextid,phone,processid,updatedon,userid,userids,userstatus) VALUES ('tn','tnext3',null,null,5,null,'abc2@123.com','emp3Name','orgext5','1234567809','anyProcessId2',null,null,null,0); -INSERT INTO sunbird.shadow_user (channel,userextid,addedby,claimedon,claimstatus,createdon,email,name,orgextid,phone,processid,updatedon,userid,userids,userstatus) VALUES ('ap','apext1',null,null,1,null,'abc@123.com','emp4Name','orgext1','1234567890','anyProcessId',null,null,null,0); -INSERT INTO sunbird.shadow_user 
(channel,userextid,addedby,claimedon,claimstatus,createdon,email,name,orgextid,phone,processid,updatedon,userid,userids,userstatus) VALUES ('ap','apext2',null,null,1,null,'abc@123.com','emp5Name','orgext2','1234567890','anyProcessId',null,null,null,0); -INSERT INTO sunbird.shadow_user (channel,userextid,addedby,claimedon,claimstatus,createdon,email,name,orgextid,phone,processid,updatedon,userid,userids,userstatus) VALUES ('ap','apext3',null,null,1,null,'abc@123.com','emp8Name','orgext3','1234567890','anyProcessId',null,null,null,0); -INSERT INTO sunbird.shadow_user (channel,userextid,addedby,claimedon,claimstatus,createdon,email,name,orgextid,phone,processid,updatedon,userid,userids,userstatus) VALUES ('ap','apext4',null,null,1,null,'abc@123.com','emp12Name','orgext3','1234567890','anyProcessId',null,null,null,0); -INSERT INTO sunbird.shadow_user (channel,userextid,addedby,claimedon,claimstatus,createdon,email,name,orgextid,phone,processid,updatedon,userid,userids,userstatus) VALUES ('ap','apext5',null,null,1,null,'abc@123.com','emp13Name','orgext9','1234567890','anyProcessId',null,null,null,0); -INSERT INTO sunbird.shadow_user (channel,userextid,addedby,claimedon,claimstatus,createdon,email,name,orgextid,phone,processid,updatedon,userid,userids,userstatus) VALUES ('otherTenant','othext1',null,null,1,null,'abc@123.com','emp6Name','orgext6','1234567890','anyProcessId',null,null,null,0); -INSERT INTO sunbird.shadow_user (channel,userextid,addedby,claimedon,claimstatus,createdon,email,name,orgextid,phone,processid,updatedon,userid,userids,userstatus) VALUES ('otherTenant','othext2',null,null,0,null,'abc@123.com','emp7Name','orgext8','1234567890','anyProcessId',null,null,null,0); -INSERT INTO sunbird.shadow_user (channel,userextid,addedby,claimedon,claimstatus,createdon,email,name,orgextid,phone,processid,updatedon,userid,userids,userstatus) VALUES ('otherTenant','othext3',null,null,1,null,'abc@123.com','emp9Name','orgext8','1234567890','anyProcessId',null,null,null,0); -INSERT INTO sunbird.shadow_user (channel,userextid,addedby,claimedon,claimstatus,createdon,email,name,orgextid,phone,processid,updatedon,userid,userids,userstatus) VALUES ('otherTenant','othext4',null,null,2,null,'abc@123.com','emp10Name','orgext8','1234567890','anyProcessId',null,null,null,0); -INSERT INTO sunbird.shadow_user (channel,userextid,addedby,claimedon,claimstatus,createdon,email,name,orgextid,phone,processid,updatedon,userid,userids,userstatus) VALUES ('otherTenant','othext5',null,null,6,null,'abc@123.com','emp11Name','orgext6','1234567890','anyProcessId',null,null,null,0); - INSERT INTO sunbird.location (id,code,name,parentid,type) VALUES ('1','1916080007','DAKSHIN MANASRI ORIENT.P',null,'state'); INSERT INTO sunbird.location (id,code,name,parentid,type) VALUES ('2','1922090001','GULBARGA','1','district'); INSERT INTO sunbird.location (id,code,name,parentid,type) VALUES ('3','206040024','GSSS SERA','2','block'); diff --git a/data-products/src/test/resources/reports/user_self_test_data.cql b/data-products/src/test/resources/reports/user_self_test_data.cql index 14e0f22fd..9c22fa0eb 100644 --- a/data-products/src/test/resources/reports/user_self_test_data.cql +++ b/data-products/src/test/resources/reports/user_self_test_data.cql @@ -21,9 +21,14 @@ VALUES ('3150aa06-333e-4d6d-8d81-3d8e14e7b245', '3150aa06-333e-4d6d-8d81-3d8e14e INSERT INTO sunbird.user (id, userid, locationids, firstname, lastname, usertype, usersubtype, email, phone, rootorgid, profileusertype, profilelocation) VALUES 
('6d8269de-c6ca-4106-9b63-305382d46175', '6d8269de-c6ca-4106-9b63-305382d46175', null, 'localuser114f', 'localuser114l', 'teacher', 'principal','PEhQxQlaMdJEXOzShY0NAiKg4LqC2xUDE4InNodhG/fJMhq69iAPzseEdYAlMPWegxJaAnH+tJwc\nZuqPxJCtJkiGfwlCUEj5B41z4/RjH/7XowwzRVZXH0jth3IW4Ik8TQtMGOn7lhkDdxs1iV8l8A==','1wsQrmy8Q1T4gFa+MOJsirdQC2yhyJsm2Rgj229s2b5Hk/JLNNnHMz6ywhgzYpgcQ6QILjcTLl7z\n7s4aRbsrWw==','012500530695766016224', '{"type":"teacher", "subType":"principal"}',''); -INSERT INTO sunbird.organisation (id,channel,hashtagid,isrootorg,locationids,orgname,provider,rootorgid,slug,status,externalid) VALUES ('012500530695766016224','ap','Ext1',true,['0dd2b51d-baa2-4403-bf1e-012a970b323f'],'AP','channel-1','012500530695766016224','ap',1,'orgext1'); -INSERT INTO sunbird.organisation (id,channel,hashtagid,isrootorg,locationids,orgname,provider,rootorgid,slug,status,externalid) VALUES ('0127426598044057605836','ka','Ext2',false,['439d9cf4-da87-47c6-8211-d24bbc5e3ca8','9c5fd76e-bf1b-4cee-9735-4d9235e819f3'],'MPPS SIMHACHALNAGAR','skap','0127426598044057605836','ka',1,'orgext2'); -INSERT INTO sunbird.organisation (id,channel,hashtagid,isrootorg,locationids,orgname,provider,rootorgid,slug,status,externalid) VALUES ('01268935407534080010344','ap','Ext3',false,['0dd2b51d-baa2-4403-bf1e-012a970b323f','f8228633-7742-454b-8f61-519c6a875809','3'],'Another school','skap','012500530695766016224','ApSlug',1,'orgext3'); +INSERT INTO sunbird.organisation (id,channel,hashtagid,istenant,orglocation,orgname,provider,rootorgid,slug,status,externalid) VALUES ('012500530695766016224','ap','Ext1',true,'[{"id":"0dd2b51d-baa2-4403-bf1e-012a970b323f","type":"state"}]','AP','channel-1','012500530695766016224','ap',1,'orgext1'); +INSERT INTO sunbird.organisation (id,channel,hashtagid,istenant,orglocation,orgname,provider,rootorgid,slug,status,externalid) VALUES ('0127426598044057605836','ka','Ext2',false,'[{"id":"439d9cf4-da87-47c6-8211-d24bbc5e3ca8","type":"state"},{"id":"9c5fd76e-bf1b-4cee-9735-4d9235e819f3","type":"district"}]','MPPS SIMHACHALNAGAR','skap','0127426598044057605836','ka',1,'orgext2'); +INSERT INTO sunbird.organisation (id,channel,hashtagid,istenant,orglocation,orgname,provider,rootorgid,slug,status,externalid) VALUES ('01268935407534080010344','ap','Ext3',false,'[{"id":"0dd2b51d-baa2-4403-bf1e-012a970b323f","type":"state"},{"id":"f8228633-7742-454b-8f61-519c6a875809","type":"district"}]','Another school','skap','012500530695766016224','ApSlug',1,'orgext3'); INSERT INTO sunbird.user_declarations(userid, orgid, persona, errortype, status, userinfo) VALUES ('56c2d9a3-fae9-4341-9862-4eeeead2e9a1', '012500530695766016224', 'teacher', null, 'PENDING', {'declared-email': 'PEhQxQlaMdJEXOzShY0NAiKg4LqC2xUDE4InNodhG/fJMhq69iAPzseEdYAlMPWegxJaAnH+tJwc\nZuqPxJCtJkiGfwlCUEj5B41z4/RjH/7XowwzRVZXH0jth3IW4Ik8TQtMGOn7lhkDdxs1iV8l8A==', 'declared-phone': '1wsQrmy8Q1T4gFa+MOJsirdQC2yhyJsm2Rgj229s2b5Hk/JLNNnHMz6ywhgzYpgcQ6QILjcTLl7z\n7s4aRbsrWw==', 'declared-school-name': 'mgm21', 'declared-school-udise-code': '190923'}); INSERT INTO sunbird.user_declarations(userid, orgid, persona, errortype, status, userinfo) VALUES ('8eaa1621-ac15-42a4-9e26-9c846963f331', '0127426598044057605836', 'teacher', null, 'PENDING', {'declared-email': 'PEhQxQlaMdJEXOzShY0NAiKg4LqC2xUDE4InNodhG/fJMhq69iAPzseEdYAlMPWegxJaAnH+tJwc\nZuqPxJCtJkiGfwlCUEj5B41z4/RjH/7XowwzRVZXH0jth3IW4Ik8TQtMGOn7lhkDdxs1iV8l8A==', 'declared-phone': '1wsQrmy8Q1T4gFa+MOJsirdQC2yhyJsm2Rgj229s2b5Hk/JLNNnHMz6ywhgzYpgcQ6QILjcTLl7z\n7s4aRbsrWw==', 'declared-school-name': 'mgm21', 
'declared-school-udise-code': 'orgext2'}); +INSERT INTO sunbird.user_declarations(userid, orgid, persona, errortype, status, userinfo) VALUES ('7faa1621-bc14-32c4-9e26-4b846963g339', '01268935407534080010344', 'teacher', null, 'PENDING', {'declared-email': 'PEhQxQlaMdJEXOzShY0NAiKg4LqC2xUDE4InNodhG/fJMhq69iAPzseEdYAlMPWegxJaAnH+tJwc\nZuqPxJCtJkiGfwlCUEj5B41z4/RjH/7XowwzRVZXH0jth3IW4Ik8TQtMGOn7lhkDdxs1iV8l8A==', 'declared-phone': '1wsQrmy8Q1T4gFa+MOJsirdQC2yhyJsm2Rgj229s2b5Hk/JLNNnHMz6ywhgzYpgcQ6QILjcTLl7z\n7s4aRbsrWw==', 'declared-school-name': 'mgm21', 'declared-school-udise-code': 'orgext3'}); +INSERT INTO sunbird.user_consent (user_id, consumer_id, object_id, consent_data ,consumer_type, id ,object_type, status) VALUES ('56c2d9a3-fae9-4341-9862-4eeeead2e9a1','012500530695766016224','012500530695766016224','','','','organisation','ACTIVE'); +INSERT INTO sunbird.user_consent (user_id, consumer_id, object_id, consent_data ,consumer_type, id ,object_type, status) VALUES ('8eaa1621-ac15-42a4-9e26-9c846963f331','0127426598044057605836','0127426598044057605836','','','','organisation','ACTIVE'); +INSERT INTO sunbird.user_consent (user_id, consumer_id, object_id, consent_data ,consumer_type, id ,object_type, status) VALUES ('7faa1621-bc14-32c4-9e26-4b846963g339','01268935407534080010344','01268935407534080010344','','','','organisation','REVOKED'); +INSERT INTO sunbird.user_consent (user_id, consumer_id, object_id, consent_data ,consumer_type, id ,object_type, status) VALUES ('6d8269de-c6ca-4106-9b63-305382d46175','0127426598044057605831','0127426598044057605836','','','','organisation','ACTIVE'); \ No newline at end of file diff --git a/data-products/src/test/scala/org/sunbird/analytics/audit/TestCourseBatchStatusUpdater.scala b/data-products/src/test/scala/org/sunbird/analytics/audit/TestCourseBatchStatusUpdater.scala index f4b61e5a3..6190de96f 100644 --- a/data-products/src/test/scala/org/sunbird/analytics/audit/TestCourseBatchStatusUpdater.scala +++ b/data-products/src/test/scala/org/sunbird/analytics/audit/TestCourseBatchStatusUpdater.scala @@ -126,7 +126,7 @@ class TestCourseBatchStatusUpdater extends BaseReportSpec with MockFactory { .returning(courseBatchDF.withColumn("cert_templates", lit(null).cast(MapType(StringType, MapType(StringType, StringType))))) val resultDf = CourseBatchStatusUpdaterJob.getCollectionBatchDF(reporterMock.fetchData) val resultDateValue = resultDf.select("startdate").collect().map(_ (0)).toList - resultDateValue(0) should be("2021-04-28") + resultDateValue(0) should be("2021-04-28 00:00:00.000000+0000") resultDateValue(1) should be("2020-05-26") resultDateValue(2) should be("2020-05-29") resultDateValue(3) should be("2020-05-25") diff --git a/data-products/src/test/scala/org/sunbird/analytics/exhaust/TestProgressExhaustJob.scala b/data-products/src/test/scala/org/sunbird/analytics/exhaust/TestProgressExhaustJob.scala index 47d6f8249..97c39182d 100644 --- a/data-products/src/test/scala/org/sunbird/analytics/exhaust/TestProgressExhaustJob.scala +++ b/data-products/src/test/scala/org/sunbird/analytics/exhaust/TestProgressExhaustJob.scala @@ -402,4 +402,104 @@ class TestProgressExhaustJob extends BaseReportSpec with MockFactory with BaseRe batch1Results.filter(col("User UUID") === "user-002").collect().map(_ (1)).toList(0) should be("15/11/2019") batch1Results.filter(col("User UUID") === "user-003").collect().map(_ (1)).toList(0) should be("15/11/2019") } + + it should "generate report validating and filtering duplicate batches" in { + 
EmbeddedPostgresql.execute(s"TRUNCATE $jobRequestTable") + EmbeddedPostgresql.execute("INSERT INTO job_request (tag, request_id, job_id, status, request_data, requested_by, requested_channel, dt_job_submitted, download_urls, dt_file_created, dt_job_completed, execution_time, err_message ,iteration, encryption_key) VALUES ('do_1130928636168192001667_batch-001:channel-01', '37564CF8F134EE7532F125651B51D17F', 'progress-exhaust', 'SUBMITTED', '{\"batchFilter\": [\"batch-01\", \"batch-001\", \"batch-001\"]}', 'user-002', 'b00bc992ef25f1a9a8d63291e20efc8d', '2020-10-19 05:58:18.666', '{}', NULL, NULL, 0, '' ,0, 'test12');") + + implicit val fc = new FrameworkContext() + val strConfig = """{"search":{"type":"none"},"model":"org.sunbird.analytics.exhaust.collection.ProgressExhaustJob","modelParams":{"store":"local","mode":"OnDemand","batchFilters":["TPD"],"searchFilter":{},"sparkElasticsearchConnectionHost":"{{ sunbird_es_host }}","sparkRedisConnectionHost":"localhost","sparkUserDbRedisPort":6341,"sparkUserDbRedisIndex":"0","sparkCassandraConnectionHost":"localhost","fromDate":"","toDate":"","storageContainer":""},"parallelization":8,"appName":"Progress Exhaust"}""" + val jobConfig = JSONUtils.deserialize[JobConfig](strConfig) + implicit val config = jobConfig + + ProgressExhaustJob.execute() + + val outputLocation = AppConf.getConfig("collection.exhaust.store.prefix") + val outputDir = "progress-exhaust" + val batch1 = "batch-001" + val requestId = "37564CF8F134EE7532F125651B51D17F" + val filePath = ProgressExhaustJob.getFilePath(batch1, requestId) + val jobName = ProgressExhaustJob.jobName() + + implicit val responseExhaustEncoder = Encoders.product[ProgressExhaustReport] + val batch1Results = spark.read.format("csv").option("header", "true") + .load(s"$outputLocation/$filePath.csv").as[ProgressExhaustReport].collectAsList().asScala + + batch1Results.size should be (4) + batch1Results.map(f => f.`Collection Id`).toList should contain atLeastOneElementOf List("do_1130928636168192001667") + batch1Results.map(f => f.`Collection Name`).toList should contain atLeastOneElementOf List("24 aug course") + batch1Results.map(f => f.`Batch Id`).toList should contain atLeastOneElementOf List("BatchId_batch-001") + batch1Results.map(f => f.`Batch Name`).toList should contain atLeastOneElementOf List("Basic Java") + batch1Results.map {res => res.`User UUID`}.toList should contain theSameElementsAs List("user-001", "user-002", "user-003", "user-004") + batch1Results.map {res => res.`State`}.toList should contain theSameElementsAs List("Karnataka", "Andhra Pradesh", "Karnataka", "Delhi") + batch1Results.map {res => res.`District`}.toList should contain theSameElementsAs List("bengaluru", "bengaluru", "bengaluru", "babarpur") + batch1Results.map(f => f.`Enrolment Date`).toList should contain allElementsOf List("15/11/2019") + batch1Results.map(f => f.`Completion Date`).toList should contain allElementsOf List(null) + batch1Results.map(f => f.`Progress`).toList should contain allElementsOf List("100") + batch1Results.map(f => f.`Cluster Name`).toList should contain atLeastOneElementOf List("CLUSTER1") + batch1Results.map(f => f.`User Type`).toList should contain atLeastOneElementOf List("administrator") + batch1Results.map(f => f.`User Sub Type`).toList should contain atLeastOneElementOf List("deo") + + val pResponse = EmbeddedPostgresql.executeQuery("SELECT * FROM job_request WHERE job_id='progress-exhaust'") + val reportDate = getDate("yyyyMMdd").format(Calendar.getInstance().getTime()) + + 
while(pResponse.next()) { + pResponse.getString("status") should be ("SUCCESS") + pResponse.getString("err_message") should be ("") + pResponse.getString("dt_job_submitted") should be ("2020-10-19 05:58:18.666") + pResponse.getString("download_urls") should be (s"""{reports/progress-exhaust/$requestId/batch-001_progress_${reportDate}.zip}""") + pResponse.getString("dt_file_created") should be (null) + pResponse.getString("iteration") should be ("0") + } + + new HadoopFileUtil().delete(spark.sparkContext.hadoopConfiguration, outputLocation) + } + + it should "mark request as failed if all batches are invalid in request_data" in { + EmbeddedPostgresql.execute(s"TRUNCATE $jobRequestTable") + EmbeddedPostgresql.execute("INSERT INTO job_request (tag, request_id, job_id, status, request_data, requested_by, requested_channel, dt_job_submitted, download_urls, dt_file_created, dt_job_completed, execution_time, err_message ,iteration, encryption_key) VALUES ('do_1130928636168192001667_batch-001:channel-01', '37564CF8F134EE7532F125651B51D17F', 'progress-exhaust', 'SUBMITTED', '{\"batchFilter\": [\"batch-01\", \"batch-02\"]}', 'user-002', 'b00bc992ef25f1a9a8d63291e20efc8d', '2020-10-19 05:58:18.666', '{}', NULL, NULL, 0, '' ,0, 'test12');") + + implicit val fc = new FrameworkContext() + val strConfig = """{"search":{"type":"none"},"model":"org.sunbird.analytics.exhaust.collection.ProgressExhaustJob","modelParams":{"store":"local","mode":"OnDemand","batchFilters":["TPD"],"searchFilter":{},"sparkElasticsearchConnectionHost":"{{ sunbird_es_host }}","sparkRedisConnectionHost":"localhost","sparkUserDbRedisPort":6341,"sparkUserDbRedisIndex":"0","sparkCassandraConnectionHost":"localhost","fromDate":"","toDate":"","storageContainer":""},"parallelization":8,"appName":"Progress Exhaust"}""" + val jobConfig = JSONUtils.deserialize[JobConfig](strConfig) + implicit val config = jobConfig + + ProgressExhaustJob.execute() + + val pResponse = EmbeddedPostgresql.executeQuery("SELECT * FROM job_request WHERE job_id='progress-exhaust'") + val reportDate = getDate("yyyyMMdd").format(Calendar.getInstance().getTime()) + + while(pResponse.next()) { + pResponse.getString("status") should be ("FAILED") + pResponse.getString("request_data") should be ("""{"batchFilter": ["batch-01", "batch-02"]}""") + pResponse.getString("err_message") should be ("No data found") + pResponse.getString("dt_job_submitted") should be ("2020-10-19 05:58:18.666") + pResponse.getString("download_urls") should be (s"""{}""") + pResponse.getString("dt_file_created") should be (null) + pResponse.getString("iteration") should be ("1") + } + + } + + it should "insert status as FAILED since batchid is expired" in { + + EmbeddedPostgresql.execute(s"TRUNCATE $jobRequestTable") + EmbeddedPostgresql.execute("INSERT INTO job_request (tag, request_id, job_id, status, request_data, requested_by, requested_channel, dt_job_submitted, download_urls, dt_file_created, dt_job_completed, execution_time, err_message ,iteration, encryption_key) VALUES ('do_1130928636168192001667_batch-001:channel-01', '37564CF8F134EE7532F125651B51D17F', 'progress-exhaust', 'SUBMITTED', '{\"batchId\": \"batch-005\"}', 'user-002', 'b00bc992ef25f1a9a8d63291e20efc8d', '2020-10-19 05:58:18.666', '{}', NULL, NULL, 0, '' ,0, 'test12');") + + implicit val fc = new FrameworkContext() + val strConfig = 
"""{"search":{"type":"none"},"model":"org.sunbird.analytics.exhaust.collection.ProgressExhaustJob","modelParams":{"store":"local","mode":"OnDemand","batchFilters":["TPD"],"searchFilter":{},"sparkElasticsearchConnectionHost":"{{ sunbird_es_host }}","sparkRedisConnectionHost":"localhost","sparkUserDbRedisPort":6341,"sparkUserDbRedisIndex":"0","sparkCassandraConnectionHost":"localhost","fromDate":"","toDate":"","storageContainer":""},"parallelization":8,"appName":"Progress Exhaust"}""" + val jobConfig = JSONUtils.deserialize[JobConfig](strConfig) + implicit val config = jobConfig + + ProgressExhaustJob.execute() + + val pResponse = EmbeddedPostgresql.executeQuery("SELECT * FROM job_request WHERE job_id='progress-exhaust'") + val reportDate = getDate("yyyyMMdd").format(Calendar.getInstance().getTime()) + + while(pResponse.next()) { + pResponse.getString("status") should be ("FAILED") + pResponse.getString("err_message") should be ("No data found") + pResponse.getString("download_urls") should be (s"""{}""") + } + } } diff --git a/data-products/src/test/scala/org/sunbird/analytics/exhaust/TestUserInfoExhaustJob.scala b/data-products/src/test/scala/org/sunbird/analytics/exhaust/TestUserInfoExhaustJob.scala index bd62958dd..d401c1ca3 100644 --- a/data-products/src/test/scala/org/sunbird/analytics/exhaust/TestUserInfoExhaustJob.scala +++ b/data-products/src/test/scala/org/sunbird/analytics/exhaust/TestUserInfoExhaustJob.scala @@ -27,6 +27,7 @@ class TestUserInfoExhaustJob extends BaseReportSpec with MockFactory with BaseRe var redisServer: RedisServer = _ var jedis: Jedis = _ val outputLocation = AppConf.getConfig("collection.exhaust.store.prefix") + val batchLimit: Int = AppConf.getConfig("data_exhaust.batch.limit.per.request").toInt override def beforeAll(): Unit = { spark = getSparkSession(); @@ -59,9 +60,12 @@ class TestUserInfoExhaustJob extends BaseReportSpec with MockFactory with BaseRe jedis.hmset("user:user-005", JSONUtils.deserialize[java.util.Map[String, String]]("""{"firstname": "Isha", "userid": "user-005", "state": "MP", "district": "Jhansi", "userchannel": "sunbird-dev", "rootorgid": "01250894314817126443", "email": "isha@ilimi.in", "usersignintype": "Validated"};""")) jedis.hmset("user:user-006", JSONUtils.deserialize[java.util.Map[String, String]]("""{"firstname": "Revathi", "userid": "user-006", "state": "Andhra Pradesh", "district": "babarpur", "userchannel": "sunbird-dev", "rootorgid": "01250894314817126443", "email": "revathi@ilimi.in", "usersignintype": "Validated"};""")) jedis.hmset("user:user-007", JSONUtils.deserialize[java.util.Map[String, String]]("""{"firstname": "Sunil", "userid": "user-007", "state": "Karnataka", "district": "bengaluru", "userchannel": "sunbird-dev", "rootorgid": "0126391644091351040", "email": "sunil@ilimi.in", "usersignintype": "Validated"};""")) - jedis.hmset("user:user-008", JSONUtils.deserialize[java.util.Map[String, String]]("""{"firstname": "Anoop", "userid": "user-008", "state": "Karnataka", "district": "bengaluru", "userchannel": "sunbird-dev", "rootorgid": "0130107621805015045", "email": "anoop@ilimi.in", "usersignintype": "Validated"};""")) + jedis.hmset("user:user-008", JSONUtils.deserialize[java.util.Map[String, String]]("""{"firstname": "Anoop", "userid": "user-008", "state": "Karnataka", "district": "bengaluru", "userchannel": "sunbird-dev", "rootorgid": "0130107621805015045", "email": "anoop@ilimi.in", "usersignintype": "Validated", "cluster": "Cluster3", "block": "Block3", "usertype": "admin", "usersubtype": "deo", "schooludisecode": 
"2193754", "schoolname": "Vanasthali PS", "orgname":"Root Org2"};""")) jedis.hmset("user:user-009", JSONUtils.deserialize[java.util.Map[String, String]]("""{"firstname": "Kartheek", "userid": "user-011", "state": "Karnataka", "district": "bengaluru", "userchannel": "sunbird-dev", "rootorgid": "01285019302823526477", "email": "kartheekp@ilimi.in", "usersignintype": "Validated"};""")) - jedis.hmset("user:user-010", JSONUtils.deserialize[java.util.Map[String, String]]("""{"firstname": "Anand", "userid": "user-012", "state": "Tamil Nadu", "district": "Chennai", "userchannel": "sunbird-dev", "rootorgid": "0130107621805015045", "email": "anandp@ilimi.in", "usersignintype": "Validated"};""")) + jedis.hmset("user:user-010", JSONUtils.deserialize[java.util.Map[String, String]]("""{"firstname": "Anand", "userid": "user-012", "state": "Tamil Nadu", "district": "Chennai", "userchannel": "sunbird-dev", "rootorgid": "0130107621805015045", "email": "anandp@ilimi.in", "usersignintype": "Validated", "cluster": "Cluster2", "block": "Block2", "usertype": "admin", "usersubtype": "deo", "schooludisecode": "21937", "schoolnamme": "Vanasthali"};""")) + jedis.hmset("user:user-015", JSONUtils.deserialize[java.util.Map[String, String]]("""{"cluster":"CLUSTER1","firstname":"Manju","subject":"[\"IRCS\"]","schooludisecode":"3183211","usertype":"administrator","usersignintype":"Validated","language":"[\"English\"]","medium":"[\"English\"]","userid":"user-015","schoolname":"DPS, MATHURA","rootorgid":"01250894314817126443","lastname":"D","framework":"[\"igot_health\"]","orgname":"Root Org2","phone":"","usersubtype":"deo","district":"bengaluru","grade":"[\"Volunteers\"]","block":"BLOCK1","state":"Karnataka","board":"[\"IGOT-Health\"]","email":""};""")) + jedis.hmset("user:user-016", JSONUtils.deserialize[java.util.Map[String, String]]("""{"firstname": "Mahesh", "userid": "user-016","orgname": "Pre-prod Custodian Organization", "state": "Andhra Pradesh", "district": "bengaluru", "userchannel": "sunbird-dev", "rootorgid": "0130107621805015045", "email": "mahesh@ilimi.in", "usersignintype": "Validated"};""")) + jedis.hmset("user:user-017", JSONUtils.deserialize[java.util.Map[String, String]]("""{"firstname": "Sowmya", "userid": "user-017","orgname": "Pre-prod Custodian Organization", "state": "Karnataka", "district": "bengaluru", "userchannel": "sunbird-dev", "rootorgid": "0130107621805015045", "email": "sowmya@ilimi.in", "usersignintype": "Validated"};""")) jedis.close() } @@ -156,7 +160,7 @@ class TestUserInfoExhaustJob extends BaseReportSpec with MockFactory with BaseRe val postgresQuery = EmbeddedPostgresql.executeQuery("SELECT * FROM job_request WHERE job_id='userinfo-exhaust'") while (postgresQuery.next()) { postgresQuery.getString("status") should be ("FAILED") - postgresQuery.getString("err_message") should be ("Invalid request") + postgresQuery.getString("err_message") should be ("Request should have either of batchId, batchFilter, searchFilter or encrption key") postgresQuery.getString("download_urls") should be ("{}") } } @@ -181,7 +185,46 @@ class TestUserInfoExhaustJob extends BaseReportSpec with MockFactory with BaseRe } - it should "fail as batchId is present in onDemand mode" in { + it should "insert status as FAILED as batchLimit exceeded" in { + EmbeddedPostgresql.execute(s"TRUNCATE $jobRequestTable") + EmbeddedPostgresql.execute("INSERT INTO job_request (tag, request_id, job_id, status, request_data, requested_by, requested_channel, dt_job_submitted, download_urls, dt_file_created, dt_job_completed, 
execution_time, err_message ,iteration, encryption_key) VALUES ('do_1131350140968632321230_batch-001:channel-01', '37564CF8F134EE7532F125651B51D17F', 'userinfo-exhaust', 'SUBMITTED', '{\"batchFilter\": [\"batch-001\", \"batch-002\", \"batch-003\", \"batch-002\", \"batch-006\"]}', 'user-002', 'channel-01', '2020-10-19 05:58:18.666', '{}', NULL, NULL, 0, '' ,0, 'test123');") + + implicit val fc = new FrameworkContext() + val strConfig = """{"search":{"type":"none"},"model":"org.sunbird.analytics.exhaust.collection.UserInfoExhaustJob","modelParams":{"store":"local","mode":"OnDemand","batchFilters":["TPD"],"searchFilter":{},"sparkElasticsearchConnectionHost":"localhost","sparkRedisConnectionHost":"localhost","sparkUserDbRedisIndex":"0","sparkCassandraConnectionHost":"localhost","sparkUserDbRedisPort":6381,"fromDate":"","toDate":"","storageContainer":""},"parallelization":8,"appName":"UserInfo Exhaust"}""" + val jobConfig = JSONUtils.deserialize[JobConfig](strConfig) + implicit val config = jobConfig + + UserInfoExhaustJob.execute() + + val postgresQuery = EmbeddedPostgresql.executeQuery("SELECT * FROM job_request WHERE job_id='userinfo-exhaust'") + while (postgresQuery.next()) { + postgresQuery.getString("status") should be ("FAILED") + postgresQuery.getString("err_message") should be (s"Number of batches in request exceeded. It should be within $batchLimit") + postgresQuery.getString("download_urls") should be ("{}") + } + + } + + it should "insert status as FAILED as request_data is empty" in { + EmbeddedPostgresql.execute(s"TRUNCATE $jobRequestTable") + EmbeddedPostgresql.execute("INSERT INTO job_request (tag, request_id, job_id, status, request_data, requested_by, requested_channel, dt_job_submitted, download_urls, dt_file_created, dt_job_completed, execution_time, err_message ,iteration, encryption_key) VALUES ('do_1131350140968632321230_batch-001:channel-01', '37564CF8F134EE7532F125651B51D17F', 'userinfo-exhaust', 'SUBMITTED', '{}', 'user-002', 'channel-01', '2020-10-19 05:58:18.666', '{}', NULL, NULL, 0, '' ,0, 'test123');") + + implicit val fc = new FrameworkContext() + val strConfig = """{"search":{"type":"none"},"model":"org.sunbird.analytics.exhaust.collection.UserInfoExhaustJob","modelParams":{"store":"local","mode":"OnDemand","batchFilters":["TPD"],"searchFilter":{},"sparkElasticsearchConnectionHost":"localhost","sparkRedisConnectionHost":"localhost","sparkUserDbRedisIndex":"0","sparkCassandraConnectionHost":"localhost","sparkUserDbRedisPort":6381,"fromDate":"","toDate":"","storageContainer":""},"parallelization":8,"appName":"UserInfo Exhaust"}""" + val jobConfig = JSONUtils.deserialize[JobConfig](strConfig) + implicit val config = jobConfig + + UserInfoExhaustJob.execute() + + val postgresQuery = EmbeddedPostgresql.executeQuery("SELECT * FROM job_request WHERE job_id='userinfo-exhaust'") + while (postgresQuery.next()) { + postgresQuery.getString("status") should be ("FAILED") + postgresQuery.getString("err_message") should be ("Request should have either of batchId, batchFilter, searchFilter or encrption key") + postgresQuery.getString("download_urls") should be ("{}") + } + } + + it should "fail as batchId is not present in onDemand mode" in { EmbeddedPostgresql.execute(s"TRUNCATE $jobRequestTable") EmbeddedPostgresql.execute("INSERT INTO job_request (tag, request_id, job_id, status, request_data, requested_by, requested_channel, dt_job_submitted, download_urls, dt_file_created, dt_job_completed, execution_time, err_message ,iteration, encryption_key) VALUES 
('do_1131350140968632321230_batch-002:channel-01', '37564CF8F134EE7532F125651B51D17F', 'userinfo-exhaust', 'SUBMITTED', '{}', 'user-002', 'channel-01', '2020-10-19 05:58:18.666', '{}', NULL, NULL, 0, '' ,0, 'test12');") @@ -194,7 +237,7 @@ class TestUserInfoExhaustJob extends BaseReportSpec with MockFactory with BaseRe val postgresQuery = EmbeddedPostgresql.executeQuery("SELECT * FROM job_request WHERE job_id='userinfo-exhaust'") while (postgresQuery.next()) { postgresQuery.getString("status") should be ("FAILED") - postgresQuery.getString("err_message") should be ("Invalid request") + postgresQuery.getString("err_message") should be ("Request should have either of batchId, batchFilter, searchFilter or encrption key") postgresQuery.getString("download_urls") should be ("{}") postgresQuery.getString("iteration") should be ("1") } @@ -342,6 +385,133 @@ class TestUserInfoExhaustJob extends BaseReportSpec with MockFactory with BaseRe } } + it should "generate the user info report excluding the user whose consent details are not provided" in { + EmbeddedPostgresql.execute(s"TRUNCATE $jobRequestTable") + EmbeddedPostgresql.execute("INSERT INTO job_request (tag, request_id, job_id, status, request_data, requested_by, requested_channel, dt_job_submitted, download_urls, dt_file_created, dt_job_completed, execution_time, err_message ,iteration, encryption_key) VALUES ('do_1131350140968632321230_batch-001:channel-01', '37564CF8F134EE7532F125651B51D17F', 'userinfo-exhaust', 'SUBMITTED', '{\"batchId\": \"batch-003\"}', 'user-002', '01309282781705830427', '2020-10-19 05:58:18.666', '{}', NULL, NULL, 0, '' ,0, 'test12');") + + implicit val fc = new FrameworkContext() + val strConfig = """{"search":{"type":"none"},"model":"org.sunbird.analytics.exhaust.collection.UserInfoExhaustJob","modelParams":{"store":"local","mode":"OnDemand","batchFilters":["TPD"],"searchFilter":{},"sparkElasticsearchConnectionHost":"localhost","sparkRedisConnectionHost":"localhost","sparkUserDbRedisIndex":"0","sparkCassandraConnectionHost":"localhost","sparkUserDbRedisPort":6381,"fromDate":"","toDate":"","storageContainer":""},"parallelization":8,"appName":"UserInfo Exhaust"}""" + val jobConfig = JSONUtils.deserialize[JobConfig](strConfig) + implicit val config = jobConfig + + UserInfoExhaustJob.execute() + + val outputDir = "response-exhaust" + val batch1 = "batch-003" + val requestId = "37564CF8F134EE7532F125651B51D17F" + val filePath = UserInfoExhaustJob.getFilePath(batch1, requestId) + val jobName = UserInfoExhaustJob.jobName() + implicit val responseExhaustEncoder = Encoders.product[UserInfoExhaustReport] + val batch1Results = spark.read.format("csv").option("header", "true") + .load(s"$outputLocation/$filePath.csv") + .as[UserInfoExhaustReport] + .collectAsList() + .asScala + + //user-010 is not present in consent table hence will be excluded in the report so the size is 2 + batch1Results.size should be (2) + //assertion for user-008 + val user001 = batch1Results.filter(f => f.`User UUID`.equals("user-008")) + user001.foreach(f => println("userdetails: " + JSONUtils.serialize(f))) + user001.map {f => f.`User UUID`}.head should be ("user-008") + user001.map {f => f.`Collection Id`}.head should be ("do_1131975645014835201326") + user001.map {f => f.`Batch Id`}.head should be ("BatchId_batch-003") + user001.map {f => f.`Collection Name`}.head should be ("ADOPT_BOOK_NCERT2") + user001.map {f => f.`Batch Name`}.head should be ("Basic C++") + user001.map {f => f.`User Name`}.head should be ("Anoop") + user001.map {f => 
f.`State`}.head should be ("Karnataka") + user001.map {f => f.`District`}.head should be ("bengaluru") + user001.map {f => f.`Block`}.head should be ("Block3") + user001.map {f => f.`Cluster`}.head should be ("Cluster3") + user001.map {f => f.`Email ID`}.head should be ("anoop@ilimi.in") + user001.map {f => f.`User Type`}.head should be ("admin") + user001.map {f => f.`User Sub Type`}.head should be ("deo") + user001.map {f => f.`School Id`}.head should be ("2193754") + user001.map {f => f.`School Name`}.head should be ("Vanasthali PS") + user001.map {f => f.`Org Name`}.head should be ("Root Org2") + user001.map {f => f.`Consent Provided`}.head should be ("true") + user001.map {f => f.`Consent Provided Date`}.head should be ("14/06/2021") + + //assertion for user-009 + val user009 = batch1Results.filter(f => f.`User UUID`.equals("user-009")) + user009.foreach(f => println("userdetails: " + JSONUtils.serialize(f))) + user009.map {f => f.`User UUID`}.head should be ("user-009") + user009.map {f => f.`Collection Id`}.head should be ("do_1131975645014835201326") + user009.map {f => f.`Batch Id`}.head should be ("BatchId_batch-003") + user009.map {f => f.`Collection Name`}.head should be ("ADOPT_BOOK_NCERT2") + user009.map {f => f.`Batch Name`}.head should be ("Basic C++") + user009.map {f => f.`User Name`}.head should be ("Kartheek") + user009.map {f => f.`State`}.head should be ("Karnataka") + user009.map {f => f.`District`}.head should be ("bengaluru") + user009.map {f => f.`Email ID`}.head should be ("kartheekp@ilimi.in") + user009.map {f => f.`Consent Provided`}.head should be ("true") + + val pResponse = EmbeddedPostgresql.executeQuery("SELECT * FROM job_request WHERE job_id='userinfo-exhaust'") + + while(pResponse.next()) { + pResponse.getString("status") should be ("SUCCESS") + pResponse.getString("err_message") should be ("") + pResponse.getString("dt_job_submitted") should be ("2020-10-19 05:58:18.666") + pResponse.getString("download_urls") should be (s"{reports/userinfo-exhaust/$requestId/batch-003_userinfo_${getDate()}.zip}") + pResponse.getString("dt_file_created") should be (null) + pResponse.getString("iteration") should be ("0") + } + + UserInfoExhaustJob.canZipExceptionBeIgnored() should be (false) + } + + /** + * user-017 will have consentflag=false and hence will not be included in the report + */ + it should "generate the user info report excluding the user who has not provided consent" in { + EmbeddedPostgresql.execute(s"TRUNCATE $jobRequestTable") + EmbeddedPostgresql.execute("INSERT INTO job_request (tag, request_id, job_id, status, request_data, requested_by, requested_channel, dt_job_submitted, download_urls, dt_file_created, dt_job_completed, execution_time, err_message ,iteration, encryption_key) VALUES ('do_1131350140968632321230_batch-001:channel-01', '37564CF8F134EE7532F125651B51D17F', 'userinfo-exhaust', 'SUBMITTED', '{\"batchId\": \"batch-006\"}', 'user-002', 'b00bc992ef25f1a9a8d63291e20efc8d', '2020-10-19 05:58:18.666', '{}', NULL, NULL, 0, '' ,0, 'test12');") + + implicit val fc = new FrameworkContext() + val strConfig = 
"""{"search":{"type":"none"},"model":"org.sunbird.analytics.exhaust.collection.UserInfoExhaustJob","modelParams":{"store":"local","mode":"OnDemand","batchFilters":["TPD"],"searchFilter":{},"sparkElasticsearchConnectionHost":"localhost","sparkRedisConnectionHost":"localhost","sparkUserDbRedisIndex":"0","sparkCassandraConnectionHost":"localhost","sparkUserDbRedisPort":6381,"fromDate":"","toDate":"","storageContainer":""},"parallelization":8,"appName":"UserInfo Exhaust"}""" + val jobConfig = JSONUtils.deserialize[JobConfig](strConfig) + implicit val config = jobConfig + + UserInfoExhaustJob.execute() + + val batch1 = "batch-006" + val requestId = "37564CF8F134EE7532F125651B51D17F" + val filePath = UserInfoExhaustJob.getFilePath(batch1, requestId) + val jobName = UserInfoExhaustJob.jobName() + implicit val responseExhaustEncoder = Encoders.product[UserInfoExhaustReport] + val batch1Results = spark.read.format("csv").option("header", "true") + .load(s"$outputLocation/$filePath.csv") + .as[UserInfoExhaustReport] + .collectAsList() + .asScala + + batch1Results.size should be (2) + batch1Results.map {res => res.`User UUID`}.toList should contain theSameElementsAs List("user-015", "user-016") + + val user001 = batch1Results.filter(f => f.`User UUID`.equals("user-015")) + user001.map {f => f.`User UUID`}.head should be ("user-015") + user001.map {f => f.`State`}.head should be ("Karnataka") + user001.map {f => f.`District`}.head should be ("bengaluru") + user001.map {f => f.`Org Name`}.head should be ("Root Org2") + user001.map {f => f.`Block`}.head should be ("BLOCK1") + user001.map {f => f.`Cluster`}.head should be ("CLUSTER1") + user001.map {f => f.`User Type`}.head should be ("administrator") + user001.map {f => f.`User Sub Type`}.head should be ("deo") + user001.map {f => f.`School Id`}.head should be ("3183211") + user001.map {f => f.`School Name`}.head should be ("DPS, MATHURA") + + val pResponse = EmbeddedPostgresql.executeQuery("SELECT * FROM job_request WHERE job_id='userinfo-exhaust'") + + while(pResponse.next()) { + pResponse.getString("status") should be ("SUCCESS") + pResponse.getString("err_message") should be ("") + pResponse.getString("dt_job_submitted") should be ("2020-10-19 05:58:18.666") + pResponse.getString("download_urls") should be (s"{reports/userinfo-exhaust/$requestId/batch-006_userinfo_${getDate()}.zip}") + pResponse.getString("dt_file_created") should be (null) + pResponse.getString("iteration") should be ("0") + } + } + def getDate(): String = { val dateFormat: DateTimeFormatter = DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.forOffsetHoursMinutes(5, 30)); dateFormat.print(System.currentTimeMillis()); diff --git a/data-products/src/test/scala/org/sunbird/analytics/job/report/TestStateAdminGeoReportJob.scala b/data-products/src/test/scala/org/sunbird/analytics/job/report/TestStateAdminGeoReportJob.scala index f3d61f465..66d8c8b04 100644 --- a/data-products/src/test/scala/org/sunbird/analytics/job/report/TestStateAdminGeoReportJob.scala +++ b/data-products/src/test/scala/org/sunbird/analytics/job/report/TestStateAdminGeoReportJob.scala @@ -15,7 +15,6 @@ class TestStateAdminGeoReportJob extends SparkSpec(null) with MockFactory { implicit var spark: SparkSession = _ var map: Map[String, String] = _ - var shadowUserDF: DataFrame = _ var orgDF: DataFrame = _ var reporterMock: BaseReportsJob = mock[BaseReportsJob] val sunbirdKeyspace = "sunbird" diff --git a/data-products/src/test/scala/org/sunbird/analytics/job/report/TestStateAdminReportJob.scala 
b/data-products/src/test/scala/org/sunbird/analytics/job/report/TestStateAdminReportJob.scala deleted file mode 100644 index e16e4e00d..000000000 --- a/data-products/src/test/scala/org/sunbird/analytics/job/report/TestStateAdminReportJob.scala +++ /dev/null @@ -1,64 +0,0 @@ -package org.sunbird.analytics.job.report - -import java.io.File - -import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.Encoders -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.functions.col -import org.ekstep.analytics.framework.FrameworkContext -import org.ekstep.analytics.framework.util.HadoopFileUtil -import org.sunbird.analytics.util.EmbeddedCassandra -import org.scalamock.scalatest.MockFactory -import org.sunbird.cloud.storage.conf.AppConf - -class TestStateAdminReportJob extends BaseReportSpec with MockFactory { - - implicit var spark: SparkSession = _ - var map: Map[String, String] = _ - var shadowUserDF: DataFrame = _ - var orgDF: DataFrame = _ - var reporterMock: BaseReportsJob = mock[BaseReportsJob] - val sunbirdKeyspace = "sunbird" - val shadowUserEncoder = Encoders.product[ShadowUserData].schema - - override def beforeAll(): Unit = { - super.beforeAll() - spark = getSparkSession(); - EmbeddedCassandra.loadData("src/test/resources/reports/reports_test_data.cql") // Load test data in embedded cassandra server - } - - override def afterAll() : Unit = { - super.afterAll(); - (new HadoopFileUtil()).delete(spark.sparkContext.hadoopConfiguration, "src/test/resources/admin-user-reports") - } - - //Created data : channels ApSlug and OtherSlug contains validated users created against blocks,districts and state - //Only TnSlug doesn't contain any validated users - "StateAdminReportJob" should "generate reports" in { - implicit val fc = new FrameworkContext() - val tempDir = AppConf.getConfig("admin.metrics.temp.dir") - val reportDF = StateAdminReportJob.generateReport()(spark, fc) - assert(reportDF.columns.contains("registered") === true) - assert(reportDF.columns.contains("blocks") === true) - assert(reportDF.columns.contains("schools") === true) - assert(reportDF.columns.contains("districtName") === true) - assert(reportDF.columns.contains("slug") === true) - val apslug = reportDF.where(col("slug") === "ApSlug") - val districtName = apslug.select("districtName").collect().map(_ (0)).toList - assert(districtName(0) === "GULBARGA") - //checking reports were created under slug folder - val userClaimedDetail = new File("src/test/resources/admin-user-reports/user-detail/CLAIMED/ApSlug.csv") - val stateUserDetail = new File("src/test/resources/admin-user-reports/validated-user-detail-state/ApSlug.csv"); - val userSummary = new File("src/test/resources/admin-user-reports/user-summary/ApSlug.json") - val validateUserDetail = new File("src/test/resources/admin-user-reports/validated-user-detail/ApSlug.csv") - val validateUserSummary = new File("src/test/resources/admin-user-reports/validated-user-summary/ApSlug.json") - val validateUserDstSummary = new File("src/test/resources/admin-user-reports/validated-user-summary-district/ApSlug.json"); - assert(userClaimedDetail.exists() === true) - assert(userSummary.exists() === true) - assert(validateUserDetail.exists() === true) - assert(validateUserSummary.exists() === true) - assert(validateUserDstSummary.exists()) - assert(stateUserDetail.exists()) - } -} \ No newline at end of file diff --git a/data-products/src/test/scala/org/sunbird/analytics/job/report/TestStateSelfUserExternalIDJob.scala 
b/data-products/src/test/scala/org/sunbird/analytics/job/report/TestStateSelfUserExternalIDJob.scala index 7d0dc4e1d..95d6e83f6 100644 --- a/data-products/src/test/scala/org/sunbird/analytics/job/report/TestStateSelfUserExternalIDJob.scala +++ b/data-products/src/test/scala/org/sunbird/analytics/job/report/TestStateSelfUserExternalIDJob.scala @@ -11,11 +11,9 @@ class TestStateSelfUserExternalIDJob extends BaseReportSpec with MockFactory { implicit var spark: SparkSession = _ var map: Map[String, String] = _ - var shadowUserDF: DataFrame = _ var orgDF: DataFrame = _ var reporterMock: BaseReportsJob = mock[BaseReportsJob] val sunbirdKeyspace = "sunbird" - val shadowUserEncoder = Encoders.product[ShadowUserData].schema override def beforeAll(): Unit = { super.beforeAll() @@ -34,6 +32,7 @@ class TestStateSelfUserExternalIDJob extends BaseReportSpec with MockFactory { implicit val fc = new FrameworkContext() val tempDir = AppConf.getConfig("admin.metrics.temp.dir") val reportDF = StateAdminReportJob.generateExternalIdReport()(spark, fc) + assert(reportDF.count() === (2)); assert(reportDF.columns.contains("Diksha UUID") === true) assert(reportDF.columns.contains("Name") === true) assert(reportDF.columns.contains("State") === true) diff --git a/data-products/src/test/scala/org/sunbird/analytics/model/report/TestCourseConsumptionModel.scala b/data-products/src/test/scala/org/sunbird/analytics/model/report/TestCourseConsumptionModel.scala index 9d02e4da6..bd4837e03 100644 --- a/data-products/src/test/scala/org/sunbird/analytics/model/report/TestCourseConsumptionModel.scala +++ b/data-products/src/test/scala/org/sunbird/analytics/model/report/TestCourseConsumptionModel.scala @@ -1,6 +1,7 @@ package org.sunbird.analytics.model.report import java.time.{ZoneOffset, ZonedDateTime} + import cats.syntax.either._ import ing.wbaa.druid._ import ing.wbaa.druid.client.DruidClient @@ -10,7 +11,7 @@ import org.apache.spark.SparkContext import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession} import org.ekstep.analytics.framework._ import org.ekstep.analytics.framework.util.{HTTPClient, JSONUtils} -import org.ekstep.analytics.model.ReportConfig +import org.ekstep.analytics.model.{ReportConfig} import org.scalamock.scalatest.MockFactory import org.sunbird.cloud.storage.BaseStorageService diff --git a/etl-jobs/build.Jenkinsfile b/etl-jobs/build.Jenkinsfile index 3411e3287..f36c51d16 100644 --- a/etl-jobs/build.Jenkinsfile +++ b/etl-jobs/build.Jenkinsfile @@ -19,7 +19,7 @@ node() { } else { def scmVars = checkout scm - checkout scm: [$class: 'GitSCM', branches: [[name: "refs/tags/$params.github_release_tag"]], userRemoteConfigs: [[url: scmVars.GIT_URL]]] + checkout scm: [$class: 'GitSCM', branches: [[name: "$params.github_release_tag"]], userRemoteConfigs: [[url: scmVars.GIT_URL]]] artifact_version = params.github_release_tag println(ANSI_BOLD + ANSI_YELLOW + "github_release_tag specified, building from github_release_tag: " + params.github_release_tag + ANSI_NORMAL) } diff --git a/python-scripts/src/main/python/dataproducts/services/misc/druid_job_submitter.py b/python-scripts/src/main/python/dataproducts/services/misc/druid_job_submitter.py index 0a8a972a5..d2741a72c 100644 --- a/python-scripts/src/main/python/dataproducts/services/misc/druid_job_submitter.py +++ b/python-scripts/src/main/python/dataproducts/services/misc/druid_job_submitter.py @@ -83,7 +83,10 @@ def init(self): print('Starting the job submitter...') reports = self.get_active_jobs() for report in reports: - 
if(self.check_schedule(report['reportSchedule'].upper(), report['reportId'], report['config']['reportConfig']['dateRange'].get('intervalSlider'))): - report_config = self.interpolate_config(report['config']) - self.submit_job(report_config) + try: + if(self.check_schedule(report['reportSchedule'].upper(), report['reportId'], report['config']['reportConfig']['dateRange'].get('intervalSlider'))): + report_config = self.interpolate_config(report['config']) + self.submit_job(report_config) + except Exception as e: + print('ERROR::While submitting druid report', report['reportId'], e) print('Job submission completed...') \ No newline at end of file
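
Note on the last hunk: it wraps each report submission in its own try/except so that one failing report config no longer aborts the whole submission loop. Below is a minimal standalone sketch of that per-report error-isolation pattern; the submit_all/fake_submit helpers and the reports list are illustrative stand-ins, not the actual DruidJobSubmitter API.

    def submit_all(reports, submit):
        """Submit each report independently; collect failures instead of aborting the loop."""
        failures = []
        for report in reports:
            try:
                submit(report)  # may raise for a malformed or unreachable report config
            except Exception as exc:
                # Record the failure and keep going with the remaining reports.
                failures.append((report.get('reportId', '<unknown>'), exc))
                print('ERROR::While submitting druid report', report.get('reportId'), exc)
        print('Job submission completed...')
        return failures  # caller can inspect or log the failed reports

    # Example: the second report raises, but the first and third are still submitted.
    if __name__ == '__main__':
        reports = [{'reportId': 'r1'}, {'reportId': 'r2'}, {'reportId': 'r3'}]

        def fake_submit(report):
            if report['reportId'] == 'r2':
                raise ValueError('bad config')

        submit_all(reports, fake_submit)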