Skip to content

Commit

Permalink
conda: force transfer repodata (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
skyzh authored Mar 5, 2021
1 parent a8ad4a4 commit 68409ea
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 21 deletions.
35 changes: 21 additions & 14 deletions src/conda.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::common::{Mission, SnapshotConfig, SnapshotPath, TransferURL};
use crate::common::{Mission, SnapshotConfig, TransferURL};
use crate::error::{Error, Result};
use crate::metadata::SnapshotMeta;
use crate::traits::{SnapshotStorage, SourceStorage};

use async_trait::async_trait;
Expand All @@ -25,11 +26,18 @@ pub struct Conda {
repos: CondaRepos,
}

fn parse_index(data: &[u8]) -> Result<Vec<String>> {
fn parse_index(repo: &str, data: &[u8]) -> Result<Vec<SnapshotMeta>> {
let v: JsonValue = serde_json::from_slice(data)?;
let mut result = vec![];

let package_mapper = |(key, _value): (&String, &JsonValue)| key.clone();
let package_mapper = |(key, value): (&String, &JsonValue)| SnapshotMeta {
key: format!("{}/{}", repo, key),
size: value.get("size").map(|x| x.as_u64().unwrap()),
last_modified: None,
checksum_method: value.get("sha256").map(|_| "sha256".to_string()),
checksum: value.get("sha256").map(|x| x.as_str().unwrap().to_owned()),
force: None,
};

if let Some(JsonValue::Object(map)) = v.get("packages") {
result.append(&mut map.iter().map(package_mapper).collect());
Expand All @@ -56,12 +64,12 @@ impl std::fmt::Debug for Conda {
}

#[async_trait]
impl SnapshotStorage<SnapshotPath> for Conda {
impl SnapshotStorage<SnapshotMeta> for Conda {
async fn snapshot(
&mut self,
mission: Mission,
_config: &SnapshotConfig,
) -> Result<Vec<SnapshotPath>> {
) -> Result<Vec<SnapshotMeta>> {
let logger = mission.logger;
let progress = mission.progress;
let client = mission.client;
Expand All @@ -78,13 +86,13 @@ impl SnapshotStorage<SnapshotPath> for Conda {
let mut snapshot = vec![];
let repodata = format!("{}/{}/repodata.json", base, repo);
let index_data = client.get(&repodata).send().await?.bytes().await?;
let packages = parse_index(&index_data)?;
snapshot.extend(packages.into_iter().map(|pkg| format!("{}/{}", repo, pkg)));
let packages = parse_index(&repo, &index_data)?;
snapshot.extend(packages.into_iter());
progress.set_message(&repo);
snapshot.append(&mut vec![
"repodata.json".to_string(),
"repodata.json.bz2".to_string(),
"current_repodata.json".to_string(),
SnapshotMeta::force(format!("{}/{}/repodata.json", base, repo)),
SnapshotMeta::force(format!("{}/{}/repodata.json.bz2", base, repo)),
SnapshotMeta::force(format!("{}/{}/current_repodata.json", base, repo)),
]);
Ok::<_, Error>(snapshot)
};
Expand All @@ -105,7 +113,6 @@ impl SnapshotStorage<SnapshotPath> for Conda {
.await?
.into_iter()
.flatten()
.map(SnapshotPath)
.collect::<Vec<_>>();

Ok(snapshots)
Expand All @@ -117,8 +124,8 @@ impl SnapshotStorage<SnapshotPath> for Conda {
}

#[async_trait]
impl SourceStorage<SnapshotPath, TransferURL> for Conda {
async fn get_object(&self, snapshot: &SnapshotPath, _mission: &Mission) -> Result<TransferURL> {
Ok(TransferURL(format!("{}/{}", self.repos.base, snapshot.0)))
impl SourceStorage<SnapshotMeta, TransferURL> for Conda {
async fn get_object(&self, snapshot: &SnapshotMeta, _mission: &Mission) -> Result<TransferURL> {
Ok(TransferURL(format!("{}/{}", self.repos.base, snapshot.key)))
}
}
1 change: 0 additions & 1 deletion src/file_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::traits::{Key, SnapshotStorage, TargetStorage};
use crate::{
common::{Mission, SnapshotConfig, SnapshotPath},
metadata::SnapshotMeta,
opts::Target,
};

use async_trait::async_trait;
Expand Down
13 changes: 12 additions & 1 deletion src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,24 @@ use crate::common::{Mission, SnapshotConfig, SnapshotPath};
use crate::error::Result;
use crate::traits::{Diff, Key, SnapshotStorage};

#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
pub struct SnapshotMeta {
pub key: String,
pub size: Option<u64>,
pub last_modified: Option<u64>,
pub checksum_method: Option<String>,
pub checksum: Option<String>,
pub force: Option<bool>,
}

impl SnapshotMeta {
pub fn force(key: String) -> Self {
Self {
key,
force: Some(true),
..Default::default()
}
}
}

pub struct MetaAsPath<Source: SnapshotStorage<SnapshotMeta> + std::fmt::Debug + std::marker::Send> {
Expand Down
3 changes: 1 addition & 2 deletions src/rsync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ impl SnapshotStorage<SnapshotMeta> for Rsync {
key: file.to_string(),
size: Some(size.parse().unwrap()),
last_modified: Some(datetime.timestamp() as u64),
checksum_method: None,
checksum: None,
..Default::default()
};
snapshot.push(meta);
}
Expand Down
4 changes: 1 addition & 3 deletions src/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,7 @@ impl SnapshotStorage<SnapshotMeta> for S3Backend {
snapshot.push(SnapshotMeta {
key,
size: item.size.map(|x| x as u64),
last_modified: None,
checksum: None,
checksum_method: None,
..Default::default()
});
} else {
warn!(logger, "prefix not match {}", key);
Expand Down

0 comments on commit 68409ea

Please sign in to comment.