Skip to content

Commit

Permalink
feat: prevent duplicate emissions for keyboard + media providers (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lars-berger authored Nov 27, 2024
1 parent 1dfeacc commit 13a25c5
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl Provider for KeyboardProvider {
crossbeam::select! {
recv(interval.tick()) -> _ => {
let output = self.run_interval();
self.common.emitter.emit_output(output);
self.common.emitter.emit_output_cached(output);
}
recv(self.common.input.sync_rx) -> input => {
if let Ok(ProviderInputMsg::Stop) = input {
Expand Down
33 changes: 30 additions & 3 deletions packages/desktop/src/providers/media/media_provider.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::collections::{HashMap, HashSet};
use std::{
collections::{HashMap, HashSet},
time::Duration,
};

use anyhow::Context;
use crossbeam::channel::{unbounded, Receiver, Sender};
Expand Down Expand Up @@ -68,6 +71,7 @@ impl Default for MediaSession {
enum MediaSessionEvent {
SessionAddOrRemove,
CurrentSessionChanged,
TimelineRefresh,
PlaybackInfoChanged(String),
MediaPropertiesChanged(String),
TimelinePropertiesChanged(String),
Expand Down Expand Up @@ -128,6 +132,13 @@ impl MediaProvider {
// Emit initial output.
self.emit_output();

// Create a ticker that fires every 5 seconds. GSMTC timeline
// properties normally only update when the end or start position
// changes, so we manually re-fetch them periodically to update the
// current position.
let timeline_interval =
crossbeam::channel::tick(Duration::from_secs(5));

loop {
crossbeam::select! {
recv(self.event_receiver) -> event => {
Expand All @@ -139,6 +150,11 @@ impl MediaProvider {
}
}
}
recv(timeline_interval) -> _ => {
if let Err(err) = self.handle_event(MediaSessionEvent::TimelineRefresh) {
warn!("Error handling timeline refresh: {}", err);
}
}
recv(self.common.input.sync_rx) -> input => {
match input {
Ok(ProviderInputMsg::Stop) => {
Expand Down Expand Up @@ -174,6 +190,17 @@ impl MediaProvider {
let manager = GsmtcManager::RequestAsync()?.get()?;
self.update_session_states(&manager)?;
}
MediaSessionEvent::TimelineRefresh => {
// Update timeline properties for all playing sessions.
for session_state in self.session_states.values_mut() {
if session_state.output.is_playing {
Self::update_timeline_properties(
&mut session_state.output,
&session_state.session,
)?;
}
}
}
MediaSessionEvent::PlaybackInfoChanged(id) => {
if let Some(session_state) = self.session_states.get_mut(&id) {
Self::update_playback_info(
Expand Down Expand Up @@ -424,7 +451,7 @@ impl MediaProvider {
///
/// Note that at times, GSMTC can have a valid session, but return empty
/// string for all media properties.
fn emit_output(&self) {
fn emit_output(&mut self) {
let current_session = self
.current_session_id
.as_ref()
Expand All @@ -437,7 +464,7 @@ impl MediaProvider {
.map(|state| state.output.clone())
.collect();

self.common.emitter.emit_output(Ok(MediaOutput {
self.common.emitter.emit_output_cached(Ok(MediaOutput {
current_session,
all_sessions,
}));
Expand Down
36 changes: 32 additions & 4 deletions packages/desktop/src/providers/provider_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,27 +57,54 @@ pub struct ProviderEmitter {

/// Hash of the provider's config.
config_hash: String,

/// Previous emission from the provider.
prev_emission: Option<ProviderEmission>,
}

impl ProviderEmitter {
fn emit(&self, emission: ProviderEmission) {
let send_res = self.emit_tx.send(emission);

if let Err(err) = send_res {
tracing::error!("Error sending provider result: {:?}", err);
}
}

/// Emits an output from a provider.
pub fn emit_output<T>(&self, output: anyhow::Result<T>)
where
T: Into<ProviderOutput>,
{
let send_res = self.emit_tx.send(ProviderEmission {
self.emit(ProviderEmission {
config_hash: self.config_hash.clone(),
result: output.map(Into::into).map_err(|err| err.to_string()),
});
}

if let Err(err) = send_res {
tracing::error!("Error sending provider result: {:?}", err);
/// Emits an output from a provider and prevents duplicate emissions by
/// caching the previous emission.
///
/// Note that this won't share the same cache if the `ProviderEmitter`
/// is cloned.
pub fn emit_output_cached<T>(&mut self, output: anyhow::Result<T>)
where
T: Into<ProviderOutput>,
{
let emission = ProviderEmission {
config_hash: self.config_hash.clone(),
result: output.map(Into::into).map_err(|err| err.to_string()),
};

if self.prev_emission.as_ref() != Some(&emission) {
self.prev_emission = Some(emission.clone());
self.emit(emission);
}
}
}

/// Emission from a provider.
#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Clone, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ProviderEmission {
/// Hash of the provider's config.
Expand Down Expand Up @@ -190,6 +217,7 @@ impl ProviderManager {
emitter: ProviderEmitter {
emit_tx: self.emit_tx.clone(),
config_hash: config_hash.clone(),
prev_emission: None,
},
sysinfo: self.sysinfo.clone(),
};
Expand Down

0 comments on commit 13a25c5

Please sign in to comment.