diff --git a/editoast/editoast_osrdyne_client/src/lib.rs b/editoast/editoast_osrdyne_client/src/lib.rs index e873d5d6544..9d0bfc65ca9 100644 --- a/editoast/editoast_osrdyne_client/src/lib.rs +++ b/editoast/editoast_osrdyne_client/src/lib.rs @@ -54,6 +54,15 @@ pub enum WorkerStatus { Error, } +fn combine_worker_status(a: WorkerStatus, b: WorkerStatus) -> WorkerStatus { + match (a, b) { + (WorkerStatus::Ready, _) | (_, WorkerStatus::Ready) => WorkerStatus::Ready, + (WorkerStatus::Started, _) | (_, WorkerStatus::Started) => WorkerStatus::Started, + (WorkerStatus::Error, _) | (_, WorkerStatus::Error) => WorkerStatus::Error, + (WorkerStatus::Unscheduled, WorkerStatus::Unscheduled) => WorkerStatus::Unscheduled, + } +} + #[derive(Clone, Copy, Debug, Deserialize)] pub enum OsrdyneWorkerStatus { Loading, @@ -110,6 +119,12 @@ struct OsrdyneStatusResponse { #[derive(Deserialize)] struct OsrdyneWorkerState { status: OsrdyneWorkerStatus, + worker_metadata: OsrdyneWorkerMetadata, +} + +#[derive(Deserialize)] +struct OsrdyneWorkerMetadata { + worker_key: String, } impl HTTPClient { @@ -123,25 +138,28 @@ impl HTTPClient { #[expect(unstable_name_collisions)] // intersperse let encoded_keys = keys .iter() - .map(|key| urlencoding::encode(key.as_ref())) - .intersperse(std::borrow::Cow::Borrowed(",")) + .map(|key| format!("keys={}", urlencoding::encode(key.as_ref()))) + .intersperse("&".to_string()) .collect::(); - let url = self - .base_url - .join(&format!("/status?keys={encoded_keys}"))?; + let url = self.base_url.join(&format!("/status?{encoded_keys}"))?; let response = self.client.get(url).send().await?; let response: OsrdyneStatusResponse = response.json().await?; - let response = response - .workers - .into_iter() - .map(|(k, v)| (k, v.status)) - .collect::>(); + + // Build the map of worker status by key + let mut statuses = HashMap::new(); + for worker in response.workers.into_values() { + let status = worker.status.into(); + statuses + .entry(worker.worker_metadata.worker_key) + .and_modify(|s| *s = combine_worker_status(*s, status)) + .or_insert(status); + } Ok(keys .iter() .map(|key| { - let status = response + let status = statuses .get(key.as_ref()) .copied() .map(WorkerStatus::from) diff --git a/osrdyne/src/api.rs b/osrdyne/src/api.rs index ccba22fea20..6ece6530fe6 100644 --- a/osrdyne/src/api.rs +++ b/osrdyne/src/api.rs @@ -24,14 +24,14 @@ struct WorkerState { #[derive(Clone)] struct AppState { - known_workers: Arc, WorkerState>>>, + known_workers: Arc>>, is_noop: bool, } pub async fn create_server( addr: String, known_workers: watch::Receiver>>, - worker_status: watch::Receiver, WorkerStatus>>>, + worker_status: watch::Receiver>>, is_noop: bool, ) { let app_state = AppState { @@ -67,7 +67,7 @@ pub async fn create_server( async fn app_state_updater( state: AppState, mut known_workers_recv: watch::Receiver>>, - mut worker_status_recv: watch::Receiver, WorkerStatus>>>, + mut worker_status_recv: watch::Receiver>>, ) { let mut known_workers = Arc::new(vec![]); let mut worker_status = Arc::new(HashMap::new()); @@ -90,9 +90,9 @@ async fn app_state_updater( } let mut known_workers_with_status = HashMap::new(); for worker in known_workers.iter() { - let status = worker_status.get(&worker.worker_id.to_string().into_bytes()); + let status = worker_status.get(&worker.worker_id.to_string()); known_workers_with_status.insert( - worker.worker_id.to_string().into_bytes(), + worker.worker_id.to_string(), WorkerState { worker_metadata: Some(worker.clone()), status: status.cloned().unwrap_or(WorkerStatus::Loading), @@ -115,7 +115,7 @@ async fn health_check() -> Json { #[derive(Serialize)] struct ListWorkersResponse { - workers: HashMap, + workers: HashMap, } #[derive(Deserialize)] @@ -151,9 +151,13 @@ async fn list_workers( .into_iter() .map(|key| { ( - key, + key.encode(), WorkerState { - worker_metadata: None, + worker_metadata: Some(WorkerMetadata { + external_id: "noop".to_string(), + worker_id: uuid::Uuid::nil(), + worker_key: key, + }), // In noop mode, we can't track the worker states. // We consider them always ready, as this mode is only used when debugging. status: WorkerStatus::Ready, @@ -161,10 +165,7 @@ async fn list_workers( ) }) .collect(), - (None, _) => latest_known_workers - .into_iter() - .map(|(k, s)| (Key::from(k.as_ref()), s)) - .collect(), + (None, _) => latest_known_workers.into_iter().collect(), }; Ok(Json(ListWorkersResponse { workers: filtered_workers, @@ -173,11 +174,11 @@ async fn list_workers( fn filter_workers( keys: Vec, - latest_known_workers: HashMap, WorkerState>, -) -> Result, ListWorkerError> { + latest_known_workers: HashMap, +) -> Result, ListWorkerError> { let keys_set: HashSet<_> = keys.into_iter().collect(); let mut filtered_workers = HashMap::new(); - for (_, s) in latest_known_workers.into_iter() { + for (k, s) in latest_known_workers.into_iter() { let worker_key = s .worker_metadata .as_ref() @@ -185,7 +186,7 @@ fn filter_workers( .worker_key .clone(); if keys_set.contains(&worker_key) { - filtered_workers.insert(worker_key, s); + filtered_workers.insert(k, s); } } Ok(filtered_workers) diff --git a/osrdyne/src/pool.rs b/osrdyne/src/pool.rs index 75a9226d6d4..6d083e286d3 100644 --- a/osrdyne/src/pool.rs +++ b/osrdyne/src/pool.rs @@ -376,7 +376,7 @@ async fn orphan_processor( pub struct ActivityMessage { pub kind: ActivityMessageKind, pub worker_key: Key, - pub worker_id: Vec, + pub worker_id: String, } pub enum ActivityMessageKind { @@ -431,6 +431,10 @@ async fn activity_processor( .and_then(|v| v.as_long_string().map(|s| s.as_bytes())); if let Some(worker_id) = worker_id { + let Ok(worker_id) = String::from_utf8(worker_id.to_vec()) else { + continue; + }; + let kind = headers .as_ref() .and_then(|h| h.inner().get("x-event")) @@ -442,7 +446,7 @@ async fn activity_processor( let activity = ActivityMessage { kind, worker_key: key.clone(), - worker_id: worker_id.to_owned(), + worker_id, }; status_tracker.send(activity).await?; } diff --git a/osrdyne/src/status_tracker.rs b/osrdyne/src/status_tracker.rs index a437b3f4e7f..ecc048c7cf0 100644 --- a/osrdyne/src/status_tracker.rs +++ b/osrdyne/src/status_tracker.rs @@ -16,9 +16,9 @@ pub enum WorkerStatus { pub async fn status_tracker( mut known_workers_watch: tokio::sync::watch::Receiver>>, mut activity_receiver: tokio::sync::mpsc::Receiver, - worker_status_watch: tokio::sync::watch::Sender, WorkerStatus>>>, + worker_status_watch: tokio::sync::watch::Sender>>, ) { - let mut worker_states = HashMap::, WorkerStatus>::new(); + let mut worker_states = HashMap::::new(); let mut first_run = true; loop { select! { @@ -29,20 +29,18 @@ pub async fn status_tracker( } let known_workers = known_workers_watch.borrow_and_update().clone(); - let known_workers_ids = HashSet::>::from_iter( + let known_workers_ids = HashSet::::from_iter( known_workers .iter() - .map(|w: &WorkerMetadata| w.worker_id.to_string().into_bytes()), + .map(|w: &WorkerMetadata| w.worker_id.to_string()), ); worker_states.retain(|id, _| known_workers_ids.contains(id)); for worker in known_workers.iter() { - let worker_id = worker.worker_id.to_string().into_bytes(); - if !worker_states.contains_key(&worker_id) { - // If this is the first time we have a worker list, mark all workers as ready - // They may have been loaded from a previous OSRDyne run - worker_states.insert(worker_id.to_vec(), if !first_run {WorkerStatus::Loading} else {WorkerStatus::Ready}); - } + let worker_id = worker.worker_id.to_string(); + // If this is the first time we have a worker list, mark all workers as ready + // They may have been loaded from a previous OSRDyne run + worker_states.entry(worker_id).or_insert_with(|| if !first_run {WorkerStatus::Loading} else {WorkerStatus::Ready}); } first_run = false; }, @@ -51,7 +49,7 @@ pub async fn status_tracker( match activity.kind { ActivityMessageKind::Ready => { if !worker_states.contains_key(&activity.worker_id) { - log::warn!("Received Ready message for unknown worker {}", String::from_utf8_lossy(&activity.worker_id)); + log::warn!("Received Ready message for unknown worker {}", activity.worker_id); } worker_states.insert(activity.worker_id, WorkerStatus::Ready); }