| // Copyright 2023 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use anyhow::{Context, Result}; |
| use attribution::{AttributionServer, AttributionServerHandle, Observer, Publisher}; |
| use fidl::endpoints::ControlHandle; |
| use fidl::endpoints::RequestStream; |
| use fidl_fuchsia_component as fcomponent; |
| use fidl_fuchsia_component_runner as fcrunner; |
| use fidl_fuchsia_memory_attribution as fattribution; |
| use fuchsia_async as fasync; |
| use fuchsia_component::server::ServiceFs; |
| use fuchsia_sync::Mutex; |
| use fuchsia_zircon as zx; |
| use futures::{StreamExt, TryStreamExt}; |
| use runner::component::{ChannelEpitaph, Controllable, Controller}; |
| use std::{ |
| collections::HashMap, |
| future::Future, |
| sync::{ |
| atomic::{AtomicU64, Ordering}, |
| Arc, |
| }, |
| }; |
| |
| use tracing::{info, warn}; |
| use zx::{HandleBased, Koid}; |
| |
| mod program; |
| |
| use crate::program::ColocatedProgram; |
| |
| enum IncomingRequest { |
| Runner(fcrunner::ComponentRunnerRequestStream), |
| Memory(fattribution::ProviderRequestStream), |
| } |
| |
| #[fuchsia::main] |
| async fn main() -> Result<()> { |
| let resource_tracker = Arc::new(ResourceTracker { resources: Mutex::new(Default::default()) }); |
| |
| let mut service_fs = ServiceFs::new_local(); |
| service_fs.dir("svc").add_fidl_service(IncomingRequest::Runner); |
| service_fs.dir("svc").add_fidl_service(IncomingRequest::Memory); |
| service_fs.take_and_serve_directory_handle().context("failed to serve outgoing namespace")?; |
| |
| let memory_server_handle = get_memory_server(resource_tracker.clone()); |
| |
| service_fs |
| .for_each_concurrent(None, |request: IncomingRequest| async { |
| match request { |
| IncomingRequest::Runner(stream) => { |
| if let Err(err) = handle_runner_request( |
| stream, |
| resource_tracker.clone(), |
| memory_server_handle.clone(), |
| ) |
| .await |
| { |
| warn!("Error while serving ComponentRunner: {err}"); |
| } |
| } |
| IncomingRequest::Memory(stream) => { |
| let observer = memory_server_handle.new_observer(stream.control_handle()); |
| if let Err(err) = handle_memory_request(stream, observer).await { |
| warn!("Error while serving AttributionProvider: {err}"); |
| } |
| } |
| } |
| }) |
| .await; |
| |
| Ok(()) |
| } |
| |
| fn get_memory_server(resource_tracker: Arc<ResourceTracker>) -> AttributionServerHandle { |
| let state_fn = Box::new(move || get_attribution(resource_tracker.clone())); |
| AttributionServer::new(state_fn) |
| } |
| |
| /// Handles `fuchsia.component.runner/ComponentRunner` requests over a FIDL connection. |
| async fn handle_runner_request( |
| mut stream: fcrunner::ComponentRunnerRequestStream, |
| resource_tracker: Arc<ResourceTracker>, |
| memory_server_handle: AttributionServerHandle, |
| ) -> Result<()> { |
| while let Some(request) = |
| stream.try_next().await.context("failed to serve ComponentRunner protocol")? |
| { |
| let fcrunner::ComponentRunnerRequest::Start { start_info, controller, .. } = request; |
| let url = start_info.resolved_url.clone().unwrap_or_else(|| "unknown url".to_string()); |
| info!("Colocated runner is going to start component {url}"); |
| match start(start_info, resource_tracker.clone(), memory_server_handle.new_publisher()) { |
| Ok((program, on_exit)) => { |
| let controller = Controller::new(program, controller.into_stream().unwrap()); |
| fasync::Task::spawn(controller.serve(on_exit)).detach(); |
| } |
| Err(err) => { |
| warn!("Colocated runner failed to start component {url}: {err}"); |
| let _ = controller.close_with_epitaph(zx::Status::from_raw( |
| fcomponent::Error::Internal.into_primitive() as i32, |
| )); |
| } |
| } |
| } |
| |
| Ok(()) |
| } |
| |
| async fn handle_memory_request( |
| mut stream: fattribution::ProviderRequestStream, |
| subscriber: Observer, |
| ) -> Result<()> { |
| while let Some(request) = |
| stream.try_next().await.context("failed to serve AttributionProvider protocol")? |
| { |
| match request { |
| fattribution::ProviderRequest::Get { responder } => { |
| subscriber.next(responder); |
| } |
| fattribution::ProviderRequest::_UnknownMethod { ordinal, control_handle, .. } => { |
| warn!("Invalid request to AttributionProvider: {ordinal}"); |
| control_handle.shutdown_with_epitaph(zx::Status::INVALID_ARGS); |
| } |
| } |
| } |
| |
| Ok(()) |
| } |
| |
| fn get_attribution(resource_tracker: Arc<ResourceTracker>) -> Vec<fattribution::AttributionUpdate> { |
| let mut children = vec![]; |
| for (_, (token, koid)) in resource_tracker.resources.lock().iter() { |
| children.push(fattribution::AttributionUpdate::Add(fattribution::NewPrincipal { |
| identifier: Some(fattribution::Identifier::Component( |
| token.duplicate_handle(fidl::Rights::SAME_RIGHTS).unwrap(), |
| )), |
| detailed_attribution: None, |
| ..Default::default() |
| })); |
| children.push(fattribution::AttributionUpdate::Update(fattribution::UpdatedPrincipal { |
| identifier: Some(fattribution::Identifier::Component( |
| token.duplicate_handle(fidl::Rights::SAME_RIGHTS).unwrap(), |
| )), |
| resources: Some(fattribution::Resources::Data(vec![ |
| fattribution::Resource::KernelObject(koid.raw_koid()), |
| ])), |
| ..Default::default() |
| })); |
| } |
| children |
| } |
| |
| /// Tracks resources used by each [`ColocatedProgram`]. Since each program just allocates |
| /// one VMO, we only need to track one KOID here. |
| struct ResourceTracker { |
| resources: Mutex<HashMap<ProgramId, (zx::Event, Koid)>>, |
| } |
| |
| type ProgramId = u64; |
| |
| static NEXT_ID: AtomicU64 = AtomicU64::new(0); |
| |
| /// Starts a colocated component. |
| fn start( |
| start_info: fcrunner::ComponentStartInfo, |
| resource_tracker: Arc<ResourceTracker>, |
| publisher: Publisher, |
| ) -> Result<(impl Controllable, impl Future<Output = ChannelEpitaph> + Unpin)> { |
| let numbered_handles = start_info.numbered_handles.unwrap_or(vec![]); |
| let program = ColocatedProgram::new(numbered_handles)?; |
| let id = NEXT_ID.fetch_add(1, Ordering::SeqCst); |
| let vmo_koid = program.get_vmo_koid().raw_koid(); |
| // Register this VMO. |
| let instance_token = start_info.component_instance.as_ref().unwrap(); |
| |
| let mut resources = resource_tracker.resources.lock(); |
| resources.insert( |
| id, |
| ( |
| instance_token.duplicate_handle(fidl::Rights::SAME_RIGHTS).unwrap(), |
| program.get_vmo_koid(), |
| ), |
| ); |
| |
| let mut updates = vec![]; |
| updates.push(fattribution::AttributionUpdate::Add(fattribution::NewPrincipal { |
| identifier: Some(fattribution::Identifier::Component( |
| instance_token.duplicate_handle(fidl::Rights::SAME_RIGHTS).unwrap(), |
| )), |
| detailed_attribution: None, |
| ..Default::default() |
| })); |
| updates.push(fattribution::AttributionUpdate::Update(fattribution::UpdatedPrincipal { |
| identifier: Some(fattribution::Identifier::Component( |
| instance_token.duplicate_handle(fidl::Rights::SAME_RIGHTS).unwrap(), |
| )), |
| resources: Some(fattribution::Resources::Data(vec![fattribution::Resource::KernelObject( |
| vmo_koid, |
| )])), |
| ..Default::default() |
| })); |
| publisher.on_update(updates).unwrap(); |
| let termination = program.wait_for_termination(); |
| let termination_clone = program.wait_for_termination(); |
| // Remove this VMO when the program has terminated. |
| let tracker = resource_tracker.clone(); |
| fasync::Task::spawn(async move { |
| termination_clone.await; |
| if let Some((token, _)) = tracker.resources.lock().remove(&id) { |
| publisher |
| .on_update(vec![fattribution::AttributionUpdate::Remove( |
| fattribution::Identifier::Component(token), |
| )]) |
| .unwrap(); |
| } |
| }) |
| .detach(); |
| Ok((program, termination)) |
| } |
| |
| /// Unit test the `ComponentRunner` protocol server implementation. |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use fidl::endpoints::Proxy; |
| use fidl_fuchsia_component_decl as fdecl; |
| use fidl_fuchsia_examples_colocated as fcolocated; |
| use fidl_fuchsia_process::HandleInfo; |
| use fuchsia_runtime::HandleType; |
| |
| #[fuchsia::test] |
| async fn test_start_stop_component() { |
| let resource_tracker = |
| Arc::new(ResourceTracker { resources: Mutex::new(Default::default()) }); |
| let (runner, runner_stream) = |
| fidl::endpoints::create_proxy_and_stream::<fcrunner::ComponentRunnerMarker>().unwrap(); |
| let memory_server = get_memory_server(resource_tracker.clone()); |
| let server = fasync::Task::spawn(handle_runner_request( |
| runner_stream, |
| resource_tracker, |
| memory_server.clone(), |
| )); |
| |
| // Start a colocated component. |
| let decl = fuchsia_fs::file::read_in_namespace_to_fidl::<fdecl::Component>( |
| "/pkg/meta/colocated-component.cm", |
| ) |
| .await |
| .unwrap(); |
| let (controller, controller_server_end) = fidl::endpoints::create_endpoints(); |
| let (user0, user0_peer) = zx::Channel::create(); |
| let start_info = fcrunner::ComponentStartInfo { |
| program: decl.program.unwrap().info, |
| numbered_handles: Some(vec![HandleInfo { |
| handle: user0_peer.into(), |
| id: fuchsia_runtime::HandleInfo::new(HandleType::User0, 0).as_raw(), |
| }]), |
| component_instance: Some(zx::Event::create()), |
| ..Default::default() |
| }; |
| runner.start(start_info, controller_server_end).unwrap(); |
| |
| // Wait until the program has allocated 64 MiB worth of pages. |
| let colocated_component_vmos = |
| fcolocated::ColocatedProxy::new(fasync::Channel::from_channel(user0)) |
| .get_vmos() |
| .await |
| .unwrap(); |
| |
| // Measure our private memory usage again. It should increase by roughly that much more. |
| assert!(!colocated_component_vmos.is_empty()); |
| |
| // Stop the component. |
| let controller = controller.into_proxy().unwrap(); |
| controller.stop().unwrap(); |
| controller.on_closed().await.unwrap(); |
| |
| // Close the connection and verify the server task ends successfully. |
| drop(runner); |
| server.await.unwrap(); |
| } |
| } |