diff --git a/framework/arcane-framework/src/main/scala/services/lakehouse/CatalogWriter.scala b/framework/arcane-framework/src/main/scala/services/lakehouse/CatalogWriter.scala
index 674936f..fea8ac7 100644
--- a/framework/arcane-framework/src/main/scala/services/lakehouse/CatalogWriter.scala
+++ b/framework/arcane-framework/src/main/scala/services/lakehouse/CatalogWriter.scala
@@ -40,8 +40,8 @@ trait S3CatalogFileIO extends CatalogFileIO:
  * Singleton for S3CatalogFileIO
  */
 object S3CatalogFileIO extends S3CatalogFileIO:
-  override val accessKeyId: String = scala.util.Properties.envOrElse("ARCANE_FRAMEWORK__S3_CATALOG_ACCESS_KEY_ID", "")
   override val secretAccessKey: String = scala.util.Properties.envOrElse("ARCANE_FRAMEWORK__S3_CATALOG_SECRET_ACCESS_KEY", "")
+  override val accessKeyId: String = scala.util.Properties.envOrElse("ARCANE_FRAMEWORK__S3_CATALOG_ACCESS_KEY_ID", "")
   override val endpoint: String = scala.util.Properties.envOrElse("ARCANE_FRAMEWORK__S3_CATALOG_ENDPOINT", "")
   override val region: String = scala.util.Properties.envOrElse("ARCANE_FRAMEWORK__S3_CATALOG_REGION", "us-east-1")
diff --git a/framework/arcane-framework/src/main/scala/services/lakehouse/IcebergS3CatalogWriter.scala b/framework/arcane-framework/src/main/scala/services/lakehouse/IcebergS3CatalogWriter.scala
index 00e0406..9e95198 100644
--- a/framework/arcane-framework/src/main/scala/services/lakehouse/IcebergS3CatalogWriter.scala
+++ b/framework/arcane-framework/src/main/scala/services/lakehouse/IcebergS3CatalogWriter.scala
@@ -12,6 +12,7 @@ import org.apache.iceberg.parquet.Parquet
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList
 import org.apache.iceberg.rest.RESTCatalog
 import org.apache.iceberg.{CatalogProperties, PartitionSpec, Schema, Table}
+import zio.{ZIO, ZLayer}
 
 import java.util.UUID
 import scala.concurrent.Future
@@ -19,9 +20,12 @@ import scala.jdk.CollectionConverters.*
 import scala.language.implicitConversions
 import scala.util.{Failure, Success, Try}
 
+/**
+ * Converts an Arcane schema to an Iceberg schema.
+ */
 given Conversion[ArcaneSchema, Schema] with
   def apply(schema: ArcaneSchema): Schema = SchemaConversions.toIcebergSchema(schema)
-  
+
 // https://www.tabular.io/blog/java-api-part-3/
 class IcebergS3CatalogWriter(
   namespace: String,
@@ -106,14 +110,53 @@ class IcebergS3CatalogWriter(
     Future(catalog.loadTable(tableId)).flatMap(appendData(data, schema, false))
 
 object IcebergS3CatalogWriter:
+  /**
+   * The ZLayer that creates the IcebergS3CatalogWriter.
+   */
+  val layer: ZLayer[IcebergCatalogSettings, Nothing, CatalogWriter[RESTCatalog, Table, Schema]] =
+    ZLayer {
+      for
+        settings <- ZIO.service[IcebergCatalogSettings]
+      yield IcebergS3CatalogWriter(settings)
+    }
+
+  /**
+   * Factory method to create IcebergS3CatalogWriter
+   * @param namespace The namespace for the catalog
+   * @param warehouse The warehouse location
+   * @param catalogUri The catalog URI
+   * @param additionalProperties Additional properties for the catalog
+   * @param s3CatalogFileIO The S3 catalog file IO settings
+   * @param locationOverride The location override for the catalog
+   * @return The initialized IcebergS3CatalogWriter instance
+   */
+  def apply(namespace: String,
+            warehouse: String,
+            catalogUri: String,
+            additionalProperties: Map[String, String],
+            s3CatalogFileIO: S3CatalogFileIO,
+            locationOverride: Option[String]): IcebergS3CatalogWriter =
+    val writer = new IcebergS3CatalogWriter(
+      namespace = namespace,
+      warehouse = warehouse,
+      catalogUri = catalogUri,
+      additionalProperties = additionalProperties,
+      s3CatalogFileIO = s3CatalogFileIO,
+      locationOverride = locationOverride,
+    )
+    writer.initialize()
+
+  /**
+   * Factory method to create IcebergS3CatalogWriter
+   * @param icebergSettings Iceberg settings
+   * @return The initialized IcebergS3CatalogWriter instance
+   */
   def apply(icebergSettings: IcebergCatalogSettings): IcebergS3CatalogWriter =
-    val writer =
-      new IcebergS3CatalogWriter(
+    IcebergS3CatalogWriter(
       namespace = icebergSettings.namespace,
       warehouse = icebergSettings.warehouse,
       catalogUri = icebergSettings.catalogUri,
       additionalProperties = icebergSettings.additionalProperties,
       s3CatalogFileIO = icebergSettings.s3CatalogFileIO,
-      locationOverride = icebergSettings.locationOverride,
+      locationOverride = icebergSettings.stagingLocation,
     )
-    writer.initialize()
diff --git a/framework/arcane-framework/src/main/scala/services/lakehouse/base/IcebergCatalogSettings.scala b/framework/arcane-framework/src/main/scala/services/lakehouse/base/IcebergCatalogSettings.scala
index ca5c543..11ce8c1 100644
--- a/framework/arcane-framework/src/main/scala/services/lakehouse/base/IcebergCatalogSettings.scala
+++ b/framework/arcane-framework/src/main/scala/services/lakehouse/base/IcebergCatalogSettings.scala
@@ -35,5 +35,5 @@ trait IcebergCatalogSettings:
   /**
    * The lakehouse location of the catalog
    */
-  val locationOverride: Option[String]
+  val stagingLocation: Option[String]
 
diff --git a/framework/arcane-framework/src/main/scala/services/mssql/MsSqlConnection.scala b/framework/arcane-framework/src/main/scala/services/mssql/MsSqlConnection.scala
index dd649e1..d61e4f8 100644
--- a/framework/arcane-framework/src/main/scala/services/mssql/MsSqlConnection.scala
+++ b/framework/arcane-framework/src/main/scala/services/mssql/MsSqlConnection.scala
@@ -195,7 +195,7 @@ object MsSqlConnection:
   /**
    * The ZLayer that creates the MsSqlDataProvider.
   */
-  val layer: ZLayer[ConnectionOptions, Nothing, MsSqlConnection] =
+  val layer: ZLayer[ConnectionOptions, Nothing, MsSqlConnection & SchemaProvider[ArcaneSchema]] =
     ZLayer.scoped {
       ZIO.fromAutoCloseable{
         for connectionOptions <- ZIO.service[ConnectionOptions] yield MsSqlConnection(connectionOptions)
diff --git a/framework/arcane-framework/src/main/scala/services/streaming/IcebergConsumer.scala b/framework/arcane-framework/src/main/scala/services/streaming/IcebergConsumer.scala
new file mode 100644
index 0000000..a1b8ff6
--- /dev/null
+++ b/framework/arcane-framework/src/main/scala/services/streaming/IcebergConsumer.scala
@@ -0,0 +1,95 @@
+package com.sneaksanddata.arcane.framework
+package services.streaming
+
+import models.app.StreamContext
+import models.{ArcaneSchema, DataRow}
+import services.base.SchemaProvider
+import services.lakehouse.{CatalogWriter, given_Conversion_ArcaneSchema_Schema}
+import services.streaming.IcebergConsumer.getTableName
+import services.streaming.base.BatchConsumer
+
+import org.apache.iceberg.rest.RESTCatalog
+import org.apache.iceberg.{Schema, Table}
+import org.slf4j.{Logger, LoggerFactory}
+import zio.stream.{ZPipeline, ZSink}
+import zio.{Chunk, Task, ZIO, ZLayer}
+
+import java.time.format.DateTimeFormatter
+import java.time.{ZoneOffset, ZonedDateTime}
+
+/**
+ * A consumer that writes the data to the staging table.
+ *
+ * @param streamContext The stream context.
+ * @param catalogWriter The catalog writer.
+ * @param schemaProvider The schema provider.
+ */
+class IcebergConsumer(streamContext: StreamContext,
+                      catalogWriter: CatalogWriter[RESTCatalog, Table, Schema],
+                      schemaProvider: SchemaProvider[ArcaneSchema]) extends BatchConsumer[Chunk[DataRow]]:
+
+  private val logger: Logger = LoggerFactory.getLogger(classOf[IcebergConsumer])
+
+  /**
+   * Returns the sink that consumes the batch.
+   *
+   * @return ZSink (stream sink for the stream graph).
+   */
+  def consume: ZSink[Any, Throwable, Chunk[DataRow], Any, Unit] =
+    writeStagingTable >>> logResults
+
+
+  private def logResults: ZSink[Any, Throwable, Table, Nothing, Unit] = ZSink.foreach { e =>
+    logger.info(s"Received the table ${e.name()} from the streaming source")
+    ZIO.unit
+  }
+
+  private def writeStagingTable: ZPipeline[Any, Throwable, Chunk[DataRow], Table] = ZPipeline[Chunk[DataRow]]()
+    .mapAccum(0L) { (acc, chunk) => (acc + 1, (chunk, acc.getTableName(streamContext.streamId))) }
+    .mapZIO({
+      case (rows, tableName) => writeWithWriter(rows, tableName)
+    })
+
+
+  private def writeWithWriter(rows: Chunk[DataRow], name: String): Task[Table] =
+    for
+      schema <- ZIO.fromFuture(implicit ec => schemaProvider.getSchema)
+      table <- ZIO.fromFuture(implicit ec => catalogWriter.write(rows, name, schema))
+    yield table
+
+object IcebergConsumer:
+  val formatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss")
+
+  extension (batchNumber: Long) def getTableName(streamId: String): String =
+    s"$streamId-${ZonedDateTime.now(ZoneOffset.UTC).format(formatter)}-$batchNumber"
+
+
+  /**
+   * Factory method to create IcebergConsumer
+   *
+   * @param streamContext The stream context.
+   * @param catalogWriter The catalog writer.
+   * @param schemaProvider The schema provider.
+   * @return The initialized IcebergConsumer instance
+   */
+  def apply(streamContext: StreamContext,
+            catalogWriter: CatalogWriter[RESTCatalog, Table, Schema],
+            schemaProvider: SchemaProvider[ArcaneSchema]): IcebergConsumer =
+    new IcebergConsumer(streamContext, catalogWriter, schemaProvider)
+
+  /**
+   * The required environment for the IcebergConsumer.
+   */
+  type Environment = SchemaProvider[ArcaneSchema] & CatalogWriter[RESTCatalog, Table, Schema] & StreamContext
+
+  /**
+   * The ZLayer that creates the IcebergConsumer.
+   */
+  val layer: ZLayer[Environment, Nothing, IcebergConsumer] =
+    ZLayer {
+      for
+        streamContext <- ZIO.service[StreamContext]
+        catalogWriter <- ZIO.service[CatalogWriter[RESTCatalog, Table, Schema]]
+        schemaProvider <- ZIO.service[SchemaProvider[ArcaneSchema]]
+      yield IcebergConsumer(streamContext, catalogWriter, schemaProvider)
+    }
diff --git a/framework/arcane-framework/src/main/scala/services/streaming/VersionedDataGraphBuilder.scala b/framework/arcane-framework/src/main/scala/services/streaming/VersionedDataGraphBuilder.scala
index 32b8dd9..e9bc710 100644
--- a/framework/arcane-framework/src/main/scala/services/streaming/VersionedDataGraphBuilder.scala
+++ b/framework/arcane-framework/src/main/scala/services/streaming/VersionedDataGraphBuilder.scala
@@ -6,11 +6,11 @@ import models.settings.VersionedDataGraphBuilderSettings
 import services.app.base.StreamLifetimeService
 import services.mssql.MsSqlConnection.{DataBatch, VersionedBatch}
 import services.mssql.given_HasVersion_VersionedBatch
-import services.streaming.base.{BatchProcessor, StreamGraphBuilder, VersionedDataProvider}
+import services.streaming.base.{BatchConsumer, BatchProcessor, StreamGraphBuilder, VersionedDataProvider}
 
 import org.slf4j.{Logger, LoggerFactory}
 import zio.stream.{ZSink, ZStream}
-import zio.{Chunk, Schedule, ZIO, ZLayer}
+import zio.{Chunk, Schedule, ZIO}
 
 /**
  * The stream graph builder that reads the changes from the database.
@@ -22,7 +22,8 @@ import zio.{Chunk, Schedule, ZIO, ZLayer}
 class VersionedDataGraphBuilder(versionedDataGraphBuilderSettings: VersionedDataGraphBuilderSettings,
                                 versionedDataProvider: VersionedDataProvider[Long, VersionedBatch],
                                 streamLifetimeService: StreamLifetimeService,
-                                batchProcessor: BatchProcessor[DataBatch, Chunk[DataRow]])
+                                batchProcessor: BatchProcessor[DataBatch, Chunk[DataRow]],
+                                batchConsumer: BatchConsumer[Chunk[DataRow]])
   extends StreamGraphBuilder:
 
   private val logger: Logger = LoggerFactory.getLogger(classOf[VersionedDataGraphBuilder])
@@ -40,11 +41,7 @@ class VersionedDataGraphBuilder(versionedDataGraphBuilderSettings: VersionedData
    *
    * @return ZStream (stream source for the stream graph).
    */
-  override def consume: ZSink[Any, Throwable, StreamElementType, Any, Unit] =
-    ZSink.foreach { e =>
-      logger.info(s"Received ${e.size} rows from the streaming source")
-      ZIO.unit
-    }
+  override def consume: ZSink[Any, Throwable, Chunk[DataRow], Any, Unit] = batchConsumer.consume
 
   private def createStream = ZStream
     .unfoldZIO(versionedDataProvider.firstVersion) { previousVersion =>
@@ -71,6 +68,7 @@ object VersionedDataGraphBuilder:
   type Environment = VersionedDataProvider[Long, VersionedBatch]
     & StreamLifetimeService
     & BatchProcessor[DataBatch, Chunk[DataRow]]
+    & BatchConsumer[Chunk[DataRow]]
     & VersionedDataGraphBuilderSettings
 
   /**
@@ -84,8 +82,13 @@ object VersionedDataGraphBuilder:
   def apply(versionedDataGraphBuilderSettings: VersionedDataGraphBuilderSettings,
             versionedDataProvider: VersionedDataProvider[Long, VersionedBatch],
             streamLifetimeService: StreamLifetimeService,
-            batchProcessor: BatchProcessor[DataBatch, Chunk[DataRow]]): VersionedDataGraphBuilder =
-    new VersionedDataGraphBuilder(versionedDataGraphBuilderSettings, versionedDataProvider, streamLifetimeService, batchProcessor)
+            batchProcessor: BatchProcessor[DataBatch, Chunk[DataRow]],
+            batchConsumer: BatchConsumer[Chunk[DataRow]]): VersionedDataGraphBuilder =
+    new VersionedDataGraphBuilder(versionedDataGraphBuilderSettings,
+      versionedDataProvider,
+      streamLifetimeService,
+      batchProcessor,
+      batchConsumer)
 
   /**
    * Creates a new instance of the BackfillDataGraphBuilder using services provided by ZIO Environment.
@@ -99,6 +102,7 @@ object VersionedDataGraphBuilder:
       dp <- ZIO.service[VersionedDataProvider[Long, VersionedBatch]]
       ls <- ZIO.service[StreamLifetimeService]
       bp <- ZIO.service[BatchProcessor[DataBatch, Chunk[DataRow]]]
-    yield VersionedDataGraphBuilder(sss, dp, ls, bp)
+      bc <- ZIO.service[BatchConsumer[Chunk[DataRow]]]
+    yield VersionedDataGraphBuilder(sss, dp, ls, bp, bc)
 
diff --git a/framework/arcane-framework/src/main/scala/services/streaming/base/BatchConsumer.scala b/framework/arcane-framework/src/main/scala/services/streaming/base/BatchConsumer.scala
new file mode 100644
index 0000000..1d14c4d
--- /dev/null
+++ b/framework/arcane-framework/src/main/scala/services/streaming/base/BatchConsumer.scala
@@ -0,0 +1,17 @@
+package com.sneaksanddata.arcane.framework
+package services.streaming.base
+
+import zio.stream.ZSink
+
+/**
+ * A trait that represents a grouped data batch consumer.
+ * @tparam ConsumableBatch The type of the consumable batch.
+ */
+trait BatchConsumer[ConsumableBatch]:
+
+  /**
+   * Returns the sink that consumes the batch.
+   *
+   * @return ZSink (stream sink for the stream graph).
+ */ + def consume: ZSink[Any, Throwable, ConsumableBatch, Any, Unit] diff --git a/framework/arcane-framework/src/test/scala/services/lakehouse/IcebergS3CatalogWriterTests.scala b/framework/arcane-framework/src/test/scala/services/lakehouse/IcebergS3CatalogWriterTests.scala index 5598c2f..9d8435d 100644 --- a/framework/arcane-framework/src/test/scala/services/lakehouse/IcebergS3CatalogWriterTests.scala +++ b/framework/arcane-framework/src/test/scala/services/lakehouse/IcebergS3CatalogWriterTests.scala @@ -23,7 +23,7 @@ class IcebergS3CatalogWriterTests extends flatspec.AsyncFlatSpec with Matchers: override val catalogUri = "http://localhost:8181/api/catalog" override val additionalProperties: Map[String, String] = IcebergCatalogCredential.oAuth2Properties override val s3CatalogFileIO: S3CatalogFileIO = S3CatalogFileIO - override val locationOverride: Option[String] = Some("s3://tmp/polaris/test") + override val stagingLocation: Option[String] = Some("s3://tmp/polaris/test") private val schema = Seq(MergeKeyField, Field(name = "colA", fieldType = IntType), Field(name = "colB", fieldType = StringType)) private val icebergWriter = IcebergS3CatalogWriter(settings) diff --git a/framework/arcane-framework/src/test/scala/services/streaming/VersionedStreamGraphBuilderTests.scala b/framework/arcane-framework/src/test/scala/services/streaming/VersionedStreamGraphBuilderTests.scala index 73d35d3..258efe1 100644 --- a/framework/arcane-framework/src/test/scala/services/streaming/VersionedStreamGraphBuilderTests.scala +++ b/framework/arcane-framework/src/test/scala/services/streaming/VersionedStreamGraphBuilderTests.scala @@ -8,7 +8,7 @@ import services.app.base.StreamLifetimeService import services.mssql.MsSqlConnection.{DataBatch, VersionedBatch} import services.mssql.query.{LazyQueryResult, QueryRunner, ScalarQueryResult} import services.mssql.{ConnectionOptions, MsSqlConnection, MsSqlDataProvider} -import services.streaming.base.{BatchProcessor, VersionedDataProvider} +import services.streaming.base.{BatchConsumer, BatchProcessor, VersionedDataProvider} import utils.{TestConnectionInfo, TestGroupingSettings, TestStreamLifetimeService} import com.microsoft.sqlserver.jdbc.SQLServerDriver @@ -17,7 +17,8 @@ import org.scalatest.matchers.must.Matchers import org.scalatest.matchers.should.Matchers.* import org.scalatest.prop.TableDrivenPropertyChecks.forAll import org.scalatest.prop.Tables.Table -import zio.{Chunk, FiberFailure, Runtime, ULayer, Unsafe, ZIO, ZLayer} +import zio.stream.ZSink +import zio.{Chunk, FiberFailure, Runtime, Task, ULayer, Unsafe, ZIO, ZLayer} import java.sql.Connection import java.time.Duration @@ -120,7 +121,7 @@ class VersionedStreamGraphBuilderTests extends flatspec.AsyncFlatSpec with Match dp <- ZIO.service[VersionedDataProvider[Long, VersionedBatch]] sls <- ZIO.service[StreamLifetimeService] bp <- ZIO.service[BatchProcessor[DataBatch, Chunk[DataRow]]] - } yield new VersionedDataGraphBuilder(sss, dp, sls, bp) + } yield new VersionedDataGraphBuilder(sss, dp, sls, bp, new EmptyConsumer) /// Helper methods @@ -184,7 +185,8 @@ class VersionedStreamGraphBuilderTests extends flatspec.AsyncFlatSpec with Match test(conn) - +class EmptyConsumer extends BatchConsumer[Chunk[DataRow]]: + def consume: ZSink[Any, Throwable, Chunk[DataRow], Any, Unit] = ZSink.drain class TestVersionedDataGraphBuilderSettings(override val lookBackInterval: Duration, override val changeCaptureInterval: Duration) diff --git a/plugin/arcane-stream-sqlserver-change-tracking/docker-compose.yaml 
index 5fcdaf9..c3c8cde 100644
--- a/plugin/arcane-stream-sqlserver-change-tracking/docker-compose.yaml
+++ b/plugin/arcane-stream-sqlserver-change-tracking/docker-compose.yaml
@@ -10,8 +10,12 @@ networks:
 services:
   mssql:
     container_name: sql-server
+    hostname: sql-server
     image: mcr.microsoft.com/mssql/server:2022-latest
     restart: always
+    networks:
+      mesh:
+        ipv4_address: 10.1.0.8
     environment:
       ACCEPT_EULA: "Y"
       SA_PASSWORD: "tMIxN11yGZgMC"
@@ -20,6 +24,9 @@ services:
   setup_mssql:
     container_name: integration-setup
     image: mcr.microsoft.com/mssql-tools
+    networks:
+      mesh:
+        ipv4_address: 10.1.0.6
     depends_on:
       - mssql
     restart: "no"
@@ -36,6 +43,9 @@ services:
   insert_changes:
     container_name: insert-data
    image: mcr.microsoft.com/mssql-tools
+    networks:
+      mesh:
+        ipv4_address: 10.1.0.7
     depends_on:
       setup_mssql:
         condition: service_completed_successfully
@@ -109,7 +119,7 @@ services:
       JAVA_OPTS: -Ddw.awsAccessKey=minioadmin -Ddw.awsSecretKey=minioadmin
 
     healthcheck:
-      test: [ "CMD", "curl", "http://localhost:8182/healthcheck" ]
+      test: ["CMD", "curl", "http://localhost:8182/healthcheck"]
       interval: 10s
       timeout: 10s
       retries: 5
@@ -123,4 +133,4 @@ services:
         condition: service_healthy
     volumes:
       - ./create-polaris-catalog.sh:/create-polaris-catalog.sh
-    command: [ "/bin/sh", "/create-polaris-catalog.sh" ]
+    command: ["/bin/sh", "/create-polaris-catalog.sh"]
diff --git a/plugin/arcane-stream-sqlserver-change-tracking/integration-tests.env b/plugin/arcane-stream-sqlserver-change-tracking/integration-tests.env
index 1508069..e8a9497 100644
--- a/plugin/arcane-stream-sqlserver-change-tracking/integration-tests.env
+++ b/plugin/arcane-stream-sqlserver-change-tracking/integration-tests.env
@@ -1,7 +1,15 @@
-STREAMCONTEXT__BACKFILL=true
-STREAMCONTEXT__SPEC='{ "database": "IntegrationTests", "schema": "dbo", "table": "TestTable", "changeCaptureIntervalSeconds": 15, "commandTimeout": 3600, "groupingIntervalSeconds": 15, "groupsPerFile": 1, "lookBackInterval": 3600, "rowsPerGroup": 10000, "sinkLocation": "s3a://integration-tests" }'
+STREAMCONTEXT__BACKFILL=false
+STREAMCONTEXT__SPEC='{ "database": "IntegrationTests", "schema": "dbo", "table": "TestTable", "changeCaptureIntervalSeconds": 15, "commandTimeout": 3600, "groupingIntervalSeconds": 15, "groupsPerFile": 1, "lookBackInterval": 3600, "rowsPerGroup": 10000, "sinkLocation": "s3a://integration-tests", "catalogSettings": { "namespace": "test", "warehouse": "polaris", "catalogUri": "http://localhost:8181/api/catalog" } }'
 STREAMCONTEXT__STREAM_ID=test
 STREAMCONTEXT__STREAM_KIND=SqlServerChangeTracking
-ARCANE.STREAM.SQL_SERVER_CHANGE_TRACKING__ARCANE_CONNECTION_STRING='jdbc:sqlserver://localhost:1433;databaseName=IntegrationTests;user=sa;password=tMIxN11yGZgMC;encrypt=false;trustServerCertificate=true'
+ARCANE_CONNECTIONSTRING='jdbc:sqlserver://localhost:1433;databaseName=IntegrationTests;user=sa;password=tMIxN11yGZgMC;encrypt=false;trustServerCertificate=true'
 APPLICATION_VERSION=0.0.1
 ARCANE_DATADOG_ENDPOINT=tcp-intake.logs.datadoghq.eu:443
+
+ARCANE_FRAMEWORK__S3_CATALOG_ACCESS_KEY_ID=minioadmin
+ARCANE_FRAMEWORK__S3_CATALOG_SECRET_ACCESS_KEY=minioadmin
+ARCANE_FRAMEWORK__S3_CATALOG_AUTH_INIT_TOKEN="principal:root;realm:default-realm"
+ARCANE_FRAMEWORK__S3_CATALOG_AUTH_CLIENT_URI=http://localhost:8181/api/catalog/v1/oauth/tokens
+ARCANE_FRAMEWORK__S3_CATALOG_AUTH_SCOPE=PRINCIPAL_ROLE:ALL
+ARCANE_FRAMEWORK__S3_CATALOG_ENDPOINT=http://localhost:9000
+AWS_REGION=us-east-1
diff --git a/plugin/arcane-stream-sqlserver-change-tracking/src/main/scala/main.scala b/plugin/arcane-stream-sqlserver-change-tracking/src/main/scala/main.scala
index 2d6ee7f..c5bb9c4 100644
--- a/plugin/arcane-stream-sqlserver-change-tracking/src/main/scala/main.scala
+++ b/plugin/arcane-stream-sqlserver-change-tracking/src/main/scala/main.scala
@@ -1,6 +1,6 @@
 package com.sneaksanddata.arcane.sql_server_change_tracking
 
-import models.app.StreamSpec
+import models.app.SqlServerChangeTrackingStreamContext
 import services.StreamGraphBuilderFactory
 
 import com.sneaksanddata.arcane.framework.models.DataRow
@@ -9,10 +9,11 @@ import com.sneaksanddata.arcane.framework.models.settings.{GroupingSettings, Ver
 import com.sneaksanddata.arcane.framework.services.app.base.{StreamLifetimeService, StreamRunnerService}
 import com.sneaksanddata.arcane.framework.services.app.logging.base.Enricher
 import com.sneaksanddata.arcane.framework.services.app.{PosixStreamLifetimeService, StreamRunnerServiceImpl}
+import com.sneaksanddata.arcane.framework.services.lakehouse.IcebergS3CatalogWriter
 import com.sneaksanddata.arcane.framework.services.mssql.MsSqlConnection.BackfillBatch
 import com.sneaksanddata.arcane.framework.services.mssql.{ConnectionOptions, MsSqlConnection, MsSqlDataProvider}
 import com.sneaksanddata.arcane.framework.services.streaming.base.{BatchProcessor, StreamGraphBuilder}
-import com.sneaksanddata.arcane.framework.services.streaming.{BackfillGroupingProcessor, LazyListGroupingProcessor}
+import com.sneaksanddata.arcane.framework.services.streaming.{BackfillGroupingProcessor, IcebergConsumer, LazyListGroupingProcessor}
 import org.slf4j.MDC
 import zio.logging.LogFormat
 import zio.logging.backend.SLF4J
@@ -42,14 +43,16 @@ object main extends ZIOAppDefault {
   @main
   def run: ZIO[Any, Throwable, Unit] =
     appLayer.provide(
-      StreamSpec.layer,
+      SqlServerChangeTrackingStreamContext.layer,
       PosixStreamLifetimeService.layer,
       MsSqlConnection.layer,
       MsSqlDataProvider.layer,
       LazyListGroupingProcessor.layer,
       StreamRunnerServiceImpl.layer,
       StreamGraphBuilderFactory.layer,
-      BackfillGroupingProcessor.layer
+      BackfillGroupingProcessor.layer,
+      IcebergS3CatalogWriter.layer,
+      IcebergConsumer.layer
     )
     .orDie
 }
diff --git a/plugin/arcane-stream-sqlserver-change-tracking/src/main/scala/models/app/SqlServerChangeTrackingStreamContext.scala b/plugin/arcane-stream-sqlserver-change-tracking/src/main/scala/models/app/SqlServerChangeTrackingStreamContext.scala
new file mode 100644
index 0000000..72605fd
--- /dev/null
+++ b/plugin/arcane-stream-sqlserver-change-tracking/src/main/scala/models/app/SqlServerChangeTrackingStreamContext.scala
@@ -0,0 +1,82 @@
+package com.sneaksanddata.arcane.sql_server_change_tracking
+package models.app
+
+import com.sneaksanddata.arcane.framework.models.app.StreamContext
+import com.sneaksanddata.arcane.framework.models.settings.{GroupingSettings, VersionedDataGraphBuilderSettings}
+import com.sneaksanddata.arcane.framework.services.lakehouse.{IcebergCatalogCredential, S3CatalogFileIO}
+import com.sneaksanddata.arcane.framework.services.lakehouse.base.IcebergCatalogSettings
+import com.sneaksanddata.arcane.framework.services.mssql.ConnectionOptions
+import zio.ZLayer
+import zio.json.*
+
+import java.time.Duration
+
+
+/**
+ * The context for the SQL Server Change Tracking stream.
+ * @param spec The stream specification
+ */
+case class SqlServerChangeTrackingStreamContext(spec: StreamSpec) extends StreamContext
+  with GroupingSettings
+  with IcebergCatalogSettings
+  with VersionedDataGraphBuilderSettings:
+
+  override val rowsPerGroup: Int = spec.rowsPerGroup
+  override val lookBackInterval: Duration = Duration.ofSeconds(spec.lookBackInterval)
+  override val changeCaptureInterval: Duration = Duration.ofSeconds(spec.changeCaptureIntervalSeconds)
+  override val groupingInterval: Duration = Duration.ofSeconds(spec.groupingIntervalSeconds)
+
+  override val namespace: String = spec.catalogSettings.namespace
+  override val warehouse: String = spec.catalogSettings.warehouse
+  override val catalogUri: String = spec.catalogSettings.catalogUri
+
+  override val additionalProperties: Map[String, String] = IcebergCatalogCredential.oAuth2Properties
+  override val s3CatalogFileIO: S3CatalogFileIO = S3CatalogFileIO
+
+  override val stagingLocation: Option[String] = spec.stagingLocation
+
+  @jsonExclude
+  val connectionString: String = sys.env("ARCANE_CONNECTIONSTRING")
+
+  val database: String = spec.database
+
+  override def toString: String = this.toJsonPretty
+
+
+given Conversion[SqlServerChangeTrackingStreamContext, ConnectionOptions] with
+  def apply(context: SqlServerChangeTrackingStreamContext): ConnectionOptions =
+    ConnectionOptions(context.connectionString,
+      context.database,
+      context.spec.schema,
+      context.spec.table,
+      context.spec.partitionExpression)
+
+object SqlServerChangeTrackingStreamContext {
+  implicit val icebergSettingsDecoder: JsonDecoder[CatalogSettings] = DeriveJsonDecoder.gen[CatalogSettings]
+  implicit val streamSpecDecoder: JsonDecoder[StreamSpec] = DeriveJsonDecoder.gen[StreamSpec]
+
+  implicit val icebergSettingsEncoder: JsonEncoder[CatalogSettings] = DeriveJsonEncoder.gen[CatalogSettings]
+  implicit val specEncoder: JsonEncoder[StreamSpec] = DeriveJsonEncoder.gen[StreamSpec]
+  implicit val contextEncoder: JsonEncoder[SqlServerChangeTrackingStreamContext] = DeriveJsonEncoder.gen[SqlServerChangeTrackingStreamContext]
+
+  type Environment = StreamContext
+    & ConnectionOptions
+    & GroupingSettings
+    & VersionedDataGraphBuilderSettings
+    & IcebergCatalogSettings
+
+  /**
+   * The ZLayer that creates the stream context environment.
+   */
+  val layer: ZLayer[Any, Throwable, Environment] =
+    sys.env.get("STREAMCONTEXT__SPEC") map { raw =>
+      val spec = raw.fromJson[StreamSpec] match {
+        case Left(error) => throw new Exception(s"Failed to decode the stream context: $error")
+        case Right(value) => value
+      }
+      val context = SqlServerChangeTrackingStreamContext(spec)
+      ZLayer.succeed(context) ++ ZLayer.succeed[ConnectionOptions](context)
+    } getOrElse {
+      ZLayer.fail(new Exception("The stream context is not specified."))
+    }
+}
diff --git a/plugin/arcane-stream-sqlserver-change-tracking/src/main/scala/models/app/StreamSpec.scala b/plugin/arcane-stream-sqlserver-change-tracking/src/main/scala/models/app/StreamSpec.scala
index c3acdf8..4c8d121 100644
--- a/plugin/arcane-stream-sqlserver-change-tracking/src/main/scala/models/app/StreamSpec.scala
+++ b/plugin/arcane-stream-sqlserver-change-tracking/src/main/scala/models/app/StreamSpec.scala
@@ -1,16 +1,14 @@
 package com.sneaksanddata.arcane.sql_server_change_tracking
 package models.app
 
-import com.sneaksanddata.arcane.framework.models.app.StreamContext
-import com.sneaksanddata.arcane.framework.models.settings.{GroupingSettings, VersionedDataGraphBuilderSettings}
-import com.sneaksanddata.arcane.framework.services.mssql.ConnectionOptions
-import zio.ZLayer
-import zio.json.*
-
-import java.time.Duration
+/**
+ * The configuration of the Iceberg sink.
+ */
+case class CatalogSettings(namespace: String, warehouse: String, catalogUri: String)
 
 /**
  * The specification for the stream.
+ *
  * @param database The database name
  * @param schema The schema name
  * @param table The table name
@@ -31,54 +29,12 @@ case class StreamSpec(database: String,
                       lookBackInterval: Int,
                       commandTimeout: Int,
                       changeCaptureIntervalSeconds: Int,
-                      partitionExpression: Option[String])
-
-/**
- * The context for the SQL Server Change Tracking stream.
- * @param spec The stream specification
- */
-case class SqlServerChangeTrackingStreamContext(spec: StreamSpec) extends StreamContext
-  with GroupingSettings
-  with VersionedDataGraphBuilderSettings:
-
-  implicit val specEncoder: JsonEncoder[StreamSpec] = DeriveJsonEncoder.gen[StreamSpec]
-  implicit val contextEncoder: JsonEncoder[SqlServerChangeTrackingStreamContext] = DeriveJsonEncoder.gen[SqlServerChangeTrackingStreamContext]
-
-  override val rowsPerGroup: Int = spec.rowsPerGroup
-  override val lookBackInterval: Duration = Duration.ofSeconds(spec.lookBackInterval)
-  override val changeCaptureInterval: Duration = Duration.ofSeconds(spec.changeCaptureIntervalSeconds)
-  override val groupingInterval: Duration = Duration.ofSeconds(spec.groupingIntervalSeconds)
-  @jsonExclude
-  val connectionString: String = sys.env("ARCANE.STREAM.SQL_SERVER_CHANGE_TRACKING__ARCANE_CONNECTION_STRING")
-  val database = "IntegrationTests"
-  override def toString: String = this.toJsonPretty
-
-
-given Conversion[SqlServerChangeTrackingStreamContext, ConnectionOptions] with
-  def apply(context: SqlServerChangeTrackingStreamContext): ConnectionOptions =
-    ConnectionOptions(context.connectionString,
-      context.database,
-      context.spec.schema,
-      context.spec.table,
-      context.spec.partitionExpression)
+                      // Iceberg settings
+                      catalogSettings: CatalogSettings,
+                      stagingLocation: Option[String],
+                      sinkLocation: String,
+                      partitionExpression: Option[String])
 
-object StreamSpec {
-  implicit val decoder: JsonDecoder[StreamSpec] = DeriveJsonDecoder.gen[StreamSpec]
-  /**
-   * The ZLayer that creates the VersionedDataGraphBuilder.
-   */
-  val layer: ZLayer[Any, Throwable, StreamContext & ConnectionOptions & GroupingSettings & VersionedDataGraphBuilderSettings] =
-    sys.env.get("STREAMCONTEXT__SPEC") map { raw =>
-      val spec = raw.fromJson[StreamSpec] match {
-        case Left(error) => throw new Exception(s"Failed to decode the stream context: $error")
-        case Right(value) => value
-      }
-      val context = SqlServerChangeTrackingStreamContext(spec)
-      ZLayer.succeed(context) ++ ZLayer.succeed[ConnectionOptions](context)
-    } getOrElse {
-      ZLayer.fail(new Exception("The stream context is not specified."))
-    }
-}
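
Not part of the patch: a minimal sketch of how a plugin could supply its own BatchConsumer[Chunk[DataRow]] layer in place of IcebergConsumer.layer (for example, for local debugging). The LoggingConsumer name and package are illustrative only; it uses only the BatchConsumer contract and ZLayer wiring shown in the diff above.

package com.sneaksanddata.arcane.sql_server_change_tracking
package services

import com.sneaksanddata.arcane.framework.models.DataRow
import com.sneaksanddata.arcane.framework.services.streaming.base.BatchConsumer

import zio.stream.ZSink
import zio.{Chunk, ULayer, ZIO, ZLayer}

// Illustrative consumer that only logs batch sizes; mirrors the EmptyConsumer used in the tests.
class LoggingConsumer extends BatchConsumer[Chunk[DataRow]]:

  // Drains the stream, logging how many rows each batch contained.
  def consume: ZSink[Any, Throwable, Chunk[DataRow], Any, Unit] =
    ZSink.foreach { batch => ZIO.logInfo(s"Consumed ${batch.size} rows") }

object LoggingConsumer:
  // Hypothetical layer that could stand in for IcebergConsumer.layer while wiring the graph.
  val layer: ULayer[BatchConsumer[Chunk[DataRow]]] =
    ZLayer.succeed[BatchConsumer[Chunk[DataRow]]](new LoggingConsumer)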