blob: f4c0066e4c2ad1de3d9920dd83d8d4a037e16a79 [file] [log] [blame]
#![allow(missing_docs)]
//! Watcher implementation for Windows' directory management APIs
//!
//! For more information see the [ReadDirectoryChangesW reference][ref].
//!
//! [ref]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa363950(v=vs.85).aspx
use winapi::shared::minwindef::TRUE;
use winapi::shared::winerror::ERROR_OPERATION_ABORTED;
use winapi::um::fileapi;
use winapi::um::handleapi::{self, INVALID_HANDLE_VALUE};
use winapi::um::ioapiset;
use winapi::um::minwinbase::{LPOVERLAPPED, OVERLAPPED};
use winapi::um::synchapi;
use winapi::um::winbase::{self, INFINITE, WAIT_OBJECT_0};
use winapi::um::winnt::{self, FILE_NOTIFY_INFORMATION, HANDLE};
use crate::event::*;
use crate::{Config, Error, EventFn, RecursiveMode, Result, Watcher};
use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
use std::collections::HashMap;
use std::env;
use std::ffi::OsString;
use std::mem;
use std::os::raw::c_void;
use std::os::windows::ffi::{OsStrExt, OsStringExt};
use std::path::{Path, PathBuf};
use std::ptr;
use std::slice;
use std::sync::{Arc, Mutex};
use std::thread;
const BUF_SIZE: u32 = 16384;
#[derive(Clone)]
struct ReadData {
dir: PathBuf, // directory that is being watched
file: Option<PathBuf>, // if a file is being watched, this is its full path
complete_sem: HANDLE,
is_recursive: bool,
}
struct ReadDirectoryRequest {
event_fn: Arc<Mutex<dyn EventFn>>,
buffer: [u8; BUF_SIZE as usize],
handle: HANDLE,
data: ReadData,
}
enum Action {
Watch(PathBuf, RecursiveMode),
Unwatch(PathBuf),
Stop,
Configure(Config, Sender<Result<bool>>),
}
pub enum MetaEvent {
SingleWatchComplete,
WatcherAwakened,
}
struct WatchState {
dir_handle: HANDLE,
complete_sem: HANDLE,
}
struct ReadDirectoryChangesServer {
rx: Receiver<Action>,
event_fn: Arc<Mutex<dyn EventFn>>,
meta_tx: Sender<MetaEvent>,
cmd_tx: Sender<Result<PathBuf>>,
watches: HashMap<PathBuf, WatchState>,
wakeup_sem: HANDLE,
}
impl ReadDirectoryChangesServer {
fn start(
event_fn: Arc<Mutex<dyn EventFn>>,
meta_tx: Sender<MetaEvent>,
cmd_tx: Sender<Result<PathBuf>>,
wakeup_sem: HANDLE,
) -> Sender<Action> {
let (action_tx, action_rx) = unbounded();
// it is, in fact, ok to send the semaphore across threads
let sem_temp = wakeup_sem as u64;
thread::spawn(move || {
let wakeup_sem = sem_temp as HANDLE;
let server = ReadDirectoryChangesServer {
rx: action_rx,
event_fn,
meta_tx,
cmd_tx,
watches: HashMap::new(),
wakeup_sem,
};
server.run();
});
action_tx
}
fn run(mut self) {
loop {
// process all available actions first
let mut stopped = false;
while let Ok(action) = self.rx.try_recv() {
match action {
Action::Watch(path, recursive_mode) => {
let res = self.add_watch(path, recursive_mode.is_recursive());
let _ = self.cmd_tx.send(res);
}
Action::Unwatch(path) => self.remove_watch(path),
Action::Stop => {
stopped = true;
for ws in self.watches.values() {
stop_watch(ws, &self.meta_tx);
}
break;
}
Action::Configure(config, tx) => {
self.configure_raw_mode(config, tx);
}
}
}
if stopped {
break;
}
unsafe {
// wait with alertable flag so that the completion routine fires
let waitres = synchapi::WaitForSingleObjectEx(self.wakeup_sem, 100, TRUE);
if waitres == WAIT_OBJECT_0 {
let _ = self.meta_tx.send(MetaEvent::WatcherAwakened);
}
}
}
// we have to clean this up, since the watcher may be long gone
unsafe {
handleapi::CloseHandle(self.wakeup_sem);
}
}
fn add_watch(&mut self, path: PathBuf, is_recursive: bool) -> Result<PathBuf> {
// path must exist and be either a file or directory
if !path.is_dir() && !path.is_file() {
return Err(Error::generic(
"Input watch path is neither a file nor a directory.",
));
}
let (watching_file, dir_target) = {
if path.is_dir() {
(false, path.clone())
} else {
// emulate file watching by watching the parent directory
(true, path.parent().unwrap().to_path_buf())
}
};
let encoded_path: Vec<u16> = dir_target
.as_os_str()
.encode_wide()
.chain(Some(0))
.collect();
let handle;
unsafe {
handle = fileapi::CreateFileW(
encoded_path.as_ptr(),
winnt::FILE_LIST_DIRECTORY,
winnt::FILE_SHARE_READ | winnt::FILE_SHARE_DELETE | winnt::FILE_SHARE_WRITE,
ptr::null_mut(),
fileapi::OPEN_EXISTING,
winbase::FILE_FLAG_BACKUP_SEMANTICS | winbase::FILE_FLAG_OVERLAPPED,
ptr::null_mut(),
);
if handle == INVALID_HANDLE_VALUE {
return Err(if watching_file {
Error::generic(
"You attempted to watch a single file, but parent \
directory could not be opened.",
)
} else {
// TODO: Call GetLastError for better error info?
Error::path_not_found().add_path(path)
});
}
}
let wf = if watching_file {
Some(path.clone())
} else {
None
};
// every watcher gets its own semaphore to signal completion
let semaphore =
unsafe { synchapi::CreateSemaphoreW(ptr::null_mut(), 0, 1, ptr::null_mut()) };
if semaphore.is_null() || semaphore == INVALID_HANDLE_VALUE {
unsafe {
handleapi::CloseHandle(handle);
}
return Err(Error::generic("Failed to create semaphore for watch."));
}
let rd = ReadData {
dir: dir_target,
file: wf,
complete_sem: semaphore,
is_recursive,
};
let ws = WatchState {
dir_handle: handle,
complete_sem: semaphore,
};
self.watches.insert(path.clone(), ws);
start_read(&rd, self.event_fn.clone(), handle);
Ok(path)
}
fn remove_watch(&mut self, path: PathBuf) {
if let Some(ws) = self.watches.remove(&path) {
stop_watch(&ws, &self.meta_tx);
}
}
fn configure_raw_mode(&mut self, _config: Config, tx: Sender<Result<bool>>) {
tx.send(Ok(false))
.expect("configuration channel disconnect");
}
}
fn stop_watch(ws: &WatchState, meta_tx: &Sender<MetaEvent>) {
unsafe {
let cio = ioapiset::CancelIo(ws.dir_handle);
let ch = handleapi::CloseHandle(ws.dir_handle);
// have to wait for it, otherwise we leak the memory allocated for there read request
if cio != 0 && ch != 0 {
while synchapi::WaitForSingleObjectEx(ws.complete_sem, INFINITE, TRUE) != WAIT_OBJECT_0
{
// drain the apc queue, fix for https://github.com/notify-rs/notify/issues/287#issuecomment-801465550
}
}
handleapi::CloseHandle(ws.complete_sem);
}
let _ = meta_tx.send(MetaEvent::SingleWatchComplete);
}
fn start_read(rd: &ReadData, event_fn: Arc<Mutex<dyn EventFn>>, handle: HANDLE) {
let mut request = Box::new(ReadDirectoryRequest {
event_fn,
handle,
buffer: [0u8; BUF_SIZE as usize],
data: rd.clone(),
});
let flags = winnt::FILE_NOTIFY_CHANGE_FILE_NAME
| winnt::FILE_NOTIFY_CHANGE_DIR_NAME
| winnt::FILE_NOTIFY_CHANGE_ATTRIBUTES
| winnt::FILE_NOTIFY_CHANGE_SIZE
| winnt::FILE_NOTIFY_CHANGE_LAST_WRITE
| winnt::FILE_NOTIFY_CHANGE_CREATION
| winnt::FILE_NOTIFY_CHANGE_SECURITY;
let monitor_subdir = if (&request.data.file).is_none() && request.data.is_recursive {
1
} else {
0
};
unsafe {
let mut overlapped: Box<OVERLAPPED> = Box::new(mem::zeroed());
// When using callback based async requests, we are allowed to use the hEvent member
// for our own purposes
let req_buf = request.buffer.as_mut_ptr() as *mut c_void;
let request_p = Box::into_raw(request) as *mut c_void;
overlapped.hEvent = request_p;
// This is using an asynchronous call with a completion routine for receiving notifications
// An I/O completion port would probably be more performant
let ret = winbase::ReadDirectoryChangesW(
handle,
req_buf,
BUF_SIZE,
monitor_subdir,
flags,
&mut 0u32 as *mut u32, // not used for async reqs
&mut *overlapped as *mut OVERLAPPED,
Some(handle_event),
);
if ret == 0 {
// error reading. retransmute request memory to allow drop.
// allow overlapped to drop by omitting forget()
let request: Box<ReadDirectoryRequest> = mem::transmute(request_p);
synchapi::ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut());
} else {
// read ok. forget overlapped to let the completion routine handle memory
mem::forget(overlapped);
}
}
}
unsafe extern "system" fn handle_event(
error_code: u32,
_bytes_written: u32,
overlapped: LPOVERLAPPED,
) {
let overlapped: Box<OVERLAPPED> = Box::from_raw(overlapped);
let request: Box<ReadDirectoryRequest> = Box::from_raw(overlapped.hEvent as *mut _);
if error_code == ERROR_OPERATION_ABORTED {
// received when dir is unwatched or watcher is shutdown; return and let overlapped/request
// get drop-cleaned
synchapi::ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut());
return;
}
// Get the next request queued up as soon as possible
start_read(&request.data, request.event_fn.clone(), request.handle);
// The FILE_NOTIFY_INFORMATION struct has a variable length due to the variable length
// string as its last member. Each struct contains an offset for getting the next entry in
// the buffer.
let mut cur_offset: *const u8 = request.buffer.as_ptr();
let mut cur_entry = cur_offset as *const FILE_NOTIFY_INFORMATION;
loop {
// filename length is size in bytes, so / 2
let len = (*cur_entry).FileNameLength as usize / 2;
let encoded_path: &[u16] = slice::from_raw_parts((*cur_entry).FileName.as_ptr(), len);
// prepend root to get a full path
let path = request
.data
.dir
.join(PathBuf::from(OsString::from_wide(encoded_path)));
// if we are watching a single file, ignore the event unless the path is exactly
// the watched file
let skip = match request.data.file {
None => false,
Some(ref watch_path) => *watch_path != path,
};
if !skip {
let newe = Event::new(EventKind::Any).add_path(path);
fn emit_event(event_fn: &Mutex<dyn EventFn>, res: Result<Event>) {
if let Ok(guard) = event_fn.lock() {
let f: &dyn EventFn = &*guard;
f(res);
}
}
let event_fn = |res| emit_event(&request.event_fn, res);
if (*cur_entry).Action == winnt::FILE_ACTION_RENAMED_OLD_NAME {
let mode = RenameMode::From;
let kind = ModifyKind::Name(mode);
let kind = EventKind::Modify(kind);
let ev = newe.set_kind(kind);
event_fn(Ok(ev))
} else {
match (*cur_entry).Action {
winnt::FILE_ACTION_RENAMED_NEW_NAME => {
let kind = EventKind::Modify(ModifyKind::Name(RenameMode::To));
let ev = newe.set_kind(kind);
event_fn(Ok(ev));
}
winnt::FILE_ACTION_ADDED => {
let kind = EventKind::Create(CreateKind::Any);
let ev = newe.set_kind(kind);
event_fn(Ok(ev));
}
winnt::FILE_ACTION_REMOVED => {
let kind = EventKind::Remove(RemoveKind::Any);
let ev = newe.set_kind(kind);
event_fn(Ok(ev));
}
winnt::FILE_ACTION_MODIFIED => {
let kind = EventKind::Modify(ModifyKind::Any);
let ev = newe.set_kind(kind);
event_fn(Ok(ev));
}
_ => (),
};
}
}
if (*cur_entry).NextEntryOffset == 0 {
break;
}
cur_offset = cur_offset.offset((*cur_entry).NextEntryOffset as isize);
cur_entry = cur_offset as *const FILE_NOTIFY_INFORMATION;
}
}
/// Watcher implementation based on ReadDirectoryChanges
pub struct ReadDirectoryChangesWatcher {
tx: Sender<Action>,
cmd_rx: Receiver<Result<PathBuf>>,
wakeup_sem: HANDLE,
}
impl ReadDirectoryChangesWatcher {
pub fn create(
event_fn: Arc<Mutex<dyn EventFn>>,
meta_tx: Sender<MetaEvent>,
) -> Result<ReadDirectoryChangesWatcher> {
let (cmd_tx, cmd_rx) = unbounded();
let wakeup_sem =
unsafe { synchapi::CreateSemaphoreW(ptr::null_mut(), 0, 1, ptr::null_mut()) };
if wakeup_sem.is_null() || wakeup_sem == INVALID_HANDLE_VALUE {
return Err(Error::generic("Failed to create wakeup semaphore."));
}
let action_tx = ReadDirectoryChangesServer::start(event_fn, meta_tx, cmd_tx, wakeup_sem);
Ok(ReadDirectoryChangesWatcher {
tx: action_tx,
cmd_rx,
wakeup_sem,
})
}
fn wakeup_server(&mut self) {
// breaks the server out of its wait state. right now this is really just an optimization,
// so that if you add a watch you don't block for 100ms in watch() while the
// server sleeps.
unsafe {
synchapi::ReleaseSemaphore(self.wakeup_sem, 1, ptr::null_mut());
}
}
fn send_action_require_ack(&mut self, action: Action, pb: &PathBuf) -> Result<()> {
self.tx
.send(action)
.map_err(|_| Error::generic("Error sending to internal channel"))?;
// wake 'em up, we don't want to wait around for the ack
self.wakeup_server();
let ack_pb = self
.cmd_rx
.recv()
.map_err(|_| Error::generic("Error receiving from command channel"))?
.map_err(|e| Error::generic(&format!("Error in watcher: {:?}", e)))?;
if pb.as_path() != ack_pb.as_path() {
Err(Error::generic(&format!(
"Expected ack for {:?} but got \
ack for {:?}",
pb, ack_pb
)))
} else {
Ok(())
}
}
fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
let pb = if path.is_absolute() {
path.to_owned()
} else {
let p = env::current_dir().map_err(Error::io)?;
p.join(path)
};
// path must exist and be either a file or directory
if !pb.is_dir() && !pb.is_file() {
return Err(Error::generic(
"Input watch path is neither a file nor a directory.",
));
}
self.send_action_require_ack(Action::Watch(pb.clone(), recursive_mode), &pb)
}
fn unwatch_inner(&mut self, path: &Path) -> Result<()> {
let pb = if path.is_absolute() {
path.to_owned()
} else {
let p = env::current_dir().map_err(Error::io)?;
p.join(path)
};
let res = self
.tx
.send(Action::Unwatch(pb))
.map_err(|_| Error::generic("Error sending to internal channel"));
self.wakeup_server();
res
}
}
impl Watcher for ReadDirectoryChangesWatcher {
fn new_immediate<F: EventFn>(event_fn: F) -> Result<ReadDirectoryChangesWatcher> {
// create dummy channel for meta event
// TODO: determine the original purpose of this - can we remove it?
let (meta_tx, _) = unbounded();
let event_fn = Arc::new(Mutex::new(event_fn));
ReadDirectoryChangesWatcher::create(event_fn, meta_tx)
}
fn watch<P: AsRef<Path>>(&mut self, path: P, recursive_mode: RecursiveMode) -> Result<()> {
self.watch_inner(path.as_ref(), recursive_mode)
}
fn unwatch<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
self.unwatch_inner(path.as_ref())
}
fn configure(&mut self, config: Config) -> Result<bool> {
let (tx, rx) = bounded(1);
self.tx.send(Action::Configure(config, tx))?;
rx.recv()?
}
}
impl Drop for ReadDirectoryChangesWatcher {
fn drop(&mut self) {
let _ = self.tx.send(Action::Stop);
// better wake it up
self.wakeup_server();
}
}
// `ReadDirectoryChangesWatcher` is not Send/Sync because of the semaphore Handle.
// As said elsewhere it's perfectly safe to send it across threads.
unsafe impl Send for ReadDirectoryChangesWatcher {}
// Because all public methods are `&mut self` it's also perfectly safe to share references.
unsafe impl Sync for ReadDirectoryChangesWatcher {}