blob: ae001748d8500ccf58c2a656ab138264328f58a2 [file] [log] [blame]
// Copyright 2024 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#include "pw_bluetooth_sapphire/peripheral.h"
#include "pw_async/fake_dispatcher.h"
#include "pw_async2/pend_func_task.h"
#include "pw_bluetooth_sapphire/internal/host/gap/fake_adapter.h"
#include "pw_unit_test/framework.h"
using Peripheral2 = pw::bluetooth::low_energy::Peripheral2;
using AdvertisedPeripheral2 = pw::bluetooth::low_energy::AdvertisedPeripheral2;
using ManufacturerData = pw::bluetooth::low_energy::ManufacturerData;
using AdvertiseError = pw::bluetooth::low_energy::Peripheral2::AdvertiseError;
template <typename T>
class ReceiverTask final : public pw::async2::Task {
public:
ReceiverTask(pw::async2::OnceReceiver<T> receiver)
: receiver_(std::move(receiver)) {}
pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
pw::async2::Poll<pw::Result<T>> pend = receiver_.Pend(cx);
if (pend.IsPending()) {
return pw::async2::Pending();
}
result_ = std::move(pend.value());
return pw::async2::Ready();
}
pw::Result<T>& result() { return result_; }
private:
pw::async2::OnceReceiver<T> receiver_;
pw::Result<T> result_;
};
class PeripheralTest : public ::testing::Test {
public:
void SetUp() override {}
void TearDown() override {}
// Returns nullopt if OnceReceiver received no result or a OnceReceiver error.
std::optional<Peripheral2::AdvertiseResult> Advertise(
Peripheral2::AdvertisingParameters& parameters) {
pw::async2::OnceReceiver<Peripheral2::AdvertiseResult> receiver =
peripheral().Advertise(parameters);
ReceiverTask<Peripheral2::AdvertiseResult> task(std::move(receiver));
dispatcher2().Post(task);
EXPECT_TRUE(task.result().status().IsUnknown());
dispatcher().RunUntilIdle();
EXPECT_TRUE(dispatcher2().RunUntilStalled().IsReady());
if (!task.result().status().ok()) {
return std::nullopt;
}
return std::move(task.result().value());
}
AdvertisedPeripheral2::Ptr AdvertiseExpectSuccess(
Peripheral2::AdvertisingParameters& parameters) {
std::optional<Peripheral2::AdvertiseResult> result = Advertise(parameters);
if (!result.has_value()) {
ADD_FAILURE();
return nullptr;
}
if (!result.value().has_value()) {
ADD_FAILURE();
return nullptr;
}
return std::move(result.value().value());
}
pw::bluetooth_sapphire::Peripheral& peripheral() { return peripheral_; }
bt::gap::testing::FakeAdapter& adapter() { return adapter_; }
pw::async::test::FakeDispatcher& dispatcher() { return async_dispatcher_; }
pw::async2::Dispatcher& dispatcher2() { return async2_dispatcher_; }
private:
pw::async::test::FakeDispatcher async_dispatcher_;
pw::async2::Dispatcher async2_dispatcher_;
bt::gap::testing::FakeAdapter adapter_{async_dispatcher_};
pw::bluetooth_sapphire::Peripheral peripheral_{adapter_.AsWeakPtr(),
async_dispatcher_};
};
TEST_F(PeripheralTest,
StartAdvertisingWithNameAndDestroyAdvertisedPeripheralStopsAdvertising) {
Peripheral2::AdvertisingParameters parameters;
parameters.data.name = "pigweed";
AdvertisedPeripheral2::Ptr advertised_peripheral =
AdvertiseExpectSuccess(parameters);
ASSERT_EQ(adapter().fake_le()->registered_advertisements().size(), 1u);
auto& advertisement =
adapter().fake_le()->registered_advertisements().begin()->second;
EXPECT_EQ(advertisement.data.local_name()->name, parameters.data.name);
EXPECT_EQ(advertisement.data.appearance(),
static_cast<uint16_t>(pw::bluetooth::Appearance::kUnknown));
EXPECT_FALSE(advertisement.extended_pdu);
EXPECT_FALSE(advertisement.include_tx_power_level);
EXPECT_FALSE(advertisement.connectable);
EXPECT_FALSE(advertisement.anonymous);
advertised_peripheral.reset();
dispatcher().RunUntilIdle();
ASSERT_EQ(adapter().fake_le()->registered_advertisements().size(), 0u);
}
TEST_F(PeripheralTest, StartAdvertisingWithTooLongName) {
std::string name(300, 'A');
Peripheral2::AdvertisingParameters parameters;
parameters.data.name = name;
std::optional<Peripheral2::AdvertiseResult> result = Advertise(parameters);
ASSERT_TRUE(result.has_value());
ASSERT_FALSE(result.value().has_value());
EXPECT_EQ(result.value().error(),
pw::bluetooth_sapphire::Peripheral::AdvertiseError::
kAdvertisingDataTooLong);
}
TEST_F(PeripheralTest, StartAdvertisingWithServiceData) {
const uint16_t uuid_0 = 42, uuid_1 = 43;
pw::bluetooth::low_energy::ServiceData service_data_0;
service_data_0.uuid = pw::bluetooth::Uuid(uuid_0);
std::array<std::byte, 3> service_data_0_data = {
std::byte{0x00}, std::byte{0x01}, std::byte{0x02}};
service_data_0.data = pw::span(service_data_0_data);
pw::bluetooth::low_energy::ServiceData service_data_1;
service_data_1.uuid = pw::bluetooth::Uuid(uuid_1);
std::array<std::byte, 3> service_data_1_data = {
std::byte{0x10}, std::byte{0x11}, std::byte{0x12}};
service_data_1.data = pw::span(service_data_1_data);
std::array<pw::bluetooth::low_energy::ServiceData, 2> service_data = {
service_data_0, service_data_1};
Peripheral2::AdvertisingParameters parameters;
parameters.data.service_data = service_data;
AdvertisedPeripheral2::Ptr advertised_peripheral =
AdvertiseExpectSuccess(parameters);
ASSERT_EQ(adapter().fake_le()->registered_advertisements().size(), 1u);
auto& advertisement =
adapter().fake_le()->registered_advertisements().begin()->second;
EXPECT_EQ(advertisement.data.service_data(bt::UUID(uuid_0)),
bt::BufferView(service_data_0.data));
EXPECT_EQ(advertisement.data.service_data(bt::UUID(uuid_1)),
bt::BufferView(service_data_1.data));
}
TEST_F(PeripheralTest, StartAdvertisingWithServiceUuids) {
const uint16_t uuid_0 = 42, uuid_1 = 43;
std::array<pw::bluetooth::Uuid, 2> service_uuids = {
pw::bluetooth::Uuid(uuid_0), pw::bluetooth::Uuid(uuid_1)};
std::unordered_set<bt::UUID> expected_uuids{bt::UUID(uuid_0),
bt::UUID(uuid_1)};
Peripheral2::AdvertisingParameters parameters;
parameters.data.service_uuids = service_uuids;
AdvertisedPeripheral2::Ptr advertised_peripheral =
AdvertiseExpectSuccess(parameters);
ASSERT_EQ(adapter().fake_le()->registered_advertisements().size(), 1u);
auto& advertisement =
adapter().fake_le()->registered_advertisements().begin()->second;
EXPECT_EQ(advertisement.data.service_uuids(), expected_uuids);
}
TEST_F(PeripheralTest, StartAdvertisingWithManufacturerData) {
std::array<std::byte, 3> data_0 = {
std::byte{0x00}, std::byte{0x01}, std::byte{0x02}};
std::array<std::byte, 3> data_1 = {
std::byte{0x03}, std::byte{0x04}, std::byte{0x05}};
std::array<ManufacturerData, 2> manufacturer_data{
ManufacturerData{.company_id = 0, .data = data_0},
ManufacturerData{.company_id = 1, .data = data_1}};
Peripheral2::AdvertisingParameters parameters;
parameters.data.manufacturer_data = manufacturer_data;
AdvertisedPeripheral2::Ptr advertised_peripheral =
AdvertiseExpectSuccess(parameters);
ASSERT_EQ(adapter().fake_le()->registered_advertisements().size(), 1u);
auto& advertisement =
adapter().fake_le()->registered_advertisements().begin()->second;
EXPECT_EQ(advertisement.data.manufacturer_data(0), bt::BufferView(data_0));
EXPECT_EQ(advertisement.data.manufacturer_data(1), bt::BufferView(data_1));
}
TEST_F(PeripheralTest, StartAdvertisingWithUris) {
std::string uri_0("https://abc.xyz");
std::string uri_1("https://pigweed.dev");
std::array<std::string_view, 2> uris = {uri_0, uri_1};
std::unordered_set<std::string> uri_set = {uri_0, uri_1};
Peripheral2::AdvertisingParameters parameters;
parameters.data.uris = uris;
AdvertisedPeripheral2::Ptr advertised_peripheral =
AdvertiseExpectSuccess(parameters);
ASSERT_EQ(adapter().fake_le()->registered_advertisements().size(), 1u);
auto& advertisement =
adapter().fake_le()->registered_advertisements().begin()->second;
EXPECT_EQ(advertisement.data.uris(), uri_set);
}
TEST_F(PeripheralTest, StartAdvertisingWithPublicAddressType) {
Peripheral2::AdvertisingParameters parameters;
parameters.address_type = pw::bluetooth::Address::Type::kPublic;
AdvertisedPeripheral2::Ptr advertised_peripheral =
AdvertiseExpectSuccess(parameters);
ASSERT_EQ(adapter().fake_le()->registered_advertisements().size(), 1u);
auto& advertisement =
adapter().fake_le()->registered_advertisements().begin()->second;
EXPECT_EQ(advertisement.addr_type, bt::DeviceAddress::Type::kLEPublic);
}
TEST_F(PeripheralTest, StartAdvertisingWithRandomAddressType) {
adapter().fake_le()->EnablePrivacy(true);
Peripheral2::AdvertisingParameters parameters;
parameters.address_type =
pw::bluetooth::Address::Type::kRandomResolvablePrivate;
AdvertisedPeripheral2::Ptr advertised_peripheral =
AdvertiseExpectSuccess(parameters);
ASSERT_EQ(adapter().fake_le()->registered_advertisements().size(), 1u);
auto& advertisement =
adapter().fake_le()->registered_advertisements().begin()->second;
EXPECT_EQ(advertisement.addr_type, bt::DeviceAddress::Type::kLERandom);
}
TEST_F(PeripheralTest, StartAdvertisingWithLegacyProcedureWithScanResponse) {
pw::bluetooth::low_energy::AdvertisingData scan_rsp;
scan_rsp.name = "robot";
Peripheral2::AdvertisingParameters parameters;
parameters.procedure = Peripheral2::LegacyAdvertising{
.scan_response = std::move(scan_rsp), .connection_options = std::nullopt};
AdvertisedPeripheral2::Ptr advertised_peripheral =
AdvertiseExpectSuccess(parameters);
ASSERT_EQ(adapter().fake_le()->registered_advertisements().size(), 1u);
auto& advertisement =
adapter().fake_le()->registered_advertisements().begin()->second;
EXPECT_EQ(advertisement.scan_response.local_name()->name, "robot");
}
TEST_F(PeripheralTest,
StartAdvertisingWithLegacyProcedureWithConnectionOptionsNonBondable) {
Peripheral2::AdvertisingParameters parameters;
Peripheral2::ConnectionOptions connection_options{
.bondable_mode = false,
.service_filter = std::nullopt,
.parameters = std::nullopt,
.att_mtu = std::nullopt};
parameters.procedure = Peripheral2::LegacyAdvertising{
.scan_response = std::nullopt, .connection_options = connection_options};
AdvertisedPeripheral2::Ptr advertised_peripheral =
AdvertiseExpectSuccess(parameters);
ASSERT_EQ(adapter().fake_le()->registered_advertisements().size(), 1u);
auto& advertisement =
adapter().fake_le()->registered_advertisements().begin()->second;
ASSERT_TRUE(advertisement.connectable.has_value());
EXPECT_EQ(advertisement.connectable->bondable_mode,
bt::sm::BondableMode::NonBondable);
}
TEST_F(PeripheralTest,
StartAdvertisingWithLegacyProcedureWithConnectionOptionsBondable) {
Peripheral2::AdvertisingParameters parameters;
Peripheral2::ConnectionOptions connection_options{
.bondable_mode = true,
.service_filter = std::nullopt,
.parameters = std::nullopt,
.att_mtu = std::nullopt};
parameters.procedure = Peripheral2::LegacyAdvertising{
.scan_response = std::nullopt, .connection_options = connection_options};
AdvertisedPeripheral2::Ptr advertised_peripheral =
AdvertiseExpectSuccess(parameters);
ASSERT_EQ(adapter().fake_le()->registered_advertisements().size(), 1u);
auto& advertisement =
adapter().fake_le()->registered_advertisements().begin()->second;
ASSERT_TRUE(advertisement.connectable.has_value());
EXPECT_EQ(advertisement.connectable->bondable_mode,
bt::sm::BondableMode::Bondable);
}
TEST_F(PeripheralTest, StartAdvertisingAnonymous) {
Peripheral2::AdvertisingParameters parameters;
parameters.procedure = Peripheral2::ExtendedAdvertising{
.configuration = pw::bluetooth::low_energy::Peripheral2::
ExtendedAdvertising::Anonymous(),
.tx_power = std::nullopt,
.primary_phy = pw::bluetooth::low_energy::Phy::k1Megabit,
.secondary_phy = pw::bluetooth::low_energy::Phy::k1Megabit};
AdvertisedPeripheral2::Ptr advertised_peripheral =
AdvertiseExpectSuccess(parameters);
ASSERT_EQ(adapter().fake_le()->registered_advertisements().size(), 1u);
auto& advertisement =
adapter().fake_le()->registered_advertisements().begin()->second;
EXPECT_TRUE(advertisement.anonymous);
}
TEST_F(PeripheralTest, StartAdvertisingWithExtendedProcedureWithScanResponse) {
Peripheral2::ScanResponse scan_rsp;
scan_rsp.name = "robot";
Peripheral2::AdvertisingParameters parameters;
parameters.procedure = Peripheral2::ExtendedAdvertising{
.configuration = scan_rsp,
.tx_power = std::nullopt,
.primary_phy = pw::bluetooth::low_energy::Phy::k1Megabit,
.secondary_phy = pw::bluetooth::low_energy::Phy::k1Megabit};
AdvertisedPeripheral2::Ptr advertised_peripheral =
AdvertiseExpectSuccess(parameters);
ASSERT_EQ(adapter().fake_le()->registered_advertisements().size(), 1u);
auto& advertisement =
adapter().fake_le()->registered_advertisements().begin()->second;
EXPECT_EQ(advertisement.scan_response.local_name()->name, "robot");
EXPECT_FALSE(advertisement.anonymous);
}
TEST_F(PeripheralTest,
StartAdvertisingWithExtendedProcedureWithConnectionOptionsNonBondable) {
Peripheral2::AdvertisingParameters parameters;
Peripheral2::ConnectionOptions connection_options{
.bondable_mode = false,
.service_filter = std::nullopt,
.parameters = std::nullopt,
.att_mtu = std::nullopt};
parameters.procedure = Peripheral2::ExtendedAdvertising{
.configuration = connection_options,
.tx_power = std::nullopt,
.primary_phy = pw::bluetooth::low_energy::Phy::k1Megabit,
.secondary_phy = pw::bluetooth::low_energy::Phy::k1Megabit};
AdvertisedPeripheral2::Ptr advertised_peripheral =
AdvertiseExpectSuccess(parameters);
ASSERT_EQ(adapter().fake_le()->registered_advertisements().size(), 1u);
auto& advertisement =
adapter().fake_le()->registered_advertisements().begin()->second;
ASSERT_TRUE(advertisement.connectable.has_value());
EXPECT_EQ(advertisement.connectable->bondable_mode,
bt::sm::BondableMode::NonBondable);
}
TEST_F(PeripheralTest,
StartAdvertisingWithExtendedProcedureWithConnectionOptionsBondable) {
Peripheral2::AdvertisingParameters parameters;
Peripheral2::ConnectionOptions connection_options{
.bondable_mode = true,
.service_filter = std::nullopt,
.parameters = std::nullopt,
.att_mtu = std::nullopt};
parameters.procedure = Peripheral2::ExtendedAdvertising{
.configuration = connection_options,
.tx_power = std::nullopt,
.primary_phy = pw::bluetooth::low_energy::Phy::k1Megabit,
.secondary_phy = pw::bluetooth::low_energy::Phy::k1Megabit};
AdvertisedPeripheral2::Ptr advertised_peripheral =
AdvertiseExpectSuccess(parameters);
ASSERT_EQ(adapter().fake_le()->registered_advertisements().size(), 1u);
auto& advertisement =
adapter().fake_le()->registered_advertisements().begin()->second;
ASSERT_TRUE(advertisement.connectable.has_value());
EXPECT_EQ(advertisement.connectable->bondable_mode,
bt::sm::BondableMode::Bondable);
}
TEST_F(PeripheralTest, StartAdvertisingFailureInternalError) {
adapter().fake_le()->set_advertising_result(
ToResult(bt::HostError::kScanResponseTooLong));
Peripheral2::AdvertisingParameters parameters;
std::optional<Peripheral2::AdvertiseResult> result = Advertise(parameters);
ASSERT_TRUE(result.has_value());
ASSERT_FALSE(result.value().has_value());
EXPECT_EQ(result.value().error(), AdvertiseError::kScanResponseDataTooLong);
}
TEST_F(PeripheralTest,
StartAdvertisingAndCallAdvertisedPeripheralStopAdvertising) {
Peripheral2::AdvertisingParameters parameters;
AdvertisedPeripheral2::Ptr advertised_peripheral =
AdvertiseExpectSuccess(parameters);
ASSERT_EQ(adapter().fake_le()->registered_advertisements().size(), 1u);
advertised_peripheral->StopAdvertising();
pw::async2::PendFuncTask stop_task(
[&advertised_peripheral](pw::async2::Context& cx) -> pw::async2::Poll<> {
pw::async2::Poll<pw::Status> pend = advertised_peripheral->PendStop(cx);
if (pend.IsReady()) {
EXPECT_TRUE(pend->ok());
return pw::async2::Ready();
}
return pw::async2::Pending();
});
dispatcher2().Post(stop_task);
EXPECT_EQ(dispatcher2().RunUntilStalled(stop_task), pw::async2::Pending());
ASSERT_EQ(adapter().fake_le()->registered_advertisements().size(), 1u);
// Process the stop request.
dispatcher().RunUntilIdle();
// Process the waker wake.
EXPECT_EQ(dispatcher2().RunUntilStalled(stop_task), pw::async2::Ready());
ASSERT_EQ(adapter().fake_le()->registered_advertisements().size(), 0u);
}