Skip to content

Commit

Permalink
Azure blob functionality (#117)
Browse files Browse the repository at this point in the history
  • Loading branch information
george-zubrienko authored Dec 10, 2024
1 parent e570981 commit 29023d6
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 11 deletions.
6 changes: 6 additions & 0 deletions framework/arcane-framework/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ lazy val root = (project in file("."))
// https://mvnrepository.com/artifact/io.trino/trino-jdbc
libraryDependencies += "io.trino" % "trino-jdbc" % "465",

// Azure dependencies
// https://mvnrepository.com/artifact/com.azure/azure-storage-blob
libraryDependencies += "com.azure" % "azure-storage-blob" % "12.29.0",
// https://mvnrepository.com/artifact/com.azure/azure-identity
libraryDependencies += "com.azure" % "azure-identity" % "1.14.2",


// Test dependencies
libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.19" % Test,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.sneaksanddata.arcane.framework
package services.storage.base

import services.storage.models.base.BlobPath
import services.storage.models.base.{BlobPath, StoredBlob}

import scala.concurrent.Future

Expand All @@ -20,3 +20,5 @@ trait BlobStorageReader[PathType <: BlobPath]:
* @return The result of applying the function to the content of the blob.
*/
def getBlobContent[Result](blobPath: PathType, deserializer: Array[Byte] => Result): Future[Result]

/**
 * Lists blobs stored under the provided path.
 *
 * NOTE(review): implementations appear to treat the path as a prefix filter
 * (the Azure implementation passes it to a prefix-based list call) — confirm
 * the intended contract and whether the returned LazyList is evaluated lazily
 * or eagerly by each implementation.
 *
 * @param blobPath The path to list blobs from.
 * @return The blobs found under the given path.
 */
def listBlobs(blobPath: PathType): LazyList[StoredBlob]
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ final case class AmazonS3StoragePath(bucket: String, objectKey: String) extends
override def toHdfsPath = s"s3a://$bucket/$objectKey"

/**
* Joins the given key name to the current path.
*
* @param keyName The key name to join.
* @return The new path.
*/
* Joins the given key name to the current path.
*
* @param keyName The key name to join.
* @return The new path.
*/
@targetName("plus")
def +(keyName: String) = new AmazonS3StoragePath(bucket, if (objectKey.isEmpty) keyName else s"$objectKey/$keyName")
def +(keyName: String): AmazonS3StoragePath = copy(objectKey = if (objectKey.isEmpty) keyName else s"$objectKey/$keyName")

/**
* Companion object for [[AmazonS3StoragePath]].
Expand All @@ -43,10 +43,7 @@ object AmazonS3StoragePath {
* @param hdfsPath The HDFS path.
* @return The [[AmazonS3StoragePath]].
*/
def apply(hdfsPath: String): Try[AmazonS3StoragePath] =
val r: Regex = AmazonS3StoragePath.matchRegex.r
val m = r.findFirstMatchIn(hdfsPath)
m match {
def apply(hdfsPath: String): Try[AmazonS3StoragePath] = matchRegex.r.findFirstMatchIn(hdfsPath) match {
case Some(matched) => Success(new AmazonS3StoragePath(matched.group(1), matched.group(2).stripSuffix("/")))
case None => Failure(IllegalArgumentException(s"An AmazonS3StoragePath must be in the format s3a://bucket/path, but was: $hdfsPath"))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.sneaksanddata.arcane.framework
package services.storage.models.azure

import services.storage.models.base.BlobPath

import scala.annotation.targetName
import scala.util.{Failure, Success, Try}
import scala.util.matching.Regex

/**
 * A path to a blob stored in an Azure Data Lake Storage (ADLS Gen2) account.
 *
 * @param accountName Name of the storage account.
 * @param container   Name of the blob container.
 * @param blobPrefix  Path of the blob inside the container.
 */
final case class AdlsStoragePath(accountName: String, container: String, blobPrefix: String) extends BlobPath:
  /** Renders this path in abfss (HDFS-compatible) notation. */
  def toHdfsPath: String = s"abfss://$container@$accountName.dfs.core.windows.net/$blobPrefix"

  /**
   * Appends a prefix segment to this path, separating it with a slash
   * unless the current prefix is empty.
   *
   * @param part Blob prefix part to join
   * @return The new path.
   */
  @targetName("plus")
  def +(part: String): AdlsStoragePath =
    val joinedPrefix = if blobPrefix.isEmpty then part else s"$blobPrefix/$part"
    copy(blobPrefix = joinedPrefix)

/**
 * Companion object for [[AdlsStoragePath]].
 */
object AdlsStoragePath:
  // Pre-compiled pattern: capture group 1 = container, group 2 = account, group 3 = blob prefix.
  private val pathPattern: Regex = "^abfss:\\/\\/([^@]+)@([^\\.]+)\\.dfs\\.core\\.windows\\.net\\/(.*)$".r

  /**
   * Parses an abfss URI into an [[AdlsStoragePath]].
   *
   * @param hdfsPath The abfss URI to parse.
   * @return Success with the parsed path, or Failure when the URI is malformed.
   */
  def apply(hdfsPath: String): Try[AdlsStoragePath] = hdfsPath match {
    case pathPattern(container, account, prefix) =>
      Success(new AdlsStoragePath(account, container, prefix.stripSuffix("/")))
    case _ =>
      Failure(IllegalArgumentException(s"An AdlsStoragePath must be in the format abfss://container@account.dfs.core.windows.net/path/to/file, but was: $hdfsPath"))
  }

Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.sneaksanddata.arcane.framework
package services.storage.models.azure

import services.storage.base.BlobStorageReader

import com.azure.identity.{DefaultAzureCredential, DefaultAzureCredentialBuilder}
import com.azure.storage.blob.{BlobAsyncClient, BlobClient, BlobContainerAsyncClient, BlobContainerClient, BlobServiceClientBuilder}
import services.storage.models.base.StoredBlob
import services.storage.models.azure.AzureModelConversions.given

import scala.jdk.CollectionConverters.*
import scala.language.implicitConversions
import com.azure.storage.blob.models.ListBlobsOptions
import com.azure.storage.common.policy.{RequestRetryOptions, RetryPolicyType}

import java.time.Duration
import scala.annotation.tailrec
import scala.concurrent.Future

/**
 * Blob reader implementation for Azure Data Lake Storage, backed by the Azure Blob Storage SDK.
 *
 * Authenticates with a [[DefaultAzureCredential]] chain and retries failed HTTP calls
 * using an exponential backoff policy.
 */
final class AzureBlobStorageReader extends BlobStorageReader[AdlsStoragePath]:
  // HTTP retry policy settings for the underlying Azure SDK client.
  private val httpMaxRetries = 3
  private val httpRetryTimeout = Duration.ofSeconds(60)
  private val httpMinRetryDelay = Duration.ofMillis(500)
  private val httpMaxRetryDelay = Duration.ofSeconds(3)

  // Per-call service timeout applied to list operations.
  private val defaultTimeout = Duration.ofSeconds(30)

  private lazy val defaultCredential = new DefaultAzureCredentialBuilder().build()

  private lazy val serviceClient = new BlobServiceClientBuilder()
    .credential(defaultCredential)
    .retryOptions(RequestRetryOptions(RetryPolicyType.EXPONENTIAL, httpMaxRetries, httpRetryTimeout.toSeconds.toInt, httpMinRetryDelay.toMillis, httpMaxRetryDelay.toMillis, null))
    .buildClient()

  // NOTE(review): exposing the global pool as a public implicit is kept to preserve the
  // existing interface, but library code would normally accept the ExecutionContext from the caller.
  implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global

  private def getBlobClient(blobPath: AdlsStoragePath): BlobClient =
    getBlobContainerClient(blobPath).getBlobClient(blobPath.blobPrefix)

  private def getBlobContainerClient(blobPath: AdlsStoragePath): BlobContainerClient =
    serviceClient.getBlobContainerClient(blobPath.container)

  /**
   * Downloads the blob content and applies the deserializer to the received bytes.
   *
   * @param blobPath     Path of the blob to read.
   * @param deserializer Function converting the raw bytes into the result type.
   * @return The deserialized blob content.
   */
  def getBlobContent[Result](blobPath: AdlsStoragePath, deserializer: Array[Byte] => Result): Future[Result] =
    val client = getBlobClient(blobPath)
    Future(deserializer(client.downloadContent().toBytes))

  /**
   * Lists all blobs whose names start with the prefix of the given path.
   *
   * NOTE: the full listing is collected eagerly (page by page) before being
   * wrapped in a LazyList.
   *
   * @param blobPath Path whose container and blob prefix are listed.
   * @return The blobs found under the prefix.
   */
  def listBlobs(blobPath: AdlsStoragePath): LazyList[StoredBlob] =
    val containerClient = getBlobContainerClient(blobPath)
    val listOptions = new ListBlobsOptions().setPrefix(blobPath.blobPrefix)

    @tailrec
    def getPage(pageToken: Option[String], result: Iterable[StoredBlob]): Iterable[StoredBlob] =
      val pageIterator = containerClient.listBlobs(listOptions, pageToken.orNull, defaultTimeout)
        .iterableByPage()
        .iterator()

      // Fix: do not call next() unconditionally — an empty page stream would
      // throw NoSuchElementException instead of yielding an empty listing.
      if !pageIterator.hasNext then result
      else
        val page = pageIterator.next()
        val pageData = page.getValue.asScala.map(implicitly)

        // Fix: the SDK signals "no more pages" with a null continuation token, so calling
        // isEmpty on it directly risked a NullPointerException; Option-wrap before checking.
        Option(page.getContinuationToken).filter(_.nonEmpty) match
          case Some(token) => getPage(Some(token), result ++ pageData)
          case None => result ++ pageData

    LazyList.from(getPage(None, List.empty))


Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.sneaksanddata.arcane.framework
package services.storage.models.azure

import com.azure.storage.blob.models.BlobItem
import services.storage.models.base.StoredBlob

import scala.jdk.CollectionConverters.*

object AzureModelConversions:
  /**
   * Converts an Azure SDK [[BlobItem]] into the storage-agnostic [[StoredBlob]] model.
   *
   * Nullable SDK getters (metadata, MD5 hash, last-modified) are wrapped in Option
   * BEFORE being dereferenced, so a blob with missing properties no longer throws
   * a NullPointerException during conversion.
   */
  given Conversion[BlobItem, StoredBlob] with
    override def apply(blobItem: BlobItem): StoredBlob = StoredBlob(
      name = blobItem.getName,
      // NOTE(review): StoredBlob.createdOn is a non-optional Long, so a null
      // getProperties/getCreationTime would still NPE here — confirm the SDK always
      // populates creation time for listed blobs.
      createdOn = blobItem.getProperties.getCreationTime.toEpochSecond,
      // Fix: getMetadata is nullable — fall back to an empty map instead of NPE-ing on asScala.
      metadata = Option(blobItem.getMetadata).map(_.asScala.toMap).getOrElse(Map.empty),
      // Fix: getContentMd5 returns a nullable byte[]; wrap it before mapping so a missing
      // hash yields None instead of a NullPointerException.
      contentHash = Option(blobItem.getProperties.getContentMd5).map(_.map(_.toChar).mkString),
      contentEncoding = Option(blobItem.getProperties.getContentEncoding),
      contentType = Option(blobItem.getProperties.getContentType),
      contentLength = Option(blobItem.getProperties.getContentLength),
      // Fix: dereference toEpochSecond only inside the Option — getLastModified is nullable.
      lastModified = Option(blobItem.getProperties.getLastModified).map(_.toEpochSecond)
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.sneaksanddata.arcane.framework
package services.storage.models.base

/**
 * Blob object metadata.
 *
 * @param metadata        Additional metadata attached to this object.
 * @param contentHash     Content hashsum.
 * @param contentEncoding Content encoding, for example utf-8.
 * @param contentType     Content type, for example text/plain.
 * @param contentLength   Content length in bytes.
 * @param name            Blob filename. May contain full path, depending on the actual storage.
 * @param lastModified    Last modified timestamp.
 * @param createdOn       Created on timestamp.
 */
case class StoredBlob(
  metadata: Map[String, String] = Map.empty,
  contentHash: Option[String] = None,
  contentEncoding: Option[String] = None,
  contentType: Option[String] = None,
  contentLength: Option[Long] = None,
  name: String,
  lastModified: Option[Long] = None,
  createdOn: Long)

0 comments on commit 29023d6

Please sign in to comment.