blob: c1ae64d995b2a6852a01b36d20c4856d8e9bfa4c [file] [log] [blame] [edit]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
anyhow::{format_err, Context, Result},
fidl_fuchsia_device::ControllerMarker,
fidl_fuchsia_io as fio,
fuchsia_vfs_watcher::{WatchEvent, Watcher},
futures::stream::{Stream, StreamExt as _, TryStreamExt as _},
std::path::PathBuf,
};
// Device metadata.
pub struct DeviceInfo<'a> {
// The device's file name within the directory in which it was found.
pub filename: &'a str,
// The device's topological path.
pub topological_path: String,
}
/// Watches the directory for a device for which the predicate returns `Some(t)`
/// and returns `t`.
pub async fn wait_for_device_with<T>(
dev_dir: &fio::DirectoryProxy,
predicate: impl Fn(DeviceInfo<'_>) -> Option<T>,
) -> Result<T, anyhow::Error> {
let stream = watch_for_files(Clone::clone(dev_dir)).await?;
let stream = stream.try_filter_map(|filename| {
let predicate = &predicate;
async move {
let filename = filename.to_str().ok_or(format_err!("to_str for filename failed"))?;
let (controller_proxy, server_end) =
fidl::endpoints::create_proxy::<ControllerMarker>()?;
let () = dev_dir
.open(
fio::OpenFlags::RIGHT_READABLE | fio::OpenFlags::RIGHT_WRITABLE,
fio::MODE_TYPE_SERVICE,
filename,
server_end.into_channel().into(),
)
.with_context(|| format!("failed to open \"{}\"", filename))?;
let topological_path = controller_proxy.get_topological_path().await;
let topological_path = match topological_path {
Ok(topological_path) => topological_path,
// Special case PEER_CLOSED; the peer is expected to close the
// connection if it doesn't implement the controller protocol.
Err(err) => match err {
fidl::Error::ClientChannelClosed { .. } => return Ok(None),
err => {
return Err(err).with_context(|| {
format!("failed to send get_topological_path on \"{}\"", filename)
})
}
},
};
let topological_path = topological_path
.map_err(fuchsia_zircon_status::Status::from_raw)
.with_context(|| format!("failed to get topological path on \"{}\"", filename))?;
Ok(predicate(DeviceInfo { filename, topological_path }))
}
});
futures::pin_mut!(stream);
let item = stream.try_next().await?;
item.ok_or(format_err!("stream ended prematurely"))
}
/// Returns a stream that contains the paths of any existing files and
/// directories in `dir` and any new files or directories created after this
/// function was invoked. These paths are relative to `dir`.
pub async fn watch_for_files(
dir: fio::DirectoryProxy,
) -> Result<impl Stream<Item = Result<PathBuf>>> {
let watcher = Watcher::new(dir).await.context("failed to create watcher")?;
Ok(watcher.map(|result| result.context("failed to get watcher event")).try_filter_map(|msg| {
futures::future::ok(match msg.event {
WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
if msg.filename == std::path::Path::new(".") {
None
} else {
Some(msg.filename)
}
}
_ => None,
})
}))
}
async fn wait_for_file(dir: fio::DirectoryProxy, name: &str) -> Result<()> {
let mut watcher = fuchsia_vfs_watcher::Watcher::new(dir).await?;
while let Some(msg) = watcher.try_next().await? {
if msg.event != fuchsia_vfs_watcher::WatchEvent::EXISTING
&& msg.event != fuchsia_vfs_watcher::WatchEvent::ADD_FILE
{
continue;
}
if msg.filename.to_str().unwrap() == name {
return Ok(());
}
}
unreachable!();
}
/// Open the path `name` within `dir`. This function waits for each directory to
/// be available before it opens it. If the path never appears this function
/// will wait forever.
async fn recursive_wait_and_open_node_with_flags(
mut dir: fio::DirectoryProxy,
name: &str,
flags: fio::OpenFlags,
mode: u32,
) -> Result<fio::NodeProxy> {
let path = std::path::Path::new(name);
let mut components = path.components().peekable();
loop {
let component = components.next().ok_or(format_err!("cannot wait for empty path"))?;
let file = match component {
std::path::Component::Normal(file) => file,
component => {
return Err(format_err!("path contains non-normal component {:?}", component))
}
};
let () = wait_for_file(Clone::clone(&dir), file.to_str().unwrap()).await?;
let file = std::path::Path::new(file);
if components.peek().is_some() {
dir = fuchsia_fs::open_directory(&dir, file, flags)?;
} else {
break fuchsia_fs::open_node(&dir, file, flags, mode);
}
}
}
/// Open the path `name` within `dir`. This function waits for each directory to
/// be available before it opens it. If the path never appears this function
/// will wait forever.
///
/// This function opens node as a RW service. This has the correct permissions
/// to connect to a driver's API.
pub async fn recursive_wait_and_open_node(
dir: &fio::DirectoryProxy,
name: &str,
) -> Result<fio::NodeProxy> {
recursive_wait_and_open_node_with_flags(
Clone::clone(dir),
name,
fio::OpenFlags::RIGHT_READABLE | fio::OpenFlags::RIGHT_WRITABLE,
fio::MODE_TYPE_SERVICE,
)
.await
}
#[cfg(test)]
mod tests {
use {
super::*,
fidl_fuchsia_device as fdev, fuchsia_async as fasync,
futures::StreamExt,
std::{collections::HashSet, str::FromStr, sync::Arc},
vfs::{
directory::entry::DirectoryEntry, execution_scope::ExecutionScope,
file::vmo::read_only_static,
},
};
fn create_controller_service(topo_path: String) -> Arc<vfs::service::Service> {
vfs::service::host(move |mut stream: fdev::ControllerRequestStream| {
let topo_path = topo_path.clone();
async move {
match stream.try_next().await.unwrap() {
Some(fdev::ControllerRequest::GetTopologicalPath { responder }) => {
let _ = responder.send(&mut Ok(topo_path));
}
e => panic!("Unexpected request: {:?}", e),
}
}
})
}
#[fasync::run_singlethreaded(test)]
async fn wait_for_device_by_topological_path() {
let dir = vfs::pseudo_directory! {
"a" => create_controller_service("/dev/test0/a/dev".to_string()),
"1" => create_controller_service("/dev/test1/1/dev".to_string()),
"x" => create_controller_service("/dev/test2/x/dev".to_string()),
"y" => create_controller_service("/dev/test3/y/dev".to_string()),
};
let (dir_proxy, remote) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>().unwrap();
let scope = ExecutionScope::new();
dir.open(
scope,
fio::OpenFlags::RIGHT_READABLE | fio::OpenFlags::RIGHT_WRITABLE,
fio::MODE_TYPE_DIRECTORY,
vfs::path::Path::dot(),
fidl::endpoints::ServerEnd::new(remote.into_channel()),
);
let path = wait_for_device_with(&dir_proxy, |DeviceInfo { filename, topological_path }| {
(topological_path == "/dev/test2/x/dev").then(|| filename.to_string())
})
.await
.unwrap();
assert_eq!("x", path);
}
#[fasync::run_singlethreaded(test)]
async fn watch_for_two_files() {
let dir = vfs::pseudo_directory! {
"a" => read_only_static(b"/a"),
"b" => read_only_static(b"/b"),
};
let (dir_proxy, remote) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>().unwrap();
let scope = ExecutionScope::new();
dir.open(
scope,
fio::OpenFlags::RIGHT_READABLE | fio::OpenFlags::RIGHT_WRITABLE,
fio::MODE_TYPE_DIRECTORY,
vfs::path::Path::dot(),
fidl::endpoints::ServerEnd::new(remote.into_channel()),
);
let stream = watch_for_files(dir_proxy).await.unwrap();
futures::pin_mut!(stream);
let actual: HashSet<PathBuf> =
vec![stream.next().await.unwrap().unwrap(), stream.next().await.unwrap().unwrap()]
.into_iter()
.collect();
let expected: HashSet<PathBuf> =
vec![PathBuf::from_str("a").unwrap(), PathBuf::from_str("b").unwrap()]
.into_iter()
.collect();
assert_eq!(actual, expected);
}
#[fasync::run_singlethreaded(test)]
async fn wait_for_device_topo_path_allows_files_and_dirs() {
let dir = vfs::pseudo_directory! {
"1" => vfs::pseudo_directory! {
"test" => read_only_static("test file 1"),
"test2" => read_only_static("test file 2"),
},
"2" => read_only_static("file 2"),
"x" => create_controller_service("/dev/test2/x/dev".to_string()),
"3" => read_only_static("file 3"),
};
let (dir_proxy, remote) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>().unwrap();
let scope = ExecutionScope::new();
dir.open(
scope,
fio::OpenFlags::RIGHT_READABLE | fio::OpenFlags::RIGHT_WRITABLE,
fio::MODE_TYPE_DIRECTORY,
vfs::path::Path::dot(),
fidl::endpoints::ServerEnd::new(remote.into_channel()),
);
let path = wait_for_device_with(&dir_proxy, |DeviceInfo { filename, topological_path }| {
(topological_path == "/dev/test2/x/dev").then(|| filename.to_string())
})
.await
.unwrap();
assert_eq!("x", path);
}
#[fasync::run_singlethreaded(test)]
async fn open_two_directories() {
let (client, server) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>().unwrap();
let fs_scope = vfs::execution_scope::ExecutionScope::new();
let root = vfs::pseudo_directory! {
"test" => vfs::pseudo_directory! {
"dir" => vfs::pseudo_directory! {},
},
};
root.open(
fs_scope.clone(),
fio::OpenFlags::RIGHT_READABLE | fio::OpenFlags::RIGHT_EXECUTABLE,
0,
vfs::path::Path::dot(),
fidl::endpoints::ServerEnd::new(server.into_channel()),
);
fasync::Task::spawn(async move { fs_scope.wait().await }).detach();
recursive_wait_and_open_node_with_flags(
client,
"test/dir",
fuchsia_fs::OpenFlags::RIGHT_READABLE,
0,
)
.await
.unwrap();
}
}