From 20a8a8a3cf054d5227f7677f80b948273e514d47 Mon Sep 17 00:00:00 2001 From: Alex Chi Date: Fri, 19 Mar 2021 11:12:59 +0800 Subject: [PATCH] repos: refine github release (#72) --- src/github_release.rs | 81 +++++++++++++++++++++++++++++-------------- src/main.rs | 26 ++++++++++---- src/opts.rs | 8 ++++- src/s3.rs | 52 +++++++++++++++++++++++++-- src/stream_pipe.rs | 32 ++++++++++------- 5 files changed, 150 insertions(+), 49 deletions(-) diff --git a/src/github_release.rs b/src/github_release.rs index 6c50446..4e65e82 100644 --- a/src/github_release.rs +++ b/src/github_release.rs @@ -3,30 +3,53 @@ //! GitHubRelease source will fetch the GitHub API when taking snapshots. //! Then, it will construct a list of downloadable URLs. -use crate::common::{Mission, SnapshotConfig, SnapshotPath}; +use crate::common::{Mission, SnapshotConfig, TransferURL}; use crate::error::Result; +use crate::metadata::SnapshotMeta; use crate::timeout::{TryTimeoutExt, TryTimeoutFutureExt}; -use crate::traits::SnapshotStorage; - -use std::time::Duration; +use crate::traits::{SnapshotStorage, SourceStorage}; use async_trait::async_trait; -use serde_json::Value; +use chrono::{DateTime, Utc}; +use serde::Deserialize; use slog::info; +use std::time::Duration; +use structopt::StructOpt; + +#[derive(Deserialize, Debug)] +pub struct GitHubReleaseAsset { + url: String, + id: u64, + name: String, + content_type: String, + size: u64, + created_at: DateTime, + updated_at: DateTime, + browser_download_url: String, +} + +#[derive(Deserialize, Debug)] +pub struct GitHubReleaseItem { + tag_name: String, + name: String, + assets: Vec, +} -#[derive(Debug)] +#[derive(Debug, Clone, StructOpt)] pub struct GitHubRelease { + #[structopt(long, help = "GitHub Repo")] pub repo: String, + #[structopt(long, help = "Version numbers to retain")] pub version_to_retain: usize, } #[async_trait] -impl SnapshotStorage for GitHubRelease { +impl SnapshotStorage for GitHubRelease { async fn snapshot( &mut self, mission: Mission, _config: &SnapshotConfig, - ) -> Result> { + ) -> Result> { let logger = mission.logger; let progress = mission.progress; let client = mission.client; @@ -47,34 +70,40 @@ impl SnapshotStorage for GitHubRelease { .into_result()?; info!(logger, "parsing..."); - let json: Value = serde_json::from_str(&data).unwrap(); - let releases = json.as_array().unwrap(); - let snapshot: Vec = releases - .iter() - .filter_map(|releases| releases.as_object()) - .filter_map(|releases| { - progress.set_message( - releases - .get("tag_name") - .and_then(|tag_name| tag_name.as_str()) - .unwrap_or(""), - ); - releases.get("assets") + let releases = serde_json::from_str::>(&data)?; + let replace_string = format!("https://github.com/{}/", self.repo); + let snapshot: Vec = releases + .into_iter() + .map(|release| { + progress.set_message(&release.tag_name); + release.assets }) .take(self.version_to_retain) - .filter_map(|assets| assets.as_array()) .flatten() - .filter_map(|asset| asset.get("browser_download_url")) - .filter_map(|url| url.as_str()) - .map(|url| url.replace("https://github.com/", "")) + .map(|asset| SnapshotMeta { + key: asset.browser_download_url.replace(&replace_string, ""), + size: Some(asset.size), + last_modified: Some(asset.updated_at.timestamp() as u64), + ..Default::default() + }) .collect(); progress.finish_with_message("done"); - Ok(crate::utils::snapshot_string_to_path(snapshot)) + Ok(snapshot) } fn info(&self) -> String { format!("github releases, {:?}", self) } } + +#[async_trait] +impl SourceStorage for GitHubRelease { + async fn get_object(&self, snapshot: &SnapshotMeta, _mission: &Mission) -> Result { + Ok(TransferURL(format!( + "https://github.com/{}/{}", + self.repo, snapshot.key + ))) + } +} diff --git a/src/main.rs b/src/main.rs index 497096b..dac5df2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -34,9 +34,13 @@ use simple_diff_transfer::SimpleDiffTransfer; use structopt::StructOpt; macro_rules! index_bytes_pipe { - ($buffer_path: expr, $prefix: expr) => { + ($buffer_path: expr, $prefix: expr, $use_snapshot_last_modified: expr) => { |source| { - let source = stream_pipe::ByteStreamPipe::new(source, $buffer_path.clone().unwrap()); + let source = stream_pipe::ByteStreamPipe::new( + source, + $buffer_path.clone().unwrap(), + $use_snapshot_last_modified, + ); index_pipe::IndexPipe::new( source, $buffer_path.clone().unwrap(), @@ -113,7 +117,7 @@ fn main() { opts, source, transfer_config, - index_bytes_pipe!(buffer_path, prefix) + index_bytes_pipe!(buffer_path, prefix, false) ); } Source::Homebrew(source) => { @@ -121,7 +125,7 @@ fn main() { opts, source, transfer_config, - index_bytes_pipe!(buffer_path, prefix) + index_bytes_pipe!(buffer_path, prefix, false) ); } Source::CratesIo(source) => { @@ -129,7 +133,7 @@ fn main() { opts, source, transfer_config, - index_bytes_pipe!(buffer_path, prefix) + index_bytes_pipe!(buffer_path, prefix, false) ); } Source::Conda(config) => { @@ -138,7 +142,7 @@ fn main() { opts, source, transfer_config, - index_bytes_pipe!(buffer_path, prefix) + index_bytes_pipe!(buffer_path, prefix, false) ); } Source::Rsync(source) => { @@ -146,7 +150,15 @@ fn main() { opts, source, transfer_config, - index_bytes_pipe!(buffer_path, prefix) + index_bytes_pipe!(buffer_path, prefix, false) + ); + } + Source::GithubRelease(source) => { + transfer!( + opts, + source, + transfer_config, + index_bytes_pipe!(buffer_path, prefix, true) ); } } diff --git a/src/opts.rs b/src/opts.rs index 2d02324..186cf54 100644 --- a/src/opts.rs +++ b/src/opts.rs @@ -1,6 +1,7 @@ use crate::conda::CondaConfig; use crate::crates_io::CratesIo as CratesIoConfig; use crate::file_backend::FileBackend; +use crate::github_release::GitHubRelease; use crate::homebrew::Homebrew as HomebrewConfig; use crate::pypi::Pypi as PypiConfig; use crate::rsync::Rsync as RsyncConfig; @@ -24,6 +25,8 @@ pub enum Source { Conda(CondaConfig), #[structopt(about = "rsync")] Rsync(RsyncConfig), + #[structopt(about = "GitHub Releases")] + GithubRelease(GitHubRelease), } #[derive(Debug)] @@ -51,7 +54,8 @@ impl From for MirrorIntel { impl From for S3Backend { fn from(config: S3CliConfig) -> Self { - let mut s3_config = crate::s3::S3Config::new_jcloud(config.s3_prefix.unwrap()); + let mut s3_config = + crate::s3::S3Config::new_jcloud(config.s3_prefix.unwrap(), config.s3_scan_metadata); if let Some(endpoint) = config.s3_endpoint { s3_config.endpoint = endpoint; } @@ -84,6 +88,8 @@ pub struct S3CliConfig { pub s3_prefix_hint_mode: Option, #[structopt(long, help = "Max keys to list at a time", default_value = "1000")] pub s3_max_keys: u64, + #[structopt(long, help = "Scan metadata (Greatly increase requests)")] + pub s3_scan_metadata: bool, } #[derive(StructOpt, Debug, Clone)] diff --git a/src/s3.rs b/src/s3.rs index 1ccb929..013d210 100644 --- a/src/s3.rs +++ b/src/s3.rs @@ -26,7 +26,9 @@ use crate::traits::{Key, SnapshotStorage, TargetStorage}; use async_trait::async_trait; use futures_util::{stream, StreamExt}; use rusoto_core::Region; -use rusoto_s3::{DeleteObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3}; +use rusoto_s3::{ + DeleteObjectRequest, HeadObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3, +}; use slog::{debug, info, warn}; #[derive(Debug)] @@ -35,17 +37,19 @@ pub struct S3Config { pub bucket: String, pub prefix: String, pub prefix_hint_mode: Option, + pub scan_metadata: bool, pub max_keys: u64, } impl S3Config { - pub fn new_jcloud(prefix: String) -> Self { + pub fn new_jcloud(prefix: String, scan_metadata: bool) -> Self { Self { endpoint: "https://s3.jcloud.sjtu.edu.cn".to_string(), bucket: "899a892efef34b1b944a19981040f55b-oss01".to_string(), prefix, max_keys: 1000, prefix_hint_mode: None, + scan_metadata, } } } @@ -105,6 +109,7 @@ impl SnapshotStorage for S3Backend { } }; + // List bucket let mut futures = stream::iter(prefix) .map(|additional_prefix| { let bucket = self.config.bucket.clone(); @@ -177,6 +182,49 @@ impl SnapshotStorage for S3Backend { snapshots.append(&mut snapshot?); } + // Get metadata + let snapshots = if self.config.scan_metadata { + let mut futures = stream::iter(snapshots) + .map(|snapshot| { + let bucket = self.config.bucket.clone(); + let client = self.client.clone(); + let progress = progress.clone(); + let prefix = self.config.prefix.clone(); + + async move { + progress.set_message(&snapshot.key); + let req = HeadObjectRequest { + bucket, + key: format!("{}/{}", prefix, snapshot.key), + ..Default::default() + }; + let resp = client.head_object(req).await?; + let last_modified = if let Some(metadata) = resp.metadata { + metadata + .get("clone-last-modified") + .and_then(|x| x.parse::().ok()) + } else { + None + }; + Ok::<_, Error>(SnapshotMeta { + last_modified, + ..snapshot + }) + } + }) + .buffer_unordered(64); + + let mut snapshots = vec![]; + + while let Some(snapshot) = futures.next().await { + snapshots.push(snapshot?); + } + + snapshots + } else { + snapshots + }; + progress.finish_with_message("done"); let total_size = total_size.load(std::sync::atomic::Ordering::SeqCst); diff --git a/src/stream_pipe.rs b/src/stream_pipe.rs index 32f39f9..1766d37 100644 --- a/src/stream_pipe.rs +++ b/src/stream_pipe.rs @@ -13,7 +13,7 @@ use chrono::DateTime; use crate::common::{Mission, SnapshotConfig, TransferURL}; use crate::error::{Error, Result}; -use crate::traits::{SnapshotStorage, SourceStorage}; +use crate::traits::{Key, Metadata, SnapshotStorage, SourceStorage}; use crate::utils::{hash_string, unix_time}; use futures_core::Stream; use futures_util::{StreamExt, TryStreamExt}; @@ -75,13 +75,15 @@ pub struct ByteStream { pub struct ByteStreamPipe { pub source: Source, pub buffer_path: String, + pub use_snapshot_last_modified: bool, } impl ByteStreamPipe { - pub fn new(source: Source, buffer_path: String) -> Self { + pub fn new(source: Source, buffer_path: String, use_snapshot_last_modified: bool) -> Self { Self { source, buffer_path, + use_snapshot_last_modified, } } } @@ -112,7 +114,7 @@ where #[async_trait] impl SourceStorage for ByteStreamPipe where - Snapshot: Send + Sync + 'static, + Snapshot: Key + Metadata, Source: SourceStorage, { async fn get_object(&self, snapshot: &Snapshot, mission: &Mission) -> Result { @@ -143,15 +145,19 @@ where let mut total_bytes: u64 = 0; let content_length = response.content_length(); - let modified_at = std::str::from_utf8( - response - .headers() - .get(reqwest::header::LAST_MODIFIED) - .unwrap() - .as_bytes(), - ) - .unwrap() - .to_owned(); + let modified_at = if self.use_snapshot_last_modified { + snapshot.last_modified().unwrap() + } else { + let header = std::str::from_utf8( + response + .headers() + .get(reqwest::header::LAST_MODIFIED) + .unwrap() + .as_bytes(), + ) + .unwrap(); + DateTime::parse_from_rfc2822(&header)?.timestamp() as u64 + }; debug!(logger, "download: {} {:?}", transfer_url.0, content_length); @@ -183,7 +189,7 @@ where path: Some(path.into()), }, length: total_bytes, - modified_at: DateTime::parse_from_rfc2822(&modified_at)?.timestamp() as u64, + modified_at, }) } }