Skip to content

Commit

Permalink
Implemented download savefile
Browse files Browse the repository at this point in the history
  • Loading branch information
circlesabound committed Apr 15, 2024
1 parent 77fe150 commit a44ddf4
Show file tree
Hide file tree
Showing 10 changed files with 201 additions and 115 deletions.
93 changes: 61 additions & 32 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ publish = false
base64 = "0.22"
bincode = "1.3"
bytes = "1.0"
chrono = { version = "0.4", features = [ "serde" ] }
chrono = { version = "0.4.37", features = [ "serde" ] }
derive_more = "0.99"
env_logger = "0.11.3"
factorio-mod-settings-parser = { git = "https://github.com/circlesabound/factorio-mod-settings-parser", rev = "a9fecd6" }
Expand All @@ -36,6 +36,7 @@ tar = "0.4"
tokio = { version = "1.5", features = [ "full" ] }
tokio-stream = { version = "0.1", features = [ "sync" ] }
tokio-tungstenite = "0.21"
tokio-util = "0.7"
toml = "0.8.8"
url = "2"
urlencoding = "2.1.0"
Expand Down
49 changes: 42 additions & 7 deletions src/mgmt-server/link_download.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
use std::collections::HashMap;
use chrono::{DateTime, Utc};
use std::{collections::HashMap, sync::Arc};
use chrono::{DateTime, Duration, Utc};
use log::info;
use tokio::sync::RwLock;
use tokio::{select, sync::RwLock};
use tokio_util::sync::CancellationToken;
use uuid::Uuid;

const CLEANUP_INTERVAL: Duration = Duration::minutes(15);
const LINK_EXPIRY: Duration = Duration::minutes(60);

type LinkMap = Arc<RwLock<HashMap<String, (LinkDownloadTarget, DateTime<Utc>)>>>;

pub struct LinkDownloadManager {
links: RwLock<HashMap<String, (LinkDownloadTarget, DateTime<Utc>)>>,
links: LinkMap,
_cleanup_task_ct: CancellationToken,
}

#[derive(Clone, Debug)]
Expand All @@ -14,10 +21,17 @@ pub enum LinkDownloadTarget {
}

impl LinkDownloadManager {
pub fn new() -> LinkDownloadManager {
// TODO periodic job to expire out links
/// Creates a `LinkDownloadManager` and spawns a background task that
/// periodically removes expired download links from the shared map.
///
/// The cancellation token for the spawned task is stored in
/// `_cleanup_task_ct`. NOTE(review): a `CancellationToken` is not cancelled
/// merely by being dropped — confirm whether the cleanup task is expected to
/// stop when the manager is dropped; if so, a `drop_guard()` may be needed.
pub async fn new() -> LinkDownloadManager {
    let links = LinkMap::default();
    let links_clone = Arc::clone(&links);
    let cancellation_token = CancellationToken::new();
    let _cleanup_task_ct = cancellation_token.clone();
    // Run the expiry sweep in the background; it exits when cancelled.
    tokio::spawn(async move {
        Self::cleanup_job(links_clone, cancellation_token).await;
    });
    LinkDownloadManager {
        links,
        _cleanup_task_ct,
    }
}

Expand All @@ -33,4 +47,25 @@ impl LinkDownloadManager {
let r_guard = self.links.read().await;
r_guard.get(&link).map(|(target, _dt)| target.clone())
}

async fn cleanup_job(links: LinkMap, cancellation_token: CancellationToken) {
loop {
select! {
_ = cancellation_token.cancelled() => {
break;
}
_ = tokio::time::sleep(CLEANUP_INTERVAL.to_std().unwrap()) => {
let mut w_guard = links.write().await;
let now = Utc::now();
w_guard.retain(|link, (target, dt)| {
let should_remove = now - *dt > LINK_EXPIRY;
if should_remove {
info!("Expiring download link: {} -> {:?}", link, target);
}
!should_remove
});
}
}
}
}
}
2 changes: 1 addition & 1 deletion src/mgmt-server/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
.await?;

info!("Creating link download manager");
let link_download_manager = Arc::new(LinkDownloadManager::new());
let link_download_manager = Arc::new(LinkDownloadManager::new().await);

let ws_port = std::env::var("MGMT_SERVER_WS_PORT")?.parse()?;
let ws_addr = std::env::var("MGMT_SERVER_WS_ADDRESS")?.parse()?;
Expand Down
43 changes: 41 additions & 2 deletions src/mgmt-server/routes/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,22 @@ use std::sync::Arc;

use crate::{clients::AgentApiClient, error::{Error, Result}, link_download::{LinkDownloadManager, LinkDownloadTarget}};

use fctrl::schema::{AgentOutMessage, AgentResponseWithId};
use log::{error, info};
use rocket::{get, response::stream::ByteStream, State};
use tokio_stream::StreamExt;

use super::DownloadResponder;

#[get("/<link_id>")]
pub async fn download(
agent_client: &State<Arc<AgentApiClient>>,
link_download_manager: &State<Arc<LinkDownloadManager>>,
link_id: String,
) -> Result<ByteStream![Vec<u8>]> {
) -> Result<DownloadResponder<ByteStream![Vec<u8>]>> {
match link_download_manager.get_link(link_id).await {
Some(target) => match target {
LinkDownloadTarget::Savefile { id } => crate::routes::server::get_savefile_real(
LinkDownloadTarget::Savefile { id } => download_savefile(
agent_client,
id,
).await,
Expand All @@ -21,3 +26,37 @@ pub async fn download(
}
}

/// Streams the savefile identified by `id` from the agent to the client as a
/// zip download named `{id}.zip`.
///
/// Subscribes to the agent's `save_get` event stream and forwards each
/// multipart chunk; an empty byte chunk marks end-of-transfer. Malformed or
/// unexpected events are logged and skipped rather than aborting the stream.
pub async fn download_savefile(
    agent_client: &State<Arc<AgentApiClient>>,
    id: String,
) -> Result<DownloadResponder<ByteStream![Vec<u8>]>> {
    let (_operation_id, sub) = agent_client.save_get(id.clone()).await?;
    let download_filename = format!("{}.zip", &id);
    // TODO figure out how to properly handle errors
    let s = sub.filter_map(|event| {
        match serde_json::from_str::<AgentResponseWithId>(&event.content) {
            Ok(m) => match m.content {
                AgentOutMessage::SaveFile(sb) => {
                    // An empty chunk is the agent's end-of-transfer marker:
                    // log completion and end the byte stream.
                    if sb.bytes.is_empty() {
                        info!("get_savefile completed with total multiparts = {:?}", sb.multipart_seqnum);
                        None
                    } else {
                        Some(sb.bytes)
                    }
                }
                c => {
                    error!("Expected AgentOutMessage::SaveFile during get_savefile, got something else: {:?}", c);
                    None
                }
            },
            Err(e) => {
                error!("Error deserialising event content during get_savefile: {:?}", e);
                None
            }
        }
    });

    Ok(DownloadResponder::new(ByteStream::from(s), download_filename))
}
Loading

0 comments on commit a44ddf4

Please sign in to comment.