Skip to content

Commit

Permalink
WIP CDM table read
Browse files Browse the repository at this point in the history
  • Loading branch information
george-zubrienko committed Dec 10, 2024
1 parent 29023d6 commit c3fd952
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.sneaksanddata.arcane.framework
package services.cdm

import models.cdm.CSVParser.isComplete
import models.cdm.{SimpleCdmEntity, given}
import models.{ArcaneSchema, DataRow}
import services.storage.models.azure.{AdlsStoragePath, AzureBlobStorageReader}

import java.time.{OffsetDateTime, ZoneOffset}
import scala.concurrent.Future

/**
 * Reads rows of a CDM table whose CSV blobs are located under year-month prefixed paths.
 *
 * @param name        Table name. Not referenced by any member of this class at the moment.
 * @param storagePath Root storage path; listing prefixes and blob names are appended to it.
 * @param entityModel CDM entity definition the row schema is derived from.
 * @param reader      Blob storage reader used to list and download blobs.
 */
class CdmTable(name: String, storagePath: AdlsStoragePath, entityModel: SimpleCdmEntity, reader: AzureBlobStorageReader):
  // NOTE(review): this pins the global execution context inside library code;
  // consider letting the caller supply one. Kept as-is to preserve the public member.
  implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global
  private val defaultFromYears: Int = 5
  private val schema: ArcaneSchema = implicitly(entityModel)

  /**
   * Builds the `yyyy-MM-` listing prefixes for every month of every calendar year between
   * (now - fromYears) and now, both in UTC. Whole years are always covered, so prefixes for
   * months before the start moment or after the current month are included as well.
   *
   * @param fromYears How many years to look back; defaults to `defaultFromYears` when empty.
   * @return Prefixes ordered by year, then month.
   */
  private def getListPrefixes(fromYears: Option[Int]): IndexedSeq[String] =
    val currentMoment = OffsetDateTime.now(ZoneOffset.UTC)
    val fromMoment = currentMoment.minusYears(fromYears.getOrElse(defaultFromYears))
    for
      year <- fromMoment.getYear to currentMoment.getYear
      month <- 1 to 12
    yield f"$year-$month%02d-"

  /**
   * Splits blob content into logical CSV lines. A physical line that `isComplete` rejects is
   * glued to the following physical line(s) until the accumulated text is accepted.
   * As before, the line break between glued fragments is not re-inserted.
   *
   * @param content Full text of a single blob.
   * @return Logical lines in their original order; empty lines are skipped.
   */
  private def splitRows(content: String): Seq[String] =
    val (rows, pending) = content.split('\n').foldLeft((Vector.empty[String], "")) {
      case ((done, buffer), line) =>
        // Evaluate the buffer together with the current line, so the current line is never dropped.
        val candidate = buffer + line
        if candidate.isEmpty then (done, "")
        else if isComplete(candidate) then (done :+ candidate, "")
        else (done, candidate)
    }
    // Keep an unterminated trailing record instead of discarding it silently;
    // NOTE(review): how the DataRow conversion treats such a record is not visible here.
    if pending.isEmpty then rows else rows :+ pending

  /**
   * Read a table snapshot, taking an optional look-back period.
   *
   * All matching blobs are listed and downloaded before the returned future completes,
   * so the resulting `LazyList` wraps rows that are already in memory.
   *
   * @param fromYears Number of years to look back when selecting year-month prefixes.
   *                  If not provided, the last `defaultFromYears` (5) years are used.
   * @return A stream of rows for this table
   */
  def snapshot(fromYears: Option[Int]): Future[LazyList[DataRow]] =
    // list all matching blobs
    val blobs = getListPrefixes(fromYears)
      .flatMap(prefix => reader.listBlobs(storagePath + prefix))
    // Decode the payload as UTF-8: a byte-by-byte `toChar` mangles every multi-byte character.
    val contents = blobs.map(blob => reader.getBlobContent(
      storagePath + blob.name,
      bytes => new String(bytes, java.nio.charset.StandardCharsets.UTF_8)
    ))
    Future.sequence(contents)
      .map(_.flatMap(content => splitRows(content).map(implicitly[DataRow](_, schema))))
      .map(LazyList.from)

/** Companion factory for `CdmTable`. */
object CdmTable:
  /**
   * Creates a `CdmTable` from table settings.
   *
   * @param settings    Supplies the table name and the root path string.
   * @param entityModel CDM entity definition passed through to the table.
   * @param reader      Blob storage reader passed through to the table.
   * @return A table whose storage path is built from `settings.rootPath`.
   */
  def apply(settings: CdmTableSettings, entityModel: SimpleCdmEntity, reader: AzureBlobStorageReader): CdmTable =
    // `.get` unwraps whatever `AdlsStoragePath(...)` returns; presumably a root path that
    // cannot be parsed surfaces here as an exception - confirm against AdlsStoragePath.apply.
    val tablePath = AdlsStoragePath(settings.rootPath).get
    new CdmTable(settings.name, tablePath, entityModel, reader)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.sneaksanddata.arcane.framework
package services.cdm

/**
 * Settings describing a single CDM table.
 *
 * @param name     Table name.
 * @param rootPath Root storage path of the table, in string form.
 */
case class CdmTableSettings(
  name: String,
  rootPath: String
)
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import services.storage.models.azure.AzureModelConversions.given

import scala.jdk.CollectionConverters.*
import scala.language.implicitConversions
import com.azure.storage.blob.models.ListBlobsOptions
import com.azure.storage.blob.models.{BlobListDetails, ListBlobsOptions}
import com.azure.storage.common.policy.{RequestRetryOptions, RetryPolicyType}

import java.time.Duration
Expand All @@ -22,6 +22,7 @@ final class AzureBlobStorageReader extends BlobStorageReader[AdlsStoragePath]:
private val httpRetryTimeout = Duration.ofSeconds(60)
private val httpMinRetryDelay = Duration.ofMillis(500)
private val httpMaxRetryDelay = Duration.ofSeconds(3)
private val maxResultsPerPage = 1000

private lazy val defaultCredential = new DefaultAzureCredentialBuilder().build()
private lazy val serviceClient = new BlobServiceClientBuilder()
Expand All @@ -43,7 +44,9 @@ final class AzureBlobStorageReader extends BlobStorageReader[AdlsStoragePath]:

def listBlobs(blobPath: AdlsStoragePath): LazyList[StoredBlob] =
val client = getBlobContainerClient(blobPath)
val listOptions = new ListBlobsOptions().setPrefix(blobPath.blobPrefix)
val listOptions = new ListBlobsOptions()
.setPrefix(blobPath.blobPrefix)
.setMaxResultsPerPage(maxResultsPerPage)

@tailrec
def getPage(pageToken: Option[String], result: Iterable[StoredBlob]): Iterable[StoredBlob] =
Expand Down

0 comments on commit c3fd952

Please sign in to comment.