From ba9c1fd6bd97a98de3aea00bb6e37de1c87dcc07 Mon Sep 17 00:00:00 2001 From: yngrtc Date: Sun, 21 Apr 2024 16:27:19 -0700 Subject: [PATCH] add set_local_description --- rtc/src/peer_connection/mod.rs | 239 +++-- .../peer_connection_internal.rs | 36 - rtc/src/peer_connection/sdp/mod.rs | 14 +- rtc/src/rtp_transceiver/mod.rs | 29 +- rtc/src/rtp_transceiver/rtp_receiver/mod.rs | 844 ++++++++---------- rtc/src/rtp_transceiver/rtp_sender/mod.rs | 7 +- .../transport/ice_transport/ice_gatherer.rs | 10 +- 7 files changed, 521 insertions(+), 658 deletions(-) diff --git a/rtc/src/peer_connection/mod.rs b/rtc/src/peer_connection/mod.rs index 636676c..c5a895d 100644 --- a/rtc/src/peer_connection/mod.rs +++ b/rtc/src/peer_connection/mod.rs @@ -71,15 +71,15 @@ use crate::peer_connection::peer_connection_state::{ NegotiationNeededState, RTCPeerConnectionState, }; use crate::peer_connection::policy::ice_transport_policy::RTCIceTransportPolicy; -use crate::peer_connection::sdp::populate_sdp; use crate::peer_connection::sdp::sdp_type::RTCSdpType; use crate::peer_connection::sdp::session_description::RTCSessionDescription; use crate::peer_connection::sdp::{ get_mid_value, get_peer_direction, get_rids, update_sdp_origin, MediaSection, PopulateSdpParams, }; +use crate::peer_connection::sdp::{populate_local_candidates, populate_sdp}; //use crate::peer_connection::sdp::*; use crate::peer_connection::signaling_state::{ - /*check_next_signaling_state,*/ RTCSignalingState, //StateChangeOp, + check_next_signaling_state, RTCSignalingState, StateChangeOp, }; use crate::rtp_transceiver::rtp_codec::RTPCodecType; use crate::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection; @@ -969,14 +969,51 @@ impl RTCPeerConnection { ) .await; } + */ - // 4.4.1.6 Set the SessionDescription - pub(crate) async fn set_description( + // Helper to trigger a negotiation needed. + fn trigger_negotiation_needed(&self) { + //TODO: RTCPeerConnection::do_negotiation_needed(self.create_negotiation_needed_params()); + } + + /// Creates the parameters needed to trigger a negotiation needed. + fn create_negotiation_needed_params(&self) -> NegotiationNeededParams { + NegotiationNeededParams { + //TODO: on_negotiation_needed_handler: Arc::clone(&self.on_negotiation_needed_handler), + is_closed: self.is_closed, + //todo: ops: Arc::clone(&self.ops), + negotiation_needed_state: self.negotiation_needed_state, + is_negotiation_needed: self.is_negotiation_needed, + signaling_state: self.signaling_state, + /*check_negotiation_needed_params: CheckNegotiationNeededParams { + sctp_transport: Arc::clone(&self.sctp_transport), + rtp_transceivers: Arc::clone(&self.rtp_transceivers), + current_local_description: Arc::clone(&self.current_local_description), + current_remote_description: Arc::clone(&self.current_remote_description), + },*/ + } + } + + /*fn make_negotiation_needed_trigger( &self, + ) -> impl Fn() -> Pin + Send + Sync>> + Send + Sync { + let params = self.create_negotiation_needed_params(); + move || { + let params = params.clone(); + Box::pin(async move { + let params = params.clone(); + RTCPeerConnection::do_negotiation_needed(params).await; + }) + } + }*/ + + // 4.4.1.6 Set the SessionDescription + pub(crate) fn set_description( + &mut self, sd: &RTCSessionDescription, op: StateChangeOp, ) -> Result<()> { - if self.internal.is_closed.load(Ordering::SeqCst) { + if self.is_closed { return Err(Error::ErrConnectionClosed); } else if sd.sdp_type == RTCSdpType::Unspecified { return Err(Error::ErrPeerConnSDPTypeInvalidValue); @@ -992,11 +1029,7 @@ impl RTCPeerConnection { match sd.sdp_type { // stable->SetLocal(offer)->have-local-offer RTCSdpType::Offer => { - let check = { - let last_offer = self.internal.last_offer.lock().await; - sd.sdp != *last_offer - }; - if check { + if sd.sdp != self.last_offer { Err(new_sdpdoes_not_match_offer) } else { let next_state = check_next_signaling_state( @@ -1006,9 +1039,7 @@ impl RTCPeerConnection { sd.sdp_type, ); if next_state.is_ok() { - let mut pending_local_description = - self.internal.pending_local_description.lock().await; - *pending_local_description = Some(sd.clone()); + self.pending_local_description = Some(sd.clone()); } next_state } @@ -1016,11 +1047,7 @@ impl RTCPeerConnection { // have-remote-offer->SetLocal(answer)->stable // have-local-pranswer->SetLocal(answer)->stable RTCSdpType::Answer => { - let check = { - let last_answer = self.internal.last_answer.lock().await; - sd.sdp != *last_answer - }; - if check { + if sd.sdp != self.last_answer { Err(new_sdpdoes_not_match_answer) } else { let next_state = check_next_signaling_state( @@ -1030,27 +1057,12 @@ impl RTCPeerConnection { sd.sdp_type, ); if next_state.is_ok() { - let pending_remote_description = { - let mut pending_remote_description = - self.internal.pending_remote_description.lock().await; - pending_remote_description.take() - }; - let _pending_local_description = { - let mut pending_local_description = - self.internal.pending_local_description.lock().await; - pending_local_description.take() - }; + let pending_remote_description = + self.pending_remote_description.take(); + self.pending_local_description.take(); - { - let mut current_local_description = - self.internal.current_local_description.lock().await; - *current_local_description = Some(sd.clone()); - } - { - let mut current_remote_description = - self.internal.current_remote_description.lock().await; - *current_remote_description = pending_remote_description; - } + self.current_local_description = Some(sd.clone()); + self.current_remote_description = pending_remote_description; } next_state } @@ -1063,19 +1075,13 @@ impl RTCPeerConnection { sd.sdp_type, ); if next_state.is_ok() { - let mut pending_local_description = - self.internal.pending_local_description.lock().await; - *pending_local_description = None; + self.pending_local_description = None; } next_state } // have-remote-offer->SetLocal(pranswer)->have-local-pranswer RTCSdpType::Pranswer => { - let check = { - let last_answer = self.internal.last_answer.lock().await; - sd.sdp != *last_answer - }; - if check { + if sd.sdp != self.last_answer { Err(new_sdpdoes_not_match_answer) } else { let next_state = check_next_signaling_state( @@ -1085,9 +1091,7 @@ impl RTCPeerConnection { sd.sdp_type, ); if next_state.is_ok() { - let mut pending_local_description = - self.internal.pending_local_description.lock().await; - *pending_local_description = Some(sd.clone()); + self.pending_local_description = Some(sd.clone()); } next_state } @@ -1106,9 +1110,7 @@ impl RTCPeerConnection { sd.sdp_type, ); if next_state.is_ok() { - let mut pending_remote_description = - self.internal.pending_remote_description.lock().await; - *pending_remote_description = Some(sd.clone()); + self.pending_remote_description = Some(sd.clone()); } next_state } @@ -1122,28 +1124,13 @@ impl RTCPeerConnection { sd.sdp_type, ); if next_state.is_ok() { - let pending_local_description = { - let mut pending_local_description = - self.internal.pending_local_description.lock().await; - pending_local_description.take() - }; + let pending_local_description = + self.pending_local_description.take(); - let _pending_remote_description = { - let mut pending_remote_description = - self.internal.pending_remote_description.lock().await; - pending_remote_description.take() - }; + self.pending_remote_description.take(); - { - let mut current_remote_description = - self.internal.current_remote_description.lock().await; - *current_remote_description = Some(sd.clone()); - } - { - let mut current_local_description = - self.internal.current_local_description.lock().await; - *current_local_description = pending_local_description; - } + self.current_remote_description = Some(sd.clone()); + self.current_local_description = pending_local_description; } next_state } @@ -1155,9 +1142,7 @@ impl RTCPeerConnection { sd.sdp_type, ); if next_state.is_ok() { - let mut pending_remote_description = - self.internal.pending_remote_description.lock().await; - *pending_remote_description = None; + self.pending_remote_description = None; } next_state } @@ -1170,9 +1155,7 @@ impl RTCPeerConnection { sd.sdp_type, ); if next_state.is_ok() { - let mut pending_remote_description = - self.internal.pending_remote_description.lock().await; - *pending_remote_description = Some(sd.clone()); + self.pending_remote_description = Some(sd.clone()); } next_state } @@ -1184,16 +1167,12 @@ impl RTCPeerConnection { match next_state { Ok(next_state) => { - self.internal - .signaling_state - .store(next_state as u8, Ordering::SeqCst); + self.signaling_state = next_state; if self.signaling_state() == RTCSignalingState::Stable { - self.internal - .is_negotiation_needed - .store(false, Ordering::SeqCst); - self.internal.trigger_negotiation_needed().await; + self.is_negotiation_needed = false; + self.trigger_negotiation_needed(); } - self.update_signaling_state_change(next_state).await; + self.update_signaling_state_change(next_state); Ok(()) } Err(err) => Err(err), @@ -1201,37 +1180,31 @@ impl RTCPeerConnection { } /// set_local_description sets the SessionDescription of the local peer - pub async fn set_local_description(&self, mut desc: RTCSessionDescription) -> Result<()> { - if self.internal.is_closed.load(Ordering::SeqCst) { + pub fn set_local_description(&mut self, mut desc: RTCSessionDescription) -> Result<()> { + if self.is_closed { return Err(Error::ErrConnectionClosed); } - let have_local_description = { - let current_local_description = self.internal.current_local_description.lock().await; - current_local_description.is_some() - }; + let _have_local_description = self.current_local_description.is_some(); // JSEP 5.4 if desc.sdp.is_empty() { match desc.sdp_type { RTCSdpType::Answer | RTCSdpType::Pranswer => { - let last_answer = self.internal.last_answer.lock().await; - desc.sdp = last_answer.clone(); + desc.sdp = self.last_answer.clone(); } RTCSdpType::Offer => { - let last_offer = self.internal.last_offer.lock().await; - desc.sdp = last_offer.clone(); + desc.sdp = self.last_offer.clone(); } _ => return Err(Error::ErrPeerConnSDPTypeInvalidValueSetLocalDescription), } } desc.parsed = Some(desc.unmarshal()?); - self.set_description(&desc, StateChangeOp::SetLocal).await?; + self.set_description(&desc, StateChangeOp::SetLocal)?; let we_answer = desc.sdp_type == RTCSdpType::Answer; - let remote_description = self.remote_description().await; - let mut local_transceivers = self.get_transceivers().await; + let remote_description = self.remote_description().cloned(); if we_answer { if let Some(parsed) = desc.parsed { // WebRTC Spec 1.0 https://www.w3.org/TR/webrtc/ @@ -1254,8 +1227,8 @@ impl RTCPeerConnection { _ => continue, }; - let t = match find_by_mid(mid_value, &mut local_transceivers).await { - Some(t) => t, + let t = match find_by_mid(mid_value, &mut self.rtp_transceivers) { + Some((_, t)) => t, None => continue, }; let previous_direction = t.current_direction(); @@ -1264,14 +1237,14 @@ impl RTCPeerConnection { // TODO: Also set FiredDirection here. t.set_current_direction(direction); - t.process_new_current_direction(previous_direction).await?; + t.process_new_current_direction(previous_direction)?; } } - if let Some(remote_desc) = remote_description { - self.start_rtp_senders().await?; + if let Some(_remote_desc) = remote_description { + //TODO: self.start_rtp_senders().await?; - let pci = Arc::clone(&self.internal); + /*TODO: let pci = Arc::clone(&self.internal); let remote_desc = Arc::new(remote_desc); self.internal .ops @@ -1286,12 +1259,12 @@ impl RTCPeerConnection { }, "set_local_description", )) - .await?; + .await?;*/ } } - if self.internal.ice_gatherer.state() == RTCIceGathererState::New { - self.internal.ice_gatherer.gather().await + if self.ice_transport.gatherer.state() == RTCIceGathererState::New { + self.ice_transport.gatherer.gather() } else { Ok(()) } @@ -1301,12 +1274,12 @@ impl RTCPeerConnection { /// otherwise it returns CurrentLocalDescription. This property is used to /// determine if set_local_description has already been called. /// - pub async fn local_description(&self) -> Option { - if let Some(pending_local_description) = self.pending_local_description().await { + pub fn local_description(&self) -> Option { + if let Some(pending_local_description) = self.pending_local_description() { return Some(pending_local_description); } - self.current_local_description().await - }*/ + self.current_local_description() + } pub fn is_lite_set(desc: &SessionDescription) -> bool { for a in &desc.attributes { @@ -1967,44 +1940,43 @@ impl RTCPeerConnection { flatten_errs(close_errs) } - + */ /// CurrentLocalDescription represents the local description that was /// successfully negotiated the last time the PeerConnection transitioned /// into the stable state plus any local candidates that have been generated /// by the ICEAgent since the offer or answer was created. - pub async fn current_local_description(&self) -> Option { - let local_description = { - let current_local_description = self.internal.current_local_description.lock().await; - current_local_description.clone() - }; - let ice_gather = Some(&self.internal.ice_gatherer); + pub fn current_local_description(&self) -> Option { + let local_description = self.current_local_description.clone(); let ice_gathering_state = self.ice_gathering_state(); - populate_local_candidates(local_description.as_ref(), ice_gather, ice_gathering_state).await + populate_local_candidates( + local_description.as_ref(), + &self.ice_transport.gatherer, + ice_gathering_state, + ) } /// PendingLocalDescription represents a local description that is in the /// process of being negotiated plus any local candidates that have been /// generated by the ICEAgent since the offer or answer was created. If the /// PeerConnection is in the stable state, the value is null. - pub async fn pending_local_description(&self) -> Option { - let local_description = { - let pending_local_description = self.internal.pending_local_description.lock().await; - pending_local_description.clone() - }; - let ice_gather = Some(&self.internal.ice_gatherer); + pub fn pending_local_description(&self) -> Option { + let local_description = self.pending_local_description.clone(); let ice_gathering_state = self.ice_gathering_state(); - populate_local_candidates(local_description.as_ref(), ice_gather, ice_gathering_state).await + populate_local_candidates( + local_description.as_ref(), + &self.ice_transport.gatherer, + ice_gathering_state, + ) } /// current_remote_description represents the last remote description that was /// successfully negotiated the last time the PeerConnection transitioned /// into the stable state plus any remote candidates that have been supplied /// via add_icecandidate() since the offer or answer was created. - pub async fn current_remote_description(&self) -> Option { - let current_remote_description = self.internal.current_remote_description.lock().await; - current_remote_description.clone() + pub fn current_remote_description(&self) -> Option { + self.current_remote_description.clone() } /// pending_remote_description represents a remote description that is in the @@ -2012,11 +1984,10 @@ impl RTCPeerConnection { /// have been supplied via add_icecandidate() since the offer or answer was /// created. If the PeerConnection is in the stable state, the value is /// null. - pub async fn pending_remote_description(&self) -> Option { - let pending_remote_description = self.internal.pending_remote_description.lock().await; - pending_remote_description.clone() + pub fn pending_remote_description(&self) -> Option { + self.pending_remote_description.clone() } - */ + /// signaling_state attribute returns the signaling state of the /// PeerConnection instance. pub fn signaling_state(&self) -> RTCSignalingState { diff --git a/rtc/src/peer_connection/peer_connection_internal.rs b/rtc/src/peer_connection/peer_connection_internal.rs index 78e56b9..d1f0d60 100644 --- a/rtc/src/peer_connection/peer_connection_internal.rs +++ b/rtc/src/peer_connection/peer_connection_internal.rs @@ -415,42 +415,6 @@ impl PeerConnectionInternal { self.trigger_negotiation_needed().await; } - /// Helper to trigger a negotiation needed. - pub(crate) async fn trigger_negotiation_needed(&self) { - RTCPeerConnection::do_negotiation_needed(self.create_negotiation_needed_params()).await; - } - - /// Creates the parameters needed to trigger a negotiation needed. - fn create_negotiation_needed_params(&self) -> NegotiationNeededParams { - NegotiationNeededParams { - on_negotiation_needed_handler: Arc::clone(&self.on_negotiation_needed_handler), - is_closed: Arc::clone(&self.is_closed), - ops: Arc::clone(&self.ops), - negotiation_needed_state: Arc::clone(&self.negotiation_needed_state), - is_negotiation_needed: Arc::clone(&self.is_negotiation_needed), - signaling_state: Arc::clone(&self.signaling_state), - check_negotiation_needed_params: CheckNegotiationNeededParams { - sctp_transport: Arc::clone(&self.sctp_transport), - rtp_transceivers: Arc::clone(&self.rtp_transceivers), - current_local_description: Arc::clone(&self.current_local_description), - current_remote_description: Arc::clone(&self.current_remote_description), - }, - } - } - - pub(crate) fn make_negotiation_needed_trigger( - &self, - ) -> impl Fn() -> Pin + Send + Sync>> + Send + Sync { - let params = self.create_negotiation_needed_params(); - move || { - let params = params.clone(); - Box::pin(async move { - let params = params.clone(); - RTCPeerConnection::do_negotiation_needed(params).await; - }) - } - } - pub(super) fn set_gather_complete_handler(&self, f: OnGatheringCompleteHdlrFn) { self.ice_gatherer.on_gathering_complete(f); } diff --git a/rtc/src/peer_connection/sdp/mod.rs b/rtc/src/peer_connection/sdp/mod.rs index d3d4096..3e41893 100644 --- a/rtc/src/peer_connection/sdp/mod.rs +++ b/rtc/src/peer_connection/sdp/mod.rs @@ -43,6 +43,7 @@ use url::Url; use crate::peer_connection::MEDIA_SECTION_APPLICATION; use crate::transport::dtls_transport::dtls_fingerprint::RTCDtlsFingerprint; use crate::transport::ice_transport::ice_candidate::RTCIceCandidate; +use crate::transport::ice_transport::ice_gatherer::RTCIceGatherer; use crate::transport::ice_transport::ice_gathering_state::RTCIceGatheringState; use crate::transport::ice_transport::ice_parameters::RTCIceParameters; /*use crate::{SDP_ATTRIBUTE_RID, SDP_ATTRIBUTE_SIMULCAST}; @@ -398,21 +399,18 @@ pub(crate) fn add_data_media_section( Ok(d.with_media(media)) } -/* + pub(crate) fn populate_local_candidates( session_description: Option<&session_description::RTCSessionDescription>, ice_gatherer: &RTCIceGatherer, ice_gathering_state: RTCIceGatheringState, ) -> Option { - if session_description.is_none() || ice_gatherer.is_none() { + if session_description.is_none() { return session_description.cloned(); } - if let (Some(sd), Some(ice)) = (session_description, ice_gatherer) { - let candidates = match ice.get_local_candidates() { - Ok(candidates) => candidates, - Err(_) => return Some(sd.clone()), - }; + if let Some(sd) = session_description { + let candidates = ice_gatherer.get_local_candidates(); let mut parsed = match sd.unmarshal() { Ok(parsed) => parsed, @@ -436,7 +434,7 @@ pub(crate) fn populate_local_candidates( } else { None } -}*/ +} pub(crate) struct AddTransceiverSdpParams { should_add_candidates: bool, diff --git a/rtc/src/rtp_transceiver/mod.rs b/rtc/src/rtp_transceiver/mod.rs index 1f45f98..7a1610e 100644 --- a/rtc/src/rtp_transceiver/mod.rs +++ b/rtc/src/rtp_transceiver/mod.rs @@ -176,7 +176,7 @@ pub type TriggerNegotiationNeededFnOption = pub struct RTCRtpTransceiver { mid: Option, sender: RTCRtpSender, - //todo: receiver: RTCRtpReceiver, + receiver: RTCRtpReceiver, direction: RTCRtpTransceiverDirection, current_direction: RTCRtpTransceiverDirection, @@ -382,16 +382,16 @@ impl RTCRtpTransceiver { ); } } - /* + /// Perform any subsequent actions after altering the transceiver's direction. /// /// After changing the transceiver's direction this method should be called to perform any /// side-effects that results from the new direction, such as pausing/resuming the RTP receiver. - pub(crate) async fn process_new_current_direction( - &self, + pub(crate) fn process_new_current_direction( + &mut self, previous_direction: RTCRtpTransceiverDirection, ) -> Result<()> { - if self.stopped.load(Ordering::SeqCst) { + if self.stopped { return Ok(()); } @@ -409,26 +409,21 @@ impl RTCRtpTransceiver { return Ok(()); } - { - let receiver = self.receiver.lock().await; - let pause_receiver = !current_direction.has_recv(); + let pause_receiver = !current_direction.has_recv(); - if pause_receiver { - receiver.pause().await?; - } else { - receiver.resume().await?; - } + if pause_receiver { + self.receiver.pause()?; + } else { + self.receiver.resume()?; } let pause_sender = !current_direction.has_send(); - { - let sender = &*self.sender.lock().await; - sender.set_paused(pause_sender); - } + self.sender.set_paused(pause_sender); Ok(()) } + /* /// stop irreversibly stops the RTPTransceiver pub async fn stop(&self) -> Result<()> { if self.stopped.load(Ordering::SeqCst) { diff --git a/rtc/src/rtp_transceiver/rtp_receiver/mod.rs b/rtc/src/rtp_transceiver/rtp_receiver/mod.rs index 46d84b8..0b526b2 100644 --- a/rtc/src/rtp_transceiver/rtp_receiver/mod.rs +++ b/rtc/src/rtp_transceiver/rtp_receiver/mod.rs @@ -28,6 +28,7 @@ use crate::api::media_engine::MediaEngine; use crate::rtp_transceiver::rtp_codec::{ codec_parameters_fuzzy_search, CodecMatch, RTCRtpCodecParameters, RTPCodecType, }; +use shared::error::Result; #[derive(Debug, Copy, Clone, PartialEq, Eq)] #[repr(u8)] @@ -76,78 +77,11 @@ impl fmt::Display for State { } } -/* impl State { - fn transition(to: Self, tx: &watch::Sender) -> Result<()> { - let current = *tx.borrow(); - if current == to { - // Already in this state - return Ok(()); - } - - match current { - Self::Unstarted - if matches!(to, Self::Started | Self::Stopped | Self::UnstartedPaused) => - { - let _ = tx.send(to); - return Ok(()); - } - Self::UnstartedPaused - if matches!(to, Self::Unstarted | Self::Stopped | Self::Paused) => - { - let _ = tx.send(to); - return Ok(()); - } - State::Started if matches!(to, Self::Paused | Self::Stopped) => { - let _ = tx.send(to); - return Ok(()); - } - State::Paused if matches!(to, Self::Started | Self::Stopped) => { - let _ = tx.send(to); - return Ok(()); - } - _ => {} - } - - Err(Error::ErrRTPReceiverStateChangeInvalid { from: current, to }) - } - - async fn wait_for(rx: &mut watch::Receiver, states: &[State]) -> Result<()> { - loop { - let state = *rx.borrow(); - - match state { - _ if states.contains(&state) => return Ok(()), - State::Stopped => { - return Err(Error::ErrClosedPipe); - } - _ => {} - } - - if rx.changed().await.is_err() { - return Err(Error::ErrClosedPipe); - } - } - } - - async fn error_on_close(rx: &mut watch::Receiver) -> Result<()> { - if rx.changed().await.is_err() { - return Err(Error::ErrClosedPipe); - } - - let state = *rx.borrow(); - if state == State::Stopped { - return Err(Error::ErrClosedPipe); - } - - Ok(()) - } - fn is_started(&self) -> bool { matches!(self, Self::Started | Self::Paused) } } -*/ impl RTCRtpReceiver { /* @@ -343,43 +277,15 @@ impl RTCRtpReceiver { filtered_codecs } - /* - // State - - /// Get the current state and a receiver for the next state change. - pub(crate) fn current_state(&self) -> State { - *self.state_rx.borrow() - } - pub(crate) fn start(&self) -> Result<()> { - State::transition(State::Started, &self.state_tx) - } - - pub(crate) fn pause(&self) -> Result<()> { - let current = self.current_state(); - - match current { - State::Unstarted => State::transition(State::UnstartedPaused, &self.state_tx), - State::Started => State::transition(State::Paused, &self.state_tx), - _ => Ok(()), - } - } - - pub(crate) fn resume(&self) -> Result<()> { - let current = self.current_state(); + /* + // State - match current { - State::UnstartedPaused => State::transition(State::Unstarted, &self.state_tx), - State::Paused => State::transition(State::Started, &self.state_tx), - _ => Ok(()), - } - } - pub(crate) fn close(&self) -> Result<()> { - State::transition(State::Stopped, &self.state_tx) - } - */ + pub(crate) fn start(&self) -> Result<()> { + State::transition(State::Started, &self.state_tx) + }*/ } /// RTPReceiver allows an application to inspect the receipt of a TrackRemote @@ -390,8 +296,8 @@ pub struct RTCRtpReceiver { //pub internal: Arc, // State is stored within the channel - /*state_tx: watch::Sender, - state_rx: watch::Receiver, + state: State, + /*state_rx: watch::Receiver, tracks: RwLock>, @@ -410,457 +316,485 @@ impl fmt::Debug for RTCRtpReceiver { } } -/* impl RTCRtpReceiver { - pub fn new( - receive_mtu: usize, - kind: RTPCodecType, - transport: Arc, - media_engine: Arc, - interceptor: Arc, - ) -> Self { - let (state_tx, state_rx) = watch::channel(State::Unstarted); - - RTCRtpReceiver { - receive_mtu, - kind, - transport: Arc::clone(&transport), - - internal: Arc::new(RTPReceiverInternal { + /* + pub fn new( + receive_mtu: usize, + kind: RTPCodecType, + transport: Arc, + media_engine: Arc, + interceptor: Arc, + ) -> Self { + let (state_tx, state_rx) = watch::channel(State::Unstarted); + + RTCRtpReceiver { + receive_mtu, kind, + transport: Arc::clone(&transport), - tracks: RwLock::new(vec![]), - transport, - media_engine, - interceptor, + internal: Arc::new(RTPReceiverInternal { + kind, - state_tx, - state_rx, + tracks: RwLock::new(vec![]), + transport, + media_engine, + interceptor, - transceiver_codecs: ArcSwapOption::new(None), - }), - } - } + state_tx, + state_rx, - pub fn kind(&self) -> RTPCodecType { - self.kind - } + transceiver_codecs: ArcSwapOption::new(None), + }), + } + } - pub(crate) fn set_transceiver_codecs( - &self, - codecs: Option>>>, - ) { - self.internal.transceiver_codecs.store(codecs); - } + pub fn kind(&self) -> RTPCodecType { + self.kind + } - /// transport returns the currently-configured *DTLSTransport or nil - /// if one has not yet been configured - pub fn transport(&self) -> Arc { - Arc::clone(&self.transport) - } + pub(crate) fn set_transceiver_codecs( + &self, + codecs: Option>>>, + ) { + self.internal.transceiver_codecs.store(codecs); + } - /// get_parameters describes the current configuration for the encoding and - /// transmission of media on the receiver's track. - pub async fn get_parameters(&self) -> RTCRtpParameters { - self.internal.get_parameters().await - } + /// transport returns the currently-configured *DTLSTransport or nil + /// if one has not yet been configured + pub fn transport(&self) -> Arc { + Arc::clone(&self.transport) + } - /// SetRTPParameters applies provided RTPParameters the RTPReceiver's tracks. - /// This method is part of the ORTC API. It is not - /// meant to be used together with the basic WebRTC API. - /// The amount of provided codecs must match the number of tracks on the receiver. - pub async fn set_rtp_parameters(&self, params: RTCRtpParameters) { - let mut header_extensions = vec![]; - for h in ¶ms.header_extensions { - header_extensions.push(RTPHeaderExtension { - id: h.id, - uri: h.uri.clone(), - }); + /// get_parameters describes the current configuration for the encoding and + /// transmission of media on the receiver's track. + pub async fn get_parameters(&self) -> RTCRtpParameters { + self.internal.get_parameters().await } - let mut tracks = self.internal.tracks.write().await; - for (idx, codec) in params.codecs.iter().enumerate() { - let t = &mut tracks[idx]; - if let Some(stream_info) = &mut t.stream.stream_info { - stream_info.rtp_header_extensions = header_extensions.clone(); + /// SetRTPParameters applies provided RTPParameters the RTPReceiver's tracks. + /// This method is part of the ORTC API. It is not + /// meant to be used together with the basic WebRTC API. + /// The amount of provided codecs must match the number of tracks on the receiver. + pub async fn set_rtp_parameters(&self, params: RTCRtpParameters) { + let mut header_extensions = vec![]; + for h in ¶ms.header_extensions { + header_extensions.push(RTPHeaderExtension { + id: h.id, + uri: h.uri.clone(), + }); } - let current_track = &t.track; - current_track.set_codec(codec.clone()); - current_track.set_params(params.clone()); + let mut tracks = self.internal.tracks.write().await; + for (idx, codec) in params.codecs.iter().enumerate() { + let t = &mut tracks[idx]; + if let Some(stream_info) = &mut t.stream.stream_info { + stream_info.rtp_header_extensions = header_extensions.clone(); + } + + let current_track = &t.track; + current_track.set_codec(codec.clone()); + current_track.set_params(params.clone()); + } } - } - /// track returns the RtpTransceiver TrackRemote - pub async fn track(&self) -> Option> { - let tracks = self.internal.tracks.read().await; - if tracks.len() != 1 { - None - } else { - tracks.first().map(|t| Arc::clone(&t.track)) + /// track returns the RtpTransceiver TrackRemote + pub async fn track(&self) -> Option> { + let tracks = self.internal.tracks.read().await; + if tracks.len() != 1 { + None + } else { + tracks.first().map(|t| Arc::clone(&t.track)) + } } - } - /// tracks returns the RtpTransceiver traclockks - /// A RTPReceiver to support Simulcast may now have multiple tracks - pub async fn tracks(&self) -> Vec> { - let tracks = self.internal.tracks.read().await; - tracks.iter().map(|t| Arc::clone(&t.track)).collect() - } + /// tracks returns the RtpTransceiver traclockks + /// A RTPReceiver to support Simulcast may now have multiple tracks + pub async fn tracks(&self) -> Vec> { + let tracks = self.internal.tracks.read().await; + tracks.iter().map(|t| Arc::clone(&t.track)).collect() + } - /// receive initialize the track and starts all the transports - pub async fn receive(&self, parameters: &RTCRtpReceiveParameters) -> Result<()> { - let receiver = Arc::downgrade(&self.internal); + /// receive initialize the track and starts all the transports + pub async fn receive(&self, parameters: &RTCRtpReceiveParameters) -> Result<()> { + let receiver = Arc::downgrade(&self.internal); - let current_state = self.internal.current_state(); - if current_state.is_started() { - return Err(Error::ErrRTPReceiverReceiveAlreadyCalled); - } - self.internal.start()?; - - let (global_params, interceptor, media_engine) = { - ( - self.internal.get_parameters().await, - Arc::clone(&self.internal.interceptor), - Arc::clone(&self.internal.media_engine), - ) - }; + let current_state = self.internal.current_state(); + if current_state.is_started() { + return Err(Error::ErrRTPReceiverReceiveAlreadyCalled); + } + self.internal.start()?; - let codec = if let Some(codec) = global_params.codecs.first() { - codec.capability.clone() - } else { - RTCRtpCodecCapability::default() - }; + let (global_params, interceptor, media_engine) = { + ( + self.internal.get_parameters().await, + Arc::clone(&self.internal.interceptor), + Arc::clone(&self.internal.media_engine), + ) + }; + + let codec = if let Some(codec) = global_params.codecs.first() { + codec.capability.clone() + } else { + RTCRtpCodecCapability::default() + }; - for encoding in ¶meters.encodings { - let (stream_info, rtp_read_stream, rtp_interceptor, rtcp_read_stream, rtcp_interceptor) = - if encoding.ssrc != 0 { + for encoding in ¶meters.encodings { + let (stream_info, rtp_read_stream, rtp_interceptor, rtcp_read_stream, rtcp_interceptor) = + if encoding.ssrc != 0 { + let stream_info = create_stream_info( + "".to_owned(), + encoding.ssrc, + 0, + codec.clone(), + &global_params.header_extensions, + ); + let (rtp_read_stream, rtp_interceptor, rtcp_read_stream, rtcp_interceptor) = + self.transport + .streams_for_ssrc(encoding.ssrc, &stream_info, &interceptor) + .await?; + + ( + Some(stream_info), + Some(rtp_read_stream), + Some(rtp_interceptor), + Some(rtcp_read_stream), + Some(rtcp_interceptor), + ) + } else { + (None, None, None, None, None) + }; + + let t = TrackStreams { + track: Arc::new(TrackRemote::new( + self.receive_mtu, + self.kind, + encoding.ssrc, + encoding.rid.clone(), + receiver.clone(), + Arc::clone(&media_engine), + Arc::clone(&interceptor), + )), + stream: TrackStream { + stream_info, + rtp_read_stream, + rtp_interceptor, + rtcp_read_stream, + rtcp_interceptor, + }, + + repair_stream: TrackStream { + stream_info: None, + rtp_read_stream: None, + rtp_interceptor: None, + rtcp_read_stream: None, + rtcp_interceptor: None, + }, + }; + + { + let mut tracks = self.internal.tracks.write().await; + tracks.push(t); + }; + + let rtx_ssrc = encoding.rtx.ssrc; + if rtx_ssrc != 0 { let stream_info = create_stream_info( "".to_owned(), - encoding.ssrc, + rtx_ssrc, 0, codec.clone(), &global_params.header_extensions, ); - let (rtp_read_stream, rtp_interceptor, rtcp_read_stream, rtcp_interceptor) = - self.transport - .streams_for_ssrc(encoding.ssrc, &stream_info, &interceptor) - .await?; - - ( - Some(stream_info), - Some(rtp_read_stream), - Some(rtp_interceptor), - Some(rtcp_read_stream), - Some(rtcp_interceptor), - ) - } else { - (None, None, None, None, None) - }; + let (rtp_read_stream, rtp_interceptor, rtcp_read_stream, rtcp_interceptor) = self + .transport + .streams_for_ssrc(rtx_ssrc, &stream_info, &interceptor) + .await?; - let t = TrackStreams { - track: Arc::new(TrackRemote::new( - self.receive_mtu, - self.kind, - encoding.ssrc, - encoding.rid.clone(), - receiver.clone(), - Arc::clone(&media_engine), - Arc::clone(&interceptor), - )), - stream: TrackStream { - stream_info, - rtp_read_stream, - rtp_interceptor, - rtcp_read_stream, - rtcp_interceptor, - }, - - repair_stream: TrackStream { - stream_info: None, - rtp_read_stream: None, - rtp_interceptor: None, - rtcp_read_stream: None, - rtcp_interceptor: None, - }, - }; - - { - let mut tracks = self.internal.tracks.write().await; - tracks.push(t); - }; - - let rtx_ssrc = encoding.rtx.ssrc; - if rtx_ssrc != 0 { - let stream_info = create_stream_info( - "".to_owned(), - rtx_ssrc, - 0, - codec.clone(), - &global_params.header_extensions, - ); - let (rtp_read_stream, rtp_interceptor, rtcp_read_stream, rtcp_interceptor) = self - .transport - .streams_for_ssrc(rtx_ssrc, &stream_info, &interceptor) + self.receive_for_rtx( + rtx_ssrc, + "".to_owned(), + TrackStream { + stream_info: Some(stream_info), + rtp_read_stream: Some(rtp_read_stream), + rtp_interceptor: Some(rtp_interceptor), + rtcp_read_stream: Some(rtcp_read_stream), + rtcp_interceptor: Some(rtcp_interceptor), + }, + ) .await?; - - self.receive_for_rtx( - rtx_ssrc, - "".to_owned(), - TrackStream { - stream_info: Some(stream_info), - rtp_read_stream: Some(rtp_read_stream), - rtp_interceptor: Some(rtp_interceptor), - rtcp_read_stream: Some(rtcp_read_stream), - rtcp_interceptor: Some(rtcp_interceptor), - }, - ) - .await?; + } } + + Ok(()) } - Ok(()) - } + /// read reads incoming RTCP for this RTPReceiver + pub async fn read( + &self, + b: &mut [u8], + ) -> Result<(Vec>, Attributes)> { + self.internal.read(b).await + } - /// read reads incoming RTCP for this RTPReceiver - pub async fn read( - &self, - b: &mut [u8], - ) -> Result<(Vec>, Attributes)> { - self.internal.read(b).await - } + /// read_simulcast reads incoming RTCP for this RTPReceiver for given rid + pub async fn read_simulcast( + &self, + b: &mut [u8], + rid: &str, + ) -> Result<(Vec>, Attributes)> { + self.internal.read_simulcast(b, rid).await + } - /// read_simulcast reads incoming RTCP for this RTPReceiver for given rid - pub async fn read_simulcast( - &self, - b: &mut [u8], - rid: &str, - ) -> Result<(Vec>, Attributes)> { - self.internal.read_simulcast(b, rid).await - } + /// read_rtcp is a convenience method that wraps Read and unmarshal for you. + /// It also runs any configured interceptors. + pub async fn read_rtcp( + &self, + ) -> Result<(Vec>, Attributes)> { + self.internal.read_rtcp(self.receive_mtu).await + } - /// read_rtcp is a convenience method that wraps Read and unmarshal for you. - /// It also runs any configured interceptors. - pub async fn read_rtcp( - &self, - ) -> Result<(Vec>, Attributes)> { - self.internal.read_rtcp(self.receive_mtu).await - } + /// read_simulcast_rtcp is a convenience method that wraps ReadSimulcast and unmarshal for you + pub async fn read_simulcast_rtcp( + &self, + rid: &str, + ) -> Result<(Vec>, Attributes)> { + self.internal + .read_simulcast_rtcp(rid, self.receive_mtu) + .await + } - /// read_simulcast_rtcp is a convenience method that wraps ReadSimulcast and unmarshal for you - pub async fn read_simulcast_rtcp( - &self, - rid: &str, - ) -> Result<(Vec>, Attributes)> { - self.internal - .read_simulcast_rtcp(rid, self.receive_mtu) - .await - } + pub(crate) async fn have_received(&self) -> bool { + self.internal.current_state().is_started() + } - pub(crate) async fn have_received(&self) -> bool { - self.internal.current_state().is_started() - } + pub(crate) async fn start(&self, incoming: &TrackDetails) { + let mut encoding_size = incoming.ssrcs.len(); + if incoming.rids.len() >= encoding_size { + encoding_size = incoming.rids.len(); + }; - pub(crate) async fn start(&self, incoming: &TrackDetails) { - let mut encoding_size = incoming.ssrcs.len(); - if incoming.rids.len() >= encoding_size { - encoding_size = incoming.rids.len(); - }; + let mut encodings = vec![RTCRtpDecodingParameters::default(); encoding_size]; + for (i, encoding) in encodings.iter_mut().enumerate() { + if incoming.rids.len() > i { + encoding.rid = incoming.rids[i].clone(); + } + if incoming.ssrcs.len() > i { + encoding.ssrc = incoming.ssrcs[i]; + } - let mut encodings = vec![RTCRtpDecodingParameters::default(); encoding_size]; - for (i, encoding) in encodings.iter_mut().enumerate() { - if incoming.rids.len() > i { - encoding.rid = incoming.rids[i].clone(); - } - if incoming.ssrcs.len() > i { - encoding.ssrc = incoming.ssrcs[i]; + encoding.rtx.ssrc = incoming.repair_ssrc; } - encoding.rtx.ssrc = incoming.repair_ssrc; - } - - if let Err(err) = self.receive(&RTCRtpReceiveParameters { encodings }).await { - log::warn!("RTPReceiver Receive failed {}", err); - return; - } + if let Err(err) = self.receive(&RTCRtpReceiveParameters { encodings }).await { + log::warn!("RTPReceiver Receive failed {}", err); + return; + } - // set track id and label early so they can be set as new track information - // is received from the SDP. - let is_unpaused = self.current_state() == State::Started; - for track_remote in &self.tracks().await { - track_remote.set_id(incoming.id.clone()); - track_remote.set_stream_id(incoming.stream_id.clone()); + // set track id and label early so they can be set as new track information + // is received from the SDP. + let is_unpaused = self.current_state() == State::Started; + for track_remote in &self.tracks().await { + track_remote.set_id(incoming.id.clone()); + track_remote.set_stream_id(incoming.stream_id.clone()); - if is_unpaused { - track_remote.fire_onunmute().await; + if is_unpaused { + track_remote.fire_onunmute().await; + } } } - } - - /// Stop irreversibly stops the RTPReceiver - pub async fn stop(&self) -> Result<()> { - let previous_state = self.internal.current_state(); - self.internal.close()?; - let mut errs = vec![]; - let was_ever_started = previous_state.is_started(); - if was_ever_started { - let tracks = self.internal.tracks.write().await; - for t in &*tracks { - if let Some(rtcp_read_stream) = &t.stream.rtcp_read_stream { - if let Err(err) = rtcp_read_stream.close().await { - errs.push(err); + /// Stop irreversibly stops the RTPReceiver + pub async fn stop(&self) -> Result<()> { + let previous_state = self.internal.current_state(); + self.internal.close()?; + + let mut errs = vec![]; + let was_ever_started = previous_state.is_started(); + if was_ever_started { + let tracks = self.internal.tracks.write().await; + for t in &*tracks { + if let Some(rtcp_read_stream) = &t.stream.rtcp_read_stream { + if let Err(err) = rtcp_read_stream.close().await { + errs.push(err); + } } - } - if let Some(rtp_read_stream) = &t.stream.rtp_read_stream { - if let Err(err) = rtp_read_stream.close().await { - errs.push(err); + if let Some(rtp_read_stream) = &t.stream.rtp_read_stream { + if let Err(err) = rtp_read_stream.close().await { + errs.push(err); + } } - } - if let Some(repair_rtcp_read_stream) = &t.repair_stream.rtcp_read_stream { - if let Err(err) = repair_rtcp_read_stream.close().await { - errs.push(err); + if let Some(repair_rtcp_read_stream) = &t.repair_stream.rtcp_read_stream { + if let Err(err) = repair_rtcp_read_stream.close().await { + errs.push(err); + } } - } - if let Some(repair_rtp_read_stream) = &t.repair_stream.rtp_read_stream { - if let Err(err) = repair_rtp_read_stream.close().await { - errs.push(err); + if let Some(repair_rtp_read_stream) = &t.repair_stream.rtp_read_stream { + if let Err(err) = repair_rtp_read_stream.close().await { + errs.push(err); + } } - } - if let Some(stream_info) = &t.stream.stream_info { - self.internal - .interceptor - .unbind_remote_stream(stream_info) - .await; - } + if let Some(stream_info) = &t.stream.stream_info { + self.internal + .interceptor + .unbind_remote_stream(stream_info) + .await; + } - if let Some(repair_stream_info) = &t.repair_stream.stream_info { - self.internal - .interceptor - .unbind_remote_stream(repair_stream_info) - .await; + if let Some(repair_stream_info) = &t.repair_stream.stream_info { + self.internal + .interceptor + .unbind_remote_stream(repair_stream_info) + .await; + } } } - } - flatten_errs(errs) - } + flatten_errs(errs) + } - /// read_rtp should only be called by a track, this only exists so we can keep state in one place - pub(crate) async fn read_rtp( - &self, - b: &mut [u8], - tid: usize, - ) -> Result<(rtp::packet::Packet, Attributes)> { - self.internal.read_rtp(b, tid).await - } + /// read_rtp should only be called by a track, this only exists so we can keep state in one place + pub(crate) async fn read_rtp( + &self, + b: &mut [u8], + tid: usize, + ) -> Result<(rtp::packet::Packet, Attributes)> { + self.internal.read_rtp(b, tid).await + } - /// receive_for_rid is the sibling of Receive expect for RIDs instead of SSRCs - /// It populates all the internal state for the given RID - pub(crate) async fn receive_for_rid( - &self, - rid: SmolStr, - params: RTCRtpParameters, - stream: TrackStream, - ) -> Result> { - let mut tracks = self.internal.tracks.write().await; - for t in &mut *tracks { - if *t.track.rid() == rid { - t.track.set_kind(self.kind); - if let Some(codec) = params.codecs.first() { - t.track.set_codec(codec.clone()); + /// receive_for_rid is the sibling of Receive expect for RIDs instead of SSRCs + /// It populates all the internal state for the given RID + pub(crate) async fn receive_for_rid( + &self, + rid: SmolStr, + params: RTCRtpParameters, + stream: TrackStream, + ) -> Result> { + let mut tracks = self.internal.tracks.write().await; + for t in &mut *tracks { + if *t.track.rid() == rid { + t.track.set_kind(self.kind); + if let Some(codec) = params.codecs.first() { + t.track.set_codec(codec.clone()); + } + t.track.set_params(params.clone()); + t.track + .set_ssrc(stream.stream_info.as_ref().map_or(0, |s| s.ssrc)); + t.stream = stream; + return Ok(Arc::clone(&t.track)); } - t.track.set_params(params.clone()); - t.track - .set_ssrc(stream.stream_info.as_ref().map_or(0, |s| s.ssrc)); - t.stream = stream; - return Ok(Arc::clone(&t.track)); } - } - Err(Error::ErrRTPReceiverForRIDTrackStreamNotFound) - } + Err(Error::ErrRTPReceiverForRIDTrackStreamNotFound) + } - /// receiveForRtx starts a routine that processes the repair stream - /// These packets aren't exposed to the user yet, but we need to process them for - /// TWCC - pub(crate) async fn receive_for_rtx( - &self, - ssrc: SSRC, - rsid: String, - repair_stream: TrackStream, - ) -> Result<()> { - let mut tracks = self.internal.tracks.write().await; - let l = tracks.len(); - for t in &mut *tracks { - if (ssrc != 0 && l == 1) || t.track.rid() == rsid { - t.repair_stream = repair_stream; - - let receive_mtu = self.receive_mtu; - let track = t.clone(); - tokio::spawn(async move { - let a = Attributes::new(); - let mut b = vec![0u8; receive_mtu]; - while let Some(repair_rtp_interceptor) = &track.repair_stream.rtp_interceptor { - //TODO: cancel repair_rtp_interceptor.read gracefully - //println!("repair_rtp_interceptor read begin with ssrc={}", ssrc); - if repair_rtp_interceptor.read(&mut b, &a).await.is_err() { - break; + /// receiveForRtx starts a routine that processes the repair stream + /// These packets aren't exposed to the user yet, but we need to process them for + /// TWCC + pub(crate) async fn receive_for_rtx( + &self, + ssrc: SSRC, + rsid: String, + repair_stream: TrackStream, + ) -> Result<()> { + let mut tracks = self.internal.tracks.write().await; + let l = tracks.len(); + for t in &mut *tracks { + if (ssrc != 0 && l == 1) || t.track.rid() == rsid { + t.repair_stream = repair_stream; + + let receive_mtu = self.receive_mtu; + let track = t.clone(); + tokio::spawn(async move { + let a = Attributes::new(); + let mut b = vec![0u8; receive_mtu]; + while let Some(repair_rtp_interceptor) = &track.repair_stream.rtp_interceptor { + //TODO: cancel repair_rtp_interceptor.read gracefully + //println!("repair_rtp_interceptor read begin with ssrc={}", ssrc); + if repair_rtp_interceptor.read(&mut b, &a).await.is_err() { + break; + } } - } - }); + }); - return Ok(()); + return Ok(()); + } } - } - Err(Error::ErrRTPReceiverForRIDTrackStreamNotFound) - } + Err(Error::ErrRTPReceiverForRIDTrackStreamNotFound) + } + */ // State pub(crate) fn current_state(&self) -> State { - self.internal.current_state() + self.state } - pub(crate) async fn pause(&self) -> Result<()> { - self.internal.pause()?; + pub(crate) fn pause(&mut self) -> Result<()> { + self.pause_internal(); - if !self.internal.current_state().is_started() { + if !self.current_state().is_started() { return Ok(()); } - let streams = self.internal.tracks.read().await; + /*TODO: let streams = self.tracks.read().await; for stream in streams.iter() { // TODO: If we introduce futures as a direct dependency this and other futures could be // ran concurrently with [`join_all`](https://docs.rs/futures/0.3.21/futures/future/fn.join_all.html) stream.track.fire_onmute().await; - } + }*/ Ok(()) } - pub(crate) async fn resume(&self) -> Result<()> { - self.internal.resume()?; + pub(crate) fn resume(&mut self) -> Result<()> { + self.resume_internal(); - if !self.internal.current_state().is_started() { + if !self.current_state().is_started() { return Ok(()); } - let streams = self.internal.tracks.read().await; + /*TODO:let streams = self.tracks.read().await; for stream in streams.iter() { // TODO: If we introduce futures as a direct dependency this and other futures could be // ran concurrently with [`join_all`](https://docs.rs/futures/0.3.21/futures/future/fn.join_all.html) stream.track.fire_onunmute().await; - } + }*/ Ok(()) } + + pub(crate) fn pause_internal(&mut self) { + let current = self.current_state(); + + match current { + State::Unstarted => { + self.state = State::UnstartedPaused; + } + State::Started => { + self.state = State::Paused; + } + _ => {} + } + } + + pub(crate) fn resume_internal(&mut self) { + let current = self.current_state(); + + match current { + State::UnstartedPaused => self.state = State::Unstarted, + State::Paused => self.state = State::Started, + _ => {} + } + } + + pub(crate) fn close(&mut self) { + self.state = State::Stopped + } } -*/ diff --git a/rtc/src/rtp_transceiver/rtp_sender/mod.rs b/rtc/src/rtp_transceiver/rtp_sender/mod.rs index 334ed24..2645730 100644 --- a/rtc/src/rtp_transceiver/rtp_sender/mod.rs +++ b/rtc/src/rtp_transceiver/rtp_sender/mod.rs @@ -229,11 +229,12 @@ impl RTCRtpSender { let mut tr = self.rtp_transceiver.lock(); *tr = rtp_transceiver; } - - pub(crate) fn set_paused(&self, paused: bool) { - self.paused.store(paused, Ordering::SeqCst); + */ + pub(crate) fn set_paused(&mut self, paused: bool) { + self.paused = paused; } + /* /// transport returns the currently-configured DTLSTransport /// if one has not yet been configured pub fn transport(&self) -> Arc { diff --git a/rtc/src/transport/ice_transport/ice_gatherer.rs b/rtc/src/transport/ice_transport/ice_gatherer.rs index 5265a74..a6a23db 100644 --- a/rtc/src/transport/ice_transport/ice_gatherer.rs +++ b/rtc/src/transport/ice_transport/ice_gatherer.rs @@ -62,8 +62,8 @@ impl RTCIceGatherer { } } - /*TODO:/// Gather ICE candidates. pub fn gather(&self) -> Result<()> { + /*TODO:/// Gather ICE candidates. self.create_agent().await?; self.set_state(RTCIceGathererState::Gathering).await; @@ -111,9 +111,9 @@ impl RTCIceGatherer { )); agent.gather_candidates()?; - } + }*/ Ok(()) - }*/ + } /// Close prunes all local candidates, and closes the ports. pub fn close(&mut self) -> Result<()> { @@ -123,7 +123,7 @@ impl RTCIceGatherer { } /// get_local_parameters returns the ICE parameters of the ICEGatherer. - pub fn get_local_parameters(&mut self) -> Result { + pub fn get_local_parameters(&self) -> Result { let Credentials { ufrag, pwd } = self.agent.get_local_credentials(); Ok(RTCIceParameters { @@ -134,7 +134,7 @@ impl RTCIceGatherer { } /// get_local_candidates returns the sequence of valid local candidates associated with the ICEGatherer. - pub fn get_local_candidates(&mut self) -> Vec { + pub fn get_local_candidates(&self) -> Vec { let ice_candidates = self.agent.get_local_candidates(); rtc_ice_candidates_from_ice_candidates(ice_candidates) }