From 9e384aac102d6f8d1d2a9ba425e6a49fd46be6d8 Mon Sep 17 00:00:00 2001 From: Louis Date: Sat, 27 Apr 2024 18:57:59 +0100 Subject: [PATCH] feat: connect snapshot query generator --- rust/core/src/project.rs | 92 +++++++++++++++++++++++++++++++--------- 1 file changed, 71 insertions(+), 21 deletions(-) diff --git a/rust/core/src/project.rs b/rust/core/src/project.rs index d20ff2b4..7c5a2c51 100644 --- a/rust/core/src/project.rs +++ b/rust/core/src/project.rs @@ -1418,13 +1418,17 @@ async fn convert_to_select_statement( let sql = render_seed_select_statement(database, file_system, seed).await?; Ok((node.name.clone(), sql)) } + AssetData::Snapshot(snapshot) => { + let sql = + render_snapshot_select_statement(database, file_system, snapshot, project) + .await?; + Ok((node.name.clone(), sql)) + } AssetData::Model(model) => { let sql = render_model_select_statement(database, file_system, model, project) .await?; Ok((node.name.clone(), sql)) } - // TODO: Implement snapshot select functionality (separate PR) - AssetData::Snapshot(_) => Err("Snapshots are not supported yet".to_string()), } }) .collect::>(); @@ -1564,6 +1568,52 @@ async fn render_model_select_statement( Ok(replaced) } +async fn render_snapshot_select_statement( + database: &impl DatabaseQueryGenerator, + fs: &impl FileSystem, + snapshot: &Snapshot, + project: &Project, +) -> Result { + let reader = fs + .read_file(snapshot.file_path.as_str()) + .await + .map_err(|e| { + format!( + "failed to read file {:?} with error {:?}", + snapshot.file_path, e + ) + })?; + let sql = read_normalise_model(reader).await?; + + let reference_search = return_reference_search(DEFAULT_SCHEMA_PREFIX).map_err(|e| { + format!( + "error creating reference search for snapshot {:?}: {:?}", + snapshot.name, e + ) + })?; + let replaced = reference_search.replace_all( + sql.as_str(), + replace_reference_string_found(&HashMap::new(), &database), + ); + let connection_config = project + .connection_config + .as_ref() + .ok_or_else(|| "Connection config is required".to_string())?; + + let replaced = + replace_variable_templates_with_variable_defined_in_config(&replaced, connection_config)?; + + let snapshot_strategy = snapshot + .strategy + .clone() + .ok_or("missing snapshot strategy")?; + let snapshot_strategy_type = snapshot_strategy + .strategy_type + .ok_or("missing snapshot strategy type")?; + + database.generate_snapshot_query(&replaced, &snapshot.unique_key, &snapshot_strategy_type) +} + pub fn replace_variable_templates_with_variable_defined_in_config( sql: &str, connection_config: &ConnectionConfig, @@ -2398,16 +2448,16 @@ sources: }; let expected_output = vec![ ( - "stg_commits".to_string(), + "stg_commits".to_string(), vec![ - "DROP VIEW IF EXISTS `quarylabs.transform.stg_commits`".to_string(), + "DROP VIEW IF EXISTS `quarylabs.transform.stg_commits`".to_string(), "CREATE VIEW `quarylabs.transform.stg_commits` AS SELECT author FROM `quarylabs.airbyte_github.commits`".to_string() ] ), ( - "commits_transformed".to_string(), + "commits_transformed".to_string(), vec![ - "DROP VIEW IF EXISTS `quarylabs.transform.commits_transformed`".to_string(), + "DROP VIEW IF EXISTS `quarylabs.transform.commits_transformed`".to_string(), "CREATE VIEW `quarylabs.transform.commits_transformed` AS SELECT * FROM `quarylabs.transform.stg_commits`".to_string() ] ) @@ -2463,16 +2513,16 @@ sources: let expected_output = vec![ ( - "stg_shifts".to_string(), + "stg_shifts".to_string(), vec![ - "DROP VIEW IF EXISTS `stg_shifts`".to_string(), + "DROP VIEW IF EXISTS `stg_shifts`".to_string(), "CREATE VIEW `stg_shifts` AS SELECT employee_id, shift_date, shift FROM `raw_shifts_real_table`".to_string() ] ), ( - "shifts_transformed".to_string(), + "shifts_transformed".to_string(), vec![ - "DROP VIEW IF EXISTS `shifts_transformed`".to_string(), + "DROP VIEW IF EXISTS `shifts_transformed`".to_string(), "CREATE VIEW `shifts_transformed` AS SELECT * FROM `stg_shifts`".to_string() ] ) @@ -2771,7 +2821,7 @@ models: " sources: - name: raw_orders - path: raw_orders_source + path: raw_orders_source snapshots: - name: orders_snapshot description: Some snapshot description @@ -2846,9 +2896,9 @@ models: " sources: - name: raw_orders - path: raw_orders_source + path: raw_orders_source models: - - name: stg_orders + - name: stg_orders snapshots: - name: orders_snapshot unique_key: id @@ -2900,9 +2950,9 @@ models: " sources: - name: raw_orders - path: raw_orders_source + path: raw_orders_source models: - - name: stg_orders + - name: stg_orders snapshots: - name: orders_snapshot unique_key: id @@ -2957,9 +3007,9 @@ models: " sources: - name: raw_orders - path: raw_orders_source + path: raw_orders_source models: - - name: stg_orders + - name: stg_orders snapshots: - name: orders_snapshot unique_key: id @@ -3014,9 +3064,9 @@ models: " sources: - name: raw_orders - path: raw_orders_source + path: raw_orders_source models: - - name: stg_orders + - name: stg_orders ", ), }, @@ -3065,9 +3115,9 @@ models: " sources: - name: raw_orders - path: raw_orders_source + path: raw_orders_source models: - - name: stg_orders + - name: stg_orders snapshots: - name: orders_snapshot unique_key: id