blob: 78b17f46fab1bd85e251db4298cf1ac788929de9 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Stream-based Fuchsia VFS directory watcher
#![deny(missing_docs)]
use fuchsia_async as fasync;
use fuchsia_zircon::{self as zx, assoc_values};
use fdio::fdio_sys;
use fidl_fuchsia_io::WATCH_MASK_ALL;
use futures::stream::{FusedStream, Stream};
use std::ffi::OsStr;
use std::io;
use std::marker::Unpin;
use std::os::unix::ffi::OsStrExt;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Describes the type of event that occurred in the direcotry being watched.
#[repr(C)]
#[derive(Copy, Clone, Eq, PartialEq)]
pub struct WatchEvent(u8);
assoc_values!(WatchEvent, [
/// A file was added.
ADD_FILE = fidl_fuchsia_io::WATCH_EVENT_ADDED;
/// A file was removed.
REMOVE_FILE = fidl_fuchsia_io::WATCH_EVENT_REMOVED;
/// A file existed at the time the Watcher was created.
EXISTING = fidl_fuchsia_io::WATCH_EVENT_EXISTING;
/// All existing files have been enumerated.
IDLE = fidl_fuchsia_io::WATCH_EVENT_IDLE;
]);
/// A message containing a `WatchEvent` and the filename (relative to the directory being watched)
/// that triggered the event.
#[derive(Debug)]
pub struct WatchMessage {
/// The event that occurred.
pub event: WatchEvent,
/// The filename that triggered the message.
pub filename: PathBuf,
}
/// Provides a Stream of WatchMessages corresponding to filesystem events for a given directory.
#[derive(Debug)]
#[must_use = "futures/streams must be polled"]
pub struct Watcher {
ch: fasync::Channel,
// If idx >= buf.bytes().len(), you must call reset_buf() before get_next_msg().
buf: zx::MessageBuf,
idx: usize,
}
impl Unpin for Watcher {}
impl Watcher {
/// Creates a new `Watcher` for the directory given by `dir`.
pub async fn new(dir: fidl_fuchsia_io::DirectoryProxy) -> Result<Watcher, anyhow::Error> {
let (h0, h1) = zx::Channel::create()?;
let options = 0u32;
let status = dir.watch(WATCH_MASK_ALL, options, h1).await?;
zx::Status::ok(status)?;
let mut buf = zx::MessageBuf::new();
buf.ensure_capacity_bytes(fidl_fuchsia_io::MAX_BUF as usize);
Ok(Watcher { ch: fasync::Channel::from_channel(h0)?, buf, idx: 0 })
}
fn reset_buf(&mut self) {
self.idx = 0;
self.buf.clear();
}
fn get_next_msg(&mut self) -> WatchMessage {
assert!(self.idx < self.buf.bytes().len());
let next_msg = VfsWatchMsg::from_raw(&self.buf.bytes()[self.idx..])
.expect("Invalid buffer received by Watcher!");
self.idx += next_msg.len();
let mut pathbuf = PathBuf::new();
pathbuf.push(OsStr::from_bytes(next_msg.name()));
let event = next_msg.event();
WatchMessage { event: event, filename: pathbuf }
}
}
impl FusedStream for Watcher {
fn is_terminated(&self) -> bool {
// `Watcher` never completes
// (FIXME: or does it? is an error completion?)
false
}
}
impl Stream for Watcher {
type Item = Result<WatchMessage, io::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = &mut *self;
if this.idx >= this.buf.bytes().len() {
this.reset_buf();
}
if this.idx == 0 {
match this.ch.recv_from(cx, &mut this.buf) {
Poll::Ready(Ok(())) => {}
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
Poll::Pending => return Poll::Pending,
}
}
Poll::Ready(Some(Ok(this.get_next_msg())))
}
}
#[repr(C)]
#[derive(Debug)]
struct vfs_watch_msg_t {
event: u8,
len: u8,
name: fdio_sys::__IncompleteArrayField<u8>,
}
#[derive(Debug)]
struct VfsWatchMsg<'a> {
inner: &'a vfs_watch_msg_t,
}
impl<'a> VfsWatchMsg<'a> {
fn from_raw(buf: &'a [u8]) -> Option<VfsWatchMsg<'a>> {
if buf.len() < ::std::mem::size_of::<vfs_watch_msg_t>() {
return None;
}
// This is safe as long as the buffer is at least as large as a vfs_watch_msg_t, which we
// just verified. Further, we verify that the buffer has enough bytes to hold the
// "incomplete array field" member.
let m = unsafe { VfsWatchMsg { inner: &*(buf.as_ptr() as *const vfs_watch_msg_t) } };
if buf.len() < ::std::mem::size_of::<vfs_watch_msg_t>() + m.namelen() {
return None;
}
Some(m)
}
fn len(&self) -> usize {
::std::mem::size_of::<vfs_watch_msg_t>() + self.namelen()
}
fn event(&self) -> WatchEvent {
WatchEvent(self.inner.event)
}
fn namelen(&self) -> usize {
self.inner.len as usize
}
fn name(&self) -> &'a [u8] {
// This is safe because we verified during construction that the inner name field has at
// least namelen() bytes in it.
unsafe { self.inner.name.as_slice(self.namelen()) }
}
}
#[cfg(test)]
mod tests {
use super::*;
use fuchsia_async::{self as fasync, DurationExt, TimeoutExt};
use fuchsia_zircon::prelude::*;
use futures::prelude::*;
use io_util::{open_directory_in_namespace, OPEN_RIGHT_READABLE};
use std::fmt::Debug;
use std::fs::File;
use std::path::Path;
use tempfile::tempdir;
fn one_step<'a, S, OK, ERR>(s: &'a mut S) -> impl Future<Output = OK> + 'a
where
S: Stream<Item = Result<OK, ERR>> + Unpin,
ERR: Debug,
{
let f = s.next();
let f = f.on_timeout(500.millis().after_now(), || panic!("timeout waiting for watcher"));
f.map(|next| {
next.expect("the stream yielded no next item")
.unwrap_or_else(|e| panic!("Error waiting for watcher: {:?}", e))
})
}
#[fasync::run_singlethreaded(test)]
async fn test_existing() {
let tmp_dir = tempdir().unwrap();
let _ = File::create(tmp_dir.path().join("file1")).unwrap();
let dir =
open_directory_in_namespace(tmp_dir.path().to_str().unwrap(), OPEN_RIGHT_READABLE)
.unwrap();
let mut w = Watcher::new(dir).await.unwrap();
// TODO(tkilbourn): this assumes "." always comes before "file1". If this test ever starts
// flaking, handle the case of unordered EXISTING files.
let msg = one_step(&mut w).await;
assert_eq!(WatchEvent::EXISTING, msg.event);
assert_eq!(Path::new("."), msg.filename);
let msg = one_step(&mut w).await;
assert_eq!(WatchEvent::EXISTING, msg.event);
assert_eq!(Path::new("file1"), msg.filename);
let msg = one_step(&mut w).await;
assert_eq!(WatchEvent::IDLE, msg.event);
}
#[fasync::run_singlethreaded(test)]
async fn test_add() {
let tmp_dir = tempdir().unwrap();
let dir =
open_directory_in_namespace(tmp_dir.path().to_str().unwrap(), OPEN_RIGHT_READABLE)
.unwrap();
let mut w = Watcher::new(dir).await.unwrap();
loop {
let msg = one_step(&mut w).await;
match msg.event {
WatchEvent::EXISTING => continue,
WatchEvent::IDLE => break,
_ => panic!("Unexpected watch event!"),
}
}
let _ = File::create(tmp_dir.path().join("file1")).unwrap();
let msg = one_step(&mut w).await;
assert_eq!(WatchEvent::ADD_FILE, msg.event);
assert_eq!(Path::new("file1"), msg.filename);
}
#[fasync::run_singlethreaded(test)]
async fn test_remove() {
let tmp_dir = tempdir().unwrap();
let filename = "file1";
let filepath = tmp_dir.path().join(filename);
let _ = File::create(&filepath).unwrap();
let dir =
open_directory_in_namespace(tmp_dir.path().to_str().unwrap(), OPEN_RIGHT_READABLE)
.unwrap();
let mut w = Watcher::new(dir).await.unwrap();
loop {
let msg = one_step(&mut w).await;
match msg.event {
WatchEvent::EXISTING => continue,
WatchEvent::IDLE => break,
_ => panic!("Unexpected watch event!"),
}
}
::std::fs::remove_file(&filepath).unwrap();
let msg = one_step(&mut w).await;
assert_eq!(WatchEvent::REMOVE_FILE, msg.event);
assert_eq!(Path::new(filename), msg.filename);
}
#[fasync::run_singlethreaded(test)]
#[should_panic]
async fn test_timeout() {
let tmp_dir = tempdir().unwrap();
let dir =
open_directory_in_namespace(tmp_dir.path().to_str().unwrap(), OPEN_RIGHT_READABLE)
.unwrap();
let mut w = Watcher::new(dir).await.unwrap();
loop {
let msg = one_step(&mut w).await;
match msg.event {
WatchEvent::EXISTING => continue,
WatchEvent::IDLE => break,
_ => panic!("Unexpected watch event!"),
}
}
// Ensure that our test timeouts actually work by waiting for another event that will never
// arrive.
let _ = one_step(&mut w).await;
}
}