Skip to content

Commit

Permalink
Return ObjectReader in priority order (kixelated#146)
Browse files Browse the repository at this point in the history
  • Loading branch information
kixelated authored Apr 2, 2024
1 parent 04bc2be commit 022092c
Showing 1 changed file with 43 additions and 12 deletions.
55 changes: 43 additions & 12 deletions moq-transport/src/serve/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
//! You can clone the [Reader] and each will read a copy of of all future chunks. (fanout)
//!
//! The fragment is closed with [ServeError::Closed] when all writers or readers are dropped.
use std::{cmp, ops::Deref, sync::Arc};
use std::{cmp, collections::BinaryHeap, ops::Deref, sync::Arc};

use super::{ServeError, Track};
use crate::util::State;
Expand Down Expand Up @@ -36,9 +36,6 @@ struct ObjectsState {
// The latest group.
objects: Vec<ObjectReader>,

// Increased each time objects changes.
epoch: usize,

// Can be sent by the writer with an explicit error code.
closed: Result<(), ServeError>,
}
Expand All @@ -47,7 +44,6 @@ impl Default for ObjectsState {
fn default() -> Self {
Self {
objects: Vec::new(),
epoch: 0,
closed: Ok(()),
}
}
Expand Down Expand Up @@ -87,7 +83,6 @@ impl ObjectsWriter {
}

state.objects.push(reader);
state.epoch += 1;

Ok(writer)
}
Expand Down Expand Up @@ -115,22 +110,33 @@ impl Deref for ObjectsWriter {
pub struct ObjectsReader {
state: State<ObjectsState>,
pub track: Arc<Track>,
epoch: usize,
index: usize,

// The objects ready to be returned
pending: BinaryHeap<ObjectReader>,
}

impl ObjectsReader {
fn new(state: State<ObjectsState>, track: Arc<Track>) -> Self {
Self { state, track, epoch: 0 }
Self {
state,
track,
index: 0,
pending: BinaryHeap::new(),
}
}

pub async fn next(&mut self) -> Result<Option<ObjectReader>, ServeError> {
loop {
let notify = {
let state = self.state.lock();
if self.epoch < state.epoch {
let index = state.objects.len().saturating_sub(state.epoch - self.epoch);
self.epoch = state.epoch - state.objects.len() + index + 1;
return Ok(Some(state.objects[index].clone()));
for object in &state.objects[self.index..] {
self.pending.push(object.clone());
}
self.index = state.objects.len();

if let Some(object) = self.pending.pop() {
return Ok(Some(object));
}

state.closed.clone()?;
Expand Down Expand Up @@ -332,3 +338,28 @@ impl Deref for ObjectReader {
&self.info
}
}

// Return object readers in priority order ascending, otherwise group descending, otherwise object ascending.
impl Ord for ObjectReader {
fn cmp(&self, other: &Self) -> cmp::Ordering {
other
.priority
.cmp(&self.priority) // Ascending order
.then_with(|| self.group_id.cmp(&other.group_id)) // Descending order
.then_with(|| other.object_id.cmp(&self.object_id)) // Ascending order
}
}

impl PartialOrd for ObjectReader {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
}

impl PartialEq for ObjectReader {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == cmp::Ordering::Equal
}
}

impl Eq for ObjectReader {}

0 comments on commit 022092c

Please sign in to comment.