diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 1c6989f..a38651b 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -127,6 +127,16 @@ jobs: asset_name: sqs-ingest-lambda-bootstrap-${{ github.ref_name }}.zip asset_content_type: application/zip + - name: Upload glue-create lambda + uses: actions/upload-release-asset@v1 + env: + GITHUB_TOKEN: ${{ github.token }} + with: + upload_url: ${{ steps.create_release.outputs.upload_url }} + asset_path: ./target/lambda/glue-create/bootstrap.zip + asset_name: glue-create-lambda-bootstrap-${{ github.ref_name }}.zip + asset_content_type: application/zip + - name: Upload glue-sync lambda uses: actions/upload-release-asset@v1 env: diff --git a/lambdas/glue-create/.gitignore b/lambdas/glue-create/.gitignore new file mode 100644 index 0000000..c41cc9e --- /dev/null +++ b/lambdas/glue-create/.gitignore @@ -0,0 +1 @@ +/target \ No newline at end of file diff --git a/lambdas/glue-create/Cargo.toml b/lambdas/glue-create/Cargo.toml new file mode 100644 index 0000000..7c4095c --- /dev/null +++ b/lambdas/glue-create/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "glue-create" +version.workspace = true +edition.workspace = true +repository.workspace = true +homepage.workspace = true + +[dependencies] +anyhow = { workspace = true } +aws_lambda_events = { workspace = true } +regex = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +url = { workspace = true } + +oxbow-lambda-shared = { path = "../../crates/oxbow-lambda-shared" } + +aws-sdk-athena = "=1" +aws-sdk-glue = "=1" +aws-config = { version = "=1", features = ["behavior-version-latest"] } +lambda_runtime = { version = "0.11" } diff --git a/lambdas/glue-create/README.adoc b/lambdas/glue-create/README.adoc new file mode 100644 index 0000000..b8a7269 --- /dev/null +++ b/lambdas/glue-create/README.adoc @@ -0,0 +1,38 @@ +ifdef::env-github[] +:tip-caption: :bulb: +:note-caption: :information_source: +:important-caption: :heavy_exclamation_mark: +:caution-caption: :fire: +:warning-caption: :warning: +endif::[] +:toc: macro + += Glue Create + +This is a simple Lambda which receives S3 bucket notifications when a new Delta +Lake table is created and creates the appropriate Glue table entry in the data +catalog. + +The function exists to handle some of the custom renaming logic to accommodate +the two-layer hierarchy in the AWS Glue Data Catalog (`database` and `table +name`). + + +toc::[] + + +== Environment Variables + +|=== + +| Name | Default Value | Notes + +| `RUST_LOG` +| `error` +| Set the log level, e.g. `info`, `warn`, `error`. Can be scoped to specific modules, i.e. `oxbow=debug` + +| `UNWRAP_SNS_ENVELOPE` +| _null_ +| Should only be used if S3 Event Notifications are first going to SNS and then routing to SQS for Oxbow + +|=== diff --git a/lambdas/glue-create/src/main.rs b/lambdas/glue-create/src/main.rs new file mode 100644 index 0000000..dde5d01 --- /dev/null +++ b/lambdas/glue-create/src/main.rs @@ -0,0 +1,264 @@ +use aws_lambda_events::s3::S3EventRecord; +use aws_lambda_events::sqs::SqsEvent; +use lambda_runtime::{run, service_fn, Error, LambdaEvent}; +use regex::Regex; +use tracing::log::*; + +use oxbow_lambda_shared::*; + +/// Environment variable which defines the regular express to process a database and table from the +/// key names +const GLUE_REGEX_ENV: &str = "GLUE_PATH_REGEX"; +/// Environment variable of the Athena workgroup to use when creating external tables in the +/// catalog +const ATHENA_WORKGROUP_ENV: &str = "ATHENA_WORKGROUP"; +/// Environment variable for the Data Source to use in Athena, this sohuld be a Glue catalog +/// configured Data Source for Athena +const ATHENA_DATA_SOURCE_ENV: &str = "ATHENA_DATA_SOURCE"; +const QUERY_TIMEOUT_SECS: i32 = 60; + +#[derive(Debug, PartialEq)] +struct GlueTable { + name: String, + database: String, +} + +async fn function_handler(event: LambdaEvent) -> Result<(), Error> { + info!("Invoking glue-create"); + let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await; + let glue = aws_sdk_glue::Client::new(&config); + // The athena client is used for creating the table in the catalog with the + // appropriate metadata. + // + // Rather than processing the Delta Table ourselves, Athena will: + // + // Unlike traditional Hive tables, Delta Lake table metadata are inferred from + // the Delta Lake transaction log and synchronized directly to AWS Glue. + // + // + let athena = aws_sdk_athena::Client::new(&config); + + let _ = std::env::var(GLUE_REGEX_ENV) + .expect("The Lambda must have GLUE_PATH_REGEX defined in the environment"); + let _ = std::env::var(ATHENA_WORKGROUP_ENV) + .expect("The Lambda must have ATHENA_WORKGROUP defined in the environment"); + let _ = std::env::var(ATHENA_DATA_SOURCE_ENV) + .expect("The Lambda must have ATHENA_DATA_SOURCE defined in the environment"); + + let records = match std::env::var("UNWRAP_SNS_ENVELOPE") { + Ok(_) => s3_from_sns(event.payload)?, + Err(_) => s3_from_sqs(event.payload)?, + }; + debug!("processing records: {records:?}"); + + for record in records { + process_record(record, &glue, &athena).await?; + } + + Ok(()) +} + +async fn process_record( + record: S3EventRecord, + glue: &aws_sdk_glue::Client, + athena: &aws_sdk_athena::Client, +) -> Result<(), Error> { + debug!("Parsing keys out of {record:?}"); + let bucket = record + .s3 + .bucket + .name + .expect("Failed to get bucket name out of payload"); + let key = record + .s3 + .object + .key + .expect("Failed to get object key out of payload"); + + let pattern = std::env::var(GLUE_REGEX_ENV) + .expect("Must have GLUE_PATH_REGEX defined in the environment"); + let pattern = Regex::new(&pattern).expect("Failed to compile the defined GLUE_PATH_REGEX"); + + // Map the key key to a GlueTable properly, currently only bucket notifications from the + // aurora_raw/ space is supported. + let glue_table = extract_table_from_key(&key, &pattern).expect(&format!( + "Expected to be able to parse out a table name: {key:?}" + )); + + match glue + .get_table() + .database_name(&glue_table.database) + .name(&glue_table.name) + .send() + .await + { + Ok(_output) => { + info!("The table {glue_table:?} already exists, nice."); + Ok(()) + } + Err(_e) => { + info!("The table {glue_table:?} does not exist, creating it!"); + // Determine if the key is pointing to a delta table and then start loading + if let Some(table_path) = key_is_delta_table(&key) { + let uri = format!("s3://{bucket}/{table_path}"); + let work_group = + std::env::var(ATHENA_WORKGROUP_ENV).expect("Failed to get work group"); + let catalog = match std::env::var(ATHENA_DATA_SOURCE_ENV) { + Ok(val) => Some(val.to_string()), + Err(_) => None, + }; + + let exec_context = aws_sdk_athena::types::QueryExecutionContext::builder() + .database(glue_table.database) + .set_catalog(catalog) + .build(); + + let query = format!( + r#"CREATE EXTERNAL TABLE + {0} + LOCATION '{uri}' + TBLPROPERTIES ('table_type' = 'DELTA')"#, + glue_table.name + ); + info!("Triggering athena with: {query}"); + let result = athena + .start_query_execution() + .work_group(work_group) + .query_execution_context(exec_context) + .query_string(&query) + .send() + .await?; + + debug!("Query response: {result:?}"); + if let Some(query_execution_id) = result.query_execution_id { + debug!("Waiting for results from Athena"); + + for _i in 0..QUERY_TIMEOUT_SECS { + let execution = athena + .get_query_execution() + .query_execution_id(&query_execution_id) + .send() + .await?; + + if let Some(execution) = execution.query_execution { + if let Some(status) = execution.status { + debug!("Current status of query execution: {status:?}"); + match status.state { + Some(aws_sdk_athena::types::QueryExecutionState::Succeeded) => { + break; + } + Some(aws_sdk_athena::types::QueryExecutionState::Failed) => { + error!("Query failed! {status:?}"); + return Err( + "Failed to query Athena for some reason!".into() + ); + } + // Safely ignore Running, Queued, and other statuses + _ => {} + } + } + } + + debug!("Incompleted query, waiting a sec"); + let _ = tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } + + Ok(()) + } else { + error!("Invoked with a key that didn't match a delta table! {key}"); + Ok(()) + } + } + } +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + // disable printing the name of the module in every log line. + .with_target(false) + // disabling time is handy because CloudWatch will add the ingestion time. + .without_time() + .init(); + + run(service_fn(function_handler)).await +} + +/// +/// Use the `GLUE_TABLE_REGEX` to identify whether the key is a consistent with a table +/// +fn extract_table_from_key(key: &str, regex: &Regex) -> Option { + if let Some(captured) = regex.captures(key) { + let database = captured.name("database"); + let table = captured.name("table"); + if database.is_some() && table.is_some() { + let database = database.unwrap().as_str(); + let name = table.unwrap().as_str(); + return Some(GlueTable { + database: database.into(), + name: name.into(), + }); + } + } + None +} + +/// Match the key against a regular expression of what a delta table looks like +/// This will make it easier to figure out whether the modified bucket is a Delta Table so we can +/// load it and look for it in the Glue catalog +/// +/// Returns the key name for the delta table so you can open it +fn key_is_delta_table(key: &str) -> Option { + let regex = Regex::new(r#"(?P.*)\/_delta_log\/.*\.json"#) + .expect("Failed to compile the regular expression"); + + if let Some(caps) = regex.captures(key) { + if let Some(root) = caps.name("root") { + return Some(root.as_str().into()); + } + } + None +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_extract_invalid_table() { + let key = "data-lake/foo.parquet"; + let pattern = Regex::new("").expect("Failed to compile regex"); + assert_eq!(None, extract_table_from_key(key, &pattern)); + } + + #[test] + fn test_extract_valid_table() { + let key = r#"data-lake/testdb/testtable/_delta_log/0000000000.json"#; + let pattern = r#"data-lake\/(?P\w+)\/(?P\w+\.?\w+)\/_delta_log\/.*.json"#; + let pattern = Regex::new(pattern).expect("Failed to compile regex"); + let result = extract_table_from_key(key, &pattern); + assert!(result.is_some()); + let info = result.unwrap(); + assert_eq!(info.database, "testdb"); + assert_eq!(info.name, "testtable"); + } + + #[test] + fn test_unrelated_key() { + let key = "prefix/some/key/that/don/match.parquet"; + let result = key_is_delta_table(key); + assert!(result.is_none()); + } + + #[test] + fn test_matching_key() { + let key = "prefix/testdb/testtable/_delta_log/00000000000000000000.json"; + let result = key_is_delta_table(key); + assert!(result.is_some()); + if let Some(root) = result { + assert_eq!(root, "prefix/testdb/testtable"); + } + } +} diff --git a/lambdas/glue-sync/Cargo.toml b/lambdas/glue-sync/Cargo.toml index 097a4fe..25cecc8 100644 --- a/lambdas/glue-sync/Cargo.toml +++ b/lambdas/glue-sync/Cargo.toml @@ -21,4 +21,3 @@ oxbow-lambda-shared = { path = "../../crates/oxbow-lambda-shared" } aws-sdk-glue = "1.26.0" aws-config = { version = "1.2.0", features = ["behavior-version-latest"] } lambda_runtime = { version = "0.11" } -aws-smithy-runtime-api = "1.4.0"