Skip to content
This repository has been archived by the owner on Aug 25, 2021. It is now read-only.

Commit

Permalink
Implement dynamic tracks addition with renegotiation (#105, #27)
Browse files Browse the repository at this point in the history
- support adding new endpoints to the already interconnected Members
- implement PeerConnection renegotiation
- add TracksApplied event
  • Loading branch information
evdokimovs authored Jun 26, 2020
1 parent ba91090 commit 55cb227
Show file tree
Hide file tree
Showing 27 changed files with 1,228 additions and 533 deletions.
7 changes: 5 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ All user visible changes to this project will be documented in this file. This p
- Send reason of closing WebSocket connection as [Close](https://tools.ietf.org/html/rfc4566#section-5.14) frame's description ([#58]);
- Send `Event::RpcSettingsUpdated` when `Member` connects ([#75]);
- Send relay mode in `Event::PeerCreated` which is used for configuring client's `RtcIceTransportPolicy` ([#79]);
- Send `Command::UpdateTracks` on `Event::TracksUpdated` ([#81]).
- Emit `TracksApplied` event to create new and update existing tracks ([#105]);
- `PeerConnection` renegotiation functionality ([#105]).
- [Coturn] integration:
- [Coturn] sessions destroying ([#84]);
- [Coturn] stats processing ([#94]).
Expand All @@ -55,7 +56,8 @@ All user visible changes to this project will be documented in this file. This p
### Fixed

- Signalling:
- Room crashing when handling commands with non-existent `peer_id` ([#86]).
- Room crashing when handling commands with non-existent `peer_id` ([#86]);
- Adding new endpoints to the already interconnected `Member`s ([#105]).

[#28]: /../../pull/28
[#33]: /../../pull/33
Expand All @@ -70,6 +72,7 @@ All user visible changes to this project will be documented in this file. This p
[#94]: /../../pull/94
[#95]: /../../pull/95
[#98]: /../../pull/98
[#105]: /../../pull/105



Expand Down
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -703,19 +703,6 @@ It's recommended to cache `Peer` ID and `Member` ID relations in `Web Client`'s
```
</details>

#### 10. TracksUpdated

`Media Server` notifies about necessity to update `Track`s in specified `Peer`.

Can be used to update existing `Track` settings (e.g. change to lower video resolution, mute audio).

```rust
struct TracksUpdated {
peer_id: PeerId,
tracks_patches: Vec<TrackPatch>,
}
```


### Commands

Expand Down Expand Up @@ -1102,7 +1089,7 @@ Metrics list will be extended as needed.

#### 10. UpdateTracks

`Web Client` asks permission to update `Track`s in specified `Peer`. `Media Server` gives permission by sending `Event::TracksUpdated`.
`Web Client` asks permission to update `Track`s in specified `Peer`. `Media Server` gives permission by sending `Event::TracksApplied`.

```rust
struct UpdateTracks {
Expand Down
5 changes: 4 additions & 1 deletion jason/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ All user visible changes to this project will be documented in this file. This p
- Emitting of RPC commands:
- `AddPeerConnectionMetrics` with `IceConnectionState` and `PeerConnectionState` ([#71], [#87]);
- `ApplyTracks` for muting/unmuting ([#81]);
- `AddPeerConnectionStats` with `RtcStats` ([#90]).
- `AddPeerConnectionStats` with `RtcStats` ([#90]);
- Handling of RPC events:
- `TracksApplied` ([#105]).
- Error handling:
- Library API:
- `JasonError` as library error with trace information and underlying JS error if it is the cause ([#55])
Expand All @@ -80,6 +82,7 @@ All user visible changes to this project will be documented in this file. This p
[#87]: /../../pull/87
[#90]: /../../pull/90
[#97]: /../../pull/97
[#105]: /../../pull/105
[#106]: /../../pull/106


Expand Down
118 changes: 106 additions & 12 deletions jason/src/api/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use futures::{
use js_sys::Promise;
use medea_client_api_proto::{
Command, Direction, Event as RpcEvent, EventHandler, IceCandidate,
IceConnectionState, IceServer, PeerConnectionState, PeerId, PeerMetrics,
Track, TrackId, TrackPatch,
IceConnectionState, IceServer, NegotiationRole, PeerConnectionState,
PeerId, PeerMetrics, Track, TrackId, TrackPatch, TrackUpdate,
};
use tracerr::Traced;
use wasm_bindgen::{prelude::*, JsValue};
Expand Down Expand Up @@ -780,7 +780,7 @@ impl EventHandler for InnerRoom {
fn on_peer_created(
&mut self,
peer_id: PeerId,
sdp_offer: Option<String>,
negotiation_role: NegotiationRole,
tracks: Vec<Track>,
ice_servers: Vec<IceServer>,
is_force_relayed: bool,
Expand All @@ -804,13 +804,16 @@ impl EventHandler for InnerRoom {

self.create_connections_from_tracks(&tracks);

// TODO(alexlapa): Eliminate code duplication (on_tracks_applied).
// Doing Room refactoring in another PR, it`ll allow
// to fix this smoothly.
let local_stream_constraints = self.local_stream_settings.clone();
let rpc = Rc::clone(&self.rpc);
let error_callback = Rc::clone(&self.on_failed_local_stream);
spawn_local(
async move {
match sdp_offer {
None => {
match negotiation_role {
NegotiationRole::Offerer => {
let sdp_offer = peer
.get_offer(tracks, local_stream_constraints)
.await
Expand All @@ -826,7 +829,7 @@ impl EventHandler for InnerRoom {
mids,
});
}
Some(offer) => {
NegotiationRole::Answerer(offer) => {
let sdp_answer = peer
.process_offer(
offer,
Expand Down Expand Up @@ -916,16 +919,107 @@ impl EventHandler for InnerRoom {
});
}

/// Updates [`Track`]s of this [`Room`].
fn on_tracks_updated(&mut self, peer_id: PeerId, tracks: Vec<TrackPatch>) {
if let Some(peer) = self.peers.get(peer_id) {
if let Err(err) = peer.update_senders(tracks) {
JasonError::from(err).print();
}
/// Creates new `Track`s, updates existing [`Sender`]s/[`Receiver`]s with
/// [`TrackUpdate`]s.
///
/// Will start renegotiation process if `Some` [`NegotiationRole`] is
/// provided.
fn on_tracks_applied(
&mut self,
peer_id: PeerId,
updates: Vec<TrackUpdate>,
negotiation_role: Option<NegotiationRole>,
) {
let peer = if let Some(peer) = self.peers.get(peer_id) {
peer
} else {
JasonError::from(tracerr::new!(RoomError::NoSuchPeer(peer_id)))
.print();
return;
};

let mut new_tracks = Vec::new();
let mut patches = Vec::new();

for update in updates {
match update {
TrackUpdate::Added(track) => {
new_tracks.push(track);
}
TrackUpdate::Updated(track_patch) => {
patches.push(track_patch);
}
}
}
if let Err(err) = peer.update_senders(patches) {
JasonError::from(err).print();
return;
}

// TODO(alexlapa): Eliminate code duplication (on_peer_created).
// Doing Room refactoring in another PR, it`ll allow
// to fix this smoothly.
let local_stream_constraints = self.local_stream_settings.clone();
let rpc = Rc::clone(&self.rpc);
let error_callback = Rc::clone(&self.on_failed_local_stream);
spawn_local(
async move {
match negotiation_role {
None => {
peer.create_tracks(new_tracks)
.await
.map_err(tracerr::map_from_and_wrap!())?;
}
Some(NegotiationRole::Offerer) => {
let sdp_offer = peer
.get_offer(new_tracks, local_stream_constraints)
.await
.map_err(tracerr::map_from_and_wrap!())?;
let mids = peer
.get_mids()
.map_err(tracerr::map_from_and_wrap!())?;
let senders_statuses = peer.get_senders_statuses();
rpc.send_command(Command::MakeSdpOffer {
peer_id,
sdp_offer,
senders_statuses,
mids,
});
}
Some(NegotiationRole::Answerer(offer)) => {
let sdp_answer = peer
.process_offer(
offer,
new_tracks,
local_stream_constraints,
)
.await
.map_err(tracerr::map_from_and_wrap!())?;
let senders_statuses = peer.get_senders_statuses();
rpc.send_command(Command::MakeSdpAnswer {
peer_id,
sdp_answer,
senders_statuses,
});
}
};
Result::<_, Traced<RoomError>>::Ok(())
}
.then(|result| async move {
if let Err(err) = result {
let (err, trace) = err.into_parts();
match err {
RoomError::InvalidLocalStream(_)
| RoomError::CouldNotGetLocalMedia(_) => {
let e = JasonError::from((err, trace));
e.print();
error_callback.call(e);
}
_ => JasonError::from((err, trace)).print(),
};
};
}),
)
}
}

Expand Down
8 changes: 2 additions & 6 deletions jason/src/peer/media/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,18 +243,14 @@ impl MediaConnections {
out
}

/// Synchronizes local state with provided tracks. Creates new [`Sender`]s
/// and [`Receiver`]s for each new [`Track`], and updates [`Track`] if
/// its settings has been changed.
/// Creates new [`Sender`]s and [`Receiver`]s for each new [`Track`].
///
/// # Errors
///
/// With [`MediaConnectionsError::TransceiverNotFound`] if could not create
/// new [`Sender`] cause transceiver with specified `mid` does not
/// exist.
// TODO: Doesnt really updates anything, but only generates new senders
// and receivers atm.
pub fn update_tracks<I: IntoIterator<Item = Track>>(
pub fn create_tracks<I: IntoIterator<Item = Track>>(
&self,
tracks: I,
) -> Result<()> {
Expand Down
25 changes: 20 additions & 5 deletions jason/src/peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ impl PeerConnection {
local_stream: Option<MediaStreamSettings>,
) -> Result<String> {
self.media_connections
.update_tracks(tracks)
.create_tracks(tracks)
.map_err(tracerr::map_from_and_wrap!())?;

self.update_local_stream(local_stream)
Expand All @@ -588,10 +588,22 @@ impl PeerConnection {
.create_and_set_offer()
.await
.map_err(tracerr::map_from_and_wrap!())?;

Ok(offer)
}

/// Creates new [`Sender`]s and [`Receiver`]s for each new [`Track`].
///
/// # Errors
///
/// With [`MediaConnectionsError::TransceiverNotFound`] if could not create
/// new [`Sender`] because transceiver with specified `mid` doesn't exist.
pub async fn create_tracks(&self, tracks: Vec<Track>) -> Result<()> {
self.media_connections
.create_tracks(tracks)
.map_err(tracerr::map_from_and_wrap!())?;
Ok(())
}

/// Inserts provided [MediaStream][1] into underlying [RTCPeerConnection][2]
/// if it has all required tracks.
/// Requests local stream from [`MediaManager`] if no stream was provided.
Expand Down Expand Up @@ -775,17 +787,20 @@ impl PeerConnection {
Direction::Recv { .. } => true,
});

// update receivers
// create receivers
self.media_connections
.update_tracks(recv)
.create_tracks(recv)
.map_err(tracerr::map_from_and_wrap!())?;

// set offer, which will create transceivers and discover remote tracks
// in receivers
self.set_remote_offer(offer)
.await
.map_err(tracerr::wrap!())?;

// create senders
self.media_connections
.update_tracks(send)
.create_tracks(send)
.map_err(tracerr::map_from_and_wrap!())?;

self.update_local_stream(local_constraints)
Expand Down
Loading

0 comments on commit 55cb227

Please sign in to comment.