[chaotic-good] Add client transport error handling. (#34611)

This is a follow-up PR of #34191, which handles the error condition of
endpoints failed to write/read in chaotic-good client transport.

This PR needs to be merged after #34191.

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 1255395..edd4eb1 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -957,6 +957,7 @@
     add_dependencies(buildtests_cxx client_ssl_test)
   endif()
   add_dependencies(buildtests_cxx client_streaming_test)
+  add_dependencies(buildtests_cxx client_transport_error_test)
   add_dependencies(buildtests_cxx client_transport_test)
   add_dependencies(buildtests_cxx cmdline_test)
   add_dependencies(buildtests_cxx codegen_test_full)
@@ -9408,6 +9409,49 @@
 endif()
 if(gRPC_BUILD_TESTS)
 
+add_executable(client_transport_error_test
+  ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc
+  ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc
+  ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h
+  ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h
+  src/core/ext/transport/chaotic_good/client_transport.cc
+  src/core/ext/transport/chaotic_good/frame.cc
+  src/core/ext/transport/chaotic_good/frame_header.cc
+  src/core/lib/transport/promise_endpoint.cc
+  test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
+  test/core/transport/chaotic_good/client_transport_error_test.cc
+)
+target_compile_features(client_transport_error_test PUBLIC cxx_std_14)
+target_include_directories(client_transport_error_test
+  PRIVATE
+    ${CMAKE_CURRENT_SOURCE_DIR}
+    ${CMAKE_CURRENT_SOURCE_DIR}/include
+    ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
+    ${_gRPC_RE2_INCLUDE_DIR}
+    ${_gRPC_SSL_INCLUDE_DIR}
+    ${_gRPC_UPB_GENERATED_DIR}
+    ${_gRPC_UPB_GRPC_GENERATED_DIR}
+    ${_gRPC_UPB_INCLUDE_DIR}
+    ${_gRPC_XXHASH_INCLUDE_DIR}
+    ${_gRPC_ZLIB_INCLUDE_DIR}
+    third_party/googletest/googletest/include
+    third_party/googletest/googletest
+    third_party/googletest/googlemock/include
+    third_party/googletest/googlemock
+    ${_gRPC_PROTO_GENS_DIR}
+)
+
+target_link_libraries(client_transport_error_test
+  ${_gRPC_ALLTARGETS_LIBRARIES}
+  gtest
+  ${_gRPC_PROTOBUF_LIBRARIES}
+  grpc_test_util
+)
+
+
+endif()
+if(gRPC_BUILD_TESTS)
+
 add_executable(client_transport_test
   ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc
   ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index f01b732..70a8757 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -7093,6 +7093,36 @@
   - grpc_authorization_provider
   - grpc_unsecure
   - grpc_test_util
+- name: client_transport_error_test
+  gtest: true
+  build: test
+  language: c++
+  headers:
+  - src/core/ext/transport/chaotic_good/client_transport.h
+  - src/core/ext/transport/chaotic_good/frame.h
+  - src/core/ext/transport/chaotic_good/frame_header.h
+  - src/core/lib/promise/detail/join_state.h
+  - src/core/lib/promise/event_engine_wakeup_scheduler.h
+  - src/core/lib/promise/inter_activity_pipe.h
+  - src/core/lib/promise/join.h
+  - src/core/lib/promise/mpsc.h
+  - src/core/lib/promise/try_join.h
+  - src/core/lib/promise/wait_set.h
+  - src/core/lib/transport/promise_endpoint.h
+  - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
+  src:
+  - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
+  - src/core/ext/transport/chaotic_good/client_transport.cc
+  - src/core/ext/transport/chaotic_good/frame.cc
+  - src/core/ext/transport/chaotic_good/frame_header.cc
+  - src/core/lib/transport/promise_endpoint.cc
+  - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
+  - test/core/transport/chaotic_good/client_transport_error_test.cc
+  deps:
+  - gtest
+  - protobuf
+  - grpc_test_util
+  uses_polling: false
 - name: client_transport_test
   gtest: true
   build: test
diff --git a/src/core/BUILD b/src/core/BUILD
index 5bfd4ad..9018067 100644
--- a/src/core/BUILD
+++ b/src/core/BUILD
@@ -6227,6 +6227,7 @@
         "arena",
         "chaotic_good_frame",
         "chaotic_good_frame_header",
+        "context",
         "event_engine_wakeup_scheduler",
         "for_each",
         "grpc_promise_endpoint",
diff --git a/src/core/ext/transport/chaotic_good/client_transport.cc b/src/core/ext/transport/chaotic_good/client_transport.cc
index 7049226..686a024 100644
--- a/src/core/ext/transport/chaotic_good/client_transport.cc
+++ b/src/core/ext/transport/chaotic_good/client_transport.cc
@@ -63,6 +63,7 @@
           ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator(
               "client_transport")),
       arena_(MakeScopedArena(1024, &memory_allocator_)),
+      context_(arena_.get()),
       event_engine_(event_engine) {
   auto write_loop = Loop([this] {
     return TrySeq(
@@ -111,11 +112,10 @@
   writer_ = MakeActivity(
       // Continuously write next outgoing frames to promise endpoints.
       std::move(write_loop), EventEngineWakeupScheduler(event_engine_),
-      [](absl::Status status) {
-        GPR_ASSERT(status.code() == absl::StatusCode::kCancelled ||
-                   status.code() == absl::StatusCode::kInternal);
-        // TODO(ladynana): handle the promise endpoint write failures with
-        // outgoing_frames.close() once available.
+      [this](absl::Status status) {
+        if (!(status.ok() || status.code() == absl::StatusCode::kCancelled)) {
+          this->AbortWithError();
+        }
       },
       // Hold Arena in activity for GetContext<Arena> usage.
       arena_.get());
@@ -176,11 +176,10 @@
   reader_ = MakeActivity(
       // Continuously read next incoming frames from promise endpoints.
       std::move(read_loop), EventEngineWakeupScheduler(event_engine_),
-      [](absl::Status status) {
-        GPR_ASSERT(status.code() == absl::StatusCode::kCancelled ||
-                   status.code() == absl::StatusCode::kInternal);
-        // TODO(ladynana): handle the promise endpoint read failures with
-        // iterating stream_map_ and close all the pipes once available.
+      [this](absl::Status status) {
+        if (!(status.ok() || status.code() == absl::StatusCode::kCancelled)) {
+          this->AbortWithError();
+        }
       },
       // Hold Arena in activity for GetContext<Arena> usage.
       arena_.get());
diff --git a/src/core/ext/transport/chaotic_good/client_transport.h b/src/core/ext/transport/chaotic_good/client_transport.h
index a2110f1..4972630 100644
--- a/src/core/ext/transport/chaotic_good/client_transport.h
+++ b/src/core/ext/transport/chaotic_good/client_transport.h
@@ -34,7 +34,6 @@
 
 #include <grpc/event_engine/event_engine.h>
 #include <grpc/event_engine/memory_allocator.h>
-#include <grpc/support/log.h>
 
 #include "src/core/ext/transport/chaotic_good/frame.h"
 #include "src/core/ext/transport/chaotic_good/frame_header.h"
@@ -42,6 +41,7 @@
 #include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
 #include "src/core/lib/gprpp/sync.h"
 #include "src/core/lib/promise/activity.h"
+#include "src/core/lib/promise/context.h"
 #include "src/core/lib/promise/for_each.h"
 #include "src/core/lib/promise/if.h"
 #include "src/core/lib/promise/inter_activity_pipe.h"
@@ -75,6 +75,19 @@
       reader_.reset();
     }
   }
+  void AbortWithError() {
+    // Mark transport as unavailable when the endpoint write/read failed.
+    // Close all the available pipes.
+    if (!outgoing_frames_.IsClosed()) {
+      outgoing_frames_.MarkClosed();
+    }
+    MutexLock lock(&mu_);
+    for (const auto& pair : stream_map_) {
+      if (!pair.second->IsClose()) {
+        pair.second->MarkClose();
+      }
+    }
+  }
   auto AddStream(CallArgs call_args) {
     // At this point, the connection is set up.
     // Start sending data frames.
@@ -119,8 +132,11 @@
                       outgoing_frames.Send(ClientFrame(std::move(frame))),
                       [](bool success) -> absl::Status {
                         if (!success) {
-                          return absl::InternalError(
-                              "Send frame to outgoing_frames failed.");
+                          // TODO(ladynana): propagate the actual error message
+                          // from EventEngine.
+                          return absl::UnavailableError(
+                              "Transport closed due to endpoint write/read "
+                              "failed.");
                         }
                         return absl::OkStatus();
                       });
@@ -137,38 +153,51 @@
                   // Save incomming frame results to call_args.
                   [server_initial_metadata, server_to_client_messages](
                       absl::optional<ServerFrame> server_frame) mutable {
-                    GPR_ASSERT(server_frame.has_value());
-                    auto frame = std::move(
-                        absl::get<ServerFragmentFrame>(*server_frame));
+                    bool transport_closed = false;
+                    ServerFragmentFrame frame;
+                    if (!server_frame.has_value()) {
+                      // Incoming server frame pipe is closed, which only
+                      // happens when transport is aborted.
+                      transport_closed = true;
+                    } else {
+                      frame = std::move(
+                          absl::get<ServerFragmentFrame>(*server_frame));
+                    };
                     bool has_headers = (frame.headers != nullptr);
                     bool has_message = (frame.message != nullptr);
                     bool has_trailers = (frame.trailers != nullptr);
                     return TrySeq(
-                        If(
-                            has_headers,
-                            [server_initial_metadata,
-                             headers = std::move(frame.headers)]() mutable {
-                              return server_initial_metadata->Push(
-                                  std::move(headers));
-                            },
-                            [] { return false; }),
-                        If(
-                            has_message,
-                            [server_to_client_messages,
-                             message = std::move(frame.message)]() mutable {
-                              return server_to_client_messages->Push(
-                                  std::move(message));
-                            },
-                            [] { return false; }),
-                        If(
-                            has_trailers,
-                            [trailers = std::move(frame.trailers)]() mutable
-                            -> LoopCtl<ServerMetadataHandle> {
-                              return std::move(trailers);
-                            },
-                            []() -> LoopCtl<ServerMetadataHandle> {
-                              return Continue();
-                            }));
+                        If((!transport_closed) && has_headers,
+                           [server_initial_metadata,
+                            headers = std::move(frame.headers)]() mutable {
+                             return server_initial_metadata->Push(
+                                 std::move(headers));
+                           },
+                           [] { return false; }),
+                        If((!transport_closed) && has_message,
+                           [server_to_client_messages,
+                            message = std::move(frame.message)]() mutable {
+                             return server_to_client_messages->Push(
+                                 std::move(message));
+                           },
+                           [] { return false; }),
+                        If((!transport_closed) && has_trailers,
+                           [trailers = std::move(frame.trailers)]() mutable
+                           -> LoopCtl<ServerMetadataHandle> {
+                             return std::move(trailers);
+                           },
+                           [transport_closed]()
+                               -> LoopCtl<ServerMetadataHandle> {
+                             if (transport_closed) {
+                               // TODO(ladynana): propagate the actual error
+                               // message from EventEngine.
+                               return ServerMetadataFromStatus(
+                                   absl::UnavailableError(
+                                       "Transport closed due to endpoint "
+                                       "write/read failed."));
+                             }
+                             return Continue();
+                           }));
                   });
             })),
         [](std::tuple<Empty, ServerMetadataHandle> ret) {
@@ -204,6 +233,7 @@
   std::shared_ptr<FrameHeader> frame_header_;
   MemoryAllocator memory_allocator_;
   ScopedArenaPtr arena_;
+  promise_detail::Context<Arena> context_;
   // Use to synchronize writer_ and reader_ activity with outside activities;
   std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
 };
diff --git a/src/core/lib/promise/inter_activity_pipe.h b/src/core/lib/promise/inter_activity_pipe.h
index e90a611..a7594fb 100644
--- a/src/core/lib/promise/inter_activity_pipe.h
+++ b/src/core/lib/promise/inter_activity_pipe.h
@@ -83,6 +83,11 @@
       on_available.Wakeup();
     }
 
+    bool IsClosed() {
+      MutexLock lock(&mu_);
+      return closed_;
+    }
+
    private:
     Mutex mu_;
     std::array<T, kQueueSize> queue_ ABSL_GUARDED_BY(mu_);
@@ -108,6 +113,12 @@
       if (center_ != nullptr) center_->MarkClosed();
     }
 
+    bool IsClose() { return center_->IsClosed(); }
+
+    void MarkClose() {
+      if (center_ != nullptr) center_->MarkClosed();
+    }
+
     auto Push(T value) {
       return [center = center_, value = std::move(value)]() mutable {
         return center->Push(value);
diff --git a/src/core/lib/promise/mpsc.h b/src/core/lib/promise/mpsc.h
index 8525739..c125442 100644
--- a/src/core/lib/promise/mpsc.h
+++ b/src/core/lib/promise/mpsc.h
@@ -107,6 +107,12 @@
     receiver_closed_ = true;
   }
 
+  // Return whether the receiver is closed.
+  bool IsClosed() {
+    MutexLock lock(&mu_);
+    return receiver_closed_;
+  }
+
  private:
   Mutex mu_;
   const size_t max_queued_;
@@ -164,6 +170,10 @@
   ~MpscReceiver() {
     if (center_ != nullptr) center_->ReceiverClosed();
   }
+  bool IsClosed() { return center_->IsClosed(); }
+  void MarkClosed() {
+    if (center_ != nullptr) center_->ReceiverClosed();
+  }
   MpscReceiver(const MpscReceiver&) = delete;
   MpscReceiver& operator=(const MpscReceiver&) = delete;
   // Only movable until it's first polled, and so we don't need to contend with
diff --git a/test/core/transport/chaotic_good/BUILD b/test/core/transport/chaotic_good/BUILD
index 036c894..fc698ae 100644
--- a/test/core/transport/chaotic_good/BUILD
+++ b/test/core/transport/chaotic_good/BUILD
@@ -118,3 +118,41 @@
         "//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto",
     ],
 )
+
+grpc_cc_test(
+    name = "client_transport_error_test",
+    srcs = ["client_transport_error_test.cc"],
+    external_deps = [
+        "absl/functional:any_invocable",
+        "absl/status",
+        "absl/status:statusor",
+        "absl/strings:str_format",
+        "absl/types:optional",
+        "gtest",
+    ],
+    language = "C++",
+    uses_event_engine = False,
+    uses_polling = False,
+    deps = [
+        "//:grpc",
+        "//:grpc_public_hdrs",
+        "//:iomgr_timer",
+        "//:ref_counted_ptr",
+        "//src/core:activity",
+        "//src/core:arena",
+        "//src/core:chaotic_good_client_transport",
+        "//src/core:event_engine_wakeup_scheduler",
+        "//src/core:grpc_promise_endpoint",
+        "//src/core:if",
+        "//src/core:join",
+        "//src/core:loop",
+        "//src/core:memory_quota",
+        "//src/core:pipe",
+        "//src/core:resource_quota",
+        "//src/core:seq",
+        "//src/core:slice",
+        "//src/core:slice_buffer",
+        "//test/core/event_engine/fuzzing_event_engine",
+        "//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto",
+    ],
+)
diff --git a/test/core/transport/chaotic_good/client_transport_error_test.cc b/test/core/transport/chaotic_good/client_transport_error_test.cc
new file mode 100644
index 0000000..3b30c4c
--- /dev/null
+++ b/test/core/transport/chaotic_good/client_transport_error_test.cc
@@ -0,0 +1,441 @@
+// Copyright 2023 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "absl/status/status.h"
+
+#include "src/core/ext/transport/chaotic_good/client_transport.h"
+#include "src/core/lib/transport/promise_endpoint.h"
+#include "src/core/lib/transport/transport.h"
+
+// IWYU pragma: no_include <sys/socket.h>
+
+#include <stddef.h>
+
+#include <algorithm>  // IWYU pragma: keep
+#include <memory>
+#include <string>  // IWYU pragma: keep
+#include <tuple>
+#include <utility>
+#include <vector>  // IWYU pragma: keep
+
+#include "absl/functional/any_invocable.h"
+#include "absl/status/statusor.h"     // IWYU pragma: keep
+#include "absl/strings/str_format.h"  // IWYU pragma: keep
+#include "absl/types/optional.h"      // IWYU pragma: keep
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+#include <grpc/event_engine/event_engine.h>
+#include <grpc/event_engine/memory_allocator.h>
+#include <grpc/event_engine/slice.h>  // IWYU pragma: keep
+#include <grpc/event_engine/slice_buffer.h>
+#include <grpc/grpc.h>
+#include <grpc/status.h>  // IWYU pragma: keep
+
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/iomgr/timer_manager.h"
+#include "src/core/lib/promise/activity.h"
+#include "src/core/lib/promise/event_engine_wakeup_scheduler.h"
+#include "src/core/lib/promise/if.h"
+#include "src/core/lib/promise/join.h"
+#include "src/core/lib/promise/loop.h"
+#include "src/core/lib/promise/pipe.h"
+#include "src/core/lib/promise/seq.h"
+#include "src/core/lib/resource_quota/arena.h"
+#include "src/core/lib/resource_quota/memory_quota.h"
+#include "src/core/lib/resource_quota/resource_quota.h"
+#include "src/core/lib/slice/slice_buffer.h"
+#include "src/core/lib/slice/slice_internal.h"      // IWYU pragma: keep
+#include "src/core/lib/transport/metadata_batch.h"  // IWYU pragma: keep
+#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
+#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
+
+using testing::MockFunction;
+using testing::Return;
+using testing::Sequence;
+using testing::StrictMock;
+using testing::WithArgs;
+
+namespace grpc_core {
+namespace chaotic_good {
+namespace testing {
+
+class MockEndpoint
+    : public grpc_event_engine::experimental::EventEngine::Endpoint {
+ public:
+  MOCK_METHOD(
+      bool, Read,
+      (absl::AnyInvocable<void(absl::Status)> on_read,
+       grpc_event_engine::experimental::SliceBuffer* buffer,
+       const grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs*
+           args),
+      (override));
+
+  MOCK_METHOD(
+      bool, Write,
+      (absl::AnyInvocable<void(absl::Status)> on_writable,
+       grpc_event_engine::experimental::SliceBuffer* data,
+       const grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs*
+           args),
+      (override));
+
+  MOCK_METHOD(
+      const grpc_event_engine::experimental::EventEngine::ResolvedAddress&,
+      GetPeerAddress, (), (const, override));
+  MOCK_METHOD(
+      const grpc_event_engine::experimental::EventEngine::ResolvedAddress&,
+      GetLocalAddress, (), (const, override));
+};
+
+class ClientTransportTest : public ::testing::Test {
+ public:
+  ClientTransportTest()
+      : control_endpoint_ptr_(new StrictMock<MockEndpoint>()),
+        data_endpoint_ptr_(new StrictMock<MockEndpoint>()),
+        memory_allocator_(
+            ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator(
+                "test")),
+        control_endpoint_(*control_endpoint_ptr_),
+        data_endpoint_(*data_endpoint_ptr_),
+        event_engine_(std::make_shared<
+                      grpc_event_engine::experimental::FuzzingEventEngine>(
+            []() {
+              grpc_timer_manager_set_threading(false);
+              grpc_event_engine::experimental::FuzzingEventEngine::Options
+                  options;
+              return options;
+            }(),
+            fuzzing_event_engine::Actions())),
+        arena_(MakeScopedArena(initial_arena_size, &memory_allocator_)),
+        pipe_client_to_server_messages_(arena_.get()),
+        pipe_server_to_client_messages_(arena_.get()),
+        pipe_server_intial_metadata_(arena_.get()),
+        pipe_client_to_server_messages_second_(arena_.get()),
+        pipe_server_to_client_messages_second_(arena_.get()),
+        pipe_server_intial_metadata_second_(arena_.get()) {}
+  // Initial ClientTransport with read expecations
+  void InitialClientTransport() {
+    client_transport_ = std::make_unique<ClientTransport>(
+        std::make_unique<PromiseEndpoint>(
+            std::unique_ptr<MockEndpoint>(control_endpoint_ptr_),
+            SliceBuffer()),
+        std::make_unique<PromiseEndpoint>(
+            std::unique_ptr<MockEndpoint>(data_endpoint_ptr_), SliceBuffer()),
+        event_engine_);
+  }
+  // Send messages from client to server.
+  auto SendClientToServerMessages(
+      Pipe<MessageHandle>& pipe_client_to_server_messages,
+      int num_of_messages) {
+    return Loop([&pipe_client_to_server_messages, num_of_messages,
+                 this]() mutable {
+      bool has_message = (num_of_messages > 0);
+      return If(
+          has_message,
+          Seq(pipe_client_to_server_messages.sender.Push(
+                  arena_->MakePooled<Message>()),
+              [&num_of_messages]() -> LoopCtl<absl::Status> {
+                num_of_messages--;
+                return Continue();
+              }),
+          [&pipe_client_to_server_messages]() mutable -> LoopCtl<absl::Status> {
+            pipe_client_to_server_messages.sender.Close();
+            return absl::OkStatus();
+          });
+    });
+  }
+  // Add stream into client transport, and expect return trailers of
+  // "grpc-status:code".
+  auto AddStream(CallArgs args) {
+    return client_transport_->AddStream(std::move(args));
+  }
+
+ private:
+  MockEndpoint* control_endpoint_ptr_;
+  MockEndpoint* data_endpoint_ptr_;
+  size_t initial_arena_size = 1024;
+  MemoryAllocator memory_allocator_;
+
+ protected:
+  MockEndpoint& control_endpoint_;
+  MockEndpoint& data_endpoint_;
+  std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine>
+      event_engine_;
+  std::unique_ptr<ClientTransport> client_transport_;
+  ScopedArenaPtr arena_;
+  Pipe<MessageHandle> pipe_client_to_server_messages_;
+  Pipe<MessageHandle> pipe_server_to_client_messages_;
+  Pipe<ServerMetadataHandle> pipe_server_intial_metadata_;
+  // Added for mutliple streams tests.
+  Pipe<MessageHandle> pipe_client_to_server_messages_second_;
+  Pipe<MessageHandle> pipe_server_to_client_messages_second_;
+  Pipe<ServerMetadataHandle> pipe_server_intial_metadata_second_;
+  absl::AnyInvocable<void(absl::Status)> read_callback_;
+  Sequence control_endpoint_sequence_;
+  Sequence data_endpoint_sequence_;
+  // Added to verify received message payload.
+  const std::string message_ = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
+};
+
+TEST_F(ClientTransportTest, AddOneStreamWithWriteFailed) {
+  // Mock write failed and read is pending.
+  EXPECT_CALL(control_endpoint_, Write)
+      .WillOnce(
+          WithArgs<0>([](absl::AnyInvocable<void(absl::Status)> on_write) {
+            on_write(absl::InternalError("control endpoint write failed."));
+            return false;
+          }));
+  EXPECT_CALL(data_endpoint_, Write)
+      .WillOnce(
+          WithArgs<0>([](absl::AnyInvocable<void(absl::Status)> on_write) {
+            on_write(absl::InternalError("data endpoint write failed."));
+            return false;
+          }));
+  EXPECT_CALL(control_endpoint_, Read)
+      .InSequence(control_endpoint_sequence_)
+      .WillOnce(Return(false));
+  InitialClientTransport();
+  ClientMetadataHandle md;
+  auto args = CallArgs{std::move(md),
+                       ClientInitialMetadataOutstandingToken::Empty(),
+                       nullptr,
+                       &pipe_server_intial_metadata_.sender,
+                       &pipe_client_to_server_messages_.receiver,
+                       &pipe_server_to_client_messages_.sender};
+  StrictMock<MockFunction<void(absl::Status)>> on_done;
+  EXPECT_CALL(on_done, Call(absl::OkStatus()));
+  auto activity = MakeActivity(
+      Seq(
+          // Concurrently: write and read messages in client transport.
+          Join(
+              // Add first stream with call_args into client transport.
+              // Expect return trailers "grpc-status:unavailable".
+              AddStream(std::move(args)),
+              // Send messages to call_args.client_to_server_messages pipe,
+              // which will be eventually sent to control/data endpoints.
+              SendClientToServerMessages(pipe_client_to_server_messages_, 1)),
+          // Once complete, verify successful sending and the received value.
+          [](const std::tuple<ServerMetadataHandle, absl::Status>& ret) {
+            EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(),
+                      GRPC_STATUS_UNAVAILABLE);
+            EXPECT_TRUE(std::get<1>(ret).ok());
+            return absl::OkStatus();
+          }),
+      EventEngineWakeupScheduler(event_engine_),
+      [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
+  // Wait until ClientTransport's internal activities to finish.
+  event_engine_->TickUntilIdle();
+  event_engine_->UnsetGlobalHooks();
+}
+
+TEST_F(ClientTransportTest, AddOneStreamWithReadFailed) {
+  // Mock read failed.
+  EXPECT_CALL(control_endpoint_, Read)
+      .InSequence(control_endpoint_sequence_)
+      .WillOnce(WithArgs<0>(
+          [](absl::AnyInvocable<void(absl::Status)> on_read) mutable {
+            on_read(absl::InternalError("control endpoint read failed."));
+            // Return false to mock EventEngine read not finish.
+            return false;
+          }));
+  InitialClientTransport();
+  ClientMetadataHandle md;
+  auto args = CallArgs{std::move(md),
+                       ClientInitialMetadataOutstandingToken::Empty(),
+                       nullptr,
+                       &pipe_server_intial_metadata_.sender,
+                       &pipe_client_to_server_messages_.receiver,
+                       &pipe_server_to_client_messages_.sender};
+  StrictMock<MockFunction<void(absl::Status)>> on_done;
+  EXPECT_CALL(on_done, Call(absl::OkStatus()));
+  auto activity = MakeActivity(
+      Seq(
+          // Concurrently: write and read messages in client transport.
+          Join(
+              // Add first stream with call_args into client transport.
+              // Expect return trailers "grpc-status:unavailable".
+              AddStream(std::move(args)),
+              // Send messages to call_args.client_to_server_messages pipe.
+              SendClientToServerMessages(pipe_client_to_server_messages_, 1)),
+          // Once complete, verify successful sending and the received value.
+          [](const std::tuple<ServerMetadataHandle, absl::Status>& ret) {
+            EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(),
+                      GRPC_STATUS_UNAVAILABLE);
+            EXPECT_TRUE(std::get<1>(ret).ok());
+            return absl::OkStatus();
+          }),
+      EventEngineWakeupScheduler(event_engine_),
+      [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
+  // Wait until ClientTransport's internal activities to finish.
+  event_engine_->TickUntilIdle();
+  event_engine_->UnsetGlobalHooks();
+}
+
+TEST_F(ClientTransportTest, AddMultipleStreamWithWriteFailed) {
+  // Mock write failed at first stream and second stream's write will fail too.
+  EXPECT_CALL(control_endpoint_, Write)
+      .Times(1)
+      .WillRepeatedly(
+          WithArgs<0>([](absl::AnyInvocable<void(absl::Status)> on_write) {
+            on_write(absl::InternalError("control endpoint write failed."));
+            return false;
+          }));
+  EXPECT_CALL(data_endpoint_, Write)
+      .Times(1)
+      .WillRepeatedly(
+          WithArgs<0>([](absl::AnyInvocable<void(absl::Status)> on_write) {
+            on_write(absl::InternalError("data endpoint write failed."));
+            return false;
+          }));
+  EXPECT_CALL(control_endpoint_, Read)
+      .InSequence(control_endpoint_sequence_)
+      .WillOnce(Return(false));
+  InitialClientTransport();
+  ClientMetadataHandle first_stream_md;
+  ClientMetadataHandle second_stream_md;
+  auto first_stream_args =
+      CallArgs{std::move(first_stream_md),
+               ClientInitialMetadataOutstandingToken::Empty(),
+               nullptr,
+               &pipe_server_intial_metadata_.sender,
+               &pipe_client_to_server_messages_.receiver,
+               &pipe_server_to_client_messages_.sender};
+  auto second_stream_args =
+      CallArgs{std::move(second_stream_md),
+               ClientInitialMetadataOutstandingToken::Empty(),
+               nullptr,
+               &pipe_server_intial_metadata_second_.sender,
+               &pipe_client_to_server_messages_second_.receiver,
+               &pipe_server_to_client_messages_second_.sender};
+  StrictMock<MockFunction<void(absl::Status)>> on_done;
+  EXPECT_CALL(on_done, Call(absl::OkStatus()));
+  auto activity = MakeActivity(
+      Seq(
+          // Concurrently: write and read messages from client transport.
+          Join(
+              // Add first stream with call_args into client transport.
+              // Expect return trailers "grpc-status:unavailable".
+              AddStream(std::move(first_stream_args)),
+              // Send messages to first stream's
+              // call_args.client_to_server_messages pipe.
+              SendClientToServerMessages(pipe_client_to_server_messages_, 1)),
+          // Once complete, verify successful sending and the received value.
+          [](const std::tuple<ServerMetadataHandle, absl::Status>& ret) {
+            EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(),
+                      GRPC_STATUS_UNAVAILABLE);
+            EXPECT_TRUE(std::get<1>(ret).ok());
+            return absl::OkStatus();
+          },
+          Join(
+              // Add second stream with call_args into client transport.
+              // Expect return trailers "grpc-status:unavailable".
+              AddStream(std::move(second_stream_args)),
+              // Send messages to second stream's
+              // call_args.client_to_server_messages pipe.
+              SendClientToServerMessages(pipe_client_to_server_messages_second_,
+                                         1)),
+          // Once complete, verify successful sending and the received value.
+          [](const std::tuple<ServerMetadataHandle, absl::Status>& ret) {
+            EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(),
+                      GRPC_STATUS_UNAVAILABLE);
+            EXPECT_TRUE(std::get<1>(ret).ok());
+            return absl::OkStatus();
+          }),
+      EventEngineWakeupScheduler(event_engine_),
+      [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
+  // Wait until ClientTransport's internal activities to finish.
+  event_engine_->TickUntilIdle();
+  event_engine_->UnsetGlobalHooks();
+}
+
+TEST_F(ClientTransportTest, AddMultipleStreamWithReadFailed) {
+  // Mock read failed at first stream, and second stream's write will fail too.
+  EXPECT_CALL(control_endpoint_, Read)
+      .InSequence(control_endpoint_sequence_)
+      .WillOnce(WithArgs<0>(
+          [](absl::AnyInvocable<void(absl::Status)> on_read) mutable {
+            on_read(absl::InternalError("control endpoint read failed."));
+            // Return false to mock EventEngine read not finish.
+            return false;
+          }));
+  InitialClientTransport();
+  ClientMetadataHandle first_stream_md;
+  ClientMetadataHandle second_stream_md;
+  auto first_stream_args =
+      CallArgs{std::move(first_stream_md),
+               ClientInitialMetadataOutstandingToken::Empty(),
+               nullptr,
+               &pipe_server_intial_metadata_.sender,
+               &pipe_client_to_server_messages_.receiver,
+               &pipe_server_to_client_messages_.sender};
+  auto second_stream_args =
+      CallArgs{std::move(second_stream_md),
+               ClientInitialMetadataOutstandingToken::Empty(),
+               nullptr,
+               &pipe_server_intial_metadata_second_.sender,
+               &pipe_client_to_server_messages_second_.receiver,
+               &pipe_server_to_client_messages_second_.sender};
+  StrictMock<MockFunction<void(absl::Status)>> on_done;
+  EXPECT_CALL(on_done, Call(absl::OkStatus()));
+  auto activity = MakeActivity(
+      Seq(
+          // Concurrently: write and read messages from client transport.
+          Join(
+              // Add first stream with call_args into client transport.
+              AddStream(std::move(first_stream_args)),
+              // Send messages to first stream's
+              // call_args.client_to_server_messages pipe, which will be
+              // eventually sent to control/data endpoints.
+              SendClientToServerMessages(pipe_client_to_server_messages_, 1)),
+          // Once complete, verify successful sending and the received value.
+          [](const std::tuple<ServerMetadataHandle, absl::Status>& ret) {
+            EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(),
+                      GRPC_STATUS_UNAVAILABLE);
+            EXPECT_TRUE(std::get<1>(ret).ok());
+            return absl::OkStatus();
+          },
+          Join(
+              // Add second stream with call_args into client transport.
+              AddStream(std::move(second_stream_args)),
+              // Send messages to second stream's
+              // call_args.client_to_server_messages pipe, which will be
+              // eventually sent to control/data endpoints.
+              SendClientToServerMessages(pipe_client_to_server_messages_second_,
+                                         1)),
+          // Once complete, verify successful sending and the received value.
+          [](const std::tuple<ServerMetadataHandle, absl::Status>& ret) {
+            EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(),
+                      GRPC_STATUS_UNAVAILABLE);
+            EXPECT_TRUE(std::get<1>(ret).ok());
+            return absl::OkStatus();
+          }),
+      EventEngineWakeupScheduler(event_engine_),
+      [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
+  // Wait until ClientTransport's internal activities to finish.
+  event_engine_->TickUntilIdle();
+  event_engine_->UnsetGlobalHooks();
+}
+
+}  // namespace testing
+}  // namespace chaotic_good
+}  // namespace grpc_core
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  // Must call to create default EventEngine.
+  grpc_init();
+  int ret = RUN_ALL_TESTS();
+  grpc_shutdown();
+  return ret;
+}
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index 0050efa..6b389d2 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -2220,6 +2220,30 @@
     "flaky": false,
     "gtest": true,
     "language": "c++",
+    "name": "client_transport_error_test",
+    "platforms": [
+      "linux",
+      "mac",
+      "posix",
+      "windows"
+    ],
+    "uses_polling": false
+  },
+  {
+    "args": [],
+    "benchmark": false,
+    "ci_platforms": [
+      "linux",
+      "mac",
+      "posix",
+      "windows"
+    ],
+    "cpu_cost": 1.0,
+    "exclude_configs": [],
+    "exclude_iomgrs": [],
+    "flaky": false,
+    "gtest": true,
+    "language": "c++",
     "name": "client_transport_test",
     "platforms": [
       "linux",