Skip to content

Commit

Permalink
Merge pull request #59 from serpent-os/feat/request-file
Browse files Browse the repository at this point in the history
Handle file URL for request
  • Loading branch information
ikeycode authored Oct 19, 2023
2 parents c19d75d + cf2bcda commit ea7fe94
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 17 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ sqlx = { version = "0.7.2", features = ["sqlite", "chrono", "runtime-tokio"] }
thiserror = "1"
tokio = { version = "1.33", features = ["full"] }
tokio-stream = { version = "0.1.14", features = ["time"] }
tokio-util = { version = "0.7.9", features = ["io"] }
url = { version = "2.4.1", features = ["serde"] }
zstd = "0.12.4"

Expand Down
1 change: 1 addition & 0 deletions moss/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ serde_yaml.workspace = true
sqlx.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
tokio-util.workspace = true
thiserror.workspace = true
url.workspace = true
4 changes: 2 additions & 2 deletions moss/src/package/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub async fn fetch(
let url = meta.uri.as_ref().ok_or(Error::MissingUri)?.parse::<Url>()?;
let hash = meta.hash.as_ref().ok_or(Error::MissingHash)?;

let mut bytes = request::get(url).await.unwrap();
let mut bytes = request::get(url).await?;

let download_path = download_path(installation, hash).await?;
let mut out = File::create(&download_path).await?;
Expand Down Expand Up @@ -241,7 +241,7 @@ pub enum Error {
#[error("invalid url: {0}")]
InvalidUrl(#[from] url::ParseError),
#[error("request failed: {0}")]
Request(#[from] reqwest::Error),
Request(#[from] request::Error),
#[error("io error: {0}")]
Io(#[from] io::Error),
}
2 changes: 1 addition & 1 deletion moss/src/repository/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ async fn fetch_index(url: Url, out_path: impl AsRef<Path>) -> Result<(), FetchEr
#[derive(Debug, Error)]
pub enum FetchError {
#[error("request failed: {0}")]
Request(#[from] reqwest::Error),
Request(#[from] request::Error),
#[error("io error: {0}")]
Io(#[from] io::Error),
}
57 changes: 45 additions & 12 deletions moss/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@
//
// SPDX-License-Identifier: MPL-2.0

use std::{io, path::PathBuf};

use bytes::Bytes;
use futures::Stream;
use futures::{stream::BoxStream, Stream, StreamExt};
use once_cell::sync::Lazy;
use reqwest::Result;
use thiserror::Error;
use tokio::fs::File;
use tokio_util::io::ReaderStream;
use url::Url;

/// Shared client for tcp socket reuse and connection limit
Expand All @@ -21,15 +25,44 @@ static CLIENT: Lazy<reqwest::Client> = Lazy::new(|| {
});

/// Fetch a resource at the provided [`Url`] and stream it's response bytes
pub async fn get(url: Url) -> Result<impl Stream<Item = Result<Bytes>>> {
match url.scheme() {
"file" => todo!(),
_ => {
let response = CLIENT.get(url).send().await?;

response
.error_for_status()
.map(reqwest::Response::bytes_stream)
}
pub async fn get(url: Url) -> Result<BoxStream<'static, Result<Bytes, Error>>, Error> {
match url_file(&url) {
Some(path) => Ok(read(path).await?.boxed()),
_ => Ok(fetch(url).await?.boxed()),
}
}

async fn fetch(url: Url) -> Result<impl Stream<Item = Result<Bytes, Error>>, Error> {
let response = CLIENT.get(url).send().await?;

response
.error_for_status()
.map(reqwest::Response::bytes_stream)
.map(|stream| stream.map(|result| result.map_err(Error::Fetch)))
.map_err(Error::Fetch)
}

async fn read(path: PathBuf) -> Result<impl Stream<Item = Result<Bytes, Error>>, Error> {
// 4 MiB
const BUFFER_SIZE: usize = 4 * 1024 * 1024;

let file = File::open(path).await?;

Ok(ReaderStream::with_capacity(file, BUFFER_SIZE).map(|result| result.map_err(Error::Read)))
}

fn url_file(url: &Url) -> Option<PathBuf> {
if url.scheme() == "file" {
url.to_file_path().ok()
} else {
None
}
}

#[derive(Debug, Error)]
pub enum Error {
#[error("fetch error: {0}")]
Fetch(#[from] reqwest::Error),
#[error("read error: {0}")]
Read(#[from] io::Error),
}

0 comments on commit ea7fe94

Please sign in to comment.