Skip to content

Commit

Permalink
repos: refine github release (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
skyzh authored Mar 19, 2021
1 parent 8040395 commit 20a8a8a
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 49 deletions.
81 changes: 55 additions & 26 deletions src/github_release.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,53 @@
//! GitHubRelease source will fetch the GitHub API when taking snapshots.
//! Then, it will construct a list of downloadable URLs.
use crate::common::{Mission, SnapshotConfig, SnapshotPath};
use crate::common::{Mission, SnapshotConfig, TransferURL};
use crate::error::Result;
use crate::metadata::SnapshotMeta;
use crate::timeout::{TryTimeoutExt, TryTimeoutFutureExt};
use crate::traits::SnapshotStorage;

use std::time::Duration;
use crate::traits::{SnapshotStorage, SourceStorage};

use async_trait::async_trait;
use serde_json::Value;
use chrono::{DateTime, Utc};
use serde::Deserialize;
use slog::info;
use std::time::Duration;
use structopt::StructOpt;

#[derive(Deserialize, Debug)]
pub struct GitHubReleaseAsset {
url: String,
id: u64,
name: String,
content_type: String,
size: u64,
created_at: DateTime<Utc>,
updated_at: DateTime<Utc>,
browser_download_url: String,
}

#[derive(Deserialize, Debug)]
pub struct GitHubReleaseItem {
tag_name: String,
name: String,
assets: Vec<GitHubReleaseAsset>,
}

#[derive(Debug)]
#[derive(Debug, Clone, StructOpt)]
pub struct GitHubRelease {
#[structopt(long, help = "GitHub Repo")]
pub repo: String,
#[structopt(long, help = "Version numbers to retain")]
pub version_to_retain: usize,
}

#[async_trait]
impl SnapshotStorage<SnapshotPath> for GitHubRelease {
impl SnapshotStorage<SnapshotMeta> for GitHubRelease {
async fn snapshot(
&mut self,
mission: Mission,
_config: &SnapshotConfig,
) -> Result<Vec<SnapshotPath>> {
) -> Result<Vec<SnapshotMeta>> {
let logger = mission.logger;
let progress = mission.progress;
let client = mission.client;
Expand All @@ -47,34 +70,40 @@ impl SnapshotStorage<SnapshotPath> for GitHubRelease {
.into_result()?;

info!(logger, "parsing...");
let json: Value = serde_json::from_str(&data).unwrap();
let releases = json.as_array().unwrap();
let snapshot: Vec<String> = releases
.iter()
.filter_map(|releases| releases.as_object())
.filter_map(|releases| {
progress.set_message(
releases
.get("tag_name")
.and_then(|tag_name| tag_name.as_str())
.unwrap_or(""),
);
releases.get("assets")
let releases = serde_json::from_str::<Vec<GitHubReleaseItem>>(&data)?;
let replace_string = format!("https://github.com/{}/", self.repo);
let snapshot: Vec<SnapshotMeta> = releases
.into_iter()
.map(|release| {
progress.set_message(&release.tag_name);
release.assets
})
.take(self.version_to_retain)
.filter_map(|assets| assets.as_array())
.flatten()
.filter_map(|asset| asset.get("browser_download_url"))
.filter_map(|url| url.as_str())
.map(|url| url.replace("https://github.com/", ""))
.map(|asset| SnapshotMeta {
key: asset.browser_download_url.replace(&replace_string, ""),
size: Some(asset.size),
last_modified: Some(asset.updated_at.timestamp() as u64),
..Default::default()
})
.collect();

progress.finish_with_message("done");

Ok(crate::utils::snapshot_string_to_path(snapshot))
Ok(snapshot)
}

fn info(&self) -> String {
format!("github releases, {:?}", self)
}
}

#[async_trait]
impl SourceStorage<SnapshotMeta, TransferURL> for GitHubRelease {
async fn get_object(&self, snapshot: &SnapshotMeta, _mission: &Mission) -> Result<TransferURL> {
Ok(TransferURL(format!(
"https://github.com/{}/{}",
self.repo, snapshot.key
)))
}
}
26 changes: 19 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,13 @@ use simple_diff_transfer::SimpleDiffTransfer;
use structopt::StructOpt;

macro_rules! index_bytes_pipe {
($buffer_path: expr, $prefix: expr) => {
($buffer_path: expr, $prefix: expr, $use_snapshot_last_modified: expr) => {
|source| {
let source = stream_pipe::ByteStreamPipe::new(source, $buffer_path.clone().unwrap());
let source = stream_pipe::ByteStreamPipe::new(
source,
$buffer_path.clone().unwrap(),
$use_snapshot_last_modified,
);
index_pipe::IndexPipe::new(
source,
$buffer_path.clone().unwrap(),
Expand Down Expand Up @@ -113,23 +117,23 @@ fn main() {
opts,
source,
transfer_config,
index_bytes_pipe!(buffer_path, prefix)
index_bytes_pipe!(buffer_path, prefix, false)
);
}
Source::Homebrew(source) => {
transfer!(
opts,
source,
transfer_config,
index_bytes_pipe!(buffer_path, prefix)
index_bytes_pipe!(buffer_path, prefix, false)
);
}
Source::CratesIo(source) => {
transfer!(
opts,
source,
transfer_config,
index_bytes_pipe!(buffer_path, prefix)
index_bytes_pipe!(buffer_path, prefix, false)
);
}
Source::Conda(config) => {
Expand All @@ -138,15 +142,23 @@ fn main() {
opts,
source,
transfer_config,
index_bytes_pipe!(buffer_path, prefix)
index_bytes_pipe!(buffer_path, prefix, false)
);
}
Source::Rsync(source) => {
transfer!(
opts,
source,
transfer_config,
index_bytes_pipe!(buffer_path, prefix)
index_bytes_pipe!(buffer_path, prefix, false)
);
}
Source::GithubRelease(source) => {
transfer!(
opts,
source,
transfer_config,
index_bytes_pipe!(buffer_path, prefix, true)
);
}
}
Expand Down
8 changes: 7 additions & 1 deletion src/opts.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::conda::CondaConfig;
use crate::crates_io::CratesIo as CratesIoConfig;
use crate::file_backend::FileBackend;
use crate::github_release::GitHubRelease;
use crate::homebrew::Homebrew as HomebrewConfig;
use crate::pypi::Pypi as PypiConfig;
use crate::rsync::Rsync as RsyncConfig;
Expand All @@ -24,6 +25,8 @@ pub enum Source {
Conda(CondaConfig),
#[structopt(about = "rsync")]
Rsync(RsyncConfig),
#[structopt(about = "GitHub Releases")]
GithubRelease(GitHubRelease),
}

#[derive(Debug)]
Expand Down Expand Up @@ -51,7 +54,8 @@ impl From<MirrorIntelCliConfig> for MirrorIntel {

impl From<S3CliConfig> for S3Backend {
fn from(config: S3CliConfig) -> Self {
let mut s3_config = crate::s3::S3Config::new_jcloud(config.s3_prefix.unwrap());
let mut s3_config =
crate::s3::S3Config::new_jcloud(config.s3_prefix.unwrap(), config.s3_scan_metadata);
if let Some(endpoint) = config.s3_endpoint {
s3_config.endpoint = endpoint;
}
Expand Down Expand Up @@ -84,6 +88,8 @@ pub struct S3CliConfig {
pub s3_prefix_hint_mode: Option<String>,
#[structopt(long, help = "Max keys to list at a time", default_value = "1000")]
pub s3_max_keys: u64,
#[structopt(long, help = "Scan metadata (Greatly increase requests)")]
pub s3_scan_metadata: bool,
}

#[derive(StructOpt, Debug, Clone)]
Expand Down
52 changes: 50 additions & 2 deletions src/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use crate::traits::{Key, SnapshotStorage, TargetStorage};
use async_trait::async_trait;
use futures_util::{stream, StreamExt};
use rusoto_core::Region;
use rusoto_s3::{DeleteObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3};
use rusoto_s3::{
DeleteObjectRequest, HeadObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3,
};
use slog::{debug, info, warn};

#[derive(Debug)]
Expand All @@ -35,17 +37,19 @@ pub struct S3Config {
pub bucket: String,
pub prefix: String,
pub prefix_hint_mode: Option<String>,
pub scan_metadata: bool,
pub max_keys: u64,
}

impl S3Config {
pub fn new_jcloud(prefix: String) -> Self {
pub fn new_jcloud(prefix: String, scan_metadata: bool) -> Self {
Self {
endpoint: "https://s3.jcloud.sjtu.edu.cn".to_string(),
bucket: "899a892efef34b1b944a19981040f55b-oss01".to_string(),
prefix,
max_keys: 1000,
prefix_hint_mode: None,
scan_metadata,
}
}
}
Expand Down Expand Up @@ -105,6 +109,7 @@ impl SnapshotStorage<SnapshotMeta> for S3Backend {
}
};

// List bucket
let mut futures = stream::iter(prefix)
.map(|additional_prefix| {
let bucket = self.config.bucket.clone();
Expand Down Expand Up @@ -177,6 +182,49 @@ impl SnapshotStorage<SnapshotMeta> for S3Backend {
snapshots.append(&mut snapshot?);
}

// Get metadata
let snapshots = if self.config.scan_metadata {
let mut futures = stream::iter(snapshots)
.map(|snapshot| {
let bucket = self.config.bucket.clone();
let client = self.client.clone();
let progress = progress.clone();
let prefix = self.config.prefix.clone();

async move {
progress.set_message(&snapshot.key);
let req = HeadObjectRequest {
bucket,
key: format!("{}/{}", prefix, snapshot.key),
..Default::default()
};
let resp = client.head_object(req).await?;
let last_modified = if let Some(metadata) = resp.metadata {
metadata
.get("clone-last-modified")
.and_then(|x| x.parse::<u64>().ok())
} else {
None
};
Ok::<_, Error>(SnapshotMeta {
last_modified,
..snapshot
})
}
})
.buffer_unordered(64);

let mut snapshots = vec![];

while let Some(snapshot) = futures.next().await {
snapshots.push(snapshot?);
}

snapshots
} else {
snapshots
};

progress.finish_with_message("done");

let total_size = total_size.load(std::sync::atomic::Ordering::SeqCst);
Expand Down
32 changes: 19 additions & 13 deletions src/stream_pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use chrono::DateTime;

use crate::common::{Mission, SnapshotConfig, TransferURL};
use crate::error::{Error, Result};
use crate::traits::{SnapshotStorage, SourceStorage};
use crate::traits::{Key, Metadata, SnapshotStorage, SourceStorage};
use crate::utils::{hash_string, unix_time};
use futures_core::Stream;
use futures_util::{StreamExt, TryStreamExt};
Expand Down Expand Up @@ -75,13 +75,15 @@ pub struct ByteStream {
pub struct ByteStreamPipe<Source> {
pub source: Source,
pub buffer_path: String,
pub use_snapshot_last_modified: bool,
}

impl<Source> ByteStreamPipe<Source> {
pub fn new(source: Source, buffer_path: String) -> Self {
pub fn new(source: Source, buffer_path: String, use_snapshot_last_modified: bool) -> Self {
Self {
source,
buffer_path,
use_snapshot_last_modified,
}
}
}
Expand Down Expand Up @@ -112,7 +114,7 @@ where
#[async_trait]
impl<Snapshot, Source> SourceStorage<Snapshot, ByteStream> for ByteStreamPipe<Source>
where
Snapshot: Send + Sync + 'static,
Snapshot: Key + Metadata,
Source: SourceStorage<Snapshot, TransferURL>,
{
async fn get_object(&self, snapshot: &Snapshot, mission: &Mission) -> Result<ByteStream> {
Expand Down Expand Up @@ -143,15 +145,19 @@ where

let mut total_bytes: u64 = 0;
let content_length = response.content_length();
let modified_at = std::str::from_utf8(
response
.headers()
.get(reqwest::header::LAST_MODIFIED)
.unwrap()
.as_bytes(),
)
.unwrap()
.to_owned();
let modified_at = if self.use_snapshot_last_modified {
snapshot.last_modified().unwrap()
} else {
let header = std::str::from_utf8(
response
.headers()
.get(reqwest::header::LAST_MODIFIED)
.unwrap()
.as_bytes(),
)
.unwrap();
DateTime::parse_from_rfc2822(&header)?.timestamp() as u64
};

debug!(logger, "download: {} {:?}", transfer_url.0, content_length);

Expand Down Expand Up @@ -183,7 +189,7 @@ where
path: Some(path.into()),
},
length: total_bytes,
modified_at: DateTime::parse_from_rfc2822(&modified_at)?.timestamp() as u64,
modified_at,
})
}
}

0 comments on commit 20a8a8a

Please sign in to comment.