Skip to content

Commit

Permalink
Move all cpal-related code into a background thread
Browse files Browse the repository at this point in the history
  • Loading branch information
SomeoneToIgnore committed Nov 19, 2024
1 parent 72854a6 commit 7e17646
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 118 deletions.
4 changes: 2 additions & 2 deletions crates/call/src/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -966,7 +966,7 @@ impl Room {
cx.emit(Event::RemoteAudioTracksChanged {
participant_id: participant.peer_id,
});
let stream = play_remote_audio_track(&track, cx);
let stream = play_remote_audio_track(&track, cx.background_executor());
participant.audio_tracks.insert(track_id, (track, stream));
participant.muted = publication.is_muted();
}
Expand Down Expand Up @@ -1345,7 +1345,7 @@ impl Room {
};

cx.spawn(move |this, mut cx| async move {
let (track, stream) = cx.update(capture_local_audio_track)??;
let (track, stream) = capture_local_audio_track(cx.background_executor()).await?;

let publication = participant
.publish_track(
Expand Down
8 changes: 5 additions & 3 deletions crates/live_kit_client/examples/test_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,10 @@ impl LivekitWindow {
let output = self.remote_participant(participant);
match track {
RemoteTrack::Audio(track) => {
output.audio_output_stream =
Some((publication.clone(), play_remote_audio_track(&track, cx)));
output.audio_output_stream = Some((
publication.clone(),
play_remote_audio_track(&track, cx.background_executor()),
));
}
RemoteTrack::Video(track) => {
output.screen_share_output_view = Some((
Expand Down Expand Up @@ -269,7 +271,7 @@ impl LivekitWindow {
} else {
let participant = self.room.local_participant();
cx.spawn(|this, mut cx| async move {
let (track, stream) = cx.update(|cx| capture_local_audio_track(cx))??;
let (track, stream) = capture_local_audio_track(cx.background_executor()).await?;
let publication = participant
.publish_track(
LocalTrack::Audio(track),
Expand Down
242 changes: 129 additions & 113 deletions crates/live_kit_client/src/live_kit_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ use cpal::{
StreamConfig,
};
use futures::{io, Stream, StreamExt as _};
use gpui::{AppContext, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream, Task};
use gpui::{
BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream, Task,
};
use parking_lot::Mutex;
use std::{borrow::Cow, future::Future, pin::Pin, sync::Arc};
use util::{debug_panic, ResultExt as _, TryFutureExt};
use util::{debug_panic, ResultExt as _};
#[cfg(not(target_os = "windows"))]
use webrtc::{
audio_frame::AudioFrame,
Expand All @@ -32,7 +34,7 @@ pub use test::*;
pub use remote_video_track_view::{RemoteVideoTrackView, RemoteVideoTrackViewEvent};

pub struct AudioStream {
_tasks: [Task<Option<()>>; 2],
_tasks: [Task<anyhow::Result<Option<()>>>; 2],
}

struct Dispatcher(Arc<dyn gpui::PlatformDispatcher>);
Expand Down Expand Up @@ -166,99 +168,116 @@ pub async fn capture_local_video_track(

#[cfg(not(target_os = "windows"))]
pub fn capture_local_audio_track(
cx: &mut AppContext,
) -> Result<(track::LocalAudioTrack, AudioStream)> {
background_executor: &BackgroundExecutor,
) -> Task<Result<(track::LocalAudioTrack, AudioStream)>> {
let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded();

let sample_rate;
let channels;
let stream;
if cfg!(any(test, feature = "test-support")) {
sample_rate = 1;
channels = 1;
stream = None;
} else {
let device = cpal::default_host()
.default_input_device()
.ok_or_else(|| anyhow!("no audio input device available"))?;
let config = device
.default_input_config()
.context("failed to get default input config")?;
sample_rate = config.sample_rate().0;
channels = config.channels() as u32;
stream = Some(
device
.build_input_stream_raw(
&config.config(),
cpal::SampleFormat::I16,
move |data, _: &_| {
frame_tx
.unbounded_send(AudioFrame {
data: Cow::Owned(data.as_slice::<i16>().unwrap().to_vec()),
sample_rate,
num_channels: channels,
samples_per_channel: data.len() as u32 / channels,
})
.ok();
},
|err| log::error!("error capturing audio track: {:?}", err),
None,
)
.context("failed to build input stream")?,
);
}

let source = NativeAudioSource::new(
AudioSourceOptions {
echo_cancellation: true,
noise_suppression: true,
auto_gain_control: false,
},
sample_rate,
channels,
// TODO livekit: Pull these out of a proto later
100,
);

let stream_task = cx.foreground_executor().spawn(async move {
let (track_data_tx, mut track_data_rx) = futures::channel::mpsc::unbounded();

let stream_task = background_executor.spawn(async move {
let sample_rate;
let channels;
let stream;
if cfg!(any(test, feature = "test-support")) {
sample_rate = 1;
channels = 1;
stream = None;
track_data_tx
.unbounded_send((sample_rate, channels))
.expect("failed to send track data");
} else {
let device = cpal::default_host()
.default_input_device()
.ok_or_else(|| anyhow!("no audio input device available"))?;
let config = device
.default_input_config()
.context("failed to get default input config")?;
sample_rate = config.sample_rate().0;
channels = config.channels() as u32;
track_data_tx
.unbounded_send((sample_rate, channels))
.log_err();
stream = Some(
device
.build_input_stream_raw(
&config.config(),
cpal::SampleFormat::I16,
move |data, _: &_| {
frame_tx
.unbounded_send(AudioFrame {
data: Cow::Owned(data.as_slice::<i16>().unwrap().to_vec()),
sample_rate,
num_channels: channels,
samples_per_channel: data.len() as u32 / channels,
})
.ok();
},
|err| log::error!("error capturing audio track: {:?}", err),
None,
)
.context("failed to build input stream")?,
);
}
if let Some(stream) = &stream {
stream.play().log_err();
loop {
stream.play().log_err();
}
}
futures::future::pending().await

Ok(Some(()))
});

let transmit_task = cx.background_executor().spawn({
let source = source.clone();
async move {
while let Some(frame) = frame_rx.next().await {
source.capture_frame(&frame).await.ok();
let task_background_executor = background_executor.clone();
background_executor.spawn(async move {
let (sample_rate, channels) = track_data_rx
.next()
.await
.context("receiving sample rate and channels data")?;
let source = NativeAudioSource::new(
AudioSourceOptions {
echo_cancellation: true,
noise_suppression: true,
auto_gain_control: false,
},
sample_rate,
channels,
// TODO livekit: Pull these out of a proto later
100,
);
let transmit_task = task_background_executor.spawn({
let source = source.clone();
async move {
while let Some(frame) = frame_rx.next().await {
source.capture_frame(&frame).await.ok();
}
Ok(Some(()))
}
Some(())
}
});
});

let track =
track::LocalAudioTrack::create_audio_track("microphone", RtcAudioSource::Native(source));
let track = track::LocalAudioTrack::create_audio_track(
"microphone",
RtcAudioSource::Native(source),
);

Ok((
track,
AudioStream {
_tasks: [stream_task, transmit_task],
},
))
anyhow::Ok((
track,
AudioStream {
_tasks: [stream_task, transmit_task],
},
))
})
}

#[cfg(not(target_os = "windows"))]
pub fn play_remote_audio_track(
track: &track::RemoteAudioTrack,
cx: &mut AppContext,
background_executor: &BackgroundExecutor,
) -> AudioStream {
let buffer = Arc::new(Mutex::new(Vec::<i16>::new()));
let (stream_config_tx, mut stream_config_rx) = futures::channel::mpsc::unbounded();
let (stream_config_tx, stream_config_rx) = std::sync::mpsc::channel();
// TODO livekit: Pull these out of a proto later
let mut stream = NativeAudioStream::new(track.rtc_track(), 48000, 1);

let receive_task = cx.background_executor().spawn({
let receive_task = background_executor.spawn({
let buffer = buffer.clone();
async move {
let mut stream_config = None;
Expand All @@ -276,7 +295,7 @@ pub fn play_remote_audio_track(
if stream_config.as_ref().map_or(true, |c| *c != frame_config) {
buffer.resize(buffer_size as usize, 0);
stream_config = Some(frame_config.clone());
stream_config_tx.unbounded_send(frame_config).ok();
stream_config_tx.send(frame_config).ok();
}

if frame.data.len() == buffer.len() {
Expand All @@ -285,47 +304,44 @@ pub fn play_remote_audio_track(
buffer.iter_mut().for_each(|x| *x = 0);
}
}
Some(())
Ok(Some(()))
}
});

let play_task = cx.foreground_executor().spawn(
{
let buffer = buffer.clone();
async move {
if cfg!(any(test, feature = "test-support")) {
return Err(anyhow!("can't play audio in tests"));
}
let play_task = background_executor.spawn({
let buffer = buffer.clone();
async move {
if cfg!(any(test, feature = "test-support")) {
anyhow::bail!("can't play audio in tests");
}

let device = cpal::default_host()
.default_output_device()
.ok_or_else(|| anyhow!("no audio output device available"))?;

let mut _output_stream = None;
while let Some(config) = stream_config_rx.next().await {
_output_stream = Some(device.build_output_stream(
&config,
{
let buffer = buffer.clone();
move |data, _info| {
let buffer = buffer.lock();
if data.len() == buffer.len() {
data.copy_from_slice(&buffer);
} else {
data.iter_mut().for_each(|x| *x = 0);
}
let device = cpal::default_host()
.default_output_device()
.context("no audio output device available")?;

let mut _output_stream = None;
while let Some(config) = stream_config_rx.recv().ok() {
_output_stream = Some(device.build_output_stream(
&config,
{
let buffer = buffer.clone();
move |data, _info| {
let buffer = buffer.lock();
if data.len() == buffer.len() {
data.copy_from_slice(&buffer);
} else {
data.iter_mut().for_each(|x| *x = 0);
}
},
|error| log::error!("error playing audio track: {:?}", error),
None,
)?);
}

Ok(())
}
},
|error| log::error!("error playing audio track: {:?}", error),
None,
)?);
}

Ok(Some(()))
}
.log_err(),
);
});

AudioStream {
_tasks: [receive_task, play_task],
Expand Down

0 comments on commit 7e17646

Please sign in to comment.