Skip to content

Commit

Permalink
feat(strm-2706): replace all custom exceptions with standardized exce…
Browse files Browse the repository at this point in the history
…ptions
  • Loading branch information
trietsch committed Oct 25, 2023
1 parent bdc45f2 commit fb70281
Show file tree
Hide file tree
Showing 12 changed files with 340 additions and 121 deletions.
26 changes: 15 additions & 11 deletions app/src/main/kotlin/com/getstrm/pace/bigquery/BigQueryClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,12 @@ import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy
import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy.ProcessingPlatform.PlatformType.BIGQUERY
import com.getstrm.pace.config.BigQueryConfig
import com.getstrm.pace.domain.Group
import com.getstrm.pace.domain.ProcessingPlatformExecuteException
import com.getstrm.pace.domain.ProcessingPlatformInterface
import com.getstrm.pace.domain.Table
import com.getstrm.pace.exceptions.InternalException
import com.google.auth.oauth2.GoogleCredentials
import com.google.cloud.bigquery.Acl
import com.google.cloud.bigquery.BigQuery
import com.google.cloud.bigquery.BigQueryException
import com.google.cloud.bigquery.BigQueryOptions
import com.google.cloud.bigquery.JobException
import com.google.cloud.bigquery.QueryJobConfiguration
import com.google.cloud.bigquery.TableId
import com.google.cloud.bigquery.*
import com.google.rpc.DebugInfo
import org.slf4j.LoggerFactory
import toFullName
import com.google.cloud.bigquery.Table as BQTable
Expand Down Expand Up @@ -67,15 +62,23 @@ class BigQueryClient(
} catch (e: JobException) {
log.warn("SQL query\n{}", query)
log.warn("Caused error {}", e.message)
throw ProcessingPlatformExecuteException(id, e.message ?: e.stackTraceToString())
throw InternalException(
InternalException.Code.INTERNAL,
DebugInfo.newBuilder()
.setDetail(
"Error while executing BigQuery query (error message: ${e.message}), please check the logs of your PACE deployment. This is a bug, please report to https://github.com/getstrm/pace/issues/new"
)
.addAllStackEntries(e.stackTrace.map { it.toString() })
.build()
)
}
try {
authorizeViews(dataPolicy)
} catch (e: BigQueryException) {
if (e.message == "Duplicate authorized views") {
log.debug("{}", e.message)
} else {
throw(e)
throw (e)
}
}
}
Expand Down Expand Up @@ -122,5 +125,6 @@ class BigQueryTable(
private val table: BQTable,
) : Table() {

override suspend fun toDataPolicy(platform: DataPolicy.ProcessingPlatform): DataPolicy = table.toDataPolicy(platform)
override suspend fun toDataPolicy(platform: DataPolicy.ProcessingPlatform): DataPolicy =
table.toDataPolicy(platform)
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.getstrm.pace.common

import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy
import com.getstrm.pace.domain.SqlParseException
import com.getstrm.pace.exceptions.BadRequestException
import com.google.rpc.BadRequest
import headTailFold
import org.jooq.* // ktlint-disable no-wildcard-imports
import org.jooq.conf.ParseNameCase
Expand Down Expand Up @@ -163,8 +164,19 @@ abstract class AbstractDynamicViewGenerator(
try {
parser.parseField(transform.sqlStatement.statement)
} catch (e: ParserException) {
// throw SqlParseException(transform.sqlStatement.statement, e)
throw IllegalArgumentException(transform.sqlStatement.statement, e)
throw BadRequestException(
BadRequestException.Code.INVALID_ARGUMENT,
BadRequest.newBuilder()
.addAllFieldViolations(
listOf(
BadRequest.FieldViolation.newBuilder()
.setField("dataPolicy.ruleSetsList.fieldTransformsList.sqlStatement")
.setDescription("Error parsing SQL statement: ${e.message}")
.build()
)
)
.build()
)
}
// Todo: for now we use the parser just to detect errors, since the resulting sql may be incompatible with the target platform -> I've asked a question on SO: https://stackoverflow.com/q/77300702
// (For example, the BigQuery string datatype gets parsed as varchar)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.getstrm.pace.databricks

import com.getstrm.pace.domain.ProcessingPlatformExecuteException
import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy
import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy.ProcessingPlatform.PlatformType.DATABRICKS
import com.databricks.sdk.AccountClient
Expand All @@ -13,6 +12,8 @@ import com.databricks.sdk.service.sql.ExecuteStatementResponse
import com.databricks.sdk.service.sql.StatementState
import com.getstrm.pace.config.DatabricksConfig
import com.getstrm.pace.domain.*
import com.getstrm.pace.exceptions.InternalException
import com.google.rpc.DebugInfo
import org.slf4j.LoggerFactory

class DatabricksClient(
Expand Down Expand Up @@ -92,7 +93,16 @@ class DatabricksClient(
log.warn("SQL statement\n{}", statement)
val errorMessage = "Databricks response %s: %s".format(response.status.error, response.status.error.message)
log.warn("Caused error {}", errorMessage)
throw ProcessingPlatformExecuteException(id, "Failed to apply policy: $errorMessage")

throw InternalException(
InternalException.Code.INTERNAL,
DebugInfo.newBuilder()
.setDetail(
"Error while executing Databricks query (error code: ${response.status.error.errorCode.name}), please check the logs of your PACE deployment. This is a bug, please report to https://github.com/getstrm/pace/issues/new"
)
.addAllStackEntries(listOf(errorMessage))
.build()
)
}
}
}
Expand Down
41 changes: 0 additions & 41 deletions app/src/main/kotlin/com/getstrm/pace/domain/Exceptions.kt

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.getstrm.pace.domain

import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy
import com.getstrm.pace.domain.ProcessingPlatformTableNotFound
import com.getstrm.pace.exceptions.ResourceException
import com.google.rpc.ResourceInfo

interface ProcessingPlatformInterface {

Expand All @@ -13,7 +14,15 @@ interface ProcessingPlatformInterface {
suspend fun applyPolicy(dataPolicy: DataPolicy)

suspend fun createTable(tableName: String): Table =
listTables().find { it.fullName == tableName } ?: throw ProcessingPlatformTableNotFound(id, type, tableName)
listTables().find { it.fullName == tableName } ?: throw ResourceException(
ResourceException.Code.NOT_FOUND,
ResourceInfo.newBuilder()
.setResourceType("Table")
.setResourceName(tableName)
.setDescription("Table $tableName not found in platform $id of type $type")
.setOwner("Processing Platform: ${type.name}")
.build()
)
}

data class Group(val id: String, val name: String, val description: String? = null)
Expand Down
54 changes: 48 additions & 6 deletions app/src/main/kotlin/com/getstrm/pace/service/CatalogService.kt
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package com.getstrm.pace.service

import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy
import com.getstrm.pace.catalogs.CollibraCatalog
import com.getstrm.pace.catalogs.DatahubCatalog
import com.getstrm.pace.catalogs.OpenDataDiscoveryCatalog
import com.getstrm.pace.config.CatalogsConfiguration
import com.getstrm.pace.domain.*
import com.getstrm.pace.exceptions.ResourceException
import com.google.rpc.ResourceInfo
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import build.buf.gen.getstrm.api.data_policies.v1alpha.DataCatalog as ApiCatalog
Expand Down Expand Up @@ -46,7 +49,15 @@ class CatalogService(
suspend fun listSchemas(apiDatabase: ApiDatabase): List<ApiSchema> {
val catalog = getCatalog(apiDatabase.catalog.id)
val database = catalog.listDatabases().find { it.id == apiDatabase.id }
?: throw CatalogDatabaseNotFoundException(apiDatabase.id)
?: throw ResourceException(
ResourceException.Code.NOT_FOUND,
ResourceInfo.newBuilder()
.setResourceType("Database")
.setResourceName(apiDatabase.id)
.setDescription("Database ${apiDatabase.id} not found in catalog ${apiDatabase.catalog.id}")
.setOwner("Catalog: ${apiDatabase.catalog.id}")
.build()
)
val schemas = database.getSchemas()
return schemas.map { it.apiSchema }
}
Expand All @@ -57,7 +68,15 @@ class CatalogService(
suspend fun getBarePolicy(apiTable: ApiTable): DataPolicy {
val tables = getTablesInfo(apiTable.schema)
val table = tables.find { it.id == apiTable.id }
?: throw CatalogTableNotFoundException(apiTable.id)
?: throw ResourceException(
ResourceException.Code.NOT_FOUND,
ResourceInfo.newBuilder()
.setResourceType("Table")
.setResourceName(apiTable.id)
.setDescription("Table ${apiTable.id} not found in schema ${apiTable.schema.id}")
.setOwner("Schema: ${apiTable.schema.id}")
.build()
)
return table.getDataPolicy()!!
}

Expand All @@ -69,12 +88,35 @@ class CatalogService(
private suspend fun getTablesInfo(apiSchema: ApiSchema): List<DataCatalog.Table> {
val catalog = getCatalog(apiSchema.database.catalog.id)
val database = catalog.listDatabases().find { it.id == apiSchema.database.id }
?: throw CatalogDatabaseNotFoundException(apiSchema.database.id)
val schema =
database.getSchemas().find { it.id == apiSchema.id } ?: throw CatalogSchemaNotFoundException(apiSchema.id)
?: throw ResourceException(
ResourceException.Code.NOT_FOUND,
ResourceInfo.newBuilder()
.setResourceType("Catalog Database")
.setResourceName(apiSchema.database.id)
.setDescription("Database ${apiSchema.database.id} not found in catalog ${apiSchema.database.catalog.id}")
.setOwner("Catalog: ${apiSchema.database.catalog.id}")
.build()
)
val schema = database.getSchemas().find { it.id == apiSchema.id } ?: throw ResourceException(
ResourceException.Code.NOT_FOUND,
ResourceInfo.newBuilder()
.setResourceType("Catalog Database Schema")
.setResourceName(apiSchema.id)
.setDescription("Schema ${apiSchema.id} not found in database ${apiSchema.database.id} of catalog ${apiSchema.database.catalog.id}")
.setOwner("Database: ${apiSchema.database.id}")
.build()
)

return schema.getTables()
}

private fun getCatalog(id: String): DataCatalog =
catalogs[id] ?: throw CatalogNotFoundException(id)
catalogs[id] ?: throw ResourceException(
ResourceException.Code.NOT_FOUND,
ResourceInfo.newBuilder()
.setResourceType("Catalog")
.setResourceName(id)
.setDescription("Catalog $id not found, please ensure it is present in the configuration of the catalogs.")
.build()
)
}
Loading

0 comments on commit fb70281

Please sign in to comment.