chore: update version april 24
benfdking committed Apr 24, 2024
1 parent bc5fb8b commit 29d7034
Showing 25 changed files with 2,044 additions and 267 deletions.
15 changes: 15 additions & 0 deletions Cargo.lock

Diff not rendered (generated file).

16 changes: 6 additions & 10 deletions proto/quary/service/v1/connection_config.proto
@@ -19,7 +19,9 @@ message ConnectionConfig {
ConnectionConfigBigQuery big_query = 5;
ConnectionConfigSnowflake snowflake = 6;
ConnectionConfigPostgres postgres = 7;
ConnectionConfigRedshift redshift = 9;
}

repeated Var vars = 8;

message ConnectionConfigSqLite {
@@ -40,16 +42,10 @@ message ConnectionConfig {
message ConnectionConfigPostgres {
string schema = 1;
}
//
// message ConnectionConfigMySql {
// string username = 1;
// string password = 2;
// string protocol = 3;
// string host = 4;
// string port = 5;
// string database = 6;
// map<string, string> params = 7;
// }

message ConnectionConfigRedshift {
string schema = 1;
}

message ConnectionConfigBigQuery {
string project_id = 1;
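For context, the new ConnectionConfigRedshift message appears to be the wire form of the redshift block in a project's quary.yaml. A minimal sketch of that configuration, mirroring the settings exercised by test_redshift_snapshots further down (the transform schema name comes from that test; anything else would be project-specific):

redshift:
  schema: transform

As the test's .env file suggests, the connection credentials themselves (REDSHIFT_HOST, REDSHIFT_PORT, REDSHIFT_USER, REDSHIFT_PASSWORD, REDSHIFT_DATABASE) are supplied via environment variables rather than in quary.yaml.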
10 changes: 7 additions & 3 deletions rust/cli/src/main.rs
@@ -489,9 +489,13 @@ async fn main_wrapped() -> Result<(), String> {
let database = database_from_config(&config).await?;
let query_generator = database.query_generator();
let (project, file_system) = parse_project(&query_generator).await?;
let snapshots_sql =
project_and_fs_to_sql_for_snapshots(&project, &file_system, &query_generator)
.await?;
let snapshots_sql = project_and_fs_to_sql_for_snapshots(
&project,
&file_system,
&query_generator,
&database,
)
.await?;

if snapshot_args.dry_run {
println!("\n-- Create snapshots\n");
191 changes: 189 additions & 2 deletions rust/cli/tests/cli_test.rs
@@ -1,7 +1,9 @@
use assert_cmd::Command;
use chrono::NaiveDateTime;
use chrono::Utc;
use quary_core::databases::DatabaseConnection;
use quary_databases::databases_duckdb;
use quary_databases::databases_redshift;
use std::fs;
use tempfile::tempdir;

@@ -331,8 +333,9 @@ async fn test_duckdb_snapshots() {
let current_date = Utc::now().date_naive();
let quary_valid_from_str = &result.rows[0][4];
let quary_valid_from_date = quary_valid_from_str.split_whitespace().next().unwrap();
println!("quary_valid_from_date: {}", quary_valid_from_date);
let quary_valid_from =
chrono::NaiveDate::parse_from_str(quary_valid_from_date, "%Y-%m-%d").unwrap();
chrono::NaiveDate::parse_from_str(quary_valid_from_date, "%Y-%m-%dT%H:%M:%SZ").unwrap();
assert_eq!(current_date, quary_valid_from);

// Update orders.csv data
@@ -382,7 +385,191 @@ async fn test_duckdb_snapshots() {
.next()
.unwrap();
let updated_quary_valid_from =
chrono::NaiveDate::parse_from_str(updated_quary_valid_from_date, "%Y-%m-%d").unwrap();
chrono::NaiveDate::parse_from_str(updated_quary_valid_from_date, "%Y-%m-%dT%H:%M:%SZ")
.unwrap();
assert_eq!(current_date, updated_quary_valid_from);
}
}

/// This test simulates a workflow where a model references a snapshot in redshift.
/// 1. The initial snapshot is taken which builds the orders_snapshot table in the database.
/// 2. The project is built which references the orders_snapshot table.
/// 3. The initial state of the snapshot is asserted.
/// 4. The data is updated and a new snapshot is taken.
/// 5. The updated state of the snapshot is asserted (via the stg_orders_snapshot model).
#[tokio::test]
#[ignore]
async fn test_redshift_snapshots() {
// Prepare the database
let database =
databases_redshift::Redshift::new("", None, "", "", "", "", None, None, None, None, None)
.await
.ok()
.unwrap();
database
.exec("DROP TABLE analytics.orders CASCADE")
.await
.unwrap();
database
.exec("DROP TABLE transform.orders_snapshot CASCADE")
.await
.unwrap();

database
.exec(
"
CREATE TABLE analytics.orders (
order_id character varying(255) ENCODE lzo,
customer_id character varying(255) ENCODE lzo,
order_date timestamp without time zone ENCODE az64,
total_amount numeric(10, 2) ENCODE az64,
status character varying(255) ENCODE lzo
) DISTSTYLE AUTO;
",
)
.await
.unwrap();

database.exec(
"
INSERT INTO analytics.orders (order_id, customer_id, order_date, total_amount, status) VALUES ('1', '1', '2022-01-01 00:00:00', 100, 'in_progress')
"
)
.await
.unwrap();

// Setup
let name = "quary";
let temp_dir = tempdir().unwrap();
let project_dir = temp_dir.path();

// create a .env file
let env_file_path = project_dir.join(".env");
let env_content =
"REDSHIFT_HOST=\nREDSHIFT_PORT=\nREDSHIFT_USER=\nREDSHIFT_PASSWORD=\nREDSHIFT_DATABASE=";
fs::write(&env_file_path, env_content).unwrap();

// Create snapshots directory and orders_snapshot.snapshot.sql file
let snapshots_dir = project_dir.join("models").join("staging").join("snapshots");
fs::create_dir_all(&snapshots_dir).unwrap();
let orders_snapshot_file = snapshots_dir.join("orders_snapshot.snapshot.sql");
let orders_snapshot_content =
"SELECT order_id, customer_id, order_date, total_amount, status FROM q.raw_orders";
fs::write(&orders_snapshot_file, orders_snapshot_content).unwrap();

// Create a model which references the snapshot
let staging_models_dir = project_dir.join("models").join("staging");
fs::create_dir_all(&staging_models_dir).unwrap();
let stg_orders_snapshot_file = staging_models_dir.join("stg_orders_snapshot.sql");
let stg_orders_snapshot_content = "SELECT * FROM q.orders_snapshot";
fs::write(&stg_orders_snapshot_file, stg_orders_snapshot_content).unwrap();

// Create quary.yaml file
let quary_yaml_content = "redshift:\n schema: transform";
let quary_yaml_path = project_dir.join("quary.yaml");
fs::write(&quary_yaml_path, quary_yaml_content).unwrap();

// Create schema.yaml file
let schema_file = snapshots_dir.join("schema.yaml");
let schema_content = r#"
sources:
- name: raw_orders
path: analytics.orders
snapshots:
- name: orders_snapshot
unique_key: order_id
strategy:
timestamp:
updated_at: order_date
"#;
fs::write(&schema_file, schema_content).unwrap();

// Take the initial snapshot and build the project which references the snapshot
Command::cargo_bin(name)
.unwrap()
.current_dir(project_dir)
.args(vec!["snapshot"])
.assert()
.success();
Command::cargo_bin(name)
.unwrap()
.current_dir(project_dir)
.args(vec!["build"])
.assert()
.success();

{
let result = database
.query("SELECT order_id, customer_id, order_date, total_amount, status, quary_valid_from, quary_valid_to, quary_scd_id FROM transform.orders_snapshot")
.await
.unwrap();

assert_eq!(result.rows.len(), 1);
assert_eq!(result.rows[0][0], "1"); // id
assert_eq!(result.rows[0][4], "in_progress"); // status
assert_eq!(result.rows[0][6], "NULL"); // quary_valid_to

// Check that quary_valid_from has the same date as the current date
let current_date: NaiveDateTime = Utc::now().date_naive().into();
let quary_valid_from_str = &result.rows[0][5];
let quary_valid_from_date = quary_valid_from_str.split_whitespace().next().unwrap();

let quary_valid_from =
chrono::NaiveDateTime::parse_from_str(quary_valid_from_date, "%Y-%m-%dT%H:%M:%S%.f%:z")
.unwrap();
assert_eq!(current_date.date(), quary_valid_from.date());

database
.exec(
"
UPDATE analytics.orders
SET order_date = '2099-06-01 00:00:00', status = 'completed'
WHERE order_id = '1'
",
)
.await
.unwrap();
}

// Take updated snapshot
Command::cargo_bin(name)
.unwrap()
.current_dir(project_dir)
.args(vec!["snapshot"])
.assert()
.success();

{
// Assert updated snapshot
let updated_result = database
.query("SELECT order_id, customer_id, order_date, total_amount, status, quary_valid_from, quary_valid_to, quary_scd_id FROM transform.stg_orders_snapshot ORDER BY quary_valid_from")
.await
.unwrap();

assert_eq!(updated_result.rows.len(), 2);

// Check the initial row
assert_eq!(updated_result.rows[0][0], "1"); // id
assert_eq!(updated_result.rows[0][4], "in_progress"); // status
assert_ne!(updated_result.rows[0][6], "NULL"); // quary_valid_to should not be NULL

// Check the updated row
assert_eq!(updated_result.rows[1][0], "1"); // id
assert_eq!(updated_result.rows[1][4], "completed"); // status
assert_eq!(updated_result.rows[1][6], "NULL"); // quary_valid_to should be NULL

// Check that quary_valid_from of the updated row has the same date as the current date
let current_date: NaiveDateTime = Utc::now().date_naive().into();
let updated_quary_valid_from_str = &updated_result.rows[1][5];
let updated_quary_valid_from_date = updated_quary_valid_from_str
.split_whitespace()
.next()
.unwrap();
let updated_quary_valid_from = chrono::NaiveDateTime::parse_from_str(
updated_quary_valid_from_date,
"%Y-%m-%dT%H:%M:%S%.f%:z",
)
.unwrap();
assert_eq!(current_date.date(), updated_quary_valid_from.date());
}
}
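Note that test_redshift_snapshots is marked #[ignore], so it only runs when requested explicitly (it needs live Redshift credentials). A usage sketch, assuming the rust/cli crate's cli_test integration-test target shown above:

cd rust/cli
cargo test --test cli_test test_redshift_snapshots -- --ignored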
34 changes: 20 additions & 14 deletions rust/core/src/database_duckdb.rs
@@ -24,12 +24,11 @@ impl DatabaseQueryGeneratorDuckDB {
}

fn get_now(&self) -> String {
if let Some(override_now) = &self.override_now {
let datetime: DateTime<Utc> = (*override_now).into();
format!("'{}'", datetime.format("%Y-%m-%dT%H:%M:%SZ"))
} else {
"CURRENT_TIMESTAMP".to_string()
}
let datetime = self
.override_now
.map(|time| -> DateTime<Utc> { time.into() })
.unwrap_or(SystemTime::now().into());
format!("'{}'", datetime.format("%Y-%m-%dT%H:%M:%SZ"))
}
}

@@ -45,7 +44,12 @@ impl DatabaseQueryGenerator for DatabaseQueryGeneratorDuckDB {
templated_select: &str,
unique_key: &str,
strategy: &StrategyType,
table_exists: Option<bool>,
) -> Result<Vec<String>, String> {
assert_eq!(
table_exists, None,
"table_exists is not necessary for DuckDB snapshots."
);
match strategy {
StrategyType::Timestamp(timestamp) => {
let updated_at = &timestamp.updated_at;
@@ -56,7 +60,7 @@ impl DatabaseQueryGenerator for DatabaseQueryGeneratorDuckDB {
SELECT
*,
{now} AS quary_valid_from,
CAST(NULL AS TIMESTAMP) AS quary_valid_to,
CAST(NULL AS TIMESTAMP WITH TIME ZONE) AS quary_valid_to,
MD5(CAST(CONCAT({unique_key}, CAST({updated_at} AS STRING)) AS STRING)) AS quary_scd_id
FROM ({templated_select})
)"
@@ -189,15 +193,17 @@ mod tests {
let expected_datetime: DateTime<Utc> = override_now.into();
let expected_result = format!("'{}'", expected_datetime.format("%Y-%m-%dT%H:%M:%SZ"));
assert_eq!(result, expected_result);

let database = DatabaseQueryGeneratorDuckDB::new(None, None);
let result = database.get_now();
assert_eq!(result, "CURRENT_TIMESTAMP".to_string());
}

#[test]
fn test_generate_snapshot_sql() {
let database = DatabaseQueryGeneratorDuckDB::new(None, None);
let time_override = "2021-01-01T00:00:00Z";
let override_now = DateTime::parse_from_rfc3339(time_override)
.unwrap()
.with_timezone(&Utc);
let system_time = SystemTime::from(override_now);

let database = DatabaseQueryGeneratorDuckDB::new(None, Some(system_time));
let path = "mytable";
let templated_select = "SELECT * FROM mytable";
let unique_key = "id";
@@ -209,9 +215,9 @@
);

let result = database
.generate_snapshot_sql(path, templated_select, unique_key, &strategy)
.generate_snapshot_sql(path, templated_select, unique_key, &strategy, None)
.unwrap();

assert_eq!(result.iter().map(|s| s.as_str()).collect::<Vec<&str>>(), vec!["CREATE TABLE IF NOT EXISTS mytable AS (\n SELECT\n *,\n CURRENT_TIMESTAMP AS quary_valid_from,\n CAST(NULL AS TIMESTAMP) AS quary_valid_to,\n MD5(CAST(CONCAT(id, CAST(updated_at AS STRING)) AS STRING)) AS quary_scd_id\n FROM (SELECT * FROM mytable)\n )", "UPDATE mytable AS target\n SET quary_valid_to = source.quary_valid_from\n FROM (\n SELECT\n *,\n CURRENT_TIMESTAMP AS quary_valid_from,\n MD5(CAST(CONCAT(id, CAST(updated_at AS STRING)) AS STRING)) AS quary_scd_id\n FROM (SELECT * FROM mytable)\n ) AS source\n WHERE target.id = source.id\n AND target.quary_valid_to IS NULL\n AND CAST(source.updated_at AS TIMESTAMP) > CAST(target.updated_at AS TIMESTAMP)", "INSERT INTO mytable\n SELECT\n *,\n CURRENT_TIMESTAMP AS quary_valid_from,\n NULL AS quary_valid_to,\n MD5(CAST(CONCAT(id, CAST(updated_at AS STRING)) AS STRING)) AS quary_scd_id\n FROM (SELECT * FROM mytable) AS source\n WHERE NOT EXISTS (\n SELECT 1\n FROM mytable AS target\n WHERE target.quary_scd_id = MD5(CAST(CONCAT(source.id, CAST(source.updated_at AS STRING)) AS STRING))\n )"]);
assert_eq!(result.iter().map(|s| s.as_str()).collect::<Vec<&str>>(), vec!["CREATE TABLE IF NOT EXISTS mytable AS (\n SELECT\n *,\n '2021-01-01T00:00:00Z' AS quary_valid_from,\n CAST(NULL AS TIMESTAMP WITH TIME ZONE) AS quary_valid_to,\n MD5(CAST(CONCAT(id, CAST(updated_at AS STRING)) AS STRING)) AS quary_scd_id\n FROM (SELECT * FROM mytable)\n )", "UPDATE mytable AS target\n SET quary_valid_to = source.quary_valid_from\n FROM (\n SELECT\n *,\n '2021-01-01T00:00:00Z' AS quary_valid_from,\n MD5(CAST(CONCAT(id, CAST(updated_at AS STRING)) AS STRING)) AS quary_scd_id\n FROM (SELECT * FROM mytable)\n ) AS source\n WHERE target.id = source.id\n AND target.quary_valid_to IS NULL\n AND CAST(source.updated_at AS TIMESTAMP) > CAST(target.updated_at AS TIMESTAMP)", "INSERT INTO mytable\n SELECT\n *,\n '2021-01-01T00:00:00Z' AS quary_valid_from,\n NULL AS quary_valid_to,\n MD5(CAST(CONCAT(id, CAST(updated_at AS STRING)) AS STRING)) AS quary_scd_id\n FROM (SELECT * FROM mytable) AS source\n WHERE NOT EXISTS (\n SELECT 1\n FROM mytable AS target\n WHERE target.quary_scd_id = MD5(CAST(CONCAT(source.id, CAST(source.updated_at AS STRING)) AS STRING))\n )"]);
}
}