[bt][hci] Limit queue depth for BR/EDR dynamic channels

Drop least recently queued packets for each "low-priority" L2CAP
channel on ACL-U links in ACLDataChannel to maintain a maximum depth of
kMaxAclPacketsPerChannel per such channel. This prevents unbounded
growth of outbound data when audio streaming in conditions where
retransmissions prevent throughput at the same rate that A2DP source
sends data, which is a rare OOM hazard and causes noticeable audio
latency.

The threshold kMaxAclPacketsPerChannel = 32 works out to between
500–1000 ms of audio data in our current pipeline.

Bug: fxbug.dev/71061
Test: in bt-host-hci-tests, AclDataChannelTest.
SendingLowPriorityBrEdrPacketsWhenTooManyAreQueuedDropsLeastRecentlySentPduOnSameChannel
SendingLowPriorityPacketsThatDropDoNotAffectDataOnDifferentLink
SendingLowPriorityPacketsThatDropDoNotAffectDataOnSameLinkDifferentChannel

Change-Id: I37263cc23b3280ace4ea5c093d54b3d3173d2b7f
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/495439
Commit-Queue: Xo Wang <xow@google.com>
Reviewed-by: Ben Lawson <benlawson@google.com>
Reviewed-by: Marie Janssen 💖 <jamuraa@google.com>
diff --git a/src/connectivity/bluetooth/core/bt-host/hci/acl_data_channel.cc b/src/connectivity/bluetooth/core/bt-host/hci/acl_data_channel.cc
index 70980de..f91cf2e 100644
--- a/src/connectivity/bluetooth/core/bt-host/hci/acl_data_channel.cc
+++ b/src/connectivity/bluetooth/core/bt-host/hci/acl_data_channel.cc
@@ -5,10 +5,14 @@
 #include "acl_data_channel.h"
 
 #include <endian.h>
+#include <lib/async/cpp/time.h>
 #include <lib/async/default.h>
 #include <zircon/assert.h>
 #include <zircon/status.h>
 
+#include <iterator>
+#include <numeric>
+
 #include "lib/fit/function.h"
 #include "slab_allocators.h"
 #include "src/connectivity/bluetooth/core/bt-host/common/log.h"
@@ -112,6 +116,16 @@
   // packet-based data flow control.
   CommandChannel::EventCallbackResult NumberOfCompletedPacketsCallback(const EventPacket& event);
 
+  // Searches send queue for packets with the same link and channel as |archetypal_packet| and
+  // removes the least recently-inserted full PDU if necessary. Called before inserting
+  // |archetypal_packet| to ensure that there are only up to |kMaxAclPacketsPerChannel| PDUs queued
+  // for the given connection and handle.
+  void DropOverflowPacket(const QueuedDataPacket& archetypal_packet);
+
+  // Drains |dropped_packet_counts_| and logs any recorded drops. Should only be called by
+  // |log_dropped_overflow_task_|.
+  void LogDroppedOverflowPackets();
+
   // Tries to send the next batch of queued data packets if the controller has
   // any space available. All packets in higher priority queues will be sent before packets in lower
   // priority queues.
@@ -189,6 +203,13 @@
   size_t num_sent_packets_;
   size_t le_num_sent_packets_;
 
+  // Counts of automatically-discarded packets on each channel due to overflow. Cleared by
+  // LogDroppedOverflowPackets.
+  std::map<std::pair<hci::ConnectionHandle, UniqueChannelId>, int64_t> dropped_packet_counts_;
+
+  async::TaskClosureMethod<AclDataChannelImpl, &AclDataChannelImpl::LogDroppedOverflowPackets>
+      log_dropped_overflow_task_{this};
+
   // The ACL data packet queue contains the data packets that are waiting to be
   // sent to the controller.
   // TODO(armansito): Use priority_queue based on L2CAP channel priority.
@@ -302,6 +323,8 @@
 
   bt_log(INFO, "hci", "shutting down");
 
+  log_dropped_overflow_task_.Cancel();
+
   auto handler_cleanup_task = [this] {
     bt_log(DEBUG, "hci", "removing I/O handler");
     zx_status_t status = channel_wait_.Cancel();
@@ -378,10 +401,16 @@
   }
 
   auto insert_iter = SendQueueInsertLocationForPriority(priority);
-  while (!packets.is_empty()) {
+  for (int i = 0; !packets.is_empty(); i++) {
     auto packet = packets.pop_front();
     auto ll_type = registered_links_[packet->connection_handle()];
     auto queue_packet = QueuedDataPacket(ll_type, channel_id, priority, std::move(packet));
+    if (i == 0) {
+      if (queue_packet.priority == PacketPriority::kLow &&
+          ll_type == hci::Connection::LinkType::kACL) {
+        DropOverflowPacket(queue_packet);
+      }
+    }
     send_queue_.insert(insert_iter, std::move(queue_packet));
   }
 
@@ -585,7 +614,8 @@
 
     auto iter = pending_links_.find(le16toh(data->connection_handle));
     if (iter == pending_links_.end()) {
-      bt_log(WARN, "hci", "controller reported sent packets on unknown connection handle!");
+      bt_log(WARN, "hci", "controller reported sent packets on unknown connection handle %#.4x!",
+             data->connection_handle);
       continue;
     }
 
@@ -593,9 +623,7 @@
 
     ZX_DEBUG_ASSERT(iter->second.count);
     if (iter->second.count < comp_packets) {
-      bt_log(WARN, "hci",
-             "packet tx count mismatch! (handle: %#.4x, expected: %zu, "
-             "actual : %u)",
+      bt_log(WARN, "hci", "packet tx count mismatch! (handle: %#.4x, expected: %zu, actual : %u)",
              le16toh(data->connection_handle), iter->second.count, comp_packets);
 
       iter->second.count = 0u;
@@ -625,6 +653,63 @@
   return CommandChannel::EventCallbackResult::kContinue;
 }
 
+void AclDataChannelImpl::DropOverflowPacket(const QueuedDataPacket& archetypal_packet) {
+  TRACE_DURATION("bluetooth", "ACLDataChannel::DropOverflowPacket", "send_queue_.size",
+                 send_queue_.size());
+
+  // TODO(fxbug.dev/71061): Performance of these O(N) searches is not amazing, taking tens of µs
+  // on low-end ARM when kMaxAclPacketsPerChannel=64. The std::list nodes do seem to have decent
+  // cache locality because that time increases sublinearly up to at least 64, and overall
+  // performance is acceptable, with the above TRACE_DURATION taking <15% of the total
+  // l2cap::channel_send flow duration. Performance optimization should be a future goal of work
+  // that reorganizes channel and link data flow layouts.
+
+  // predicate that checks if a QDP is the head of a PDU (not a continuing fragment)
+  auto is_head = [](const QueuedDataPacket& p) {
+    return p.packet->packet_boundary_flag() == hci::ACLPacketBoundaryFlag::kFirstFlushable ||
+           p.packet->packet_boundary_flag() == hci::ACLPacketBoundaryFlag::kFirstNonFlushable;
+  };
+  // predicate that checks if a QDP is like |archetypal_packet| and is the head of a PDU
+  auto is_similar_and_head = [&a = archetypal_packet, is_head](const QueuedDataPacket& b) {
+    return a.packet->connection_handle() == b.packet->connection_handle() &&
+           a.channel_id == b.channel_id && is_head(b);
+  };
+  const size_t queued_similar_pdu_count =
+      std::count_if(send_queue_.begin(), send_queue_.end(), is_similar_and_head);
+  if (queued_similar_pdu_count < kMaxAclPacketsPerChannel) {
+    return;
+  }
+
+  const auto to_drop_iter =
+      std::find_if(send_queue_.begin(), send_queue_.end(), is_similar_and_head);
+  ZX_ASSERT(to_drop_iter != send_queue_.end());
+  const auto next_packet_iter = std::find_if(std::next(to_drop_iter), send_queue_.end(), is_head);
+
+  dropped_packet_counts_[{archetypal_packet.packet->connection_handle(),
+                          archetypal_packet.channel_id}] +=
+      std::distance(to_drop_iter, next_packet_iter);
+  send_queue_.erase(to_drop_iter, next_packet_iter);
+
+  // Schedule a deadline to log this drop and any other drops that occur until the logging drains
+  // the counters.
+  if (!log_dropped_overflow_task_.is_pending()) {
+    constexpr zx::duration kMinLogInterval = zx::sec(1);
+    log_dropped_overflow_task_.PostDelayed(io_dispatcher_, kMinLogInterval);
+  }
+}
+
+void AclDataChannelImpl::LogDroppedOverflowPackets() {
+  // This exchange clears the accumulated counts since the previous call (which should be at least
+  // kMinLogInterval ago) and gets the number of dropped packets for each channel (if any).
+  const auto dropped_packet_counts = std::exchange(dropped_packet_counts_, {});
+
+  // This logs at most once per channel, at a temporal period of kMinLogInterval, and only for the
+  // channels where overflow occurred.
+  for (auto& [ids, count] : dropped_packet_counts) {
+    bt_log(WARN, "hci", "%#.4x:%#.4x dropped %zu fragments(s)", ids.first, ids.second, count);
+  }
+}
+
 void AclDataChannelImpl::TrySendNextQueuedPackets() {
   if (!is_initialized_)
     return;
diff --git a/src/connectivity/bluetooth/core/bt-host/hci/acl_data_channel.h b/src/connectivity/bluetooth/core/bt-host/hci/acl_data_channel.h
index 3a6bd7b..8802d2c 100644
--- a/src/connectivity/bluetooth/core/bt-host/hci/acl_data_channel.h
+++ b/src/connectivity/bluetooth/core/bt-host/hci/acl_data_channel.h
@@ -13,6 +13,7 @@
 #include <zircon/compiler.h>
 
 #include <list>
+#include <map>
 #include <queue>
 #include <unordered_map>
 
@@ -77,6 +78,8 @@
   using AclPacketPredicate =
       fit::function<bool(const ACLDataPacketPtr& packet, UniqueChannelId channel_id)>;
 
+  static constexpr size_t kMaxAclPacketsPerChannel = 32;
+
   static std::unique_ptr<AclDataChannel> Create(Transport* transport, zx::channel hci_acl_channel);
 
   virtual ~AclDataChannel() = default;
diff --git a/src/connectivity/bluetooth/core/bt-host/hci/acl_data_channel_unittest.cc b/src/connectivity/bluetooth/core/bt-host/hci/acl_data_channel_unittest.cc
index 0fcf2a3..e35ed84 100644
--- a/src/connectivity/bluetooth/core/bt-host/hci/acl_data_channel_unittest.cc
+++ b/src/connectivity/bluetooth/core/bt-host/hci/acl_data_channel_unittest.cc
@@ -1269,5 +1269,206 @@
   EXPECT_EQ(cb_status->error(), hci::StatusCode::kInvalidHCICommandParameters);
 }
 
+TEST_F(HCI_ACLDataChannelTest,
+       SendingLowPriorityBrEdrPacketsWhenTooManyAreQueuedDropsLeastRecentlySentPduOnSameChannel) {
+  constexpr size_t kMaxMtu = 4;
+  constexpr size_t kMaxNumPackets = 2;
+  constexpr ConnectionHandle kHandle0 = 0x0001;
+  constexpr ConnectionHandle kHandle1 = 0x0002;
+
+  InitializeACLDataChannel(DataBufferInfo(kMaxMtu, kMaxNumPackets),
+                           DataBufferInfo(kMaxMtu, kMaxNumPackets));
+
+  acl_data_channel()->RegisterLink(kHandle0, Connection::LinkType::kLE);
+  acl_data_channel()->RegisterLink(kHandle1, Connection::LinkType::kACL);
+
+  // Fill up both LE and BR/EDR controller buffers
+  for (ConnectionHandle handle : {kHandle0, kHandle1}) {
+    for (size_t i = 0; i < kMaxNumPackets; ++i) {
+      auto packet = ACLDataPacket::New(handle, ACLPacketBoundaryFlag::kFirstNonFlushable,
+                                       ACLBroadcastFlag::kPointToPoint, kMaxMtu);
+      EXPECT_TRUE(acl_data_channel()->SendPacket(std::move(packet), l2cap::kFirstDynamicChannelId,
+                                                 AclDataChannel::PacketPriority::kLow));
+    }
+  }
+  RunLoopUntilIdle();
+
+  // Callback invoked by TestDevice when it receive a data packet from us.
+  size_t packet_count = 0;
+  auto data_callback = [&](const ByteBuffer& bytes) {
+    ASSERT_LE(sizeof(ACLDataHeader), bytes.size());
+    PacketView<hci::ACLDataHeader> packet(&bytes, bytes.size() - sizeof(ACLDataHeader));
+    ConnectionHandle connection_handle = le16toh(packet.header().handle_and_flags) & 0xFFF;
+
+    // LE is still waiting for controller credits
+    EXPECT_EQ(kHandle1, connection_handle);
+
+    if ((packet_count == 0) || (packet_count == 1)) {
+      // The first low-priority queued packet and its continuation packet were dropped so the first
+      // packets actually sent should be those for the second PDU
+      EXPECT_EQ(1u, packet.payload_data()[0]);
+    }
+
+    packet_count++;
+  };
+  test_device()->SetDataCallback(data_callback, dispatcher());
+
+  // Send enough data that the first PDU sent in this loop gets dropped
+  for (size_t i = 0; i < AclDataChannel::kMaxAclPacketsPerChannel + 1; ++i) {
+    // Send two fragments per PDU
+    LinkedList<ACLDataPacket> packets;
+    for (auto pbf :
+         {ACLPacketBoundaryFlag::kFirstNonFlushable, ACLPacketBoundaryFlag::kContinuingFragment}) {
+      auto packet = ACLDataPacket::New(kHandle1, pbf, ACLBroadcastFlag::kPointToPoint, kMaxMtu);
+
+      // Write a sequence number into the payload, starting at 0
+      packet->mutable_view()->mutable_payload_data()[0] = static_cast<uint8_t>(i);
+      packets.push_back(std::move(packet));
+    }
+    EXPECT_TRUE(acl_data_channel()->SendPackets(std::move(packets), l2cap::kFirstDynamicChannelId,
+                                                AclDataChannel::PacketPriority::kLow));
+  }
+
+  test_device()->SendCommandChannelPacket(testing::NumberOfCompletedPacketsPacket(kHandle1, 2));
+  RunLoopUntilIdle();
+
+  EXPECT_EQ(2u, packet_count);
+}
+
+TEST_F(HCI_ACLDataChannelTest,
+       SendingLowPriorityPacketsThatDropDoNotAffectDataOnSameLinkDifferentChannel) {
+  constexpr size_t kMaxMtu = 4;
+  constexpr size_t kMaxNumPackets = 2;
+  constexpr ConnectionHandle kHandle0 = 0x0001;
+  constexpr UniqueChannelId kChannelId0 = l2cap::kFirstDynamicChannelId;
+  constexpr UniqueChannelId kChannelId1 = l2cap::kFirstDynamicChannelId + 1;
+
+  InitializeACLDataChannel(DataBufferInfo(kMaxMtu, kMaxNumPackets));
+
+  acl_data_channel()->RegisterLink(kHandle0, Connection::LinkType::kACL);
+
+  // Fill up controller buffers
+  for (size_t i = 0; i < kMaxNumPackets; ++i) {
+    auto packet = ACLDataPacket::New(kHandle0, ACLPacketBoundaryFlag::kFirstNonFlushable,
+                                     ACLBroadcastFlag::kPointToPoint, kMaxMtu);
+    EXPECT_TRUE(acl_data_channel()->SendPacket(std::move(packet), l2cap::kFirstDynamicChannelId,
+                                               AclDataChannel::PacketPriority::kLow));
+  }
+  RunLoopUntilIdle();
+
+  // Callback invoked by TestDevice when it receive a data packet from us.
+  size_t packet_count = 0;
+  auto data_callback = [&](const ByteBuffer& bytes) {
+    ASSERT_LT(sizeof(ACLDataHeader), bytes.size());
+    PacketView<hci::ACLDataHeader> packet(&bytes, bytes.size() - sizeof(ACLDataHeader));
+
+    // The first packet should be left in the queue because it is for kChannelId0 but the second
+    // should be dropped as it was the least recently sent unsent packet for channel kChannelId1.
+    if (packet_count == 0) {
+      EXPECT_EQ(0u, packet.payload_data()[0]);
+    } else if (packet_count == 1) {
+      EXPECT_EQ(2u, packet.payload_data()[0]);
+    }
+
+    packet_count++;
+  };
+  test_device()->SetDataCallback(data_callback, dispatcher());
+
+  // Send one packet on kChannelId0
+  {
+    auto packet = ACLDataPacket::New(kHandle0, ACLPacketBoundaryFlag::kFirstNonFlushable,
+                                     ACLBroadcastFlag::kPointToPoint, kMaxMtu);
+
+    // Write a sequence number into the payload, starting at 0
+    packet->mutable_view()->mutable_payload_data()[0] = static_cast<uint8_t>(0);
+    EXPECT_TRUE(acl_data_channel()->SendPacket(std::move(packet), kChannelId0,
+                                               AclDataChannel::PacketPriority::kLow));
+  }
+
+  // Send enough data on kChannel1 that the first PDU sent in this loop gets dropped
+  for (size_t i = 0; i < AclDataChannel::kMaxAclPacketsPerChannel + 1; i++) {
+    auto packet = ACLDataPacket::New(kHandle0, ACLPacketBoundaryFlag::kFirstNonFlushable,
+                                     ACLBroadcastFlag::kPointToPoint, kMaxMtu);
+
+    // Write a sequence number into the payload, starting at 0
+    packet->mutable_view()->mutable_payload_data()[0] = static_cast<uint8_t>(i + 1);
+    EXPECT_TRUE(acl_data_channel()->SendPacket(std::move(packet), kChannelId1,
+                                               AclDataChannel::PacketPriority::kLow));
+  }
+
+  test_device()->SendCommandChannelPacket(testing::NumberOfCompletedPacketsPacket(kHandle0, 2));
+  RunLoopUntilIdle();
+
+  EXPECT_EQ(2u, packet_count);
+}
+
+TEST_F(HCI_ACLDataChannelTest, SendingLowPriorityPacketsThatDropDoNotAffectDataOnDifferentLink) {
+  constexpr size_t kMaxMtu = 4;
+  constexpr size_t kMaxNumPackets = 2;
+  constexpr ConnectionHandle kHandle0 = 0x0001;
+  constexpr ConnectionHandle kHandle1 = 0x0002;
+
+  InitializeACLDataChannel(DataBufferInfo(kMaxMtu, kMaxNumPackets));
+
+  acl_data_channel()->RegisterLink(kHandle0, Connection::LinkType::kACL);
+  acl_data_channel()->RegisterLink(kHandle1, Connection::LinkType::kACL);
+
+  // Fill up controller buffers
+  for (size_t i = 0; i < kMaxNumPackets; ++i) {
+    auto packet = ACLDataPacket::New(kHandle0, ACLPacketBoundaryFlag::kFirstNonFlushable,
+                                     ACLBroadcastFlag::kPointToPoint, kMaxMtu);
+    EXPECT_TRUE(acl_data_channel()->SendPacket(std::move(packet), l2cap::kFirstDynamicChannelId,
+                                               AclDataChannel::PacketPriority::kLow));
+  }
+  RunLoopUntilIdle();
+
+  // Callback invoked by TestDevice when it receive a data packet from us.
+  size_t packet_count = 0;
+  auto data_callback = [&](const ByteBuffer& bytes) {
+    ASSERT_LT(sizeof(ACLDataHeader), bytes.size());
+    PacketView<hci::ACLDataHeader> packet(&bytes, bytes.size() - sizeof(ACLDataHeader));
+    ConnectionHandle connection_handle = le16toh(packet.header().handle_and_flags) & 0xFFF;
+
+    // First packet on kHandle0 doesn't get dropped, but first packet on kHandle1 does get dropped
+    // because there are too many queued for that channel on that link.
+    if (packet_count == 0) {
+      EXPECT_EQ(kHandle0, connection_handle);
+      EXPECT_EQ(0u, packet.payload_data()[0]);
+    } else if (packet_count == 1) {
+      EXPECT_EQ(kHandle1, connection_handle);
+      EXPECT_EQ(2u, packet.payload_data()[0]);
+    }
+    packet_count++;
+  };
+  test_device()->SetDataCallback(data_callback, dispatcher());
+
+  // Send one data packet on kHandle0
+  {
+    auto packet = ACLDataPacket::New(kHandle0, ACLPacketBoundaryFlag::kFirstNonFlushable,
+                                     ACLBroadcastFlag::kPointToPoint, kMaxMtu);
+
+    // Write a sequence number into the payload, starting at 0
+    packet->mutable_view()->mutable_payload_data()[0] = static_cast<uint8_t>(0);
+    EXPECT_TRUE(acl_data_channel()->SendPacket(std::move(packet), l2cap::kFirstDynamicChannelId,
+                                               AclDataChannel::PacketPriority::kLow));
+  }
+
+  // Send enough data on kHandle1 that the first PDU sent in this loop gets dropped
+  for (size_t i = 0; i < AclDataChannel::kMaxAclPacketsPerChannel + 1; i++) {
+    auto packet = ACLDataPacket::New(kHandle1, ACLPacketBoundaryFlag::kFirstNonFlushable,
+                                     ACLBroadcastFlag::kPointToPoint, kMaxMtu);
+
+    // Write a sequence number into the payload, starting at 0
+    packet->mutable_view()->mutable_payload_data()[0] = static_cast<uint8_t>(i + 1);
+    EXPECT_TRUE(acl_data_channel()->SendPacket(std::move(packet), l2cap::kFirstDynamicChannelId,
+                                               AclDataChannel::PacketPriority::kLow));
+  }
+
+  test_device()->SendCommandChannelPacket(testing::NumberOfCompletedPacketsPacket(kHandle0, 2));
+  RunLoopUntilIdle();
+
+  EXPECT_EQ(2u, packet_count);
+}
+
 }  // namespace
 }  // namespace bt::hci