diff --git a/stac-async/CHANGELOG.md b/stac-async/CHANGELOG.md index 6ef7e2c3..72fdddf4 100644 --- a/stac-async/CHANGELOG.md +++ b/stac-async/CHANGELOG.md @@ -10,6 +10,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - `impl Default for Client` ([#252](https://github.com/stac-utils/stac-rs/pull/252)) +### Fixed + +- Better send message handling in the api client ([#288](https://github.com/stac-utils/stac-rs/pull/288)) + ## [0.5.1] ### Changed diff --git a/stac-async/src/api_client.rs b/stac-async/src/api_client.rs index 48cdcd08..32113d5a 100644 --- a/stac-async/src/api_client.rs +++ b/stac-async/src/api_client.rs @@ -5,7 +5,10 @@ use futures_util::{pin_mut, StreamExt}; use reqwest::Method; use stac::{Collection, Links}; use stac_api::{GetItems, Item, ItemCollection, Items, Search, UrlBuilder}; -use tokio::sync::mpsc; +use tokio::{ + sync::mpsc::{self, error::SendError}, + task::JoinHandle, +}; const DEFAULT_CHANNEL_BUFFER: usize = 4; @@ -159,18 +162,19 @@ fn stream_items( channel_buffer: usize, ) -> impl Stream> { let (tx, mut rx) = mpsc::channel(channel_buffer); - let handle = tokio::spawn(async move { + let handle: JoinHandle>> = tokio::spawn(async move { let pages = stream_pages(client, page); pin_mut!(pages); while let Some(result) = pages.next().await { match result { - Ok(page) => tx.send(Ok(page)).await.unwrap(), + Ok(page) => tx.send(Ok(page)).await?, Err(err) => { - tx.send(Err(err)).await.unwrap(); - return; + tx.send(Err(err)).await?; + return Ok(()); } } } + Ok(()) }); try_stream! { while let Some(result) = rx.recv().await { @@ -179,7 +183,7 @@ fn stream_items( yield item; } } - handle.await?; + let _ = handle.await?; } }