[pkg_resolver] Rewrite rules toggle amber sources

This reverts commit b3099baf2e433f3c1450a07ac232e27a97ab3ed0, which was
a revert of commit e78ed1e2062564d3cf29d63ee5edf07f8dc5ba97, which was
essentially this commit.

When pkg_resolver's rewrite rules change, it will now make a best effort
to exclusively enable all matching amber sources with an equivalent ID,
if there is a rewrite rule that rewrites all of fuchsia.com.

Additionally, amberctl now enables and disables amber sources by
managing rewrite rules. A side effect of this change is that it can no
longer reasonably support enabling multiple sources, so it now ignores
the "-x" flag if provided, always ensuring there is 0 or 1 enabled Amber
source.

Test: CQ
PKG-799 #done

Change-Id: I47a18647624101b9054c95c6f9bd9f36e0d4a278
diff --git a/garnet/bin/pkg_resolver/BUILD.gn b/garnet/bin/pkg_resolver/BUILD.gn
index 1a6cb1f..3fa327f 100644
--- a/garnet/bin/pkg_resolver/BUILD.gn
+++ b/garnet/bin/pkg_resolver/BUILD.gn
@@ -13,6 +13,7 @@
   edition = "2018"
 
   deps = [
+    "//garnet/lib/rust/fidl_fuchsia_amber_ext",
     "//garnet/lib/rust/fidl_fuchsia_pkg_ext",
     "//garnet/lib/rust/files_async",
     "//garnet/public/lib/fidl/rust/fidl",
diff --git a/garnet/bin/pkg_resolver/src/amber.rs b/garnet/bin/pkg_resolver/src/amber.rs
new file mode 100644
index 0000000..7edbeb4
--- /dev/null
+++ b/garnet/bin/pkg_resolver/src/amber.rs
@@ -0,0 +1,255 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+    crate::amber_connector::AmberConnect,
+    fidl_fuchsia_amber::{ControlProxy as AmberProxy, Status as AmberStatus},
+    fuchsia_syslog::{fx_log_err, fx_log_info},
+    futures::{future::BoxFuture, prelude::*, stream::FuturesUnordered},
+};
+
+/// High level actions performed on amber by the package resolver to manipulate source configs.
+pub trait AmberSourceSelector: Sized + Clone + Send {
+    /// Disable all enabled sources.
+    fn disable_all_sources(&self) -> BoxFuture<'_, ()>;
+
+    /// Enable the given source id, disabling all other enabled sources. If no source with the
+    /// given `id` exists, disable all enabled sources.
+    fn enable_source_exclusive(&self, id: &str) -> BoxFuture<'_, ()>;
+}
+
+impl<T> AmberSourceSelector for T
+where
+    T: AmberConnect + Clone,
+{
+    fn disable_all_sources(&self) -> BoxFuture<'_, ()> {
+        let amber = match self.connect() {
+            Ok(amber) => amber,
+            Err(e) => {
+                fx_log_err!("while disabling all Amber sources: {:?}", e);
+                return future::ready(()).boxed();
+            }
+        };
+        async move {
+            let mut work = FuturesUnordered::new();
+            for (source_id, enabled) in await!(iter_sources(&amber)) {
+                if enabled {
+                    work.push(set_source_enabled(&amber, source_id, false));
+                }
+            }
+            await!(work.collect::<()>());
+        }
+            .boxed()
+    }
+    fn enable_source_exclusive(&self, id: &str) -> BoxFuture<'_, ()> {
+        let amber = match self.connect() {
+            Ok(amber) => amber,
+            Err(e) => {
+                fx_log_err!("while enabling exclusive Amber source: {:?}", e);
+                return future::ready(()).boxed();
+            }
+        };
+        let id = id.to_owned();
+
+        async move {
+            let mut work = FuturesUnordered::new();
+            // Rewrite rules don't require hostnames to be registered source or repository configs.
+            // If id doesn't reference a valid source config, try to enable it anyway for the
+            // syslog output. Note that this case would result in all sources being disabled.
+            work.push(set_source_enabled(&amber, id.clone(), true));
+            for (source_id, enabled) in await!(iter_sources(&amber)) {
+                if enabled && source_id != id {
+                    work.push(set_source_enabled(&amber, source_id, false));
+                }
+            }
+            await!(work.collect::<()>());
+        }
+            .boxed()
+    }
+}
+
+async fn iter_sources(amber: &AmberProxy) -> impl Iterator<Item = (String, bool)> {
+    await!(amber.list_srcs())
+        .unwrap_or_else(|e| {
+            fx_log_err!("while listing Amber sources: {:?}", e);
+            vec![]
+        })
+        .into_iter()
+        .map(|source| {
+            let enabled = source.status_config.map(|status| status.enabled).unwrap_or(true);
+            (source.id, enabled)
+        })
+}
+
+async fn set_source_enabled(amber: &AmberProxy, id: String, enabled: bool) {
+    let verb = if enabled { "enabl" } else { "disabl" };
+    fx_log_info!("{}ing source {}", verb, id.as_str());
+    match await!(amber.set_src_enabled(id.as_str(), enabled)) {
+        Ok(AmberStatus::Ok) => {}
+        Ok(status) => {
+            fx_log_err!("unable to {}e Amber source {:?}: {:?}", verb, id, status);
+        }
+        Err(err) => {
+            fx_log_err!("while {}ing Amber source {:?}: {:?}", verb, id, err);
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use {
+        super::*,
+        crate::test_util::{CloneAmberConnector, ClosedAmberConnector, FailAmberConnector},
+        failure::Error,
+        fidl_fuchsia_amber::{
+            ControlMarker as AmberMarker, ControlRequest as Request,
+            ControlRequestStream as RequestStream, SourceConfig, Status,
+        },
+        fidl_fuchsia_amber_ext::SourceConfigBuilder,
+        fuchsia_async as fasync,
+    };
+
+    const ROOT_KEY: &str = "00112233445566778899aabbccddeeffffeeddccbbaa99887766554433221100";
+
+    enum Enablement {
+        Enabled,
+        Disabled,
+    }
+    use Enablement::*;
+
+    fn make_source(id: &str, enabled: Enablement) -> SourceConfig {
+        SourceConfigBuilder::new(id)
+            .add_root_key(ROOT_KEY)
+            .enabled(match enabled {
+                Enabled => true,
+                Disabled => false,
+            })
+            .build()
+            .into()
+    }
+
+    #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
+    enum Record {
+        ListSrcs,
+        SetSrcEnabled { id: String, enabled: bool },
+    }
+
+    async fn serve_amber(stream: RequestStream, mut sources: Vec<SourceConfig>) -> Vec<Record> {
+        await!(stream
+            .map(|res| res.unwrap())
+            .map(|request| {
+                match request {
+                    Request::ListSrcs { responder } => {
+                        responder.send(&mut sources.iter_mut()).unwrap();
+                        Record::ListSrcs
+                    }
+                    Request::SetSrcEnabled { id, enabled, responder } => {
+                        responder.send(Status::Ok).unwrap();
+                        Record::SetSrcEnabled { id, enabled }
+                    }
+                    _ => panic!("unexpected request"),
+                }
+            })
+            .collect::<Vec<_>>())
+    }
+
+    #[fasync::run_singlethreaded(test)]
+    async fn test_disable_all_sources() -> Result<(), Error> {
+        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<AmberMarker>()?;
+        let proxy = CloneAmberConnector::new(proxy);
+        fasync::spawn(async move {
+            await!(proxy.disable_all_sources());
+        });
+
+        let records = await!(serve_amber(
+            stream,
+            vec![make_source("disabled", Disabled), make_source("enabled", Enabled)]
+        ));
+
+        assert_eq!(
+            records,
+            vec![
+                Record::ListSrcs,
+                Record::SetSrcEnabled { id: "enabled".to_owned(), enabled: false }
+            ]
+        );
+
+        Ok(())
+    }
+
+    #[fasync::run_singlethreaded(test)]
+    async fn test_enable_source_exclusive() -> Result<(), Error> {
+        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<AmberMarker>()?;
+        let proxy = CloneAmberConnector::new(proxy);
+        fasync::spawn(async move {
+            await!(proxy.enable_source_exclusive("to_enable"));
+        });
+
+        let sources = vec![
+            make_source("disabled", Disabled),
+            make_source("to_disable1", Enabled),
+            make_source("to_disable2", Enabled),
+            make_source("to_enable", Disabled),
+        ];
+        let mut records = await!(serve_amber(stream, sources));
+
+        // List must happen first.
+        assert_eq!(records.remove(0), Record::ListSrcs);
+
+        // The source management requests can be handled in any order.
+        records.sort_unstable();
+        assert_eq!(
+            records,
+            vec![
+                Record::SetSrcEnabled { id: "to_disable1".to_owned(), enabled: false },
+                Record::SetSrcEnabled { id: "to_disable2".to_owned(), enabled: false },
+                Record::SetSrcEnabled { id: "to_enable".to_owned(), enabled: true },
+            ]
+        );
+
+        Ok(())
+    }
+
+    #[fasync::run_singlethreaded(test)]
+    async fn test_attempts_to_enable_nonexistant_source() -> Result<(), Error> {
+        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<AmberMarker>()?;
+        let proxy = CloneAmberConnector::new(proxy);
+        fasync::spawn(async move {
+            await!(proxy.enable_source_exclusive("does_not_exist"));
+        });
+
+        let sources =
+            vec![make_source("to_disable", Enabled), make_source("already_disabled", Disabled)];
+        let mut records = await!(serve_amber(stream, sources));
+
+        // List must happen first.
+        assert_eq!(records.remove(0), Record::ListSrcs);
+
+        // The source management requests can be handled in any order.
+        records.sort_unstable();
+        assert_eq!(
+            records,
+            vec![
+                Record::SetSrcEnabled { id: "does_not_exist".to_owned(), enabled: true },
+                Record::SetSrcEnabled { id: "to_disable".to_owned(), enabled: false },
+            ]
+        );
+
+        Ok(())
+    }
+
+    #[fasync::run_singlethreaded(test)]
+    async fn test_doesnt_crash_if_amber_wont_connect() {
+        let proxy = FailAmberConnector;
+        await!(proxy.enable_source_exclusive("fake"));
+        await!(proxy.disable_all_sources());
+    }
+
+    #[fasync::run_singlethreaded(test)]
+    async fn test_doesnt_crash_if_amber_is_closed() {
+        let proxy = ClosedAmberConnector;
+        await!(proxy.enable_source_exclusive("fake"));
+        await!(proxy.disable_all_sources());
+    }
+}
diff --git a/garnet/bin/pkg_resolver/src/main.rs b/garnet/bin/pkg_resolver/src/main.rs
index 809dccb..4c80f03 100644
--- a/garnet/bin/pkg_resolver/src/main.rs
+++ b/garnet/bin/pkg_resolver/src/main.rs
@@ -19,6 +19,7 @@
     std::sync::Arc,
 };
 
+mod amber;
 mod amber_connector;
 mod repository_manager;
 mod repository_service;
@@ -55,7 +56,9 @@
     let inspector = fuchsia_inspect::Inspector::new();
     let rewrite_inspect_node = inspector.root().create_child("rewrite_manager");
 
-    let repo_manager = Arc::new(RwLock::new(load_repo_manager()));
+    let amber_connector = AmberConnector::new();
+
+    let repo_manager = Arc::new(RwLock::new(load_repo_manager(amber_connector.clone())));
     let rewrite_manager = Arc::new(RwLock::new(load_rewrite_manager(rewrite_inspect_node)));
 
     let resolver_cb = {
@@ -89,7 +92,8 @@
     };
 
     let rewrite_cb = move |stream| {
-        let mut rewrite_service = RewriteService::new(rewrite_manager.clone());
+        let mut rewrite_service =
+            RewriteService::new(rewrite_manager.clone(), amber_connector.clone());
 
         fasync::spawn(
             async move { await!(rewrite_service.handle_client(stream)) }
@@ -112,10 +116,10 @@
     Ok(())
 }
 
-fn load_repo_manager() -> RepositoryManager<AmberConnector> {
+fn load_repo_manager(amber_connector: AmberConnector) -> RepositoryManager<AmberConnector> {
     // report any errors we saw, but don't error out because otherwise we won't be able
     // to update the system.
-    RepositoryManagerBuilder::new(DYNAMIC_REPO_PATH, AmberConnector::new())
+    RepositoryManagerBuilder::new(DYNAMIC_REPO_PATH, amber_connector)
         .unwrap_or_else(|(builder, err)| {
             fx_log_err!("error loading dynamic repo config: {}", err);
             builder
diff --git a/garnet/bin/pkg_resolver/src/rewrite_manager.rs b/garnet/bin/pkg_resolver/src/rewrite_manager.rs
index e58449d..523ae37 100644
--- a/garnet/bin/pkg_resolver/src/rewrite_manager.rs
+++ b/garnet/bin/pkg_resolver/src/rewrite_manager.rs
@@ -114,6 +114,11 @@
         }
     }
 
+    /// Finds the first rule, if any, that remaps all of `fuchsia.com`.
+    pub fn amber_source_name(&self) -> Option<String> {
+        self.list().filter_map(|rule| rule.fuchsia_replacement()).next()
+    }
+
     /// Return an iterator through all rewrite rules in the order they should be applied to
     /// incoming `fuchsia-pkg://` URLs.
     pub fn list<'a>(&'a self) -> impl Iterator<Item = &'a Rule> {
diff --git a/garnet/bin/pkg_resolver/src/rewrite_service.rs b/garnet/bin/pkg_resolver/src/rewrite_service.rs
index 33dc98b..abe7b78 100644
--- a/garnet/bin/pkg_resolver/src/rewrite_service.rs
+++ b/garnet/bin/pkg_resolver/src/rewrite_service.rs
@@ -3,7 +3,10 @@
 // found in the LICENSE file.
 
 use {
-    crate::rewrite_manager::{CommitError, RewriteManager},
+    crate::{
+        amber::AmberSourceSelector,
+        rewrite_manager::{CommitError, RewriteManager},
+    },
     failure::Error,
     fidl_fuchsia_pkg_rewrite::{
         EditTransactionRequest, EditTransactionRequestStream, EngineRequest, EngineRequestStream,
@@ -20,13 +23,20 @@
 const LIST_CHUNK_SIZE: usize = 100;
 
 #[derive(Debug, Clone)]
-pub struct RewriteService {
+pub struct RewriteService<A>
+where
+    A: AmberSourceSelector,
+{
     state: Arc<RwLock<RewriteManager>>,
+    amber: A,
 }
 
-impl RewriteService {
-    pub fn new(state: Arc<RwLock<RewriteManager>>) -> Self {
-        RewriteService { state }
+impl<A> RewriteService<A>
+where
+    A: AmberSourceSelector + 'static,
+{
+    pub fn new(state: Arc<RwLock<RewriteManager>>, amber: A) -> Self {
+        RewriteService { state, amber }
     }
 
     pub async fn handle_client(&mut self, mut stream: EngineRequestStream) -> Result<(), Error> {
@@ -68,6 +78,7 @@
 
     pub(self) fn serve_edit_transaction(&mut self, mut stream: EditTransactionRequestStream) {
         let state = self.state.clone();
+        let amber = self.amber.clone();
         let mut transaction = state.read().transaction();
 
         fasync::spawn(
@@ -88,10 +99,38 @@
                             responder.send(status.into_raw())?;
                         }
                         EditTransactionRequest::Commit { responder } => {
-                            let status = match state.write().apply(transaction) {
-                                Ok(()) => Status::OK,
-                                Err(CommitError::TooLate) => Status::UNAVAILABLE,
+                            let (status, work) = {
+                                let mut state = state.write();
+                                let old_source_name = state.amber_source_name();
+                                match state.apply(transaction) {
+                                    Ok(()) => {
+                                        // Before reporting success, make a best effort to push the
+                                        // rewrite rule configs down to the Amber service.
+                                        let work =
+                                            match (old_source_name, state.amber_source_name()) {
+                                                (None, None) => None,
+                                                (Some(_), None) => {
+                                                    Some(amber.disable_all_sources())
+                                                }
+                                                (None, Some(after)) => {
+                                                    Some(amber.enable_source_exclusive(&after))
+                                                }
+                                                (Some(before), Some(after)) => {
+                                                    if before != after {
+                                                        Some(amber.enable_source_exclusive(&after))
+                                                    } else {
+                                                        None
+                                                    }
+                                                }
+                                            };
+                                        (Status::OK, work)
+                                    }
+                                    Err(CommitError::TooLate) => (Status::UNAVAILABLE, None),
+                                }
                             };
+                            if let Some(work) = work {
+                                await!(work);
+                            }
                             responder.send(status.into_raw())?;
                             return Ok(());
                         }
@@ -113,6 +152,8 @@
         crate::rewrite_manager::{tests::make_rule_config, RewriteManagerBuilder, Transaction},
         fidl::endpoints::create_proxy_and_stream,
         fidl_fuchsia_pkg_rewrite::{EditTransactionMarker, RuleIteratorMarker},
+        futures::future::BoxFuture,
+        parking_lot::Mutex,
     };
 
     macro_rules! rule {
@@ -128,13 +169,50 @@
         };
     }
 
+    /// Given the future of a FIDL API call, wait for it to complete, and assert that it
+    /// successfully returns the given [`fuchsia_zircon::Status`].
+    macro_rules! assert_yields_status {
+        ($expr:expr, $status:expr) => {
+            assert_eq!(Status::from_raw(await!($expr).unwrap()), $status);
+        };
+    }
+
+    #[derive(Debug, Clone, Default)]
+    struct FakeAmberSourceSelector {
+        events: Arc<Mutex<Vec<AmberSourceEvent>>>,
+    }
+
+    #[derive(Debug, Clone, PartialEq, Eq)]
+    enum AmberSourceEvent {
+        DisableAllSources,
+        EnableSource(String),
+    }
+
+    impl FakeAmberSourceSelector {
+        fn take_events(&mut self) -> Vec<AmberSourceEvent> {
+            std::mem::replace(&mut self.events.lock(), vec![])
+        }
+    }
+
+    impl AmberSourceSelector for FakeAmberSourceSelector {
+        fn disable_all_sources(&self) -> BoxFuture<'_, ()> {
+            self.events.lock().push(AmberSourceEvent::DisableAllSources);
+            future::ready(()).boxed()
+        }
+        fn enable_source_exclusive(&self, id: &str) -> BoxFuture<'_, ()> {
+            self.events.lock().push(AmberSourceEvent::EnableSource(id.to_owned()));
+            future::ready(()).boxed()
+        }
+    }
+
     async fn verify_list_call(
         state: Arc<RwLock<RewriteManager>>,
         expected: Vec<fidl_fuchsia_pkg_rewrite::Rule>,
     ) {
         let (client, request_stream) = create_proxy_and_stream::<RuleIteratorMarker>().unwrap();
+        let mut amber = FakeAmberSourceSelector::default();
 
-        RewriteService::new(state).serve_list(request_stream);
+        RewriteService::new(state, amber.clone()).serve_list(request_stream);
 
         let mut rules = Vec::new();
         loop {
@@ -148,6 +226,7 @@
         assert_eq!(rules, expected);
 
         assert!(await!(client.next()).unwrap().is_empty());
+        assert_eq!(amber.take_events(), vec![]);
     }
 
     #[fasync::run_until_stalled(test)]
@@ -182,14 +261,15 @@
         let state = Arc::new(RwLock::new(
             RewriteManagerBuilder::new(node, &dynamic_config).unwrap().build(),
         ));
-        let mut service = RewriteService::new(state.clone());
+        let mut amber = FakeAmberSourceSelector::default();
+        let mut service = RewriteService::new(state.clone(), amber.clone());
 
         let (client, request_stream) = create_proxy_and_stream::<EditTransactionMarker>().unwrap();
-
         service.serve_edit_transaction(request_stream);
 
         client.reset_all().unwrap();
-        assert_eq!(Status::from_raw(await!(client.commit()).unwrap()), Status::OK);
+        assert_yields_status!(client.commit(), Status::OK);
+        assert_eq!(amber.take_events(), vec![]);
 
         assert_eq!(state.read().transaction(), Transaction::new(vec![], 1));
     }
@@ -206,7 +286,8 @@
         let state = Arc::new(RwLock::new(
             RewriteManagerBuilder::new(node, &dynamic_config).unwrap().build(),
         ));
-        let mut service = RewriteService::new(state.clone());
+        let amber = FakeAmberSourceSelector::default();
+        let mut service = RewriteService::new(state.clone(), amber);
 
         let client1 = {
             let (client, request_stream) =
@@ -226,13 +307,10 @@
         client2.reset_all().unwrap();
 
         let rule = rule!("fuchsia.com" => "fuchsia.com", "/foo" => "/foo");
-        assert_eq!(
-            Status::from_raw(await!(client1.add(&mut rule.clone().into())).unwrap()),
-            Status::OK
-        );
+        assert_yields_status!(client1.add(&mut rule.clone().into()), Status::OK);
 
-        assert_eq!(Status::from_raw(await!(client1.commit()).unwrap()), Status::OK);
-        assert_eq!(Status::from_raw(await!(client2.commit()).unwrap()), Status::UNAVAILABLE);
+        assert_yields_status!(client1.commit(), Status::OK);
+        assert_yields_status!(client2.commit(), Status::UNAVAILABLE);
 
         assert_eq!(state.read().transaction(), Transaction::new(vec![rule], 1),);
 
@@ -243,7 +321,7 @@
             client
         };
         client2.reset_all().unwrap();
-        assert_eq!(Status::from_raw(await!(client2.commit()).unwrap()), Status::OK);
+        assert_yields_status!(client2.commit(), Status::OK);
         assert_eq!(state.read().transaction(), Transaction::new(vec![], 2));
     }
 
@@ -255,7 +333,8 @@
         let state = Arc::new(RwLock::new(
             RewriteManagerBuilder::new(node, &dynamic_config).unwrap().build(),
         ));
-        let mut service = RewriteService::new(state.clone());
+        let mut amber = FakeAmberSourceSelector::default();
+        let mut service = RewriteService::new(state.clone(), amber.clone());
 
         await!(verify_list_call(state.clone(), vec![]));
 
@@ -268,23 +347,102 @@
 
         let rule = rule!("fuchsia.com" => "devhost.fuchsia.com", "/" => "/");
 
-        assert_eq!(
-            Status::from_raw(await!(edit_client.add(&mut rule.clone().into())).unwrap()),
-            Status::OK
-        );
+        assert_yields_status!(edit_client.add(&mut rule.clone().into()), Status::OK);
 
         await!(verify_list_call(state.clone(), vec![]));
 
         let long_list_call = {
             let (client, request_stream) = create_proxy_and_stream::<RuleIteratorMarker>().unwrap();
-            RewriteService::new(state.clone()).serve_list(request_stream);
+            service.serve_list(request_stream);
             client
         };
 
-        assert_eq!(Status::from_raw(await!(edit_client.commit()).unwrap()), Status::OK);
+        assert_yields_status!(edit_client.commit(), Status::OK);
 
         assert_eq!(await!(long_list_call.next()).unwrap(), vec![]);
 
         await!(verify_list_call(state.clone(), vec![rule.into()]));
+        assert_eq!(
+            amber.take_events(),
+            vec![AmberSourceEvent::EnableSource("devhost.fuchsia.com".to_owned())]
+        );
+    }
+
+    #[fasync::run_until_stalled(test)]
+    async fn test_enables_amber_source() {
+        let inspector = fuchsia_inspect::Inspector::new();
+        let node = inspector.root().create_child("rewrite-manager");
+        let dynamic_config = make_rule_config(vec![]);
+        let state = Arc::new(RwLock::new(
+            RewriteManagerBuilder::new(node, &dynamic_config).unwrap().build(),
+        ));
+        let mut amber = FakeAmberSourceSelector::default();
+        let mut service = RewriteService::new(state.clone(), amber.clone());
+
+        let (client, request_stream) = create_proxy_and_stream::<EditTransactionMarker>().unwrap();
+        service.serve_edit_transaction(request_stream);
+
+        let rules = vec![
+            rule!("example.com" => "wrong_host.fuchsia.com", "/" => "/"),
+            rule!("fuchsia.com" => "correct.fuchsia.com", "/" => "/"),
+            rule!("fuchsia.com" => "wrong_priority.fuchsia.com", "/" => "/"),
+            rule!("fuchsia.com" => "wrong_match.fuchsia.com", "/foo/" => "/"),
+            rule!("fuchsia.com" => "wrong_replacement.fuchsia.com", "/" => "/bar/"),
+        ];
+        for rule in rules.into_iter().rev() {
+            assert_yields_status!(client.add(&mut rule.into()), Status::OK);
+        }
+        assert_yields_status!(client.commit(), Status::OK);
+
+        assert_eq!(
+            amber.take_events(),
+            vec![AmberSourceEvent::EnableSource("correct.fuchsia.com".to_owned())]
+        );
+
+        // Adding a duplicate of the currently enabled source is a no-op.
+        let (client, request_stream) = create_proxy_and_stream::<EditTransactionMarker>().unwrap();
+        service.serve_edit_transaction(request_stream);
+        let rule = rule!("fuchsia.com" => "correct.fuchsia.com", "/" => "/");
+        assert_yields_status!(client.add(&mut rule.into()), Status::OK);
+        assert_yields_status!(client.commit(), Status::OK);
+        assert_eq!(amber.take_events(), vec![]);
+
+        // Adding a different entry with higher priority enables the new source.
+        let (client, request_stream) = create_proxy_and_stream::<EditTransactionMarker>().unwrap();
+        service.serve_edit_transaction(request_stream);
+        let rule = rule!("fuchsia.com" => "correcter.fuchsia.com", "/" => "/");
+        assert_yields_status!(client.add(&mut rule.into()), Status::OK);
+        assert_yields_status!(client.commit(), Status::OK);
+        assert_eq!(
+            amber.take_events(),
+            vec![AmberSourceEvent::EnableSource("correcter.fuchsia.com".to_owned())]
+        );
+    }
+
+    #[fasync::run_until_stalled(test)]
+    async fn test_disables_amber_sources() {
+        let rules = vec![rule!("fuchsia.com" => "enabled.fuchsia.com", "/" => "/")];
+
+        let inspector = fuchsia_inspect::Inspector::new();
+        let node = inspector.root().create_child("rewrite-manager");
+        let dynamic_config = make_rule_config(rules);
+        let state = Arc::new(RwLock::new(
+            RewriteManagerBuilder::new(node, &dynamic_config).unwrap().build(),
+        ));
+        let mut amber = FakeAmberSourceSelector::default();
+        let mut service = RewriteService::new(state.clone(), amber.clone());
+
+        let (client, request_stream) = create_proxy_and_stream::<EditTransactionMarker>().unwrap();
+        service.serve_edit_transaction(request_stream);
+        client.reset_all().unwrap();
+        assert_yields_status!(client.commit(), Status::OK);
+        assert_eq!(amber.take_events(), vec![AmberSourceEvent::DisableAllSources]);
+
+        // Edits that don't change the enabled source are a no-op.
+        let (client, request_stream) = create_proxy_and_stream::<EditTransactionMarker>().unwrap();
+        service.serve_edit_transaction(request_stream);
+        client.reset_all().unwrap();
+        assert_yields_status!(client.commit(), Status::OK);
+        assert_eq!(amber.take_events(), vec![]);
     }
 }
diff --git a/garnet/bin/pkg_resolver/src/test_util.rs b/garnet/bin/pkg_resolver/src/test_util.rs
index 91ec087..bf86d37 100644
--- a/garnet/bin/pkg_resolver/src/test_util.rs
+++ b/garnet/bin/pkg_resolver/src/test_util.rs
@@ -295,7 +295,33 @@
     }
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
+pub(crate) struct CloneAmberConnector {
+    amber: AmberProxy,
+}
+
+impl CloneAmberConnector {
+    pub(crate) fn new(amber: AmberProxy) -> Self {
+        Self { amber }
+    }
+}
+
+impl AmberConnect for CloneAmberConnector {
+    fn connect(&self) -> Result<AmberProxy, fuchsia_zircon::Status> {
+        Ok(self.amber.clone())
+    }
+}
+
+#[derive(Debug, Clone)]
+pub(crate) struct FailAmberConnector;
+
+impl AmberConnect for FailAmberConnector {
+    fn connect(&self) -> Result<AmberProxy, Status> {
+        Err(Status::INTERNAL)
+    }
+}
+
+#[derive(Debug, Clone)]
 pub(crate) struct ClosedAmberConnector;
 
 impl AmberConnect for ClosedAmberConnector {
diff --git a/garnet/go/src/amber/amberctl/amberctl.go b/garnet/go/src/amber/amberctl/amberctl.go
index 5271113..de9436a 100644
--- a/garnet/go/src/amber/amberctl/amberctl.go
+++ b/garnet/go/src/amber/amberctl/amberctl.go
@@ -49,24 +49,23 @@
         -n: name of the update source (optional, with URL)
         -f: file path or url to a source config file
         -h: SHA256 hash of source config file (optional, with URL)
-        -x: do not disable other active sources (if the provided source is enabled)
+        -x: [Obsolete] do not disable other active sources (if the provided source is enabled)
 
     add_repo_cfg  - add a repository config to the set of known repositories, using a source config
         -n: name of the update source (optional, with URL)
         -f: file path or url to a source config file
         -h: SHA256 hash of source config file (optional, with URL)
 
-    rm_src        - remove a source, if it exists
+    rm_src        - remove a source, if it exists, disabling all remaining sources
         -n: name of the update source
 
     list_srcs     - list the set of sources we can use
 
     enable_src
         -n: name of the update source
-        -x: do not disable other active sources
+        -x: [Obsolete] do not disable other active sources
 
-    disable_src
-        -n: name of the update source
+    disable_src   - disables all sources
 
     system_update - check for, download, and apply any available system update
 
@@ -83,7 +82,7 @@
 	version      = fs.String("v", "", "Version of a package")
 	blobID       = fs.String("i", "", "Content ID of the blob")
 	merkle       = fs.String("m", "", "Merkle root of the desired update.")
-	nonExclusive = fs.Bool("x", false, "When adding or enabling a source, do not disable other sources.")
+	nonExclusive = fs.Bool("x", false, "[Obsolete] When adding or enabling a source, do not disable other sources.")
 )
 
 type ErrGetFile string
@@ -208,14 +207,14 @@
 }
 
 func repoUrlForId(id string) string {
-	return fmt.Sprintf("fuchsia-pkg://%s", sanitizeId(id))
+	return fmt.Sprintf("fuchsia-pkg://%s", id)
 }
 
 func rewriteRuleForId(id string) rewrite.Rule {
 	var rule rewrite.Rule
 	rule.SetLiteral(rewrite.LiteralRule{
 		HostMatch:             "fuchsia.com",
-		HostReplacement:       sanitizeId(id),
+		HostReplacement:       id,
 		PathPrefixMatch:       "/",
 		PathPrefixReplacement: "/",
 	})
@@ -357,6 +356,8 @@
 
 	if *name != "" {
 		cfg.Id = *name
+	} else {
+		cfg.Id = sanitizeId(cfg.Id)
 	}
 
 	// Update the host segment of the URL with the original if it appears to have
@@ -387,11 +388,6 @@
 			return fmt.Errorf("request arguments properly formatted, but possibly otherwise invalid")
 		}
 
-		if isSourceConfigEnabled(&cfg) && !*nonExclusive {
-			if err := disableAllSources(services.amber, cfg.Id); err != nil {
-				return err
-			}
-		}
 	}
 
 	repoCfg := upgradeSourceConfig(cfg)
@@ -496,18 +492,6 @@
 	return nil
 }
 
-func setSourceEnablement(a *amber.ControlInterface, id string, enabled bool) error {
-	status, err := a.SetSrcEnabled(id, enabled)
-	if err != nil {
-		return fmt.Errorf("call failure attempting to change source status: %s", err)
-	}
-	if status != amber.StatusOk {
-		return fmt.Errorf("failure changing source status")
-	}
-
-	return nil
-}
-
 func isSourceConfigEnabled(cfg *amber.SourceConfig) bool {
 	if cfg.StatusConfig == nil {
 		return true
@@ -515,28 +499,6 @@
 	return cfg.StatusConfig.Enabled
 }
 
-func disableAllSources(a *amber.ControlInterface, except string) error {
-	errorIds := []string{}
-	cfgs, err := a.ListSrcs()
-	if err != nil {
-		return err
-	}
-	for _, cfg := range cfgs {
-		if cfg.Id != except && isSourceConfigEnabled(&cfg) {
-			if err := setSourceEnablement(a, cfg.Id, false); err != nil {
-				log.Printf("error disabling %q: %s", cfg.Id, err)
-				errorIds = append(errorIds, fmt.Sprintf("%q", cfg.Id))
-			} else {
-				fmt.Printf("Source %q disabled\n", cfg.Id)
-			}
-		}
-	}
-	if len(errorIds) > 0 {
-		return fmt.Errorf("could not disable %s", strings.Join(errorIds, ", "))
-	}
-	return nil
-}
-
 func do(services Services) int {
 	switch os.Args[1] {
 	case "get_up":
@@ -606,34 +568,18 @@
 			log.Printf("Error enabling source: no source id provided")
 			return 1
 		}
-		err := setSourceEnablement(services.amber, *name, true)
-		if err != nil {
-			log.Printf("Error enabling source: %s", err)
-			return 1
-		}
-		err = replaceDynamicRewriteRules(services.rewriteEngine, rewriteRuleForId(*name))
+		err := replaceDynamicRewriteRules(services.rewriteEngine, rewriteRuleForId(*name))
 		if err != nil {
 			log.Printf("Error configuring rewrite rules: %s", err)
 			return 1
 		}
 		fmt.Printf("Source %q enabled\n", *name)
-		if !*nonExclusive {
-			if err := disableAllSources(services.amber, *name); err != nil {
-				log.Printf("Error disabling sources: %s", err)
-				return 1
-			}
-		}
 	case "disable_src":
-		if *name == "" {
-			log.Printf("Error disabling source: no source id provided")
-			return 1
+		if *name != "" {
+			log.Printf("\"name\" parameter is now ignored: disabling all sources.\n"+
+				"To enable a specific source, use 'amberctl enable_src -n %q'", *name)
 		}
-		err := setSourceEnablement(services.amber, *name, false)
-		if err != nil {
-			log.Printf("Error disabling source: %s", err)
-			return 1
-		}
-		err = removeAllDynamicRewriteRules(services.rewriteEngine)
+		err := removeAllDynamicRewriteRules(services.rewriteEngine)
 		if err != nil {
 			log.Printf("Error configuring rewrite rules: %s", err)
 			return 1
@@ -694,6 +640,14 @@
 
 	fs.Parse(os.Args[2:])
 
+	if *name != "" {
+		*name = sanitizeId(*name)
+	}
+
+	if *nonExclusive {
+		fmt.Println(`Warning: -x is no longer supported.`)
+	}
+
 	ctx := context.CreateFromStartupInfo()
 
 	var services Services
diff --git a/garnet/lib/rust/fidl_fuchsia_amber_ext/src/types.rs b/garnet/lib/rust/fidl_fuchsia_amber_ext/src/types.rs
index 9f9768c..a076d89 100644
--- a/garnet/lib/rust/fidl_fuchsia_amber_ext/src/types.rs
+++ b/garnet/lib/rust/fidl_fuchsia_amber_ext/src/types.rs
@@ -35,6 +35,11 @@
         }
     }
 
+    pub fn id(mut self, id: impl Into<String>) -> Self {
+        self.config.id = id.into();
+        self
+    }
+
     pub fn repo_url(mut self, value: impl Into<String>) -> Self {
         self.config.repo_url = value.into();
         self.config.blob_repo_url = format!("{}/blobs", self.config.repo_url);
@@ -369,6 +374,7 @@
     prop_compose! {
         fn arb_source_builder()(
             id in "[[:alnum:]]+",
+            id2 in prop::option::of("[[:alnum:]]+"),
             repo_url in prop::option::of(arb_url()),
             rate_period in prop::option::of(any::<i32>()),
             auto in prop::option::of(any::<bool>()),
@@ -376,6 +382,9 @@
             enabled in prop::option::of(any::<bool>()),
         ) -> SourceConfigBuilder {
             let mut builder = SourceConfigBuilder::new(id);
+            if let Some(id2) = id2 {
+                builder = builder.id(id2);
+            }
             if let Some(repo_url) = repo_url {
                 builder = builder.repo_url(repo_url);
             }
diff --git a/garnet/tests/amberctl/src/lib.rs b/garnet/tests/amberctl/src/lib.rs
index 36595dd..5205611 100644
--- a/garnet/tests/amberctl/src/lib.rs
+++ b/garnet/tests/amberctl/src/lib.rs
@@ -157,8 +157,6 @@
                 .add_dir_to_namespace("/configs/test.json".to_string(), config_file)
                 .expect("static /configs to mount")
                 .arg("add_src")
-                // Run amberctl in non-exclusive mode so it doesn't disable existing source configs
-                .arg("-x")
                 .arg("-f=/configs/test.json")
         ));
     }
@@ -317,17 +315,16 @@
 
     let source = SourceConfigBuilder::new("http://10.0.0.1:8083")
         .repo_url("http://10.0.0.1:8083")
-        .add_root_key(ROOT_KEY_1)
-        .build();
+        .add_root_key(ROOT_KEY_1);
 
     let repo = RepositoryConfigBuilder::new("fuchsia-pkg://http___10_0_0_1_8083".parse().unwrap())
         .add_root_key(RepositoryKey::Ed25519(hex::decode(ROOT_KEY_1).unwrap()))
         .add_mirror(MirrorConfigBuilder::new("http://10.0.0.1:8083"))
         .build();
 
-    await!(env.run_amberctl_add_src(source.clone()));
+    await!(env.run_amberctl_add_src(source.clone().build()));
 
-    assert_eq!(await!(env.amber_list_sources()), vec![source]);
+    assert_eq!(await!(env.amber_list_sources()), vec![source.id("http___10_0_0_1_8083").build()]);
     assert_eq!(await!(env.resolver_list_repos()), vec![repo]);
     assert_eq!(
         await!(env.rewrite_engine_list_rules()),
@@ -341,8 +338,7 @@
 
     let source = SourceConfigBuilder::new("http://[fe80::1122:3344]:8083")
         .repo_url("http://[fe80::1122:3344]:8083")
-        .add_root_key(ROOT_KEY_1)
-        .build();
+        .add_root_key(ROOT_KEY_1);
 
     let repo = RepositoryConfigBuilder::new(
         "fuchsia-pkg://http____fe80__1122_3344__8083".parse().unwrap(),
@@ -351,9 +347,12 @@
     .add_mirror(MirrorConfigBuilder::new("http://[fe80::1122:3344]:8083"))
     .build();
 
-    await!(env.run_amberctl_add_src(source.clone()));
+    await!(env.run_amberctl_add_src(source.clone().build()));
 
-    assert_eq!(await!(env.amber_list_sources()), vec![source]);
+    assert_eq!(
+        await!(env.amber_list_sources()),
+        vec![source.id("http____fe80__1122_3344__8083").build()]
+    );
     assert_eq!(await!(env.resolver_list_repos()), vec![repo]);
     assert_eq!(
         await!(env.rewrite_engine_list_rules()),
@@ -429,14 +428,13 @@
     let cfg_b = SourceConfigBuilder::new("b")
         .repo_url("http://example.com/b")
         .rate_period(60)
-        .add_root_key(ROOT_KEY_2)
-        .build();
+        .add_root_key(ROOT_KEY_2);
 
     await!(env.run_amberctl_add_src(cfg_a.clone().into()));
-    await!(env.run_amberctl_add_src(cfg_b.clone().into()));
+    await!(env.run_amberctl_add_src(cfg_b.clone().build().into()));
 
     await!(env.run_amberctl(&["rm_src", "-n", "http://[fe80::1122:3344]:8083"]));
-    assert_eq!(await!(env.amber_list_sources()), vec![cfg_b]);
+    assert_eq!(await!(env.amber_list_sources()), vec![cfg_b.enabled(false).build()]);
     assert_eq!(
         await!(env.resolver_list_repos()),
         vec![RepositoryConfigBuilder::new("fuchsia-pkg://b".parse().unwrap())
@@ -524,7 +522,7 @@
 }
 
 #[fasync::run_singlethreaded(test)]
-async fn test_disable_src() {
+async fn test_disable_src_disables_all_sources() {
     let env = TestEnv::new();
 
     let cfg_a = SourceConfigBuilder::new("a")
@@ -540,11 +538,11 @@
     await!(env.run_amberctl_add_src(cfg_a.clone().build().into()));
     await!(env.run_amberctl_add_src(cfg_b.clone().build().into()));
 
-    await!(env.run_amberctl(&["disable_src", "-n", "a"]));
+    await!(env.run_amberctl(&["disable_src"]));
 
     assert_eq!(
         await!(env.amber_list_sources()),
-        vec![cfg_a.enabled(false).build(), cfg_b.enabled(true).build().into(),]
+        vec![cfg_a.enabled(false).build(), cfg_b.enabled(false).build().into(),]
     );
     assert_eq!(
         await!(env.resolver_list_repos()),
diff --git a/src/sys/lib/fuchsia_url_rewrite/src/rule.rs b/src/sys/lib/fuchsia_url_rewrite/src/rule.rs
index d38a974..7707a5d 100644
--- a/src/sys/lib/fuchsia_url_rewrite/src/rule.rs
+++ b/src/sys/lib/fuchsia_url_rewrite/src/rule.rs
@@ -134,6 +134,24 @@
             _node: node,
         }
     }
+
+    /// Determines the replacement source id, if this rule rewrites all of "fuchsia.com".
+    pub fn fuchsia_replacement(&self) -> Option<String> {
+        if self.host_match == "fuchsia.com" && self.path_prefix_match == "/" {
+            if let Some(n) = self.host_replacement.rfind(".fuchsia.com") {
+                let (host_replacement, _) = self.host_replacement.split_at(n);
+                host_replacement
+                    .split('.')
+                    .nth(1)
+                    .map(|s| s.to_owned())
+                    .or_else(|| Some(self.host_replacement.clone()))
+            } else {
+                Some(self.host_replacement.clone())
+            }
+        } else {
+            None
+        }
+    }
 }
 
 impl TryFrom<fidl::Rule> for Rule {
@@ -322,7 +340,6 @@
 
         assert_eq!(serde_json::to_value(expected).unwrap(), json);
     }
-
 }
 
 #[cfg(test)]
@@ -607,4 +624,56 @@
             );
         }
     }
+
+    #[test]
+    fn test_non_fuchsia_replacement_nontrivial_match_path() {
+        let rules = [
+            Rule::new("fuchsia.com", "fuchsia.com", "/foo", "/bar").unwrap(),
+            Rule::new("fuchsia.com", "fuchsia.com", "/foo/", "/").unwrap(),
+        ];
+
+        for rule in &rules {
+            assert_eq!(rule.fuchsia_replacement(), None);
+        }
+    }
+
+    #[test]
+    fn test_non_fuchsia_replacement_wrong_domain() {
+        let rules = [
+            Rule::new("subdomain.fuchsia.com", "fuchsia.com", "/", "/").unwrap(),
+            Rule::new("example.com", "fuchsia.com", "/", "/").unwrap(),
+        ];
+
+        for rule in &rules {
+            assert_eq!(rule.fuchsia_replacement(), None);
+        }
+    }
+
+    #[test]
+    fn test_fuchsia_replacement_accepts_any_replacements() {
+        let rules = [
+            Rule::new("fuchsia.com", "example.com", "/", "/").unwrap(),
+            Rule::new("fuchsia.com", "fuchsia.com", "/", "/bar/").unwrap(),
+        ];
+
+        for rule in &rules {
+            assert!(rule.fuchsia_replacement().is_some());
+        }
+    }
+
+    fn verify_fuchsia_replacement(
+        host_replacement: impl Into<String>,
+        source_id: impl Into<String>,
+    ) {
+        let rule = Rule::new("fuchsia.com", host_replacement, "/", "/").unwrap();
+        assert_eq!(rule.fuchsia_replacement(), Some(source_id.into()));
+    }
+
+    #[test]
+    fn test_fuchsia_replacement() {
+        verify_fuchsia_replacement("test.example.com", "test.example.com");
+        verify_fuchsia_replacement("test.fuchsia.com", "test.fuchsia.com");
+        verify_fuchsia_replacement("a.b-c.d.fuchsia.com", "b-c");
+        verify_fuchsia_replacement("a.b-c.d.example.com", "a.b-c.d.example.com");
+    }
 }
diff --git a/tools/devshell/add-update-source b/tools/devshell/add-update-source
index 623b17e..0c800fa 100755
--- a/tools/devshell/add-update-source
+++ b/tools/devshell/add-update-source
@@ -5,7 +5,7 @@
 
 ### register dev host as target's update source
 
-## usage: fx add-update-source [--type=TYPE] [--name=NAME] [--disable-source=NAME]
+## usage: fx add-update-source [--type=TYPE] [--name=NAME]
 ##
 ## Configure target device to use a new update source.
 ##
@@ -16,7 +16,6 @@
 ## update source.
 ##
 ## --name=NAME           Name the generated update source config NAME. Defaults to the config type.
-## --disable-source=NAME Disable the update source with NAME after adding the new update source.
 
 source "$(cd "$(dirname "${BASH_SOURCE[0]}")" >/dev/null 2>&1 && pwd)"/lib/vars.sh || exit $?
 fx-config-read
@@ -32,7 +31,6 @@
   config_type="devhost"
   port="8083"
   source_name=""
-  disable_source=""
   while [[ $# -ne 0 ]]; do
     case "$1" in
       --port)
@@ -47,10 +45,6 @@
         source_name="$2"
         shift
         ;;
-      --disable-source)
-        disable_source="$2"
-        shift
-        ;;
       *)
         echo >&2 "Unrecognized option: $1"
         usage
@@ -100,11 +94,6 @@
     fi
     return "$err"
   fi
-
-  if [[ -n "${disable_source}" ]]; then
-    # Best effort, don't show status or fail on error
-    fx-command-run shell amberctl disable_src -n "${disable_source}" >/dev/null 2>/dev/null || true
-  fi
 }
 
 main "$@"