Skip to content

Commit

Permalink
editoast, osrdyne: fix the multi infra status response
Browse files Browse the repository at this point in the history
Signed-off-by: Younes Khoudli <younes.khoudli@epita.fr>
  • Loading branch information
Khoyo committed Oct 1, 2024
1 parent 91021c9 commit 0861e3d
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 40 deletions.
40 changes: 29 additions & 11 deletions editoast/editoast_osrdyne_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ pub enum WorkerStatus {
Error,
}

fn combine_worker_status(a: WorkerStatus, b: WorkerStatus) -> WorkerStatus {
match (a, b) {
(WorkerStatus::Ready, _) | (_, WorkerStatus::Ready) => WorkerStatus::Ready,
(WorkerStatus::Started, _) | (_, WorkerStatus::Started) => WorkerStatus::Started,
(WorkerStatus::Error, _) | (_, WorkerStatus::Error) => WorkerStatus::Error,
(WorkerStatus::Unscheduled, WorkerStatus::Unscheduled) => WorkerStatus::Unscheduled,
}
}

#[derive(Clone, Copy, Debug, Deserialize)]
pub enum OsrdyneWorkerStatus {
Loading,
Expand Down Expand Up @@ -110,6 +119,12 @@ struct OsrdyneStatusResponse {
#[derive(Deserialize)]
struct OsrdyneWorkerState {
status: OsrdyneWorkerStatus,
worker_metadata: OsrdyneWorkerMetadata,
}

#[derive(Deserialize)]
struct OsrdyneWorkerMetadata {
worker_key: String,
}

impl HTTPClient {
Expand All @@ -123,25 +138,28 @@ impl HTTPClient {
#[expect(unstable_name_collisions)] // intersperse
let encoded_keys = keys
.iter()
.map(|key| urlencoding::encode(key.as_ref()))
.intersperse(std::borrow::Cow::Borrowed(","))
.map(|key| format!("keys={}", urlencoding::encode(key.as_ref())))
.intersperse("&".to_string())
.collect::<String>();

let url = self
.base_url
.join(&format!("/status?keys={encoded_keys}"))?;
let url = self.base_url.join(&format!("/status?{encoded_keys}"))?;
let response = self.client.get(url).send().await?;
let response: OsrdyneStatusResponse = response.json().await?;
let response = response
.workers
.into_iter()
.map(|(k, v)| (k, v.status))
.collect::<HashMap<_, _>>();

// Build the map of worker status by key
let mut statuses = HashMap::new();
for worker in response.workers.into_values() {
let status = worker.status.into();
statuses
.entry(worker.worker_metadata.worker_key)
.and_modify(|s| *s = combine_worker_status(*s, status))
.or_insert(status);
}

Ok(keys
.iter()
.map(|key| {
let status = response
let status = statuses
.get(key.as_ref())
.copied()
.map(WorkerStatus::from)
Expand Down
33 changes: 17 additions & 16 deletions osrdyne/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ struct WorkerState {

#[derive(Clone)]
struct AppState {
known_workers: Arc<Mutex<HashMap<Vec<u8>, WorkerState>>>,
known_workers: Arc<Mutex<HashMap<String, WorkerState>>>,
is_noop: bool,
}

pub async fn create_server(
addr: String,
known_workers: watch::Receiver<Arc<Vec<WorkerMetadata>>>,
worker_status: watch::Receiver<Arc<HashMap<Vec<u8>, WorkerStatus>>>,
worker_status: watch::Receiver<Arc<HashMap<String, WorkerStatus>>>,
is_noop: bool,
) {
let app_state = AppState {
Expand Down Expand Up @@ -67,7 +67,7 @@ pub async fn create_server(
async fn app_state_updater(
state: AppState,
mut known_workers_recv: watch::Receiver<Arc<Vec<WorkerMetadata>>>,
mut worker_status_recv: watch::Receiver<Arc<HashMap<Vec<u8>, WorkerStatus>>>,
mut worker_status_recv: watch::Receiver<Arc<HashMap<String, WorkerStatus>>>,
) {
let mut known_workers = Arc::new(vec![]);
let mut worker_status = Arc::new(HashMap::new());
Expand All @@ -90,9 +90,9 @@ async fn app_state_updater(
}
let mut known_workers_with_status = HashMap::new();
for worker in known_workers.iter() {
let status = worker_status.get(&worker.worker_id.to_string().into_bytes());
let status = worker_status.get(&worker.worker_id.to_string());
known_workers_with_status.insert(
worker.worker_id.to_string().into_bytes(),
worker.worker_id.to_string(),
WorkerState {
worker_metadata: Some(worker.clone()),
status: status.cloned().unwrap_or(WorkerStatus::Loading),
Expand All @@ -115,7 +115,7 @@ async fn health_check() -> Json<HealthCheckResponse> {

#[derive(Serialize)]
struct ListWorkersResponse {
workers: HashMap<Key, WorkerState>,
workers: HashMap<String, WorkerState>,
}

#[derive(Deserialize)]
Expand Down Expand Up @@ -151,20 +151,21 @@ async fn list_workers(
.into_iter()
.map(|key| {
(
key,
key.encode(),
WorkerState {
worker_metadata: None,
worker_metadata: Some(WorkerMetadata {
external_id: "noop".to_string(),
worker_id: uuid::Uuid::nil(),
worker_key: key,
}),
// In noop mode, we can't track the worker states.
// We consider them always ready, as this mode is only used when debugging.
status: WorkerStatus::Ready,
},
)
})
.collect(),
(None, _) => latest_known_workers
.into_iter()
.map(|(k, s)| (Key::from(k.as_ref()), s))
.collect(),
(None, _) => latest_known_workers.into_iter().collect(),
};
Ok(Json(ListWorkersResponse {
workers: filtered_workers,
Expand All @@ -173,19 +174,19 @@ async fn list_workers(

fn filter_workers(
keys: Vec<Key>,
latest_known_workers: HashMap<Vec<u8>, WorkerState>,
) -> Result<HashMap<Key, WorkerState>, ListWorkerError> {
latest_known_workers: HashMap<String, WorkerState>,
) -> Result<HashMap<String, WorkerState>, ListWorkerError> {
let keys_set: HashSet<_> = keys.into_iter().collect();
let mut filtered_workers = HashMap::new();
for (_, s) in latest_known_workers.into_iter() {
for (k, s) in latest_known_workers.into_iter() {
let worker_key = s
.worker_metadata
.as_ref()
.ok_or(ListWorkerError::MissingWorkerMetadata)?
.worker_key
.clone();
if keys_set.contains(&worker_key) {
filtered_workers.insert(worker_key, s);
filtered_workers.insert(k, s);
}
}
Ok(filtered_workers)
Expand Down
8 changes: 6 additions & 2 deletions osrdyne/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ async fn orphan_processor(
pub struct ActivityMessage {
pub kind: ActivityMessageKind,
pub worker_key: Key,
pub worker_id: Vec<u8>,
pub worker_id: String,
}

pub enum ActivityMessageKind {
Expand Down Expand Up @@ -431,6 +431,10 @@ async fn activity_processor(
.and_then(|v| v.as_long_string().map(|s| s.as_bytes()));

if let Some(worker_id) = worker_id {
let Ok(worker_id) = String::from_utf8(worker_id.to_vec()) else {
continue;
};

let kind = headers
.as_ref()
.and_then(|h| h.inner().get("x-event"))
Expand All @@ -442,7 +446,7 @@ async fn activity_processor(
let activity = ActivityMessage {
kind,
worker_key: key.clone(),
worker_id: worker_id.to_owned(),
worker_id,
};
status_tracker.send(activity).await?;
}
Expand Down
20 changes: 9 additions & 11 deletions osrdyne/src/status_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ pub enum WorkerStatus {
pub async fn status_tracker(
mut known_workers_watch: tokio::sync::watch::Receiver<Arc<Vec<WorkerMetadata>>>,
mut activity_receiver: tokio::sync::mpsc::Receiver<ActivityMessage>,
worker_status_watch: tokio::sync::watch::Sender<Arc<HashMap<Vec<u8>, WorkerStatus>>>,
worker_status_watch: tokio::sync::watch::Sender<Arc<HashMap<String, WorkerStatus>>>,
) {
let mut worker_states = HashMap::<Vec<u8>, WorkerStatus>::new();
let mut worker_states = HashMap::<String, WorkerStatus>::new();
let mut first_run = true;
loop {
select! {
Expand All @@ -29,20 +29,18 @@ pub async fn status_tracker(
}
let known_workers = known_workers_watch.borrow_and_update().clone();

let known_workers_ids = HashSet::<Vec<u8>>::from_iter(
let known_workers_ids = HashSet::<String>::from_iter(
known_workers
.iter()
.map(|w: &WorkerMetadata| w.worker_id.to_string().into_bytes()),
.map(|w: &WorkerMetadata| w.worker_id.to_string()),
);
worker_states.retain(|id, _| known_workers_ids.contains(id));

for worker in known_workers.iter() {
let worker_id = worker.worker_id.to_string().into_bytes();
if !worker_states.contains_key(&worker_id) {
// If this is the first time we have a worker list, mark all workers as ready
// They may have been loaded from a previous OSRDyne run
worker_states.insert(worker_id.to_vec(), if !first_run {WorkerStatus::Loading} else {WorkerStatus::Ready});
}
let worker_id = worker.worker_id.to_string();
// If this is the first time we have a worker list, mark all workers as ready
// They may have been loaded from a previous OSRDyne run
worker_states.entry(worker_id).or_insert_with(|| if !first_run {WorkerStatus::Loading} else {WorkerStatus::Ready});
}
first_run = false;
},
Expand All @@ -51,7 +49,7 @@ pub async fn status_tracker(
match activity.kind {
ActivityMessageKind::Ready => {
if !worker_states.contains_key(&activity.worker_id) {
log::warn!("Received Ready message for unknown worker {}", String::from_utf8_lossy(&activity.worker_id));
log::warn!("Received Ready message for unknown worker {}", activity.worker_id);
}
worker_states.insert(activity.worker_id, WorkerStatus::Ready);
}
Expand Down

0 comments on commit 0861e3d

Please sign in to comment.