Skip to content

Commit

Permalink
feat(strm-2706): merge with alpha branch
Browse files Browse the repository at this point in the history
  • Loading branch information
trietsch committed Oct 25, 2023
2 parents f7f1cf4 + cf105bc commit bdc45f2
Show file tree
Hide file tree
Showing 54 changed files with 1,728 additions and 640 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
.PHONY: clean-common-protos

SHELL := /bin/bash

common_protos := ${CURDIR}/.common-protos

grpc_version := 1.50.0
Expand Down
4 changes: 1 addition & 3 deletions app/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ dependencies {


// Todo remove before squashing
implementation("io.strmprivacy.grpc.common:kotlin-grpc-common:3.22.0")

implementation(enforcedPlatform("com.google.cloud:libraries-bom:26.24.0"))
implementation("com.google.cloud:google-cloud-bigquery")

Expand Down Expand Up @@ -142,7 +140,7 @@ val createPostgresContainer =
tasks.register("jooqPostgresCreate", DockerCreateContainer::class) {
dependsOn(removePostgresContainer)
group = "postgres"
targetImageId("postgres:12")
targetImageId("postgres")
containerName.set("jooq-postgres")
hostConfig.portBindings.set(listOf("$postgresPort:5432"))
hostConfig.autoRemove.set(true)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Catalog -> Data Source (e.g. Database) -> Schema -> Table -> Column

query TableWithColumns($id: UUID!) {
tables: assets(
where: {id: {eq: $id}}
Expand All @@ -22,6 +22,8 @@ query TableWithColumns($id: UUID!) {
}
}
columns: incomingRelations(
# TODO we need handle paging!
limit: 400,
where: {type: {publicId: {eq: "ColumnIsPartOfTable"}}}
) {
columnDetails: source {
Expand All @@ -31,6 +33,12 @@ query TableWithColumns($id: UUID!) {
) {
value: stringValue
}
tags {
name
}



}
}
}
Expand Down
35 changes: 34 additions & 1 deletion app/src/main/kotlin/com/getstrm/pace/api/DataPolicyApi.kt
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package com.getstrm.pace.api

import com.getstrm.pace.service.ProcessingPlatformsService
import build.buf.gen.getstrm.api.data_policies.v1alpha.*
import com.getstrm.pace.service.CatalogService
import com.getstrm.pace.service.DataPolicyService
import com.getstrm.pace.service.ProcessingPlatformsService
import net.devh.boot.grpc.server.service.GrpcService

@GrpcService
class DataPolicyApi(
private val dataPolicyService: DataPolicyService,
private val processingPlatformsService: ProcessingPlatformsService,
private val catalogService: CatalogService,
) : DataPolicyServiceGrpcKt.DataPolicyServiceCoroutineImplBase() {

override suspend fun listDataPolicies(request: ListDataPoliciesRequest): ListDataPoliciesResponse {
Expand Down Expand Up @@ -55,4 +57,35 @@ class DataPolicyApi(
GetProcessingPlatformBarePolicyResponse.newBuilder()
.setDataPolicy(processingPlatformsService.createBarePolicy(request.platform, request.table))
.build()

override suspend fun listCatalogs(request: ListCatalogsRequest): ListCatalogsResponse =
ListCatalogsResponse.newBuilder()
.addAllCatalogs(catalogService.listCatalogs())
.build()

override suspend fun listDatabases(request: ListDatabasesRequest): ListDatabasesResponse {
val databases = catalogService.listDatabases(request.catalog)
return ListDatabasesResponse.newBuilder()
.addAllDatabases(databases)
.build()
}
override suspend fun listSchemas(request: ListSchemasRequest): ListSchemasResponse {
val schemas = catalogService.listSchemas(request.database)
return ListSchemasResponse.newBuilder()
.addAllSchemas(schemas)
.build()
}
override suspend fun listTables(request: ListTablesRequest): ListTablesResponse {
val tables = catalogService.listTables(request.schema)
return ListTablesResponse.newBuilder()
.addAllTables(tables)
.build()
}

override suspend fun getCatalogBarePolicy(request: GetCatalogBarePolicyRequest): GetCatalogBarePolicyResponse {
val dataPolicy: DataPolicy = catalogService.getBarePolicy(request.table)
return GetCatalogBarePolicyResponse.newBuilder()
.setDataPolicy(dataPolicy)
.build()
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.getstrm.pace.bigquery

import com.getstrm.pace.exceptions.ProcessingPlatformExecuteException
import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy
import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy.ProcessingPlatform.PlatformType.BIGQUERY
import com.getstrm.pace.config.BigQueryConfig
import com.getstrm.pace.domain.Group
import com.getstrm.pace.domain.ProcessingPlatformExecuteException
import com.getstrm.pace.domain.ProcessingPlatformInterface
import com.getstrm.pace.domain.Table
import com.google.auth.oauth2.GoogleCredentials
Expand Down
189 changes: 92 additions & 97 deletions app/src/main/kotlin/com/getstrm/pace/catalogs/Collibra.kt
Original file line number Diff line number Diff line change
@@ -1,97 +1,92 @@
//package com.getstrm.pace.catalogs
//
//import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy
//import com.apollographql.apollo3.ApolloClient
//import com.collibra.generated.ListPhysicalDataAssetsQuery
//import com.collibra.generated.ListSchemaIdsQuery
//import com.collibra.generated.ListTablesInSchemaQuery
//import com.collibra.generated.TableWithColumnsQuery
//import com.getstrm.pace.config.CatalogConfiguration
//import com.getstrm.pace.domain.DataCatalog
//import java.util.*
//
//class CollibraCatalog(config: CatalogConfiguration) : DataCatalog() {
// private val client = config.apolloClient()
// override fun close() {
// client.close()
// }
//
// override suspend fun listDatabases(): List<DataCatalog.Database> = listPhysicalAssets(AssetTypes.DATABASE).map {
// Database(this, it.id, it.getDataSourceType())
// }
//
// class Database(private val catalog: CollibraCatalog, id: String, dbType: String) :
// DataCatalog.Database(id, dbType) {
// constructor(catalog: CollibraCatalog, id: Any, dbType: String) : this(catalog, id.toString(), dbType)
//
// override suspend fun getSchemas(): List<DataCatalog.Schema> {
// val assets = catalog.client.query(ListSchemaIdsQuery(id)).execute().data!!.assets!!.filterNotNull()
// .flatMap { schema ->
// schema.schemas
// }
// return assets.map {
// Schema(catalog, this, it.target.id.toString(), it.target.fullName)
// }
// }
// }
//
// class Schema(private val catalog: CollibraCatalog, database: DataCatalog.Database, id: String, name: String) :
// DataCatalog.Schema(database, id, name) {
// override suspend fun getTables(): List<DataCatalog.Table> =
// catalog.client.query(ListTablesInSchemaQuery(id)).execute().data!!.assets!!.filterNotNull()
// .flatMap { table ->
// table.tables.map { Table(catalog, this, it.target.id.toString(), it.target.fullName) }
// }
// }
//
// class Table(private val catalog: CollibraCatalog, schema: DataCatalog.Schema, id: String, name: String) :
// DataCatalog.Table(schema, id, name) {
// override suspend fun getDataPolicy(): DataPolicy? {
// val response = catalog.client.query(TableWithColumnsQuery(id = id)).execute()
// return response.data?.tables?.firstOrNull()?.let { table ->
// val systemName =
// table.schema.firstOrNull()?.schemaDetails?.database?.firstOrNull()?.databaseDetails?.domain?.name
//
// val builder = DataPolicy.newBuilder()
// builder.infoBuilder.title = table.displayName
// builder.infoBuilder.description = systemName
// builder.sourceBuilder.addAllAttributes(table.columns.map { it.toAttribute() })
// builder.build()
// }
// }
//
// private fun TableWithColumnsQuery.Column.toAttribute(): DataPolicy.Attribute {
// return with(DataPolicy.Attribute.newBuilder()) {
// addPathComponents(columnDetails.displayName)
// val sourceType = columnDetails.dataType.firstOrNull()?.value ?: "unknown"
// // source type mapping
// type = sourceType
// build()
// }
// }
// }
//
// class Configuration(
// private val serverUrl: String = "https://test-drive.collibra.com/graphql/knowledgeGraph/v1",
// private val username: String = "test-drive-user-9b8o5m7l",
// private val password: String = "Egwrazg\$8q3j6i0b",
// ) : DataCatalog.Configuration() {
// fun apolloClient(): ApolloClient {
// val basicAuth = Base64.getEncoder().encodeToString("$username:$password".toByteArray())
//
// return ApolloClient.Builder().serverUrl(serverUrl).addHttpHeader("Authorization", "Basic $basicAuth")
// .build()
// }
// }
//
// private suspend fun listPhysicalAssets(type: AssetTypes): List<ListPhysicalDataAssetsQuery.Asset> =
// client.query(ListPhysicalDataAssetsQuery(assetType = type.assetName)).execute().data!!.assets?.filterNotNull()
// ?: emptyList()
//
// private fun ListPhysicalDataAssetsQuery.Asset.getDataSourceType(): String =
// stringAttributes.find { it.type.publicId == "DataSourceType" }?.stringValue ?: "unknown"
//}
//
//enum class AssetTypes(val assetName: String) {
// DATABASE("Database"), SCHEMA("Schema"), TABLE("Table"), COLUMN("Column"),
//}
package com.getstrm.pace.catalogs
import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy
import com.apollographql.apollo3.ApolloClient
import com.collibra.generated.ListPhysicalDataAssetsQuery
import com.collibra.generated.ListSchemaIdsQuery
import com.collibra.generated.ListTablesInSchemaQuery
import com.collibra.generated.TableWithColumnsQuery
import com.getstrm.pace.config.CatalogConfiguration
import com.getstrm.pace.domain.DataCatalog
import normalizeType
import java.util.*

class CollibraCatalog(config: CatalogConfiguration) : DataCatalog(config) {

val client = apolloClient()
override fun close() {
client.close()
}

override suspend fun listDatabases(): List<DataCatalog.Database> =
listPhysicalAssets(AssetTypes.DATABASE).map {
Database(this, it.id, it.getDataSourceType())
}

class Database(override val catalog: CollibraCatalog, id: String, dbType: String) :
DataCatalog.Database(catalog, id, dbType) {
constructor(catalog: CollibraCatalog, id: Any, dbType: String) : this(catalog, id.toString(), dbType)

override suspend fun getSchemas(): List<DataCatalog.Schema> {
val assets = catalog.client.query(ListSchemaIdsQuery(id)).execute().data!!.assets!!.filterNotNull().flatMap { schema ->
schema.schemas
}
return assets.map {
Schema(catalog, this, it.target.id.toString(), it.target.fullName)
}
}
}

class Schema(private val catalog: CollibraCatalog, database: DataCatalog.Database, id: String, name: String) :
DataCatalog.Schema(database, id, name) {
override suspend fun getTables(): List<DataCatalog.Table> =
catalog.client.query(ListTablesInSchemaQuery(id)).execute().data!!.assets!!.filterNotNull().flatMap { table ->
table.tables.map { Table(catalog, this, it.target.id.toString(), it.target.fullName) }
}
}

class Table(private val catalog: CollibraCatalog, schema: DataCatalog.Schema, id: String, name: String) :
DataCatalog.Table(schema, id, name) {
override suspend fun getDataPolicy(): DataPolicy? {
val response = catalog.client.query(TableWithColumnsQuery(id = id)).execute()
return response.data?.tables?.firstOrNull()?.let { table ->
val systemName = table.schema.firstOrNull()?.schemaDetails?.database?.firstOrNull()?.databaseDetails?.domain?.name

val builder = DataPolicy.newBuilder()
builder.infoBuilder.title = table.displayName
builder.infoBuilder.description = systemName
builder.sourceBuilder.addAllAttributes(table.columns.map { it.toAttribute() })
builder.build()
}
}

private fun TableWithColumnsQuery.Column.toAttribute(): DataPolicy.Attribute =
with(DataPolicy.Attribute.newBuilder()) {
addPathComponents(columnDetails.displayName)
val sourceType = columnDetails.dataType.firstOrNull()?.value ?: "unknown"
// source type mapping
type = sourceType
addAllTags(columnDetails.tags.map { it.name })
build().normalizeType()
}
}

private fun apolloClient(): ApolloClient {
val basicAuth = Base64.getEncoder().encodeToString("${config.userName}:${config.password}".toByteArray())
return ApolloClient.Builder()
.serverUrl(config.serverUrl)
.addHttpHeader("Authorization", "Basic $basicAuth")
.build()
}
private suspend fun listPhysicalAssets(type: AssetTypes): List<ListPhysicalDataAssetsQuery.Asset> =
client.query(ListPhysicalDataAssetsQuery(assetType = type.assetName)).execute().data!!.assets?.filterNotNull() ?: emptyList()

private fun ListPhysicalDataAssetsQuery.Asset.getDataSourceType(): String =
stringAttributes.find { it.type.publicId == "DataSourceType" }?.stringValue ?: "unknown"
}

enum class AssetTypes(val assetName: String) {
DATABASE("Database"),
SCHEMA("Schema"),
TABLE("Table"),
COLUMN("Column"),
}
Loading

0 comments on commit bdc45f2

Please sign in to comment.