Skip to content

Commit

Permalink
dev(compiler): sanity unwraps
Browse files Browse the repository at this point in the history
  • Loading branch information
Myriad-Dreamin committed Sep 28, 2023
1 parent c198de6 commit 667b32e
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 51 deletions.
103 changes: 69 additions & 34 deletions compiler/src/service/compile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use crate::{
ShadowApi,
};
use typst_ts_core::{
error::prelude::ZResult, vector::span_id_from_u64, TypstDocument, TypstFileId,
error::prelude::{map_string_err, ZResult},
vector::span_id_from_u64,
TypstDocument, TypstFileId,
};

use super::{Compiler, DiagObserver, WorkspaceProvider, WorldExporter};
Expand Down Expand Up @@ -162,12 +164,14 @@ where

// Wrap sender to send compiler response.
let compiler_ack = move |res: CompilerResponse| match res {
CompilerResponse::Notify(msg) => dep_tx.send(msg).unwrap(),
CompilerResponse::Notify(msg) => {
log_send_error("compile_deps", dep_tx.send(msg));
}
};

// Spawn file system watcher.
tokio::spawn(super::watch_deps(dep_rx, move |event| {
fs_tx.send(event).unwrap();
log_send_error("fs_event", fs_tx.send(event));
}));

// Spawn compiler thread.
Expand Down Expand Up @@ -306,19 +310,8 @@ where
// Handle file system event if any.
if let Some(mut event) = event {
// Handle delayed upstream update event before applying file system changes
if let FilesystemEvent::UpstreamUpdate { upstream_event, .. } = &mut event {
let event = upstream_event.take().unwrap().opaque;
let TaggedMemoryEvent {
logical_tick,
event,
} = *event.downcast().unwrap();

// Recovery from dirty shadow state.
if logical_tick == self.dirty_shadow_logical_tick {
self.dirty_shadow_logical_tick = 0;
}

self.apply_memory_changes(event);
if self.apply_delayed_memory_changes(&mut event).is_none() {
log::warn!("CompileActor: unknown upstream update event");
}

// Apply file system changes.
Expand All @@ -331,6 +324,27 @@ where
}
}

/// Apply delayed memory changes to underlying compiler.
fn apply_delayed_memory_changes(&mut self, event: &mut FilesystemEvent) -> Option<()> {
// Handle delayed upstream update event before applying file system changes
if let FilesystemEvent::UpstreamUpdate { upstream_event, .. } = event {
let event = upstream_event.take()?.opaque;
let TaggedMemoryEvent {
logical_tick,
event,
} = *event.downcast().ok()?;

// Recovery from dirty shadow state.
if logical_tick == self.dirty_shadow_logical_tick {
self.dirty_shadow_logical_tick = 0;
}

self.apply_memory_changes(event);
}

Some(())
}

/// Apply memory changes to underlying compiler.
fn apply_memory_changes(&mut self, event: MemoryEvent) {
if matches!(event, MemoryEvent::Sync(..)) {
Expand All @@ -342,7 +356,19 @@ where
let _ = self.compiler.unmap_shadow(&removes);
}
for (p, t) in event.inserts {
let _ = self.compiler.map_shadow(&p, t.content().cloned().unwrap());
let insert_file = match t.content().cloned() {
Ok(content) => content,
Err(err) => {
log::error!(
"CompileActor: read memory file at {}: {}",
p.display(),
err,
);
continue;
}
};

let _ = self.compiler.map_shadow(&p, insert_file);
}
}
}
Expand Down Expand Up @@ -383,26 +409,30 @@ impl<Ctx> CompileClient<Ctx> {
fn steal_inner<Ret: Send + 'static>(
&mut self,
f: impl FnOnce(&mut Ctx) -> Ret + Send + 'static,
) -> oneshot::Receiver<Ret> {
) -> ZResult<oneshot::Receiver<Ret>> {
let (tx, rx) = oneshot::channel();

let task = Box::new(move |this: &mut Ctx| {
if tx.send(f(this)).is_err() {
// Receiver was dropped. The main thread may have exited, or the request may
// have been cancelled.
log::warn!("could not send back return value from Typst thread");
}
});

self.steal_send
.send(Box::new(move |this: &mut Ctx| {
if tx.send(f(this)).is_err() {
// Receiver was dropped. The main thread may have exited, or the request may
// have been cancelled.
log::warn!("could not send back return value from Typst thread");
}
}))
.unwrap();
rx
.send(task)
.map_err(map_string_err("failed to send to steal"))?;
Ok(rx)
}

pub fn steal<Ret: Send + 'static>(
&mut self,
f: impl FnOnce(&mut Ctx) -> Ret + Send + 'static,
) -> ZResult<Ret> {
Ok(self.steal_inner(f).blocking_recv().unwrap())
self.steal_inner(f)?
.blocking_recv()
.map_err(map_string_err("failed to recv from steal"))
}

/// Steal the compiler thread and run the given function.
Expand All @@ -412,14 +442,13 @@ impl<Ctx> CompileClient<Ctx> {
) -> ZResult<Ret> {
// get current async handle
let handle = tokio::runtime::Handle::current();
Ok(self
.steal_inner(move |this: &mut Ctx| f(this, handle.clone()))
self.steal_inner(move |this: &mut Ctx| f(this, handle.clone()))?
.await
.unwrap())
.map_err(map_string_err("failed to call steal_async"))
}

pub fn add_memory_changes(&self, event: MemoryEvent) {
self.memory_send.send(event).unwrap();
log_send_error("mem_event", self.memory_send.send(event));
}
}

Expand Down Expand Up @@ -514,7 +543,7 @@ pub fn jump_from_cursor(frames: &[Frame], source: &Source, cursor: usize) -> Opt
let t_dis = min_dis;
if let Some(pos) = find_in_frame(frame, span, &mut min_dis, &mut p) {
return Some(Position {
page: NonZeroUsize::new(i + 1).unwrap(),
page: NonZeroUsize::new(i + 1)?,
point: pos,
});
}
Expand All @@ -528,7 +557,7 @@ pub fn jump_from_cursor(frames: &[Frame], source: &Source, cursor: usize) -> Opt
}

Some(Position {
page: NonZeroUsize::new(ppage + 1).unwrap(),
page: NonZeroUsize::new(ppage + 1)?,
point: p,
})
}
Expand Down Expand Up @@ -562,3 +591,9 @@ fn find_in_frame(frame: &Frame, span: Span, min_dis: &mut u64, p: &mut Point) ->

None
}

#[inline]
fn log_send_error<T>(chan: &'static str, res: Result<(), mpsc::error::SendError<T>>) -> bool {
res.map_err(|err| log::warn!("CompileActor: send to {chan} error: {err}"))
.is_ok()
}
38 changes: 21 additions & 17 deletions compiler/src/service/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl NotifyActor {

/// Send a filesystem event to remove.
fn send(&mut self, msg: FilesystemEvent) {
self.sender.send(msg).unwrap();
log_send_error("fs_event", self.sender.send(msg));
}

/// Get the notify event from the watcher.
Expand Down Expand Up @@ -392,13 +392,12 @@ impl NotifyActor {
payload: file.clone(),
};
entry.prev = Some(file);
self.undetermined_send
.send(UndeterminedNotifyEvent {
at_realtime: instant::Instant::now(),
at_logical_tick: self.logical_tick,
path: path.clone(),
})
.unwrap();
let event = UndeterminedNotifyEvent {
at_realtime: instant::Instant::now(),
at_logical_tick: self.logical_tick,
path: path.clone(),
};
log_send_error("recheck", self.undetermined_send.send(event));
return None;
}
// Otherwise, we push the error to the consumer.
Expand Down Expand Up @@ -430,13 +429,12 @@ impl NotifyActor {
payload: file.clone(),
};
entry.prev = Some(file);
self.undetermined_send
.send(UndeterminedNotifyEvent {
at_realtime: instant::Instant::now(),
at_logical_tick: self.logical_tick,
path,
})
.unwrap();
let event = UndeterminedNotifyEvent {
at_realtime: instant::Instant::now(),
at_logical_tick: self.logical_tick,
path,
};
log_send_error("recheck", self.undetermined_send.send(event));
return None;
}
}
Expand Down Expand Up @@ -490,7 +488,7 @@ impl NotifyActor {
let send = self.undetermined_send.clone();
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(50) - reserved).await;
send.send(event).unwrap();
log_send_error("reschedule", send.send(event));
});
return None;
}
Expand Down Expand Up @@ -525,10 +523,16 @@ impl NotifyActor {

#[inline]
fn log_notify_error<T>(res: notify::Result<T>, reason: &'static str) -> Option<T> {
res.map_err(|err| log::warn!("{reason}: notify error: {}", err))
res.map_err(|err| log::warn!("{reason}: notify error: {err}"))
.ok()
}

#[inline]
fn log_send_error<T>(chan: &'static str, res: Result<(), mpsc::error::SendError<T>>) -> bool {
res.map_err(|err| log::warn!("NotifyActor: send to {chan} error: {err}"))
.is_ok()
}

pub async fn watch_deps(
inbox: mpsc::UnboundedReceiver<NotifyMessage>,
mut interrupted_by_events: impl FnMut(Option<FilesystemEvent>),
Expand Down

0 comments on commit 667b32e

Please sign in to comment.