Skip to content

Commit

Permalink
Merge pull request #27 from varshamahuli97/cbrelease-4.8.17
Browse files Browse the repository at this point in the history
org wise zip reports upload as a fix of KAR - 792
  • Loading branch information
Haritest authored Oct 11, 2024
2 parents 34cde59 + 4bae734 commit b4a42e7
Showing 1 changed file with 13 additions and 119 deletions.
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
package org.ekstep.analytics.dashboard.report.zipreports

import org.apache.spark.SparkContext
import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{SparkSession}
import org.ekstep.analytics.framework._
import org.ekstep.analytics.dashboard.DashboardUtil._
import org.ekstep.analytics.dashboard.DataUtil._
import net.lingala.zip4j.ZipFile
import org.apache.commons.io.FileUtils
import net.lingala.zip4j.model.ZipParameters
import net.lingala.zip4j.model.enums.EncryptionMethod
import net.lingala.zip4j.model.enums.CompressionMethod
import org.ekstep.analytics.dashboard.{AbsDashboardModel, DashboardConfig}
import java.io.{File}
import java.nio.file.{Files, Paths, StandardCopyOption}
Expand All @@ -33,10 +29,6 @@ object ZipReportsWithSecurityModel extends AbsDashboardModel {
*/
def processData(timestamp: Long)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = {


//fetch the org hierarchy
var (orgNamesDF, userDF, userOrgDF) = getOrgUserDataFrames()
val orgHierarchyDF = getDetailedHierarchy(userOrgDF)
// Start of merging folders

// Define variables for source, destination directories and date.
Expand All @@ -46,17 +38,6 @@ object ZipReportsWithSecurityModel extends AbsDashboardModel {
val specificDate = getDate()
val kcmFolderPath = s"${conf.localReportDir}/${conf.kcmReportPath}/${specificDate}/ContentCompetencyMapping"

def getOrgName(orgID: String, orgDF: DataFrame): String = {
val resultDF = orgDF.filter(col("orgID") === orgID).select("orgName")
if (resultDF.isEmpty) {
// Return a default value if no rows are found
"NotFound"
} else {
resultDF.first().getString(0)
}
}


// Method to traverse all the report folders within the source folder and check for specific date folder
def traverseDirectory(directory: File): Unit = {
// Get the list of files and directories in the current directory
Expand Down Expand Up @@ -122,23 +103,23 @@ object ZipReportsWithSecurityModel extends AbsDashboardModel {

// Start traversing the source directory
traverseDirectory(new File(prefixDirectoryPath))
// End of merging folders

// End of merging folders

// Start of zipping the reports and syncing to blob store
// Define variables for source, blobStorage directories and password.
val password = conf.password
// Traverse through source directory to create individual zip files (mdo-wise)
val mdoidFolders = new File(destinationPath).listFiles().filter(file => file.getName.startsWith("mdoid=0"))
val mdoidFolders = new File(destinationPath).listFiles()
if (mdoidFolders != null) {
mdoidFolders.foreach { mdoidFolder =>
if (mdoidFolder.isDirectory) { // Check if it's a directory
val folderName = mdoidFolder.getName
val orgID = folderName.split("=")(1)
val orgFileName = getOrgName(orgID, orgNamesDF)
val sanitizedOrgFileName = orgFileName.replaceAll("[/\\\\]", "_")
val zipFilePath = s"${mdoidFolder}"
// Create a password-protected zip file for the mdoid folder
val zipFile = new ZipFile(zipFilePath+"/"+sanitizedOrgFileName+".zip")
val zipFile = new ZipFile(zipFilePath+"/reports.zip", password.toCharArray)
val parameters = new ZipParameters()
parameters.setCompressionMethod(CompressionMethod.DEFLATE)
parameters.setEncryptFiles(true)
parameters.setEncryptionMethod(EncryptionMethod.ZIP_STANDARD)
// Add all files within the mdoid folder to the zip file
mdoidFolder.listFiles().foreach { file =>
zipFile.addFile(file, parameters)
Expand All @@ -152,97 +133,17 @@ object ZipReportsWithSecurityModel extends AbsDashboardModel {
}
// Upload the zip file to blob storage
val mdoid = mdoidFolder.getName
println(s"Individual ZIP file created for $mdoid: $zipFilePath")
}
}
} else {
println("No mdoid folders found in the given directory.")
}

// merge the zip files based on hierarchy and then zip the final report
// Method to copy a zip file to the destination directory
def copyZipFile(sourceZipPath: String, destinationZipPath: String): Unit = {
try {
Files.copy(Paths.get(sourceZipPath), Paths.get(destinationZipPath), StandardCopyOption.REPLACE_EXISTING)
} catch {
case e: Exception => println(s"Failed to copy $sourceZipPath: ${e.getMessage}")
}
}

// Method to process each ministryID
def processMinistryFolder(ministryID: String, ids: Array[String], baseDir: String): Unit = {
val ministryDir = new File(s"$baseDir/mdoid=$ministryID")
if (!ministryDir.exists()) {
ministryDir.mkdirs()
}

ids.drop(1) // Drop the first element and process the rest
.filter(id => id != null && id.trim.nonEmpty)
.foreach { id =>
val orgFileName = getOrgName(id, orgNamesDF)
val sanitizedOrgFileName = orgFileName.replaceAll("[/\\\\]", "_")
val sourceZipFilePath = s"$baseDir/mdoid=$id/$sanitizedOrgFileName.zip"
val destinationZipFilePath = s"$ministryDir/$sanitizedOrgFileName.zip"

// Copy the zip file
val sourceFile = new File(sourceZipFilePath)
if (sourceFile.exists()) {
copyZipFile(sourceZipFilePath, destinationZipFilePath)
} else {
println(s"Source zip file $sourceZipFilePath does not exist.")
}
}
}

// Main function to execute the merging process
def mergeMdoidFolders(orgHierarchy: DataFrame, baseDir: String): Unit = {
orgHierarchy.collect().foreach { row =>
val ministryID = row.getAs[String]("ministryID")
val allIDs = row.getAs[String]("allIDs")
val ids = allIDs.split(",").map(_.trim)

if (ids.length > 1) {
// Process the ministry folder based on allIDs
processMinistryFolder(ministryID, ids, baseDir)
}
}
}

// Main execution
mergeMdoidFolders(orgHierarchyDF, destinationPath)
// Start of zipping the reports and syncing to blob store
// Define variables for source, blobStorage directories and password.
val password = conf.password
if (mdoidFolders != null) {
mdoidFolders.foreach { mdoidFolder =>
if (mdoidFolder.isDirectory) { // Check if it's a directory
val zipFilePath = s"${mdoidFolder}"
// Create a password-protected zip file for the mdoid folder
val combinedZipFile = new ZipFile(zipFilePath+"/reports.zip", password.toCharArray)
val parameters = new ZipParameters()
parameters.setEncryptFiles(true)
parameters.setEncryptionMethod(EncryptionMethod.ZIP_STANDARD)
// Add all files within the mdoid folder to the zip file
mdoidFolder.listFiles().foreach { file =>
combinedZipFile.addFile(file, parameters)
}
val mdoid = mdoidFolder.getName
println(s"Hierarchical password protected zip created for $mdoid: $zipFilePath")
mdoidFolder.listFiles().foreach { file =>
if (file.isFile && file.getName != "reports.zip") {
file.delete()
}
}
// Upload the zip file to blob storage
val zipReporthPath = s"${conf.destinationDirectoryPath}/$mdoid"
//sync reports
val zipReporthPath = s"${conf.destinationDirectoryPath}/$mdoid"
syncReports(zipFilePath, zipReporthPath)

println(s"Password-protected ZIP file created and uploaded for $mdoid: $zipFilePath")
}
}
} else {
println("No mdoid folders found in the given directory.")
}

// End of zipping the reports and syncing to blob store
//deleting the tmp merged folder
try {
Expand All @@ -251,11 +152,4 @@ object ZipReportsWithSecurityModel extends AbsDashboardModel {
case e: Exception => println(s"Error deleting directory: ${e.getMessage}")
}
}
}







}

0 comments on commit b4a42e7

Please sign in to comment.