Skip to content

Commit

Permalink
Merge pull request #720 from Sunbird-Knowlg/release-5.4.1
Browse files Browse the repository at this point in the history
Merge Release 5.4.1 into 5.5.0
  • Loading branch information
maheshkumargangula authored Jul 13, 2023
2 parents bcc03e5 + 9c1dbdf commit d034624
Show file tree
Hide file tree
Showing 12 changed files with 209 additions and 19 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ version: 2.0
jobs:
unit-tests:
docker:
- image: circleci/openjdk:stretch
- image: circleci/openjdk:14-jdk-buster-node-browsers-legacy
resource_class: medium
working_directory: ~/kp
steps:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class CollectionPublishFunction(config: ContentPublishConfig, httpUtil: HttpUtil
logger.info("Collection publishing started for : " + data.identifier)
metrics.incCounter(config.collectionPublishEventCount)
val obj: ObjectData = getObject(data.identifier, data.pkgVersion, data.mimeType, data.publishType, readerConfig)(neo4JUtil, cassandraUtil, config)
logger.info(s"KN-856: Step:1 - From DB Collection: ${obj.identifier} | Hierarchy: ${obj.hierarchy}");
try {
if (obj.pkgVersion > data.pkgVersion) {
metrics.incCounter(config.skippedEventCount)
Expand All @@ -82,9 +83,10 @@ class CollectionPublishFunction(config: ContentPublishConfig, httpUtil: HttpUtil
if (messages.isEmpty) {
// Pre-publish update
updateProcessingNode(updObj)(neo4JUtil, cassandraUtil, readerConfig, definitionCache, definitionConfig)

logger.info(s"KN-856: Step:2 - After updating processing status Collection: ${updObj.identifier} | Hierarchy: ${updObj.hierarchy}");
val isCollectionShallowCopy = isContentShallowCopy(updObj)
val updatedObj = if (isCollectionShallowCopy) updateOriginPkgVersion(updObj)(neo4JUtil) else updObj
logger.info(s"KN-856: Step:3 - After shallow copy status check and update Collection: ${updatedObj.identifier} | isCollectionShallowCopy: $isCollectionShallowCopy | Hierarchy: ${updatedObj.hierarchy}");

// Clear redis cache
cache.del(data.identifier)
Expand All @@ -100,9 +102,11 @@ class CollectionPublishFunction(config: ContentPublishConfig, httpUtil: HttpUtil

logger.info("CollectionPublishFunction:: Live unitNodes: " + unitNodes)
val enrichedObj = enrichObject(updatedObj)(neo4JUtil, cassandraUtil, readerConfig, cloudStorageUtil, config, definitionCache, definitionConfig)
logger.info(s"KN-856: Step:4 - After enriching the object Collection: ${enrichedObj.identifier} | Hierarchy: ${enrichedObj.hierarchy}");
logger.info("CollectionPublishFunction:: Collection Object Enriched: " + enrichedObj.identifier)
val objWithEcar = getObjectWithEcar(enrichedObj, pkgTypes)(ec, neo4JUtil, cassandraUtil, readerConfig, cloudStorageUtil, config, definitionCache, definitionConfig, httpUtil)
logger.info("CollectionPublishFunction:: ECAR generation completed for Collection Object: " + objWithEcar.identifier)
logger.info(s"KN-856: Step:5 - After WithEcar Collection: ${objWithEcar.identifier} | Hierarchy: ${objWithEcar.hierarchy}");

val collRelationalMetadata = getRelationalMetadata(obj.identifier, obj.pkgVersion-1, readerConfig)(cassandraUtil, config).getOrElse(Map.empty[String, AnyRef])

Expand All @@ -116,19 +120,22 @@ class CollectionPublishFunction(config: ContentPublishConfig, httpUtil: HttpUtil
val publishType = objWithEcar.getString("publish_type", "Public")
val successObj = new ObjectData(objWithEcar.identifier, objWithEcar.metadata + ("status" -> (if (publishType.equalsIgnoreCase("Unlisted")) "Unlisted" else "Live"), "variants" -> variantsJsonString, "identifier" -> objWithEcar.identifier), objWithEcar.extData, objWithEcar.hierarchy)
val children = successObj.hierarchy.getOrElse(Map()).getOrElse("children", List()).asInstanceOf[List[Map[String, AnyRef]]]

logger.info(s"KN-856: Step:6 - After saveOnSuccess(Neo4J Save) Collection: ${successObj.identifier} | Hierarchy: $children");
// Collection - update and publish children - line 418 in PublishFinalizer
val updatedChildren = updateHierarchyMetadata(children, successObj.metadata, collRelationalMetadata)(config)
logger.info(s"KN-856: Step:7 - After updateHierarchyMetadata Collection: ${successObj.identifier} | Hierarchy: $updatedChildren");
logger.info("CollectionPublishFunction:: Hierarchy Metadata updated for Collection Object: " + successObj.identifier + " || updatedChildren:: " + updatedChildren)
publishHierarchy(updatedChildren, successObj, readerConfig, config)(cassandraUtil)

logger.info(s"KN-856: Step:8 - After publishHierarchy Collection: ${successObj.identifier} | Hierarchy: $updatedChildren");
//TODO: Save IMAGE Object with enrichedObj children and collRelationalMetadata when pkgVersion is 1 - verify with MaheshG
if(data.pkgVersion == 1) {
saveImageHierarchy(enrichedObj, readerConfig, collRelationalMetadata)(cassandraUtil)
logger.info(s"KN-856: Step:8.1 - After saveImageHierarchy Collection: ${enrichedObj.identifier} | Hierarchy: ${enrichedObj.hierarchy}");
}

if (!isCollectionShallowCopy) syncNodes(successObj, updatedChildren, unitNodes)(esUtil, neo4JUtil, cassandraUtil, readerConfig, definition, config)
pushPostProcessEvent(successObj, dialContextMap, context)(metrics)
logger.info(s"KN-856: Step:9 - After pushPostProcessEvent Collection: ${successObj.identifier} | Hierarchy: ${successObj.hierarchy}");
metrics.incCounter(config.collectionPublishSuccessEventCount)
logger.info("CollectionPublishFunction:: Collection publishing completed successfully for : " + data.identifier)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class ContentPublishStreamTask(config: ContentPublishConfig, kafkaConnector: Fli
val collectionPublish = processStreamTask.getSideOutput(config.collectionPublishOutTag).process(new CollectionPublishFunction(config, httpUtil))
.name("collection-publish-process").uid("collection-publish-process").setParallelism(1)
collectionPublish.getSideOutput(config.generatePostPublishProcessTag).addSink(kafkaConnector.kafkaStringSink(config.postPublishTopic))
collectionPublish.getSideOutput(config.failedEventOutTag).addSink(kafkaConnector.kafkaStringSink(config.kafkaErrorTopic))

env.execute(config.jobName)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import org.sunbird.job.exception.InvalidEventException
import org.sunbird.job.qrimagegenerator.domain.{Event, ImageConfig, QRCodeImageGeneratorRequest}
import org.sunbird.job.qrimagegenerator.task.QRCodeImageGeneratorConfig
import org.sunbird.job.qrimagegenerator.util.QRCodeImageGeneratorUtil
import org.sunbird.job.util.{CassandraUtil, CloudStorageUtil, FileUtils}
import org.sunbird.job.util.{CassandraUtil, CloudStorageUtil, ElasticSearchUtil, FileUtils}
import org.sunbird.job.{BaseProcessFunction, Metrics}

import java.io.File
Expand All @@ -17,6 +17,7 @@ import scala.collection.mutable.ListBuffer
class QRCodeImageGeneratorFunction(config: QRCodeImageGeneratorConfig,
@transient var cassandraUtil: CassandraUtil = null,
@transient var cloudStorageUtil: CloudStorageUtil = null,
@transient var esUtil: ElasticSearchUtil = null,
@transient var qRCodeImageGeneratorUtil: QRCodeImageGeneratorUtil = null)
(implicit val stringTypeInfo: TypeInformation[String])
extends BaseProcessFunction[Event, String](config) {
Expand All @@ -26,7 +27,8 @@ class QRCodeImageGeneratorFunction(config: QRCodeImageGeneratorConfig,
override def open(parameters: Configuration): Unit = {
cassandraUtil = new CassandraUtil(config.cassandraHost, config.cassandraPort, config)
cloudStorageUtil = new CloudStorageUtil(config)
qRCodeImageGeneratorUtil = new QRCodeImageGeneratorUtil(config, cassandraUtil, cloudStorageUtil)
esUtil = new ElasticSearchUtil(config.esConnectionInfo, config.dialcodeExternalIndex, config.dialcodeExternalIndexType)
qRCodeImageGeneratorUtil = new QRCodeImageGeneratorUtil(config, cassandraUtil, cloudStorageUtil, esUtil)
super.open(parameters)
}

Expand Down Expand Up @@ -101,6 +103,9 @@ class QRCodeImageGeneratorFunction(config: QRCodeImageGeneratorConfig,
else {
logger.info("QRCodeImageGeneratorService:processMessage: Skipping zip creation due to missing processId.")
}

if(config.indexImageURL)
context.output(config.indexImageUrlOutTag, event)
logger.info("QRCodeImageGeneratorService:processMessage: Message processed successfully at " + System.currentTimeMillis)
} else {
logger.info("QRCodeImageGeneratorService: Eid other than BE_QR_IMAGE_GENERATOR or Dialcodes not present")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package org.sunbird.job.qrimagegenerator.functions

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.slf4j.LoggerFactory
import org.sunbird.job.exception.InvalidEventException
import org.sunbird.job.qrimagegenerator.domain.Event
import org.sunbird.job.qrimagegenerator.task.QRCodeImageGeneratorConfig
import org.sunbird.job.qrimagegenerator.util.QRCodeImageGeneratorUtil
import org.sunbird.job.util.{CassandraUtil, CloudStorageUtil, ElasticSearchUtil, ScalaJsonUtil}
import org.sunbird.job.{BaseProcessFunction, Metrics}

import scala.collection.mutable

class QRCodeIndexImageUrlFunction(config: QRCodeImageGeneratorConfig,
@transient var cassandraUtil: CassandraUtil = null,
@transient var cloudStorageUtil: CloudStorageUtil = null,
@transient var esUtil: ElasticSearchUtil = null,
@transient var qRCodeImageGeneratorUtil: QRCodeImageGeneratorUtil = null)
(implicit val stringTypeInfo: TypeInformation[String]) extends BaseProcessFunction[Event, String](config) {

private val logger = LoggerFactory.getLogger(classOf[QRCodeIndexImageUrlFunction])

override def open(parameters: Configuration): Unit = {
cassandraUtil = new CassandraUtil(config.cassandraHost, config.cassandraPort, config)
esUtil = new ElasticSearchUtil(config.esConnectionInfo, config.dialcodeExternalIndex, config.dialcodeExternalIndexType)
qRCodeImageGeneratorUtil = new QRCodeImageGeneratorUtil(config, cassandraUtil, cloudStorageUtil, esUtil)
super.open(parameters)
}

override def close(): Unit = {
cassandraUtil.close()
super.close()
}

override def metricsList(): List[String] = {
List()
}

@throws(classOf[InvalidEventException])
override def processElement(event: Event,
context: ProcessFunction[Event, String]#Context,
metrics: Metrics): Unit = {
event.dialCodes.foreach { dialcode =>
try {
val text = dialcode("text").asInstanceOf[String]
qRCodeImageGeneratorUtil.indexImageInDocument(text)(esUtil, cassandraUtil)
} catch {
case e: Exception => e.printStackTrace()
throw new InvalidEventException(e.getMessage)
}
}

}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package org.sunbird.job.qrimagegenerator.task

import com.typesafe.config.Config
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.api.scala.OutputTag
import org.sunbird.job.BaseJobConfig
import org.sunbird.job.qrimagegenerator.domain.Event

class QRCodeImageGeneratorConfig(override val config: Config) extends BaseJobConfig(config, "qrcode-image-generator") {

Expand All @@ -12,6 +16,7 @@ class QRCodeImageGeneratorConfig(override val config: Config) extends BaseJobCon
val kafkaInputTopic: String = config.getString("kafka.input.topic")
override val kafkaConsumerParallelism: Int = config.getInt("task.consumer.parallelism")
override val parallelism: Int = config.getInt("task.parallelism")
implicit val qrImageTypeInfo: TypeInformation[Event] = TypeExtractor.getForClass(classOf[Event])

// Metric List
val totalEventsCount = "total-events-count"
Expand All @@ -23,6 +28,15 @@ class QRCodeImageGeneratorConfig(override val config: Config) extends BaseJobCon
val cloudDbHitCount = "cloud-db-hit-events-count"
val cloudDbFailCount = "cloud-db-hit-failure-count"

//Tags
val indexImageUrlOutTag: OutputTag[Event] = OutputTag[Event]("index-imageUrl")

// ES Configs
val esConnectionInfo = config.getString("es.basePath")

val dialcodeExternalIndex: String = if (config.hasPath("dialcode.index.name")) config.getString("dialcode.index.name") else "dialcode"
val dialcodeExternalIndexType: String = "dc"

// Consumers
val eventConsumer = "qrcode-image-generator-consumer"
val qrCodeImageGeneratorFunction = "qrcode-image-generator-function"
Expand All @@ -43,4 +57,6 @@ class QRCodeImageGeneratorConfig(override val config: Config) extends BaseJobCon
val cassandraKeyspace: String = config.getString("lms-cassandra.keyspace")
val cassandraDialCodeImageTable: String = config.getString("lms-cassandra.table.image")
val cassandraDialCodeBatchTable: String = config.getString("lms-cassandra.table.batch")

val indexImageURL: Boolean = if (config.hasPath("qr.image.indexImageUrl")) config.getBoolean("qr.image.indexImageUrl") else true
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.sunbird.job.connector.FlinkKafkaConnector
import org.sunbird.job.qrimagegenerator.domain.Event
import org.sunbird.job.qrimagegenerator.functions.QRCodeImageGeneratorFunction
import org.sunbird.job.qrimagegenerator.functions.{QRCodeImageGeneratorFunction, QRCodeIndexImageUrlFunction}
import org.sunbird.job.util.FlinkUtil

import java.io.File
Expand All @@ -20,7 +20,7 @@ class QRCodeImageGeneratorTask(config: QRCodeImageGeneratorConfig, kafkaConnecto
implicit val stringTypeInfo: TypeInformation[String] = TypeExtractor.getForClass(classOf[String])

val source = kafkaConnector.kafkaJobRequestSource[Event](config.kafkaInputTopic)
env.addSource(source)
val streamTask = env.addSource(source)
.name(config.eventConsumer)
.uid(config.eventConsumer)
.rebalance
Expand All @@ -30,6 +30,10 @@ class QRCodeImageGeneratorTask(config: QRCodeImageGeneratorConfig, kafkaConnecto
.uid(config.qrCodeImageGeneratorFunction)
.setParallelism(config.parallelism)

if(config.indexImageURL)
streamTask.getSideOutput(config.indexImageUrlOutTag).process(new QRCodeIndexImageUrlFunction(config))
.name("index-imageUrl-process").uid("index-imageUrl-process").setParallelism(config.parallelism)

env.execute(config.jobName)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.sunbird.job.qrimagegenerator.util

import com.datastax.driver.core.Row
import com.datastax.driver.core.querybuilder.QueryBuilder
import com.google.zxing.client.j2se.BufferedImageLuminanceSource
import com.google.zxing.common.{BitMatrix, HybridBinarizer}
Expand All @@ -8,6 +9,7 @@ import com.google.zxing.qrcode.decoder.ErrorCorrectionLevel
import com.google.zxing.{BarcodeFormat, EncodeHintType, NotFoundException, WriterException}
import org.slf4j.LoggerFactory
import org.sunbird.job.Metrics
import org.sunbird.job.exception.InvalidInputException
import org.sunbird.job.qrimagegenerator.domain.{ImageConfig, QRCodeImageGeneratorRequest}
import org.sunbird.job.qrimagegenerator.task.QRCodeImageGeneratorConfig
import org.sunbird.job.util._
Expand All @@ -18,12 +20,14 @@ import java.awt.{Color, Font, FontFormatException, Graphics2D, RenderingHints}
import java.io.{File, IOException, InputStream}
import java.util.UUID
import javax.imageio.ImageIO
import scala.collection.mutable
import scala.collection.mutable.ListBuffer

class QRCodeImageGeneratorUtil(config: QRCodeImageGeneratorConfig, cassandraUtil: CassandraUtil, cloudStorageUtil: CloudStorageUtil) {
class QRCodeImageGeneratorUtil(config: QRCodeImageGeneratorConfig, cassandraUtil: CassandraUtil, cloudStorageUtil: CloudStorageUtil, esUtil: ElasticSearchUtil) {
private val qrCodeWriter = new QRCodeWriter()
private val fontStore: java.util.Map[String, Font] = new java.util.HashMap[String, Font]()
private val logger = LoggerFactory.getLogger(classOf[QRCodeImageGeneratorUtil])
val isrRelativePathEnabled = config.getBoolean("cloudstorage.metadata.replace_absolute_path", false)

@throws[WriterException]
@throws[IOException]
Expand Down Expand Up @@ -152,8 +156,8 @@ class QRCodeImageGeneratorUtil(config: QRCodeImageGeneratorConfig, cassandraUtil
}

private def getImage(bitMatrix: BitMatrix, colorModel: String) = {
val imageWidth = bitMatrix.getWidth()
val imageHeight = bitMatrix.getHeight()
val imageWidth = bitMatrix.getWidth
val imageHeight = bitMatrix.getHeight
val image = new BufferedImage(imageWidth, imageHeight, getImageType(colorModel))
image.createGraphics()
val graphics = image.getGraphics.asInstanceOf[Graphics2D]
Expand Down Expand Up @@ -259,5 +263,33 @@ class QRCodeImageGeneratorUtil(config: QRCodeImageGeneratorConfig, cassandraUtil
private def getFontFromStore(fontName: String): Font = {
fontStore.getOrDefault(fontName, loadFontStore(fontName))
}


def indexImageInDocument(id: String)(esUtil: ElasticSearchUtil, cassandraUtil: CassandraUtil): Unit = {
val documentJson: String = esUtil.getDocumentAsString(id)
val indexDocument = if (documentJson != null && documentJson.nonEmpty) ScalaJsonUtil.deserialize[mutable.Map[String, AnyRef]](documentJson) else mutable.Map[String, AnyRef]()
logger.info("QRCodeImageGeneratorUtil::indexImageInDocument:: indexDocument:: " + indexDocument)
if(indexDocument!=null && indexDocument.nonEmpty && !indexDocument.contains("url")) {
val query = QueryBuilder.select("url").from(config.cassandraKeyspace, config.cassandraDialCodeImageTable)
.allowFiltering()
.where(QueryBuilder.eq("dialcode", id))
logger.info("QRCodeImageGeneratorUtil::indexImageInDocument:: query:: " + query)
val row: Row = cassandraUtil.findOne(query.toString)
if(null != row && !row.isNull("url")) {
val imageUrl = row.getString("url")
logger.info("QRCodeImageGeneratorUtil::indexImageInDocument:: imageUrl:: " + imageUrl)
val absoluteImageUrl = if (isrRelativePathEnabled) CSPMetaUtil.updateAbsolutePath(imageUrl)(config) else imageUrl
logger.info("QRCodeImageGeneratorUtil::indexImageInDocument:: absoluteImageUrl:: " + absoluteImageUrl)
val updatedDocString = ScalaJsonUtil.serialize(indexDocument + ("imageUrl" -> absoluteImageUrl))
logger.info("QRCodeImageGeneratorUtil:indexImageInDocument: updatedDocString:: " + updatedDocString)
esUtil.updateDocument(id, updatedDocString)
}
} else {
throw new InvalidInputException("ElasticSearch Document not found for " + id)
}

}


}

1 change: 1 addition & 0 deletions qrcode-image-generator/src/test/resources/test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ qr.image {
imageFormat="png"
bottomMargin=0
margin=1
indexImageUrl = false
}
10 changes: 6 additions & 4 deletions qrcode-image-generator/src/test/resources/test.cql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
CREATE KEYSPACE dialcodes WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': '2'} AND durable_writes = true;
CREATE KEYSPACE IF NOT EXISTS dialcodes WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': '2'} AND durable_writes = true;

CREATE TABLE dialcodes.dialcode_images (
CREATE TABLE IF NOT EXISTS dialcodes.dialcode_images (
filename text PRIMARY KEY,
channel text,
config map<text, text>,
Expand All @@ -11,7 +11,7 @@ CREATE TABLE dialcodes.dialcode_images (
url text
);

CREATE TABLE dialcodes.dialcode_batch (
CREATE TABLE IF NOT EXISTS dialcodes.dialcode_batch (
processid uuid PRIMARY KEY,
channel text,
config map<text, text>,
Expand All @@ -20,4 +20,6 @@ CREATE TABLE dialcodes.dialcode_batch (
publisher text,
status int,
url text
);
);

INSERT INTO dialcodes.dialcode_images(filename, channel, dialcode, url) VALUES ('0_Q1I5I3', 'b00bc992ef25f1a9a8d63291e20efc8d', 'Q1I5I3', 'https://sunbirddev.blob.core.windows.net/sunbird-content-dev/in.ekstep/0_Q1I5I3.png') ;
Loading

0 comments on commit d034624

Please sign in to comment.