Skip to content

Commit

Permalink
sqs-lambda-partial-batch-rust-sam
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniele Frasca committed Oct 12, 2024
1 parent b184257 commit b487ae6
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 77 deletions.
4 changes: 2 additions & 2 deletions sqs-lambda-partial-batch-rust-sam/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ name = "handler"
path = "src/bin/handler.rs"

[dependencies]
aws_lambda_events = { version = "0.7", default-features = false, features = ["sqs"] }
aws_lambda_events = { version = "0.15", default-features = false, features = ["sqs"] }
futures = "0.3"
lambda_runtime = "0.11"
lambda_runtime = "0.13"
serde = {version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1", features = ["macros"] }
Expand Down
29 changes: 0 additions & 29 deletions sqs-lambda-partial-batch-rust-sam/Makefile

This file was deleted.

8 changes: 4 additions & 4 deletions sqs-lambda-partial-batch-rust-sam/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Important: this application uses various AWS services and there are costs associ
* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed
* [Rust](https://www.rust-lang.org/) 1.56.0 or higher
* [cargo-zigbuild](https://github.com/messense/cargo-zigbuild) and [Zig](https://ziglang.org/) for cross-compilation
* [CargoLambda](https://www.cargo-lambda.info/guide/installation.html)
* Make sure to run "rustup target add aarch64-unknown-linux-gnu;"

## Deployment Instructions
Expand All @@ -28,11 +28,11 @@ Important: this application uses various AWS services and there are costs associ
```
3. Install dependencies and build:
```
make build
sam build
```
4. From the command line, use AWS SAM to deploy the AWS resources for the pattern as specified in the template.yml file:
```
make deploy
sam deploy
```
5. During the prompts:
* Enter a stack name
Expand Down Expand Up @@ -117,7 +117,7 @@ do_something MyStruct { name: "Daniele", surname: "Frasca" }
1. Delete the stack
```bash
make delete
sam delete
```
2. Confirm the stack has been deleted
```bash
Expand Down
57 changes: 17 additions & 40 deletions sqs-lambda-partial-batch-rust-sam/src/bin/handler.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use aws_lambda_events::event::sqs::SqsEvent;
use aws_lambda_events::{
event::sqs::SqsEvent,
sqs::{BatchItemFailure, SqsBatchResponse},
};
use futures::future::join_all;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use serde::Deserialize;
use std::sync::{Arc, Mutex};

#[tokio::main]
Expand All @@ -19,54 +21,41 @@ async fn main() -> Result<(), Error> {
.await
}

pub async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<Value, Error> {
pub async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<SqsBatchResponse, Error> {
println!("Input {:?}", event);
let failed_message: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let failed_message = Arc::new(Mutex::new(Vec::with_capacity(event.payload.records.len())));
let mut tasks = Vec::with_capacity(event.payload.records.len());

for record in event.payload.records.into_iter() {
let failed_message = failed_message.clone();

tasks.push(tokio::spawn(async move {
if let Some(body) = &record.body {
let request = serde_json::from_str::<MyStruct>(body);
if let Ok(request) = request {
do_something(&request).await.map_or_else(
|_e| {
failed_message
.lock()
.unwrap()
.push(record.message_id.unwrap().clone());
failed_message.lock().unwrap().push(BatchItemFailure {
item_identifier: record.message_id.unwrap_or_default(),
});
},
|_| (),
);
} else {
failed_message
.lock()
.unwrap()
.push(record.message_id.unwrap().clone());
failed_message.lock().unwrap().push(BatchItemFailure {
item_identifier: record.message_id.unwrap_or_default(),
});
}
}
}));
}

join_all(tasks).await;

let response = BatchItemFailures {
batch_item_failures: failed_message
.lock()
.unwrap()
.clone()
.into_iter()
.map(|message_id| {
ItemIdentifier {
item_identifier: message_id,
}
})
.collect(),
};
let failed_message = failed_message.lock().unwrap().clone();

Ok(serde_json::to_value(response).unwrap())
Ok(SqsBatchResponse {
batch_item_failures: failed_message,
})
}

async fn do_something(request: &MyStruct) -> Result<(), Error> {
Expand All @@ -79,15 +68,3 @@ pub struct MyStruct {
pub name: String,
pub surname: String,
}

#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)]
pub struct BatchItemFailures {
#[serde(rename = "batchItemFailures")]
pub batch_item_failures: Vec<ItemIdentifier>,
}

#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)]
pub struct ItemIdentifier {
#[serde(rename = "itemIdentifier")]
pub item_identifier: String,
}
5 changes: 3 additions & 2 deletions sqs-lambda-partial-batch-rust-sam/template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ Resources:
MyFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: ./build/handler
CodeUri: .
Events:
MySQSEvent:
Type: SQS
Expand All @@ -43,6 +43,8 @@ Resources:
BatchSize: 10
FunctionResponseTypes:
- ReportBatchItemFailures
Metadata:
BuildMethod: rust-cargolambda

Outputs:
MySqsQueueName:
Expand All @@ -54,4 +56,3 @@ Outputs:
MySqsQueueURL:
Description: SQS queue URL
Value: !Ref MySqsQueue

0 comments on commit b487ae6

Please sign in to comment.