[host][xdc-server] Set host stream connect status.

The xdc device side sends a ctrl msg when a device stream goes on /
offline.

When a client registers that stream id, or currently has registered it,
we will set them as connected / not connected.

Later on we can use this to know when to queue more data from a client
to the xdc device.

TEST= patch in local usb handler changes
out/build-zircon/tools/xdcserver
fx shell xdc-test -u -f file
out/build-zircon/tools/xdc-test-host -d -f file

Stream id 1 should be connected.

Change-Id: I815141641cf76dd04a905333385a3d0f3ee06a4d
diff --git a/system/host/xdc-server/xdc-server.cpp b/system/host/xdc-server/xdc-server.cpp
index 40c0619..5cb3382 100644
--- a/system/host/xdc-server/xdc-server.cpp
+++ b/system/host/xdc-server/xdc-server.cpp
@@ -31,6 +31,17 @@
     stream_id_ = stream_id;
 }
 
+void Client::SetConnected(bool connected) {
+    if (connected == connected_) {
+        fprintf(stderr, "tried to set client with stream id %u as %s again.\n",
+                stream_id(), connected ? "connected" : "disconnected");
+        return;
+    }
+    printf("client with stream id %u is now %s to the xdc device stream.\n",
+           stream_id(), connected ?  "connected" : "disconnected");
+    connected_ = connected;
+}
+
 // static
 std::unique_ptr<XdcServer> XdcServer::Create() {
     auto conn = std::make_unique<XdcServer>(ConstructorTag{});
@@ -232,6 +243,9 @@
     } else {
         client->SetStreamId(stream_id);
         printf("registered stream id %u\n", stream_id);
+        if (dev_stream_ids_.count(stream_id)) {
+            client->SetConnected(true);
+        }
         resp = true;
     }
 
@@ -281,10 +295,27 @@
     case XDC_NOTIFY_STREAM_STATE: {
         uint32_t stream_id = msg->notify_stream_state.stream_id;
         bool online = msg->notify_stream_state.online;
+
+        auto dev_stream = dev_stream_ids_.find(stream_id);
+        bool saved_online_state = dev_stream != dev_stream_ids_.end();
+        if (online == saved_online_state) {
+            fprintf(stderr, "tried to set stream %u to %s again\n",
+                    stream_id, online ? "online" : "offline");
+            return;
+        }
+        if (online) {
+            dev_stream_ids_.insert(stream_id);
+        } else {
+            dev_stream_ids_.erase(dev_stream);
+        }
         printf("xdc device stream id %u is now %s\n", stream_id, online ? "online" : "offline");
 
-        // TODO(jocelyndang): update any matching client as connected and ready to transfer.
-
+        // Update the host client's connected status.
+        auto client = GetClient(stream_id);
+        if (!client) {
+            break;
+        }
+        client->SetConnected(online);
         break;
     }
     default:
diff --git a/system/host/xdc-server/xdc-server.h b/system/host/xdc-server/xdc-server.h
index 55cbfce..025aef6 100644
--- a/system/host/xdc-server/xdc-server.h
+++ b/system/host/xdc-server/xdc-server.h
@@ -7,6 +7,7 @@
 #include <fbl/unique_fd.h>
 #include <map>
 #include <poll.h>
+#include <set>
 #include <vector>
 
 #include "usb-handler.h"
@@ -18,16 +19,22 @@
     explicit Client(int fd) : fd_(fd) {}
 
     void SetStreamId(uint32_t stream_id);
+    void SetConnected(bool connected);
 
     int fd()             const { return fd_.get(); }
     bool registered()    const { return registered_; }
     uint32_t stream_id() const { return stream_id_; }
+    bool connected()     const { return connected_; }
 
 private:
     fbl::unique_fd fd_;
 
+    // Whether the client has registered a stream id.
     bool     registered_ = false;
     uint32_t stream_id_  = 0;
+    // True if the client has registered a stream id,
+    // and that stream id is also registered on the xdc device side.
+    bool     connected_  = false;
 };
 
 class XdcServer {
@@ -57,6 +64,7 @@
     std::shared_ptr<Client> GetClient(uint32_t stream_id);
 
     void UsbReadComplete(std::unique_ptr<UsbHandler::Transfer> transfer);
+    // Parses the control message from the given transfer buffer.
     void HandleCtrlMsg(unsigned char* transfer_buf, int transfer_len);
 
     std::unique_ptr<UsbHandler> usb_handler_;
@@ -72,6 +80,9 @@
     // File descriptors we are currently polling on.
     std::vector<pollfd> poll_fds_;
 
+    // Stream ids registered on the xdc device side.
+    std::set<uint32_t> dev_stream_ids_;
+
     xdc_packet_state_t read_packet_state_;
 };