Skip to content

Commit

Permalink
move arrow ipc conversion to dedicated function
Browse files Browse the repository at this point in the history
  • Loading branch information
wssheldon committed Sep 30, 2024
1 parent 9ea59e6 commit 1deac91
Showing 1 changed file with 15 additions and 10 deletions.
25 changes: 15 additions & 10 deletions pond/pond-duckling/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@ struct ArrowIpcResponse {
body: Vec<u8>,
}

fn convert_to_arrow_ipc(rbs: &[RecordBatch]) -> Result<Vec<u8>, Error> {
let mut buffer = Cursor::new(Vec::new());
{
let mut writer = StreamWriter::try_new(&mut buffer, &rbs[0].schema())?;
for batch in rbs {
writer.write(batch)?;
}
writer.finish()?;
}
Ok(buffer.into_inner())
}

async fn function_handler(event: LambdaEvent<Request>) -> Result<ArrowIpcResponse, Error> {
let query = event.payload.query.unwrap_or_else(||
"SELECT * FROM read_parquet('https://shell.duckdb.org/data/tpch/0_01/parquet/customer.parquet') LIMIT 5".to_string()
Expand All @@ -33,23 +45,16 @@ async fn function_handler(event: LambdaEvent<Request>) -> Result<ArrowIpcRespons
let mut stmt = conn.prepare(&query)?;
let rbs: Vec<RecordBatch> = stmt.query_arrow([])?.collect();

// Serialize RecordBatches to Arrow IPC format
let mut buffer = Cursor::new(Vec::new());
{
let mut writer = StreamWriter::try_new(&mut buffer, &rbs[0].schema())?;
for batch in &rbs {
writer.write(batch)?;
}
writer.finish()?;
}
// Convert RecordBatches to Arrow IPC format
let arrow_ipc_data = convert_to_arrow_ipc(&rbs)?;

// Return the custom response
Ok(ArrowIpcResponse {
status_code: StatusCode::OK.as_u16(),
headers: json!({
"Content-Type": "application/vnd.apache.arrow.stream",
}),
body: buffer.into_inner(),
body: arrow_ipc_data,
})
}

Expand Down

0 comments on commit 1deac91

Please sign in to comment.