Skip to content

Commit

Permalink
Merge pull request #9 from getstrm/feature/strm-2706
Browse files Browse the repository at this point in the history
Exception Handler Interceptor and standardized error handling
  • Loading branch information
ivan-p92 authored Oct 25, 2023
2 parents 5d71d6b + aadb08e commit 345e49b
Show file tree
Hide file tree
Showing 22 changed files with 660 additions and 136 deletions.
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ out/
/app/build/
output/

.common-protos

app/src/main/resources/application-local.yaml

protos/*.binpb
2 changes: 0 additions & 2 deletions app/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ dependencies {


// Todo remove before squashing
implementation("io.strmprivacy.grpc.common:kotlin-grpc-common:3.22.0")

implementation(enforcedPlatform("com.google.cloud:libraries-bom:26.24.0"))
implementation("com.google.cloud:google-cloud-bigquery")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

@SpringBootApplication
class DataPolicyServiceApplication
class PaceApplication

fun main(args: Array<String>) {
runApplication<DataPolicyServiceApplication>(*args)
runApplication<PaceApplication>(*args)
}
27 changes: 16 additions & 11 deletions app/src/main/kotlin/com/getstrm/pace/bigquery/BigQueryClient.kt
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
package com.getstrm.pace.bigquery

import com.getstrm.pace.domain.ProcessingPlatformExecuteException
import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy
import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy.ProcessingPlatform.PlatformType.BIGQUERY
import com.getstrm.pace.config.BigQueryConfig
import com.getstrm.pace.domain.Group
import com.getstrm.pace.domain.ProcessingPlatformInterface
import com.getstrm.pace.domain.Table
import com.getstrm.pace.exceptions.InternalException
import com.google.auth.oauth2.GoogleCredentials
import com.google.cloud.bigquery.Acl
import com.google.cloud.bigquery.BigQuery
import com.google.cloud.bigquery.BigQueryException
import com.google.cloud.bigquery.BigQueryOptions
import com.google.cloud.bigquery.JobException
import com.google.cloud.bigquery.QueryJobConfiguration
import com.google.cloud.bigquery.TableId
import com.google.cloud.bigquery.*
import com.google.rpc.DebugInfo
import org.slf4j.LoggerFactory
import toFullName
import com.google.cloud.bigquery.Table as BQTable
Expand Down Expand Up @@ -67,15 +62,24 @@ class BigQueryClient(
} catch (e: JobException) {
log.warn("SQL query\n{}", query)
log.warn("Caused error {}", e.message)
throw ProcessingPlatformExecuteException(id, e.message ?: e.stackTraceToString())
throw InternalException(
InternalException.Code.INTERNAL,
DebugInfo.newBuilder()
.setDetail(
"Error while executing BigQuery query (error message: ${e.message}), please check the logs of your PACE deployment. This is a bug, please report to https://github.com/getstrm/pace/issues/new"
)
.addAllStackEntries(e.stackTrace.map { it.toString() })
.build(),
e
)
}
try {
authorizeViews(dataPolicy)
} catch (e: BigQueryException) {
if (e.message == "Duplicate authorized views") {
log.debug("{}", e.message)
} else {
throw(e)
throw e
}
}
}
Expand Down Expand Up @@ -122,5 +126,6 @@ class BigQueryTable(
private val table: BQTable,
) : Table() {

override suspend fun toDataPolicy(platform: DataPolicy.ProcessingPlatform): DataPolicy = table.toDataPolicy(platform)
override suspend fun toDataPolicy(platform: DataPolicy.ProcessingPlatform): DataPolicy =
table.toDataPolicy(platform)
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.getstrm.pace.common

import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy
import com.getstrm.pace.domain.SqlParseException
import com.getstrm.pace.exceptions.BadRequestException
import com.google.rpc.BadRequest
import headTailFold
import org.jooq.* // ktlint-disable no-wildcard-imports
import org.jooq.conf.ParseNameCase
Expand Down Expand Up @@ -163,7 +164,7 @@ abstract class AbstractDynamicViewGenerator(
try {
parser.parseField(transform.sqlStatement.statement)
} catch (e: ParserException) {
throw SqlParseException(transform.sqlStatement.statement, e)
throw invalidSqlStatementException(e)
}
// Todo: for now we use the parser just to detect errors, since the resulting sql may be incompatible with the target platform -> I've asked a question on SO: https://stackoverflow.com/q/77300702
// (For example, the BigQuery string datatype gets parsed as varchar)
Expand All @@ -182,6 +183,20 @@ abstract class AbstractDynamicViewGenerator(
return memberCheck to (statement as Field<Any>)
}

private fun invalidSqlStatementException(e: ParserException) = BadRequestException(
BadRequestException.Code.INVALID_ARGUMENT,
BadRequest.newBuilder()
.addAllFieldViolations(
listOf(
BadRequest.FieldViolation.newBuilder()
.setField("dataPolicy.ruleSetsList.fieldTransformsList.sqlStatement")
.setDescription("Error parsing SQL statement: ${e.message}")
.build()
)
)
.build()
)

private fun DataPolicy.Attribute.fullName(): String = this.pathComponentsList.joinToString(".")

companion object {
Expand Down
4 changes: 1 addition & 3 deletions app/src/main/kotlin/com/getstrm/pace/config/AppConfig.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.getstrm.pace.config

import io.strmprivacy.grpc.common.server.ExceptionHandlerInterceptor
import com.getstrm.pace.exceptions.ExceptionHandlerInterceptor
import net.devh.boot.grpc.server.interceptor.GrpcGlobalServerInterceptor
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Configuration
Expand All @@ -9,10 +9,8 @@ import org.springframework.context.annotation.Configuration
@Configuration
@EnableConfigurationProperties(ProcessingPlatformConfiguration::class, CatalogsConfiguration::class)
class AppConfig {

@GrpcGlobalServerInterceptor
fun exceptionInterceptor(): ExceptionHandlerInterceptor {
// Todo: re-implement or use @GrpcAdvice with @GrpcExceptionHandler instead after removing the kotlin-grpc-common dependency
return ExceptionHandlerInterceptor(false)
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.getstrm.pace.databricks

import com.getstrm.pace.domain.ProcessingPlatformExecuteException
import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy
import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy.ProcessingPlatform.PlatformType.DATABRICKS
import com.databricks.sdk.AccountClient
Expand All @@ -13,6 +12,8 @@ import com.databricks.sdk.service.sql.ExecuteStatementResponse
import com.databricks.sdk.service.sql.StatementState
import com.getstrm.pace.config.DatabricksConfig
import com.getstrm.pace.domain.*
import com.getstrm.pace.exceptions.InternalException
import com.google.rpc.DebugInfo
import org.slf4j.LoggerFactory

class DatabricksClient(
Expand Down Expand Up @@ -92,7 +93,16 @@ class DatabricksClient(
log.warn("SQL statement\n{}", statement)
val errorMessage = "Databricks response %s: %s".format(response.status.error, response.status.error.message)
log.warn("Caused error {}", errorMessage)
throw ProcessingPlatformExecuteException(id, "Failed to apply policy: $errorMessage")

throw InternalException(
InternalException.Code.INTERNAL,
DebugInfo.newBuilder()
.setDetail(
"Error while executing Databricks query (error code: ${response.status.error.errorCode.name}), please check the logs of your PACE deployment. This is a bug, please report to https://github.com/getstrm/pace/issues/new"
)
.addAllStackEntries(listOf(errorMessage))
.build()
)
}
}
}
Expand Down
41 changes: 0 additions & 41 deletions app/src/main/kotlin/com/getstrm/pace/domain/Exceptions.kt

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.getstrm.pace.domain

import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy
import com.getstrm.pace.exceptions.ResourceException
import com.google.rpc.ResourceInfo

interface ProcessingPlatformInterface {

Expand All @@ -11,8 +13,16 @@ interface ProcessingPlatformInterface {
suspend fun listTables(): List<Table>
suspend fun applyPolicy(dataPolicy: DataPolicy)

suspend fun createTable(tableName: String): Table =
listTables().find { it.fullName == tableName } ?: throw ProcessingPlatformTableNotFound(id, type, tableName)
suspend fun getTable(tableName: String): Table =
listTables().find { it.fullName == tableName } ?: throw ResourceException(
ResourceException.Code.NOT_FOUND,
ResourceInfo.newBuilder()
.setResourceType("Table")
.setResourceName(tableName)
.setDescription("Table $tableName not found in platform $id of type $type")
.setOwner("Processing Platform: ${type.name}")
.build()
)
}

data class Group(val id: String, val name: String, val description: String? = null)
Expand Down
Loading

0 comments on commit 345e49b

Please sign in to comment.