Skip to content

Commit

Permalink
4.8.18 case study fix (#53)
Browse files Browse the repository at this point in the history
* Ags cert (#45)

* Added debug logs

* added code to prevent cert. for ags casestudy

* Update CompositeSearchIndexerHelper.scala

---------

Co-authored-by: karthik-tarento <karthikeyan.rajendran@tarento.com>
Co-authored-by: anilkumar <anilkumar.kammlapalli@tarento.com>

* Fix: Add null check for contentCache before logging Redis fetch details (#50)

Co-authored-by: anilkumar <anilkumar.kammlapalli@tarento.com>

* Optimized code for reading course data

* Updated logs and variable names to avoid confusion

---------

Co-authored-by: anilkumarkammalapalli <121931293+anilkumarkammalapalli@users.noreply.github.com>
Co-authored-by: anilkumar <anilkumar.kammlapalli@tarento.com>
  • Loading branch information
3 people authored Nov 27, 2024
1 parent a79a9fb commit fbfd732
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 125 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package org.sunbird.job.aggregate.common

import org.slf4j.ILoggerFactory
import org.sunbird.job.cache.DataCache
import org.sunbird.job.aggregate.task.ActivityAggregateUpdaterConfig
import org.sunbird.job.util.{HttpUtil, ScalaJsonUtil}
import org.slf4j.LoggerFactory
import org.sunbird.job.Metrics

trait ContentHelper {
private[this] val logger = LoggerFactory.getLogger(classOf[ContentHelper])
def getCourseInfo(courseId: String)(
metrics: Metrics,
config: ActivityAggregateUpdaterConfig,
contentCache: DataCache,
httpUtil: HttpUtil
): java.util.Map[String, AnyRef] = {
logger.info(
s"Fetching course details from Redis for Id: ${courseId}, Configured Index: " + contentCache.getDBConfigIndex() + ", Current Index: " + contentCache.getDBIndex()
)
val courseMetadata = Option(contentCache).flatMap(c => Option(c.getWithRetry(courseId))).getOrElse(null)
if (null == courseMetadata || courseMetadata.isEmpty) {
logger.error(
s"Fetching course details from Content Service for Id: ${courseId}"
)
val url =
config.contentReadURL + "/" + courseId + "?fields=identifier,name,versionKey,parentCollections,primaryCategory,courseCategory"
val response = getAPICall(url, "content")(config, httpUtil, metrics)
val courseName = StringContext
.processEscapes(
response.getOrElse(config.name, "").asInstanceOf[String]
)
.filter(_ >= ' ')
val primaryCategory = StringContext
.processEscapes(
response.getOrElse(config.primaryCategory, "").asInstanceOf[String]
)
.filter(_ >= ' ')
val versionKey = StringContext
.processEscapes(
response.getOrElse(config.versionKey, "").asInstanceOf[String]
)
.filter(_ >= ' ')
val parentCollections = response
.getOrElse("parentCollections", List.empty[String])
.asInstanceOf[List[String]]
val courseCateogry = StringContext
.processEscapes(response.getOrElse(config.courseCategory, "").asInstanceOf[String]).filter(_ >= ' ')
val courseInfoMap: java.util.Map[String, AnyRef] =
new java.util.HashMap[String, AnyRef]()
courseInfoMap.put("courseId", courseId)
courseInfoMap.put("courseName", courseName)
courseInfoMap.put("parentCollections", parentCollections)
courseInfoMap.put("primaryCategory", primaryCategory)
courseInfoMap.put("versionKey", versionKey)
courseInfoMap.put(config.courseCategory, courseCateogry)
courseInfoMap
} else {
val courseName = StringContext
.processEscapes(
courseMetadata.getOrElse(config.name, "").asInstanceOf[String]
)
.filter(_ >= ' ')
val primaryCategory = StringContext
.processEscapes(
courseMetadata
.getOrElse("primarycategory", "")
.asInstanceOf[String]
)
.filter(_ >= ' ')
val versionKey = StringContext
.processEscapes(
courseMetadata.getOrElse("versionkey", "").asInstanceOf[String]
)
.filter(_ >= ' ')
val parentCollections = courseMetadata
.getOrElse("parentcollections", new java.util.ArrayList())
.asInstanceOf[java.util.ArrayList[String]]
val courseCateogry = StringContext
.processEscapes(courseMetadata.getOrElse(config.coursecategory, "").asInstanceOf[String]).filter(_ >= ' ')
val courseInfoMap: java.util.Map[String, AnyRef] =
new java.util.HashMap[String, AnyRef]()
courseInfoMap.put("courseId", courseId)
courseInfoMap.put("courseName", courseName)
courseInfoMap.put("parentCollections", parentCollections)
courseInfoMap.put("primaryCategory", primaryCategory)
courseInfoMap.put("versionKey", versionKey)
courseInfoMap.put(config.courseCategory, courseCateogry)
courseInfoMap
}

}

def getAPICall(url: String, responseParam: String)(
config: ActivityAggregateUpdaterConfig,
httpUtil: HttpUtil,
metrics: Metrics
): Map[String, AnyRef] = {
val response = httpUtil.get(url, config.defaultHeaders)
if (200 == response.status) {
ScalaJsonUtil
.deserialize[Map[String, AnyRef]](response.body)
.getOrElse("result", Map[String, AnyRef]())
.asInstanceOf[Map[String, AnyRef]]
.getOrElse(responseParam, Map[String, AnyRef]())
.asInstanceOf[Map[String, AnyRef]]
} else if (
400 == response.status && response.body.contains(
config.userAccBlockedErrCode
)
) {
metrics.incCounter(config.skippedEventCount)
logger.error(
s"Error while fetching user details for ${url}: " + response.status + " :: " + response.body
)
Map[String, AnyRef]()
} else {
throw new Exception(
s"Error from get API : ${url}, with response: ${response}"
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ case class UserActivityAgg(activity_type: String,
agg_last_updated: Map[String, Long]
)

case class CollectionProgress(userId: String, batchId: String, courseId: String, progress: Int, completedOn: Date, contentStatus: Map[String, Int], inputContents: List[String], completed: Boolean = false)
case class CollectionProgress(userId: String, batchId: String, courseId: String, progress: Int, completedOn: Date, contentStatus: Map[String, Int], inputContents: List[String], completed: Boolean = false, courseCategory: String = "")

case class UserEnrolmentAgg(activityAgg: UserActivityAgg, collectionProgress: Option[CollectionProgress] = None)

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.slf4j.LoggerFactory
import org.sunbird.job.cache.{DataCache, RedisConnect}
import org.sunbird.job.aggregate.common.ContentHelper
import org.sunbird.job.aggregate.domain.{UserContentConsumption, _}
import org.sunbird.job.aggregate.task.ActivityAggregateUpdaterConfig
import org.sunbird.job.util.{CassandraUtil, HttpUtil, JSONUtil, ScalaJsonUtil}
Expand All @@ -26,7 +27,8 @@ import scala.language.postfixOps

class ActivityAggregatesFunction(config: ActivityAggregateUpdaterConfig, httpUtil: HttpUtil, @transient var cassandraUtil: CassandraUtil = null)
(implicit val stringTypeInfo: TypeInformation[String])
extends WindowBaseProcessFunction[Map[String, AnyRef], String, Int](config) {
extends WindowBaseProcessFunction[Map[String, AnyRef], String, Int](config)
with ContentHelper {

val mapType: Type = new TypeToken[Map[String, AnyRef]]() {}.getType
private[this] val logger = LoggerFactory.getLogger(classOf[ActivityAggregatesFunction])
Expand Down Expand Up @@ -62,8 +64,16 @@ class ActivityAggregatesFunction(config: ActivityAggregateUpdaterConfig, httpUti
metrics: Metrics): Unit = {

logger.info("Input Events : " + JSONUtil.serialize(events.toList))
var courseCategory = ""
val inputUserConsumptionList: List[UserContentConsumption] = events
.filter(event=> verifyPrimaryCategory(event.getOrElse(config.courseId, "").asInstanceOf[String])(metrics, config, httpUtil, contentCache))
.filter {
event =>
val (isValidCourseEvent, courseCategoryVal) = verifyPrimaryCategory(event.getOrElse(config.courseId, "").asInstanceOf[String])(metrics, config, httpUtil, contentCache)
courseCategory = courseCategoryVal
if (!isValidCourseEvent)
logger.warn(s"Skipping event due to primaryCategory is Program.")
isValidCourseEvent
}
.groupBy(key => (key.get(config.courseId), key.get(config.batchId), key.get(config.userId)))
.values.map(value => {
metrics.incCounter(config.processedEnrolmentCount)
Expand Down Expand Up @@ -93,7 +103,7 @@ class ActivityAggregatesFunction(config: ActivityAggregateUpdaterConfig, httpUti
val courseAggregations = finalUserConsumptionList.flatMap(userConsumption => {

// Course Level Agg using the merged data of ContentConsumption per user, course and batch.
val optCourseAgg = courseActivityAgg(userConsumption, context)(metrics)
val optCourseAgg = courseActivityAgg(userConsumption, courseCategory, context)(metrics)
val courseAggs = if (optCourseAgg.nonEmpty) List(optCourseAgg.get) else List()

// Identify the children of the course (only collections) for which aggregates computation required.
Expand Down Expand Up @@ -125,7 +135,7 @@ class ActivityAggregatesFunction(config: ActivityAggregateUpdaterConfig, httpUti
/**
* Course Level Agg using the merged data of ContentConsumption per user, course and batch.
*/
def courseActivityAgg(userConsumption: UserContentConsumption, context: ProcessWindowFunction[Map[String, AnyRef], String, Int, GlobalWindow]#Context)(implicit metrics: Metrics): Option[UserEnrolmentAgg] = {
def courseActivityAgg(userConsumption: UserContentConsumption, courseCategory: String, context: ProcessWindowFunction[Map[String, AnyRef], String, Int, GlobalWindow]#Context)(implicit metrics: Metrics): Option[UserEnrolmentAgg] = {
val courseId = userConsumption.courseId
val userId = userConsumption.userId
val contextId = "cb:" + userConsumption.batchId
Expand All @@ -151,9 +161,9 @@ class ActivityAggregatesFunction(config: ActivityAggregateUpdaterConfig, httpUti
val contentStatus = userConsumption.contents.map(cc => (cc._2.contentId, cc._2.status)).toMap
val inputContents = userConsumption.contents.filter(cc => cc._2.fromInput).keys.toList
val collectionProgress = if (completedCount >= leafNodes.size) {
Option(CollectionProgress(userId, userConsumption.batchId, courseId, completedCount, new java.util.Date(), contentStatus, inputContents, true))
Option(CollectionProgress(userId, userConsumption.batchId, courseId, completedCount, new java.util.Date(), contentStatus, inputContents, true, courseCategory))
} else {
Option(CollectionProgress(userId, userConsumption.batchId, courseId, completedCount, null, contentStatus, inputContents))
Option(CollectionProgress(userId, userConsumption.batchId, courseId, completedCount, null, contentStatus, inputContents, false, courseCategory))
}
Option(UserEnrolmentAgg(UserActivityAgg("Course", userId, courseId, contextId, Map("completedCount" -> completedCount.toDouble), Map("completedCount" -> System.currentTimeMillis())), collectionProgress))
}
Expand Down Expand Up @@ -455,133 +465,30 @@ class ActivityAggregatesFunction(config: ActivityAggregateUpdaterConfig, httpUti
config: ActivityAggregateUpdaterConfig,
httpUtil: HttpUtil,
contentCache: DataCache
): Boolean = {
): (Boolean, String) = {
logger.info(
"Verify Program post-publish required for content: " + identifier
)
// Get the primary Categories for the courses here
var isValidProgram = false
var isValidCourse = false
var courseCategory = ""
val contentObj: java.util.Map[String, AnyRef] =
getCourseInfo(identifier)(metrics, config, contentCache, httpUtil)
if (!contentObj.isEmpty) {
val primaryCategory = contentObj.get("primaryCategory")
courseCategory = contentObj.get("courseCategory").asInstanceOf[String]
if (primaryCategory != null &&
(primaryCategory != "Program"
|| primaryCategory != "Curated Program"
|| primaryCategory != "Blended Program")) {
isValidProgram = true
isValidCourse = true
}
logger.info("PrimaryCategory value is :" + primaryCategory + ", for Id: " + identifier)
logger.info("PrimaryCategory value is : " + primaryCategory + ", for Id: " + identifier)
} else {
logger.error("Failed to read content details for Id: " + identifier)
throw new Exception(s"Failed to read content for Id $identifier")
}
logger.info("is activity aggregator is skipping this event ? " + isValidProgram)
isValidProgram
}

def getCourseInfo(courseId: String)(
metrics: Metrics,
config: ActivityAggregateUpdaterConfig,
contentCache: DataCache,
httpUtil: HttpUtil
): java.util.Map[String, AnyRef] = {
logger.info(
s"Fetching course details from Redis for Id: ${courseId}, Configured Index: " + contentCache.getDBConfigIndex() + ", Current Index: " + contentCache.getDBIndex()
)
val courseMetadata = contentCache.getWithRetry(courseId)
if (null == courseMetadata || courseMetadata.isEmpty) {
logger.error(
s"Fetching course details from Content Service for Id: ${courseId}"
)
val url =
config.contentReadURL + "/" + courseId + "?fields=identifier,name,versionKey,parentCollections,primaryCategory"
val response = getAPICall(url, "content")(config, httpUtil, metrics)
val courseName = StringContext
.processEscapes(
response.getOrElse(config.name, "").asInstanceOf[String]
)
.filter(_ >= ' ')
val primaryCategory = StringContext
.processEscapes(
response.getOrElse(config.primaryCategory, "").asInstanceOf[String]
)
.filter(_ >= ' ')
val versionKey = StringContext
.processEscapes(
response.getOrElse(config.versionKey, "").asInstanceOf[String]
)
.filter(_ >= ' ')
val parentCollections = response
.getOrElse("parentCollections", List.empty[String])
.asInstanceOf[List[String]]
val courseInfoMap: java.util.Map[String, AnyRef] =
new java.util.HashMap[String, AnyRef]()
courseInfoMap.put("courseId", courseId)
courseInfoMap.put("courseName", courseName)
courseInfoMap.put("parentCollections", parentCollections)
courseInfoMap.put("primaryCategory", primaryCategory)
courseInfoMap.put("versionKey", versionKey)
courseInfoMap
} else {
val courseName = StringContext
.processEscapes(
courseMetadata.getOrElse(config.name, "").asInstanceOf[String]
)
.filter(_ >= ' ')
val primaryCategory = StringContext
.processEscapes(
courseMetadata
.getOrElse("primarycategory", "")
.asInstanceOf[String]
)
.filter(_ >= ' ')
val versionKey = StringContext
.processEscapes(
courseMetadata.getOrElse("versionkey", "").asInstanceOf[String]
)
.filter(_ >= ' ')
val parentCollections = courseMetadata
.getOrElse("parentcollections", new java.util.ArrayList())
.asInstanceOf[java.util.ArrayList[String]]
val courseInfoMap: java.util.Map[String, AnyRef] =
new java.util.HashMap[String, AnyRef]()
courseInfoMap.put("courseId", courseId)
courseInfoMap.put("courseName", courseName)
courseInfoMap.put("parentCollections", parentCollections)
courseInfoMap.put("primaryCategory", primaryCategory)
courseInfoMap.put("versionKey", versionKey)
courseInfoMap
}

}

def getAPICall(url: String, responseParam: String)(
config: ActivityAggregateUpdaterConfig,
httpUtil: HttpUtil,
metrics: Metrics
): Map[String, AnyRef] = {
val response = httpUtil.get(url, config.defaultHeaders)
if (200 == response.status) {
ScalaJsonUtil
.deserialize[Map[String, AnyRef]](response.body)
.getOrElse("result", Map[String, AnyRef]())
.asInstanceOf[Map[String, AnyRef]]
.getOrElse(responseParam, Map[String, AnyRef]())
.asInstanceOf[Map[String, AnyRef]]
} else if (
400 == response.status && response.body.contains(
config.userAccBlockedErrCode
)
) {
metrics.incCounter(config.skippedEventCount)
logger.error(
s"Error while fetching user details for ${url}: " + response.status + " :: " + response.body
)
Map[String, AnyRef]()
} else {
throw new Exception(
s"Error from get API : ${url}, with response: ${response}"
)
}
logger.info("is activity aggregator is processing this event ? " + isValidCourse)
(isValidCourse, courseCategory)
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.sunbird.job.aggregate.functions

import java.util.UUID

import com.datastax.driver.core.querybuilder.{QueryBuilder, Select, Update}
import com.google.gson.Gson
import org.apache.flink.api.common.typeinfo.TypeInformation
Expand All @@ -14,11 +13,11 @@ import org.sunbird.job.dedup.DeDupEngine
import org.sunbird.job.{BaseProcessFunction, Metrics}
import org.sunbird.job.aggregate.domain.{ActorObject, CollectionProgress, EventContext, EventData, EventObject, TelemetryEvent}
import org.sunbird.job.aggregate.task.ActivityAggregateUpdaterConfig
import org.sunbird.job.util.CassandraUtil
import org.sunbird.job.util.{CassandraUtil, HttpUtil}

import scala.collection.JavaConverters._

class CollectionProgressCompleteFunction(config: ActivityAggregateUpdaterConfig)(implicit val enrolmentCompleteTypeInfo: TypeInformation[List[CollectionProgress]], val stringTypeInfo: TypeInformation[String], @transient var cassandraUtil: CassandraUtil = null)
class CollectionProgressCompleteFunction(config: ActivityAggregateUpdaterConfig, httpUtil: HttpUtil)(implicit val enrolmentCompleteTypeInfo: TypeInformation[List[CollectionProgress]], val stringTypeInfo: TypeInformation[String], @transient var cassandraUtil: CassandraUtil = null)
extends BaseProcessFunction[List[CollectionProgress], String](config) {

private[this] val logger = LoggerFactory.getLogger(classOf[CollectionProgressCompleteFunction])
Expand Down Expand Up @@ -51,7 +50,12 @@ class CollectionProgressCompleteFunction(config: ActivityAggregateUpdaterConfig)
logger.info("enrolmentQueries => "+enrolmentQueries)
updateDB(config.thresholdBatchWriteSize, enrolmentQueries)(metrics)
pendingEnrolments.foreach(e => {
createIssueCertEvent(e, context)(metrics)
val courseId = e.courseId
if (config.caseStudy.replaceAll("\\s+", " ").equalsIgnoreCase(e.courseCategory.replaceAll("\\s+", " "))) {
logger.info(s"Certificate event not generated for courseId: $courseId because the courseCategory is 'Case Study'.")
} else {
createIssueCertEvent(e, context)(metrics)
}
generateAuditEvent(e, context)(metrics)
})
logger.info("posting events completed")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,5 +141,8 @@ class ActivityAggregateUpdaterConfig(override val config: Config) extends BaseJo
val defaultHeaders = Map[String, String] ("Content-Type" -> "application/json")
val userAccBlockedErrCode = "UOS_USRRED0006"
val skippedEventCount = "skipped-events-count"
val courseCategory = "courseCategory"
val caseStudy = "Case Study"
val coursecategory = "coursecategory"

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class ActivityAggregateUpdaterStreamTask(config: ActivityAggregateUpdaterConfig,
// TODO: set separate parallelism for below task.
progressStream.getSideOutput(config.collectionUpdateOutputTag).process(new CollectionProgressUpdateFunction(config))
.name(config.collectionProgressUpdateFn).uid(config.collectionProgressUpdateFn).setParallelism(config.enrolmentCompleteParallelism)
val enrolmentCompleteStream = progressStream.getSideOutput(config.collectionCompleteOutputTag).process(new CollectionProgressCompleteFunction(config))
val enrolmentCompleteStream = progressStream.getSideOutput(config.collectionCompleteOutputTag).process(new CollectionProgressCompleteFunction(config, httpUtil))
.name(config.collectionCompleteFn).uid(config.collectionCompleteFn).setParallelism(config.enrolmentCompleteParallelism)

enrolmentCompleteStream.getSideOutput(config.certIssueOutputTag).addSink(kafkaConnector.kafkaStringSink(config.kafkaCertIssueTopic))
Expand Down
Loading

0 comments on commit fbfd732

Please sign in to comment.