Reland "[modular][migration] Allow Agents to publish their services in their outgoing directory"

This reverts commit ea184881b807ce87e32e7835ff7e42e487d693d8.

Reason for revert: CL is fixed; tested in GI, and manually on qemu and astro

Original change's description:
> Revert "[modular][migration] Allow Agents to publish their services in their outgoing directory"
>
> This reverts commit 7e32acd28b0ffa3170a9e34e3c4e01181a54c05b.
>
> Reason for revert: Seeing new E2E failures with:
> ```[WARNING][2020-02-14 02:34:40.278372]: iquery --find /hub; exit code: 1
> [WARNING][2020-02-14 02:34:40.278538]:
> [WARNING][2020-02-14 02:34:40.278635]: Error: timeout while reading dir: c/sessionmgr.cmx/98444/out
>
> [WARNING][2020-02-14 02:34:40.278851]: iquery --find /hub stderr: Error: timeout while reading dir: c/sessionmgr.cmx/98444/out``` as the primary diff in the logs.
>
> Original change's description:
> > [modular][migration] Allow Agents to publish their services in their outgoing directory
> >
> > Fixed: 43008
> > Change-Id: Iade02326749b21ae97679f78042c168be885f4a4
>
> TBR=thatguy@google.com,lindkvist@google.com,richkadel@google.com,ypomortsev@google.com
>
> Change-Id: Ia849e7d0757a63617c9d1f93e4ac4c7e80668c6b
> No-Presubmit: true
> No-Tree-Checks: true
> No-Try: true

TBR=thatguy@google.com,lindkvist@google.com,nmulcahey@google.com,richkadel@google.com,ypomortsev@google.com

Change-Id: I7aa0e77ce0d7b1fda5d4977490bca6afb1fe401d
diff --git a/src/modular/bin/sessionmgr/agent_runner/BUILD.gn b/src/modular/bin/sessionmgr/agent_runner/BUILD.gn
index 704ac60..75ec616 100644
--- a/src/modular/bin/sessionmgr/agent_runner/BUILD.gn
+++ b/src/modular/bin/sessionmgr/agent_runner/BUILD.gn
@@ -40,6 +40,8 @@
     "//src/modular/lib/fidl:json_xdr",
     "//src/modular/lib/ledger_client:page_client",
     "//src/modular/lib/ledger_client:types",
+    "//zircon/public/lib/fdio",
+    "//zircon/system/fidl/fuchsia-io",
   ]
 }
 
@@ -63,6 +65,7 @@
     "//src/modular/bin/sessionmgr/entity_provider_runner",
     "//src/modular/lib/fidl:array_to_string",
     "//src/modular/lib/ledger_client:page_client",
+    "//src/modular/lib/pseudo_dir",
     "//src/modular/lib/testing:mock_base",
     "//src/modular/lib/testing:test_with_ledger",
     "//src/modular/tests:fuchsia.testing.modular",
diff --git a/src/modular/bin/sessionmgr/agent_runner/agent_context_impl.cc b/src/modular/bin/sessionmgr/agent_runner/agent_context_impl.cc
index e815c5b..c89ff0e 100644
--- a/src/modular/bin/sessionmgr/agent_runner/agent_context_impl.cc
+++ b/src/modular/bin/sessionmgr/agent_runner/agent_context_impl.cc
@@ -5,10 +5,15 @@
 #include "src/modular/bin/sessionmgr/agent_runner/agent_context_impl.h"
 
 #include <fuchsia/intl/cpp/fidl.h>
+#include <fuchsia/io/cpp/fidl.h>
 #include <fuchsia/modular/cpp/fidl.h>
+#include <lib/fdio/directory.h>
+#include <lib/fdio/vfs.h>
+#include <zircon/status.h>
 
 #include <memory>
 
+#include "lib/fdio/directory.h"
 #include "src/modular/bin/sessionmgr/agent_runner/agent_runner.h"
 #include "src/modular/lib/common/teardown.h"
 
@@ -29,6 +34,37 @@
   return std::to_string(std::hash<std::string>{}(agent_url)) + last_part;
 }
 
+// Get a list of names of the entries in a directory.
+void GetFidlDirectoryEntries(fuchsia::io::Directory* dir,
+                             fit::function<void(std::vector<std::string>)> callback) {
+  constexpr uint64_t max_bytes = 4096;
+
+  dir->ReadDirents(
+      max_bytes, [callback = std::move(callback)](int32_t status, std::vector<uint8_t> dirents) {
+        std::vector<std::string> entry_names{};
+
+        if (status != ZX_OK) {
+          FXL_LOG(ERROR) << "GetFidlDirectoryEntries: could not read directory entries, error "
+                         << status << " (" << zx_status_get_string(status) << ")";
+          callback(std::move(entry_names));
+          return;
+        }
+
+        uint64_t offset = 0;
+        auto* data_ptr = dirents.data();
+        while (dirents.size() - offset >= sizeof(vdirent_t)) {
+          vdirent_t* de = reinterpret_cast<vdirent_t*>(data_ptr + offset);
+          auto name = std::string(de->name, de->size);
+          if (name.at(0) != '.') {
+            entry_names.push_back(name);
+          }
+          offset += sizeof(vdirent_t) + de->size;
+        }
+
+        callback(std::move(entry_names));
+      });
+}
+
 };  // namespace
 
 class AgentContextImpl::InitializeCall : public Operation<> {
@@ -79,6 +115,18 @@
     agent_context_impl_->app_client_->services().ConnectToService(
         agent_context_impl_->agent_.NewRequest());
 
+    // Enumerate the services that the agent has published in its outgoing directory.
+    auto agent_outgoing_dir_handle =
+        fdio_service_clone(agent_context_impl_->app_client_->services().directory().get());
+    FXL_CHECK(agent_outgoing_dir_handle != ZX_HANDLE_INVALID);
+    zx::channel agent_outgoing_dir_chan(agent_outgoing_dir_handle);
+    outgoing_dir_ptr_.Bind(std::move(agent_outgoing_dir_chan));
+
+    GetFidlDirectoryEntries(outgoing_dir_ptr_.get(), [this, flow](auto entries) {
+      agent_context_impl_->agent_outgoing_services_ = std::set<std::string>(
+          std::make_move_iterator(entries.begin()), std::make_move_iterator(entries.end()));
+    });
+
     // We only want to use fuchsia::modular::Lifecycle if it exists.
     agent_context_impl_->app_client_->primary_service().set_error_handler(
         [agent_context_impl = agent_context_impl_](zx_status_t status) {
@@ -102,6 +150,7 @@
   AgentContextImpl* const agent_context_impl_;
   fuchsia::sys::Launcher* const launcher_;
   fuchsia::modular::AppConfig agent_config_;
+  fuchsia::io::DirectoryPtr outgoing_dir_ptr_;
 };
 
 // If |terminating| is set to true, the agent will be torn down irrespective
@@ -160,8 +209,8 @@
                                    fuchsia::modular::AppConfig agent_config,
                                    inspect::Node agent_node)
     : url_(agent_config.url),
-      agent_runner_(info.component_context_info.agent_runner),
       component_context_impl_(info.component_context_info, kAgentComponentNamespace, url_, url_),
+      agent_runner_(info.component_context_info.agent_runner),
       entity_provider_runner_(info.component_context_info.entity_provider_runner),
       agent_services_factory_(info.agent_services_factory),
       agent_node_(std::move(agent_node)) {
@@ -187,6 +236,31 @@
 
 AgentContextImpl::~AgentContextImpl() = default;
 
+void AgentContextImpl::ConnectToService(
+    std::string requestor_url,
+    fidl::InterfaceRequest<fuchsia::modular::AgentController> agent_controller_request,
+    std::string service_name, ::zx::channel channel) {
+  // Run this task on the operation queue to ensure that all member variables are
+  // fully initialized before we query their state.
+  operation_queue_.Add(std::make_unique<SyncCall>(
+      [this, requestor_url, agent_controller_request = std::move(agent_controller_request),
+       service_name, channel = std::move(channel)]() mutable {
+        FXL_CHECK(state_ == State::RUNNING);
+
+        if (agent_outgoing_services_.count(service_name) > 0) {
+          app_client_->services().ConnectToService(std::move(channel), service_name);
+        } else {
+          fuchsia::sys::ServiceProviderPtr agent_services;
+          agent_->Connect(requestor_url, agent_services.NewRequest());
+          agent_services->ConnectToService(service_name, std::move(channel));
+        }
+
+        // Add a binding to the |controller|. When all the bindings go away,
+        // the agent will stop.
+        agent_controller_bindings_.AddBinding(this, std::move(agent_controller_request));
+      }));
+}
+
 void AgentContextImpl::NewAgentConnection(
     const std::string& requestor_url,
     fidl::InterfaceRequest<fuchsia::sys::ServiceProvider> incoming_services_request,
diff --git a/src/modular/bin/sessionmgr/agent_runner/agent_context_impl.h b/src/modular/bin/sessionmgr/agent_runner/agent_context_impl.h
index a9354d2..69e17e5 100644
--- a/src/modular/bin/sessionmgr/agent_runner/agent_context_impl.h
+++ b/src/modular/bin/sessionmgr/agent_runner/agent_context_impl.h
@@ -12,6 +12,7 @@
 #include <lib/fidl/cpp/binding_set.h>
 #include <lib/sys/inspect/cpp/component.h>
 
+#include <set>
 #include <string>
 
 #include "src/lib/fxl/macros.h"
@@ -54,8 +55,19 @@
   // AgentControllers. Calls into |AgentRunner::RemoveAgent()| to remove itself.
   void StopForTeardown(fit::function<void()> callback);
 
+  // Attempts to connect |channel| to service |service_name| published by the agent.
+  // If possible, connects to a service published in the agent's outgoing directory
+  // and falls back to using fuchsia.modular.Agent/Connect().
+  //
+  // If |agent_controller_request| is non-null, tracks its lifecycle and ensures
+  // this agent does not stop until |agent_controller_request| has closed.
+  void ConnectToService(
+      std::string requestor_url,
+      fidl::InterfaceRequest<fuchsia::modular::AgentController> agent_controller_request,
+      std::string service_name, ::zx::channel channel);
+
   // Called by AgentRunner when a component wants to connect to this agent.
-  // Connections will pend until fuchsia::modular::Agent::Initialize() responds
+  // Connections will pend until Agent::Initialize() responds
   // back, at which point all connections will be forwarded to the agent.
   void NewAgentConnection(
       const std::string& requestor_url,
@@ -120,14 +132,15 @@
   fidl::BindingSet<fuchsia::modular::AgentController> agent_controller_bindings_;
   fidl::BindingSet<fuchsia::auth::TokenManager> token_manager_bindings_;
 
-  AgentRunner* const agent_runner_;
+  // The names of services published by the agent in its outgoing directory.
+  std::set<std::string> agent_outgoing_services_;
 
   ComponentContextImpl component_context_impl_;
 
-  // A service provider that represents the services to be added into an
-  // application's namespace.
+  // Services provided to the agent in its namespace.
   component::ServiceProviderImpl service_provider_impl_;
 
+  AgentRunner* const agent_runner_;                     // Not owned.
   EntityProviderRunner* const entity_provider_runner_;  // Not owned.
   AgentServicesFactory* const agent_services_factory_;  // Not owned.
 
diff --git a/src/modular/bin/sessionmgr/agent_runner/agent_runner.cc b/src/modular/bin/sessionmgr/agent_runner/agent_runner.cc
index 4df0864..f260fca 100644
--- a/src/modular/bin/sessionmgr/agent_runner/agent_runner.cc
+++ b/src/modular/bin/sessionmgr/agent_runner/agent_runner.cc
@@ -121,6 +121,11 @@
 }
 
 void AgentRunner::EnsureAgentIsRunning(const std::string& agent_url, fit::function<void()> done) {
+  // Drop all new requests if AgentRunner is terminating.
+  if (*terminating_) {
+    return;
+  }
+
   auto agent_it = running_agents_.find(agent_url);
   if (agent_it != running_agents_.end()) {
     if (agent_it->second->state() == AgentContextImpl::State::TERMINATING) {
@@ -164,10 +169,6 @@
     const std::string& requestor_url, const std::string& agent_url,
     fidl::InterfaceRequest<fuchsia::sys::ServiceProvider> incoming_services_request,
     fidl::InterfaceRequest<fuchsia::modular::AgentController> agent_controller_request) {
-  // Drop all new requests if AgentRunner is terminating.
-  if (*terminating_) {
-    return;
-  }
   EnsureAgentIsRunning(
       agent_url, [this, agent_url, requestor_url,
                   incoming_services_request = std::move(incoming_services_request),
@@ -191,10 +192,12 @@
     std::string requestor_url, std::string agent_url,
     fidl::InterfaceRequest<fuchsia::modular::AgentController> agent_controller_request,
     std::string service_name, ::zx::channel channel) {
-  fuchsia::sys::ServiceProviderPtr agent_services;
-  ConnectToAgent(requestor_url, agent_url, agent_services.NewRequest(),
-                 std::move(agent_controller_request));
-  agent_services->ConnectToService(service_name, std::move(channel));
+  EnsureAgentIsRunning(
+      agent_url, [this, agent_url, requestor_url, service_name, channel = std::move(channel),
+                  agent_controller_request = std::move(agent_controller_request)]() mutable {
+        running_agents_[agent_url]->ConnectToService(
+            requestor_url, std::move(agent_controller_request), service_name, std::move(channel));
+      });
 }
 
 void AgentRunner::ConnectToAgentService(const std::string& requestor_url,
@@ -240,11 +243,6 @@
     const std::string& agent_url,
     fidl::InterfaceRequest<fuchsia::modular::EntityProvider> entity_provider_request,
     fidl::InterfaceRequest<fuchsia::modular::AgentController> agent_controller_request) {
-  // Drop all new requests if AgentRunner is terminating.
-  if (*terminating_) {
-    return;
-  }
-
   EnsureAgentIsRunning(
       agent_url, [this, agent_url, entity_provider_request = std::move(entity_provider_request),
                   agent_controller_request = std::move(agent_controller_request)]() mutable {
diff --git a/src/modular/bin/sessionmgr/agent_runner/agent_runner_unittest.cc b/src/modular/bin/sessionmgr/agent_runner/agent_runner_unittest.cc
index fff9bf2..7a789cb 100644
--- a/src/modular/bin/sessionmgr/agent_runner/agent_runner_unittest.cc
+++ b/src/modular/bin/sessionmgr/agent_runner/agent_runner_unittest.cc
@@ -25,6 +25,7 @@
 #include "src/lib/fxl/macros.h"
 #include "src/modular/bin/sessionmgr/entity_provider_runner/entity_provider_runner.h"
 #include "src/modular/lib/fidl/array_to_string.h"
+#include "src/modular/lib/pseudo_dir/pseudo_dir_server.h"
 #include "src/modular/lib/testing/mock_base.h"
 
 namespace modular_testing {
@@ -86,17 +87,17 @@
   TestAgent(zx::channel directory_request,
             fidl::InterfaceRequest<fuchsia::sys::ComponentController> ctrl,
             std::unique_ptr<component::ServiceNamespace> services_ptr = nullptr)
-      : vfs_(async_get_default_dispatcher()),
-        outgoing_directory_(fbl::AdoptRef(new fs::PseudoDir())),
-        controller_(this, std::move(ctrl)),
+      : controller_(this, std::move(ctrl)),
         agent_binding_(this),
         services_ptr_(std::move(services_ptr)) {
-    outgoing_directory_->AddEntry(fuchsia::modular::Agent::Name_,
-                                  fbl::AdoptRef(new fs::Service([this](zx::channel channel) {
-                                    agent_binding_.Bind(std::move(channel));
-                                    return ZX_OK;
-                                  })));
-    vfs_.ServeDirectory(outgoing_directory_, std::move(directory_request));
+    auto outgoing_dir = std::make_unique<vfs::PseudoDir>();
+    outgoing_dir->AddEntry(
+        fuchsia::modular::Agent::Name_,
+        std::make_unique<vfs::Service>([this](zx::channel channel, async_dispatcher_t* /*unused*/) {
+          agent_binding_.Bind(std::move(channel));
+        }));
+    outgoing_dir_server_ = std::make_unique<modular::PseudoDirServer>(std::move(outgoing_dir));
+    outgoing_dir_server_->Serve(std::move(directory_request));
   }
 
   void KillApplication() { controller_.Unbind(); }
@@ -119,8 +120,7 @@
   }
 
  private:
-  fs::SynchronousVfs vfs_;
-  fbl::RefPtr<fs::PseudoDir> outgoing_directory_;
+  std::unique_ptr<modular::PseudoDirServer> outgoing_dir_server_;
   fidl::Binding<fuchsia::sys::ComponentController> controller_;
   fidl::Binding<fuchsia::modular::Agent> agent_binding_;
 
@@ -289,7 +289,6 @@
   fuchsia::sys::ServiceProviderPtr incoming_services2;
   agent_runner()->ConnectToAgent("requestor_url2", kTestAgentUrl, incoming_services2.NewRequest(),
                                  agent_controller2.NewRequest());
-
   RunLoopWithTimeoutOrUntil(
       [&test_agent] { return test_agent && test_agent->GetCallCount("Connect"); });
   EXPECT_EQ(1, agent_launch_count);
diff --git a/src/modular/bin/sessionmgr/entity_provider_runner/BUILD.gn b/src/modular/bin/sessionmgr/entity_provider_runner/BUILD.gn
index 0220c6d..2981df3 100644
--- a/src/modular/bin/sessionmgr/entity_provider_runner/BUILD.gn
+++ b/src/modular/bin/sessionmgr/entity_provider_runner/BUILD.gn
@@ -51,6 +51,7 @@
     "//src/modular/bin/sessionmgr/agent_runner",
     "//src/modular/lib/connect",
     "//src/modular/lib/fidl:array_to_string",
+    "//src/modular/lib/pseudo_dir",
     "//src/modular/lib/testing:mock_base",
     "//src/modular/lib/testing:test_with_ledger",
     "//zircon/public/lib/fs",
diff --git a/src/modular/bin/sessionmgr/entity_provider_runner/entity_provider_runner_unittest.cc b/src/modular/bin/sessionmgr/entity_provider_runner/entity_provider_runner_unittest.cc
index 8b48a9c..c9201a8 100644
--- a/src/modular/bin/sessionmgr/entity_provider_runner/entity_provider_runner_unittest.cc
+++ b/src/modular/bin/sessionmgr/entity_provider_runner/entity_provider_runner_unittest.cc
@@ -24,6 +24,7 @@
 #include "src/modular/bin/sessionmgr/entity_provider_runner/entity_provider_launcher.h"
 #include "src/modular/lib/connect/connect.h"
 #include "src/modular/lib/fidl/array_to_string.h"
+#include "src/modular/lib/pseudo_dir/pseudo_dir_server.h"
 #include "src/modular/lib/testing/mock_base.h"
 
 namespace modular_testing {
@@ -40,9 +41,9 @@
 
     entity_provider_runner_ = std::make_unique<modular::EntityProviderRunner>(
         static_cast<modular::EntityProviderLauncher*>(this));
-    agent_runner_ = std::make_unique<modular::AgentRunner>(
-        &launcher_, token_manager_.get(), /*agent_services_factory=*/nullptr,
-        entity_provider_runner_.get(), &node_);
+    agent_runner_ = std::make_unique<modular::AgentRunner>(&launcher_, token_manager_.get(),
+                                                           /*agent_services_factory=*/nullptr,
+                                                           entity_provider_runner_.get(), &node_);
   }
 
   void TearDown() override {
@@ -94,23 +95,23 @@
  public:
   MyEntityProvider(fuchsia::sys::LaunchInfo launch_info,
                    fidl::InterfaceRequest<fuchsia::sys::ComponentController> ctrl)
-      : vfs_(async_get_default_dispatcher()),
-        outgoing_directory_(fbl::AdoptRef(new fs::PseudoDir())),
-        controller_(this, std::move(ctrl)),
+      : controller_(this, std::move(ctrl)),
         agent_binding_(this),
         entity_provider_binding_(this),
         launch_info_(std::move(launch_info)) {
-    outgoing_directory_->AddEntry(fuchsia::modular::Agent::Name_,
-                                  fbl::AdoptRef(new fs::Service([this](zx::channel channel) {
-                                    agent_binding_.Bind(std::move(channel));
-                                    return ZX_OK;
-                                  })));
-    outgoing_directory_->AddEntry(fuchsia::modular::EntityProvider::Name_,
-                                  fbl::AdoptRef(new fs::Service([this](zx::channel channel) {
-                                    entity_provider_binding_.Bind(std::move(channel));
-                                    return ZX_OK;
-                                  })));
-    vfs_.ServeDirectory(outgoing_directory_, std::move(launch_info_.directory_request));
+    auto outgoing_dir = std::make_unique<vfs::PseudoDir>();
+    outgoing_dir->AddEntry(
+        fuchsia::modular::Agent::Name_,
+        std::make_unique<vfs::Service>([this](zx::channel channel, async_dispatcher_t* /*unused*/) {
+          agent_binding_.Bind(std::move(channel));
+        }));
+    outgoing_dir->AddEntry(
+        fuchsia::modular::EntityProvider::Name_,
+        std::make_unique<vfs::Service>([this](zx::channel channel, async_dispatcher_t* /*unused*/) {
+          entity_provider_binding_.Bind(std::move(channel));
+        }));
+    outgoing_dir_server_ = std::make_unique<modular::PseudoDirServer>(std::move(outgoing_dir));
+    outgoing_dir_server_->Serve(std::move(launch_info_.directory_request));
 
     // Get |agent_context_| and |entity_resolver_| from incoming namespace.
     FXL_CHECK(launch_info_.additional_services);
@@ -169,8 +170,7 @@
   }
 
  private:
-  fs::SynchronousVfs vfs_;
-  fbl::RefPtr<fs::PseudoDir> outgoing_directory_;
+  std::unique_ptr<modular::PseudoDirServer> outgoing_dir_server_;
   fuchsia::modular::AgentContextPtr agent_context_;
   fuchsia::modular::EntityResolverPtr entity_resolver_;
   fidl::Binding<fuchsia::sys::ComponentController> controller_;
diff --git a/src/modular/tests/agent_services_test.cc b/src/modular/tests/agent_services_test.cc
index 994a3bb..0341f10 100644
--- a/src/modular/tests/agent_services_test.cc
+++ b/src/modular/tests/agent_services_test.cc
@@ -344,21 +344,26 @@
 
   template <typename Interface>
   void AddAgentService(fidl::InterfaceRequestHandler<Interface> handler) {
-    buffered_add_agent_service_calls_.push_back([this, handler = std::move(handler)]() mutable {
-      service_name_to_handler_[Interface::Name_] = [handler = std::move(handler)](zx::channel req) {
-        handler(fidl::InterfaceRequest<Interface>(std::move(req)));
-      };
-    });
-
-    FlushAddAgentServiceIfRunning();
+    service_name_to_handler_[Interface::Name_] = [handler = std::move(handler)](zx::channel req) {
+      handler(fidl::InterfaceRequest<Interface>(std::move(req)));
+    };
   }
 
- private:
+  template <typename Interface>
+  void AddPublicService(fidl::InterfaceRequestHandler<Interface> handler) {
+    buffered_add_service_calls_.push_back([this, handler = std::move(handler)]() mutable {
+      component_context()->outgoing()->AddPublicService(std::move(handler));
+    });
+
+    FlushAddServiceCallsIfRunning();
+  }
+
+ protected:
   // |modular_testing::FakeComponent|
   void OnCreate(fuchsia::sys::StartupInfo startup_info) {
     component_context()->outgoing()->AddPublicService<fuchsia::modular::Agent>(
         agent_bindings_.GetHandler(this));
-    FlushAddAgentServiceIfRunning();
+    FlushAddServiceCallsIfRunning();
   }
 
   // |fuchsia::modular::Agent|
@@ -377,12 +382,12 @@
     }
   }
 
-  void FlushAddAgentServiceIfRunning() {
+  void FlushAddServiceCallsIfRunning() {
     if (is_running()) {
-      for (auto& call : buffered_add_agent_service_calls_) {
+      for (auto& call : buffered_add_service_calls_) {
         call();
       }
-      buffered_add_agent_service_calls_.clear();
+      buffered_add_service_calls_.clear();
     }
   }
 
@@ -391,11 +396,10 @@
   fidl::BindingSet<fuchsia::modular::Agent> agent_bindings_;
   fidl::BindingSet<fuchsia::sys::ServiceProvider> agent_service_provider_bindings_;
 
-  // A mapping of `service name -> service connection handle` which is inflated using
-  // AddService<>().
+  // A mapping of `service name -> service connection handle`.
   std::unordered_map<std::string, fit::function<void(zx::channel)>> service_name_to_handler_;
-  std::vector<fit::closure> buffered_add_agent_service_calls_;
-};
+  std::vector<fit::closure> buffered_add_service_calls_;
+};  // namespace
 
 // Test that an Agent service can be acquired from any of another Agent, a Module, Session or Story
 // Shells, including testing that calls to the Agent.Connect() method (implemented by the agent)
@@ -549,4 +553,168 @@
   EXPECT_EQ(status, ZX_ERR_PEER_CLOSED);
 }
 
+// Test that an agent can publish its services using its outgoing directory, and that clients
+// can connect to those services through either ComponentContext.ConnectToAgent*() or
+// sys.ComponentContext.srv().Connect().
+TEST_F(AgentServicesSFWCompatTest, PublishToOutgoingDirectory) {
+  auto serving_agent = RequestorIdCapturingAgent::CreateWithDefaultOptions();
+
+  // Intercept this agent and use it as a client to connect to `serving_agent`.
+  auto agent = modular_testing::FakeAgent::CreateWithDefaultOptions();
+
+  // Set up the test environment with TestProtocol being served by `serving_agent`.
+  auto spec = CreateSpecWithAgentServiceIndex(
+      {{fuchsia::testing::modular::TestProtocol::Name_, serving_agent->url()}});
+  spec.mutable_sessionmgr_config()->mutable_session_agents()->push_back(agent->url());
+
+  modular_testing::TestHarnessBuilder builder(std::move(spec));
+  builder.InterceptComponent(serving_agent->BuildInterceptOptions());
+  builder.InterceptComponent(AddSandboxServices({fuchsia::testing::modular::TestProtocol::Name_},
+                                                agent->BuildInterceptOptions()));
+
+  // Instruct `serving_agent` to serve the TestProtocol, tracking the number of times
+  // the service was successfully connected.
+  int num_connections = 0;
+  std::vector<zx::channel> protocol_requests;
+  // Note that TestProtocol is being served using a sys.OutgoingDirectory.
+  serving_agent->AddPublicService(
+      fit::function<void(fidl::InterfaceRequest<fuchsia::testing::modular::TestProtocol>)>(
+          [&](fidl::InterfaceRequest<fuchsia::testing::modular::TestProtocol> request) {
+            ++num_connections;
+            protocol_requests.push_back(request.TakeChannel());
+          }));
+  builder.BuildAndRun(test_harness());
+
+  RunLoopUntil([&] { return agent->is_running(); });
+  ASSERT_FALSE(serving_agent->is_running());
+
+  // Attempt to connect to the test service in all ways that are currently supported.
+  std::vector<fuchsia::testing::modular::TestProtocolPtr> protocol_ptrs;
+  std::vector<fuchsia::modular::AgentControllerPtr> agent_controllers;
+
+  // Method 1: Connect using `agent`'s incoming directory
+  protocol_ptrs.push_back(
+      agent->component_context()->svc()->Connect<fuchsia::testing::modular::TestProtocol>());
+
+  // Method 2: Connect using fuchsia.modular.ComponentContext/ConnectToAgentService().
+  fuchsia::modular::AgentServiceRequest agent_service_request;
+  protocol_ptrs.emplace_back();
+  agent_controllers.emplace_back();
+  agent_service_request.set_service_name(fuchsia::testing::modular::TestProtocol::Name_);
+  agent_service_request.set_channel(protocol_ptrs.back().NewRequest().TakeChannel());
+  agent_service_request.set_agent_controller(agent_controllers.back().NewRequest());
+  agent_service_request.set_handler(serving_agent->url());
+  agent->modular_component_context()->ConnectToAgentService(std::move(agent_service_request));
+
+  // Track the number of those connection attempts failed.
+  int num_errors = 0;
+  for (auto& ptr : protocol_ptrs) {
+    ptr.set_error_handler([&](zx_status_t) { ++num_errors; });
+  }
+
+  constexpr int kTotalRequests = 2;
+  RunLoopUntil([&] { return num_connections + num_errors == kTotalRequests; });
+  EXPECT_TRUE(serving_agent->is_running());
+  EXPECT_EQ(num_connections, kTotalRequests);
+  EXPECT_EQ(num_errors, 0);
+}
+
+// If an agent exposes a service via both its outgoing directory and through fuchsia.modular.Agent,
+// prefer the outgoing directory.
+TEST_F(AgentServicesSFWCompatTest, PublishToOugoingDirectoryPrioritizesOutoingDirectory) {
+  auto serving_agent = RequestorIdCapturingAgent::CreateWithDefaultOptions();
+
+  // Intercept this agent and use it as a client to connect to `serving_agent`.
+  auto agent = modular_testing::FakeAgent::CreateWithDefaultOptions();
+
+  // Set up the test environment with TestProtocol being served by `serving_agent`.
+  auto spec = CreateSpecWithAgentServiceIndex(
+      {{fuchsia::testing::modular::TestProtocol::Name_, serving_agent->url()}});
+  spec.mutable_sessionmgr_config()->mutable_session_agents()->push_back(agent->url());
+
+  modular_testing::TestHarnessBuilder builder(std::move(spec));
+  builder.InterceptComponent(serving_agent->BuildInterceptOptions());
+  builder.InterceptComponent(AddSandboxServices({fuchsia::testing::modular::TestProtocol::Name_},
+                                                agent->BuildInterceptOptions()));
+
+  // Publish the service as both an outgoing/public service and an agent service.
+  bool saw_agent_connection = false;
+  bool saw_outgoing_connection = false;
+  serving_agent->AddAgentService(
+      fit::function<void(fidl::InterfaceRequest<fuchsia::testing::modular::TestProtocol>)>(
+          [&](fidl::InterfaceRequest<fuchsia::testing::modular::TestProtocol> request) {
+            saw_agent_connection = true;
+          }));
+  serving_agent->AddPublicService(
+      fit::function<void(fidl::InterfaceRequest<fuchsia::testing::modular::TestProtocol>)>(
+          [&](fidl::InterfaceRequest<fuchsia::testing::modular::TestProtocol> request) {
+            saw_outgoing_connection = true;
+          }));
+  builder.BuildAndRun(test_harness());
+
+  RunLoopUntil([&] { return agent->is_running(); });
+  ASSERT_FALSE(serving_agent->is_running());
+
+  auto protocol_ptr =
+      agent->component_context()->svc()->Connect<fuchsia::testing::modular::TestProtocol>();
+
+  RunLoopUntil([&] { return saw_agent_connection || saw_outgoing_connection; });
+  EXPECT_TRUE(saw_outgoing_connection);
+  EXPECT_FALSE(saw_agent_connection);
+}
+
+class NoAgentProtocolAgent : public RequestorIdCapturingAgent {
+ public:
+  NoAgentProtocolAgent(modular_testing::FakeComponent::Args args)
+      : RequestorIdCapturingAgent(std::move(args)) {}
+
+  static std::unique_ptr<NoAgentProtocolAgent> CreateWithDefaultOptions() {
+    return std::make_unique<NoAgentProtocolAgent>(modular_testing::FakeComponent::Args{
+        .url = modular_testing::TestHarnessBuilder::GenerateFakeUrl()});
+  }
+
+ private:
+  // |modular_testing::FakeComponent|
+  void OnCreate(fuchsia::sys::StartupInfo startup_info) {
+    // Don't publish the fuchsia.modular.Agent service!
+    FlushAddServiceCallsIfRunning();
+  }
+};
+
+// Test that an agent can still serve through its outgoing directory even if it does *not* publish
+// the fuchsia.modular.Agent protocol at all.
+TEST_F(AgentServicesSFWCompatTest, PublishToOutgoingDirectoryStillWorksWithoutAgentProtocol) {
+  auto serving_agent = NoAgentProtocolAgent::CreateWithDefaultOptions();
+
+  // Intercept this agent and use it as a client to connect to `serving_agent`.
+  auto agent = modular_testing::FakeAgent::CreateWithDefaultOptions();
+
+  // Set up the test environment with TestProtocol being served by `serving_agent`.
+  auto spec = CreateSpecWithAgentServiceIndex(
+      {{fuchsia::testing::modular::TestProtocol::Name_, serving_agent->url()}});
+  spec.mutable_sessionmgr_config()->mutable_session_agents()->push_back(agent->url());
+
+  modular_testing::TestHarnessBuilder builder(std::move(spec));
+  builder.InterceptComponent(serving_agent->BuildInterceptOptions());
+  builder.InterceptComponent(AddSandboxServices({fuchsia::testing::modular::TestProtocol::Name_},
+                                                agent->BuildInterceptOptions()));
+
+  // Publish the service as an outgoing/public service.
+  bool saw_outgoing_connection = false;
+  serving_agent->AddPublicService(
+      fit::function<void(fidl::InterfaceRequest<fuchsia::testing::modular::TestProtocol>)>(
+          [&](fidl::InterfaceRequest<fuchsia::testing::modular::TestProtocol> request) {
+            saw_outgoing_connection = true;
+          }));
+  builder.BuildAndRun(test_harness());
+
+  RunLoopUntil([&] { return agent->is_running(); });
+  ASSERT_FALSE(serving_agent->is_running());
+
+  auto protocol_ptr =
+      agent->component_context()->svc()->Connect<fuchsia::testing::modular::TestProtocol>();
+
+  RunLoopUntil([&] { return saw_outgoing_connection; });
+}
+
 }  // namespace