[component_manager] component_manager uses instance-id based storage dirs

* open_isolated_storage and deleted_isolated_storage take instance IDs
  into account.
* instance-id directories are placed under the root of the storage
  capability's backing directory
  -  This means the story capability is responsible for carving out
     a 'persistent/' subdir for persistent storage.

Change-Id: Iab443eb0c084196f9f065e82d20e4d0225278423
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/472978
Reviewed-by: Gary Bressler <geb@google.com>
Reviewed-by: Adam Lesinski <adamlesinski@google.com>
Commit-Queue: Vardhan Mudunuru <vardhan@google.com>
diff --git a/src/sys/component_manager/src/model/component.rs b/src/sys/component_manager/src/model/component.rs
index 9b23471..02fd63e 100644
--- a/src/sys/component_manager/src/model/component.rs
+++ b/src/sys/component_manager/src/model/component.rs
@@ -12,6 +12,7 @@
                 StopAction,
             },
             binding,
+            component_id_index::ComponentInstanceId,
             context::{ModelContext, WeakModelContext},
             environment::Environment,
             error::ModelError,
@@ -662,6 +663,12 @@
         }
         Ok(Arc::clone(self))
     }
+
+    pub fn instance_id(self: &Arc<Self>) -> Option<ComponentInstanceId> {
+        self.try_get_context()
+            .map(|ctx| ctx.component_id_index().look_up_moniker(&self.abs_moniker).cloned())
+            .unwrap_or(None)
+    }
 }
 
 impl std::fmt::Debug for ComponentInstance {
@@ -1240,13 +1247,14 @@
             rights,
             testing::{
                 mocks::{ControlMessage, ControllerActionResponse, MockController},
-                routing_test_helpers::RoutingTest,
+                routing_test_helpers::{RoutingTest, RoutingTestBuilder},
                 test_helpers::{
-                    self, component_decl_with_test_runner, ActionsTest, ComponentDeclBuilder,
-                    ComponentInfo,
+                    self, component_decl_with_test_runner, make_index_file, ActionsTest,
+                    ComponentDeclBuilder, ComponentInfo,
                 },
             },
         },
+        component_id_index::gen_instance_id,
         fidl::endpoints,
         fuchsia_async as fasync,
         fuchsia_zircon::{self as zx, AsHandleRef, Koid},
@@ -1917,6 +1925,46 @@
         assert_eq!(parent_stop.target_moniker, a_moniker.clone());
     }
 
+    #[fasync::run_singlethreaded(test)]
+    async fn realm_instance_id() {
+        let components = vec![
+            ("root", ComponentDeclBuilder::new().add_eager_child("a").build()),
+            ("a", ComponentDeclBuilder::new().add_eager_child("b").build()),
+            ("b", component_decl_with_test_runner()),
+        ];
+
+        let instance_id = Some(gen_instance_id(&mut rand::thread_rng()));
+        let component_id_index_path = make_index_file(component_id_index::Index {
+            instances: vec![component_id_index::InstanceIdEntry {
+                instance_id: instance_id.clone(),
+                appmgr_moniker: None,
+                moniker: Some(AbsoluteMoniker::root()),
+            }],
+            ..component_id_index::Index::default()
+        })
+        .unwrap();
+        let test = RoutingTestBuilder::new("root", components)
+            .set_component_id_index_path(
+                component_id_index_path.path().to_str().unwrap().to_string(),
+            )
+            .build()
+            .await;
+
+        let root_realm =
+            test.model.bind(&AbsoluteMoniker::root(), &BindReason::Root).await.unwrap();
+        assert_eq!(instance_id, root_realm.instance_id());
+
+        let a_realm = test
+            .model
+            .bind(
+                &AbsoluteMoniker::parse_string_without_instances("/a").unwrap(),
+                &BindReason::Root,
+            )
+            .await
+            .unwrap();
+        assert_eq!(None, a_realm.instance_id());
+    }
+
     async fn wait_until_event_get_timestamp(
         event_stream: &mut EventStream,
         event_type: EventType,
diff --git a/src/sys/component_manager/src/model/component_id_index.rs b/src/sys/component_manager/src/model/component_id_index.rs
index 7b52901..42697e6 100644
--- a/src/sys/component_manager/src/model/component_id_index.rs
+++ b/src/sys/component_manager/src/model/component_id_index.rs
@@ -79,21 +79,8 @@
 #[cfg(test)]
 pub mod tests {
     use super::*;
-    use anyhow::Result;
-    use fidl::encoding::encode_persistent;
+    use crate::model::testing::test_helpers::make_index_file;
     use fuchsia_async as fasync;
-    use std::convert::TryFrom;
-    use std::io::Write;
-    use tempfile::NamedTempFile;
-
-    fn make_index_file(index: component_id_index::Index) -> Result<NamedTempFile> {
-        let mut tmp_file = NamedTempFile::new()?;
-        tmp_file.write_all(
-            encode_persistent(&mut fcomponent_internal::ComponentIdIndex::try_from(index)?)?
-                .as_ref(),
-        )?;
-        Ok(tmp_file)
-    }
 
     #[fasync::run_singlethreaded(test)]
     async fn look_up_moniker_no_exists() {
diff --git a/src/sys/component_manager/src/model/routing/mod.rs b/src/sys/component_manager/src/model/routing/mod.rs
index dff75b9..505a3c1 100644
--- a/src/sys/component_manager/src/model/routing/mod.rs
+++ b/src/sys/component_manager/src/model/routing/mod.rs
@@ -368,6 +368,7 @@
     let storage_dir_proxy = storage::open_isolated_storage(
         storage_source_info,
         relative_moniker,
+        target.instance_id().as_ref(),
         open_mode,
         bind_reason,
     )
@@ -401,9 +402,13 @@
 ) -> Result<(), ModelError> {
     let (storage_source_info, relative_moniker) =
         route_storage_capability(use_decl, target).await?;
-    storage::delete_isolated_storage(storage_source_info, relative_moniker)
-        .await
-        .map_err(|e| ModelError::from(e))?;
+    storage::delete_isolated_storage(
+        storage_source_info,
+        relative_moniker,
+        target.instance_id().as_ref(),
+    )
+    .await
+    .map_err(|e| ModelError::from(e))?;
     Ok(())
 }
 
diff --git a/src/sys/component_manager/src/model/storage/admin_protocol.rs b/src/sys/component_manager/src/model/storage/admin_protocol.rs
index d19529d..35a67e6 100644
--- a/src/sys/component_manager/src/model/storage/admin_protocol.rs
+++ b/src/sys/component_manager/src/model/storage/admin_protocol.rs
@@ -32,6 +32,7 @@
     futures::TryStreamExt,
     lazy_static::lazy_static,
     log::*,
+    moniker::AbsoluteMoniker,
     moniker::ExtendedMoniker,
     std::{
         convert::TryInto,
@@ -179,7 +180,8 @@
         let capability = ComponentCapability::Storage(storage_decl.clone());
         let cap_state = CapabilityState::new(&capability);
         let storage_capability_source_info =
-            routing::route_storage_backing_directory(storage_decl, component, cap_state).await?;
+            routing::route_storage_backing_directory(storage_decl, component.clone(), cap_state)
+                .await?;
 
         let mut stream = ServerEnd::<fsys::StorageAdminMarker>::new(server_end)
             .into_stream()
@@ -194,9 +196,19 @@
                     object,
                     control_handle: _,
                 } => {
+                    let relative_moniker = relative_moniker.as_str().try_into()?;
+                    let abs_moniker =
+                        AbsoluteMoniker::from_relative(&component.abs_moniker, &relative_moniker)?;
+                    let instance_id = component
+                        .try_get_context()?
+                        .component_id_index()
+                        .look_up_moniker(&abs_moniker)
+                        .cloned();
+
                     let dir_proxy = storage::open_isolated_storage(
                         storage_capability_source_info.clone(),
-                        relative_moniker.as_str().try_into()?,
+                        relative_moniker,
+                        instance_id.as_ref(),
                         mode,
                         &BindReason::AccessCapability {
                             target: ExtendedMoniker::ComponentInstance(storage_moniker.clone()),
@@ -216,9 +228,19 @@
                             Err(fcomponent::Error::InvalidArguments)
                         }
                         Ok(relative_moniker) => {
+                            let abs_moniker = AbsoluteMoniker::from_relative(
+                                &component.abs_moniker,
+                                &relative_moniker,
+                            )?;
+                            let instance_id = component
+                                .try_get_context()?
+                                .component_id_index()
+                                .look_up_moniker(&abs_moniker)
+                                .cloned();
                             let res = storage::delete_isolated_storage(
                                 storage_capability_source_info.clone(),
                                 relative_moniker,
+                                instance_id.as_ref(),
                             )
                             .await;
                             match res {
diff --git a/src/sys/component_manager/src/model/storage/mod.rs b/src/sys/component_manager/src/model/storage/mod.rs
index cdefa61..9339209 100644
--- a/src/sys/component_manager/src/model/storage/mod.rs
+++ b/src/sys/component_manager/src/model/storage/mod.rs
@@ -8,6 +8,7 @@
         channel,
         model::{
             component::{BindReason, ComponentInstance},
+            component_id_index::ComponentInstanceId,
             error::ModelError,
         },
     },
@@ -30,35 +31,46 @@
 #[derive(Debug, Error, Clone)]
 pub enum StorageError {
     #[error(
-        "failed to open isolated storage from {:?}'s directory {} for {}: {} ",
+        "failed to open isolated storage from {:?}'s directory {} for {} (instance_id={:?}): {} ",
         dir_source_moniker,
         dir_source_path,
         relative_moniker,
+        instance_id,
         err
     )]
     Open {
         dir_source_moniker: Option<AbsoluteMoniker>,
         dir_source_path: CapabilityPath,
         relative_moniker: RelativeMoniker,
+        instance_id: Option<ComponentInstanceId>,
         #[source]
         err: ClonableError,
     },
     #[error(
-        "failed to remove isolated storage from {:?}'s directory {} for {}: {} ",
+        "failed to remove isolated storage from {:?}'s directory {} for {} (instance_id={:?}): {} ",
         dir_source_moniker,
         dir_source_path,
         relative_moniker,
+        instance_id,
         err
     )]
     Remove {
         dir_source_moniker: Option<AbsoluteMoniker>,
         dir_source_path: CapabilityPath,
         relative_moniker: RelativeMoniker,
+        instance_id: Option<ComponentInstanceId>,
         #[source]
         err: ClonableError,
     },
-    #[error("storage path for {} was invalid", relative_moniker)]
-    InvalidStoragePath { relative_moniker: RelativeMoniker },
+    #[error(
+        "storage path for relative_moniker={}, instance_id={:?} is invalid",
+        relative_moniker,
+        instance_id
+    )]
+    InvalidStoragePath {
+        relative_moniker: RelativeMoniker,
+        instance_id: Option<ComponentInstanceId>,
+    },
 }
 
 impl StorageError {
@@ -66,27 +78,39 @@
         dir_source_moniker: Option<AbsoluteMoniker>,
         dir_source_path: CapabilityPath,
         relative_moniker: RelativeMoniker,
+        instance_id: Option<ComponentInstanceId>,
         err: impl Into<Error>,
     ) -> Self {
-        Self::Open { dir_source_moniker, dir_source_path, relative_moniker, err: err.into().into() }
+        Self::Open {
+            dir_source_moniker,
+            dir_source_path,
+            relative_moniker,
+            instance_id,
+            err: err.into().into(),
+        }
     }
 
     pub fn remove(
         dir_source_moniker: Option<AbsoluteMoniker>,
         dir_source_path: CapabilityPath,
         relative_moniker: RelativeMoniker,
+        instance_id: Option<ComponentInstanceId>,
         err: impl Into<Error>,
     ) -> Self {
         Self::Remove {
             dir_source_moniker,
             dir_source_path,
             relative_moniker,
+            instance_id,
             err: err.into().into(),
         }
     }
 
-    pub fn invalid_storage_path(relative_moniker: RelativeMoniker) -> Self {
-        Self::InvalidStoragePath { relative_moniker }
+    pub fn invalid_storage_path(
+        relative_moniker: RelativeMoniker,
+        instance_id: Option<ComponentInstanceId>,
+    ) -> Self {
+        Self::InvalidStoragePath { relative_moniker, instance_id }
     }
 }
 
@@ -115,9 +139,13 @@
 /// Open the isolated storage sub-directory for the given component, creating it if necessary.
 /// `dir_source_component` and `dir_source_path` are the component hosting the directory and its
 /// capability path.
+///
+/// The storage sub-directory is based on provided instance ID if present, otherwise it is based on
+/// the provided relative moniker.
 pub async fn open_isolated_storage(
     storage_source_info: StorageCapabilitySource,
     relative_moniker: RelativeMoniker,
+    instance_id: Option<&ComponentInstanceId>,
     open_mode: u32,
     bind_reason: &BindReason,
 ) -> Result<DirectoryProxy, ModelError> {
@@ -136,7 +164,12 @@
         dir_source_component
             .bind(bind_reason)
             .await?
-            .open_outgoing(FLAGS, open_mode, full_backing_directory_path, &mut local_server_end)
+            .open_outgoing(
+                FLAGS,
+                open_mode,
+                full_backing_directory_path.clone(),
+                &mut local_server_end,
+            )
             .await?;
     } else {
         // If storage_source_info.storage_provider is None, the directory comes from component_manager's namespace
@@ -149,6 +182,7 @@
                 None,
                 storage_source_info.backing_directory_path.clone(),
                 relative_moniker.clone(),
+                instance_id.cloned(),
                 e,
             ))
         })?;
@@ -159,21 +193,24 @@
                 storage_source_info.storage_provider.as_ref().map(|r| r.abs_moniker.clone()),
                 storage_source_info.backing_directory_path.clone(),
                 relative_moniker.clone(),
+                instance_id.cloned(),
                 e,
             ))
         })?;
     }
-    let storage_proxy =
-        io_util::create_sub_directories(&dir_proxy, &generate_storage_path(&relative_moniker))
-            .map_err(|e| {
-                ModelError::from(StorageError::open(
-                    storage_source_info.storage_provider.as_ref().map(|r| r.abs_moniker.clone()),
-                    storage_source_info.backing_directory_path.clone(),
-                    relative_moniker.clone(),
-                    e,
-                ))
-            })?;
-    Ok(storage_proxy)
+    let storage_path = instance_id
+        .map(|id| generate_instance_id_based_storage_path(id))
+        .unwrap_or_else(|| generate_moniker_based_storage_path(&relative_moniker));
+
+    io_util::create_sub_directories(&dir_proxy, &storage_path).map_err(|e| {
+        ModelError::from(StorageError::open(
+            storage_source_info.storage_provider.as_ref().map(|r| r.abs_moniker.clone()),
+            storage_source_info.backing_directory_path.clone(),
+            relative_moniker.clone(),
+            instance_id.cloned(),
+            e,
+        ))
+    })
 }
 
 /// Delete the isolated storage sub-directory for the given component.  `dir_source_component` and
@@ -181,6 +218,7 @@
 pub async fn delete_isolated_storage(
     storage_source_info: StorageCapabilitySource,
     relative_moniker: RelativeMoniker,
+    instance_id: Option<&ComponentInstanceId>,
 ) -> Result<(), ModelError> {
     let (root_dir, local_server_end) =
         endpoints::create_proxy::<DirectoryMarker>().expect("failed to create proxy");
@@ -212,35 +250,78 @@
                 None,
                 storage_source_info.backing_directory_path.clone(),
                 relative_moniker.clone(),
+                None,
                 e,
             )
         })?;
     }
-    let storage_path = generate_storage_path(&relative_moniker);
-    // We want to strip off the "data" portion of the path, and then one more level to get to the
-    // directory holding the target component's storage.
-    if storage_path.parent().and_then(|p| p.parent()).is_none() {
-        return Err(StorageError::invalid_storage_path(relative_moniker.clone()).into());
-    }
-    let mut dir_path = storage_path.parent().unwrap().parent().unwrap().to_path_buf();
-    let name = storage_path.parent().unwrap().file_name().ok_or_else(|| {
-        ModelError::from(StorageError::invalid_storage_path(relative_moniker.clone()))
-    })?;
-    let name = name.to_str().ok_or_else(|| ModelError::name_is_not_utf8(name.to_os_string()))?;
-    if let Some(subdir) = storage_source_info.storage_subdir.as_ref() {
-        dir_path = subdir.join(dir_path);
-    }
-    let dir = if dir_path.parent().is_none() {
-        root_dir
+
+    let (dir, name) = if let Some(instance_id) = instance_id {
+        let storage_path = generate_instance_id_based_storage_path(instance_id);
+        let file_name = storage_path
+            .file_name()
+            .ok_or_else(|| {
+                StorageError::invalid_storage_path(
+                    relative_moniker.clone(),
+                    Some(instance_id.clone()),
+                )
+            })?
+            .to_str()
+            .ok_or_else(|| ModelError::name_is_not_utf8(storage_path.clone().into_os_string()))?
+            .to_string();
+
+        let parent_path = storage_path.parent().ok_or_else(|| {
+            StorageError::invalid_storage_path(relative_moniker.clone(), Some(instance_id.clone()))
+        })?;
+        let dir = if parent_path
+            .to_str()
+            .ok_or_else(|| ModelError::name_is_not_utf8(storage_path.clone().into_os_string()))?
+            .is_empty()
+        {
+            root_dir
+        } else {
+            io_util::open_directory(&root_dir, parent_path, FLAGS).map_err(|e| {
+                StorageError::open(
+                    storage_source_info.storage_provider.as_ref().map(|r| r.abs_moniker.clone()),
+                    storage_source_info.backing_directory_path.clone(),
+                    relative_moniker.clone(),
+                    None,
+                    e,
+                )
+            })?
+        };
+        (dir, file_name)
     } else {
-        io_util::open_directory(&root_dir, &dir_path, FLAGS).map_err(|e| {
-            StorageError::open(
-                storage_source_info.storage_provider.as_ref().map(|r| r.abs_moniker.clone()),
-                storage_source_info.backing_directory_path.clone(),
-                relative_moniker.clone(),
-                e,
-            )
-        })?
+        let storage_path = generate_moniker_based_storage_path(&relative_moniker);
+        // We want to strip off the "data" portion of the path, and then one more level to get to the
+        // directory holding the target component's storage.
+        if storage_path.parent().and_then(|p| p.parent()).is_none() {
+            return Err(StorageError::invalid_storage_path(relative_moniker.clone(), None).into());
+        }
+        let mut dir_path = storage_path.parent().unwrap().parent().unwrap().to_path_buf();
+        let name = storage_path.parent().unwrap().file_name().ok_or_else(|| {
+            ModelError::from(StorageError::invalid_storage_path(relative_moniker.clone(), None))
+        })?;
+        let name =
+            name.to_str().ok_or_else(|| ModelError::name_is_not_utf8(name.to_os_string()))?;
+        if let Some(subdir) = storage_source_info.storage_subdir.as_ref() {
+            dir_path = subdir.join(dir_path);
+        }
+
+        let dir = if dir_path.parent().is_none() {
+            root_dir
+        } else {
+            io_util::open_directory(&root_dir, &dir_path, FLAGS).map_err(|e| {
+                StorageError::open(
+                    storage_source_info.storage_provider.as_ref().map(|r| r.abs_moniker.clone()),
+                    storage_source_info.backing_directory_path.clone(),
+                    relative_moniker.clone(),
+                    None,
+                    e,
+                )
+            })?
+        };
+        (dir, name.to_string())
     };
 
     // TODO(fxbug.dev/36377): This function is subject to races. If another process has a handle to the
@@ -248,11 +329,12 @@
     // cause it to spin or fail because a subdir was not empty after it removed all the contents.
     // It's also possible that the directory was already deleted by the backing component or a
     // prior run.
-    files_async::remove_dir_recursive(&dir, name).await.map_err(|e| {
+    files_async::remove_dir_recursive(&dir, &name).await.map_err(|e| {
         StorageError::remove(
             storage_source_info.storage_provider.as_ref().map(|r| r.abs_moniker.clone()),
             storage_source_info.backing_directory_path.clone(),
             relative_moniker.clone(),
+            instance_id.cloned(),
             e,
         )
     })?;
@@ -285,7 +367,7 @@
 /// When `d` attempts to access `/my_cache` the framework creates the sub-directory
 /// `b:0/children/c:0/children/d:0/data` in the directory used by `a` to declare storage
 /// capabilities.  Then, the framework gives 'd' access to this new directory.
-fn generate_storage_path(relative_moniker: &RelativeMoniker) -> PathBuf {
+fn generate_moniker_based_storage_path(relative_moniker: &RelativeMoniker) -> PathBuf {
     assert!(
         !relative_moniker.down_path().is_empty(),
         "storage capability appears to have been exposed or used by its source"
@@ -308,6 +390,13 @@
     dir_path.into_iter().collect()
 }
 
+/// Generates the component storage directory path for the provided component instance.
+///
+/// Components which do not have an instance ID use a generate moniker-based storage path instead.
+fn generate_instance_id_based_storage_path(instance_id: &ComponentInstanceId) -> PathBuf {
+    [instance_id].iter().collect()
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -320,8 +409,9 @@
             testing::test_helpers::{self, component_decl_with_test_runner, ComponentDeclBuilder},
         },
         cm_rust::*,
-        fidl_fuchsia_io2 as fio2,
+        component_id_index, fidl_fuchsia_io2 as fio2,
         matches::assert_matches,
+        rand::{self, distributions::Alphanumeric, Rng},
         std::convert::{TryFrom, TryInto},
     };
 
@@ -366,6 +456,7 @@
                 storage_subdir: None,
             },
             relative_moniker.clone(),
+            None,
             OPEN_RIGHT_READABLE | OPEN_RIGHT_WRITABLE,
             &BindReason::Eager,
         )
@@ -384,6 +475,7 @@
                 storage_subdir: None,
             },
             relative_moniker.clone(),
+            None,
             OPEN_RIGHT_READABLE | OPEN_RIGHT_WRITABLE,
             &BindReason::Eager,
         )
@@ -402,6 +494,7 @@
                 storage_subdir: None,
             },
             relative_moniker.clone(),
+            None,
             OPEN_RIGHT_READABLE | OPEN_RIGHT_WRITABLE,
             &BindReason::Eager,
         )
@@ -412,6 +505,91 @@
         assert_eq!(test_helpers::list_directory(&dir).await, vec!["file".to_string()]);
     }
 
+    #[fuchsia_async::run_singlethreaded(test)]
+    async fn open_isolated_storage_instance_id() {
+        let components = vec![
+            ("a", ComponentDeclBuilder::new().add_lazy_child("b").add_lazy_child("c").build()),
+            (
+                "b",
+                ComponentDeclBuilder::new()
+                    .directory(DirectoryDecl {
+                        name: "data".into(),
+                        source_path: "/data".try_into().unwrap(),
+                        rights: *rights::READ_RIGHTS | *rights::WRITE_RIGHTS,
+                    })
+                    .expose(ExposeDecl::Directory(ExposeDirectoryDecl {
+                        source_name: "data".try_into().unwrap(),
+                        source: ExposeSource::Self_,
+                        target_name: "data".try_into().unwrap(),
+                        target: ExposeTarget::Parent,
+                        rights: Some(fio2::Operations::Connect),
+                        subdir: None,
+                    }))
+                    .build(),
+            ),
+        ];
+        let test = RoutingTest::new("a", components).await;
+        let b_component = test
+            .model
+            .look_up(&vec!["b:0"].into())
+            .await
+            .expect("failed to find component for b:0");
+        let dir_source_path = CapabilityPath::try_from("/data").unwrap();
+        let relative_moniker = RelativeMoniker::new(vec![], vec!["c:0".into(), "coll:d:1".into()]);
+
+        // open the storage directory using instance ID.
+        let instance_id = Some(component_id_index::gen_instance_id(&mut rand::thread_rng()));
+        let mut dir = open_isolated_storage(
+            StorageCapabilitySource {
+                storage_provider: Some(Arc::clone(&b_component)),
+                backing_directory_path: dir_source_path.clone(),
+                backing_directory_subdir: None,
+                storage_subdir: None,
+            },
+            relative_moniker.clone(),
+            instance_id.as_ref(),
+            OPEN_RIGHT_READABLE | OPEN_RIGHT_WRITABLE,
+            &BindReason::Eager,
+        )
+        .await
+        .expect("failed to open isolated storage");
+
+        // check that an instance-ID based directory was created:
+        assert!(test_helpers::list_directory(&test.test_dir_proxy)
+            .await
+            .contains(&instance_id.clone().unwrap()));
+
+        // check that a moniker-based directory was NOT created:
+        assert!(!test_helpers::list_directory_recursive(&test.test_dir_proxy).await.contains(
+            &generate_moniker_based_storage_path(&relative_moniker).to_str().unwrap().to_string()
+        ));
+
+        // check that the directory is writable by writing a marker file in it.
+        let marker_file_name: String =
+            rand::thread_rng().sample_iter(&Alphanumeric).take(7).map(char::from).collect();
+        assert_eq!(test_helpers::list_directory(&dir).await, Vec::<String>::new());
+        test_helpers::write_file(&dir, &marker_file_name, "contents").await;
+        assert_eq!(test_helpers::list_directory(&dir).await, vec![marker_file_name.clone()]);
+
+        // check that re-opening the directory gives us the same marker file.
+        dir = open_isolated_storage(
+            StorageCapabilitySource {
+                storage_provider: Some(Arc::clone(&b_component)),
+                backing_directory_path: dir_source_path.clone(),
+                backing_directory_subdir: None,
+                storage_subdir: None,
+            },
+            relative_moniker.clone(),
+            instance_id.as_ref(),
+            OPEN_RIGHT_READABLE | OPEN_RIGHT_WRITABLE,
+            &BindReason::Eager,
+        )
+        .await
+        .expect("failed to open isolated storage");
+
+        assert_eq!(test_helpers::list_directory(&dir).await, vec![marker_file_name.clone()]);
+    }
+
     // TODO: test with different subdirs
 
     #[fuchsia_async::run_singlethreaded(test)]
@@ -436,6 +614,7 @@
                 storage_subdir: None,
             },
             relative_moniker.clone(),
+            None,
             OPEN_RIGHT_READABLE | OPEN_RIGHT_WRITABLE,
             &BindReason::Eager,
         )
@@ -488,6 +667,7 @@
                 storage_subdir: None,
             },
             child_moniker.clone(),
+            None,
             OPEN_RIGHT_READABLE | OPEN_RIGHT_WRITABLE,
             &BindReason::Eager,
         )
@@ -506,6 +686,7 @@
                 storage_subdir: None,
             },
             parent_moniker.clone(),
+            None,
             OPEN_RIGHT_READABLE | OPEN_RIGHT_WRITABLE,
             &BindReason::Eager,
         )
@@ -524,6 +705,7 @@
                 storage_subdir: None,
             },
             child_moniker.clone(),
+            None,
         )
         .await
         .expect("failed to delete child's isolated storage");
@@ -537,6 +719,7 @@
                 storage_subdir: None,
             },
             parent_moniker.clone(),
+            None,
             OPEN_RIGHT_READABLE | OPEN_RIGHT_WRITABLE,
             &BindReason::Eager,
         )
@@ -546,7 +729,7 @@
 
         // Open list of children from parent. Should not contain child directory.
         assert_eq!(
-            test.list_directory_in_storage(None, parent_moniker.clone(), "children").await,
+            test.list_directory_in_storage(None, parent_moniker.clone(), None, "children").await,
             Vec::<String>::new(),
         );
 
@@ -559,6 +742,104 @@
                 storage_subdir: None,
             },
             child_moniker,
+            None,
+        )
+        .await
+        .expect_err("delete isolated storage not meant to succeed");
+        match err {
+            ModelError::StorageError { err: StorageError::Remove { .. } } => {}
+            _ => {
+                panic!("unexpected error: {:?}", err);
+            }
+        }
+    }
+
+    #[fuchsia_async::run_singlethreaded(test)]
+    async fn delete_isolated_storage_instance_id_test() {
+        let components = vec![
+            ("a", ComponentDeclBuilder::new().add_lazy_child("b").add_lazy_child("c").build()),
+            (
+                "b",
+                ComponentDeclBuilder::new()
+                    .directory(DirectoryDecl {
+                        name: "data".into(),
+                        source_path: "/data".try_into().unwrap(),
+                        rights: *rights::READ_RIGHTS | *rights::WRITE_RIGHTS,
+                    })
+                    .expose(ExposeDecl::Directory(ExposeDirectoryDecl {
+                        source_name: "data".try_into().unwrap(),
+                        source: ExposeSource::Self_,
+                        target_name: "data".try_into().unwrap(),
+                        target: ExposeTarget::Parent,
+                        rights: Some(fio2::Operations::Connect),
+                        subdir: None,
+                    }))
+                    .build(),
+            ),
+        ];
+        let test = RoutingTest::new("a", components).await;
+        let b_component = test
+            .model
+            .look_up(&vec!["b:0"].into())
+            .await
+            .expect("failed to find component for b:0");
+        let dir_source_path = CapabilityPath::try_from("/data").unwrap();
+        let child_moniker = RelativeMoniker::new(vec![], vec!["c:0".into(), "coll:d:1".into()]);
+        let instance_id = Some(component_id_index::gen_instance_id(&mut rand::thread_rng()));
+        // Open and write to the storage for child.
+        let dir = open_isolated_storage(
+            StorageCapabilitySource {
+                storage_provider: Some(Arc::clone(&b_component)),
+                backing_directory_path: dir_source_path.clone(),
+                backing_directory_subdir: None,
+                storage_subdir: None,
+            },
+            child_moniker.clone(),
+            instance_id.as_ref(),
+            OPEN_RIGHT_READABLE | OPEN_RIGHT_WRITABLE,
+            &BindReason::Eager,
+        )
+        .await
+        .expect("failed to open isolated storage");
+
+        // check that an instance-ID based directory was created:
+        assert!(test_helpers::list_directory(&test.test_dir_proxy)
+            .await
+            .contains(&instance_id.clone().unwrap()));
+
+        assert_eq!(test_helpers::list_directory(&dir).await, Vec::<String>::new());
+        test_helpers::write_file(&dir, "file", "hippos").await;
+        assert_eq!(test_helpers::list_directory(&dir).await, vec!["file".to_string()]);
+
+        // Delete the child's storage.
+        delete_isolated_storage(
+            StorageCapabilitySource {
+                storage_provider: Some(Arc::clone(&b_component)),
+                backing_directory_path: dir_source_path.clone(),
+                backing_directory_subdir: None,
+                storage_subdir: None,
+            },
+            child_moniker.clone(),
+            instance_id.as_ref(),
+        )
+        .await
+        .expect("failed to delete child's isolated storage");
+
+        // check that an instance-ID based directory was deleted:
+        assert!(!test_helpers::list_directory(&test.test_dir_proxy)
+            .await
+            .contains(&instance_id.clone().unwrap()));
+
+        // Error -- tried to delete nonexistent storage.
+        let err = delete_isolated_storage(
+            StorageCapabilitySource {
+                storage_provider: Some(Arc::clone(&b_component)),
+                backing_directory_path: dir_source_path,
+                backing_directory_subdir: None,
+                storage_subdir: None,
+            },
+            child_moniker,
+            instance_id.as_ref(),
         )
         .await
         .expect_err("delete isolated storage not meant to succeed");
@@ -571,7 +852,7 @@
     }
 
     #[test]
-    fn generate_storage_path_test() {
+    fn generate_moniker_based_storage_path_test() {
         for (relative_moniker, expected_output) in vec![
             (
                 RelativeMoniker::from_absolute(
@@ -595,7 +876,10 @@
                 "a:1/children/b:2/children/c:3/data",
             ),
         ] {
-            assert_eq!(generate_storage_path(&relative_moniker), PathBuf::from(expected_output))
+            assert_eq!(
+                generate_moniker_based_storage_path(&relative_moniker),
+                PathBuf::from(expected_output)
+            )
         }
     }
 }
diff --git a/src/sys/component_manager/src/model/testing/routing_test_helpers.rs b/src/sys/component_manager/src/model/testing/routing_test_helpers.rs
index dbcd18b..4fd369e 100644
--- a/src/sys/component_manager/src/model/testing/routing_test_helpers.rs
+++ b/src/sys/component_manager/src/model/testing/routing_test_helpers.rs
@@ -11,6 +11,7 @@
         model::{
             binding::Binder,
             component::BindReason,
+            component_id_index::ComponentInstanceId,
             error::ModelError,
             events::{event::EventMode, registry::EventSubscription},
             hooks::HooksRegistration,
@@ -131,7 +132,7 @@
     outgoing_paths: HashMap<String, HashMap<CapabilityPath, Arc<dyn DirectoryEntry>>>,
     builtin_runners: HashMap<CapabilityName, Arc<dyn BuiltinRunnerFactory>>,
     namespace_capabilities: Vec<CapabilityDecl>,
-
+    component_id_index_path: Option<String>,
     // Map of components that have a custom function serving the "outgoing" directory.
     // Other functions will receive a stock directory generated by `RoutingTest`.
     custom_outgoing_host_fns: HashMap<String, HostFn>,
@@ -185,6 +186,11 @@
         self
     }
 
+    pub fn set_component_id_index_path(mut self, index_path: String) -> Self {
+        self.component_id_index_path = Some(index_path);
+        self
+    }
+
     /// Add a custom capability security policy to restrict routing of certain caps.
     pub fn add_capability_policy(
         mut self,
@@ -275,6 +281,7 @@
                 capability_policy: builder.capability_policy,
                 ..Default::default()
             },
+            component_id_index_path: builder.component_id_index_path,
             ..Default::default()
         };
         let mut env_builder = BuiltinEnvironmentBuilder::new()
@@ -304,6 +311,10 @@
         }
     }
 
+    pub fn test_dir_path(&self) -> &Path {
+        self.test_dir.path()
+    }
+
     /// Creates a static file at the given path in the temp directory.
     pub async fn create_static_file(
         &self,
@@ -402,6 +413,15 @@
                 capability_util::write_file_to_storage(&namespace, path, expected_res.clone())
                     .await;
 
+                let instance_id = self
+                    .model
+                    .root
+                    .try_get_context()
+                    .unwrap()
+                    .component_id_index()
+                    .look_up_moniker(&moniker)
+                    .cloned();
+
                 if let Some(relative_moniker) = storage_relation {
                     if from_cm_namespace {
                         // Check for the file in the /tmp in the test's namespace
@@ -413,6 +433,7 @@
                         let res = capability_util::check_file_in_storage(
                             storage_subdir,
                             relative_moniker,
+                            instance_id.as_ref(),
                             &tmp_proxy,
                         )
                         .await;
@@ -424,6 +445,7 @@
                         let res = capability_util::check_file_in_storage(
                             storage_subdir,
                             relative_moniker,
+                            instance_id.as_ref(),
                             &self.test_dir_proxy,
                         )
                         .await;
@@ -448,6 +470,16 @@
                 let (storage_proxy, server_end) = create_proxy().unwrap();
                 let flags = OPEN_RIGHT_WRITABLE | OPEN_FLAG_CREATE;
                 let relative_moniker_string = format!("{}", storage_relation);
+                let component_abs_moniker =
+                    AbsoluteMoniker::from_relative(&moniker, &storage_relation).unwrap();
+                let component_instance_id = self
+                    .model
+                    .root
+                    .try_get_context()
+                    .unwrap()
+                    .component_id_index()
+                    .look_up_moniker(&component_abs_moniker)
+                    .cloned();
                 storage_admin_proxy
                     .open_component_storage(
                         relative_moniker_string.as_str(),
@@ -476,6 +508,7 @@
                     capability_util::check_file_in_storage(
                         storage_subdir.clone(),
                         storage_relation.clone(),
+                        component_instance_id.as_ref(),
                         &storage_dir,
                     )
                     .await
@@ -488,6 +521,7 @@
                     capability_util::confirm_storage_is_deleted_for_component(
                         storage_subdir,
                         storage_relation,
+                        component_instance_id.as_ref(),
                         &storage_dir,
                     )
                     .await;
@@ -552,10 +586,14 @@
         &self,
         subdir: Option<&str>,
         relation: RelativeMoniker,
+        instance_id: Option<&ComponentInstanceId>,
         relative_path: &str,
     ) -> Vec<String> {
-        let dir_path =
-            capability_util::generate_storage_path(subdir.map(|s| s.to_string()), &relation);
+        let dir_path = capability_util::generate_storage_path(
+            subdir.map(|s| s.to_string()),
+            &relation,
+            instance_id,
+        );
         let mut dir_path = dir_path.parent().unwrap().to_path_buf();
         if !relative_path.is_empty() {
             dir_path.push(relative_path);
@@ -804,7 +842,8 @@
 /// Contains functions to use capabilities in routing tests.
 pub mod capability_util {
     use {
-        super::*, anyhow::format_err, cm_rust::NativeIntoFidl, fidl::endpoints::ServiceMarker,
+        super::*, crate::model::component_id_index::ComponentInstanceId, anyhow::format_err,
+        cm_rust::NativeIntoFidl, fidl::endpoints::ServiceMarker,
         fidl_fuchsia_sys2::BlockingEventSourceMarker, std::path::PathBuf,
     };
 
@@ -921,9 +960,10 @@
     pub async fn check_file_in_storage(
         storage_subdir: Option<String>,
         relation: RelativeMoniker,
+        instance_id: Option<&ComponentInstanceId>,
         test_dir_proxy: &DirectoryProxy,
     ) -> Result<(), anyhow::Error> {
-        let mut dir_path = generate_storage_path(storage_subdir, &relation);
+        let mut dir_path = generate_storage_path(storage_subdir, &relation, instance_id);
         dir_path.push("hippos");
         let file_proxy =
             io_util::open_file(&test_dir_proxy, &dir_path, io_util::OPEN_RIGHT_READABLE)?;
@@ -940,9 +980,10 @@
     pub async fn confirm_storage_is_deleted_for_component(
         storage_subdir: Option<String>,
         relation: RelativeMoniker,
+        instance_id: Option<&ComponentInstanceId>,
         test_dir_proxy: &DirectoryProxy,
     ) {
-        let dir_path = generate_storage_path(storage_subdir, &relation);
+        let dir_path = generate_storage_path(storage_subdir, &relation, instance_id);
         let res = io_util::directory::open_directory(
             &test_dir_proxy,
             dir_path.to_str().unwrap(),
@@ -1289,7 +1330,11 @@
     pub fn generate_storage_path(
         subdir: Option<String>,
         relative_moniker: &RelativeMoniker,
+        instance_id: Option<&ComponentInstanceId>,
     ) -> PathBuf {
+        if let Some(id) = instance_id {
+            return [id].iter().collect();
+        }
         assert!(relative_moniker.up_path().is_empty());
         let mut down_path = relative_moniker.down_path().iter();
         let mut dir_path = vec![];
diff --git a/src/sys/component_manager/src/model/testing/test_helpers.rs b/src/sys/component_manager/src/model/testing/test_helpers.rs
index a10479d..fbcb5dc 100644
--- a/src/sys/component_manager/src/model/testing/test_helpers.rs
+++ b/src/sys/component_manager/src/model/testing/test_helpers.rs
@@ -21,9 +21,10 @@
     },
     cm_rust::{ChildDecl, ComponentDecl, NativeIntoFidl},
     cm_types::Url,
+    fidl::encoding::encode_persistent,
     fidl::endpoints::{self, Proxy, ServerEnd},
-    fidl_fidl_examples_echo as echo, fidl_fuchsia_component_runner as fcrunner,
-    fidl_fuchsia_data as fdata,
+    fidl_fidl_examples_echo as echo, fidl_fuchsia_component_internal as fcomponent_internal,
+    fidl_fuchsia_component_runner as fcrunner, fidl_fuchsia_data as fdata,
     fidl_fuchsia_io::{
         DirectoryProxy, CLONE_FLAG_SAME_RIGHTS, MODE_TYPE_SERVICE, OPEN_FLAG_CREATE,
         OPEN_RIGHT_READABLE, OPEN_RIGHT_WRITABLE,
@@ -34,9 +35,12 @@
     futures::{channel::mpsc::Receiver, StreamExt, TryStreamExt},
     moniker::{AbsoluteMoniker, PartialMoniker},
     std::collections::HashSet,
+    std::convert::TryFrom,
     std::default::Default,
+    std::io::Write,
     std::path::Path,
     std::sync::Arc,
+    tempfile::NamedTempFile,
     vfs::directory::entry::DirectoryEntry,
 };
 
@@ -658,6 +662,15 @@
     start_info
 }
 
+/// Makes a temporary component ID index file with contents from the given `index`.
+pub fn make_index_file(index: component_id_index::Index) -> anyhow::Result<NamedTempFile> {
+    let mut tmp_file = NamedTempFile::new()?;
+    tmp_file.write_all(
+        encode_persistent(&mut fcomponent_internal::ComponentIdIndex::try_from(index)?)?.as_ref(),
+    )?;
+    Ok(tmp_file)
+}
+
 /// Contains test model and ancillary objects.
 pub struct TestModelResult {
     pub model: Arc<Model>,
diff --git a/src/sys/component_manager/src/model/tests/storage.rs b/src/sys/component_manager/src/model/tests/storage.rs
index 2852931..bf74b9d 100644
--- a/src/sys/component_manager/src/model/tests/storage.rs
+++ b/src/sys/component_manager/src/model/tests/storage.rs
@@ -11,6 +11,7 @@
         },
     },
     cm_rust::*,
+    component_id_index::gen_instance_id,
     fidl_fuchsia_sys2 as fsys, fuchsia_zircon as zx,
     moniker::{AbsoluteMoniker, ExtendedMoniker, RelativeMoniker},
     std::{collections::HashSet, convert::TryInto, fs, path::PathBuf},
@@ -840,26 +841,46 @@
     .await;
     // Confirm storage directory exists for component in collection
     assert_eq!(
-        test.list_directory_in_storage(Some("data"), RelativeMoniker::new(vec![], vec![]), "")
-            .await,
+        test.list_directory_in_storage(
+            Some("data"),
+            RelativeMoniker::new(vec![], vec![]),
+            None,
+            ""
+        )
+        .await,
         vec!["coll:c:1".to_string()],
     );
     assert_eq!(
-        test.list_directory_in_storage(Some("cache"), RelativeMoniker::new(vec![], vec![]), "")
-            .await,
+        test.list_directory_in_storage(
+            Some("cache"),
+            RelativeMoniker::new(vec![], vec![]),
+            None,
+            ""
+        )
+        .await,
         vec!["coll:c:1".to_string()],
     );
     test.destroy_dynamic_child(vec!["b:0"].into(), "coll", "c").await;
 
     // Confirm storage no longer exists.
     assert_eq!(
-        test.list_directory_in_storage(Some("data"), RelativeMoniker::new(vec![], vec![]), "")
-            .await,
+        test.list_directory_in_storage(
+            Some("data"),
+            RelativeMoniker::new(vec![], vec![]),
+            None,
+            ""
+        )
+        .await,
         Vec::<String>::new(),
     );
     assert_eq!(
-        test.list_directory_in_storage(Some("cache"), RelativeMoniker::new(vec![], vec![]), "")
-            .await,
+        test.list_directory_in_storage(
+            Some("cache"),
+            RelativeMoniker::new(vec![], vec![]),
+            None,
+            ""
+        )
+        .await,
         Vec::<String>::new(),
     );
 }
@@ -997,6 +1018,7 @@
         test.list_directory_in_storage(
             Some("data"),
             RelativeMoniker::new(vec![], vec!["b:0".into()]),
+            None,
             "children",
         )
         .await,
@@ -1006,6 +1028,7 @@
         test.list_directory_in_storage(
             Some("cache"),
             RelativeMoniker::new(vec![], vec!["b:0".into()]),
+            None,
             "children",
         )
         .await,
@@ -1018,6 +1041,7 @@
         test.list_directory_in_storage(
             Some("data"),
             RelativeMoniker::new(vec![], vec!["b:0".into()]),
+            None,
             "children"
         )
         .await,
@@ -1027,6 +1051,7 @@
         test.list_directory_in_storage(
             Some("cache"),
             RelativeMoniker::new(vec![], vec!["b:0".into()]),
+            None,
             "children"
         )
         .await,
@@ -1500,3 +1525,121 @@
     )
     .await;
 }
+
+///   component manager's namespace
+///    |
+///    a
+///    |
+///    b
+///    |
+///    c
+///
+/// Instance IDs defined only for `b` in the component ID index.
+/// Check that the correct storge layout is used when a component has an instance ID.
+#[fuchsia_async::run_singlethreaded(test)]
+async fn instance_id_from_index() {
+    let b_instance_id = Some(gen_instance_id(&mut rand::thread_rng()));
+    let component_id_index_path = make_index_file(component_id_index::Index {
+        instances: vec![component_id_index::InstanceIdEntry {
+            instance_id: b_instance_id.clone(),
+            appmgr_moniker: None,
+            moniker: Some(AbsoluteMoniker::parse_string_without_instances("/b").unwrap()),
+        }],
+        ..component_id_index::Index::default()
+    })
+    .unwrap();
+    let components = vec![
+        (
+            "a",
+            ComponentDeclBuilder::new()
+                .directory(
+                    DirectoryDeclBuilder::new("data")
+                        .path("/data")
+                        .rights(*rights::READ_RIGHTS | *rights::WRITE_RIGHTS)
+                        .build(),
+                )
+                .offer(OfferDecl::Storage(OfferStorageDecl {
+                    source: OfferStorageSource::Self_,
+                    target: OfferTarget::Child("b".to_string()),
+                    source_name: "cache".into(),
+                    target_name: "cache".into(),
+                }))
+                .add_lazy_child("b")
+                .storage(StorageDecl {
+                    name: "cache".into(),
+                    backing_dir: "data".try_into().unwrap(),
+                    source: StorageDirectorySource::Self_,
+                    subdir: None,
+                })
+                .build(),
+        ),
+        (
+            "b",
+            ComponentDeclBuilder::new()
+                .use_(UseDecl::Storage(UseStorageDecl {
+                    source_name: "cache".into(),
+                    target_path: "/storage".try_into().unwrap(),
+                }))
+                .offer(OfferDecl::Storage(OfferStorageDecl {
+                    source: OfferStorageSource::Parent,
+                    target: OfferTarget::Child("c".to_string()),
+                    source_name: "cache".into(),
+                    target_name: "cache".into(),
+                }))
+                .add_lazy_child("c")
+                .build(),
+        ),
+        (
+            "c",
+            ComponentDeclBuilder::new()
+                .use_(UseDecl::Storage(UseStorageDecl {
+                    source_name: "cache".into(),
+                    target_path: "/storage".try_into().unwrap(),
+                }))
+                .build(),
+        ),
+    ];
+    let test = RoutingTestBuilder::new("a", components)
+        .set_component_id_index_path(component_id_index_path.path().to_str().unwrap().to_string())
+        .build()
+        .await;
+
+    // instance `b` uses instance-id based paths.
+    test.check_use(
+        AbsoluteMoniker::parse_string_without_instances("/b").unwrap(),
+        CheckUse::Storage {
+            path: "/storage".try_into().unwrap(),
+            storage_relation: Some(RelativeMoniker::new(vec![], vec!["b:0".into()])),
+            from_cm_namespace: false,
+            storage_subdir: None,
+            expected_res: ExpectedResult::Ok,
+        },
+    )
+    .await;
+    assert!(test.list_directory(".").await.contains(b_instance_id.as_ref().unwrap()));
+
+    // instance `c` uses moniker-based paths.
+    let storage_relation = RelativeMoniker::new(vec![], vec!["b:0".into(), "c:0".into()]);
+    test.check_use(
+        AbsoluteMoniker::parse_string_without_instances("/b/c").unwrap(),
+        CheckUse::Storage {
+            path: "/storage".try_into().unwrap(),
+            storage_relation: Some(storage_relation.clone()),
+            from_cm_namespace: false,
+            storage_subdir: None,
+            expected_res: ExpectedResult::Ok,
+        },
+    )
+    .await;
+
+    let expected_storage_path =
+        capability_util::generate_storage_path(None, &storage_relation, None)
+            .to_str()
+            .unwrap()
+            .to_string();
+    assert!(list_directory_recursive(&test.test_dir_proxy)
+        .await
+        .iter()
+        .find(|&name| name.starts_with(&expected_storage_path))
+        .is_some());
+}