diff --git a/.circleci/config.yml b/.circleci/config.yml
index 0762d72e..132f16c1 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -1,8 +1,13 @@
 version: 2.1
+environment:
 jobs:
   analytics-core-build:
     machine:
       image: ubuntu-2004:202008-01
+    environment:
+      CLOUD_STORE_VERSION: "1.4.0"
+      CLOUD_STORE_ARTIFACT_ID: "cloud-store-sdk_2.12"
+      CLOUD_STORE_GROUP_ID: "org.sunbird"
     steps:
       - checkout
       - restore_cache:
@@ -11,7 +16,7 @@ jobs:
           name: sunbird-analytics-core-build
           command: |
             java -version
-            mvn scoverage:report
+            mvn scoverage:report -DCLOUD_STORE_GROUP_ID=$CLOUD_STORE_GROUP_ID -DCLOUD_STORE_ARTIFACT_ID=$CLOUD_STORE_ARTIFACT_ID -DCLOUD_STORE_VERSION=$CLOUD_STORE_VERSION
       - save_cache:
           key: dp-dependency-cache-{{ checksum "pom.xml" }}
          paths: ~/.m2
@@ -24,7 +29,7 @@ jobs:
           name: sonar
           command: |
             export JAVA_HOME=/usr/lib/jvm/java-1.11.0-openjdk-amd64
-            mvn -X sonar:sonar -Dsonar.projectKey=project-sunbird_sunbird-analytics-core -Dsonar.organization=project-sunbird -Dsonar.exclusions=analytics-core/src/main/scala/org/ekstep/analytics/streaming/** -Dsonar.host.url=https://sonarcloud.io -Dsonar.scala.coverage.reportPaths=/home/circleci/project/target/scoverage.xml
+            mvn -X sonar:sonar -DCLOUD_STORE_GROUP_ID=$CLOUD_STORE_GROUP_ID -DCLOUD_STORE_ARTIFACT_ID=$CLOUD_STORE_ARTIFACT_ID -DCLOUD_STORE_VERSION=$CLOUD_STORE_VERSION -Dsonar.projectKey=project-sunbird_sunbird-analytics-core -Dsonar.organization=project-sunbird -Dsonar.exclusions=analytics-core/src/main/scala/org/ekstep/analytics/streaming/** -Dsonar.host.url=https://sonarcloud.io -Dsonar.scala.coverage.reportPaths=/home/circleci/project/target/scoverage.xml
 
 workflows:
   version: 2.1
diff --git a/Jenkinsfile b/Jenkinsfile
index d79bf756..bf656fd5 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -30,7 +30,7 @@ node('build-slave') {
                 export JAVA_HOME=/usr/lib/jvm/jdk-11.0.2
                 export PATH=$JAVA_HOME/bin:$PATH
                 echo $(java -version)
-                mvn clean install -DskipTests
+                mvn clean install -DskipTests -DCLOUD_STORE_GROUP_ID=org.sunbird -DCLOUD_STORE_ARTIFACT_ID=cloud-store-sdk_2.12 -DCLOUD_STORE_VERSION=1.4.0
             '''
         }
         stage('Archive artifacts'){
diff --git a/analytics-core/pom.xml b/analytics-core/pom.xml
index 975f912e..bbb823f2 100644
--- a/analytics-core/pom.xml
+++ b/analytics-core/pom.xml
@@ -210,9 +210,12 @@
         -->
         <dependency>
-            <groupId>org.sunbird</groupId>
-            <artifactId>cloud-store-sdk_${scala.maj.version}</artifactId>
-            <version>1.4.0</version>
+
+
+
+            <groupId>${CLOUD_STORE_GROUP_ID}</groupId>
+            <artifactId>${CLOUD_STORE_ARTIFACT_ID}</artifactId>
+            <version>${CLOUD_STORE_VERSION}</version>
         </dependency>
         <dependency>
             <groupId>com.microsoft.azure</groupId>
diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala
new file mode 100644
index 00000000..9218e992
--- /dev/null
+++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala
@@ -0,0 +1,69 @@
+package org.ekstep.analytics.framework.util
+import org.apache.spark.SparkContext
+import org.sunbird.cloud.storage.conf.AppConf
+
+trait ICloudStorageProvider {
+  def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit
+}
+
+
+
+object CloudStorageProviders {
+  implicit val className: String = "org.ekstep.analytics.framework.util.CloudStorageProvider"
+  private val providerMap: Map[String, ICloudStorageProvider] = Map("s3" -> S3Provider, "azure" -> AzureProvider, "gcp" -> GcpProvider, "oci" -> OCIProvider)
+  def setSparkCSPConfigurations(sc: SparkContext, csp: String, storageKey: Option[String], storageSecret: Option[String]): Unit = {
+    providerMap.get(csp.toLowerCase()).foreach { provider =>
+      provider.setConf(sc, storageKey, storageSecret)
+    }
+  }
+}
+object S3Provider extends ICloudStorageProvider {
+  implicit val className: String = "org.ekstep.analytics.framework.util.S3Provider"
+  override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = {
+    JobLogger.log("Configuring S3 Access Key & Secret Key to SparkContext")
+    val key = storageKey.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getAwsKey())
+    val secret = storageSecret.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getAwsSecret())
+    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", key)
+    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secret)
+    val storageEndpoint = AppConf.getConfig("cloud_storage_endpoint")
+    if (storageEndpoint.nonEmpty) {
+      sc.hadoopConfiguration.set("fs.s3n.endpoint", storageEndpoint)
+    }
+  }
+}
+
+object AzureProvider extends ICloudStorageProvider {
+  implicit val className: String = "org.ekstep.analytics.framework.util.AzureProvider"
+  override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = {
+    JobLogger.log("Configuring Azure Access Key & Secret Key to SparkContext")
+    val key = storageKey.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageKey("azure"))
+    val secret = storageSecret.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageSecret("azure"))
+    sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
+    sc.hadoopConfiguration.set("fs.azure.account.key." + key + ".blob.core.windows.net", secret)
+    sc.hadoopConfiguration.set("fs.azure.account.keyprovider." + key + ".blob.core.windows.net", "org.apache.hadoop.fs.azure.SimpleKeyProvider")
+  }
+}
+object GcpProvider extends ICloudStorageProvider {
+  implicit val className: String = "org.ekstep.analytics.framework.util.GcpProvider"
+  override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = {
+    JobLogger.log("Configuring GCP Access Key & Secret Key to SparkContext")
+    val key = storageKey.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageKey("gcloud"))
+    val secret = storageSecret.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageSecret("gcloud"))
+    sc.hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
+    sc.hadoopConfiguration.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
+    sc.hadoopConfiguration.set("fs.gs.auth.service.account.email", key)
+    sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key", secret)
+    sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key.id", AppConf.getConfig("gcloud_private_secret_id"))
+  }
+}
+
+object OCIProvider extends ICloudStorageProvider {
+  implicit val className: String = "org.ekstep.analytics.framework.util.OCIProvider"
+  override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = {
+    val key = storageKey.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageKey("oci"))
+    val secret = storageSecret.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageSecret("oci"))
+    JobLogger.log("Configuring OCI Access Key & Secret Key to SparkContext")
+    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", key);
+    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secret);
+  }
+}
\ No newline at end of file
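For reviewers, a minimal sketch of how the new entry point is meant to be wired up. Note that the two Option parameters carry AppConf key names (resolved via `AppConf.getConfig`), not raw credentials; the `report_storage_key` / `report_storage_secret` entries below are hypothetical and used only for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.ekstep.analytics.framework.util.CloudStorageProviders
import org.sunbird.cloud.storage.conf.AppConf

object CSPWiringSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("csp-wiring-sketch").setMaster("local[*]"))

    // Default wiring (what CommonUtil does after this change): pick the provider named by
    // cloud_storage_type and let it fall back to the default AppConf credentials.
    CloudStorageProviders.setSparkCSPConfigurations(sc, AppConf.getConfig("cloud_storage_type"), None, None)

    // Override wiring: the Options name AppConf keys to read the credentials from
    // ("report_storage_key" / "report_storage_secret" are hypothetical entries).
    CloudStorageProviders.setSparkCSPConfigurations(sc, "azure", Some("report_storage_key"), Some("report_storage_secret"))

    // An unrecognised provider is silently ignored: providerMap.get(...).foreach is a no-op here.
    CloudStorageProviders.setSparkCSPConfigurations(sc, "unknown-csp", None, None)

    sc.stop()
  }
}
```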
diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala
index 7f182664..569cd7da 100644
--- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala
+++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala
@@ -8,13 +8,13 @@ import java.security.MessageDigest
 import java.sql.Timestamp
 import java.util.zip.GZIPOutputStream
 import java.util.{Date, Properties}
-
 import com.ing.wbaa.druid.definitions.{Granularity, GranularityType}
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.{SparkConf, SparkContext}
 import org.ekstep.analytics.framework.Level._
 import org.ekstep.analytics.framework.Period._
+import org.ekstep.analytics.framework.util.CloudStorageProviders.setSparkCSPConfigurations
 import org.ekstep.analytics.framework.{DtRange, Event, JobConfig, _}
 
 import scala.collection.mutable.ListBuffer
@@ -91,9 +91,7 @@
     }
 
     val sc = new SparkContext(conf)
-    setS3Conf(sc)
-    setAzureConf(sc)
-    setGcloudConf(sc)
+    setSparkCSPConfigurations(sc, AppConf.getConfig("cloud_storage_type"), None, None)
     JobLogger.log("Spark Context initialized")
     sc
   }
@@ -144,40 +142,10 @@
     }
 
     val sparkSession = SparkSession.builder().appName("sunbird-analytics").config(conf).getOrCreate()
-    setS3Conf(sparkSession.sparkContext)
-    setAzureConf(sparkSession.sparkContext)
-    setGcloudConf(sparkSession.sparkContext)
+    setSparkCSPConfigurations(sparkSession.sparkContext, AppConf.getConfig("cloud_storage_type"), None, None)
     JobLogger.log("SparkSession initialized")
     sparkSession
   }
-
-  def setS3Conf(sc: SparkContext) = {
-    JobLogger.log("Configuring S3 AccessKey& SecrateKey to SparkContext")
-    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", AppConf.getAwsKey());
-    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", AppConf.getAwsSecret());
-
-    val storageEndpoint = AppConf.getConfig("cloud_storage_endpoint")
-    if (!"".equalsIgnoreCase(storageEndpoint)) {
-      sc.hadoopConfiguration.set("fs.s3n.endpoint", storageEndpoint)
-    }
-  }
-
-  def setAzureConf(sc: SparkContext) = {
-    val accName = AppConf.getStorageKey("azure")
-    val accKey = AppConf.getStorageSecret("azure")
-    sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
-    sc.hadoopConfiguration.set("fs.azure.account.key." + accName + ".blob.core.windows.net", accKey)
-    sc.hadoopConfiguration.set("fs.azure.account.keyprovider." + accName + ".blob.core.windows.net", "org.apache.hadoop.fs.azure.SimpleKeyProvider")
-  }
-
-  def setGcloudConf(sc: SparkContext) = {
-    sc.hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
-    sc.hadoopConfiguration.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
-    sc.hadoopConfiguration.set("fs.gs.auth.service.account.email", AppConf.getStorageKey("gcloud"))
-    sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key", AppConf.getStorageSecret("gcloud"))
-    sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key.id", AppConf.getConfig("gcloud_private_secret_id"))
-  }
-
   def closeSparkContext()(implicit sc: SparkContext) {
     JobLogger.log("Closing Spark Context", None, INFO)
     sc.stop();
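The removed helpers map one-for-one onto the new provider objects; a rough equivalence sketch follows (the object name is illustrative only). One behavioural difference worth noting: the old code configured S3, Azure and GCP on every context, while the new call configures only the provider named by `cloud_storage_type`.

```scala
import org.apache.spark.SparkContext
import org.ekstep.analytics.framework.util.{AzureProvider, GcpProvider, S3Provider}

object LegacyConfEquivalence {
  // Passing None for key/secret keeps the old behaviour: each provider falls back to the
  // default AppConf credentials, exactly as the removed CommonUtil helpers did.
  def configureAll(sc: SparkContext): Unit = {
    S3Provider.setConf(sc, None, None)     // was CommonUtil.setS3Conf(sc)
    AzureProvider.setConf(sc, None, None)  // was CommonUtil.setAzureConf(sc)
    GcpProvider.setConf(sc, None, None)    // was CommonUtil.setGcloudConf(sc)
  }
}
```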
+ accName + ".blob.core.windows.net", "org.apache.hadoop.fs.azure.SimpleKeyProvider") - } - - def setGcloudConf(sc: SparkContext) = { - sc.hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") - sc.hadoopConfiguration.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") - sc.hadoopConfiguration.set("fs.gs.auth.service.account.email", AppConf.getStorageKey("gcloud")) - sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key", AppConf.getStorageSecret("gcloud")) - sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key.id", AppConf.getConfig("gcloud_private_secret_id")) - } - def closeSparkContext()(implicit sc: SparkContext) { JobLogger.log("Closing Spark Context", None, INFO) sc.stop(); diff --git a/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestDatasetUtil.scala b/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestDatasetUtil.scala index af78a5ce..abfa767e 100644 --- a/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestDatasetUtil.scala +++ b/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestDatasetUtil.scala @@ -93,14 +93,23 @@ class TestDatasetUtil extends BaseSpec with Matchers with MockFactory { val rdd = sparkSession.sparkContext.parallelize(Seq(EnvSummary("env1", 22.1, 3), EnvSummary("env2", 20.1, 3), EnvSummary("env1", 32.1, 4)), 1); import sparkSession.implicits._ val df = sparkSession.createDataFrame(rdd); - a[AzureException] should be thrownBy { - df.saveToBlobStore(StorageConfig("azure", "test-container", "src/test/resources"), "csv", "test-report", Option(Map("header" -> "true")), Option(Seq("env"))); + val azureException = intercept[Throwable] { + df.saveToBlobStore(StorageConfig("azure", "test-container", "src/test/resources"), "csv", "test-report", Option(Map("header" -> "true")), Option(Seq("env"))); } - - a[S3Exception] should be thrownBy { - df.saveToBlobStore(StorageConfig("s3", "test-container", "src/test/resources"), "csv", "test-report", Option(Map("header" -> "true")), Option(Seq("env"))); + val s3Exception = intercept[Throwable] { + df.saveToBlobStore(StorageConfig("s3", "test-container", "src/test/resources"), "csv", "test-report", Option(Map("header" -> "true")), Option(Seq("env"))) + } + handleException(azureException) + handleException(s3Exception) + def handleException(caughtException: Throwable): Unit = { + caughtException match { + case s3Exception: S3Exception => println("S3 Exception occurred") + case azureException: AzureException => println("Azure Exception occurred") + case illegalArgumentException: IllegalArgumentException => println("CSP Configurations are not found") + case _ => + fail("Unexpected exception type thrown") + } } - sparkSession.stop(); } diff --git a/auto_build_deploy b/auto_build_deploy index 71baf4e3..ea666f3e 100644 --- a/auto_build_deploy +++ b/auto_build_deploy @@ -27,7 +27,7 @@ node('build-slave') { export JAVA_HOME=/usr/lib/jvm/jdk-11.0.2 export PATH=$JAVA_HOME/bin:$PATH echo $(java -version) - mvn clean install -DskipTests + mvn clean install -DskipTests -DCLOUD_STORE_GROUP_ID=org.sunbird -DCLOUD_STORE_ARTIFACT_ID=cloud-store-sdk_2.12 -DCLOUD_STORE_VERSION=1.4.0 ''' } stage('Archive artifacts'){