Skip to content

Commit

Permalink
feat: connect snapshot query generator
Browse files Browse the repository at this point in the history
  • Loading branch information
louisjoecodes committed Apr 28, 2024
1 parent c2b9612 commit 9e384aa
Showing 1 changed file with 71 additions and 21 deletions.
92 changes: 71 additions & 21 deletions rust/core/src/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1418,13 +1418,17 @@ async fn convert_to_select_statement(
let sql = render_seed_select_statement(database, file_system, seed).await?;
Ok((node.name.clone(), sql))
}
AssetData::Snapshot(snapshot) => {
let sql =
render_snapshot_select_statement(database, file_system, snapshot, project)
.await?;
Ok((node.name.clone(), sql))
}
AssetData::Model(model) => {
let sql = render_model_select_statement(database, file_system, model, project)
.await?;
Ok((node.name.clone(), sql))
}
// TODO: Implement snapshot select functionality (separate PR)
AssetData::Snapshot(_) => Err("Snapshots are not supported yet".to_string()),
}
})
.collect::<Vec<_>>();
Expand Down Expand Up @@ -1564,6 +1568,52 @@ async fn render_model_select_statement(
Ok(replaced)
}

async fn render_snapshot_select_statement(
database: &impl DatabaseQueryGenerator,
fs: &impl FileSystem,
snapshot: &Snapshot,
project: &Project,
) -> Result<String, String> {
let reader = fs
.read_file(snapshot.file_path.as_str())
.await
.map_err(|e| {
format!(
"failed to read file {:?} with error {:?}",
snapshot.file_path, e
)
})?;
let sql = read_normalise_model(reader).await?;

let reference_search = return_reference_search(DEFAULT_SCHEMA_PREFIX).map_err(|e| {
format!(
"error creating reference search for snapshot {:?}: {:?}",
snapshot.name, e
)
})?;
let replaced = reference_search.replace_all(
sql.as_str(),
replace_reference_string_found(&HashMap::new(), &database),
);
let connection_config = project
.connection_config
.as_ref()
.ok_or_else(|| "Connection config is required".to_string())?;

let replaced =
replace_variable_templates_with_variable_defined_in_config(&replaced, connection_config)?;

let snapshot_strategy = snapshot
.strategy
.clone()
.ok_or("missing snapshot strategy")?;
let snapshot_strategy_type = snapshot_strategy
.strategy_type
.ok_or("missing snapshot strategy type")?;

database.generate_snapshot_query(&replaced, &snapshot.unique_key, &snapshot_strategy_type)

Check failure on line 1614 in rust/core/src/project.rs

View workflow job for this annotation

GitHub Actions / Rust Lint

this method takes 4 arguments but 3 arguments were supplied

Check failure on line 1614 in rust/core/src/project.rs

View workflow job for this annotation

GitHub Actions / Rust Test

this method takes 4 arguments but 3 arguments were supplied
}

pub fn replace_variable_templates_with_variable_defined_in_config(
sql: &str,
connection_config: &ConnectionConfig,
Expand Down Expand Up @@ -2398,16 +2448,16 @@ sources:
};
let expected_output = vec![
(
"stg_commits".to_string(),
"stg_commits".to_string(),
vec![
"DROP VIEW IF EXISTS `quarylabs.transform.stg_commits`".to_string(),
"DROP VIEW IF EXISTS `quarylabs.transform.stg_commits`".to_string(),
"CREATE VIEW `quarylabs.transform.stg_commits` AS SELECT author FROM `quarylabs.airbyte_github.commits`".to_string()
]
),
(
"commits_transformed".to_string(),
"commits_transformed".to_string(),
vec![
"DROP VIEW IF EXISTS `quarylabs.transform.commits_transformed`".to_string(),
"DROP VIEW IF EXISTS `quarylabs.transform.commits_transformed`".to_string(),
"CREATE VIEW `quarylabs.transform.commits_transformed` AS SELECT * FROM `quarylabs.transform.stg_commits`".to_string()
]
)
Expand Down Expand Up @@ -2463,16 +2513,16 @@ sources:

let expected_output = vec![
(
"stg_shifts".to_string(),
"stg_shifts".to_string(),
vec![
"DROP VIEW IF EXISTS `stg_shifts`".to_string(),
"DROP VIEW IF EXISTS `stg_shifts`".to_string(),
"CREATE VIEW `stg_shifts` AS SELECT employee_id, shift_date, shift FROM `raw_shifts_real_table`".to_string()
]
),
(
"shifts_transformed".to_string(),
"shifts_transformed".to_string(),
vec![
"DROP VIEW IF EXISTS `shifts_transformed`".to_string(),
"DROP VIEW IF EXISTS `shifts_transformed`".to_string(),
"CREATE VIEW `shifts_transformed` AS SELECT * FROM `stg_shifts`".to_string()
]
)
Expand Down Expand Up @@ -2771,7 +2821,7 @@ models:
"
sources:
- name: raw_orders
path: raw_orders_source
path: raw_orders_source
snapshots:
- name: orders_snapshot
description: Some snapshot description
Expand Down Expand Up @@ -2846,9 +2896,9 @@ models:
"
sources:
- name: raw_orders
path: raw_orders_source
path: raw_orders_source
models:
- name: stg_orders
- name: stg_orders
snapshots:
- name: orders_snapshot
unique_key: id
Expand Down Expand Up @@ -2900,9 +2950,9 @@ models:
"
sources:
- name: raw_orders
path: raw_orders_source
path: raw_orders_source
models:
- name: stg_orders
- name: stg_orders
snapshots:
- name: orders_snapshot
unique_key: id
Expand Down Expand Up @@ -2957,9 +3007,9 @@ models:
"
sources:
- name: raw_orders
path: raw_orders_source
path: raw_orders_source
models:
- name: stg_orders
- name: stg_orders
snapshots:
- name: orders_snapshot
unique_key: id
Expand Down Expand Up @@ -3014,9 +3064,9 @@ models:
"
sources:
- name: raw_orders
path: raw_orders_source
path: raw_orders_source
models:
- name: stg_orders
- name: stg_orders
",
),
},
Expand Down Expand Up @@ -3065,9 +3115,9 @@ models:
"
sources:
- name: raw_orders
path: raw_orders_source
path: raw_orders_source
models:
- name: stg_orders
- name: stg_orders
snapshots:
- name: orders_snapshot
unique_key: id
Expand Down

0 comments on commit 9e384aa

Please sign in to comment.