[fdomain] Rename .client() -> .domain() and make infallible

There were some concerns about how overloaded `client` can be in
contexts where we needed to get the client for a handle or FIDL proxy.

In addition, this is no longer fallible. We now return a placeholder
client that has a permanently-failed transport if the client has gone
missing. This should result in `ClientLost` errors being deferred to
handle use time as we have tended to prefer.

Fixed: b/388594413
Change-Id: I3dc4cd24d67a2b5e493fa99d43dca37c9f5aa17b
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/1188754
Commit-Queue: Auto-Submit <auto-submit@fuchsia-infra.iam.gserviceaccount.com>
Reviewed-by: Ian McKellar <ianloic@google.com>
Fuchsia-Auto-Submit: Casey Dahlin <sadmac@google.com>
diff --git a/src/developer/ffx/lib/rcs/src/lib.rs b/src/developer/ffx/lib/rcs/src/lib.rs
index 85317cb..60a6476 100644
--- a/src/developer/ffx/lib/rcs/src/lib.rs
+++ b/src/developer/ffx/lib/rcs/src/lib.rs
@@ -25,7 +25,7 @@
 
 #[cfg(not(feature = "fdomain"))]
 use {
-    fidl::endpoints::{DiscoverableProtocolMarker, ProxyHasClient},
+    fidl::endpoints::{DiscoverableProtocolMarker, ProxyHasDomain},
     fidl_fuchsia_developer_remotecontrol::{
         ConnectCapabilityError, IdentifyHostError, IdentifyHostResponse, RemoteControlMarker,
         RemoteControlProxy,
@@ -245,7 +245,7 @@
     capability_set: OpenDirType,
     capability_name: &str,
 ) -> Result<KnockClientType, KnockRcsError> {
-    let rcs_client = rcs_proxy.client()?;
+    let rcs_client = rcs_proxy.domain();
     // Try to connect via fuchsia.developer.remotecontrol/RemoteControl.ConnectCapability.
     let (client, server) = rcs_client.create_channel();
     #[cfg(not(feature = "fdomain"))]
@@ -258,7 +258,7 @@
     }
     // Fallback to fuchsia.developer.remotecontrol/RemoteControl.DeprecatedOpenCapability.
     // This can be removed once we drop support for API level 27.
-    let (client, server) = rcs_proxy.client()?.create_channel();
+    let (client, server) = rcs_proxy.domain().create_channel();
     #[cfg(not(feature = "fdomain"))]
     let client = fuchsia_async::Channel::from_channel(client);
     rcs_proxy
@@ -407,7 +407,7 @@
     moniker: &str,
     rcs_proxy: &RemoteControlProxy,
 ) -> Result<P::Proxy> {
-    let (proxy, server_end) = rcs_proxy.client()?.create_proxy::<P>();
+    let (proxy, server_end) = rcs_proxy.domain().create_proxy::<P>();
     connect_with_timeout::<P>(dur, moniker, rcs_proxy, server_end.into_channel()).await?;
     Ok(proxy)
 }
@@ -428,7 +428,7 @@
     rcs_proxy: &RemoteControlProxy,
     timeout: Duration,
 ) -> Result<M::Proxy> {
-    let (proxy, server_end) = rcs_proxy.client()?.create_proxy::<M>();
+    let (proxy, server_end) = rcs_proxy.domain().create_proxy::<M>();
     let start_time = Instant::now();
     let res = open_with_timeout_at(
         timeout,
@@ -444,7 +444,7 @@
         proxy
     } else {
         // Fallback to the legacy remote control moniker if toolbox doesn't contain the capability.
-        let (proxy, server_end) = rcs_proxy.client()?.create_proxy::<M>();
+        let (proxy, server_end) = rcs_proxy.domain().create_proxy::<M>();
         let timeout = timeout.saturating_sub(Instant::now() - start_time);
         open_with_timeout_at(
             timeout,
@@ -465,7 +465,7 @@
     timeout: Duration,
 ) -> Result<proto_fuchsia_kernel::StatsProxy> {
     let (proxy, server_end) =
-        rcs_proxy.client()?.create_proxy::<proto_fuchsia_kernel::StatsMarker>();
+        rcs_proxy.domain().create_proxy::<proto_fuchsia_kernel::StatsMarker>();
     let start_time = Instant::now();
     let res = open_with_timeout_at(
         timeout,
@@ -482,7 +482,7 @@
     } else {
         // Fallback to the legacy remote control moniker if toolbox doesn't contain the capability.
         let (proxy, server_end) =
-            rcs_proxy.client()?.create_proxy::<proto_fuchsia_kernel::StatsMarker>();
+            rcs_proxy.domain().create_proxy::<proto_fuchsia_kernel::StatsMarker>();
         let timeout = timeout.saturating_sub(Instant::now() - start_time);
         open_with_timeout_at(
             timeout,
diff --git a/src/developer/ffx/lib/rcs/src/toolbox.rs b/src/developer/ffx/lib/rcs/src/toolbox.rs
index 15a6ffe..c48d6fe 100644
--- a/src/developer/ffx/lib/rcs/src/toolbox.rs
+++ b/src/developer/ffx/lib/rcs/src/toolbox.rs
@@ -15,7 +15,7 @@
 
 #[cfg(not(feature = "fdomain"))]
 use {
-    fidl::endpoints::{DiscoverableProtocolMarker, ProxyHasClient},
+    fidl::endpoints::{DiscoverableProtocolMarker, ProxyHasDomain},
     fidl_fuchsia_developer_remotecontrol::RemoteControlProxy,
     fidl_fuchsia_io as fio, fidl_fuchsia_sys2 as sys2,
     fidl_fuchsia_sys2::OpenDirType,
@@ -79,7 +79,7 @@
 /// Open the service directory of the toolbox.
 #[cfg(feature = "fdomain")]
 pub async fn open_toolbox(rcs: &RemoteControlProxy) -> Result<fio::DirectoryProxy> {
-    rcs.client()?.namespace().await.map_err(Into::into).map(fio::DirectoryProxy::from_channel)
+    rcs.domain().namespace().await.map_err(Into::into).map(fio::DirectoryProxy::from_channel)
 }
 
 /// Connects to a protocol available in the namespace of the `toolbox` component.
@@ -95,7 +95,7 @@
     P: DiscoverableProtocolMarker,
 {
     let protocol_name = P::PROTOCOL_NAME;
-    let (proxy, server_end) = rcs_proxy.client()?.create_proxy::<P>();
+    let (proxy, server_end) = rcs_proxy.domain().create_proxy::<P>();
     // time this so that we can use an appropriately shorter timeout for the attempt
     // to connect by the backup (if there is one)
     let start_time = Instant::now();
@@ -113,7 +113,7 @@
     let (toolbox_res, proxy) = if toolbox_res.is_ok() {
         (toolbox_res, proxy)
     } else {
-        let (proxy, server_end) = rcs_proxy.client()?.create_proxy::<P>();
+        let (proxy, server_end) = rcs_proxy.domain().create_proxy::<P>();
         let toolbox_took = Instant::now() - start_time;
         let timeout = dur.saturating_sub(toolbox_took);
         (
@@ -147,7 +147,7 @@
     // try to connect to the moniker given instead, but don't double
     // up the timeout.
     let timeout = dur.saturating_sub(toolbox_took);
-    let (proxy, server_end) = rcs_proxy.client()?.create_proxy::<P>();
+    let (proxy, server_end) = rcs_proxy.domain().create_proxy::<P>();
     let moniker_res = crate::open_with_timeout::<P>(
         timeout,
         &backup,
diff --git a/src/developer/ffx/lib/target/holders/src/fdomain/remote_control_proxy.rs b/src/developer/ffx/lib/target/holders/src/fdomain/remote_control_proxy.rs
index b73061c..1337c1e 100644
--- a/src/developer/ffx/lib/target/holders/src/fdomain/remote_control_proxy.rs
+++ b/src/developer/ffx/lib/target/holders/src/fdomain/remote_control_proxy.rs
@@ -9,7 +9,7 @@
     DiscoverableProtocolMarker as FDiscoverableProtocolMarker, Proxy as FProxy,
 };
 use fdomain_fuchsia_developer_remotecontrol::RemoteControlProxy;
-use ffx_command_error::{Error, FfxContext as _, Result};
+use ffx_command_error::{FfxContext as _, Result};
 use fho::{bug, FhoConnectionBehavior, FhoEnvironment, TryFromEnv};
 use std::ops::Deref;
 use std::sync::Arc;
@@ -81,8 +81,7 @@
     P: FProxy + 'static,
     P::Protocol: FDiscoverableProtocolMarker,
 {
-    let (proxy, server_end) =
-        rcs.client().map_err(|e| Error::Unexpected(e.into()))?.create_proxy::<P::Protocol>();
+    let (proxy, server_end) = rcs.domain().create_proxy::<P::Protocol>();
     rcs_fdomain::open_with_timeout::<P::Protocol>(
         timeout,
         moniker,
diff --git a/src/lib/fdomain/client/src/channel.rs b/src/lib/fdomain/client/src/channel.rs
index fd640e7..38965b4 100644
--- a/src/lib/fdomain/client/src/channel.rs
+++ b/src/lib/fdomain/client/src/channel.rs
@@ -4,7 +4,7 @@
 
 use crate::handle::handle_type;
 use crate::responder::Responder;
-use crate::{ordinals, AsHandleRef, Error, Event, EventPair, Handle, OnFDomainSignals, Socket};
+use crate::{ordinals, Error, Event, EventPair, Handle, OnFDomainSignals, Socket};
 use fidl_fuchsia_fdomain as proto;
 use futures::future::Either;
 use futures::stream::Stream;
@@ -135,11 +135,10 @@
 impl Channel {
     /// Reads a message from the channel.
     pub fn recv_msg(&self) -> impl Future<Output = Result<MessageBuf, Error>> {
-        let client = self.0.client.clone();
+        let client = self.0.client();
         let handle = self.0.proto();
 
         futures::future::poll_fn(move |ctx| {
-            let client = client.upgrade().ok_or(Error::ClientLost)?;
             client.poll_channel(handle, ctx, false).map(|x| {
                 x.expect("Got stream termination indication from non-streaming read!")
                     .map(|x| MessageBuf::from_proto(&client, x))
@@ -149,7 +148,7 @@
 
     /// Poll a channel for a message to read.
     pub fn recv_from(&self, cx: &mut Context<'_>, buf: &mut MessageBuf) -> Poll<Result<(), Error>> {
-        let client = self.0.client()?;
+        let client = self.0.client();
         match ready!(client.poll_channel(self.0.proto(), cx, false))
             .expect("Got stream termination indication from non-streaming read!")
         {
@@ -233,16 +232,12 @@
         let client = self.0.client();
         let handle = self.0.proto();
 
-        let result = client.map(move |client| {
-            client.clear_handles_for_transfer(&handles);
-            client.transaction(
-                ordinals::WRITE_CHANNEL,
-                proto::ChannelWriteChannelRequest { handle, data, handles },
-                move |x| Responder::WriteChannel(x, handle),
-            )
-        });
-
-        async move { result?.await }
+        client.clear_handles_for_transfer(&handles);
+        client.transaction(
+            ordinals::WRITE_CHANNEL,
+            proto::ChannelWriteChannelRequest { handle, data, handles },
+            move |x| Responder::WriteChannel(x, handle),
+        )
     }
 
     /// Split this channel into a streaming reader and a writer. This is more
@@ -252,8 +247,7 @@
     /// buffer, so it may lead to memory issues if you don't intend to use the
     /// messages from the channel as fast as they come.
     pub fn stream(self) -> Result<(ChannelMessageStream, ChannelWriter), Error> {
-        let client = self.client()?;
-        client.start_channel_streaming(self.0.proto())?;
+        self.0.client().start_channel_streaming(self.0.proto())?;
 
         let a = Arc::new(self);
         let b = Arc::clone(&a);
@@ -304,7 +298,7 @@
     /// `Channel::stream`.
     pub fn rejoin(mut self, writer: ChannelWriter) -> Channel {
         assert!(Arc::ptr_eq(&self.0, &writer.0), "Tried to join stream with wrong writer!");
-        if let Ok(client) = self.0 .0.client() {
+        if let Some(client) = self.0 .0.client.upgrade() {
             client.stop_channel_streaming(self.0 .0.proto())
         }
         std::mem::drop(writer);
@@ -314,9 +308,7 @@
 
     /// Whether this stream is closed.
     pub fn is_closed(&self) -> bool {
-        let Ok(client) = self.0.client() else {
-            return true;
-        };
+        let client = self.0 .0.client();
 
         !client.channel_is_streaming(self.0 .0.proto())
     }
@@ -330,7 +322,7 @@
 impl Stream for ChannelMessageStream {
     type Item = Result<MessageBuf, Error>;
     fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        let Ok(client) = self.0.client() else { return Poll::Ready(Some(Err(Error::ClientLost))) };
+        let client = self.0 .0.client();
         client
             .poll_channel(self.0 .0.proto(), ctx, true)
             .map(|x| x.map(|x| x.map(|x| MessageBuf::from_proto(&client, x))))
@@ -339,7 +331,7 @@
 
 impl Drop for ChannelMessageStream {
     fn drop(&mut self) {
-        if let Ok(client) = self.0.client() {
+        if let Some(client) = self.0 .0.client.upgrade() {
             client.stop_channel_streaming(self.0 .0.proto());
         }
     }
diff --git a/src/lib/fdomain/client/src/fidl.rs b/src/lib/fdomain/client/src/fidl.rs
index a89897c..6d47173 100644
--- a/src/lib/fdomain/client/src/fidl.rs
+++ b/src/lib/fdomain/client/src/fidl.rs
@@ -374,9 +374,12 @@
     /// exclusive control over these operations.
     fn as_channel(&self) -> &Channel;
 
-    /// Get the client supporting this proxy.
-    fn client(&self) -> Result<Arc<crate::Client>, Error> {
-        self.as_channel().client()
+    /// Get the client supporting this proxy. We call this a "domain" here because:
+    /// * Client is especially overloaded in contexts where this is useful.
+    /// * We simulate this call for target-side FIDL proxies, so it isn't always
+    ///   really a client.
+    fn domain(&self) -> Arc<crate::Client> {
+        self.as_channel().domain()
     }
 }
 
diff --git a/src/lib/fdomain/client/src/handle.rs b/src/lib/fdomain/client/src/handle.rs
index 861ea89..e956b86 100644
--- a/src/lib/fdomain/client/src/handle.rs
+++ b/src/lib/fdomain/client/src/handle.rs
@@ -20,8 +20,8 @@
 
 impl Handle {
     /// Get the FDomain client this handle belongs to.
-    pub(crate) fn client(&self) -> Result<Arc<Client>, Error> {
-        self.client.upgrade().ok_or(Error::ClientLost)
+    pub(crate) fn client(&self) -> Arc<Client> {
+        self.client.upgrade().unwrap_or_else(|| Arc::clone(&*crate::DEAD_CLIENT))
     }
 
     /// Get an invalid handle.
@@ -78,19 +78,15 @@
     ) -> impl Future<Output = Result<Handle, Error>> + 'static {
         let client = self.0.client();
         let handle = self.0.proto();
-        let result = client.map(|client| {
-            let new_handle = client.new_hid();
-            let id = new_handle.id;
-            client
-                .transaction(
-                    ordinals::DUPLICATE,
-                    proto::FDomainDuplicateRequest { handle, new_handle, rights },
-                    Responder::Duplicate,
-                )
-                .map(move |res| res.map(|_| Handle { id, client: Arc::downgrade(&client) }))
-        });
-
-        async move { result?.await }
+        let new_handle = client.new_hid();
+        let id = new_handle.id;
+        client
+            .transaction(
+                ordinals::DUPLICATE,
+                proto::FDomainDuplicateRequest { handle, new_handle, rights },
+                Responder::Duplicate,
+            )
+            .map(move |res| res.map(|_| Handle { id, client: Arc::downgrade(&client) }))
     }
 
     /// Assert and deassert signals on this handle.
@@ -102,15 +98,11 @@
         let handle = self.proto();
         let client = self.client();
 
-        let result: Result<_, Error> = client.map(|client| {
-            client.transaction(
-                ordinals::SIGNAL,
-                proto::FDomainSignalRequest { handle, set: set.bits(), clear: clear.bits() },
-                Responder::Signal,
-            )
-        });
-
-        async move { result?.await }
+        client.transaction(
+            ordinals::SIGNAL,
+            proto::FDomainSignalRequest { handle, set: set.bits(), clear: clear.bits() },
+            Responder::Signal,
+        )
     }
 }
 
@@ -128,9 +120,9 @@
         self.as_handle_ref().signal(set, clear)
     }
 
-    /// Get the client supporting this handle.
-    fn client(&self) -> Result<Arc<Client>, Error> {
-        self.as_handle_ref().0.client.upgrade().ok_or(Error::ClientLost)
+    /// Get the client supporting this handle. See `fidl::Proxy::domain`.
+    fn domain(&self) -> Arc<Client> {
+        self.as_handle_ref().0.client()
     }
 }
 
@@ -157,15 +149,11 @@
         let handle = self.as_handle_ref().proto();
         let client = self.as_handle_ref().client();
 
-        let result: Result<_, Error> = client.map(|client| {
-            client.transaction(
-                ordinals::SIGNAL_PEER,
-                proto::FDomainSignalPeerRequest { handle, set: set.bits(), clear: clear.bits() },
-                Responder::SignalPeer,
-            )
-        });
-
-        async move { result?.await }
+        client.transaction(
+            ordinals::SIGNAL_PEER,
+            proto::FDomainSignalPeerRequest { handle, set: set.bits(), clear: clear.bits() },
+            Responder::SignalPeer,
+        )
     }
 }
 
@@ -224,18 +212,14 @@
     pub fn new(handle: &Handle, signals: fidl::Signals) -> Self {
         let client = handle.client();
         let handle = handle.proto();
-        let result = client.map(|client| {
-            client
-                .transaction(
-                    ordinals::WAIT_FOR_SIGNALS,
-                    proto::FDomainWaitForSignalsRequest { handle, signals: signals.bits() },
-                    Responder::WaitForSignals,
-                )
-                .map(|f| f.map(|x| x.signals))
-        });
-        OnFDomainSignals {
-            fut: async move { result?.await.map(fidl::Signals::from_bits_retain) }.boxed(),
-        }
+        let fut = client
+            .transaction(
+                ordinals::WAIT_FOR_SIGNALS,
+                proto::FDomainWaitForSignalsRequest { handle, signals: signals.bits() },
+                Responder::WaitForSignals,
+            )
+            .map(|f| f.map(|x| fidl::Signals::from_bits_retain(x.signals)));
+        OnFDomainSignals { fut: fut.boxed() }
     }
 }
 
@@ -265,46 +249,34 @@
     /// Close this handle. Surfaces errors that dropping the handle will not.
     pub fn close(self) -> impl Future<Output = Result<(), Error>> {
         let client = self.client();
-        let result = client.map(|client| {
-            client.transaction(
-                ordinals::CLOSE,
-                proto::FDomainCloseRequest { handles: vec![self.take_proto()] },
-                Responder::Close,
-            )
-        });
-        async move { result?.await }
+        client.transaction(
+            ordinals::CLOSE,
+            proto::FDomainCloseRequest { handles: vec![self.take_proto()] },
+            Responder::Close,
+        )
     }
 
     /// Replace this handle with a new handle to the same object, with different
     /// rights.
     pub fn replace(self, rights: fidl::Rights) -> impl Future<Output = Result<Handle, Error>> {
-        let params = (move || {
-            let client = self.client()?;
-            let handle = self.take_proto();
-            {
-                let mut client = client.0.lock().unwrap();
-                let _ = client.channel_read_states.remove(&handle);
-                let _ = client.socket_read_states.remove(&handle);
-            }
-            let new_handle = client.new_hid();
-            Ok((new_handle, handle, client))
-        })();
+        let client = self.client();
+        let handle = self.take_proto();
+        {
+            let mut client = client.0.lock().unwrap();
+            let _ = client.channel_read_states.remove(&handle);
+            let _ = client.socket_read_states.remove(&handle);
+        }
+        let new_handle = client.new_hid();
 
-        let result: Result<_, Error> = params.map(|(new_handle, handle, client)| {
-            let id = new_handle.id;
-            let ret = Handle { id, client: Arc::downgrade(&client) };
-            (
-                client.transaction(
-                    ordinals::REPLACE,
-                    proto::FDomainReplaceRequest { handle, new_handle, rights },
-                    Responder::Replace,
-                ),
-                ret,
-            )
-        });
+        let id = new_handle.id;
+        let ret = Handle { id, client: Arc::downgrade(&client) };
+        let fut = client.transaction(
+            ordinals::REPLACE,
+            proto::FDomainReplaceRequest { handle, new_handle, rights },
+            Responder::Replace,
+        );
 
         async move {
-            let (fut, ret) = result?;
             fut.await?;
             Ok(ret)
         }
@@ -313,7 +285,7 @@
 
 impl Drop for Handle {
     fn drop(&mut self) {
-        if let Ok(client) = self.client() {
+        if let Some(client) = self.client.upgrade() {
             let mut client = client.0.lock().unwrap();
             if client.waiting_to_close.is_empty() {
                 client.waiting_to_close_waker.wake_by_ref();
diff --git a/src/lib/fdomain/client/src/lib.rs b/src/lib/fdomain/client/src/lib.rs
index b5ff113..e88b1a0 100644
--- a/src/lib/fdomain/client/src/lib.rs
+++ b/src/lib/fdomain/client/src/lib.rs
@@ -11,7 +11,7 @@
 use std::future::Future;
 use std::num::NonZeroU32;
 use std::pin::Pin;
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, LazyLock, Mutex};
 use std::task::{ready, Context, Poll, Waker};
 use {fidl_fuchsia_fdomain as proto, fuchsia_async as _};
 
@@ -117,7 +117,6 @@
     Transport(Arc<std::io::Error>),
     ConnectionMismatch,
     StreamingAborted,
-    ClientLost,
 }
 
 impl std::fmt::Display for Error {
@@ -160,7 +159,6 @@
                 write!(f, "Tried to use an FDomain handle from a different connection")
             }
             Self::StreamingAborted => write!(f, "This channel is no longer streaming"),
-            Self::ClientLost => write!(f, "The client associated with this handle was destroyed"),
         }
     }
 }
@@ -179,7 +177,6 @@
             Self::ProtocolStreamEventIncompatible => write!(f, "ProtocolStreamEventIncompatible"),
             Self::ConnectionMismatch => write!(f, "ConnectionMismatch"),
             Self::StreamingAborted => write!(f, "StreamingAborted"),
-            Self::ClientLost => write!(f, "ClientLost"),
         }
     }
 }
@@ -592,6 +589,23 @@
     }
 }
 
+/// A client which is always disconnected. Handles that lose their clients
+/// connect to this client instead, which always returns a "Client Lost"
+/// transport failure.
+pub(crate) static DEAD_CLIENT: LazyLock<Arc<Client>> = LazyLock::new(|| {
+    Arc::new(Client(Mutex::new(ClientInner {
+        transport: Transport::Error(InnerError::Transport(Arc::new(std::io::Error::other(
+            "Client Lost",
+        )))),
+        transactions: HashMap::new(),
+        channel_read_states: HashMap::new(),
+        socket_read_states: HashMap::new(),
+        next_tx_id: 1,
+        waiting_to_close: Vec::new(),
+        waiting_to_close_waker: futures::task::noop_waker(),
+    })))
+});
+
 impl Client {
     /// Create a new FDomain client. The `transport` argument should contain the
     /// established connection to the target, ready to communicate the FDomain
@@ -791,6 +805,9 @@
     /// Start getting streaming events for socket reads.
     pub(crate) fn start_socket_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
         let mut inner = self.0.lock().unwrap();
+        if let Some(e) = inner.transport.error() {
+            return Err(e.into());
+        }
 
         let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
             wakers: Vec::new(),
@@ -831,6 +848,9 @@
     /// Start getting streaming events for socket reads.
     pub(crate) fn start_channel_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
         let mut inner = self.0.lock().unwrap();
+        if let Some(e) = inner.transport.error() {
+            return Err(e.into());
+        }
         let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
             wakers: Vec::new(),
             queued: VecDeque::new(),
diff --git a/src/lib/fdomain/client/src/socket.rs b/src/lib/fdomain/client/src/socket.rs
index e1ecd06..afde8fd1 100644
--- a/src/lib/fdomain/client/src/socket.rs
+++ b/src/lib/fdomain/client/src/socket.rs
@@ -36,13 +36,10 @@
 impl Socket {
     /// Read up to the given buffer's length from the socket.
     pub fn read<'a>(&self, buf: &'a mut [u8]) -> impl Future<Output = Result<usize, Error>> + 'a {
-        let client = self.0.client.clone();
+        let client = self.0.client();
         let handle = self.0.proto();
 
-        futures::future::poll_fn(move |ctx| {
-            let client = client.upgrade().ok_or(Error::ClientLost)?;
-            client.poll_socket(handle, ctx, buf)
-        })
+        futures::future::poll_fn(move |ctx| client.poll_socket(handle, ctx, buf))
     }
 
     /// Write all of the given data to the socket.
@@ -52,16 +49,13 @@
         let hid = self.0.proto();
 
         let client = self.0.client();
-        let result = client.map(|client| {
-            client
-                .transaction(
-                    ordinals::WRITE_SOCKET,
-                    proto::SocketWriteSocketRequest { handle: hid, data },
-                    move |x| Responder::WriteSocket(x, hid),
-                )
-                .map(move |x| x.map(|y| assert!(y.wrote as usize == len)))
-        });
-        async move { result?.await }
+        client
+            .transaction(
+                ordinals::WRITE_SOCKET,
+                proto::SocketWriteSocketRequest { handle: hid, data },
+                move |x| Responder::WriteSocket(x, hid),
+            )
+            .map(move |x| x.map(|y| assert!(y.wrote as usize == len)))
     }
 
     /// Set the disposition of this socket and/or its peer.
@@ -77,14 +71,11 @@
             .unwrap_or(proto::SocketDisposition::NoChange);
         let client = self.0.client();
         let handle = self.0.proto();
-        let result = client.map(|client| {
-            client.transaction(
-                ordinals::SET_SOCKET_DISPOSITION,
-                proto::SocketSetSocketDispositionRequest { handle, disposition, disposition_peer },
-                Responder::SetSocketDisposition,
-            )
-        });
-        async move { result?.await }
+        client.transaction(
+            ordinals::SET_SOCKET_DISPOSITION,
+            proto::SocketSetSocketDispositionRequest { handle, disposition, disposition_peer },
+            Responder::SetSocketDisposition,
+        )
     }
 
     /// Split this socket into a streaming reader and a writer. This is more
@@ -94,7 +85,7 @@
     /// lead to memory issues if you don't intend to use the data from the
     /// socket as fast as it comes.
     pub fn stream(self) -> Result<(SocketReadStream, SocketWriter), Error> {
-        self.0.client()?.start_socket_streaming(self.0.proto())?;
+        self.0.client().start_socket_streaming(self.0.proto())?;
 
         let a = Arc::new(self);
         let b = Arc::clone(&a);
@@ -125,7 +116,7 @@
 
 impl Drop for SocketReadStream {
     fn drop(&mut self) {
-        if let Ok(client) = self.0 .0.client() {
+        if let Some(client) = self.0 .0.client.upgrade() {
             client.stop_socket_streaming(self.0 .0.proto());
         }
     }
diff --git a/src/lib/fidl/rust/fidl/src/endpoints.rs b/src/lib/fidl/rust/fidl/src/endpoints.rs
index 1f86ea9..049eac9 100644
--- a/src/lib/fidl/rust/fidl/src/endpoints.rs
+++ b/src/lib/fidl/rust/fidl/src/endpoints.rs
@@ -11,7 +11,6 @@
 };
 use futures::{Future, FutureExt, Stream, TryFutureExt, TryStream, TryStreamExt};
 use log::error;
-use std::convert::Infallible;
 use std::marker::PhantomData;
 use std::sync::Arc;
 use {fuchsia_async as fasync, zx_status};
@@ -103,20 +102,20 @@
     }
 }
 
-/// This gives native Zircon proxies a client method like FDomain proxies have.
+/// This gives native Zircon proxies a domain method like FDomain proxies have.
 /// This makes it easier in some cases to build the same code for both FDomain
 /// and regular FIDL.
-pub trait ProxyHasClient {
+pub trait ProxyHasDomain {
     /// Get a "client" for this proxy. This is just an object which has methods
     /// for a few common handle creation operations.
-    fn client(&self) -> Result<ZirconClient, Infallible> {
-        Ok(ZirconClient)
+    fn domain(&self) -> ZirconClient {
+        ZirconClient
     }
 }
 
-impl<T: Proxy> ProxyHasClient for T {}
+impl<T: Proxy> ProxyHasDomain for T {}
 
-/// The fake "client" produced by `ProxyHasClient`. Analogous to an FDomain client.
+/// The fake "client" produced by `ProxyHasDomain`. Analogous to an FDomain client.
 pub struct ZirconClient;
 
 impl ZirconClient {