| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| crate::host_identifier::HostIdentifier, |
| anyhow::{Context as _, Result}, |
| component_debug::dirs::*, |
| component_debug::lifecycle::*, |
| fidl::endpoints::ServerEnd, |
| fidl::prelude::*, |
| fidl_fuchsia_developer_remotecontrol as rcs, |
| fidl_fuchsia_diagnostics::Selector, |
| fidl_fuchsia_io as io, |
| fidl_fuchsia_net_ext::SocketAddress as SocketAddressExt, |
| fidl_fuchsia_sys2 as fsys, fuchsia_async as fasync, |
| fuchsia_component::client::connect_to_protocol_at_path, |
| fuchsia_zircon as zx, |
| futures::future::join, |
| futures::prelude::*, |
| moniker::{RelativeMoniker, RelativeMonikerBase}, |
| selector_maps::{MappingError, SelectorMappingList}, |
| selectors::{StringSelector, TreeSelector}, |
| std::{borrow::Borrow, cell::RefCell, collections::HashMap, net::SocketAddr, rc::Rc, rc::Weak}, |
| tracing::*, |
| }; |
| |
| mod host_identifier; |
| |
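| /// Implementation of the fuchsia.developer.remotecontrol.RemoteControl protocol, |
| /// serving host identification, capability/service connection, and TCP port |
| /// forwarding requests. |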
| pub struct RemoteControlService { |
| ids: RefCell<Vec<Weak<RefCell<Vec<u64>>>>>, |
| id_allocator: fn() -> Result<HostIdentifier>, |
| connector: Box<dyn Fn(fidl::Socket)>, |
| maps: SelectorMappingList, |
| moniker_map: HashMap<String, String>, |
| } |
| |
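| /// State tracked for a single connected RemoteControl client. |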
| struct Client { |
| // Maintain reference-counts to this client's ids. |
| // The ids may be shared (e.g. when Overnet maintains two |
| // connections to the target -- legacy + CSO), so we can't |
| // simply keep a flat list of ids in RCS and remove one when a |
| // connection disappears. Instead, when these ids are freed because |
| // the client is dropped, the Weak references held by RCS become invalid. |
| allocated_ids: Rc<RefCell<Vec<u64>>>, |
| } |
| |
| impl RemoteControlService { |
| pub async fn new(connector: impl Fn(fidl::Socket) + 'static) -> Self { |
| let (list, moniker_map) = join(Self::load_selector_map(), Self::load_moniker_map()).await; |
| Self::new_with_allocator_and_maps(connector, || HostIdentifier::new(), list, moniker_map) |
| } |
| |
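| /// Loads the moniker remapping table from /pkg/data/moniker-map.json (a JSON |
| /// object of moniker-to-moniker strings, e.g. {"core/foo": "core/bar"}), |
| /// falling back to an empty map if the file is missing or malformed. |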
| async fn load_moniker_map() -> HashMap<String, String> { |
| let f = match fuchsia_fs::file::open_in_namespace( |
| "/pkg/data/moniker-map.json", |
| io::OpenFlags::RIGHT_READABLE, |
| ) { |
| Ok(f) => f, |
| Err(e) => { |
| error!(%e, "failed to open moniker maps json file"); |
| return HashMap::default(); |
| } |
| }; |
| let bytes = match fuchsia_fs::file::read(&f).await { |
| Ok(b) => b, |
| Err(e) => { |
| error!(?e, "failed to read bytes from moniker map json"); |
| return HashMap::default(); |
| } |
| }; |
| match serde_json::from_slice(bytes.as_slice()) { |
| Ok(m) => m, |
| Err(e) => { |
| error!(?e, "failed to parse moniker map json"); |
| HashMap::default() |
| } |
| } |
| } |
| |
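| /// Loads the selector remapping list from /pkg/data/selector-maps.json, |
| /// falling back to the default (empty) list if the file is missing or malformed. |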
| async fn load_selector_map() -> SelectorMappingList { |
| let f = match fuchsia_fs::file::open_in_namespace( |
| "/pkg/data/selector-maps.json", |
| io::OpenFlags::RIGHT_READABLE, |
| ) { |
| Ok(f) => f, |
| Err(e) => { |
| error!(%e, "failed to open selector maps json file"); |
| return SelectorMappingList::default(); |
| } |
| }; |
| let bytes = match fuchsia_fs::file::read(&f).await { |
| Ok(b) => b, |
| Err(e) => { |
| error!(?e, "failed to read bytes from selector maps json"); |
| return SelectorMappingList::default(); |
| } |
| }; |
| match serde_json::from_slice(bytes.as_slice()) { |
| Ok(m) => m, |
| Err(e) => { |
| error!(?e, "failed to parse selector map json"); |
| SelectorMappingList::default() |
| } |
| } |
| } |
| |
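| /// Constructs the service with an injectable host-identifier allocator and |
| /// pre-loaded selector/moniker maps; used directly by tests. |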
| pub(crate) fn new_with_allocator_and_maps( |
| connector: impl Fn(fidl::Socket) + 'static, |
| id_allocator: fn() -> Result<HostIdentifier>, |
| maps: SelectorMappingList, |
| moniker_map: HashMap<String, String>, |
| ) -> Self { |
| Self { |
| id_allocator, |
| ids: Default::default(), |
| connector: Box::new(connector), |
| maps, |
| moniker_map, |
| } |
| } |
| |
| // Some of the ID-lists may be gone because old clients have shut down. |
| // Those lists will have a strong_count of 0, so drop them. |
| fn remove_old_ids(self: &Rc<Self>) { |
| self.ids.borrow_mut().retain(|wirc| wirc.strong_count() > 0); |
| } |
| |
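| /// Dispatches a single RemoteControl FIDL request on behalf of `client`. |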
| async fn handle( |
| self: &Rc<Self>, |
| client: &Client, |
| request: rcs::RemoteControlRequest, |
| ) -> Result<()> { |
| match request { |
| rcs::RemoteControlRequest::EchoString { value, responder } => { |
| info!("Received echo string {}", value); |
| responder.send(&value)?; |
| Ok(()) |
| } |
| rcs::RemoteControlRequest::AddId { id, responder } => { |
| client.allocated_ids.borrow_mut().push(id); |
| responder.send()?; |
| Ok(()) |
| } |
| rcs::RemoteControlRequest::AddOvernetLink { id, socket, responder } => { |
| (self.connector)(socket); |
| client.allocated_ids.borrow_mut().push(id); |
| responder.send()?; |
| Ok(()) |
| } |
| rcs::RemoteControlRequest::IdentifyHost { responder } => { |
| self.clone().identify_host(responder).await?; |
| Ok(()) |
| } |
| rcs::RemoteControlRequest::ConnectCapability { |
| moniker, |
| capability_name, |
| server_chan, |
| flags, |
| responder, |
| } => { |
| responder.send( |
| self.clone() |
| .connect_capability(moniker, capability_name, flags, server_chan) |
| .await, |
| )?; |
| Ok(()) |
| } |
| rcs::RemoteControlRequest::Connect { selector, service_chan, responder } => { |
| let response = self.clone().connect_to_service(selector, service_chan).await; |
| responder.send(response.as_ref().map_err(|e| *e))?; |
| Ok(()) |
| } |
| rcs::RemoteControlRequest::RootRealmExplorer { server, responder } => { |
| responder.send( |
| fdio::service_connect( |
| &format!( |
| "/svc/{}.root", |
| fidl_fuchsia_sys2::RealmExplorerMarker::PROTOCOL_NAME |
| ), |
| server.into_channel(), |
| ) |
| .map_err(|i| i.into_raw()), |
| )?; |
| Ok(()) |
| } |
| rcs::RemoteControlRequest::RootRealmQuery { server, responder } => { |
| responder.send( |
| fdio::service_connect( |
| &format!( |
| "/svc/{}.root", |
| fidl_fuchsia_sys2::RealmQueryMarker::PROTOCOL_NAME |
| ), |
| server.into_channel(), |
| ) |
| .map_err(|i| i.into_raw()), |
| )?; |
| Ok(()) |
| } |
| rcs::RemoteControlRequest::RootLifecycleController { server, responder } => { |
| responder.send( |
| fdio::service_connect( |
| &format!( |
| "/svc/{}.root", |
| fidl_fuchsia_sys2::LifecycleControllerMarker::PROTOCOL_NAME |
| ), |
| server.into_channel(), |
| ) |
| .map_err(|i| i.into_raw()), |
| )?; |
| Ok(()) |
| } |
| rcs::RemoteControlRequest::RootRouteValidator { server, responder } => { |
| responder.send( |
| fdio::service_connect( |
| &format!( |
| "/svc/{}.root", |
| fidl_fuchsia_sys2::RouteValidatorMarker::PROTOCOL_NAME |
| ), |
| server.into_channel(), |
| ) |
| .map_err(|i| i.into_raw()), |
| )?; |
| Ok(()) |
| } |
| rcs::RemoteControlRequest::KernelStats { server, responder } => { |
| responder.send( |
| fdio::service_connect( |
| &format!("/svc/{}", fidl_fuchsia_kernel::StatsMarker::PROTOCOL_NAME), |
| server.into_channel(), |
| ) |
| .map_err(|i| i.into_raw()), |
| )?; |
| Ok(()) |
| } |
| rcs::RemoteControlRequest::BootArguments { server, responder } => { |
| responder.send( |
| fdio::service_connect( |
| &format!("/svc/{}", fidl_fuchsia_boot::ArgumentsMarker::PROTOCOL_NAME), |
| server.into_channel(), |
| ) |
| .map_err(|i| i.into_raw()), |
| )?; |
| Ok(()) |
| } |
| rcs::RemoteControlRequest::ForwardTcp { addr, socket, responder } => { |
| let addr: SocketAddressExt = addr.into(); |
| let addr = addr.0; |
| let result = match fasync::Socket::from_socket(socket) { |
| Ok(socket) => match self.connect_forwarded_port(addr, socket).await { |
| Ok(()) => Ok(()), |
| Err(e) => { |
| error!("Port forward connection failed: {:?}", e); |
| Err(rcs::TunnelError::ConnectFailed) |
| } |
| }, |
| Err(e) => { |
| error!("Could not use socket asynchronously: {:?}", e); |
| Err(rcs::TunnelError::SocketFailed) |
| } |
| }; |
| responder.send(result)?; |
| Ok(()) |
| } |
| rcs::RemoteControlRequest::ReverseTcp { addr, client, responder } => { |
| let addr: SocketAddressExt = addr.into(); |
| let addr = addr.0; |
| let client = match client.into_proxy() { |
| Ok(proxy) => proxy, |
| Err(e) => { |
| error!("Could not communicate with callback: {:?}", e); |
| responder.send(Err(rcs::TunnelError::CallbackError))?; |
| return Ok(()); |
| } |
| }; |
| let result = match self.listen_reversed_port(addr, client).await { |
| Ok(()) => Ok(()), |
| Err(e) => { |
| error!("Port forward connection failed: {:?}", e); |
| Err(rcs::TunnelError::ConnectFailed) |
| } |
| }; |
| responder.send(result)?; |
| Ok(()) |
| } |
| rcs::RemoteControlRequest::GetTime { responder } => { |
| responder.send(fuchsia_zircon::Time::get_monotonic().into_nanos())?; |
| Ok(()) |
| } |
| } |
| } |
| |
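| /// Serves a RemoteControl request stream until the client disconnects, |
| /// handling requests concurrently. |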
| pub async fn serve_stream(self: Rc<Self>, stream: rcs::RemoteControlRequestStream) { |
| // When the stream ends, the client (and the ids it allocated) will be dropped. |
| let allocated_ids = Rc::new(RefCell::new(vec![])); |
| self.ids.borrow_mut().push(Rc::downgrade(&allocated_ids)); |
| let client = Client { allocated_ids }; |
| stream |
| .for_each_concurrent(None, |request| async { |
| match request { |
| Ok(request) => { |
| let _ = self |
| .handle(&client, request) |
| .await |
| .map_err(|e| warn!("stream request handling error: {:?}", e)); |
| } |
| Err(e) => warn!("stream error: {:?}", e), |
| } |
| }) |
| .await; |
| } |
| |
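| /// Binds a TCP listener at `listen_addr` and, for each incoming connection, |
| /// forwards traffic over a new zircon socket handed back to `client` |
| /// (a reverse tunnel). |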
| async fn listen_reversed_port( |
| &self, |
| listen_addr: SocketAddr, |
| client: rcs::ForwardCallbackProxy, |
| ) -> Result<(), std::io::Error> { |
| let mut listener = fasync::net::TcpListener::bind(&listen_addr)?.accept_stream(); |
| |
| fasync::Task::local(async move { |
| let mut client_closed = client.on_closed().fuse(); |
| |
| loop { |
| // Listen for a connection, or exit if the client has gone away. |
| let (stream, addr) = futures::select! { |
| result = listener.next().fuse() => { |
| match result { |
| Some(Ok(x)) => x, |
| Some(Err(e)) => { |
| warn!("Error accepting connection: {:?}", e); |
| continue; |
| } |
| None => { |
| warn!("reverse tunnel to {:?} listener socket closed", listen_addr); |
| break; |
| } |
| } |
| } |
| _ = client_closed => { |
| info!("reverse tunnel {:?} client has closed", listen_addr); |
| break; |
| } |
| }; |
| |
| info!("reverse tunnel connection from {:?} to {:?}", addr, listen_addr); |
| |
| let (local, remote) = zx::Socket::create_stream(); |
| |
| let local = match fasync::Socket::from_socket(local) { |
| Ok(x) => x, |
| Err(e) => { |
| warn!("Error converting socket to async: {:?}", e); |
| continue; |
| } |
| }; |
| |
| spawn_forward_traffic(stream, local); |
| |
| // Send the socket to the client. |
| if let Err(e) = client.forward(remote, &SocketAddressExt(addr).into()) { |
| // The client has gone away, so stop the task. |
| if let fidl::Error::ClientChannelClosed { .. } = e { |
| warn!("tunnel client channel closed while forwarding socket"); |
| break; |
| } |
| |
| warn!("Could not return forwarded socket to client: {:?}", e); |
| } |
| } |
| }) |
| .detach(); |
| |
| Ok(()) |
| } |
| |
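| /// Opens a TCP connection to `addr` and forwards traffic between it and `socket`. |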
| async fn connect_forwarded_port( |
| &self, |
| addr: SocketAddr, |
| socket: fasync::Socket, |
| ) -> Result<(), std::io::Error> { |
| let tcp_conn = fasync::net::TcpStream::connect(addr)?.await?; |
| |
| spawn_forward_traffic(tcp_conn, socket); |
| |
| Ok(()) |
| } |
| |
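| /// Applies the moniker remapping table, returning the input moniker unchanged |
| /// if no mapping exists. |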
| fn map_moniker(self: &Rc<Self>, moniker: String) -> String { |
| self.moniker_map.get(&moniker).cloned().unwrap_or(moniker) |
| } |
| |
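| /// Applies the selector remapping list, logging and returning |
| /// `ServiceRerouteFailed` if the mapping is invalid or cyclic. |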
| pub(crate) fn map_selector( |
| self: &Rc<Self>, |
| selector: Selector, |
| ) -> Result<Selector, rcs::ConnectError> { |
| self.maps.map_selector(selector.clone()).map_err(|e| { |
| match e { |
| MappingError::BadSelector(selector_str, err) => { |
| error!(?selector, ?selector_str, %err, "got invalid selector mapping"); |
| } |
| MappingError::BadInputSelector(err) => { |
| error!(%err, "input selector invalid"); |
| } |
| MappingError::Unbounded => { |
| error!(?selector, %e, "got a cycle in mapping selector"); |
| } |
| } |
| rcs::ConnectError::ServiceRerouteFailed |
| }) |
| } |
| |
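| /// Responds to an IdentifyHost request with the device identity plus the ids |
| /// registered by all currently-connected clients. |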
| pub async fn identify_host( |
| self: &Rc<Self>, |
| responder: rcs::RemoteControlIdentifyHostResponder, |
| ) -> Result<()> { |
| let identifier = match (self.id_allocator)() { |
| Ok(i) => i, |
| Err(e) => { |
| error!(%e, "Allocating host identifier"); |
| return responder |
| .send(Err(rcs::IdentifyHostError::ProxyConnectionFailed)) |
| .context("responding to client"); |
| } |
| }; |
| |
| // The stale ids need to be cleaned up at some point; do it here, |
| // when the ids are actually asked for. |
| self.remove_old_ids(); |
| // Now the only vecs left should be the ones still held by a strong |
| // Rc reference. Extract those. |
| let ids: Vec<u64> = self |
| .ids |
| .borrow() |
| .iter() |
| .flat_map(|w| -> Vec<u64> { |
| // Fully-qualified syntax is needed here to reach the inner RefCell |
| // through the Rc. (This is all sadmac's fault. Grr. He suggested, correctly, |
| // that we use an Rc<Vec<_>> instead of a Vec<Rc<_>>.) |
| <Rc<RefCell<Vec<u64>>> as Borrow<RefCell<Vec<u64>>>>::borrow( |
| &w.upgrade().expect("Didn't we just clear out refs with expired values??"), |
| ) |
| .borrow() |
| .clone() |
| }) |
| .collect(); |
| let target_identity = identifier.identify().await.map(move |mut i| { |
| i.ids = Some(ids); |
| i |
| }); |
| responder.send(target_identity.as_ref().map_err(|e| *e)).context("responding to client")?; |
| Ok(()) |
| } |
| |
| /// Connects to an exposed capability identified by the given moniker and capability name. |
| async fn connect_capability( |
| self: &Rc<Self>, |
| moniker: String, |
| capability_name: String, |
| flags: io::OpenFlags, |
| server_end: zx::Channel, |
| ) -> Result<(), rcs::ConnectCapabilityError> { |
| let moniker = self.map_moniker(moniker); |
| // Connect to the root LifecycleController protocol |
| let lifecycle = connect_to_protocol_at_path::<fsys::LifecycleControllerMarker>( |
| "/svc/fuchsia.sys2.LifecycleController.root", |
| ) |
| .map_err(|err| { |
| error!(%err, "could not connect to lifecycle controller"); |
| rcs::ConnectCapabilityError::CapabilityConnectFailed |
| })?; |
| |
| // Connect to the root RealmQuery protocol |
| let query = connect_to_protocol_at_path::<fsys::RealmQueryMarker>( |
| "/svc/fuchsia.sys2.RealmQuery.root", |
| ) |
| .map_err(|err| { |
| error!(%err, "could not connect to realm query"); |
| rcs::ConnectCapabilityError::CapabilityConnectFailed |
| })?; |
| |
| let moniker = |
| relative_moniker(moniker).ok_or(rcs::ConnectCapabilityError::InvalidMoniker)?; |
| connect_to_exposed_capability(moniker, capability_name, server_end, flags, lifecycle, query) |
| .await |
| } |
| |
| /// Connects to an exposed capability identified by the given selector. |
| async fn connect_to_service( |
| self: &Rc<Self>, |
| selector: Selector, |
| server_end: zx::Channel, |
| ) -> Result<rcs::ServiceMatch, rcs::ConnectError> { |
| // If applicable, get a remapped selector |
| let selector = self.map_selector(selector)?; |
| |
| // Connect to the root LifecycleController protocol |
| let lifecycle = connect_to_protocol_at_path::<fsys::LifecycleControllerMarker>( |
| "/svc/fuchsia.sys2.LifecycleController.root", |
| ) |
| .map_err(|err| { |
| error!(%err, "could not connect to lifecycle controller"); |
| rcs::ConnectError::ServiceConnectFailed |
| })?; |
| |
| // Connect to the root RealmQuery protocol |
| let query = connect_to_protocol_at_path::<fsys::RealmQueryMarker>( |
| "/svc/fuchsia.sys2.RealmQuery.root", |
| ) |
| .map_err(|err| { |
| error!(%err, "could not connect to realm query"); |
| rcs::ConnectError::ServiceConnectFailed |
| })?; |
| |
| let (moniker, protocol_name) = extract_moniker_and_protocol_from_selector(selector) |
| .ok_or(rcs::ConnectError::ServiceConnectFailed)?; |
| let moniker_parts = moniker.path().iter().map(|part| part.to_string()).collect(); |
| |
| connect_to_exposed_capability( |
| moniker, |
| protocol_name.clone(), |
| server_end, |
| io::OpenFlags::RIGHT_READABLE, |
| lifecycle, |
| query, |
| ) |
| .await |
| .map_err(|err| match err { |
| rcs::ConnectCapabilityError::NoMatchingComponent => { |
| rcs::ConnectError::NoMatchingServices |
| } |
| rcs::ConnectCapabilityError::CapabilityConnectFailed => { |
| rcs::ConnectError::ServiceConnectFailed |
| } |
| rcs::ConnectCapabilityError::NoMatchingCapabilities => { |
| rcs::ConnectError::NoMatchingServices |
| } |
| _ => unreachable!("we only emit the errors above"), |
| })?; |
| |
| Ok(rcs::ServiceMatch { |
| moniker: moniker_parts, |
| subdir: "expose".to_string(), |
| service: protocol_name, |
| }) |
| } |
| } |
| |
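| /// Parses `moniker` as a relative moniker, first converting an absolute moniker |
| /// into a relative one (e.g. "/core/foo" becomes "./core/foo"); returns None if |
| /// parsing fails. |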
| fn relative_moniker(moniker: String) -> Option<RelativeMoniker> { |
| // If given an absolute moniker, make it relative; otherwise attempt to parse it as a |
| // relative moniker. |
| let moniker = if moniker.starts_with("/") { format!(".{moniker}") } else { moniker }; |
| RelativeMoniker::try_from(moniker.as_str()).ok() |
| } |
| |
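| /// Resolves the component at `moniker` (if needed) and opens `capability_name` |
| /// from its exposed directory into `server_end`. |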
| async fn connect_to_exposed_capability( |
| moniker: RelativeMoniker, |
| capability_name: String, |
| server_end: zx::Channel, |
| flags: io::OpenFlags, |
| lifecycle: fsys::LifecycleControllerProxy, |
| query: fsys::RealmQueryProxy, |
| ) -> Result<(), rcs::ConnectCapabilityError> { |
| // This is a no-op if already resolved. |
| resolve_instance(&lifecycle, &moniker) |
| .map_err(|err| match err { |
| ResolveError::ActionError(ActionError::InstanceNotFound) => { |
| rcs::ConnectCapabilityError::NoMatchingComponent |
| } |
| err => { |
| error!(?err, "error resolving component"); |
| rcs::ConnectCapabilityError::CapabilityConnectFailed |
| } |
| }) |
| .await?; |
| |
| let exposed_dir = open_instance_dir_root_readable(&moniker, OpenDirType::Exposed, &query) |
| .map_err(|err| { |
| error!(?err, "error opening exposed dir"); |
| rcs::ConnectCapabilityError::CapabilityConnectFailed |
| }) |
| .await?; |
| |
| connect_to_capability_in_exposed_dir(&exposed_dir, &capability_name, server_end, flags).await?; |
| Ok(()) |
| } |
| |
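| /// Splits a selector of the form `core/my_component:expose:fuchsia.foo.bar` into |
| /// a relative moniker ("./core/my_component") and a protocol name |
| /// ("fuchsia.foo.bar"); only the `expose` namespace is supported. |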
| fn extract_moniker_and_protocol_from_selector( |
| selector: Selector, |
| ) -> Option<(RelativeMoniker, String)> { |
| // Construct the moniker from the selector |
| let moniker_segments = selector.component_selector?.moniker_segments?; |
| let mut children = vec![]; |
| for segment in moniker_segments { |
| let child = segment.exact_match()?.to_string(); |
| children.push(child); |
| } |
| let moniker = format!("./{}", children.join("/")); |
| |
| let tree_selector = selector.tree_selector?; |
| |
| // Namespace must be `expose`. Nothing else is supported. |
| let namespace = tree_selector.node_path()?.get(0)?.exact_match()?; |
| if namespace != "expose" { |
| return None; |
| } |
| |
| // Get the protocol name |
| let property_selector = tree_selector.property()?; |
| let protocol_name = property_selector.exact_match()?.to_string(); |
| |
| // Make sure the moniker is valid |
| let moniker = RelativeMoniker::try_from(moniker.as_str()) |
| .map_err(|err| { |
| error!(%err, "moniker invalid"); |
| err |
| }) |
| .ok()?; |
| |
| Some((moniker, protocol_name)) |
| } |
| |
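| /// Verifies that `capability_name` is listed in `exposed_dir` and then opens it |
| /// into `server_end` with the given flags. |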
| async fn connect_to_capability_in_exposed_dir( |
| exposed_dir: &io::DirectoryProxy, |
| capability_name: &str, |
| server_end: zx::Channel, |
| flags: io::OpenFlags, |
| ) -> Result<(), rcs::ConnectCapabilityError> { |
| // Check whether the capability exists in the exposed dir. |
| let entries = fuchsia_fs::directory::readdir(exposed_dir) |
| .await |
| .map_err(|_| rcs::ConnectCapabilityError::CapabilityConnectFailed)?; |
| let is_capability_exposed = entries.iter().any(|e| e.name == capability_name); |
| if !is_capability_exposed { |
| return Err(rcs::ConnectCapabilityError::NoMatchingCapabilities); |
| } |
| |
| // Connect to the capability |
| exposed_dir |
| .open(flags, io::ModeType::empty(), capability_name, ServerEnd::new(server_end)) |
| .map_err(|err| { |
| error!(%err, "error opening capability from exposed dir"); |
| rcs::ConnectCapabilityError::CapabilityConnectFailed |
| }) |
| } |
| |
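| /// Distinguishes which direction(s) of a forwarding task failed. |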
| #[derive(Debug)] |
| enum ForwardError { |
| TcpToZx(anyhow::Error), |
| ZxToTcp(anyhow::Error), |
| Both { tcp_to_zx: anyhow::Error, zx_to_tcp: anyhow::Error }, |
| } |
| |
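| /// Spawns a local task that shuttles bytes between the TCP stream and the zircon |
| /// socket, logging any forwarding errors. |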
| fn spawn_forward_traffic(tcp_side: fasync::net::TcpStream, zx_side: fasync::Socket) { |
| fasync::Task::local(async move { |
| match forward_traffic(tcp_side, zx_side).await { |
| Ok(()) => {} |
| Err(ForwardError::TcpToZx(err)) => { |
| error!("error forwarding from tcp to zx socket: {:#}", err); |
| } |
| Err(ForwardError::ZxToTcp(err)) => { |
| error!("error forwarding from zx to tcp socket: {:#}", err); |
| } |
| Err(ForwardError::Both { tcp_to_zx, zx_to_tcp }) => { |
| error!("error forwarding from zx to tcp socket:\n{:#}\n{:#}", tcp_to_zx, zx_to_tcp); |
| } |
| } |
| }) |
| .detach() |
| } |
| |
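| /// Copies bytes in both directions between `tcp_side` and `zx_side` until either |
| /// end closes; see the in-function comment for how shutdown is coordinated. |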
| async fn forward_traffic( |
| tcp_side: fasync::net::TcpStream, |
| zx_side: fasync::Socket, |
| ) -> Result<(), ForwardError> { |
| // We will forward traffic with two sub-tasks. One to stream bytes from the |
| // tcp socket to the zircon socket, and vice versa. Since we have two tasks, |
| // we need to handle how we exit the loops, otherwise we risk leaking |
| // resources. |
| // |
| // To handle this, we'll create two promises that will resolve upon the |
| // stream closing. For the zircon socket, we can use a native signal, but |
| // unfortunately fasync::net::TcpStream doesn't support listening for |
| // closure, so we'll just use a oneshot channel to signal to the other task |
| // when the tcp stream closes. |
| let (tcp_closed_tx, mut tcp_closed_rx) = futures::channel::oneshot::channel::<()>(); |
| let mut zx_closed = fasync::OnSignals::new(&zx_side, zx::Signals::SOCKET_PEER_CLOSED).fuse(); |
| let zx_side = &zx_side; |
| |
| let (mut tcp_read, mut tcp_write) = tcp_side.split(); |
| let (mut zx_read, mut zx_write) = zx_side.split(); |
| |
| let tcp_to_zx = async move { |
| let res = async move { |
| // TODO(84188): Use a buffer pool once we have them. |
| let mut buf = [0; 4096]; |
| loop { |
| futures::select! { |
| res = tcp_read.read(&mut buf).fuse() => { |
| let num_bytes = res.context("read tcp socket")?; |
| if num_bytes == 0 { |
| return Ok(()); |
| } |
| |
| zx_write.write_all(&buf[..num_bytes]).await.context("write zx socket")?; |
| zx_write.flush().await.context("flush zx socket")?; |
| } |
| _ = zx_closed => { |
| return Ok(()); |
| } |
| } |
| } |
| } |
| .await; |
| |
| // Let the other task know the tcp stream has shut down. If the other |
| // task finished before this one, this send could fail. That's okay, so |
| // just ignore the result. |
| let _ = tcp_closed_tx.send(()); |
| |
| res |
| }; |
| |
| let zx_to_tcp = async move { |
| // TODO(84188): Use a buffer pool once we have them. |
| let mut buf = [0; 4096]; |
| loop { |
| futures::select! { |
| res = zx_read.read(&mut buf).fuse() => { |
| let num_bytes = res.context("read zx socket")?; |
| if num_bytes == 0 { |
| return Ok(()); |
| } |
| tcp_write.write_all(&buf[..num_bytes]).await.context("write tcp socket")?; |
| tcp_write.flush().await.context("flush tcp socket")?; |
| } |
| _ = tcp_closed_rx => { |
| break Ok(()); |
| } |
| } |
| } |
| }; |
| |
| match join(tcp_to_zx, zx_to_tcp).await { |
| (Ok(()), Ok(())) => Ok(()), |
| (Err(tcp_to_zx), Err(zx_to_tcp)) => Err(ForwardError::Both { tcp_to_zx, zx_to_tcp }), |
| (Err(tcp_to_zx), Ok(())) => Err(ForwardError::TcpToZx(tcp_to_zx)), |
| (Ok(()), Err(zx_to_tcp)) => Err(ForwardError::ZxToTcp(zx_to_tcp)), |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use { |
| super::*, |
| assert_matches::assert_matches, |
| fidl_fuchsia_buildinfo as buildinfo, fidl_fuchsia_developer_remotecontrol as rcs, |
| fidl_fuchsia_device as fdevice, fidl_fuchsia_hwinfo as hwinfo, fidl_fuchsia_io as fio, |
| fidl_fuchsia_net as fnet, fidl_fuchsia_net_interfaces as fnet_interfaces, |
| fuchsia_component::server::ServiceFs, |
| fuchsia_zircon as zx, |
| selectors::{parse_selector, VerboseError}, |
| std::net::Ipv4Addr, |
| }; |
| |
| const NODENAME: &'static str = "thumb-set-human-shred"; |
| const BOOT_TIME: u64 = 123456789000000000; |
| const SERIAL: &'static str = "test_serial"; |
| const BOARD_CONFIG: &'static str = "test_board_name"; |
| const PRODUCT_CONFIG: &'static str = "core"; |
| const FAKE_SERVICE_SELECTOR: &'static str = "my/component:expose:some.fake.Service"; |
| const MAPPED_SERVICE_SELECTOR: &'static str = "my/other/component:out:some.fake.mapped.Service"; |
| |
| const IPV4_ADDR: [u8; 4] = [127, 0, 0, 1]; |
| const IPV6_ADDR: [u8; 16] = [127, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6]; |
| |
| fn setup_fake_device_service() -> hwinfo::DeviceProxy { |
| let (proxy, mut stream) = |
| fidl::endpoints::create_proxy_and_stream::<hwinfo::DeviceMarker>().unwrap(); |
| fasync::Task::spawn(async move { |
| while let Ok(Some(req)) = stream.try_next().await { |
| match req { |
| hwinfo::DeviceRequest::GetInfo { responder } => { |
| let _ = responder.send(&hwinfo::DeviceInfo { |
| serial_number: Some(String::from(SERIAL)), |
| ..Default::default() |
| }); |
| } |
| } |
| } |
| }) |
| .detach(); |
| |
| proxy |
| } |
| |
| fn setup_fake_build_info_service() -> buildinfo::ProviderProxy { |
| let (proxy, mut stream) = |
| fidl::endpoints::create_proxy_and_stream::<buildinfo::ProviderMarker>().unwrap(); |
| fasync::Task::spawn(async move { |
| while let Ok(Some(req)) = stream.try_next().await { |
| match req { |
| buildinfo::ProviderRequest::GetBuildInfo { responder } => { |
| let _ = responder.send(&buildinfo::BuildInfo { |
| board_config: Some(String::from(BOARD_CONFIG)), |
| product_config: Some(String::from(PRODUCT_CONFIG)), |
| ..Default::default() |
| }); |
| } |
| } |
| } |
| }) |
| .detach(); |
| |
| proxy |
| } |
| |
| fn setup_fake_name_provider_service() -> fdevice::NameProviderProxy { |
| let (proxy, mut stream) = |
| fidl::endpoints::create_proxy_and_stream::<fdevice::NameProviderMarker>().unwrap(); |
| |
| fasync::Task::spawn(async move { |
| while let Ok(Some(req)) = stream.try_next().await { |
| match req { |
| fdevice::NameProviderRequest::GetDeviceName { responder } => { |
| let _ = responder.send(Ok(NODENAME)); |
| } |
| } |
| } |
| }) |
| .detach(); |
| |
| proxy |
| } |
| |
| fn setup_fake_interface_state_service() -> fnet_interfaces::StateProxy { |
| let (proxy, mut stream) = |
| fidl::endpoints::create_proxy_and_stream::<fnet_interfaces::StateMarker>().unwrap(); |
| |
| fasync::Task::spawn(async move { |
| while let Ok(Some(req)) = stream.try_next().await { |
| match req { |
| fnet_interfaces::StateRequest::GetWatcher { |
| options: _, |
| watcher, |
| control_handle: _, |
| } => { |
| let mut stream = watcher.into_stream().unwrap(); |
| let mut first = true; |
| while let Ok(Some(req)) = stream.try_next().await { |
| match req { |
| fnet_interfaces::WatcherRequest::Watch { responder } => { |
| let event = if first { |
| first = false; |
| fnet_interfaces::Event::Existing( |
| fnet_interfaces::Properties { |
| id: Some(1), |
| addresses: Some( |
| IntoIterator::into_iter([ |
| fnet::Subnet { |
| addr: fnet::IpAddress::Ipv4( |
| fnet::Ipv4Address { |
| addr: IPV4_ADDR, |
| }, |
| ), |
| prefix_len: 4, |
| }, |
| fnet::Subnet { |
| addr: fnet::IpAddress::Ipv6( |
| fnet::Ipv6Address { |
| addr: IPV6_ADDR, |
| }, |
| ), |
| prefix_len: 110, |
| }, |
| ]) |
| .map(Some) |
| .map(|addr| fnet_interfaces::Address { |
| addr, |
| valid_until: Some(1), |
| ..Default::default() |
| }) |
| .collect(), |
| ), |
| online: Some(true), |
| device_class: Some( |
| fnet_interfaces::DeviceClass::Loopback( |
| fnet_interfaces::Empty {}, |
| ), |
| ), |
| has_default_ipv4_route: Some(false), |
| has_default_ipv6_route: Some(false), |
| name: Some(String::from("eth0")), |
| ..Default::default() |
| }, |
| ) |
| } else { |
| fnet_interfaces::Event::Idle(fnet_interfaces::Empty {}) |
| }; |
| let () = responder.send(&event).unwrap(); |
| } |
| } |
| } |
| } |
| } |
| } |
| }) |
| .detach(); |
| |
| proxy |
| } |
| |
| fn make_rcs() -> Rc<RemoteControlService> { |
| make_rcs_with_maps(vec![], HashMap::default()) |
| } |
| |
| fn make_rcs_with_maps( |
| maps: Vec<(&str, &str)>, |
| moniker_map: HashMap<String, String>, |
| ) -> Rc<RemoteControlService> { |
| Rc::new(RemoteControlService::new_with_allocator_and_maps( |
| |_| (), |
| || { |
| Ok(HostIdentifier { |
| interface_state_proxy: setup_fake_interface_state_service(), |
| name_provider_proxy: setup_fake_name_provider_service(), |
| device_info_proxy: setup_fake_device_service(), |
| build_info_proxy: setup_fake_build_info_service(), |
| boot_timestamp_nanos: BOOT_TIME, |
| }) |
| }, |
| SelectorMappingList::new( |
| maps.iter().map(|s| (s.0.to_string(), s.1.to_string())).collect(), |
| ), |
| moniker_map, |
| )) |
| } |
| |
| fn setup_rcs_proxy() -> rcs::RemoteControlProxy { |
| let service = make_rcs(); |
| |
| let (rcs_proxy, stream) = |
| fidl::endpoints::create_proxy_and_stream::<rcs::RemoteControlMarker>().unwrap(); |
| fasync::Task::local(async move { |
| service.serve_stream(stream).await; |
| }) |
| .detach(); |
| |
| rcs_proxy |
| } |
| |
| fn setup_fake_lifecycle_controller() -> fsys::LifecycleControllerProxy { |
| fidl::endpoints::spawn_stream_handler( |
| move |request: fsys::LifecycleControllerRequest| async move { |
| match request { |
| fsys::LifecycleControllerRequest::ResolveInstance { moniker, responder } => { |
| assert_eq!(moniker, "./core/my_component"); |
| responder.send(Ok(())).unwrap() |
| } |
| _ => panic!("unexpected request: {:?}", request), |
| } |
| }, |
| ) |
| .unwrap() |
| } |
| |
| fn setup_exposed_dir(server: ServerEnd<fio::DirectoryMarker>) { |
| let mut fs = ServiceFs::new(); |
| fs.add_fidl_service(move |_: hwinfo::BoardRequestStream| {}); |
| fs.serve_connection(server).unwrap(); |
| fasync::Task::spawn(fs.collect::<()>()).detach(); |
| } |
| |
| fn setup_fake_realm_query() -> fsys::RealmQueryProxy { |
| fidl::endpoints::spawn_stream_handler(move |request: fsys::RealmQueryRequest| async move { |
| match request { |
| fsys::RealmQueryRequest::Open { |
| moniker, |
| dir_type, |
| flags, |
| mode, |
| path, |
| object, |
| responder, |
| } => { |
| assert_eq!(moniker, "./core/my_component"); |
| assert_eq!(dir_type, fsys::OpenDirType::ExposedDir); |
| assert_eq!(flags, fio::OpenFlags::RIGHT_READABLE); |
| assert_eq!(mode, fio::ModeType::empty()); |
| assert_eq!(path, "."); |
| |
| setup_exposed_dir(object.into_channel().into()); |
| |
| responder.send(Ok(())).unwrap() |
| } |
| _ => panic!("unexpected request: {:?}", request), |
| } |
| }) |
| .unwrap() |
| } |
| |
| #[fuchsia::test] |
| async fn test_extract_moniker_protocol() -> Result<()> { |
| let selector = |
| parse_selector::<VerboseError>("core/my_component:expose:fuchsia.foo.bar").unwrap(); |
| let (moniker, protocol) = extract_moniker_and_protocol_from_selector(selector).unwrap(); |
| |
| assert_eq!(moniker, RelativeMoniker::try_from("./core/my_component").unwrap()); |
| assert_eq!(protocol, "fuchsia.foo.bar"); |
| |
| for selector in [ |
| "*:*:*", |
| "core/my_component:expose", |
| "core/my_component:expose:*", |
| "*:expose:fuchsia.foo.bar", |
| "core/my_component:*:fuchsia.foo.bar", |
| "core/my_component:out:fuchsia.foo.bar", |
| "core/my_component:in:fuchsia.foo.bar", |
| ] { |
| let selector = parse_selector::<VerboseError>(selector).unwrap(); |
| assert!(extract_moniker_and_protocol_from_selector(selector).is_none()); |
| } |
| |
| Ok(()) |
| } |
| |
| #[fuchsia::test] |
| async fn test_relative_moniker() { |
| assert!(relative_moniker("/core/foo".to_string()).is_some()); |
| assert!(relative_moniker("./core/foo".to_string()).is_some()); |
| assert!(relative_moniker("core/foo".to_string()).is_none()); |
| } |
| |
| #[fuchsia::test] |
| async fn test_connect_to_exposed_capability() -> Result<()> { |
| let (_client, server) = zx::Channel::create(); |
| let lifecycle = setup_fake_lifecycle_controller(); |
| let query = setup_fake_realm_query(); |
| connect_to_exposed_capability( |
| RelativeMoniker::try_from("./core/my_component").unwrap(), |
| "fuchsia.hwinfo.Board".to_string(), |
| server, |
| io::OpenFlags::RIGHT_READABLE, |
| lifecycle, |
| query, |
| ) |
| .await |
| .unwrap(); |
| Ok(()) |
| } |
| |
| #[fuchsia::test] |
| async fn test_connect_to_capability_not_exposed() -> Result<()> { |
| let (_client, server) = zx::Channel::create(); |
| let lifecycle = setup_fake_lifecycle_controller(); |
| let query = setup_fake_realm_query(); |
| let error = connect_to_exposed_capability( |
| RelativeMoniker::try_from("./core/my_component").unwrap(), |
| "fuchsia.not.exposed".to_string(), |
| server, |
| io::OpenFlags::RIGHT_READABLE, |
| lifecycle, |
| query, |
| ) |
| .await |
| .unwrap_err(); |
| assert_eq!(error, rcs::ConnectCapabilityError::NoMatchingCapabilities); |
| Ok(()) |
| } |
| |
| #[fuchsia::test] |
| async fn test_identify_host() -> Result<()> { |
| let rcs_proxy = setup_rcs_proxy(); |
| |
| let resp = rcs_proxy.identify_host().await.unwrap().unwrap(); |
| |
| assert_eq!(resp.serial_number.unwrap(), SERIAL); |
| assert_eq!(resp.board_config.unwrap(), BOARD_CONFIG); |
| assert_eq!(resp.product_config.unwrap(), PRODUCT_CONFIG); |
| assert_eq!(resp.nodename.unwrap(), NODENAME); |
| |
| let addrs = resp.addresses.unwrap(); |
| assert_eq!( |
| addrs[..], |
| [ |
| fnet::Subnet { |
| addr: fnet::IpAddress::Ipv4(fnet::Ipv4Address { addr: IPV4_ADDR }), |
| prefix_len: 4, |
| }, |
| fnet::Subnet { |
| addr: fnet::IpAddress::Ipv6(fnet::Ipv6Address { addr: IPV6_ADDR }), |
| prefix_len: 110, |
| } |
| ] |
| ); |
| |
| assert_eq!(resp.boot_timestamp_nanos.unwrap(), BOOT_TIME); |
| |
| Ok(()) |
| } |
| |
| #[fuchsia::test] |
| async fn test_ids_in_host_identify() -> Result<()> { |
| let rcs_proxy = setup_rcs_proxy(); |
| |
| let ident = rcs_proxy.identify_host().await.unwrap().unwrap(); |
| assert_eq!(ident.ids, Some(vec![])); |
| |
| rcs_proxy.add_id(1234).await.unwrap(); |
| rcs_proxy.add_id(4567).await.unwrap(); |
| |
| let ident = rcs_proxy.identify_host().await.unwrap().unwrap(); |
| let ids = ident.ids.unwrap(); |
| assert_eq!(ids.len(), 2); |
| assert_eq!(1234u64, ids[0]); |
| assert_eq!(4567u64, ids[1]); |
| |
| Ok(()) |
| } |
| |
| fn service_selector() -> Selector { |
| parse_selector::<VerboseError>(FAKE_SERVICE_SELECTOR).unwrap() |
| } |
| |
| fn mapped_service_selector() -> Selector { |
| parse_selector::<VerboseError>(MAPPED_SERVICE_SELECTOR).unwrap() |
| } |
| |
| #[fuchsia::test] |
| async fn test_map_selector() -> Result<()> { |
| let service = make_rcs_with_maps( |
| vec![(FAKE_SERVICE_SELECTOR, MAPPED_SERVICE_SELECTOR)], |
| HashMap::default(), |
| ); |
| |
| assert_eq!(service.map_selector(service_selector()).unwrap(), mapped_service_selector()); |
| Ok(()) |
| } |
| |
| #[fuchsia::test] |
| async fn test_map_selector_broken_mapping() -> Result<()> { |
| let service = make_rcs_with_maps( |
| vec![(FAKE_SERVICE_SELECTOR, "not_a_selector:::::")], |
| HashMap::default(), |
| ); |
| |
| assert_matches!( |
| service.map_selector(service_selector()).unwrap_err(), |
| rcs::ConnectError::ServiceRerouteFailed |
| ); |
| Ok(()) |
| } |
| |
| #[fuchsia::test] |
| async fn test_map_selector_unbounded_mapping() -> Result<()> { |
| let service = make_rcs_with_maps( |
| vec![ |
| (FAKE_SERVICE_SELECTOR, MAPPED_SERVICE_SELECTOR), |
| (MAPPED_SERVICE_SELECTOR, FAKE_SERVICE_SELECTOR), |
| ], |
| HashMap::default(), |
| ); |
| |
| assert_matches!( |
| service.map_selector(service_selector()).unwrap_err(), |
| rcs::ConnectError::ServiceRerouteFailed |
| ); |
| Ok(()) |
| } |
| |
| #[fuchsia::test] |
| async fn test_map_selector_no_matches() -> Result<()> { |
| let service = make_rcs_with_maps( |
| vec![("not/a/match:out:some.Service", MAPPED_SERVICE_SELECTOR)], |
| HashMap::default(), |
| ); |
| |
| assert_eq!(service.map_selector(service_selector()).unwrap(), service_selector()); |
| Ok(()) |
| } |
| |
| #[fuchsia::test] |
| async fn test_map_moniker() -> Result<()> { |
| let map = [(FAKE_SERVICE_SELECTOR.to_string(), MAPPED_SERVICE_SELECTOR.to_string())] |
| .into_iter() |
| .collect(); |
| let service = make_rcs_with_maps(vec![], map); |
| assert_eq!(service.map_moniker(FAKE_SERVICE_SELECTOR.to_string()), MAPPED_SERVICE_SELECTOR); |
| |
| let service = make_rcs_with_maps(vec![], HashMap::new()); |
| assert_eq!(service.map_moniker(FAKE_SERVICE_SELECTOR.to_string()), FAKE_SERVICE_SELECTOR); |
| Ok(()) |
| } |
| |
| async fn create_forward_tunnel( |
| ) -> (fasync::net::TcpStream, fasync::Socket, fasync::Task<Result<(), ForwardError>>) { |
| let addr = (Ipv4Addr::LOCALHOST, 0).into(); |
| let listener = fasync::net::TcpListener::bind(&addr).unwrap(); |
| let listen_addr = listener.local_addr().unwrap(); |
| let mut listener_stream = listener.accept_stream(); |
| |
| let (remote_tx, remote_rx) = futures::channel::oneshot::channel(); |
| |
| // Run the listener in a background task so it can forward traffic in |
| // parallel with the test. |
| let forward_task = fasync::Task::local(async move { |
| let (stream, _) = listener_stream.next().await.unwrap().unwrap(); |
| |
| let (local, remote) = zx::Socket::create_stream(); |
| let local = fasync::Socket::from_socket(local).unwrap(); |
| let remote = fasync::Socket::from_socket(remote).unwrap(); |
| |
| remote_tx.send(remote).unwrap(); |
| |
| forward_traffic(stream, local).await |
| }); |
| |
| // Connect to the TCP socket, which should set us up with a zircon socket. |
| let tcp_stream = fasync::net::TcpStream::connect(listen_addr).unwrap().await.unwrap(); |
| let zx_socket = remote_rx.await.unwrap(); |
| |
| (tcp_stream, zx_socket, forward_task) |
| } |
| |
| #[fuchsia::test] |
| async fn test_forward_traffic_tcp_closes_first() { |
| let (mut tcp_stream, mut zx_socket, forward_task) = create_forward_tunnel().await; |
| |
| // Now any traffic that is sent to the tcp stream should come out of the zx socket. |
| let msg = b"ping"; |
| tcp_stream.write_all(msg).await.unwrap(); |
| |
| let mut buf = [0; 4096]; |
| zx_socket.read_exact(&mut buf[..msg.len()]).await.unwrap(); |
| assert_eq!(&buf[..msg.len()], msg); |
| |
| // Send a reply from the zx socket to the tcp stream. |
| let msg = b"pong"; |
| zx_socket.write_all(msg).await.unwrap(); |
| |
| tcp_stream.read_exact(&mut buf[..msg.len()]).await.unwrap(); |
| assert_eq!(&buf[..msg.len()], msg); |
| |
| // Now close the tcp stream; this should cause the zx socket to close as well. |
| std::mem::drop(tcp_stream); |
| |
| let mut buf = vec![]; |
| zx_socket.read_to_end(&mut buf).await.unwrap(); |
| assert_eq!(&buf, &Vec::<u8>::default()); |
| |
| // Make sure the forward task shuts down as well. |
| assert_matches!(forward_task.await, Ok(())); |
| } |
| |
| #[fuchsia::test] |
| async fn test_forward_traffic_zx_socket_closes_first() { |
| let (mut tcp_stream, mut zx_socket, forward_task) = create_forward_tunnel().await; |
| |
| // Check that the zx socket can send the first data. |
| let msg = b"ping"; |
| zx_socket.write_all(msg).await.unwrap(); |
| |
| let mut buf = [0; 4096]; |
| tcp_stream.read_exact(&mut buf[..msg.len()]).await.unwrap(); |
| assert_eq!(&buf[..msg.len()], msg); |
| |
| let msg = b"pong"; |
| tcp_stream.write_all(msg).await.unwrap(); |
| |
| zx_socket.read_exact(&mut buf[..msg.len()]).await.unwrap(); |
| assert_eq!(&buf[..msg.len()], msg); |
| |
| // Now close the zx socket; this should cause the tcp stream to close as well. |
| std::mem::drop(zx_socket); |
| |
| let mut buf = vec![]; |
| tcp_stream.read_to_end(&mut buf).await.unwrap(); |
| assert_eq!(&buf, &Vec::<u8>::default()); |
| |
| // Make sure the forward task shuts down as well. |
| assert_matches!(forward_task.await, Ok(())); |
| } |
| } |