| //! Watcher implementation for the inotify Linux API | |
| //! | |
| //! The inotify API provides a mechanism for monitoring filesystem events. Inotify can be used to | |
| //! monitor individual files, or to monitor directories. When a directory is monitored, inotify | |
| //! will return events for the directory itself, and for files inside the directory. | |
| use super::event::*; | |
| use super::{Config, Error, ErrorKind, EventHandler, RecursiveMode, Result, Watcher}; | |
| use crate::{bounded, unbounded, BoundSender, Receiver, Sender}; | |
| use inotify as inotify_sys; | |
| use inotify_sys::{EventMask, Inotify, WatchDescriptor, WatchMask}; | |
| use std::collections::HashMap; | |
| use std::env; | |
| use std::ffi::OsStr; | |
| use std::fs::metadata; | |
| use std::os::unix::io::AsRawFd; | |
| use std::path::{Path, PathBuf}; | |
| use std::sync::Arc; | |
| use std::thread; | |
| use std::time::Duration; | |
| use walkdir::WalkDir; | |
| const INOTIFY: mio::Token = mio::Token(0); | |
| const MESSAGE: mio::Token = mio::Token(1); | |
| // The EventLoop will set up a mio::Poll and use it to wait for the following: | |
| // | |
| // - messages telling it what to do | |
| // | |
| // - events telling it that something has happened on one of the watched files. | |
| struct EventLoop { | |
| running: bool, | |
| poll: mio::Poll, | |
| event_loop_waker: Arc<mio::Waker>, | |
| event_loop_tx: Sender<EventLoopMsg>, | |
| event_loop_rx: Receiver<EventLoopMsg>, | |
| inotify: Option<Inotify>, | |
| event_handler: Box<dyn EventHandler>, | |
| watches: HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>, | |
| paths: HashMap<WatchDescriptor, PathBuf>, | |
| rename_event: Option<Event>, | |
| } | |
| /// Watcher implementation based on inotify | |
| #[derive(Debug)] | |
| pub struct INotifyWatcher { | |
| channel: Sender<EventLoopMsg>, | |
| waker: Arc<mio::Waker>, | |
| } | |
| enum EventLoopMsg { | |
| AddWatch(PathBuf, RecursiveMode, Sender<Result<()>>), | |
| RemoveWatch(PathBuf, Sender<Result<()>>), | |
| Shutdown, | |
| RenameTimeout(usize), | |
| Configure(Config, BoundSender<Result<bool>>), | |
| } | |
| #[inline] | |
| fn send_pending_rename_event( | |
| rename_event: &mut Option<Event>, | |
| event_handler: &mut dyn EventHandler, | |
| ) { | |
| if let Some(e) = rename_event.take() { | |
| event_handler.handle_event(Ok(e)); | |
| } | |
| } | |
| #[inline] | |
| fn add_watch_by_event( | |
| path: &Option<PathBuf>, | |
| event: &inotify_sys::Event<&OsStr>, | |
| watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>, | |
| add_watches: &mut Vec<PathBuf>, | |
| ) { | |
| if let Some(ref path) = *path { | |
| if event.mask.contains(EventMask::ISDIR) { | |
| if let Some(parent_path) = path.parent() { | |
| if let Some(&(_, _, is_recursive)) = watches.get(parent_path) { | |
| if is_recursive { | |
| add_watches.push(path.to_owned()); | |
| } | |
| } | |
| } | |
| } | |
| } | |
| } | |
| #[inline] | |
| fn remove_watch_by_event( | |
| path: &Option<PathBuf>, | |
| watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>, | |
| remove_watches: &mut Vec<PathBuf>, | |
| ) { | |
| if let Some(ref path) = *path { | |
| if watches.contains_key(path) { | |
| remove_watches.push(path.to_owned()); | |
| } | |
| } | |
| } | |
| impl EventLoop { | |
| pub fn new(inotify: Inotify, event_handler: Box<dyn EventHandler>) -> Result<Self> { | |
| let (event_loop_tx, event_loop_rx) = unbounded::<EventLoopMsg>(); | |
| let poll = mio::Poll::new()?; | |
| let event_loop_waker = Arc::new(mio::Waker::new(poll.registry(), MESSAGE)?); | |
| let inotify_fd = inotify.as_raw_fd(); | |
| let mut evented_inotify = mio::unix::SourceFd(&inotify_fd); | |
| poll.registry() | |
| .register(&mut evented_inotify, INOTIFY, mio::Interest::READABLE)?; | |
| let event_loop = EventLoop { | |
| running: true, | |
| poll, | |
| event_loop_waker, | |
| event_loop_tx, | |
| event_loop_rx, | |
| inotify: Some(inotify), | |
| event_handler, | |
| watches: HashMap::new(), | |
| paths: HashMap::new(), | |
| rename_event: None, | |
| }; | |
| Ok(event_loop) | |
| } | |
| // Run the event loop. | |
| pub fn run(self) { | |
| let _ = thread::Builder::new() | |
| .name("notify-rs inotify loop".to_string()) | |
| .spawn(|| self.event_loop_thread()); | |
| } | |
| fn event_loop_thread(mut self) { | |
| let mut events = mio::Events::with_capacity(16); | |
| loop { | |
| // Wait for something to happen. | |
| match self.poll.poll(&mut events, None) { | |
| Err(ref e) if matches!(e.kind(), std::io::ErrorKind::Interrupted) => { | |
| // System call was interrupted, we will retry | |
| // TODO: Not covered by tests (to reproduce likely need to setup signal handlers) | |
| } | |
| Err(e) => panic!("poll failed: {}", e), | |
| Ok(()) => {} | |
| } | |
| // Process whatever happened. | |
| for event in &events { | |
| self.handle_event(event); | |
| } | |
| // Stop, if we're done. | |
| if !self.running { | |
| break; | |
| } | |
| } | |
| } | |
| // Handle a single event. | |
| fn handle_event(&mut self, event: &mio::event::Event) { | |
| match event.token() { | |
| MESSAGE => { | |
| // The channel is readable - handle messages. | |
| self.handle_messages() | |
| } | |
| INOTIFY => { | |
| // inotify has something to tell us. | |
| self.handle_inotify() | |
| } | |
| _ => unreachable!(), | |
| } | |
| } | |
| fn handle_messages(&mut self) { | |
| while let Ok(msg) = self.event_loop_rx.try_recv() { | |
| match msg { | |
| EventLoopMsg::AddWatch(path, recursive_mode, tx) => { | |
| let _ = tx.send(self.add_watch(path, recursive_mode.is_recursive(), true)); | |
| } | |
| EventLoopMsg::RemoveWatch(path, tx) => { | |
| let _ = tx.send(self.remove_watch(path, false)); | |
| } | |
| EventLoopMsg::Shutdown => { | |
| let _ = self.remove_all_watches(); | |
| if let Some(inotify) = self.inotify.take() { | |
| let _ = inotify.close(); | |
| } | |
| self.running = false; | |
| break; | |
| } | |
| EventLoopMsg::RenameTimeout(cookie) => { | |
| let current_cookie = self.rename_event.as_ref().and_then(|e| e.tracker()); | |
| // send pending rename event only if the rename event for which the timer has been created hasn't been handled already; otherwise ignore this timeout | |
| if current_cookie == Some(cookie) { | |
| send_pending_rename_event(&mut self.rename_event, &mut *self.event_handler); | |
| } | |
| } | |
| EventLoopMsg::Configure(config, tx) => { | |
| self.configure_raw_mode(config, tx); | |
| } | |
| } | |
| } | |
| } | |
| fn configure_raw_mode(&mut self, _config: Config, tx: BoundSender<Result<bool>>) { | |
| tx.send(Ok(false)) | |
| .expect("configuration channel disconnected"); | |
| } | |
| fn handle_inotify(&mut self) { | |
| let mut add_watches = Vec::new(); | |
| let mut remove_watches = Vec::new(); | |
| if let Some(ref mut inotify) = self.inotify { | |
| let mut buffer = [0; 1024]; | |
| // Read all buffers available. | |
| loop { | |
| match inotify.read_events(&mut buffer) { | |
| Ok(events) => { | |
| let mut num_events = 0; | |
| for event in events { | |
| num_events += 1; | |
| if event.mask.contains(EventMask::Q_OVERFLOW) { | |
| let ev = Ok(Event::new(EventKind::Other).set_flag(Flag::Rescan)); | |
| self.event_handler.handle_event(ev); | |
| } | |
| let path = match event.name { | |
| Some(name) => { | |
| self.paths.get(&event.wd).map(|root| root.join(&name)) | |
| } | |
| None => self.paths.get(&event.wd).cloned(), | |
| }; | |
| if event.mask.contains(EventMask::MOVED_FROM) { | |
| send_pending_rename_event( | |
| &mut self.rename_event, | |
| &mut *self.event_handler, | |
| ); | |
| remove_watch_by_event(&path, &self.watches, &mut remove_watches); | |
| self.rename_event = Some( | |
| Event::new(EventKind::Modify(ModifyKind::Name( | |
| RenameMode::From, | |
| ))) | |
| .add_some_path(path.clone()) | |
| .set_tracker(event.cookie as usize), | |
| ); | |
| } else { | |
| let mut evs = Vec::new(); | |
| if event.mask.contains(EventMask::MOVED_TO) { | |
| if let Some(e) = self.rename_event.take() { | |
| if e.tracker() == Some(event.cookie as usize) { | |
| self.event_handler.handle_event(Ok(e.clone())); | |
| evs.push( | |
| Event::new(EventKind::Modify(ModifyKind::Name( | |
| RenameMode::To, | |
| ))) | |
| .set_tracker(event.cookie as usize) | |
| .add_some_path(path.clone()), | |
| ); | |
| evs.push( | |
| Event::new(EventKind::Modify(ModifyKind::Name( | |
| RenameMode::Both, | |
| ))) | |
| .set_tracker(event.cookie as usize) | |
| .add_some_path(e.paths.first().cloned()) | |
| .add_some_path(path.clone()), | |
| ); | |
| } else { | |
| // TODO should it be rename? | |
| evs.push( | |
| Event::new(EventKind::Create( | |
| if event.mask.contains(EventMask::ISDIR) { | |
| CreateKind::Folder | |
| } else { | |
| CreateKind::File | |
| }, | |
| )) | |
| .add_some_path(path.clone()), | |
| ); | |
| } | |
| } else { | |
| // TODO should it be rename? | |
| evs.push( | |
| Event::new(EventKind::Create( | |
| if event.mask.contains(EventMask::ISDIR) { | |
| CreateKind::Folder | |
| } else { | |
| CreateKind::File | |
| }, | |
| )) | |
| .add_some_path(path.clone()), | |
| ); | |
| } | |
| add_watch_by_event( | |
| &path, | |
| &event, | |
| &self.watches, | |
| &mut add_watches, | |
| ); | |
| } | |
| if event.mask.contains(EventMask::MOVE_SELF) { | |
| evs.push( | |
| Event::new(EventKind::Modify(ModifyKind::Name( | |
| RenameMode::From, | |
| ))) | |
| .add_some_path(path.clone()), | |
| ); | |
| // TODO stat the path and get to new path | |
| // - emit To and Both events | |
| // - change prefix for further events | |
| } | |
| if event.mask.contains(EventMask::CREATE) { | |
| evs.push( | |
| Event::new(EventKind::Create( | |
| if event.mask.contains(EventMask::ISDIR) { | |
| CreateKind::Folder | |
| } else { | |
| CreateKind::File | |
| }, | |
| )) | |
| .add_some_path(path.clone()), | |
| ); | |
| add_watch_by_event( | |
| &path, | |
| &event, | |
| &self.watches, | |
| &mut add_watches, | |
| ); | |
| } | |
| if event.mask.contains(EventMask::DELETE_SELF) | |
| || event.mask.contains(EventMask::DELETE) | |
| { | |
| evs.push( | |
| Event::new(EventKind::Remove( | |
| if event.mask.contains(EventMask::ISDIR) { | |
| RemoveKind::Folder | |
| } else { | |
| RemoveKind::File | |
| }, | |
| )) | |
| .add_some_path(path.clone()), | |
| ); | |
| remove_watch_by_event( | |
| &path, | |
| &self.watches, | |
| &mut remove_watches, | |
| ); | |
| } | |
| if event.mask.contains(EventMask::MODIFY) { | |
| evs.push( | |
| Event::new(EventKind::Modify(ModifyKind::Data( | |
| DataChange::Any, | |
| ))) | |
| .add_some_path(path.clone()), | |
| ); | |
| } | |
| if event.mask.contains(EventMask::CLOSE_WRITE) { | |
| evs.push( | |
| Event::new(EventKind::Access(AccessKind::Close( | |
| AccessMode::Write, | |
| ))) | |
| .add_some_path(path.clone()), | |
| ); | |
| } | |
| if event.mask.contains(EventMask::CLOSE_NOWRITE) { | |
| evs.push( | |
| Event::new(EventKind::Access(AccessKind::Close( | |
| AccessMode::Read, | |
| ))) | |
| .add_some_path(path.clone()), | |
| ); | |
| } | |
| if event.mask.contains(EventMask::ATTRIB) { | |
| evs.push( | |
| Event::new(EventKind::Modify(ModifyKind::Metadata( | |
| MetadataKind::Any, | |
| ))) | |
| .add_some_path(path.clone()), | |
| ); | |
| } | |
| if event.mask.contains(EventMask::OPEN) { | |
| evs.push( | |
| Event::new(EventKind::Access(AccessKind::Open( | |
| AccessMode::Any, | |
| ))) | |
| .add_some_path(path.clone()), | |
| ); | |
| } | |
| if !evs.is_empty() { | |
| send_pending_rename_event( | |
| &mut self.rename_event, | |
| &mut *self.event_handler, | |
| ); | |
| } | |
| for ev in evs { | |
| self.event_handler.handle_event(Ok(ev)); | |
| } | |
| } | |
| } | |
| // All events read. Break out. | |
| if num_events == 0 { | |
| break; | |
| } | |
| // When receiving only the first part of a move event (IN_MOVED_FROM) it is unclear | |
| // whether the second part (IN_MOVED_TO) will arrive because the file or directory | |
| // could just have been moved out of the watched directory. So it's necessary to wait | |
| // for possible subsequent events in case it's a complete move event but also to make sure | |
| // that the first part of the event is handled in a timely manner in case no subsequent events arrive. | |
| // TODO: don't do this here, instead leave it entirely to the debounce | |
| // -> related to some rename events being reported as creates. | |
| if let Some(ref rename_event) = self.rename_event { | |
| let event_loop_tx = self.event_loop_tx.clone(); | |
| let waker = self.event_loop_waker.clone(); | |
| let cookie = rename_event.tracker().unwrap(); // unwrap is safe because rename_event is always set with some cookie | |
| let _ = thread::Builder::new() | |
| .name("notify-rs inotify rename".to_string()) | |
| .spawn(move || { | |
| thread::sleep(Duration::from_millis(10)); // wait up to 10 ms for a subsequent event | |
| // An error here means the other end of the channel was closed, a thing that can | |
| // happen normally. | |
| let _ = event_loop_tx.send(EventLoopMsg::RenameTimeout(cookie)); | |
| let _ = waker.wake(); | |
| }); | |
| } | |
| } | |
| Err(e) => { | |
| self.event_handler.handle_event(Err(Error::io(e))); | |
| } | |
| } | |
| } | |
| } | |
| for path in remove_watches { | |
| self.remove_watch(path, true).ok(); | |
| } | |
| for path in add_watches { | |
| self.add_watch(path, true, false).ok(); | |
| } | |
| } | |
| fn add_watch(&mut self, path: PathBuf, is_recursive: bool, mut watch_self: bool) -> Result<()> { | |
| // If the watch is not recursive, or if we determine (by stat'ing the path to get its | |
| // metadata) that the watched path is not a directory, add a single path watch. | |
| if !is_recursive || !metadata(&path).map_err(Error::io)?.is_dir() { | |
| return self.add_single_watch(path, false, true); | |
| } | |
| for entry in WalkDir::new(path) | |
| .follow_links(true) | |
| .into_iter() | |
| .filter_map(filter_dir) | |
| { | |
| self.add_single_watch(entry.path().to_path_buf(), is_recursive, watch_self)?; | |
| watch_self = false; | |
| } | |
| Ok(()) | |
| } | |
| fn add_single_watch( | |
| &mut self, | |
| path: PathBuf, | |
| is_recursive: bool, | |
| watch_self: bool, | |
| ) -> Result<()> { | |
| let mut watchmask = WatchMask::ATTRIB | |
| | WatchMask::CREATE | |
| | WatchMask::DELETE | |
| | WatchMask::CLOSE_WRITE | |
| | WatchMask::MODIFY | |
| | WatchMask::MOVED_FROM | |
| | WatchMask::MOVED_TO; | |
| if watch_self { | |
| watchmask.insert(WatchMask::DELETE_SELF); | |
| watchmask.insert(WatchMask::MOVE_SELF); | |
| } | |
| if let Some(&(_, old_watchmask, _)) = self.watches.get(&path) { | |
| watchmask.insert(old_watchmask); | |
| watchmask.insert(WatchMask::MASK_ADD); | |
| } | |
| if let Some(ref mut inotify) = self.inotify { | |
| match inotify.add_watch(&path, watchmask) { | |
| Err(e) => { | |
| Err(if e.raw_os_error() == Some(libc::ENOSPC) { | |
| // do not report inotify limits as "no more space" on linux #266 | |
| Error::new(ErrorKind::MaxFilesWatch) | |
| } else { | |
| Error::io(e) | |
| } | |
| .add_path(path)) | |
| } | |
| Ok(w) => { | |
| watchmask.remove(WatchMask::MASK_ADD); | |
| self.watches | |
| .insert(path.clone(), (w.clone(), watchmask, is_recursive)); | |
| self.paths.insert(w, path); | |
| Ok(()) | |
| } | |
| } | |
| } else { | |
| Ok(()) | |
| } | |
| } | |
| fn remove_watch(&mut self, path: PathBuf, remove_recursive: bool) -> Result<()> { | |
| match self.watches.remove(&path) { | |
| None => return Err(Error::watch_not_found().add_path(path)), | |
| Some((w, _, is_recursive)) => { | |
| if let Some(ref mut inotify) = self.inotify { | |
| inotify | |
| .rm_watch(w.clone()) | |
| .map_err(|e| Error::io(e).add_path(path.clone()))?; | |
| self.paths.remove(&w); | |
| if is_recursive || remove_recursive { | |
| let mut remove_list = Vec::new(); | |
| for (w, p) in &self.paths { | |
| if p.starts_with(&path) { | |
| inotify | |
| .rm_watch(w.clone()) | |
| .map_err(|e| Error::io(e).add_path(p.into()))?; | |
| self.watches.remove(p); | |
| remove_list.push(w.clone()); | |
| } | |
| } | |
| for w in remove_list { | |
| self.paths.remove(&w); | |
| } | |
| } | |
| } | |
| } | |
| } | |
| Ok(()) | |
| } | |
| fn remove_all_watches(&mut self) -> Result<()> { | |
| if let Some(ref mut inotify) = self.inotify { | |
| for (w, p) in &self.paths { | |
| inotify | |
| .rm_watch(w.clone()) | |
| .map_err(|e| Error::io(e).add_path(p.into()))?; | |
| } | |
| self.watches.clear(); | |
| self.paths.clear(); | |
| } | |
| Ok(()) | |
| } | |
| } | |
| /// return `DirEntry` when it is a directory | |
| fn filter_dir(e: walkdir::Result<walkdir::DirEntry>) -> Option<walkdir::DirEntry> { | |
| if let Ok(e) = e { | |
| if let Ok(metadata) = e.metadata() { | |
| if metadata.is_dir() { | |
| return Some(e); | |
| } | |
| } | |
| } | |
| None | |
| } | |
| impl INotifyWatcher { | |
| fn from_event_handler(event_handler: Box<dyn EventHandler>) -> Result<Self> { | |
| let inotify = Inotify::init()?; | |
| let event_loop = EventLoop::new(inotify, event_handler)?; | |
| let channel = event_loop.event_loop_tx.clone(); | |
| let waker = event_loop.event_loop_waker.clone(); | |
| event_loop.run(); | |
| Ok(INotifyWatcher { channel, waker }) | |
| } | |
| fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> { | |
| let pb = if path.is_absolute() { | |
| path.to_owned() | |
| } else { | |
| let p = env::current_dir().map_err(Error::io)?; | |
| p.join(path) | |
| }; | |
| let (tx, rx) = unbounded(); | |
| let msg = EventLoopMsg::AddWatch(pb, recursive_mode, tx); | |
| // we expect the event loop to live and reply => unwraps must not panic | |
| self.channel.send(msg).unwrap(); | |
| self.waker.wake().unwrap(); | |
| rx.recv().unwrap() | |
| } | |
| fn unwatch_inner(&mut self, path: &Path) -> Result<()> { | |
| let pb = if path.is_absolute() { | |
| path.to_owned() | |
| } else { | |
| let p = env::current_dir().map_err(Error::io)?; | |
| p.join(path) | |
| }; | |
| let (tx, rx) = unbounded(); | |
| let msg = EventLoopMsg::RemoveWatch(pb, tx); | |
| // we expect the event loop to live and reply => unwraps must not panic | |
| self.channel.send(msg).unwrap(); | |
| self.waker.wake().unwrap(); | |
| rx.recv().unwrap() | |
| } | |
| } | |
| impl Watcher for INotifyWatcher { | |
| /// Create a new watcher. | |
| fn new<F: EventHandler>(event_handler: F, _config: Config) -> Result<Self> { | |
| Self::from_event_handler(Box::new(event_handler)) | |
| } | |
| fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> { | |
| self.watch_inner(path, recursive_mode) | |
| } | |
| fn unwatch(&mut self, path: &Path) -> Result<()> { | |
| self.unwatch_inner(path) | |
| } | |
| fn configure(&mut self, config: Config) -> Result<bool> { | |
| let (tx, rx) = bounded(1); | |
| self.channel.send(EventLoopMsg::Configure(config, tx))?; | |
| self.waker.wake()?; | |
| rx.recv()? | |
| } | |
| fn kind() -> crate::WatcherKind { | |
| crate::WatcherKind::Inotify | |
| } | |
| } | |
| impl Drop for INotifyWatcher { | |
| fn drop(&mut self) { | |
| // we expect the event loop to live => unwrap must not panic | |
| self.channel.send(EventLoopMsg::Shutdown).unwrap(); | |
| self.waker.wake().unwrap(); | |
| } | |
| } | |
/// Compile-time check that the watcher can be moved to and shared between threads.
#[test]
fn inotify_watcher_is_send_and_sync() {
    fn assert_send_and_sync<T>()
    where
        T: Send + Sync,
    {
    }

    assert_send_and_sync::<INotifyWatcher>();
}