| //! An implementation of `loader::Handle`, based on `walkdir` and `notify`. |
| //! |
| //! The file watching bits here are untested and quite probably buggy. For this |
| //! reason, by default we don't watch files and rely on editor's file watching |
| //! capabilities. |
| //! |
| //! Hopefully, one day a reliable file watching/walking crate appears on |
| //! crates.io, and we can reduce this to trivial glue code. |
| |
| use std::{ |
| fs, |
| path::{Component, Path}, |
| sync::atomic::AtomicUsize, |
| }; |
| |
| use crossbeam_channel::{Receiver, Sender, select, unbounded}; |
| use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; |
| use paths::{AbsPath, AbsPathBuf, Utf8PathBuf}; |
| use rayon::iter::{IndexedParallelIterator as _, IntoParallelIterator as _, ParallelIterator}; |
| use rustc_hash::FxHashSet; |
| use vfs::loader::{self, LoadingProgress}; |
| use walkdir::WalkDir; |
| |
| #[derive(Debug)] |
| pub struct NotifyHandle { |
| // Relative order of fields below is significant. |
| sender: Sender<Message>, |
| _thread: stdx::thread::JoinHandle, |
| } |
| |
| #[derive(Debug)] |
| enum Message { |
| Config(loader::Config), |
| Invalidate(AbsPathBuf), |
| } |
| |
| impl loader::Handle for NotifyHandle { |
| fn spawn(sender: loader::Sender) -> NotifyHandle { |
| let actor = NotifyActor::new(sender); |
| let (sender, receiver) = unbounded::<Message>(); |
| let thread = stdx::thread::Builder::new(stdx::thread::ThreadIntent::Worker, "VfsLoader") |
| .spawn(move || actor.run(receiver)) |
| .expect("failed to spawn thread"); |
| NotifyHandle { sender, _thread: thread } |
| } |
| |
| fn set_config(&mut self, config: loader::Config) { |
| self.sender.send(Message::Config(config)).unwrap(); |
| } |
| |
| fn invalidate(&mut self, path: AbsPathBuf) { |
| self.sender.send(Message::Invalidate(path)).unwrap(); |
| } |
| |
| fn load_sync(&mut self, path: &AbsPath) -> Option<Vec<u8>> { |
| read(path) |
| } |
| } |
| |
| type NotifyEvent = notify::Result<notify::Event>; |
| |
| struct NotifyActor { |
| sender: loader::Sender, |
| watched_file_entries: FxHashSet<AbsPathBuf>, |
| watched_dir_entries: Vec<loader::Directories>, |
| // Drop order is significant. |
| watcher: Option<(RecommendedWatcher, Receiver<NotifyEvent>)>, |
| } |
| |
| #[derive(Debug)] |
| enum Event { |
| Message(Message), |
| NotifyEvent(NotifyEvent), |
| } |
| |
| impl NotifyActor { |
| fn new(sender: loader::Sender) -> NotifyActor { |
| NotifyActor { |
| sender, |
| watched_dir_entries: Vec::new(), |
| watched_file_entries: FxHashSet::default(), |
| watcher: None, |
| } |
| } |
| |
| fn next_event(&self, receiver: &Receiver<Message>) -> Option<Event> { |
| let Some((_, watcher_receiver)) = &self.watcher else { |
| return receiver.recv().ok().map(Event::Message); |
| }; |
| |
| select! { |
| recv(receiver) -> it => it.ok().map(Event::Message), |
| recv(watcher_receiver) -> it => Some(Event::NotifyEvent(it.unwrap())), |
| } |
| } |
| |
| fn run(mut self, inbox: Receiver<Message>) { |
| while let Some(event) = self.next_event(&inbox) { |
| tracing::debug!(?event, "vfs-notify event"); |
| match event { |
| Event::Message(msg) => match msg { |
| Message::Config(config) => { |
| self.watcher = None; |
| if !config.watch.is_empty() { |
| let (watcher_sender, watcher_receiver) = unbounded(); |
| let watcher = log_notify_error(RecommendedWatcher::new( |
| move |event| { |
| // we don't care about the error. If sending fails that usually |
| // means we were dropped, so unwrapping will just add to the |
| // panic noise. |
| _ = watcher_sender.send(event); |
| }, |
| Config::default(), |
| )); |
| self.watcher = watcher.map(|it| (it, watcher_receiver)); |
| } |
| |
| let config_version = config.version; |
| |
| let n_total = config.load.len(); |
| self.watched_dir_entries.clear(); |
| self.watched_file_entries.clear(); |
| |
| self.send(loader::Message::Progress { |
| n_total, |
| n_done: LoadingProgress::Started, |
| config_version, |
| dir: None, |
| }); |
| |
| let (entry_tx, entry_rx) = unbounded(); |
| let (watch_tx, watch_rx) = unbounded(); |
| let processed = AtomicUsize::new(0); |
| |
| config.load.into_par_iter().enumerate().for_each(|(i, entry)| { |
| let do_watch = config.watch.contains(&i); |
| if do_watch { |
| _ = entry_tx.send(entry.clone()); |
| } |
| let files = Self::load_entry( |
| |f| _ = watch_tx.send(f.to_owned()), |
| entry, |
| do_watch, |
| |file| { |
| self.send(loader::Message::Progress { |
| n_total, |
| n_done: LoadingProgress::Progress( |
| processed.load(std::sync::atomic::Ordering::Relaxed), |
| ), |
| dir: Some(file), |
| config_version, |
| }); |
| }, |
| ); |
| self.send(loader::Message::Loaded { files }); |
| self.send(loader::Message::Progress { |
| n_total, |
| n_done: LoadingProgress::Progress( |
| processed.fetch_add(1, std::sync::atomic::Ordering::AcqRel) + 1, |
| ), |
| config_version, |
| dir: None, |
| }); |
| }); |
| |
| drop(watch_tx); |
| for path in watch_rx { |
| self.watch(&path); |
| } |
| |
| drop(entry_tx); |
| for entry in entry_rx { |
| match entry { |
| loader::Entry::Files(files) => { |
| self.watched_file_entries.extend(files) |
| } |
| loader::Entry::Directories(dir) => { |
| self.watched_dir_entries.push(dir) |
| } |
| } |
| } |
| |
| self.send(loader::Message::Progress { |
| n_total, |
| n_done: LoadingProgress::Finished, |
| config_version, |
| dir: None, |
| }); |
| } |
| Message::Invalidate(path) => { |
| let contents = read(path.as_path()); |
| let files = vec![(path, contents)]; |
| self.send(loader::Message::Changed { files }); |
| } |
| }, |
| Event::NotifyEvent(event) => { |
| if let Some(event) = log_notify_error(event) |
| && let EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) = |
| event.kind |
| { |
| let files = event |
| .paths |
| .into_iter() |
| .filter_map(|path| { |
| Some( |
| AbsPathBuf::try_from(Utf8PathBuf::from_path_buf(path).ok()?) |
| .expect("path is absolute"), |
| ) |
| }) |
| .filter_map(|path| -> Option<(AbsPathBuf, Option<Vec<u8>>)> { |
| let meta = fs::metadata(&path).ok()?; |
| if meta.file_type().is_dir() |
| && self |
| .watched_dir_entries |
| .iter() |
| .any(|dir| dir.contains_dir(&path)) |
| { |
| self.watch(path.as_ref()); |
| return None; |
| } |
| |
| if !meta.file_type().is_file() { |
| return None; |
| } |
| |
| if !(self.watched_file_entries.contains(&path) |
| || self |
| .watched_dir_entries |
| .iter() |
| .any(|dir| dir.contains_file(&path))) |
| { |
| return None; |
| } |
| |
| let contents = read(&path); |
| Some((path, contents)) |
| }) |
| .collect(); |
| self.send(loader::Message::Changed { files }); |
| } |
| } |
| } |
| } |
| } |
| |
| fn load_entry( |
| mut watch: impl FnMut(&Path), |
| entry: loader::Entry, |
| do_watch: bool, |
| send_message: impl Fn(AbsPathBuf), |
| ) -> Vec<(AbsPathBuf, Option<Vec<u8>>)> { |
| match entry { |
| loader::Entry::Files(files) => files |
| .into_iter() |
| .map(|file| { |
| if do_watch { |
| watch(file.as_ref()); |
| } |
| let contents = read(file.as_path()); |
| (file, contents) |
| }) |
| .collect::<Vec<_>>(), |
| loader::Entry::Directories(dirs) => { |
| let mut res = Vec::new(); |
| |
| for root in &dirs.include { |
| send_message(root.clone()); |
| let walkdir = |
| WalkDir::new(root).follow_links(true).into_iter().filter_entry(|entry| { |
| if !entry.file_type().is_dir() { |
| return true; |
| } |
| let path = entry.path(); |
| |
| if path_might_be_cyclic(path) { |
| return false; |
| } |
| |
| // We want to filter out subdirectories that are roots themselves, because they will be visited separately. |
| dirs.exclude.iter().all(|it| it != path) |
| && (root == path || dirs.include.iter().all(|it| it != path)) |
| }); |
| |
| let files = walkdir.filter_map(|it| it.ok()).filter_map(|entry| { |
| let depth = entry.depth(); |
| let is_dir = entry.file_type().is_dir(); |
| let is_file = entry.file_type().is_file(); |
| let abs_path = AbsPathBuf::try_from( |
| Utf8PathBuf::from_path_buf(entry.into_path()).ok()?, |
| ) |
| .ok()?; |
| if depth < 2 && is_dir { |
| send_message(abs_path.clone()); |
| } |
| if is_dir && do_watch { |
| watch(abs_path.as_ref()); |
| } |
| if !is_file { |
| return None; |
| } |
| let ext = abs_path.extension().unwrap_or_default(); |
| if dirs.extensions.iter().all(|it| it.as_str() != ext) { |
| return None; |
| } |
| Some(abs_path) |
| }); |
| |
| res.extend(files.map(|file| { |
| let contents = read(file.as_path()); |
| (file, contents) |
| })); |
| } |
| res |
| } |
| } |
| } |
| |
| fn watch(&mut self, path: &Path) { |
| if let Some((watcher, _)) = &mut self.watcher { |
| log_notify_error(watcher.watch(path, RecursiveMode::NonRecursive)); |
| } |
| } |
| |
| #[track_caller] |
| fn send(&self, msg: loader::Message) { |
| self.sender.send(msg).unwrap(); |
| } |
| } |
| |
| fn read(path: &AbsPath) -> Option<Vec<u8>> { |
| std::fs::read(path).ok() |
| } |
| |
| fn log_notify_error<T>(res: notify::Result<T>) -> Option<T> { |
| res.map_err(|err| tracing::warn!("notify error: {}", err)).ok() |
| } |
| |
| /// Is `path` a symlink to a parent directory? |
| /// |
| /// Including this path is guaranteed to cause an infinite loop. This |
| /// heuristic is not sufficient to catch all symlink cycles (it's |
| /// possible to construct cycle using two or more symlinks), but it |
| /// catches common cases. |
| fn path_might_be_cyclic(path: &Path) -> bool { |
| let Ok(destination) = std::fs::read_link(path) else { |
| return false; |
| }; |
| |
| // If the symlink is of the form "../..", it's a parent symlink. |
| let is_relative_parent = |
| destination.components().all(|c| matches!(c, Component::CurDir | Component::ParentDir)); |
| |
| is_relative_parent || path.starts_with(destination) |
| } |