diff --git a/CHANGELOG.md b/CHANGELOG.md index e3efead0..3311adbc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,7 @@ Added: - Ability to include or exclude channels for server messages (join, part, etc.). See [configuration](https://halloy.squidowl.org/configuration/buffer/server_messages/index.html). - Ability to color nicknames within channel messages. See [configuration](https://halloy.squidowl.org/configuration/buffer/channel/message.html#nickname_color) - Ability to define a shell command for loading a server password. See [configuration](https://halloy.squidowl.org/configuration/servers/index.html#password_command) -- Enable support for IRCv3 `msgid` +- Enable support for IRCv3 `msgid` and `read-marker` Fixed: diff --git a/README.md b/README.md index 04ea3d52..1c2c43d4 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,7 @@ Halloy is also available from [Flathub](https://flathub.org/apps/org.squidowl.ha * [Monitor](https://ircv3.net/specs/extensions/monitor) * [msgid](https://ircv3.net/specs/extensions/message-ids) * [multi-prefix](https://ircv3.net/specs/extensions/multi-prefix) + * [read-marker](https://ircv3.net/specs/extensions/read-marker) * [sasl-3.1](https://ircv3.net/specs/extensions/sasl-3.1) * [server-time](https://ircv3.net/specs/extensions/server-time) * [userhost-in-names](https://ircv3.net/specs/extensions/userhost-in-names) diff --git a/book/src/README.md b/book/src/README.md index 69f4387f..ce91ca6e 100644 --- a/book/src/README.md +++ b/book/src/README.md @@ -20,6 +20,7 @@ * [Monitor](https://ircv3.net/specs/extensions/monitor) * [msgid](https://ircv3.net/specs/extensions/message-ids) * [multi-prefix](https://ircv3.net/specs/extensions/multi-prefix) + * [read-marker](https://ircv3.net/specs/extensions/read-marker) * [sasl-3.1](https://ircv3.net/specs/extensions/sasl-3.1) * [server-time](https://ircv3.net/specs/extensions/server-time) * [userhost-in-names](https://ircv3.net/specs/extensions/userhost-in-names) diff --git a/data/src/client.rs b/data/src/client.rs index 8ba5e3ee..846368c9 100644 --- a/data/src/client.rs +++ b/data/src/client.rs @@ -6,6 +6,7 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt; use std::time::{Duration, Instant}; +use crate::history::ReadMarker; use crate::message::server_time; use crate::time::Posix; use crate::user::{Nick, NickRef}; @@ -79,6 +80,8 @@ pub enum Event { Broadcast(Broadcast), Notification(message::Encoded, Nick, Notification), FileTransferRequest(file_transfer::ReceiveRequest), + UpdateReadMarker(String, ReadMarker), + JoinedChannel(String), } pub struct Client { @@ -99,6 +102,7 @@ pub struct Client { supports_away_notify: bool, supports_account_notify: bool, supports_extended_join: bool, + supports_read_marker: bool, highlight_blackout: HighlightBlackout, registration_required_channels: Vec, isupport: HashMap, @@ -151,6 +155,7 @@ impl Client { supports_away_notify: false, supports_account_notify: false, supports_extended_join: false, + supports_read_marker: false, highlight_blackout: HighlightBlackout::Blackout(Instant::now()), registration_required_channels: vec![], isupport: HashMap::new(), @@ -367,13 +372,17 @@ impl Client { if contains("multi-prefix") { requested.push("multi-prefix"); } + if contains("draft/read-marker") { + requested.push("draft/read-marker"); + } if !requested.is_empty() { // Request self.registration_step = RegistrationStep::Req; - let _ = self - .handle - .try_send(command!("CAP", "REQ", requested.join(" "))); + + for message in group_capability_requests(&requested) { + let _ = self.handle.try_send(message); + } } else { // If none requested, end negotiation self.registration_step = RegistrationStep::End; @@ -399,6 +408,9 @@ impl Client { if caps.contains(&"extended-join") { self.supports_extended_join = true; } + if caps.contains(&"draft/read-marker") { + self.supports_read_marker = true; + } let supports_sasl = caps.iter().any(|cap| cap.contains("sasl")); @@ -479,12 +491,14 @@ impl Client { if newly_contains("multi-prefix") { requested.push("multi-prefix"); } + if newly_contains("draft/read-marker") { + requested.push("draft/read-marker"); + } if !requested.is_empty() { - // Request - let _ = self - .handle - .try_send(command!("CAP", "REQ", requested.join(" "))); + for message in group_capability_requests(&requested) { + let _ = self.handle.try_send(message); + } } self.listed_caps.extend(new_caps); @@ -506,6 +520,9 @@ impl Client { if del_caps.contains(&"extended-join") { self.supports_extended_join = false; } + if del_caps.contains(&"draft/read-marker") { + self.supports_read_marker = false; + } self.listed_caps .retain(|cap| !del_caps.iter().any(|del_cap| del_cap == cap)); @@ -845,6 +862,8 @@ impl Client { } log::debug!("[{}] {channel} - WHO requested", self.server); } + + return Some(vec![Event::JoinedChannel(channel.clone())]); } else if let Some(channel) = self.chanmap.get_mut(channel) { let user = if self.supports_extended_join { accountname.as_ref().map_or(user.clone(), |accountname| { @@ -1225,12 +1244,30 @@ impl Client { Command::Numeric(RPL_ENDOFMONLIST, _) => { return None; } + Command::MARKREAD(target, Some(timestamp)) => { + if let Some(read_marker) = timestamp + .strip_prefix("timestamp=") + .and_then(|timestamp| timestamp.parse::().ok()) + { + return Some(vec![Event::UpdateReadMarker(target.clone(), read_marker)]); + } + } _ => {} } Some(vec![Event::Single(message, self.nickname().to_owned())]) } + pub fn send_markread(&mut self, target: &str, read_marker: ReadMarker) { + if self.supports_read_marker { + let _ = self.handle.try_send(command!( + "MARKREAD", + target.to_string(), + format!("timestamp={read_marker}"), + )); + } + } + fn sync(&mut self) { self.channels = self.chanmap.keys().cloned().collect(); self.users = self @@ -1430,6 +1467,12 @@ impl Map { } } + pub fn send_markread(&mut self, server: &Server, target: &str, read_marker: ReadMarker) { + if let Some(client) = self.client_mut(server) { + client.send_markread(target, read_marker); + } + } + pub fn join(&mut self, server: &Server, channels: &[String]) { if let Some(client) = self.client_mut(server) { client.join(channels); @@ -1442,6 +1485,20 @@ impl Map { } } + pub fn exit(&mut self) -> HashSet { + self.0 + .iter_mut() + .filter_map(|(server, state)| { + if let State::Ready(client) = state { + client.quit(None); + Some(server.clone()) + } else { + None + } + }) + .collect() + } + pub fn resolve_user_attributes<'a>( &'a self, server: &Server, @@ -1660,6 +1717,26 @@ pub enum WhoStatus { Done(Instant), } +fn group_capability_requests<'a>( + capabilities: &'a [&'a str], +) -> impl Iterator + 'a { + const MAX_LEN: usize = proto::format::BYTE_LIMIT - b"CAP REQ :\r\n".len(); + + capabilities + .iter() + .scan(0, |count, capability| { + // Capability + a space + *count += capability.len() + 1; + + let chunk = *count / MAX_LEN; + + Some((chunk, capability)) + }) + .into_group_map() + .into_values() + .map(|capabilities| command!("CAP", "REQ", capabilities.into_iter().join(" "))) +} + /// Group channels together into as few JOIN messages as possible fn group_joins<'a>( channels: &'a [String], @@ -1732,5 +1809,5 @@ fn group_monitors( }) .into_group_map() .into_values() - .map(|targets| command!("MONITOR", "+", targets.into_iter().join(","),)) + .map(|targets| command!("MONITOR", "+", targets.into_iter().join(","))) } diff --git a/data/src/history.rs b/data/src/history.rs index 6c1dc2c8..be1a0776 100644 --- a/data/src/history.rs +++ b/data/src/history.rs @@ -2,17 +2,21 @@ use std::path::PathBuf; use std::time::Duration; use std::{fmt, io}; +use chrono::{DateTime, Utc}; use futures::future::BoxFuture; use futures::{Future, FutureExt}; +use irc::proto; use tokio::fs; use tokio::time::Instant; -pub use self::manager::{Manager, Resource}; -use crate::time::Posix; use crate::user::Nick; use crate::{compression, environment, message, server, Message}; +pub use self::manager::{Manager, Resource}; +pub use self::metadata::{Metadata, ReadMarker}; + pub mod manager; +pub mod metadata; // TODO: Make this configurable? /// Max # messages to persist @@ -29,6 +33,16 @@ pub enum Kind { Query(Nick), } +impl Kind { + pub fn target(&self) -> Option<&str> { + match self { + Kind::Server => None, + Kind::Channel(channel) => Some(channel), + Kind::Query(nick) => Some(nick.as_ref()), + } + } +} + impl fmt::Display for Kind { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { @@ -49,19 +63,45 @@ impl From for Kind { } } -pub async fn load(server: &server::Server, kind: &Kind) -> Result, Error> { - let path = path(server, kind).await?; +impl From for Kind { + fn from(target: String) -> Self { + Kind::from(target.as_ref()) + } +} + +impl From<&str> for Kind { + fn from(target: &str) -> Self { + if proto::is_channel(target) { + Kind::Channel(target.to_string()) + } else { + Kind::Query(target.to_string().into()) + } + } +} + +#[derive(Debug)] +pub struct Loaded { + pub messages: Vec, + pub metadata: Metadata, +} - Ok(read_all(&path).await.unwrap_or_default()) +pub async fn load(server: server::Server, kind: Kind) -> Result { + let path = path(&server, &kind).await?; + + let messages = read_all(&path).await.unwrap_or_default(); + let metadata = metadata::load(server, kind).await.unwrap_or_default(); + + Ok(Loaded { messages, metadata }) } pub async fn overwrite( server: &server::Server, kind: &Kind, messages: &[Message], + read_marker: Option, ) -> Result<(), Error> { if messages.is_empty() { - return Ok(()); + return metadata::save(server, kind, messages, read_marker).await; } let latest = &messages[messages.len().saturating_sub(MAX_MESSAGES)..]; @@ -71,6 +111,8 @@ pub async fn overwrite( fs::write(path, &compressed).await?; + metadata::save(server, kind, latest, read_marker).await?; + Ok(()) } @@ -78,15 +120,14 @@ pub async fn append( server: &server::Server, kind: &Kind, messages: Vec, + read_marker: Option, ) -> Result<(), Error> { - if messages.is_empty() { - return Ok(()); - } + let loaded = load(server.clone(), kind.clone()).await?; - let mut all_messages = load(server, kind).await?; + let mut all_messages = loaded.messages; all_messages.extend(messages); - overwrite(server, kind, &all_messages).await + overwrite(server, kind, &all_messages, read_marker).await } async fn read_all(path: &PathBuf) -> Result, Error> { @@ -94,24 +135,30 @@ async fn read_all(path: &PathBuf) -> Result, Error> { Ok(compression::decompress(&bytes)?) } -async fn path(server: &server::Server, kind: &Kind) -> Result { +pub async fn dir_path() -> Result { let data_dir = environment::data_dir(); - // TODO: Is this stable enough? What if user's nickname changes + let history_dir = data_dir.join("history"); + + if !history_dir.exists() { + fs::create_dir_all(&history_dir).await?; + } + + Ok(history_dir) +} + +async fn path(server: &server::Server, kind: &Kind) -> Result { + let dir = dir_path().await?; + let name = match kind { Kind::Server => format!("{server}"), Kind::Channel(channel) => format!("{server}channel{channel}"), Kind::Query(nick) => format!("{server}nickname{}", nick), }; - let hashed_name = seahash::hash(name.as_bytes()); - let parent = data_dir.join("history"); - - if !parent.exists() { - fs::create_dir_all(&parent).await?; - } + let hashed_name = seahash::hash(name.as_bytes()); - Ok(parent.join(format!("{hashed_name}.json.gz"))) + Ok(dir.join(format!("{hashed_name}.json.gz"))) } #[derive(Debug)] @@ -120,53 +167,88 @@ pub enum History { server: server::Server, kind: Kind, messages: Vec, - last_received_at: Option, - unread_message_count: usize, - opened_at: Posix, + last_updated_at: Option, + max_triggers_unread: Option>, + read_marker: Option, }, Full { server: server::Server, kind: Kind, messages: Vec, - last_received_at: Option, - opened_at: Posix, + last_updated_at: Option, + read_marker: Option, }, } impl History { - fn partial(server: server::Server, kind: Kind, opened_at: Posix) -> Self { + fn partial(server: server::Server, kind: Kind) -> Self { Self::Partial { server, kind, messages: vec![], - last_received_at: None, - unread_message_count: 0, - opened_at, + last_updated_at: None, + max_triggers_unread: None, + read_marker: None, } } - fn add_message(&mut self, message: Message) { + pub fn update_partial(&mut self, metadata: Metadata) { + if let Self::Partial { + max_triggers_unread, + read_marker, + .. + } = self + { + *read_marker = (*read_marker).max(metadata.read_marker); + *max_triggers_unread = (*max_triggers_unread).max(metadata.last_triggers_unread); + } + } + + fn has_unread(&self) -> bool { match self { History::Partial { - messages, - last_received_at, - unread_message_count, + max_triggers_unread, + read_marker, .. } => { - if message.triggers_unread() { - *unread_message_count += 1; + // Read marker is prior to last known message which triggers unread + if let Some(read_marker) = read_marker { + max_triggers_unread.is_some_and(|max| read_marker.date_time() < max) + } + // Default state == unread if theres messages that trigger indicator + else { + max_triggers_unread.is_some() } + } + History::Full { .. } => false, + } + } - messages.push(message); - *last_received_at = Some(Instant::now()); + fn add_message(&mut self, message: Message) { + if message.triggers_unread() { + if let History::Partial { + max_triggers_unread, + .. + } = self + { + *max_triggers_unread = (*max_triggers_unread).max(Some(message.server_time)); } - History::Full { + } + + match self { + History::Partial { + messages, + last_updated_at, + .. + } + | History::Full { messages, - last_received_at, + last_updated_at, .. } => { + *last_updated_at = Some(Instant::now()); + messages.push(message); - *last_received_at = Some(Instant::now()); } } } @@ -177,19 +259,25 @@ impl History { server, kind, messages, - last_received_at, + last_updated_at, + read_marker, .. } => { - if let Some(last_received) = *last_received_at { + if let Some(last_received) = *last_updated_at { let since = now.duration_since(last_received); - if since >= FLUSH_AFTER_LAST_RECEIVED && !messages.is_empty() { + if since >= FLUSH_AFTER_LAST_RECEIVED { let server = server.clone(); let kind = kind.clone(); let messages = std::mem::take(messages); - *last_received_at = None; + let read_marker = *read_marker; - return Some(async move { append(&server, &kind, messages).await }.boxed()); + *last_updated_at = None; + + return Some( + async move { append(&server, &kind, messages, read_marker).await } + .boxed(), + ); } } @@ -199,16 +287,18 @@ impl History { server, kind, messages, - last_received_at, + last_updated_at, + read_marker, .. } => { - if let Some(last_received) = *last_received_at { + if let Some(last_received) = *last_updated_at { let since = now.duration_since(last_received); if since >= FLUSH_AFTER_LAST_RECEIVED && !messages.is_empty() { let server = server.clone(); let kind = kind.clone(); - *last_received_at = None; + let read_marker = *read_marker; + *last_updated_at = None; if messages.len() > MAX_MESSAGES { messages.drain(0..messages.len() - (MAX_MESSAGES - TRUNC_COUNT)); @@ -217,7 +307,8 @@ impl History { let messages = messages.clone(); return Some( - async move { overwrite(&server, &kind, &messages).await }.boxed(), + async move { overwrite(&server, &kind, &messages, read_marker).await } + .boxed(), ); } } @@ -227,42 +318,78 @@ impl History { } } - fn make_partial(&mut self) -> Option>> { + fn make_partial(&mut self) -> Option, Error>>> { match self { History::Partial { .. } => None, History::Full { server, kind, messages, + read_marker, .. } => { let server = server.clone(); let kind = kind.clone(); let messages = std::mem::take(messages); - *self = Self::partial(server.clone(), kind.clone(), Posix::now()); - - Some(async move { overwrite(&server, &kind, &messages).await }) + let read_marker = ReadMarker::latest(&messages).max(*read_marker); + let max_triggers_unread = metadata::latest_triggers_unread(&messages); + + *self = Self::Partial { + server: server.clone(), + kind: kind.clone(), + messages: vec![], + last_updated_at: None, + read_marker, + max_triggers_unread, + }; + + Some(async move { + overwrite(&server, &kind, &messages, read_marker) + .await + .map(|_| read_marker) + }) } } } - async fn close(self) -> Result<(), Error> { + async fn close(self) -> Result, Error> { match self { History::Partial { server, kind, messages, + read_marker, .. - } => append(&server, &kind, messages).await, + } => { + append(&server, &kind, messages, read_marker).await?; + + Ok(None) + } History::Full { server, kind, messages, + read_marker, .. - } => overwrite(&server, &kind, &messages).await, + } => { + let read_marker = ReadMarker::latest(&messages).max(read_marker); + + overwrite(&server, &kind, &messages, read_marker).await?; + + Ok(read_marker) + } } } + + pub fn update_read_marker(&mut self, read_marker: ReadMarker) { + let stored = match self { + History::Partial { read_marker, .. } => read_marker, + History::Full { read_marker, .. } => read_marker, + }; + + *stored = (*stored).max(Some(read_marker)); + } } #[derive(Debug)] @@ -280,4 +407,6 @@ pub enum Error { Compression(#[from] compression::Error), #[error(transparent)] Io(#[from] io::Error), + #[error(transparent)] + SerdeJson(#[from] serde_json::Error), } diff --git a/data/src/history/manager.rs b/data/src/history/manager.rs index 352d7027..209842b3 100644 --- a/data/src/history/manager.rs +++ b/data/src/history/manager.rs @@ -8,7 +8,6 @@ use tokio::time::Instant; use crate::history::{self, History}; use crate::message::{self, Limit}; -use crate::time::Posix; use crate::user::Nick; use crate::{config, input}; use crate::{server, Buffer, Config, Input, Server, User}; @@ -21,13 +20,40 @@ pub struct Resource { #[derive(Debug)] pub enum Message { - Loaded( + LoadFull( server::Server, history::Kind, - Result, history::Error>, + Result, + ), + UpdatePartial( + server::Server, + history::Kind, + Result, + ), + UpdateReadMarker( + server::Server, + history::Kind, + history::ReadMarker, + Result<(), history::Error>, + ), + Closed( + server::Server, + history::Kind, + Result, history::Error>, ), - Closed(server::Server, history::Kind, Result<(), history::Error>), Flushed(server::Server, history::Kind, Result<(), history::Error>), + Exited( + Vec<( + Server, + history::Kind, + Result, history::Error>, + )>, + ), +} + +pub enum Event { + Closed(server::Server, history::Kind, Option), + Exited(Vec<(Server, history::Kind, Option)>), } #[derive(Debug, Default)] @@ -43,8 +69,8 @@ impl Manager { let added = added.into_iter().map(|resource| { async move { - history::load(&resource.server.clone(), &resource.kind.clone()) - .map(move |result| Message::Loaded(resource.server, resource.kind, result)) + history::load(resource.server.clone(), resource.kind.clone()) + .map(move |result| Message::LoadFull(resource.server, resource.kind, result)) .await } .boxed() @@ -66,20 +92,21 @@ impl Manager { tasks } - pub fn update(&mut self, message: Message) { + pub fn update(&mut self, message: Message) -> Option { match message { - Message::Loaded(server, kind, Ok(messages)) => { + Message::LoadFull(server, kind, Ok(loaded)) => { log::debug!( "loaded history for {kind} on {server}: {} messages", - messages.len() + loaded.messages.len() ); - self.data.loaded(server, kind, messages); + self.data.load_full(server, kind, loaded); } - Message::Loaded(server, kind, Err(error)) => { + Message::LoadFull(server, kind, Err(error)) => { log::warn!("failed to load history for {kind} on {server}: {error}"); } - Message::Closed(server, kind, Ok(_)) => { + Message::Closed(server, kind, Ok(read_marker)) => { log::debug!("closed history for {kind} on {server}",); + return Some(Event::Closed(server, kind, read_marker)); } Message::Closed(server, kind, Err(error)) => { log::warn!("failed to close history for {kind} on {server}: {error}") @@ -90,7 +117,42 @@ impl Manager { Message::Flushed(server, kind, Err(error)) => { log::warn!("failed to flush history for {kind} on {server}: {error}") } + Message::UpdatePartial(server, kind, Ok(metadata)) => { + log::debug!("loaded metadata for {kind} on {server}"); + self.data.update_partial(server, kind, metadata); + } + Message::UpdatePartial(server, kind, Err(error)) => { + log::warn!("failed to load metadata for {kind} on {server}: {error}"); + } + Message::UpdateReadMarker(server, kind, read_marker, Ok(_)) => { + log::debug!("updated read marker for {kind} on {server} to {read_marker}"); + } + Message::UpdateReadMarker(server, kind, read_marker, Err(error)) => { + log::warn!( + "failed to update read marker for {kind} on {server} to {read_marker}: {error}" + ); + } + Message::Exited(results) => { + let mut output = vec![]; + + for (server, kind, result) in results { + match result { + Ok(marker) => { + log::debug!("closed history for {kind} on {server}",); + output.push((server, kind, marker)); + } + Err(error) => { + log::warn!("failed to close history for {kind} on {server}: {error}"); + output.push((server, kind, None)); + } + } + } + + return Some(Event::Exited(output)); + } } + + None } pub fn tick(&mut self, now: Instant) -> Vec> { @@ -101,46 +163,17 @@ impl Manager { &mut self, server: Server, kind: history::Kind, - ) -> Option> { + ) -> Option> { let history = self.data.map.get_mut(&server)?.remove(&kind)?; - Some(async move { - match history.close().await { - Ok(_) => { - log::debug!("closed history for {kind} on {server}",); - } - Err(error) => { - log::warn!("failed to close history for {kind} on {server}: {error}"); - } - } - }) - } - - pub fn close_server(&mut self, server: Server) -> Option> { - let map = self.data.map.remove(&server)?; - - Some(async move { - let tasks = map.into_iter().map(move |(kind, state)| { - let server = server.clone(); - state.close().map(move |result| (server, kind, result)) - }); - - let results = future::join_all(tasks).await; - - for (server, kind, result) in results { - match result { - Ok(_) => { - log::debug!("closed history for {kind} on {server}",); - } - Err(error) => { - log::warn!("failed to close history for {kind} on {server}: {error}"); - } - } - } - }) + Some( + history + .close() + .map(|result| Message::Closed(server, kind, result)), + ) } - pub fn close_all(&mut self) -> impl Future { + pub fn exit(&mut self) -> impl Future { let map = std::mem::take(&mut self.data).map; async move { @@ -151,43 +184,62 @@ impl Manager { }) }); - let results = future::join_all(tasks).await; - - for (server, kind, result) in results { - match result { - Ok(_) => { - log::debug!("closed history for {kind} on {server}",); - } - Err(error) => { - log::warn!("failed to close history for {kind} on {server}: {error}"); - } - } - } + Message::Exited(future::join_all(tasks).await) } } - pub fn record_input(&mut self, input: Input, user: User, channel_users: &[User]) { + pub fn record_input( + &mut self, + input: Input, + user: User, + channel_users: &[User], + ) -> Vec> { + let mut tasks = vec![]; + if let Some(messages) = input.messages(user, channel_users) { for message in messages { - self.record_message(input.server(), message); + tasks.extend(self.record_message(input.server(), message)); } } if let Some(text) = input.raw() { self.data.input.record(input.buffer(), text.to_string()); } + + tasks } pub fn record_draft(&mut self, draft: input::Draft) { self.data.input.store_draft(draft); } - pub fn record_message(&mut self, server: &Server, message: crate::Message) { + pub fn record_message( + &mut self, + server: &Server, + message: crate::Message, + ) -> Option> { self.data.add_message( server.clone(), history::Kind::from(message.target.clone()), message, - ); + ) + } + + pub fn update_read_marker( + &mut self, + server: Server, + kind: impl Into, + read_marker: history::ReadMarker, + ) -> Option> { + self.data.update_read_marker(server, kind, read_marker) + } + + pub fn channel_joined( + &mut self, + server: Server, + channel: String, + ) -> Option> { + self.data.channel_joined(server, channel) } pub fn get_channel_messages( @@ -252,15 +304,7 @@ impl Manager { .map .get(server) .and_then(|map| map.get(kind)) - .map(|history| { - matches!( - history, - History::Partial { - unread_message_count, - .. - } if *unread_message_count > 0 - ) - }) + .map(|history| history.has_unread()) .unwrap_or_default() } @@ -270,7 +314,7 @@ impl Manager { broadcast: Broadcast, config: &Config, sent_time: DateTime, - ) { + ) -> Vec> { let map = self.data.map.entry(server.clone()).or_default(); let channels = map @@ -388,9 +432,10 @@ impl Manager { } }; - messages.into_iter().for_each(|message| { - self.record_message(server, message); - }); + messages + .into_iter() + .filter_map(|message| self.record_message(server, message)) + .collect() } pub fn input<'a>(&'a self, buffer: &Buffer) -> input::Cache<'a> { @@ -423,14 +468,14 @@ struct Data { } impl Data { - fn loaded( - &mut self, - server: server::Server, - kind: history::Kind, - mut messages: Vec, - ) { + fn load_full(&mut self, server: server::Server, kind: history::Kind, data: history::Loaded) { use std::collections::hash_map; + let history::Loaded { + mut messages, + metadata, + } = data; + match self .map .entry(server.clone()) @@ -440,19 +485,20 @@ impl Data { hash_map::Entry::Occupied(mut entry) => match entry.get_mut() { History::Partial { messages: new_messages, - last_received_at, - opened_at, + last_updated_at, + read_marker: partial_read_marker, .. } => { - let last_received_at = *last_received_at; - let opened_at = *opened_at; + let read_marker = (*partial_read_marker).max(metadata.read_marker); + + let last_updated_at = *last_updated_at; messages.extend(std::mem::take(new_messages)); entry.insert(History::Full { server, kind, messages, - last_received_at, - opened_at, + last_updated_at, + read_marker, }); } _ => { @@ -460,8 +506,8 @@ impl Data { server, kind, messages, - last_received_at: None, - opened_at: Posix::now(), + last_updated_at: None, + read_marker: metadata.read_marker, }); } }, @@ -470,13 +516,24 @@ impl Data { server, kind, messages, - last_received_at: None, - opened_at: Posix::now(), + last_updated_at: None, + read_marker: metadata.read_marker, }); } } } + fn update_partial( + &mut self, + server: server::Server, + kind: history::Kind, + data: history::Metadata, + ) { + if let Some(history) = self.map.get_mut(&server).and_then(|map| map.get_mut(&kind)) { + history.update_partial(data); + } + } + fn history_view( &self, server: &server::Server, @@ -486,7 +543,7 @@ impl Data { ) -> Option { let History::Full { messages, - opened_at, + read_marker, .. } = self.map.get(server)?.get(kind)? else { @@ -602,10 +659,13 @@ impl Data { let limited = with_limit(limit, filtered.into_iter()); - let split_at = limited - .iter() - .position(|message| message.received_at >= *opened_at) - .unwrap_or(limited.len()); + let split_at = read_marker.map_or(0, |read_marker| { + limited + .iter() + .rev() + .position(|message| message.server_time <= read_marker.date_time()) + .map_or(limited.len(), |position| limited.len() - position) + }); let (old, new) = limited.split_at(split_at); @@ -623,20 +683,105 @@ impl Data { server: server::Server, kind: history::Kind, message: crate::Message, - ) { - self.map + ) -> Option> { + use std::collections::hash_map; + + match self + .map .entry(server.clone()) .or_default() .entry(kind.clone()) - .or_insert_with(|| History::partial(server, kind, message.received_at)) - .add_message(message) + { + hash_map::Entry::Occupied(mut entry) => { + entry.get_mut().add_message(message); + + None + } + hash_map::Entry::Vacant(entry) => { + entry + .insert(History::partial(server.clone(), kind.clone())) + .add_message(message); + + Some( + async move { + let loaded = history::metadata::load(server.clone(), kind.clone()).await; + + Message::UpdatePartial(server, kind, loaded) + } + .boxed(), + ) + } + } + } + + fn update_read_marker( + &mut self, + server: server::Server, + kind: impl Into, + read_marker: history::ReadMarker, + ) -> Option> { + use std::collections::hash_map; + + let kind = kind.into(); + + match self + .map + .entry(server.clone()) + .or_default() + .entry(kind.clone()) + { + hash_map::Entry::Occupied(mut entry) => { + entry.get_mut().update_read_marker(read_marker); + + None + } + hash_map::Entry::Vacant(_) => Some( + async move { + let updated = history::metadata::update(&server, &kind, &read_marker).await; + + Message::UpdateReadMarker(server, kind, read_marker, updated) + } + .boxed(), + ), + } + } + + fn channel_joined( + &mut self, + server: server::Server, + channel: String, + ) -> Option> { + use std::collections::hash_map; + + let kind = history::Kind::Channel(channel); + + match self + .map + .entry(server.clone()) + .or_default() + .entry(kind.clone()) + { + hash_map::Entry::Occupied(_) => None, + hash_map::Entry::Vacant(entry) => { + entry.insert(History::partial(server.clone(), kind.clone())); + + Some( + async move { + let loaded = history::metadata::load(server.clone(), kind.clone()).await; + + Message::UpdatePartial(server, kind, loaded) + } + .boxed(), + ) + } + } } fn untrack( &mut self, server: &server::Server, kind: &history::Kind, - ) -> Option>> { + ) -> Option, history::Error>>> { self.map .get_mut(server) .and_then(|map| map.get_mut(kind).and_then(History::make_partial)) diff --git a/data/src/history/metadata.rs b/data/src/history/metadata.rs new file mode 100644 index 00000000..89273682 --- /dev/null +++ b/data/src/history/metadata.rs @@ -0,0 +1,126 @@ +use std::fmt; +use std::path::PathBuf; +use std::str::FromStr; + +use chrono::{format::SecondsFormat, DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use tokio::fs; + +use crate::history::{dir_path, Error, Kind}; +use crate::{message, server, Message}; + +#[derive(Debug, Clone, Copy, Default, Deserialize, Serialize)] +pub struct Metadata { + pub read_marker: Option, + pub last_triggers_unread: Option>, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default, Deserialize, Serialize)] +pub struct ReadMarker(DateTime); + +impl ReadMarker { + pub fn latest(messages: &[Message]) -> Option { + messages + .iter() + .rev() + .find(|message| !matches!(message.target.source(), message::Source::Internal(_))) + .map(|message| message.server_time) + .map(Self) + } + + pub fn date_time(self) -> DateTime { + self.0 + } +} + +impl FromStr for ReadMarker { + type Err = chrono::ParseError; + + fn from_str(s: &str) -> Result { + DateTime::parse_from_rfc3339(s) + .map(|dt| dt.with_timezone(&Utc)) + .map(Self) + } +} + +impl fmt::Display for ReadMarker { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.to_rfc3339_opts(SecondsFormat::Millis, true).fmt(f) + } +} + +pub fn latest_triggers_unread(messages: &[Message]) -> Option> { + messages + .iter() + .rev() + .find(|message| message.triggers_unread()) + .map(|message| message.server_time) +} + +pub async fn load(server: server::Server, kind: Kind) -> Result { + let path = path(&server, &kind).await?; + + if let Ok(bytes) = fs::read(path).await { + Ok(serde_json::from_slice(&bytes).unwrap_or_default()) + } else { + Ok(Metadata::default()) + } +} + +pub async fn save( + server: &server::Server, + kind: &Kind, + messages: &[Message], + read_marker: Option, +) -> Result<(), Error> { + let bytes = serde_json::to_vec(&Metadata { + read_marker, + last_triggers_unread: latest_triggers_unread(messages), + })?; + + let path = path(server, kind).await?; + + fs::write(path, &bytes).await?; + + Ok(()) +} + +pub async fn update( + server: &server::Server, + kind: &Kind, + read_marker: &ReadMarker, +) -> Result<(), Error> { + let metadata = load(server.clone(), kind.clone()).await?; + + if metadata + .read_marker + .is_some_and(|metadata_read_marker| metadata_read_marker >= *read_marker) + { + return Ok(()); + } + + let bytes = serde_json::to_vec(&Metadata { + read_marker: Some(*read_marker), + last_triggers_unread: metadata.last_triggers_unread, + })?; + + let path = path(server, kind).await?; + + fs::write(path, &bytes).await?; + + Ok(()) +} + +async fn path(server: &server::Server, kind: &Kind) -> Result { + let dir = dir_path().await?; + + let name = match kind { + Kind::Server => format!("{server}-metadata"), + Kind::Channel(channel) => format!("{server}channel{channel}-metadata"), + Kind::Query(nick) => format!("{server}nickname{}-metadata", nick), + }; + + let hashed_name = seahash::hash(name.as_bytes()); + + Ok(dir.join(format!("{hashed_name}.json"))) +} diff --git a/data/src/message.rs b/data/src/message.rs index 24d299e2..40694022 100644 --- a/data/src/message.rs +++ b/data/src/message.rs @@ -694,6 +694,7 @@ fn target( | Command::CNOTICE(_, _, _) | Command::CPRIVMSG(_, _, _) | Command::KNOCK(_, _) + | Command::MARKREAD(_, _) | Command::MONITOR(_, _) | Command::TAGMSG(_) | Command::USERIP(_) diff --git a/irc/proto/src/command.rs b/irc/proto/src/command.rs index a7ad32aa..12de0d7d 100644 --- a/irc/proto/src/command.rs +++ b/irc/proto/src/command.rs @@ -107,6 +107,8 @@ pub enum Command { CPRIVMSG(String, String, String), /// [] KNOCK(String, Option), + /// [] + MARKREAD(String, Option), /// [] MONITOR(String, Option), /// @@ -205,6 +207,7 @@ impl Command { "CNOTICE" if len > 2 => CNOTICE(req!(), req!(), req!()), "CPRIVMSG" if len > 2 => CPRIVMSG(req!(), req!(), req!()), "KNOCK" if len > 0 => KNOCK(req!(), opt!()), + "MARKREAD" if len > 0 => MARKREAD(req!(), opt!()), "MONITOR" if len > 0 => MONITOR(req!(), opt!()), "TAGMSG" if len > 0 => TAGMSG(req!()), "USERIP" if len > 0 => USERIP(req!()), @@ -265,6 +268,7 @@ impl Command { Command::CNOTICE(a, b, c) => vec![a, b, c], Command::CPRIVMSG(a, b, c) => vec![a, b, c], Command::KNOCK(a, b) => std::iter::once(a).chain(b).collect(), + Command::MARKREAD(a, b) => std::iter::once(a).chain(b).collect(), Command::MONITOR(a, b) => std::iter::once(a).chain(b).collect(), Command::TAGMSG(a) => vec![a], Command::USERIP(a) => vec![a], @@ -324,6 +328,7 @@ impl Command { CNOTICE(_, _, _) => "CNOTICE".to_string(), CPRIVMSG(_, _, _) => "CPRIVMSG".to_string(), KNOCK(_, _) => "KNOCK".to_string(), + MARKREAD(_, _) => "MARKREAD".to_string(), MONITOR(_, _) => "MONITOR".to_string(), TAGMSG(_) => "TAGMSG".to_string(), USERIP(_) => "USERIP".to_string(), diff --git a/src/buffer.rs b/src/buffer.rs index 1951cb02..d8f393e0 100644 --- a/src/buffer.rs +++ b/src/buffer.rs @@ -37,10 +37,10 @@ pub enum Message { FileTransfers(file_transfers::Message), } -#[derive(Debug, Clone)] pub enum Event { UserContext(user_context::Event), OpenChannel(String), + History(Task), } impl Buffer { @@ -73,6 +73,7 @@ impl Buffer { let event = event.map(|event| match event { channel::Event::UserContext(event) => Event::UserContext(event), channel::Event::OpenChannel(channel) => Event::OpenChannel(channel), + channel::Event::History(task) => Event::History(task), }); (command.map(Message::Channel), event) @@ -83,6 +84,7 @@ impl Buffer { let event = event.map(|event| match event { server::Event::UserContext(event) => Event::UserContext(event), server::Event::OpenChannel(channel) => Event::OpenChannel(channel), + server::Event::History(task) => Event::History(task), }); (command.map(Message::Server), event) @@ -93,6 +95,7 @@ impl Buffer { let event = event.map(|event| match event { query::Event::UserContext(event) => Event::UserContext(event), query::Event::OpenChannel(channel) => Event::OpenChannel(channel), + query::Event::History(task) => Event::History(task), }); (command.map(Message::Query), event) diff --git a/src/buffer/channel.rs b/src/buffer/channel.rs index 7e196f25..3f8bb4a9 100644 --- a/src/buffer/channel.rs +++ b/src/buffer/channel.rs @@ -19,10 +19,10 @@ pub enum Message { Topic(topic::Message), } -#[derive(Debug, Clone)] pub enum Event { UserContext(user_context::Event), OpenChannel(String), + History(Task), } pub fn view<'a>( @@ -341,13 +341,13 @@ impl Channel { let command = command.map(Message::InputView); match event { - Some(input_view::Event::InputSent) => { + Some(input_view::Event::InputSent { history_task }) => { let command = Task::batch(vec![ command, self.scroll_view.scroll_to_end().map(Message::ScrollView), ]); - (command, None) + (command, Some(Event::History(history_task))) } None => (command, None), } diff --git a/src/buffer/input_view.rs b/src/buffer/input_view.rs index 6292bdd4..201d1a0b 100644 --- a/src/buffer/input_view.rs +++ b/src/buffer/input_view.rs @@ -11,7 +11,9 @@ use crate::widget::{anchored_overlay, key_press, Element}; mod completion; pub enum Event { - InputSent, + InputSent { + history_task: Task, + }, } #[derive(Debug, Clone)] @@ -177,6 +179,8 @@ impl State { clients.send(buffer, encoded); } + let mut history_task = Task::none(); + if let Some(nick) = clients.nickname(buffer.server()) { let mut user = nick.to_owned().into(); let mut channel_users = &[][..]; @@ -192,10 +196,15 @@ impl State { } } - history.record_input(input, user, channel_users); + history_task = Task::batch( + history + .record_input(input, user, channel_users) + .into_iter() + .map(Task::future), + ); } - (Task::none(), Some(Event::InputSent)) + (Task::none(), Some(Event::InputSent { history_task })) } else { (Task::none(), None) } diff --git a/src/buffer/query.rs b/src/buffer/query.rs index abef293f..9a6b8ebb 100644 --- a/src/buffer/query.rs +++ b/src/buffer/query.rs @@ -13,10 +13,10 @@ pub enum Message { InputView(input_view::Message), } -#[derive(Debug, Clone)] pub enum Event { UserContext(user_context::Event), OpenChannel(String), + History(Task), } pub fn view<'a>( @@ -255,13 +255,13 @@ impl Query { let command = command.map(Message::InputView); match event { - Some(input_view::Event::InputSent) => { + Some(input_view::Event::InputSent { history_task }) => { let command = Task::batch(vec![ command, self.scroll_view.scroll_to_end().map(Message::ScrollView), ]); - (command, None) + (command, Some(Event::History(history_task))) } None => (command, None), } diff --git a/src/buffer/server.rs b/src/buffer/server.rs index d6d08369..71a43935 100644 --- a/src/buffer/server.rs +++ b/src/buffer/server.rs @@ -12,10 +12,10 @@ pub enum Message { InputView(input_view::Message), } -#[derive(Debug, Clone)] pub enum Event { UserContext(user_context::Event), OpenChannel(String), + History(Task), } pub fn view<'a>( @@ -143,15 +143,16 @@ impl Server { .update(message, &self.buffer, clients, history, config); let command = command.map(Message::InputView); - let task = match event { - Some(input_view::Event::InputSent) => Task::batch(vec![ - command, - self.scroll_view.scroll_to_end().map(Message::ScrollView), - ]), - None => command, - }; - - (task, None) + match event { + Some(input_view::Event::InputSent { history_task }) => ( + Task::batch(vec![ + command, + self.scroll_view.scroll_to_end().map(Message::ScrollView), + ]), + Some(Event::History(history_task)), + ), + None => (command, None), + } } } } diff --git a/src/main.rs b/src/main.rs index 51122f27..b9bf5dd2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,13 +16,16 @@ mod url; mod widget; mod window; +use std::collections::HashSet; use std::env; use std::time::{Duration, Instant}; use chrono::Utc; use data::config::{self, Config}; +use data::history::manager::Broadcast; use data::version::Version; -use data::{environment, history, server, version, Url, User}; +use data::{environment, server, version, Url, User}; +use data::{history, Server}; use iced::widget::{column, container}; use iced::{padding, Length, Subscription, Task}; use screen::{dashboard, help, migration, welcome}; @@ -199,6 +202,7 @@ pub enum Screen { Help(screen::Help), Welcome(screen::Welcome), Migration(screen::Migration), + Exit { pending_exit: HashSet }, } #[derive(Debug)] @@ -337,6 +341,16 @@ impl Halloy { self.clients.quit(&server, None); Task::none() } + Some(dashboard::Event::Exit) => { + let pending_exit = self.clients.exit(); + + if pending_exit.is_empty() { + iced::exit() + } else { + self.screen = Screen::Exit { pending_exit }; + Task::none() + } + } None => Task::none(), }; @@ -403,14 +417,21 @@ impl Halloy { if is_initial { // Intial is sent when first trying to connect - dashboard.broadcast_connecting(&server, &self.config, sent_time); + dashboard + .broadcast(&server, &self.config, sent_time, Broadcast::Connecting) + .map(Message::Dashboard) } else { notification::disconnected(&self.config.notifications, &server); - dashboard.broadcast_disconnected(&server, error, &self.config, sent_time); + dashboard + .broadcast( + &server, + &self.config, + sent_time, + Broadcast::Disconnected { error }, + ) + .map(Message::Dashboard) } - - Task::none() } stream::Update::Connected { server, @@ -427,14 +448,16 @@ impl Halloy { if is_initial { notification::connected(&self.config.notifications, &server); - dashboard.broadcast_connected(&server, &self.config, sent_time); + dashboard + .broadcast(&server, &self.config, sent_time, Broadcast::Connected) + .map(Message::Dashboard) } else { notification::reconnected(&self.config.notifications, &server); - dashboard.broadcast_reconnected(&server, &self.config, sent_time); + dashboard + .broadcast(&server, &self.config, sent_time, Broadcast::Reconnected) + .map(Message::Dashboard) } - - Task::none() } stream::Update::ConnectionFailed { server, @@ -445,9 +468,14 @@ impl Halloy { return Task::none(); }; - dashboard.broadcast_connection_failed(&server, error, &self.config, sent_time); - - Task::none() + dashboard + .broadcast( + &server, + &self.config, + sent_time, + Broadcast::ConnectionFailed { error }, + ) + .map(Message::Dashboard) } stream::Update::MessagesReceived(server, messages) => { let Screen::Dashboard(dashboard) = &mut self.screen else { @@ -480,7 +508,11 @@ impl Halloy { resolve_user_attributes, channel_users, ) { - dashboard.record_message(&server, message); + commands.push( + dashboard + .record_message(&server, message) + .map(Message::Dashboard), + ); } } data::client::Event::WithTarget(encoded, our_nick, target) => { @@ -491,9 +523,13 @@ impl Halloy { resolve_user_attributes, channel_users, ) { - dashboard.record_message( - &server, - message.with_target(target), + commands.push( + dashboard + .record_message( + &server, + message.with_target(target), + ) + .map(Message::Dashboard), ); } } @@ -503,16 +539,20 @@ impl Halloy { comment, channels, sent_time, - } => { - dashboard.broadcast_quit( - &server, - user, - comment, - channels, - &self.config, - sent_time, - ); - } + } => commands.push( + dashboard + .broadcast( + &server, + &self.config, + sent_time, + Broadcast::Quit { + user, + comment, + user_channels: channels, + }, + ) + .map(Message::Dashboard), + ), data::client::Broadcast::Nickname { old_user, new_nick, @@ -522,14 +562,20 @@ impl Halloy { } => { let old_nick = old_user.nickname(); - dashboard.broadcast_nickname( - &server, - old_nick.to_owned(), - new_nick, - ourself, - channels, - &self.config, - sent_time, + commands.push( + dashboard + .broadcast( + &server, + &self.config, + sent_time, + Broadcast::Nickname { + old_nick: old_nick.to_owned(), + new_nick, + ourself, + user_channels: channels, + }, + ) + .map(Message::Dashboard), ); } data::client::Broadcast::Invite { @@ -540,13 +586,19 @@ impl Halloy { } => { let inviter = inviter.nickname(); - dashboard.broadcast_invite( - &server, - inviter.to_owned(), - channel, - user_channels, - &self.config, - sent_time, + commands.push( + dashboard + .broadcast( + &server, + &self.config, + sent_time, + Broadcast::Invite { + inviter: inviter.to_owned(), + channel, + user_channels, + }, + ) + .map(Message::Dashboard), ); } data::client::Broadcast::ChangeHost { @@ -557,15 +609,21 @@ impl Halloy { channels, sent_time, } => { - dashboard.broadcast_change_host( - &server, - old_user, - new_username, - new_hostname, - ourself, - channels, - &self.config, - sent_time, + commands.push( + dashboard + .broadcast( + &server, + &self.config, + sent_time, + Broadcast::ChangeHost { + old_user, + new_username, + new_hostname, + ourself, + user_channels: channels, + }, + ) + .map(Message::Dashboard), ); } }, @@ -581,7 +639,11 @@ impl Halloy { resolve_user_attributes, channel_users, ) { - dashboard.record_message(&server, message); + commands.push( + dashboard + .record_message(&server, message) + .map(Message::Dashboard), + ); } match notification { @@ -644,6 +706,24 @@ impl Halloy { commands.push(command.map(Message::Dashboard)); } } + data::client::Event::UpdateReadMarker(target, read_marker) => { + commands.push( + dashboard + .update_read_marker( + server.clone(), + target, + read_marker, + ) + .map(Message::Dashboard), + ); + } + data::client::Event::JoinedChannel(channel) => { + commands.push( + dashboard + .channel_joined(server.clone(), channel) + .map(Message::Dashboard), + ); + } } } @@ -657,30 +737,42 @@ impl Halloy { Task::batch(commands) } - stream::Update::Quit(server, reason) => { - let Screen::Dashboard(dashboard) = &mut self.screen else { - return Task::none(); - }; - - self.servers.remove(&server); - - if let Some(client) = self.clients.remove(&server) { - let user = client.nickname().to_owned().into(); - - let channels = client.channels().to_vec(); - - dashboard.broadcast_quit( - &server, - user, - reason, - channels, - &self.config, - Utc::now(), - ); + stream::Update::Quit(server, reason) => match &mut self.screen { + Screen::Dashboard(dashboard) => { + self.servers.remove(&server); + + if let Some(client) = self.clients.remove(&server) { + let user = client.nickname().to_owned().into(); + + let channels = client.channels().to_vec(); + + dashboard + .broadcast( + &server, + &self.config, + Utc::now(), + Broadcast::Quit { + user, + comment: reason, + user_channels: channels, + }, + ) + .map(Message::Dashboard) + } else { + Task::none() + } } + Screen::Exit { pending_exit } => { + pending_exit.remove(&server); - Task::none() - } + if pending_exit.is_empty() { + iced::exit() + } else { + Task::none() + } + } + _ => Task::none(), + }, }, Message::Event(window, event) => { // Events only enabled for main window @@ -764,7 +856,7 @@ impl Halloy { } window::Event::CloseRequested => { if let Screen::Dashboard(dashboard) = &mut self.screen { - return dashboard.exit().then(|_| iced::exit()); + return dashboard.exit().map(Message::Dashboard); } else { return iced::exit(); } @@ -808,6 +900,7 @@ impl Halloy { Screen::Help(help) => help.view().map(Message::Help), Screen::Welcome(welcome) => welcome.view().map(Message::Welcome), Screen::Migration(migration) => migration.view().map(Message::Migration), + Screen::Exit { .. } => column![].into(), }; let content = container(screen) diff --git a/src/screen/dashboard.rs b/src/screen/dashboard.rs index 2cee0f1d..c80c4fb2 100644 --- a/src/screen/dashboard.rs +++ b/src/screen/dashboard.rs @@ -1,5 +1,6 @@ use chrono::{DateTime, Utc}; use data::environment::RELEASE_WEBSITE; +use data::history::ReadMarker; use std::collections::HashMap; use std::path::PathBuf; use std::slice; @@ -9,7 +10,7 @@ use data::config; use data::file_transfer; use data::history::manager::Broadcast; use data::user::Nick; -use data::{client, environment, history, Config, Server, User, Version}; +use data::{client, environment, history, Config, Server, Version}; use iced::widget::pane_grid::{self, PaneGrid}; use iced::widget::{column, container, row, Space}; use iced::{clipboard, Length, Task, Vector}; @@ -51,7 +52,6 @@ pub enum Message { SelectedText(Vec<(f32, String)>), History(history::manager::Message), DashboardSaved(Result<(), data::dashboard::Error>), - CloseHistory, Task(command_bar::Message), Shortcut(shortcut::Command), FileTransfer(file_transfer::task::Update), @@ -66,6 +66,7 @@ pub enum Event { ConfigReloaded(Result), ReloadThemes, QuitServer(Server), + Exit, } impl Dashboard { @@ -154,12 +155,12 @@ impl Dashboard { config, ); - let command = command.map(move |message| { + let task = command.map(move |message| { Message::Pane(window, pane::Message::Buffer(id, message)) }); let Some(event) = event else { - return (command, None); + return (task, None); }; match event { @@ -170,11 +171,11 @@ impl Dashboard { mode, ) => { let Some(buffer) = pane.buffer.data() else { - return (command, None); + return (task, None); }; let Some(target) = buffer.target() else { - return (command, None); + return (task, None); }; let command = data::Command::Mode( @@ -226,12 +227,23 @@ impl Dashboard { if let Some(messages) = input.messages(user, channel_users) { + let mut tasks = vec![task]; + for message in messages { - self.history.record_message( - input.server(), - message, - ); + if let Some(task) = + self.history.record_message( + input.server(), + message, + ) + { + tasks.push(Task::perform( + task, + Message::History, + )); + } } + + return (Task::batch(tasks), None); } } } @@ -244,7 +256,7 @@ impl Dashboard { ); return ( Task::batch(vec![ - command, + task, self.open_buffer( buffer, config.buffer.clone().into(), @@ -259,12 +271,12 @@ impl Dashboard { let Some((_, pane, history)) = self.get_focused_with_history_mut(main_window) else { - return (command, None); + return (task, None); }; return ( Task::batch(vec![ - command, + task, pane.buffer .insert_user_to_input(nick, history) .map(move |message| { @@ -285,7 +297,7 @@ impl Dashboard { return ( Task::batch(vec![ - command, + task, Task::perform( async move { rfd::AsyncFileDialog::new() @@ -319,7 +331,7 @@ impl Dashboard { { return ( Task::batch(vec![ - command, + task, self.open_channel( server, channel, @@ -332,9 +344,15 @@ impl Dashboard { ); } } + buffer::Event::History(history_task) => { + return ( + Task::batch(vec![task, history_task.map(Message::History)]), + None, + ) + } } - return (command, None); + return (task, None); } } pane::Message::ToggleShowUserList => { @@ -494,7 +512,25 @@ impl Dashboard { } } Message::History(message) => { - self.history.update(message); + if let Some(event) = self.history.update(message) { + match event { + history::manager::Event::Closed(server, kind, read_marker) => { + if let Some((target, read_marker)) = kind.target().zip(read_marker) { + clients.send_markread(&server, target, read_marker); + } + } + history::manager::Event::Exited(results) => { + for (server, kind, read_marker) in results { + if let Some((target, read_marker)) = kind.target().zip(read_marker) + { + clients.send_markread(&server, target, read_marker); + } + } + + return (Task::none(), Some(Event::Exit)); + } + } + } } Message::DashboardSaved(Ok(_)) => { log::info!("dashboard saved"); @@ -502,7 +538,6 @@ impl Dashboard { Message::DashboardSaved(Err(error)) => { log::warn!("error saving dashboard: {error}"); } - Message::CloseHistory => {} Message::Task(message) => { let Some(command_bar) = &mut self.command_bar else { return (Task::none(), None); @@ -1153,7 +1188,7 @@ impl Dashboard { tasks.push( self.history .close(server, history::Kind::Channel(channel)) - .map(|task| Task::perform(task, |_| Message::CloseHistory)) + .map(|task| Task::perform(task, Message::History)) .unwrap_or_else(Task::none), ); @@ -1163,7 +1198,7 @@ impl Dashboard { tasks.push( self.history .close(server, history::Kind::Query(nick)) - .map(|task| Task::perform(task, |_| Message::CloseHistory)) + .map(|task| Task::perform(task, Message::History)) .unwrap_or_else(Task::none), ); @@ -1173,154 +1208,48 @@ impl Dashboard { } } - pub fn record_message(&mut self, server: &Server, message: data::Message) { - self.history.record_message(server, message); - } - - pub fn broadcast_quit( - &mut self, - server: &Server, - user: User, - comment: Option, - user_channels: Vec, - config: &Config, - sent_time: DateTime, - ) { - self.history.broadcast( - server, - Broadcast::Quit { - user, - comment, - user_channels, - }, - config, - sent_time, - ); - } - - pub fn broadcast_nickname( - &mut self, - server: &Server, - old_nick: Nick, - new_nick: Nick, - ourself: bool, - user_channels: Vec, - config: &Config, - sent_time: DateTime, - ) { - self.history.broadcast( - server, - Broadcast::Nickname { - new_nick, - old_nick, - ourself, - user_channels, - }, - config, - sent_time, - ); - } - - pub fn broadcast_invite( - &mut self, - server: &Server, - inviter: Nick, - channel: String, - user_channels: Vec, - config: &Config, - sent_time: DateTime, - ) { - self.history.broadcast( - server, - Broadcast::Invite { - inviter, - channel, - user_channels, - }, - config, - sent_time, - ); - } - - pub fn broadcast_change_host( - &mut self, - server: &Server, - old_user: User, - new_username: String, - new_hostname: String, - ourself: bool, - user_channels: Vec, - config: &Config, - sent_time: DateTime, - ) { - self.history.broadcast( - server, - Broadcast::ChangeHost { - old_user, - new_username, - new_hostname, - ourself, - user_channels, - }, - config, - sent_time, - ); - } - - pub fn broadcast_connecting( - &mut self, - server: &Server, - config: &Config, - sent_time: DateTime, - ) { - self.history - .broadcast(server, Broadcast::Connecting, config, sent_time); - } - - pub fn broadcast_connected( - &mut self, - server: &Server, - config: &Config, - sent_time: DateTime, - ) { - self.history - .broadcast(server, Broadcast::Connected, config, sent_time); + pub fn record_message(&mut self, server: &Server, message: data::Message) -> Task { + if let Some(task) = self.history.record_message(server, message) { + Task::perform(task, Message::History) + } else { + Task::none() + } } - pub fn broadcast_disconnected( + pub fn broadcast( &mut self, server: &Server, - error: Option, config: &Config, sent_time: DateTime, - ) { - self.history - .broadcast(server, Broadcast::Disconnected { error }, config, sent_time); + broadcast: Broadcast, + ) -> Task { + Task::batch( + self.history + .broadcast(server, broadcast, config, sent_time) + .into_iter() + .map(|task| Task::perform(task, Message::History)), + ) } - pub fn broadcast_reconnected( + pub fn update_read_marker( &mut self, - server: &Server, - config: &Config, - sent_time: DateTime, - ) { - self.history - .broadcast(server, Broadcast::Reconnected, config, sent_time); + server: Server, + kind: impl Into + 'static, + read_marker: ReadMarker, + ) -> Task { + if let Some(task) = self.history.update_read_marker(server, kind, read_marker) { + Task::perform(task, Message::History) + } else { + Task::none() + } } - pub fn broadcast_connection_failed( - &mut self, - server: &Server, - error: String, - config: &Config, - sent_time: DateTime, - ) { - self.history.broadcast( - server, - Broadcast::ConnectionFailed { error }, - config, - sent_time, - ); + pub fn channel_joined(&mut self, server: Server, channel: String) -> Task { + if let Some(task) = self.history.channel_joined(server, channel) { + Task::perform(task, Message::History) + } else { + Task::none() + } } fn get_focused_mut( @@ -1615,32 +1544,36 @@ impl Dashboard { server: &Server, event: file_transfer::manager::Event, ) -> Task { + let mut tasks = vec![]; + match event { file_transfer::manager::Event::NewTransfer(transfer, task) => { match transfer.direction { file_transfer::Direction::Received => { - self.record_message( + tasks.push(self.record_message( server, data::Message::file_transfer_request_received( &transfer.remote_user, &transfer.filename, ), - ); + )); } file_transfer::Direction::Sent => { - self.record_message( + tasks.push(self.record_message( server, data::Message::file_transfer_request_sent( &transfer.remote_user, &transfer.filename, ), - ); + )); } } - Task::run(task, Message::FileTransfer) + tasks.push(Task::run(task, Message::FileTransfer)); } } + + Task::batch(tasks) } fn from_data( @@ -1768,27 +1701,28 @@ impl Dashboard { } } - pub fn exit(&mut self) -> Task<()> { - let history = self.history.close_all(); - let last_changed = self.last_changed; + pub fn exit(&mut self) -> Task { + let history = self.history.exit(); + let last_changed = self.last_changed.take(); let dashboard = data::Dashboard::from(&*self); - let task = async move { - history.await; - - if last_changed.is_some() { - match dashboard.save().await { - Ok(_) => { - log::info!("dashboard saved"); - } - Err(error) => { - log::warn!("error saving dashboard: {error}"); + Task::perform( + async move { + if last_changed.is_some() { + match dashboard.save().await { + Ok(_) => { + log::info!("dashboard saved"); + } + Err(error) => { + log::warn!("error saving dashboard: {error}"); + } } } - } - }; - Task::perform(task, move |_| ()) + history.await + }, + Message::History, + ) } fn open_popout_window(&mut self, main_window: &Window, pane: Pane) -> Task {