[rust][vfs] wire up readdir

Change-Id: I5f77f6c7e39acbaafe2bc9b0147eb1021cd7b902
diff --git a/public/rust/crates/fuchsia-vfs/src/lib.rs b/public/rust/crates/fuchsia-vfs/src/lib.rs
index 30afc4c..db1f08c 100644
--- a/public/rust/crates/fuchsia-vfs/src/lib.rs
+++ b/public/rust/crates/fuchsia-vfs/src/lib.rs
@@ -6,7 +6,7 @@
 
 extern crate bytes;
 extern crate fdio;
-extern crate fuchsia_zircon as zircon;
+extern crate fuchsia_zircon as zx;
 extern crate futures;
 extern crate libc;
 #[macro_use]
@@ -15,7 +15,7 @@
 
 use std::path::Path;
 use std::sync::Arc;
-use zircon::AsHandleRef;
+use zx::AsHandleRef;
 
 mod mount;
 
@@ -27,12 +27,12 @@
     vfs: Arc<Vfs>,
     vn: Arc<Vnode>,
     handle: &tokio_core::reactor::Handle,
-) -> Result<mount::Mount, zircon::Status> {
-    let (c1, c2) = zircon::Channel::create()?;
+) -> Result<mount::Mount, zx::Status> {
+    let (c1, c2) = zx::Channel::create()?;
     let m = mount::mount(path, c1)?;
     c2.signal_handle(
-        zircon::Signals::NONE,
-        zircon::Signals::USER_0,
+        zx::Signals::NONE,
+        zx::Signals::USER_0,
     )?;
     let c = Connection::new(Arc::clone(&vfs), vn, c2, handle)?;
     vfs.register_connection(c, handle);
@@ -42,6 +42,8 @@
 #[cfg(test)]
 mod test {
     use super::*;
+    use std::os::unix::ffi::OsStrExt;
+    use std::os::raw::c_char;
 
     extern crate tempdir;
 
@@ -69,17 +71,51 @@
         let path = d.path().to_owned();
 
         std::thread::spawn(move || {
+            eprintln!("thread start");
+            eprintln!("about to open");
+
             let e = std::fs::OpenOptions::new()
                 .read(true)
                 .open(path)
                 .expect_err("expected notsupported");
+
+            eprintln!("open done");
             tx.send(e).unwrap();
+            eprintln!("thread done");
         });
 
+            // XXX(raggi): deterministic deadlock:
+            std::thread::sleep(std::time::Duration::from_millis(100));
+
+        eprintln!("about to run receive future");
         let e = core.run(rx).unwrap();
+        eprintln!("receive future complete");
 
         assert_eq!(std::io::ErrorKind::Other, e.kind());
 
+        eprintln!("about to drop (unmount)");
         std::mem::drop(m);
+        eprintln!("drop (unmount) done");
     }
+
+    // hellofs implements a filesystem that exposes:
+    //  ./
+    //    hello/
+    //          world
+    //               `hello world`
+    struct HelloFS {}
+
+    impl Vfs for HelloFS {
+        fn open(&self, vn: &Arc<Vnode>, path: std::path::PathBuf, flags: u32, mode: u32) -> Result<(Arc<Vnode>, std::path::PathBuf), zx::Status> {
+            Err(zx::Status::NOT_FOUND)
+        }
+    }
+
+    struct HelloDir {}
+
+    impl Vnode for HelloDir {}
+
+    struct HelloFile {}
+
+    impl Vnode for HelloFile {}
 }
diff --git a/public/rust/crates/fuchsia-vfs/src/mount.rs b/public/rust/crates/fuchsia-vfs/src/mount.rs
index ce3ccd0..b292feb 100644
--- a/public/rust/crates/fuchsia-vfs/src/mount.rs
+++ b/public/rust/crates/fuchsia-vfs/src/mount.rs
@@ -2,8 +2,8 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-use zircon;
-use zircon::{DurationNum, HandleBased};
+use zx;
+use zx::{DurationNum, HandleBased};
 use fdio;
 
 use std;
@@ -18,9 +18,9 @@
 #[link(name = "fs-management")]
 extern "C" {
     fn vfs_unmount_handle(
-        srv: zircon::sys::zx_handle_t,
-        deadline: zircon::sys::zx_time_t,
-    ) -> zircon::sys::zx_status_t;
+        srv: zx::sys::zx_handle_t,
+        deadline: zx::sys::zx_time_t,
+    ) -> zx::sys::zx_status_t;
 }
 
 pub struct Mount {
@@ -29,7 +29,7 @@
 
 impl Drop for Mount {
     fn drop(&mut self) {
-        let mut h: zircon::sys::zx_handle_t = zircon::sys::ZX_HANDLE_INVALID;
+        let mut h: zx::sys::zx_handle_t = zx::sys::ZX_HANDLE_INVALID;
 
         let sz = unsafe {
             fdio::ioctl(
@@ -38,7 +38,7 @@
                 std::ptr::null_mut(),
                 0,
                 &mut h as *mut _ as *mut std::os::raw::c_void,
-                std::mem::size_of::<zircon::sys::zx_handle_t>(),
+                std::mem::size_of::<zx::sys::zx_handle_t>(),
             )
         };
 
@@ -49,18 +49,18 @@
             // TODO(raggi): what is a reasonable timeout value here?
             let deadline = 1_000_000.nanos().after_now();
             let status = unsafe { vfs_unmount_handle(h, deadline.nanos()) };
-            if status != zircon::sys::ZX_OK {
+            if status != zx::sys::ZX_OK {
                 eprintln!("fuchsia-vfs: failed to unmount handle: {:?}", status);
             }
         }
 
-        unsafe { zircon::sys::zx_handle_close(h) };
+        unsafe { zx::sys::zx_handle_close(h) };
 
         std::mem::drop(unsafe { fs::File::from_raw_fd(self.mountfd) });
     }
 }
 
-pub fn mount(path: &Path, chan: zircon::Channel) -> Result<Mount, zircon::Status> {
+pub fn mount(path: &Path, chan: zx::Channel) -> Result<Mount, zx::Status> {
     let dir = fs::OpenOptions::new()
         .read(true)
         .custom_flags(O_DIRECTORY | O_ADMIN | O_NOREMOTE)
@@ -76,15 +76,15 @@
             mount.mountfd,
             fdio::IOCTL_VFS_MOUNT_FS,
             &h as *const _ as *const std::os::raw::c_void,
-            std::mem::size_of::<zircon::sys::zx_handle_t>(),
+            std::mem::size_of::<zx::sys::zx_handle_t>(),
             std::ptr::null_mut(),
             0,
         )
     };
 
     if sz != 0 {
-        unsafe { zircon::sys::zx_handle_close(h) };
-        return Err(zircon::Status::from_raw(sz as i32));
+        unsafe { zx::sys::zx_handle_close(h) };
+        return Err(zx::Status::from_raw(sz as i32));
     }
 
     Ok(mount)
@@ -93,25 +93,25 @@
 #[cfg(test)]
 mod test {
     use super::*;
-    use zircon::AsHandleRef;
+    use zx::AsHandleRef;
 
     extern crate tempdir;
 
     #[test]
     fn test_mount_unmount() {
-        let (c1, c2) = zircon::Channel::create().unwrap();
+        let (c1, c2) = zx::Channel::create().unwrap();
 
         // TODO(raggi): where is the appropriate place to put this, it's part of the mount protocol?
-        c2.signal_handle(zircon::Signals::NONE, zircon::Signals::USER_0)
+        c2.signal_handle(zx::Signals::NONE, zx::Signals::USER_0)
             .unwrap();
 
-        let port = zircon::Port::create().unwrap();
+        let port = zx::Port::create().unwrap();
 
         c2.wait_async_handle(
             &port,
             1,
-            zircon::Signals::CHANNEL_PEER_CLOSED,
-            zircon::WaitAsyncOpts::Once,
+            zx::Signals::CHANNEL_PEER_CLOSED,
+            zx::WaitAsyncOpts::Once,
         ).unwrap();
 
         let td = tempdir::TempDir::new("test_mount_unmount").unwrap();
@@ -119,7 +119,7 @@
         let m = mount(&td.path(), c1).unwrap();
 
         assert_eq!(
-            zircon::Status::TIMED_OUT,
+            zx::Status::TIMED_OUT,
             port.wait(2_000_000.nanos().after_now()).expect_err(
                 "timeout",
             )
@@ -129,10 +129,10 @@
 
         let packet = port.wait(2_000_000.nanos().after_now()).unwrap();
         match packet.contents() {
-            zircon::PacketContents::SignalOne(sp) => {
+            zx::PacketContents::SignalOne(sp) => {
                 assert_eq!(
-                    zircon::Signals::CHANNEL_PEER_CLOSED,
-                    sp.observed() & zircon::Signals::CHANNEL_PEER_CLOSED
+                    zx::Signals::CHANNEL_PEER_CLOSED,
+                    sp.observed() & zx::Signals::CHANNEL_PEER_CLOSED
                 );
             }
             _ => assert!(false, "expected signalone packet"),
diff --git a/public/rust/crates/fuchsia-vfs/src/vfs.rs b/public/rust/crates/fuchsia-vfs/src/vfs.rs
index c91f6b3..566cd9f 100644
--- a/public/rust/crates/fuchsia-vfs/src/vfs.rs
+++ b/public/rust/crates/fuchsia-vfs/src/vfs.rs
@@ -6,7 +6,7 @@
 
 use std::sync::Arc;
 use tokio_core;
-use zircon;
+use zx;
 use tokio_fuchsia;
 use futures;
 use futures::Future;
@@ -14,6 +14,34 @@
 use std::io;
 use std::os::unix::ffi::OsStrExt;
 use fdio;
+use libc;
+
+// validate open flags that are not vnode specific
+fn prevalidate_flags(flags: i32) -> Result<(), zx::Status> {
+    let f = flags & libc::O_ACCMODE;
+    if f == libc::O_PATH {
+        return Ok(());
+    }
+    if f & libc::O_RDONLY != 0 {
+        if flags & libc::O_TRUNC != 0 {
+            return Err(zx::Status::INVALID_ARGS)
+        }
+        return Ok(());
+    }
+    if f == libc::O_WRONLY || f == libc::O_RDWR {
+        return Ok(());
+    }
+    Err(zx::Status::INVALID_ARGS)
+}
+
+#[test]
+fn test_prevalidate_flags() {
+    assert!(prevalidate_flags(libc::O_PATH).is_ok());
+    assert!(prevalidate_flags(libc::O_RDONLY).is_ok());
+    assert!(prevalidate_flags(libc::O_WRONLY).is_ok());
+    assert!(prevalidate_flags(libc::O_RDWR).is_ok());
+
+}
 
 /// Vfs contains filesystem global state and outlives all Vnodes that it
 /// services. It fundamentally handles filesystem global concerns such as path
@@ -23,11 +51,13 @@
         &self,
         _vn: &Arc<Vnode>,
         _path: std::path::PathBuf,
-        _flags: i32,
+        _flags: u32,
         _mode: u32,
-    ) -> Result<(Arc<Vnode>, std::path::PathBuf), zircon::Status> {
-        // TODO(raggi): ...
-        Err(zircon::Status::NOT_SUPPORTED)
+    ) -> Result<(Arc<Vnode>, std::path::PathBuf), zx::Status> {
+        // TODO(raggi): locking
+
+
+        Err(zx::Status::NOT_SUPPORTED)
     }
 
     fn register_connection(&self, c: Connection, handle: &tokio_core::reactor::Handle) {
@@ -41,15 +71,16 @@
 /// addressable via more than one path). It may have file, directory, mount or
 /// device semantics.
 pub trait Vnode {
-    fn close(&self) -> zircon::Status {
-        zircon::Status::OK
+    fn close(&self) -> zx::Status {
+        zx::Status::OK
     }
 
-    fn serve(&self, _vfs: Arc<Vfs>, _chan: tokio_fuchsia::Channel, _flags: i32) {
-        // TODO(raggi): ...
-        // TODO(raggi): ...
-        // TODO(raggi): ...
-        // TODO(raggi): ...
+    /// If the Vnode should be served as a regular FDIO connection, consume the
+    /// flags as required and return the channel. A Connection will be
+    /// constructed and FDIO messages dispatched to this Vnode. Otherwise,
+    /// consume the channel and return None.
+    fn should_serve(&self, chan: tokio_fuchsia::Channel, _flags: u32, _handle: &tokio_core::reactor::Handle) -> Option<tokio_fuchsia::Channel> {
+        Some(chan)
     }
 }
 
@@ -67,9 +98,9 @@
     pub fn new(
         vfs: Arc<Vfs>,
         vn: Arc<Vnode>,
-        chan: zircon::Channel,
+        chan: zx::Channel,
         handle: &tokio_core::reactor::Handle,
-    ) -> Result<Connection, io::Error> {
+    ) -> Result<Connection, zx::Status> {
         let c = Connection {
             vfs: vfs,
             vn: vn,
@@ -80,20 +111,25 @@
         Ok(c)
     }
 
-    fn dispatch(&mut self, msg: &mut fdio::rio::Message) -> Result<(), std::io::Error> {
-        // TODO(raggi): in the case of protocol errors for non-pipelined opens,
-        // we sometimes will fail to send an appropriate object description back
-        // to the serving channel. This needs to be addressed.
-        msg.validate().map_err(|_| {
-            std::io::Error::from(std::io::ErrorKind::InvalidInput)
-        })?;
+    fn dispatch(&mut self, msg: &mut fdio::rio::Message) -> Result<(), zx::Status> {
+        let pipelined = msg.arg() & fdio::fdio_sys::O_PIPELINE != 0;
+
+        if let Err(e) = msg.validate() {
+            println!("{:?} <- {:?} (INVALID {:?})", self.chan, msg, e);
+            // if the request is pipelined, just drop the reply channel and all is well
+            if !pipelined {
+                self.reply_status(&self.chan, zx::Status::INVALID_ARGS)?;
+                // TODO(raggi): return ok here? need to define what dispatch errors really mean
+                return Err(zx::Status::INVALID_ARGS.into());
+            }
+        }
 
         println!("{:?} <- {:?}", self.chan, msg);
 
         match msg.op() {
             fdio::fdio_sys::ZXRIO_OPEN => {
                 let chan = tokio_fuchsia::Channel::from_channel(
-                    zircon::Channel::from(
+                    zx::Channel::from(
                         msg.take_handle(0).expect("vfs: handle disappeared"),
                     ),
                     &self.handle,
@@ -101,26 +137,27 @@
 
                 // TODO(raggi): enforce O_ADMIN
                 if msg.datalen() < 1 || msg.datalen() > PATH_MAX as u32 {
-                    self.reply_status(&chan, zircon::Status::INVALID_ARGS)?;
-                    return Err(zircon::Status::INVALID_ARGS.into());
+                    if !pipelined {
+                        self.reply_status(&self.chan, zx::Status::INVALID_ARGS)?;
+                    }
+                    // TODO(raggi): return ok here? need to define what dispatch errors really mean
+                    return Err(zx::Status::INVALID_ARGS.into());
                 }
 
                 let path = std::path::PathBuf::from(std::ffi::OsStr::from_bytes(msg.data()));
 
                 // TODO(raggi): verify if the protocol mistreatment of args signage is intentionally unchecked here:
-                self.open(chan, path, msg.arg(), msg.mode())?;
+                self.open(chan, path, msg.arg(), msg.mode())
             }
             // ZXRIO_STAT => self.stat(msg, chan, handle),
             // ZXRIO_CLOSE => self.close(msg, chan, handle),
             _ => {
                 self.reply_status(
                     &self.chan,
-                    zircon::Status::NOT_SUPPORTED,
-                )?
+                    zx::Status::NOT_SUPPORTED,
+                )
             }
         }
-
-        Ok(())
     }
 
     fn open(
@@ -129,13 +166,13 @@
         path: std::path::PathBuf,
         flags: i32,
         mode: u32,
-    ) -> Result<(), std::io::Error> {
+    ) -> Result<(), zx::Status> {
         let pipeline = flags & fdio::fdio_sys::O_PIPELINE != 0;
-        let open_flags = flags & !fdio::fdio_sys::O_PIPELINE;
+        let open_flags: u32 = (flags & !fdio::fdio_sys::O_PIPELINE) as u32;
 
-        let mut status = zircon::Status::OK;
+        let mut status = zx::Status::OK;
         let mut proto = fdio::fdio_sys::FDIO_PROTOCOL_REMOTE;
-        let mut handles: Vec<zircon::Handle> = vec![];
+        let mut handles: Vec<zx::Handle> = vec![];
 
         match self.vfs.open(&self.vn, path, open_flags, mode) {
             Ok((vn, _path)) => {
@@ -147,40 +184,52 @@
                     return Err(std::io::ErrorKind::InvalidInput.into());
                 }
 
-                if status != zircon::Status::OK {
+                if status != zx::Status::OK {
                     return Err(std::io::ErrorKind::InvalidInput.into());
                 }
 
-                if !pipeline {
-                    fdio::rio::write_object(&chan, status, proto, &[], &mut handles).ok();
+                if let Some(chan) = vn.should_serve(chan, open_flags, &self.handle) {
+
+                    if !pipeline {
+                        self.reply_object(&chan, status, proto, &[], &mut handles)?;
+                    }
+
+                    self.vfs.register_connection(Connection{vfs: self.vfs.clone(), vn, chan, handle: self.handle.clone()}, &self.handle)
                 }
-
-                // TODO(raggi): construct connection...
-                vn.serve(Arc::clone(&self.vfs), chan, open_flags);
-
-                return Ok(());
+                // if should_serve consumed the channel, it must also handle the reply
+                return Ok(())
             }
             Err(e) => {
                 proto = 0;
                 status = e;
                 eprintln!("vfs: open error: {:?}", e);
+
+                if !pipeline {
+                    self.reply_object(&chan, status, proto, &[], &mut handles)?;
+                }
+                return Ok(())
             }
         }
+    }
 
-        if !pipeline {
-            return fdio::rio::write_object(&chan, status, proto, &[], &mut handles)
-                .map_err(Into::into);
-        }
-        Ok(())
+    fn reply_object(
+        &self,
+        chan: &tokio_fuchsia::Channel,
+        status: zx::Status,
+        type_: u32,
+        extra: &[u8],
+        handles: &mut Vec<zx::Handle>,
+    ) -> Result<(), zx::Status> {
+        println!("{:?} -> {:?}", &chan, status);
+        fdio::rio::write_object(chan, status, type_, extra, handles)
     }
 
     fn reply_status(
         &self,
         chan: &tokio_fuchsia::Channel,
-        status: zircon::Status,
-    ) -> Result<(), io::Error> {
-        println!("{:?} -> {:?}", &chan, status);
-        fdio::rio::write_object(chan, status, 0, &[], &mut vec![]).map_err(Into::into)
+        status: zx::Status,
+    ) -> Result<(), zx::Status> {
+        self.reply_object(chan, status, 0, &[], &mut vec![])
     }
 }
 
@@ -189,7 +238,7 @@
     type Error = io::Error;
 
     fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
-        let mut buf = zircon::MessageBuf::new();
+        let mut buf = zx::MessageBuf::new();
         buf.ensure_capacity_bytes(fdio::fdio_sys::ZXRIO_MSG_SZ);
         loop {
             try_nb!(self.chan.recv_from(&mut buf));