Skip to content

Commit

Permalink
dev: synchronize memory change with NotifyActor
Browse files Browse the repository at this point in the history
  • Loading branch information
Myriad-Dreamin committed Sep 27, 2023
1 parent 89a7280 commit df93d1a
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 42 deletions.
14 changes: 12 additions & 2 deletions compiler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ pub mod service;
/// Run the compiler in the system environment.
#[cfg(feature = "system-compile")]
pub(crate) mod system;
use std::path::{Path, PathBuf};
use std::{
path::{Path, PathBuf},
sync::Arc,
};

#[cfg(feature = "system-compile")]
pub use system::TypstSystemWorld;
Expand All @@ -72,8 +75,15 @@ pub trait ShadowApi {
unimplemented!()
}

/// Get the shadow files.
fn shadow_paths(&self) -> Vec<Arc<Path>>;

/// Reset the shadow files.
fn reset_shadow(&mut self);
fn reset_shadow(&mut self) {
for path in self.shadow_paths() {
self.unmap_shadow(&path).unwrap();
}
}

/// Add a shadow file to the driver.
fn map_shadow(&self, path: &Path, content: Bytes) -> FileResult<()>;
Expand Down
78 changes: 58 additions & 20 deletions compiler/src/service/compile.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
use std::{num::NonZeroUsize, path::PathBuf, sync::Arc, thread::JoinHandle};
use std::{
collections::HashSet,
num::NonZeroUsize,
ops::Deref,
path::{Path, PathBuf},
sync::Arc,
thread::JoinHandle,
};

use serde::Serialize;
use tokio::sync::{mpsc, oneshot};
Expand All @@ -10,7 +17,7 @@ use typst::{
};

use crate::{
vfs::notify::{FileChangeSet, FilesystemEvent, NotifyMessage},
vfs::notify::{FilesystemEvent, MemoryEvent, NotifyMessage},
world::{CompilerFeat, CompilerWorld},
ShadowApi,
};
Expand All @@ -33,11 +40,6 @@ fn ensure_single_thread<F: std::future::Future<Output = ()> + Send + 'static>(
})
}

pub enum MemoryEvent {
Sync(FileChangeSet),
Update(FileChangeSet),
}

enum CompilerInterrupt<Ctx> {
/// Interrupted by file system event.
Fs(Option<FilesystemEvent>),
Expand All @@ -58,6 +60,7 @@ pub struct CompileActor<C: Compiler> {
pub root: PathBuf,
pub enable_watch: bool,

estimated_shadow_files: HashSet<Arc<Path>>,
latest_doc: Option<Arc<TypstDocument>>,

steal_send: mpsc::UnboundedSender<BorrowTask<Self>>,
Expand All @@ -84,6 +87,7 @@ where
root,
enable_watch: false,

estimated_shadow_files: Default::default(),
latest_doc: None,

steal_send,
Expand All @@ -98,7 +102,8 @@ where
use CompilerResponse::*;

// compile
self.compiler
self.latest_doc = self
.compiler
.with_stage_diag::<true, _>("compiling", |driver| driver.compile());

comemo::evict(30);
Expand All @@ -110,12 +115,34 @@ where
// tx
}

fn process(&mut self, event: CompilerInterrupt<Self>) -> bool {
fn process(&mut self, event: CompilerInterrupt<Self>, send: impl Fn(CompilerResponse)) -> bool {
use CompilerResponse::*;

match event {
CompilerInterrupt::Fs(event) => {
log::info!("CompileActor: fs event incoming {:?}", event);

if let Some(event) = event {
if let Some(mut event) = event {
if let FilesystemEvent::UpstreamUpdate { upstream_event, .. } = &mut event {
let event = upstream_event.take().unwrap().opaque;
let event = *event.downcast::<MemoryEvent>().unwrap();

if matches!(event, MemoryEvent::Sync(..)) {
self.compiler.reset_shadow();
}
match event {
MemoryEvent::Update(event) | MemoryEvent::Sync(event) => {
for removes in event.removes {
let _ = self.compiler.unmap_shadow(&removes);
}
for (p, t) in event.inserts {
let _ =
self.compiler.map_shadow(&p, t.content().cloned().unwrap());
}
}
}
}

self.compiler.notify_fs_event(event);
}

Expand All @@ -124,21 +151,32 @@ where
CompilerInterrupt::Memory(event) => {
log::info!("CompileActor: memory event incoming");

let mut files = HashSet::new();
if matches!(event, MemoryEvent::Sync(..)) {
self.compiler.reset_shadow();
files = self.estimated_shadow_files.clone();
self.estimated_shadow_files.clear();
}
match event {
MemoryEvent::Update(event) | MemoryEvent::Sync(event) => {
for removes in event.removes {
let _ = self.compiler.unmap_shadow(&removes);
match &event {
MemoryEvent::Sync(event) | MemoryEvent::Update(event) => {
for path in event.removes.iter().map(Deref::deref) {
self.estimated_shadow_files.remove(path);
files.insert(path.into());
}
for (p, t) in event.inserts {
let _ = self.compiler.map_shadow(&p, t.content().cloned().unwrap());
for path in event.inserts.iter().map(|e| e.0.deref()) {
self.estimated_shadow_files.insert(path.into());
files.remove(path);
}
}
}

true
send(Notify(NotifyMessage::UpstreamUpdate(
crate::vfs::notify::UpstreamUpdateEvent {
invalidates: files.iter().map(|e| e.as_ref().to_owned()).collect(),
opaque: Box::new(event),
},
)));

false
}
CompilerInterrupt::Task(task) => {
task(self);
Expand Down Expand Up @@ -173,7 +211,7 @@ where
Some(it) = self.steal_recv.recv() => Some(CompilerInterrupt::Task(it)),
} {
let mut need_recompile = false;
need_recompile = self.process(event) || need_recompile;
need_recompile = self.process(event, &compiler_ack) || need_recompile;
while let Some(event) = fs_rx
.try_recv()
.ok()
Expand All @@ -186,7 +224,7 @@ where
})
.or_else(|| self.steal_recv.try_recv().ok().map(CompilerInterrupt::Task))
{
need_recompile = self.process(event) || need_recompile;
need_recompile = self.process(event, &compiler_ack) || need_recompile;
}

if need_recompile {
Expand Down
14 changes: 13 additions & 1 deletion compiler/src/service/driver.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::path::{Path, PathBuf};
use std::{
path::{Path, PathBuf},
sync::Arc,
};

use crate::{NotifyApi, ShadowApi};
use typst::{diag::SourceResult, syntax::VirtualPath, World};
Expand Down Expand Up @@ -104,18 +107,27 @@ impl<W: World + WorkspaceProvider + NotifyApi> Compiler for CompileDriverImpl<W>
}

impl<W: World + ShadowApi> ShadowApi for CompileDriverImpl<W> {
#[inline]
fn _shadow_map_id(&self, file_id: TypstFileId) -> typst::diag::FileResult<PathBuf> {
self.world._shadow_map_id(file_id)
}

#[inline]
fn shadow_paths(&self) -> Vec<Arc<Path>> {
self.world.shadow_paths()
}

#[inline]
fn reset_shadow(&mut self) {
self.world.reset_shadow()
}

#[inline]
fn map_shadow(&self, path: &Path, content: Bytes) -> typst::diag::FileResult<()> {
self.world.map_shadow(path, content)
}

#[inline]
fn unmap_shadow(&self, path: &Path) -> typst::diag::FileResult<()> {
self.world.unmap_shadow(path)
}
Expand Down
5 changes: 5 additions & 0 deletions compiler/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,11 @@ where
self.inner()._shadow_map_id(_file_id)
}

#[inline]
fn shadow_paths(&self) -> Vec<Arc<Path>> {
self.inner().shadow_paths()
}

#[inline]
fn reset_shadow(&mut self) {
self.inner_mut().reset_shadow()
Expand Down
36 changes: 24 additions & 12 deletions compiler/src/service/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use typst::diag::{FileError, FileResult};
use typst_ts_core::Bytes;

use crate::vfs::{
notify::{FileChangeSet, FilesystemEvent, NotifyFile, NotifyMessage},
notify::{FileChangeSet, FilesystemEvent, NotifyFile, NotifyMessage, UpstreamUpdateEvent},
system::SystemAccessModel,
AccessModel,
};
Expand Down Expand Up @@ -139,7 +139,7 @@ impl NotifyActor {
}
}

/// Send a message to the actor.
/// Send a filesystem event to remove.
fn send(&mut self, msg: FilesystemEvent) {
self.sender.send(msg).unwrap();
}
Expand Down Expand Up @@ -185,13 +185,18 @@ impl NotifyActor {
// log::info!("vfs-notify event {event:?}");
// function entries to handle some event
match event {
ActorEvent::Message(NotifyMessage::UpstreamUpdate(event)) => {
self.invalidate_upstream(event);
}
ActorEvent::Message(NotifyMessage::SyncDependency(paths)) => {
self.update_watches(paths).await;
if let Some(changeset) = self.update_watches(&paths) {
self.send(FilesystemEvent::Update(changeset));
}
}
ActorEvent::NotifyEvent(event) => {
// log::info!("notify event {event:?}");
if let Some(event) = log_notify_error(event, "failed to notify") {
self.notify_event(event).await;
self.notify_event(event);
}
}
ActorEvent::ReCheck(event) => {
Expand All @@ -201,8 +206,17 @@ impl NotifyActor {
}
}

/// Update the watches of corresponding invalidation
fn invalidate_upstream(&mut self, event: UpstreamUpdateEvent) {
let changeset = self.update_watches(&event.invalidates).unwrap_or_default();
self.send(FilesystemEvent::UpstreamUpdate {
changeset,
upstream_event: Some(event),
});
}

/// Update the watches of corresponding files.
async fn update_watches(&mut self, paths: Vec<PathBuf>) {
fn update_watches(&mut self, paths: &[PathBuf]) -> Option<FileChangeSet> {
// Increase the lifetime per external message.
self.lifetime += 1;

Expand Down Expand Up @@ -247,7 +261,7 @@ impl NotifyActor {
//
// Also check whether the file is updated since there is a window
// between unwatch the file and watch the file again.
for path in paths.into_iter() {
for path in paths.iter() {
// Update or insert the entry with the new lifetime.
let entry = self
.watched_entries
Expand Down Expand Up @@ -288,9 +302,9 @@ impl NotifyActor {
} else {
let watched = self
.inner
.content(&path)
.content(path)
.map(|e| (meta.modified().unwrap(), e));
changeset.inserts.push((path, watched.into()));
changeset.inserts.push((path.clone(), watched.into()));
}
}

Expand All @@ -306,13 +320,11 @@ impl NotifyActor {
}
});

if !changeset.is_empty() {
self.send(FilesystemEvent::Update(changeset));
}
(!changeset.is_empty()).then_some(changeset)
}

/// Notify the event from the builtin watcher.
async fn notify_event(&mut self, event: notify::Event) {
fn notify_event(&mut self, event: notify::Event) {
// Account file updates.
let mut changeset = FileChangeSet::default();
for path in event.paths.into_iter() {
Expand Down
4 changes: 4 additions & 0 deletions compiler/src/vfs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ impl<M: AccessModel + Sized> Vfs<M> {
self.access_model.inner().clear_shadow();
}

pub fn shadow_paths(&self) -> Vec<Arc<Path>> {
self.access_model.inner().file_paths()
}

/// Set the `do_reparse` flag.
pub fn set_do_reparse(&mut self, do_reparse: bool) {
self.do_reparse = do_reparse;
Expand Down
Loading

0 comments on commit df93d1a

Please sign in to comment.