[trace,trace_manager] Split Start,Stop into Initialize,Start,Stop,Terminate

Bug: 22973

Also, Bug: 35876 remove fidl exclude_checks

Change-Id: I6cfccad7dd0c4c36ca9a047347001d6bb9cad7bf
diff --git a/garnet/bin/trace/cmd_utils.cc b/garnet/bin/trace/cmd_utils.cc
index cd4a9e0..ab536fb 100644
--- a/garnet/bin/trace/cmd_utils.cc
+++ b/garnet/bin/trace/cmd_utils.cc
@@ -100,4 +100,19 @@
   return uniquified_specs;
 }
 
+const char* StartErrorCodeToString(controller::StartErrorCode code) {
+  switch (code) {
+    case controller::StartErrorCode::NOT_INITIALIZED:
+      return "not initialized";
+    case controller::StartErrorCode::ALREADY_STARTED:
+      return "already started";
+    case controller::StartErrorCode::STOPPING:
+      return "stopping";
+    case controller::StartErrorCode::TERMINATING:
+      return "terminating";
+    default:
+      return "<unknown>";
+  }
+}
+
 }  // namespace tracing
diff --git a/garnet/bin/trace/cmd_utils.h b/garnet/bin/trace/cmd_utils.h
index ed24c31..5f2b1fb 100644
--- a/garnet/bin/trace/cmd_utils.h
+++ b/garnet/bin/trace/cmd_utils.h
@@ -40,6 +40,8 @@
 std::vector<controller::ProviderSpec> TranslateProviderSpecs(
     const std::vector<ProviderSpec>& specs);
 
+const char* StartErrorCodeToString(controller::StartErrorCode code);
+
 }  // namespace tracing
 
 #endif  // GARNET_BIN_TRACE_CMD_UTILS_H_
diff --git a/garnet/bin/trace/command.cc b/garnet/bin/trace/command.cc
index 13d73e8..d012dac 100644
--- a/garnet/bin/trace/command.cc
+++ b/garnet/bin/trace/command.cc
@@ -44,7 +44,7 @@
 CommandWithController::CommandWithController(sys::ComponentContext* context)
     : Command(context), controller_(context->svc()->Connect<controller::Controller>()) {
   controller_.set_error_handler([this](zx_status_t status) {
-    FXL_LOG(ERROR) << "Trace controller(controller) disconnected unexpectedly";
+    FXL_LOG(ERROR) << "Trace controller disconnected unexpectedly";
     Done(EXIT_FAILURE);
   });
 }
diff --git a/garnet/bin/trace/commands/record.cc b/garnet/bin/trace/commands/record.cc
index ebf32d2..ed89008 100644
--- a/garnet/bin/trace/commands/record.cc
+++ b/garnet/bin/trace/commands/record.cc
@@ -10,23 +10,17 @@
 #include <lib/async/default.h>
 #include <lib/fdio/spawn.h>
 #include <lib/zx/time.h>
-#include <netdb.h>
 #include <string.h>
-#include <sys/socket.h>
 #include <zircon/status.h>
 
 #include <fstream>
 #include <string>
 #include <unordered_set>
 
-#include <src/lib/files/file.h>
-#include <src/lib/files/path.h>
-#include <src/lib/files/unique_fd.h>
-#include <third_party/zlib/contrib/iostream3/zfstream.h>
-
 #include "garnet/bin/trace/results_export.h"
 #include "garnet/bin/trace/results_output.h"
-#include "src/lib/fsl/types/type_converters.h"
+#include "src/lib/files/file.h"
+#include "src/lib/files/path.h"
 #include "src/lib/fxl/logging.h"
 #include "src/lib/fxl/strings/join_strings.h"
 #include "src/lib/fxl/strings/split_string.h"
@@ -415,31 +409,39 @@
 
   tracing_ = true;
 
-  TraceConfig trace_config;
+  controller::TraceConfig trace_config;
   trace_config.set_categories(options_.categories);
   trace_config.set_buffer_size_megabytes_hint(options_.buffer_size_megabytes);
   // TODO(dje): start_timeout_milliseconds
   trace_config.set_buffering_mode(options_.buffering_mode);
   trace_config.set_provider_specs(TranslateProviderSpecs(options_.provider_specs));
 
-  tracer_->Start(
+  tracer_->Initialize(
       std::move(trace_config), options_.binary, std::move(bytes_consumer),
       std::move(record_consumer), std::move(error_handler),
-      [this] {
-        if (!options_.app.empty())
-          options_.spawn ? LaunchSpawnedApp() : LaunchComponentApp();
-        StartTimer();
-      },
       [this] { DoneTrace(); },  // TODO(37435): For now preserve existing behaviour.
       [this] { DoneTrace(); });
+
+  tracer_->Start([this](controller::Controller_StartTracing_Result result) {
+    if (result.is_err()) {
+      FXL_LOG(ERROR) << "Unable to start trace: " << StartErrorCodeToString(result.err());
+      tracing_ = false;
+      Done(EXIT_FAILURE);
+      return;
+    }
+    if (!options_.app.empty()) {
+      options_.spawn ? LaunchSpawnedApp() : LaunchComponentApp();
+    }
+    StartTimer();
+  });
 }
 
-void RecordCommand::StopTrace(int32_t return_code) {
+void RecordCommand::TerminateTrace(int32_t return_code) {
   if (tracing_) {
-    out() << "Stopping trace..." << std::endl;
+    out() << "Terminating trace..." << std::endl;
     tracing_ = false;
     return_code_ = return_code;
-    tracer_->Stop();
+    tracer_->Terminate();
     if (spawned_app_ && !options_.detach) {
       KillSpawnedApp();
     }
@@ -558,7 +560,7 @@
 void RecordCommand::LaunchComponentApp() {
   fuchsia::sys::LaunchInfo launch_info;
   launch_info.url = options_.app;
-  launch_info.arguments = fidl::To<fidl::VectorPtr<std::string>>(options_.args);
+  launch_info.arguments = options_.args;
 
   // Include the arguments here for when invoked by traceutil: It's useful to
   // see how the passed command+args ended up after shell processing.
@@ -588,8 +590,8 @@
           << std::endl;
     if (!options_.decouple)
       // The trace might have been already stopped by the |Wait()| callback. In
-      // that case, |StopTrace| below does nothing.
-      StopTrace(EXIT_FAILURE);
+      // that case, |TerminateTrace| below does nothing.
+      TerminateTrace(EXIT_FAILURE);
   });
   component_controller_.events().OnTerminated =
       [this](int64_t return_code, fuchsia::sys::TerminationReason termination_reason) {
@@ -599,9 +601,9 @@
         component_controller_.set_error_handler([](zx_status_t error) {});
         if (!options_.decouple) {
           if (options_.return_child_result) {
-            StopTrace(return_code);
+            TerminateTrace(return_code);
           } else {
-            StopTrace(EXIT_SUCCESS);
+            TerminateTrace(EXIT_SUCCESS);
           }
         }
       };
@@ -621,7 +623,7 @@
   zx::process subprocess;
   zx_status_t status = Spawn(all_args, &subprocess);
   if (status != ZX_OK) {
-    StopTrace(EXIT_FAILURE);
+    TerminateTrace(EXIT_FAILURE);
     FXL_LOG(ERROR) << "Subprocess launch failed: \"" << status
                    << "\" Did you provide the full path to the tool?";
     return;
@@ -638,7 +640,7 @@
                                      zx_status_t status, const zx_packet_signal_t* signal) {
   if (status != ZX_OK) {
     FXL_LOG(ERROR) << "Failed to wait for spawned app: status=" << status;
-    StopTrace(EXIT_FAILURE);
+    TerminateTrace(EXIT_FAILURE);
     return;
   }
 
@@ -651,9 +653,9 @@
     out() << "Application exited with return code " << proc_info.return_code << std::endl;
     if (!options_.decouple) {
       if (options_.return_child_result) {
-        StopTrace(proc_info.return_code);
+        TerminateTrace(proc_info.return_code);
       } else {
-        StopTrace(EXIT_SUCCESS);
+        TerminateTrace(EXIT_SUCCESS);
       }
     }
   } else {
@@ -677,7 +679,7 @@
       dispatcher_,
       [weak = weak_ptr_factory_.GetWeakPtr()] {
         if (weak)
-          weak->StopTrace(EXIT_SUCCESS);
+          weak->TerminateTrace(EXIT_SUCCESS);
       },
       zx::nsec(options_.duration.ToNanoseconds()));
   out() << "Starting trace; will stop in " << options_.duration.ToSecondsF() << " seconds..."
diff --git a/garnet/bin/trace/commands/record.h b/garnet/bin/trace/commands/record.h
index 71f16a9..487935b 100644
--- a/garnet/bin/trace/commands/record.h
+++ b/garnet/bin/trace/commands/record.h
@@ -62,7 +62,7 @@
   void Start(const fxl::CommandLine& command_line) override;
 
  private:
-  void StopTrace(int32_t return_code);
+  void TerminateTrace(int32_t return_code);
   void ProcessMeasurements();
   void DoneTrace();
   void LaunchComponentApp();
@@ -77,10 +77,13 @@
   fuchsia::sys::EnvironmentControllerPtr environment_controller_;
   zx::process spawned_app_;
   async::WaitMethod<RecordCommand, &RecordCommand::OnSpawnedAppExit> wait_spawned_app_;
+
   std::unique_ptr<std::ostream> binary_out_;
   // TODO(PT-113): Remove |exporter_|.
   std::unique_ptr<ChromiumExporter> exporter_;
+
   std::unique_ptr<Tracer> tracer_;
+
   // Aggregate events if there are any measurements to be performed, so that we
   // can sort them by timestamp and process in order.
   bool aggregate_events_ = false;
@@ -91,6 +94,7 @@
   std::unique_ptr<measure::MeasureDuration> measure_duration_;
   std::unique_ptr<measure::MeasureTimeBetween> measure_time_between_;
   std::unique_ptr<measure::MeasureArgumentValue> measure_argument_value_;
+
   bool tracing_ = false;
   int32_t return_code_ = 0;
   Options options_;
diff --git a/garnet/bin/trace/tracer.cc b/garnet/bin/trace/tracer.cc
index 1c9efd8..6219225 100644
--- a/garnet/bin/trace/tracer.cc
+++ b/garnet/bin/trace/tracer.cc
@@ -2,6 +2,8 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
+// This file provides the FIDL interface for "trace record".
+
 #include "garnet/bin/trace/tracer.h"
 
 #include <lib/async/cpp/task.h>
@@ -24,16 +26,10 @@
 
 Tracer::~Tracer() { CloseSocket(); }
 
-void Tracer::Start(TraceConfig config, bool binary, BytesConsumer bytes_consumer,
-                   RecordConsumer record_consumer, ErrorHandler error_handler,
-                   StartCallback start_callback, FailCallback fail_callback,
-                   DoneCallback done_callback) {
-  FXL_DCHECK(state_ == State::kStopped);
-
-  state_ = State::kStarted;
-  start_callback_ = std::move(start_callback);
-  fail_callback_ = std::move(fail_callback);
-  done_callback_ = std::move(done_callback);
+void Tracer::Initialize(controller::TraceConfig config, bool binary, BytesConsumer bytes_consumer,
+                        RecordConsumer record_consumer, ErrorHandler error_handler,
+                        FailCallback fail_callback, DoneCallback done_callback) {
+  FXL_DCHECK(state_ == State::kReady);
 
   zx::socket outgoing_socket;
   zx_status_t status = zx::socket::create(0u, &socket_, &outgoing_socket);
@@ -43,30 +39,56 @@
     return;
   }
 
-  controller_->StartTracing(std::move(config), std::move(outgoing_socket),
-                            [this]() { start_callback_(); });
+  controller_->InitializeTracing(std::move(config), std::move(outgoing_socket));
 
   binary_ = binary;
   bytes_consumer_ = std::move(bytes_consumer);
   reader_.reset(new trace::TraceReader(std::move(record_consumer), std::move(error_handler)));
 
+  fail_callback_ = std::move(fail_callback);
+  done_callback_ = std::move(done_callback);
+
   dispatcher_ = async_get_default_dispatcher();
   wait_.set_object(socket_.get());
   status = wait_.Begin(dispatcher_);
   FXL_CHECK(status == ZX_OK) << "Failed to add handler: status=" << status;
+
+  state_ = State::kInitialized;
 }
 
-void Tracer::Stop() {
-  // Note: The controller will close the socket when finished.
-  if (state_ == State::kStarted) {
-    state_ = State::kStopping;
-    controller_->StopTracing();
-  }
+void Tracer::Start(StartCallback start_callback) {
+  FXL_DCHECK(state_ == State::kInitialized);
+
+  start_callback_ = std::move(start_callback);
+
+  // All our categories are passed when we initialize, and we're just
+  // starting tracing so the buffer is already empty, so there's nothing to
+  // pass for |StartOptions| here.
+  controller::StartOptions start_options{};
+
+  controller_->StartTracing(std::move(start_options),
+                            [this](controller::Controller_StartTracing_Result result) {
+                              start_callback_(std::move(result));
+                            });
+
+  state_ = State::kStarted;
+}
+
+void Tracer::Terminate() {
+  // Note: The controller will close the consumer socket when finished.
+  FXL_DCHECK(state_ != State::kReady);
+  state_ = State::kTerminating;
+  controller::TerminateOptions terminate_options{};
+  terminate_options.set_write_results(true);
+  controller_->TerminateTracing(std::move(terminate_options),
+                                [](controller::TerminateResult result) {
+                                  // TODO(dje): Print provider stats.
+                                });
 }
 
 void Tracer::OnHandleReady(async_dispatcher_t* dispatcher, async::WaitBase* wait,
                            zx_status_t status, const zx_packet_signal_t* signal) {
-  FXL_DCHECK(state_ == State::kStarted || state_ == State::kStopping);
+  FXL_DCHECK(state_ == State::kStarted || state_ == State::kTerminating);
 
   if (status != ZX_OK) {
     OnHandleError(status);
@@ -144,14 +166,12 @@
   }
 }
 
-void Tracer::Fail() {
-  fail_callback_();
-}
+void Tracer::Fail() { fail_callback_(); }
 
 void Tracer::Done() {
-  FXL_DCHECK(state_ == State::kStarted || state_ == State::kStopping);
+  FXL_DCHECK(state_ == State::kStarted || state_ == State::kTerminating);
 
-  state_ = State::kStopped;
+  state_ = State::kTerminated;
   reader_.reset();
 
   CloseSocket();
diff --git a/garnet/bin/trace/tracer.h b/garnet/bin/trace/tracer.h
index 38675d3..30cfc02 100644
--- a/garnet/bin/trace/tracer.h
+++ b/garnet/bin/trace/tracer.h
@@ -5,13 +5,14 @@
 #ifndef GARNET_BIN_TRACE_TRACER_H_
 #define GARNET_BIN_TRACE_TRACER_H_
 
-#include <array>
-#include <string>
-
 #include <fuchsia/tracing/controller/cpp/fidl.h>
 #include <lib/async/cpp/wait.h>
 #include <lib/fit/function.h>
 #include <lib/zx/socket.h>
+
+#include <array>
+#include <string>
+
 #include <trace-engine/fields.h>
 #include <trace-reader/reader.h>
 
@@ -19,9 +20,6 @@
 
 namespace controller = ::fuchsia::tracing::controller;
 
-// TODO(dje): Temporary alias to ease renaming of TraceOptions to TraceConfig.
-using TraceConfig = controller::TraceOptions;
-
 // Runs traces.
 class Tracer {
  public:
@@ -31,30 +29,38 @@
   using ErrorHandler = trace::TraceReader::ErrorHandler;
 
   // Called when tracing has completed starting.
-  using StartCallback = fit::closure;
+  using StartCallback = fit::callback<void(controller::Controller_StartTracing_Result result)>;
 
   // This is called when there's a failure and trace processing must stop.
-  using FailCallback = fit::closure;
+  using FailCallback = fit::callback<void()>;
 
   // This is called on successfully writing trace results.
+  // This is not a |fit::callback| as async::PostTask takes a |fit::closure|.
   using DoneCallback = fit::closure;
 
   explicit Tracer(controller::Controller* controller);
   ~Tracer();
 
-  // Starts tracing.
+  // Initialize tracing.
   // Streams records |record_consumer| and errors to |error_handler|.
-  // Invokes |done_callback| when tracing stops.
+  // Invokes |done_callback| when tracing stops and |fail_callback| upon failure.
   // TODO(PT-113): Remove |binary,record_consumer,error_handler|
-  void Start(TraceConfig config, bool binary, BytesConsumer bytes_consumer,
-             RecordConsumer record_consumer, ErrorHandler error_handler,
-             StartCallback start_callback, FailCallback fail_callback, DoneCallback done_callback);
+  void Initialize(controller::TraceConfig config, bool binary, BytesConsumer bytes_consumer,
+                  RecordConsumer record_consumer, ErrorHandler error_handler,
+                  FailCallback fail_callback, DoneCallback done_callback);
 
-  // Stops the trace.
-  // Does nothing if not started or if already stopping.
-  void Stop();
+  // Start tracing.
+  // Tracing must not already be started.
+  // |start_callback| is called when tracing has fully started.
+  void Start(StartCallback start_callback);
+
+  // Terminates the trace.
+  // Does nothing if not started or if already terminating.
+  void Terminate();
 
  private:
+  enum class State { kReady, kInitialized, kStarted, kTerminating, kTerminated };
+
   // Note: Buffer needs to be big enough to store records of maximum size.
   static constexpr size_t kReadBufferSize = trace::RecordFields::kMaxRecordSizeBytes * 4;
 
@@ -69,9 +75,8 @@
 
   controller::Controller* const controller_;
 
-  enum class State { kStopped, kStarted, kStopping };
+  State state_ = State::kReady;
 
-  State state_ = State::kStopped;
   StartCallback start_callback_;
   FailCallback fail_callback_;
   DoneCallback done_callback_;
diff --git a/garnet/bin/trace_manager/app.cc b/garnet/bin/trace_manager/app.cc
index 1d37efc..0796248 100644
--- a/garnet/bin/trace_manager/app.cc
+++ b/garnet/bin/trace_manager/app.cc
@@ -10,18 +10,17 @@
 
 namespace tracing {
 
-TraceManagerApp::TraceManagerApp(std::unique_ptr<sys::ComponentContext> context,
-                                 const Config& config)
-    : context_(std::move(context)), trace_manager_(context_.get(), config) {
+TraceManagerApp::TraceManagerApp(std::unique_ptr<sys::ComponentContext> context, Config config)
+    : context_(std::move(context)), trace_manager_(this, context_.get(), std::move(config)) {
   [[maybe_unused]] zx_status_t status;
 
-  status =
-      context_->outgoing()->AddPublicService(provider_registry_bindings_.GetHandler(&trace_manager_));
+  status = context_->outgoing()->AddPublicService(
+      provider_registry_bindings_.GetHandler(&trace_manager_));
   FXL_DCHECK(status == ZX_OK);
 
-  status =
-      context_->outgoing()->AddPublicService(controller_bindings_.GetHandler(&trace_manager_));
+  status = context_->outgoing()->AddPublicService(controller_bindings_.GetHandler(&trace_manager_));
   FXL_DCHECK(status == ZX_OK);
+  controller_bindings_.set_empty_set_handler([this]() { trace_manager_.OnEmptyControllerSet(); });
 
   FXL_VLOG(2) << "TraceManager services registered";
 }
diff --git a/garnet/bin/trace_manager/app.h b/garnet/bin/trace_manager/app.h
index 7ead15d..7460238 100644
--- a/garnet/bin/trace_manager/app.h
+++ b/garnet/bin/trace_manager/app.h
@@ -18,13 +18,17 @@
 
 class TraceManagerApp {
  public:
-  explicit TraceManagerApp(std::unique_ptr<sys::ComponentContext> context, const Config& config);
+  explicit TraceManagerApp(std::unique_ptr<sys::ComponentContext> context, Config config);
   ~TraceManagerApp();
 
   // For testing.
   sys::ComponentContext* context() const { return context_.get(); }
   const TraceManager* trace_manager() const { return &trace_manager_; }
 
+  const fidl::BindingSet<controller::Controller>& controller_bindings() const {
+    return controller_bindings_;
+  }
+
  private:
   std::unique_ptr<sys::ComponentContext> context_;
 
diff --git a/garnet/bin/trace_manager/main.cc b/garnet/bin/trace_manager/main.cc
index de74975..3fefb7b 100644
--- a/garnet/bin/trace_manager/main.cc
+++ b/garnet/bin/trace_manager/main.cc
@@ -2,10 +2,9 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-#include <stdlib.h>
-
 #include <lib/async-loop/cpp/loop.h>
 #include <lib/async-loop/default.h>
+#include <stdlib.h>
 
 #include "garnet/bin/trace_manager/app.h"
 #include "src/lib/fxl/command_line.h"
@@ -37,7 +36,7 @@
   FXL_LOG(INFO) << "Trace Manager starting with config: " << config_file;
 
   async::Loop loop(&kAsyncLoopConfigAttachToCurrentThread);
-  tracing::TraceManagerApp trace_manager_app(sys::ComponentContext::Create(), config);
+  tracing::TraceManagerApp trace_manager_app{sys::ComponentContext::Create(), std::move(config)};
   loop.Run();
   return EXIT_SUCCESS;
 }
diff --git a/garnet/bin/trace_manager/tests/BUILD.gn b/garnet/bin/trace_manager/tests/BUILD.gn
index 07c8760..45c6af0 100644
--- a/garnet/bin/trace_manager/tests/BUILD.gn
+++ b/garnet/bin/trace_manager/tests/BUILD.gn
@@ -19,7 +19,12 @@
   sources = [
     "fake_provider.cc",
     "fake_provider.h",
+    "init_to_fini_tests.cc",
+    "initialize_tests.cc",
     "provider_tests.cc",
+    "start_tests.cc",
+    "stop_tests.cc",
+    "terminate_tests.cc",
     "trace_manager_test.cc",
     "trace_manager_test.h",
   ]
diff --git a/garnet/bin/trace_manager/tests/init_to_fini_tests.cc b/garnet/bin/trace_manager/tests/init_to_fini_tests.cc
new file mode 100644
index 0000000..7573a30
--- /dev/null
+++ b/garnet/bin/trace_manager/tests/init_to_fini_tests.cc
@@ -0,0 +1,111 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <src/lib/fxl/logging.h>
+
+#include "garnet/bin/trace_manager/tests/trace_manager_test.h"
+
+namespace tracing {
+namespace test {
+
+TEST_F(TraceManagerTest, InitToFini) {
+  ConnectToControllerService();
+
+  FakeProvider* provider;
+  ASSERT_TRUE(AddFakeProvider(kProvider1Pid, kProvider1Name, &provider));
+  EXPECT_EQ(fake_provider_bindings().size(), 1u);
+
+  ASSERT_TRUE(InitializeSession());
+
+  ASSERT_TRUE(StartSession());
+  VerifyCounts(1, 0);
+
+  ASSERT_TRUE(StopSession());
+  VerifyCounts(1, 1);
+
+  ASSERT_TRUE(StartSession());
+  VerifyCounts(2, 1);
+
+  ASSERT_TRUE(StopSession());
+  VerifyCounts(2, 2);
+
+  ASSERT_TRUE(TerminateSession());
+  VerifyCounts(2, 2);
+}
+
+TEST_F(TraceManagerTest, InitToFiniWithNoProviders) {
+  ConnectToControllerService();
+
+  ASSERT_TRUE(InitializeSession());
+
+  ASSERT_TRUE(StartSession());
+  VerifyCounts(1, 0);
+
+  ASSERT_TRUE(StopSession());
+  VerifyCounts(1, 1);
+
+  ASSERT_TRUE(StartSession());
+  VerifyCounts(2, 1);
+
+  ASSERT_TRUE(StopSession());
+  VerifyCounts(2, 2);
+
+  ASSERT_TRUE(TerminateSession());
+  VerifyCounts(2, 2);
+}
+
+TEST_F(TraceManagerTest, InitToFiniWithProviderAddedAfterSessionStarts) {
+  ConnectToControllerService();
+
+  FakeProvider* provider1;
+  ASSERT_TRUE(AddFakeProvider(kProvider1Pid, kProvider1Name, &provider1));
+  EXPECT_EQ(fake_provider_bindings().size(), 1u);
+
+  ASSERT_TRUE(InitializeSession());
+
+  ASSERT_TRUE(StartSession());
+  VerifyCounts(1, 0);
+
+  FakeProvider* provider2;
+  ASSERT_TRUE(AddFakeProvider(kProvider2Pid, kProvider2Name, &provider2));
+  EXPECT_EQ(fake_provider_bindings().size(), 2u);
+
+  // Given the session a chance to start the new provider before we stop it.
+  RunLoopUntilIdle();
+  ASSERT_EQ(provider2->state(), FakeProvider::State::kStarting);
+  provider2->MarkStarted();
+  // Give TraceSession a chance to process the STARTED fifo packet.
+  RunLoopUntilIdle();
+
+  ASSERT_TRUE(StopSession());
+  VerifyCounts(1, 1);
+
+  ASSERT_TRUE(StartSession());
+  VerifyCounts(2, 1);
+
+  ASSERT_TRUE(StopSession());
+  VerifyCounts(2, 2);
+
+  ASSERT_TRUE(TerminateSession());
+  VerifyCounts(2, 2);
+}
+
+TEST_F(TraceManagerTest, InitToFiniWithNoStop) {
+  ConnectToControllerService();
+
+  FakeProvider* provider;
+  ASSERT_TRUE(AddFakeProvider(kProvider1Pid, kProvider1Name, &provider));
+  EXPECT_EQ(fake_provider_bindings().size(), 1u);
+
+  ASSERT_TRUE(InitializeSession());
+
+  ASSERT_TRUE(StartSession());
+  VerifyCounts(1, 0);
+
+  ASSERT_TRUE(TerminateSession());
+  VerifyCounts(1, 0);
+}
+
+}  // namespace test
+}  // namespace tracing
diff --git a/garnet/bin/trace_manager/tests/initialize_tests.cc b/garnet/bin/trace_manager/tests/initialize_tests.cc
new file mode 100644
index 0000000..b5c9256
--- /dev/null
+++ b/garnet/bin/trace_manager/tests/initialize_tests.cc
@@ -0,0 +1,38 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <src/lib/fxl/logging.h>
+
+#include "garnet/bin/trace_manager/tests/trace_manager_test.h"
+
+namespace tracing {
+namespace test {
+
+TEST_F(TraceManagerTest, DuplicateInitialization) {
+  ConnectToControllerService();
+
+  ASSERT_TRUE(InitializeSession());
+
+  zx::socket our_socket, their_socket;
+  zx_status_t status = zx::socket::create(0u, &our_socket, &their_socket);
+  ASSERT_EQ(status, ZX_OK);
+
+  controller::TraceConfig config{GetDefaultTraceConfig()};
+  controller()->InitializeTracing(std::move(config), std::move(their_socket));
+  // There's no state transition here that would trigger a call to
+  // |LoopQuit()|. Nor is there a result.
+  // Mostly we just want to verify things don't hang.
+  RunLoopUntilIdle();
+}
+
+TEST_F(TraceManagerTest, InitializeWithoutProviders) {
+  ConnectToControllerService();
+
+  ASSERT_TRUE(InitializeSession());
+
+  EXPECT_EQ(GetSessionState(), SessionState::kInitialized);
+}
+
+}  // namespace test
+}  // namespace tracing
diff --git a/garnet/bin/trace_manager/tests/start_tests.cc b/garnet/bin/trace_manager/tests/start_tests.cc
new file mode 100644
index 0000000..4b92449
--- /dev/null
+++ b/garnet/bin/trace_manager/tests/start_tests.cc
@@ -0,0 +1,140 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <src/lib/fxl/logging.h>
+
+#include "garnet/bin/trace_manager/tests/trace_manager_test.h"
+
+namespace tracing {
+namespace test {
+
+using SessionState = TraceManagerTest::SessionState;
+
+// This only works when no other condition could cause the loop to exit.
+// E.g., This doesn't work if the state is kStopping or kTerminating as the
+// transition to kStopped,kTerminated will also cause the loop to exit.
+template <typename T>
+controller::Controller_StartTracing_Result TryStart(TraceManagerTest* fixture,
+                                                    const T& interface_ptr) {
+  controller::Controller_StartTracing_Result start_result;
+  controller::StartOptions start_options{fixture->GetDefaultStartOptions()};
+  bool start_completed = false;
+  interface_ptr->StartTracing(std::move(start_options),
+                              [fixture, &start_completed,
+                               &start_result](controller::Controller_StartTracing_Result result) {
+                                start_completed = true;
+                                start_result = std::move(result);
+                                fixture->QuitLoop();
+                              });
+  fixture->RunLoopUntilIdle();
+  FXL_VLOG(2) << "Start loop done";
+  EXPECT_TRUE(start_completed);
+  return start_result;
+}
+
+TEST_F(TraceManagerTest, StartUninitialized) {
+  ConnectToControllerService();
+
+  controller::Controller_StartTracing_Result start_result{TryStart(this, controller())};
+  EXPECT_TRUE(start_result.is_err());
+  EXPECT_EQ(start_result.err(), controller::StartErrorCode::NOT_INITIALIZED);
+}
+
+template <typename T>
+void TryExtraStart(TraceManagerTest* fixture, const T& interface_ptr) {
+  controller::Controller_StartTracing_Result start_result{TryStart(fixture, interface_ptr)};
+  EXPECT_EQ(fixture->GetSessionState(), SessionState::kStarted);
+  EXPECT_TRUE(start_result.is_err());
+  EXPECT_EQ(start_result.err(), controller::StartErrorCode::ALREADY_STARTED);
+}
+
+TEST_F(TraceManagerTest, ExtraStart) {
+  ConnectToControllerService();
+
+  EXPECT_TRUE(AddFakeProvider(kProvider1Pid, kProvider1Name));
+
+  ASSERT_TRUE(InitializeSession());
+
+  ASSERT_TRUE(StartSession());
+
+  // Now try starting again.
+  TryExtraStart(this, controller());
+}
+
+TEST_F(TraceManagerTest, StartWhileStopping) {
+  ConnectToControllerService();
+
+  EXPECT_TRUE(AddFakeProvider(kProvider1Pid, kProvider1Name));
+
+  ASSERT_TRUE(InitializeSession());
+
+  ASSERT_TRUE(StartSession());
+
+  controller::StopOptions stop_options{GetDefaultStopOptions()};
+  controller()->StopTracing(std::move(stop_options), []() {});
+  RunLoopUntilIdle();
+  // The loop will exit for the transition to kStopping.
+  FXL_VLOG(2) << "Loop done, expecting session stopping";
+  ASSERT_EQ(GetSessionState(), SessionState::kStopping);
+
+  // Now try a Start while we're still in |kStopping|.
+  // The provider doesn't advance state until we tell it to, so we should
+  // still remain in |kStopping|.
+  controller::Controller_StartTracing_Result result;
+  controller::StartOptions start_options{GetDefaultStartOptions()};
+  bool start_completed = false;
+  controller()->StartTracing(
+      std::move(start_options),
+      [this, &start_completed, &result](controller::Controller_StartTracing_Result in_result) {
+        start_completed = true;
+        result = std::move(in_result);
+        QuitLoop();
+      });
+  RunLoopUntilIdle();
+  FXL_VLOG(2) << "Start loop done";
+  EXPECT_TRUE(GetSessionState() == SessionState::kStopping);
+  ASSERT_TRUE(start_completed);
+  ASSERT_TRUE(result.is_err());
+  EXPECT_EQ(result.err(), controller::StartErrorCode::STOPPING);
+}
+
+TEST_F(TraceManagerTest, StartWhileTerminating) {
+  ConnectToControllerService();
+
+  EXPECT_TRUE(AddFakeProvider(kProvider1Pid, kProvider1Name));
+
+  ASSERT_TRUE(InitializeSession());
+
+  ASSERT_TRUE(StartSession());
+
+  ASSERT_TRUE(StopSession());
+
+  controller::TerminateOptions options{GetDefaultTerminateOptions()};
+  controller()->TerminateTracing(std::move(options), [](controller::TerminateResult result) {});
+  RunLoopUntilIdle();
+  ASSERT_EQ(GetSessionState(), SessionState::kTerminating);
+
+  // Now try a Start while we're still in |kTerminating|.
+  // The provider doesn't advance state until we tell it to, so we should
+  // still remain in |kTerminating|.
+  controller::Controller_StartTracing_Result result;
+  controller::StartOptions start_options{GetDefaultStartOptions()};
+  bool start_completed = false;
+  controller()->StartTracing(
+      std::move(start_options),
+      [this, &start_completed, &result](controller::Controller_StartTracing_Result in_result) {
+        start_completed = true;
+        result = std::move(in_result);
+        QuitLoop();
+      });
+  RunLoopUntilIdle();
+  FXL_VLOG(2) << "Start loop done";
+  EXPECT_TRUE(GetSessionState() == SessionState::kTerminating);
+  ASSERT_TRUE(start_completed);
+  ASSERT_TRUE(result.is_err());
+  EXPECT_EQ(result.err(), controller::StartErrorCode::TERMINATING);
+}
+
+}  // namespace test
+}  // namespace tracing
diff --git a/garnet/bin/trace_manager/tests/stop_tests.cc b/garnet/bin/trace_manager/tests/stop_tests.cc
new file mode 100644
index 0000000..46e5f11
--- /dev/null
+++ b/garnet/bin/trace_manager/tests/stop_tests.cc
@@ -0,0 +1,123 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <src/lib/fxl/logging.h>
+
+#include "garnet/bin/trace_manager/tests/trace_manager_test.h"
+
+namespace tracing {
+namespace test {
+
+using SessionState = TraceManagerTest::SessionState;
+
+TEST_F(TraceManagerTest, StopUninitialize) {
+  // There's no error result. Mostly we want to verify we don't crash/hang.
+  ConnectToControllerService();
+
+  controller::StopOptions stop_options{GetDefaultStopOptions()};
+  bool stop_completed = false;
+  controller()->StopTracing(std::move(stop_options), [this, &stop_completed]() {
+    stop_completed = true;
+    QuitLoop();
+  });
+  RunLoopUntilIdle();
+  FXL_VLOG(2) << "Loop done";
+  EXPECT_TRUE(stop_completed);
+}
+
+template <typename T>
+void TryExtraStop(TraceManagerTest* fixture, const T& interface_ptr) {
+  controller::StopOptions stop_options{fixture->GetDefaultStopOptions()};
+  bool stop_completed = false;
+  interface_ptr->StopTracing(std::move(stop_options), [fixture, &stop_completed]() {
+    stop_completed = true;
+    fixture->QuitLoop();
+  });
+  fixture->RunLoopUntilIdle();
+  FXL_VLOG(2) << "Loop done, expecting session still stopped";
+  EXPECT_TRUE(stop_completed);
+  EXPECT_EQ(fixture->GetSessionState(), SessionState::kStopped);
+}
+
+TEST_F(TraceManagerTest, ExtraStop) {
+  ConnectToControllerService();
+
+  EXPECT_TRUE(AddFakeProvider(kProvider1Pid, kProvider1Name));
+
+  ASSERT_TRUE(InitializeSession());
+
+  ASSERT_TRUE(StartSession());
+
+  ASSERT_TRUE(StopSession());
+
+  // Now try stopping again.
+  // There's no error result. Mostly we want to verify we don't crash/hang.
+  TryExtraStop(this, controller());
+}
+
+TEST_F(TraceManagerTest, StopWhileStopping) {
+  ConnectToControllerService();
+
+  FakeProvider* provider;
+  EXPECT_TRUE(AddFakeProvider(kProvider1Pid, kProvider1Name, &provider));
+
+  ASSERT_TRUE(InitializeSession());
+
+  ASSERT_TRUE(StartSession());
+
+  controller::StopOptions stop1_options{GetDefaultStopOptions()};
+  controller()->StopTracing(std::move(stop1_options), []() {});
+  RunLoopUntilIdle();
+  // The loop will exit for the transition to kStopping.
+  FXL_VLOG(2) << "Loop done, expecting session stopping";
+  EXPECT_EQ(GetSessionState(), SessionState::kStopping);
+
+  // Now try another Stop while we're still in |kStopping|.
+  // The provider doesn't advance state until we tell it to, so we should
+  // still remain in |kStopping|.
+  controller::StopOptions stop2_options{GetDefaultStopOptions()};
+  bool stop_completed = false;
+  controller()->StopTracing(std::move(stop2_options), [this, &stop_completed]() {
+    stop_completed = true;
+    QuitLoop();
+  });
+  RunLoopUntilIdle();
+  FXL_VLOG(2) << "Stop loop done";
+  EXPECT_TRUE(stop_completed);
+  EXPECT_TRUE(GetSessionState() == SessionState::kStopping);
+}
+
+TEST_F(TraceManagerTest, StopWhileTerminating) {
+  ConnectToControllerService();
+
+  EXPECT_TRUE(AddFakeProvider(kProvider1Pid, kProvider1Name));
+
+  ASSERT_TRUE(InitializeSession());
+
+  ASSERT_TRUE(StartSession());
+
+  ASSERT_TRUE(StopSession());
+
+  controller::TerminateOptions options{GetDefaultTerminateOptions()};
+  controller()->TerminateTracing(std::move(options), [](controller::TerminateResult result) {});
+  RunLoopUntilIdle();
+  ASSERT_EQ(GetSessionState(), SessionState::kTerminating);
+
+  // Now try a Stop while we're still in |kTerminating|.
+  // The provider doesn't advance state until we tell it to, so we should
+  // still remain in |kTerminating|.
+  controller::StopOptions stop_options{GetDefaultStopOptions()};
+  bool stop_completed = false;
+  controller()->StopTracing(std::move(stop_options), [this, &stop_completed]() {
+    stop_completed = true;
+    QuitLoop();
+  });
+  RunLoopUntilIdle();
+  FXL_VLOG(2) << "Stop loop done";
+  EXPECT_TRUE(stop_completed);
+  EXPECT_TRUE(GetSessionState() == SessionState::kTerminating);
+}
+
+}  // namespace test
+}  // namespace tracing
diff --git a/garnet/bin/trace_manager/tests/terminate_tests.cc b/garnet/bin/trace_manager/tests/terminate_tests.cc
new file mode 100644
index 0000000..349e1b7
--- /dev/null
+++ b/garnet/bin/trace_manager/tests/terminate_tests.cc
@@ -0,0 +1,42 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <src/lib/fxl/logging.h>
+
+#include "garnet/bin/trace_manager/tests/trace_manager_test.h"
+
+namespace tracing {
+namespace test {
+
+TEST_F(TraceManagerTest, TerminateOnClose) {
+  ConnectToControllerService();
+
+  ASSERT_TRUE(InitializeSession());
+
+  EXPECT_EQ(GetSessionState(), SessionState::kInitialized);
+
+  DisconnectFromControllerService();
+
+  RunLoopUntilIdle();
+  EXPECT_EQ(GetSessionState(), SessionState::kNonexistent);
+}
+
+TEST_F(TraceManagerTest, TerminateWhenNotInitialized) {
+  ConnectToControllerService();
+
+  controller::TerminateOptions options;
+  options.set_write_results(false);
+  bool terminated = false;
+  controller()->TerminateTracing(
+      std::move(options), [&terminated](controller::TerminateResult result) { terminated = true; });
+
+  RunLoopUntilIdle();
+
+  // There is no error result in this case.
+  // Mostly we just want to verify we don't crash/hang.
+  ASSERT_TRUE(terminated);
+}
+
+}  // namespace test
+}  // namespace tracing
diff --git a/garnet/bin/trace_manager/tests/trace_manager_test.cc b/garnet/bin/trace_manager/tests/trace_manager_test.cc
index 9e30f6d..dcc18b6 100644
--- a/garnet/bin/trace_manager/tests/trace_manager_test.cc
+++ b/garnet/bin/trace_manager/tests/trace_manager_test.cc
@@ -2,14 +2,17 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-#include <utility>
-
 #include "garnet/bin/trace_manager/tests/trace_manager_test.h"
 
+#include <utility>
+
 namespace tracing {
 namespace test {
 
 TraceManagerTest::TraceManagerTest() {
+  controller_.events().OnSessionStateChange = [this](controller::SessionState state) {
+    FidlOnSessionStateChange(state);
+  };
 }
 
 void TraceManagerTest::SetUp() {
@@ -29,15 +32,16 @@
 }
 
 void TraceManagerTest::ConnectToControllerService() {
+  FXL_VLOG(2) << "ConnectToControllerService";
   context_provider().ConnectToPublicService(controller_.NewRequest());
 }
 
 void TraceManagerTest::DisconnectFromControllerService() {
+  FXL_VLOG(2) << "DisconnectFromControllerService";
   controller_.Unbind();
 }
 
-bool TraceManagerTest::AddFakeProvider(zx_koid_t pid,
-                                       const std::string& name,
+bool TraceManagerTest::AddFakeProvider(zx_koid_t pid, const std::string& name,
                                        FakeProvider** out_provider) {
   provider::RegistryPtr registry;
   context_provider().ConnectToPublicService(registry.NewRequest());
@@ -45,14 +49,12 @@
   auto provider_impl = std::make_unique<FakeProvider>(pid, name);
   auto provider = std::make_unique<FakeProviderBinding>(std::move(provider_impl));
 
-  fidl::InterfaceHandle<provider::Provider> provider_client{
-      provider->NewBinding()};
+  fidl::InterfaceHandle<provider::Provider> provider_client{provider->NewBinding()};
   if (!provider_client.is_valid()) {
     return false;
   }
 
-  registry->RegisterProvider(std::move(provider_client),
-                             provider->impl()->pid(),
+  registry->RegisterProvider(std::move(provider_client), provider->impl()->pid(),
                              provider->impl()->name());
   if (out_provider) {
     *out_provider = provider->impl().get();
@@ -61,16 +63,24 @@
   return true;
 }
 
-TraceManagerTest::SessionState
-TraceManagerTest::GetSessionState() const {
+void TraceManagerTest::OnSessionStateChange() {
+  FXL_VLOG(2) << "Session state change, QuitLoop";
+  QuitLoop();
+}
+
+TraceManagerTest::SessionState TraceManagerTest::GetSessionState() const {
   if (trace_manager()->session()) {
     switch (trace_manager()->session()->state()) {
-#define TRANSLATE_STATE(state) \
-    case TraceSession::State::state: return SessionState::state;
-    TRANSLATE_STATE(kReady);
-    TRANSLATE_STATE(kStarted);
-    TRANSLATE_STATE(kStopping);
-    TRANSLATE_STATE(kStopped);
+#define TRANSLATE_STATE(state)     \
+  case TraceSession::State::state: \
+    return SessionState::state;
+      TRANSLATE_STATE(kReady);
+      TRANSLATE_STATE(kInitialized);
+      TRANSLATE_STATE(kStarting);
+      TRANSLATE_STATE(kStarted);
+      TRANSLATE_STATE(kStopping);
+      TRANSLATE_STATE(kStopped);
+      TRANSLATE_STATE(kTerminating);
 #undef TRANSLATE_STATE
     }
   }
@@ -78,9 +88,9 @@
 }
 
 // static
-TraceConfig TraceManagerTest::GetDefaultTraceConfig() {
+controller::TraceConfig TraceManagerTest::GetDefaultTraceConfig() {
   std::vector<std::string> categories{kTestCategory};
-  TraceConfig config;
+  controller::TraceConfig config;
   config.set_categories(std::move(categories));
   config.set_buffer_size_megabytes_hint(kDefaultBufferSizeMegabytes);
   config.set_start_timeout_milliseconds(kDefaultStartTimeoutMilliseconds);
@@ -88,7 +98,7 @@
   return config;
 }
 
-void TraceManagerTest::StartSessionWorker(TraceConfig config, bool* success) {
+void TraceManagerTest::InitializeSessionWorker(controller::TraceConfig config, bool* success) {
   // Require a mode to be set, no default here.
   FXL_CHECK(config.has_buffering_mode());
 
@@ -96,21 +106,73 @@
   zx_status_t status = zx::socket::create(0u, &our_socket, &their_socket);
   ASSERT_EQ(status, ZX_OK);
 
-  bool start_completed = false;
-  auto callback = [&start_completed]() {
-    start_completed = true;
-  };
-  controller_->StartTracing(std::move(config), std::move(their_socket), callback);
+  controller_->InitializeTracing(std::move(config), std::move(their_socket));
+  RunLoopUntilIdle();
+  FXL_VLOG(2) << "Loop done, expecting session initialized";
+  ASSERT_EQ(GetSessionState(), SessionState::kInitialized);
 
+  // Run one more time to finish up provider initialization. This happens
+  // after the session transitions to the initialized state, but after all
+  // providers have been told to initialize. Since everything is happening
+  // on one thread, we can assume that when the loop is idle all registered
+  // providers have initialized.
+  // This doesn't run forever as there's no session state change involved.
   RunLoopUntilIdle();
 
+  // The counts always have a fixed value here.
+  VerifyCounts(0, 0);
+
+  destination_ = std::move(our_socket);
+
+  *success = true;
+}
+
+bool TraceManagerTest::InitializeSession(controller::TraceConfig config) {
+  bool success{};
+  FXL_VLOG(2) << "Initializing session";
+  InitializeSessionWorker(std::move(config), &success);
+  if (success) {
+    FXL_VLOG(2) << "Session initialized";
+  }
+  return success;
+}
+
+// static
+controller::StartOptions TraceManagerTest::GetDefaultStartOptions() {
+  std::vector<std::string> additional_categories{};
+  controller::StartOptions options;
+  options.set_buffer_disposition(controller::BufferDisposition::RETAIN);
+  options.set_additional_categories(std::move(additional_categories));
+  return options;
+}
+
+void TraceManagerTest::BeginStartSession(controller::StartOptions options) {
+  FXL_VLOG(2) << "Starting session";
+
+  MarkBeginOperation();
+
+  start_state_.start_completed = false;
+  auto callback = [this](controller::Controller_StartTracing_Result result) {
+    start_state_.start_completed = true;
+    start_state_.start_result = std::move(result);
+  };
+  controller_->StartTracing(std::move(options), callback);
+
+  RunLoopUntilIdle();
   // The loop will exit for the transition to kStarting.
+}
+
+bool TraceManagerTest::FinishStartSession() {
   // If there are no tracees then it will also subsequently transition to
   // kStarted before the loop exits. If there are tracees then we need to
   // wait for them to start.
   if (fake_provider_bindings().size() > 0) {
     FXL_VLOG(2) << "Loop done, expecting session starting";
-    ASSERT_EQ(GetSessionState(), SessionState::kStarting);
+    SessionState state = GetSessionState();
+    EXPECT_EQ(state, SessionState::kStarting);
+    if (state != SessionState::kStarting) {
+      return false;
+    }
 
     // Make sure all providers are marked kStarting.
     // The loop exits when we transition to kStarting, but providers won't have
@@ -118,65 +180,169 @@
     RunLoopUntilIdle();
 
     MarkAllProvidersStarted();
-
     // Wait until all providers are started.
     RunLoopUntilIdle();
   }
 
   // The loop will exit for the transition to kStarted.
   FXL_VLOG(2) << "Loop done, expecting all providers started";
-  ASSERT_EQ(GetSessionState(), SessionState::kStarted);
+  SessionState state = GetSessionState();
+  EXPECT_EQ(state, SessionState::kStarted);
+  if (state != SessionState::kStarted) {
+    return false;
+  }
 
   // Run the loop one more time to ensure we pick up the result.
+  // Remember the loop prematurely exits on session state changes.
   RunLoopUntilIdle();
-  ASSERT_TRUE(start_completed);
-
-  destination_ = std::move(our_socket);
-
-  *success = true;
-}
-
-bool TraceManagerTest::StartSession(TraceConfig config) {
-  bool success{};
-  FXL_VLOG(2) << "Starting session";
-  StartSessionWorker(std::move(config), &success);
-  if (success) {
-    FXL_VLOG(2) << "Session started";
+  EXPECT_TRUE(start_state_.start_completed);
+  if (!start_state_.start_completed) {
+    return false;
   }
-  return success;
+  EXPECT_FALSE(start_state_.start_result.is_err());
+
+  FXL_VLOG(2) << "Session started";
+  return true;
 }
 
-void TraceManagerTest::StopSessionWorker(bool* success) {
-  controller_->StopTracing();
+bool TraceManagerTest::StartSession(controller::StartOptions options) {
+  BeginStartSession(std::move(options));
+  return FinishStartSession();
+}
+
+// static
+controller::StopOptions TraceManagerTest::GetDefaultStopOptions() {
+  controller::StopOptions options;
+  options.set_write_results(false);
+  return options;
+}
+
+void TraceManagerTest::BeginStopSession(controller::StopOptions options) {
+  FXL_VLOG(2) << "Stopping session";
+
+  MarkBeginOperation();
+
+  stop_state_.stop_completed = false;
+  auto callback = [this]() {
+    stop_state_.stop_completed = true;
+    // We need to run the loop one last time to pick up the result.
+    // Be sure to exit it now that we have the result.
+    QuitLoop();
+  };
+  controller_->StopTracing(std::move(options), callback);
 
   RunLoopUntilIdle();
+  // The loop will exit for the transition to kStopping.
+}
 
+bool TraceManagerTest::FinishStopSession() {
+  // If there are no tracees then it will also subsequently transition to
+  // kStopped before the loop exits. If there are tracees then we need to
+  // wait for them to stop.
   if (fake_provider_bindings().size() > 0) {
     FXL_VLOG(2) << "Loop done, expecting session stopping";
-    ASSERT_EQ(GetSessionState(), SessionState::kStopping);
+    SessionState state = GetSessionState();
+    EXPECT_EQ(state, SessionState::kStopping);
+    if (state != SessionState::kStopping) {
+      return false;
+    }
+
+    // Make sure all providers are marked kStopping.
+    // The loop exits when we transition to kStopping, but providers won't have
+    // processed their Stop requests yet.
+    RunLoopUntilIdle();
 
     MarkAllProvidersStopped();
-
     // Wait until all providers are stopped.
     RunLoopUntilIdle();
   }
 
+  // The loop will exit for the transition to kStopped.
   FXL_VLOG(2) << "Loop done, expecting session stopped";
-  ASSERT_EQ(GetSessionState(), SessionState::kStopped);
+  SessionState state = GetSessionState();
+  EXPECT_EQ(state, SessionState::kStopped);
+  if (state != SessionState::kStopped) {
+    return false;
+  }
 
-  destination_.reset();
+  // Run one more time to ensure we pick up the stop result.
+  RunLoopUntilIdle();
+  EXPECT_TRUE(stop_state_.stop_completed);
+  if (!stop_state_.stop_completed) {
+    return false;
+  }
 
-  *success = true;
+  FXL_VLOG(2) << "Session stopped";
+  return true;
 }
 
-bool TraceManagerTest::StopSession() {
-  bool success{};
-  FXL_VLOG(2) << "Stopping session";
-  StopSessionWorker(&success);
-  if (success) {
-    FXL_VLOG(2) << "Session stopped";
+bool TraceManagerTest::StopSession(controller::StopOptions options) {
+  BeginStopSession(std::move(options));
+  return FinishStopSession();
+}
+
+// static
+controller::TerminateOptions TraceManagerTest::GetDefaultTerminateOptions() {
+  controller::TerminateOptions options;
+  options.set_write_results(true);
+  return options;
+}
+
+void TraceManagerTest::BeginTerminateSession(controller::TerminateOptions options) {
+  FXL_VLOG(2) << "Terminating session";
+
+  MarkBeginOperation();
+
+  terminate_state_.terminate_completed = false;
+  controller()->TerminateTracing(std::move(options), [this](controller::TerminateResult result) {
+    terminate_state_.terminate_completed = true;
+    terminate_state_.terminate_result = std::move(result);
+  });
+  RunLoopUntilIdle();
+  // The loop will exit for the transition to kTerminating.
+  // Note: If there are no providers then the state will transition again
+  // to kNonexistent(== "terminated") before the loop exits.
+}
+
+bool TraceManagerTest::FinishTerminateSession(controller::TerminateResult* result) {
+  // If there are no tracees then it will also subsequently transition to
+  // kTerminated before the loop exits. If there are tracees then we need to
+  // wait for them to terminate.
+  if (fake_provider_bindings().size() > 0) {
+    FXL_VLOG(2) << "Loop done, expecting session terminating";
+    SessionState state = GetSessionState();
+    EXPECT_EQ(state, SessionState::kTerminating);
+    if (state != SessionState::kTerminating) {
+      return false;
+    }
+
+    // Make sure all providers are marked kTerminating.
+    RunLoopUntilIdle();
+
+    MarkAllProvidersTerminated();
+    RunLoopUntilIdle();
   }
-  return success;
+
+  FXL_VLOG(2) << "Loop done, expecting session terminated";
+  EXPECT_EQ(GetSessionState(), SessionState::kNonexistent);
+
+  // Run the loop one more time to ensure we pick up the result.
+  RunLoopUntilIdle();
+  EXPECT_TRUE(terminate_state_.terminate_completed);
+  if (!terminate_state_.terminate_completed) {
+    return false;
+  }
+
+  FXL_VLOG(2) << "Session terminated";
+  if (result) {
+    *result = std::move(terminate_state_.terminate_result);
+  }
+  return true;
+}
+
+bool TraceManagerTest::TerminateSession(controller::TerminateOptions options) {
+  BeginTerminateSession(std::move(options));
+  return FinishTerminateSession();
 }
 
 void TraceManagerTest::MarkAllProvidersStarted() {
@@ -193,5 +359,38 @@
   }
 }
 
+void TraceManagerTest::MarkAllProvidersTerminated() {
+  FXL_VLOG(2) << "Marking all providers terminated";
+  for (const auto& p : fake_provider_bindings()) {
+    p->impl()->MarkTerminated();
+  }
+}
+
+void TraceManagerTest::VerifyCounts(int expected_start_count, int expected_stop_count) {
+  SessionState state{GetSessionState()};
+  for (const auto& p : fake_provider_bindings()) {
+    const std::string& name = p->impl()->name();
+    if (state != SessionState::kReady) {
+      EXPECT_EQ(p->impl()->initialize_count(), 1) << name;
+    } else {
+      EXPECT_EQ(p->impl()->initialize_count(), 0) << name;
+    }
+    EXPECT_EQ(p->impl()->start_count(), expected_start_count) << name;
+    EXPECT_EQ(p->impl()->stop_count(), expected_stop_count) << name;
+    if (state != SessionState::kNonexistent) {
+      EXPECT_EQ(p->impl()->terminate_count(), 0) << name;
+    } else {
+      EXPECT_EQ(p->impl()->terminate_count(), 1) << name;
+    }
+  }
+}
+
+// fidl event
+void TraceManagerTest::FidlOnSessionStateChange(controller::SessionState state) {
+  FXL_VLOG(2) << "FidlOnSessionStateChange " << state;
+  ++on_session_state_change_event_count_;
+  last_session_state_event_ = state;
+}
+
 }  // namespace test
 }  // namespace tracing
diff --git a/garnet/bin/trace_manager/tests/trace_manager_test.h b/garnet/bin/trace_manager/tests/trace_manager_test.h
index 4738898..9c1eb9f 100644
--- a/garnet/bin/trace_manager/tests/trace_manager_test.h
+++ b/garnet/bin/trace_manager/tests/trace_manager_test.h
@@ -5,16 +5,17 @@
 #ifndef GARNET_BIN_TRACE_MANAGER_TESTS_TRACE_MANAGER_TEST_H_
 #define GARNET_BIN_TRACE_MANAGER_TESTS_TRACE_MANAGER_TEST_H_
 
+#include <lib/fidl/cpp/binding.h>
+#include <lib/gtest/test_loop_fixture.h>
+#include <lib/sys/cpp/testing/component_context_provider.h>
+#include <lib/zx/socket.h>
+
 #include <memory>
 #include <string>
 #include <utility>
 #include <vector>
 
 #include <gtest/gtest.h>
-#include <lib/fidl/cpp/binding.h>
-#include <lib/gtest/test_loop_fixture.h>
-#include <lib/sys/cpp/testing/component_context_provider.h>
-#include <lib/zx/socket.h>
 
 #include "garnet/bin/trace_manager/app.h"
 #include "garnet/bin/trace_manager/tests/fake_provider.h"
@@ -24,21 +25,24 @@
 
 namespace controller = ::fuchsia::tracing::controller;
 
-// TODO(dje): Temporary renaming to ease transition to new name.
-using TraceConfig = controller::TraceOptions;
-
 class TraceManagerTest : public gtest::TestLoopFixture {
  public:
-  // |TraceSession| intentionally doesn't have something like |kNone| as that is
+  // |TraceSession| intentionally doesn't have |kTerminated| as that is
   // represented by the session being non-existent. However, it's helpful in
   // tests to have a value to represent this state so we have our own copy of
   // |TraceSession::State|. It is not named |State| to help avoid confusion.
   enum class SessionState {
     // These values are all copies of |TraceSession::State|.
-    kReady, kStarted, kStopping, kStopped,
-    // New value to represent state between kReady and kStarted.
+    kReady,
+    kInitialized,
     kStarting,
+    kStarted,
+    kStopping,
+    kStopped,
+    kTerminating,
     // This is the new value to represent |TraceManager::session_| == nullptr.
+    // This isn't called |kTerminated|, though that is what it usually means,
+    // because this is also the state before any session has been created.
     kNonexistent,
   };
 
@@ -57,7 +61,10 @@
   static constexpr zx_koid_t kProvider2Pid = 1235;
   static constexpr char kProvider2Name[] = "test-provider2";
 
-  static TraceConfig GetDefaultTraceConfig();
+  static controller::TraceConfig GetDefaultTraceConfig();
+  static controller::StartOptions GetDefaultStartOptions();
+  static controller::StopOptions GetDefaultStopOptions();
+  static controller::TerminateOptions GetDefaultTerminateOptions();
 
   TraceManagerTest();
 
@@ -67,6 +74,12 @@
 
   const controller::ControllerPtr& controller() const { return controller_; }
 
+  int on_session_state_change_event_count() const { return on_session_state_change_event_count_; }
+  int begin_session_state_change_event_count() const {
+    return begin_session_state_change_event_count_;
+  }
+  controller::SessionState last_session_state_event() const { return last_session_state_event_; }
+
   const std::vector<std::unique_ptr<FakeProviderBinding>>& fake_provider_bindings() const {
     return fake_provider_bindings_;
   }
@@ -80,23 +93,64 @@
   bool AddFakeProvider(zx_koid_t pid, const std::string& name,
                        FakeProvider** out_provider = nullptr);
 
+  // Called from within |TraceManager| on |TraceSession| state changes.
+  void OnSessionStateChange();
+
   // Fetch the session's state.
   SessionState GetSessionState() const;
 
+  // Mark the beginning of an operation.
+  void MarkBeginOperation() {
+    begin_session_state_change_event_count_ = on_session_state_change_event_count_;
+  }
+
   // Wrappers to simplify the standard operations.
   // These are only to be called at times when they're expected to succeed.
-  // E.g., Don't call |StopSession()| if session is already stopped,
+  // E.g., Don't call |TerminateSession()| if session is already terminated,
   // and so on. If you want to perform these operations outside of this
   // constraint, roll your own or use something else.
   // These assume the controller connection is already made.
-  bool StartSession(TraceConfig config = GetDefaultTraceConfig());
-  bool StopSession();
+  // The default session doesn't have a consumer.
+  bool InitializeSession(controller::TraceConfig config = GetDefaultTraceConfig());
+  bool TerminateSession(controller::TerminateOptions options = GetDefaultTerminateOptions());
+
+  // Terminating a session involves two steps: Initiating the termination
+  // and waiting for it to terminate. Call these when you want to access
+  // intermediate state.
+  void BeginTerminateSession(controller::TerminateOptions options = GetDefaultTerminateOptions());
+  // Returns true on success.
+  // If |result| is non-NULL the result is stored there.
+  bool FinishTerminateSession(controller::TerminateResult* result = nullptr);
+
+  // Wrappers to simplify |Start,Stop| operations.
+  // These are only to be called at times when they're expected to succeed.
+  // E.g., Don't call |StartSession*()| if session is already started,
+  // and so on. If you want to perform these operations outside of this
+  // constraint, roll your own or use something else.
+  // These assume the needed interface is already connected.
+  // These combine both the "begin" and "finish" operations.
+  bool StartSession(controller::StartOptions options = GetDefaultStartOptions());
+  bool StopSession(controller::StopOptions options = GetDefaultStopOptions());
+
+  // Starting and stopping a session involves two steps: initiating the request
+  // and waiting for it to complete. Call these when you want to access
+  // intermediate state.
+  void BeginStartSession(controller::StartOptions options = GetDefaultStartOptions());
+  bool FinishStartSession();
+  void BeginStopSession(controller::StopOptions options = GetDefaultStopOptions());
+  bool FinishStopSession();
 
   // Helpers to advance provider state.
   // These can only be called when all providers are in the immediately
-  // preceding state (e.g., kReady->kStarted).
+  // preceding state (e.g., kStarting->kStarted).
   void MarkAllProvidersStarted();
   void MarkAllProvidersStopped();
+  void MarkAllProvidersTerminated();
+
+  // Helper to verify reception and processing of initialize, start, stop,
+  // and terminate fidl requests.
+  // This is not intended to be called before |Initialize*|.
+  void VerifyCounts(int expected_start_count, int expected_stop_count);
 
   // Publically accessible copies of test fixture methods.
   void QuitLoop() { gtest::TestLoopFixture::QuitLoop(); }
@@ -106,12 +160,35 @@
   }
 
  private:
+  // For communication between |BeginStart(),FinishStart()|.
+  // This value is only valid between those calls.
+  struct StartState {
+    bool start_completed = false;
+    controller::Controller_StartTracing_Result start_result;
+  };
+
+  // For communication between |BeginStop(),FinishStop()|.
+  // This value is only valid between those calls.
+  struct StopState {
+    bool stop_completed = false;
+  };
+
+  // For communication between |BeginTerminate(),FinishTerminate()|.
+  // This value is only valid between those calls.
+  struct TerminateState {
+    bool terminate_completed = false;
+    controller::TerminateResult terminate_result;
+  };
+
   void SetUp() override;
   void TearDown() override;
 
-  // Helper functions to cope with functions calling |ASSERT_*|: they have to return void.
-  void StartSessionWorker(TraceConfig config, bool* success);
-  void StopSessionWorker(bool* success);
+  // Helper functions to cope with functions calling |ASSERT_*|: they have to
+  // return void.
+  void InitializeSessionWorker(controller::TraceConfig config, bool* out_success);
+
+  // Handler for OnSessionStateChange fidl event.
+  void FidlOnSessionStateChange(controller::SessionState state);
 
   sys::testing::ComponentContextProvider context_provider_;
   std::unique_ptr<TraceManagerApp> app_;
@@ -119,9 +196,26 @@
   // Interfaces to make service requests.
   controller::ControllerPtr controller_;
 
+  // Running count of session state changes.
+  int on_session_state_change_event_count_ = 0;
+  // The value of |on_session_state_change_event_count_| at the start of an
+  // operation (e.g., termination). This is used to very cases where multiple
+  // state changes happen during an operation
+  // (e.g., terminating -> terminated).
+  int begin_session_state_change_event_count_ = 0;
+  // The last recorded session state, for verification purposes.
+  controller::SessionState last_session_state_event_{};
+
   // Socket for communication with controller.
   zx::socket destination_;
 
+  // Most tests don't care about the intermediate state (the state after the
+  // |Begin*Session()| function. But some do. To avoid duplicating a bunch of
+  // code to handle both cases we manage the intermediate state here.
+  StartState start_state_;
+  StopState stop_state_;
+  TerminateState terminate_state_;
+
   // Containers for provider bindings so that they get cleaned up at the end of the test.
   std::vector<std::unique_ptr<FakeProviderBinding>> fake_provider_bindings_;
 };
diff --git a/garnet/bin/trace_manager/trace_manager.cc b/garnet/bin/trace_manager/trace_manager.cc
index d823506..5283731 100644
--- a/garnet/bin/trace_manager/trace_manager.cc
+++ b/garnet/bin/trace_manager/trace_manager.cc
@@ -11,11 +11,13 @@
 #include <algorithm>
 #include <iostream>
 
+#include "garnet/bin/trace_manager/app.h"
+
 namespace tracing {
 namespace {
 
 // For large traces or when verbosity is on it can take awhile to write out
-// all the records. E.g., ipm_provider can take 40 seconds with --verbose=2
+// all the records. E.g., cpuperf_provider can take 40 seconds with --verbose=2
 constexpr zx::duration kStopTimeout = zx::sec(60);
 constexpr uint32_t kMinBufferSizeMegabytes = 1;
 constexpr uint32_t kMaxBufferSizeMegabytes = 64;
@@ -32,9 +34,9 @@
 
 }  // namespace
 
-TraceManager::TraceManager(sys::ComponentContext* context, const Config& config)
-    : context_(context), config_(config) {
-  // TODO(jeffbrown): We should do this in StartTracing() and take care
+TraceManager::TraceManager(TraceManagerApp* app, sys::ComponentContext* context, Config config)
+    : app_(app), context_(context), config_(std::move(config)) {
+  // TODO(jeffbrown): We should do this in InitializeTracing() and take care
   // to restart any crashed providers.  We should also wait briefly to ensure
   // that these providers have registered themselves before replying that
   // tracing has started.
@@ -43,13 +45,29 @@
 
 TraceManager::~TraceManager() = default;
 
+void TraceManager::OnEmptyControllerSet() {
+  // While one controller could go away and another remain causing a trace
+  // to not be terminated, at least handle the common case.
+  FXL_VLOG(5) << "Controller is gone";
+  if (session_) {
+    // Check the state first because the log messages are useful, but not if
+    // tracing has ended.
+    if (session_->state() != TraceSession::State::kTerminating) {
+      FXL_LOG(INFO) << "Controller is gone, terminating trace";
+      session_->Terminate([this]() {
+        FXL_LOG(INFO) << "Trace terminated";
+        session_ = nullptr;
+      });
+    }
+  }
+}
+
 // fidl
-void TraceManager::StartTracing(TraceConfig config, zx::socket output,
-                                StartTracingCallback start_callback) {
-  FXL_VLOG(2) << "StartTracing";
+void TraceManager::InitializeTracing(controller::TraceConfig config, zx::socket output) {
+  FXL_VLOG(2) << "InitializeTracing";
 
   if (session_) {
-    FXL_LOG(ERROR) << "Trace already in progress";
+    FXL_LOG(ERROR) << "Ignoring initialize request, trace already initialized";
     return;
   }
 
@@ -90,7 +108,7 @@
       return;
   }
 
-  FXL_LOG(INFO) << "Starting trace with " << default_buffer_size_megabytes
+  FXL_LOG(INFO) << "Initializing trace with " << default_buffer_size_megabytes
                 << " MB buffers, buffering mode=" << mode_name;
   if (provider_specs.size() > 0) {
     FXL_LOG(INFO) << "Provider overrides:";
@@ -99,13 +117,20 @@
     }
   }
 
-  std::vector<::std::string> categories;
+  std::vector<std::string> categories;
   if (config.has_categories()) {
     categories = std::move(config.categories());
   }
+
+  uint64_t start_timeout_milliseconds = kDefaultStartTimeoutMilliseconds;
+  if (config.has_start_timeout_milliseconds()) {
+    start_timeout_milliseconds = config.start_timeout_milliseconds();
+  }
+
   session_ = fxl::MakeRefCounted<TraceSession>(
       std::move(output), std::move(categories), default_buffer_size_megabytes,
-      provider_buffering_mode, std::move(provider_specs), [this]() { session_ = nullptr; });
+      provider_buffering_mode, std::move(provider_specs), zx::msec(start_timeout_milliseconds),
+      kStopTimeout, [this]() { session_ = nullptr; });
 
   // The trace header is written now to ensure it appears first, and to avoid
   // timing issues if the trace is terminated early (and the session being
@@ -116,31 +141,127 @@
     session_->AddProvider(&bundle);
   }
 
-  trace_running_ = true;
-
-  uint64_t start_timeout_milliseconds = kDefaultStartTimeoutMilliseconds;
-  if (config.has_start_timeout_milliseconds()) {
-    start_timeout_milliseconds = config.start_timeout_milliseconds();
-  }
-  session_->WaitForProvidersToStart(std::move(start_callback),
-                                    zx::msec(start_timeout_milliseconds));
+  session_->MarkInitialized();
 }
 
 // fidl
-void TraceManager::StopTracing() {
-  FXL_VLOG(2) << "StopTracing";
-
-  if (!session_)
+void TraceManager::TerminateTracing(controller::TerminateOptions options,
+                                    TerminateTracingCallback terminate_callback) {
+  if (!session_) {
+    FXL_VLOG(1) << "Ignoring terminate request, tracing not initialized";
+    controller::TerminateResult result;
+    terminate_callback(std::move(result));
     return;
-  trace_running_ = false;
+  }
 
-  FXL_LOG(INFO) << "Stopping trace";
-  session_->Stop(
-      [this]() {
-        FXL_LOG(INFO) << "Stopped trace";
-        session_ = nullptr;
-      },
-      kStopTimeout);
+  if (options.has_write_results()) {
+    session_->set_write_results_on_terminate(options.write_results());
+  }
+
+  FXL_LOG(INFO) << "Terminating trace";
+  session_->Terminate([this, terminate_callback = std::move(terminate_callback)]() {
+    FXL_LOG(INFO) << "Terminated trace";
+    controller::TerminateResult result;
+    // TODO(dje): Report stats back to user.
+    terminate_callback(std::move(result));
+    session_ = nullptr;
+  });
+}
+
+// fidl
+void TraceManager::StartTracing(controller::StartOptions options,
+                                StartTracingCallback start_callback) {
+  FXL_VLOG(2) << "StartTracing";
+
+  controller::Controller_StartTracing_Result result;
+
+  if (!session_) {
+    FXL_LOG(ERROR) << "Ignoring start request, trace must be initialized first";
+    result.set_err(controller::StartErrorCode::NOT_INITIALIZED);
+    start_callback(std::move(result));
+    return;
+  }
+
+  switch (session_->state()) {
+    case TraceSession::State::kStarting:
+    case TraceSession::State::kStarted:
+      FXL_LOG(ERROR) << "Ignoring start request, trace already started";
+      result.set_err(controller::StartErrorCode::ALREADY_STARTED);
+      start_callback(std::move(result));
+      return;
+    case TraceSession::State::kStopping:
+      FXL_LOG(ERROR) << "Ignoring start request, trace stopping";
+      result.set_err(controller::StartErrorCode::STOPPING);
+      start_callback(std::move(result));
+      return;
+    case TraceSession::State::kTerminating:
+      FXL_LOG(ERROR) << "Ignoring start request, trace terminating";
+      result.set_err(controller::StartErrorCode::TERMINATING);
+      start_callback(std::move(result));
+      return;
+    case TraceSession::State::kInitialized:
+    case TraceSession::State::kStopped:
+      break;
+    default:
+      FXL_NOTREACHED();
+      return;
+  }
+
+  std::vector<std::string> additional_categories;
+  if (options.has_additional_categories()) {
+    additional_categories = std::move(options.additional_categories());
+  }
+
+  // This default matches trace's.
+  controller::BufferDisposition buffer_disposition = controller::BufferDisposition::RETAIN;
+  if (options.has_buffer_disposition()) {
+    buffer_disposition = options.buffer_disposition();
+    switch (buffer_disposition) {
+      case controller::BufferDisposition::CLEAR_ALL:
+      case controller::BufferDisposition::CLEAR_NONDURABLE:
+      case controller::BufferDisposition::RETAIN:
+        break;
+      default:
+        FXL_LOG(ERROR) << "Bad value for buffer disposition: " << buffer_disposition
+                       << ", dropping connection";
+        // TODO(dje): IWBN to drop the connection. How?
+        result.set_err(controller::StartErrorCode::TERMINATING);
+        start_callback(std::move(result));
+        return;
+    }
+  }
+
+  FXL_LOG(INFO) << "Starting trace, buffer disposition: " << buffer_disposition;
+
+  session_->Start(buffer_disposition, additional_categories, std::move(start_callback));
+}
+
+// fidl
+void TraceManager::StopTracing(controller::StopOptions options, StopTracingCallback stop_callback) {
+  if (!session_) {
+    FXL_VLOG(1) << "Ignoring stop request, tracing not started";
+    stop_callback();
+    return;
+  }
+
+  if (session_->state() != TraceSession::State::kInitialized &&
+      session_->state() != TraceSession::State::kStarting &&
+      session_->state() != TraceSession::State::kStarted) {
+    FXL_VLOG(1) << "Ignoring stop request, state != Initialized,Starting,Started";
+    stop_callback();
+    return;
+  }
+
+  bool write_results = false;
+  if (options.has_write_results()) {
+    write_results = options.write_results();
+  }
+
+  FXL_LOG(INFO) << "Stopping trace" << (write_results ? ", and writing results" : "");
+  session_->Stop(write_results, [stop_callback = std::move(stop_callback)]() {
+    FXL_LOG(INFO) << "Stopped trace";
+    stop_callback();
+  });
 }
 
 // fidl
@@ -170,8 +291,8 @@
 void TraceManager::RegisterProviderWorker(fidl::InterfaceHandle<provider::Provider> provider,
                                           uint64_t pid, fidl::StringPtr name) {
   FXL_VLOG(2) << "Registering provider {" << pid << ":" << name.value_or("") << "}";
-  auto it =
-      providers_.emplace(providers_.end(), provider.Bind(), next_provider_id_++, pid, name.value_or(""));
+  auto it = providers_.emplace(providers_.end(), provider.Bind(), next_provider_id_++, pid,
+                               name.value_or(""));
 
   it->provider.set_error_handler([this, it](zx_status_t status) {
     if (session_)
@@ -179,8 +300,9 @@
     providers_.erase(it);
   });
 
-  if (session_)
+  if (session_) {
     session_->AddProvider(&(*it));
+  }
 }
 
 // fidl
@@ -194,7 +316,34 @@
                                                  uint64_t pid, std::string name,
                                                  RegisterProviderSynchronouslyCallback callback) {
   RegisterProviderWorker(std::move(provider), pid, std::move(name));
-  callback(ZX_OK, trace_running_);
+  bool already_started = (session_ && (session_->state() == TraceSession::State::kStarting ||
+                                       session_->state() == TraceSession::State::kStarted));
+  callback(ZX_OK, already_started);
+}
+
+void TraceManager::SendSessionStateEvent(controller::SessionState state) {
+  for (const auto& binding : app_->controller_bindings().bindings()) {
+    binding->events().OnSessionStateChange(state);
+  }
+}
+
+controller::SessionState TraceManager::TranslateSessionState(TraceSession::State state) {
+  switch (state) {
+    case TraceSession::State::kReady:
+      return controller::SessionState::READY;
+    case TraceSession::State::kInitialized:
+      return controller::SessionState::INITIALIZED;
+    case TraceSession::State::kStarting:
+      return controller::SessionState::STARTING;
+    case TraceSession::State::kStarted:
+      return controller::SessionState::STARTED;
+    case TraceSession::State::kStopping:
+      return controller::SessionState::STOPPING;
+    case TraceSession::State::kStopped:
+      return controller::SessionState::STOPPED;
+    case TraceSession::State::kTerminating:
+      return controller::SessionState::TERMINATING;
+  }
 }
 
 void TraceManager::LaunchConfiguredProviders() {
diff --git a/garnet/bin/trace_manager/trace_manager.h b/garnet/bin/trace_manager/trace_manager.h
index 92d4138..93ff530 100644
--- a/garnet/bin/trace_manager/trace_manager.h
+++ b/garnet/bin/trace_manager/trace_manager.h
@@ -11,6 +11,7 @@
 #include <lib/fidl/cpp/interface_ptr_set.h>
 #include <lib/fidl/cpp/interface_request.h>
 #include <lib/sys/cpp/component_context.h>
+#include <lib/zx/socket.h>
 
 #include <list>
 
@@ -23,22 +24,25 @@
 namespace controller = ::fuchsia::tracing::controller;
 namespace provider = ::fuchsia::tracing::provider;
 
-// TODO(dje): Temporary alias to ease renaming of TraceOptions to TraceConfig.
-using TraceConfig = controller::TraceOptions;
+// forward decl, here to break mutual header dependency
+class TraceManagerApp;
 
 class TraceManager : public controller::Controller, public provider::Registry {
  public:
-  TraceManager(sys::ComponentContext* context, const Config& config);
+  TraceManager(TraceManagerApp* app, sys::ComponentContext* context, Config config);
   ~TraceManager() override;
 
   // For testing.
   const TraceSession* session() const { return session_.get(); }
 
+  void OnEmptyControllerSet();
+
  private:
   // |Controller| implementation.
-  void StartTracing(TraceConfig config, zx::socket output,
-                    StartTracingCallback cb) override;
-  void StopTracing() override;
+  void InitializeTracing(controller::TraceConfig config, zx::socket output) override;
+  void TerminateTracing(controller::TerminateOptions options, TerminateTracingCallback cb) override;
+  void StartTracing(controller::StartOptions options, StartTracingCallback cb) override;
+  void StopTracing(controller::StopOptions options, StopTracingCallback cb) override;
   void GetProviders(GetProvidersCallback cb) override;
   void GetKnownCategories(GetKnownCategoriesCallback callback) override;
 
@@ -51,17 +55,21 @@
                                      uint64_t pid, std::string name,
                                      RegisterProviderSynchronouslyCallback callback) override;
 
-  void FinalizeTracing();
+  void SendSessionStateEvent(controller::SessionState state);
+  controller::SessionState TranslateSessionState(TraceSession::State state);
   void LaunchConfiguredProviders();
 
+  TraceManagerApp* const app_;
+
+  // Non-owning copy of component context. |TraceManagerApp| has the owning
+  // copy, but we need it too. This works out ok as |TraceManagerApp| owns us.
   sys::ComponentContext* const context_;
-  const Config& config_;
+
+  const Config config_;
 
   uint32_t next_provider_id_ = 1u;
   fxl::RefPtr<TraceSession> session_;
   std::list<TraceProviderBundle> providers_;
-  // True if tracing has been started, and is not (yet) being stopped.
-  bool trace_running_ = false;
 
   TraceManager(const TraceManager&) = delete;
   TraceManager(TraceManager&&) = delete;
diff --git a/garnet/bin/trace_manager/trace_session.cc b/garnet/bin/trace_manager/trace_session.cc
index 2133b59..8cbb584 100644
--- a/garnet/bin/trace_manager/trace_session.cc
+++ b/garnet/bin/trace_manager/trace_session.cc
@@ -6,44 +6,44 @@
 
 #include <lib/async/default.h>
 #include <lib/fidl/cpp/clone.h>
+#include <lib/trace-engine/fields.h>
 
 #include <numeric>
 
-#include <trace-engine/fields.h>
-
+#include "garnet/bin/trace_manager/trace_manager.h"
 #include "garnet/bin/trace_manager/util.h"
 #include "src/lib/fxl/logging.h"
 
 namespace tracing {
 
 TraceSession::TraceSession(zx::socket destination, std::vector<std::string> categories,
-                           size_t trace_buffer_size_megabytes,
-                           provider::BufferingMode buffering_mode,
-                           TraceProviderSpecMap&& provider_specs, fit::closure abort_handler)
+                           size_t buffer_size_megabytes, provider::BufferingMode buffering_mode,
+                           TraceProviderSpecMap&& provider_specs, zx::duration start_timeout,
+                           zx::duration stop_timeout, fit::closure abort_handler)
     : destination_(std::move(destination)),
       categories_(std::move(categories)),
-      trace_buffer_size_megabytes_(trace_buffer_size_megabytes),
+      buffer_size_megabytes_(buffer_size_megabytes),
       buffering_mode_(buffering_mode),
       provider_specs_(std::move(provider_specs)),
+      start_timeout_(start_timeout),
+      stop_timeout_(stop_timeout),
       abort_handler_(std::move(abort_handler)),
       weak_ptr_factory_(this) {}
 
 TraceSession::~TraceSession() {
   session_start_timeout_.Cancel();
-  session_finalize_timeout_.Cancel();
+  session_stop_timeout_.Cancel();
+  session_terminate_timeout_.Cancel();
   destination_.reset();
 }
 
-void TraceSession::WaitForProvidersToStart(fit::closure callback, zx::duration timeout) {
-  start_callback_ = std::move(callback);
-  session_start_timeout_.PostDelayed(async_get_default_dispatcher(), timeout);
-}
-
 void TraceSession::AddProvider(TraceProviderBundle* bundle) {
-  if (!(state_ == State::kReady || state_ == State::kStarted))
+  if (state_ == State::kTerminating) {
+    FXL_VLOG(1) << "Ignoring new provider " << *bundle << ", terminating";
     return;
+  }
 
-  uint32_t buffer_size_megabytes = trace_buffer_size_megabytes_;
+  uint32_t buffer_size_megabytes = buffer_size_megabytes_;
   auto spec_iter = provider_specs_.find(bundle->name);
   if (spec_iter != provider_specs_.end()) {
     const TraceProviderSpec* spec = &spec_iter->second;
@@ -56,56 +56,145 @@
   tracees_.emplace_back(std::make_unique<Tracee>(this, bundle));
   fidl::VectorPtr<std::string> categories_clone;
   fidl::Clone(categories_, &categories_clone);
-  if (!tracees_.back()->Start(
+  if (!tracees_.back()->Initialize(
           std::move(categories_clone), buffer_size, buffering_mode_,
           [weak = weak_ptr_factory_.GetWeakPtr(), bundle]() {
             if (weak) {
               weak->OnProviderStarted(bundle);
             }
           },
+          [weak = weak_ptr_factory_.GetWeakPtr(), bundle](bool write_results) {
+            if (weak) {
+              weak->OnProviderStopped(bundle, write_results);
+            }
+          },
           [weak = weak_ptr_factory_.GetWeakPtr(), bundle]() {
             if (weak) {
-              weak->FinishProvider(bundle);
+              weak->OnProviderTerminated(bundle);
             }
           })) {
     tracees_.pop_back();
   } else {
-    // We haven't fully started at this point, we still have to wait for
-    // the provider to indicate it has started, but for our purposes we have
-    // started "enough".
-    TransitionToState(State::kStarted);
+    Tracee* tracee = tracees_.back().get();
+    switch (state_) {
+      case State::kReady:
+      case State::kInitialized:
+        // Nothing more to do.
+        break;
+      case State::kStarting:
+      case State::kStarted:
+        // This is a new provider, there is nothing in the buffer to retain.
+        tracee->Start(controller::BufferDisposition::CLEAR_ALL, additional_categories_);
+        break;
+      case State::kStopping:
+      case State::kStopped:
+        // Mark the tracee as stopped so we don't try to wait for it to do so.
+        // This is a new provider, there are no results to write.
+        tracee->Stop(/*write_results=*/false);
+        break;
+      default:
+        FXL_NOTREACHED();
+        break;
+    }
   }
 }
 
-void TraceSession::Stop(fit::closure done_callback, zx::duration timeout) {
-  if (!(state_ == State::kReady || state_ == State::kStarted))
-    return;
+void TraceSession::MarkInitialized() { TransitionToState(State::kInitialized); }
 
-  FXL_VLOG(1) << "Stopping trace";
+void TraceSession::Terminate(fit::closure callback) {
+  if (state_ == State::kTerminating) {
+    FXL_VLOG(1) << "Ignoring terminate request, already terminating";
+    return;
+  }
+
+  TransitionToState(State::kTerminating);
+  terminate_callback_ = std::move(callback);
+
+  for (const auto& tracee : tracees_) {
+    tracee->Terminate();
+  }
+
+  session_terminate_timeout_.PostDelayed(async_get_default_dispatcher(), stop_timeout_);
+  TerminateSessionIfEmpty();
+}
+
+void TraceSession::Start(controller::BufferDisposition buffer_disposition,
+                         const std::vector<std::string>& additional_categories,
+                         controller::Controller::StartTracingCallback callback) {
+  FXL_DCHECK(state_ == State::kInitialized || state_ == State::kStopped);
+
+  if (force_clear_buffer_contents_) {
+    // "force-clear" -> Clear the entire buffer because it was saved.
+    buffer_disposition = controller::BufferDisposition::CLEAR_ALL;
+  }
+  force_clear_buffer_contents_ = false;
+
+  for (const auto& tracee : tracees_) {
+    tracee->Start(buffer_disposition, additional_categories);
+  }
+
+  start_callback_ = std::move(callback);
+  session_start_timeout_.PostDelayed(async_get_default_dispatcher(), start_timeout_);
+
+  // We haven't fully started at this point, we still have to wait for each
+  // provider to indicate it they've started.
+  TransitionToState(State::kStarting);
+
+  // If there are no providers currently registered, then we are started.
+  CheckAllProvidersStarted();
+
+  // Save for tracees that come along later.
+  additional_categories_ = additional_categories;
+}
+
+void TraceSession::Stop(bool write_results, fit::closure callback) {
+  FXL_DCHECK(state_ == State::kInitialized || state_ == State::kStarting ||
+             state_ == State::kStarted);
 
   TransitionToState(State::kStopping);
-  done_callback_ = std::move(done_callback);
+  stop_callback_ = std::move(callback);
 
-  // Walk through all remaining tracees and send out their buffers.
-  for (const auto& tracee : tracees_)
-    tracee->Stop();
+  for (const auto& tracee : tracees_) {
+    tracee->Stop(write_results);
+  }
 
-  session_finalize_timeout_.PostDelayed(async_get_default_dispatcher(), timeout);
-  FinishSessionIfEmpty();
+  // If we're writing results then force-clear the buffer on the next Start.
+  if (write_results) {
+    force_clear_buffer_contents_ = true;
+  }
+
+  session_stop_timeout_.PostDelayed(async_get_default_dispatcher(), stop_timeout_);
+  CheckAllProvidersStopped();
+
+  // Clear out, must be respecified for each Start() request.
+  additional_categories_.clear();
 }
 
 // Called when a provider reports that it has started.
 
 void TraceSession::OnProviderStarted(TraceProviderBundle* bundle) {
-  if (state_ == State::kReady || state_ == State::kStarted) {
+  if (state_ == State::kStarting) {
     CheckAllProvidersStarted();
+  } else if (state_ == State::kStarted) {
+    // Nothing to do. One example of when this can happen is if we time out
+    // waiting for providers to start and then a provider reports starting
+    // afterwards.
   } else {
-    // Tracing stopped in the interim.
+    // Tracing likely stopped or terminated in the interim.
     auto it = std::find_if(tracees_.begin(), tracees_.end(),
                            [bundle](const auto& tracee) { return *tracee == bundle; });
 
     if (it != tracees_.end()) {
-      (*it)->Stop();
+      if (state_ == State::kReady || state_ == State::kInitialized) {
+        FXL_LOG(WARNING) << "Provider " << *bundle << " sent a \"started\""
+                         << " notification but tracing hasn't started";
+        // Misbehaving provider, but it may just be slow.
+        (*it)->Stop(/*write_results=*/false);
+      } else if (state_ == State::kStopping || state_ == State::kStopped) {
+        (*it)->Stop(/*write_results=*/false);
+      } else {
+        (*it)->Terminate();
+      }
     }
   }
 }
@@ -114,12 +203,14 @@
 // This includes "failed" as well as "started".
 
 void TraceSession::CheckAllProvidersStarted() {
+  FXL_DCHECK(state_ == State::kStarting);
+
   bool all_started =
       std::accumulate(tracees_.begin(), tracees_.end(), true, [](bool value, const auto& tracee) {
         bool ready = (tracee->state() == Tracee::State::kStarted ||
                       // If a provider fails to start continue tracing.
-                      // TODO(TO-530): We should still record what providers
-                      // failed to start.
+                      // TODO(PT-10): We should still record what providers failed to start
+                      // (but is that done in timeout handling?).
                       tracee->state() == Tracee::State::kStopped);
         FXL_VLOG(5) << "tracee " << *tracee->bundle() << (ready ? "" : " not") << " ready";
         return value && ready;
@@ -132,96 +223,203 @@
 }
 
 void TraceSession::NotifyStarted() {
+  TransitionToState(State::kStarted);
   if (start_callback_) {
     FXL_VLOG(1) << "Marking session as having started";
     session_start_timeout_.Cancel();
-    auto start_callback = std::move(start_callback_);
-    start_callback();
+    auto callback = std::move(start_callback_);
+    controller::Controller_StartTracing_Result result;
+    controller::Controller_StartTracing_Response response;
+    result.set_response(response);
+    callback(std::move(result));
   }
 }
 
-void TraceSession::FinishProvider(TraceProviderBundle* bundle) {
+void TraceSession::FinishStartingDueToTimeout() { NotifyStarted(); }
+
+void TraceSession::OnProviderStopped(TraceProviderBundle* bundle, bool write_results) {
   auto it = std::find_if(tracees_.begin(), tracees_.end(),
                          [bundle](const auto& tracee) { return *tracee == bundle; });
 
-  if (it != tracees_.end()) {
-    if (destination_) {
-      switch ((*it)->TransferRecords(destination_)) {
-        case TransferStatus::kComplete:
-          break;
-        case TransferStatus::kProviderError:
-          FXL_LOG(ERROR) << "Problem reading provider output, skipping";
-          break;
-        case TransferStatus::kWriteError:
-          FXL_LOG(ERROR) << "Encountered unrecoverable error writing socket, "
-                            "aborting trace";
+  if (write_results) {
+    if (it != tracees_.end()) {
+      Tracee* tracee = (*it).get();
+      if (!tracee->results_written()) {
+        if (!WriteProviderData(tracee)) {
           Abort();
           return;
-        case TransferStatus::kReceiverDead:
-          FXL_LOG(ERROR) << "Peer is closed, aborting trace";
-          Abort();
-          return;
-        default:
-          break;
+        }
       }
     }
-    tracees_.erase(it);
   }
 
-  if (state_ != State::kStopping) {
-    // A trace provider may have entered the finished state without having
-    // first successfully started. Check whether all remaining providers have
-    // now started.
-    CheckAllProvidersStarted();
+  if (state_ == State::kStopped) {
+    // Late stop notification, nothing more to do.
+  } else if (state_ == State::kStopping) {
+    CheckAllProvidersStopped();
+  } else {
+    // Tracing may have terminated in the interim.
+    if (it != tracees_.end()) {
+      if (state_ == State::kTerminating) {
+        (*it)->Terminate();
+      }
+    }
   }
-
-  FinishSessionIfEmpty();
 }
 
-void TraceSession::FinishSessionIfEmpty() {
-  if (state_ == State::kStopping && tracees_.empty()) {
-    FXL_VLOG(1) << "Marking session as stopped, no more tracees";
+void TraceSession::CheckAllProvidersStopped() {
+  FXL_DCHECK(state_ == State::kStopping);
+
+  bool all_stopped =
+      std::accumulate(tracees_.begin(), tracees_.end(), true, [](bool value, const auto& tracee) {
+        bool stopped = tracee->state() == Tracee::State::kStopped;
+        FXL_VLOG(5) << "tracee " << *tracee->bundle() << (stopped ? "" : " not") << " stopped";
+        return value && stopped;
+      });
+
+  if (all_stopped) {
+    FXL_VLOG(2) << "All providers reporting stopped";
     TransitionToState(State::kStopped);
-    session_finalize_timeout_.Cancel();
-    done_callback_();
+    NotifyStopped();
   }
 }
 
-void TraceSession::FinishSessionDueToTimeout() {
-  // We do not consider pending_start_tracees_ here as we only
-  // stop them as a best effort.
-  if (state_ == State::kStopping && !tracees_.empty()) {
+void TraceSession::NotifyStopped() {
+  if (stop_callback_) {
+    FXL_VLOG(1) << "Marking session as having stopped";
+    session_stop_timeout_.Cancel();
+    auto callback = std::move(stop_callback_);
+    FXL_DCHECK(callback);
+    callback();
+  }
+}
+
+void TraceSession::FinishStoppingDueToTimeout() {
+  if (state_ == State::kStopping) {
     FXL_VLOG(1) << "Marking session as stopped, timed out waiting for tracee(s)";
     TransitionToState(State::kStopped);
     for (auto& tracee : tracees_) {
       if (tracee->state() != Tracee::State::kStopped)
         FXL_LOG(WARNING) << "Timed out waiting for trace provider " << *tracee->bundle()
-                         << " to finish";
+                         << " to stop";
     }
-    done_callback_();
+    NotifyStopped();
+  }
+}
+
+void TraceSession::OnProviderTerminated(TraceProviderBundle* bundle) {
+  auto it = std::find_if(tracees_.begin(), tracees_.end(),
+                         [bundle](const auto& tracee) { return *tracee == bundle; });
+
+  if (it != tracees_.end()) {
+    if (write_results_on_terminate_) {
+      Tracee* tracee = (*it).get();
+      // If the last Stop request saved the results, don't save them again.
+      // But don't write results if the tracee was never started.
+      if (tracee->was_started() && !tracee->results_written()) {
+        if (!WriteProviderData(tracee)) {
+          Abort();
+          return;
+        }
+      }
+    }
+    tracees_.erase(it);
+  }
+
+  if (state_ == State::kStarting) {
+    // A trace provider may have disconnected without having first successfully
+    // started. Check whether all remaining providers have now started so that
+    // we can transition to |kStarted|.
+    CheckAllProvidersStarted();
+  } else if (state_ == State::kStopping) {
+    // A trace provider may have disconnected without having been marked as
+    // stopped. Check whether all remaining providers have now stopped.
+    CheckAllProvidersStopped();
+  }
+
+  TerminateSessionIfEmpty();
+}
+
+void TraceSession::TerminateSessionIfEmpty() {
+  if (state_ == State::kTerminating && tracees_.empty()) {
+    FXL_VLOG(1) << "Marking session as terminated, no more tracees";
+
+    session_terminate_timeout_.Cancel();
+    auto callback = std::move(terminate_callback_);
+    FXL_DCHECK(callback);
+    callback();
+  }
+}
+
+void TraceSession::FinishTerminatingDueToTimeout() {
+  // We do not consider pending_start_tracees_ here as we only
+  // terminate them as a best effort.
+  if (state_ == State::kTerminating && !tracees_.empty()) {
+    FXL_VLOG(1) << "Marking session as terminated, timed out waiting for tracee(s)";
+
+    for (auto& tracee : tracees_) {
+      if (tracee->state() != Tracee::State::kTerminated)
+        FXL_LOG(WARNING) << "Timed out waiting for trace provider " << *tracee->bundle()
+                         << " to terminate";
+    }
+    auto callback = std::move(terminate_callback_);
+    FXL_DCHECK(callback);
+    callback();
   }
 }
 
 void TraceSession::SessionStartTimeout(async_dispatcher_t* dispatcher, async::TaskBase* task,
                                        zx_status_t status) {
   FXL_LOG(WARNING) << "Waiting for start timed out.";
-  NotifyStarted();
+  FinishStartingDueToTimeout();
 }
 
-void TraceSession::SessionFinalizeTimeout(async_dispatcher_t* dispatcher, async::TaskBase* task,
-                                          zx_status_t status) {
-  FinishSessionDueToTimeout();
+void TraceSession::SessionStopTimeout(async_dispatcher_t* dispatcher, async::TaskBase* task,
+                                      zx_status_t status) {
+  FXL_LOG(WARNING) << "Waiting for stop timed out.";
+  FinishStoppingDueToTimeout();
+}
+
+void TraceSession::SessionTerminateTimeout(async_dispatcher_t* dispatcher, async::TaskBase* task,
+                                           zx_status_t status) {
+  FXL_LOG(WARNING) << "Waiting for termination timed out.";
+  FinishTerminatingDueToTimeout();
 }
 
 void TraceSession::RemoveDeadProvider(TraceProviderBundle* bundle) {
-  if (!(state_ == State::kStarted || state_ == State::kStopping))
+  if (state_ == State::kReady) {
+    // Session never got started. Nothing to do.
     return;
-  FinishProvider(bundle);
+  }
+  OnProviderTerminated(bundle);
+}
+
+bool TraceSession::WriteProviderData(Tracee* tracee) {
+  FXL_DCHECK(!tracee->results_written());
+
+  switch (tracee->TransferRecords(destination_)) {
+    case TransferStatus::kComplete:
+      break;
+    case TransferStatus::kProviderError:
+      FXL_LOG(ERROR) << "Problem reading provider socket output, skipping";
+      break;
+    case TransferStatus::kWriteError:
+      FXL_LOG(ERROR) << "Encountered unrecoverable error writing socket";
+      return false;
+    case TransferStatus::kReceiverDead:
+      FXL_LOG(ERROR) << "Consumer socket peer is closed";
+      return false;
+    default:
+      __UNREACHABLE;
+      break;
+  }
+
+  return true;
 }
 
 void TraceSession::Abort() {
-  FXL_VLOG(1) << "Marking session as having aborted";
-  TransitionToState(State::kStopped);
+  FXL_VLOG(1) << "Fatal error occurred, aborting session";
+
   tracees_.clear();
   abort_handler_();
 }
@@ -244,6 +442,8 @@
               trace::MagicNumberRecordFields::TraceInfoType::Make(
                   trace::ToUnderlyingType(trace::TraceInfoType::kMagicNumber)) |
               trace::MagicNumberRecordFields::Magic::Make(trace::kMagicValue);
+  // This won't block as we're only called after the consumer connects, and
+  // this is the first record written.
   return WriteBufferToSocket(destination_, reinterpret_cast<uint8_t*>(record.data()),
                              trace::WordsToBytes(num_words));
 }
@@ -258,6 +458,12 @@
     case TraceSession::State::kReady:
       out << "ready";
       break;
+    case TraceSession::State::kInitialized:
+      out << "initialized";
+      break;
+    case TraceSession::State::kStarting:
+      out << "starting";
+      break;
     case TraceSession::State::kStarted:
       out << "started";
       break;
@@ -267,6 +473,9 @@
     case TraceSession::State::kStopped:
       out << "stopped";
       break;
+    case TraceSession::State::kTerminating:
+      out << "terminating";
+      break;
   }
 
   return out;
diff --git a/garnet/bin/trace_manager/trace_session.h b/garnet/bin/trace_manager/trace_session.h
index afd9d15..260b4c4 100644
--- a/garnet/bin/trace_manager/trace_session.h
+++ b/garnet/bin/trace_manager/trace_session.h
@@ -5,6 +5,7 @@
 #ifndef GARNET_BIN_TRACE_MANAGER_TRACE_SESSION_H_
 #define GARNET_BIN_TRACE_MANAGER_TRACE_SESSION_H_
 
+#include <fuchsia/tracing/controller/cpp/fidl.h>
 #include <fuchsia/tracing/provider/cpp/fidl.h>
 #include <lib/async/cpp/task.h>
 #include <lib/fidl/cpp/string.h>
@@ -26,24 +27,46 @@
 
 namespace tracing {
 
+namespace controller = ::fuchsia::tracing::controller;
 namespace provider = ::fuchsia::tracing::provider;
 
 // TraceSession keeps track of all TraceProvider instances that
 // are active for a tracing session.
 class TraceSession : public fxl::RefCountedThreadSafe<TraceSession> {
  public:
-  enum class State { kReady, kStarted, kStopping, kStopped };
+  enum class State {
+    // The session is ready to be initialized.
+    kReady,
+    // The session has been initialized.
+    kInitialized,
+    // The session is starting.
+    kStarting,
+    // The session is started.
+    // We transition to this after all providers have reported started.
+    kStarted,
+    // The session is being stopped right now.
+    kStopping,
+    // The session is stopped.
+    // We transition to this after all providers have reported stopped.
+    kStopped,
+    // The session is terminating.
+    kTerminating,
+    // There is no |kTerminated| state. The session is deleted as part of
+    // transitioning to the "terminated" state, and thus is gone (meaning
+    // |TraceManager::session_| == nullptr).
+  };
 
-  // Initializes a new instances that streams results
-  // to |destination|. Every provider active in this
-  // session is handed |categories| and a vmo of size
-  // |trace_buffer_size_megabytes| when started.
+  // Initializes a new session that streams results to |destination|.
+  // Every provider active in this session is handed |categories| and a vmo of size
+  // |buffer_size_megabytes| when started.
   //
   // |abort_handler| is invoked whenever the session encounters
   // unrecoverable errors that render the session dead.
   explicit TraceSession(zx::socket destination, std::vector<std::string> categories,
-                        size_t trace_buffer_size_megabytes, provider::BufferingMode buffering_mode,
-                        TraceProviderSpecMap&& provider_specs, fit::closure abort_handler);
+                        size_t buffer_size_megabytes, provider::BufferingMode buffering_mode,
+                        TraceProviderSpecMap&& provider_specs, zx::duration start_timeout,
+                        zx::duration stop_timeout, fit::closure abort_handler);
+
   // Frees all allocated resources and closes the outgoing
   // connection.
   ~TraceSession();
@@ -53,9 +76,7 @@
   // For testing.
   State state() const { return state_; }
 
-  // Invokes |callback| when all providers in this session have acknowledged
-  // the start request, or after |timeout| has elapsed.
-  void WaitForProvidersToStart(fit::closure callback, zx::duration timeout);
+  void set_write_results_on_terminate(bool flag) { write_results_on_terminate_ = flag; }
 
   // Writes all applicable trace info records.
   // These records are like a pre-amble to the trace, in particular they
@@ -63,32 +84,73 @@
   // can be used to identify the file as a Fuchsia Trace File.
   void WriteTraceInfo();
 
-  // Starts |provider| and adds it to this session.
+  // Initializes |provider| and adds it to this session.
   void AddProvider(TraceProviderBundle* provider);
-  // Stops |provider|, streaming out all of its trace records.
-  void RemoveDeadProvider(TraceProviderBundle* provider);
+
+  // Called after all registered providers have been added.
+  void MarkInitialized();
+
+  // Terminates the trace.
+  // Stops tracing first if necessary (see |Stop()|).
+  // If terminating providers takes longer than |stop_timeout_|, we forcefully
+  // terminate tracing and invoke |callback|.
+  void Terminate(fit::closure callback);
+
+  // Starts the trace.
+  // Invokes |callback| when all providers in this session have
+  // acknowledged the start request, or after |start_timeout_| has elapsed.
+  void Start(controller::BufferDisposition buffer_disposition,
+             const std::vector<std::string>& additional_categories,
+             controller::Controller::StartTracingCallback callback);
+
   // Stops all providers that are part of this session, streams out
-  // all remaining trace records and finally invokes |done_callback|.
+  // all remaining trace records and finally invokes |callback|.
+  // If |write_results| is true then trace results are written after
+  // providers stop (and a flag is set to clear buffer contents if tracing
+  // starts again).
   //
-  // If stopping providers takes longer than |timeout|, we forcefully
-  // shutdown operations and invoke |done_callback|.
-  void Stop(fit::closure done_callback, zx::duration timeout);
+  // If stopping providers takes longer than |stop_timeout_|, we forcefully
+  // stop tracing and invoke |callback|.
+  void Stop(bool write_results, fit::closure callback);
+
+  // Remove |provider|, it's dead Jim.
+  void RemoveDeadProvider(TraceProviderBundle* provider);
 
  private:
   friend std::ostream& operator<<(std::ostream& out, TraceSession::State state);
 
+  // Provider starting processing.
   void OnProviderStarted(TraceProviderBundle* bundle);
   void CheckAllProvidersStarted();
   void NotifyStarted();
-  void FinishProvider(TraceProviderBundle* bundle);
-  void FinishSessionIfEmpty();
-  void FinishSessionDueToTimeout();
+  void FinishStartingDueToTimeout();
 
+  // Provider stopping processing.
+  void OnProviderStopped(TraceProviderBundle* bundle, bool write_results);
+  void CheckAllProvidersStopped();
+  void NotifyStopped();
+  void FinishStoppingDueToTimeout();
+
+  // Provider termination processing.
+  void OnProviderTerminated(TraceProviderBundle* bundle);
+  void TerminateSessionIfEmpty();
+  void FinishTerminatingDueToTimeout();
+
+  // Timeout handlers.
   void SessionStartTimeout(async_dispatcher_t* dispatcher, async::TaskBase* task,
                            zx_status_t status);
-  void SessionFinalizeTimeout(async_dispatcher_t* dispatcher, async::TaskBase* task,
-                              zx_status_t status);
+  void SessionStopTimeout(async_dispatcher_t* dispatcher, async::TaskBase* task,
+                          zx_status_t status);
+  void SessionTerminateTimeout(async_dispatcher_t* dispatcher, async::TaskBase* task,
+                               zx_status_t status);
 
+  // Returns true on success or non fatal error.
+  // Returns false if a fatal error occurred, in which case the caller is expected to call
+  // |Abort()| and immediately return as |this| will be deleted.
+  bool WriteProviderData(Tracee* tracee);
+
+  // Abort's the trace session.
+  // N.B. Upon return |this| will have been deleted.
   void Abort();
 
   TransferStatus WriteMagicNumberRecord();
@@ -98,17 +160,40 @@
   State state_ = State::kReady;
   zx::socket destination_;
   fidl::VectorPtr<std::string> categories_;
-  size_t trace_buffer_size_megabytes_;
+  size_t buffer_size_megabytes_;
   provider::BufferingMode buffering_mode_;
   TraceProviderSpecMap provider_specs_;
+  zx::duration start_timeout_;
+  // The stop timeout is used for both stopping and terminating.
+  zx::duration stop_timeout_;
+
+  // List of all registered providers (or "tracees"). Note that providers
+  // may come and go while tracing is active.
   std::list<std::unique_ptr<Tracee>> tracees_;
+
+  // Saved copy of Start()'s |additional_categories| parameter for tracees that
+  // come along after tracing has started.
+  std::vector<std::string> additional_categories_;
+
   async::TaskMethod<TraceSession, &TraceSession::SessionStartTimeout> session_start_timeout_{this};
-  async::TaskMethod<TraceSession, &TraceSession::SessionFinalizeTimeout> session_finalize_timeout_{
-      this};
-  fit::closure start_callback_;
-  fit::closure done_callback_;
+  async::TaskMethod<TraceSession, &TraceSession::SessionStopTimeout> session_stop_timeout_{this};
+  async::TaskMethod<TraceSession, &TraceSession::SessionTerminateTimeout>
+      session_terminate_timeout_{this};
+
+  controller::Controller::StartTracingCallback start_callback_;
+  fit::closure stop_callback_;
+  fit::closure terminate_callback_;
+
   fit::closure abort_handler_;
 
+  // Force the clearing of provider trace buffers on the next start.
+  // This is done when a provider stops with |write_results| set in
+  // |StopOptions|.
+  bool force_clear_buffer_contents_ = false;
+
+  // If true then write results when the session terminates.
+  bool write_results_on_terminate_ = true;
+
   fxl::WeakPtrFactory<TraceSession> weak_ptr_factory_;
 
   TraceSession(const TraceSession&) = delete;
diff --git a/garnet/bin/trace_manager/tracee.cc b/garnet/bin/trace_manager/tracee.cc
index ba45bcf..6cf91e58 100644
--- a/garnet/bin/trace_manager/tracee.cc
+++ b/garnet/bin/trace_manager/tracee.cc
@@ -72,13 +72,14 @@
 
 bool Tracee::operator==(TraceProviderBundle* bundle) const { return bundle_ == bundle; }
 
-bool Tracee::Start(fidl::VectorPtr<std::string> categories, size_t buffer_size,
-                   provider::BufferingMode buffering_mode, fit::closure started_callback,
-                   fit::closure stopped_callback) {
+bool Tracee::Initialize(fidl::VectorPtr<std::string> categories, size_t buffer_size,
+                        provider::BufferingMode buffering_mode, StartCallback start_callback,
+                        StopCallback stop_callback, TerminateCallback terminate_callback) {
   FXL_DCHECK(state_ == State::kReady);
   FXL_DCHECK(!buffer_vmo_);
-  FXL_DCHECK(started_callback);
-  FXL_DCHECK(stopped_callback);
+  FXL_DCHECK(start_callback);
+  FXL_DCHECK(stop_callback);
+  FXL_DCHECK(terminate_callback);
 
   zx::vmo buffer_vmo;
   zx_status_t status = zx::vmo::create(buffer_size, 0u, &buffer_vmo);
@@ -104,7 +105,6 @@
     return false;
   }
 
-  // TODO(PT-112): Split out Initialize().
   provider::ProviderConfig provider_config;
   provider_config.buffering_mode = buffering_mode;
   provider_config.buffer = std::move(buffer_vmo_for_provider);
@@ -114,32 +114,76 @@
   }
   bundle_->provider->Initialize(std::move(provider_config));
 
-  provider::StartOptions start_options;
-  start_options.buffer_disposition = provider::BufferDisposition::CLEAR_ENTIRE;
-  bundle_->provider->Start(std::move(start_options));
-
   buffering_mode_ = buffering_mode;
   buffer_vmo_ = std::move(buffer_vmo);
   buffer_vmo_size_ = buffer_size;
   fifo_ = std::move(fifo);
-  started_callback_ = std::move(started_callback);
-  stopped_callback_ = std::move(stopped_callback);
+
+  start_callback_ = std::move(start_callback);
+  stop_callback_ = std::move(stop_callback);
+  terminate_callback_ = std::move(terminate_callback);
+
   wait_.set_object(fifo_.get());
   wait_.set_trigger(ZX_FIFO_READABLE | ZX_FIFO_PEER_CLOSED);
   dispatcher_ = async_get_default_dispatcher();
   status = wait_.Begin(dispatcher_);
   FXL_CHECK(status == ZX_OK) << "Failed to add handler: status=" << status;
-  TransitionToState(State::kStartPending);
+
+  TransitionToState(State::kInitialized);
   return true;
 }
 
-void Tracee::Stop() {
-  if (state_ != State::kStarted)
+void Tracee::Terminate() {
+  if (state_ == State::kTerminating || state_ == State::kTerminated) {
     return;
-  // TODO(PT-112): Just call terminate.
-  bundle_->provider->Stop();
+  }
   bundle_->provider->Terminate();
+  TransitionToState(State::kTerminating);
+}
+
+void Tracee::Start(controller::BufferDisposition buffer_disposition,
+                   const std::vector<std::string>& additional_categories) {
+  // TraceSession should not call us unless we're ready, either because this
+  // is the first time, or subsequent times after tracing has fully stopped
+  // from the preceding time.
+  FXL_DCHECK(state_ == State::kInitialized || state_ == State::kStopped);
+
+  provider::StartOptions start_options;
+  switch (buffer_disposition) {
+    case controller::BufferDisposition::CLEAR_ALL:
+      start_options.buffer_disposition = provider::BufferDisposition::CLEAR_ENTIRE;
+      break;
+    case controller::BufferDisposition::CLEAR_NONDURABLE:
+      start_options.buffer_disposition = provider::BufferDisposition::CLEAR_NONDURABLE;
+      break;
+    case controller::BufferDisposition::RETAIN:
+      start_options.buffer_disposition = provider::BufferDisposition::RETAIN;
+      break;
+    default:
+      FXL_NOTREACHED();
+      break;
+  }
+  start_options.additional_categories = additional_categories;
+  bundle_->provider->Start(std::move(start_options));
+
+  TransitionToState(State::kStarting);
+  was_started_ = true;
+  results_written_ = false;
+}
+
+void Tracee::Stop(bool write_results) {
+  if (state_ != State::kStarting && state_ != State::kStarted) {
+    if (state_ == State::kInitialized) {
+      // We must have gotten added after tracing started while tracing was
+      // being stopped. Mark us as stopped so TraceSession won't try to wait
+      // for us to do so.
+      TransitionToState(State::kStopped);
+    }
+    return;
+  }
+  bundle_->provider->Stop();
   TransitionToState(State::kStopping);
+  write_results_ = write_results;
 }
 
 void Tracee::TransitionToState(State new_state) {
@@ -157,8 +201,7 @@
   zx_signals_t pending = signal->observed;
   FXL_VLOG(2) << *bundle_ << ": pending=0x" << std::hex << pending;
   FXL_DCHECK(pending & (ZX_FIFO_READABLE | ZX_FIFO_PEER_CLOSED));
-  FXL_DCHECK(state_ == State::kStartPending || state_ == State::kStarted ||
-             state_ == State::kStopping);
+  FXL_DCHECK(state_ != State::kReady && state_ != State::kTerminated);
 
   if (pending & ZX_FIFO_READABLE) {
     OnFifoReadable(dispatcher, wait);
@@ -172,10 +215,10 @@
   FXL_DCHECK(pending & ZX_FIFO_PEER_CLOSED);
   wait_.set_object(ZX_HANDLE_INVALID);
   dispatcher_ = nullptr;
-  TransitionToState(State::kStopped);
-  fit::closure stopped_callback = std::move(stopped_callback_);
-  FXL_DCHECK(stopped_callback);
-  stopped_callback();
+  TransitionToState(State::kTerminated);
+  fit::closure terminate_callback = std::move(terminate_callback_);
+  FXL_DCHECK(terminate_callback);
+  terminate_callback();
 }
 
 void Tracee::OnFifoReadable(async_dispatcher_t* dispatcher, async::WaitBase* wait) {
@@ -184,7 +227,7 @@
   FXL_DCHECK(status2 == ZX_OK);
   if (packet.data16 != 0) {
     FXL_LOG(ERROR) << *bundle_ << ": Received bad packet, non-zero data16 field: " << packet.data16;
-    Stop();
+    Abort();
     return;
   }
 
@@ -195,21 +238,21 @@
       if (packet.data32 != TRACE_PROVIDER_FIFO_PROTOCOL_VERSION) {
         FXL_LOG(ERROR) << *bundle_
                        << ": Received bad packet, unexpected version: " << packet.data32;
-        Stop();
+        Abort();
         break;
       }
       if (packet.data64 != 0) {
         FXL_LOG(ERROR) << *bundle_
                        << ": Received bad packet, non-zero data64 field: " << packet.data64;
-        Stop();
+        Abort();
         break;
       }
-      if (state_ == State::kStartPending) {
+      if (state_ == State::kStarting) {
         TransitionToState(State::kStarted);
-        fit::closure started_callback = std::move(started_callback_);
-        FXL_DCHECK(started_callback);
-        started_callback();
+        start_callback_();
       } else {
+        // This could be a problem in the provider or it could just be slow.
+        // TODO(dje): Disconnect it and force it to reconnect?
         FXL_LOG(WARNING) << *bundle_ << ": Received TRACE_PROVIDER_STARTED in state " << state_;
       }
       break;
@@ -217,7 +260,8 @@
       if (buffering_mode_ != provider::BufferingMode::STREAMING) {
         FXL_LOG(WARNING) << *bundle_ << ": Received TRACE_PROVIDER_SAVE_BUFFER in mode "
                          << ModeName(buffering_mode_);
-      } else if (state_ == State::kStarted || state_ == State::kStopping) {
+      } else if (state_ == State::kStarted || state_ == State::kStopping ||
+                 state_ == State::kTerminating) {
         uint32_t wrapped_count = packet.data32;
         uint64_t durable_data_end = packet.data64;
         // Schedule the write with the main async loop.
@@ -237,12 +281,16 @@
     case TRACE_PROVIDER_STOPPED:
       if (packet.data16 != 0 || packet.data32 != 0 || packet.data64 != 0) {
         FXL_LOG(ERROR) << *bundle_ << ": Received bad packet, non-zero data fields";
-        Stop();
+        Abort();
         break;
       }
-      if (state_ == State::kStopping) {
-        // Nothing to do yet. Let closing of the FIFO indicate final stopping.
-        // This part is in transition to full independent start/stop support.
+      if (state_ == State::kStopping || state_ == State::kTerminating) {
+        // If we're terminating leave the transition to kTerminated to
+        // noticing the fifo peer closed.
+        if (state_ == State::kStopping) {
+          TransitionToState(State::kStopped);
+        }
+        stop_callback_(write_results_);
       } else {
         // This could be a problem in the provider or it could just be slow.
         // TODO(dje): Disconnect it and force it to reconnect?
@@ -251,7 +299,7 @@
       break;
     default:
       FXL_LOG(ERROR) << *bundle_ << ": Received bad packet, unknown request: " << packet.request;
-      Stop();
+      Abort();
       break;
   }
 }
@@ -259,11 +307,10 @@
 void Tracee::OnHandleError(zx_status_t status) {
   FXL_VLOG(2) << *bundle_ << ": error=" << status;
   FXL_DCHECK(status == ZX_ERR_CANCELED);
-  FXL_DCHECK(state_ == State::kStartPending || state_ == State::kStarted ||
-             state_ == State::kStopping);
+  FXL_DCHECK(state_ != State::kReady && state_ != State::kTerminated);
   wait_.set_object(ZX_HANDLE_INVALID);
   dispatcher_ = nullptr;
-  TransitionToState(State::kStopped);
+  TransitionToState(State::kTerminated);
 }
 
 bool Tracee::VerifyBufferHeader(const trace::internal::BufferHeaderReader* header) const {
@@ -285,7 +332,7 @@
 
   // TODO(dje): Loop on smaller buffer.
   // Better yet, be able to pass the entire vmo to the socket (still need to
-  // support multiple chunks: the writer will need vmo,offset,size parameters).
+  // support multiple chunks: the consumer will need vmo,offset,size parameters (fuchsia.mem)).
 
   uint64_t size_in_words = trace::BytesToWords(size);
   // For paranoia purposes verify size is a multiple of the word size so we
@@ -303,13 +350,14 @@
   if (!by_size) {
     uint64_t words_written = GetBufferWordsWritten(buffer.data(), size_in_words);
     bytes_written = trace::WordsToBytes(words_written);
+    FXL_VLOG(2) << "By-record -> " << bytes_written << " bytes";
   } else {
     bytes_written = size;
   }
 
   auto status = WriteBufferToSocket(socket, buffer.data(), bytes_written);
   if (status != TransferStatus::kComplete) {
-    FXL_LOG(ERROR) << *bundle_ << ": Failed to write " << name << " records";
+    FXL_VLOG(1) << *bundle_ << ": Failed to write " << name << " records";
   }
   return status;
 }
@@ -347,6 +395,9 @@
 
   auto transfer_status = TransferStatus::kComplete;
 
+  // Regardless of whether we succeed or fail, mark results as being written.
+  results_written_ = true;
+
   if ((transfer_status = WriteProviderIdRecord(socket)) != TransferStatus::kComplete) {
     FXL_LOG(ERROR) << *bundle_ << ": Failed to write provider info record to trace.";
     return transfer_status;
@@ -375,9 +426,8 @@
     // If we can't write the buffer overflow record, it's not the end of the
     // world.
     if (WriteProviderBufferOverflowEvent(socket) != TransferStatus::kComplete) {
-      FXL_LOG(ERROR) << *bundle_
-                     << ": Failed to write provider event (buffer overflow)"
-                        " record to trace.";
+      FXL_VLOG(1) << *bundle_
+                  << ": Failed to write provider event (buffer overflow) record to trace.";
     }
   }
 
@@ -430,6 +480,7 @@
 
   // Print some stats to assist things like buffer size calculations.
   // Don't print anything if nothing was written.
+  // TODO(dje): Revisit this once stats are fully reported back to the client.
   if ((header->buffering_mode() == TRACE_BUFFERING_MODE_ONESHOT &&
        header->rolling_data_end(0) > kInitRecordSizeBytes) ||
       ((header->buffering_mode() != TRACE_BUFFERING_MODE_ONESHOT) &&
@@ -454,9 +505,12 @@
   FXL_DCHECK(buffer_vmo_);
 
   if (!DoTransferBuffer(socket, wrapped_count, durable_data_end)) {
-    Stop();
+    Abort();
+    return;
   }
 
+  // If a consumer isn't connected we still want to mark the buffer as having
+  // been saved in order to keep the trace engine running.
   last_wrapped_count_ = wrapped_count;
   last_durable_data_end_ = durable_data_end;
   NotifyBufferSaved(wrapped_count, durable_data_end);
@@ -547,7 +601,7 @@
     // The FIFO should never fill. If it does then the provider is sending us
     // buffer full notifications but not reading our replies. Terminate the
     // connection.
-    Stop();
+    Abort();
   } else {
     FXL_DCHECK(status == ZX_OK || status == ZX_ERR_PEER_CLOSED);
   }
@@ -610,6 +664,11 @@
                              trace::WordsToBytes(num_words));
 }
 
+void Tracee::Abort() {
+  FXL_LOG(ERROR) << *bundle_ << ": Aborting connection";
+  Terminate();
+}
+
 const char* Tracee::ModeName(provider::BufferingMode mode) {
   switch (mode) {
     case provider::BufferingMode::ONESHOT:
@@ -626,8 +685,11 @@
     case Tracee::State::kReady:
       out << "ready";
       break;
-    case Tracee::State::kStartPending:
-      out << "start pending";
+    case Tracee::State::kInitialized:
+      out << "initialized";
+      break;
+    case Tracee::State::kStarting:
+      out << "starting";
       break;
     case Tracee::State::kStarted:
       out << "started";
@@ -638,6 +700,12 @@
     case Tracee::State::kStopped:
       out << "stopped";
       break;
+    case Tracee::State::kTerminating:
+      out << "terminating";
+      break;
+    case Tracee::State::kTerminated:
+      out << "terminated";
+      break;
   }
 
   return out;
diff --git a/garnet/bin/trace_manager/tracee.h b/garnet/bin/trace_manager/tracee.h
index 368d9ec..01e0d1d 100644
--- a/garnet/bin/trace_manager/tracee.h
+++ b/garnet/bin/trace_manager/tracee.h
@@ -30,18 +30,28 @@
 class Tracee {
  public:
   enum class State {
-    // All systems go, provider hasn't been started, yet.
+    // The provider is ready to be initialized.
     kReady,
+    // The provider has been initialized.
+    kInitialized,
     // The provider was asked to start.
-    kStartPending,
+    kStarting,
     // The provider is started and tracing.
     kStarted,
     // The provider is being stopped right now.
     kStopping,
     // The provider is stopped.
-    kStopped
+    kStopped,
+    // The provider is terminating.
+    kTerminating,
+    // The provider is terminated.
+    kTerminated,
   };
 
+  using StartCallback = fit::closure;
+  using StopCallback = fit::function<void(bool write_results)>;
+  using TerminateCallback = fit::closure;
+
   // The size of the initialization record.
   static constexpr size_t kInitRecordSizeBytes = 16;
 
@@ -49,13 +59,19 @@
   ~Tracee();
 
   bool operator==(TraceProviderBundle* bundle) const;
-  bool Start(fidl::VectorPtr<std::string> categories, size_t buffer_size,
-             provider::BufferingMode buffering_mode, fit::closure started_callback,
-             fit::closure stopped_callback);
-  void Stop();
 
-  // Called once at the end of the trace to transfer all collected records
-  // to |socket|.
+  bool Initialize(fidl::VectorPtr<std::string> categories, size_t buffer_size,
+                  provider::BufferingMode buffering_mode, StartCallback start_callback,
+                  StopCallback stop_callback, TerminateCallback terminate_callback);
+
+  void Terminate();
+
+  void Start(controller::BufferDisposition buffer_disposition,
+             const std::vector<std::string>& additional_categories);
+
+  void Stop(bool write_results);
+
+  // Transfer all collected records to |socket|.
   TransferStatus TransferRecords(const zx::socket& socket) const;
 
   // Save the buffer specified by |wrapped_count|.
@@ -69,6 +85,8 @@
 
   const TraceProviderBundle* bundle() const { return bundle_; }
   State state() const { return state_; }
+  bool was_started() const { return was_started_; }
+  bool results_written() const { return results_written_; }
 
  private:
   // The size of the fifo, in packets.
@@ -122,21 +140,41 @@
 
   void NotifyBufferSaved(uint32_t wrapped_count, uint64_t durable_data_end);
 
+  // Called when a problem is detected warranting shutting the connection down.
+  void Abort();
+
   const TraceSession* const session_;
   const TraceProviderBundle* const bundle_;
   State state_ = State::kReady;
+
   provider::BufferingMode buffering_mode_;
   zx::vmo buffer_vmo_;
   size_t buffer_vmo_size_ = 0u;
   zx::fifo fifo_;
-  fit::closure started_callback_;
-  fit::closure stopped_callback_;
+
+  StartCallback start_callback_;
+  StopCallback stop_callback_;
+  TerminateCallback terminate_callback_;
+
   async_dispatcher_t* dispatcher_ = nullptr;
   async::WaitMethod<Tracee, &Tracee::OnHandleReady> wait_;
+
   uint32_t last_wrapped_count_ = 0u;
   uint64_t last_durable_data_end_ = 0;
   mutable bool provider_info_record_written_ = false;
 
+  // Set to true when starting. This is used to not write any results,
+  // including provider info, if the tracee was never started.
+  bool was_started_ = false;
+
+  // The |write_results| flag passed to |Stop()|.
+  // We do nothing with this except to pass it back to |stop_callback_|.
+  bool write_results_ = false;
+
+  // Set to false when starting and true when results are written.
+  // This is used to not save the results twice when terminating.
+  mutable bool results_written_ = false;
+
   fxl::WeakPtrFactory<Tracee> weak_ptr_factory_;
 
   Tracee(const Tracee&) = delete;
diff --git a/garnet/bin/trace_manager/util.cc b/garnet/bin/trace_manager/util.cc
index 0a720fe..14f17ae 100644
--- a/garnet/bin/trace_manager/util.cc
+++ b/garnet/bin/trace_manager/util.cc
@@ -60,4 +60,48 @@
   return out;
 }
 
+std::ostream& operator<<(std::ostream& out, controller::BufferDisposition disposition) {
+  switch (disposition) {
+    case controller::BufferDisposition::CLEAR_ALL:
+      out << "clear-all";
+      break;
+    case controller::BufferDisposition::CLEAR_NONDURABLE:
+      out << "clear-nondurable";
+      break;
+    case controller::BufferDisposition::RETAIN:
+      out << "retain";
+      break;
+  }
+
+  return out;
+}
+
+std::ostream& operator<<(std::ostream& out, controller::SessionState state) {
+  switch (state) {
+    case controller::SessionState::READY:
+      out << "ready";
+      break;
+    case controller::SessionState::INITIALIZED:
+      out << "initialized";
+      break;
+    case controller::SessionState::STARTING:
+      out << "starting";
+      break;
+    case controller::SessionState::STARTED:
+      out << "started";
+      break;
+    case controller::SessionState::STOPPING:
+      out << "stopping";
+      break;
+    case controller::SessionState::STOPPED:
+      out << "stopped";
+      break;
+    case controller::SessionState::TERMINATING:
+      out << "terminaing";
+      break;
+  }
+
+  return out;
+}
+
 }  // namespace tracing
diff --git a/garnet/bin/trace_manager/util.h b/garnet/bin/trace_manager/util.h
index 930520c..d5cc044 100644
--- a/garnet/bin/trace_manager/util.h
+++ b/garnet/bin/trace_manager/util.h
@@ -5,12 +5,15 @@
 #ifndef GARNET_BIN_TRACE_MANAGER_UTIL_H_
 #define GARNET_BIN_TRACE_MANAGER_UTIL_H_
 
-#include <iosfwd>
-
+#include <fuchsia/tracing/controller/cpp/fidl.h>
 #include <lib/zx/socket.h>
 
+#include <iosfwd>
+
 namespace tracing {
 
+namespace controller = ::fuchsia::tracing::controller;
+
 enum class TransferStatus {
   // The transfer is complete.
   kComplete,
@@ -25,6 +28,10 @@
 
 std::ostream& operator<<(std::ostream& out, TransferStatus status);
 
+std::ostream& operator<<(std::ostream& out, controller::BufferDisposition disposition);
+
+std::ostream& operator<<(std::ostream& out, controller::SessionState state);
+
 // Writes |len| bytes from |buffer| to |socket|. Returns
 // TransferStatus::kComplete if the entire buffer has been
 // successfully transferred. A return value of
diff --git a/sdk/fidl/fuchsia.tracing.controller/BUILD.gn b/sdk/fidl/fuchsia.tracing.controller/BUILD.gn
index 58d4457..984d066 100644
--- a/sdk/fidl/fuchsia.tracing.controller/BUILD.gn
+++ b/sdk/fidl/fuchsia.tracing.controller/BUILD.gn
@@ -5,11 +5,5 @@
 import("//build/fidl/fidl.gni")
 
 fidl("fuchsia.tracing.controller") {
-  # TODO(fxb/35879): Remove lint exclusions by fixing known FIDL lint violations in this target
-  excluded_checks = [
-    "string-bounds-not-specified",
-    "vector-bounds-not-specified",
-  ]
-
   sources = [ "trace_controller.fidl" ]
 }
diff --git a/sdk/fidl/fuchsia.tracing.controller/trace_controller.fidl b/sdk/fidl/fuchsia.tracing.controller/trace_controller.fidl
index 377aadf..4a64c39 100644
--- a/sdk/fidl/fuchsia.tracing.controller/trace_controller.fidl
+++ b/sdk/fidl/fuchsia.tracing.controller/trace_controller.fidl
@@ -18,45 +18,89 @@
 /// The maximum length of a category name.
 const uint32 MAX_CATEGORY_NAME_LENGTH = 100;
 
-// The controller interface used by the trace tool to start/stop tracing.
+/// The maximum length of a category description.
+const uint32 MAX_CATEGORY_DESCRIPTION_LENGTH = 400;
+
+/// The state of the tracing session.
+/// A "session" is everything between |Initialize| and |Terminate|.
+enum SessionState {
+    /// The tracing system is ready for a new session.
+    /// There can be only one session at a time.
+    READY = 1;
+    /// A new tracing session has been initialized.
+    INITIALIZED = 2;
+    /// Tracing is in the midst of starting.
+    STARTING = 3;
+    /// Tracing has started.
+    STARTED = 4;
+    /// Tracing is in the midst of being stopped.
+    STOPPING = 5;
+    /// Tracing has fully stopped.
+    STOPPED = 6;
+    /// Tracing is in the midst of being terminated.
+    /// Once the system has completely terminated the session it goes back
+    /// to the READY state.
+    TERMINATING = 7;
+};
+
+/// The controller interface used by the trace tool to initialize/start/stop/terminate tracing.
+///
+/// The trace controller may lightly validate the structure of
+/// trace records as it copies them from trace buffers into the output.
+/// In particular, it may verify the size of each record header to ensure
+/// that the framing of trace records in the data stream is maintained.
+///
+/// The trace controller does not validate the contents of the trace records
+/// themselves.  For example, it does not try to check argument lengths in
+/// events.  This ensures that the trace format can be extended without needing
+/// to modify the trace controller.
 [Discoverable]
 protocol Controller {
-    // Requests to start tracing with the specified `options`.
-    //
-    // The trace controller acknowledges the request when all registered providers have been
-    // started or after `options.start_timeout_milliseconds` milliseconds.
-    //
-    // The trace controller emits trace data to `output` as a sequence of
-    // binary formatted trace records.  Traces obtained from different providers
-    // are delimited by metadata records within the stream.
-    //
-    // The trace controller is responsible for lightly validating the structure of
-    // trace records as it copies them from trace buffers into the output.
-    // In particular, it must verify the size of each record header to ensure
-    // that the framing of trace records in the data stream is maintained.
-    //
-    // The trace controller does not validate the contents of the trace records
-    // themselves.  For example, it should not try to check argument lengths in
-    // events.  This ensures that the trace format can be extended without needing
-    // to modify the trace controller.
-    StartTracing(TraceOptions options, handle<socket> output) -> ();
+    /// Requests to initialize tracing with the specified |config|.
+    ///
+    /// A bad request will terminate the connection.
+    ///
+    /// The trace controller emits trace data to `output` as a sequence of
+    /// binary formatted trace records.  Traces obtained from different providers
+    /// are delimited by metadata records within the stream.
+    InitializeTracing(TraceConfig config, handle<socket> output);
 
-    // Requests to stop tracing.
-    //
-    // The trace controller continues to transfer any remaining data to the
-    // output until finished, then closes the output.
-    StopTracing();
+    /// Requests to terminate tracing.
+    ///
+    /// If tracing has not yet stopped it is stopped first.
+    /// If |options.write_results| is true, then the trace controller
+    /// continues to transfer any remaining data to the output socket
+    /// until finished, then closes the output.
+    TerminateTracing(TerminateOptions options) -> (TerminateResult result);
+
+    /// Requests to start tracing with the specified |options|.
+    ///
+    /// If tracing has already started then the request is ignored,
+    /// except to send back an error code.
+    ///
+    /// The trace controller acknowledges the request when all
+    /// registered providers have been started or after
+    /// |TraceConfig.start_timeout_milliseconds| milliseconds.
+    /// One useful reason for the has-started acknowledgement is that the
+    /// trace program can start a program to trace knowing that all the
+    /// providers are started.
+    StartTracing(StartOptions options) -> () error StartErrorCode;
+
+    /// Requests to stop tracing.
+    ///
+    /// If tracing has already stopped then this does nothing.
+    /// Returning a result lets callers know when it's ok to, for example,
+    /// start tracing again.
+    StopTracing(StopOptions options) -> ();
 
     /// Return the set of registered providers.
     GetProviders() -> (vector<ProviderInfo>:MAX_NUM_PROVIDERS providers);
 
     // Gets the known categories.
-    GetKnownCategories() -> (vector<KnownCategory> categories);
-};
+    GetKnownCategories() -> (vector<KnownCategory>:MAX_NUM_CATEGORIES categories);
 
-struct KnownCategory {
-    string:MAX_CATEGORY_NAME_LENGTH name;
-    string description;
+    /// Event sent when the session state changes.
+    -> OnSessionStateChange(SessionState state);
 };
 
 // This is a copy of provider.fidl:BufferingMode.
@@ -73,7 +117,7 @@
 };
 
 /// Provides options for the trace.
-table TraceOptions {
+table TraceConfig {
     /// The trace categories to record, or an empty array for all.
     1: vector<string:MAX_CATEGORY_NAME_LENGTH>:MAX_NUM_CATEGORIES categories;
 
@@ -92,6 +136,99 @@
     5: vector<ProviderSpec>:MAX_NUM_PROVIDERS provider_specs;
 };
 
+/// Terminate options.
+table TerminateOptions {
+    /// If true then write trace results.
+    /// If false then discard trace results.
+    1: bool write_results;
+};
+
+/// Result of a terminate request.
+table TerminateResult {
+    // TODO(dje): Provider stats.
+};
+
+/// Choices for clearing/retaining trace buffer contents at Start.
+/// A brief summary of buffer contents:
+/// The trace buffer is divided into two main pieces: durable and
+/// non-durable.
+/// The durable portion contains things like the string and thread data for
+/// their respective references (trace_encoded_string_ref_t and
+/// trace_encoded_thread_ref_t). The non-durable portion contains the rest of
+/// the trace data like events); this is the portion that, for example, is
+/// discarded in circular buffering mode when the (non-durable) buffer fills.
+enum BufferDisposition : uint8 {
+    /// Clear the entire buffer, including durable buffer contents.
+    /// N.B. If this is done mid-session, then string and thread references
+    /// from prior to this point will become invalid - the underlying data
+    /// will be gone. To prevent this save buffer contents before clearing.
+    ///
+    /// This is typically used when buffer contents were saved after the
+    /// preceding Stop.
+    CLEAR_ALL = 1;
+
+    /// Clear the non-durable portion of the buffer, retaining the durable
+    /// portion.
+    ///
+    /// This is typically used when buffer contents were not saved after the
+    /// preceding Stop and the current contents are to be discarded.
+    CLEAR_NONDURABLE = 2;
+
+    /// Retain buffer contents. New trace data is added whether the previous
+    /// trace run left off.
+    ///
+    /// This is typically used when buffer contents were not saved after the
+    /// preceding Stop and the current contents are to be retained.
+    RETAIN = 3;
+};
+
+/// Error codes from Start operations.
+enum StartErrorCode {
+    /// Tracing hasn't been initialized, not ready to start.
+    NOT_INITIALIZED = 1;
+    /// Tracing has already been started.
+    ALREADY_STARTED = 2;
+    /// Tracing is currently being stopped.
+    STOPPING = 3;
+    /// Tracing is currently being terminated.
+    TERMINATING = 4;
+};
+
+/// Additional options to control trace data collection.
+table StartOptions {
+    /// Whether and how to clear the buffer when starting data collection.
+    /// This allows, for example, multiple Start/Stop trace runs to be
+    /// collected in the same buffer.
+    ///
+    /// If the preceding |Stop()| request had |save_after_stopped=true|
+    /// then this value is overridden to CLEAR_ENTIRE_BUFFER to avoid
+    /// duplicate data being saved.
+    1: BufferDisposition buffer_disposition;
+
+    /// The trace categories to add to the initial set provided in
+    /// |TraceConfig|. If the combined number of categories goes over the
+    /// limit then the extra categories past the limit are discarded.
+    2: vector<string:MAX_CATEGORY_NAME_LENGTH>:MAX_NUM_CATEGORIES
+           additional_categories;
+};
+
+/// Additional options to control stopping of a trace.
+table StopOptions {
+    /// If true then write all collected data after tracing has stopped.
+    /// This is useful in situations where one wants to clear the buffer
+    /// before starting the next trace, without having to first terminate the
+    /// trace and start a new one.
+    1: bool write_results;
+};
+
+/// Result of |GetKnownCategories|.
+struct KnownCategory {
+    // Category name.
+    string:MAX_CATEGORY_NAME_LENGTH name;
+    // Category description.
+    string:MAX_CATEGORY_DESCRIPTION_LENGTH description;
+};
+
 /// Result of |GetProviders|.
 table ProviderInfo {
     /// The provider's ID, assigned by trace-manager.