Create S3 storage on project creation during nexus ship import (#4870)
olivergrabinski authored Apr 17, 2024
1 parent 481c5a6 commit 26d0fc5
Showing 17 changed files with 337 additions and 82 deletions.
@@ -314,7 +314,7 @@ class StoragePluginModule(priority: Int) extends ModuleDef {
}

many[ScopeInitialization].addSet { (storages: Storages, serviceAccount: ServiceAccount, cfg: StoragePluginConfig) =>
Option.when(cfg.enableDefaultCreation)(new StorageScopeInitialization(storages, serviceAccount, cfg.defaults)).toSet
Option.when(cfg.enableDefaultCreation)(StorageScopeInitialization(storages, serviceAccount, cfg.defaults)).toSet
}

many[ProjectDeletionTask].add { (storages: Storages) => StorageDeletionTask(storages) }
@@ -3,9 +3,11 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageFields.DiskStorageFields
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageFields
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageFields.{DiskStorageFields, S3StorageFields}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejection.ResourceAlreadyExists
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{defaultStorageId, Storages}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{defaultS3StorageId, defaultStorageId, Storages}
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.sdk.error.ServiceError.ScopeInitializationFailed
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.{Caller, ServiceAccount}
import ch.epfl.bluebrain.nexus.delta.sdk.implicits._
@@ -20,40 +22,32 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Identity, Proje
* the storages module
* @param serviceAccount
* the subject that will be recorded when performing the initialization
* @param defaults
* default name and description for the storage
* @param defaultStorageId
* the id to use for the default storage to be created
* @param defaultStorageFields
* the default value for the storage fields
*/
class StorageScopeInitialization(
storages: Storages,
serviceAccount: ServiceAccount,
defaults: Defaults
defaultStorageId: Iri,
defaultStorageFields: StorageFields
) extends ScopeInitialization {

private val logger = Logger[StorageScopeInitialization]
implicit private val kamonComponent: KamonMetricComponent = KamonMetricComponent(Storages.entityType.value)

implicit private val caller: Caller = serviceAccount.caller

private lazy val defaultValue: DiskStorageFields = DiskStorageFields(
name = Some(defaults.name),
description = Some(defaults.description),
default = true,
volume = None,
readPermission = None,
writePermission = None,
capacity = None,
maxFileSize = None
)

override def onProjectCreation(project: ProjectRef, subject: Identity.Subject): IO[Unit] =
storages
.create(defaultStorageId, project, defaultValue)
.create(defaultStorageId, project, defaultStorageFields)
.void
.handleErrorWith {
case _: ResourceAlreadyExists => IO.unit // nothing to do, storage already exists
case rej =>
val str =
s"Failed to create the default DiskStorage for project '$project' due to '${rej.getMessage}'."
s"Failed to create the default ${defaultStorageFields.tpe} for project '$project' due to '${rej.getMessage}'."
logger.error(str) >> IO.raiseError(ScopeInitializationFailed(str))
}
.span("createDefaultStorage")
@@ -69,13 +63,31 @@ class StorageScopeInitialization(
object StorageScopeInitialization {

/**
* Conditionnally create a [[StorageScopeInitialization]]
* Creates a [[StorageScopeInitialization]] that creates a default DiskStorage with the provided default
* name/description
*/
def apply(storages: Storages, serviceAccount: ServiceAccount, defaults: Defaults): StorageScopeInitialization = {
val defaultFields: DiskStorageFields = DiskStorageFields(
name = Some(defaults.name),
description = Some(defaults.description),
default = true,
volume = None,
readPermission = None,
writePermission = None,
capacity = None,
maxFileSize = None
)
new StorageScopeInitialization(storages, serviceAccount, defaultStorageId, defaultFields)
}

/**
* Creates a [[StorageScopeInitialization]] that creates a default S3Storage with the provided default fields
*/
def when(enabled: Boolean)(
storages: Storages,
def s3(
storage: Storages,
serviceAccount: ServiceAccount,
defaults: Defaults
): Option[StorageScopeInitialization] =
Option.when(enabled)(new StorageScopeInitialization(storages, serviceAccount, defaults))
defaultFields: S3StorageFields
): StorageScopeInitialization =
new StorageScopeInitialization(storage, serviceAccount, defaultS3StorageId, defaultFields)

}
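
The companion object now exposes two factories in place of the old conditional "when" helper: apply builds the default DiskStorage initializer from the configured name/description, while s3 builds the default S3Storage initializer from pre-built S3StorageFields. A minimal usage sketch, assuming storages, serviceAccount, defaults and s3Fields are already provided by the surrounding wiring (Nexus types as imported in the file above):

def defaultStorageInitializers(
    storages: Storages,
    serviceAccount: ServiceAccount,
    defaults: Defaults,
    s3Fields: S3StorageFields
): Set[ScopeInitialization] =
  Set(
    StorageScopeInitialization(storages, serviceAccount, defaults),    // default DiskStorage
    StorageScopeInitialization.s3(storages, serviceAccount, s3Fields)  // default S3Storage (ship import)
  )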
@@ -40,4 +40,9 @@ package object storages {
*/
final val defaultStorageId = nxv + "diskStorageDefault"

/**
* The id for the default S3 storage
*/
final val defaultS3StorageId = nxv + "defaultS3Storage"

}
@@ -50,7 +50,7 @@ class StorageScopeInitializationSpec
).accepted

val defaults = Defaults("defaultName", "defaultDescription")
lazy val init = new StorageScopeInitialization(storages, sa, defaults)
lazy val init = StorageScopeInitialization(storages, sa, defaults)

"create a default storage on newly created project" in {
storages.fetch(nxv + "diskStorageDefault", project.ref).rejectedWith[StorageNotFound]
82 changes: 81 additions & 1 deletion ship/src/main/resources/ship-default.conf
@@ -35,7 +35,7 @@ ship {
endpoint = "https://s3.us-east-1.amazonaws.com"

# the bucket which contains the import files
import-bucket = "nexus-ship-import"
import-bucket = "nexus-ship-production"
}

input {
@@ -73,5 +73,85 @@ ship {
subject: "delta"
realm: "internal"
}

# The bucket to which the files will be copied by the Nexus Ship
target-bucket = "nexus-delta-production"

storages {

# S3 compatible storage configuration
amazon {
# to enable s3 storage
enabled = true
# the default digest algorithm
digest-algorithm = "SHA-256"
# the default endpoint of the current storage
default-endpoint = "https://s3.us-east-1.amazonaws.com"
# the access key for the default endpoint
default-access-key = "my-key"
# the secret key for the default endpoint
default-secret-key = "my-secret-key"
# the default permission required in order to download a file from an S3 storage
default-read-permission = "resources/read"
# the default permission required in order to upload a file to a S3 storage
default-write-permission = "files/write"
# flag to decide whether or not to show the absolute location of the files in the metadata response
show-location = true
# the default maximum allowed file size (in bytes) for uploaded files. 10 GB
default-max-file-size = 10737418240
}

# Disk storage configuration
disk {
# the base path where the files are stored
default-volume = "/tmp"
# the allowed set of paths where the files are stored
allowed-volumes = ["/tmp"]
# algorithm for checksum calculation
digest-algorithm = "SHA-256"
# the default permission required in order to download a file from a disk storage
default-read-permission = "resources/read"
# the default permission required in order to upload a file to a disk storage
default-write-permission = "files/write"
# flag to decide whether or not to show the absolute location of the files in the metadata response
show-location = false
# the default capacity for storages (in bytes), by default no limit is defined
default-capacity = null
# the default maximum allowed file size (in bytes) for uploaded files. 10 GB
default-max-file-size = 10737418240
}

remote-disk {
# to enable remote storage
enabled = false
# the default endpoint
default-endpoint = "http://localhost:8084/v1"
# the default credentials for the endpoint
credentials {
type: "anonymous"
}
# the default digest algorithm
digest-algorithm = "SHA-256"
# the default permission required in order to download a file from a remote disk storage
default-read-permission = "resources/read"
# the default permission required in order to upload a file to a remote disk storage
default-write-permission = "files/write"
# flag to decide whether or not to show the absolute location of the files in the metadata response
show-location = true
# the default maximum allowed file size (in bytes) for uploaded files. 10 GB
default-max-file-size = 10737418240
# Retry delay for digest computation
digest-computation-retry-delay = 5s
}

# the storages event log configuration
event-log = ${ship.input.event-log}
pagination {
default-size = 30
size-limit = 1000
from-limit = 10000
}

}
}
}
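
A minimal sketch of how this new input section could be read with pureconfig. The resource name and the ship.input path are assumptions about how the ship loads its configuration; the actual loader is not part of this diff:

import ch.epfl.bluebrain.nexus.ship.config.InputConfig
import pureconfig.ConfigSource

// pureconfig maps kebab-case keys to camelCase fields, so target-bucket becomes
// targetBucket and the storages block is decoded into a StoragesConfig.
val input: InputConfig =
  ConfigSource.resources("ship-default.conf").at("ship.input").loadOrThrow[InputConfig]

// e.g. a BucketName backed by the non-empty string "nexus-delta-production"
println(input.targetBucket)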
@@ -5,6 +5,8 @@ import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceLoader
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.{contexts => bgContexts}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.{contexts => compositeViewContexts}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{contexts => esContexts}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{contexts => storageContext}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.{contexts => fileContext}
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.contexts
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
@@ -55,6 +57,10 @@ object ContextWiring {
// Composite views
compositeCtx <- ContextValue.fromFile("contexts/composite-views.json")
compositeMetaCtx <- ContextValue.fromFile("contexts/composite-views-metadata.json")
// Storages
storageCtx <- ContextValue.fromFile("contexts/storages.json")
storageMetaCtx <- ContextValue.fromFile("contexts/storages-metadata.json")
fileCtx <- ContextValue.fromFile("contexts/files.json")
} yield RemoteContextResolution.fixed(
// Delta
contexts.error -> errorCtx,
@@ -87,7 +93,11 @@ object ContextWiring {
bgContexts.blazegraphMetadata -> blazegraphMetaCtx,
// Composite views
compositeViewContexts.compositeViews -> compositeCtx,
compositeViewContexts.compositeViewsMetadata -> compositeMetaCtx
compositeViewContexts.compositeViewsMetadata -> compositeMetaCtx,
// Storages and files
storageContext.storages -> storageCtx,
storageContext.storagesMetadata -> storageMetaCtx,
fileContext.files -> fileCtx
)

def resolverContextResolution(
@@ -1,13 +1,17 @@
package ch.epfl.bluebrain.nexus.ship.config

import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig
import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, ServiceAccountConfig}
import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.ship.config.InputConfig.ProjectMapping
import eu.timepit.refined.collection.NonEmpty
import eu.timepit.refined.refineV
import fs2.aws.s3.models.Models.BucketName
import pureconfig.ConfigReader
import pureconfig.configurable.genericMapReader
import pureconfig.error.CannotConvert
import pureconfig.error.{CannotConvert, FailureReason}
import pureconfig.generic.semiauto.deriveReader

final case class InputConfig(
@@ -16,7 +20,9 @@ final case class InputConfig(
organizations: OrganizationCreationConfig,
projectMapping: ProjectMapping = Map.empty,
viewDefaults: ViewDefaults,
serviceAccount: ServiceAccountConfig
serviceAccount: ServiceAccountConfig,
storages: StoragesConfig,
targetBucket: BucketName
)

object InputConfig {
@@ -28,5 +34,13 @@ object InputConfig {
ProjectRef.parse(str).leftMap(e => CannotConvert(str, classOf[ProjectRef].getSimpleName, e))
)

private val emptyBucketName = new FailureReason {
override def description: String = "The s3 bucket name cannot be empty"
}

implicit val bucketNameReader: ConfigReader[BucketName] =
ConfigReader[String]
.emap(str => refineV[NonEmpty](str).leftMap(_ => emptyBucketName).map(BucketName.apply))

implicit final val runConfigReader: ConfigReader[InputConfig] = deriveReader[InputConfig]
}
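
The bucketNameReader rejects empty strings through refined's NonEmpty predicate and wraps valid values in fs2-aws's BucketName. A small illustrative check (the bucket key and the sample values are made up):

import ch.epfl.bluebrain.nexus.ship.config.InputConfig.bucketNameReader
import fs2.aws.s3.models.Models.BucketName
import pureconfig.ConfigSource

// A non-empty value parses into a BucketName...
val ok: BucketName =
  ConfigSource.string("bucket = \"nexus-ship-import\"").at("bucket").loadOrThrow[BucketName]

// ...while an empty value is rejected with "The s3 bucket name cannot be empty".
val failed = ConfigSource.string("bucket = \"\"").at("bucket").load[BucketName]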
@@ -1,29 +1,19 @@
package ch.epfl.bluebrain.nexus.ship.config

import cats.implicits.toBifunctorOps
import eu.timepit.refined.collection.NonEmpty
import eu.timepit.refined.refineV
import fs2.aws.s3.models.Models.BucketName
import pureconfig.ConfigReader
import pureconfig.error.FailureReason
import pureconfig.generic.semiauto.deriveReader

import java.net.URI
import scala.annotation.nowarn

final case class S3Config(endpoint: URI, importBucket: BucketName)

object S3Config {

@nowarn("cat=unused")
implicit final val s3ConfigReader: ConfigReader[S3Config] = {
val emptyBucketName = new FailureReason {
override def description: String = "The s3 bucket name cannot be empty"
}
implicit val bucketNameReader: ConfigReader[BucketName] =
ConfigReader[String]
.emap(str => refineV[NonEmpty](str).leftMap(_ => emptyBucketName).map(BucketName.apply))
implicit final val bucketNameReader: ConfigReader[BucketName] =
InputConfig.bucketNameReader

implicit final val s3ConfigReader: ConfigReader[S3Config] =
deriveReader[S3Config]
}

}
@@ -18,7 +18,6 @@ import ch.epfl.bluebrain.nexus.ship._
import ch.epfl.bluebrain.nexus.ship.config.InputConfig
import ch.epfl.bluebrain.nexus.ship.error.ShipError.ProjectDeletionIsNotAllowed
import ch.epfl.bluebrain.nexus.ship.projects.ProjectProcessor.logger
import ch.epfl.bluebrain.nexus.ship.views.ViewWiring
import io.circe.Decoder

final class ProjectProcessor private (
@@ -87,7 +86,7 @@ object ProjectProcessor {
): IO[ProjectProcessor] =
for {
uuidF <- EventUUIDF.init()
initializer <- ViewWiring.viewInitializer(fetchContext, rcr, config, clock, xas)
initializer <- ScopeInitializerWiring.initializer(fetchContext, rcr, config, clock, xas)
} yield {
val disableDeletion: ValidateProjectDeletion = (p: ProjectRef) => IO.raiseError(ProjectDeletionIsNotAllowed(p))
val projects = ProjectsImpl(
@@ -0,0 +1,34 @@
package ch.epfl.bluebrain.nexus.ship.projects

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.sdk.ScopeInitializer
import ch.epfl.bluebrain.nexus.delta.sdk.projects.{FetchContext, ScopeInitializationErrorStore}
import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverContextResolution
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.ship.EventClock
import ch.epfl.bluebrain.nexus.ship.config.InputConfig
import ch.epfl.bluebrain.nexus.ship.storages.StorageWiring
import ch.epfl.bluebrain.nexus.ship.storages.StorageWiring.s3StorageInitializer
import ch.epfl.bluebrain.nexus.ship.views.ViewWiring.{blazegraphViews, elasticSearchViews, viewInitializers}

object ScopeInitializerWiring {

def initializer(
fetchContext: FetchContext,
rcr: ResolverContextResolution,
config: InputConfig,
clock: EventClock,
xas: Transactors
)(implicit jsonLdApi: JsonLdApi): IO[ScopeInitializer] =
for {
esViews <- elasticSearchViews(fetchContext, rcr, config.eventLog, clock, UUIDF.random, xas)
bgViews <- blazegraphViews(fetchContext, rcr, config.eventLog, clock, UUIDF.random, xas)
storages <- StorageWiring.storages(fetchContext, rcr, config, clock, xas)
storageInit <- s3StorageInitializer(storages, config)
allInits = viewInitializers(esViews, bgViews, config) + storageInit
errorStore = ScopeInitializationErrorStore(xas, clock)
} yield ScopeInitializer(allInits, errorStore)

}
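
ScopeInitializer itself is not changed by this commit; conceptually it fans the project-creation hook out to every ScopeInitialization in the set built above, which now includes the default S3 storage one. A simplified sketch of that effect, not the actual ScopeInitializer implementation (error recording via the errorStore is omitted, and the Nexus types are as used in the files above):

import cats.effect.IO
import cats.syntax.all._

// Run the onProjectCreation hook of every registered initialization for a new project.
def runOnProjectCreation(
    inits: Set[ScopeInitialization],
    project: ProjectRef,
    subject: Identity.Subject
): IO[Unit] =
  inits.toList.traverse_(_.onProjectCreation(project, subject))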