[realm_builder][rust] allow local components to get stop notification

Adds a feature to rust realm builder with which local components can
register a stop notifier. Once this task is performed, any stop requests
that realm builder receives for the local component is reported over the
stop notifier, and it is then the local component's job to exit.

As with any component, if the component does not exit in a timely
fashion component manager will request the component be killed, which
causes realm builder to stop the local component immediately.

Fixed: 82021
Change-Id: I93d3e6ab68df9c9c83735e58b17aac79a5f116d4
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/692683
Reviewed-by: Yaneury Fermin <yaneury@google.com>
Commit-Queue: Derek Gonyeo <dgonyeo@google.com>
diff --git a/src/lib/fuchsia-component-test/src/local_component_runner.rs b/src/lib/fuchsia-component-test/src/local_component_runner.rs
index 9b5f7ee..82ebfe6 100644
--- a/src/lib/fuchsia-component-test/src/local_component_runner.rs
+++ b/src/lib/fuchsia-component-test/src/local_component_runner.rs
@@ -12,7 +12,7 @@
     fidl_fuchsia_data as fdata, fidl_fuchsia_io as fio, fuchsia_async as fasync,
     fuchsia_component::DEFAULT_SERVICE_INSTANCE,
     fuchsia_fs, fuchsia_zircon as zx,
-    futures::{future::BoxFuture, lock::Mutex, select, FutureExt, TryStreamExt},
+    futures::{channel::oneshot, future::BoxFuture, lock::Mutex, select, FutureExt, TryStreamExt},
     runner::get_value as get_dictionary_value,
     std::{collections::HashMap, path::Path, sync::Arc},
     tracing::*,
@@ -37,6 +37,8 @@
 pub struct LocalComponentHandles {
     namespace: HashMap<String, fio::DirectoryProxy>,
 
+    stop_notifier: Arc<Mutex<Option<oneshot::Sender<()>>>>,
+
     /// The outgoing directory handle for a local component. This can be used to run a ServiceFs
     /// for the component.
     pub outgoing_dir: ServerEnd<fio::DirectoryMarker>,
@@ -46,7 +48,8 @@
     fn new(
         fidl_namespace: Vec<fcrunner::ComponentNamespaceEntry>,
         outgoing_dir: ServerEnd<fio::DirectoryMarker>,
-    ) -> Result<Self, Error> {
+    ) -> Result<(Self, Arc<Mutex<Option<oneshot::Sender<()>>>>), Error> {
+        let stop_notifier = Arc::new(Mutex::new(None));
         let mut namespace = HashMap::new();
         for namespace_entry in fidl_namespace {
             namespace.insert(
@@ -58,7 +61,27 @@
                     .expect("failed to convert handle to proxy"),
             );
         }
-        Ok(Self { namespace, outgoing_dir })
+        Ok((Self { namespace, outgoing_dir, stop_notifier: stop_notifier.clone() }, stop_notifier))
+    }
+
+    /// Registers a new stop notifier for this component. If this function is called, then realm
+    /// builder will deliver a message on the returned oneshot when component manager asks for this
+    /// component to stop. It is then this component's responsibility to exit. If it takes too long
+    /// to exit (the default is 5 seconds) then it will be killed.
+    ///
+    /// If this function is not called, then the component is immediately killed when component
+    /// manager asks for it to be stopped. Killing the component is performed by dropping the
+    /// underlying future, effectively cancelling all pending work.
+    ///
+    /// If this function is called more than once on a single local component, then it will panic.
+    pub async fn register_stop_notifier(&self) -> oneshot::Receiver<()> {
+        let mut stop_notifier_guard = self.stop_notifier.lock().await;
+        if stop_notifier_guard.is_some() {
+            panic!("cannot register multiple stop handlers for a single local component");
+        }
+        let (sender, receiver) = oneshot::channel();
+        *stop_notifier_guard = Some(sender);
+        receiver
     }
 
     /// Connects to a FIDL protocol and returns a proxy to that protocol.
@@ -293,7 +316,8 @@
                         .get(&local_component_name)
                         .ok_or(format_err!("no such local component: {:?}", local_component_name))?
                         .clone();
-                    let component_handles = LocalComponentHandles::new(namespace, outgoing_dir)?;
+                    let (component_handles, stop_notifier) =
+                        LocalComponentHandles::new(namespace, outgoing_dir)?;
 
                     let runtime_dir = pseudo_directory!(
                         "local_component_name" =>
@@ -313,24 +337,42 @@
                             (*local_component_implementation)(component_handles).fuse();
                         let mut controller_request_fut =
                             controller_request_stream.try_next().fuse();
-                        select! {
-                            res = local_component_implementation_fut => {
-                                if let Err(e) = res {
-                                    error!(
-                                        "the local component {:?} returned an error: {:?}",
-                                        local_component_name,
-                                        e,
-                                    );
+                        loop {
+                            select! {
+                                res = local_component_implementation_fut => {
+                                    if let Err(e) = res {
+                                        error!(
+                                            "the local component {:?} returned an error: {:?}",
+                                            local_component_name,
+                                            e,
+                                        );
+                                    }
+                                    return;
                                 }
-                            }
-                            req_res = controller_request_fut => {
-                                match req_res.expect("invalid controller request") {
-                                    // TODO(https://fxbug.dev/82021): notify impl on stop
-                                    Some(fcrunner::ComponentControllerRequest::Stop { .. }) => (),
-                                    _ => (),
+                                req_res = controller_request_fut => {
+                                    match req_res.expect("invalid controller request") {
+                                        Some(fcrunner::ComponentControllerRequest::Stop { .. }) => {
+                                            if let Some(stop_notifier) =
+                                                stop_notifier.lock().await.take()
+                                            {
+                                                // If the local component happened to exit the same
+                                                // moment that the component controller stop
+                                                // request was received, then the receiver is
+                                                // already dropped. Let's ignore any errors about
+                                                // sending this.
+                                                let _ = stop_notifier.send(());
+                                            } else {
+                                                return;
+                                            }
+                                        }
+                                        Some(fcrunner::ComponentControllerRequest::Kill { .. }) => {
+                                            return;
+                                        }
+                                        _ => return,
+                                    }
                                 }
-                            }
-                        };
+                            };
+                        }
                     });
                 }
             }
@@ -353,6 +395,7 @@
 mod tests {
     use {
         super::*,
+        assert_matches::assert_matches,
         fidl::endpoints::create_proxy,
         fuchsia_fs::directory::{readdir, DirEntry, DirentKind},
         fuchsia_zircon::AsHandleRef,
@@ -381,7 +424,11 @@
             .unwrap();
 
         let (_, outgoing_dir) = create_proxy().unwrap();
-        let handles = LocalComponentHandles { namespace: HashMap::new(), outgoing_dir };
+        let handles = LocalComponentHandles {
+            namespace: HashMap::new(),
+            outgoing_dir,
+            stop_notifier: Arc::new(Mutex::new(None)),
+        };
         let local_component_implementation = runner_builder
             .local_component_implementations
             .lock()
@@ -577,8 +624,6 @@
 
         let runner_and_handles = build_and_start(runner_builder, component_name).await;
         runner_and_handles.controller_proxy.stop().expect("failed to send stop");
-        // TODO: once we support notifying a component implementation that it's about to be
-        // stopped, test for that here
 
         assert_eq!(Err(oneshot::Canceled), receiver.await);
     }
@@ -613,6 +658,61 @@
     }
 
     #[fuchsia::test]
+    async fn stopping_a_component_calls_the_notifier() {
+        let runner_builder = LocalComponentRunnerBuilder::new();
+        let (notifier_registered_sender, notifier_registered_receiver) = oneshot::channel::<()>();
+        let notifier_registered_sender = Arc::new(Mutex::new(Some(notifier_registered_sender)));
+
+        let (notifier_fired_sender, notifier_fired_receiver) = oneshot::channel::<()>();
+        let notifier_fired_sender = Arc::new(Mutex::new(Some(notifier_fired_sender)));
+
+        let component_name = "test".to_string();
+
+        runner_builder
+            .register_local_component(component_name.clone(), move |handles| {
+                let notifier_registered_sender = notifier_registered_sender.clone();
+                let notifier_fired_sender = notifier_fired_sender.clone();
+                async move {
+                    let stop_notifier = handles.register_stop_notifier().await;
+
+                    let sender = notifier_registered_sender
+                        .lock()
+                        .await
+                        .take()
+                        .expect("local component invoked twice");
+                    sender.send(()).expect("failed to send that the stop notifier was registered");
+
+                    stop_notifier.await.expect("failed to wait for stop notification");
+
+                    let sender = notifier_fired_sender
+                        .lock()
+                        .await
+                        .take()
+                        .expect("local component invoked twice");
+                    sender
+                        .send(())
+                        .expect("failed to send that the stop notifier received a message");
+
+                    Ok(())
+                }
+                .boxed()
+            })
+            .await
+            .unwrap();
+
+        let runner_and_handles = build_and_start(runner_builder, component_name).await;
+
+        // Wait for the component to have started and registered a stop notifier
+        assert_matches!(notifier_registered_receiver.await, Ok(()));
+
+        // Ask to stop the component
+        runner_and_handles.controller_proxy.stop().expect("failed to send stop");
+
+        // Wait for the component to have received the stop message
+        assert_matches!(notifier_fired_receiver.await, Ok(()));
+    }
+
+    #[fuchsia::test]
     async fn dropping_the_runner_will_kill_a_component() {
         let runner_builder = LocalComponentRunnerBuilder::new();
         let (sender, receiver) = oneshot::channel::<()>();
diff --git a/src/lib/fuchsia-component-test/tests/src/lib.rs b/src/lib/fuchsia-component-test/tests/src/lib.rs
index ef0a13e..6476c91 100644
--- a/src/lib/fuchsia-component-test/tests/src/lib.rs
+++ b/src/lib/fuchsia-component-test/tests/src/lib.rs
@@ -367,6 +367,49 @@
 }
 
 #[fuchsia::test]
+async fn local_component_stop_notifier() -> Result<(), Error> {
+    let (send_stop_notifier_registered, mut receive_stop_notifier_registered) = mpsc::channel(1);
+    let (send_stop_notifier_called, mut receive_stop_notifier_called) = mpsc::channel(1);
+
+    let builder = RealmBuilder::new().await?;
+    let _child = builder
+        .add_local_child(
+            "child",
+            move |handles| {
+                let mut send_stop_notifier_registered = send_stop_notifier_registered.clone();
+                let mut send_stop_notifier_called = send_stop_notifier_called.clone();
+                async move {
+                    let stop_notifier = handles.register_stop_notifier().await;
+                    send_stop_notifier_registered
+                        .send(())
+                        .await
+                        .expect("failed to send that the stop notifier was registered");
+                    stop_notifier.await.expect("failed to wait for stop notification");
+                    send_stop_notifier_called
+                        .send(())
+                        .await
+                        .expect("failed to send that the stop notifier was registered");
+                    Ok(())
+                }
+                .boxed()
+            },
+            ChildOptions::new().eager(),
+        )
+        .await?;
+    let instance = builder.build().await?;
+    assert!(
+        receive_stop_notifier_registered.next().await.is_some(),
+        "failed to observe the local component register a stop notifier"
+    );
+    drop(instance);
+    assert!(
+        receive_stop_notifier_called.next().await.is_some(),
+        "failed to observe the local component receive notice with the stop notifier"
+    );
+    Ok(())
+}
+
+#[fuchsia::test]
 async fn get_and_replace_realm_decl() -> Result<(), Error> {
     let builder = RealmBuilder::new().await?;
     let mut root_decl = builder.get_realm_decl().await?;