Skip to content

Commit

Permalink
Merge pull request #4 from getstrm/feature/strm-2707
Browse files Browse the repository at this point in the history
- Create REST API Paths for all RPCs.
- Add optional to some proto message fields.
- Add proto message validations.
  • Loading branch information
ivan-p92 authored Oct 25, 2023
2 parents f6c71b6 + e3b9689 commit 56dcc77
Show file tree
Hide file tree
Showing 13 changed files with 171 additions and 213 deletions.
30 changes: 0 additions & 30 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,8 @@ common_protos := ${CURDIR}/.common-protos

grpc_version := 1.50.0
protobuf_version := 3.21.9
google_common_protos_version := 2.10.0
git_branch := $(shell git rev-parse --abbrev-ref HEAD)

# google/protobuf dependencies (predefined Protos for e.g. Timestamp, Duration, etc)
${common_protos}/protobuf-java.jar:
curl "https://repo1.maven.org/maven2/com/google/protobuf/protobuf-java/${protobuf_version}/protobuf-java-${protobuf_version}.jar" --create-dirs -o "${common_protos}/protobuf-java.jar"

# google/api dependencies (Common Google Protos, such as field_behavior)
${common_protos}/proto-google-common-protos.jar:
curl "https://repo1.maven.org/maven2/com/google/api/grpc/proto-google-common-protos/${google_common_protos_version}/proto-google-common-protos-${google_common_protos_version}.jar" --create-dirs -o "${common_protos}/proto-google-common-protos.jar"

${common_protos}: ${common_protos}/proto-google-common-protos.jar ${common_protos}/protobuf-java.jar ${common_protos}/validate/validate.proto

clean-common-protos:
rm -rf ${common_protos}

${common_protos}/google/protobuf: ${common_protos}/protobuf-java.jar
unzip -d ${common_protos} $< "google/**/*.proto"

${common_protos}/google/api: ${common_protos}/proto-google-common-protos.jar
unzip -d ${common_protos} $< "google/**/*.proto"

${common_protos}/validate/validate.proto:
curl "https://raw.githubusercontent.com/bufbuild/protoc-gen-validate/v0.9.1/validate/validate.proto" --create-dirs -o "${common_protos}/validate/validate.proto"

# To ensure that we use the same Google Common Protobuf files in all languages, we extract them from the jar
default-google-dependencies: ${common_protos}/google/protobuf ${common_protos}/google/api
protoc-gen-validate-dependency: ${common_protos}/validate/validate.proto

intellij: ${common_protos}
./protos/setup-ide-protobuf-plugins.sh

buf-publish-current-branch:
[[ "$$OSTYPE" == "darwin"* ]] && SED=gsed || SED=sed && \
commit_hash=$$(cd protos && buf push --branch "${git_branch}") && \
Expand Down
10 changes: 5 additions & 5 deletions app/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ dependencies {
implementation(enforcedPlatform("com.google.cloud:libraries-bom:26.24.0"))
implementation("com.google.cloud:google-cloud-bigquery")

implementation("build.buf.gen:getstrm_daps_grpc_java:1.58.0.1.$generatedBufDependencyVersion")
implementation("build.buf.gen:getstrm_daps_grpc_kotlin:1.3.1.1.$generatedBufDependencyVersion")
implementation("build.buf.gen:getstrm_daps_protocolbuffers_java:24.4.0.1.$generatedBufDependencyVersion")
implementation("build.buf.gen:getstrm_pace_grpc_java:1.58.0.1.$generatedBufDependencyVersion")
implementation("build.buf.gen:getstrm_pace_grpc_kotlin:1.3.1.1.$generatedBufDependencyVersion")
implementation("build.buf.gen:getstrm_pace_protocolbuffers_java:24.4.0.1.$generatedBufDependencyVersion")

implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-yaml")
Expand Down Expand Up @@ -114,9 +114,9 @@ kotlin {
}

tasks.named<BootJar>("bootJar") {
mainClass = "com.getstrm.daps.DataPolicyServiceApplicationKt"
mainClass = "com.getstrm.pace.DataPolicyServiceApplicationKt"
manifest {
attributes["Implementation-Title"] = "Data Policy Service"
attributes["Implementation-Title"] = "Policy and Contract Engine"
attributes["Implementation-Version"] = version
}
}
Expand Down
45 changes: 36 additions & 9 deletions app/src/main/kotlin/com/getstrm/pace/api/DataPolicyApi.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,31 @@
package com.getstrm.pace.api

import build.buf.gen.getstrm.api.data_policies.v1alpha.*
import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy
import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicyServiceGrpcKt
import build.buf.gen.getstrm.api.data_policies.v1alpha.GetCatalogBarePolicyRequest
import build.buf.gen.getstrm.api.data_policies.v1alpha.GetCatalogBarePolicyResponse
import build.buf.gen.getstrm.api.data_policies.v1alpha.GetDataPolicyRequest
import build.buf.gen.getstrm.api.data_policies.v1alpha.GetDataPolicyResponse
import build.buf.gen.getstrm.api.data_policies.v1alpha.GetProcessingPlatformBarePolicyRequest
import build.buf.gen.getstrm.api.data_policies.v1alpha.GetProcessingPlatformBarePolicyResponse
import build.buf.gen.getstrm.api.data_policies.v1alpha.ListCatalogsRequest
import build.buf.gen.getstrm.api.data_policies.v1alpha.ListCatalogsResponse
import build.buf.gen.getstrm.api.data_policies.v1alpha.ListDataPoliciesRequest
import build.buf.gen.getstrm.api.data_policies.v1alpha.ListDataPoliciesResponse
import build.buf.gen.getstrm.api.data_policies.v1alpha.ListDatabasesRequest
import build.buf.gen.getstrm.api.data_policies.v1alpha.ListDatabasesResponse
import build.buf.gen.getstrm.api.data_policies.v1alpha.ListProcessingPlatformGroupsRequest
import build.buf.gen.getstrm.api.data_policies.v1alpha.ListProcessingPlatformGroupsResponse
import build.buf.gen.getstrm.api.data_policies.v1alpha.ListProcessingPlatformTablesRequest
import build.buf.gen.getstrm.api.data_policies.v1alpha.ListProcessingPlatformTablesResponse
import build.buf.gen.getstrm.api.data_policies.v1alpha.ListProcessingPlatformsRequest
import build.buf.gen.getstrm.api.data_policies.v1alpha.ListProcessingPlatformsResponse
import build.buf.gen.getstrm.api.data_policies.v1alpha.ListSchemasRequest
import build.buf.gen.getstrm.api.data_policies.v1alpha.ListSchemasResponse
import build.buf.gen.getstrm.api.data_policies.v1alpha.ListTablesRequest
import build.buf.gen.getstrm.api.data_policies.v1alpha.ListTablesResponse
import build.buf.gen.getstrm.api.data_policies.v1alpha.UpsertDataPolicyRequest
import build.buf.gen.getstrm.api.data_policies.v1alpha.UpsertDataPolicyResponse
import com.getstrm.pace.service.CatalogService
import com.getstrm.pace.service.DataPolicyService
import com.getstrm.pace.service.ProcessingPlatformsService
Expand All @@ -27,7 +52,7 @@ class DataPolicyApi(

override suspend fun getDataPolicy(request: GetDataPolicyRequest): GetDataPolicyResponse {
return GetDataPolicyResponse.newBuilder()
.setDataPolicy(dataPolicyService.getLatestDataPolicy(request.id))
.setDataPolicy(dataPolicyService.getLatestDataPolicy(request.dataPolicyId))
.build()
}

Expand All @@ -43,17 +68,17 @@ class DataPolicyApi(

override suspend fun listProcessingPlatformTables(request: ListProcessingPlatformTablesRequest): ListProcessingPlatformTablesResponse =
ListProcessingPlatformTablesResponse.newBuilder().addAllTables(
processingPlatformsService.listProcessingPlatformTables(request).map { it.fullName },
processingPlatformsService.listProcessingPlatformTables(request.platformId).map { it.fullName },
).build()

override suspend fun listProcessingPlatformGroups(request: ListProcessingPlatformGroupsRequest): ListProcessingPlatformGroupsResponse =
ListProcessingPlatformGroupsResponse.newBuilder().addAllGroups(
processingPlatformsService.listProcessingPlatformGroups(request).map { it.name },
processingPlatformsService.listProcessingPlatformGroups(request.platformId).map { it.name },
).build()

override suspend fun getProcessingPlatformBarePolicy(request: GetProcessingPlatformBarePolicyRequest): GetProcessingPlatformBarePolicyResponse =
GetProcessingPlatformBarePolicyResponse.newBuilder()
.setDataPolicy(processingPlatformsService.createBarePolicy(request.platform, request.table))
.setDataPolicy(processingPlatformsService.createBarePolicy(request.platformId, request.table))
.build()

override suspend fun listCatalogs(request: ListCatalogsRequest): ListCatalogsResponse =
Expand All @@ -62,26 +87,28 @@ class DataPolicyApi(
.build()

override suspend fun listDatabases(request: ListDatabasesRequest): ListDatabasesResponse {
val databases = catalogService.listDatabases(request.catalog)
val databases = catalogService.listDatabases(request.catalogId)
return ListDatabasesResponse.newBuilder()
.addAllDatabases(databases)
.build()
}
override suspend fun listSchemas(request: ListSchemasRequest): ListSchemasResponse {
val schemas = catalogService.listSchemas(request.database)
val schemas = catalogService.listSchemas(request.catalogId, request.databaseId)
return ListSchemasResponse.newBuilder()
.addAllSchemas(schemas)
.build()
}
override suspend fun listTables(request: ListTablesRequest): ListTablesResponse {
val tables = catalogService.listTables(request.schema)
val tables = catalogService.listTables(request.catalogId, request.databaseId, request.schemaId)
return ListTablesResponse.newBuilder()
.addAllTables(tables)
.build()
}

override suspend fun getCatalogBarePolicy(request: GetCatalogBarePolicyRequest): GetCatalogBarePolicyResponse {
val dataPolicy: DataPolicy = catalogService.getBarePolicy(request.table)
val dataPolicy: DataPolicy = catalogService.getBarePolicy(
request.catalogId, request.databaseId, request.schemaId, request.tableId
)
return GetCatalogBarePolicyResponse.newBuilder()
.setDataPolicy(dataPolicy)
.build()
Expand Down
2 changes: 1 addition & 1 deletion app/src/main/kotlin/com/getstrm/pace/dao/DataPolicyDao.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class DataPolicyDao(
dataPolicy.info.toBuilder()
.setUpdateTime(updateTimestamp.toTimestamp())
.setCreateTime(oldPolicy?.info?.createTime ?: updateTimestamp.toTimestamp())
.setOrganizationId(context)
.setContext(context)
.build()
).build()

Expand Down
65 changes: 39 additions & 26 deletions app/src/main/kotlin/com/getstrm/pace/service/CatalogService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -43,38 +43,47 @@ class CatalogService(
.build()
}

suspend fun listDatabases(apiCatalog: ApiCatalog): List<ApiDatabase> =
getCatalog(apiCatalog.id).listDatabases().map { it.apiDatabase }
suspend fun listDatabases(catalogId: String): List<ApiDatabase> =
getCatalog(catalogId).listDatabases().map { it.apiDatabase }

suspend fun listSchemas(apiDatabase: ApiDatabase): List<ApiSchema> {
val catalog = getCatalog(apiDatabase.catalog.id)
val database = catalog.listDatabases().find { it.id == apiDatabase.id }
suspend fun listSchemas(catalogId: String, databaseId: String): List<ApiSchema> {
val catalog = getCatalog(catalogId)
val database = catalog.listDatabases().firstOrNull { it.id == databaseId }
?: throw ResourceException(
ResourceException.Code.NOT_FOUND,
ResourceInfo.newBuilder()
.setResourceType("Database")
.setResourceName(apiDatabase.id)
.setDescription("Database ${apiDatabase.id} not found in catalog ${apiDatabase.catalog.id}")
.setOwner("Catalog: ${apiDatabase.catalog.id}")
.setResourceName(databaseId)
.setDescription("Database $databaseId not found in catalog $catalogId")
.setOwner("Catalog: $catalogId")
.build()
)
val schemas = database.getSchemas()
return schemas.map { it.apiSchema }
}

suspend fun listTables(apiSchema: ApiSchema): List<ApiTable> =
getTablesInfo(apiSchema).map { it.apiTable }
suspend fun listTables(
catalogId: String,
databaseId: String,
schemaId: String,
): List<ApiTable> =
getTablesInfo(catalogId, databaseId, schemaId).map { it.apiTable }

suspend fun getBarePolicy(apiTable: ApiTable): DataPolicy {
val tables = getTablesInfo(apiTable.schema)
val table = tables.find { it.id == apiTable.id }
suspend fun getBarePolicy(
catalogId: String,
databaseId: String,
schemaId: String,
tableId: String,
): DataPolicy {
val tables = getTablesInfo(catalogId, databaseId, schemaId)
val table = tables.firstOrNull { it.id == tableId }
?: throw ResourceException(
ResourceException.Code.NOT_FOUND,
ResourceInfo.newBuilder()
.setResourceType("Table")
.setResourceName(apiTable.id)
.setDescription("Table ${apiTable.id} not found in schema ${apiTable.schema.id}")
.setOwner("Schema: ${apiTable.schema.id}")
.setResourceName(tableId)
.setDescription("Table $tableId not found in schema $schemaId")
.setOwner("Schema: $schemaId")
.build()
)
return table.getDataPolicy()!!
Expand All @@ -85,25 +94,29 @@ class CatalogService(
*
* @return dto object with all relevant info
*/
private suspend fun getTablesInfo(apiSchema: ApiSchema): List<DataCatalog.Table> {
val catalog = getCatalog(apiSchema.database.catalog.id)
val database = catalog.listDatabases().find { it.id == apiSchema.database.id }
private suspend fun getTablesInfo(
catalogId: String,
databaseId: String,
schemaId: String,
): List<DataCatalog.Table> {
val catalog = getCatalog(catalogId)
val database = catalog.listDatabases().firstOrNull { it.id == databaseId }
?: throw ResourceException(
ResourceException.Code.NOT_FOUND,
ResourceInfo.newBuilder()
.setResourceType("Catalog Database")
.setResourceName(apiSchema.database.id)
.setDescription("Database ${apiSchema.database.id} not found in catalog ${apiSchema.database.catalog.id}")
.setOwner("Catalog: ${apiSchema.database.catalog.id}")
.setResourceName(databaseId)
.setDescription("Database $databaseId not found in catalog $catalogId")
.setOwner("Catalog: $catalogId")
.build()
)
val schema = database.getSchemas().find { it.id == apiSchema.id } ?: throw ResourceException(
val schema = database.getSchemas().firstOrNull { it.id == schemaId } ?: throw ResourceException(
ResourceException.Code.NOT_FOUND,
ResourceInfo.newBuilder()
.setResourceType("Catalog Database Schema")
.setResourceName(apiSchema.id)
.setDescription("Schema ${apiSchema.id} not found in database ${apiSchema.database.id} of catalog ${apiSchema.database.catalog.id}")
.setOwner("Database: ${apiSchema.database.id}")
.setResourceName(schemaId)
.setDescription("Schema $schemaId not found in database $databaseId of catalog $catalogId")
.setOwner("Database: $databaseId")
.build()
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package com.getstrm.pace.service

import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy
import build.buf.gen.getstrm.api.data_policies.v1alpha.ListProcessingPlatformGroupsRequest
import build.buf.gen.getstrm.api.data_policies.v1alpha.ListProcessingPlatformTablesRequest
import com.getstrm.pace.bigquery.BigQueryClient
import com.getstrm.pace.config.ProcessingPlatformConfiguration
import com.getstrm.pace.databricks.DatabricksClient
Expand Down Expand Up @@ -56,18 +54,18 @@ class ProcessingPlatformsService(
}
}

suspend fun listProcessingPlatformTables(request: ListProcessingPlatformTablesRequest): List<Table> =
(platforms[request.platform.id] ?: throw processingPlatformNotFound(request.platform.id)).listTables()
suspend fun listProcessingPlatformTables(platformId: String): List<Table> =
(platforms[platformId] ?: throw processingPlatformNotFound(platformId)).listTables()

suspend fun listProcessingPlatformGroups(request: ListProcessingPlatformGroupsRequest): List<Group> =
(platforms[request.platform.id] ?: throw processingPlatformNotFound(request.platform.id)).listGroups()
suspend fun listProcessingPlatformGroups(platformId: String): List<Group> =
(platforms[platformId] ?: throw processingPlatformNotFound(platformId)).listGroups()

suspend fun createBarePolicy(platform: DataPolicy.ProcessingPlatform?, tableName: String): DataPolicy {
suspend fun createBarePolicy(platformId: String, tableName: String): DataPolicy {
val processingPlatformInterface =
platforms[platform!!.id] ?: throw processingPlatformNotFound(platform.id)
platforms[platformId] ?: throw processingPlatformNotFound(platformId)
val table = processingPlatformInterface.getTable(tableName)
return table.toDataPolicy(
DataPolicy.ProcessingPlatform.newBuilder().setId(platform.id)
DataPolicy.ProcessingPlatform.newBuilder().setId(platformId)
.setPlatformType(processingPlatformInterface.type).build()
)
}
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ jooqVersion=3.18.7
# TODO verify the tag is correct
dockertag = ghcr.io/getstrm/pace:latest

generatedBufDependencyVersion=00000000000000.609525407a2f
generatedBufDependencyVersion=00000000000000.240640388efd
3 changes: 3 additions & 0 deletions protos/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
1. Install the `Buf for Protocol Buffers` and the bundled `gRPC` plugins for Intellij.
2. Configure your buf path in the plugin settings (e.g. `$ which buf`)
3. Update buf with `buf mod update`
8 changes: 4 additions & 4 deletions protos/buf.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
version: v1
deps:
- remote: buf.build
owner: envoyproxy
repository: protoc-gen-validate
commit: eac44469a7af47e7839a7f1f3d7ac004
digest: shake256:0feabcde01b6b11e3c75a5e3f807968d5995626546f39c37e5d4205892b3a59cced0ed83b35a2eb9e6dddd3309660ad46b737c9dcd224b425de0a6654ce04417
owner: bufbuild
repository: protovalidate
commit: 0de7443d03cf41228f8a9790b12b417e
digest: shake256:3c0676a73cef06439c107cb9560627354815adbc254976f807d645de7e2c1bf19d0438d5d56d5bc92465377e0d9315951e986fc6ced2871e450534b2b8c953b0
- remote: buf.build
owner: googleapis
repository: googleapis
Expand Down
5 changes: 2 additions & 3 deletions protos/buf.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
version: v1
name: buf.build/getstrm/daps
name: buf.build/getstrm/pace
deps:
- buf.build/googleapis/googleapis
# FIXME once an official repo is available for bufbuild/protoc-gen-validate, we should change to that
- buf.build/envoyproxy/protoc-gen-validate
- buf.build/bufbuild/protovalidate
breaking:
use:
- FILE
Expand Down
Loading

0 comments on commit 56dcc77

Please sign in to comment.