OB-525 - CSP Changes #160

Closed
wants to merge 11 commits
9 changes: 7 additions & 2 deletions .circleci/config.yml
@@ -1,8 +1,13 @@
version: 2.1
environment:
jobs:
analytics-core-build:
machine:
image: ubuntu-2004:202008-01
environment:
CLOUD_STORE_VERSION: "1.4.0"
CLOUD_STORE_ARTIFACT_ID: "cloud-store-sdk_2.12"
CLOUD_STORE_GROUP_ID: "org.sunbird"
steps:
- checkout
- restore_cache:
@@ -11,7 +16,7 @@ jobs:
name: sunbird-analytics-core-build
command: |
java -version
mvn scoverage:report
mvn scoverage:report -DCLOUD_STORE_GROUP_ID=$CLOUD_STORE_GROUP_ID -DCLOUD_STORE_ARTIFACT_ID=$CLOUD_STORE_ARTIFACT_ID -DCLOUD_STORE_VERSION=$CLOUD_STORE_VERSION
- save_cache:
key: dp-dependency-cache-{{ checksum "pom.xml" }}
paths: ~/.m2
@@ -24,7 +29,7 @@ jobs:
name: sonar
command: |
export JAVA_HOME=/usr/lib/jvm/java-1.11.0-openjdk-amd64
mvn -X sonar:sonar -Dsonar.projectKey=project-sunbird_sunbird-analytics-core -Dsonar.organization=project-sunbird -Dsonar.exclusions=analytics-core/src/main/scala/org/ekstep/analytics/streaming/** -Dsonar.host.url=https://sonarcloud.io -Dsonar.scala.coverage.reportPaths=/home/circleci/project/target/scoverage.xml
mvn -X sonar:sonar -DCLOUD_STORE_GROUP_ID=$CLOUD_STORE_GROUP_ID -DCLOUD_STORE_ARTIFACT_ID=$CLOUD_STORE_ARTIFACT_ID -DCLOUD_STORE_VERSION=$CLOUD_STORE_VERSION -Dsonar.projectKey=project-sunbird_sunbird-analytics-core -Dsonar.organization=project-sunbird -Dsonar.exclusions=analytics-core/src/main/scala/org/ekstep/analytics/streaming/** -Dsonar.host.url=https://sonarcloud.io -Dsonar.scala.coverage.reportPaths=/home/circleci/project/target/scoverage.xml

workflows:
version: 2.1
2 changes: 1 addition & 1 deletion Jenkinsfile
@@ -30,7 +30,7 @@ node('build-slave') {
export JAVA_HOME=/usr/lib/jvm/jdk-11.0.2
export PATH=$JAVA_HOME/bin:$PATH
echo $(java -version)
mvn clean install -DskipTests
mvn clean install -DskipTests -DCLOUD_STORE_GROUP_ID=org.sunbird -DCLOUD_STORE_ARTIFACT_ID=cloud-store-sdk_2.12 -DCLOUD_STORE_VERSION=1.4.0
'''
}
stage('Archive artifacts'){
9 changes: 6 additions & 3 deletions analytics-core/pom.xml
@@ -210,9 +210,12 @@
</dependency> -->
<!-- https://mvnrepository.com/artifact/org.sunbird/cloud-store-sdk -->
<dependency>
<groupId>org.sunbird</groupId>
<artifactId>cloud-store-sdk_${scala.maj.version}</artifactId>
<version>1.4.0</version>
<!-- <groupId>org.sunbird</groupId>-->
<!-- <artifactId>cloud-store-sdk_${scala.maj.version}</artifactId>-->
<!-- <version>1.4.0</version>-->
<groupId>${CLOUD_STORE_GROUP_ID}</groupId>
<artifactId>${CLOUD_STORE_ARTIFACT_ID}</artifactId>
<version>${CLOUD_STORE_VERSION}</version>
<exclusions>
<exclusion>
<groupId>com.microsoft.azure</groupId>
@@ -0,0 +1,69 @@
package org.ekstep.analytics.framework.util
import org.apache.spark.SparkContext
import org.sunbird.cloud.storage.conf.AppConf

trait ICloudStorageProvider {
def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit
}



object CloudStorageProviders {
implicit val className: String = "org.ekstep.analytics.framework.util.CloudStorageProvider"
private val providerMap: Map[String, ICloudStorageProvider] = Map("s3" -> S3Provider, "azure" -> AzureProvider, "gcp" -> GcpProvider, "oci" -> OCIProvider)
def setSparkCSPConfigurations(sc: SparkContext, csp: String, storageKey: Option[String], storageSecret: Option[String]): Unit = {
providerMap.get(csp.toLowerCase()).foreach { provider =>
provider.setConf(sc, storageKey, storageSecret)
}
}
}
object S3Provider extends ICloudStorageProvider {
implicit val className: String = "org.ekstep.analytics.framework.util.S3Provider"
override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = {
JobLogger.log("Configuring S3 Access Key & Secret Key to SparkContext")
val key = storageKey.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getAwsKey())
val secret = storageSecret.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getAwsSecret())
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", key)
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secret)
val storageEndpoint = AppConf.getConfig("cloud_storage_endpoint")
if (storageEndpoint.nonEmpty) {
sc.hadoopConfiguration.set("fs.s3n.endpoint", storageEndpoint)
}
}
}

object AzureProvider extends ICloudStorageProvider {
implicit val className: String = "org.ekstep.analytics.framework.util.AzureProvider"
override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = {
JobLogger.log("Configuring Azure Access Key & Secret Key to SparkContext")
val key = storageKey.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageKey("azure"))
val secret = storageSecret.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageSecret("azure"))
sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.azure.account.key." + key + ".blob.core.windows.net", secret)
sc.hadoopConfiguration.set("fs.azure.account.keyprovider." + key + ".blob.core.windows.net", "org.apache.hadoop.fs.azure.SimpleKeyProvider")
}
}
object GcpProvider extends ICloudStorageProvider {
implicit val className: String = "org.ekstep.analytics.framework.util.GcpProvider"
override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = {
JobLogger.log("Configuring GCP Access Key & Secret Key to SparkContext")
val key = storageKey.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageKey("gcloud"))
val secret = storageSecret.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageSecret("gcloud"))
sc.hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
sc.hadoopConfiguration.set("fs.gs.auth.service.account.email", key)
sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key", secret)
sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key.id", AppConf.getConfig("gcloud_private_secret_id"))
}
}

object OCIProvider extends ICloudStorageProvider {
implicit val className: String = "org.ekstep.analytics.framework.util.OCIProvider"
override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = {
val key = storageKey.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageKey("oci"))
val secret = storageSecret.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageSecret("oci"))
JobLogger.log("Configuring OCI Access Key & Secret Key to SparkContext")
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", key);
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secret);
}
}
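
A minimal usage sketch of the new provider registry, for reviewers. Names below that are not in this diff (the `CspWiringSketch` object and the local-mode `SparkConf`) are hypothetical; `CloudStorageProviders.setSparkCSPConfigurations` and the `cloud_storage_type` key come from this PR (see the CommonUtil change below).

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.ekstep.analytics.framework.util.CloudStorageProviders
import org.sunbird.cloud.storage.conf.AppConf

// Hypothetical driver; not part of this PR.
object CspWiringSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("csp-demo").setMaster("local[*]"))
    // csp is one of "s3" | "azure" | "gcp" | "oci"; any other value is a
    // silent no-op because providerMap.get(...) returns None.
    CloudStorageProviders.setSparkCSPConfigurations(
      sc,
      csp = AppConf.getConfig("cloud_storage_type"),
      storageKey = None,   // Some("my_key_config") would be resolved via AppConf.getConfig;
      storageSecret = None // None or an empty value falls back to the provider's AppConf defaults.
    )
    sc.stop()
  }
}
```

Supporting a new CSP is then a matter of implementing `ICloudStorageProvider` and adding one entry to `providerMap`, which keeps `CommonUtil` (below) free of per-provider branching.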
@@ -8,13 +8,13 @@ import java.security.MessageDigest
import java.sql.Timestamp
import java.util.zip.GZIPOutputStream
import java.util.{Date, Properties}

import com.ing.wbaa.druid.definitions.{Granularity, GranularityType}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.ekstep.analytics.framework.Level._
import org.ekstep.analytics.framework.Period._
import org.ekstep.analytics.framework.util.CloudStorageProviders.setSparkCSPConfigurations
import org.ekstep.analytics.framework.{DtRange, Event, JobConfig, _}

import scala.collection.mutable.ListBuffer
@@ -91,9 +91,7 @@ object CommonUtil {
}

val sc = new SparkContext(conf)
setS3Conf(sc)
setAzureConf(sc)
setGcloudConf(sc)
setSparkCSPConfigurations(sc, AppConf.getConfig("cloud_storage_type"), None, None)
JobLogger.log("Spark Context initialized")
sc
}
@@ -144,40 +142,10 @@
}

val sparkSession = SparkSession.builder().appName("sunbird-analytics").config(conf).getOrCreate()
setS3Conf(sparkSession.sparkContext)
setAzureConf(sparkSession.sparkContext)
setGcloudConf(sparkSession.sparkContext)
setSparkCSPConfigurations(sparkSession.sparkContext, AppConf.getConfig("cloud_storage_type"), None, None)
JobLogger.log("SparkSession initialized")
sparkSession
}

def setS3Conf(sc: SparkContext) = {
JobLogger.log("Configuring S3 AccessKey& SecrateKey to SparkContext")
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", AppConf.getAwsKey());
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", AppConf.getAwsSecret());

val storageEndpoint = AppConf.getConfig("cloud_storage_endpoint")
if (!"".equalsIgnoreCase(storageEndpoint)) {
sc.hadoopConfiguration.set("fs.s3n.endpoint", storageEndpoint)
}
}

def setAzureConf(sc: SparkContext) = {
val accName = AppConf.getStorageKey("azure")
val accKey = AppConf.getStorageSecret("azure")
sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.azure.account.key." + accName + ".blob.core.windows.net", accKey)
sc.hadoopConfiguration.set("fs.azure.account.keyprovider." + accName + ".blob.core.windows.net", "org.apache.hadoop.fs.azure.SimpleKeyProvider")
}

def setGcloudConf(sc: SparkContext) = {
sc.hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
sc.hadoopConfiguration.set("fs.gs.auth.service.account.email", AppConf.getStorageKey("gcloud"))
sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key", AppConf.getStorageSecret("gcloud"))
sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key.id", AppConf.getConfig("gcloud_private_secret_id"))
}

def closeSparkContext()(implicit sc: SparkContext) {
JobLogger.log("Closing Spark Context", None, INFO)
sc.stop();
@@ -93,14 +93,23 @@ class TestDatasetUtil extends BaseSpec with Matchers with MockFactory {
val rdd = sparkSession.sparkContext.parallelize(Seq(EnvSummary("env1", 22.1, 3), EnvSummary("env2", 20.1, 3), EnvSummary("env1", 32.1, 4)), 1);
import sparkSession.implicits._
val df = sparkSession.createDataFrame(rdd);
a[AzureException] should be thrownBy {
df.saveToBlobStore(StorageConfig("azure", "test-container", "src/test/resources"), "csv", "test-report", Option(Map("header" -> "true")), Option(Seq("env")));
val azureException = intercept[Throwable] {
df.saveToBlobStore(StorageConfig("azure", "test-container", "src/test/resources"), "csv", "test-report", Option(Map("header" -> "true")), Option(Seq("env")));
}

a[S3Exception] should be thrownBy {
df.saveToBlobStore(StorageConfig("s3", "test-container", "src/test/resources"), "csv", "test-report", Option(Map("header" -> "true")), Option(Seq("env")));
val s3Exception = intercept[Throwable] {
df.saveToBlobStore(StorageConfig("s3", "test-container", "src/test/resources"), "csv", "test-report", Option(Map("header" -> "true")), Option(Seq("env")))
}
handleException(azureException)
handleException(s3Exception)
def handleException(caughtException: Throwable): Unit = {
caughtException match {
case s3Exception: S3Exception => println("S3 Exception occurred")
case azureException: AzureException => println("Azure Exception occurred")
case illegalArgumentException: IllegalArgumentException => println("CSP Configurations are not found")
case _ =>
fail("Unexpected exception type thrown")
}
}

sparkSession.stop();
}
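
Worth noting for reviewers: `intercept[Throwable]` still fails the test when no exception is thrown, and returns the caught `Throwable` otherwise, which is what lets `handleException` accept any of the three expected outcomes (SDK errors when the CSP properties resolve, `IllegalArgumentException` when they do not). A self-contained sketch of the pattern, with hypothetical spec and class names (the project's own `BaseSpec` style may differ):

```scala
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class ExceptionToleranceSpec extends AnyFlatSpec with Matchers {
  "a save against a misconfigured store" should "fail with one of the expected errors" in {
    // intercept fails the test if the block completes normally;
    // otherwise it returns the Throwable for inspection.
    val caught = intercept[Throwable] {
      throw new IllegalArgumentException("CSP configurations are not found") // stand-in for saveToBlobStore
    }
    caught match {
      case _: IllegalArgumentException => succeed
      case other => fail(s"Unexpected exception type: ${other.getClass.getName}")
    }
  }
}
```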

2 changes: 1 addition & 1 deletion auto_build_deploy
@@ -27,7 +27,7 @@ node('build-slave') {
export JAVA_HOME=/usr/lib/jvm/jdk-11.0.2
export PATH=$JAVA_HOME/bin:$PATH
echo $(java -version)
mvn clean install -DskipTests
mvn clean install -DskipTests -DCLOUD_STORE_GROUP_ID=org.sunbird -DCLOUD_STORE_ARTIFACT_ID=cloud-store-sdk_2.12 -DCLOUD_STORE_VERSION=1.4.0
'''
}
stage('Archive artifacts'){