pw_grpc: Add send error callback to SendQueue

Users should have some way of reacting to write failures on the socket.
Because the Read on the socket is blocking and not interruptible, we
can't use our existing callbacks to close the connection.

Bug: 404684450
Change-Id: I9eed898e6055e0f13ad3c673a184270e79d44565
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/336159
Presubmit-Verified: CQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com>
Docs-Not-Needed: Austin Foxley <afoxley@google.com>
Commit-Queue: Austin Foxley <afoxley@google.com>
Tests-Not-Needed: Austin Foxley <afoxley@google.com>
Reviewed-by: Eli Lipsitz <elipsitz@google.com>
Lint: Lint 🤖 <android-build-ayeaye@system.gserviceaccount.com>
Pigweed-Auto-Submit: Austin Foxley <afoxley@google.com>
Reviewed-by: Zixuan Qu <zixuanqu@google.com>
diff --git a/pw_grpc/public/pw_grpc/send_queue.h b/pw_grpc/public/pw_grpc/send_queue.h
index 5521924..c9c7faa 100644
--- a/pw_grpc/public/pw_grpc/send_queue.h
+++ b/pw_grpc/public/pw_grpc/send_queue.h
@@ -17,6 +17,7 @@
 #include "pw_async/dispatcher.h"
 #include "pw_async_basic/dispatcher.h"
 #include "pw_containers/dynamic_deque.h"
+#include "pw_function/function.h"
 #include "pw_status/status.h"
 #include "pw_stream/stream.h"
 #include "pw_sync/lock_annotations.h"
@@ -30,6 +31,8 @@
 // that is creating the buffers to send.
 class SendQueue : public thread::ThreadCore {
  public:
+  using ErrorHandler = pw::Function<void(pw::Status)>;
+
   SendQueue(stream::ReaderWriter& socket, Allocator& allocator)
       : socket_(socket),
         send_task_(pw::bind_member<&SendQueue::ProcessSendQueue>(this)),
@@ -44,15 +47,22 @@
   // Call before attempting to join thread.
   void RequestStop() { send_dispatcher_.RequestStop(); }
 
+  // Set callback to be called when a write to the socket fails. QueueSend
+  // should not be called from this callback.
+  void set_on_error(ErrorHandler&& error_handler)
+      PW_LOCKS_EXCLUDED(send_mutex_);
+
  private:
   void ProcessSendQueue(async::Context& context, Status status)
       PW_LOCKS_EXCLUDED(send_mutex_);
 
-  UniquePtr<std::byte[]> PopNext();
+  UniquePtr<std::byte[]> PopNext() PW_LOCKS_EXCLUDED(send_mutex_);
+  void NotifyOnError(Status status) PW_LOCKS_EXCLUDED(send_mutex_);
 
   stream::ReaderWriter& socket_;
   async::BasicDispatcher send_dispatcher_;
   async::Task send_task_;
+  ErrorHandler on_error_;
   sync::Mutex send_mutex_;
   DynamicDeque<UniquePtr<std::byte[]>> queue_ PW_GUARDED_BY(send_mutex_);
 };
diff --git a/pw_grpc/send_queue.cc b/pw_grpc/send_queue.cc
index ae73ee3..9ae1330 100644
--- a/pw_grpc/send_queue.cc
+++ b/pw_grpc/send_queue.cc
@@ -28,6 +28,18 @@
   return buffer;
 }
 
+void SendQueue::NotifyOnError(Status status) {
+  std::lock_guard lock(send_mutex_);  // Held while the handler runs.
+  if (on_error_) {  // Nothing to do when no handler is stored.
+    on_error_(status);  // Forwards the failing status to the handler.
+  }
+}
+
+void SendQueue::set_on_error(ErrorHandler&& error_handler) {
+  std::lock_guard lock(send_mutex_);  // Serializes with NotifyOnError().
+  on_error_ = std::move(error_handler);  // Replaces any previous handler.
+}
+
 void SendQueue::ProcessSendQueue(async::Context&, Status task_status) {
   if (!task_status.ok()) {
     return;
@@ -38,6 +50,7 @@
     if (Status status = socket_.Write(pw::span(buffer.get(), buffer.size()));
         !status.ok()) {
       PW_LOG_ERROR("Failed to write to socket in SendQueue: %s", status.str());
+      NotifyOnError(status);
       return;
     }
     buffer = PopNext();