From 667b32edae17e2aef4ad4bbc0c19f51b9b29fafb Mon Sep 17 00:00:00 2001 From: Myriad-Dreamin Date: Thu, 28 Sep 2023 20:06:03 +0800 Subject: [PATCH] dev(compiler): sanity unwraps --- compiler/src/service/compile.rs | 103 +++++++++++++++++++++----------- compiler/src/service/watch.rs | 38 ++++++------ 2 files changed, 90 insertions(+), 51 deletions(-) diff --git a/compiler/src/service/compile.rs b/compiler/src/service/compile.rs index a246b372..62ee23ab 100644 --- a/compiler/src/service/compile.rs +++ b/compiler/src/service/compile.rs @@ -22,7 +22,9 @@ use crate::{ ShadowApi, }; use typst_ts_core::{ - error::prelude::ZResult, vector::span_id_from_u64, TypstDocument, TypstFileId, + error::prelude::{map_string_err, ZResult}, + vector::span_id_from_u64, + TypstDocument, TypstFileId, }; use super::{Compiler, DiagObserver, WorkspaceProvider, WorldExporter}; @@ -162,12 +164,14 @@ where // Wrap sender to send compiler response. let compiler_ack = move |res: CompilerResponse| match res { - CompilerResponse::Notify(msg) => dep_tx.send(msg).unwrap(), + CompilerResponse::Notify(msg) => { + log_send_error("compile_deps", dep_tx.send(msg)); + } }; // Spawn file system watcher. tokio::spawn(super::watch_deps(dep_rx, move |event| { - fs_tx.send(event).unwrap(); + log_send_error("fs_event", fs_tx.send(event)); })); // Spawn compiler thread. @@ -306,19 +310,8 @@ where // Handle file system event if any. if let Some(mut event) = event { // Handle delayed upstream update event before applying file system changes - if let FilesystemEvent::UpstreamUpdate { upstream_event, .. } = &mut event { - let event = upstream_event.take().unwrap().opaque; - let TaggedMemoryEvent { - logical_tick, - event, - } = *event.downcast().unwrap(); - - // Recovery from dirty shadow state. - if logical_tick == self.dirty_shadow_logical_tick { - self.dirty_shadow_logical_tick = 0; - } - - self.apply_memory_changes(event); + if self.apply_delayed_memory_changes(&mut event).is_none() { + log::warn!("CompileActor: unknown upstream update event"); } // Apply file system changes. @@ -331,6 +324,27 @@ where } } + /// Apply delayed memory changes to underlying compiler. + fn apply_delayed_memory_changes(&mut self, event: &mut FilesystemEvent) -> Option<()> { + // Handle delayed upstream update event before applying file system changes + if let FilesystemEvent::UpstreamUpdate { upstream_event, .. } = event { + let event = upstream_event.take()?.opaque; + let TaggedMemoryEvent { + logical_tick, + event, + } = *event.downcast().ok()?; + + // Recovery from dirty shadow state. + if logical_tick == self.dirty_shadow_logical_tick { + self.dirty_shadow_logical_tick = 0; + } + + self.apply_memory_changes(event); + } + + Some(()) + } + /// Apply memory changes to underlying compiler. fn apply_memory_changes(&mut self, event: MemoryEvent) { if matches!(event, MemoryEvent::Sync(..)) { @@ -342,7 +356,19 @@ where let _ = self.compiler.unmap_shadow(&removes); } for (p, t) in event.inserts { - let _ = self.compiler.map_shadow(&p, t.content().cloned().unwrap()); + let insert_file = match t.content().cloned() { + Ok(content) => content, + Err(err) => { + log::error!( + "CompileActor: read memory file at {}: {}", + p.display(), + err, + ); + continue; + } + }; + + let _ = self.compiler.map_shadow(&p, insert_file); } } } @@ -383,26 +409,30 @@ impl CompileClient { fn steal_inner( &mut self, f: impl FnOnce(&mut Ctx) -> Ret + Send + 'static, - ) -> oneshot::Receiver { + ) -> ZResult> { let (tx, rx) = oneshot::channel(); + let task = Box::new(move |this: &mut Ctx| { + if tx.send(f(this)).is_err() { + // Receiver was dropped. The main thread may have exited, or the request may + // have been cancelled. + log::warn!("could not send back return value from Typst thread"); + } + }); + self.steal_send - .send(Box::new(move |this: &mut Ctx| { - if tx.send(f(this)).is_err() { - // Receiver was dropped. The main thread may have exited, or the request may - // have been cancelled. - log::warn!("could not send back return value from Typst thread"); - } - })) - .unwrap(); - rx + .send(task) + .map_err(map_string_err("failed to send to steal"))?; + Ok(rx) } pub fn steal( &mut self, f: impl FnOnce(&mut Ctx) -> Ret + Send + 'static, ) -> ZResult { - Ok(self.steal_inner(f).blocking_recv().unwrap()) + self.steal_inner(f)? + .blocking_recv() + .map_err(map_string_err("failed to recv from steal")) } /// Steal the compiler thread and run the given function. @@ -412,14 +442,13 @@ impl CompileClient { ) -> ZResult { // get current async handle let handle = tokio::runtime::Handle::current(); - Ok(self - .steal_inner(move |this: &mut Ctx| f(this, handle.clone())) + self.steal_inner(move |this: &mut Ctx| f(this, handle.clone()))? .await - .unwrap()) + .map_err(map_string_err("failed to call steal_async")) } pub fn add_memory_changes(&self, event: MemoryEvent) { - self.memory_send.send(event).unwrap(); + log_send_error("mem_event", self.memory_send.send(event)); } } @@ -514,7 +543,7 @@ pub fn jump_from_cursor(frames: &[Frame], source: &Source, cursor: usize) -> Opt let t_dis = min_dis; if let Some(pos) = find_in_frame(frame, span, &mut min_dis, &mut p) { return Some(Position { - page: NonZeroUsize::new(i + 1).unwrap(), + page: NonZeroUsize::new(i + 1)?, point: pos, }); } @@ -528,7 +557,7 @@ pub fn jump_from_cursor(frames: &[Frame], source: &Source, cursor: usize) -> Opt } Some(Position { - page: NonZeroUsize::new(ppage + 1).unwrap(), + page: NonZeroUsize::new(ppage + 1)?, point: p, }) } @@ -562,3 +591,9 @@ fn find_in_frame(frame: &Frame, span: Span, min_dis: &mut u64, p: &mut Point) -> None } + +#[inline] +fn log_send_error(chan: &'static str, res: Result<(), mpsc::error::SendError>) -> bool { + res.map_err(|err| log::warn!("CompileActor: send to {chan} error: {err}")) + .is_ok() +} diff --git a/compiler/src/service/watch.rs b/compiler/src/service/watch.rs index 654a0299..d547c475 100644 --- a/compiler/src/service/watch.rs +++ b/compiler/src/service/watch.rs @@ -141,7 +141,7 @@ impl NotifyActor { /// Send a filesystem event to remove. fn send(&mut self, msg: FilesystemEvent) { - self.sender.send(msg).unwrap(); + log_send_error("fs_event", self.sender.send(msg)); } /// Get the notify event from the watcher. @@ -392,13 +392,12 @@ impl NotifyActor { payload: file.clone(), }; entry.prev = Some(file); - self.undetermined_send - .send(UndeterminedNotifyEvent { - at_realtime: instant::Instant::now(), - at_logical_tick: self.logical_tick, - path: path.clone(), - }) - .unwrap(); + let event = UndeterminedNotifyEvent { + at_realtime: instant::Instant::now(), + at_logical_tick: self.logical_tick, + path: path.clone(), + }; + log_send_error("recheck", self.undetermined_send.send(event)); return None; } // Otherwise, we push the error to the consumer. @@ -430,13 +429,12 @@ impl NotifyActor { payload: file.clone(), }; entry.prev = Some(file); - self.undetermined_send - .send(UndeterminedNotifyEvent { - at_realtime: instant::Instant::now(), - at_logical_tick: self.logical_tick, - path, - }) - .unwrap(); + let event = UndeterminedNotifyEvent { + at_realtime: instant::Instant::now(), + at_logical_tick: self.logical_tick, + path, + }; + log_send_error("recheck", self.undetermined_send.send(event)); return None; } } @@ -490,7 +488,7 @@ impl NotifyActor { let send = self.undetermined_send.clone(); tokio::spawn(async move { tokio::time::sleep(std::time::Duration::from_millis(50) - reserved).await; - send.send(event).unwrap(); + log_send_error("reschedule", send.send(event)); }); return None; } @@ -525,10 +523,16 @@ impl NotifyActor { #[inline] fn log_notify_error(res: notify::Result, reason: &'static str) -> Option { - res.map_err(|err| log::warn!("{reason}: notify error: {}", err)) + res.map_err(|err| log::warn!("{reason}: notify error: {err}")) .ok() } +#[inline] +fn log_send_error(chan: &'static str, res: Result<(), mpsc::error::SendError>) -> bool { + res.map_err(|err| log::warn!("NotifyActor: send to {chan} error: {err}")) + .is_ok() +} + pub async fn watch_deps( inbox: mpsc::UnboundedReceiver, mut interrupted_by_events: impl FnMut(Option),