diff --git a/Cargo.lock b/Cargo.lock index dc50aeb9..5e8fd953 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -646,6 +646,17 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "bigdecimal" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6773ddc0eafc0e509fb60e48dff7f450f8e674a0686ae8605e8d9901bd5eefa" +dependencies = [ + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -3913,6 +3924,7 @@ checksum = "d84b0a3c3739e220d94b3239fd69fb1f74bc36e16643423bd99de3b43c21bfbd" dependencies = [ "ahash 0.8.8", "atoi", + "bigdecimal", "byteorder", "bytes", "chrono", @@ -3997,6 +4009,7 @@ checksum = "e37195395df71fd068f6e2082247891bc11e3289624bbc776a0cdfa1ca7f1ea4" dependencies = [ "atoi", "base64 0.21.7", + "bigdecimal", "bitflags 2.4.2", "byteorder", "bytes", @@ -4040,6 +4053,7 @@ checksum = "d6ac0ac3b7ccd10cc96c7ab29791a7dd236bd94021f31eec7ba3d46a74aa1c24" dependencies = [ "atoi", "base64 0.21.7", + "bigdecimal", "bitflags 2.4.2", "byteorder", "chrono", @@ -4058,6 +4072,7 @@ dependencies = [ "log", "md-5", "memchr", + "num-bigint", "once_cell", "rand", "serde", diff --git a/proto/quary/service/v1/connection_config.proto b/proto/quary/service/v1/connection_config.proto index f2b93fba..86e08724 100644 --- a/proto/quary/service/v1/connection_config.proto +++ b/proto/quary/service/v1/connection_config.proto @@ -19,7 +19,9 @@ message ConnectionConfig { ConnectionConfigBigQuery big_query = 5; ConnectionConfigSnowflake snowflake = 6; ConnectionConfigPostgres postgres = 7; + ConnectionConfigRedshift redshift = 9; } + repeated Var vars = 8; message ConnectionConfigSqLite { @@ -40,16 +42,10 @@ message ConnectionConfig { message ConnectionConfigPostgres { string schema = 1; } - // - // message ConnectionConfigMySql { - // string username = 1; - // string password = 2; - // string protocol = 3; - // string host = 4; - // string port = 5; - // string database = 6; - // map params = 7; - // } + + message ConnectionConfigRedshift { + string schema = 1; + } message ConnectionConfigBigQuery { string project_id = 1; diff --git a/rust/cli/src/main.rs b/rust/cli/src/main.rs index ef84aa6d..176f8af6 100644 --- a/rust/cli/src/main.rs +++ b/rust/cli/src/main.rs @@ -489,9 +489,13 @@ async fn main_wrapped() -> Result<(), String> { let database = database_from_config(&config).await?; let query_generator = database.query_generator(); let (project, file_system) = parse_project(&query_generator).await?; - let snapshots_sql = - project_and_fs_to_sql_for_snapshots(&project, &file_system, &query_generator) - .await?; + let snapshots_sql = project_and_fs_to_sql_for_snapshots( + &project, + &file_system, + &query_generator, + &database, + ) + .await?; if snapshot_args.dry_run { println!("\n-- Create snapshots\n"); diff --git a/rust/cli/tests/cli_test.rs b/rust/cli/tests/cli_test.rs index 2f464744..309af87e 100644 --- a/rust/cli/tests/cli_test.rs +++ b/rust/cli/tests/cli_test.rs @@ -1,7 +1,9 @@ use assert_cmd::Command; +use chrono::NaiveDateTime; use chrono::Utc; use quary_core::databases::DatabaseConnection; use quary_databases::databases_duckdb; +use quary_databases::databases_redshift; use std::fs; use tempfile::tempdir; @@ -331,8 +333,9 @@ async fn test_duckdb_snapshots() { let current_date = Utc::now().date_naive(); let quary_valid_from_str = &result.rows[0][4]; let quary_valid_from_date = 
quary_valid_from_str.split_whitespace().next().unwrap(); + println!("quary_valid_from_date: {}", quary_valid_from_date); let quary_valid_from = - chrono::NaiveDate::parse_from_str(quary_valid_from_date, "%Y-%m-%d").unwrap(); + chrono::NaiveDate::parse_from_str(quary_valid_from_date, "%Y-%m-%dT%H:%M:%SZ").unwrap(); assert_eq!(current_date, quary_valid_from); // Update orders.csv data @@ -382,7 +385,191 @@ async fn test_duckdb_snapshots() { .next() .unwrap(); let updated_quary_valid_from = - chrono::NaiveDate::parse_from_str(updated_quary_valid_from_date, "%Y-%m-%d").unwrap(); + chrono::NaiveDate::parse_from_str(updated_quary_valid_from_date, "%Y-%m-%dT%H:%M:%SZ") + .unwrap(); assert_eq!(current_date, updated_quary_valid_from); } } + +/// This test simulates a workflow where a model references a snapshot in redshift. +/// 1. The initial snapshot is taken which builds the orders_snapshot table in the database. +/// 2. The project is built which references the orders_snapshot table. +/// 3. The initial state of the snapshot is asserted. +/// 4. The data is updated and a new snapshot is taken. +/// 5. The updated state of the snapshot is asserted. (from the stg_orders_snapshot table) +#[tokio::test] +#[ignore] +async fn test_redshift_snapshots() { + // Prepare the database + let database = + databases_redshift::Redshift::new("", None, "", "", "", "", None, None, None, None, None) + .await + .ok() + .unwrap(); + database + .exec("DROP TABLE analytics.orders CASCADE") + .await + .unwrap(); + database + .exec("DROP TABLE transform.orders_snapshot CASCADE") + .await + .unwrap(); + + database + .exec( + " + CREATE TABLE analytics.orders ( + order_id character varying(255) ENCODE lzo, + customer_id character varying(255) ENCODE lzo, + order_date timestamp without time zone ENCODE az64, + total_amount numeric(10, 2) ENCODE az64, + status character varying(255) ENCODE lzo + ) DISTSTYLE AUTO; + ", + ) + .await + .unwrap(); + + database.exec( + " + INSERT INTO analytics.orders (order_id, customer_id, order_date, total_amount, status) VALUES ('1', '1', '2022-01-01 00:00:00', 100, 'in_progress') + " + ) + .await + .unwrap(); + + // Setup + let name = "quary"; + let temp_dir = tempdir().unwrap(); + let project_dir = temp_dir.path(); + + // create a .env file + let env_file_path = project_dir.join(".env"); + let env_content = + "REDSHIFT_HOST=\nREDSHIFT_PORT=\nREDSHIFT_USER=\nREDSHIFT_PASSWORD=\nREDSHIFT_DATABASE="; + fs::write(&env_file_path, env_content).unwrap(); + + // Create snapshots directory and orders_snapshot.snapshot.sql file + let snapshots_dir = project_dir.join("models").join("staging").join("snapshots"); + fs::create_dir_all(&snapshots_dir).unwrap(); + let orders_snapshot_file = snapshots_dir.join("orders_snapshot.snapshot.sql"); + let orders_snapshot_content = + "SELECT order_id, customer_id, order_date, total_amount, status FROM q.raw_orders"; + fs::write(&orders_snapshot_file, orders_snapshot_content).unwrap(); + + // Create a model which references the snapshot + let staging_models_dir = project_dir.join("models").join("staging"); + fs::create_dir_all(&staging_models_dir).unwrap(); + let stg_orders_snapshot_file = staging_models_dir.join("stg_orders_snapshot.sql"); + let stg_orders_snapshot_content = "SELECT * FROM q.orders_snapshot"; + fs::write(&stg_orders_snapshot_file, stg_orders_snapshot_content).unwrap(); + + // Create quary.yaml file + let quary_yaml_content = "redshift:\n schema: transform"; + let quary_yaml_path = project_dir.join("quary.yaml"); + fs::write(&quary_yaml_path, 
quary_yaml_content).unwrap(); + + // Create schema.yaml file + let schema_file = snapshots_dir.join("schema.yaml"); + let schema_content = r#" + sources: + - name: raw_orders + path: analytics.orders + snapshots: + - name: orders_snapshot + unique_key: order_id + strategy: + timestamp: + updated_at: order_date + "#; + fs::write(&schema_file, schema_content).unwrap(); + + // Take the initial snapshot and build the project which references the snapshot + Command::cargo_bin(name) + .unwrap() + .current_dir(project_dir) + .args(vec!["snapshot"]) + .assert() + .success(); + Command::cargo_bin(name) + .unwrap() + .current_dir(project_dir) + .args(vec!["build"]) + .assert() + .success(); + + { + let result = database + .query("SELECT order_id, customer_id, order_date, total_amount, status, quary_valid_from, quary_valid_to, quary_scd_id FROM transform.orders_snapshot") + .await + .unwrap(); + + assert_eq!(result.rows.len(), 1); + assert_eq!(result.rows[0][0], "1"); // id + assert_eq!(result.rows[0][4], "in_progress"); // status + assert_eq!(result.rows[0][6], "NULL"); // quary_valid_to + + // Check that quary_valid_from has the same date as the current date + let current_date: NaiveDateTime = Utc::now().date_naive().into(); + let quary_valid_from_str = &result.rows[0][5]; + let quary_valid_from_date = quary_valid_from_str.split_whitespace().next().unwrap(); + + let quary_valid_from = + chrono::NaiveDateTime::parse_from_str(quary_valid_from_date, "%Y-%m-%dT%H:%M:%S%.f%:z") + .unwrap(); + assert_eq!(current_date.date(), quary_valid_from.date()); + + database + .exec( + " + UPDATE analytics.orders + SET order_date = '2099-06-01 00:00:00', status = 'completed' + WHERE order_id = '1' + ", + ) + .await + .unwrap(); + } + + // Take updated snapshot + Command::cargo_bin(name) + .unwrap() + .current_dir(project_dir) + .args(vec!["snapshot"]) + .assert() + .success(); + + { + // Assert updated snapshot + let updated_result = database + .query("SELECT order_id, customer_id, order_date, total_amount, status, quary_valid_from, quary_valid_to, quary_scd_id FROM transform.stg_orders_snapshot ORDER BY quary_valid_from") + .await + .unwrap(); + + assert_eq!(updated_result.rows.len(), 2); + + // Check the initial row + assert_eq!(updated_result.rows[0][0], "1"); // id + assert_eq!(updated_result.rows[0][4], "in_progress"); // status + assert_ne!(updated_result.rows[0][6], "NULL"); // quary_valid_to should not be NULL + + // Check the updated row + assert_eq!(updated_result.rows[1][0], "1"); // id + assert_eq!(updated_result.rows[1][4], "completed"); // status + assert_eq!(updated_result.rows[1][6], "NULL"); // quary_valid_to should be NULL + + // Check that quary_valid_from of the updated row has the same date as the current date + let current_date: NaiveDateTime = Utc::now().date_naive().into(); + let updated_quary_valid_from_str = &updated_result.rows[1][5]; + let updated_quary_valid_from_date = updated_quary_valid_from_str + .split_whitespace() + .next() + .unwrap(); + let updated_quary_valid_from = chrono::NaiveDateTime::parse_from_str( + updated_quary_valid_from_date, + "%Y-%m-%dT%H:%M:%S%.f%:z", + ) + .unwrap(); + assert_eq!(current_date.date(), updated_quary_valid_from.date()); + } +} diff --git a/rust/core/src/database_duckdb.rs b/rust/core/src/database_duckdb.rs index 23807c26..ed4eea06 100644 --- a/rust/core/src/database_duckdb.rs +++ b/rust/core/src/database_duckdb.rs @@ -24,12 +24,11 @@ impl DatabaseQueryGeneratorDuckDB { } fn get_now(&self) -> String { - if let Some(override_now) = &self.override_now { 
- let datetime: DateTime = (*override_now).into(); - format!("'{}'", datetime.format("%Y-%m-%dT%H:%M:%SZ")) - } else { - "CURRENT_TIMESTAMP".to_string() - } + let datetime = self + .override_now + .map(|time| -> DateTime { time.into() }) + .unwrap_or(SystemTime::now().into()); + format!("'{}'", datetime.format("%Y-%m-%dT%H:%M:%SZ")) } } @@ -45,7 +44,12 @@ impl DatabaseQueryGenerator for DatabaseQueryGeneratorDuckDB { templated_select: &str, unique_key: &str, strategy: &StrategyType, + table_exists: Option, ) -> Result, String> { + assert_eq!( + table_exists, None, + "table_exists is not necessary for DuckDB snapshots." + ); match strategy { StrategyType::Timestamp(timestamp) => { let updated_at = ×tamp.updated_at; @@ -56,7 +60,7 @@ impl DatabaseQueryGenerator for DatabaseQueryGeneratorDuckDB { SELECT *, {now} AS quary_valid_from, - CAST(NULL AS TIMESTAMP) AS quary_valid_to, + CAST(NULL AS TIMESTAMP WITH TIME ZONE) AS quary_valid_to, MD5(CAST(CONCAT({unique_key}, CAST({updated_at} AS STRING)) AS STRING)) AS quary_scd_id FROM ({templated_select}) )" @@ -189,15 +193,17 @@ mod tests { let expected_datetime: DateTime = override_now.into(); let expected_result = format!("'{}'", expected_datetime.format("%Y-%m-%dT%H:%M:%SZ")); assert_eq!(result, expected_result); - - let database = DatabaseQueryGeneratorDuckDB::new(None, None); - let result = database.get_now(); - assert_eq!(result, "CURRENT_TIMESTAMP".to_string()); } #[test] fn test_generate_snapshot_sql() { - let database = DatabaseQueryGeneratorDuckDB::new(None, None); + let time_override = "2021-01-01T00:00:00Z"; + let override_now = DateTime::parse_from_rfc3339(time_override) + .unwrap() + .with_timezone(&Utc); + let system_time = SystemTime::from(override_now); + + let database = DatabaseQueryGeneratorDuckDB::new(None, Some(system_time)); let path = "mytable"; let templated_select = "SELECT * FROM mytable"; let unique_key = "id"; @@ -209,9 +215,9 @@ mod tests { ); let result = database - .generate_snapshot_sql(path, templated_select, unique_key, &strategy) + .generate_snapshot_sql(path, templated_select, unique_key, &strategy, None) .unwrap(); - assert_eq!(result.iter().map(|s| s.as_str()).collect::>(), vec!["CREATE TABLE IF NOT EXISTS mytable AS (\n SELECT\n *,\n CURRENT_TIMESTAMP AS quary_valid_from,\n CAST(NULL AS TIMESTAMP) AS quary_valid_to,\n MD5(CAST(CONCAT(id, CAST(updated_at AS STRING)) AS STRING)) AS quary_scd_id\n FROM (SELECT * FROM mytable)\n )", "UPDATE mytable AS target\n SET quary_valid_to = source.quary_valid_from\n FROM (\n SELECT\n *,\n CURRENT_TIMESTAMP AS quary_valid_from,\n MD5(CAST(CONCAT(id, CAST(updated_at AS STRING)) AS STRING)) AS quary_scd_id\n FROM (SELECT * FROM mytable)\n ) AS source\n WHERE target.id = source.id\n AND target.quary_valid_to IS NULL\n AND CAST(source.updated_at AS TIMESTAMP) > CAST(target.updated_at AS TIMESTAMP)", "INSERT INTO mytable\n SELECT\n *,\n CURRENT_TIMESTAMP AS quary_valid_from,\n NULL AS quary_valid_to,\n MD5(CAST(CONCAT(id, CAST(updated_at AS STRING)) AS STRING)) AS quary_scd_id\n FROM (SELECT * FROM mytable) AS source\n WHERE NOT EXISTS (\n SELECT 1\n FROM mytable AS target\n WHERE target.quary_scd_id = MD5(CAST(CONCAT(source.id, CAST(source.updated_at AS STRING)) AS STRING))\n )"]); + assert_eq!(result.iter().map(|s| s.as_str()).collect::>(), vec!["CREATE TABLE IF NOT EXISTS mytable AS (\n SELECT\n *,\n '2021-01-01T00:00:00Z' AS quary_valid_from,\n CAST(NULL AS TIMESTAMP WITH TIME ZONE) AS quary_valid_to,\n MD5(CAST(CONCAT(id, CAST(updated_at AS STRING)) AS STRING)) AS 
quary_scd_id\n FROM (SELECT * FROM mytable)\n )", "UPDATE mytable AS target\n SET quary_valid_to = source.quary_valid_from\n FROM (\n SELECT\n *,\n '2021-01-01T00:00:00Z' AS quary_valid_from,\n MD5(CAST(CONCAT(id, CAST(updated_at AS STRING)) AS STRING)) AS quary_scd_id\n FROM (SELECT * FROM mytable)\n ) AS source\n WHERE target.id = source.id\n AND target.quary_valid_to IS NULL\n AND CAST(source.updated_at AS TIMESTAMP) > CAST(target.updated_at AS TIMESTAMP)", "INSERT INTO mytable\n SELECT\n *,\n '2021-01-01T00:00:00Z' AS quary_valid_from,\n NULL AS quary_valid_to,\n MD5(CAST(CONCAT(id, CAST(updated_at AS STRING)) AS STRING)) AS quary_scd_id\n FROM (SELECT * FROM mytable) AS source\n WHERE NOT EXISTS (\n SELECT 1\n FROM mytable AS target\n WHERE target.quary_scd_id = MD5(CAST(CONCAT(source.id, CAST(source.updated_at AS STRING)) AS STRING))\n )"]); } } diff --git a/rust/core/src/database_postgres.rs b/rust/core/src/database_postgres.rs index d8fda28a..37bea505 100644 --- a/rust/core/src/database_postgres.rs +++ b/rust/core/src/database_postgres.rs @@ -21,15 +21,12 @@ impl DatabaseQueryGeneratorPostgres { } fn get_now(&self) -> String { - if let Some(override_now) = &self.override_now { - let datetime: DateTime = (*override_now).into(); - format!( - "CAST('{}' AS TIMESTAMP WITH TIME ZONE)", - datetime.format("%Y-%m-%dT%H:%M:%SZ") - ) - } else { - "CURRENT_TIMESTAMP".to_string() - } + let datetime = self + .override_now + .map(|time| -> DateTime { time.into() }) + .unwrap_or(SystemTime::now().into()) + .format("%Y-%m-%dT%H:%M:%SZ"); + format!("CAST('{}' AS TIMESTAMP WITH TIME ZONE)", datetime) } } @@ -122,7 +119,12 @@ impl DatabaseQueryGenerator for DatabaseQueryGeneratorPostgres { templated_select: &str, unique_key: &str, strategy: &StrategyType, + table_exists: Option, ) -> Result, String> { + assert_eq!( + table_exists, None, + "table_exists is not necessary for Postgres snapshots." + ); match strategy { StrategyType::Timestamp(timestamp) => { let updated_at = ×tamp.updated_at; @@ -220,12 +222,13 @@ impl DatabaseQueryGenerator for DatabaseQueryGeneratorPostgres { } #[cfg(test)] -mod test { +mod tests { #[test] fn get_now() { let generator = super::DatabaseQueryGeneratorPostgres::new("schema".to_string(), None); let now = generator.get_now(); - assert_eq!(now, "CURRENT_TIMESTAMP".to_string()); + + assert!(now.starts_with("CAST('20")); } #[test] diff --git a/rust/core/src/database_redshift.rs b/rust/core/src/database_redshift.rs new file mode 100644 index 00000000..c165beb3 --- /dev/null +++ b/rust/core/src/database_redshift.rs @@ -0,0 +1,255 @@ +use crate::databases::{base_for_seeds_create_table_specifying_text_type, DatabaseQueryGenerator}; +use chrono::{DateTime, Utc}; +use quary_proto::snapshot::snapshot_strategy::StrategyType; +use sqlinference::dialect::Dialect; +use std::time::SystemTime; + +#[derive(Debug, Clone)] +pub struct DatabaseQueryGeneratorRedshift { + schema: String, + /// override_now is used to override the current timestamp in the generated SQL. It is primarily + /// used for testing purposes. 
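+    /// When it is None, get_now falls back to the current system time.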
+    override_now: Option<SystemTime>,
+}
+
+impl DatabaseQueryGeneratorRedshift {
+    pub fn new(schema: String, override_now: Option<SystemTime>) -> DatabaseQueryGeneratorRedshift {
+        DatabaseQueryGeneratorRedshift {
+            schema,
+            override_now,
+        }
+    }
+
+    fn get_now(&self) -> String {
+        let datetime = self
+            .override_now
+            .map(|time| -> DateTime<Utc> { time.into() })
+            .unwrap_or(SystemTime::now().into())
+            .format("%Y-%m-%dT%H:%M:%SZ");
+        format!("CAST('{}' AS TIMESTAMP WITH TIME ZONE)", datetime)
+    }
+}
+
+impl DatabaseQueryGenerator for DatabaseQueryGeneratorRedshift {
+    fn validate_materialization_type(
+        &self,
+        materialization_type: &Option<String>,
+    ) -> Result<(), String> {
+        match materialization_type {
+            None => Ok(()),
+            Some(materialization_type) if materialization_type == "view" => Ok(()),
+            Some(materialization_type) if materialization_type == "table" => Ok(()),
+            Some(materialization_type) if materialization_type == "materialized_view" => Ok(()),
+            Some(materialization_type) => Err(format!(
+                "Materialization type {} is not supported. Supported types are 'view', 'table', 'materialized_view'.",
+                materialization_type
+            )),
+        }
+    }
+
+    fn models_drop_query(
+        &self,
+        object_name: &str,
+        materialization_type: &Option<String>,
+    ) -> Result<String, String> {
+        let object_name = self.return_full_path_requirement(object_name);
+        let object_name = self.database_name_wrapper(&object_name);
+        match materialization_type {
+            None => Ok(format!("DROP VIEW IF EXISTS {}", object_name).to_string()),
+            Some(materialization_type) if materialization_type == "view" => {
+                Ok(format!("DROP VIEW IF EXISTS {}", object_name).to_string())
+            }
+            Some(materialization_type) if materialization_type == "table" => {
+                Ok(format!("DROP TABLE IF EXISTS {}", object_name).to_string())
+            }
+            Some(materialization_type) if materialization_type == "materialized_view" => {
+                Ok(format!("DROP MATERIALIZED VIEW IF EXISTS {}", object_name).to_string())
+            }
+            Some(materialization_type) => Err(format!(
+                "Unsupported materialization type: {}",
+                materialization_type
+            )),
+        }
+    }
+
+    fn models_create_query(
+        &self,
+        object_name: &str,
+        original_select_statement: &str,
+        materialization_type: &Option<String>,
+    ) -> Result<String, String> {
+        let object_name = self.return_full_path_requirement(object_name);
+        let object_name = self.database_name_wrapper(&object_name);
+        match materialization_type.as_deref() {
+            None => Ok(format!(
+                "CREATE VIEW {} AS {}",
+                object_name, original_select_statement
+            )),
+            Some("view") => Ok(format!(
+                "CREATE VIEW {} AS {}",
+                object_name, original_select_statement
+            )),
+            Some("table") => Ok(format!(
+                "CREATE TABLE {} AS {}",
+                object_name, original_select_statement
+            )),
+            Some("materialized_view") => Ok(format!(
+                "CREATE MATERIALIZED VIEW {} AS {}",
+                object_name, original_select_statement
+            )),
+            _ => Err("Unsupported materialization type".to_string()),
+        }
+    }
+
+    fn seeds_drop_table_query(&self, table_name: &str) -> String {
+        format!(
+            "DROP TABLE IF EXISTS {} CASCADE",
+            self.return_full_path_requirement(table_name)
+        )
+    }
+
+    fn seeds_create_table_query(&self, table_name: &str, columns: &[String]) -> String {
+        let table_name = self.return_full_path_requirement(table_name);
+        base_for_seeds_create_table_specifying_text_type("TEXT", &table_name, columns)
+    }
+
+    fn generate_snapshot_sql(
+        &self,
+        path: &str,
+        templated_select: &str,
+        unique_key: &str,
+        strategy: &StrategyType,
+        table_exists: Option<bool>,
+    ) -> Result<Vec<String>, String> {
+        match strategy {
+            StrategyType::Timestamp(timestamp) => {
+                let updated_at = &timestamp.updated_at;
+                let now = self.get_now();
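+                // The timestamp strategy emits up to three statements: a
+                // CREATE TABLE ... AS for the first run (gated on table_exists
+                // below), then an UPDATE that closes out changed rows and an
+                // INSERT that appends their new versions.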
+
+                // Redshift does not support CREATE TABLE IF NOT EXISTS (w/ AS (...))
+                let create_table_sql = format!(
+                    "CREATE TABLE {path} AS (
+                    SELECT
+                        ts.*,
+                        {now} AS quary_valid_from,
+                        CAST(NULL AS TIMESTAMP WITH TIME ZONE) AS quary_valid_to,
+                        MD5(CAST(CONCAT({unique_key}, CAST({updated_at} AS TEXT)) AS TEXT)) AS quary_scd_id
+                    FROM ({templated_select}) AS ts
+                    )"
+                );
+
+                let update_sql = format!(
+                    "UPDATE {path} AS target
+                    SET quary_valid_to = source.quary_valid_from
+                    FROM (
+                        SELECT
+                            ts.*,
+                            {now} AS quary_valid_from,
+                            MD5(CAST(CONCAT({unique_key}, CAST({updated_at} AS TEXT)) AS TEXT)) AS quary_scd_id
+                        FROM ({templated_select}) AS ts
+                    ) AS source
+                    WHERE target.{unique_key} = source.{unique_key}
+                        AND target.quary_valid_to IS NULL
+                        AND CAST(source.{updated_at} AS TIMESTAMP) > CAST(target.{updated_at} AS TIMESTAMP)"
+                );
+
+                let insert_sql = format!(
+                    "INSERT INTO {path}
+                    SELECT
+                        *,
+                        {now} AS quary_valid_from,
+                        NULL AS quary_valid_to,
+                        MD5(CAST(CONCAT({unique_key}, CAST({updated_at} AS TEXT)) AS TEXT)) AS quary_scd_id
+                    FROM ({templated_select}) AS source
+                    WHERE NOT EXISTS (
+                        SELECT 1
+                        FROM {path} AS target
+                        WHERE target.quary_scd_id = MD5(CAST(CONCAT(source.{unique_key}, CAST(source.{updated_at} AS TEXT)) AS TEXT))
+                    )"
+                );
+
+                let mut sql_statements = vec![update_sql, insert_sql];
+
+                match table_exists {
+                    Some(exists) => {
+                        if !exists {
+                            sql_statements.insert(0, create_table_sql);
+                        }
+                        Ok(sql_statements)
+                    }
+                    None => Err("table_exists is required for Redshift snapshots".to_string()),
+                }
+            }
+        }
+    }
+
+    fn return_full_path_requirement(&self, table_name: &str) -> String {
+        format!("{}.{}", self.schema, table_name)
+    }
+
+    fn return_name_from_full_path<'a>(&self, full_path: &'a str) -> Result<&'a str, String> {
+        let split = full_path.split('.').collect::<Vec<&str>>();
+        match split.as_slice() {
+            [schema, table_name] => {
+                if schema == &self.schema {
+                    Ok(table_name)
+                } else {
+                    Err(format!(
+                        "Schema {} does not match expected value {}",
+                        schema, self.schema
+                    ))
+                }
+            }
+            _ => Err(format!(
+                "Table name {} does not contain the expected schema",
+                full_path
+            )),
+        }
+    }
+
+    fn automatic_cache_sql_create_statement(
+        &self,
+        model: &str,
+        model_cache_name: &str,
+    ) -> Vec<String> {
+        vec![format!(
+            "CREATE OR REPLACE VIEW {} AS SELECT * FROM {}",
+            self.return_full_path_requirement(model_cache_name),
+            self.return_full_path_requirement(model)
+        )]
+    }
+
+    fn get_dialect(&self) -> &Dialect {
+        &Dialect::Postgres
+    }
+
+    fn database_name_wrapper(&self, name: &str) -> String {
+        name.to_string()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    #[test]
+    fn get_now() {
+        let generator = super::DatabaseQueryGeneratorRedshift::new("schema".to_string(), None);
+        let now = generator.get_now();
+
+        assert!(now.starts_with("CAST('20"));
+    }
+
+    #[test]
+    fn get_now_override() {
+        let generator = super::DatabaseQueryGeneratorRedshift::new(
+            "schema".to_string(),
+            Some(std::time::SystemTime::UNIX_EPOCH),
+        );
+        let now = generator.get_now();
+        assert_eq!(
+            now,
+            "CAST('1970-01-01T00:00:00Z' AS TIMESTAMP WITH TIME ZONE)".to_string()
+        );
+    }
+
+    // TODO: add a test for the generated snapshot SQL
+}
diff --git a/rust/core/src/database_snowflake.rs b/rust/core/src/database_snowflake.rs
index b262f870..3bfdb93e 100644
--- a/rust/core/src/database_snowflake.rs
+++ b/rust/core/src/database_snowflake.rs
@@ -88,7 +88,12 @@ impl DatabaseQueryGenerator for DatabaseQueryGeneratorSnowflake {
         templated_select: &str,
         unique_key: &str,
         strategy: &StrategyType,
+        table_exists: Option<bool>,
     ) -> Result<Vec<String>, String> {
+        assert_eq!(
+            table_exists, None,
+            "table_exists is not necessary for Snowflake snapshots."
+        );
         match strategy {
             StrategyType::Timestamp(timestamp) => {
                 let updated_at = &timestamp.updated_at;
@@ -317,7 +322,7 @@ mod tests {
         );
 
         let result = database
-            .generate_snapshot_sql(path, templated_select, unique_key, &strategy)
+            .generate_snapshot_sql(path, templated_select, unique_key, &strategy, None)
             .unwrap();
         assert_eq!(result.iter().map(|s| s.as_str()).collect::<Vec<&str>>(), vec!["CREATE TABLE IF NOT EXISTS mytable AS (\n    SELECT\n        *,\n        CURRENT_TIMESTAMP() AS quary_valid_from,\n        NULL AS quary_valid_to,\n        MD5(CONCAT(id, CAST(updated_at AS VARCHAR))) AS quary_scd_id\n    FROM (SELECT * FROM mytable)\n    )", "MERGE INTO mytable AS target\n    USING (\n        SELECT\n            *,\n            CURRENT_TIMESTAMP() AS quary_valid_from,\n            MD5(CONCAT(id, CAST(updated_at AS VARCHAR))) AS quary_scd_id\n        FROM (SELECT * FROM mytable)\n    ) AS source\n    ON target.id = source.id\n    WHEN MATCHED AND target.quary_valid_to IS NULL AND source.updated_at > target.updated_at\n    THEN UPDATE SET\n        quary_valid_to = source.quary_valid_from,\n        updated_at = source.updated_at", "INSERT INTO mytable\n    SELECT\n        *,\n        CURRENT_TIMESTAMP() AS quary_valid_from,\n        NULL AS quary_valid_to,\n        MD5(CONCAT(id, CAST(updated_at AS VARCHAR))) AS quary_scd_id\n    FROM (SELECT * FROM mytable) AS source\n    WHERE NOT EXISTS (\n        SELECT 1\n        FROM mytable AS target\n        WHERE target.quary_scd_id = MD5(CONCAT(source.id, CAST(source.updated_at AS VARCHAR)))\n    )"]);
diff --git a/rust/core/src/databases.rs b/rust/core/src/databases.rs
index 17e8e712..b0adc8f6 100644
--- a/rust/core/src/databases.rs
+++ b/rust/core/src/databases.rs
@@ -123,12 +123,14 @@ pub trait DatabaseQueryGenerator: Debug + Sync {
     /// - The q. references have been replaced with the underlying path of the referenced seed or source
     /// - `unique_key`: The column that uniquely identifies each row in the snapshot source table.
     /// - `strategy`: The snapshot strategy to be used (e.g., timestamp)
+    /// - `table_exists`: A boolean used to check if the snapshot table already exists in the database.
     fn generate_snapshot_sql(
         &self,
         _path: &str,
         _templated_select: &str,
         _unique_key: &str,
         _strategy: &StrategyType,
+        _table_exists: Option<bool>,
     ) -> Result<Vec<String>, String> {
         Err("Database does not support snapshots".to_string())
     }
@@ -215,9 +217,10 @@ impl DatabaseQueryGenerator for Box<dyn DatabaseQueryGenerator> {
         select_query: &str,
         unique_key: &str,
         strategy: &StrategyType,
+        table_exists: Option<bool>,
     ) -> Result<Vec<String>, String> {
         self.as_ref()
-            .generate_snapshot_sql(path, select_query, unique_key, strategy)
+            .generate_snapshot_sql(path, select_query, unique_key, strategy, table_exists)
     }
 
     fn return_full_path_requirement(&self, table_name: &str) -> String {
@@ -304,6 +307,12 @@ pub trait DatabaseConnection: Debug {
     /// query_generator returns the appropriate query generator
     fn query_generator(&self) -> Box<dyn DatabaseQueryGenerator>;
+
+    /// table_exists returns whether a table at the given path exists in the database.
+    /// If the path is fully qualified, e.g. analytics.table_name, it searches for that exact path; otherwise it falls back to the schema defined in the configuration.
+    /// Returns an optional boolean: Some(true) if the table exists, Some(false) if it doesn't, and None if the operation is unsupported by the database.
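+    /// BigQuery, DuckDB and Postgres currently return Ok(None) (not implemented); Redshift snapshot generation requires a definite answer.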
+    /// TODO: This should be changed to return a Result instead of an Option
+    async fn table_exists(&self, path: &str) -> Result<Option<bool>, String>;
 }
 
 #[derive(Debug, Clone, Default, PartialEq)]
diff --git a/rust/core/src/lib.rs b/rust/core/src/lib.rs
index ca219d0d..2bf20fe5 100644
--- a/rust/core/src/lib.rs
+++ b/rust/core/src/lib.rs
@@ -14,6 +14,7 @@ pub mod config;
 pub mod database_bigquery;
 pub mod database_duckdb;
 pub mod database_postgres;
+pub mod database_redshift;
 pub mod database_snowflake;
 pub mod database_sqlite;
 pub mod databases;
diff --git a/rust/core/src/models.rs b/rust/core/src/models.rs
index 78eebc7a..264bfe9d 100644
--- a/rust/core/src/models.rs
+++ b/rust/core/src/models.rs
@@ -87,7 +87,7 @@ fn return_sql_model_template(
 }
 
 #[cfg(test)]
-mod test {
+mod tests {
     // TODO Implement tests
     // func TestParseModelSchemasToViews(t *testing.T) {
     //     t.Parallel()
diff --git a/rust/core/src/project.rs b/rust/core/src/project.rs
index 18f01629..ff559b48 100644
--- a/rust/core/src/project.rs
+++ b/rust/core/src/project.rs
@@ -1,6 +1,6 @@
 use crate::automatic_branching::derive_sha256_file_contents;
 use crate::config::get_config_from_filesystem;
-use crate::databases::DatabaseQueryGenerator;
+use crate::databases::{DatabaseConnection, DatabaseQueryGenerator};
 use crate::file_system::{convert_async_read_to_blocking_read, FileSystem};
 use crate::graph::{project_to_graph, Edge};
 use crate::map_helpers::safe_adder_map;
@@ -1300,15 +1300,23 @@ pub async fn project_and_fs_to_sql_for_snapshots(
     project: &Project,
     file_system: &impl FileSystem,
     database: &impl DatabaseQueryGenerator,
+    database_connection: &Box<dyn DatabaseConnection>,
 ) -> Result<Vec<(String, Vec<String>)>, String> {
     let snapshots_out = project.snapshots.values().map(|snapshot| async move {
         let connection_config = project
             .connection_config
             .clone()
             .ok_or("missing connection config")?;
-        let sql_statements =
-            generate_snapshot_sql(&connection_config, project, database, file_system, snapshot)
-                .await?;
+
+        let sql_statements = generate_snapshot_sql(
+            &connection_config,
+            project,
+            database,
+            file_system,
+            snapshot,
+            database_connection,
+        )
+        .await?;
         Ok::<(String, Vec<String>), String>((snapshot.name.clone(), sql_statements))
     });
@@ -1326,6 +1334,7 @@ async fn generate_snapshot_sql(
     database: &impl DatabaseQueryGenerator,
     file_system: &impl FileSystem,
     snapshot: &Snapshot,
+    database_connection: &Box<dyn DatabaseConnection>,
 ) -> Result<Vec<String>, String> {
     let snapshot_strategy = snapshot
         .strategy
@@ -1359,11 +1368,22 @@
     let templated_select =
         reference_search.replace_all(&vars_replaced_select_statement, name_replacing_strategy);
 
+    let table_exists = match database_connection.table_exists(&snapshot_path).await {
+        Ok(Some(exists)) => Some(exists),
+        Ok(None) => None,
+        Err(err) => {
+            return Err(format!(
+                "An error occurred checking for the existence of the snapshot table: {}",
+                err
+            ));
+        }
+    };
     database.generate_snapshot_sql(
         &snapshot_path,
         &templated_select,
         &snapshot.unique_key,
         &snapshot_strategy,
+        table_exists,
     )
 }
@@ -1638,7 +1658,7 @@ pub const PATH_FOR_MODELS: &str = "models";
 pub(crate) const PATH_FOR_TESTS: &str = "tests";
 
 #[cfg(test)]
-mod test {
+mod tests {
     use super::*;
     use crate::database_bigquery::DatabaseQueryGeneratorBigQuery;
     use crate::database_sqlite::DatabaseQueryGeneratorSqlite;
@@ -3053,24 +3073,29 @@ models:
     }
 
     // TODO Reinstate after making get_node_sorted completely deterministic
-    // #[test]
-    // fn test_project_and_fs_to_sql_for_views() {
-    //     let assets = Asset {};
-    //     let config = default_config();
-    //     let project = parse_project(&config,
&assets, "").unwrap(); - // - // let sql = project_and_fs_to_query_sql(&project, &assets, "stg_shifts").unwrap(); - // assert_eq!( - // sql.0, - // "WITH raw_shifts AS (SELECT column1 AS employee_id,column2 AS shop_id,column3 AS date,column4 AS shift FROM (VALUES ('1','2','2023-01-01','morning'),('1','2','2023-01-02','morning'),('1','2','2023-01-03','morning'),('1','2','2023-01-04','morning'),('1','2','2023-01-05','morning'),('1','2','2023-01-06','morning'),('1','2','2023-01-07','morning'),('1','2','2023-01-08','morning'),('1','2','2023-01-09','morning'),('1','2','2023-01-10','morning'),('1','2','2023-01-11','morning'),('1','2','2023-01-12','morning'),('1','2','2023-01-13','morning'),('1','2','2023-01-13','afternoon'))) select\n employee_id,\n shop_id,\n date as shift_date,\n shift\nfrom\n raw_shifts\n" - // ); - // - // let sql = project_and_fs_to_query_sql(&project, &assets, "shifts_summary").unwrap(); - // assert_eq!( - // sql.0, - // "WITH\nraw_employees AS (SELECT column1 AS id,column2 AS first_name,column3 AS last_name FROM (VALUES ('1','John','Doe'),('2','Jane','Doe'),('3','Ashok','Kumar'),('4','Peter','Pan'),('5','Marie','Curie'))),\nraw_shifts AS (SELECT column1 AS employee_id,column2 AS shop_id,column3 AS date,column4 AS shift FROM (VALUES ('1','2','2023-01-01','morning'),('1','2','2023-01-02','morning'),('1','2','2023-01-03','morning'),('1','2','2023-01-04','morning'),('1','2','2023-01-05','morning'),('1','2','2023-01-06','morning'),('1','2','2023-01-07','morning'),('1','2','2023-01-08','morning'),('1','2','2023-01-09','morning'),('1','2','2023-01-10','morning'),('1','2','2023-01-11','morning'),('1','2','2023-01-12','morning'),('1','2','2023-01-13','morning'),('1','2','2023-01-13','afternoon'))),\nshift_hours AS (SELECT 'morning' AS shift,\n '08:00:00' AS start_time,\n '12:00:00' AS end_time\nUNION ALL\nSELECT 'afternoon' AS shift,\n '12:00:00' AS start_time,\n '16:00:00' AS end_time),\nshift_first AS (WITH\n min_shifts AS (\n SELECT\n employee_id,\n MIN(shift_start) AS shift_start\n FROM\n shifts\n GROUP BY\n employee_id\n )\nSELECT\n x.employee_id AS employee_id,\n x.shift_start AS shift_start,\n x.shift_end AS shift_end\nFROM\n shifts x\n INNER JOIN min_shifts y ON y.employee_id = x.employee_id\n AND y.shift_start = x.shift_start\nGROUP BY\n x.employee_id,\n x.shift_start\n),\nshift_last AS (WITH min_shifts AS (SELECT employee_id,\n max(shift_start) AS shift_start\n FROM shifts\n GROUP BY employee_id)\n\nSELECT x.employee_id AS employee_id,\n x.shift_start AS shift_start,\n x.shift_end AS shift_end\nFROM shifts x\nINNER JOIN min_shifts y\nON y.employee_id = x.employee_id AND y.shift_start = x.shift_start\nGROUP BY x.employee_id, x.shift_start),\nstg_employees AS (select\n id as employee_id,\n first_name,\n last_name\nfrom\n raw_employees\n),\nstg_shifts AS (select\n employee_id,\n shop_id,\n date as shift_date,\n shift\nfrom\n raw_shifts\n),\nshifts AS (WITH shifts AS (SELECT employee_id,\n shift_date,\n shift\n FROM stg_shifts\n ),\n shift_details AS (SELECT shift AS shift_name,\n start_time,\n end_time\n FROM shift_hours\n )\n\nSELECT s.employee_id AS employee_id,\n s.shift AS shift,\n datetime(s.shift_date, sd.start_time) AS shift_start,\n datetime(s.shift_date, sd.end_time) AS shift_end\nFROM shifts s\n INNER JOIN shift_details sd\n ON s.shift = sd.shift_name\n)\nSELECT * FROM (WITH total_hours AS (\n SELECT employee_id,\n SUM(strftime('%s', shift_end) - strftime('%s', shift_start)) AS total_hours,\n COUNT(*) AS total_shifts\n FROM shifts\n GROUP BY 
employee_id\n),\n\npercentage_morning_shifts AS (\n SELECT employee_id,\n SUM(CASE WHEN shift = 'morning' THEN 1 ELSE 0 END) AS total_morning_shifts,\n COUNT(*) AS total_shifts\n FROM shifts\n GROUP BY employee_id\n)\n\nSELECT e.employee_id,\n e.first_name,\n e.last_name,\n sf.shift_start AS first_shift,\n sl.shift_start AS last_shift,\n pms.total_morning_shifts / pms.total_shifts * 100 AS percentage_morning_shifts,\n th.total_shifts,\n th.total_hours\nFROM stg_employees e\nLEFT JOIN shift_first sf\n ON e.employee_id = sf.employee_id\nLEFT JOIN shift_last sl\n ON e.employee_id = sl.employee_id\nLEFT JOIN total_hours th\n ON e.employee_id = th.employee_id\nLEFT JOIN percentage_morning_shifts pms\n ON e.employee_id = pms.employee_id)" - // ) - // } + #[tokio::test] + #[ignore] + async fn test_project_and_fs_to_sql_for_views() { + let assets = Asset {}; + let database = DatabaseQueryGeneratorSqlite::default(); + let project = parse_project(&assets, &database, "").await.unwrap(); + + let sql = project_and_fs_to_query_sql(&database, &project, &assets, "stg_shifts", None) + .await + .unwrap(); + assert_eq!( + sql.0, + "WITH raw_shifts AS (SELECT column1 AS employee_id,column2 AS shop_id,column3 AS date,column4 AS shift FROM (VALUES ('1','2','2023-01-01','morning'),('1','2','2023-01-02','morning'),('1','2','2023-01-03','morning'),('1','2','2023-01-04','morning'),('1','2','2023-01-05','morning'),('1','2','2023-01-06','morning'),('1','2','2023-01-07','morning'),('1','2','2023-01-08','morning'),('1','2','2023-01-09','morning'),('1','2','2023-01-10','morning'),('1','2','2023-01-11','morning'),('1','2','2023-01-12','morning'),('1','2','2023-01-13','morning'),('1','2','2023-01-13','afternoon'))) select\n employee_id,\n shop_id,\n date as shift_date,\n shift\nfrom\n raw_shifts\n" + ); + + let sql = project_and_fs_to_query_sql(&database, &project, &assets, "shifts_summary", None) + .await + .unwrap(); + assert_eq!( + sql.0, + "WITH\nraw_employees AS (SELECT column1 AS id,column2 AS first_name,column3 AS last_name FROM (VALUES ('1','John','Doe'),('2','Jane','Doe'),('3','Ashok','Kumar'),('4','Peter','Pan'),('5','Marie','Curie'))),\nraw_shifts AS (SELECT column1 AS employee_id,column2 AS shop_id,column3 AS date,column4 AS shift FROM (VALUES ('1','2','2023-01-01','morning'),('1','2','2023-01-02','morning'),('1','2','2023-01-03','morning'),('1','2','2023-01-04','morning'),('1','2','2023-01-05','morning'),('1','2','2023-01-06','morning'),('1','2','2023-01-07','morning'),('1','2','2023-01-08','morning'),('1','2','2023-01-09','morning'),('1','2','2023-01-10','morning'),('1','2','2023-01-11','morning'),('1','2','2023-01-12','morning'),('1','2','2023-01-13','morning'),('1','2','2023-01-13','afternoon'))),\nshift_hours AS (SELECT 'morning' AS shift,\n '08:00:00' AS start_time,\n '12:00:00' AS end_time\nUNION ALL\nSELECT 'afternoon' AS shift,\n '12:00:00' AS start_time,\n '16:00:00' AS end_time),\nshift_first AS (WITH\n min_shifts AS (\n SELECT\n employee_id,\n MIN(shift_start) AS shift_start\n FROM\n shifts\n GROUP BY\n employee_id\n )\nSELECT\n x.employee_id AS employee_id,\n x.shift_start AS shift_start,\n x.shift_end AS shift_end\nFROM\n shifts x\n INNER JOIN min_shifts y ON y.employee_id = x.employee_id\n AND y.shift_start = x.shift_start\nGROUP BY\n x.employee_id,\n x.shift_start\n),\nshift_last AS (WITH min_shifts AS (SELECT employee_id,\n max(shift_start) AS shift_start\n FROM shifts\n GROUP BY employee_id)\n\nSELECT x.employee_id AS employee_id,\n x.shift_start AS shift_start,\n x.shift_end AS 
shift_end\nFROM shifts x\nINNER JOIN min_shifts y\nON y.employee_id = x.employee_id AND y.shift_start = x.shift_start\nGROUP BY x.employee_id, x.shift_start),\nstg_employees AS (select\n id as employee_id,\n first_name,\n last_name\nfrom\n raw_employees\n),\nstg_shifts AS (select\n employee_id,\n shop_id,\n date as shift_date,\n shift\nfrom\n raw_shifts\n),\nshifts AS (WITH shifts AS (SELECT employee_id,\n shift_date,\n shift\n FROM stg_shifts\n ),\n shift_details AS (SELECT shift AS shift_name,\n start_time,\n end_time\n FROM shift_hours\n )\n\nSELECT s.employee_id AS employee_id,\n s.shift AS shift,\n datetime(s.shift_date, sd.start_time) AS shift_start,\n datetime(s.shift_date, sd.end_time) AS shift_end\nFROM shifts s\n INNER JOIN shift_details sd\n ON s.shift = sd.shift_name\n)\nSELECT * FROM (WITH total_hours AS (\n SELECT employee_id,\n SUM(strftime('%s', shift_end) - strftime('%s', shift_start)) AS total_hours,\n COUNT(*) AS total_shifts\n FROM shifts\n GROUP BY employee_id\n),\n\npercentage_morning_shifts AS (\n SELECT employee_id,\n SUM(CASE WHEN shift = 'morning' THEN 1 ELSE 0 END) AS total_morning_shifts,\n COUNT(*) AS total_shifts\n FROM shifts\n GROUP BY employee_id\n)\n\nSELECT e.employee_id,\n e.first_name,\n e.last_name,\n sf.shift_start AS first_shift,\n sl.shift_start AS last_shift,\n pms.total_morning_shifts / pms.total_shifts * 100 AS percentage_morning_shifts,\n th.total_shifts,\n th.total_hours\nFROM stg_employees e\nLEFT JOIN shift_first sf\n ON e.employee_id = sf.employee_id\nLEFT JOIN shift_last sl\n ON e.employee_id = sl.employee_id\nLEFT JOIN total_hours th\n ON e.employee_id = th.employee_id\nLEFT JOIN percentage_morning_shifts pms\n ON e.employee_id = pms.employee_id)" + ) + } // TODO Implement tests //func Test_parseColumnTests(t *testing.T) { diff --git a/rust/core/src/seeds.rs b/rust/core/src/seeds.rs index 16f58845..a7bdbe1c 100644 --- a/rust/core/src/seeds.rs +++ b/rust/core/src/seeds.rs @@ -49,7 +49,7 @@ async fn read_csv_to_strings( } #[cfg(test)] -mod test { +mod tests { use super::*; use crate::database_sqlite::DatabaseQueryGeneratorSqlite; diff --git a/rust/core/src/sql.rs b/rust/core/src/sql.rs index cd4cb6c3..8586f00d 100644 --- a/rust/core/src/sql.rs +++ b/rust/core/src/sql.rs @@ -22,7 +22,7 @@ pub fn remove_sql_comments(sql: &str) -> String { } #[cfg(test)] -mod test { +mod tests { use super::*; #[test] diff --git a/rust/core/src/test_runner.rs b/rust/core/src/test_runner.rs index ea7c7ce9..e343e908 100644 --- a/rust/core/src/test_runner.rs +++ b/rust/core/src/test_runner.rs @@ -458,7 +458,7 @@ pub async fn run_model_tests_internal( } #[cfg(test)] -mod test { +mod tests { use super::*; #[test] diff --git a/rust/core/src/tests.rs b/rust/core/src/tests.rs index 3c7e8256..b7afeb82 100644 --- a/rust/core/src/tests.rs +++ b/rust/core/src/tests.rs @@ -289,7 +289,7 @@ impl ShortTestString for Test { } #[cfg(test)] -mod test { +mod tests { use crate::test_helpers::ToTest; use crate::tests::{ShortTestString, ToSql}; use quary_proto::test::TestType; diff --git a/rust/quary-databases/Cargo.toml b/rust/quary-databases/Cargo.toml index b74e535b..4009fae4 100644 --- a/rust/quary-databases/Cargo.toml +++ b/rust/quary-databases/Cargo.toml @@ -11,7 +11,7 @@ yup-oauth2 = { version = "8", default-features = false } quary-core = { path = "../core" } quary_proto = { path = "../../proto/gen/rust" } tokio = { version = "1", features = ["full"] } -sqlx = { version = "0.7", features = ["runtime-tokio-rustls", "sqlite", "postgres", "chrono"] } +sqlx = { version = "0.7", 
features = ["runtime-tokio-rustls", "sqlite", "postgres", "chrono", "bigdecimal"] } async-trait = "0.1" gcp-bigquery-client = "0.18" prost = "0.12" diff --git a/rust/quary-databases/src/databases_bigquery.rs b/rust/quary-databases/src/databases_bigquery.rs index 44f3b110..d16aef44 100644 --- a/rust/quary-databases/src/databases_bigquery.rs +++ b/rust/quary-databases/src/databases_bigquery.rs @@ -280,6 +280,10 @@ impl DatabaseConnection for BigQuery { }) } + async fn table_exists(&self, _path: &str) -> Result, String> { + Ok(None) // not implemented + } + fn query_generator(&self) -> Box { Box::new(DatabaseQueryGeneratorBigQuery::new( self.project_id.to_string(), diff --git a/rust/quary-databases/src/databases_connection.rs b/rust/quary-databases/src/databases_connection.rs index f7a65512..84cd3be2 100644 --- a/rust/quary-databases/src/databases_connection.rs +++ b/rust/quary-databases/src/databases_connection.rs @@ -1,157 +1,214 @@ use crate::databases_bigquery::BigQuery; use crate::databases_postgres::Postgres; +use crate::databases_redshift::Redshift; use crate::databases_snowflake; use quary_core::database_bigquery::DatabaseQueryGeneratorBigQuery; use quary_core::database_duckdb::DatabaseQueryGeneratorDuckDB; use quary_core::database_postgres::DatabaseQueryGeneratorPostgres; +use quary_core::database_redshift::DatabaseQueryGeneratorRedshift; use quary_core::database_snowflake::DatabaseQueryGeneratorSnowflake; use quary_core::database_sqlite::DatabaseQueryGeneratorSqlite; use quary_core::databases::{DatabaseConnection, DatabaseQueryGenerator}; use quary_proto::connection_config::Config::{ - BigQuery as OtherBigQueryConfig, Duckdb, DuckdbInMemory, Postgres as PostgresConfig, Snowflake, - Sqlite, SqliteInMemory, + BigQuery as OtherBigQueryConfig, Duckdb, DuckdbInMemory, Postgres as PostgresConfig, + Redshift as RedshiftConfig, Snowflake, Sqlite, SqliteInMemory, }; use std::{env, fs}; pub async fn database_from_config( config: &quary_proto::ConnectionConfig, ) -> Result, String> { - if let Some(config) = &config.config { - match config { - DuckdbInMemory(config) => { - let database = - crate::databases_duckdb::DuckDB::new_in_memory(config.schema.clone())?; - Ok(Box::new(database)) - } - Duckdb(config) => { - let database = crate::databases_duckdb::DuckDB::new_with_file( - config.schema.clone(), - config.path.as_str(), - )?; - Ok(Box::new(database)) + let config = config + .config + .clone() + .ok_or("No config provided".to_string())?; + + match config { + DuckdbInMemory(config) => { + let database = crate::databases_duckdb::DuckDB::new_in_memory(config.schema.clone())?; + Ok(Box::new(database)) + } + Duckdb(config) => { + let database = crate::databases_duckdb::DuckDB::new_with_file( + config.schema.clone(), + config.path.as_str(), + )?; + Ok(Box::new(database)) + } + SqliteInMemory(_) => { + let database = crate::databases_sqlite::Sqlite::new_in_memory() + .await + .map_err(|e| e.to_string())?; + Ok(Box::new(database)) + } + Sqlite(config) => { + let path = format!("./{}", config.path.as_str(),); + if fs::metadata(path.clone()).is_err() { + fs::File::create(path.clone().as_str()) + .map_err(|e| format!("creating file at {}: {:?}", path, e))?; } - SqliteInMemory(_) => { - let database = crate::databases_sqlite::Sqlite::new_in_memory() - .await - .map_err(|e| e.to_string())?; + let database = crate::databases_sqlite::Sqlite::new_with_file(config.path.as_str()) + .await + .map_err(|e| e.to_string())?; + Ok(Box::new(database)) + } + OtherBigQueryConfig(config) => { + let google_access_token = 
env::var("GOOGLE_CLOUD_ACCESS_TOKEN"); + if let Ok(google_access_token) = google_access_token { + println!("Using GOOGLE_CLOUD_ACCESS_TOKEN"); + let database = BigQuery::new( + config.project_id.clone(), + config.dataset_id.clone(), + Some(google_access_token.to_string()), + ) + .await?; Ok(Box::new(database)) - } - Sqlite(config) => { - let path = format!("./{}", config.path.as_str(),); - if fs::metadata(path.clone()).is_err() { - fs::File::create(path.clone().as_str()) - .map_err(|e| format!("creating file at {}: {:?}", path, e))?; - } - let database = crate::databases_sqlite::Sqlite::new_with_file(config.path.as_str()) - .await - .map_err(|e| e.to_string())?; + } else { + let database = + BigQuery::new(config.project_id.clone(), config.dataset_id.clone(), None) + .await?; Ok(Box::new(database)) } - OtherBigQueryConfig(config) => { - let google_access_token = env::var("GOOGLE_CLOUD_ACCESS_TOKEN"); - if let Ok(google_access_token) = google_access_token { - println!("Using GOOGLE_CLOUD_ACCESS_TOKEN"); - let database = BigQuery::new( - config.project_id.clone(), - config.dataset_id.clone(), - Some(google_access_token.to_string()), - ) - .await?; - Ok(Box::new(database)) - } else { - let database = - BigQuery::new(config.project_id.clone(), config.dataset_id.clone(), None) - .await?; - Ok(Box::new(database)) - } - } - Snowflake(config) => { - let account_identifier = env::var("SNOWSQL_ACCOUNT").map_err(|_| { - "SNOWSQL_ACCOUNT must be set to connect to Snowflake".to_string() - })?; - let warehouse = env::var("SNOWSQL_WAREHOUSE").map_err(|_| { - "SNOWSQL_WAREHOUSE must be set to connect to Snowflake".to_string() - })?; - let username = env::var("SNOWSQL_USER") - .map_err(|_| "SNOWSQL_USER must be set to connect to Snowflake".to_string())?; - let role = env::var("SNOWSQL_ROLE") - .map_err(|_| "SNOWSQL_ROLE must be set to connect to Snowflake".to_string())?; - let password = env::var("SNOWSQL_PWD") - .map_err(|_| "SNOWSQL_PWD must be set to connect to Snowflake".to_string())?; + } + Snowflake(config) => { + let account_identifier = env::var("SNOWSQL_ACCOUNT") + .map_err(|_| "SNOWSQL_ACCOUNT must be set to connect to Snowflake".to_string())?; + let warehouse = env::var("SNOWSQL_WAREHOUSE") + .map_err(|_| "SNOWSQL_WAREHOUSE must be set to connect to Snowflake".to_string())?; + let username = env::var("SNOWSQL_USER") + .map_err(|_| "SNOWSQL_USER must be set to connect to Snowflake".to_string())?; + let role = env::var("SNOWSQL_ROLE") + .map_err(|_| "SNOWSQL_ROLE must be set to connect to Snowflake".to_string())?; + let password = env::var("SNOWSQL_PWD") + .map_err(|_| "SNOWSQL_PWD must be set to connect to Snowflake".to_string())?; - let database = databases_snowflake::Snowflake::new( - account_identifier.as_str(), - warehouse.as_str(), - &config.database, - &config.schema, - username.as_str(), - Some(role.as_str()), - password.as_str(), - )?; - Ok(Box::new(database)) - } - PostgresConfig(config) => { - let host = env::var("PGHOST") - .map_err(|_| "PGHOST must be set to connect to Postgres".to_string())?; - let user = env::var("PGUSER") - .map_err(|_| "PGUSER must be set to connect to Postgres".to_string())?; - let password = env::var("PGPASSWORD") - .map_err(|_| "PGPASSWORD must be set to connect to Postgres".to_string())?; - let database = env::var("PGDATABASE") - .map_err(|_| "PGDATABASE must be set to connect to Postgres".to_string())?; + let database = databases_snowflake::Snowflake::new( + account_identifier.as_str(), + warehouse.as_str(), + &config.database, + &config.schema, + 
username.as_str(), + Some(role.as_str()), + password.as_str(), + )?; + Ok(Box::new(database)) + } + PostgresConfig(config) => { + let host = env::var("PGHOST") + .map_err(|_| "PGHOST must be set to connect to Postgres".to_string())?; + let user = env::var("PGUSER") + .map_err(|_| "PGUSER must be set to connect to Postgres".to_string())?; + let password = env::var("PGPASSWORD") + .map_err(|_| "PGPASSWORD must be set to connect to Postgres".to_string())?; + let database = env::var("PGDATABASE") + .map_err(|_| "PGDATABASE must be set to connect to Postgres".to_string())?; - let port = if let Ok(port) = env::var("PGPORT") { - Some(port) - } else { - None - }; - let ssl_mode = if let Ok(ssl_mode) = env::var("PGSSLMODE") { - Some(ssl_mode.to_string()) - } else { - None - }; - let ssl_cert = if let Ok(ssl_cert) = env::var("PGSSLCERT") { - Some(ssl_cert.to_string()) - } else { - None - }; - let ssl_key = if let Ok(ssl_key) = env::var("PGSSLKEY") { - Some(ssl_key.to_string()) - } else { - None - }; - let ssl_root_cert = if let Ok(ssl_root_cert) = env::var("PGSSLROOTCERT") { - Some(ssl_root_cert.to_string()) - } else { - None - }; - let channel_binding = if let Ok(channel_binding) = env::var("PGCHANNELBINDING") { - Some(channel_binding.to_string()) - } else { - None - }; + let port = if let Ok(port) = env::var("PGPORT") { + Some(port) + } else { + None + }; + let ssl_mode = if let Ok(ssl_mode) = env::var("PGSSLMODE") { + Some(ssl_mode.to_string()) + } else { + None + }; + let ssl_cert = if let Ok(ssl_cert) = env::var("PGSSLCERT") { + Some(ssl_cert.to_string()) + } else { + None + }; + let ssl_key = if let Ok(ssl_key) = env::var("PGSSLKEY") { + Some(ssl_key.to_string()) + } else { + None + }; + let ssl_root_cert = if let Ok(ssl_root_cert) = env::var("PGSSLROOTCERT") { + Some(ssl_root_cert.to_string()) + } else { + None + }; + let channel_binding = if let Ok(channel_binding) = env::var("PGCHANNELBINDING") { + Some(channel_binding.to_string()) + } else { + None + }; + let database = Postgres::new( + &host, + port, + &user, + &password, + &database, + &config.schema, + ssl_mode, + ssl_cert, + ssl_key, + ssl_root_cert, + channel_binding, + None, + ) + .await + .map_err(|e| e.to_string())?; + Ok(Box::new(database)) + } + RedshiftConfig(config) => { + let host = env::var("RSHOST") + .map_err(|_| "RSHOST must be set to connect to Redshift".to_string())?; + let user = env::var("RSUSER") + .map_err(|_| "RSUSER must be set to connect to Redshift".to_string())?; + let password = env::var("RSPASSWORD") + .map_err(|_| "RSPASSWORD must be set to connect to Redshift".to_string())?; + let database = env::var("RSDATABASE") + .map_err(|_| "RSDATABASE must be set to connect to Redshift".to_string())?; - let database = Postgres::new( - &host, - port, - &user, - &password, - &database, - &config.schema, - ssl_mode, - ssl_cert, - ssl_key, - ssl_root_cert, - channel_binding, - ) - .await - .map_err(|e| e.to_string())?; + let port = if let Ok(port) = env::var("RSPORT") { + Some(port) + } else { + None + }; + let ssl_mode = if let Ok(ssl_mode) = env::var("PGSSLMODE") { + Some(ssl_mode.to_string()) + } else { + None + }; + let ssl_cert = if let Ok(ssl_cert) = env::var("PGSSLCERT") { + Some(ssl_cert.to_string()) + } else { + None + }; + let ssl_key = if let Ok(ssl_key) = env::var("PGSSLKEY") { + Some(ssl_key.to_string()) + } else { + None + }; + let ssl_root_cert = if let Ok(ssl_root_cert) = env::var("PGSSLROOTCERT") { + Some(ssl_root_cert.to_string()) + } else { + None + }; + let channel_binding = if let Ok(channel_binding) = 
env::var("PGCHANNELBINDING") { + Some(channel_binding.to_string()) + } else { + None + }; - Ok(Box::new(database)) - } + let database = Redshift::new( + &host, + port, + &user, + &password, + &database, + &config.schema, + ssl_mode, + ssl_cert, + ssl_key, + ssl_root_cert, + channel_binding, + ) + .await + .map_err(|e| e.to_string())?; + Ok(Box::new(database)) } - } else { - Err("No config provided".to_string()) } } @@ -188,6 +245,10 @@ pub fn database_query_generator_from_config( config.schema, None, ))), + Some(RedshiftConfig(config)) => Ok(Box::new(DatabaseQueryGeneratorRedshift::new( + config.schema, + None, + ))), _ => Err("not implemented".to_string()), } } diff --git a/rust/quary-databases/src/databases_duckdb.rs b/rust/quary-databases/src/databases_duckdb.rs index a66221a2..4d327554 100644 --- a/rust/quary-databases/src/databases_duckdb.rs +++ b/rust/quary-databases/src/databases_duckdb.rs @@ -209,6 +209,10 @@ impl DatabaseConnection for DuckDB { }) } + async fn table_exists(&self, _path: &str) -> Result, String> { + Ok(None) // not implemented + } + fn query_generator(&self) -> Box { Box::new(DatabaseQueryGeneratorDuckDB::new(self.schema.clone(), None)) } @@ -751,12 +755,108 @@ sources: } } + #[tokio::test] + async fn test_snapshot_with_no_time_override() { + let target_schema = Some("analytics".to_string()); + let database: Box = + Box::new(DuckDB::new_in_memory(target_schema.clone()).unwrap()); + database.exec("CREATE SCHEMA jaffle_shop").await.unwrap(); + + // Create orders table + database + .exec("CREATE TABLE jaffle_shop.raw_orders (order_id INTEGER, status VARCHAR(255), updated_at TIMESTAMP)") + .await + .unwrap(); + + // Insert some initial data + database + .exec("INSERT INTO jaffle_shop.raw_orders VALUES (1, 'in_progress', '2023-01-01 00:00:00'), (2, 'completed', '2023-01-01 00:00:00')") + .await + .unwrap(); + + let file_system = FileSystem { + files: vec![ + ("quary.yaml", "duckdbInMemory: {schema: analytics}"), + ( + "models/orders_snapshot.snapshot.sql", + "SELECT * FROM q.raw_orders", + ), + ( + "models/schema.yaml", + " +sources: + - name: raw_orders + path: jaffle_shop.raw_orders +snapshots: + - name: orders_snapshot + unique_key: order_id + strategy: + timestamp: + updated_at: updated_at +", + ), + ] + .iter() + .map(|(k, v)| { + ( + k.to_string(), + File { + name: k.to_string(), + contents: Bytes::from(v.to_string()), + }, + ) + }) + .collect(), + }; + + let project = parse_project( + &file_system, + &DatabaseQueryGeneratorDuckDB::new(target_schema.clone(), None), + "", + ) + .await + .unwrap(); + + let snapshots_sql = project_and_fs_to_sql_for_snapshots( + &project, + &file_system, + &DatabaseQueryGeneratorDuckDB::new(target_schema.clone(), None), + &database, + ) + .await + .unwrap(); + for (_, sql) in snapshots_sql { + for statement in sql { + database.exec(statement.as_str()).await.unwrap() + } + } + + // assert the data has been created correctly in the snapshot table + let data = database.query("SELECT order_id, status, updated_at, quary_valid_from, quary_valid_to, quary_scd_id FROM analytics.orders_snapshot").await.unwrap(); + assert_eq!( + data.columns + .iter() + .map(|(column, _)| column) + .collect::>(), + vec![ + "order_id", + "status", + "updated_at", + "quary_valid_from", + "quary_valid_to", + "quary_scd_id" + ] + ); + assert_eq!(data.rows.len(), 2); + } + #[tokio::test] async fn test_snapshots_with_schema() { // Create orders table let target_schema = Some("analytics".to_string()); - let database = 
DuckDB::new_in_memory(target_schema.clone()).unwrap(); + let database: Box = + Box::new(DuckDB::new_in_memory(target_schema.clone()).unwrap()); database.exec("CREATE SCHEMA jaffle_shop").await.unwrap(); let datetime_str = "2023-01-01 01:00:00"; @@ -826,7 +926,7 @@ snapshots: .unwrap(); let snapshots_sql = - project_and_fs_to_sql_for_snapshots(&project, &file_system, &db_generator) + project_and_fs_to_sql_for_snapshots(&project, &file_system, &db_generator, &database) .await .unwrap(); for (_, sql) in snapshots_sql { @@ -895,10 +995,14 @@ snapshots: let db_generator_updated = DatabaseQueryGeneratorDuckDB::new(target_schema, Some(system_time_updated)); - let snapshots_sql = - project_and_fs_to_sql_for_snapshots(&project, &file_system, &db_generator_updated) - .await - .unwrap(); + let snapshots_sql = project_and_fs_to_sql_for_snapshots( + &project, + &file_system, + &db_generator_updated, + &database, + ) + .await + .unwrap(); for (_, sql) in &snapshots_sql { for statement in sql { @@ -959,7 +1063,8 @@ snapshots: // Create orders table let target_schema = None; - let database = DuckDB::new_in_memory(target_schema.clone()).unwrap(); + let database: Box = + Box::new(DuckDB::new_in_memory(target_schema.clone()).unwrap()); database.exec("CREATE SCHEMA jaffle_shop").await.unwrap(); let datetime_str = "2023-01-01 01:00:00"; @@ -1029,7 +1134,7 @@ snapshots: .unwrap(); let snapshots_sql = - project_and_fs_to_sql_for_snapshots(&project, &file_system, &db_generator) + project_and_fs_to_sql_for_snapshots(&project, &file_system, &db_generator, &database) .await .unwrap(); for (_, sql) in snapshots_sql { @@ -1098,10 +1203,14 @@ snapshots: let db_generator_updated = DatabaseQueryGeneratorDuckDB::new(target_schema, Some(system_time_updated)); - let snapshots_sql = - project_and_fs_to_sql_for_snapshots(&project, &file_system, &db_generator_updated) - .await - .unwrap(); + let snapshots_sql = project_and_fs_to_sql_for_snapshots( + &project, + &file_system, + &db_generator_updated, + &database, + ) + .await + .unwrap(); for (_, sql) in &snapshots_sql { for statement in sql { diff --git a/rust/quary-databases/src/databases_postgres.rs b/rust/quary-databases/src/databases_postgres.rs index 27521c1f..a12bf017 100644 --- a/rust/quary-databases/src/databases_postgres.rs +++ b/rust/quary-databases/src/databases_postgres.rs @@ -5,11 +5,13 @@ use quary_core::databases::{ ColumnWithDetails, DatabaseConnection, DatabaseQueryGenerator, QueryError, QueryResult, }; use quary_proto::TableAddress; -use sqlx::postgres::{PgPoolOptions, PgRow}; +use sqlx::postgres::{PgConnectOptions, PgPoolOptions, PgRow}; +use sqlx::types::BigDecimal; use sqlx::{Column, Pool, Row}; use sqlx::{Error, TypeInfo}; use std::collections::HashMap; use std::fmt::Debug; +use std::str::FromStr; #[derive(Debug)] pub struct Postgres { @@ -18,6 +20,7 @@ pub struct Postgres { } impl Postgres { + // TODO This should be a builder pattern or something else pub async fn new( host: &str, port: Option, @@ -30,6 +33,7 @@ impl Postgres { ssl_key: Option, ssl_root_cert: Option, channel_binding: Option, + extra_float_digits: Option, ) -> Result { let params = HashMap::from([ ("sslmode", ssl_mode), @@ -62,7 +66,13 @@ impl Postgres { params.unwrap_or("".to_string()) ); - let pool = PgPoolOptions::new().connect(&connection_string).await?; + let options = PgConnectOptions::from_str(connection_string.as_str())?; + let options = if let Some(extra_float_digits) = extra_float_digits { + options.extra_float_digits(extra_float_digits) + } else { + options + }; + let pool = 
+        let pool = PgPoolOptions::new().connect_with(options).await?;
        Ok(Self {
            pool,
            schema: schema.to_string(),
@@ -71,7 +81,10 @@ impl Postgres {
}

impl Postgres {
-    async fn list_table_like_query(&self, where_clause: &str) -> Result<Vec<TableAddress>, String> {
+    pub async fn list_table_like_query(
+        &self,
+        where_clause: &str,
+    ) -> Result<Vec<TableAddress>, String> {
        let query = format!(
            "SELECT
                CASE
@@ -274,6 +287,18 @@ impl DatabaseConnection for Postgres {
                    let value = row.try_get::<Option<String>, _>(i)?;
                    value
                }
+                "DATE" => {
+                    let value = row
+                        .try_get::<Option<chrono::NaiveDate>, _>(i)?
+                        .map(|v| v.format("%Y-%m-%d").to_string());
+                    value
+                }
+                "NUMERIC" => {
+                    let value = row
+                        .try_get::<Option<BigDecimal>, _>(i)?
+                        .map(|v| v.to_string());
+                    value
+                }
                _ => Some(format!("Unsupported type: {}", type_name)),
            };
            match value {
@@ -299,6 +324,10 @@ impl DatabaseConnection for Postgres {
        })
    }

+    async fn table_exists(&self, _path: &str) -> Result<Option<bool>, String> {
+        Ok(None) // not implemented
+    }
+
    fn query_generator(&self) -> Box<dyn DatabaseQueryGenerator> {
        Box::new(DatabaseQueryGeneratorPostgres::new(
            self.schema.clone(),
@@ -321,6 +350,75 @@ mod tests {
    use testcontainers::{clients, RunnableImage};
    use testcontainers_modules::postgres::Postgres as TestcontainersPostgres;

+    #[tokio::test]
+    async fn run_build_with_project_twice() {
+        let docker = clients::Cli::default();
+        let postgres_image = RunnableImage::from(TestcontainersPostgres::default());
+        let pg_container = docker.run(postgres_image);
+        let pg_port = pg_container.get_host_port_ipv4(5432);
+
+        let quary_postgres = Postgres::new(
+            "localhost",
+            Some(pg_port.to_string()),
+            "postgres",
+            "postgres",
+            "postgres",
+            "public",
+            None,
+            None,
+            None,
+            None,
+            None,
+            None,
+        )
+        .await
+        .expect("Failed to instantiate Quary Postgres");
+
+        let filesystem = FileSystem {
+            files: vec![
+                ("quary.yaml", "postgres: {schema: public}"),
+                ("models/test_model.sql", "SELECT * FROM q.test_seed"),
+                ("seeds/test_seed.csv", "id,name\n1,test\n2,rubbish"),
+            ]
+            .into_iter()
+            .map(|(k, v)| {
+                (
+                    k.to_string(),
+                    File {
+                        name: k.to_string(),
+                        contents: Bytes::from(v),
+                    },
+                )
+            })
+            .collect(),
+        };
+
+        let project = parse_project(&filesystem, &quary_postgres.query_generator(), "")
+            .await
+            .unwrap();
+        let sqls = project_and_fs_to_sql_for_views(
+            &project,
+            &filesystem,
+            &quary_postgres.query_generator(),
+            false,
+            false,
+        )
+        .await
+        .unwrap();
+
+        for sql in &sqls {
+            for sql in &sql.1 {
+                quary_postgres.exec(&sql).await.unwrap();
+            }
+        }
+        // Run twice
+        for sql in &sqls {
+            for sql in &sql.1 {
+                quary_postgres.exec(&sql).await.unwrap();
+            }
+        }
+    }
+
    #[tokio::test]
    async fn test_postgres_list_tables_and_views() {
        // Start a PostgreSQL container
@@ -341,6 +439,7 @@
            None,
            None,
            None,
+            None,
        )
        .await
        .expect("Failed to instantiate Quary Postgres");
@@ -451,6 +550,7 @@
            None,
            None,
            None,
+            None,
        )
        .await
        .expect("Failed to instantiate Quary Postgres");
@@ -530,6 +630,7 @@
            None,
            None,
            None,
+            None,
        )
        .await
        .expect("Failed to instantiate Quary Postgres");
@@ -655,6 +756,7 @@ models:
            None,
            None,
            None,
+            None,
        )
        .await
        .expect("Failed to instantiate Quary Postgres");
@@ -803,6 +905,7 @@ models:
            None,
            None,
            None,
+            None,
        )
        .await
        .expect("Failed to instantiate Quary Postgres");
@@ -876,6 +979,7 @@ models:
            None,
            None,
            None,
+            None,
        )
        .await
        .unwrap();
@@ -923,29 +1027,149 @@ models:
    }

    #[tokio::test]
-    async fn test_snapshots_with_schema() {
+    async fn test_snapshot_with_no_time_override() {
        let schema = "analytics";
-
        let docker = clients::Cli::default();
        let postgres_image = RunnableImage::from(TestcontainersPostgres::default());
        let pg_container = docker.run(postgres_image);
        let pg_port = pg_container.get_host_port_ipv4(5432);
-        let database = Postgres::new(
-            "localhost",
-            Some(pg_port.to_string()),
-            "postgres",
-            "postgres",
-            "postgres",
-            schema,
-            None,
-            None,
-            None,
-            None,
-            None,
+        let database: Box<dyn DatabaseConnection> = Box::new(
+            Postgres::new(
+                "localhost",
+                Some(pg_port.to_string()),
+                "postgres",
+                "postgres",
+                "postgres",
+                schema,
+                None,
+                None,
+                None,
+                None,
+                None,
+                None,
+            )
+            .await
+            .unwrap(),
+        );
+
+        database.exec("CREATE SCHEMA analytics").await.unwrap();
+        database.exec("CREATE SCHEMA jaffle_shop").await.unwrap();
+
+        // Create orders table
+        database
+            .exec("CREATE TABLE jaffle_shop.raw_orders (order_id INTEGER, status VARCHAR(255), updated_at TIMESTAMP)")
+            .await
+            .unwrap();
+
+        // Insert some initial data
+        database
+            .exec("INSERT INTO jaffle_shop.raw_orders VALUES (1, 'in_progress', '2023-01-01 00:00:00'), (2, 'completed', '2023-01-01 00:00:00')")
+            .await
+            .unwrap();
+
+        let file_system = FileSystem {
+            files: vec![
+                ("quary.yaml", "duckdbInMemory: {schema: analytics}"),
+                (
+                    "models/orders_snapshot.snapshot.sql",
+                    "SELECT * FROM q.raw_orders",
+                ),
+                (
+                    "models/schema.yaml",
+                    "
+sources:
+  - name: raw_orders
+    path: jaffle_shop.raw_orders
+snapshots:
+  - name: orders_snapshot
+    unique_key: order_id
+    strategy:
+      timestamp:
+        updated_at: updated_at
+",
+                ),
+            ]
+            .iter()
+            .map(|(k, v)| {
+                (
+                    k.to_string(),
+                    File {
+                        name: k.to_string(),
+                        contents: Bytes::from(v.to_string()),
+                    },
+                )
+            })
+            .collect(),
+        };
+
+        let project = parse_project(
+            &file_system,
+            &DatabaseQueryGeneratorPostgres::new(schema.to_string(), None),
+            "",
+        )
+        .await
+        .unwrap();
+
+        let snapshots_sql = project_and_fs_to_sql_for_snapshots(
+            &project,
+            &file_system,
+            &DatabaseQueryGeneratorPostgres::new(schema.to_string(), None),
+            &database,
+        )
+        .await
+        .unwrap();
+        for (_, sql) in snapshots_sql {
+            for statement in sql {
+                database.exec(statement.as_str()).await.unwrap()
+            }
+        }
+
+        // assert the data has been created correctly in the snapshot table
+        let data = database.query("SELECT order_id, status, updated_at, quary_valid_from, quary_valid_to, quary_scd_id FROM analytics.orders_snapshot").await.unwrap();
+        assert_eq!(
+            data.columns
+                .iter()
+                .map(|(column, _)| column)
+                .collect::<Vec<_>>(),
+            vec![
+                "order_id",
+                "status",
+                "updated_at",
+                "quary_valid_from",
+                "quary_valid_to",
+                "quary_scd_id"
+            ]
+        );
+        assert_eq!(data.rows.len(), 2);
+    }
+
+    #[tokio::test]
+    async fn test_snapshots_with_schema() {
+        let schema = "analytics";
+
+        let docker = clients::Cli::default();
+        let postgres_image = RunnableImage::from(TestcontainersPostgres::default());
+        let pg_container = docker.run(postgres_image);
+        let pg_port = pg_container.get_host_port_ipv4(5432);
+        let database: Box<dyn DatabaseConnection> = Box::new(
+            Postgres::new(
+                "localhost",
+                Some(pg_port.to_string()),
+                "postgres",
+                "postgres",
+                "postgres",
+                schema,
+                None,
+                None,
+                None,
+                None,
+                None,
+                None,
+            )
+            .await
+            .unwrap(),
+        );
+
        database.exec("CREATE SCHEMA analytics").await.unwrap();
        database.exec("CREATE SCHEMA jaffle_shop").await.unwrap();

@@ -1016,12 +1240,11 @@ snapshots:
        .unwrap();

        let snapshots_sql =
-            project_and_fs_to_sql_for_snapshots(&project, &file_system, &db_generator)
+            project_and_fs_to_sql_for_snapshots(&project, &file_system, &db_generator, &database)
                .await
                .unwrap();
        for (_, sql) in snapshots_sql {
            for statement in sql {
-                println!("{}", statement.as_str());
                database.exec(statement.as_str()).await.unwrap()
            }
        }
@@ -1089,10 +1312,14 @@ snapshots:
        let db_generator_updated =
            DatabaseQueryGeneratorPostgres::new(schema.to_string(), Some(system_time_updated));

-        let snapshots_sql =
-            project_and_fs_to_sql_for_snapshots(&project, &file_system, &db_generator_updated)
-                .await
-                .unwrap();
+        let snapshots_sql = project_and_fs_to_sql_for_snapshots(
+            &project,
+            &file_system,
+            &db_generator_updated,
+            &database,
+        )
+        .await
+        .unwrap();

        for (_, sql) in &snapshots_sql {
            for statement in sql {
diff --git a/rust/quary-databases/src/databases_redshift.rs b/rust/quary-databases/src/databases_redshift.rs
new file mode 100644
index 00000000..5e889a7c
--- /dev/null
+++ b/rust/quary-databases/src/databases_redshift.rs
@@ -0,0 +1,865 @@
+use async_trait::async_trait;
+use quary_core::{
+    database_redshift::DatabaseQueryGeneratorRedshift,
+    databases::{
+        ColumnWithDetails, DatabaseConnection, DatabaseQueryGenerator, QueryError, QueryResult,
+    },
+};
+use quary_proto::TableAddress;
+use sqlx::Error;
+use std::fmt::Debug;
+
+use crate::databases_postgres::Postgres;
+
+#[derive(Debug)]
+pub struct Redshift {
+    postgres: Postgres,
+    schema: String,
+}
+
+impl Redshift {
+    pub async fn new(
+        host: &str,
+        port: Option<String>,
+        user: &str,
+        password: &str,
+        database: &str,
+        schema: &str,
+        ssl_mode: Option<String>,
+        ssl_cert: Option<String>,
+        ssl_key: Option<String>,
+        ssl_root_cert: Option<String>,
+        channel_binding: Option<String>,
+    ) -> Result<Self, Error> {
+        let postgres = Postgres::new(
+            host,
+            port,
+            user,
+            password,
+            database,
+            schema,
+            ssl_mode,
+            ssl_cert,
+            ssl_key,
+            ssl_root_cert,
+            channel_binding,
+            Some(2), // Set extra_float_digits to 2 (Redshift's maximum) for Redshift
+        )
+        .await?;
+        Ok(Self {
+            postgres,
+            schema: schema.to_string(),
+        })
+    }
+}
+
+#[async_trait]
+impl DatabaseConnection for Redshift {
+    async fn list_tables(&self) -> Result<Vec<TableAddress>, String> {
+        let where_clause = "table_type = 'BASE TABLE' AND table_schema NOT IN ('pg_catalog', 'pg_internal', 'information_schema')";
+        self.postgres.list_table_like_query(where_clause).await
+    }
+
+    async fn list_views(&self) -> Result<Vec<TableAddress>, String> {
+        let where_clause =
+            "table_type = 'VIEW' AND table_schema NOT IN ('pg_catalog', 'pg_internal', 'information_schema')";
+        self.postgres.list_table_like_query(where_clause).await
+    }
+
+    async fn list_local_tables(&self) -> Result<Vec<TableAddress>, String> {
+        self.postgres.list_local_tables().await
+    }
+
+    async fn list_local_views(&self) -> Result<Vec<TableAddress>, String> {
+        self.postgres.list_local_views().await
+    }
+
+    async fn list_columns(&self, table: &str) -> Result<Vec<ColumnWithDetails>, String> {
+        self.postgres.list_columns(table).await
+    }
+
+    async fn exec(&self, query: &str) -> Result<(), String> {
+        self.postgres.exec(query).await
+    }
+
+    async fn query(&self, query: &str) -> Result<QueryResult, QueryError> {
+        self.postgres.query(query).await
+    }
+
+    async fn table_exists(&self, path: &str) -> Result<Option<bool>, String> {
+        let parts: Vec<&str> = path.split('.').collect();
+        let (schema, table) = match parts.len() {
+            2 => (parts[0].to_string(), parts[1].to_string()),
+            _ => (self.schema.clone(), parts[0].to_string()),
+        };
+        let result = self
+            .postgres
+            .list_table_like_query(&format!(
+                "table_schema = '{schema}' AND table_name = '{table}'"
+            ))
+            .await?;
+        Ok(Some(!result.is_empty()))
+    }
+
+    fn query_generator(&self) -> Box<dyn DatabaseQueryGenerator> {
+        Box::new(DatabaseQueryGeneratorRedshift::new(
+            self.schema.clone(),
+            None,
+        ))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use chrono::{DateTime, NaiveDateTime, Utc};
+    use prost::bytes::Bytes;
+    use quary_core::database_redshift::DatabaseQueryGeneratorRedshift;
+    use quary_core::project::{
+        parse_project, project_and_fs_to_sql_for_snapshots, project_and_fs_to_sql_for_views,
+    };
+    use quary_core::project_tests::return_tests_sql;
+    use quary_proto::{File, FileSystem};
+    use std::time::SystemTime;
+
+    #[tokio::test]
+    #[ignore]
+    async fn test_redshift_list_tables_and_views() {
+        let quary_postgres = Redshift::new("", None, "", "", "", "", None, None, None, None, None)
+            .await
+            .expect("Failed to instantiate Quary Redshift");
+
+        quary_postgres
+            .exec("CREATE TABLE wrong_table (id INTEGER, name VARCHAR(255))")
+            .await
+            .unwrap();
+        quary_postgres
+            .exec("CREATE TABLE test_table (id INTEGER, name VARCHAR(255))")
+            .await
+            .unwrap();
+        quary_postgres
+            .exec("INSERT INTO test_table VALUES (1, 'test')")
+            .await
+            .unwrap();
+        quary_postgres
+            .exec("INSERT INTO test_table VALUES (2, 'rubbish')")
+            .await
+            .unwrap();
+        quary_postgres
+            .exec("CREATE VIEW test_view AS SELECT * FROM test_table")
+            .await
+            .unwrap();
+        quary_postgres
+            .exec("CREATE VIEW wrong_view AS SELECT * FROM test_table")
+            .await
+            .unwrap();
+
+        let tables = quary_postgres.list_local_tables().await.unwrap();
+        assert_eq!(
+            tables,
+            vec![
+                TableAddress {
+                    name: "test_table".to_string(),
+                    full_path: "public.test_table".to_string(),
+                },
+                TableAddress {
+                    name: "wrong_table".to_string(),
+                    full_path: "public.wrong_table".to_string(),
+                },
+            ]
+        );
+
+        let views = quary_postgres.list_local_views().await.unwrap();
+        assert_eq!(
+            views,
+            vec![
+                TableAddress {
+                    name: "test_view".to_string(),
+                    full_path: "public.test_view".to_string(),
+                },
+                TableAddress {
+                    name: "wrong_view".to_string(),
+                    full_path: "public.wrong_view".to_string(),
+                },
+            ]
+        );
+
+        let columns = quary_postgres.list_columns("test_table").await.unwrap();
+        assert_eq!(
+            columns,
+            vec!["id", "name"]
+                .into_iter()
+                .map(|name| {
+                    ColumnWithDetails {
+                        name: name.to_string(),
+                        is_nullable: Some(true),
+                        is_unique: Some(false),
+                        ..Default::default()
+                    }
+                })
+                .collect::<Vec<ColumnWithDetails>>()
+        );
+
+        let result = quary_postgres
+            .query("SELECT * FROM test_table")
+            .await
+            .unwrap();
+        assert_eq!(
+            result
+                .columns
+                .iter()
+                .map(|(column, _)| column)
+                .collect::<Vec<_>>(),
+            vec!["id", "name"]
+        );
+        assert_eq!(result.rows, vec![vec!["1", "test"], vec!["2", "rubbish"]]);
+    }
+
+    #[tokio::test]
+    #[ignore]
+    async fn test_redshift_list_columns_in_table() {
+        let database = Redshift::new("", None, "", "", "", "", None, None, None, None, None)
+            .await
+            .expect("Failed to instantiate Quary Redshift");
+
+        database
+            .exec("CREATE SCHEMA IF NOT EXISTS transform")
+            .await
+            .unwrap();
+        database
+            .exec("CREATE TABLE IF NOT EXISTS transform.test_table (id INTEGER, name_transform VARCHAR(255))")
+            .await
+            .unwrap();
+        database
+            .exec("CREATE SCHEMA IF NOT EXISTS other_schema")
+            .await
+            .unwrap();
+        database
+            .exec("CREATE TABLE IF NOT EXISTS other_schema.test_table (id INTEGER NOT NULL UNIQUE, name VARCHAR(255))")
+            .await
+            .unwrap();
+        database
+            .exec("COMMENT ON COLUMN other_schema.test_table.id IS 'test comment'")
+            .await
+            .unwrap();
+
+        let columns = database
+            .list_columns("other_schema.test_table")
+            .await
+            .unwrap();
+        assert_eq!(
+            columns,
+            vec![
+                ColumnWithDetails {
+                    name: "id".to_string(),
+                    description: Some("test comment".to_string()),
+                    data_type: None,
+                    is_nullable: Some(false),
+                    is_unique: Some(true),
+                },
+                ColumnWithDetails {
+                    name: "name".to_string(),
+                    description: None,
+                    data_type: None,
+                    is_nullable: Some(true),
+                    is_unique: Some(false),
+                }
+            ]
+        );
+        let columns = database.list_columns("transform.test_table").await.unwrap();
+        assert_eq!(
+            columns,
+            vec!["id", "name_transform"]
+                .into_iter()
+                .map(|name| {
+                    ColumnWithDetails {
+                        name: name.to_string(),
+                        is_nullable: Some(true),
+                        is_unique: Some(false),
+                        ..Default::default()
+                    }
+                })
+                .collect::<Vec<ColumnWithDetails>>()
+        );
+    }
+
+    #[tokio::test]
+    #[ignore]
+    async fn test_redshift_foreign_relationship_test_with_schema() {
+        let database = Redshift::new("", None, "", "", "", "", None, None, None, None, None)
+            .await
+            .expect("Failed to instantiate Quary Redshift");
+
+        database
+            .exec("CREATE SCHEMA IF NOT EXISTS other_schema")
+            .await
+            .unwrap();
+        database
+            .exec("CREATE SCHEMA IF NOT EXISTS transform")
+            .await
+            .unwrap();
+        database
+            .exec("CREATE TABLE IF NOT EXISTS other_schema.test_table (id INTEGER, name VARCHAR(255))")
+            .await
+            .unwrap();
+        database
+            .exec("INSERT INTO other_schema.test_table VALUES (1, 'test'), (2, 'rubbish')")
+            .await
+            .unwrap();
+        database
+            .exec("CREATE TABLE IF NOT EXISTS transform.test_table (id INTEGER, name VARCHAR(255))")
+            .await
+            .unwrap();
+        database
+            .exec("INSERT INTO transform.test_table VALUES (3, 'test_3'), (4, 'rubbish_rubiish')")
+            .await
+            .unwrap();
+
+        let file_system = FileSystem {
+            files: vec![
+                ("quary.yaml", "postgres: {schema: transform}"),
+                ("models/test_model.sql", "SELECT id FROM q.test_source"),
+                (
+                    "models/test_model_same_schema.sql",
+                    "SELECT id FROM q.test_source_same_schema",
+                ),
+                ("models/test_model_out.sql", "SELECT id FROM q.test_model"),
+                (
+                    "models/schema.yaml",
+                    "
+                    sources:
+                      - name: test_source
+                        path: other_schema.test_table
+                      - name: test_source_same_schema
+                        path: transform.test_table
+                    models:
+                      - name: test_model_out
+                        columns:
+                          - name: id
+                            tests:
+                              - type: relationship
+                                info:
+                                  column: id
+                                  model: test_model
+                              - type: relationship
+                                info:
+                                  column: id
+                                  model: test_source
+                      - name: test_model_same_schema
+                        columns:
+                          - name: id
+                            tests:
+                              - type: relationship
+                                info:
+                                  column: id
+                                  model: test_source_same_schema
+                    ",
+                ),
+            ]
+            .into_iter()
+            .map(|(k, v)| {
+                (
+                    k.to_string(),
+                    File {
+                        name: k.to_string(),
+                        contents: Bytes::from(v),
+                    },
+                )
+            })
+            .collect(),
+        };
+
+        let project = parse_project(&file_system, &database.query_generator(), "")
+            .await
+            .unwrap();
+
+        let tests = return_tests_sql(
+            &database.query_generator(),
+            &project,
+            &file_system,
+            true,
+            None,
+            None,
+        )
+        .await
+        .unwrap();
+        let tests = tests.iter().collect::<Vec<_>>();
+
+        assert!(!tests.is_empty());
+
+        for (name, test) in tests.iter() {
+            let results = database
+                .query(test)
+                .await
+                .expect(&format!("Error running query {}", test));
+
+            assert_eq!(results.rows.len(), 0, "test {} failed: {}", name, test);
+        }
+    }
+
+    // TEST FAILS in Redshift: I think this is because we execute the command twice in quick
+    // succession, possibly because of a table lock that Redshift takes while a materialized
+    // view is being created (see the retry sketch below).
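// Hedged sketch (an editor's illustration, not part of the patch above): if the failure
// really is a transient lock taken while Redshift creates a materialized view, a bounded
// retry around `exec` would be one way to probe that theory. The helper name, attempt
// count, and back-off duration are illustrative assumptions; it assumes `tokio` and the
// `DatabaseConnection` trait from `quary_core::databases` are in scope, as in this module.
async fn exec_with_retry(
    database: &dyn DatabaseConnection,
    statement: &str,
    attempts: u32,
) -> Result<(), String> {
    let mut last_error = "no attempts made".to_string();
    for _ in 0..attempts {
        match database.exec(statement).await {
            // Success: stop retrying.
            Ok(()) => return Ok(()),
            // Failure: remember the error and wait briefly in case a
            // short-lived table lock is still held.
            Err(e) => {
                last_error = e;
                tokio::time::sleep(std::time::Duration::from_millis(500)).await;
            }
        }
    }
    Err(last_error)
}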
+    #[tokio::test]
+    #[ignore]
+    async fn test_redshift_foreign_relationship_test_with_materialized_view_table() {
+        let database = Redshift::new("", None, "", "", "", "", None, None, None, None, None)
+            .await
+            .expect("Failed to instantiate Quary Redshift");
+
+        database
+            .exec("CREATE SCHEMA IF NOT EXISTS other_schema")
+            .await
+            .unwrap();
+        database
+            .exec("CREATE SCHEMA IF NOT EXISTS transform")
+            .await
+            .unwrap();
+        database
+            .exec("CREATE TABLE IF NOT EXISTS other_schema.test_table (id INTEGER, name VARCHAR(255))")
+            .await
+            .unwrap();
+        database
+            .exec("INSERT INTO other_schema.test_table VALUES (1, 'test'), (2, 'rubbish')")
+            .await
+            .unwrap();
+        database
+            .exec("CREATE TABLE IF NOT EXISTS transform.test_table (id INTEGER, name VARCHAR(255))")
+            .await
+            .unwrap();
+        database
+            .exec("INSERT INTO transform.test_table VALUES (3, 'test_3'), (4, 'rubbish_rubiish')")
+            .await
+            .unwrap();
+
+        let file_system = FileSystem {
+            files: vec![
+                ("quary.yaml", "postgres: {schema: transform}"),
+                ("models/test_model.sql", "SELECT id FROM q.test_source"),
+                (
+                    "models/test_model_same_schema.sql",
+                    "SELECT id FROM q.test_source_same_schema",
+                ),
+                ("models/test_model_out.sql", "SELECT id FROM q.test_model"),
+                (
+                    "models/schema.yaml",
+                    "
+                    sources:
+                      - name: test_source
+                        path: other_schema.test_table
+                      - name: test_source_same_schema
+                        path: transform.test_table
+                    models:
+                      - name: test_model_out
+                        materialization: table
+                        columns:
+                          - name: id
+                            tests:
+                              - type: relationship
+                                info:
+                                  column: id
+                                  model: test_model
+                              - type: relationship
+                                info:
+                                  column: id
+                                  model: test_source
+                      - name: test_model_same_schema
+                        materialization: materialized_view
+                        columns:
+                          - name: id
+                            tests:
+                              - type: relationship
+                                info:
+                                  column: id
+                                  model: test_source_same_schema
+                    ",
+                ),
+            ]
+            .into_iter()
+            .map(|(k, v)| {
+                (
+                    k.to_string(),
+                    File {
+                        name: k.to_string(),
+                        contents: Bytes::from(v),
+                    },
+                )
+            })
+            .collect(),
+        };
+
+        let project = parse_project(&file_system, &database.query_generator(), "")
+            .await
+            .unwrap();
+
+        let sqls = project_and_fs_to_sql_for_views(
+            &project,
+            &file_system,
+            &database.query_generator(),
+            false,
+            false,
+        )
+        .await
+        .unwrap();
+        for sql in &sqls {
+            for sql in &sql.1 {
+                database.exec(&sql).await.unwrap();
+            }
+        }
+        // Run twice
+        for sql in &sqls {
+            for sql in &sql.1 {
+                database.exec(&sql).await.unwrap();
+            }
+        }
+
+        let tests = return_tests_sql(
+            &database.query_generator(),
+            &project,
+            &file_system,
+            false,
+            None,
+            None,
+        )
+        .await
+        .unwrap();
+        let tests = tests.iter().collect::<Vec<_>>();
+
+        assert!(!tests.is_empty());
+
+        for (name, test) in tests.iter() {
+            let results = database
+                .query(test)
+                .await
+                .expect(&format!("Error running query {}", test));
+
+            assert_eq!(results.rows.len(), 0, "test {} failed: {}", name, test);
+        }
+    }
+
+    #[tokio::test]
+    #[ignore]
+    async fn test_list_tables_outside_the_schema() {
+        let database = Redshift::new("", None, "", "", "", "", None, None, None, None, None)
+            .await
+            .expect("Failed to instantiate Quary Redshift");
+
+        database.exec("CREATE SCHEMA other_schema").await.unwrap();
+        database.exec("CREATE SCHEMA transform").await.unwrap();
+        database
+            .exec("CREATE TABLE other_schema.test_table (id INTEGER, name VARCHAR(255))")
+            .await
+            .unwrap();
+        database
+            .exec("CREATE TABLE transform.test_table (id INTEGER, name VARCHAR(255))")
+            .await
+            .unwrap();
+        database
+            .exec("CREATE VIEW transform.test_view AS SELECT * FROM transform.test_table")
+            .await
+            .unwrap();
+        database
.exec("CREATE VIEW other_schema.test_view AS SELECT * FROM other_schema.test_table") + .await + .unwrap(); + + let tables = database.list_tables().await.unwrap(); + assert_eq!( + tables, + vec![ + TableAddress { + name: "test_table".to_string(), + full_path: "other_schema.test_table".to_string(), + }, + TableAddress { + name: "test_table".to_string(), + full_path: "transform.test_table".to_string(), + }, + ] + ); + + let views = database.list_views().await.unwrap(); + assert_eq!( + views, + vec![ + TableAddress { + name: "test_view".to_string(), + full_path: "other_schema.test_view".to_string(), + }, + TableAddress { + name: "test_view".to_string(), + full_path: "transform.test_view".to_string(), + }, + ] + ); + } + + // TEST FAILS IN REDSHIFT: In Redshift column names are case-insensitive by default. + #[tokio::test] + #[ignore] + async fn test_redshift_list_columns_with_case_sensitive_columns() { + let database = Redshift::new("", None, "", "", "", "", None, None, None, None, None) + .await + .unwrap(); + + database.exec("CREATE SCHEMA transform").await.unwrap(); + database + .exec("CREATE TABLE transform.test_table (\"ID\" INTEGER, \"Name\" VARCHAR(255), test VARCHAR(255), TESTTWO VARCHAR(255))") + .await + .unwrap(); + + let columns = database.list_columns("transform.test_table").await.unwrap(); + assert_eq!( + columns, + vec![ + ColumnWithDetails { + name: "\"ID\"".to_string(), + description: None, + data_type: None, + is_nullable: Some(true), + is_unique: Some(false), + }, + ColumnWithDetails { + name: "\"Name\"".to_string(), + description: None, + data_type: None, + is_nullable: Some(true), + is_unique: Some(false), + }, + ColumnWithDetails { + name: "test".to_string(), + description: None, + data_type: None, + is_nullable: Some(true), + is_unique: Some(false), + }, + ColumnWithDetails { + name: "testtwo".to_string(), + description: None, + data_type: None, + is_nullable: Some(true), + is_unique: Some(false), + } + ] + ); + } + + #[tokio::test] + #[ignore] + async fn test_redshift_snapshots_with_schema() { + let schema = "analytics"; + + let database: Box = Box::new( + Redshift::new("", None, "", "", "", schema, None, None, None, None, None) + .await + .unwrap(), + ); + + database.exec("CREATE SCHEMA analytics").await.unwrap(); + database.exec("CREATE SCHEMA jaffle_shop").await.unwrap(); + + let datetime_str = "2023-01-01 01:00:00"; + + // Parse the string into a NaiveDateTime + let naive_datetime = NaiveDateTime::parse_from_str(datetime_str, "%Y-%m-%d %H:%M:%S") + .expect("Failed to parse datetime string"); + + // Convert NaiveDateTime to DateTime + let datetime_utc = DateTime::::from_utc(naive_datetime, Utc); + + // Convert DateTime to SystemTime + let system_time = SystemTime::from(datetime_utc); + + let db_generator = + DatabaseQueryGeneratorRedshift::new(schema.to_string(), Some(system_time)); + + // Create orders table + database + .exec("CREATE TABLE jaffle_shop.raw_orders (order_id INTEGER, status VARCHAR(255), updated_at TIMESTAMP)") + .await + .unwrap(); + + // Insert some initial data + database + .exec("INSERT INTO jaffle_shop.raw_orders VALUES (1, 'in_progress', '2023-01-01 00:00:00'), (2, 'completed', '2023-01-01 00:00:00')") + .await + .unwrap(); + + let file_system = FileSystem { + files: vec![ + ("quary.yaml", "postgres: {schema: analytics}"), + ( + "models/orders_snapshot.snapshot.sql", + "SELECT * FROM q.raw_orders", + ), + ( + "models/schema.yaml", + " + sources: + - name: raw_orders + path: jaffle_shop.raw_orders + snapshots: + - name: orders_snapshot + 
+                        unique_key: order_id
+                        strategy:
+                          timestamp:
+                            updated_at: updated_at
+                    ",
+                ),
+            ]
+            .iter()
+            .map(|(k, v)| {
+                (
+                    k.to_string(),
+                    File {
+                        name: k.to_string(),
+                        contents: Bytes::from(v.to_string()),
+                    },
+                )
+            })
+            .collect(),
+        };
+
+        let project = parse_project(&file_system, &db_generator, "")
+            .await
+            .unwrap();
+
+        let snapshots_sql =
+            project_and_fs_to_sql_for_snapshots(&project, &file_system, &db_generator, &database)
+                .await
+                .unwrap();
+        for (_, sql) in snapshots_sql {
+            for statement in sql {
+                println!("{}", statement.as_str());
+                database.exec(statement.as_str()).await.unwrap()
+            }
+        }
+
+        // assert the data has been created correctly in the snapshot table
+        let data = database
+            .query("SELECT order_id, status, updated_at, quary_valid_from, quary_valid_to, quary_scd_id FROM analytics.orders_snapshot ORDER BY order_id, quary_valid_from")
+            .await
+            .unwrap();
+
+        assert_eq!(
+            data.columns
+                .iter()
+                .map(|(column, _)| column)
+                .collect::<Vec<_>>(),
+            vec![
+                "order_id",
+                "status",
+                "updated_at",
+                "quary_valid_from",
+                "quary_valid_to",
+                "quary_scd_id"
+            ]
+        );
+        assert_eq!(
+            data.rows,
+            vec![
+                vec![
+                    "1",
+                    "in_progress",
+                    "2023-01-01T00:00:00",
+                    "2023-01-01T01:00:00+00:00",
+                    "NULL",
+                    "77f50225cf5a52d15fecaa449be2dcc4"
+                ],
+                vec![
+                    "2",
+                    "completed",
+                    "2023-01-01T00:00:00",
+                    "2023-01-01T01:00:00+00:00",
+                    "NULL",
+                    "3bb5cc6bb5b432df7712d067f57a3780"
+                ],
+            ]
+        );
+
+        database
+            .exec("UPDATE jaffle_shop.raw_orders SET status = 'completed', updated_at = CAST('2023-01-01 02:00:00' AS TIMESTAMP) WHERE order_id = 1")
+            .await
+            .unwrap();
+
+        let datetime_str_updated = "2023-01-01 03:00:00";
+
+        // Parse the string into a NaiveDateTime
+        let naive_datetime_updated =
+            NaiveDateTime::parse_from_str(datetime_str_updated, "%Y-%m-%d %H:%M:%S")
+                .expect("Failed to parse datetime string");
+
+        // Convert NaiveDateTime to DateTime<Utc>
+        let datetime_utc_updated = DateTime::<Utc>::from_utc(naive_datetime_updated, Utc);
+
+        // Convert DateTime<Utc> to SystemTime
+        let system_time_updated = SystemTime::from(datetime_utc_updated);
+
+        let db_generator_updated =
+            DatabaseQueryGeneratorRedshift::new(schema.to_string(), Some(system_time_updated));
+
+        let snapshots_sql = project_and_fs_to_sql_for_snapshots(
+            &project,
+            &file_system,
+            &db_generator_updated,
+            &database,
+        )
+        .await
+        .unwrap();
+
+        for (_, sql) in &snapshots_sql {
+            for statement in sql {
+                database.exec(statement.as_str()).await.unwrap()
+            }
+        }
+
+        // assert the data has been created correctly in the snapshot table
+        let data = database
+            .query("SELECT order_id, status, updated_at, quary_valid_from, quary_valid_to, quary_scd_id FROM analytics.orders_snapshot ORDER BY order_id, quary_valid_from")
+            .await
+            .unwrap();
+
+        assert_eq!(
+            data.columns
+                .iter()
+                .map(|(column, _)| column)
+                .collect::<Vec<_>>(),
+            vec![
+                "order_id",
+                "status",
+                "updated_at",
+                "quary_valid_from",
+                "quary_valid_to",
+                "quary_scd_id"
+            ]
+        );
+        assert_eq!(
+            data.rows,
+            vec![
+                vec![
+                    "1",
+                    "in_progress",
+                    "2023-01-01T00:00:00",
+                    "2023-01-01T01:00:00+00:00",
+                    "2023-01-01T03:00:00+00:00",
+                    "77f50225cf5a52d15fecaa449be2dcc4"
+                ],
+                vec![
+                    "1",
+                    "completed",
+                    "2023-01-01T02:00:00",
+                    "2023-01-01T03:00:00+00:00",
+                    "NULL",
+                    "f5c7798e30814925cd1a61e9e5ef6683"
+                ],
+                vec![
+                    "2",
+                    "completed",
+                    "2023-01-01T00:00:00",
+                    "2023-01-01T01:00:00+00:00",
+                    "NULL",
+                    "3bb5cc6bb5b432df7712d067f57a3780"
+                ],
+            ]
+        );
+    }
+}
diff --git a/rust/quary-databases/src/databases_snowflake.rs b/rust/quary-databases/src/databases_snowflake.rs
index 6a99be20..f9b4fcda 100644
--- a/rust/quary-databases/src/databases_snowflake.rs
+++ b/rust/quary-databases/src/databases_snowflake.rs
@@ -284,6 +284,10 @@ impl DatabaseConnection for Snowflake {
        };
    }

+    async fn table_exists(&self, _path: &str) -> Result<Option<bool>, String> {
+        Ok(None) // not implemented
+    }
+
    fn query_generator(&self) -> Box<dyn DatabaseQueryGenerator> {
        Box::new(DatabaseQueryGeneratorSnowflake::new(
            self.database.to_string(),
@@ -388,36 +392,32 @@ mod tests {
        );
        assert!(invalid_schema.is_err());
    }
-}

-// #[cfg(test)]
-// mod tests {
-//     use super::*;
-//
-//     #[tokio::test]
-//     async fn test_snowflake() {
-//         let snowflake = Snowflake::new(
-//             "actual_details.eu-west-2.aws",
-//             "COMPUTE_WH",
-//             "TEST_DATABASE",
-//             "TEST_SCHEMA",
-//             "USERNAME",
-//             // Some("TEST_ROLE"),
-//             None,
-//             "rubbish_details",
-//         )
-//         .unwrap();
-//
-//         let tables = snowflake.list_tables().await.unwrap();
-//         println!("Tables: {:?}", tables);
-//         assert_eq!(tables.len(), 1);
-//
-//         let views = snowflake.list_views().await.unwrap();
-//         println!("Views: {:?}", views);
-//         assert_eq!(views.len(), 1);
-//
-//         let columns = snowflake.list_columns(&tables[0]).await.unwrap();
-//         println!("Columns: {:?}", columns);
-//         assert_eq!(columns.len(), 2);
-//     }
-// }
+    #[tokio::test]
+    #[ignore]
+    async fn test_snowflake() {
+        let snowflake = Snowflake::new(
+            "actual_details.eu-west-2.aws",
+            "COMPUTE_WH",
+            "TEST_DATABASE",
+            "TEST_SCHEMA",
+            "USERNAME",
+            // Some("TEST_ROLE"),
+            None,
+            "rubbish_details",
+        )
+        .unwrap();
+
+        let tables = snowflake.list_tables().await.unwrap();
+        println!("Tables: {:?}", tables);
+        assert_eq!(tables.len(), 1);
+
+        let views = snowflake.list_views().await.unwrap();
+        println!("Views: {:?}", views);
+        assert_eq!(views.len(), 1);
+
+        let columns = snowflake.list_columns(&tables[0].full_path).await.unwrap();
+        println!("Columns: {:?}", columns);
+        assert_eq!(columns.len(), 2);
+    }
+}
diff --git a/rust/quary-databases/src/databases_sqlite.rs b/rust/quary-databases/src/databases_sqlite.rs
index 56048df8..cdc699ab 100644
--- a/rust/quary-databases/src/databases_sqlite.rs
+++ b/rust/quary-databases/src/databases_sqlite.rs
@@ -149,6 +149,10 @@ impl DatabaseConnection for Sqlite {
        })
    }

+    async fn table_exists(&self, _path: &str) -> Result<Option<bool>, String> {
+        Ok(None) // not implemented
+    }
+
    fn query_generator(&self) -> Box<dyn DatabaseQueryGenerator> {
        Box::new(DatabaseQueryGeneratorSqlite {})
    }
diff --git a/rust/quary-databases/src/lib.rs b/rust/quary-databases/src/lib.rs
index 9e46c3e6..6ea24de9 100644
--- a/rust/quary-databases/src/lib.rs
+++ b/rust/quary-databases/src/lib.rs
@@ -2,5 +2,6 @@ mod databases_bigquery;
pub mod databases_connection;
pub mod databases_duckdb;
mod databases_postgres;
+pub mod databases_redshift;
mod databases_snowflake;
mod databases_sqlite;
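A note on the new `table_exists` trait method this patch adds to every connector: only Redshift returns a definitive answer, while DuckDB, Postgres, Snowflake, and SQLite all return `Ok(None)` ("not implemented"). A minimal sketch of how a caller might consume the three-state result; the helper name and the plan strings are illustrative assumptions, not code from this patch:

use quary_core::databases::DatabaseConnection;

// Hypothetical helper: decide how to materialize a snapshot target based on
// the three-state answer from `table_exists`.
async fn describe_snapshot_plan(
    database: &dyn DatabaseConnection,
    path: &str,
) -> Result<String, String> {
    match database.table_exists(path).await? {
        // The connector checked and found the table: update it in place.
        Some(true) => Ok(format!("{path}: table exists, generate update statements")),
        // The connector checked and found nothing: create it from scratch.
        Some(false) => Ok(format!("{path}: missing, generate CREATE TABLE statements")),
        // The connector cannot answer (e.g. DuckDB, SQLite): fall back to
        // idempotent DDL such as CREATE TABLE IF NOT EXISTS.
        None => Ok(format!("{path}: unknown, fall back to idempotent DDL")),
    }
}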
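On the failure flagged in `test_redshift_list_columns_with_case_sensitive_columns`: Redshift folds identifiers to lower case by default (unlike Postgres, even quoted mixed-case names do not round-trip unless the cluster enables case-sensitive identifiers), so asserting that `"ID"` comes back quoted is not portable. A sketch of the kind of quoting helper that makes the expectation explicit; the function is an illustration under those assumptions, not part of the patch:

// Hypothetical helper: quote a column name only when it would not survive
// Redshift's default lower-case identifier folding unquoted.
fn quote_identifier(name: &str) -> String {
    let starts_ok = name
        .chars()
        .next()
        .map(|c| c.is_ascii_lowercase() || c == '_')
        .unwrap_or(false);
    let rest_ok = name
        .chars()
        .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_');
    if starts_ok && rest_ok {
        name.to_string()
    } else {
        // Escape embedded double quotes by doubling them, per SQL rules.
        format!("\"{}\"", name.replace('"', "\"\""))
    }
}

#[test]
fn quotes_mixed_case_names() {
    assert_eq!(quote_identifier("order_id"), "order_id");
    assert_eq!(quote_identifier("ID"), "\"ID\"");
    assert_eq!(quote_identifier("Name"), "\"Name\"");
}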