[cf] Refactor ELF runner mod.

This CL splits out the internal `elf_runner` module
into separate files. This refactoring will allow us
to:

1. Improve test coverage for each file.
2. Better grok the module itself.
3. Extend the module in the future.

Note, this CL is *strictly* an internal refactoring
of file structure and shouldn't affect any other part
of CM or the system adversely.

Change-Id: I88b0187c485a5287fe4a192441edace0449aa8ee
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/511618
Commit-Queue: Yaneury Fermin <yaneury@google.com>
Reviewed-by: Justin Mattson <jmatt@google.com>
diff --git a/src/sys/component_manager/BUILD.gn b/src/sys/component_manager/BUILD.gn
index 5e2bc0c..7a8b467 100644
--- a/src/sys/component_manager/BUILD.gn
+++ b/src/sys/component_manager/BUILD.gn
@@ -115,7 +115,12 @@
     "src/component_tree_stats.rs",
     "src/config.rs",
     "src/constants.rs",
+    "src/elf_runner/component.rs",
+    "src/elf_runner/config.rs",
+    "src/elf_runner/error.rs",
+    "src/elf_runner/launcher.rs",
     "src/elf_runner/mod.rs",
+    "src/elf_runner/runtime_dir.rs",
     "src/elf_runner/stdout.rs",
     "src/framework.rs",
     "src/fuchsia_boot_resolver.rs",
diff --git a/src/sys/component_manager/src/elf_runner/component.rs b/src/sys/component_manager/src/elf_runner/component.rs
new file mode 100644
index 0000000..9fb52c3
--- /dev/null
+++ b/src/sys/component_manager/src/elf_runner/component.rs
@@ -0,0 +1,170 @@
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+    super::{error::ElfRunnerError, runtime_dir::RuntimeDirectory},
+    async_trait::async_trait,
+    fidl::endpoints::Proxy,
+    fidl_fuchsia_process_lifecycle::LifecycleProxy,
+    fuchsia_async as fasync,
+    fuchsia_zircon::{self as zx, AsHandleRef, Job, Process, Task},
+    futures::future::{BoxFuture, FutureExt},
+    log::{error, warn},
+    runner::component::Controllable,
+    std::sync::Arc,
+};
+
+/// Structure representing a running elf component.
+pub struct ElfComponent {
+    /// Namespace directory for this component, kept just as a reference to
+    /// keep the namespace alive.
+    _runtime_dir: RuntimeDirectory,
+
+    /// Job in which the underlying process that represents the component is
+    /// running.
+    job: Arc<Job>,
+
+    /// Process made for the program binary defined for this component.
+    process: Option<Arc<Process>>,
+
+    /// Client end of the channel given to an ElfComponent which says it
+    /// implements the Lifecycle protocol. If the component does not implement
+    /// the protocol, this will be None.
+    lifecycle_channel: Option<LifecycleProxy>,
+
+    /// We need to remember if we marked the main process as critical, because if we're asked to
+    /// kill a component that has such a marking it'll bring down everything.
+    main_process_critical: bool,
+
+    /// Any tasks spawned to serve this component. For example, stdout and stderr
+    /// listeners are Task objects that live for the duration of the component's
+    /// lifetime.
+    _tasks: Vec<fasync::Task<()>>,
+
+    /// URL with which the component was launched.
+    component_url: String,
+}
+
+impl ElfComponent {
+    pub fn new(
+        _runtime_dir: RuntimeDirectory,
+        job: Job,
+        process: Process,
+        lifecycle_channel: Option<LifecycleProxy>,
+        main_process_critical: bool,
+        tasks: Vec<fasync::Task<()>>,
+        component_url: String,
+    ) -> Self {
+        Self {
+            _runtime_dir,
+            job: Arc::new(job),
+            process: Some(Arc::new(process)),
+            lifecycle_channel,
+            main_process_critical,
+            _tasks: tasks,
+            component_url,
+        }
+    }
+
+    /// Return a pointer to the Process, returns None if the component has no
+    /// Process.
+    pub fn copy_process(&self) -> Option<Arc<Process>> {
+        self.process.clone()
+    }
+
+    /// Return a handle to the Job containing the process for this component.
+    ///
+    /// The rights of the job will be set such that the resulting handle will be apppropriate to
+    /// use for diagnostics-only purposes. Right now that is ZX_RIGHTS_BASIC (which includes
+    /// INSPECT).
+    pub fn copy_job_for_diagnostics(&self) -> Result<Job, ElfRunnerError> {
+        self.job.as_handle_ref().duplicate(zx::Rights::BASIC).map(|h| Job::from(h)).map_err(
+            |status| {
+                ElfRunnerError::component_job_duplication_error(self.component_url.clone(), status)
+            },
+        )
+    }
+}
+
+#[async_trait]
+impl Controllable for ElfComponent {
+    async fn kill(mut self) {
+        if self.main_process_critical {
+            warn!("killing a component with 'main_process_critical', so this will also kill component_manager and all of its components");
+        }
+        let _ = self.job.kill().map_err(|e| error!("failed killing job during kill: {}", e));
+    }
+
+    fn stop<'a>(&mut self) -> BoxFuture<'a, ()> {
+        if let Some(lifecycle_chan) = self.lifecycle_channel.take() {
+            let _ = lifecycle_chan.stop();
+
+            let job = self.job.clone();
+
+            // If the component's main process is critical we must watch for
+            // the main process to exit, otherwise we could end up killing that
+            // process and therefore killing the root job.
+            if self.main_process_critical {
+                if self.process.is_none() {
+                    // This is a bit strange because there's no process, but there is a lifecycle
+                    // channel. Since there is no process it seems like killing it can't kill
+                    // component manager.
+                    warn!("killing job of component with 'main_process_critical' set because component has lifecycle channel, but no process main process.");
+                    let _ = self.job.kill().map_err(|e| {
+                        error!("failed killing job for component with no lifecycle channel: {}", e)
+                    });
+                    return async {}.boxed();
+                }
+                // Try to duplicate the Process handle so we can us it to wait for
+                // process termination
+                let proc_handle = self.process.take().unwrap();
+
+                async move {
+                    let _ = fasync::OnSignals::new(
+                        &proc_handle.as_handle_ref(),
+                        zx::Signals::PROCESS_TERMINATED,
+                    )
+                    .await
+                    .map_err(|e| {
+                        error!(
+                        "killing component's job after failure waiting on process exit, err: {}",
+                        e
+                    )
+                    });
+                    let _ = job.kill().map_err(|e| {
+                        error!("failed killing job in stop after lifecycle channel closed: {}", e)
+                    });
+                }
+                .boxed()
+            } else {
+                async move {
+                    let _ = lifecycle_chan.on_closed()
+                    .await
+                    .map_err(|e| {
+                        error!(
+                        "killing component's job after failure waiting on lifecycle channel, err: {}",
+                        e
+                        )
+                    });
+                    let _ = job.kill().map_err(|e| {
+                        error!("failed killing job in stop after lifecycle channel closed: {}", e)
+                    });
+                }
+                .boxed()
+            }
+        } else {
+            let _ = self.job.kill().map_err(|e| {
+                error!("failed killing job for component with no lifecycle channel: {}", e)
+            });
+            async {}.boxed()
+        }
+    }
+}
+
+impl Drop for ElfComponent {
+    fn drop(&mut self) {
+        // just in case we haven't killed the job already
+        let _ = self.job.kill().map_err(|e| error!("failed to kill job in drop: {}", e));
+    }
+}
diff --git a/src/sys/component_manager/src/elf_runner/config.rs b/src/sys/component_manager/src/elf_runner/config.rs
new file mode 100644
index 0000000..050874f
--- /dev/null
+++ b/src/sys/component_manager/src/elf_runner/config.rs
@@ -0,0 +1,108 @@
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+    super::error::ElfRunnerError, crate::model::policy::ScopedPolicyChecker,
+    fidl_fuchsia_data as fdata,
+};
+
+/// Wraps ELF runner-specific keys in component's "program" dictionary.
+#[derive(Default)]
+pub struct ElfProgramConfig {
+    pub notify_lifecycle_stop: bool,
+    pub ambient_mark_vmo_exec: bool,
+    pub main_process_critical: bool,
+    pub create_raw_processes: bool,
+}
+
+impl ElfProgramConfig {
+    /// Parse the given dictionary into an ElfProgramConfig, checking it against security policy as
+    /// needed.
+    ///
+    /// Checking against security policy is intentionally combined with parsing here, so that policy
+    /// enforcement is as close to the point of parsing as possible and can't be inadvertently skipped.
+    pub fn parse_and_check(
+        program: &fdata::Dictionary,
+        checker: &ScopedPolicyChecker,
+        url: &str,
+    ) -> Result<Self, ElfRunnerError> {
+        const STOP_EVENT_KEY: &str = "lifecycle.stop_event";
+        let notify_lifecycle_stop = match Self::find(program, STOP_EVENT_KEY) {
+            Some(fdata::DictionaryValue::Str(str_val)) => match &str_val[..] {
+                "notify" => Ok(true),
+                "ignore" => Ok(false),
+                _ => Err(()),
+            },
+            Some(_) => Err(()),
+            None => Ok(false),
+        }
+        .map_err(|_| ElfRunnerError::program_dictionary_error(STOP_EVENT_KEY, url))?;
+
+        const VMEX_KEY: &str = "job_policy_ambient_mark_vmo_exec";
+        let ambient_mark_vmo_exec = match Self::find(program, VMEX_KEY) {
+            Some(fdata::DictionaryValue::Str(str_val)) => match &str_val[..] {
+                "true" => Ok(true),
+                "false" => Ok(false),
+                _ => Err(()),
+            },
+            Some(_) => Err(()),
+            None => Ok(false),
+        }
+        .map_err(|_| ElfRunnerError::program_dictionary_error(VMEX_KEY, url))?;
+        if ambient_mark_vmo_exec {
+            checker.ambient_mark_vmo_exec_allowed()?;
+        }
+
+        const CRITICAL_KEY: &str = "main_process_critical";
+        let main_process_critical = match Self::find(program, CRITICAL_KEY) {
+            Some(fdata::DictionaryValue::Str(str_val)) => match &str_val[..] {
+                "true" => Ok(true),
+                "false" => Ok(false),
+                _ => Err(()),
+            },
+            Some(_) => Err(()),
+            None => Ok(false),
+        }
+        .map_err(|_| ElfRunnerError::program_dictionary_error(CRITICAL_KEY, url))?;
+        if main_process_critical {
+            checker.main_process_critical_allowed()?;
+        }
+
+        const CREATE_RAW_PROCESSES_KEY: &str = "job_policy_create_raw_processes";
+        let create_raw_processes = match Self::find(program, CREATE_RAW_PROCESSES_KEY) {
+            Some(fdata::DictionaryValue::Str(str_val)) => match &str_val[..] {
+                "true" => Ok(true),
+                "false" => Ok(false),
+                _ => Err(()),
+            },
+            Some(_) => Err(()),
+            None => Ok(false),
+        }
+        .map_err(|_| ElfRunnerError::program_dictionary_error(CREATE_RAW_PROCESSES_KEY, url))?;
+        if create_raw_processes {
+            checker.create_raw_processes_allowed()?;
+        }
+
+        Ok(ElfProgramConfig {
+            notify_lifecycle_stop,
+            ambient_mark_vmo_exec,
+            main_process_critical,
+            create_raw_processes,
+        })
+    }
+
+    fn find<'a>(dict: &'a fdata::Dictionary, key: &str) -> Option<&'a fdata::DictionaryValue> {
+        match &dict.entries {
+            Some(entries) => {
+                for entry in entries {
+                    if entry.key == key {
+                        return entry.value.as_ref().map(|val| &**val);
+                    }
+                }
+                None
+            }
+            _ => None,
+        }
+    }
+}
diff --git a/src/sys/component_manager/src/elf_runner/error.rs b/src/sys/component_manager/src/elf_runner/error.rs
new file mode 100644
index 0000000..69bfe6b
--- /dev/null
+++ b/src/sys/component_manager/src/elf_runner/error.rs
@@ -0,0 +1,129 @@
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+    crate::model::{policy::PolicyError, runner::RunnerError},
+    anyhow::Error,
+    clonable_error::ClonableError,
+    fuchsia_zircon as zx,
+    log::error,
+    thiserror::Error,
+};
+
+/// Errors produced by `ElfRunner`.
+#[derive(Debug, Clone, Error)]
+pub enum ElfRunnerError {
+    #[error("failed to retrieve process koid for component with url \"{}\": {}", url, err)]
+    ComponentProcessIdError {
+        url: String,
+        #[source]
+        err: ClonableError,
+    },
+    #[error("failed to retrieve job koid for component with url \"{}\": {}", url, err)]
+    ComponentJobIdError {
+        url: String,
+        #[source]
+        err: ClonableError,
+    },
+    #[error("failed to set job policy for component with url \"{}\": {}", url, err)]
+    ComponentJobPolicyError {
+        url: String,
+        #[source]
+        err: ClonableError,
+    },
+    #[error("failed to create job for component with url \"{}\": {}", url, err)]
+    ComponentJobCreationError {
+        url: String,
+        #[source]
+        err: ClonableError,
+    },
+    #[error("failed to duplicate job for component with url \"{}\": {}", url, err)]
+    ComponentJobDuplicationError {
+        url: String,
+        #[source]
+        err: ClonableError,
+    },
+    #[error("failed to mark main process as critical for component with url \"{}\": {}", url, err)]
+    ComponentCriticalMarkingError {
+        url: String,
+        #[source]
+        err: ClonableError,
+    },
+    #[error("failed to add runtime/elf directory for component with url \"{}\"", url)]
+    ComponentElfDirectoryError { url: String },
+    #[error("program key \"{}\" invalid for component with url \"{}\"", key, url)]
+    ProgramDictionaryError { key: String, url: String },
+    #[error("{err}")]
+    SecurityPolicyError {
+        #[from]
+        err: PolicyError,
+    },
+    #[error("{err}")]
+    GenericRunnerError {
+        #[from]
+        err: RunnerError,
+    },
+    #[error("failed to duplicate UTC clock for component with url \"{}\": {}", url, status)]
+    DuplicateUtcClockError { url: String, status: zx::Status },
+}
+
+impl ElfRunnerError {
+    pub fn component_process_id_error(
+        url: impl Into<String>,
+        err: impl Into<Error>,
+    ) -> ElfRunnerError {
+        ElfRunnerError::ComponentProcessIdError { url: url.into(), err: err.into().into() }
+    }
+
+    pub fn component_job_id_error(url: impl Into<String>, err: impl Into<Error>) -> ElfRunnerError {
+        ElfRunnerError::ComponentJobIdError { url: url.into(), err: err.into().into() }
+    }
+
+    pub fn component_job_policy_error(
+        url: impl Into<String>,
+        err: impl Into<Error>,
+    ) -> ElfRunnerError {
+        ElfRunnerError::ComponentJobPolicyError { url: url.into(), err: err.into().into() }
+    }
+
+    pub fn component_job_creation_error(
+        url: impl Into<String>,
+        err: impl Into<Error>,
+    ) -> ElfRunnerError {
+        ElfRunnerError::ComponentJobCreationError { url: url.into(), err: err.into().into() }
+    }
+
+    pub fn component_job_duplication_error(
+        url: impl Into<String>,
+        err: impl Into<Error>,
+    ) -> ElfRunnerError {
+        ElfRunnerError::ComponentJobDuplicationError { url: url.into(), err: err.into().into() }
+    }
+
+    pub fn component_critical_marking_error(
+        url: impl Into<String>,
+        err: impl Into<Error>,
+    ) -> ElfRunnerError {
+        ElfRunnerError::ComponentCriticalMarkingError { url: url.into(), err: err.into().into() }
+    }
+
+    pub fn component_elf_directory_error(url: impl Into<String>) -> ElfRunnerError {
+        ElfRunnerError::ComponentElfDirectoryError { url: url.into() }
+    }
+
+    pub fn program_dictionary_error(
+        key: impl Into<String>,
+        url: impl Into<String>,
+    ) -> ElfRunnerError {
+        ElfRunnerError::ProgramDictionaryError { key: key.into(), url: url.into() }
+    }
+
+    pub fn as_zx_status(&self) -> zx::Status {
+        match self {
+            ElfRunnerError::GenericRunnerError { err } => err.as_zx_status(),
+            ElfRunnerError::SecurityPolicyError { .. } => zx::Status::ACCESS_DENIED,
+            _ => zx::Status::INTERNAL,
+        }
+    }
+}
diff --git a/src/sys/component_manager/src/elf_runner/launcher.rs b/src/sys/component_manager/src/elf_runner/launcher.rs
new file mode 100644
index 0000000..30b92c4
--- /dev/null
+++ b/src/sys/component_manager/src/elf_runner/launcher.rs
@@ -0,0 +1,52 @@
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+    crate::{builtin::process_launcher::ProcessLauncher, config::RuntimeConfig},
+    anyhow::{Context as _, Error},
+    fidl_fuchsia_process as fproc, fuchsia_async as fasync,
+    fuchsia_component::client,
+    log::warn,
+};
+/// Connects to the appropriate `fuchsia.process.Launcher` service based on the options provided in
+/// `ProcessLauncherConnector::new`.
+///
+/// This exists so that callers can make a new connection to `fuchsia.process.Launcher` for each use
+/// because the service is stateful per connection, so it is not safe to share a connection between
+/// multiple asynchronous process launchers.
+///
+/// If `Arguments.use_builtin_process_launcher` is true, this will connect to the built-in
+/// `fuchsia.process.Launcher` service using the provided `ProcessLauncher`. Otherwise, this connects
+/// to the launcher service under /svc in component_manager's namespace.
+pub struct ProcessLauncherConnector {
+    use_builtin: bool,
+}
+
+impl ProcessLauncherConnector {
+    pub fn new(config: &RuntimeConfig) -> Self {
+        Self { use_builtin: config.use_builtin_process_launcher }
+    }
+
+    // This only returns an error when opening connection to an external
+    // Launcher service. If the built-in is used and fails to start it can only
+    // be discovered by getting an error attempting to use the launcher.
+    pub fn connect(&self) -> Result<fproc::LauncherProxy, Error> {
+        let proxy = if self.use_builtin {
+            let (proxy, stream) =
+                fidl::endpoints::create_proxy_and_stream::<fproc::LauncherMarker>()?;
+            fasync::Task::spawn(async move {
+                let result = ProcessLauncher::serve(stream).await;
+                if let Err(e) = result {
+                    warn!("ProcessLauncherConnector.connect failed: {}", e);
+                }
+            })
+            .detach();
+            proxy
+        } else {
+            client::connect_to_service::<fproc::LauncherMarker>()
+                .context("failed to connect to external launcher service")?
+        };
+        Ok(proxy)
+    }
+}
diff --git a/src/sys/component_manager/src/elf_runner/mod.rs b/src/sys/component_manager/src/elf_runner/mod.rs
index 20167ff..efe5d8f 100644
--- a/src/sys/component_manager/src/elf_runner/mod.rs
+++ b/src/sys/component_manager/src/elf_runner/mod.rs
@@ -2,54 +2,46 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
+mod component;
+mod config;
+mod error;
+mod launcher;
+mod runtime_dir;
 mod stdout;
 
 use {
+    self::{
+        component::ElfComponent, config::ElfProgramConfig, error::ElfRunnerError,
+        launcher::ProcessLauncherConnector, runtime_dir::RuntimeDirBuilder,
+        stdout::bind_streams_to_syslog,
+    },
     crate::{
-        builtin::{process_launcher::ProcessLauncher, runner::BuiltinRunnerFactory},
+        builtin::runner::BuiltinRunnerFactory,
         config::RuntimeConfig,
         model::{
-            policy::{PolicyError, ScopedPolicyChecker},
+            policy::ScopedPolicyChecker,
             runner::{Runner, RunnerError},
         },
     },
-    anyhow::{format_err, Context as _, Error},
+    anyhow::{format_err, Context as _},
     async_trait::async_trait,
-    clonable_error::ClonableError,
-    fidl::endpoints::{Proxy, ServerEnd},
+    fidl::endpoints::ServerEnd,
     fidl_fuchsia_component as fcomp, fidl_fuchsia_component_runner as fcrunner,
-    fidl_fuchsia_data as fdata,
     fidl_fuchsia_diagnostics_types::{
         ComponentDiagnostics, ComponentTasks, Task as DiagnosticsTask,
     },
-    fidl_fuchsia_io::{DirectoryMarker, NodeMarker, OPEN_RIGHT_READABLE, OPEN_RIGHT_WRITABLE},
     fidl_fuchsia_process as fproc,
-    fidl_fuchsia_process_lifecycle::{LifecycleMarker, LifecycleProxy},
+    fidl_fuchsia_process_lifecycle::LifecycleMarker,
     fuchsia_async as fasync,
-    fuchsia_component::client,
     fuchsia_runtime::{duplicate_utc_clock_handle, job_default, HandleInfo, HandleType},
-    fuchsia_zircon::{
-        self as zx, AsHandleRef, Clock, HandleBased, Job, Process, ProcessInfo, Task,
-    },
-    futures::{
-        channel::oneshot,
-        future::{BoxFuture, FutureExt},
-    },
-    log::{error, warn},
-    runner::component::{ChannelEpitaph, Controllable},
+    fuchsia_zircon::{self as zx, AsHandleRef, Clock, HandleBased, Job, ProcessInfo},
+    futures::channel::oneshot,
+    log::warn,
+    runner::component::ChannelEpitaph,
     std::convert::TryFrom,
     std::{convert::TryInto, path::Path, sync::Arc},
-    thiserror::Error,
-    vfs::{
-        directory::entry::DirectoryEntry, directory::immutable::simple as pfs,
-        execution_scope::ExecutionScope, file::vmo::asynchronous::read_only_static,
-        path::Path as fvfsPath, tree_builder::TreeBuilder,
-    },
 };
 
-// Simple directory type which is used to implement `ComponentStartInfo.runtime_directory`.
-type RuntimeDirectory = Arc<pfs::Simple>;
-
 // Minimum timer slack amount and default mode. The amount should be large enough to allow for some
 // coalescing of timers, but small enough to ensure applications don't miss deadlines.
 //
@@ -57,200 +49,7 @@
 // timers in Scenic and other system services.
 const TIMER_SLACK_DURATION: zx::Duration = zx::Duration::from_micros(50);
 
-/// Errors produced by `ElfRunner`.
-#[derive(Debug, Clone, Error)]
-pub enum ElfRunnerError {
-    #[error("failed to retrieve process koid for component with url \"{}\": {}", url, err)]
-    ComponentProcessIdError {
-        url: String,
-        #[source]
-        err: ClonableError,
-    },
-    #[error("failed to retrieve job koid for component with url \"{}\": {}", url, err)]
-    ComponentJobIdError {
-        url: String,
-        #[source]
-        err: ClonableError,
-    },
-    #[error("failed to set job policy for component with url \"{}\": {}", url, err)]
-    ComponentJobPolicyError {
-        url: String,
-        #[source]
-        err: ClonableError,
-    },
-    #[error("failed to create job for component with url \"{}\": {}", url, err)]
-    ComponentJobCreationError {
-        url: String,
-        #[source]
-        err: ClonableError,
-    },
-    #[error("failed to duplicate job for component with url \"{}\": {}", url, err)]
-    ComponentJobDuplicationError {
-        url: String,
-        #[source]
-        err: ClonableError,
-    },
-    #[error("failed to mark main process as critical for component with url \"{}\": {}", url, err)]
-    ComponentCriticalMarkingError {
-        url: String,
-        #[source]
-        err: ClonableError,
-    },
-    #[error("failed to add runtime/elf directory for component with url \"{}\"", url)]
-    ComponentElfDirectoryError { url: String },
-    #[error("program key \"{}\" invalid for component with url \"{}\"", key, url)]
-    ProgramDictionaryError { key: String, url: String },
-    #[error("{err}")]
-    SecurityPolicyError {
-        #[from]
-        err: PolicyError,
-    },
-    #[error("{err}")]
-    GenericRunnerError {
-        #[from]
-        err: RunnerError,
-    },
-    #[error("failed to duplicate UTC clock for component with url \"{}\": {}", url, status)]
-    DuplicateUtcClockError { url: String, status: zx::Status },
-}
-
-impl ElfRunnerError {
-    pub fn component_process_id_error(
-        url: impl Into<String>,
-        err: impl Into<Error>,
-    ) -> ElfRunnerError {
-        ElfRunnerError::ComponentProcessIdError { url: url.into(), err: err.into().into() }
-    }
-
-    pub fn component_job_id_error(url: impl Into<String>, err: impl Into<Error>) -> ElfRunnerError {
-        ElfRunnerError::ComponentJobIdError { url: url.into(), err: err.into().into() }
-    }
-
-    pub fn component_job_policy_error(
-        url: impl Into<String>,
-        err: impl Into<Error>,
-    ) -> ElfRunnerError {
-        ElfRunnerError::ComponentJobPolicyError { url: url.into(), err: err.into().into() }
-    }
-
-    pub fn component_job_creation_error(
-        url: impl Into<String>,
-        err: impl Into<Error>,
-    ) -> ElfRunnerError {
-        ElfRunnerError::ComponentJobCreationError { url: url.into(), err: err.into().into() }
-    }
-
-    pub fn component_job_duplication_error(
-        url: impl Into<String>,
-        err: impl Into<Error>,
-    ) -> ElfRunnerError {
-        ElfRunnerError::ComponentJobDuplicationError { url: url.into(), err: err.into().into() }
-    }
-
-    pub fn component_critical_marking_error(
-        url: impl Into<String>,
-        err: impl Into<Error>,
-    ) -> ElfRunnerError {
-        ElfRunnerError::ComponentCriticalMarkingError { url: url.into(), err: err.into().into() }
-    }
-
-    pub fn component_elf_directory_error(url: impl Into<String>) -> ElfRunnerError {
-        ElfRunnerError::ComponentElfDirectoryError { url: url.into() }
-    }
-
-    pub fn program_dictionary_error(
-        key: impl Into<String>,
-        url: impl Into<String>,
-    ) -> ElfRunnerError {
-        ElfRunnerError::ProgramDictionaryError { key: key.into(), url: url.into() }
-    }
-
-    pub fn as_zx_status(&self) -> zx::Status {
-        match self {
-            ElfRunnerError::GenericRunnerError { err } => err.as_zx_status(),
-            ElfRunnerError::SecurityPolicyError { .. } => zx::Status::ACCESS_DENIED,
-            _ => zx::Status::INTERNAL,
-        }
-    }
-}
-
 // Builds and serves the runtime directory
-struct RuntimeDirBuilder {
-    args: Vec<String>,
-    job_id: Option<u64>,
-    process_id: Option<u64>,
-    server_end: ServerEnd<NodeMarker>,
-}
-
-impl RuntimeDirBuilder {
-    fn new(server_end: ServerEnd<DirectoryMarker>) -> Self {
-        // Transform the server end to speak Node protocol only
-        let server_end = ServerEnd::<NodeMarker>::new(server_end.into_channel());
-        Self { args: vec![], job_id: None, process_id: None, server_end }
-    }
-
-    fn args(mut self, args: Vec<String>) -> Self {
-        self.args = args;
-        self
-    }
-
-    fn job_id(mut self, job_id: u64) -> Self {
-        self.job_id = Some(job_id);
-        self
-    }
-
-    fn process_id(mut self, process_id: u64) -> Self {
-        self.process_id = Some(process_id);
-        self
-    }
-
-    fn serve(mut self) -> RuntimeDirectory {
-        // Create the runtime tree structure
-        //
-        // runtime
-        // |- args
-        // |  |- 0
-        // |  |- 1
-        // |  \- ...
-        // \- elf
-        //    |- job_id
-        //    \- process_id
-        let mut runtime_tree_builder = TreeBuilder::empty_dir();
-        let mut count: u32 = 0;
-        for arg in self.args.drain(..) {
-            runtime_tree_builder
-                .add_entry(["args", &count.to_string()], read_only_static(arg))
-                .expect("Failed to add arg to runtime directory");
-            count += 1;
-        }
-
-        if let Some(job_id) = self.job_id {
-            runtime_tree_builder
-                .add_entry(["elf", "job_id"], read_only_static(job_id.to_string()))
-                .expect("Failed to add job_id to runtime/elf directory");
-        }
-
-        if let Some(process_id) = self.process_id {
-            runtime_tree_builder
-                .add_entry(["elf", "process_id"], read_only_static(process_id.to_string()))
-                .expect("Failed to add process_id to runtime/elf directory");
-        }
-
-        let runtime_directory = runtime_tree_builder.build();
-
-        // Serve the runtime directory
-        runtime_directory.clone().open(
-            ExecutionScope::new(),
-            OPEN_RIGHT_READABLE | OPEN_RIGHT_WRITABLE,
-            0,
-            fvfsPath::empty(),
-            self.server_end,
-        );
-
-        runtime_directory
-    }
-}
-
 /// Runs components with ELF binaries.
 pub struct ElfRunner {
     launcher_connector: ProcessLauncherConnector,
@@ -310,7 +109,7 @@
         .map_err(|e| RunnerError::invalid_args(resolved_url.clone(), e))?;
 
         let (stdout_and_stderr_tasks, stdout_and_stderr_handles) =
-            stdout::bind_streams_to_syslog(&ns, stdout_sink, stderr_sink)
+            bind_streams_to_syslog(&ns, stdout_sink, stderr_sink)
                 .await
                 .map_err(|s| RunnerError::component_launch_error(resolved_url.clone(), s))?;
 
@@ -374,7 +173,7 @@
         }))
     }
 
-    async fn start_component(
+    pub async fn start_component(
         &self,
         start_info: fcrunner::ComponentStartInfo,
         checker: &ScopedPolicyChecker,
@@ -525,260 +324,6 @@
     }
 }
 
-/// Wraps ELF runner-specific keys in component's "program" dictionary.
-#[derive(Default)]
-struct ElfProgramConfig {
-    notify_lifecycle_stop: bool,
-    ambient_mark_vmo_exec: bool,
-    main_process_critical: bool,
-    create_raw_processes: bool,
-}
-
-impl ElfProgramConfig {
-    /// Parse the given dictionary into an ElfProgramConfig, checking it against security policy as
-    /// needed.
-    ///
-    /// Checking against security policy is intentionally combined with parsing here, so that policy
-    /// enforcement is as close to the point of parsing as possible and can't be inadvertently skipped.
-    pub fn parse_and_check(
-        program: &fdata::Dictionary,
-        checker: &ScopedPolicyChecker,
-        url: &str,
-    ) -> Result<Self, ElfRunnerError> {
-        const STOP_EVENT_KEY: &str = "lifecycle.stop_event";
-        let notify_lifecycle_stop = match Self::find(program, STOP_EVENT_KEY) {
-            Some(fdata::DictionaryValue::Str(str_val)) => match &str_val[..] {
-                "notify" => Ok(true),
-                "ignore" => Ok(false),
-                _ => Err(()),
-            },
-            Some(_) => Err(()),
-            None => Ok(false),
-        }
-        .map_err(|_| ElfRunnerError::program_dictionary_error(STOP_EVENT_KEY, url))?;
-
-        const VMEX_KEY: &str = "job_policy_ambient_mark_vmo_exec";
-        let ambient_mark_vmo_exec = match Self::find(program, VMEX_KEY) {
-            Some(fdata::DictionaryValue::Str(str_val)) => match &str_val[..] {
-                "true" => Ok(true),
-                "false" => Ok(false),
-                _ => Err(()),
-            },
-            Some(_) => Err(()),
-            None => Ok(false),
-        }
-        .map_err(|_| ElfRunnerError::program_dictionary_error(VMEX_KEY, url))?;
-        if ambient_mark_vmo_exec {
-            checker.ambient_mark_vmo_exec_allowed()?;
-        }
-
-        const CRITICAL_KEY: &str = "main_process_critical";
-        let main_process_critical = match Self::find(program, CRITICAL_KEY) {
-            Some(fdata::DictionaryValue::Str(str_val)) => match &str_val[..] {
-                "true" => Ok(true),
-                "false" => Ok(false),
-                _ => Err(()),
-            },
-            Some(_) => Err(()),
-            None => Ok(false),
-        }
-        .map_err(|_| ElfRunnerError::program_dictionary_error(CRITICAL_KEY, url))?;
-        if main_process_critical {
-            checker.main_process_critical_allowed()?;
-        }
-
-        const CREATE_RAW_PROCESSES_KEY: &str = "job_policy_create_raw_processes";
-        let create_raw_processes = match Self::find(program, CREATE_RAW_PROCESSES_KEY) {
-            Some(fdata::DictionaryValue::Str(str_val)) => match &str_val[..] {
-                "true" => Ok(true),
-                "false" => Ok(false),
-                _ => Err(()),
-            },
-            Some(_) => Err(()),
-            None => Ok(false),
-        }
-        .map_err(|_| ElfRunnerError::program_dictionary_error(CREATE_RAW_PROCESSES_KEY, url))?;
-        if create_raw_processes {
-            checker.create_raw_processes_allowed()?;
-        }
-
-        Ok(ElfProgramConfig {
-            notify_lifecycle_stop,
-            ambient_mark_vmo_exec,
-            main_process_critical,
-            create_raw_processes,
-        })
-    }
-
-    fn find<'a>(dict: &'a fdata::Dictionary, key: &str) -> Option<&'a fdata::DictionaryValue> {
-        match &dict.entries {
-            Some(entries) => {
-                for entry in entries {
-                    if entry.key == key {
-                        return entry.value.as_ref().map(|val| &**val);
-                    }
-                }
-                None
-            }
-            _ => None,
-        }
-    }
-}
-
-/// Structure representing a running elf component.
-struct ElfComponent {
-    /// Namespace directory for this component, kept just as a reference to
-    /// keep the namespace alive.
-    _runtime_dir: RuntimeDirectory,
-
-    /// Job in which the underlying process that represents the component is
-    /// running.
-    job: Arc<Job>,
-
-    /// Process made for the program binary defined for this component.
-    process: Option<Arc<Process>>,
-
-    /// Client end of the channel given to an ElfComponent which says it
-    /// implements the Lifecycle protocol. If the component does not implement
-    /// the protocol, this will be None.
-    lifecycle_channel: Option<LifecycleProxy>,
-
-    /// We need to remember if we marked the main process as critical, because if we're asked to
-    /// kill a component that has such a marking it'll bring down everything.
-    main_process_critical: bool,
-
-    /// Any tasks spawned to serve this component. For example, stdout and stderr
-    /// listeners are Task objects that live for the duration of the component's
-    /// lifetime.
-    _tasks: Vec<fasync::Task<()>>,
-
-    /// URL with which the component was launched.
-    component_url: String,
-}
-
-impl ElfComponent {
-    pub fn new(
-        _runtime_dir: RuntimeDirectory,
-        job: Job,
-        process: Process,
-        lifecycle_channel: Option<LifecycleProxy>,
-        main_process_critical: bool,
-        tasks: Vec<fasync::Task<()>>,
-        component_url: String,
-    ) -> Self {
-        Self {
-            _runtime_dir,
-            job: Arc::new(job),
-            process: Some(Arc::new(process)),
-            lifecycle_channel,
-            main_process_critical,
-            _tasks: tasks,
-            component_url,
-        }
-    }
-
-    /// Return a pointer to the Process, returns None if the component has no
-    /// Process.
-    pub fn copy_process(&self) -> Option<Arc<Process>> {
-        self.process.clone()
-    }
-
-    /// Return a handle to the Job containing the process for this component.
-    ///
-    /// The rights of the job will be set such that the resulting handle will be apppropriate to
-    /// use for diagnostics-only purposes. Right now that is ZX_RIGHTS_BASIC (which includes
-    /// INSPECT).
-    pub fn copy_job_for_diagnostics(&self) -> Result<Job, ElfRunnerError> {
-        self.job.as_handle_ref().duplicate(zx::Rights::BASIC).map(|h| Job::from(h)).map_err(
-            |status| {
-                ElfRunnerError::component_job_duplication_error(self.component_url.clone(), status)
-            },
-        )
-    }
-}
-
-#[async_trait]
-impl Controllable for ElfComponent {
-    async fn kill(mut self) {
-        if self.main_process_critical {
-            warn!("killing a component with 'main_process_critical', so this will also kill component_manager and all of its components");
-        }
-        let _ = self.job.kill().map_err(|e| error!("failed killing job during kill: {}", e));
-    }
-
-    fn stop<'a>(&mut self) -> BoxFuture<'a, ()> {
-        if let Some(lifecycle_chan) = self.lifecycle_channel.take() {
-            let _ = lifecycle_chan.stop();
-
-            let job = self.job.clone();
-
-            // If the component's main process is critical we must watch for
-            // the main process to exit, otherwise we could end up killing that
-            // process and therefore killing the root job.
-            if self.main_process_critical {
-                if self.process.is_none() {
-                    // This is a bit strange because there's no process, but there is a lifecycle
-                    // channel. Since there is no process it seems like killing it can't kill
-                    // component manager.
-                    warn!("killing job of component with 'main_process_critical' set because component has lifecycle channel, but no process main process.");
-                    let _ = self.job.kill().map_err(|e| {
-                        error!("failed killing job for component with no lifecycle channel: {}", e)
-                    });
-                    return async {}.boxed();
-                }
-                // Try to duplicate the Process handle so we can us it to wait for
-                // process termination
-                let proc_handle = self.process.take().unwrap();
-
-                async move {
-                    let _ = fasync::OnSignals::new(
-                        &proc_handle.as_handle_ref(),
-                        zx::Signals::PROCESS_TERMINATED,
-                    )
-                    .await
-                    .map_err(|e| {
-                        error!(
-                        "killing component's job after failure waiting on process exit, err: {}",
-                        e
-                    )
-                    });
-                    let _ = job.kill().map_err(|e| {
-                        error!("failed killing job in stop after lifecycle channel closed: {}", e)
-                    });
-                }
-                .boxed()
-            } else {
-                async move {
-                    let _ = lifecycle_chan.on_closed()
-                    .await
-                    .map_err(|e| {
-                        error!(
-                        "killing component's job after failure waiting on lifecycle channel, err: {}",
-                        e
-                        )
-                    });
-                    let _ = job.kill().map_err(|e| {
-                        error!("failed killing job in stop after lifecycle channel closed: {}", e)
-                    });
-                }
-                .boxed()
-            }
-        } else {
-            let _ = self.job.kill().map_err(|e| {
-                error!("failed killing job for component with no lifecycle channel: {}", e)
-            });
-            async {}.boxed()
-        }
-    }
-}
-
-impl Drop for ElfComponent {
-    fn drop(&mut self) {
-        // just in case we haven't killed the job already
-        let _ = self.job.kill().map_err(|e| error!("failed to kill job in drop: {}", e));
-    }
-}
-
 impl BuiltinRunnerFactory for ElfRunner {
     fn get_scoped_runner(self: Arc<Self>, checker: ScopedPolicyChecker) -> Arc<dyn Runner> {
         Arc::new(ScopedElfRunner { runner: self, checker })
@@ -920,74 +465,45 @@
     }
 }
 
-/// Connects to the appropriate `fuchsia.process.Launcher` service based on the options provided in
-/// `ProcessLauncherConnector::new`.
-///
-/// This exists so that callers can make a new connection to `fuchsia.process.Launcher` for each use
-/// because the service is stateful per connection, so it is not safe to share a connection between
-/// multiple asynchronous process launchers.
-///
-/// If `Arguments.use_builtin_process_launcher` is true, this will connect to the built-in
-/// `fuchsia.process.Launcher` service using the provided `ProcessLauncher`. Otherwise, this connects
-/// to the launcher service under /svc in component_manager's namespace.
-struct ProcessLauncherConnector {
-    use_builtin: bool,
-}
-
-impl ProcessLauncherConnector {
-    pub fn new(config: &RuntimeConfig) -> Self {
-        Self { use_builtin: config.use_builtin_process_launcher }
-    }
-
-    pub fn connect(&self) -> Result<fproc::LauncherProxy, Error> {
-        let proxy = if self.use_builtin {
-            let (proxy, stream) =
-                fidl::endpoints::create_proxy_and_stream::<fproc::LauncherMarker>()?;
-            fasync::Task::spawn(async move {
-                let result = ProcessLauncher::serve(stream).await;
-                if let Err(e) = result {
-                    warn!("ProcessLauncherConnector.connect failed: {}", e);
-                }
-            })
-            .detach();
-            proxy
-        } else {
-            client::connect_to_service::<fproc::LauncherMarker>()
-                .context("failed to connect to external launcher service")?
-        };
-        Ok(proxy)
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use {
         super::*,
+        crate::model::policy::ScopedPolicyChecker,
         crate::{
+            builtin::runner::BuiltinRunnerFactory,
             config::{JobPolicyAllowlists, RuntimeConfig, SecurityPolicy},
             model::testing::test_helpers::{create_fs_with_mock_logsink, MockServiceRequest},
         },
         anyhow::Error,
         fdio,
-        fidl::endpoints::{create_proxy, ClientEnd, Proxy},
+        fidl::endpoints::{create_proxy, ClientEnd, Proxy, ServerEnd},
         fidl_fuchsia_component as fcomp, fidl_fuchsia_component_runner as fcrunner,
         fidl_fuchsia_data as fdata,
-        fidl_fuchsia_io::DirectoryProxy,
+        fidl_fuchsia_diagnostics_types::{
+            ComponentDiagnostics, ComponentTasks, Task as DiagnosticsTask,
+        },
+        fidl_fuchsia_io::{DirectoryMarker, DirectoryProxy, OPEN_RIGHT_READABLE},
         fidl_fuchsia_logger::LogSinkRequest,
+        fidl_fuchsia_process_lifecycle::LifecycleMarker,
+        fidl_fuchsia_process_lifecycle::LifecycleProxy,
         fuchsia_async::{self as fasync, futures::join},
-        fuchsia_zircon as zx,
+        fuchsia_zircon::{self as zx, Task},
+        fuchsia_zircon::{AsHandleRef, Job, Process},
         futures::{prelude::*, StreamExt},
         io_util,
         matches::assert_matches,
         moniker::AbsoluteMoniker,
         runner::component::Controllable,
         scoped_task,
+        std::path::Path,
         std::{
             convert::TryFrom,
             ffi::CString,
             sync::{Arc, Mutex},
             task::Poll,
         },
+        vfs::tree_builder::TreeBuilder,
     };
 
     // Rust's test harness does not allow passing through arbitrary arguments, so to get coverage
diff --git a/src/sys/component_manager/src/elf_runner/runtime_dir.rs b/src/sys/component_manager/src/elf_runner/runtime_dir.rs
new file mode 100644
index 0000000..5a5617e
--- /dev/null
+++ b/src/sys/component_manager/src/elf_runner/runtime_dir.rs
@@ -0,0 +1,93 @@
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+    fidl::endpoints::ServerEnd,
+    fidl_fuchsia_io::{DirectoryMarker, NodeMarker, OPEN_RIGHT_READABLE, OPEN_RIGHT_WRITABLE},
+    std::sync::Arc,
+    vfs::{
+        directory::entry::DirectoryEntry, directory::immutable::simple as pfs,
+        execution_scope::ExecutionScope, file::vmo::asynchronous::read_only_static,
+        path::Path as fvfsPath, tree_builder::TreeBuilder,
+    },
+};
+
+// Simple directory type which is used to implement `ComponentStartInfo.runtime_directory`.
+pub type RuntimeDirectory = Arc<pfs::Simple>;
+
+pub struct RuntimeDirBuilder {
+    args: Vec<String>,
+    job_id: Option<u64>,
+    process_id: Option<u64>,
+    server_end: ServerEnd<NodeMarker>,
+}
+
+impl RuntimeDirBuilder {
+    pub fn new(server_end: ServerEnd<DirectoryMarker>) -> Self {
+        // Transform the server end to speak Node protocol only
+        let server_end = ServerEnd::<NodeMarker>::new(server_end.into_channel());
+        Self { args: vec![], job_id: None, process_id: None, server_end }
+    }
+
+    pub fn args(mut self, args: Vec<String>) -> Self {
+        self.args = args;
+        self
+    }
+
+    pub fn job_id(mut self, job_id: u64) -> Self {
+        self.job_id = Some(job_id);
+        self
+    }
+
+    pub fn process_id(mut self, process_id: u64) -> Self {
+        self.process_id = Some(process_id);
+        self
+    }
+
+    pub fn serve(mut self) -> RuntimeDirectory {
+        // Create the runtime tree structure
+        //
+        // runtime
+        // |- args
+        // |  |- 0
+        // |  |- 1
+        // |  \- ...
+        // \- elf
+        //    |- job_id
+        //    \- process_id
+        let mut runtime_tree_builder = TreeBuilder::empty_dir();
+        let mut count: u32 = 0;
+        for arg in self.args.drain(..) {
+            runtime_tree_builder
+                .add_entry(["args", &count.to_string()], read_only_static(arg))
+                .expect("Failed to add arg to runtime directory");
+            count += 1;
+        }
+
+        if let Some(job_id) = self.job_id {
+            runtime_tree_builder
+                .add_entry(["elf", "job_id"], read_only_static(job_id.to_string()))
+                .expect("Failed to add job_id to runtime/elf directory");
+        }
+
+        if let Some(process_id) = self.process_id {
+            runtime_tree_builder
+                .add_entry(["elf", "process_id"], read_only_static(process_id.to_string()))
+                .expect("Failed to add process_id to runtime/elf directory");
+        }
+
+        let runtime_directory = runtime_tree_builder.build();
+
+        // Serve the runtime directory
+        runtime_directory.clone().open(
+            ExecutionScope::new(),
+            OPEN_RIGHT_READABLE | OPEN_RIGHT_WRITABLE,
+            0,
+            fvfsPath::empty(),
+            self.server_end,
+        );
+
+        runtime_directory
+    }
+}