[rust][vfs] wire up readdir
Change-Id: I5f77f6c7e39acbaafe2bc9b0147eb1021cd7b902
diff --git a/public/rust/crates/fuchsia-vfs/src/lib.rs b/public/rust/crates/fuchsia-vfs/src/lib.rs
index 30afc4c..db1f08c 100644
--- a/public/rust/crates/fuchsia-vfs/src/lib.rs
+++ b/public/rust/crates/fuchsia-vfs/src/lib.rs
@@ -6,7 +6,7 @@
extern crate bytes;
extern crate fdio;
-extern crate fuchsia_zircon as zircon;
+extern crate fuchsia_zircon as zx;
extern crate futures;
extern crate libc;
#[macro_use]
@@ -15,7 +15,7 @@
use std::path::Path;
use std::sync::Arc;
-use zircon::AsHandleRef;
+use zx::AsHandleRef;
mod mount;
@@ -27,12 +27,12 @@
vfs: Arc<Vfs>,
vn: Arc<Vnode>,
handle: &tokio_core::reactor::Handle,
-) -> Result<mount::Mount, zircon::Status> {
- let (c1, c2) = zircon::Channel::create()?;
+) -> Result<mount::Mount, zx::Status> {
+ let (c1, c2) = zx::Channel::create()?;
let m = mount::mount(path, c1)?;
c2.signal_handle(
- zircon::Signals::NONE,
- zircon::Signals::USER_0,
+ zx::Signals::NONE,
+ zx::Signals::USER_0,
)?;
let c = Connection::new(Arc::clone(&vfs), vn, c2, handle)?;
vfs.register_connection(c, handle);
@@ -42,6 +42,8 @@
#[cfg(test)]
mod test {
use super::*;
+ use std::os::unix::ffi::OsStrExt;
+ use std::os::raw::c_char;
extern crate tempdir;
@@ -69,17 +71,51 @@
let path = d.path().to_owned();
std::thread::spawn(move || {
+ eprintln!("thread start");
+ eprintln!("about to open");
+
let e = std::fs::OpenOptions::new()
.read(true)
.open(path)
.expect_err("expected notsupported");
+
+ eprintln!("open done");
tx.send(e).unwrap();
+ eprintln!("thread done");
});
+ // XXX(raggi): deterministic deadlock:
+ std::thread::sleep(std::time::Duration::from_millis(100));
+
+ eprintln!("about to run receive future");
let e = core.run(rx).unwrap();
+ eprintln!("receive future complete");
assert_eq!(std::io::ErrorKind::Other, e.kind());
+ eprintln!("about to drop (unmount)");
std::mem::drop(m);
+ eprintln!("drop (unmount) done");
}
+
+    // HelloFS is a test filesystem that is intended to expose:
+    //   ./
+    //     hello/
+    //       world
+    //         `hello world`
+    // NOTE: the hierarchy above is not implemented yet; `open` below rejects
+    // every request with NOT_FOUND.
+    struct HelloFS {}
+
+    impl Vfs for HelloFS {
+        // The arguments are underscore-prefixed because this stub ignores them;
+        // without the prefix each one triggers an unused-variable warning.
+        fn open(
+            &self,
+            _vn: &Arc<Vnode>,
+            _path: std::path::PathBuf,
+            _flags: u32,
+            _mode: u32,
+        ) -> Result<(Arc<Vnode>, std::path::PathBuf), zx::Status> {
+            Err(zx::Status::NOT_FOUND)
+        }
+    }
+
+ // HelloDir is a placeholder vnode type (presumably for the directories of
+ // HelloFS -- nothing constructs it yet). Its Vnode impl is empty, so it
+ // overrides none of the trait's methods.
+ struct HelloDir {}
+
+ impl Vnode for HelloDir {}
+
+ // HelloFile is a placeholder vnode type (presumably for the `world` file of
+ // HelloFS -- nothing constructs it yet). Its Vnode impl is empty, so it
+ // overrides none of the trait's methods.
+ struct HelloFile {}
+
+ impl Vnode for HelloFile {}
}
diff --git a/public/rust/crates/fuchsia-vfs/src/mount.rs b/public/rust/crates/fuchsia-vfs/src/mount.rs
index ce3ccd0..b292feb 100644
--- a/public/rust/crates/fuchsia-vfs/src/mount.rs
+++ b/public/rust/crates/fuchsia-vfs/src/mount.rs
@@ -2,8 +2,8 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-use zircon;
-use zircon::{DurationNum, HandleBased};
+use zx;
+use zx::{DurationNum, HandleBased};
use fdio;
use std;
@@ -18,9 +18,9 @@
#[link(name = "fs-management")]
extern "C" {
fn vfs_unmount_handle(
- srv: zircon::sys::zx_handle_t,
- deadline: zircon::sys::zx_time_t,
- ) -> zircon::sys::zx_status_t;
+ srv: zx::sys::zx_handle_t,
+ deadline: zx::sys::zx_time_t,
+ ) -> zx::sys::zx_status_t;
}
pub struct Mount {
@@ -29,7 +29,7 @@
impl Drop for Mount {
fn drop(&mut self) {
- let mut h: zircon::sys::zx_handle_t = zircon::sys::ZX_HANDLE_INVALID;
+ let mut h: zx::sys::zx_handle_t = zx::sys::ZX_HANDLE_INVALID;
let sz = unsafe {
fdio::ioctl(
@@ -38,7 +38,7 @@
std::ptr::null_mut(),
0,
&mut h as *mut _ as *mut std::os::raw::c_void,
- std::mem::size_of::<zircon::sys::zx_handle_t>(),
+ std::mem::size_of::<zx::sys::zx_handle_t>(),
)
};
@@ -49,18 +49,18 @@
// TODO(raggi): what is a reasonable timeout value here?
let deadline = 1_000_000.nanos().after_now();
let status = unsafe { vfs_unmount_handle(h, deadline.nanos()) };
- if status != zircon::sys::ZX_OK {
+ if status != zx::sys::ZX_OK {
eprintln!("fuchsia-vfs: failed to unmount handle: {:?}", status);
}
}
- unsafe { zircon::sys::zx_handle_close(h) };
+ unsafe { zx::sys::zx_handle_close(h) };
std::mem::drop(unsafe { fs::File::from_raw_fd(self.mountfd) });
}
}
-pub fn mount(path: &Path, chan: zircon::Channel) -> Result<Mount, zircon::Status> {
+pub fn mount(path: &Path, chan: zx::Channel) -> Result<Mount, zx::Status> {
let dir = fs::OpenOptions::new()
.read(true)
.custom_flags(O_DIRECTORY | O_ADMIN | O_NOREMOTE)
@@ -76,15 +76,15 @@
mount.mountfd,
fdio::IOCTL_VFS_MOUNT_FS,
&h as *const _ as *const std::os::raw::c_void,
- std::mem::size_of::<zircon::sys::zx_handle_t>(),
+ std::mem::size_of::<zx::sys::zx_handle_t>(),
std::ptr::null_mut(),
0,
)
};
if sz != 0 {
- unsafe { zircon::sys::zx_handle_close(h) };
- return Err(zircon::Status::from_raw(sz as i32));
+ unsafe { zx::sys::zx_handle_close(h) };
+ return Err(zx::Status::from_raw(sz as i32));
}
Ok(mount)
@@ -93,25 +93,25 @@
#[cfg(test)]
mod test {
use super::*;
- use zircon::AsHandleRef;
+ use zx::AsHandleRef;
extern crate tempdir;
#[test]
fn test_mount_unmount() {
- let (c1, c2) = zircon::Channel::create().unwrap();
+ let (c1, c2) = zx::Channel::create().unwrap();
// TODO(raggi): where is the appropriate place to put this, it's part of the mount protocol?
- c2.signal_handle(zircon::Signals::NONE, zircon::Signals::USER_0)
+ c2.signal_handle(zx::Signals::NONE, zx::Signals::USER_0)
.unwrap();
- let port = zircon::Port::create().unwrap();
+ let port = zx::Port::create().unwrap();
c2.wait_async_handle(
&port,
1,
- zircon::Signals::CHANNEL_PEER_CLOSED,
- zircon::WaitAsyncOpts::Once,
+ zx::Signals::CHANNEL_PEER_CLOSED,
+ zx::WaitAsyncOpts::Once,
).unwrap();
let td = tempdir::TempDir::new("test_mount_unmount").unwrap();
@@ -119,7 +119,7 @@
let m = mount(&td.path(), c1).unwrap();
assert_eq!(
- zircon::Status::TIMED_OUT,
+ zx::Status::TIMED_OUT,
port.wait(2_000_000.nanos().after_now()).expect_err(
"timeout",
)
@@ -129,10 +129,10 @@
let packet = port.wait(2_000_000.nanos().after_now()).unwrap();
match packet.contents() {
- zircon::PacketContents::SignalOne(sp) => {
+ zx::PacketContents::SignalOne(sp) => {
assert_eq!(
- zircon::Signals::CHANNEL_PEER_CLOSED,
- sp.observed() & zircon::Signals::CHANNEL_PEER_CLOSED
+ zx::Signals::CHANNEL_PEER_CLOSED,
+ sp.observed() & zx::Signals::CHANNEL_PEER_CLOSED
);
}
_ => assert!(false, "expected signalone packet"),
diff --git a/public/rust/crates/fuchsia-vfs/src/vfs.rs b/public/rust/crates/fuchsia-vfs/src/vfs.rs
index c91f6b3..566cd9f 100644
--- a/public/rust/crates/fuchsia-vfs/src/vfs.rs
+++ b/public/rust/crates/fuchsia-vfs/src/vfs.rs
@@ -6,7 +6,7 @@
use std::sync::Arc;
use tokio_core;
-use zircon;
+use zx;
use tokio_fuchsia;
use futures;
use futures::Future;
@@ -14,6 +14,34 @@
use std::io;
use std::os::unix::ffi::OsStrExt;
use fdio;
+use libc;
+
+/// Validates the open flags that are not vnode specific.
+///
+/// Only the access mode (`flags & libc::O_ACCMODE`) and `libc::O_TRUNC` are
+/// inspected:
+///
+/// * an access mode of `O_PATH`, `O_WRONLY` or `O_RDWR` is accepted;
+/// * `O_RDONLY` is accepted unless `O_TRUNC` is also requested;
+/// * any other access mode is rejected.
+///
+/// Returns `Err(zx::Status::INVALID_ARGS)` for a rejected combination.
+fn prevalidate_flags(flags: i32) -> Result<(), zx::Status> {
+    let access = flags & libc::O_ACCMODE;
+    if access == libc::O_PATH {
+        return Ok(());
+    }
+    // O_RDONLY is an access-mode *value* (defined as 0), not a bit flag, so it
+    // must be compared for equality: masking with it always yields 0, which
+    // previously made this branch unreachable and rejected read-only opens.
+    if access == libc::O_RDONLY {
+        // Truncating through a read-only open is contradictory.
+        if flags & libc::O_TRUNC != 0 {
+            return Err(zx::Status::INVALID_ARGS);
+        }
+        return Ok(());
+    }
+    if access == libc::O_WRONLY || access == libc::O_RDWR {
+        return Ok(());
+    }
+    Err(zx::Status::INVALID_ARGS)
+}
+
+// Exercises prevalidate_flags with each bare access mode and with O_TRUNC.
+#[test]
+fn test_prevalidate_flags() {
+    // Each access mode is acceptable on its own.
+    assert!(prevalidate_flags(libc::O_PATH).is_ok());
+    assert!(prevalidate_flags(libc::O_RDONLY).is_ok());
+    assert!(prevalidate_flags(libc::O_WRONLY).is_ok());
+    assert!(prevalidate_flags(libc::O_RDWR).is_ok());
+
+    // Truncation is only rejected in combination with read-only access.
+    assert!(prevalidate_flags(libc::O_RDONLY | libc::O_TRUNC).is_err());
+    assert!(prevalidate_flags(libc::O_WRONLY | libc::O_TRUNC).is_ok());
+}
/// Vfs contains filesystem global state and outlives all Vnodes that it
/// services. It fundamentally handles filesystem global concerns such as path
@@ -23,11 +51,13 @@
&self,
_vn: &Arc<Vnode>,
_path: std::path::PathBuf,
- _flags: i32,
+ _flags: u32,
_mode: u32,
- ) -> Result<(Arc<Vnode>, std::path::PathBuf), zircon::Status> {
- // TODO(raggi): ...
- Err(zircon::Status::NOT_SUPPORTED)
+ ) -> Result<(Arc<Vnode>, std::path::PathBuf), zx::Status> {
+ // TODO(raggi): locking
+
+
+ Err(zx::Status::NOT_SUPPORTED)
}
fn register_connection(&self, c: Connection, handle: &tokio_core::reactor::Handle) {
@@ -41,15 +71,16 @@
/// addressable via more than one path). It may have file, directory, mount or
/// device semantics.
pub trait Vnode {
- fn close(&self) -> zircon::Status {
- zircon::Status::OK
+ fn close(&self) -> zx::Status {
+ zx::Status::OK
}
- fn serve(&self, _vfs: Arc<Vfs>, _chan: tokio_fuchsia::Channel, _flags: i32) {
- // TODO(raggi): ...
- // TODO(raggi): ...
- // TODO(raggi): ...
- // TODO(raggi): ...
+    /// Decides whether this Vnode is served as a regular FDIO connection.
+    ///
+    /// Returning `Some(chan)` hands the channel back to the caller, which then
+    /// constructs a Connection and dispatches FDIO messages to this Vnode; an
+    /// implementation should consume the flags as required before doing so.
+    /// Returning `None` means the implementation consumed the channel itself.
+    ///
+    /// The default implementation always returns the channel unchanged.
+    fn should_serve(
+        &self,
+        chan: tokio_fuchsia::Channel,
+        _flags: u32,
+        _handle: &tokio_core::reactor::Handle,
+    ) -> Option<tokio_fuchsia::Channel> {
+        Some(chan)
+    }
}
@@ -67,9 +98,9 @@
pub fn new(
vfs: Arc<Vfs>,
vn: Arc<Vnode>,
- chan: zircon::Channel,
+ chan: zx::Channel,
handle: &tokio_core::reactor::Handle,
- ) -> Result<Connection, io::Error> {
+ ) -> Result<Connection, zx::Status> {
let c = Connection {
vfs: vfs,
vn: vn,
@@ -80,20 +111,25 @@
Ok(c)
}
- fn dispatch(&mut self, msg: &mut fdio::rio::Message) -> Result<(), std::io::Error> {
- // TODO(raggi): in the case of protocol errors for non-pipelined opens,
- // we sometimes will fail to send an appropriate object description back
- // to the serving channel. This needs to be addressed.
- msg.validate().map_err(|_| {
- std::io::Error::from(std::io::ErrorKind::InvalidInput)
- })?;
+ fn dispatch(&mut self, msg: &mut fdio::rio::Message) -> Result<(), zx::Status> {
+ let pipelined = msg.arg() & fdio::fdio_sys::O_PIPELINE != 0;
+
+ if let Err(e) = msg.validate() {
+ println!("{:?} <- {:?} (INVALID {:?})", self.chan, msg, e);
+ // if the request is pipelined, just drop the reply channel and all is well
+ if !pipelined {
+ self.reply_status(&self.chan, zx::Status::INVALID_ARGS)?;
+ // TODO(raggi): return ok here? need to define what dispatch errors really mean
+ return Err(zx::Status::INVALID_ARGS.into());
+ }
+ }
println!("{:?} <- {:?}", self.chan, msg);
match msg.op() {
fdio::fdio_sys::ZXRIO_OPEN => {
let chan = tokio_fuchsia::Channel::from_channel(
- zircon::Channel::from(
+ zx::Channel::from(
msg.take_handle(0).expect("vfs: handle disappeared"),
),
&self.handle,
@@ -101,26 +137,27 @@
// TODO(raggi): enforce O_ADMIN
if msg.datalen() < 1 || msg.datalen() > PATH_MAX as u32 {
- self.reply_status(&chan, zircon::Status::INVALID_ARGS)?;
- return Err(zircon::Status::INVALID_ARGS.into());
+ if !pipelined {
+ self.reply_status(&self.chan, zx::Status::INVALID_ARGS)?;
+ }
+ // TODO(raggi): return ok here? need to define what dispatch errors really mean
+ return Err(zx::Status::INVALID_ARGS.into());
}
let path = std::path::PathBuf::from(std::ffi::OsStr::from_bytes(msg.data()));
// TODO(raggi): verify if the protocol mistreatment of args signage is intentionally unchecked here:
- self.open(chan, path, msg.arg(), msg.mode())?;
+ self.open(chan, path, msg.arg(), msg.mode())
}
// ZXRIO_STAT => self.stat(msg, chan, handle),
// ZXRIO_CLOSE => self.close(msg, chan, handle),
_ => {
self.reply_status(
&self.chan,
- zircon::Status::NOT_SUPPORTED,
- )?
+ zx::Status::NOT_SUPPORTED,
+ )
}
}
-
- Ok(())
}
fn open(
@@ -129,13 +166,13 @@
path: std::path::PathBuf,
flags: i32,
mode: u32,
- ) -> Result<(), std::io::Error> {
+ ) -> Result<(), zx::Status> {
let pipeline = flags & fdio::fdio_sys::O_PIPELINE != 0;
- let open_flags = flags & !fdio::fdio_sys::O_PIPELINE;
+ let open_flags: u32 = (flags & !fdio::fdio_sys::O_PIPELINE) as u32;
- let mut status = zircon::Status::OK;
+ let mut status = zx::Status::OK;
let mut proto = fdio::fdio_sys::FDIO_PROTOCOL_REMOTE;
- let mut handles: Vec<zircon::Handle> = vec![];
+ let mut handles: Vec<zx::Handle> = vec![];
match self.vfs.open(&self.vn, path, open_flags, mode) {
Ok((vn, _path)) => {
@@ -147,40 +184,52 @@
return Err(std::io::ErrorKind::InvalidInput.into());
}
- if status != zircon::Status::OK {
+ if status != zx::Status::OK {
return Err(std::io::ErrorKind::InvalidInput.into());
}
- if !pipeline {
- fdio::rio::write_object(&chan, status, proto, &[], &mut handles).ok();
+ if let Some(chan) = vn.should_serve(chan, open_flags, &self.handle) {
+
+ if !pipeline {
+ self.reply_object(&chan, status, proto, &[], &mut handles)?;
+ }
+
+ self.vfs.register_connection(Connection{vfs: self.vfs.clone(), vn, chan, handle: self.handle.clone()}, &self.handle)
}
-
- // TODO(raggi): construct connection...
- vn.serve(Arc::clone(&self.vfs), chan, open_flags);
-
- return Ok(());
+ // if should_serve consumed the channel, it must also handle the reply
+ return Ok(())
}
Err(e) => {
proto = 0;
status = e;
eprintln!("vfs: open error: {:?}", e);
+
+ if !pipeline {
+ self.reply_object(&chan, status, proto, &[], &mut handles)?;
+ }
+ return Ok(())
}
}
+ }
- if !pipeline {
- return fdio::rio::write_object(&chan, status, proto, &[], &mut handles)
- .map_err(Into::into);
- }
- Ok(())
+    /// Logs the outgoing reply (channel and status) to stdout, then writes an
+    /// object description built from `status`, `type_`, `extra` and `handles`
+    /// to `chan` via `fdio::rio::write_object`, returning that call's result.
+    fn reply_object(
+        &self,
+        chan: &tokio_fuchsia::Channel,
+        status: zx::Status,
+        type_: u32,
+        extra: &[u8],
+        handles: &mut Vec<zx::Handle>,
+    ) -> Result<(), zx::Status> {
+        // `chan` is already a reference; Debug output is the same without `&`.
+        println!("{:?} -> {:?}", chan, status);
+        fdio::rio::write_object(chan, status, type_, extra, handles)
+    }
fn reply_status(
&self,
chan: &tokio_fuchsia::Channel,
- status: zircon::Status,
- ) -> Result<(), io::Error> {
- println!("{:?} -> {:?}", &chan, status);
- fdio::rio::write_object(chan, status, 0, &[], &mut vec![]).map_err(Into::into)
+ status: zx::Status,
+ ) -> Result<(), zx::Status> {
+ self.reply_object(chan, status, 0, &[], &mut vec![])
}
}
@@ -189,7 +238,7 @@
type Error = io::Error;
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
- let mut buf = zircon::MessageBuf::new();
+ let mut buf = zx::MessageBuf::new();
buf.ensure_capacity_bytes(fdio::fdio_sys::ZXRIO_MSG_SZ);
loop {
try_nb!(self.chan.recv_from(&mut buf));