diff --git a/Cargo.lock b/Cargo.lock index 3bcbc56e..ea665a0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1179,6 +1179,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", + "tokio-util", "tui", "url", ] @@ -2263,9 +2264,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.8" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" +checksum = "1d68074620f57a0b21594d9735eb2e98ab38b17f80d3fcb189fca266771ca60d" dependencies = [ "bytes", "futures-core", diff --git a/Cargo.toml b/Cargo.toml index a4c47c29..0225bdb9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ sqlx = { version = "0.7.2", features = ["sqlite", "chrono", "runtime-tokio"] } thiserror = "1" tokio = { version = "1.33", features = ["full"] } tokio-stream = { version = "0.1.14", features = ["time"] } +tokio-util = { version = "0.7.9", features = ["io"] } url = { version = "2.4.1", features = ["serde"] } zstd = "0.12.4" diff --git a/moss/Cargo.toml b/moss/Cargo.toml index 5fce3317..76fda3fa 100644 --- a/moss/Cargo.toml +++ b/moss/Cargo.toml @@ -24,5 +24,6 @@ serde_yaml.workspace = true sqlx.workspace = true tokio.workspace = true tokio-stream.workspace = true +tokio-util.workspace = true thiserror.workspace = true url.workspace = true diff --git a/moss/src/package/fetch.rs b/moss/src/package/fetch.rs index 0a498841..e7e9555c 100644 --- a/moss/src/package/fetch.rs +++ b/moss/src/package/fetch.rs @@ -42,7 +42,7 @@ pub async fn fetch( let url = meta.uri.as_ref().ok_or(Error::MissingUri)?.parse::()?; let hash = meta.hash.as_ref().ok_or(Error::MissingHash)?; - let mut bytes = request::get(url).await.unwrap(); + let mut bytes = request::get(url).await?; let download_path = download_path(installation, hash).await?; let mut out = File::create(&download_path).await?; @@ -241,7 +241,7 @@ pub enum Error { #[error("invalid url: {0}")] InvalidUrl(#[from] url::ParseError), #[error("request failed: {0}")] - Request(#[from] reqwest::Error), + Request(#[from] request::Error), #[error("io error: {0}")] Io(#[from] io::Error), } diff --git a/moss/src/repository/mod.rs b/moss/src/repository/mod.rs index b56707cf..8b8c6d88 100644 --- a/moss/src/repository/mod.rs +++ b/moss/src/repository/mod.rs @@ -145,7 +145,7 @@ async fn fetch_index(url: Url, out_path: impl AsRef) -> Result<(), FetchEr #[derive(Debug, Error)] pub enum FetchError { #[error("request failed: {0}")] - Request(#[from] reqwest::Error), + Request(#[from] request::Error), #[error("io error: {0}")] Io(#[from] io::Error), } diff --git a/moss/src/request.rs b/moss/src/request.rs index 26e9d2b8..c6dff3ca 100644 --- a/moss/src/request.rs +++ b/moss/src/request.rs @@ -2,10 +2,14 @@ // // SPDX-License-Identifier: MPL-2.0 +use std::{io, path::PathBuf}; + use bytes::Bytes; -use futures::Stream; +use futures::{stream::BoxStream, Stream, StreamExt}; use once_cell::sync::Lazy; -use reqwest::Result; +use thiserror::Error; +use tokio::fs::File; +use tokio_util::io::ReaderStream; use url::Url; /// Shared client for tcp socket reuse and connection limit @@ -21,15 +25,44 @@ static CLIENT: Lazy = Lazy::new(|| { }); /// Fetch a resource at the provided [`Url`] and stream it's response bytes -pub async fn get(url: Url) -> Result>> { - match url.scheme() { - "file" => todo!(), - _ => { - let response = CLIENT.get(url).send().await?; - - response - .error_for_status() - .map(reqwest::Response::bytes_stream) - } +pub async fn get(url: Url) -> Result>, Error> { + match url_file(&url) { + Some(path) => Ok(read(path).await?.boxed()), + _ => Ok(fetch(url).await?.boxed()), } } + +async fn fetch(url: Url) -> Result>, Error> { + let response = CLIENT.get(url).send().await?; + + response + .error_for_status() + .map(reqwest::Response::bytes_stream) + .map(|stream| stream.map(|result| result.map_err(Error::Fetch))) + .map_err(Error::Fetch) +} + +async fn read(path: PathBuf) -> Result>, Error> { + // 4 MiB + const BUFFER_SIZE: usize = 4 * 1024 * 1024; + + let file = File::open(path).await?; + + Ok(ReaderStream::with_capacity(file, BUFFER_SIZE).map(|result| result.map_err(Error::Read))) +} + +fn url_file(url: &Url) -> Option { + if url.scheme() == "file" { + url.to_file_path().ok() + } else { + None + } +} + +#[derive(Debug, Error)] +pub enum Error { + #[error("fetch error: {0}")] + Fetch(#[from] reqwest::Error), + #[error("read error: {0}")] + Read(#[from] io::Error), +}