CDM (Synapse Link for Dynamics) plugin (#118)
george-zubrienko authored Dec 16, 2024
1 parent 29023d6 commit 3a72b49
Showing 13 changed files with 1,007 additions and 47 deletions.
26 changes: 25 additions & 1 deletion framework/arcane-framework/docker-compose.yaml
@@ -92,4 +92,28 @@ services:
condition: service_healthy
volumes:
- ./create-polaris-catalog.sh:/create-polaris-catalog.sh
command: ["/bin/sh", "/create-polaris-catalog.sh"]
azurite:
image: mcr.microsoft.com/azure-storage/azurite
restart: always
networks:
mesh:
ipv4_address: 10.1.0.6
command:
- azurite-blob
- "--blobHost"
- "10.1.0.6"
- "--blobPort"
- "10001"
ports:
- "10001:10001"
create-cdm-container:
image: python:3.11-slim-bookworm
depends_on:
- azurite
networks:
mesh:
ipv4_address: 10.1.0.7
volumes:
- ./populate-cdm-container.py:/populate-cdm-container.py
command: [ "/bin/sh", "-c", "pip install azure-storage-blob requests && python /populate-cdm-container.py" ]
665 changes: 665 additions & 0 deletions framework/arcane-framework/populate-cdm-container.py

Large diffs are not rendered by default.

@@ -73,7 +73,7 @@ object CSVParser:
}._1

if parsed.size != headerCount then
throw new IllegalStateException(s"CSV line $line with delimiter $delimiter cannot be parsed into desired $headerCount")
throw new IllegalStateException(s"CSV line $line with delimiter $delimiter cannot be parsed into desired $headerCount fields")

parsed
}
@@ -93,7 +93,7 @@ given Conversion[(String, ArcaneSchema), DataRow] with
case (csvLine, schema) =>
val parsed = CSVParser.parseCsvLine(
line = csvLine,
headerCount = schema.size)
headerCount = schema.size - SimpleCdmModel.systemFieldCount)

val mergeKeyValue = parsed(schema.zipWithIndex.find(v => v._1.name == "Id").get._2)

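Together with the header-count change above, this conversion turns a raw CSV line plus the entity schema into a DataRow. A minimal sketch, assuming a hypothetical two-attribute entity (the "$type" and dataType literals follow Synapse Link model.json conventions but are assumptions here):

import scala.language.implicitConversions
import com.sneaksanddata.arcane.framework.models.{ArcaneSchema, DataRow}
import com.sneaksanddata.arcane.framework.models.cdm.{SimpleCdmAttribute, SimpleCdmEntity, given}

val entity = SimpleCdmEntity(
  entityType = "LocalEntity",
  name = "accounts",
  description = "",
  attributes = Seq(
    SimpleCdmAttribute("Id", "string", 0),
    SimpleCdmAttribute("name", "string", 100)))

val schema: ArcaneSchema = implicitly(entity) // appends MergeKeyField
// The CSV line carries only the two CDM fields: headerCount is
// schema.size - systemFieldCount = 3 - 1 = 2, as computed above.
val row: DataRow = ("42,Contoso", schema)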
@@ -2,13 +2,30 @@ package com.sneaksanddata.arcane.framework
package models.cdm

import models.{ArcaneSchema, ArcaneSchemaField, ArcaneType, Field, MergeKeyField}
import services.storage.models.azure.{AdlsStoragePath, AzureBlobStorageReader}

import upickle.default.*

import scala.concurrent.Future
import scala.util.{Failure, Success}
import scala.language.implicitConversions

/**
* Attribute in Microsoft Common Data Model, simplified compared to native SDK
* @param name Attribute name
* @param dataType String literal for the attribute data type
* @param maxLength max length property - not used
*/
case class SimpleCdmAttribute(name: String, dataType: String, maxLength: Int)
derives ReadWriter

/**
* Entity (Table) in Microsoft Common Data Model, simplified compared to native SDK
* @param entityType CDM entity type
* @param name Entity name
* @param description Docstring for the entity
* @param attributes Entity fields
*/
case class SimpleCdmEntity(
@upickle.implicits.key("$type")
entityType: String,
@@ -18,6 +18,13 @@ case class SimpleCdmEntity(
derives ReadWriter


/**
* Synapse Link container model, containing all entities enabled for the export
* @param name Model name
* @param description Docstring for the model
* @param version Model version
* @param entities Included entities
*/
case class SimpleCdmModel(name: String, description: String, version: String, entities: Seq[SimpleCdmEntity])
derives ReadWriter

@@ -34,3 +58,15 @@ given Conversion[SimpleCdmAttribute, ArcaneSchemaField] with

given Conversion[SimpleCdmEntity, ArcaneSchema] with
override def apply(entity: SimpleCdmEntity): ArcaneSchema = entity.attributes.map(implicitly) :+ MergeKeyField

object SimpleCdmModel:
implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global
// number of fields in the schema of each entity which do not originate from CDM
// currently MergeKeyField only
val systemFieldCount: Int = 1

def apply(rootPath: String, reader: AzureBlobStorageReader): Future[SimpleCdmModel] =
AdlsStoragePath(rootPath).map(_ + "model.json") match {
case Success(modelPath) => reader.getBlobContent(modelPath).map(read[SimpleCdmModel](_))
case Failure(ex) => Future.failed(ex)
}
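For orientation, a minimal sketch (account and container names hypothetical) of resolving the model for a Synapse Link container:

import scala.concurrent.Future
import com.sneaksanddata.arcane.framework.models.cdm.SimpleCdmModel
import com.sneaksanddata.arcane.framework.services.storage.models.azure.AzureBlobStorageReader

// Reads <rootPath>/model.json and deserializes it with upickle.
val model: Future[SimpleCdmModel] =
  SimpleCdmModel("abfss://dynamics@myaccount.dfs.core.windows.net/", AzureBlobStorageReader("myaccount"))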
@@ -0,0 +1,59 @@
package com.sneaksanddata.arcane.framework
package services.cdm

import models.cdm.CSVParser.replaceQuotedNewlines
import models.cdm.{SimpleCdmEntity, given}
import models.{ArcaneSchema, DataRow}
import services.storage.models.azure.{AdlsStoragePath, AzureBlobStorageReader}

import java.time.{OffsetDateTime, ZoneOffset}
import scala.concurrent.Future

class CdmTable(name: String, storagePath: AdlsStoragePath, entityModel: SimpleCdmEntity, reader: AzureBlobStorageReader):
implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global
private val defaultFromYears: Int = 5
private val schema: ArcaneSchema = implicitly(entityModel)

/**
 * Read top-level virtual directories to allow pre-filtering blobs
 * @param startDate Baseline date to start the search from
 * @param endDate Optional end date to stop the search at; defaults to the current moment when omitted
 * @return A list of yyyy-MM-ddTHH prefixes to apply as filters
 */
private def getListPrefixes(startDate: Option[OffsetDateTime], endDate: Option[OffsetDateTime] = None): IndexedSeq[String] =
val currentMoment = endDate.getOrElse(OffsetDateTime.now(ZoneOffset.UTC))
val startMoment = startDate.getOrElse(currentMoment.minusYears(defaultFromYears))
Iterator.iterate(startMoment)(_.plusHours(1))
.takeWhile(_.toEpochSecond < currentMoment.toEpochSecond)
.map { moment =>
val monthString = s"00${moment.getMonth.getValue}".takeRight(2)
val dayString = s"00${moment.getDayOfMonth}".takeRight(2)
val hourString = s"00${moment.getHour}".takeRight(2)
s"${moment.getYear}-$monthString-${dayString}T$hourString"
}.toIndexedSeq

/**
 * Read a table snapshot, taking an optional start time. The lowest precision available is 1 hour
 * @param startDate Folders from the Synapse export to include in the snapshot, based on the start date provided. If not provided, ALL folders from now minus defaultFromYears will be included
 * @param endDate Date to stop at when looking for prefixes. In production, pass None for this value to always read data up to the current moment
 * @return A stream of rows for this table
 */
def snapshot(startDate: Option[OffsetDateTime] = None, endDate: Option[OffsetDateTime] = None): Future[LazyList[DataRow]] =
// list all matching blobs
Future.sequence(getListPrefixes(startDate, endDate)
.flatMap(prefix => reader.listPrefixes(storagePath + prefix))
.flatMap(prefix => reader.listBlobs(storagePath + prefix.name + name))
// exclude any files other than CSV
.collect {
case blob if blob.name.endsWith(".csv") => reader.getBlobContent(storagePath + blob.name)
})
.map(_.flatMap(content => replaceQuotedNewlines(content).split('\n').map(implicitly[DataRow](_, schema))))
.map(LazyList.from)

object CdmTable:
def apply(settings: CdmTableSettings, entityModel: SimpleCdmEntity, reader: AzureBlobStorageReader): CdmTable = new CdmTable(
name = settings.name,
storagePath = AdlsStoragePath(settings.rootPath).get,
entityModel = entityModel,
reader = reader
)
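A sketch of wiring these pieces together (names hypothetical): resolve the entity definition from model.json as shown earlier, then stream the last day of exported rows.

import java.time.{OffsetDateTime, ZoneOffset}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import com.sneaksanddata.arcane.framework.models.DataRow
import com.sneaksanddata.arcane.framework.models.cdm.SimpleCdmModel
import com.sneaksanddata.arcane.framework.services.cdm.{CdmTable, CdmTableSettings}
import com.sneaksanddata.arcane.framework.services.storage.models.azure.AzureBlobStorageReader

val reader = AzureBlobStorageReader("myaccount")
val settings = CdmTableSettings("accounts", "abfss://dynamics@myaccount.dfs.core.windows.net/")

val rows: Future[LazyList[DataRow]] =
  SimpleCdmModel(settings.rootPath, reader).flatMap { model =>
    // assumes the model contains an entity named "accounts"
    val entity = model.entities.find(_.name == "accounts").get
    CdmTable(settings, entity, reader)
      .snapshot(startDate = Some(OffsetDateTime.now(ZoneOffset.UTC).minusDays(1)))
  }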

@@ -0,0 +1,9 @@
package com.sneaksanddata.arcane.framework
package services.cdm

/**
* Settings for a CdmTable object
* @param name Name of the table
* @param rootPath HDFS-style path that includes table blob prefix, for example abfss://container@account.dfs.core.windows.net/path/to/table
*/
case class CdmTableSettings(name: String, rootPath: String)
@@ -17,7 +17,7 @@ final case class AdlsStoragePath(accountName: String, container: String, blobPre
* @return The new path.
*/
@targetName("plus")
def +(part: String): AdlsStoragePath = copy(blobPrefix = if (blobPrefix.isEmpty) part else s"$blobPrefix/$part")
def +(part: String): AdlsStoragePath = copy(blobPrefix = if (blobPrefix.isEmpty) part else s"${blobPrefix.stripSuffix("/")}/$part")

object AdlsStoragePath:
private val matchRegex: String = "^abfss:\\/\\/([^@]+)@([^\\.]+)\\.dfs\\.core\\.windows\\.net\\/(.*)$"
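The stripSuffix change above makes `+` safe regardless of whether the stored prefix carries a trailing slash; a small sketch:

import com.sneaksanddata.arcane.framework.services.storage.models.azure.AdlsStoragePath

// Both forms now produce blobPrefix "path/to/table/model.json";
// previously the first yielded "path/to/table//model.json".
val withSlash    = AdlsStoragePath("abfss://container@account.dfs.core.windows.net/path/to/table/").map(_ + "model.json")
val withoutSlash = AdlsStoragePath("abfss://container@account.dfs.core.windows.net/path/to/table").map(_ + "model.json")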
@@ -2,63 +2,133 @@ package com.sneaksanddata.arcane.framework
package services.storage.models.azure

import services.storage.base.BlobStorageReader

import com.azure.identity.{DefaultAzureCredential, DefaultAzureCredentialBuilder}
import com.azure.storage.blob.{BlobAsyncClient, BlobClient, BlobContainerAsyncClient, BlobContainerClient, BlobServiceClientBuilder}
import services.storage.models.base.StoredBlob
import services.storage.models.azure.AzureModelConversions.given
import services.storage.models.base.StoredBlob

import scala.jdk.CollectionConverters.*
import scala.language.implicitConversions
import com.azure.core.credential.TokenCredential
import com.azure.core.http.rest.PagedResponse
import com.azure.identity.DefaultAzureCredentialBuilder
import com.azure.storage.blob.models.ListBlobsOptions
import com.azure.storage.blob.{BlobClient, BlobContainerClient, BlobServiceClientBuilder}
import com.azure.storage.common.StorageSharedKeyCredential
import com.azure.storage.common.policy.{RequestRetryOptions, RetryPolicyType}

import java.time.Duration
import scala.annotation.tailrec
import scala.concurrent.Future
import scala.jdk.CollectionConverters.*
import scala.language.implicitConversions

final class AzureBlobStorageReader extends BlobStorageReader[AdlsStoragePath]:
private val httpMaxRetries = 3
private val httpRetryTimeout = Duration.ofSeconds(60)
private val httpMinRetryDelay = Duration.ofMillis(500)
private val httpMaxRetryDelay = Duration.ofSeconds(3)

/**
* Blob reader implementation for Azure. Relies on the default credential chain if no added credentials are provided.
* @param accountName Storage account name
* @param tokenCredential Optional token credential provider
* @param sharedKeyCredential Optional access key credential
*/
final class AzureBlobStorageReader(accountName: String, endpoint: Option[String], tokenCredential: Option[TokenCredential], sharedKeyCredential: Option[StorageSharedKeyCredential], settings: Option[AzureBlobStorageReaderSettings] = None) extends BlobStorageReader[AdlsStoragePath]:
private val serviceClientSettings = settings.getOrElse(AzureBlobStorageReaderSettings())
private lazy val defaultCredential = new DefaultAzureCredentialBuilder().build()
private lazy val serviceClient = new BlobServiceClientBuilder()
.credential(defaultCredential)
.retryOptions(RequestRetryOptions(RetryPolicyType.EXPONENTIAL, httpMaxRetries, httpRetryTimeout.toSeconds.toInt, httpMinRetryDelay.toMillis, httpMaxRetryDelay.toMillis, null))
.buildClient()
private lazy val serviceClient =
val builder = (tokenCredential, sharedKeyCredential) match
case (Some(credential), _) => new BlobServiceClientBuilder().credential(credential)
case (None, Some(credential)) => new BlobServiceClientBuilder().credential(credential)
case (None, None) => new BlobServiceClientBuilder().credential(defaultCredential)

builder
.endpoint(endpoint.getOrElse(s"https://$accountName.blob.core.windows.net/"))
.retryOptions(RequestRetryOptions(RetryPolicyType.EXPONENTIAL, serviceClientSettings.httpMaxRetries, serviceClientSettings.httpRetryTimeout.toSeconds.toInt, serviceClientSettings.httpMinRetryDelay.toMillis, serviceClientSettings.httpMaxRetryDelay.toMillis, null))
.buildClient()

private val defaultTimeout = Duration.ofSeconds(30)
implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global

private def getBlobClient(blobPath: AdlsStoragePath): BlobClient =
require(blobPath.accountName == accountName, s"Account name in the path `${blobPath.accountName}` does not match account name `$accountName` initialized for this reader")
getBlobContainerClient(blobPath).getBlobClient(blobPath.blobPrefix)

private def getBlobContainerClient(blobPath: AdlsStoragePath): BlobContainerClient =
serviceClient.getBlobContainerClient(blobPath.container)

def getBlobContent[Result](blobPath: AdlsStoragePath, deserializer: Array[Byte] => Result): Future[Result] =
private val stringContentSerializer: Array[Byte] => String = _.map(_.toChar).mkString

@tailrec
private def getPage[ElementType, ResultElementType](pageToken: Option[String], result: Iterable[ResultElementType], pager: Option[String] => PagedResponse[ElementType])(implicit converter: ElementType => ResultElementType): Iterable[ResultElementType] =
val page = pager(pageToken)
val pageData = page.getValue.asScala.map(implicitly)
val continuationToken = Option(page.getContinuationToken)

if continuationToken.isEmpty then
result ++ pageData
else
getPage(continuationToken, result ++ pageData, pager)

/**
 * Reads the content of a blob and applies a deserializer to it.
 * @param blobPath The path to the blob.
 * @param deserializer Function used to deserialize the content of the blob. All content is read as a String if no implementation is provided.
 * @tparam Result The type of the result.
 * @return The result of applying the function to the content of the blob.
 */
def getBlobContent[Result](blobPath: AdlsStoragePath, deserializer: Array[Byte] => Result = stringContentSerializer): Future[Result] =
val client = getBlobClient(blobPath)
Future(deserializer(client.downloadContent().toBytes))

def listPrefixes(rootPrefix: AdlsStoragePath): LazyList[StoredBlob] =
val client = getBlobContainerClient(rootPrefix)
val listOptions = new ListBlobsOptions()
.setPrefix(rootPrefix.blobPrefix)
.setMaxResultsPerPage(serviceClientSettings.maxResultsPerPage)

LazyList.from(getPage(
None,
List.empty[StoredBlob],
token => client.listBlobsByHierarchy("/", listOptions, defaultTimeout).iterableByPage(token.orNull).iterator().next()
))

def listBlobs(blobPath: AdlsStoragePath): LazyList[StoredBlob] =
val client = getBlobContainerClient(blobPath)
val listOptions = new ListBlobsOptions().setPrefix(blobPath.blobPrefix)

@tailrec
def getPage(pageToken: Option[String], result: Iterable[StoredBlob]): Iterable[StoredBlob] =
val page = client.listBlobs(listOptions, pageToken.orNull, defaultTimeout)
.iterableByPage()
.iterator()
.next()
val listOptions = new ListBlobsOptions()
.setPrefix(blobPath.blobPrefix)
.setMaxResultsPerPage(serviceClientSettings.maxResultsPerPage)

val pageData = page.getValue.asScala.map(implicitly)
LazyList.from(getPage(
None,
List.empty[StoredBlob],
token => client.listBlobs(listOptions, token.orNull, defaultTimeout).iterableByPage().iterator().next()
))

if page.getContinuationToken.isEmpty then
result ++ pageData
else
getPage(Some(page.getContinuationToken), result ++ pageData)

LazyList.from(getPage(None, List()))
object AzureBlobStorageReader:
/**
* Create AzureBlobStorageReader for the account using TokenCredential
* @param accountName Storage account name
* @param credential TokenCredential (accessToken provider)
* @return AzureBlobStorageReader instance
*/
def apply(accountName: String, credential: TokenCredential): AzureBlobStorageReader = new AzureBlobStorageReader(accountName, None, Some(credential), None)

/**
* Create AzureBlobStorageReader for the account using StorageSharedKeyCredential
*
* @param accountName Storage account name
* @param credential StorageSharedKeyCredential (account key)
* @return AzureBlobStorageReader instance
*/
def apply(accountName: String, credential: StorageSharedKeyCredential): AzureBlobStorageReader = new AzureBlobStorageReader(accountName, None, None, Some(credential))

/**
* Create AzureBlobStorageReader for the account using StorageSharedKeyCredential and custom endpoint
*
* @param accountName Storage account name
* @param endpoint Storage account endpoint
* @param credential StorageSharedKeyCredential (account key)
* @return AzureBlobStorageReader instance
*/
def apply(accountName: String, endpoint: String, credential: StorageSharedKeyCredential): AzureBlobStorageReader = new AzureBlobStorageReader(accountName, Some(endpoint), None, Some(credential))

/**
* Create AzureBlobStorageReader for the account using default credential chain
*
* @param accountName Storage account name
* @return AzureBlobStorageReader instance
*/
def apply(accountName: String): AzureBlobStorageReader = new AzureBlobStorageReader(accountName, None, None, None)
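A small usage sketch (account and path hypothetical): the default credential chain plus an explicit UTF-8 deserializer. Note the default stringContentSerializer widens each byte to a Char, which amounts to Latin-1 decoding.

import java.nio.charset.StandardCharsets
import scala.concurrent.Future
import scala.util.{Failure, Success}
import com.sneaksanddata.arcane.framework.services.storage.models.azure.{AdlsStoragePath, AzureBlobStorageReader}

val reader = AzureBlobStorageReader("myaccount") // default credential chain

val content: Future[String] =
  AdlsStoragePath("abfss://container@myaccount.dfs.core.windows.net/model.json") match
    case Success(path) => reader.getBlobContent(path, bytes => new String(bytes, StandardCharsets.UTF_8))
    case Failure(ex)   => Future.failed(ex)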
@@ -0,0 +1,20 @@
package com.sneaksanddata.arcane.framework
package services.storage.models.azure

import java.time.Duration

case class AzureBlobStorageReaderSettings(httpMaxRetries: Int, httpRetryTimeout: Duration, httpMinRetryDelay: Duration, httpMaxRetryDelay: Duration, maxResultsPerPage: Int)

object AzureBlobStorageReaderSettings:
def apply(
httpMaxRetries: Int = 3,
httpRetryTimeout: Duration = Duration.ofSeconds(60),
httpMinRetryDelay: Duration = Duration.ofMillis(500),
httpMaxRetryDelay: Duration = Duration.ofSeconds(3),
maxResultsPerPage: Int = 5000): AzureBlobStorageReaderSettings = new AzureBlobStorageReaderSettings(
httpMaxRetries = httpMaxRetries,
httpRetryTimeout = httpRetryTimeout,
httpMinRetryDelay = httpMinRetryDelay,
httpMaxRetryDelay = httpMaxRetryDelay,
maxResultsPerPage = maxResultsPerPage
)
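The companion apply overloads shown earlier use default settings; a sketch of overriding them through the reader's primary constructor (account name hypothetical):

import com.sneaksanddata.arcane.framework.services.storage.models.azure.{AzureBlobStorageReader, AzureBlobStorageReaderSettings}

// Tighter paging and more retries than the defaults.
val settings = AzureBlobStorageReaderSettings(httpMaxRetries = 5, maxResultsPerPage = 1000)
val reader = new AzureBlobStorageReader("myaccount", None, None, None, Some(settings))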