Skip to content

Commit

Permalink
Added support for new seed asset types: Postgres, Redshift, MsSQL, Da…
Browse files Browse the repository at this point in the history
…tabricks, Snowflake, Athena, and Synapse.
  • Loading branch information
y-bruin committed Jan 2, 2025
1 parent 6c14f58 commit cd9ca67
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 0 deletions.
21 changes: 21 additions & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,10 @@ func setupExecutors(
mainExecutors[pipeline.AssetTypePostgresQuery][scheduler.TaskInstanceTypeColumnCheck] = pgCheckRunner
mainExecutors[pipeline.AssetTypePostgresQuery][scheduler.TaskInstanceTypeCustomCheck] = customCheckRunner

mainExecutors[pipeline.AssetTypePostgresSeed][scheduler.TaskInstanceTypeMain] = seedOperator
mainExecutors[pipeline.AssetTypePostgresSeed][scheduler.TaskInstanceTypeColumnCheck] = pgCheckRunner
mainExecutors[pipeline.AssetTypePostgresSeed][scheduler.TaskInstanceTypeCustomCheck] = customCheckRunner

// we set the Python runners to run the checks on Snowflake assuming that there won't be many usecases where a user has both BQ and Snowflake
if estimateCustomCheckType == pipeline.AssetTypePostgresQuery || estimateCustomCheckType == pipeline.AssetTypeRedshiftQuery {
mainExecutors[pipeline.AssetTypePython][scheduler.TaskInstanceTypeColumnCheck] = pgCheckRunner
Expand All @@ -685,6 +689,10 @@ func setupExecutors(
mainExecutors[pipeline.AssetTypeSnowflakeQuerySensor][scheduler.TaskInstanceTypeColumnCheck] = sfCheckRunner
mainExecutors[pipeline.AssetTypeSnowflakeQuerySensor][scheduler.TaskInstanceTypeCustomCheck] = customCheckRunner

mainExecutors[pipeline.AssetTypeSnowflakeSeed][scheduler.TaskInstanceTypeMain] = seedOperator
mainExecutors[pipeline.AssetTypeSnowflakeSeed][scheduler.TaskInstanceTypeColumnCheck] = sfCheckRunner
mainExecutors[pipeline.AssetTypeSnowflakeSeed][scheduler.TaskInstanceTypeCustomCheck] = customCheckRunner

// we set the Python runners to run the checks on Snowflake assuming that there won't be many usecases where a user has both BQ and Snowflake
if estimateCustomCheckType == pipeline.AssetTypeSnowflakeQuery {
mainExecutors[pipeline.AssetTypePython][scheduler.TaskInstanceTypeColumnCheck] = sfCheckRunner
Expand All @@ -708,6 +716,10 @@ func setupExecutors(
mainExecutors[pipeline.AssetTypeSynapseQuery][scheduler.TaskInstanceTypeColumnCheck] = synapseCheckRunner
mainExecutors[pipeline.AssetTypeSynapseQuery][scheduler.TaskInstanceTypeCustomCheck] = customCheckRunner

mainExecutors[pipeline.AssetTypeMsSQLSeed][scheduler.TaskInstanceTypeMain] = seedOperator
mainExecutors[pipeline.AssetTypeMsSQLSeed][scheduler.TaskInstanceTypeColumnCheck] = synapseCheckRunner
mainExecutors[pipeline.AssetTypeMsSQLSeed][scheduler.TaskInstanceTypeCustomCheck] = customCheckRunner

// we set the Python runners to run the checks on MsSQL
if estimateCustomCheckType == pipeline.AssetTypeMsSQLQuery || estimateCustomCheckType == pipeline.AssetTypeSynapseQuery {
mainExecutors[pipeline.AssetTypePython][scheduler.TaskInstanceTypeColumnCheck] = msCheckRunner
Expand All @@ -723,6 +735,10 @@ func setupExecutors(
mainExecutors[pipeline.AssetTypeDatabricksQuery][scheduler.TaskInstanceTypeColumnCheck] = databricksCheckRunner
mainExecutors[pipeline.AssetTypeDatabricksQuery][scheduler.TaskInstanceTypeCustomCheck] = customCheckRunner

mainExecutors[pipeline.AssetTypeDatabricksSeed][scheduler.TaskInstanceTypeMain] = seedOperator
mainExecutors[pipeline.AssetTypeDatabricksSeed][scheduler.TaskInstanceTypeColumnCheck] = databricksCheckRunner
mainExecutors[pipeline.AssetTypeDatabricksSeed][scheduler.TaskInstanceTypeCustomCheck] = customCheckRunner

// we set the Python runners to run the checks on MsSQL
if estimateCustomCheckType == pipeline.AssetTypeDatabricksQuery {
mainExecutors[pipeline.AssetTypePython][scheduler.TaskInstanceTypeColumnCheck] = databricksOperator
Expand Down Expand Up @@ -751,6 +767,11 @@ func setupExecutors(
mainExecutors[pipeline.AssetTypeAthenaQuery][scheduler.TaskInstanceTypeMain] = athenaOperator
mainExecutors[pipeline.AssetTypeAthenaQuery][scheduler.TaskInstanceTypeColumnCheck] = athenaCheckRunner
mainExecutors[pipeline.AssetTypeAthenaQuery][scheduler.TaskInstanceTypeCustomCheck] = athenaCustomCheckRunner

mainExecutors[pipeline.AssetTypeAthenaSeed][scheduler.TaskInstanceTypeMain] = seedOperator
mainExecutors[pipeline.AssetTypeAthenaSeed][scheduler.TaskInstanceTypeColumnCheck] = athenaCheckRunner
mainExecutors[pipeline.AssetTypeAthenaSeed][scheduler.TaskInstanceTypeCustomCheck] = athenaCustomCheckRunner

if estimateCustomCheckType == pipeline.AssetTypeAthenaQuery {
mainExecutors[pipeline.AssetTypePython][scheduler.TaskInstanceTypeColumnCheck] = athenaCheckRunner
mainExecutors[pipeline.AssetTypePython][scheduler.TaskInstanceTypeCustomCheck] = athenaCustomCheckRunner
Expand Down
33 changes: 33 additions & 0 deletions pkg/executor/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,26 +62,51 @@ var DefaultExecutorsV2 = map[pipeline.AssetType]Config{
scheduler.TaskInstanceTypeColumnCheck: NoOpOperator{},
scheduler.TaskInstanceTypeCustomCheck: NoOpOperator{},
},
pipeline.AssetTypePostgresSeed: {
scheduler.TaskInstanceTypeMain: NoOpOperator{},
scheduler.TaskInstanceTypeColumnCheck: NoOpOperator{},
scheduler.TaskInstanceTypeCustomCheck: NoOpOperator{},
},
pipeline.AssetTypeRedshiftQuery: {
scheduler.TaskInstanceTypeMain: NoOpOperator{},
scheduler.TaskInstanceTypeColumnCheck: NoOpOperator{},
scheduler.TaskInstanceTypeCustomCheck: NoOpOperator{},
},
pipeline.AssetTypeRedshiftSeed: {
scheduler.TaskInstanceTypeMain: NoOpOperator{},
scheduler.TaskInstanceTypeColumnCheck: NoOpOperator{},
scheduler.TaskInstanceTypeCustomCheck: NoOpOperator{},
},
pipeline.AssetTypeMsSQLQuery: {
scheduler.TaskInstanceTypeMain: NoOpOperator{},
scheduler.TaskInstanceTypeColumnCheck: NoOpOperator{},
scheduler.TaskInstanceTypeCustomCheck: NoOpOperator{},
},
pipeline.AssetTypeMsSQLSeed: {
scheduler.TaskInstanceTypeMain: NoOpOperator{},
scheduler.TaskInstanceTypeColumnCheck: NoOpOperator{},
scheduler.TaskInstanceTypeCustomCheck: NoOpOperator{},
},
pipeline.AssetTypeDatabricksQuery: {
scheduler.TaskInstanceTypeMain: NoOpOperator{},
scheduler.TaskInstanceTypeColumnCheck: NoOpOperator{},
scheduler.TaskInstanceTypeCustomCheck: NoOpOperator{},
},
pipeline.AssetTypeDatabricksSeed: {
scheduler.TaskInstanceTypeMain: NoOpOperator{},
scheduler.TaskInstanceTypeColumnCheck: NoOpOperator{},
scheduler.TaskInstanceTypeCustomCheck: NoOpOperator{},
},
pipeline.AssetTypeAthenaQuery: {
scheduler.TaskInstanceTypeMain: NoOpOperator{},
scheduler.TaskInstanceTypeColumnCheck: NoOpOperator{},
scheduler.TaskInstanceTypeCustomCheck: NoOpOperator{},
},
pipeline.AssetTypeAthenaSeed: {
scheduler.TaskInstanceTypeMain: NoOpOperator{},
scheduler.TaskInstanceTypeColumnCheck: NoOpOperator{},
scheduler.TaskInstanceTypeCustomCheck: NoOpOperator{},
},
pipeline.AssetTypeAthenaSQLSensor: {
scheduler.TaskInstanceTypeMain: NoOpOperator{},
scheduler.TaskInstanceTypeColumnCheck: NoOpOperator{},
Expand All @@ -102,6 +127,11 @@ var DefaultExecutorsV2 = map[pipeline.AssetType]Config{
scheduler.TaskInstanceTypeColumnCheck: NoOpOperator{},
scheduler.TaskInstanceTypeCustomCheck: NoOpOperator{},
},
pipeline.AssetTypeSynapseSeed: {
scheduler.TaskInstanceTypeMain: NoOpOperator{},
scheduler.TaskInstanceTypeColumnCheck: NoOpOperator{},
scheduler.TaskInstanceTypeCustomCheck: NoOpOperator{},
},
pipeline.AssetTypePython: {
scheduler.TaskInstanceTypeMain: NoOpOperator{},
scheduler.TaskInstanceTypeColumnCheck: NoOpOperator{},
Expand All @@ -121,6 +151,9 @@ var DefaultExecutorsV2 = map[pipeline.AssetType]Config{
pipeline.AssetTypeSnowflakeQuerySensor: {
scheduler.TaskInstanceTypeMain: NoOpOperator{},
},
pipeline.AssetTypeSnowflakeSeed: {
scheduler.TaskInstanceTypeMain: NoOpOperator{},
},
"appsflyer.export.bq": {
scheduler.TaskInstanceTypeMain: NoOpOperator{},
},
Expand Down
15 changes: 15 additions & 0 deletions pkg/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (

AssetTypePython = AssetType("python")
AssetTypeSnowflakeQuery = AssetType("sf.sql")
AssetTypeSnowflakeSeed = AssetType("sf.seed")
AssetTypeSnowflakeQuerySensor = AssetType("sf.sensor.query")
AssetTypeBigqueryQuery = AssetType("bq.sql")
AssetTypeBigqueryTableSensor = AssetType("bq.sensor.table")
Expand All @@ -36,12 +37,19 @@ const (
AssetTypeDuckDBSeed = AssetType("duckdb.seed")
AssetTypeEmpty = AssetType("empty")
AssetTypePostgresQuery = AssetType("pg.sql")
AssetTypePostgresSeed = AssetType("pg.seed")
AssetTypeRedshiftQuery = AssetType("rs.sql")
AssetTypeRedshiftSeed = AssetType("rs.seed")
AssetTypeAthenaQuery = AssetType("athena.sql")
AssetTypeAthenaSQLSensor = AssetType("athena.sensor.query")
AssetTypeAthenaSeed = AssetType("athena.seed")
AssetTypeAthenaSensor = AssetType("athena.sensor.csv")
AssetTypeMsSQLQuery = AssetType("ms.sql")
AssetTypeMsSQLSeed = AssetType("ms.seed")
AssetTypeDatabricksQuery = AssetType("databricks.sql")
AssetTypeDatabricksSeed = AssetType("databricks.seed")
AssetTypeSynapseQuery = AssetType("synapse.sql")
AssetTypeSynapseSeed = AssetType("synapse.seed")
AssetTypeIngestr = AssetType("ingestr")
AssetTypeTableau = AssetType("tableau")
RunConfigFullRefresh = RunConfig("full-refresh")
Expand Down Expand Up @@ -457,12 +465,19 @@ var AssetTypeConnectionMapping = map[AssetType]string{
AssetTypeBigquerySource: "google_cloud_platform",
AssetTypeSnowflakeQuery: "snowflake",
AssetTypeSnowflakeQuerySensor: "snowflake",
AssetTypeSnowflakeSeed: "snowflake",
AssetTypePostgresQuery: "postgres",
AssetTypePostgresSeed: "postgres",
AssetTypeRedshiftQuery: "redshift",
AssetTypeRedshiftSeed: "redshift",
AssetTypeMsSQLQuery: "mssql",
AssetTypeMsSQLSeed: "mssql",
AssetTypeDatabricksQuery: "databricks",
AssetTypeDatabricksSeed: "databricks",
AssetTypeSynapseQuery: "synapse",
AssetTypeSynapseSeed: "synapse",
AssetTypeAthenaQuery: "athena",
AssetTypeAthenaSeed: "athena",
AssetTypeDuckDBQuery: "duckdb",
AssetTypeDuckDBSeed: "duckdb",
}
Expand Down

0 comments on commit cd9ca67

Please sign in to comment.