| //! Watcher implementation for Darwin's FSEvents API | |
| //! | |
| //! The FSEvents API provides a mechanism to notify clients about directories they ought to re-scan | |
| //! in order to keep their internal data structures up-to-date with respect to the true state of | |
| //! the file system. (For example, when files or directories are created, modified, or removed.) It | |
| //! sends these notifications "in bulk", possibly notifying the client of changes to several | |
| //! directories in a single callback. | |
| //! | |
| //! For more information see the [FSEvents API reference][ref]. | |
| //! | |
| //! TODO: document event translation | |
| //! | |
| //! [ref]: https://developer.apple.com/library/mac/documentation/Darwin/Reference/FSEvents_Ref/ | |
| #![allow(non_upper_case_globals, dead_code)] | |
| use crate::event::*; | |
| use crate::{Config, Error, EventFn, RecursiveMode, Result, Watcher}; | |
| use crossbeam_channel::{unbounded, Receiver, Sender}; | |
| use fsevent_sys as fs; | |
| use fsevent_sys::core_foundation as cf; | |
| use std::collections::HashMap; | |
| use std::convert::AsRef; | |
| use std::ffi::CStr; | |
| use std::os::raw; | |
| use std::path::{Path, PathBuf}; | |
| use std::ptr; | |
| use std::sync::{Arc, Mutex}; | |
| use std::thread; | |
| bitflags::bitflags! { | |
| #[repr(C)] | |
| struct StreamFlags: u32 { | |
| const NONE = fs::kFSEventStreamEventFlagNone; | |
| const MUST_SCAN_SUBDIRS = fs::kFSEventStreamEventFlagMustScanSubDirs; | |
| const USER_DROPPED = fs::kFSEventStreamEventFlagUserDropped; | |
| const KERNEL_DROPPED = fs::kFSEventStreamEventFlagKernelDropped; | |
| const IDS_WRAPPED = fs::kFSEventStreamEventFlagEventIdsWrapped; | |
| const HISTORY_DONE = fs::kFSEventStreamEventFlagHistoryDone; | |
| const ROOT_CHANGED = fs::kFSEventStreamEventFlagRootChanged; | |
| const MOUNT = fs::kFSEventStreamEventFlagMount; | |
| const UNMOUNT = fs::kFSEventStreamEventFlagUnmount; | |
| const ITEM_CREATED = fs::kFSEventStreamEventFlagItemCreated; | |
| const ITEM_REMOVED = fs::kFSEventStreamEventFlagItemRemoved; | |
| const INODE_META_MOD = fs::kFSEventStreamEventFlagItemInodeMetaMod; | |
| const ITEM_RENAMED = fs::kFSEventStreamEventFlagItemRenamed; | |
| const ITEM_MODIFIED = fs::kFSEventStreamEventFlagItemModified; | |
| const FINDER_INFO_MOD = fs::kFSEventStreamEventFlagItemFinderInfoMod; | |
| const ITEM_CHANGE_OWNER = fs::kFSEventStreamEventFlagItemChangeOwner; | |
| const ITEM_XATTR_MOD = fs::kFSEventStreamEventFlagItemXattrMod; | |
| const IS_FILE = fs::kFSEventStreamEventFlagItemIsFile; | |
| const IS_DIR = fs::kFSEventStreamEventFlagItemIsDir; | |
| const IS_SYMLINK = fs::kFSEventStreamEventFlagItemIsSymlink; | |
| const OWN_EVENT = fs::kFSEventStreamEventFlagOwnEvent; | |
| const IS_HARDLINK = fs::kFSEventStreamEventFlagItemIsHardlink; | |
| const IS_LAST_HARDLINK = fs::kFSEventStreamEventFlagItemIsLastHardlink; | |
| const ITEM_CLONED = fs::kFSEventStreamEventFlagItemCloned; | |
| } | |
| } | |
| /// FSEvents-based `Watcher` implementation | |
| pub struct FsEventWatcher { | |
| paths: cf::CFMutableArrayRef, | |
| since_when: fs::FSEventStreamEventId, | |
| latency: cf::CFTimeInterval, | |
| flags: fs::FSEventStreamCreateFlags, | |
| event_fn: Arc<Mutex<dyn EventFn>>, | |
| runloop: Option<(cf::CFRunLoopRef, Receiver<()>)>, | |
| recursive_info: HashMap<PathBuf, bool>, | |
| } | |
| // CFMutableArrayRef is a type alias to *mut libc::c_void, so FsEventWatcher is not Send/Sync | |
| // automatically. It's Send because the pointer is not used in other threads. | |
| unsafe impl Send for FsEventWatcher {} | |
| // It's Sync because all methods that change the mutable state use `&mut self`. | |
| unsafe impl Sync for FsEventWatcher {} | |
| fn translate_flags(flags: StreamFlags, precise: bool) -> Vec<Event> { | |
| let mut evs = Vec::new(); | |
| // «Denotes a sentinel event sent to mark the end of the "historical" events | |
| // sent as a result of specifying a `sinceWhen` value in the FSEvents.Create | |
| // call that created this event stream. After invoking the client's callback | |
| // with all the "historical" events that occurred before now, the client's | |
| // callback will be invoked with an event where the HistoryDone flag is set. | |
| // The client should ignore the path supplied in this callback.» | |
| // — https://www.mbsplugins.eu/FSEventsNextEvent.shtml | |
| // | |
| // As a result, we just stop processing here and return an empty vec, which | |
| // will ignore this completely and not emit any Events whatsoever. | |
| if flags.contains(StreamFlags::HISTORY_DONE) { | |
| return evs; | |
| } | |
| // FSEvents provides two possible hints as to why events were dropped, | |
| // however documentation on what those mean is scant, so we just pass them | |
| // through in the info attr field. The intent is clear enough, and the | |
| // additional information is provided if the user wants it. | |
| if flags.contains(StreamFlags::MUST_SCAN_SUBDIRS) { | |
| let e = Event::new(EventKind::Other).set_flag(Flag::Rescan); | |
| evs.push(if flags.contains(StreamFlags::USER_DROPPED) { | |
| e.set_info("rescan: user dropped") | |
| } else if flags.contains(StreamFlags::KERNEL_DROPPED) { | |
| e.set_info("rescan: kernel dropped") | |
| } else { | |
| e | |
| }); | |
| } | |
| // In imprecise mode, let's not even bother parsing the kind of the event | |
| // except for the above very special events. | |
| if !precise { | |
| evs.push(Event::new(EventKind::Any)); | |
| return evs; | |
| } | |
| // This is most likely a rename or a removal. We assume rename but may want | |
| // to figure out if it was a removal some way later (TODO). To denote the | |
| // special nature of the event, we add an info string. | |
| if flags.contains(StreamFlags::ROOT_CHANGED) { | |
| evs.push( | |
| Event::new(EventKind::Modify(ModifyKind::Name(RenameMode::From))) | |
| .set_info("root changed"), | |
| ); | |
| } | |
| // A path was mounted at the event path; we treat that as a create. | |
| if flags.contains(StreamFlags::MOUNT) { | |
| evs.push(Event::new(EventKind::Create(CreateKind::Other)).set_info("mount")); | |
| } | |
| // A path was unmounted at the event path; we treat that as a remove. | |
| if flags.contains(StreamFlags::UNMOUNT) { | |
| evs.push(Event::new(EventKind::Remove(RemoveKind::Other)).set_info("mount")); | |
| } | |
| if flags.contains(StreamFlags::ITEM_CREATED) { | |
| evs.push(if flags.contains(StreamFlags::IS_DIR) { | |
| Event::new(EventKind::Create(CreateKind::Folder)) | |
| } else if flags.contains(StreamFlags::IS_FILE) { | |
| Event::new(EventKind::Create(CreateKind::File)) | |
| } else { | |
| let e = Event::new(EventKind::Create(CreateKind::Other)); | |
| if flags.contains(StreamFlags::IS_SYMLINK) { | |
| e.set_info("is: symlink") | |
| } else if flags.contains(StreamFlags::IS_HARDLINK) { | |
| e.set_info("is: hardlink") | |
| } else if flags.contains(StreamFlags::ITEM_CLONED) { | |
| e.set_info("is: clone") | |
| } else { | |
| Event::new(EventKind::Create(CreateKind::Any)) | |
| } | |
| }); | |
| } | |
| if flags.contains(StreamFlags::ITEM_REMOVED) { | |
| evs.push(if flags.contains(StreamFlags::IS_DIR) { | |
| Event::new(EventKind::Remove(RemoveKind::Folder)) | |
| } else if flags.contains(StreamFlags::IS_FILE) { | |
| Event::new(EventKind::Remove(RemoveKind::File)) | |
| } else { | |
| let e = Event::new(EventKind::Remove(RemoveKind::Other)); | |
| if flags.contains(StreamFlags::IS_SYMLINK) { | |
| e.set_info("is: symlink") | |
| } else if flags.contains(StreamFlags::IS_HARDLINK) { | |
| e.set_info("is: hardlink") | |
| } else if flags.contains(StreamFlags::ITEM_CLONED) { | |
| e.set_info("is: clone") | |
| } else { | |
| Event::new(EventKind::Remove(RemoveKind::Any)) | |
| } | |
| }); | |
| } | |
| if flags.contains(StreamFlags::ITEM_RENAMED) { | |
| evs.push(Event::new(EventKind::Modify(ModifyKind::Name( | |
| RenameMode::From, | |
| )))); | |
| } | |
| // This is only described as "metadata changed", but it may be that it's | |
| // only emitted for some more precise subset of events... if so, will need | |
| // amending, but for now we have an Any-shaped bucket to put it in. | |
| if flags.contains(StreamFlags::INODE_META_MOD) { | |
| evs.push(Event::new(EventKind::Modify(ModifyKind::Metadata( | |
| MetadataKind::Any, | |
| )))); | |
| } | |
| if flags.contains(StreamFlags::FINDER_INFO_MOD) { | |
| evs.push( | |
| Event::new(EventKind::Modify(ModifyKind::Metadata(MetadataKind::Other))) | |
| .set_info("meta: finder info"), | |
| ); | |
| } | |
| if flags.contains(StreamFlags::ITEM_CHANGE_OWNER) { | |
| evs.push(Event::new(EventKind::Modify(ModifyKind::Metadata( | |
| MetadataKind::Ownership, | |
| )))); | |
| } | |
| if flags.contains(StreamFlags::ITEM_XATTR_MOD) { | |
| evs.push(Event::new(EventKind::Modify(ModifyKind::Metadata( | |
| MetadataKind::Extended, | |
| )))); | |
| } | |
| // This is specifically described as a data change, which we take to mean | |
| // is a content change. | |
| if flags.contains(StreamFlags::ITEM_MODIFIED) { | |
| evs.push(Event::new(EventKind::Modify(ModifyKind::Data( | |
| DataChange::Content, | |
| )))); | |
| } | |
| if flags.contains(StreamFlags::OWN_EVENT) { | |
| for ev in &mut evs { | |
| *ev = std::mem::take(ev).set_process_id(std::process::id()); | |
| } | |
| } | |
| evs | |
| } | |
| struct StreamContextInfo { | |
| event_fn: Arc<Mutex<dyn EventFn>>, | |
| recursive_info: HashMap<PathBuf, bool>, | |
| } | |
| extern "C" { | |
| /// Indicates whether the run loop is waiting for an event. | |
| fn CFRunLoopIsWaiting(runloop: cf::CFRunLoopRef) -> cf::Boolean; | |
| } | |
| impl FsEventWatcher { | |
| fn from_event_fn(event_fn: Arc<Mutex<dyn EventFn>>) -> Result<Self> { | |
| Ok(FsEventWatcher { | |
| paths: unsafe { | |
| cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks) | |
| }, | |
| since_when: fs::kFSEventStreamEventIdSinceNow, | |
| latency: 0.0, | |
| flags: fs::kFSEventStreamCreateFlagFileEvents | fs::kFSEventStreamCreateFlagNoDefer, | |
| event_fn, | |
| runloop: None, | |
| recursive_info: HashMap::new(), | |
| }) | |
| } | |
| fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> { | |
| self.stop(); | |
| let result = self.append_path(path, recursive_mode); | |
| // ignore return error: may be empty path list | |
| let _ = self.run(); | |
| result | |
| } | |
| fn unwatch_inner(&mut self, path: &Path) -> Result<()> { | |
| self.stop(); | |
| let result = self.remove_path(path); | |
| // ignore return error: may be empty path list | |
| let _ = self.run(); | |
| result | |
| } | |
| #[inline] | |
| fn is_running(&self) -> bool { | |
| self.runloop.is_some() | |
| } | |
| fn stop(&mut self) { | |
| if !self.is_running() { | |
| return; | |
| } | |
| if let Some((runloop, done)) = self.runloop.take() { | |
| unsafe { | |
| let runloop = runloop as *mut raw::c_void; | |
| while CFRunLoopIsWaiting(runloop) == 0 { | |
| thread::yield_now(); | |
| } | |
| cf::CFRunLoopStop(runloop); | |
| } | |
| // sync done channel | |
| match done.recv() { | |
| Ok(()) => (), | |
| Err(_) => panic!("the runloop may not be finished!"), | |
| } | |
| } | |
| } | |
| fn remove_path<P: AsRef<Path>>(&mut self, path: P) -> Result<()> { | |
| let str_path = path.as_ref().to_str().unwrap(); | |
| unsafe { | |
| let mut err: cf::CFErrorRef = ptr::null_mut(); | |
| let cf_path = cf::str_path_to_cfstring_ref(str_path, &mut err); | |
| if cf_path.is_null() { | |
| cf::CFRelease(err as cf::CFRef); | |
| return Err(Error::watch_not_found().add_path(path.as_ref().into())); | |
| } | |
| let mut to_remove = Vec::new(); | |
| for idx in 0..cf::CFArrayGetCount(self.paths) { | |
| let item = cf::CFArrayGetValueAtIndex(self.paths, idx); | |
| if cf::CFStringCompare(item, cf_path, cf::kCFCompareCaseInsensitive) | |
| == cf::kCFCompareEqualTo | |
| { | |
| to_remove.push(idx); | |
| } | |
| } | |
| cf::CFRelease(cf_path); | |
| for idx in to_remove.iter().rev() { | |
| cf::CFArrayRemoveValueAtIndex(self.paths, *idx); | |
| } | |
| } | |
| let p = if let Ok(canonicalized_path) = path.as_ref().canonicalize() { | |
| canonicalized_path | |
| } else { | |
| path.as_ref().to_owned() | |
| }; | |
| match self.recursive_info.remove(&p) { | |
| Some(_) => Ok(()), | |
| None => Err(Error::watch_not_found()), | |
| } | |
| } | |
| // https://github.com/thibaudgg/rb-fsevent/blob/master/ext/fsevent_watch/main.c | |
| fn append_path<P: AsRef<Path>>( | |
| &mut self, | |
| path: P, | |
| recursive_mode: RecursiveMode, | |
| ) -> Result<()> { | |
| if !path.as_ref().exists() { | |
| return Err(Error::path_not_found().add_path(path.as_ref().into())); | |
| } | |
| let str_path = path.as_ref().to_str().unwrap(); | |
| unsafe { | |
| let mut err: cf::CFErrorRef = ptr::null_mut(); | |
| let cf_path = cf::str_path_to_cfstring_ref(str_path, &mut err); | |
| if cf_path.is_null() { | |
| // Most likely the directory was deleted, or permissions changed, | |
| // while the above code was running. | |
| cf::CFRelease(err as cf::CFRef); | |
| return Err(Error::path_not_found().add_path(path.as_ref().into())); | |
| } | |
| cf::CFArrayAppendValue(self.paths, cf_path); | |
| cf::CFRelease(cf_path); | |
| } | |
| self.recursive_info.insert( | |
| path.as_ref().to_path_buf().canonicalize().unwrap(), | |
| recursive_mode.is_recursive(), | |
| ); | |
| Ok(()) | |
| } | |
| fn run(&mut self) -> Result<()> { | |
| if unsafe { cf::CFArrayGetCount(self.paths) } == 0 { | |
| // TODO: Reconstruct and add paths to error | |
| return Err(Error::path_not_found()); | |
| } | |
| // done channel is used to sync quit status of runloop thread | |
| let (done_tx, done_rx) = unbounded(); | |
| let info = StreamContextInfo { | |
| event_fn: self.event_fn.clone(), | |
| recursive_info: self.recursive_info.clone(), | |
| }; | |
| // Unfortunately fsevents doesn't provide a mechanism for getting the context back from a | |
| // stream after it's been created. So in order to avoid a memory leak in the normal case, | |
| // we need to box the pointer up, alias the pointer, then drop it when the stream has | |
| // completed. This box will be leaked if a panic is triggered before the inner thread has a | |
| // chance to drop the box. | |
| let context = Box::into_raw(Box::new(info)); | |
| // Safety | |
| // - StreamContextInfo is Send+Sync. | |
| // - This is safe to move into a thread because it will only be accessed when the stream is | |
| // completed and no longer accessing the context. | |
| struct ContextPtr(*mut StreamContextInfo); | |
| unsafe impl Send for ContextPtr {} | |
| let thread_context_ptr = ContextPtr(context); | |
| let stream_context = fs::FSEventStreamContext { | |
| version: 0, | |
| info: context as *mut libc::c_void, | |
| retain: None, | |
| release: None, | |
| copy_description: None, | |
| }; | |
| let stream = unsafe { | |
| fs::FSEventStreamCreate( | |
| cf::kCFAllocatorDefault, | |
| callback, | |
| &stream_context, | |
| self.paths, | |
| self.since_when, | |
| self.latency, | |
| self.flags, | |
| ) | |
| }; | |
| // Wrapper to help send CFRef types across threads. | |
| struct CFSendWrapper(cf::CFRef); | |
| // Safety: | |
| // - According to the Apple documentation, it's safe to move `CFRef`s across threads. | |
| // https://developer.apple.com/library/archive/documentation/Cocoa/Conceptual/Multithreading/ThreadSafetySummary/ThreadSafetySummary.html | |
| unsafe impl Send for CFSendWrapper {} | |
| // move into thread | |
| let stream = CFSendWrapper(stream); | |
| // channel to pass runloop around | |
| let (rl_tx, rl_rx) = unbounded(); | |
| thread::spawn(move || { | |
| let stream = stream.0; | |
| unsafe { | |
| let cur_runloop = cf::CFRunLoopGetCurrent(); | |
| fs::FSEventStreamScheduleWithRunLoop( | |
| stream, | |
| cur_runloop, | |
| cf::kCFRunLoopDefaultMode, | |
| ); | |
| fs::FSEventStreamStart(stream); | |
| // the calling to CFRunLoopRun will be terminated by CFRunLoopStop call in drop() | |
| rl_tx | |
| .send(CFSendWrapper(cur_runloop)) | |
| .expect("Unable to send runloop to watcher"); | |
| cf::CFRunLoopRun(); | |
| fs::FSEventStreamStop(stream); | |
| fs::FSEventStreamInvalidate(stream); | |
| fs::FSEventStreamRelease(stream); | |
| // Safety: | |
| // - It's safe to drop the context now that the stream has been shut down and no | |
| // longer references the context pointer. | |
| let _context = Box::from_raw(thread_context_ptr.0); | |
| } | |
| done_tx | |
| .send(()) | |
| .expect("error while signal run loop is done"); | |
| }); | |
| // block until runloop has been sent | |
| self.runloop = Some((rl_rx.recv().unwrap().0, done_rx)); | |
| Ok(()) | |
| } | |
| fn configure_raw_mode(&mut self, _config: Config, tx: Sender<Result<bool>>) { | |
| tx.send(Ok(false)) | |
| .expect("configuration channel disconnect"); | |
| } | |
| } | |
| extern "C" fn callback( | |
| stream_ref: fs::FSEventStreamRef, | |
| info: *mut libc::c_void, | |
| num_events: libc::size_t, // size_t numEvents | |
| event_paths: *mut libc::c_void, // void *eventPaths | |
| event_flags: *const fs::FSEventStreamEventFlags, // const FSEventStreamEventFlags eventFlags[] | |
| event_ids: *const fs::FSEventStreamEventId, // const FSEventStreamEventId eventIds[] | |
| ) { | |
| unsafe { | |
| callback_impl( | |
| stream_ref, | |
| info, | |
| num_events, | |
| event_paths, | |
| event_flags, | |
| event_ids, | |
| ) | |
| } | |
| } | |
| unsafe fn callback_impl( | |
| _stream_ref: fs::FSEventStreamRef, | |
| info: *mut libc::c_void, | |
| num_events: libc::size_t, // size_t numEvents | |
| event_paths: *mut libc::c_void, // void *eventPaths | |
| event_flags: *const fs::FSEventStreamEventFlags, // const FSEventStreamEventFlags eventFlags[] | |
| _event_ids: *const fs::FSEventStreamEventId, // const FSEventStreamEventId eventIds[] | |
| ) { | |
| let event_paths = event_paths as *const *const libc::c_char; | |
| let info = info as *const StreamContextInfo; | |
| let event_fn = &(*info).event_fn; | |
| for p in 0..num_events { | |
| let path = CStr::from_ptr(*event_paths.add(p)) | |
| .to_str() | |
| .expect("Invalid UTF8 string."); | |
| let path = PathBuf::from(path); | |
| let flag = *event_flags.add(p); | |
| let flag = StreamFlags::from_bits(flag).unwrap_or_else(|| { | |
| panic!("Unable to decode StreamFlags: {}", flag); | |
| }); | |
| let mut handle_event = false; | |
| for (p, r) in &(*info).recursive_info { | |
| if path.starts_with(p) { | |
| if *r || &path == p { | |
| handle_event = true; | |
| break; | |
| } else if let Some(parent_path) = path.parent() { | |
| if parent_path == p { | |
| handle_event = true; | |
| break; | |
| } | |
| } | |
| } | |
| } | |
| if !handle_event { | |
| continue; | |
| } | |
| for ev in translate_flags(flag, true).into_iter() { | |
| // TODO: precise | |
| let ev = ev.add_path(path.clone()); | |
| let event_fn = event_fn.lock().expect("lock not to be poisoned"); | |
| (event_fn)(Ok(ev)); | |
| } | |
| } | |
| } | |
| impl Watcher for FsEventWatcher { | |
| fn new_immediate<F: EventFn>(event_fn: F) -> Result<FsEventWatcher> { | |
| FsEventWatcher::from_event_fn(Arc::new(Mutex::new(event_fn))) | |
| } | |
| fn watch<P: AsRef<Path>>(&mut self, path: P, recursive_mode: RecursiveMode) -> Result<()> { | |
| self.watch_inner(path.as_ref(), recursive_mode) | |
| } | |
| fn unwatch<P: AsRef<Path>>(&mut self, path: P) -> Result<()> { | |
| self.unwatch_inner(path.as_ref()) | |
| } | |
| fn configure(&mut self, config: Config) -> Result<bool> { | |
| let (tx, rx) = unbounded(); | |
| self.configure_raw_mode(config, tx); | |
| rx.recv()? | |
| } | |
| } | |
| impl Drop for FsEventWatcher { | |
| fn drop(&mut self) { | |
| self.stop(); | |
| unsafe { | |
| cf::CFRelease(self.paths); | |
| } | |
| } | |
| } | |
| #[test] | |
| fn test_fsevent_watcher_drop() { | |
| use super::*; | |
| use std::time::Duration; | |
| let dir = tempfile::tempdir().unwrap(); | |
| let (tx, rx) = std::sync::mpsc::channel(); | |
| let event_fn = move |res| tx.send(res).unwrap(); | |
| { | |
| let mut watcher: RecommendedWatcher = Watcher::new_immediate(event_fn).unwrap(); | |
| watcher.watch(dir.path(), RecursiveMode::Recursive).unwrap(); | |
| thread::sleep(Duration::from_millis(2000)); | |
| println!("is running -> {}", watcher.is_running()); | |
| thread::sleep(Duration::from_millis(1000)); | |
| watcher.unwatch(dir.path()).unwrap(); | |
| println!("is running -> {}", watcher.is_running()); | |
| } | |
| thread::sleep(Duration::from_millis(1000)); | |
| for res in rx { | |
| let e = res.unwrap(); | |
| println!("debug => {:?} {:?}", e.kind, e.paths); | |
| } | |
| println!("in test: {} works", file!()); | |
| } | |
| #[test] | |
| fn test_steam_context_info_send() { | |
| fn check_send<T: Send>() {} | |
| check_send::<StreamContextInfo>(); | |
| } |