blob: 1830f637517e5f4a05b8dc366e3cc45901a38676 [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// ============================================================================
// This is an accompanying example code for writing thread safe code in C++.
// Head over there for the full walk-through:
// https://fuchsia.dev/fuchsia-src/development/languages/c-cpp/thread-safe-async
// ============================================================================
#include <lib/async-loop/cpp/loop.h>
#include <lib/async/cpp/sequence_checker.h>
#include <lib/async/cpp/wait.h>
#include <lib/zx/channel.h>
#include <gtest/gtest.h>
namespace {
// [START synchronization_checker]
TEST(Async, SynchronizationCheckerExample) {
// |ChannelReader| lets one asynchronously read from a Zircon channel.
//
// ## Thread safety
//
// Instances must be used from a synchronized async dispatcher. See
// https://fuchsia.dev/fuchsia-src/development/languages/c-cpp/thread-safe-async#synchronized-dispatcher
class ChannelReader {
public:
ChannelReader(async_dispatcher_t* dispatcher, zx::channel channel)
: dispatcher_(dispatcher),
checker_(dispatcher),
channel_(std::move(channel)),
wait_(channel_.get(), ZX_CHANNEL_READABLE) {}
~ChannelReader() {
// |lock| explicitly checks that the dispatcher is not calling callbacks
// that use this |ChannelReader| instance in the meantime.
checker_.lock();
}
// Asynchronously wait for the channel to become readable, then read the
// data into a member variable.
void AsyncRead() {
// This guard checks that the |AsyncRead| method is called from a task
// running on a synchronized dispatcher.
std::lock_guard guard(checker_);
data_.clear();
zx_status_t status = wait_.Begin(
// The dispatcher that will perform the waiting.
dispatcher_,
// The async dispatcher will call this callback when the channel is
// ready to be read from. Because this callback captures `this`, we
// must ensure the callback does not race with destroying the
// |ChannelReader| instance. This is accomplished by calling
// `checker_.lock()` in the |ChannelReader| destructor.
[this](async_dispatcher_t* dispatcher, async::WaitBase* wait, zx_status_t status,
const zx_packet_signal_t* signal) {
if (status != ZX_OK) {
return;
}
std::lock_guard guard(checker_);
uint32_t actual;
data_.resize(ZX_CHANNEL_MAX_MSG_BYTES);
status = channel_.read(0, data_.data(), nullptr,
static_cast<uint32_t>(data_.capacity()), 0, &actual, nullptr);
if (status != ZX_OK) {
data_.clear();
return;
}
data_.resize(actual);
});
ZX_ASSERT(status == ZX_OK);
}
std::vector<uint8_t> data() const {
// Here we also verify synchronization, because we want to avoid race
// conditions such as the user calling |AsyncRead| which clears the
// data and calling |data| to get the data at the same time.
std::lock_guard guard(checker_);
return data_;
}
private:
async_dispatcher_t* dispatcher_;
async::synchronization_checker checker_;
zx::channel channel_;
std::vector<uint8_t> data_ __TA_GUARDED(checker_);
async::WaitOnce wait_;
};
zx::channel c1, c2;
ASSERT_EQ(ZX_OK, zx::channel::create(0, &c1, &c2));
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
ChannelReader reader{loop.dispatcher(), std::move(c1)};
ASSERT_EQ(reader.data(), std::vector<uint8_t>{});
const std::vector<uint8_t> kData{1, 2, 3};
ASSERT_EQ(ZX_OK, c2.write(0, kData.data(), static_cast<uint32_t>(kData.size()), nullptr, 0));
// Using |reader| must be synchronized with dispatching asynchronous operations.
// Here, they are synchronized because we perform these one after the other
// from a single thread.
reader.AsyncRead();
loop.RunUntilIdle();
ASSERT_EQ(reader.data(), kData);
// The following is disallowed, and would lead to a panic.
// If the dispatcher is running from a different thread, then we cannot
// ensure that |reader| is not used in the meantime.
//
// std::thread([&] { loop.RunUntilIdle(); }).join();
}
// [END synchronization_checker]
} // namespace