From c07f13be05365bef2b85d97f9eff186bbb9b1043 Mon Sep 17 00:00:00 2001 From: Evgenii Vilkov Date: Thu, 3 Oct 2024 00:55:34 +0200 Subject: [PATCH] try 1 --- src/config.rs | 10 ++- src/keyboard.rs | 29 ++++--- src/main.rs | 134 ++++++++++++++++++++------------ src/providers/_base.rs | 1 + src/providers/layout/linux.rs | 33 ++++---- src/providers/layout/macos.rs | 35 +++++---- src/providers/layout/windows.rs | 29 ++++--- src/providers/media/linux.rs | 32 ++++---- src/providers/media/windows.rs | 30 +++---- src/providers/time.rs | 28 ++++--- src/providers/volume/linux.rs | 30 +++---- src/providers/volume/macos.rs | 90 +++++++++++---------- src/providers/volume/windows.rs | 38 +++++---- 13 files changed, 300 insertions(+), 219 deletions(-) diff --git a/src/config.rs b/src/config.rs index 15c0b3f..2ae116f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,7 +1,7 @@ #[derive(serde::Deserialize, serde::Serialize)] #[serde(rename_all = "camelCase")] pub struct Config { - pub device: Device, + pub devices: Vec, pub layouts: Vec, pub reconnect_delay: u64, } @@ -9,6 +9,7 @@ pub struct Config { #[derive(serde::Deserialize, serde::Serialize)] #[serde(rename_all = "camelCase")] pub struct Device { + pub name: Option, pub product_id: u16, pub usage: u16, pub usage_page: u16, @@ -16,12 +17,13 @@ pub struct Device { pub fn get_config() -> Config { let default_config = Config { - device: Device { + devices: vec![Device { + name: None, product_id: 0x0844, usage: 0x61, usage_page: 0xff60, - }, - layouts: vec!["pl".to_string()], + }], + layouts: vec!["en".to_string()], reconnect_delay: 5000, }; diff --git a/src/keyboard.rs b/src/keyboard.rs index f7c5226..217059d 100644 --- a/src/keyboard.rs +++ b/src/keyboard.rs @@ -4,6 +4,7 @@ use tokio::sync::{broadcast, mpsc}; use crate::config::Device; pub struct Keyboard { + name: String, product_id: u16, usage: u16, usage_page: u16, @@ -13,6 +14,7 @@ pub struct Keyboard { impl Keyboard { pub fn new(device: Device, reconnect_delay: u64) -> Self { return Self { + name: device.name.unwrap_or_else(|| "keyboard".to_string()), product_id: device.product_id, usage: device.usage, usage_page: device.usage_page, @@ -33,30 +35,29 @@ impl Keyboard { return Err(HidError::HidApiErrorEmpty); } - pub fn connect(&self) -> (broadcast::Sender, mpsc::Sender>) { + pub fn connect(&self, data_sender: broadcast::Sender>, is_connected_sender: mpsc::Sender) { + let name = self.name.clone(); let pid = self.product_id; let usage = self.usage; let usage_page = self.usage_page; let reconnect_delay = self.reconnect_delay; - let (data_sender, mut data_receiver) = mpsc::channel::>(32); - let (connected_sender, _) = broadcast::channel::(32); - let internal_connected_sender = connected_sender.clone(); + let is_connected_sender = is_connected_sender.clone(); + let mut data_receiver = data_sender.subscribe(); std::thread::spawn(move || { - tracing::info!("Waiting for keyboard..."); + tracing::info!("Waiting for {}...", name); loop { - tracing::debug!("Trying to connect..."); + tracing::debug!("Trying to connect to {}...", name); if let Ok(device) = Self::get_device(&pid, &usage, &usage_page) { - let _ = &internal_connected_sender.send(true).unwrap(); - tracing::info!("Connected to keyboard"); + let _ = &is_connected_sender.try_send(true).unwrap_or_else(|e| tracing::error!("{}", e)); + tracing::info!("Connected to {}", name); loop { - let msg = data_receiver.blocking_recv(); - if let Some(mut received) = msg { - tracing::info!("Sending to keyboard: {:?}", received); + if let Ok(mut received) = data_receiver.blocking_recv() { + tracing::info!("Sending to {}: {:?}", name, received); received.truncate(32); received.insert(0, 0); if let Err(_) = device.write(received.as_mut()) { - let _ = internal_connected_sender.send(false).unwrap(); - tracing::warn!("Disconnected from keyboard"); + let _ = is_connected_sender.try_send(false).unwrap_or_else(|e| tracing::error!("{}", e)); + tracing::warn!("Disconnected from {}", name); break; } @@ -67,7 +68,5 @@ impl Keyboard { std::thread::sleep(std::time::Duration::from_millis(reconnect_delay)); } }); - - return (connected_sender, data_sender); } } diff --git a/src/main.rs b/src/main.rs index 36460a3..92fc580 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,78 +8,112 @@ mod data_type; mod keyboard; mod providers; -use std::thread; -use tokio::sync::{broadcast, mpsc}; use config::get_config; use keyboard::Keyboard; -#[cfg(not(target_os = "macos"))] -use providers::{_base::Provider, layout::LayoutProvider, time::TimeProvider, media::MediaProvider, volume::VolumeProvider}; +use providers::{_base::Provider, layout::LayoutProvider, time::TimeProvider, volume::VolumeProvider}; +use std::thread; +use tokio::sync::{broadcast, mpsc}; +#[cfg(not(target_os = "macos"))] +use providers::media::MediaProvider; #[cfg(target_os = "macos")] -use { - providers::{_base::Provider, layout::LayoutProvider, time::TimeProvider, volume::VolumeProvider}, - core_foundation_sys::runloop::CFRunLoopRun, -}; +use core_foundation_sys::runloop::CFRunLoopRun; -#[cfg(target_os = "macos")] -fn run(layouts: Vec, data_sender: mpsc::Sender>, connected_sender: broadcast::Sender) { - let mut is_connected = false; - let mut connected_receiver = connected_sender.subscribe(); +fn main() { + let env_filter = tracing_subscriber::EnvFilter::builder() + .with_default_directive(tracing::level_filters::LevelFilter::INFO.into()) + .from_env_lossy(); + let tracing_subscriber = tracing_subscriber::fmt().with_env_filter(env_filter).finish(); + let _ = tracing::subscriber::set_global_default(tracing_subscriber); + let config = get_config(); - thread::spawn(move || { - let providers: Vec> = vec![ - TimeProvider::new(data_sender.clone(), connected_sender.clone()), - LayoutProvider::new(data_sender.clone(), connected_sender.clone(), layouts), - VolumeProvider::new(data_sender.clone(), connected_sender.clone()), - ]; + let (data_sender, _) = broadcast::channel::>(1); + let (is_connected_sender, is_connected_receiver) = mpsc::channel::(1); - loop { - if let Ok(connected) = connected_receiver.blocking_recv() { - if !is_connected && connected { - providers.iter().for_each(|p| p.start()); - } + for device in config.devices { + let data_sender = data_sender.clone(); + let is_connected_sender = is_connected_sender.clone(); + let reconnect_delay = config.reconnect_delay; + thread::spawn(move || { + let keyboard = Keyboard::new(device, reconnect_delay); + keyboard.connect(data_sender, is_connected_sender); + }); + } - is_connected = connected; - } - } - }); - unsafe { CFRunLoopRun(); } + run(data_sender, is_connected_receiver); } #[cfg(not(target_os = "macos"))] -fn run(layouts: Vec, data_sender: mpsc::Sender>, connected_sender: broadcast::Sender) { - let providers: Vec> = vec![ - TimeProvider::new(data_sender.clone(), connected_sender.clone()), - VolumeProvider::new(data_sender.clone(), connected_sender.clone()), - LayoutProvider::new(data_sender.clone(), connected_sender.clone(), layouts), - MediaProvider::new(data_sender.clone(), connected_sender.clone()), +fn get_providers(data_sender: &broadcast::Sender>) -> Vec> { + return vec![ + TimeProvider::new(data_sender.clone()), + VolumeProvider::new(data_sender.clone()), + LayoutProvider::new(data_sender.clone()), + MediaProvider::new(data_sender.clone()), + ]; +} + +#[cfg(target_os = "macos")] +fn get_providers(data_sender: &broadcast::Sender>, layouts: Vec) -> Vec> { + return vec![ + TimeProvider::new(data_sender.clone()), + VolumeProvider::new(data_sender.clone()), + LayoutProvider::new(data_sender.clone(), layouts), ]; +} - let mut is_connected = false; - let mut connected_receiver = connected_sender.subscribe(); +#[cfg(not(target_os = "macos"))] +fn run(data_sender: broadcast::Sender>, mut is_connected_receiver: mpsc::Receiver) { + let providers = get_providers(&data_sender); + + let mut connected_count = 0; + let mut is_started = false; loop { - if let Ok(connected) = connected_receiver.blocking_recv() { - if !is_connected && connected { + if let Some(is_connected) = is_connected_receiver.blocking_recv() { + connected_count += if is_connected { 1 } else { -1 }; + tracing::info!("Connected devices: {}", connected_count); + + if connected_count > 0 && !is_started { + tracing::info!("Starting providers"); + is_started = true; providers.iter().for_each(|p| p.start()); + } else if connected_count == 0 && is_started { + tracing::info!("Stopping providers"); + is_started = false; + providers.iter().for_each(|p| p.stop()); } - - is_connected = connected; } } } -fn main() { - let env_filter = tracing_subscriber::EnvFilter::builder() - .with_default_directive(tracing::level_filters::LevelFilter::INFO.into()) - .from_env_lossy(); - let tracing_subscriber = tracing_subscriber::fmt().with_env_filter(env_filter).finish(); - let _ = tracing::subscriber::set_global_default(tracing_subscriber); - let config = get_config(); +#[cfg(target_os = "macos")] +fn run(data_sender: broadcast::Sender>, mut is_connected_receiver: mpsc::Receiver) { + thread::spawn(move || { + let providers = get_providers(&data_sender); + + let mut connected_count = 0; + let mut is_started = false; - let keyboard = Keyboard::new(config.device, config.reconnect_delay); - let (connected_sender, data_sender) = keyboard.connect(); + loop { + if let Some(is_connected) = is_connected_receiver.blocking_recv() { + connected_count += if is_connected { 1 } else { -1 }; + tracing::info!("Connected devices: {}", connected_count); - run(config.layouts, data_sender, connected_sender); + if connected_count > 0 && !is_started { + tracing::info!("Starting providers"); + is_started = true; + providers.iter().for_each(|p| p.start()); + } else if connected_count == 0 && is_started { + tracing::info!("Stopping providers"); + is_started = false; + providers.iter().for_each(|p| p.stop()); + } + } + } + }); + unsafe { + CFRunLoopRun(); + } } diff --git a/src/providers/_base.rs b/src/providers/_base.rs index 7db5903..b4ffd1d 100644 --- a/src/providers/_base.rs +++ b/src/providers/_base.rs @@ -1,3 +1,4 @@ pub trait Provider { fn start(&self); + fn stop(&self); } diff --git a/src/providers/layout/linux.rs b/src/providers/layout/linux.rs index 5ab92a2..dd4b003 100644 --- a/src/providers/layout/linux.rs +++ b/src/providers/layout/linux.rs @@ -1,8 +1,11 @@ +use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; +use std::sync::Arc; use std::{ffi, mem, ptr}; +use tokio::sync::broadcast; +use x11::xlib::{XGetAtomName, XOpenDisplay, XkbAllocKeyboard, XkbGetNames, XkbGetState, _XDisplay, _XkbDesc, _XkbStateRec}; +use crate::config::get_config; use crate::data_type::DataType; -use tokio::sync::{broadcast, mpsc}; -use x11::xlib::{XGetAtomName, XOpenDisplay, XkbAllocKeyboard, XkbGetNames, XkbGetState, _XDisplay, _XkbDesc, _XkbStateRec}; use super::super::_base::Provider; @@ -24,27 +27,27 @@ fn get_layout_index(display: *mut _XDisplay) -> usize { return state.group as usize; } -fn send_data(value: &String, layouts: &Vec, data_sender: &mpsc::Sender>) { +fn send_data(value: &String, layouts: &Vec, data_sender: &broadcast::Sender>) { tracing::info!("new layout: '{0}', layout list: {1:?}", value, layouts); let index = layouts.into_iter().position(|r| r == value); if let Some(index) = index { let data = vec![DataType::Layout as u8, index as u8]; - data_sender.try_send(data).unwrap_or_else(|e| tracing::error!("{}", e)); + data_sender.send(data).unwrap(); } } pub struct LayoutProvider { - data_sender: mpsc::Sender>, - connected_sender: broadcast::Sender, + data_sender: broadcast::Sender>, layouts: Vec, + is_started: Arc, } impl LayoutProvider { - pub fn new(data_sender: mpsc::Sender>, connected_sender: broadcast::Sender, layouts: Vec) -> Box { + pub fn new(data_sender: broadcast::Sender>) -> Box { let provider = LayoutProvider { data_sender, - connected_sender, - layouts, + layouts: get_config().layouts, + is_started: Arc::new(AtomicBool::new(false)), }; return Box::new(provider); } @@ -53,13 +56,11 @@ impl LayoutProvider { impl Provider for LayoutProvider { fn start(&self) { tracing::info!("Layout Provider started"); - + self.is_started.store(true, Relaxed); let data_sender = self.data_sender.clone(); - let connected_sender = self.connected_sender.clone(); let layouts = self.layouts.clone(); - + let is_started = self.is_started.clone(); std::thread::spawn(move || { - let mut connected_receiver = connected_sender.subscribe(); let mut synced_layout = 0; let display = unsafe { XOpenDisplay(ptr::null()) }; let keyboard = unsafe { XkbAllocKeyboard() }; @@ -67,7 +68,7 @@ impl Provider for LayoutProvider { let symbol_list = symbols.split('+').map(|x| x.to_string()).collect::>(); loop { - if !connected_receiver.try_recv().unwrap_or(true) { + if !is_started.load(Relaxed) { break; } @@ -85,4 +86,8 @@ impl Provider for LayoutProvider { tracing::info!("Layout Provider stopped"); }); } + + fn stop(&self) { + self.is_started.store(false, Relaxed); + } } diff --git a/src/providers/layout/macos.rs b/src/providers/layout/macos.rs index 67b0909..779ccb1 100644 --- a/src/providers/layout/macos.rs +++ b/src/providers/layout/macos.rs @@ -1,8 +1,13 @@ -use crate::data_type::DataType; use core_foundation::base::{CFRelease, TCFType}; use core_foundation::string::{CFString, CFStringRef}; use libc::c_void; -use tokio::sync::{broadcast, mpsc}; +use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; +use std::sync::Arc; +use tokio::sync::broadcast; + +use crate::config::get_config; +use crate::data_type::DataType; + use super::super::_base::Provider; #[link(name = "Carbon", kind = "framework")] @@ -13,7 +18,6 @@ extern "C" { fn get_keyboard_layout() -> Option { unsafe { - let layout_input_source = TISCopyCurrentKeyboardLayoutInputSource(); if layout_input_source.is_null() { return None; @@ -39,26 +43,26 @@ fn get_keyboard_layout() -> Option { } } -fn send_data(value: &String, layouts: &Vec, data_sender: &mpsc::Sender>) { +fn send_data(value: &String, layouts: &Vec, data_sender: &broadcast::Sender>) { tracing::info!("new layout: '{0}', layout list: {1:?}", value, layouts); if let Some(index) = layouts.into_iter().position(|r| r == value) { let data = vec![DataType::Layout as u8, index as u8]; - data_sender.try_send(data).unwrap_or_else(|e| tracing::error!("{}", e)); + data_sender.send(data).unwrap(); } } pub struct LayoutProvider { - data_sender: mpsc::Sender>, - connected_sender: broadcast::Sender, + data_sender: broadcast::Sender>, layouts: Vec, + is_started: Arc, } impl LayoutProvider { - pub fn new(data_sender: mpsc::Sender>, connected_sender: broadcast::Sender, layouts: Vec) -> Box { + pub fn new(data_sender: broadcast::Sender>) -> Box { let provider = LayoutProvider { data_sender, - connected_sender, - layouts, + layouts: get_config().layouts, + is_started: Arc::new(AtomicBool::new(false)), }; Box::new(provider) } @@ -67,18 +71,19 @@ impl LayoutProvider { impl Provider for LayoutProvider { fn start(&self) { tracing::info!("Layout Provider started"); - + self.is_started.store(true, Relaxed); let data_sender = self.data_sender.clone(); let layouts = self.layouts.clone(); - let connected_sender = self.connected_sender.clone(); + let is_started = self.is_started.clone(); let mut synced_layout = "".to_string(); std::thread::spawn(move || { let mut connected_receiver = connected_sender.subscribe(); loop { - if !connected_receiver.try_recv().unwrap_or(true) { + if !is_started.load(Relaxed) { break; } + if let Some(layout) = get_keyboard_layout() { let lang = layout.split('.').last().unwrap().to_string(); if synced_layout != lang { @@ -87,8 +92,8 @@ impl Provider for LayoutProvider { } } std::thread::sleep(std::time::Duration::from_millis(100)); - }} - ); + } + }); tracing::info!("Layout Provider stopped"); } diff --git a/src/providers/layout/windows.rs b/src/providers/layout/windows.rs index 4b4bf85..ee80878 100644 --- a/src/providers/layout/windows.rs +++ b/src/providers/layout/windows.rs @@ -1,4 +1,6 @@ -use tokio::sync::{broadcast, mpsc}; +use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; +use std::sync::Arc; +use tokio::sync::broadcast; use windows::Win32::{ Globalization::{GetLocaleInfoW, LOCALE_SISO639LANGNAME}, UI::{ @@ -8,6 +10,7 @@ use windows::Win32::{ }, }; +use crate::config::get_config; use crate::data_type::DataType; use super::super::_base::Provider; @@ -26,25 +29,25 @@ unsafe fn get_layout() -> Option { None } -fn send_data(value: &String, layouts: &Vec, data_sender: &mpsc::Sender>) { +fn send_data(value: &String, layouts: &Vec, data_sender: &broadcast::Sender>) { if let Some(index) = layouts.into_iter().position(|r| r == value) { let data = vec![DataType::Layout as u8, index as u8]; - data_sender.try_send(data).unwrap_or_else(|e| tracing::error!("{}", e)); + data_sender.send(data).unwrap(); } } pub struct LayoutProvider { - data_sender: mpsc::Sender>, - connected_sender: broadcast::Sender, + data_sender: broadcast::Sender>, layouts: Vec, + is_started: Arc, } impl LayoutProvider { - pub fn new(data_sender: mpsc::Sender>, connected_sender: broadcast::Sender, layouts: Vec) -> Box { + pub fn new(data_sender: broadcast::Sender>) -> Box { let provider = LayoutProvider { data_sender, - connected_sender, - layouts, + layouts: get_config().layouts, + is_started: Arc::new(AtomicBool::new(false)), }; return Box::new(provider); } @@ -53,14 +56,14 @@ impl LayoutProvider { impl Provider for LayoutProvider { fn start(&self) { tracing::info!("Layout Provider started"); + self.is_started.store(true, Relaxed); let data_sender = self.data_sender.clone(); - let connected_sender = self.connected_sender.clone(); let layouts = self.layouts.clone(); + let is_started = self.is_started.clone(); std::thread::spawn(move || { - let mut connected_receiver = connected_sender.subscribe(); let mut synced_layout = "".to_string(); loop { - if !connected_receiver.try_recv().unwrap_or(true) { + if !is_started.load(Relaxed) { break; } @@ -77,4 +80,8 @@ impl Provider for LayoutProvider { tracing::info!("Layout Provider stopped"); }); } + + fn stop(&self) { + self.is_started.store(false, Relaxed); + } } diff --git a/src/providers/media/linux.rs b/src/providers/media/linux.rs index d18091b..266ae7d 100644 --- a/src/providers/media/linux.rs +++ b/src/providers/media/linux.rs @@ -1,11 +1,13 @@ use mpris::{Metadata, PlayerFinder}; -use tokio::sync::{broadcast, mpsc}; +use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; +use std::sync::Arc; +use tokio::sync::broadcast; use crate::data_type::DataType; use super::super::_base::Provider; -fn send_media_data(metadata: &Metadata, data_sender: &mpsc::Sender>, current: &(String, String)) -> (String, String) { +fn send_media_data(metadata: &Metadata, data_sender: &broadcast::Sender>, current: &(String, String)) -> (String, String) { let (mut artist, mut title) = current.clone(); let new_artist = metadata.artists().and_then(|x| x.get(0).map(|x| x.to_string())).unwrap_or_default(); @@ -26,24 +28,24 @@ fn send_media_data(metadata: &Metadata, data_sender: &mpsc::Sender>, cur return (artist, title); } -fn send_data(data_type: DataType, value: &String, data_sender: &mpsc::Sender>) { +fn send_data(data_type: DataType, value: &String, data_sender: &broadcast::Sender>) { let mut data = value.to_string().into_bytes(); data.truncate(30); data.insert(0, data.len() as u8); data.insert(0, data_type as u8); - data_sender.try_send(data).unwrap_or_else(|e| tracing::error!("{}", e)); + data_sender.send(data).unwrap(); } pub struct MediaProvider { - data_sender: mpsc::Sender>, - connected_sender: broadcast::Sender, + data_sender: broadcast::Sender>, + is_started: Arc, } impl MediaProvider { - pub fn new(data_sender: mpsc::Sender>, connected_sender: broadcast::Sender) -> Box { + pub fn new(data_sender: broadcast::Sender>) -> Box { let provider = MediaProvider { data_sender, - connected_sender, + is_started: Arc::new(AtomicBool::new(false)), }; return Box::new(provider); } @@ -52,16 +54,14 @@ impl MediaProvider { impl Provider for MediaProvider { fn start(&self) { tracing::info!("Media Provider started"); - + self.is_started.store(true, Relaxed); let data_sender = self.data_sender.clone(); - let connected_sender = self.connected_sender.clone(); + let is_started = self.is_started.clone(); std::thread::spawn(move || { - let mut connected_receiver = connected_sender.subscribe(); - let mut media_data = (String::default(), String::default()); 'outer: loop { - if !connected_receiver.try_recv().unwrap_or(true) { + if !is_started.load(Relaxed) { break; } @@ -74,7 +74,7 @@ impl Provider for MediaProvider { for event in events { tracing::debug!("{:?}", event); - if !connected_receiver.try_recv().unwrap_or(true) { + if !is_started.load(Relaxed) { break 'outer; } @@ -101,4 +101,8 @@ impl Provider for MediaProvider { tracing::info!("Media Provider stopped"); }); } + + fn stop(&self) { + self.is_started.store(false, Relaxed); + } } diff --git a/src/providers/media/windows.rs b/src/providers/media/windows.rs index 29a3f3c..37315c1 100644 --- a/src/providers/media/windows.rs +++ b/src/providers/media/windows.rs @@ -1,5 +1,6 @@ -use tokio::sync::{broadcast, mpsc}; - +use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; +use std::sync::Arc; +use tokio::sync::broadcast; use windows::{ Foundation::{EventRegistrationToken, TypedEventHandler}, Media::Control::{GlobalSystemMediaTransportControlsSession, GlobalSystemMediaTransportControlsSessionManager}, @@ -17,7 +18,7 @@ fn get_manager() -> Result fn handle_session( session: &GlobalSystemMediaTransportControlsSession, - data_sender: &mpsc::Sender>, + data_sender: &broadcast::Sender>, ) -> Option { let mut synced_artist = String::new(); let mut synced_title = String::new(); @@ -68,24 +69,24 @@ fn get_media_data(session: &GlobalSystemMediaTransportControlsSession) -> Option None } -fn send_data(data_type: DataType, value: &String, data_sender: &mpsc::Sender>) { +fn send_data(data_type: DataType, value: &String, data_sender: &broadcast::Sender>) { let mut data = value.to_string().into_bytes(); data.truncate(30); data.insert(0, data.len() as u8); data.insert(0, data_type as u8); - data_sender.try_send(data).unwrap_or_else(|e| tracing::error!("{}", e)); + data_sender.send(data).unwrap(); } pub struct MediaProvider { - data_sender: mpsc::Sender>, - connected_sender: broadcast::Sender, + data_sender: broadcast::Sender>, + is_started: Arc, } impl MediaProvider { - pub fn new(data_sender: mpsc::Sender>, connected_sender: broadcast::Sender) -> Box { + pub fn new(data_sender: broadcast::Sender>) -> Box { let provider = MediaProvider { data_sender, - connected_sender, + is_started: Arc::new(AtomicBool::new(false)), }; return Box::new(provider); } @@ -94,11 +95,10 @@ impl MediaProvider { impl Provider for MediaProvider { fn start(&self) { tracing::info!("Media Provider started"); - + self.is_started.store(true, Relaxed); let data_sender = self.data_sender.clone(); - let connected_sender = self.connected_sender.clone(); + let is_started = self.is_started.clone(); std::thread::spawn(move || { - let mut connected_receiver = connected_sender.subscribe(); let mut session_token: Option = None; if let Ok(manager) = get_manager() { @@ -123,7 +123,7 @@ impl Provider for MediaProvider { .map_err(|e| tracing::error!("Can not register CurrentSessionChanged callback: {}", e)); loop { - if !connected_receiver.try_recv().unwrap_or(true) { + if !is_started.load(Relaxed) { break; } @@ -138,4 +138,8 @@ impl Provider for MediaProvider { } }); } + + fn stop(&self) { + self.is_started.store(false, Relaxed); + } } diff --git a/src/providers/time.rs b/src/providers/time.rs index 6257420..8057f41 100644 --- a/src/providers/time.rs +++ b/src/providers/time.rs @@ -1,5 +1,7 @@ use chrono::{DateTime, Local, Timelike}; -use tokio::sync::{broadcast, mpsc}; +use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; +use std::sync::Arc; +use tokio::sync::broadcast; use crate::data_type::DataType; @@ -12,21 +14,21 @@ fn get_time() -> (u8, u8) { return (hour, minute); } -fn send_data(value: &(u8, u8), push_sender: &mpsc::Sender>) { +fn send_data(value: &(u8, u8), push_sender: &broadcast::Sender>) { let data = vec![DataType::Time as u8, value.0, value.1]; - push_sender.try_send(data).unwrap_or_else(|e| tracing::error!("{}", e)); + push_sender.send(data).unwrap(); } pub struct TimeProvider { - data_sender: mpsc::Sender>, - connected_sender: broadcast::Sender, + data_sender: broadcast::Sender>, + is_started: Arc, } impl TimeProvider { - pub fn new(data_sender: mpsc::Sender>, connected_sender: broadcast::Sender) -> Box { + pub fn new(data_sender: broadcast::Sender>) -> Box { let provider = TimeProvider { data_sender, - connected_sender, + is_started: Arc::new(AtomicBool::new(false)), }; return Box::new(provider); } @@ -34,14 +36,14 @@ impl TimeProvider { impl Provider for TimeProvider { fn start(&self) { - tracing::info!("Time Provider enabled"); + tracing::info!("Time Provider started"); + self.is_started.store(true, Relaxed); let data_sender = self.data_sender.clone(); - let connected_sender = self.connected_sender.clone(); + let is_started = self.is_started.clone(); std::thread::spawn(move || { - let mut connected_receiver = connected_sender.subscribe(); let mut synced_time = (0u8, 0u8); loop { - if !connected_receiver.try_recv().unwrap_or(true) { + if !is_started.load(Relaxed) { break; } @@ -57,4 +59,8 @@ impl Provider for TimeProvider { tracing::info!("Time Provider stopped"); }); } + + fn stop(&self) { + self.is_started.store(false, Relaxed); + } } diff --git a/src/providers/volume/linux.rs b/src/providers/volume/linux.rs index 76c09db..a5b71f4 100644 --- a/src/providers/volume/linux.rs +++ b/src/providers/volume/linux.rs @@ -1,8 +1,9 @@ -use std::ops::Deref; - use libpulse_binding::context::subscribe::Facility; use pulsectl::controllers::{DeviceControl, SinkController}; -use tokio::sync::{broadcast, mpsc}; +use std::ops::Deref; +use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; +use std::sync::Arc; +use tokio::sync::broadcast; use crate::data_type::DataType; @@ -19,22 +20,22 @@ fn get_volume() -> Option { return None; } -fn send_data(value: &f32, push_sender: &mpsc::Sender>) { +fn send_data(value: &f32, push_sender: &broadcast::Sender>) { let volume = (value * 100.0).round() as u8; let data = vec![DataType::Volume as u8, volume]; - push_sender.try_send(data).unwrap_or_else(|e| tracing::error!("{}", e)); + push_sender.send(data).unwrap(); } pub struct VolumeProvider { - data_sender: mpsc::Sender>, - connected_sender: broadcast::Sender, + data_sender: broadcast::Sender>, + is_started: Arc, } impl VolumeProvider { - pub fn new(data_sender: mpsc::Sender>, connected_sender: broadcast::Sender) -> Box { + pub fn new(data_sender: broadcast::Sender>) -> Box { let provider = VolumeProvider { data_sender, - connected_sender, + is_started: Arc::new(AtomicBool::new(false)), }; return Box::new(provider); } @@ -43,15 +44,14 @@ impl VolumeProvider { impl Provider for VolumeProvider { fn start(&self) { tracing::info!("Volume Provider started"); + self.is_started.store(true, Relaxed); let data_sender = self.data_sender.clone(); - let connected_sender = self.connected_sender.clone(); + let is_started = self.is_started.clone(); let mut volume = get_volume().unwrap_or_default(); send_data(&volume, &self.data_sender); std::thread::spawn(move || { - let mut connected_receiver = connected_sender.subscribe(); - let controller = SinkController::create().map_err(|e| tracing::error!("{}", e)).unwrap(); let mut ctx = controller.handler.context.deref().borrow_mut(); @@ -66,7 +66,7 @@ impl Provider for VolumeProvider { ctx.subscribe(Facility::Sink.to_interest_mask(), |_| {}); loop { - if !connected_receiver.try_recv().unwrap_or(true) { + if !is_started.load(Relaxed) { break; } @@ -76,4 +76,8 @@ impl Provider for VolumeProvider { tracing::info!("Volume Provider stopped"); }); } + + fn stop(&self) { + self.is_started.store(false, Relaxed); + } } diff --git a/src/providers/volume/macos.rs b/src/providers/volume/macos.rs index d87026a..1745677 100644 --- a/src/providers/volume/macos.rs +++ b/src/providers/volume/macos.rs @@ -1,13 +1,18 @@ -use crate::providers::_base::Provider; use block2::{Block, RcBlock}; use coreaudio::audio_unit::macos_helpers::get_default_device_id; -use coreaudio_sys::{dispatch_queue_t, kAudioDevicePropertyScopeOutput, kAudioDevicePropertyVolumeScalar, kAudioHardwarePropertyDefaultOutputDevice, kAudioObjectPropertyElementMain, kAudioObjectPropertyScopeGlobal, kAudioObjectPropertyScopeOutput, kAudioObjectSystemObject, AudioObjectGetPropertyData, AudioObjectID, AudioObjectIsPropertySettable, AudioObjectPropertyAddress, OSStatus}; +use coreaudio_sys::{ + dispatch_queue_t, kAudioDevicePropertyScopeOutput, kAudioDevicePropertyVolumeScalar, kAudioHardwarePropertyDefaultOutputDevice, + kAudioObjectPropertyElementMain, kAudioObjectPropertyScopeGlobal, kAudioObjectPropertyScopeOutput, kAudioObjectSystemObject, + AudioObjectGetPropertyData, AudioObjectID, AudioObjectIsPropertySettable, AudioObjectPropertyAddress, OSStatus, +}; use std::option::Option; use std::ptr; -use tokio::sync::{broadcast, mpsc}; +use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; +use std::sync::Arc; +use tokio::sync::broadcast; use crate::data_type::DataType; - +use crate::providers::_base::Provider; extern "C" { pub fn AudioObjectAddPropertyListenerBlock( @@ -68,13 +73,7 @@ fn is_volume_control_supported(device_id: AudioObjectID, channel: u32) -> bool { mElement: channel, }; - let status = unsafe { - AudioObjectIsPropertySettable( - device_id, - &property_address, - &mut is_writable, - ) - }; + let status = unsafe { AudioObjectIsPropertySettable(device_id, &property_address, &mut is_writable) }; status == 0 && is_writable != 0 } @@ -104,31 +103,39 @@ fn register_volume_listener(listener: &RcBlock) { mElement: channel.unwrap(), }; - let listener_status = unsafe { AudioObjectRemovePropertyListenerBlock(device_id.unwrap(), &property_address, ptr::null_mut(), &listener)}; + let listener_status = + unsafe { AudioObjectRemovePropertyListenerBlock(device_id.unwrap(), &property_address, ptr::null_mut(), &listener) }; if listener_status == 0 { tracing::info!( "Volume listener successfully removed for channel {} of device {}", - channel.unwrap(), device_id.unwrap() + channel.unwrap(), + device_id.unwrap() ); } else { - tracing::info!("Failed to remove volume listener for channel {} of device {}", channel.unwrap(), device_id.unwrap()) + tracing::info!( + "Failed to remove volume listener for channel {} of device {}", + channel.unwrap(), + device_id.unwrap() + ) } - let listener_status = unsafe { - AudioObjectAddPropertyListenerBlock(device_id.unwrap(), &property_address, ptr::null_mut(), &listener) - }; + let listener_status = unsafe { AudioObjectAddPropertyListenerBlock(device_id.unwrap(), &property_address, ptr::null_mut(), &listener) }; if listener_status == 0 { tracing::info!( "Volume listener successfully registered for channel {} of device {}", - channel.unwrap(), device_id.unwrap() + channel.unwrap(), + device_id.unwrap() ); } else { - tracing::info!("Failed to register volume listener for channel {} of device {}", channel.unwrap(), device_id.unwrap()) + tracing::info!( + "Failed to register volume listener for channel {} of device {}", + channel.unwrap(), + device_id.unwrap() + ) } } - fn register_device_change_listener(listener: &RcBlock) { let property_address = AudioObjectPropertyAddress { mSelector: kAudioHardwarePropertyDefaultOutputDevice, @@ -136,18 +143,16 @@ fn register_device_change_listener(listener: &RcBlock) { mElement: kAudioObjectPropertyElementMain, }; - let listener_status = unsafe { - AudioObjectRemovePropertyListenerBlock(kAudioObjectSystemObject, &property_address, ptr::null_mut(), &listener) - }; + let listener_status = + unsafe { AudioObjectRemovePropertyListenerBlock(kAudioObjectSystemObject, &property_address, ptr::null_mut(), &listener) }; if listener_status == 0 { tracing::info!("Default device change listener successfully removed"); } else { tracing::info!("Failed to remove default device change listener"); } - let listener_status = unsafe { - AudioObjectAddPropertyListenerBlock(kAudioObjectSystemObject, &property_address, ptr::null_mut(), &listener) - }; + let listener_status = + unsafe { AudioObjectAddPropertyListenerBlock(kAudioObjectSystemObject, &property_address, ptr::null_mut(), &listener) }; if listener_status == 0 { tracing::info!("Default device change listener registered successfully"); @@ -156,60 +161,57 @@ fn register_device_change_listener(listener: &RcBlock) { } } -fn send_data(value: &f32, push_sender: &mpsc::Sender>) { +fn send_data(value: &f32, push_sender: &broadcast::Sender>) { let volume = (value * 100.0).round() as u8; let data = vec![DataType::Volume as u8, volume]; - push_sender.try_send(data).unwrap_or_else(|e| tracing::error!("{}", e)); + push_sender.send(data).unwrap(); } pub struct VolumeProvider { - connected_sender: broadcast::Sender, + is_started: Arc, device_changed_block: RcBlock, volume_changed_block: RcBlock, } impl VolumeProvider { - pub fn new(data_sender: mpsc::Sender>, connected_sender: broadcast::Sender) -> Box { + pub fn new(data_sender: mpsc::Sender>) -> Box { let sender = data_sender.clone(); - let volume_changed_block = RcBlock::new(move |_: u32, _: u64|{ - if let Some(volume) = get_current_volume(){ + let volume_changed_block = RcBlock::new(move |_: u32, _: u64| { + if let Some(volume) = get_current_volume() { send_data(&volume, &sender.clone()); } }); let sender = data_sender.clone(); let volume_changed_block_clone = volume_changed_block.clone(); - let device_changed_block: RcBlock = RcBlock::new(move |_: u32, _: u64|{ + let device_changed_block: RcBlock = RcBlock::new(move |_: u32, _: u64| { register_volume_listener(&volume_changed_block_clone); - if let Some(volume) = get_current_volume(){ + if let Some(volume) = get_current_volume() { send_data(&volume, &sender.clone()); } }); let provider = VolumeProvider { - connected_sender, + is_started: Arc::new(AtomicBool::new(false)), device_changed_block, volume_changed_block, }; Box::new(provider) } - } impl Provider for VolumeProvider { - fn start(&self) { tracing::info!("Volume Provider started"); - let connected_sender = self.connected_sender.clone(); + self.is_started.store(true, Relaxed); + let is_started = self.is_started.clone(); register_volume_listener(&self.volume_changed_block); register_device_change_listener(&self.device_changed_block); std::thread::spawn(move || { - let mut connected_receiver = connected_sender.subscribe(); - loop { - if !connected_receiver.try_recv().unwrap_or(true) { + if !is_started.load(Relaxed) { break; } @@ -219,4 +221,8 @@ impl Provider for VolumeProvider { tracing::info!("Volume Provider stopped"); }); } -} \ No newline at end of file + + fn stop(&self) { + self.is_started.store(false, Relaxed); + } +} diff --git a/src/providers/volume/windows.rs b/src/providers/volume/windows.rs index b37b55d..42761df 100644 --- a/src/providers/volume/windows.rs +++ b/src/providers/volume/windows.rs @@ -1,7 +1,6 @@ -use tokio::sync::{ - broadcast::{self, Receiver}, - mpsc::{self, Sender}, -}; +use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; +use std::sync::Arc; +use tokio::sync::broadcast; use windows::{ core::Error, Win32::{ @@ -33,7 +32,7 @@ unsafe fn get_volume_endpoint() -> Result { #[windows::core::implement(IAudioEndpointVolumeCallback)] struct VolumeChangeCallback { - push_sender: mpsc::Sender>, + push_sender: broadcast::Sender>, } impl IAudioEndpointVolumeCallback_Impl for VolumeChangeCallback { @@ -44,22 +43,22 @@ impl IAudioEndpointVolumeCallback_Impl for VolumeChangeCallback { } } -fn send_data(value: &f32, push_sender: &mpsc::Sender>) { +fn send_data(value: &f32, push_sender: &broadcast::Sender>) { let volume = (value * 100.0).round() as u8; let data = vec![DataType::Volume as u8, volume]; - push_sender.try_send(data).unwrap_or_else(|e| tracing::error!("{}", e)); + push_sender.send(data).unwrap(); } pub struct VolumeProvider { - data_sender: mpsc::Sender>, - connected_sender: broadcast::Sender, + data_sender: broadcast::Sender>, + is_started: Arc, } impl VolumeProvider { - pub fn new(data_sender: mpsc::Sender>, connected_sender: broadcast::Sender) -> Box { + pub fn new(data_sender: broadcast::Sender>) -> Box { let provider = VolumeProvider { data_sender, - connected_sender, + is_started: Arc::new(AtomicBool::new(false)), }; return Box::new(provider); } @@ -68,15 +67,15 @@ impl VolumeProvider { impl Provider for VolumeProvider { fn start(&self) { tracing::info!("Volume Provider started"); + self.is_started.store(true, Relaxed); if let Ok(volume) = get_volume() { send_data(&volume, &self.data_sender); } let data_sender = self.data_sender.clone(); - let connected_sender = self.connected_sender.clone(); + let is_started = self.is_started.clone(); std::thread::spawn(move || loop { - let connected_receiver = connected_sender.subscribe(); - if subscribe_and_wait(data_sender.clone(), connected_receiver) { + if subscribe_and_wait(&data_sender, &is_started) { tracing::info!("Volume Provider stopped"); break; } @@ -84,18 +83,23 @@ impl Provider for VolumeProvider { std::thread::sleep(std::time::Duration::from_millis(10000)); }); } + + fn stop(&self) { + self.is_started.store(false, Relaxed); + } } -fn subscribe_and_wait(data_sender: Sender>, mut connected_receiver: Receiver) -> bool { +fn subscribe_and_wait(data_sender: &broadcast::Sender>, is_started: &Arc) -> bool { if let Ok(endpoint_volume) = unsafe { get_volume_endpoint() } { - let volume_callback: IAudioEndpointVolumeCallback = VolumeChangeCallback { push_sender: data_sender }.into(); + let push_sender = data_sender.clone(); + let volume_callback: IAudioEndpointVolumeCallback = VolumeChangeCallback { push_sender }.into(); if let Err(e) = unsafe { endpoint_volume.RegisterControlChangeNotify(&volume_callback) } { tracing::error!("Can not register Volume callback: {}", e); return false; } loop { - if !connected_receiver.try_recv().unwrap_or(true) { + if !is_started.load(Relaxed) { break; }