Integrate Iceberg Writer with StreamGraphBuilder (#103)
s-vitaliy authored Dec 4, 2024
1 parent 7d40ab2 commit bd18a90
Showing 14 changed files with 307 additions and 87 deletions.
@@ -40,8 +40,8 @@ trait S3CatalogFileIO extends CatalogFileIO:
* Singleton for S3CatalogFileIO
*/
object S3CatalogFileIO extends S3CatalogFileIO:
override val accessKeyId: String = scala.util.Properties.envOrElse("ARCANE_FRAMEWORK__S3_CATALOG_ACCESS_KEY_ID", "")
override val secretAccessKey: String = scala.util.Properties.envOrElse("ARCANE_FRAMEWORK__S3_CATALOG_SECRET_ACCESS_KEY", "")
override val accessKeyId: String = scala.util.Properties.envOrElse("ARCANE_FRAMEWORK__S3_CATALOG_ACCESS_KEY_ID", "")
override val endpoint: String = scala.util.Properties.envOrElse("ARCANE_FRAMEWORK__S3_CATALOG_ENDPOINT", "")
override val region: String = scala.util.Properties.envOrElse("ARCANE_FRAMEWORK__S3_CATALOG_REGION", "us-east-1")

@@ -12,16 +12,20 @@ import org.apache.iceberg.parquet.Parquet
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList
import org.apache.iceberg.rest.RESTCatalog
import org.apache.iceberg.{CatalogProperties, PartitionSpec, Schema, Table}
import zio.{ZIO, ZLayer}

import java.util.UUID
import scala.concurrent.Future
import scala.jdk.CollectionConverters.*
import scala.language.implicitConversions
import scala.util.{Failure, Success, Try}

/**
* Converts an Arcane schema to an Iceberg schema.
*/
given Conversion[ArcaneSchema, Schema] with
def apply(schema: ArcaneSchema): Schema = SchemaConversions.toIcebergSchema(schema)

// https://www.tabular.io/blog/java-api-part-3/
class IcebergS3CatalogWriter(
namespace: String,
@@ -106,14 +110,53 @@ class IcebergS3CatalogWriter(
Future(catalog.loadTable(tableId)).flatMap(appendData(data, schema, false))

object IcebergS3CatalogWriter:
/**
* The ZLayer that creates the IcebergS3CatalogWriter.
*/
val layer: ZLayer[IcebergCatalogSettings, Nothing, CatalogWriter[RESTCatalog, Table, Schema]] =
ZLayer {
for
settings <- ZIO.service[IcebergCatalogSettings]
yield IcebergS3CatalogWriter(settings)
}

/**
* Factory method to create IcebergS3CatalogWriter
* @param namespace The namespace for the catalog
* @param warehouse The warehouse location
* @param catalogUri The catalog URI
* @param additionalProperties Additional properties for the catalog
* @param s3CatalogFileIO The S3 catalog file IO settings
* @param locationOverride The location override for the catalog
* @return The initialized IcebergS3CatalogWriter instance
*/
def apply(namespace: String,
warehouse: String,
catalogUri: String,
additionalProperties: Map[String, String],
s3CatalogFileIO: S3CatalogFileIO,
locationOverride: Option[String]): IcebergS3CatalogWriter =
val writer = new IcebergS3CatalogWriter(
namespace = namespace,
warehouse = warehouse,
catalogUri = catalogUri,
additionalProperties = additionalProperties,
s3CatalogFileIO = s3CatalogFileIO,
locationOverride = locationOverride,
)
writer.initialize()

/**
* Factory method to create IcebergS3CatalogWriter
* @param icebergSettings Iceberg settings
* @return The initialized IcebergS3CatalogWriter instance
*/
def apply(icebergSettings: IcebergCatalogSettings): IcebergS3CatalogWriter =
val writer =
new IcebergS3CatalogWriter(
IcebergS3CatalogWriter(
namespace = icebergSettings.namespace,
warehouse = icebergSettings.warehouse,
catalogUri = icebergSettings.catalogUri,
additionalProperties = icebergSettings.additionalProperties,
s3CatalogFileIO = icebergSettings.s3CatalogFileIO,
locationOverride = icebergSettings.locationOverride,
locationOverride = icebergSettings.stagingLocation,
)
writer.initialize()
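
As a usage note, the sketch below shows how the new layer and factory are expected to be consumed. It is illustrative only: the lakehouse import paths and the table name are assumptions, and the write signature is inferred from its use in IcebergConsumer further down in this commit.

import com.sneaksanddata.arcane.framework.models.DataRow
import com.sneaksanddata.arcane.framework.services.lakehouse.CatalogWriter
import org.apache.iceberg.rest.RESTCatalog
import org.apache.iceberg.{Schema, Table}
import zio.{Chunk, ZIO}

// Writes one batch of rows to a named staging table through the CatalogWriter service.
def writeBatch(rows: Chunk[DataRow], tableName: String, schema: Schema): ZIO[CatalogWriter[RESTCatalog, Table, Schema], Throwable, Table] =
  for
    writer <- ZIO.service[CatalogWriter[RESTCatalog, Table, Schema]]
    table  <- ZIO.fromFuture(implicit ec => writer.write(rows, tableName, schema))
  yield table

// The environment is satisfied by composing an application-specific settings layer
// with the writer layer, e.g.:
//   writeBatch(rows, "example-table", schema).provide(settingsLayer >>> IcebergS3CatalogWriter.layer)
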
@@ -35,5 +35,5 @@ trait IcebergCatalogSettings:
/**
* The lakehouse location of the catalog
*/
val locationOverride: Option[String]
val stagingLocation: Option[String]
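
For reference, a sketch of a concrete settings implementation after the locationOverride to stagingLocation rename; every value is a placeholder mirroring the shape of the test settings, and the import path is an assumption.

import com.sneaksanddata.arcane.framework.services.lakehouse.{IcebergCatalogSettings, S3CatalogFileIO}

// Placeholder values only; stagingLocation replaces the former locationOverride field.
object ExampleCatalogSettings extends IcebergCatalogSettings:
  override val namespace: String = "demo"
  override val warehouse: String = "demo-warehouse"
  override val catalogUri: String = "http://localhost:8181/api/catalog"
  override val additionalProperties: Map[String, String] = Map.empty
  override val s3CatalogFileIO: S3CatalogFileIO = S3CatalogFileIO
  override val stagingLocation: Option[String] = Some("s3://tmp/polaris/example")
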

@@ -195,7 +195,7 @@ object MsSqlConnection:
/**
* The ZLayer that creates the MsSqlConnection.
*/
val layer: ZLayer[ConnectionOptions, Nothing, MsSqlConnection] =
val layer: ZLayer[ConnectionOptions, Nothing, MsSqlConnection & SchemaProvider[ArcaneSchema]] =
ZLayer.scoped {
ZIO.fromAutoCloseable{
for connectionOptions <- ZIO.service[ConnectionOptions] yield MsSqlConnection(connectionOptions)
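
The widened output type matters because downstream consumers such as IcebergConsumer require a SchemaProvider[ArcaneSchema]. A rough sketch of resolving both services from the same layer, assuming getSchema returns a Future as it is used in IcebergConsumer:

import com.sneaksanddata.arcane.framework.models.ArcaneSchema
import com.sneaksanddata.arcane.framework.services.base.SchemaProvider
import com.sneaksanddata.arcane.framework.services.mssql.MsSqlConnection
import zio.ZIO

// Both services are backed by the same MsSqlConnection instance provided by the layer.
val resolveSchema: ZIO[MsSqlConnection & SchemaProvider[ArcaneSchema], Throwable, ArcaneSchema] =
  for
    _        <- ZIO.service[MsSqlConnection]
    provider <- ZIO.service[SchemaProvider[ArcaneSchema]]
    schema   <- ZIO.fromFuture(implicit ec => provider.getSchema)
  yield schema
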
@@ -0,0 +1,95 @@
package com.sneaksanddata.arcane.framework
package services.streaming

import models.app.StreamContext
import models.{ArcaneSchema, DataRow}
import services.base.SchemaProvider
import services.lakehouse.{CatalogWriter, given_Conversion_ArcaneSchema_Schema}
import services.streaming.IcebergConsumer.getTableName
import services.streaming.base.BatchConsumer

import org.apache.iceberg.rest.RESTCatalog
import org.apache.iceberg.{Schema, Table}
import org.slf4j.{Logger, LoggerFactory}
import zio.stream.{ZPipeline, ZSink}
import zio.{Chunk, Task, ZIO, ZLayer}

import java.time.format.DateTimeFormatter
import java.time.{ZoneOffset, ZonedDateTime}

/**
* A consumer that writes the data to the staging table.
*
* @param streamContext The stream context.
* @param catalogWriter The catalog writer.
* @param schemaProvider The schema provider.
*/
class IcebergConsumer(streamContext: StreamContext,
catalogWriter: CatalogWriter[RESTCatalog, Table, Schema],
schemaProvider: SchemaProvider[ArcaneSchema]) extends BatchConsumer[Chunk[DataRow]]:

private val logger: Logger = LoggerFactory.getLogger(classOf[IcebergConsumer])

/**
* Returns the sink that consumes the batch.
*
* @return ZSink (stream sink for the stream graph).
*/
def consume: ZSink[Any, Throwable, Chunk[DataRow], Any, Unit] =
writeStagingTable >>> logResults


private def logResults: ZSink[Any, Throwable, Table, Nothing, Unit] = ZSink.foreach { e =>
logger.info(s"Received the table ${e.name()} from the streaming source")
ZIO.unit
}

private def writeStagingTable: ZPipeline[Any, Throwable, Chunk[DataRow], Table] = ZPipeline[Chunk[DataRow]]()
.mapAccum(0L) { (acc, chunk) => (acc + 1, (chunk, acc.getTableName(streamContext.streamId))) }
.mapZIO({
case (rows, tableName) => writeWithWriter(rows, tableName)
})


private def writeWithWriter(rows: Chunk[DataRow], name: String): Task[Table] =
for
schema <- ZIO.fromFuture(implicit ec => schemaProvider.getSchema)
table <- ZIO.fromFuture(implicit ec => catalogWriter.write(rows, name, schema))
yield table

object IcebergConsumer:
val formatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss")

extension (batchNumber: Long) def getTableName(streamId: String): String =
s"$streamId-${ZonedDateTime.now(ZoneOffset.UTC).format(formatter)}-$batchNumber"


/**
* Factory method to create IcebergConsumer
*
* @param streamContext The stream context.
* @param catalogWriter The catalog writer.
* @param schemaProvider The schema provider.
* @return The initialized IcebergConsumer instance
*/
def apply(streamContext: StreamContext,
catalogWriter: CatalogWriter[RESTCatalog, Table, Schema],
schemaProvider: SchemaProvider[ArcaneSchema]): IcebergConsumer =
new IcebergConsumer(streamContext, catalogWriter, schemaProvider)

/**
* The required environment for the IcebergConsumer.
*/
type Environment = SchemaProvider[ArcaneSchema] & CatalogWriter[RESTCatalog, Table, Schema] & StreamContext

/**
* The ZLayer that creates the IcebergConsumer.
*/
val layer: ZLayer[Environment, Nothing, IcebergConsumer] =
ZLayer {
for
streamContext <- ZIO.service[StreamContext]
catalogWriter <- ZIO.service[CatalogWriter[RESTCatalog, Table, Schema]]
schemaProvider <- ZIO.service[SchemaProvider[ArcaneSchema]]
yield IcebergConsumer(streamContext, catalogWriter, schemaProvider)
}
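
A quick illustration of the staging table names produced by the getTableName extension; the timestamp below is an example, the shape is fixed by formatter.

import com.sneaksanddata.arcane.framework.services.streaming.IcebergConsumer.getTableName

// For streamId "orders" and batch number 3 the generated name has the shape
// "orders-2024-12-04_12-30-45-3" (UTC timestamp taken at the moment of the call).
val exampleStagingTable: String = 3L.getTableName("orders")
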
@@ -6,11 +6,11 @@ import models.settings.VersionedDataGraphBuilderSettings
import services.app.base.StreamLifetimeService
import services.mssql.MsSqlConnection.{DataBatch, VersionedBatch}
import services.mssql.given_HasVersion_VersionedBatch
import services.streaming.base.{BatchProcessor, StreamGraphBuilder, VersionedDataProvider}
import services.streaming.base.{BatchConsumer, BatchProcessor, StreamGraphBuilder, VersionedDataProvider}

import org.slf4j.{Logger, LoggerFactory}
import zio.stream.{ZSink, ZStream}
import zio.{Chunk, Schedule, ZIO, ZLayer}
import zio.{Chunk, Schedule, ZIO}

/**
* The stream graph builder that reads the changes from the database.
@@ -22,7 +22,8 @@ import zio.{Chunk, Schedule, ZIO, ZLayer}
class VersionedDataGraphBuilder(versionedDataGraphBuilderSettings: VersionedDataGraphBuilderSettings,
versionedDataProvider: VersionedDataProvider[Long, VersionedBatch],
streamLifetimeService: StreamLifetimeService,
batchProcessor: BatchProcessor[DataBatch, Chunk[DataRow]])
batchProcessor: BatchProcessor[DataBatch, Chunk[DataRow]],
batchConsumer: BatchConsumer[Chunk[DataRow]])
extends StreamGraphBuilder:

private val logger: Logger = LoggerFactory.getLogger(classOf[VersionedDataGraphBuilder])
@@ -40,11 +41,7 @@ class VersionedDataGraphBuilder(versionedDataGraphBuilderSettings: VersionedData
*
* @return ZSink (stream sink for the stream graph).
*/
override def consume: ZSink[Any, Throwable, StreamElementType, Any, Unit] =
ZSink.foreach { e =>
logger.info(s"Received ${e.size} rows from the streaming source")
ZIO.unit
}
override def consume: ZSink[Any, Throwable, Chunk[DataRow], Any, Unit] = batchConsumer.consume

private def createStream = ZStream
.unfoldZIO(versionedDataProvider.firstVersion) { previousVersion =>
@@ -71,6 +68,7 @@ object VersionedDataGraphBuilder:
type Environment = VersionedDataProvider[Long, VersionedBatch]
& StreamLifetimeService
& BatchProcessor[DataBatch, Chunk[DataRow]]
& BatchConsumer[Chunk[DataRow]]
& VersionedDataGraphBuilderSettings

/**
@@ -84,8 +82,13 @@ def apply(versionedDataGraphBuilderSettings: VersionedDataGraphBuilderSettings,
def apply(versionedDataGraphBuilderSettings: VersionedDataGraphBuilderSettings,
versionedDataProvider: VersionedDataProvider[Long, VersionedBatch],
streamLifetimeService: StreamLifetimeService,
batchProcessor: BatchProcessor[DataBatch, Chunk[DataRow]]): VersionedDataGraphBuilder =
new VersionedDataGraphBuilder(versionedDataGraphBuilderSettings, versionedDataProvider, streamLifetimeService, batchProcessor)
batchProcessor: BatchProcessor[DataBatch, Chunk[DataRow]],
batchConsumer: BatchConsumer[Chunk[DataRow]]): VersionedDataGraphBuilder =
new VersionedDataGraphBuilder(versionedDataGraphBuilderSettings,
versionedDataProvider,
streamLifetimeService,
batchProcessor,
batchConsumer)

/**
* Creates a new instance of the VersionedDataGraphBuilder using services provided by ZIO Environment.
@@ -99,6 +102,7 @@
dp <- ZIO.service[VersionedDataProvider[Long, VersionedBatch]]
ls <- ZIO.service[StreamLifetimeService]
bp <- ZIO.service[BatchProcessor[DataBatch, Chunk[DataRow]]]
yield VersionedDataGraphBuilder(sss, dp, ls, bp)
bc <- ZIO.service[BatchConsumer[Chunk[DataRow]]]
yield VersionedDataGraphBuilder(sss, dp, ls, bp, bc)
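
For orientation, a sketch that mirrors the environment-based factory above: all five dependencies, including the new BatchConsumer, are resolved from the ZIO environment before the graph builder is constructed. Import paths follow the framework layout and are assumptions.

import com.sneaksanddata.arcane.framework.models.DataRow
import com.sneaksanddata.arcane.framework.models.settings.VersionedDataGraphBuilderSettings
import com.sneaksanddata.arcane.framework.services.app.base.StreamLifetimeService
import com.sneaksanddata.arcane.framework.services.mssql.MsSqlConnection.{DataBatch, VersionedBatch}
import com.sneaksanddata.arcane.framework.services.streaming.VersionedDataGraphBuilder
import com.sneaksanddata.arcane.framework.services.streaming.base.{BatchConsumer, BatchProcessor, VersionedDataProvider}
import zio.{Chunk, ZIO}

// Resolves the five dependencies from the environment and builds the graph.
val graphBuilder: ZIO[VersionedDataGraphBuilder.Environment, Nothing, VersionedDataGraphBuilder] =
  for
    settings  <- ZIO.service[VersionedDataGraphBuilderSettings]
    provider  <- ZIO.service[VersionedDataProvider[Long, VersionedBatch]]
    lifetime  <- ZIO.service[StreamLifetimeService]
    processor <- ZIO.service[BatchProcessor[DataBatch, Chunk[DataRow]]]
    consumer  <- ZIO.service[BatchConsumer[Chunk[DataRow]]]
  yield VersionedDataGraphBuilder(settings, provider, lifetime, processor, consumer)
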


@@ -0,0 +1,17 @@
package com.sneaksanddata.arcane.framework
package services.streaming.base

import zio.stream.ZSink

/**
* A trait that represents a grouped data batch consumer.
* @tparam ConsumableBatch The type of the consumable batch.
*/
trait BatchConsumer[ConsumableBatch]:

/**
* Returns the sink that consumes the batch.
*
* @return ZSink (stream sink for the stream graph).
*/
def consume: ZSink[Any, Throwable, ConsumableBatch, Any, Unit]
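
To make the contract concrete, a throwaway implementation sketch that counts incoming rows and discards them; the production implementation added in this commit is IcebergConsumer, and the test suite uses a similar EmptyConsumer.

import com.sneaksanddata.arcane.framework.models.DataRow
import com.sneaksanddata.arcane.framework.services.streaming.base.BatchConsumer
import zio.Chunk
import zio.stream.ZSink

// Illustrative only: folds over incoming chunks, tallying rows, and completes with Unit.
class CountingConsumer extends BatchConsumer[Chunk[DataRow]]:
  def consume: ZSink[Any, Throwable, Chunk[DataRow], Any, Unit] =
    ZSink.foldLeft[Chunk[DataRow], Long](0L)((total, chunk) => total + chunk.size).map(_ => ())
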
@@ -23,7 +23,7 @@ class IcebergS3CatalogWriterTests extends flatspec.AsyncFlatSpec with Matchers:
override val catalogUri = "http://localhost:8181/api/catalog"
override val additionalProperties: Map[String, String] = IcebergCatalogCredential.oAuth2Properties
override val s3CatalogFileIO: S3CatalogFileIO = S3CatalogFileIO
override val locationOverride: Option[String] = Some("s3://tmp/polaris/test")
override val stagingLocation: Option[String] = Some("s3://tmp/polaris/test")

private val schema = Seq(MergeKeyField, Field(name = "colA", fieldType = IntType), Field(name = "colB", fieldType = StringType))
private val icebergWriter = IcebergS3CatalogWriter(settings)
@@ -8,7 +8,7 @@ import services.app.base.StreamLifetimeService
import services.mssql.MsSqlConnection.{DataBatch, VersionedBatch}
import services.mssql.query.{LazyQueryResult, QueryRunner, ScalarQueryResult}
import services.mssql.{ConnectionOptions, MsSqlConnection, MsSqlDataProvider}
import services.streaming.base.{BatchProcessor, VersionedDataProvider}
import services.streaming.base.{BatchConsumer, BatchProcessor, VersionedDataProvider}
import utils.{TestConnectionInfo, TestGroupingSettings, TestStreamLifetimeService}

import com.microsoft.sqlserver.jdbc.SQLServerDriver
@@ -17,7 +17,8 @@ import org.scalatest.matchers.must.Matchers
import org.scalatest.matchers.should.Matchers.*
import org.scalatest.prop.TableDrivenPropertyChecks.forAll
import org.scalatest.prop.Tables.Table
import zio.{Chunk, FiberFailure, Runtime, ULayer, Unsafe, ZIO, ZLayer}
import zio.stream.ZSink
import zio.{Chunk, FiberFailure, Runtime, Task, ULayer, Unsafe, ZIO, ZLayer}

import java.sql.Connection
import java.time.Duration
@@ -120,7 +121,7 @@ class VersionedStreamGraphBuilderTests extends flatspec.AsyncFlatSpec with Match
dp <- ZIO.service[VersionedDataProvider[Long, VersionedBatch]]
sls <- ZIO.service[StreamLifetimeService]
bp <- ZIO.service[BatchProcessor[DataBatch, Chunk[DataRow]]]
} yield new VersionedDataGraphBuilder(sss, dp, sls, bp)
} yield new VersionedDataGraphBuilder(sss, dp, sls, bp, new EmptyConsumer)

/// Helper methods

@@ -184,7 +185,8 @@ class VersionedStreamGraphBuilderTests extends flatspec.AsyncFlatSpec with Match
test(conn)



class EmptyConsumer extends BatchConsumer[Chunk[DataRow]]:
def consume: ZSink[Any, Throwable, Chunk[DataRow], Any, Unit] = ZSink.drain

class TestVersionedDataGraphBuilderSettings(override val lookBackInterval: Duration,
override val changeCaptureInterval: Duration)
14 changes: 12 additions & 2 deletions plugin/arcane-stream-sqlserver-change-tracking/docker-compose.yaml
@@ -10,8 +10,8 @@ networks:
services:
mssql:
container_name: sql-server
hostname: sql-server
image: mcr.microsoft.com/mssql/server:2022-latest
restart: always
networks:
mesh:
ipv4_address: 10.1.0.8
environment:
ACCEPT_EULA: "Y"
SA_PASSWORD: "tMIxN11yGZgMC"
@@ -20,6 +24,9 @@ services:
setup_mssql:
container_name: integration-setup
image: mcr.microsoft.com/mssql-tools
networks:
mesh:
ipv4_address: 10.1.0.6
depends_on:
- mssql
restart: "no"
@@ -36,6 +43,9 @@ services:
insert_changes:
container_name: insert-data
image: mcr.microsoft.com/mssql-tools
networks:
mesh:
ipv4_address: 10.1.0.7
depends_on:
setup_mssql:
condition: service_completed_successfully
@@ -109,7 +119,7 @@ services:
JAVA_OPTS: -Ddw.awsAccessKey=minioadmin -Ddw.awsSecretKey=minioadmin

healthcheck:
test: [ "CMD", "curl", "http://localhost:8182/healthcheck" ]
test: ["CMD", "curl", "http://localhost:8182/healthcheck"]
interval: 10s
timeout: 10s
retries: 5
@@ -123,4 +133,4 @@ services:
condition: service_healthy
volumes:
- ./create-polaris-catalog.sh:/create-polaris-catalog.sh
command: [ "/bin/sh", "/create-polaris-catalog.sh" ]
command: ["/bin/sh", "/create-polaris-catalog.sh"]