From fbfd732e54e567bde77918c1c232d8330dffcc64 Mon Sep 17 00:00:00 2001 From: Karthikeyan Rajendran <70887864+karthik-tarento@users.noreply.github.com> Date: Wed, 27 Nov 2024 10:06:54 +0530 Subject: [PATCH] 4.8.18 case study fix (#53) * Ags cert (#45) * Added debug logs * added code to prevent cert. for ags casestudy * Update CompositeSearchIndexerHelper.scala --------- Co-authored-by: karthik-tarento Co-authored-by: anilkumar * Fix: Add null check for contentCache before logging Redis fetch details (#50) Co-authored-by: anilkumar * Optimized code for reading course data * Updated logs and variable names to avoid confusion --------- Co-authored-by: anilkumarkammalapalli <121931293+anilkumarkammalapalli@users.noreply.github.com> Co-authored-by: anilkumar --- .../job/aggregate/common/ContentHelper.scala | 123 +++++++++++++++ .../sunbird/job/aggregate/domain/Models.scala | 2 +- .../ActivityAggregatesFunction.scala | 143 +++--------------- .../CollectionProgressCompleteFunction.scala | 12 +- .../task/ActivityAggregateUpdaterConfig.scala | 3 + .../ActivityAggregateUpdaterStreamTask.scala | 2 +- .../task/PostPublishProcessorConfig.scala | 2 +- 7 files changed, 162 insertions(+), 125 deletions(-) create mode 100644 activity-aggregate-updater/src/main/scala/org/sunbird/job/aggregate/common/ContentHelper.scala diff --git a/activity-aggregate-updater/src/main/scala/org/sunbird/job/aggregate/common/ContentHelper.scala b/activity-aggregate-updater/src/main/scala/org/sunbird/job/aggregate/common/ContentHelper.scala new file mode 100644 index 000000000..0c64e9723 --- /dev/null +++ b/activity-aggregate-updater/src/main/scala/org/sunbird/job/aggregate/common/ContentHelper.scala @@ -0,0 +1,123 @@ +package org.sunbird.job.aggregate.common + +import org.slf4j.ILoggerFactory +import org.sunbird.job.cache.DataCache +import org.sunbird.job.aggregate.task.ActivityAggregateUpdaterConfig +import org.sunbird.job.util.{HttpUtil, ScalaJsonUtil} +import org.slf4j.LoggerFactory +import org.sunbird.job.Metrics + +trait ContentHelper { + private[this] val logger = LoggerFactory.getLogger(classOf[ContentHelper]) + def getCourseInfo(courseId: String)( + metrics: Metrics, + config: ActivityAggregateUpdaterConfig, + contentCache: DataCache, + httpUtil: HttpUtil + ): java.util.Map[String, AnyRef] = { + logger.info( + s"Fetching course details from Redis for Id: ${courseId}, Configured Index: " + contentCache.getDBConfigIndex() + ", Current Index: " + contentCache.getDBIndex() + ) + val courseMetadata = Option(contentCache).flatMap(c => Option(c.getWithRetry(courseId))).getOrElse(null) + if (null == courseMetadata || courseMetadata.isEmpty) { + logger.error( + s"Fetching course details from Content Service for Id: ${courseId}" + ) + val url = + config.contentReadURL + "/" + courseId + "?fields=identifier,name,versionKey,parentCollections,primaryCategory,courseCategory" + val response = getAPICall(url, "content")(config, httpUtil, metrics) + val courseName = StringContext + .processEscapes( + response.getOrElse(config.name, "").asInstanceOf[String] + ) + .filter(_ >= ' ') + val primaryCategory = StringContext + .processEscapes( + response.getOrElse(config.primaryCategory, "").asInstanceOf[String] + ) + .filter(_ >= ' ') + val versionKey = StringContext + .processEscapes( + response.getOrElse(config.versionKey, "").asInstanceOf[String] + ) + .filter(_ >= ' ') + val parentCollections = response + .getOrElse("parentCollections", List.empty[String]) + .asInstanceOf[List[String]] + val courseCateogry = StringContext + .processEscapes(response.getOrElse(config.courseCategory, "").asInstanceOf[String]).filter(_ >= ' ') + val courseInfoMap: java.util.Map[String, AnyRef] = + new java.util.HashMap[String, AnyRef]() + courseInfoMap.put("courseId", courseId) + courseInfoMap.put("courseName", courseName) + courseInfoMap.put("parentCollections", parentCollections) + courseInfoMap.put("primaryCategory", primaryCategory) + courseInfoMap.put("versionKey", versionKey) + courseInfoMap.put(config.courseCategory, courseCateogry) + courseInfoMap + } else { + val courseName = StringContext + .processEscapes( + courseMetadata.getOrElse(config.name, "").asInstanceOf[String] + ) + .filter(_ >= ' ') + val primaryCategory = StringContext + .processEscapes( + courseMetadata + .getOrElse("primarycategory", "") + .asInstanceOf[String] + ) + .filter(_ >= ' ') + val versionKey = StringContext + .processEscapes( + courseMetadata.getOrElse("versionkey", "").asInstanceOf[String] + ) + .filter(_ >= ' ') + val parentCollections = courseMetadata + .getOrElse("parentcollections", new java.util.ArrayList()) + .asInstanceOf[java.util.ArrayList[String]] + val courseCateogry = StringContext + .processEscapes(courseMetadata.getOrElse(config.coursecategory, "").asInstanceOf[String]).filter(_ >= ' ') + val courseInfoMap: java.util.Map[String, AnyRef] = + new java.util.HashMap[String, AnyRef]() + courseInfoMap.put("courseId", courseId) + courseInfoMap.put("courseName", courseName) + courseInfoMap.put("parentCollections", parentCollections) + courseInfoMap.put("primaryCategory", primaryCategory) + courseInfoMap.put("versionKey", versionKey) + courseInfoMap.put(config.courseCategory, courseCateogry) + courseInfoMap + } + + } + + def getAPICall(url: String, responseParam: String)( + config: ActivityAggregateUpdaterConfig, + httpUtil: HttpUtil, + metrics: Metrics + ): Map[String, AnyRef] = { + val response = httpUtil.get(url, config.defaultHeaders) + if (200 == response.status) { + ScalaJsonUtil + .deserialize[Map[String, AnyRef]](response.body) + .getOrElse("result", Map[String, AnyRef]()) + .asInstanceOf[Map[String, AnyRef]] + .getOrElse(responseParam, Map[String, AnyRef]()) + .asInstanceOf[Map[String, AnyRef]] + } else if ( + 400 == response.status && response.body.contains( + config.userAccBlockedErrCode + ) + ) { + metrics.incCounter(config.skippedEventCount) + logger.error( + s"Error while fetching user details for ${url}: " + response.status + " :: " + response.body + ) + Map[String, AnyRef]() + } else { + throw new Exception( + s"Error from get API : ${url}, with response: ${response}" + ) + } + } +} \ No newline at end of file diff --git a/activity-aggregate-updater/src/main/scala/org/sunbird/job/aggregate/domain/Models.scala b/activity-aggregate-updater/src/main/scala/org/sunbird/job/aggregate/domain/Models.scala index fab648368..8c795d1f4 100644 --- a/activity-aggregate-updater/src/main/scala/org/sunbird/job/aggregate/domain/Models.scala +++ b/activity-aggregate-updater/src/main/scala/org/sunbird/job/aggregate/domain/Models.scala @@ -46,7 +46,7 @@ case class UserActivityAgg(activity_type: String, agg_last_updated: Map[String, Long] ) -case class CollectionProgress(userId: String, batchId: String, courseId: String, progress: Int, completedOn: Date, contentStatus: Map[String, Int], inputContents: List[String], completed: Boolean = false) +case class CollectionProgress(userId: String, batchId: String, courseId: String, progress: Int, completedOn: Date, contentStatus: Map[String, Int], inputContents: List[String], completed: Boolean = false, courseCategory: String = "") case class UserEnrolmentAgg(activityAgg: UserActivityAgg, collectionProgress: Option[CollectionProgress] = None) diff --git a/activity-aggregate-updater/src/main/scala/org/sunbird/job/aggregate/functions/ActivityAggregatesFunction.scala b/activity-aggregate-updater/src/main/scala/org/sunbird/job/aggregate/functions/ActivityAggregatesFunction.scala index 79806457b..ef09feb92 100644 --- a/activity-aggregate-updater/src/main/scala/org/sunbird/job/aggregate/functions/ActivityAggregatesFunction.scala +++ b/activity-aggregate-updater/src/main/scala/org/sunbird/job/aggregate/functions/ActivityAggregatesFunction.scala @@ -16,6 +16,7 @@ import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction import org.apache.flink.streaming.api.windowing.windows.GlobalWindow import org.slf4j.LoggerFactory import org.sunbird.job.cache.{DataCache, RedisConnect} +import org.sunbird.job.aggregate.common.ContentHelper import org.sunbird.job.aggregate.domain.{UserContentConsumption, _} import org.sunbird.job.aggregate.task.ActivityAggregateUpdaterConfig import org.sunbird.job.util.{CassandraUtil, HttpUtil, JSONUtil, ScalaJsonUtil} @@ -26,7 +27,8 @@ import scala.language.postfixOps class ActivityAggregatesFunction(config: ActivityAggregateUpdaterConfig, httpUtil: HttpUtil, @transient var cassandraUtil: CassandraUtil = null) (implicit val stringTypeInfo: TypeInformation[String]) - extends WindowBaseProcessFunction[Map[String, AnyRef], String, Int](config) { + extends WindowBaseProcessFunction[Map[String, AnyRef], String, Int](config) + with ContentHelper { val mapType: Type = new TypeToken[Map[String, AnyRef]]() {}.getType private[this] val logger = LoggerFactory.getLogger(classOf[ActivityAggregatesFunction]) @@ -62,8 +64,16 @@ class ActivityAggregatesFunction(config: ActivityAggregateUpdaterConfig, httpUti metrics: Metrics): Unit = { logger.info("Input Events : " + JSONUtil.serialize(events.toList)) + var courseCategory = "" val inputUserConsumptionList: List[UserContentConsumption] = events - .filter(event=> verifyPrimaryCategory(event.getOrElse(config.courseId, "").asInstanceOf[String])(metrics, config, httpUtil, contentCache)) + .filter { + event => + val (isValidCourseEvent, courseCategoryVal) = verifyPrimaryCategory(event.getOrElse(config.courseId, "").asInstanceOf[String])(metrics, config, httpUtil, contentCache) + courseCategory = courseCategoryVal + if (!isValidCourseEvent) + logger.warn(s"Skipping event due to primaryCategory is Program.") + isValidCourseEvent + } .groupBy(key => (key.get(config.courseId), key.get(config.batchId), key.get(config.userId))) .values.map(value => { metrics.incCounter(config.processedEnrolmentCount) @@ -93,7 +103,7 @@ class ActivityAggregatesFunction(config: ActivityAggregateUpdaterConfig, httpUti val courseAggregations = finalUserConsumptionList.flatMap(userConsumption => { // Course Level Agg using the merged data of ContentConsumption per user, course and batch. - val optCourseAgg = courseActivityAgg(userConsumption, context)(metrics) + val optCourseAgg = courseActivityAgg(userConsumption, courseCategory, context)(metrics) val courseAggs = if (optCourseAgg.nonEmpty) List(optCourseAgg.get) else List() // Identify the children of the course (only collections) for which aggregates computation required. @@ -125,7 +135,7 @@ class ActivityAggregatesFunction(config: ActivityAggregateUpdaterConfig, httpUti /** * Course Level Agg using the merged data of ContentConsumption per user, course and batch. */ - def courseActivityAgg(userConsumption: UserContentConsumption, context: ProcessWindowFunction[Map[String, AnyRef], String, Int, GlobalWindow]#Context)(implicit metrics: Metrics): Option[UserEnrolmentAgg] = { + def courseActivityAgg(userConsumption: UserContentConsumption, courseCategory: String, context: ProcessWindowFunction[Map[String, AnyRef], String, Int, GlobalWindow]#Context)(implicit metrics: Metrics): Option[UserEnrolmentAgg] = { val courseId = userConsumption.courseId val userId = userConsumption.userId val contextId = "cb:" + userConsumption.batchId @@ -151,9 +161,9 @@ class ActivityAggregatesFunction(config: ActivityAggregateUpdaterConfig, httpUti val contentStatus = userConsumption.contents.map(cc => (cc._2.contentId, cc._2.status)).toMap val inputContents = userConsumption.contents.filter(cc => cc._2.fromInput).keys.toList val collectionProgress = if (completedCount >= leafNodes.size) { - Option(CollectionProgress(userId, userConsumption.batchId, courseId, completedCount, new java.util.Date(), contentStatus, inputContents, true)) + Option(CollectionProgress(userId, userConsumption.batchId, courseId, completedCount, new java.util.Date(), contentStatus, inputContents, true, courseCategory)) } else { - Option(CollectionProgress(userId, userConsumption.batchId, courseId, completedCount, null, contentStatus, inputContents)) + Option(CollectionProgress(userId, userConsumption.batchId, courseId, completedCount, null, contentStatus, inputContents, false, courseCategory)) } Option(UserEnrolmentAgg(UserActivityAgg("Course", userId, courseId, contextId, Map("completedCount" -> completedCount.toDouble), Map("completedCount" -> System.currentTimeMillis())), collectionProgress)) } @@ -455,133 +465,30 @@ class ActivityAggregatesFunction(config: ActivityAggregateUpdaterConfig, httpUti config: ActivityAggregateUpdaterConfig, httpUtil: HttpUtil, contentCache: DataCache - ): Boolean = { + ): (Boolean, String) = { logger.info( "Verify Program post-publish required for content: " + identifier ) // Get the primary Categories for the courses here - var isValidProgram = false + var isValidCourse = false + var courseCategory = "" val contentObj: java.util.Map[String, AnyRef] = getCourseInfo(identifier)(metrics, config, contentCache, httpUtil) if (!contentObj.isEmpty) { val primaryCategory = contentObj.get("primaryCategory") + courseCategory = contentObj.get("courseCategory").asInstanceOf[String] if (primaryCategory != null && (primaryCategory != "Program" || primaryCategory != "Curated Program" || primaryCategory != "Blended Program")) { - isValidProgram = true + isValidCourse = true } - logger.info("PrimaryCategory value is :" + primaryCategory + ", for Id: " + identifier) + logger.info("PrimaryCategory value is : " + primaryCategory + ", for Id: " + identifier) } else { logger.error("Failed to read content details for Id: " + identifier) + throw new Exception(s"Failed to read content for Id $identifier") } - logger.info("is activity aggregator is skipping this event ? " + isValidProgram) - isValidProgram - } - - def getCourseInfo(courseId: String)( - metrics: Metrics, - config: ActivityAggregateUpdaterConfig, - contentCache: DataCache, - httpUtil: HttpUtil - ): java.util.Map[String, AnyRef] = { - logger.info( - s"Fetching course details from Redis for Id: ${courseId}, Configured Index: " + contentCache.getDBConfigIndex() + ", Current Index: " + contentCache.getDBIndex() - ) - val courseMetadata = contentCache.getWithRetry(courseId) - if (null == courseMetadata || courseMetadata.isEmpty) { - logger.error( - s"Fetching course details from Content Service for Id: ${courseId}" - ) - val url = - config.contentReadURL + "/" + courseId + "?fields=identifier,name,versionKey,parentCollections,primaryCategory" - val response = getAPICall(url, "content")(config, httpUtil, metrics) - val courseName = StringContext - .processEscapes( - response.getOrElse(config.name, "").asInstanceOf[String] - ) - .filter(_ >= ' ') - val primaryCategory = StringContext - .processEscapes( - response.getOrElse(config.primaryCategory, "").asInstanceOf[String] - ) - .filter(_ >= ' ') - val versionKey = StringContext - .processEscapes( - response.getOrElse(config.versionKey, "").asInstanceOf[String] - ) - .filter(_ >= ' ') - val parentCollections = response - .getOrElse("parentCollections", List.empty[String]) - .asInstanceOf[List[String]] - val courseInfoMap: java.util.Map[String, AnyRef] = - new java.util.HashMap[String, AnyRef]() - courseInfoMap.put("courseId", courseId) - courseInfoMap.put("courseName", courseName) - courseInfoMap.put("parentCollections", parentCollections) - courseInfoMap.put("primaryCategory", primaryCategory) - courseInfoMap.put("versionKey", versionKey) - courseInfoMap - } else { - val courseName = StringContext - .processEscapes( - courseMetadata.getOrElse(config.name, "").asInstanceOf[String] - ) - .filter(_ >= ' ') - val primaryCategory = StringContext - .processEscapes( - courseMetadata - .getOrElse("primarycategory", "") - .asInstanceOf[String] - ) - .filter(_ >= ' ') - val versionKey = StringContext - .processEscapes( - courseMetadata.getOrElse("versionkey", "").asInstanceOf[String] - ) - .filter(_ >= ' ') - val parentCollections = courseMetadata - .getOrElse("parentcollections", new java.util.ArrayList()) - .asInstanceOf[java.util.ArrayList[String]] - val courseInfoMap: java.util.Map[String, AnyRef] = - new java.util.HashMap[String, AnyRef]() - courseInfoMap.put("courseId", courseId) - courseInfoMap.put("courseName", courseName) - courseInfoMap.put("parentCollections", parentCollections) - courseInfoMap.put("primaryCategory", primaryCategory) - courseInfoMap.put("versionKey", versionKey) - courseInfoMap - } - - } - - def getAPICall(url: String, responseParam: String)( - config: ActivityAggregateUpdaterConfig, - httpUtil: HttpUtil, - metrics: Metrics - ): Map[String, AnyRef] = { - val response = httpUtil.get(url, config.defaultHeaders) - if (200 == response.status) { - ScalaJsonUtil - .deserialize[Map[String, AnyRef]](response.body) - .getOrElse("result", Map[String, AnyRef]()) - .asInstanceOf[Map[String, AnyRef]] - .getOrElse(responseParam, Map[String, AnyRef]()) - .asInstanceOf[Map[String, AnyRef]] - } else if ( - 400 == response.status && response.body.contains( - config.userAccBlockedErrCode - ) - ) { - metrics.incCounter(config.skippedEventCount) - logger.error( - s"Error while fetching user details for ${url}: " + response.status + " :: " + response.body - ) - Map[String, AnyRef]() - } else { - throw new Exception( - s"Error from get API : ${url}, with response: ${response}" - ) - } + logger.info("is activity aggregator is processing this event ? " + isValidCourse) + (isValidCourse, courseCategory) } } diff --git a/activity-aggregate-updater/src/main/scala/org/sunbird/job/aggregate/functions/CollectionProgressCompleteFunction.scala b/activity-aggregate-updater/src/main/scala/org/sunbird/job/aggregate/functions/CollectionProgressCompleteFunction.scala index a91bf76b4..807ccc250 100644 --- a/activity-aggregate-updater/src/main/scala/org/sunbird/job/aggregate/functions/CollectionProgressCompleteFunction.scala +++ b/activity-aggregate-updater/src/main/scala/org/sunbird/job/aggregate/functions/CollectionProgressCompleteFunction.scala @@ -1,7 +1,6 @@ package org.sunbird.job.aggregate.functions import java.util.UUID - import com.datastax.driver.core.querybuilder.{QueryBuilder, Select, Update} import com.google.gson.Gson import org.apache.flink.api.common.typeinfo.TypeInformation @@ -14,11 +13,11 @@ import org.sunbird.job.dedup.DeDupEngine import org.sunbird.job.{BaseProcessFunction, Metrics} import org.sunbird.job.aggregate.domain.{ActorObject, CollectionProgress, EventContext, EventData, EventObject, TelemetryEvent} import org.sunbird.job.aggregate.task.ActivityAggregateUpdaterConfig -import org.sunbird.job.util.CassandraUtil +import org.sunbird.job.util.{CassandraUtil, HttpUtil} import scala.collection.JavaConverters._ -class CollectionProgressCompleteFunction(config: ActivityAggregateUpdaterConfig)(implicit val enrolmentCompleteTypeInfo: TypeInformation[List[CollectionProgress]], val stringTypeInfo: TypeInformation[String], @transient var cassandraUtil: CassandraUtil = null) +class CollectionProgressCompleteFunction(config: ActivityAggregateUpdaterConfig, httpUtil: HttpUtil)(implicit val enrolmentCompleteTypeInfo: TypeInformation[List[CollectionProgress]], val stringTypeInfo: TypeInformation[String], @transient var cassandraUtil: CassandraUtil = null) extends BaseProcessFunction[List[CollectionProgress], String](config) { private[this] val logger = LoggerFactory.getLogger(classOf[CollectionProgressCompleteFunction]) @@ -51,7 +50,12 @@ class CollectionProgressCompleteFunction(config: ActivityAggregateUpdaterConfig) logger.info("enrolmentQueries => "+enrolmentQueries) updateDB(config.thresholdBatchWriteSize, enrolmentQueries)(metrics) pendingEnrolments.foreach(e => { - createIssueCertEvent(e, context)(metrics) + val courseId = e.courseId + if (config.caseStudy.replaceAll("\\s+", " ").equalsIgnoreCase(e.courseCategory.replaceAll("\\s+", " "))) { + logger.info(s"Certificate event not generated for courseId: $courseId because the courseCategory is 'Case Study'.") + } else { + createIssueCertEvent(e, context)(metrics) + } generateAuditEvent(e, context)(metrics) }) logger.info("posting events completed") diff --git a/activity-aggregate-updater/src/main/scala/org/sunbird/job/aggregate/task/ActivityAggregateUpdaterConfig.scala b/activity-aggregate-updater/src/main/scala/org/sunbird/job/aggregate/task/ActivityAggregateUpdaterConfig.scala index 7e291d5f1..f92730370 100644 --- a/activity-aggregate-updater/src/main/scala/org/sunbird/job/aggregate/task/ActivityAggregateUpdaterConfig.scala +++ b/activity-aggregate-updater/src/main/scala/org/sunbird/job/aggregate/task/ActivityAggregateUpdaterConfig.scala @@ -141,5 +141,8 @@ class ActivityAggregateUpdaterConfig(override val config: Config) extends BaseJo val defaultHeaders = Map[String, String] ("Content-Type" -> "application/json") val userAccBlockedErrCode = "UOS_USRRED0006" val skippedEventCount = "skipped-events-count" + val courseCategory = "courseCategory" + val caseStudy = "Case Study" + val coursecategory = "coursecategory" } diff --git a/activity-aggregate-updater/src/main/scala/org/sunbird/job/aggregate/task/ActivityAggregateUpdaterStreamTask.scala b/activity-aggregate-updater/src/main/scala/org/sunbird/job/aggregate/task/ActivityAggregateUpdaterStreamTask.scala index b947b8523..7771aee9e 100644 --- a/activity-aggregate-updater/src/main/scala/org/sunbird/job/aggregate/task/ActivityAggregateUpdaterStreamTask.scala +++ b/activity-aggregate-updater/src/main/scala/org/sunbird/job/aggregate/task/ActivityAggregateUpdaterStreamTask.scala @@ -45,7 +45,7 @@ class ActivityAggregateUpdaterStreamTask(config: ActivityAggregateUpdaterConfig, // TODO: set separate parallelism for below task. progressStream.getSideOutput(config.collectionUpdateOutputTag).process(new CollectionProgressUpdateFunction(config)) .name(config.collectionProgressUpdateFn).uid(config.collectionProgressUpdateFn).setParallelism(config.enrolmentCompleteParallelism) - val enrolmentCompleteStream = progressStream.getSideOutput(config.collectionCompleteOutputTag).process(new CollectionProgressCompleteFunction(config)) + val enrolmentCompleteStream = progressStream.getSideOutput(config.collectionCompleteOutputTag).process(new CollectionProgressCompleteFunction(config, httpUtil)) .name(config.collectionCompleteFn).uid(config.collectionCompleteFn).setParallelism(config.enrolmentCompleteParallelism) enrolmentCompleteStream.getSideOutput(config.certIssueOutputTag).addSink(kafkaConnector.kafkaStringSink(config.kafkaCertIssueTopic)) diff --git a/post-publish-processor/src/main/scala/org/sunbird/job/postpublish/task/PostPublishProcessorConfig.scala b/post-publish-processor/src/main/scala/org/sunbird/job/postpublish/task/PostPublishProcessorConfig.scala index 2faf14bd0..7c08e77fa 100644 --- a/post-publish-processor/src/main/scala/org/sunbird/job/postpublish/task/PostPublishProcessorConfig.scala +++ b/post-publish-processor/src/main/scala/org/sunbird/job/postpublish/task/PostPublishProcessorConfig.scala @@ -116,6 +116,6 @@ class PostPublishProcessorConfig(override val config: Config) extends BaseJobCon val contentCacheStore: Int = 0 - val allowedResourceTypesForEventBatch = List[String]("Karmayogi Saptah") + val allowedResourceTypesForEventBatch: util.List[String] = if (config.hasPath("service.batchCreation.event.categories")) config.getStringList("service.batchCreation.event.categories") else util.Arrays.asList("Karmayogi Saptah") val minPercetageToCompleteEventResource: Int = 50 }