| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/connectivity/bluetooth/core/bt-host/public/pw_bluetooth_sapphire/internal/host/common/pipeline_monitor.h" |
| |
| #include <memory> |
| |
| #include <gmock/gmock.h> |
| #include <pw_async/fake_dispatcher_fixture.h> |
| |
| #include "src/connectivity/bluetooth/core/bt-host/public/pw_bluetooth_sapphire/internal/host/common/retire_log.h" |
| |
| namespace bt { |
| namespace { |
| |
| using Token = PipelineMonitor::Token; |
| |
| using PipelineMonitorTest = pw::async::test::FakeDispatcherFixture; |
| |
| const internal::RetireLog kRetireLogDefaultParams(/*min_depth=*/1, |
| /*max_depth=*/100); |
| |
| TEST_F(PipelineMonitorTest, TokensCanOutliveMonitor) { |
| auto monitor = |
| std::make_unique<PipelineMonitor>(dispatcher(), kRetireLogDefaultParams); |
| auto token = monitor->Issue(0); |
| monitor.reset(); |
| } |
| |
| TEST_F(PipelineMonitorTest, SequentialTokensModifyCounts) { |
| PipelineMonitor monitor(dispatcher(), kRetireLogDefaultParams); |
| EXPECT_EQ(0U, monitor.bytes_issued()); |
| EXPECT_EQ(0, monitor.tokens_issued()); |
| EXPECT_EQ(0U, monitor.bytes_in_flight()); |
| EXPECT_EQ(0, monitor.tokens_in_flight()); |
| EXPECT_EQ(0U, monitor.bytes_retired()); |
| EXPECT_EQ(0, monitor.tokens_retired()); |
| |
| constexpr size_t kByteCount = 2; |
| { |
| auto token = monitor.Issue(kByteCount); |
| EXPECT_EQ(kByteCount, monitor.bytes_issued()); |
| EXPECT_EQ(1, monitor.tokens_issued()); |
| EXPECT_EQ(kByteCount, monitor.bytes_in_flight()); |
| EXPECT_EQ(1, monitor.tokens_in_flight()); |
| EXPECT_EQ(0U, monitor.bytes_retired()); |
| EXPECT_EQ(0, monitor.tokens_retired()); |
| |
| token.Retire(); |
| EXPECT_EQ(kByteCount, monitor.bytes_issued()); |
| EXPECT_EQ(1, monitor.tokens_issued()); |
| EXPECT_EQ(0U, monitor.bytes_in_flight()); |
| EXPECT_EQ(0, monitor.tokens_in_flight()); |
| EXPECT_EQ(kByteCount, monitor.bytes_retired()); |
| EXPECT_EQ(1, monitor.tokens_retired()); |
| |
| // Test that a moved-from value is reusable and that it retires by going out |
| // of scope |
| token = monitor.Issue(kByteCount); |
| EXPECT_EQ(2 * kByteCount, monitor.bytes_issued()); |
| EXPECT_EQ(2, monitor.tokens_issued()); |
| EXPECT_EQ(kByteCount, monitor.bytes_in_flight()); |
| EXPECT_EQ(1, monitor.tokens_in_flight()); |
| EXPECT_EQ(kByteCount, monitor.bytes_retired()); |
| EXPECT_EQ(1, monitor.tokens_retired()); |
| } |
| |
| EXPECT_EQ(2 * kByteCount, monitor.bytes_issued()); |
| EXPECT_EQ(2, monitor.tokens_issued()); |
| EXPECT_EQ(0U, monitor.bytes_in_flight()); |
| EXPECT_EQ(0, monitor.tokens_in_flight()); |
| EXPECT_EQ(2 * kByteCount, monitor.bytes_retired()); |
| EXPECT_EQ(2, monitor.tokens_retired()); |
| } |
| |
| TEST_F(PipelineMonitorTest, TokensCanBeMoved) { |
| PipelineMonitor monitor(dispatcher(), kRetireLogDefaultParams); |
| EXPECT_EQ(0U, monitor.bytes_issued()); |
| EXPECT_EQ(0, monitor.tokens_issued()); |
| EXPECT_EQ(0U, monitor.bytes_in_flight()); |
| EXPECT_EQ(0, monitor.tokens_in_flight()); |
| EXPECT_EQ(0U, monitor.bytes_retired()); |
| EXPECT_EQ(0, monitor.tokens_retired()); |
| |
| constexpr size_t kByteCount = 2; |
| auto token0 = monitor.Issue(kByteCount); |
| auto token1 = std::move(token0); |
| EXPECT_EQ(kByteCount, monitor.bytes_issued()); |
| EXPECT_EQ(1, monitor.tokens_issued()); |
| EXPECT_EQ(kByteCount, monitor.bytes_in_flight()); |
| EXPECT_EQ(1, monitor.tokens_in_flight()); |
| EXPECT_EQ(0U, monitor.bytes_retired()); |
| EXPECT_EQ(0, monitor.tokens_retired()); |
| |
| // both active token and moved-from token can be retired safely |
| token0.Retire(); |
| token1.Retire(); |
| EXPECT_EQ(kByteCount, monitor.bytes_issued()); |
| EXPECT_EQ(1, monitor.tokens_issued()); |
| EXPECT_EQ(0U, monitor.bytes_in_flight()); |
| EXPECT_EQ(0, monitor.tokens_in_flight()); |
| EXPECT_EQ(kByteCount, monitor.bytes_retired()); |
| EXPECT_EQ(1, monitor.tokens_retired()); |
| } |
| |
| TEST_F(PipelineMonitorTest, SubscribeToMaxTokensAlert) { |
| PipelineMonitor monitor(dispatcher(), kRetireLogDefaultParams); |
| |
| std::optional<PipelineMonitor::MaxTokensInFlightAlert> received_alert; |
| constexpr int kMaxTokensInFlight = 1; |
| monitor.SetAlert(PipelineMonitor::MaxTokensInFlightAlert{kMaxTokensInFlight}, |
| [&received_alert](auto alert) { received_alert = alert; }); |
| |
| // First token does not exceed in-flight threshold. |
| auto token0 = monitor.Issue(0); |
| EXPECT_FALSE(received_alert); |
| |
| // Total issued (but not in-flight) exceeds threshold. |
| token0.Retire(); |
| token0 = monitor.Issue(0); |
| ASSERT_LT(kMaxTokensInFlight, monitor.tokens_issued()); |
| EXPECT_FALSE(received_alert); |
| |
| // Total in-flight exceeds threshold. |
| auto token1 = monitor.Issue(0); |
| ASSERT_TRUE(received_alert.has_value()); |
| EXPECT_EQ(kMaxTokensInFlight + 1, received_alert.value().value); |
| |
| // Alert has expired after firing once. |
| received_alert.reset(); |
| auto token2 = monitor.Issue(0); |
| EXPECT_FALSE(received_alert); |
| } |
| |
| TEST_F(PipelineMonitorTest, SubscribeToMaxBytesAlert) { |
| PipelineMonitor monitor(dispatcher(), kRetireLogDefaultParams); |
| |
| std::optional<PipelineMonitor::MaxBytesInFlightAlert> received_alert; |
| constexpr size_t kMaxBytesInFlight = 1; |
| monitor.SetAlert(PipelineMonitor::MaxBytesInFlightAlert{kMaxBytesInFlight}, |
| [&received_alert](auto alert) { received_alert = alert; }); |
| |
| // First token does not exceed total bytes in flight threshold. |
| auto token0 = monitor.Issue(kMaxBytesInFlight); |
| EXPECT_FALSE(received_alert); |
| |
| // Total in-flight exceeds threshold. |
| auto token1 = monitor.Issue(1); |
| ASSERT_TRUE(received_alert.has_value()); |
| EXPECT_EQ(kMaxBytesInFlight + 1, received_alert.value().value); |
| } |
| |
| TEST_F(PipelineMonitorTest, SubscribeToMaxAgeAlert) { |
| PipelineMonitor monitor(dispatcher(), kRetireLogDefaultParams); |
| |
| std::optional<PipelineMonitor::MaxAgeRetiredAlert> received_alert; |
| constexpr pw::chrono::SystemClock::duration kMaxAge = |
| std::chrono::milliseconds(500); |
| monitor.SetAlert(PipelineMonitor::MaxAgeRetiredAlert{kMaxAge}, |
| [&received_alert](auto alert) { received_alert = alert; }); |
| |
| // Token outlives threshold age, but doesn't signal alert until it's retired. |
| auto token0 = monitor.Issue(0); |
| RunFor(kMaxAge * 2); |
| EXPECT_FALSE(received_alert); |
| |
| // Total in-flight exceeds threshold. |
| token0.Retire(); |
| ASSERT_TRUE(received_alert.has_value()); |
| EXPECT_EQ(kMaxAge * 2, received_alert.value().value); |
| } |
| |
| TEST_F(PipelineMonitorTest, SubscribeToAlertInsideHandler) { |
| PipelineMonitor monitor(dispatcher(), kRetireLogDefaultParams); |
| |
| std::optional<PipelineMonitor::MaxBytesInFlightAlert> received_alert; |
| constexpr size_t kMaxBytesInFlight = 2; |
| |
| auto renew_subscription = [&monitor, &received_alert](auto) { |
| // Same threshold, so it should be triggered eventually, but not |
| // immediately. |
| monitor.SetAlert( |
| PipelineMonitor::MaxBytesInFlightAlert{kMaxBytesInFlight - 1}, |
| [&received_alert](auto alert) { received_alert = alert; }); |
| }; |
| monitor.SetAlert(PipelineMonitor::MaxBytesInFlightAlert{kMaxBytesInFlight}, |
| renew_subscription); |
| |
| // Total in-flight exceeds threshold. |
| auto token0 = monitor.Issue(kMaxBytesInFlight + 1); |
| EXPECT_FALSE(received_alert); |
| |
| // Re-subscribed alert doesn't get called until the monitored value |
| // potentially changes again. |
| auto token1 = monitor.Issue(0); |
| ASSERT_TRUE(received_alert.has_value()); |
| EXPECT_EQ(kMaxBytesInFlight + 1, received_alert.value().value); |
| } |
| |
| TEST_F(PipelineMonitorTest, |
| MultipleMaxBytesInFlightAlertsWithDifferentThresholds) { |
| PipelineMonitor monitor(dispatcher(), kRetireLogDefaultParams); |
| |
| std::optional<PipelineMonitor::MaxBytesInFlightAlert> received_alert_0; |
| constexpr size_t kMaxBytesInFlight0 = 1; |
| monitor.SetAlert( |
| PipelineMonitor::MaxBytesInFlightAlert{kMaxBytesInFlight0}, |
| [&received_alert_0](auto alert) { received_alert_0 = alert; }); |
| std::optional<PipelineMonitor::MaxBytesInFlightAlert> received_alert_1; |
| constexpr size_t kMaxBytesInFlight1 = 2; |
| monitor.SetAlert( |
| PipelineMonitor::MaxBytesInFlightAlert{kMaxBytesInFlight1}, |
| [&received_alert_1](auto alert) { received_alert_1 = alert; }); |
| |
| // Total in-flight exceeds threshold 0. |
| auto token0 = monitor.Issue(kMaxBytesInFlight0 + 1); |
| ASSERT_TRUE(received_alert_0.has_value()); |
| EXPECT_LT(kMaxBytesInFlight0, received_alert_0.value().value); |
| EXPECT_GE(kMaxBytesInFlight1, received_alert_0.value().value); |
| EXPECT_FALSE(received_alert_1); |
| |
| // Total in-flight exceeds threshold 1. |
| auto token1 = monitor.Issue(kMaxBytesInFlight1); |
| ASSERT_TRUE(received_alert_1.has_value()); |
| EXPECT_LT(kMaxBytesInFlight1, received_alert_1.value().value); |
| } |
| |
| TEST_F(PipelineMonitorTest, SubscribeToMultipleDissimilarAlerts) { |
| PipelineMonitor monitor(dispatcher(), kRetireLogDefaultParams); |
| |
| constexpr size_t kMaxBytesInFlight = 2; |
| constexpr int kMaxTokensInFlight = 1; |
| |
| int listener_call_count = 0; |
| int max_bytes_alerts = 0; |
| int max_tokens_alerts = 0; |
| auto alerts_listener = [&](auto alert_value) { |
| listener_call_count++; |
| if (std::holds_alternative<PipelineMonitor::MaxBytesInFlightAlert>( |
| alert_value)) { |
| max_bytes_alerts++; |
| } else if (std::holds_alternative<PipelineMonitor::MaxTokensInFlightAlert>( |
| alert_value)) { |
| max_tokens_alerts++; |
| } |
| }; |
| monitor.SetAlerts( |
| alerts_listener, |
| PipelineMonitor::MaxBytesInFlightAlert{kMaxBytesInFlight}, |
| PipelineMonitor::MaxTokensInFlightAlert{kMaxTokensInFlight}); |
| |
| auto token0 = monitor.Issue(0); |
| EXPECT_EQ(0, listener_call_count); |
| |
| auto token1 = monitor.Issue(0); |
| EXPECT_EQ(1, listener_call_count); |
| EXPECT_EQ(1, max_tokens_alerts); |
| |
| auto token2 = monitor.Issue(kMaxBytesInFlight + 1); |
| EXPECT_EQ(2, listener_call_count); |
| EXPECT_EQ(1, max_bytes_alerts); |
| } |
| |
| TEST_F(PipelineMonitorTest, TokensRetireIntoRetireLog) { |
| PipelineMonitor monitor( |
| dispatcher(), internal::RetireLog(/*min_depth=*/1, /*max_depth=*/64)); |
| |
| auto token = monitor.Issue(1); |
| EXPECT_EQ(0U, monitor.retire_log().depth()); |
| |
| const pw::chrono::SystemClock::duration kAge = std::chrono::milliseconds(10); |
| RunFor(kAge); |
| token.Retire(); |
| EXPECT_EQ(1U, monitor.retire_log().depth()); |
| const auto bytes_quantiles = |
| monitor.retire_log().ComputeByteCountQuantiles(std::array{0., .5, 1.}); |
| ASSERT_TRUE(bytes_quantiles.has_value()); |
| EXPECT_THAT(*bytes_quantiles, testing::ElementsAre(1, 1, 1)); |
| |
| const auto age_quantiles = |
| monitor.retire_log().ComputeAgeQuantiles(std::array{0., .5, 1.}); |
| ASSERT_TRUE(age_quantiles.has_value()); |
| EXPECT_THAT(*age_quantiles, testing::ElementsAre(kAge, kAge, kAge)); |
| } |
| |
| TEST_F(PipelineMonitorTest, TokensCanBeSplit) { |
| PipelineMonitor monitor(dispatcher(), kRetireLogDefaultParams); |
| |
| const size_t kSplits = 10; |
| Token token_main = monitor.Issue(kSplits); |
| |
| const pw::chrono::SystemClock::duration kAge = std::chrono::milliseconds(10); |
| RunFor(kAge); |
| |
| for (size_t i = 0; i < kSplits; i++) { |
| Token split_token = token_main.Split(1); |
| EXPECT_EQ(monitor.tokens_retired(), static_cast<int64_t>(i)); |
| if (i == kSplits - 1) { |
| // token_main is moved to split_token when the final byte is taken. |
| EXPECT_EQ(monitor.tokens_issued(), |
| static_cast<int64_t>(i) + |
| 1); // split_token + ("i" previous split tokens) |
| } else { |
| EXPECT_EQ( |
| monitor.tokens_issued(), |
| static_cast<int64_t>(i) + |
| 2); // token_main + split_token + ("i" previous split tokens) |
| } |
| EXPECT_EQ(monitor.bytes_retired(), i); |
| EXPECT_EQ(kSplits - i, monitor.bytes_in_flight()); |
| } |
| |
| // Even though kSplits+1 Token objects were created, we should only see |
| // kSplits retirements, which is how an PDU split into fragments for outbound |
| // send would be modeled. |
| EXPECT_EQ(static_cast<int64_t>(kSplits), monitor.tokens_retired()); |
| EXPECT_EQ(kSplits, monitor.bytes_retired()); |
| |
| ASSERT_EQ(monitor.retire_log().depth(), kSplits); |
| |
| std::optional<std::array<size_t, 2>> byte_quantiles = |
| monitor.retire_log().ComputeByteCountQuantiles(std::array{0., 1.}); |
| ASSERT_TRUE(byte_quantiles); |
| EXPECT_EQ(byte_quantiles.value()[0], 1u); |
| EXPECT_EQ(byte_quantiles.value()[1], 1u); |
| |
| std::optional<std::array<pw::chrono::SystemClock::duration, 2>> |
| age_quantiles = |
| monitor.retire_log().ComputeAgeQuantiles(std::array{0., 1.}); |
| ASSERT_TRUE(age_quantiles); |
| EXPECT_EQ(age_quantiles.value()[0], kAge); |
| EXPECT_EQ(age_quantiles.value()[1], kAge); |
| } |
| |
| using PipelineMonitorDeathTest = PipelineMonitorTest; |
| |
| TEST_F(PipelineMonitorDeathTest, SplittingTokenIntoMoreThanConstituentBytes) { |
| PipelineMonitor monitor(dispatcher(), kRetireLogDefaultParams); |
| |
| auto token_main = monitor.Issue(1); |
| |
| EXPECT_DEATH(token_main.Split(2), "byte"); |
| } |
| |
| } // namespace |
| } // namespace bt |