From c3fd952584de54fb0a459c2412cddefc1898c284 Mon Sep 17 00:00:00 2001 From: George Zubrienko Date: Tue, 10 Dec 2024 15:39:56 +0100 Subject: [PATCH 01/14] WIP CDM table read --- .../main/scala/services/cdm/CdmTable.scala | 55 +++++++++++++++++++ .../scala/services/cdm/CdmTableSettings.scala | 4 ++ .../models/azure/AzureBlobStorageReader.scala | 7 ++- 3 files changed, 64 insertions(+), 2 deletions(-) create mode 100644 framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala create mode 100644 framework/arcane-framework/src/main/scala/services/cdm/CdmTableSettings.scala diff --git a/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala b/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala new file mode 100644 index 0000000..b872a60 --- /dev/null +++ b/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala @@ -0,0 +1,55 @@ +package com.sneaksanddata.arcane.framework +package services.cdm + +import models.cdm.CSVParser.isComplete +import models.cdm.{SimpleCdmEntity, given} +import models.{ArcaneSchema, DataRow} +import services.storage.models.azure.{AdlsStoragePath, AzureBlobStorageReader} + +import java.time.{OffsetDateTime, ZoneOffset} +import scala.concurrent.Future + +class CdmTable(name: String, storagePath: AdlsStoragePath, entityModel: SimpleCdmEntity, reader: AzureBlobStorageReader): + implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global + private val defaultFromYears: Int = 5 + private val schema: ArcaneSchema = implicitly(entityModel) + + private def getListPrefixes(fromYears: Option[Int]): IndexedSeq[String] = + val currentMoment = OffsetDateTime.now(ZoneOffset.UTC) + val fromMoment = currentMoment.minusYears(fromYears.getOrElse(defaultFromYears)) + Range.inclusive( + fromMoment.getYear, + currentMoment.getYear + ).flatMap(year => Range.inclusive( + 1, + 12 + ).map{ m => + val mon = s"00$m".takeRight(2) + s"$year-$mon-" + }) + + /** + * Read a table 
snapshot, taking optional start time. + * @param fromYears Folders from Synapse export to include in the snapshot. If not provided, ALL folders will be included + * @return A stream of rows for this table + */ + def snapshot(fromYears: Option[Int]): Future[LazyList[DataRow]] = + // list all matching blobs + Future.sequence(getListPrefixes(fromYears) + .flatMap(prefix => reader.listBlobs(storagePath + prefix)) + .map(blob => reader.getBlobContent(storagePath + blob.name, _.map(_.toChar).mkString))) + .map(_.flatMap(content => content.split('\n').foldLeft((Seq.empty[String], "")) { (agg, value) => + if isComplete(agg._2) then + (agg._1 :+ agg._2, "") + else + (agg._1, agg._2 + value) + }._1.map(implicitly[DataRow](_, schema)))) + .map(LazyList.from) + +object CdmTable: + def apply(settings: CdmTableSettings, entityModel: SimpleCdmEntity, reader: AzureBlobStorageReader): CdmTable = new CdmTable( + name = settings.name, + storagePath = AdlsStoragePath(settings.rootPath).get, + entityModel = entityModel, + reader = reader + ) diff --git a/framework/arcane-framework/src/main/scala/services/cdm/CdmTableSettings.scala b/framework/arcane-framework/src/main/scala/services/cdm/CdmTableSettings.scala new file mode 100644 index 0000000..27f636f --- /dev/null +++ b/framework/arcane-framework/src/main/scala/services/cdm/CdmTableSettings.scala @@ -0,0 +1,4 @@ +package com.sneaksanddata.arcane.framework +package services.cdm + +case class CdmTableSettings(name: String, rootPath: String) diff --git a/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReader.scala b/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReader.scala index 3b0f78c..a8d4695 100644 --- a/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReader.scala +++ b/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReader.scala @@ -10,7 +10,7 @@ import 
services.storage.models.azure.AzureModelConversions.given import scala.jdk.CollectionConverters.* import scala.language.implicitConversions -import com.azure.storage.blob.models.ListBlobsOptions +import com.azure.storage.blob.models.{BlobListDetails, ListBlobsOptions} import com.azure.storage.common.policy.{RequestRetryOptions, RetryPolicyType} import java.time.Duration @@ -22,6 +22,7 @@ final class AzureBlobStorageReader extends BlobStorageReader[AdlsStoragePath]: private val httpRetryTimeout = Duration.ofSeconds(60) private val httpMinRetryDelay = Duration.ofMillis(500) private val httpMaxRetryDelay = Duration.ofSeconds(3) + private val maxResultsPerPage = 1000 private lazy val defaultCredential = new DefaultAzureCredentialBuilder().build() private lazy val serviceClient = new BlobServiceClientBuilder() @@ -43,7 +44,9 @@ final class AzureBlobStorageReader extends BlobStorageReader[AdlsStoragePath]: def listBlobs(blobPath: AdlsStoragePath): LazyList[StoredBlob] = val client = getBlobContainerClient(blobPath) - val listOptions = new ListBlobsOptions().setPrefix(blobPath.blobPrefix) + val listOptions = new ListBlobsOptions() + .setPrefix(blobPath.blobPrefix) + .setMaxResultsPerPage(maxResultsPerPage) @tailrec def getPage(pageToken: Option[String], result: Iterable[StoredBlob]): Iterable[StoredBlob] = From e02345452919a7dac4063f67dbb80dfdb0f1b21c Mon Sep 17 00:00:00 2001 From: George Zubrienko Date: Wed, 11 Dec 2024 11:29:21 +0100 Subject: [PATCH 02/14] Improve prefix generation --- .../main/scala/services/cdm/CdmTable.scala | 29 ++++++++++++------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala b/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala index b872a60..8d0f825 100644 --- a/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala +++ b/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala @@ -6,7 +6,7 @@ import 
models.cdm.{SimpleCdmEntity, given} import models.{ArcaneSchema, DataRow} import services.storage.models.azure.{AdlsStoragePath, AzureBlobStorageReader} -import java.time.{OffsetDateTime, ZoneOffset} +import java.time.{Instant, OffsetDateTime, ZoneOffset} import scala.concurrent.Future class CdmTable(name: String, storagePath: AdlsStoragePath, entityModel: SimpleCdmEntity, reader: AzureBlobStorageReader): @@ -14,28 +14,37 @@ class CdmTable(name: String, storagePath: AdlsStoragePath, entityModel: SimpleCd private val defaultFromYears: Int = 5 private val schema: ArcaneSchema = implicitly(entityModel) - private def getListPrefixes(fromYears: Option[Int]): IndexedSeq[String] = + private def getListPrefixes(startDate: Option[OffsetDateTime]): IndexedSeq[String] = val currentMoment = OffsetDateTime.now(ZoneOffset.UTC) - val fromMoment = currentMoment.minusYears(fromYears.getOrElse(defaultFromYears)) + val startMoment = startDate.getOrElse(currentMoment.minusYears(defaultFromYears)) Range.inclusive( - fromMoment.getYear, + startMoment.getYear, currentMoment.getYear ).flatMap(year => Range.inclusive( 1, 12 - ).map{ m => + ).map { m => val mon = s"00$m".takeRight(2) - s"$year-$mon-" - }) + (s"$year-$mon-", year, m) + }).collect { + // include all prefixes from previous years + // in case year for both dates is the same, we will never hit this case + case (prefix, year, _) if year < currentMoment.getYear => prefix + // only include prefixes for current year that are less than current month + // this only applies to the case when startMoment year is less than current moment - then we take months from 1 to current month + case (prefix, year, mon) if (year == currentMoment.getYear) && (currentMoment.getYear > startMoment.getYear) && (mon <= currentMoment.getMonth.getValue) => prefix + // in case both dates are in the same year, we limit month selection to start from startMoment month + case (prefix, year, mon) if (year == currentMoment.getYear) && (currentMoment.getYear == 
startMoment.getYear) && (mon >= startMoment.getMonth.getValue) && (mon <= currentMoment.getMonth.getValue) => prefix + } /** * Read a table snapshot, taking optional start time. - * @param fromYears Folders from Synapse export to include in the snapshot. If not provided, ALL folders will be included + * @param startDate Folders from Synapse export to include in the snapshot, based on the start date provided. If not provided, ALL folders from now - defaultFromYears will be included * @return A stream of rows for this table */ - def snapshot(fromYears: Option[Int]): Future[LazyList[DataRow]] = + def snapshot(startDate: Option[OffsetDateTime] = None): Future[LazyList[DataRow]] = // list all matching blobs - Future.sequence(getListPrefixes(fromYears) + Future.sequence(getListPrefixes(startDate) .flatMap(prefix => reader.listBlobs(storagePath + prefix)) .map(blob => reader.getBlobContent(storagePath + blob.name, _.map(_.toChar).mkString))) .map(_.flatMap(content => content.split('\n').foldLeft((Seq.empty[String], "")) { (agg, value) => From 586b0b71796916fd46f5cb828d33dd22a814bd8f Mon Sep 17 00:00:00 2001 From: George Zubrienko Date: Wed, 11 Dec 2024 11:42:44 +0100 Subject: [PATCH 03/14] Add some filters --- .../src/main/scala/services/cdm/CdmTable.scala | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala b/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala index 8d0f825..e7cf1cf 100644 --- a/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala +++ b/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala @@ -1,12 +1,12 @@ package com.sneaksanddata.arcane.framework package services.cdm -import models.cdm.CSVParser.isComplete +import models.cdm.CSVParser.replaceQuotedNewlines import models.cdm.{SimpleCdmEntity, given} import models.{ArcaneSchema, DataRow} import services.storage.models.azure.{AdlsStoragePath, 
AzureBlobStorageReader} -import java.time.{Instant, OffsetDateTime, ZoneOffset} +import java.time.{OffsetDateTime, ZoneOffset} import scala.concurrent.Future class CdmTable(name: String, storagePath: AdlsStoragePath, entityModel: SimpleCdmEntity, reader: AzureBlobStorageReader): @@ -45,14 +45,12 @@ class CdmTable(name: String, storagePath: AdlsStoragePath, entityModel: SimpleCd def snapshot(startDate: Option[OffsetDateTime] = None): Future[LazyList[DataRow]] = // list all matching blobs Future.sequence(getListPrefixes(startDate) - .flatMap(prefix => reader.listBlobs(storagePath + prefix)) - .map(blob => reader.getBlobContent(storagePath + blob.name, _.map(_.toChar).mkString))) - .map(_.flatMap(content => content.split('\n').foldLeft((Seq.empty[String], "")) { (agg, value) => - if isComplete(agg._2) then - (agg._1 :+ agg._2, "") - else - (agg._1, agg._2 + value) - }._1.map(implicitly[DataRow](_, schema)))) + .flatMap(prefix => reader.listBlobs(storagePath + prefix + name)) + // exclude any files other than CSV + .collect { + case blob if blob.name.endsWith(".csv") => reader.getBlobContent(storagePath + blob.name, _.map(_.toChar).mkString) + }) + .map(_.flatMap(content => replaceQuotedNewlines(content).split('\n').map(implicitly[DataRow](_, schema)))) .map(LazyList.from) object CdmTable: From 66162055f9dc10f9ad36ff05852ddd54c00ea5e3 Mon Sep 17 00:00:00 2001 From: George Zubrienko Date: Wed, 11 Dec 2024 11:58:06 +0100 Subject: [PATCH 04/14] Add model.json reader method --- .../src/main/scala/models/cdm/SimpleCdmModel.scala | 13 +++++++++++++ .../src/main/scala/services/cdm/CdmTable.scala | 5 +++-- .../models/azure/AzureBlobStorageReader.scala | 11 ++++++++++- 3 files changed, 26 insertions(+), 3 deletions(-) diff --git a/framework/arcane-framework/src/main/scala/models/cdm/SimpleCdmModel.scala b/framework/arcane-framework/src/main/scala/models/cdm/SimpleCdmModel.scala index 06b67f3..e35bee4 100644 --- 
a/framework/arcane-framework/src/main/scala/models/cdm/SimpleCdmModel.scala +++ b/framework/arcane-framework/src/main/scala/models/cdm/SimpleCdmModel.scala @@ -2,8 +2,12 @@ package com.sneaksanddata.arcane.framework package models.cdm import models.{ArcaneSchema, ArcaneSchemaField, ArcaneType, Field, MergeKeyField} +import services.storage.models.azure.{AdlsStoragePath, AzureBlobStorageReader} + import upickle.default.* +import scala.concurrent.Future +import scala.util.{Failure, Success} import scala.language.implicitConversions case class SimpleCdmAttribute(name: String, dataType: String, maxLength: Int) @@ -34,3 +38,12 @@ given Conversion[SimpleCdmAttribute, ArcaneSchemaField] with given Conversion[SimpleCdmEntity, ArcaneSchema] with override def apply(entity: SimpleCdmEntity): ArcaneSchema = entity.attributes.map(implicitly) :+ MergeKeyField + +object SimpleCdmModel: + implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global + + def apply(rootPath: String, reader: AzureBlobStorageReader): Future[SimpleCdmModel] = + AdlsStoragePath(rootPath).map(_ + "model.json") match { + case Success(modelPath) => reader.getBlobContent(modelPath).map(read[SimpleCdmModel](_)) + case Failure(ex) => Future.failed(ex) + } diff --git a/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala b/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala index e7cf1cf..608a760 100644 --- a/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala +++ b/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala @@ -2,7 +2,7 @@ package com.sneaksanddata.arcane.framework package services.cdm import models.cdm.CSVParser.replaceQuotedNewlines -import models.cdm.{SimpleCdmEntity, given} +import models.cdm.{SimpleCdmEntity, SimpleCdmModel, given} import models.{ArcaneSchema, DataRow} import services.storage.models.azure.{AdlsStoragePath, AzureBlobStorageReader} @@ -48,7 +48,7 @@ class CdmTable(name: String, 
storagePath: AdlsStoragePath, entityModel: SimpleCd .flatMap(prefix => reader.listBlobs(storagePath + prefix + name)) // exclude any files other than CSV .collect { - case blob if blob.name.endsWith(".csv") => reader.getBlobContent(storagePath + blob.name, _.map(_.toChar).mkString) + case blob if blob.name.endsWith(".csv") => reader.getBlobContent(storagePath + blob.name) }) .map(_.flatMap(content => replaceQuotedNewlines(content).split('\n').map(implicitly[DataRow](_, schema)))) .map(LazyList.from) @@ -60,3 +60,4 @@ object CdmTable: entityModel = entityModel, reader = reader ) + diff --git a/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReader.scala b/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReader.scala index a8d4695..fb2be0c 100644 --- a/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReader.scala +++ b/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReader.scala @@ -37,8 +37,17 @@ final class AzureBlobStorageReader extends BlobStorageReader[AdlsStoragePath]: private def getBlobContainerClient(blobPath: AdlsStoragePath): BlobContainerClient = serviceClient.getBlobContainerClient(blobPath.container) + + private val stringContentSerializer: Array[Byte] => String = _.map(_.toChar).mkString - def getBlobContent[Result](blobPath: AdlsStoragePath, deserializer: Array[Byte] => Result): Future[Result] = + /** + * + * @param blobPath The path to the blob. + * @param deserializer function to deserialize the content of the blob. Deserializes all content as String if not implementation is provided + * @tparam Result The type of the result. + * @return The result of applying the function to the content of the blob. 
+ */ + def getBlobContent[Result](blobPath: AdlsStoragePath, deserializer: Array[Byte] => Result = stringContentSerializer): Future[Result] = val client = getBlobClient(blobPath) Future(deserializer(client.downloadContent().toBytes)) From fa8b7f7020b6570ffa56bae823744c26f4687b58 Mon Sep 17 00:00:00 2001 From: George Zubrienko Date: Wed, 11 Dec 2024 16:17:23 +0100 Subject: [PATCH 05/14] Working code --- .../src/main/scala/models/cdm/CdmParser.scala | 4 +- .../scala/models/cdm/SimpleCdmModel.scala | 3 + .../main/scala/services/cdm/CdmTable.scala | 31 ++----- .../models/azure/AdlsStoragePath.scala | 2 +- .../models/azure/AzureBlobStorageReader.scala | 91 ++++++++++++------- .../models/azure/AzureModelConversions.scala | 33 +++++-- .../storage/models/base/StoredBlob.scala | 2 +- .../src/test/scala/models/CdmTableTests.scala | 42 +++++++++ 8 files changed, 141 insertions(+), 67 deletions(-) create mode 100644 framework/arcane-framework/src/test/scala/models/CdmTableTests.scala diff --git a/framework/arcane-framework/src/main/scala/models/cdm/CdmParser.scala b/framework/arcane-framework/src/main/scala/models/cdm/CdmParser.scala index 5d8c9b9..5708c59 100644 --- a/framework/arcane-framework/src/main/scala/models/cdm/CdmParser.scala +++ b/framework/arcane-framework/src/main/scala/models/cdm/CdmParser.scala @@ -73,7 +73,7 @@ object CSVParser: }._1 if parsed.size != headerCount then - throw new IllegalStateException(s"CSV line $line with delimiter $delimiter cannot be parsed into desired $headerCount") + throw new IllegalStateException(s"CSV line $line with delimiter $delimiter cannot be parsed into desired $headerCount fields") parsed } @@ -93,7 +93,7 @@ given Conversion[(String, ArcaneSchema), DataRow] with case (csvLine, schema) => val parsed = CSVParser.parseCsvLine( line = csvLine, - headerCount = schema.size) + headerCount = schema.size - SimpleCdmModel.systemFieldCount) val mergeKeyValue = parsed(schema.zipWithIndex.find(v => v._1.name == "Id").get._2) diff --git 
a/framework/arcane-framework/src/main/scala/models/cdm/SimpleCdmModel.scala b/framework/arcane-framework/src/main/scala/models/cdm/SimpleCdmModel.scala index e35bee4..c14a96d 100644 --- a/framework/arcane-framework/src/main/scala/models/cdm/SimpleCdmModel.scala +++ b/framework/arcane-framework/src/main/scala/models/cdm/SimpleCdmModel.scala @@ -41,6 +41,9 @@ given Conversion[SimpleCdmEntity, ArcaneSchema] with object SimpleCdmModel: implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global + // number of fields in the schema of each entity which do not originate from CDM + // currently MergeKeyField only + val systemFieldCount: Int = 1 def apply(rootPath: String, reader: AzureBlobStorageReader): Future[SimpleCdmModel] = AdlsStoragePath(rootPath).map(_ + "model.json") match { diff --git a/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala b/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala index 608a760..95bd9de 100644 --- a/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala +++ b/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala @@ -17,25 +17,13 @@ class CdmTable(name: String, storagePath: AdlsStoragePath, entityModel: SimpleCd private def getListPrefixes(startDate: Option[OffsetDateTime]): IndexedSeq[String] = val currentMoment = OffsetDateTime.now(ZoneOffset.UTC) val startMoment = startDate.getOrElse(currentMoment.minusYears(defaultFromYears)) - Range.inclusive( - startMoment.getYear, - currentMoment.getYear - ).flatMap(year => Range.inclusive( - 1, - 12 - ).map { m => - val mon = s"00$m".takeRight(2) - (s"$year-$mon-", year, m) - }).collect { - // include all prefixes from previous years - // in case year for both dates is the same, we will never hit this case - case (prefix, year, _) if year < currentMoment.getYear => prefix - // only include prefixes for current year that are less than current month - // this only applies to the case when 
startMoment year is less than current moment - then we take months from 1 to current month - case (prefix, year, mon) if (year == currentMoment.getYear) && (currentMoment.getYear > startMoment.getYear) && (mon <= currentMoment.getMonth.getValue) => prefix - // in case both dates are in the same year, we limit month selection to start from startMoment month - case (prefix, year, mon) if (year == currentMoment.getYear) && (currentMoment.getYear == startMoment.getYear) && (mon >= startMoment.getMonth.getValue) && (mon <= currentMoment.getMonth.getValue) => prefix - } + Iterator.iterate(startMoment)(_.plusDays(1)) + .takeWhile(_.toEpochSecond < currentMoment.toEpochSecond) + .map { moment => + val monthString = s"00${moment.getMonth.getValue}".takeRight(2) + val dayString = s"00${moment.getDayOfMonth}".takeRight(2) + s"${moment.getYear}-$monthString-$dayString" + }.toIndexedSeq /** * Read a table snapshot, taking optional start time. @@ -45,7 +33,8 @@ class CdmTable(name: String, storagePath: AdlsStoragePath, entityModel: SimpleCd def snapshot(startDate: Option[OffsetDateTime] = None): Future[LazyList[DataRow]] = // list all matching blobs Future.sequence(getListPrefixes(startDate) - .flatMap(prefix => reader.listBlobs(storagePath + prefix + name)) + .flatMap(prefix => reader.listPrefixes(storagePath + prefix)) + .flatMap(prefix => reader.listBlobs(storagePath + prefix.name + name)) // exclude any files other than CSV .collect { case blob if blob.name.endsWith(".csv") => reader.getBlobContent(storagePath + blob.name) @@ -60,4 +49,4 @@ object CdmTable: entityModel = entityModel, reader = reader ) - + diff --git a/framework/arcane-framework/src/main/scala/services/storage/models/azure/AdlsStoragePath.scala b/framework/arcane-framework/src/main/scala/services/storage/models/azure/AdlsStoragePath.scala index 12a9a99..0b86828 100644 --- a/framework/arcane-framework/src/main/scala/services/storage/models/azure/AdlsStoragePath.scala +++ 
b/framework/arcane-framework/src/main/scala/services/storage/models/azure/AdlsStoragePath.scala @@ -17,7 +17,7 @@ final case class AdlsStoragePath(accountName: String, container: String, blobPre * @return The new path. */ @targetName("plus") - def +(part: String): AdlsStoragePath = copy(blobPrefix = if (blobPrefix.isEmpty) part else s"$blobPrefix/$part") + def +(part: String): AdlsStoragePath = copy(blobPrefix = if (blobPrefix.isEmpty) part else s"${blobPrefix.stripSuffix("/")}/$part") object AdlsStoragePath: private val matchRegex: String = "^abfss:\\/\\/([^@]+)@([^\\.]+)\\.dfs\\.core\\.windows\\.net\\/(.*)$" diff --git a/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReader.scala b/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReader.scala index fb2be0c..b5e6794 100644 --- a/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReader.scala +++ b/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReader.scala @@ -2,46 +2,67 @@ package com.sneaksanddata.arcane.framework package services.storage.models.azure import services.storage.base.BlobStorageReader - -import com.azure.identity.{DefaultAzureCredential, DefaultAzureCredentialBuilder} -import com.azure.storage.blob.{BlobAsyncClient, BlobClient, BlobContainerAsyncClient, BlobContainerClient, BlobServiceClientBuilder} -import services.storage.models.base.StoredBlob import services.storage.models.azure.AzureModelConversions.given +import services.storage.models.base.StoredBlob -import scala.jdk.CollectionConverters.* -import scala.language.implicitConversions -import com.azure.storage.blob.models.{BlobListDetails, ListBlobsOptions} +import com.azure.core.credential.TokenCredential +import com.azure.core.http.rest.PagedResponse +import com.azure.identity.DefaultAzureCredentialBuilder +import com.azure.storage.blob.models.ListBlobsOptions +import 
com.azure.storage.blob.{BlobClient, BlobContainerClient, BlobServiceClientBuilder} +import com.azure.storage.common.StorageSharedKeyCredential import com.azure.storage.common.policy.{RequestRetryOptions, RetryPolicyType} import java.time.Duration import scala.annotation.tailrec import scala.concurrent.Future +import scala.jdk.CollectionConverters.* +import scala.language.implicitConversions -final class AzureBlobStorageReader extends BlobStorageReader[AdlsStoragePath]: +final class AzureBlobStorageReader(accountName: String, tokenCredential: Option[TokenCredential], sharedKeyCredential: Option[StorageSharedKeyCredential]) extends BlobStorageReader[AdlsStoragePath]: private val httpMaxRetries = 3 private val httpRetryTimeout = Duration.ofSeconds(60) private val httpMinRetryDelay = Duration.ofMillis(500) private val httpMaxRetryDelay = Duration.ofSeconds(3) - private val maxResultsPerPage = 1000 - + private val maxResultsPerPage = 5000 + private lazy val defaultCredential = new DefaultAzureCredentialBuilder().build() - private lazy val serviceClient = new BlobServiceClientBuilder() - .credential(defaultCredential) - .retryOptions(RequestRetryOptions(RetryPolicyType.EXPONENTIAL, httpMaxRetries, httpRetryTimeout.toSeconds.toInt, httpMinRetryDelay.toMillis, httpMaxRetryDelay.toMillis, null)) - .buildClient() + private lazy val serviceClient = + val builder = (tokenCredential, sharedKeyCredential) match + case (Some(credential), _) => new BlobServiceClientBuilder().credential(credential) + case (None, Some(credential)) => new BlobServiceClientBuilder().credential(credential) + case (None, None) => new BlobServiceClientBuilder().credential(defaultCredential) + + builder + .endpoint(s"https://$accountName.blob.core.windows.net/") + .retryOptions(RequestRetryOptions(RetryPolicyType.EXPONENTIAL, httpMaxRetries, httpRetryTimeout.toSeconds.toInt, httpMinRetryDelay.toMillis, httpMaxRetryDelay.toMillis, null)) + .buildClient() + private val defaultTimeout = 
Duration.ofSeconds(30) implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global private def getBlobClient(blobPath: AdlsStoragePath): BlobClient = + require(blobPath.accountName == accountName, s"Account name in the path `${blobPath.accountName}` does not match account name `$accountName` initialized for this reader") getBlobContainerClient(blobPath).getBlobClient(blobPath.blobPrefix) private def getBlobContainerClient(blobPath: AdlsStoragePath): BlobContainerClient = serviceClient.getBlobContainerClient(blobPath.container) - + private val stringContentSerializer: Array[Byte] => String = _.map(_.toChar).mkString + + @tailrec + private def getPage[ElementType, ResultElementType](pageToken: Option[String], result: Iterable[ResultElementType], pager: Option[String] => PagedResponse[ElementType])(implicit converter: ElementType => ResultElementType): Iterable[ResultElementType] = + val page = pager(pageToken) + val pageData = page.getValue.asScala.map(implicitly) + val continuationToken = Option(page.getContinuationToken) + + if continuationToken.isEmpty then + result ++ pageData + else + getPage(continuationToken, result ++ pageData, pager) /** - * + * * @param blobPath The path to the blob. * @param deserializer function to deserialize the content of the blob. Deserializes all content as String if not implementation is provided * @tparam Result The type of the result. 
@@ -50,6 +71,18 @@ final class AzureBlobStorageReader extends BlobStorageReader[AdlsStoragePath]: def getBlobContent[Result](blobPath: AdlsStoragePath, deserializer: Array[Byte] => Result = stringContentSerializer): Future[Result] = val client = getBlobClient(blobPath) Future(deserializer(client.downloadContent().toBytes)) + + def listPrefixes(rootPrefix: AdlsStoragePath): LazyList[StoredBlob] = + val client = getBlobContainerClient(rootPrefix) + val listOptions = new ListBlobsOptions() + .setPrefix(rootPrefix.blobPrefix) + .setMaxResultsPerPage(maxResultsPerPage) + + LazyList.from(getPage( + None, + List.empty[StoredBlob], + token => client.listBlobsByHierarchy("/", listOptions, defaultTimeout).iterableByPage(token.orNull).iterator().next() + )) def listBlobs(blobPath: AdlsStoragePath): LazyList[StoredBlob] = val client = getBlobContainerClient(blobPath) @@ -57,20 +90,14 @@ final class AzureBlobStorageReader extends BlobStorageReader[AdlsStoragePath]: .setPrefix(blobPath.blobPrefix) .setMaxResultsPerPage(maxResultsPerPage) - @tailrec - def getPage(pageToken: Option[String], result: Iterable[StoredBlob]): Iterable[StoredBlob] = - val page = client.listBlobs(listOptions, pageToken.orNull, defaultTimeout) - .iterableByPage() - .iterator() - .next() - - val pageData = page.getValue.asScala.map(implicitly) - - if page.getContinuationToken.isEmpty then - result ++ pageData - else - getPage(Some(page.getContinuationToken), result ++ pageData) - - LazyList.from(getPage(None, List())) - + LazyList.from(getPage( + None, + List.empty[StoredBlob], + token => client.listBlobs(listOptions, token.orNull, defaultTimeout).iterableByPage().iterator().next() + )) +object AzureBlobStorageReader: + // TODO: move http settings etc to apply + def apply(accountName: String, credential: TokenCredential): AzureBlobStorageReader = new AzureBlobStorageReader(accountName, Some(credential), None) + def apply(accountName: String, credential: StorageSharedKeyCredential): AzureBlobStorageReader = 
new AzureBlobStorageReader(accountName, None, Some(credential)) + def apply(accountName: String): AzureBlobStorageReader = new AzureBlobStorageReader(accountName, None, None) diff --git a/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureModelConversions.scala b/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureModelConversions.scala index 06c308e..8ada9f1 100644 --- a/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureModelConversions.scala +++ b/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureModelConversions.scala @@ -5,16 +5,29 @@ import com.azure.storage.blob.models.BlobItem import services.storage.models.base.StoredBlob import scala.jdk.CollectionConverters.* +import scala.util.Try object AzureModelConversions: given Conversion[BlobItem, StoredBlob] with - override def apply(blobItem: BlobItem): StoredBlob = StoredBlob( - name = blobItem.getName, - createdOn = blobItem.getProperties.getCreationTime.toEpochSecond, - metadata = blobItem.getMetadata.asScala.toMap, - contentHash = Option(blobItem.getProperties.getContentMd5.map(_.toChar).mkString), - contentEncoding = Option(blobItem.getProperties.getContentEncoding), - contentType = Option(blobItem.getProperties.getContentType), - contentLength = Option(blobItem.getProperties.getContentLength), - lastModified = Option(blobItem.getProperties.getLastModified.toEpochSecond) - ) + override def apply(blobItem: BlobItem): StoredBlob = + if Option(blobItem.getProperties).isDefined && Try(blobItem.getProperties.getCreationTime).isSuccess then + StoredBlob( + name = blobItem.getName, + createdOn = Option(blobItem.getProperties.getCreationTime).map(_.toEpochSecond), + metadata = Option(blobItem.getMetadata).map(_.asScala.toMap).getOrElse(Map()), + contentHash = Option(blobItem.getProperties.getContentMd5).map(_.map(_.toChar).mkString), + contentEncoding = Option(blobItem.getProperties.getContentEncoding), + 
contentType = Option(blobItem.getProperties.getContentType), + contentLength = Option(blobItem.getProperties.getContentLength), + lastModified = Option(blobItem.getProperties.getLastModified.toEpochSecond) + ) + else StoredBlob( + name = blobItem.getName, + createdOn = None, + metadata = Map(), + contentHash = None, + contentEncoding = None, + contentType = None, + contentLength = None, + lastModified = None + ) diff --git a/framework/arcane-framework/src/main/scala/services/storage/models/base/StoredBlob.scala b/framework/arcane-framework/src/main/scala/services/storage/models/base/StoredBlob.scala index 6655ae1..901603b 100644 --- a/framework/arcane-framework/src/main/scala/services/storage/models/base/StoredBlob.scala +++ b/framework/arcane-framework/src/main/scala/services/storage/models/base/StoredBlob.scala @@ -43,4 +43,4 @@ case class StoredBlob( /** * Created on timestamp. */ - createdOn: Long) + createdOn: Option[Long]) diff --git a/framework/arcane-framework/src/test/scala/models/CdmTableTests.scala b/framework/arcane-framework/src/test/scala/models/CdmTableTests.scala new file mode 100644 index 0000000..311ebe3 --- /dev/null +++ b/framework/arcane-framework/src/test/scala/models/CdmTableTests.scala @@ -0,0 +1,42 @@ +package com.sneaksanddata.arcane.framework +package models + +import models.cdm.SimpleCdmModel +import services.storage.models.azure.AzureBlobStorageReader + +import com.azure.storage.common.StorageSharedKeyCredential +import services.cdm.{CdmTable, CdmTableSettings} +import org.scalatest.flatspec.AsyncFlatSpec +import org.scalatest.matchers.must.Matchers +import org.scalatest.matchers.should.Matchers.should + +import java.time.{OffsetDateTime, ZoneOffset} + + +class CdmTableTests extends AsyncFlatSpec with Matchers { + private implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global + private val integrationTestContainer = sys.env.get("ARCANE_FRAMEWORK__STORAGE_CONTAINER") + private val 
integrationTestAccount = sys.env.get("ARCANE_FRAMEWORK__STORAGE_ACCOUNT") + private val integrationTestAccessKey = sys.env.get("ARCANE_FRAMEWORK__STORAGE_ACCESS_KEY") + + it should "read model schemas from Synapse Link blob storage" in { + (integrationTestContainer, integrationTestAccount, integrationTestAccessKey) match + case (Some(container), Some(account), Some(key)) => SimpleCdmModel(s"abfss://$container@$account.dfs.core.windows.net/", AzureBlobStorageReader(account, StorageSharedKeyCredential(account, key))).map { model => + model.entities.size should be > 0 + } + case _ => cancel("Skipping test since it is not configured to run") + } + + it should "read a CDM table from Synapse Link blob storage" in { + (integrationTestContainer, integrationTestAccount, integrationTestAccessKey) match + case (Some(container), Some(account), Some(key)) => SimpleCdmModel(s"abfss://$container@$account.dfs.core.windows.net/", AzureBlobStorageReader(account, StorageSharedKeyCredential(account, key))).flatMap { model => + val entityToRead = model.entities.find(v => v.name == "salesline").get + CdmTable(CdmTableSettings(name = entityToRead.name, rootPath = s"abfss://$container@$account.dfs.core.windows.net/"), entityToRead, AzureBlobStorageReader(account, StorageSharedKeyCredential(account, key))) + .snapshot(Some(OffsetDateTime.now(ZoneOffset.UTC).minusDays(1))) + .map { rows => + rows.foldLeft(0L){ (agg, _) => agg + 1 } should be > 0L + } + } + case _ => cancel("Skipping test since it is not configured to run") + } +} From 1b4991de20acc98de817e5f4beacd44271e384fb Mon Sep 17 00:00:00 2001 From: George Zubrienko Date: Wed, 11 Dec 2024 16:23:28 +0100 Subject: [PATCH 06/14] Enable hour-level lookback --- .../src/main/scala/services/cdm/CdmTable.scala | 5 +++-- .../src/test/scala/models/CdmTableTests.scala | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala 
b/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala index 95bd9de..a373b59 100644 --- a/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala +++ b/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala @@ -17,12 +17,13 @@ class CdmTable(name: String, storagePath: AdlsStoragePath, entityModel: SimpleCd private def getListPrefixes(startDate: Option[OffsetDateTime]): IndexedSeq[String] = val currentMoment = OffsetDateTime.now(ZoneOffset.UTC) val startMoment = startDate.getOrElse(currentMoment.minusYears(defaultFromYears)) - Iterator.iterate(startMoment)(_.plusDays(1)) + Iterator.iterate(startMoment)(_.plusHours(1)) .takeWhile(_.toEpochSecond < currentMoment.toEpochSecond) .map { moment => val monthString = s"00${moment.getMonth.getValue}".takeRight(2) val dayString = s"00${moment.getDayOfMonth}".takeRight(2) - s"${moment.getYear}-$monthString-$dayString" + val hourString = s"00${moment.getHour}".takeRight(2) + s"${moment.getYear}-$monthString-${dayString}T$hourString" }.toIndexedSeq /** diff --git a/framework/arcane-framework/src/test/scala/models/CdmTableTests.scala b/framework/arcane-framework/src/test/scala/models/CdmTableTests.scala index 311ebe3..9083d17 100644 --- a/framework/arcane-framework/src/test/scala/models/CdmTableTests.scala +++ b/framework/arcane-framework/src/test/scala/models/CdmTableTests.scala @@ -32,7 +32,7 @@ class CdmTableTests extends AsyncFlatSpec with Matchers { case (Some(container), Some(account), Some(key)) => SimpleCdmModel(s"abfss://$container@$account.dfs.core.windows.net/", AzureBlobStorageReader(account, StorageSharedKeyCredential(account, key))).flatMap { model => val entityToRead = model.entities.find(v => v.name == "salesline").get CdmTable(CdmTableSettings(name = entityToRead.name, rootPath = s"abfss://$container@$account.dfs.core.windows.net/"), entityToRead, AzureBlobStorageReader(account, StorageSharedKeyCredential(account, key))) - 
.snapshot(Some(OffsetDateTime.now(ZoneOffset.UTC).minusDays(1))) + .snapshot(Some(OffsetDateTime.now(ZoneOffset.UTC).minusHours(6))) .map { rows => rows.foldLeft(0L){ (agg, _) => agg + 1 } should be > 0L } From 75a4326e1b9411d74e6bf8e3c89b4db507540073 Mon Sep 17 00:00:00 2001 From: George Zubrienko Date: Wed, 11 Dec 2024 17:08:03 +0100 Subject: [PATCH 07/14] Add some docstrings --- .../scala/models/cdm/SimpleCdmModel.scala | 20 +++++++++++++ .../main/scala/services/cdm/CdmTable.scala | 9 ++++-- .../scala/services/cdm/CdmTableSettings.scala | 5 ++++ .../models/azure/AzureBlobStorageReader.scala | 28 +++++++++++++++++++ 4 files changed, 60 insertions(+), 2 deletions(-) diff --git a/framework/arcane-framework/src/main/scala/models/cdm/SimpleCdmModel.scala b/framework/arcane-framework/src/main/scala/models/cdm/SimpleCdmModel.scala index c14a96d..2a11aa9 100644 --- a/framework/arcane-framework/src/main/scala/models/cdm/SimpleCdmModel.scala +++ b/framework/arcane-framework/src/main/scala/models/cdm/SimpleCdmModel.scala @@ -10,9 +10,22 @@ import scala.concurrent.Future import scala.util.{Failure, Success} import scala.language.implicitConversions +/** + * Attribute in Microsoft Common Data Model, simplified compared to native SDK + * @param name Attribute name + * @param dataType String literal for the attribute data type + * @param maxLength max length property - not used + */ case class SimpleCdmAttribute(name: String, dataType: String, maxLength: Int) derives ReadWriter +/** + * Entity (Table) in Microsoft Common Data Model, simplified compared to native SDK + * @param entityType CDM entity type + * @param name Entity name + * @param description Docstring for the entity + * @param attributes Entity fields + */ case class SimpleCdmEntity( @upickle.implicits.key("$type") entityType: String, @@ -22,6 +35,13 @@ case class SimpleCdmEntity( derives ReadWriter +/** + * Synapse Link container model, containing all entities enabled for the export + * @param name Model name + * 
@param description Docstring for the model + * @param version Model version + * @param entities Included entities + */ case class SimpleCdmModel(name: String, description: String, version: String, entities: Seq[SimpleCdmEntity]) derives ReadWriter diff --git a/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala b/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala index a373b59..8bfc501 100644 --- a/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala +++ b/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala @@ -2,7 +2,7 @@ package com.sneaksanddata.arcane.framework package services.cdm import models.cdm.CSVParser.replaceQuotedNewlines -import models.cdm.{SimpleCdmEntity, SimpleCdmModel, given} +import models.cdm.{SimpleCdmEntity, given} import models.{ArcaneSchema, DataRow} import services.storage.models.azure.{AdlsStoragePath, AzureBlobStorageReader} @@ -14,6 +14,11 @@ class CdmTable(name: String, storagePath: AdlsStoragePath, entityModel: SimpleCd private val defaultFromYears: Int = 5 private val schema: ArcaneSchema = implicitly(entityModel) + /** + * Read top-level virtual directories to allow pre-filtering blobs + * @param startDate Baseline date to start search from + * @return A list of yyyy-MM-ddTHH prefixes to apply as filters + */ private def getListPrefixes(startDate: Option[OffsetDateTime]): IndexedSeq[String] = val currentMoment = OffsetDateTime.now(ZoneOffset.UTC) val startMoment = startDate.getOrElse(currentMoment.minusYears(defaultFromYears)) @@ -27,7 +32,7 @@ class CdmTable(name: String, storagePath: AdlsStoragePath, entityModel: SimpleCd }.toIndexedSeq /** - * Read a table snapshot, taking optional start time. + * Read a table snapshot, taking optional start time. Lowest precision available is 1 hour * @param startDate Folders from Synapse export to include in the snapshot, based on the start date provided. 
If not provided, ALL folders from now - defaultFromYears will be included * @return A stream of rows for this table */ diff --git a/framework/arcane-framework/src/main/scala/services/cdm/CdmTableSettings.scala b/framework/arcane-framework/src/main/scala/services/cdm/CdmTableSettings.scala index 27f636f..f3d143a 100644 --- a/framework/arcane-framework/src/main/scala/services/cdm/CdmTableSettings.scala +++ b/framework/arcane-framework/src/main/scala/services/cdm/CdmTableSettings.scala @@ -1,4 +1,9 @@ package com.sneaksanddata.arcane.framework package services.cdm +/** + * Settings for a CdmTable object + * @param name Name of the table + * @param rootPath HDFS-style path that includes table blob prefix, for example abfss://container@account.dfs.core.windows.net/path/to/table + */ case class CdmTableSettings(name: String, rootPath: String) diff --git a/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReader.scala b/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReader.scala index b5e6794..4fd820a 100644 --- a/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReader.scala +++ b/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReader.scala @@ -19,6 +19,12 @@ import scala.concurrent.Future import scala.jdk.CollectionConverters.* import scala.language.implicitConversions +/** + * Blob reader implementation for Azure. Relies on the default credential chain if no added credentials are provided. 
+ * @param accountName Storage account name + * @param tokenCredential Optional token credential provider + * @param sharedKeyCredential Optional access key credential + */ final class AzureBlobStorageReader(accountName: String, tokenCredential: Option[TokenCredential], sharedKeyCredential: Option[StorageSharedKeyCredential]) extends BlobStorageReader[AdlsStoragePath]: private val httpMaxRetries = 3 private val httpRetryTimeout = Duration.ofSeconds(60) @@ -98,6 +104,28 @@ final class AzureBlobStorageReader(accountName: String, tokenCredential: Option[ object AzureBlobStorageReader: // TODO: move http settings etc to apply + + /** + * Create AzureBlobStorageReader for the account using TokenCredential + * @param accountName Storage account name + * @param credential TokenCredential (accessToken provider) + * @return AzureBlobStorageReader instance + */ def apply(accountName: String, credential: TokenCredential): AzureBlobStorageReader = new AzureBlobStorageReader(accountName, Some(credential), None) + + /** + * Create AzureBlobStorageReader for the account using StorageSharedKeyCredential + * + * @param accountName Storage account name + * @param credential StorageSharedKeyCredential (account key) + * @return AzureBlobStorageReader instance + */ def apply(accountName: String, credential: StorageSharedKeyCredential): AzureBlobStorageReader = new AzureBlobStorageReader(accountName, None, Some(credential)) + + /** + * Create AzureBlobStorageReader for the account using default credential chain + * + * @param accountName Storage account name + * @return AzureBlobStorageReader instance + */ def apply(accountName: String): AzureBlobStorageReader = new AzureBlobStorageReader(accountName, None, None) From c2b0884542845261fb5bfc45ff772180cc0d32eb Mon Sep 17 00:00:00 2001 From: George Zubrienko Date: Thu, 12 Dec 2024 14:06:39 +0100 Subject: [PATCH 08/14] Extract client settings for Azure --- .../models/azure/AzureBlobStorageReader.scala | 13 +++--------- 
.../AzureBlobStorageReaderSettings.scala | 21 +++++++++++++++++++ 2 files changed, 24 insertions(+), 10 deletions(-) create mode 100644 framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReaderSettings.scala diff --git a/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReader.scala b/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReader.scala index 4fd820a..c217a9c 100644 --- a/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReader.scala +++ b/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReader.scala @@ -25,13 +25,8 @@ import scala.language.implicitConversions * @param tokenCredential Optional token credential provider * @param sharedKeyCredential Optional access key credential */ -final class AzureBlobStorageReader(accountName: String, tokenCredential: Option[TokenCredential], sharedKeyCredential: Option[StorageSharedKeyCredential]) extends BlobStorageReader[AdlsStoragePath]: - private val httpMaxRetries = 3 - private val httpRetryTimeout = Duration.ofSeconds(60) - private val httpMinRetryDelay = Duration.ofMillis(500) - private val httpMaxRetryDelay = Duration.ofSeconds(3) - private val maxResultsPerPage = 5000 - +final class AzureBlobStorageReader(accountName: String, tokenCredential: Option[TokenCredential], sharedKeyCredential: Option[StorageSharedKeyCredential], settings: Option[AzureBlobStorageReaderSettings] = None) extends BlobStorageReader[AdlsStoragePath]: + private val serviceClientSettings = settings.getOrElse(AzureBlobStorageReaderSettings()) private lazy val defaultCredential = new DefaultAzureCredentialBuilder().build() private lazy val serviceClient = val builder = (tokenCredential, sharedKeyCredential) match @@ -41,7 +36,7 @@ final class AzureBlobStorageReader(accountName: String, tokenCredential: Option[ builder 
.endpoint(s"https://$accountName.blob.core.windows.net/") - .retryOptions(RequestRetryOptions(RetryPolicyType.EXPONENTIAL, httpMaxRetries, httpRetryTimeout.toSeconds.toInt, httpMinRetryDelay.toMillis, httpMaxRetryDelay.toMillis, null)) + .retryOptions(RequestRetryOptions(RetryPolicyType.EXPONENTIAL, serviceClientSettings.httpMaxRetries, serviceClientSettings.httpRetryTimeout.toSeconds.toInt, serviceClientSettings.httpMinRetryDelay.toMillis, serviceClientSettings.httpMaxRetryDelay.toMillis, null)) .buildClient() private val defaultTimeout = Duration.ofSeconds(30) @@ -103,8 +98,6 @@ final class AzureBlobStorageReader(accountName: String, tokenCredential: Option[ )) object AzureBlobStorageReader: - // TODO: move http settings etc to apply - /** * Create AzureBlobStorageReader for the account using TokenCredential * @param accountName Storage account name diff --git a/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReaderSettings.scala b/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReaderSettings.scala new file mode 100644 index 0000000..25b69e7 --- /dev/null +++ b/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReaderSettings.scala @@ -0,0 +1,21 @@ +package com.sneaksanddata.arcane.framework +package services.storage.models.azure + +import java.time.Duration +import scala.concurrent.duration.Duration + +case class AzureBlobStorageReaderSettings(httpMaxRetries: Int, httpRetryTimeout: Duration, httpMinRetryDelay: Duration, httpMaxRetryDelay: Duration, maxResultsPerPage: Int) + +object AzureBlobStorageReaderSettings: + def apply( + httpMaxRetries: Int = 3, + httpRetryTimeout: Duration = Duration.ofSeconds(60), + httpMinRetryDelay: Duration = Duration.ofMillis(500), + httpMaxRetryDelay: Duration = Duration.ofSeconds(3), + maxResultsPerPage: Int = 5000): AzureBlobStorageReaderSettings = new AzureBlobStorageReaderSettings( + httpMaxRetries = 
httpMaxRetries, + httpRetryTimeout = httpRetryTimeout, + httpMinRetryDelay = httpMinRetryDelay, + httpMaxRetryDelay = httpMaxRetryDelay, + maxResultsPerPage = maxResultsPerPage + ) From 8347633e98f86550a55a4e72f6201a3c2d0b3468 Mon Sep 17 00:00:00 2001 From: George Zubrienko Date: Thu, 12 Dec 2024 16:41:43 +0100 Subject: [PATCH 09/14] WIP full e2e with azurite --- .../arcane-framework/docker-compose.yaml | 24 +- .../populate-cdm-container.py | 665 ++++++++++++++++++ .../main/scala/services/cdm/CdmTable.scala | 9 +- .../models/azure/AzureBlobStorageReader.scala | 24 +- .../AzureBlobStorageReaderSettings.scala | 1 - .../src/test/scala/models/CdmTableTests.scala | 37 +- 6 files changed, 735 insertions(+), 25 deletions(-) create mode 100644 framework/arcane-framework/populate-cdm-container.py diff --git a/framework/arcane-framework/docker-compose.yaml b/framework/arcane-framework/docker-compose.yaml index 8ba7cc9..478aae1 100644 --- a/framework/arcane-framework/docker-compose.yaml +++ b/framework/arcane-framework/docker-compose.yaml @@ -92,4 +92,26 @@ services: condition: service_healthy volumes: - ./create-polaris-catalog.sh:/create-polaris-catalog.sh - command: ["/bin/sh", "/create-polaris-catalog.sh"] \ No newline at end of file + command: ["/bin/sh", "/create-polaris-catalog.sh"] + azurite: + image: mcr.microsoft.com/azure-storage/azurite + restart: always + networks: + mesh: + ipv4_address: 10.1.0.6 + command: + - azurite-blob + - "--blobHost" + - "10.1.0.6" + ports: + - "10000:10000" + create-cdm-container: + image: python:3.11-slim-bookworm + depends_on: + - azurite + networks: + mesh: + ipv4_address: 10.1.0.7 + volumes: + - ./populate-cdm-container.py:/populate-cdm-container.py + command: [ "/bin/sh", "-c", "pip install azure-storage-blob requests && python /populate-cdm-container.py" ] \ No newline at end of file diff --git a/framework/arcane-framework/populate-cdm-container.py b/framework/arcane-framework/populate-cdm-container.py new file mode 100644 index 
0000000..276a531 --- /dev/null +++ b/framework/arcane-framework/populate-cdm-container.py @@ -0,0 +1,665 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from azure.storage.blob import BlobServiceClient +import os + +CONTENT = """50bff458-d47a-4924-804b-31c0a83108e6,"1/1/2020 0:00:00 PM","1/1/2020 0:00:00 PM",0,1111000000,1111000010,"F1234567",1,,"2020-01-01T00:15:00.0000000Z","acc1",111111110,"2020-01-01T00:15:00.0000000Z","acc1",0,"dat",1,1111000001,2111000001,1111000001,21111,2111000001,"2020-01-01T00:15:00.0000000+00:00","2020-01-01T00:15:00.0000000Z", +5b4bc74e-2132-4d8e-8572-48ce4260f182,"1/1/2020 0:00:01 PM","1/1/2020 0:00:01 PM",0,1111000001,1111000011,"F1234568",1,,"2020-01-01T00:16:00.0000000Z","acc2",111111111,"2020-01-01T00:16:00.0000000Z","acc2",0,"dat",1,1111000002,2111000002,1111000001,21111,2111000001,"2020-01-01T00:16:00.0000000+00:00","2020-01-01T00:16:00.0000000Z", +aae2094d-cd17-42b4-891e-3b268e2b6713,"1/1/2020 0:00:01 PM","1/1/2020 0:00:01 PM",0,1111000002,1111000012,"F1234569",1,,"2020-01-01T00:17:00.0000000Z","acc2",111111112,"2020-01-01T00:17:00.0000000Z","acc2",0,"dat",1,1111000003,2111000003,1111000001,21111,2111000001,"2020-01-01T00:17:00.0000000+00:00","2020-01-01T00:17:00.0000000Z", 
+9633be9a-c485-4afa-8bb7-4ba380eaa206,"1/1/2020 0:00:01 PM","1/1/2020 0:00:01 PM",0,1111000003,1111000013,"F1234578",1,,"2020-01-01T00:18:00.0000000Z","acc1",111111113,"2020-01-01T00:18:00.0000000Z","acc1",0,"dat",1,1111000004,2111000004,1111000001,21111,2111000001,"2020-01-01T00:18:00.0000000+00:00","2020-01-01T00:18:00.0000000Z", +b62c7b67-b8f8-4635-8cef-1c23591d5c4c,"1/1/2020 0:00:01 PM","1/1/2020 0:00:01 PM",0,1111000004,1111000014,"F1234511",1,,"2020-01-01T00:19:00.0000000Z","acc2",111111114,"2020-01-01T00:19:00.0000000Z","acc2",0,"dat",1,1111000005,2111000005,1111000001,21111,2111000001,"2020-01-01T00:19:00.0000000+00:00","2020-01-01T00:19:00.0000000Z", +""" + +MODEL_JSON = """{ + "name": "cdm", + "description": "cdm", + "version": "1.0", + "entities": [ + { + "$type": "LocalEntity", + "name": "currency", + "description": "currency", + "annotations": [ + { + "name": "Athena:PartitionGranularity", + "value": "Year" + }, + { + "name": "Athena:InitialSyncState", + "value": "Completed" + }, + { + "name": "Athena:InitialSyncDataCompletedTime", + "value": "1/1/2020 0:00:00 PM" + } + ], + "attributes": [ + { + "name": "Id", + "dataType": "guid", + "maxLength": -1 + }, + { + "name": "SinkCreatedOn", + "dataType": "dateTime", + "maxLength": -1 + }, + { + "name": "SinkModifiedOn", + "dataType": "dateTime", + "maxLength": -1 + }, + { + "name": "iseuro", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "roundofftypeassetdep_jp", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "roundofftypeprice", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "roundofftypepurch", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "roundofftypesales", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "ltmroundofftypelineamount", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "sysdatastatecode", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "currencycode", + "dataType": "string", + "maxLength": 3, + 
"cdm:traits": [ + { + "traitReference": "is.constrained", + "arguments": [ + { + "name": "maximumLength", + "value": 3 + } + ] + } + ] + }, + { + "name": "currencycodeiso", + "dataType": "string", + "maxLength": 3, + "cdm:traits": [ + { + "traitReference": "is.constrained", + "arguments": [ + { + "name": "maximumLength", + "value": 3 + } + ] + } + ] + }, + { + "name": "roundingprecision", + "dataType": "decimal", + "maxLength": -1, + "cdm:traits": [ + { + "traitReference": "is.dataFormat.numeric.shaped", + "arguments": [ + { + "name": "precision", + "value": 38 + }, + { + "name": "scale", + "value": 6 + } + ] + } + ] + }, + { + "name": "roundoffassetdep_jp", + "dataType": "decimal", + "maxLength": -1, + "cdm:traits": [ + { + "traitReference": "is.dataFormat.numeric.shaped", + "arguments": [ + { + "name": "precision", + "value": 38 + }, + { + "name": "scale", + "value": 6 + } + ] + } + ] + }, + { + "name": "roundoffprice", + "dataType": "decimal", + "maxLength": -1, + "cdm:traits": [ + { + "traitReference": "is.dataFormat.numeric.shaped", + "arguments": [ + { + "name": "precision", + "value": 38 + }, + { + "name": "scale", + "value": 6 + } + ] + } + ] + }, + { + "name": "roundoffpurch", + "dataType": "decimal", + "maxLength": -1, + "cdm:traits": [ + { + "traitReference": "is.dataFormat.numeric.shaped", + "arguments": [ + { + "name": "precision", + "value": 38 + }, + { + "name": "scale", + "value": 6 + } + ] + } + ] + }, + { + "name": "roundoffsales", + "dataType": "decimal", + "maxLength": -1, + "cdm:traits": [ + { + "traitReference": "is.dataFormat.numeric.shaped", + "arguments": [ + { + "name": "precision", + "value": 38 + }, + { + "name": "scale", + "value": 6 + } + ] + } + ] + }, + { + "name": "symbol", + "dataType": "string", + "maxLength": 5, + "cdm:traits": [ + { + "traitReference": "is.constrained", + "arguments": [ + { + "name": "maximumLength", + "value": 5 + } + ] + } + ] + }, + { + "name": "txt", + "dataType": "string", + "maxLength": 120, + 
"cdm:traits": [ + { + "traitReference": "is.constrained", + "arguments": [ + { + "name": "maximumLength", + "value": 120 + } + ] + } + ] + }, + { + "name": "exchratemaxvariationpercent_mx", + "dataType": "decimal", + "maxLength": -1, + "cdm:traits": [ + { + "traitReference": "is.dataFormat.numeric.shaped", + "arguments": [ + { + "name": "precision", + "value": 38 + }, + { + "name": "scale", + "value": 6 + } + ] + } + ] + }, + { + "name": "decimalscount_mx", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "ltmroundofflineamount", + "dataType": "decimal", + "maxLength": -1, + "cdm:traits": [ + { + "traitReference": "is.dataFormat.numeric.shaped", + "arguments": [ + { + "name": "precision", + "value": 38 + }, + { + "name": "scale", + "value": 6 + } + ] + } + ] + }, + { + "name": "modifieddatetime", + "dataType": "dateTime", + "maxLength": -1 + }, + { + "name": "modifiedby", + "dataType": "string", + "maxLength": 20, + "cdm:traits": [ + { + "traitReference": "is.constrained", + "arguments": [ + { + "name": "maximumLength", + "value": 20 + } + ] + } + ] + }, + { + "name": "modifiedtransactionid", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "createddatetime", + "dataType": "dateTime", + "maxLength": -1 + }, + { + "name": "createdby", + "dataType": "string", + "maxLength": 20, + "cdm:traits": [ + { + "traitReference": "is.constrained", + "arguments": [ + { + "name": "maximumLength", + "value": 20 + } + ] + } + ] + }, + { + "name": "createdtransactionid", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "dataareaid", + "dataType": "string", + "maxLength": 4, + "cdm:traits": [ + { + "traitReference": "is.constrained", + "arguments": [ + { + "name": "maximumLength", + "value": 4 + } + ] + } + ] + }, + { + "name": "recversion", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "partition", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "sysrowversion", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": 
"recid", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "tableid", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "versionnumber", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "createdon", + "dataType": "dateTimeOffset", + "maxLength": -1 + }, + { + "name": "modifiedon", + "dataType": "dateTime", + "maxLength": -1 + }, + { + "name": "IsDelete", + "dataType": "boolean", + "maxLength": -1 + } + ], + "partitions": [] + }, + { + "$type": "LocalEntity", + "name": "dimensionattributelevelvalue", + "description": "dimensionattributelevelvalue", + "annotations": [ + { + "name": "Athena:PartitionGranularity", + "value": "Year" + }, + { + "name": "Athena:InitialSyncState", + "value": "Completed" + }, + { + "name": "Athena:InitialSyncDataCompletedTime", + "value": "1/1/2020 0:00:00 PM" + } + ], + "attributes": [ + { + "name": "Id", + "dataType": "guid", + "maxLength": -1 + }, + { + "name": "SinkCreatedOn", + "dataType": "dateTime", + "maxLength": -1 + }, + { + "name": "SinkModifiedOn", + "dataType": "dateTime", + "maxLength": -1 + }, + { + "name": "sysdatastatecode", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "dimensionattributevalue", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "dimensionattributevaluegroup", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "displayvalue", + "dataType": "string", + "maxLength": 30, + "cdm:traits": [ + { + "traitReference": "is.constrained", + "arguments": [ + { + "name": "maximumLength", + "value": 30 + } + ] + } + ] + }, + { + "name": "ordinal", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "backingrecorddataareaid", + "dataType": "string", + "maxLength": 4, + "cdm:traits": [ + { + "traitReference": "is.constrained", + "arguments": [ + { + "name": "maximumLength", + "value": 4 + } + ] + } + ] + }, + { + "name": "modifieddatetime", + "dataType": "dateTime", + "maxLength": -1 + }, + { + "name": "modifiedby", + "dataType": "string", + 
"maxLength": 20, + "cdm:traits": [ + { + "traitReference": "is.constrained", + "arguments": [ + { + "name": "maximumLength", + "value": 20 + } + ] + } + ] + }, + { + "name": "modifiedtransactionid", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "createddatetime", + "dataType": "dateTime", + "maxLength": -1 + }, + { + "name": "createdby", + "dataType": "string", + "maxLength": 20, + "cdm:traits": [ + { + "traitReference": "is.constrained", + "arguments": [ + { + "name": "maximumLength", + "value": 20 + } + ] + } + ] + }, + { + "name": "createdtransactionid", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "dataareaid", + "dataType": "string", + "maxLength": 4, + "cdm:traits": [ + { + "traitReference": "is.constrained", + "arguments": [ + { + "name": "maximumLength", + "value": 4 + } + ] + } + ] + }, + { + "name": "recversion", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "partition", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "sysrowversion", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "recid", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "tableid", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "versionnumber", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "createdon", + "dataType": "dateTimeOffset", + "maxLength": -1 + }, + { + "name": "modifiedon", + "dataType": "dateTime", + "maxLength": -1 + }, + { + "name": "IsDelete", + "dataType": "boolean", + "maxLength": -1 + } + ], + "partitions": [] + } + ] + }""" + +AZURITE_CONNECTION_STRING='DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://10.1.0.6:10000/devstoreaccount1' +CONTAINER = "cdm-e2e" +FOLDERS = [ + "2020-01-01T00.15.12Z", + "2020-01-01T00.26.42Z", + "2020-01-01T00.34.31Z", + "2020-01-01T01.12.48Z", + "2020-01-01T02.05.38Z", + "2020-01-02T01.05.38Z", + 
"2020-02-01T01.05.38Z" +] + +def upload_blob_file(blob_service_client: BlobServiceClient, container_name: str, blob_name: str, content: str): + blob_service_client.get_container_client(container=container_name).upload_blob(name=blob_name, data=content.encode('utf-8'), overwrite=True) + +def create_container(): + # Create a container for Azurite for the first run + blob_service_client = BlobServiceClient.from_connection_string(AZURITE_CONNECTION_STRING) + try: + blob_service_client.create_container(CONTAINER) + except Exception as e: + print(e) + +def create_blobs(): + blob_service_client = BlobServiceClient.from_connection_string(AZURITE_CONNECTION_STRING) + for folder in FOLDERS: + upload_blob_file(blob_service_client, CONTAINER, f"{folder}/dimensionattributelevelvalue/2020.csv", CONTENT) + + upload_blob_file(blob_service_client, CONTAINER, "model.json", MODEL_JSON) + +create_container() +create_blobs() diff --git a/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala b/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala index 8bfc501..c8c8a99 100644 --- a/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala +++ b/framework/arcane-framework/src/main/scala/services/cdm/CdmTable.scala @@ -19,8 +19,8 @@ class CdmTable(name: String, storagePath: AdlsStoragePath, entityModel: SimpleCd * @param startDate Baseline date to start search from * @return A list of yyyy-MM-ddTHH prefixes to apply as filters */ - private def getListPrefixes(startDate: Option[OffsetDateTime]): IndexedSeq[String] = - val currentMoment = OffsetDateTime.now(ZoneOffset.UTC) + private def getListPrefixes(startDate: Option[OffsetDateTime], endDate: Option[OffsetDateTime] = None): IndexedSeq[String] = + val currentMoment = endDate.getOrElse(OffsetDateTime.now(ZoneOffset.UTC)) val startMoment = startDate.getOrElse(currentMoment.minusYears(defaultFromYears)) Iterator.iterate(startMoment)(_.plusHours(1)) .takeWhile(_.toEpochSecond < 
currentMoment.toEpochSecond) @@ -34,11 +34,12 @@ class CdmTable(name: String, storagePath: AdlsStoragePath, entityModel: SimpleCd /** * Read a table snapshot, taking optional start time. Lowest precision available is 1 hour * @param startDate Folders from Synapse export to include in the snapshot, based on the start date provided. If not provided, ALL folders from now - defaultFromYears will be included + * @param endDate Date to stop at when looking for prefixes. In production use None for this value to always look data up to current moment. * @return A stream of rows for this table */ - def snapshot(startDate: Option[OffsetDateTime] = None): Future[LazyList[DataRow]] = + def snapshot(startDate: Option[OffsetDateTime] = None, endDate: Option[OffsetDateTime] = None): Future[LazyList[DataRow]] = // list all matching blobs - Future.sequence(getListPrefixes(startDate) + Future.sequence(getListPrefixes(startDate, endDate) .flatMap(prefix => reader.listPrefixes(storagePath + prefix)) .flatMap(prefix => reader.listBlobs(storagePath + prefix.name + name)) // exclude any files other than CSV diff --git a/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReader.scala b/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReader.scala index c217a9c..e84b705 100644 --- a/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReader.scala +++ b/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReader.scala @@ -25,7 +25,7 @@ import scala.language.implicitConversions * @param tokenCredential Optional token credential provider * @param sharedKeyCredential Optional access key credential */ -final class AzureBlobStorageReader(accountName: String, tokenCredential: Option[TokenCredential], sharedKeyCredential: Option[StorageSharedKeyCredential], settings: Option[AzureBlobStorageReaderSettings] = None) extends 
BlobStorageReader[AdlsStoragePath]: +final class AzureBlobStorageReader(accountName: String, endpoint: Option[String], tokenCredential: Option[TokenCredential], sharedKeyCredential: Option[StorageSharedKeyCredential], settings: Option[AzureBlobStorageReaderSettings] = None) extends BlobStorageReader[AdlsStoragePath]: private val serviceClientSettings = settings.getOrElse(AzureBlobStorageReaderSettings()) private lazy val defaultCredential = new DefaultAzureCredentialBuilder().build() private lazy val serviceClient = @@ -35,7 +35,7 @@ final class AzureBlobStorageReader(accountName: String, tokenCredential: Option[ case (None, None) => new BlobServiceClientBuilder().credential(defaultCredential) builder - .endpoint(s"https://$accountName.blob.core.windows.net/") + .endpoint(endpoint.getOrElse(s"https://$accountName.blob.core.windows.net/")) .retryOptions(RequestRetryOptions(RetryPolicyType.EXPONENTIAL, serviceClientSettings.httpMaxRetries, serviceClientSettings.httpRetryTimeout.toSeconds.toInt, serviceClientSettings.httpMinRetryDelay.toMillis, serviceClientSettings.httpMaxRetryDelay.toMillis, null)) .buildClient() @@ -77,7 +77,7 @@ final class AzureBlobStorageReader(accountName: String, tokenCredential: Option[ val client = getBlobContainerClient(rootPrefix) val listOptions = new ListBlobsOptions() .setPrefix(rootPrefix.blobPrefix) - .setMaxResultsPerPage(maxResultsPerPage) + .setMaxResultsPerPage(serviceClientSettings.maxResultsPerPage) LazyList.from(getPage( None, @@ -89,7 +89,7 @@ final class AzureBlobStorageReader(accountName: String, tokenCredential: Option[ val client = getBlobContainerClient(blobPath) val listOptions = new ListBlobsOptions() .setPrefix(blobPath.blobPrefix) - .setMaxResultsPerPage(maxResultsPerPage) + .setMaxResultsPerPage(serviceClientSettings.maxResultsPerPage) LazyList.from(getPage( None, @@ -104,7 +104,7 @@ object AzureBlobStorageReader: * @param credential TokenCredential (accessToken provider) * @return AzureBlobStorageReader instance */ - 
def apply(accountName: String, credential: TokenCredential): AzureBlobStorageReader = new AzureBlobStorageReader(accountName, Some(credential), None) + def apply(accountName: String, credential: TokenCredential): AzureBlobStorageReader = new AzureBlobStorageReader(accountName, None, Some(credential), None) /** * Create AzureBlobStorageReader for the account using StorageSharedKeyCredential @@ -113,12 +113,22 @@ object AzureBlobStorageReader: * @param credential StorageSharedKeyCredential (account key) * @return AzureBlobStorageReader instance */ - def apply(accountName: String, credential: StorageSharedKeyCredential): AzureBlobStorageReader = new AzureBlobStorageReader(accountName, None, Some(credential)) + def apply(accountName: String, credential: StorageSharedKeyCredential): AzureBlobStorageReader = new AzureBlobStorageReader(accountName, None, None, Some(credential)) + /** + * Create AzureBlobStorageReader for the account using StorageSharedKeyCredential and custom endpoint + * + * @param accountName Storage account name + * @param endpoint Storage account endpoint + * @param credential StorageSharedKeyCredential (account key) + * @return AzureBlobStorageReader instance + */ + def apply(accountName: String, endpoint: String, credential: StorageSharedKeyCredential): AzureBlobStorageReader = new AzureBlobStorageReader(accountName, Some(endpoint), None, Some(credential)) + /** * Create AzureBlobStorageReader for the account using default credential chain * * @param accountName Storage account name * @return AzureBlobStorageReader instance */ - def apply(accountName: String): AzureBlobStorageReader = new AzureBlobStorageReader(accountName, None, None) + def apply(accountName: String): AzureBlobStorageReader = new AzureBlobStorageReader(accountName, None, None, None) diff --git a/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReaderSettings.scala 
b/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReaderSettings.scala index 25b69e7..bdd318a 100644 --- a/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReaderSettings.scala +++ b/framework/arcane-framework/src/main/scala/services/storage/models/azure/AzureBlobStorageReaderSettings.scala @@ -2,7 +2,6 @@ package com.sneaksanddata.arcane.framework package services.storage.models.azure import java.time.Duration -import scala.concurrent.duration.Duration case class AzureBlobStorageReaderSettings(httpMaxRetries: Int, httpRetryTimeout: Duration, httpMinRetryDelay: Duration, httpMaxRetryDelay: Duration, maxResultsPerPage: Int) diff --git a/framework/arcane-framework/src/test/scala/models/CdmTableTests.scala b/framework/arcane-framework/src/test/scala/models/CdmTableTests.scala index 9083d17..d2d78c5 100644 --- a/framework/arcane-framework/src/test/scala/models/CdmTableTests.scala +++ b/framework/arcane-framework/src/test/scala/models/CdmTableTests.scala @@ -6,37 +6,50 @@ import services.storage.models.azure.AzureBlobStorageReader import com.azure.storage.common.StorageSharedKeyCredential import services.cdm.{CdmTable, CdmTableSettings} + import org.scalatest.flatspec.AsyncFlatSpec import org.scalatest.matchers.must.Matchers import org.scalatest.matchers.should.Matchers.should +import org.scalatest.prop.Tables.Table +import org.scalatest.prop.TableDrivenPropertyChecks.* import java.time.{OffsetDateTime, ZoneOffset} class CdmTableTests extends AsyncFlatSpec with Matchers { private implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global + private val integrationTestEndpoint = sys.env.get("ARCANE_FRAMEWORK__STORAGE_ENDPOINT") private val integrationTestContainer = sys.env.get("ARCANE_FRAMEWORK__STORAGE_CONTAINER") private val integrationTestAccount = sys.env.get("ARCANE_FRAMEWORK__STORAGE_ACCOUNT") private val integrationTestAccessKey = 
sys.env.get("ARCANE_FRAMEWORK__STORAGE_ACCESS_KEY") + private val integrationTestTableName = sys.env.get("ARCANE_FRAMEWORK__CDM_TEST_TABLE") + + private val scanPeriods = Table( + ("start", "end", "expected_rows"), + (OffsetDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC).minusHours(6), OffsetDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC), 0), + (OffsetDateTime.of(2020, 1, 1, 1, 0, 0, 0, ZoneOffset.UTC).minusHours(1), OffsetDateTime.of(2020, 1, 1, 1, 0, 0, 0, ZoneOffset.UTC), 16) + ) it should "read model schemas from Synapse Link blob storage" in { - (integrationTestContainer, integrationTestAccount, integrationTestAccessKey) match - case (Some(container), Some(account), Some(key)) => SimpleCdmModel(s"abfss://$container@$account.dfs.core.windows.net/", AzureBlobStorageReader(account, StorageSharedKeyCredential(account, key))).map { model => + (integrationTestContainer, integrationTestAccount, integrationTestAccessKey, integrationTestEndpoint) match + case (Some(container), Some(account), Some(key), Some(endpoint)) => SimpleCdmModel(s"abfss://$container@$account.dfs.core.windows.net/", AzureBlobStorageReader(account, endpoint, StorageSharedKeyCredential(account, key))).map { model => model.entities.size should be > 0 } case _ => cancel("Skipping test since it is not configured to run") } it should "read a CDM table from Synapse Link blob storage" in { - (integrationTestContainer, integrationTestAccount, integrationTestAccessKey) match - case (Some(container), Some(account), Some(key)) => SimpleCdmModel(s"abfss://$container@$account.dfs.core.windows.net/", AzureBlobStorageReader(account, StorageSharedKeyCredential(account, key))).flatMap { model => - val entityToRead = model.entities.find(v => v.name == "salesline").get - CdmTable(CdmTableSettings(name = entityToRead.name, rootPath = s"abfss://$container@$account.dfs.core.windows.net/"), entityToRead, AzureBlobStorageReader(account, StorageSharedKeyCredential(account, key))) - 
.snapshot(Some(OffsetDateTime.now(ZoneOffset.UTC).minusHours(6))) - .map { rows => - rows.foldLeft(0L){ (agg, _) => agg + 1 } should be > 0L - } - } - case _ => cancel("Skipping test since it is not configured to run") + forAll (scanPeriods) { (startDate, endDate, expectedRows) => + (integrationTestContainer, integrationTestAccount, integrationTestAccessKey, integrationTestTableName, integrationTestEndpoint) match + case (Some(container), Some(account), Some(key), Some(tableName), Some(endpoint)) => SimpleCdmModel(s"abfss://$container@$account.dfs.core.windows.net/", AzureBlobStorageReader(account, endpoint, StorageSharedKeyCredential(account, key))).flatMap { model => + val entityToRead = model.entities.find(v => v.name == tableName).get + CdmTable(CdmTableSettings(name = entityToRead.name, rootPath = s"abfss://$container@$account.dfs.core.windows.net/"), entityToRead, AzureBlobStorageReader(account, endpoint, StorageSharedKeyCredential(account, key))) + .snapshot(Some(startDate), Some(endDate)) + .map { rows => + rows.foldLeft(0L) { (agg, _) => agg + 1 } should equal(expectedRows) + } + } + case _ => cancel("Skipping test since it is not configured to run") + } } } From 34094be4afaeb097c37446ae2b0b7ad5b7f12fa9 Mon Sep 17 00:00:00 2001 From: George Zubrienko Date: Thu, 12 Dec 2024 16:46:02 +0100 Subject: [PATCH 10/14] All tests pass --- .../src/test/scala/models/CdmTableTests.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/framework/arcane-framework/src/test/scala/models/CdmTableTests.scala b/framework/arcane-framework/src/test/scala/models/CdmTableTests.scala index d2d78c5..f8c1ac0 100644 --- a/framework/arcane-framework/src/test/scala/models/CdmTableTests.scala +++ b/framework/arcane-framework/src/test/scala/models/CdmTableTests.scala @@ -27,7 +27,11 @@ class CdmTableTests extends AsyncFlatSpec with Matchers { private val scanPeriods = Table( ("start", "end", "expected_rows"), (OffsetDateTime.of(2020, 1, 1, 0, 0, 0, 0, 
ZoneOffset.UTC).minusHours(6), OffsetDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC), 0), - (OffsetDateTime.of(2020, 1, 1, 1, 0, 0, 0, ZoneOffset.UTC).minusHours(1), OffsetDateTime.of(2020, 1, 1, 1, 0, 0, 0, ZoneOffset.UTC), 16) + (OffsetDateTime.of(2020, 1, 1, 1, 0, 0, 0, ZoneOffset.UTC).minusHours(1), OffsetDateTime.of(2020, 1, 1, 1, 0, 0, 0, ZoneOffset.UTC), 15), + (OffsetDateTime.of(2020, 1, 1, 2, 0, 0, 0, ZoneOffset.UTC).minusHours(2), OffsetDateTime.of(2020, 1, 1, 2, 0, 0, 0, ZoneOffset.UTC), 20), + (OffsetDateTime.of(2020, 1, 1, 3, 0, 0, 0, ZoneOffset.UTC).minusHours(3), OffsetDateTime.of(2020, 1, 1, 3, 0, 0, 0, ZoneOffset.UTC), 25), + (OffsetDateTime.of(2020, 1, 2, 2, 0, 0, 0, ZoneOffset.UTC).minusDays(3), OffsetDateTime.of(2020, 1, 2, 2, 0, 0, 0, ZoneOffset.UTC), 30), + (OffsetDateTime.of(2020, 2, 1, 1, 0, 0, 0, ZoneOffset.UTC).minusMonths(2), OffsetDateTime.of(2020, 2, 1, 2, 0, 0, 0, ZoneOffset.UTC), 35) ) it should "read model schemas from Synapse Link blob storage" in { From 8db93c9a37256cc4b06a8e4ce97d01f1c65617a8 Mon Sep 17 00:00:00 2001 From: George Zubrienko Date: Thu, 12 Dec 2024 16:48:33 +0100 Subject: [PATCH 11/14] Add env vars for azurite --- framework/arcane-framework/unit-tests.env | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/framework/arcane-framework/unit-tests.env b/framework/arcane-framework/unit-tests.env index 40f3486..a8b330a 100644 --- a/framework/arcane-framework/unit-tests.env +++ b/framework/arcane-framework/unit-tests.env @@ -5,3 +5,8 @@ ARCANE_FRAMEWORK__S3_CATALOG_AUTH_CLIENT_URI=http://localhost:8181/api/catalog/v ARCANE_FRAMEWORK__S3_CATALOG_AUTH_SCOPE=PRINCIPAL_ROLE:ALL ARCANE_FRAMEWORK__S3_CATALOG_ENDPOINT=http://localhost:9000 AWS_REGION=us-east-1 +ARCANE_FRAMEWORK__CDM_TEST_TABLE=dimensionattributelevelvalue +ARCANE_FRAMEWORK__STORAGE_ACCESS_KEY=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw== +ARCANE_FRAMEWORK__STORAGE_ACCOUNT=devstoreaccount1 
+ARCANE_FRAMEWORK__STORAGE_CONTAINER=cdm-e2e +ARCANE_FRAMEWORK__STORAGE_ENDPOINT=http://localhost:10000/devstoreaccount1 From 2e2f7bdcf442448ec17fbdbe2b3ea7d8d8402d19 Mon Sep 17 00:00:00 2001 From: George Zubrienko Date: Thu, 12 Dec 2024 17:08:45 +0100 Subject: [PATCH 12/14] Prevent port overlap --- framework/arcane-framework/docker-compose.yaml | 2 +- framework/arcane-framework/populate-cdm-container.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/framework/arcane-framework/docker-compose.yaml b/framework/arcane-framework/docker-compose.yaml index 478aae1..0d2b1cc 100644 --- a/framework/arcane-framework/docker-compose.yaml +++ b/framework/arcane-framework/docker-compose.yaml @@ -104,7 +104,7 @@ services: - "--blobHost" - "10.1.0.6" ports: - - "10000:10000" + - "10001:10001" create-cdm-container: image: python:3.11-slim-bookworm depends_on: diff --git a/framework/arcane-framework/populate-cdm-container.py b/framework/arcane-framework/populate-cdm-container.py index 276a531..482a087 100644 --- a/framework/arcane-framework/populate-cdm-container.py +++ b/framework/arcane-framework/populate-cdm-container.py @@ -631,7 +631,7 @@ ] }""" -AZURITE_CONNECTION_STRING='DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://10.1.0.6:10000/devstoreaccount1' +AZURITE_CONNECTION_STRING='DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://10.1.0.6:10001/devstoreaccount1' CONTAINER = "cdm-e2e" FOLDERS = [ "2020-01-01T00.15.12Z", From 57ad425ef0cc42e94b27aa647bcedc37e52b4952 Mon Sep 17 00:00:00 2001 From: George Zubrienko Date: Thu, 12 Dec 2024 17:11:57 +0100 Subject: [PATCH 13/14] Prevent port overlap --- framework/arcane-framework/unit-tests.env | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/framework/arcane-framework/unit-tests.env b/framework/arcane-framework/unit-tests.env index a8b330a..5cdba52 100644 --- a/framework/arcane-framework/unit-tests.env +++ b/framework/arcane-framework/unit-tests.env @@ -9,4 +9,4 @@ ARCANE_FRAMEWORK__CDM_TEST_TABLE=dimensionattributelevelvalue ARCANE_FRAMEWORK__STORAGE_ACCESS_KEY=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw== ARCANE_FRAMEWORK__STORAGE_ACCOUNT=devstoreaccount1 ARCANE_FRAMEWORK__STORAGE_CONTAINER=cdm-e2e -ARCANE_FRAMEWORK__STORAGE_ENDPOINT=http://localhost:10000/devstoreaccount1 +ARCANE_FRAMEWORK__STORAGE_ENDPOINT=http://localhost:10001/devstoreaccount1 From 8915e2b03bb676814a7697cd5b48241e95618077 Mon Sep 17 00:00:00 2001 From: George Zubrienko Date: Mon, 16 Dec 2024 17:13:04 +0100 Subject: [PATCH 14/14] Fix compose service port for Azurite --- framework/arcane-framework/docker-compose.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/framework/arcane-framework/docker-compose.yaml b/framework/arcane-framework/docker-compose.yaml index 0d2b1cc..d8e3e79 100644 --- a/framework/arcane-framework/docker-compose.yaml +++ b/framework/arcane-framework/docker-compose.yaml @@ -103,6 +103,8 @@ services: - azurite-blob - "--blobHost" - "10.1.0.6" + - "--blobPort" + - "10001" ports: - "10001:10001" create-cdm-container: