diff --git a/adhoc-scripts/pom.xml b/adhoc-scripts/pom.xml index 62372e31d..cecd81577 100644 --- a/adhoc-scripts/pom.xml +++ b/adhoc-scripts/pom.xml @@ -43,7 +43,7 @@ jets3t - net.java.dev.jets3t + org.jets3t org.slf4j @@ -81,9 +81,9 @@ provided - net.java.dev.jets3t + org.jets3t jets3t - 0.9.4 + 0.9.7 provided diff --git a/data-products/pom.xml b/data-products/pom.xml index 2f94cf092..442ed0d27 100644 --- a/data-products/pom.xml +++ b/data-products/pom.xml @@ -12,8 +12,8 @@ 1.4.11 2.12 2.12.10 - 3.0 - 3.1.0 + 3.2 + 3.2.1 @@ -27,7 +27,8 @@ jcenter-repo Jcenter Repo https://jcenter.bintray.com/ - + + @@ -90,7 +91,7 @@ jets3t - net.java.dev.jets3t + org.jets3t org.slf4j @@ -135,9 +136,9 @@ provided - net.java.dev.jets3t + org.jets3t jets3t - 0.9.4 + 0.9.7 provided @@ -172,8 +173,8 @@ --> com.datastax.spark - spark-cassandra-connector_${scala.maj.version} - 3.1.0 + spark-cassandra-connector-assembly_${scala.maj.version} + 3.2.0 @@ -281,7 +282,7 @@ org.sunbird cloud-store-sdk_2.12 - 1.3.0 + 1.4.6 com.microsoft.azure @@ -345,7 +346,13 @@ 0.7.1 test - + + + org.apache.commons + commons-pool2 + 2.0 + + src/main/scala src/test/scala diff --git a/data-products/src/main/scala/org/sunbird/analytics/exhaust/BaseReportsJob.scala b/data-products/src/main/scala/org/sunbird/analytics/exhaust/BaseReportsJob.scala index 45434a051..055202ab9 100644 --- a/data-products/src/main/scala/org/sunbird/analytics/exhaust/BaseReportsJob.scala +++ b/data-products/src/main/scala/org/sunbird/analytics/exhaust/BaseReportsJob.scala @@ -50,8 +50,9 @@ trait BaseReportsJob { val store = modelParams.getOrElse("store", "local").asInstanceOf[String]; val storageKey = modelParams.getOrElse("storageKeyConfig", "reports_storage_key").asInstanceOf[String]; val storageSecret = modelParams.getOrElse("storageSecretConfig", "reports_storage_secret").asInstanceOf[String]; + val storageEndpoint = modelParams.getOrElse("storageEndpoint", "reports_storage_endpoint").asInstanceOf[String]; store.toLowerCase() match { - case "s3" => + case "s3" | "oci" => spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", AppConf.getConfig(storageKey)); spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", AppConf.getConfig(storageSecret)); case "azure" => diff --git a/data-products/src/main/scala/org/sunbird/analytics/exhaust/OnDemandExhaustJob.scala b/data-products/src/main/scala/org/sunbird/analytics/exhaust/OnDemandExhaustJob.scala index da5a0b3cb..2bf47835a 100644 --- a/data-products/src/main/scala/org/sunbird/analytics/exhaust/OnDemandExhaustJob.scala +++ b/data-products/src/main/scala/org/sunbird/analytics/exhaust/OnDemandExhaustJob.scala @@ -178,7 +178,7 @@ trait OnDemandExhaustJob { fc.getHadoopFileUtil().delete(conf, tempDir); val filePrefix = storageConfig.store.toLowerCase() match { // $COVERAGE-OFF$ Disabling scoverage - case "s3" => + case "s3" | "oci" => CommonUtil.getS3File(storageConfig.container, ""); case "azure" => CommonUtil.getAzureFile(storageConfig.container, "", storageConfig.accountKey.getOrElse("azure_storage_key")) diff --git a/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminGeoReportJob.scala b/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminGeoReportJob.scala index 6f0e595df..e7d9978c0 100644 --- a/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminGeoReportJob.scala +++ b/data-products/src/main/scala/org/sunbird/analytics/job/report/StateAdminGeoReportJob.scala @@ -45,7 +45,6 @@ object StateAdminGeoReportJob extends IJob with StateAdminReportHelper { val container = AppConf.getConfig("cloud.container.reports") val objectKey = AppConf.getConfig("admin.metrics.cloud.objectKey") val storageConfig = getStorageConfig(container, objectKey) - val organisationDF: DataFrame = loadOrganisationSlugDF() val subOrgDF: DataFrame = generateSubOrgData(organisationDF) val blockData:DataFrame = generateBlockLevelData(subOrgDF) diff --git a/data-products/src/main/scala/org/sunbird/analytics/model/report/ETBMetricsModel.scala b/data-products/src/main/scala/org/sunbird/analytics/model/report/ETBMetricsModel.scala index a1ca4a742..eba016e87 100644 --- a/data-products/src/main/scala/org/sunbird/analytics/model/report/ETBMetricsModel.scala +++ b/data-products/src/main/scala/org/sunbird/analytics/model/report/ETBMetricsModel.scala @@ -227,6 +227,10 @@ object ETBMetricsModel extends IBatchModelTemplate[Empty,Empty,FinalOutput,Final val key = AppConf.getConfig("azure_storage_key") val file = conf("file") s"wasb://$bucket@$key.blob.core.windows.net/$file" + case "oci" => + val bucket = conf("bucket") + val file = conf("file") + s"s3n://$bucket/$file" } val scansCount = sparkSession.read diff --git a/data-products/src/main/scala/org/sunbird/analytics/sourcing/FunnelReport.scala b/data-products/src/main/scala/org/sunbird/analytics/sourcing/FunnelReport.scala index d3aa56a91..e1b809fa4 100644 --- a/data-products/src/main/scala/org/sunbird/analytics/sourcing/FunnelReport.scala +++ b/data-products/src/main/scala/org/sunbird/analytics/sourcing/FunnelReport.scala @@ -122,7 +122,10 @@ object FunnelReport extends IJob with BaseReportsJob { .drop("channel","id","program_id") .persist(StorageLevel.MEMORY_ONLY) - val storageConfig = getStorageConfig("reports", "") + // support to other CSP and report container should be read from configuration + val modelParams = config.modelParams.getOrElse(Map[String, Option[AnyRef]]()); + val container = modelParams.getOrElse("storageContainer", "reports").asInstanceOf[String] + val storageConfig = getStorageConfig(container, "") saveReportToBlob(funnelReport, configMap, storageConfig, "FunnelReport") funnelReport.unpersist(true) diff --git a/etl-jobs/pom.xml b/etl-jobs/pom.xml index 6633d8803..339702d03 100644 --- a/etl-jobs/pom.xml +++ b/etl-jobs/pom.xml @@ -24,7 +24,7 @@ jets3t - net.java.dev.jets3t + org.jets3t org.apache.xbean diff --git a/etl-jobs/src/main/scala/org/sunbird/analytics/jobs/ESCloudUploader.scala b/etl-jobs/src/main/scala/org/sunbird/analytics/jobs/ESCloudUploader.scala index 17e6a2bd8..37f27a228 100644 --- a/etl-jobs/src/main/scala/org/sunbird/analytics/jobs/ESCloudUploader.scala +++ b/etl-jobs/src/main/scala/org/sunbird/analytics/jobs/ESCloudUploader.scala @@ -46,7 +46,7 @@ object ESCloudUploader { .saveAsTextFile(outputFilePath) // backup the output file to cloud - val storageService = StorageServiceFactory.getStorageService(StorageConfig(config.getString("cloudStorage.provider"), config.getString("cloudStorage.accountName"), config.getString("cloudStorage.accountKey"))) + val storageService = StorageServiceFactory.getStorageService(StorageConfig(config.getString("cloudStorage.provider"), config.getString("cloudStorage.accountName"), config.getString("cloudStorage.accountKey"),Option(config.getString("cloudStorage.accountEndpoint")),Option(""))) storageService.upload(config.getString("cloudStorage.container"), outputFilePath + "/part-00000", config.getString("cloudStorage.objectKey"), isDirectory = Option(false)) println("successfully backed up file to cloud!") System.exit(0)