CDM (Synapse Link for Dynamics) plugin #118

Merged: 14 commits, Dec 16, 2024
@@ -73,7 +73,7 @@ object CSVParser:
}._1

if parsed.size != headerCount then
throw new IllegalStateException(s"CSV line $line with delimiter $delimiter cannot be parsed into desired $headerCount")
throw new IllegalStateException(s"CSV line $line with delimiter $delimiter cannot be parsed into desired $headerCount fields")

parsed
}
@@ -93,7 +93,7 @@ given Conversion[(String, ArcaneSchema), DataRow] with
case (csvLine, schema) =>
val parsed = CSVParser.parseCsvLine(
line = csvLine,
headerCount = schema.size)
headerCount = schema.size - SimpleCdmModel.systemFieldCount)

val mergeKeyValue = parsed(schema.zipWithIndex.find(v => v._1.name == "Id").get._2)
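For context on the headerCount change above: the conversion now subtracts SimpleCdmModel.systemFieldCount (defined later in this PR) so that framework-added fields are not expected in the raw CSV. A minimal sketch of the parser contract, assuming the call shape shown in this diff:

import com.sneaksanddata.arcane.framework.models.cdm.CSVParser

// An entity with 2 CDM attributes; ArcaneSchema appends MergeKeyField on top,
// so headerCount = schema.size - SimpleCdmModel.systemFieldCount = 2 here.
val fields = CSVParser.parseCsvLine(line = "1,Contoso", headerCount = 2)
// A line that does not split into exactly 2 fields throws IllegalStateException.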

@@ -2,13 +2,30 @@ package com.sneaksanddata.arcane.framework
package models.cdm

import models.{ArcaneSchema, ArcaneSchemaField, ArcaneType, Field, MergeKeyField}
import services.storage.models.azure.{AdlsStoragePath, AzureBlobStorageReader}

import upickle.default.*

import scala.concurrent.Future
import scala.util.{Failure, Success}
import scala.language.implicitConversions

/**
* Attribute in Microsoft Common Data Model, simplified compared to native SDK
* @param name Attribute name
* @param dataType String literal for the attribute data type
* @param maxLength max length property - not used
*/
case class SimpleCdmAttribute(name: String, dataType: String, maxLength: Int)
derives ReadWriter

/**
* Entity (Table) in Microsoft Common Data Model, simplified compared to native SDK
* @param entityType CDM entity type
* @param name Entity name
* @param description Docstring for the entity
* @param attributes Entity fields
*/
case class SimpleCdmEntity(
@upickle.implicits.key("$type")
entityType: String,
@@ -18,6 +35,13 @@ case class SimpleCdmEntity(
derives ReadWriter


/**
* Synapse Link container model, containing all entities enabled for the export
* @param name Model name
* @param description Docstring for the model
* @param version Model version
* @param entities Included entities
*/
case class SimpleCdmModel(name: String, description: String, version: String, entities: Seq[SimpleCdmEntity])
derives ReadWriter

@@ -34,3 +58,15 @@ given Conversion[SimpleCdmAttribute, ArcaneSchemaField] with

given Conversion[SimpleCdmEntity, ArcaneSchema] with
override def apply(entity: SimpleCdmEntity): ArcaneSchema = entity.attributes.map(implicitly) :+ MergeKeyField

object SimpleCdmModel:
implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global
// number of fields in the schema of each entity that do not originate from CDM;
// currently MergeKeyField only
val systemFieldCount: Int = 1

def apply(rootPath: String, reader: AzureBlobStorageReader): Future[SimpleCdmModel] =
AdlsStoragePath(rootPath).map(_ + "model.json") match {
case Success(modelPath) => reader.getBlobContent(modelPath).map(read[SimpleCdmModel](_))
case Failure(ex) => Future.failed(ex)
}
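For reference, a minimal usage sketch of the factory above; the account name and container path are placeholders:

import com.sneaksanddata.arcane.framework.models.cdm.SimpleCdmModel
import com.sneaksanddata.arcane.framework.services.storage.models.azure.AzureBlobStorageReader
import scala.concurrent.Future

val reader = AzureBlobStorageReader("myaccount") // default credential chain

// Reads <rootPath>/model.json and deserializes it into a SimpleCdmModel
val model: Future[SimpleCdmModel] =
  SimpleCdmModel("abfss://container@myaccount.dfs.core.windows.net/synapse-export", reader)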
@@ -0,0 +1,58 @@
package com.sneaksanddata.arcane.framework
package services.cdm

import models.cdm.CSVParser.replaceQuotedNewlines
import models.cdm.{SimpleCdmEntity, given}
import models.{ArcaneSchema, DataRow}
import services.storage.models.azure.{AdlsStoragePath, AzureBlobStorageReader}

import java.time.{OffsetDateTime, ZoneOffset}
import scala.concurrent.Future

class CdmTable(name: String, storagePath: AdlsStoragePath, entityModel: SimpleCdmEntity, reader: AzureBlobStorageReader):
implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global
private val defaultFromYears: Int = 5
private val schema: ArcaneSchema = implicitly(entityModel)

/**
* Read top-level virtual directories to allow pre-filtering blobs
* @param startDate Baseline date to start search from
* @return A list of yyyy-MM-ddTHH prefixes to apply as filters
*/
private def getListPrefixes(startDate: Option[OffsetDateTime]): IndexedSeq[String] =
val currentMoment = OffsetDateTime.now(ZoneOffset.UTC)
val startMoment = startDate.getOrElse(currentMoment.minusYears(defaultFromYears))
Iterator.iterate(startMoment)(_.plusHours(1))
.takeWhile(_.toEpochSecond < currentMoment.toEpochSecond)
.map { moment =>
val monthString = s"00${moment.getMonth.getValue}".takeRight(2)
val dayString = s"00${moment.getDayOfMonth}".takeRight(2)
val hourString = s"00${moment.getHour}".takeRight(2)
s"${moment.getYear}-$monthString-${dayString}T$hourString"
}.toIndexedSeq
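The "00...".takeRight(2) padding yields fixed-width folder names in the yyyy-MM-ddTHH format that Synapse Link uses for its export folders. A quick illustration with plain java.time:

import java.time.{OffsetDateTime, ZoneOffset}

val moment = OffsetDateTime.of(2024, 3, 7, 9, 0, 0, 0, ZoneOffset.UTC)
val monthString = s"00${moment.getMonth.getValue}".takeRight(2) // "03"
val dayString = s"00${moment.getDayOfMonth}".takeRight(2)       // "07"
val hourString = s"00${moment.getHour}".takeRight(2)            // "09"
s"${moment.getYear}-$monthString-${dayString}T$hourString"      // "2024-03-07T09"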

/**
* Read a table snapshot, taking an optional start time. The lowest precision available is 1 hour.
* @param startDate Baseline date for selecting Synapse export folders to include in the snapshot. If not provided, ALL folders from (now - defaultFromYears) are included
* @return A stream of rows for this table
*/
def snapshot(startDate: Option[OffsetDateTime] = None): Future[LazyList[DataRow]] =
// list all matching blobs
Future.sequence(getListPrefixes(startDate)
.flatMap(prefix => reader.listPrefixes(storagePath + prefix))
.flatMap(prefix => reader.listBlobs(storagePath + prefix.name + name))
// exclude any files other than CSV
.collect {
case blob if blob.name.endsWith(".csv") => reader.getBlobContent(storagePath + blob.name)
})
.map(_.flatMap(content => replaceQuotedNewlines(content).split('\n').map(implicitly[DataRow](_, schema))))
.map(LazyList.from)

object CdmTable:
def apply(settings: CdmTableSettings, entityModel: SimpleCdmEntity, reader: AzureBlobStorageReader): CdmTable = new CdmTable(
name = settings.name,
storagePath = AdlsStoragePath(settings.rootPath).get,
entityModel = entityModel,
reader = reader
)
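Putting the pieces together: a hedged end-to-end sketch, where the entity name, account, and path are placeholders following the docstrings in this PR, and error handling is omitted:

import com.sneaksanddata.arcane.framework.models.cdm.SimpleCdmModel
import com.sneaksanddata.arcane.framework.services.cdm.{CdmTable, CdmTableSettings}
import com.sneaksanddata.arcane.framework.services.storage.models.azure.AzureBlobStorageReader
import java.time.{OffsetDateTime, ZoneOffset}
import scala.concurrent.ExecutionContext.Implicits.global

val rootPath = "abfss://container@myaccount.dfs.core.windows.net/synapse-export" // placeholder
val reader = AzureBlobStorageReader("myaccount")

// Resolve the entity schema from model.json, then stream the last 30 days of CSV rows.
val rows = SimpleCdmModel(rootPath, reader).flatMap { model =>
  val entity = model.entities.find(_.name == "account").get // hypothetical entity name
  CdmTable(CdmTableSettings("account", rootPath), entity, reader)
    .snapshot(Some(OffsetDateTime.now(ZoneOffset.UTC).minusDays(30)))
}
// rows: Future[LazyList[DataRow]]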

@@ -0,0 +1,9 @@
package com.sneaksanddata.arcane.framework
package services.cdm

/**
* Settings for a CdmTable object
* @param name Name of the table
* @param rootPath HDFS-style path that includes table blob prefix, for example abfss://container@account.dfs.core.windows.net/path/to/table
*/
case class CdmTableSettings(name: String, rootPath: String)
@@ -17,7 +17,7 @@ final case class AdlsStoragePath(accountName: String, container: String, blobPrefix: String)
* @return The new path.
*/
@targetName("plus")
def +(part: String): AdlsStoragePath = copy(blobPrefix = if (blobPrefix.isEmpty) part else s"$blobPrefix/$part")
def +(part: String): AdlsStoragePath = copy(blobPrefix = if (blobPrefix.isEmpty) part else s"${blobPrefix.stripSuffix("/")}/$part")

object AdlsStoragePath:
private val matchRegex: String = "^abfss:\\/\\/([^@]+)@([^\\.]+)\\.dfs\\.core\\.windows\\.net\\/(.*)$"
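The stripSuffix guard in the `+` change above avoids double slashes when the current prefix already ends with `/` (as virtual-directory names returned by blob listings do). A small illustration, assuming the constructor shown in this hunk:

val path = AdlsStoragePath("myaccount", "container", "synapse-export/")
(path + "2024-03-07T09").blobPrefix // "synapse-export/2024-03-07T09", not "synapse-export//2024-03-07T09"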
@@ -2,63 +2,130 @@ package com.sneaksanddata.arcane.framework
package services.storage.models.azure

import services.storage.base.BlobStorageReader

import com.azure.identity.{DefaultAzureCredential, DefaultAzureCredentialBuilder}
import com.azure.storage.blob.{BlobAsyncClient, BlobClient, BlobContainerAsyncClient, BlobContainerClient, BlobServiceClientBuilder}
import services.storage.models.base.StoredBlob
import services.storage.models.azure.AzureModelConversions.given
import services.storage.models.base.StoredBlob

import scala.jdk.CollectionConverters.*
import scala.language.implicitConversions
import com.azure.core.credential.TokenCredential
import com.azure.core.http.rest.PagedResponse
import com.azure.identity.DefaultAzureCredentialBuilder
import com.azure.storage.blob.models.ListBlobsOptions
import com.azure.storage.blob.{BlobClient, BlobContainerClient, BlobServiceClientBuilder}
import com.azure.storage.common.StorageSharedKeyCredential
import com.azure.storage.common.policy.{RequestRetryOptions, RetryPolicyType}

import java.time.Duration
import scala.annotation.tailrec
import scala.concurrent.Future
import scala.jdk.CollectionConverters.*
import scala.language.implicitConversions

final class AzureBlobStorageReader extends BlobStorageReader[AdlsStoragePath]:
/**
* Blob reader implementation for Azure. Relies on the default credential chain if no explicit credentials are provided.
* @param accountName Storage account name
* @param tokenCredential Optional token credential provider
* @param sharedKeyCredential Optional access key credential
*/
final class AzureBlobStorageReader(accountName: String, tokenCredential: Option[TokenCredential], sharedKeyCredential: Option[StorageSharedKeyCredential]) extends BlobStorageReader[AdlsStoragePath]:
private val httpMaxRetries = 3
private val httpRetryTimeout = Duration.ofSeconds(60)
private val httpMinRetryDelay = Duration.ofMillis(500)
private val httpMaxRetryDelay = Duration.ofSeconds(3)

private val maxResultsPerPage = 5000

private lazy val defaultCredential = new DefaultAzureCredentialBuilder().build()
private lazy val serviceClient = new BlobServiceClientBuilder()
.credential(defaultCredential)
.retryOptions(RequestRetryOptions(RetryPolicyType.EXPONENTIAL, httpMaxRetries, httpRetryTimeout.toSeconds.toInt, httpMinRetryDelay.toMillis, httpMaxRetryDelay.toMillis, null))
.buildClient()
private lazy val serviceClient =
val builder = (tokenCredential, sharedKeyCredential) match
case (Some(credential), _) => new BlobServiceClientBuilder().credential(credential)
case (None, Some(credential)) => new BlobServiceClientBuilder().credential(credential)
case (None, None) => new BlobServiceClientBuilder().credential(defaultCredential)

builder
.endpoint(s"https://$accountName.blob.core.windows.net/")
.retryOptions(RequestRetryOptions(RetryPolicyType.EXPONENTIAL, httpMaxRetries, httpRetryTimeout.toSeconds.toInt, httpMinRetryDelay.toMillis, httpMaxRetryDelay.toMillis, null))
.buildClient()

private val defaultTimeout = Duration.ofSeconds(30)
implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global

private def getBlobClient(blobPath: AdlsStoragePath): BlobClient =
require(blobPath.accountName == accountName, s"Account name in the path `${blobPath.accountName}` does not match account name `$accountName` initialized for this reader")
getBlobContainerClient(blobPath).getBlobClient(blobPath.blobPrefix)

private def getBlobContainerClient(blobPath: AdlsStoragePath): BlobContainerClient =
serviceClient.getBlobContainerClient(blobPath.container)

def getBlobContent[Result](blobPath: AdlsStoragePath, deserializer: Array[Byte] => Result): Future[Result] =
private val stringContentSerializer: Array[Byte] => String = _.map(_.toChar).mkString

@tailrec
private def getPage[ElementType, ResultElementType](pageToken: Option[String], result: Iterable[ResultElementType], pager: Option[String] => PagedResponse[ElementType])(implicit converter: ElementType => ResultElementType): Iterable[ResultElementType] =
val page = pager(pageToken)
val pageData = page.getValue.asScala.map(implicitly)
val continuationToken = Option(page.getContinuationToken)

if continuationToken.isEmpty then
result ++ pageData
else
getPage(continuationToken, result ++ pageData, pager)

/**
* Reads the content of a blob and applies a deserializer to it.
* @param blobPath The path to the blob.
* @param deserializer Function used to deserialize the content of the blob. Content is deserialized as a String if no implementation is provided
* @tparam Result The type of the result.
* @return The result of applying the function to the content of the blob.
*/
def getBlobContent[Result](blobPath: AdlsStoragePath, deserializer: Array[Byte] => Result = stringContentSerializer): Future[Result] =
val client = getBlobClient(blobPath)
Future(deserializer(client.downloadContent().toBytes))

def listPrefixes(rootPrefix: AdlsStoragePath): LazyList[StoredBlob] =
val client = getBlobContainerClient(rootPrefix)
val listOptions = new ListBlobsOptions()
.setPrefix(rootPrefix.blobPrefix)
.setMaxResultsPerPage(maxResultsPerPage)

LazyList.from(getPage(
None,
List.empty[StoredBlob],
token => client.listBlobsByHierarchy("/", listOptions, defaultTimeout).iterableByPage(token.orNull).iterator().next()
))

def listBlobs(blobPath: AdlsStoragePath): LazyList[StoredBlob] =
val client = getBlobContainerClient(blobPath)
val listOptions = new ListBlobsOptions().setPrefix(blobPath.blobPrefix)

@tailrec
def getPage(pageToken: Option[String], result: Iterable[StoredBlob]): Iterable[StoredBlob] =
val page = client.listBlobs(listOptions, pageToken.orNull, defaultTimeout)
.iterableByPage()
.iterator()
.next()
val listOptions = new ListBlobsOptions()
.setPrefix(blobPath.blobPrefix)
.setMaxResultsPerPage(maxResultsPerPage)

val pageData = page.getValue.asScala.map(implicitly)
LazyList.from(getPage(
None,
List.empty[StoredBlob],
token => client.listBlobs(listOptions, token.orNull, defaultTimeout).iterableByPage().iterator().next()
))

if page.getContinuationToken.isEmpty then
result ++ pageData
else
getPage(Some(page.getContinuationToken), result ++ pageData)
object AzureBlobStorageReader:
// TODO: move http settings etc to apply

LazyList.from(getPage(None, List()))
/**
* Create AzureBlobStorageReader for the account using TokenCredential
* @param accountName Storage account name
* @param credential TokenCredential (accessToken provider)
* @return AzureBlobStorageReader instance
*/
def apply(accountName: String, credential: TokenCredential): AzureBlobStorageReader = new AzureBlobStorageReader(accountName, Some(credential), None)

/**
* Create AzureBlobStorageReader for the account using StorageSharedKeyCredential
*
* @param accountName Storage account name
* @param credential StorageSharedKeyCredential (account key)
* @return AzureBlobStorageReader instance
*/
def apply(accountName: String, credential: StorageSharedKeyCredential): AzureBlobStorageReader = new AzureBlobStorageReader(accountName, None, Some(credential))

/**
* Create AzureBlobStorageReader for the account using default credential chain
*
* @param accountName Storage account name
* @return AzureBlobStorageReader instance
*/
def apply(accountName: String): AzureBlobStorageReader = new AzureBlobStorageReader(accountName, None, None)
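The three factory overloads cover the supported credential flows. A usage sketch; the account name and key are placeholders:

import com.azure.identity.DefaultAzureCredentialBuilder
import com.azure.storage.common.StorageSharedKeyCredential

val viaDefaultChain = AzureBlobStorageReader("myaccount")
val viaToken = AzureBlobStorageReader("myaccount", new DefaultAzureCredentialBuilder().build())
val viaSharedKey = AzureBlobStorageReader("myaccount",
  new StorageSharedKeyCredential("myaccount", "base64-encoded-account-key"))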
@@ -5,16 +5,29 @@ import com.azure.storage.blob.models.BlobItem
import services.storage.models.base.StoredBlob

import scala.jdk.CollectionConverters.*
import scala.util.Try

object AzureModelConversions:
given Conversion[BlobItem, StoredBlob] with
override def apply(blobItem: BlobItem): StoredBlob = StoredBlob(
name = blobItem.getName,
createdOn = blobItem.getProperties.getCreationTime.toEpochSecond,
metadata = blobItem.getMetadata.asScala.toMap,
contentHash = Option(blobItem.getProperties.getContentMd5.map(_.toChar).mkString),
contentEncoding = Option(blobItem.getProperties.getContentEncoding),
contentType = Option(blobItem.getProperties.getContentType),
contentLength = Option(blobItem.getProperties.getContentLength),
lastModified = Option(blobItem.getProperties.getLastModified.toEpochSecond)
)
override def apply(blobItem: BlobItem): StoredBlob =
if Option(blobItem.getProperties).isDefined && Try(blobItem.getProperties.getCreationTime).isSuccess then
StoredBlob(
name = blobItem.getName,
createdOn = Option(blobItem.getProperties.getCreationTime).map(_.toEpochSecond),
metadata = Option(blobItem.getMetadata).map(_.asScala.toMap).getOrElse(Map()),
contentHash = Option(blobItem.getProperties.getContentMd5).map(_.map(_.toChar).mkString),
contentEncoding = Option(blobItem.getProperties.getContentEncoding),
contentType = Option(blobItem.getProperties.getContentType),
contentLength = Option(blobItem.getProperties.getContentLength),
lastModified = Option(blobItem.getProperties.getLastModified.toEpochSecond)
)
else StoredBlob(
name = blobItem.getName,
createdOn = None,
metadata = Map(),
contentHash = None,
contentEncoding = None,
contentType = None,
contentLength = None,
lastModified = None
)
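Since createdOn is now optional, downstream code has to handle blobs that carry no properties (for example, virtual-directory entries returned by hierarchical listings). A consuming sketch:

import com.sneaksanddata.arcane.framework.services.storage.models.base.StoredBlob

def describe(blob: StoredBlob): String = blob.createdOn match
  case Some(epoch) => s"${blob.name} created at epoch second $epoch"
  case None => s"${blob.name} (no creation metadata, likely a virtual directory)"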
@@ -43,4 +43,4 @@ case class StoredBlob(
/**
* Created on timestamp.
*/
createdOn: Long)
createdOn: Option[Long])