// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "status_watcher.h"
#include <lib/async-loop/cpp/loop.h>
#include <lib/syslog/global.h>
#include <lib/zx/event.h>
#include <queue>
#include <thread>
#include <fbl/mutex.h>
#include <zxtest/zxtest.h>
namespace network {
namespace testing {
using network::internal::StatusWatcher;
constexpr netdev::StatusFlags kStatusOnline = netdev::StatusFlags::ONLINE;
constexpr netdev::StatusFlags kStatusOffline = netdev::StatusFlags();
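
// Builds a |status_t| with the given status flags and MTU.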
status_t MakeStatus(netdev::StatusFlags status_flags, uint32_t mtu) {
status_t ret;
ret.flags = static_cast<uint32_t>(status_flags);
ret.mtu = mtu;
return ret;
}
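
// Plain-value copy of a |netdev::Status| message observed by a client.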
class ObservedStatus {
public:
explicit ObservedStatus(const netdev::Status& status)
: mtu_(status.mtu()), flags_(status.flags()) {}
uint32_t mtu() const { return mtu_; }
const netdev::StatusFlags& flags() const { return flags_; }
private:
uint32_t mtu_;
netdev::StatusFlags flags_;
};
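
// Test client that issues WatchStatus hanging-gets on a dedicated thread,
// signaling |kEvent| and queueing every status it observes.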
class WatchClient {
public:
static constexpr uint32_t kEvent = ZX_USER_SIGNAL_0;
explicit WatchClient(zx::channel chan) : channel_(std::move(chan)), running_(true) {
ASSERT_OK(zx::event::create(0, &event_));
thread_ = std::thread([this]() { Thread(); });
}
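
  // Closing the channel causes the pending WatchStatus call to fail, letting
  // the watch thread exit before it is joined.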
~WatchClient() {
channel_.reset();
event_.reset();
thread_.join();
}
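
  // Pops the oldest observed status, if any.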
fit::optional<ObservedStatus> PopObserved() {
fbl::AutoLock lock(&lock_);
fit::optional<ObservedStatus> ret;
if (!observed_status_.empty()) {
ret = observed_status_.front();
observed_status_.pop();
}
return ret;
}
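
  // Blocks until the watch thread signals a new observation, then clears the
  // signal.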
void WaitForEventAndReset() {
ASSERT_OK(event_.wait_one(kEvent, zx::time::infinite(), nullptr));
event_.signal(kEvent, 0);
}
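
  // Asserts that no new observation arrives within |duration|.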
void NoEvents(zx::duration duration) {
ASSERT_EQ(event_.wait_one(kEvent, zx::deadline_after(duration), nullptr), ZX_ERR_TIMED_OUT);
}
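
  // Blocks until at least |count| statuses have been observed.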
void WaitForEventCount(size_t count) {
for (;;) {
{
fbl::AutoLock lock(&lock_);
if (observed_status_.size() >= count) {
return;
}
}
WaitForEventAndReset();
}
}
private:
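  // Issues WatchStatus calls in a loop, recording each result until the
  // channel is closed.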
void Thread() {
for (;;) {
auto result = netdev::StatusWatcher::Call::WatchStatus(zx::unowned_channel(channel_));
{
fbl::AutoLock lock(&lock_);
event_.signal(0, kEvent);
if (result.ok()) {
observed_status_.emplace(result.value().device_status);
} else {
break;
}
}
}
{
fbl::AutoLock lock(&lock_);
running_ = false;
}
}
fbl::Mutex lock_;
zx::channel channel_;
zx::event event_;
std::thread thread_;
std::queue<ObservedStatus> observed_status_ __TA_GUARDED(lock_);
bool running_ __TA_GUARDED(lock_);
};
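
// Test fixture that runs a dispatcher loop on a background thread and owns the
// StatusWatcher instances created through MakeWatcher.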
class StatusWatcherTest : public zxtest::Test {
public:
StatusWatcherTest() : loop_(&kAsyncLoopConfigNeverAttachToThread) {}
void SetUp() override {
ASSERT_OK(loop_.StartThread("test-thread", nullptr));
fx_logger_config_t log_cfg = {
.min_severity = -2,
.console_fd = dup(STDOUT_FILENO),
.log_service_channel = ZX_HANDLE_INVALID,
.tags = nullptr,
.num_tags = 0,
};
fx_log_reconfigure(&log_cfg);
}
void TearDown() override {
sync_completion_t completion;
bool wait_for_watchers;
{
fbl::AutoLock lock(&lock_);
teardown_completion_ = &completion;
wait_for_watchers = !watchers_.is_empty();
for (auto& w : watchers_) {
w.Unbind();
}
}
if (wait_for_watchers) {
// Wait for all watchers to be safely unbound and destroyed.
sync_completion_wait(&completion, ZX_TIME_INFINITE);
}
}
protected:
// Creates a watcher and returns an unowned pointer to it for interaction.
// The test fixture will destroy the created watcher objects when they are unbound.
StatusWatcher* MakeWatcher(zx::channel channel, uint32_t buffers,
fit::callback<void(StatusWatcher*)> on_closed = nullptr) {
fbl::AutoLock lock(&lock_);
auto watcher = std::make_unique<StatusWatcher>(buffers);
zx_status_t status = watcher->Bind(
loop_.dispatcher(), std::move(channel),
[this, callback = std::move(on_closed)](StatusWatcher* closed_watcher) mutable {
if (callback) {
callback(closed_watcher);
}
fbl::AutoLock lock(&lock_);
watchers_.erase(*closed_watcher);
if (teardown_completion_ && watchers_.is_empty()) {
sync_completion_signal(teardown_completion_);
}
});
if (status != ZX_OK) {
ADD_FATAL_FAILURE("Failed to bind watcher: %s", zx_status_get_string(status));
}
auto* unowned = watcher.get();
watchers_.push_back(std::move(watcher));
return unowned;
}
async::Loop loop_;
fbl::Mutex lock_;
fbl::DoublyLinkedList<std::unique_ptr<StatusWatcher>> watchers_ __TA_GUARDED(lock_);
  sync_completion_t* teardown_completion_ __TA_GUARDED(lock_) = nullptr;
};
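
// Verifies that WatchStatus leaves the client hanging while no status has been
// pushed.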
TEST_F(StatusWatcherTest, HangsForStatus) {
zx::channel ch, req;
ASSERT_OK(zx::channel::create(0, &ch, &req));
WatchClient cli(std::move(ch));
ASSERT_NO_FATAL_FAILURES();
StatusWatcher* watcher;
ASSERT_NO_FATAL_FAILURES(watcher = MakeWatcher(std::move(req), 2));
  // ensure that the client is left hanging, waiting for a new status observation.
ASSERT_NO_FATAL_FAILURES(cli.NoEvents(zx::msec(10)));
for (int i = 0; i < 100; i++) {
ASSERT_FALSE(cli.PopObserved().has_value());
}
}
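
// Verifies that a pushed status is delivered to a connected client.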
TEST_F(StatusWatcherTest, SingleStatus) {
zx::channel ch, req;
ASSERT_OK(zx::channel::create(0, &ch, &req));
WatchClient cli(std::move(ch));
ASSERT_NO_FATAL_FAILURES();
StatusWatcher* watcher;
ASSERT_NO_FATAL_FAILURES(watcher = MakeWatcher(std::move(req), 1));
watcher->PushStatus(MakeStatus(kStatusOnline, 100));
cli.WaitForEventAndReset();
auto opt = cli.PopObserved();
ASSERT_TRUE(opt.has_value());
auto status = opt.value();
ASSERT_EQ(status.mtu(), 100);
ASSERT_TRUE(status.flags() & netdev::StatusFlags::ONLINE);
}
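
// Verifies that statuses pushed before the client connects are buffered and
// delivered in order.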
TEST_F(StatusWatcherTest, QueuesStatus) {
zx::channel ch, req;
ASSERT_OK(zx::channel::create(0, &ch, &req));
StatusWatcher* watcher;
ASSERT_NO_FATAL_FAILURES(watcher = MakeWatcher(std::move(req), 3));
watcher->PushStatus(MakeStatus(kStatusOnline, 100));
watcher->PushStatus(MakeStatus(kStatusOnline, 200));
watcher->PushStatus(MakeStatus(kStatusOnline, 300));
WatchClient cli(std::move(ch));
ASSERT_NO_FATAL_FAILURES();
cli.WaitForEventCount(3);
auto opt = cli.PopObserved();
ASSERT_TRUE(opt.has_value());
auto status = opt.value();
EXPECT_EQ(status.mtu(), 100);
EXPECT_TRUE(status.flags() & netdev::StatusFlags::ONLINE);
opt = cli.PopObserved();
ASSERT_TRUE(opt.has_value());
status = opt.value();
EXPECT_EQ(status.mtu(), 200);
EXPECT_TRUE(status.flags() & netdev::StatusFlags::ONLINE);
opt = cli.PopObserved();
ASSERT_TRUE(opt.has_value());
status = opt.value();
EXPECT_EQ(status.mtu(), 300);
EXPECT_TRUE(status.flags() & netdev::StatusFlags::ONLINE);
}
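
// Verifies that when more statuses are pushed than the watcher can buffer, the
// oldest ones are dropped.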
TEST_F(StatusWatcherTest, DropsOldestStatus) {
zx::channel ch, req;
ASSERT_OK(zx::channel::create(0, &ch, &req));
StatusWatcher* watcher;
ASSERT_NO_FATAL_FAILURES(watcher = MakeWatcher(std::move(req), 2));
watcher->PushStatus(MakeStatus(kStatusOnline, 100));
watcher->PushStatus(MakeStatus(kStatusOnline, 200));
watcher->PushStatus(MakeStatus(kStatusOnline, 300));
WatchClient cli(std::move(ch));
ASSERT_NO_FATAL_FAILURES();
cli.WaitForEventCount(2);
auto opt = cli.PopObserved();
ASSERT_TRUE(opt.has_value());
auto status = opt.value();
ASSERT_EQ(status.mtu(), 200);
ASSERT_TRUE(status.flags() & netdev::StatusFlags::ONLINE);
opt = cli.PopObserved();
ASSERT_TRUE(opt.has_value());
status = opt.value();
ASSERT_EQ(status.mtu(), 300);
ASSERT_TRUE(status.flags() & netdev::StatusFlags::ONLINE);
ASSERT_FALSE(cli.PopObserved().has_value());
}
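
// Verifies that the on-closed callback fires when the client channel is
// closed.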
TEST_F(StatusWatcherTest, CallsOnClosedCallback) {
zx::channel ch, req;
ASSERT_OK(zx::channel::create(0, &ch, &req));
StatusWatcher* watcher;
sync_completion_t completion;
ASSERT_NO_FATAL_FAILURES(
watcher = MakeWatcher(std::move(req), 2, [&completion](StatusWatcher* watcher) {
sync_completion_signal(&completion);
}));
// close the channel:
ch.reset();
sync_completion_wait(&completion, ZX_TIME_INFINITE);
}
TEST_F(StatusWatcherTest, LockStepWatch) {
  // Tests that StatusWatcher behaves appropriately when a waiter is already registered every
  // time a status is pushed, so no queuing ever happens.
zx::channel ch, req;
ASSERT_OK(zx::channel::create(0, &ch, &req));
StatusWatcher* watcher;
ASSERT_NO_FATAL_FAILURES(watcher = MakeWatcher(std::move(req), 2));
// push an initial status
watcher->PushStatus(MakeStatus(kStatusOnline, 100));
WatchClient cli(std::move(ch));
ASSERT_NO_FATAL_FAILURES();
cli.WaitForEventAndReset();
auto evt = cli.PopObserved();
ASSERT_TRUE(evt.has_value());
EXPECT_TRUE(evt->flags() & netdev::StatusFlags::ONLINE);
// wait for a bit to guarantee that WatchClient is hanging.
ASSERT_NO_FATAL_FAILURES(cli.NoEvents(zx::msec(10)));
watcher->PushStatus(MakeStatus(kStatusOffline, 100));
ASSERT_NO_FATAL_FAILURES(cli.WaitForEventAndReset());
evt = cli.PopObserved();
ASSERT_TRUE(evt.has_value());
EXPECT_FALSE(evt->flags() & netdev::StatusFlags::ONLINE);
// go again with status set to online.
ASSERT_NO_FATAL_FAILURES(cli.NoEvents(zx::msec(10)));
watcher->PushStatus(MakeStatus(kStatusOnline, 100));
ASSERT_NO_FATAL_FAILURES(cli.WaitForEventAndReset());
evt = cli.PopObserved();
ASSERT_TRUE(evt.has_value());
EXPECT_TRUE(evt->flags() & netdev::StatusFlags::ONLINE);
}
TEST_F(StatusWatcherTest, IgnoresDuplicateStatus) {
// Tests that if PushStatus is called twice with the same status, only one event is generated.
zx::channel ch, req;
ASSERT_OK(zx::channel::create(0, &ch, &req));
StatusWatcher* watcher;
ASSERT_NO_FATAL_FAILURES(watcher = MakeWatcher(std::move(req), 2));
// push an initial status twice
watcher->PushStatus(MakeStatus(kStatusOnline, 100));
watcher->PushStatus(MakeStatus(kStatusOnline, 100));
WatchClient cli(std::move(ch));
ASSERT_NO_FATAL_FAILURES();
cli.WaitForEventAndReset();
  auto evt = cli.PopObserved();
  ASSERT_TRUE(evt.has_value());
  EXPECT_TRUE(evt->flags() & netdev::StatusFlags::ONLINE);
// wait for a bit to guarantee that WatchClient is hanging.
ASSERT_NO_FATAL_FAILURES(cli.NoEvents(zx::msec(10)));
// push a different status.
watcher->PushStatus(MakeStatus(kStatusOffline, 100));
ASSERT_NO_FATAL_FAILURES(cli.WaitForEventAndReset());
evt = cli.PopObserved();
ASSERT_TRUE(evt.has_value());
EXPECT_FALSE(evt->flags() & netdev::StatusFlags::ONLINE);
}
} // namespace testing
} // namespace network