Skip to content

Commit

Permalink
fix(stac-async): better send cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
gadomski committed Aug 8, 2024
1 parent d76ca59 commit eace9bc
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 6 deletions.
4 changes: 4 additions & 0 deletions stac-async/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

- `impl Default for Client` ([#252](https://github.com/stac-utils/stac-rs/pull/252))

### Fixed

- Better send message handling in the api client ([#288](https://github.com/stac-utils/stac-rs/pull/288))

## [0.5.1]

### Changed
Expand Down
16 changes: 10 additions & 6 deletions stac-async/src/api_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use futures_util::{pin_mut, StreamExt};
use reqwest::Method;
use stac::{Collection, Links};
use stac_api::{GetItems, Item, ItemCollection, Items, Search, UrlBuilder};
use tokio::sync::mpsc;
use tokio::{
sync::mpsc::{self, error::SendError},
task::JoinHandle,
};

const DEFAULT_CHANNEL_BUFFER: usize = 4;

Expand Down Expand Up @@ -159,18 +162,19 @@ fn stream_items(
channel_buffer: usize,
) -> impl Stream<Item = Result<Item>> {
let (tx, mut rx) = mpsc::channel(channel_buffer);
let handle = tokio::spawn(async move {
let handle: JoinHandle<std::result::Result<(), SendError<_>>> = tokio::spawn(async move {
let pages = stream_pages(client, page);
pin_mut!(pages);
while let Some(result) = pages.next().await {
match result {
Ok(page) => tx.send(Ok(page)).await.unwrap(),
Ok(page) => tx.send(Ok(page)).await?,
Err(err) => {
tx.send(Err(err)).await.unwrap();
return;
tx.send(Err(err)).await?;
return Ok(());
}
}
}
Ok(())
});
try_stream! {
while let Some(result) = rx.recv().await {
Expand All @@ -179,7 +183,7 @@ fn stream_items(
yield item;
}
}
handle.await?;
let _ = handle.await?;
}
}

Expand Down

0 comments on commit eace9bc

Please sign in to comment.