blob: 0a70c2e323d0754e19b4624841405a77ce9ef927 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <stdio.h>
#include <lib/async-loop/cpp/loop.h>
#include <lib/async-loop/default.h>
#include <lib/fidl/txn_header.h>
#include <lib/fidl-async/cpp/client_base.h>
#include <lib/sync/completion.h>
#include <lib/zx/channel.h>
#include <zircon/fidl.h>
#include <zircon/syscalls.h>
#include <zircon/types.h>
#include <zxtest/zxtest.h>
#include <mutex>
#include <thread>
#include <unordered_set>
namespace fidl {
namespace internal {
namespace {
class TestClient : public ClientBase {
public:
TestClient(zx::channel channel, async_dispatcher_t* dispatcher,
TypeErasedOnUnboundFn on_unbound)
: ClientBase(std::move(channel), dispatcher, std::move(on_unbound)) {
ASSERT_OK(Bind());
}
void PrepareAsyncTxn(ResponseContext* context) {
ClientBase::PrepareAsyncTxn(context);
std::unique_lock lock(lock_);
EXPECT_FALSE(txids_.count(context->Txid()));
txids_.insert(context->Txid());
}
void ForgetAsyncTxn(ResponseContext* context) {
{
std::unique_lock lock(lock_);
txids_.erase(context->Txid());
}
ClientBase::ForgetAsyncTxn(context);
}
std::shared_ptr<AsyncBinding> GetBinding() {
return ClientBase::GetBinding();
}
// For responses, find and remove the entry for the matching txid. For events, increment the
// event count.
zx_status_t Dispatch(fidl_msg_t* msg, ResponseContext* context) override {
auto* hdr = reinterpret_cast<fidl_message_header_t*>(msg->bytes);
EXPECT_EQ(!hdr->txid, !context); // hdr->txid == 0 iff context == nullptr.
if (!hdr->txid != !context) {
return ZX_OK; // This is a failure, but let the test continue.
}
std::unique_lock lock(lock_);
if (hdr->txid) {
auto txid_it = txids_.find(hdr->txid);
EXPECT_TRUE(txid_it != txids_.end()); // the transaction must be found.
if (txid_it != txids_.end()) {
txids_.erase(txid_it);
}
} else {
event_count_++;
}
return ZX_OK;
}
uint32_t GetEventCount() {
std::unique_lock lock(lock_);
return event_count_;
}
bool IsPending(zx_txid_t txid) {
std::unique_lock lock(lock_);
return txids_.count(txid);
}
size_t GetTxidCount() {
auto internal_count = list_length(&contexts_);
std::unique_lock lock(lock_);
EXPECT_EQ(txids_.size(), internal_count);
return internal_count;
}
std::mutex lock_;
std::unordered_set<zx_txid_t> txids_;
uint32_t event_count_ = 0;
};
class TestResponseContext : public ResponseContext {
public:
TestResponseContext() = default;
void OnError() {}
};
TEST(ClientBaseTestCase, AsyncTxn) {
async::Loop loop(&kAsyncLoopConfigNoAttachToCurrentThread);
ASSERT_OK(loop.StartThread());
zx::channel local, remote;
ASSERT_OK(zx::channel::create(0, &local, &remote));
zx_handle_t local_handle = local.get();
sync_completion_t unbound;
TypeErasedOnUnboundFn on_unbound =
[&](void* impl, UnboundReason reason, zx::channel channel) {
EXPECT_EQ(fidl::UnboundReason::kPeerClosed, reason);
EXPECT_EQ(local_handle, channel.get());
auto* client = static_cast<TestClient*>(impl);
EXPECT_EQ(0, client->GetTxidCount());
delete client;
sync_completion_signal(&unbound);
};
auto* client = new TestClient(std::move(local), loop.dispatcher(), std::move(on_unbound));
// Generate a txid for a ResponseContext. Send a "response" message with the same txid from the
// remote end of the channel.
TestResponseContext context;
client->PrepareAsyncTxn(&context);
EXPECT_TRUE(client->IsPending(context.Txid()));
fidl_message_header_t hdr;
fidl_init_txn_header(&hdr, context.Txid(), 0);
ASSERT_OK(remote.write(0, &hdr, sizeof(fidl_message_header_t), nullptr, 0));
// Trigger unbound handler.
remote.reset();
EXPECT_OK(sync_completion_wait(&unbound, ZX_TIME_INFINITE));
}
TEST(ClientBaseTestCase, ParallelAsyncTxns) {
async::Loop loop(&kAsyncLoopConfigNoAttachToCurrentThread);
ASSERT_OK(loop.StartThread());
zx::channel local, remote;
ASSERT_OK(zx::channel::create(0, &local, &remote));
zx_handle_t local_handle = local.get();
sync_completion_t unbound;
TypeErasedOnUnboundFn on_unbound =
[&](void* impl, UnboundReason reason, zx::channel channel) {
EXPECT_EQ(fidl::UnboundReason::kPeerClosed, reason);
EXPECT_EQ(local_handle, channel.get());
auto* client = static_cast<TestClient*>(impl);
EXPECT_EQ(0, client->GetTxidCount());
delete client;
sync_completion_signal(&unbound);
};
auto* client = new TestClient(std::move(local), loop.dispatcher(), std::move(on_unbound));
// In parallel, simulate 10 async transactions and send "response" messages from the remote end of
// the channel.
TestResponseContext contexts[10];
std::thread threads[10];
for (int i = 0; i < 10; ++i) {
threads[i] = std::thread([context = &contexts[i], &remote, client]{
client->PrepareAsyncTxn(context);
EXPECT_TRUE(client->IsPending(context->Txid()));
fidl_message_header_t hdr;
fidl_init_txn_header(&hdr, context->Txid(), 0);
ASSERT_OK(remote.write(0, &hdr, sizeof(fidl_message_header_t), nullptr, 0));
});
}
for (int i = 0; i < 10; ++i)
threads[i].join();
// Trigger unbound handler.
remote.reset();
EXPECT_OK(sync_completion_wait(&unbound, ZX_TIME_INFINITE));
}
TEST(ClientBaseTestCase, ForgetAsyncTxn) {
async::Loop loop(&kAsyncLoopConfigNoAttachToCurrentThread);
ASSERT_OK(loop.StartThread());
zx::channel local, remote;
ASSERT_OK(zx::channel::create(0, &local, &remote));
TestClient client(std::move(local), loop.dispatcher(), nullptr);
// Generate a txid for a ResponseContext.
TestResponseContext context;
client.PrepareAsyncTxn(&context);
EXPECT_TRUE(client.IsPending(context.Txid()));
// Forget the transaction.
client.ForgetAsyncTxn(&context);
EXPECT_EQ(0, client.GetTxidCount());
}
TEST(ClientBaseTestCase, UnknownResponseTxid) {
async::Loop loop(&kAsyncLoopConfigNoAttachToCurrentThread);
ASSERT_OK(loop.StartThread());
zx::channel local, remote;
ASSERT_OK(zx::channel::create(0, &local, &remote));
zx_handle_t local_handle = local.get();
sync_completion_t unbound;
TypeErasedOnUnboundFn on_unbound =
[&](void* impl, UnboundReason reason, zx::channel channel) {
EXPECT_EQ(fidl::UnboundReason::kInternalError, reason);
EXPECT_EQ(local_handle, channel.get());
auto* client = static_cast<TestClient*>(impl);
EXPECT_EQ(0, client->GetTxidCount());
delete client;
sync_completion_signal(&unbound);
};
auto* client = new TestClient(std::move(local), loop.dispatcher(), std::move(on_unbound));
// Send a "response" message for which there was no outgoing request.
ASSERT_EQ(0, client->GetTxidCount());
fidl_message_header_t hdr;
fidl_init_txn_header(&hdr, 1, 0);
ASSERT_OK(remote.write(0, &hdr, sizeof(fidl_message_header_t), nullptr, 0));
// on_unbound should be triggered by the erroneous response.
EXPECT_OK(sync_completion_wait(&unbound, ZX_TIME_INFINITE));
}
TEST(ClientBaseTestCase, Events) {
async::Loop loop(&kAsyncLoopConfigNoAttachToCurrentThread);
ASSERT_OK(loop.StartThread());
zx::channel local, remote;
ASSERT_OK(zx::channel::create(0, &local, &remote));
zx_handle_t local_handle = local.get();
sync_completion_t unbound;
TypeErasedOnUnboundFn on_unbound =
[&](void* impl, UnboundReason reason, zx::channel channel) {
EXPECT_EQ(fidl::UnboundReason::kPeerClosed, reason);
EXPECT_EQ(local_handle, channel.get());
auto* client = static_cast<TestClient*>(impl);
EXPECT_EQ(10, client->GetEventCount()); // Expect 10 events.
delete client;
sync_completion_signal(&unbound);
};
auto* client = new TestClient(std::move(local), loop.dispatcher(), std::move(on_unbound));
(void)client;
// In parallel, send 10 event messages from the remote end of the channel.
std::thread threads[10];
for (int i = 0; i < 10; ++i) {
threads[i] = std::thread([&remote]{
fidl_message_header_t hdr;
fidl_init_txn_header(&hdr, 0, 0);
ASSERT_OK(remote.write(0, &hdr, sizeof(fidl_message_header_t), nullptr, 0));
});
}
for (int i = 0; i < 10; ++i)
threads[i].join();
// Trigger unbound handler.
remote.reset();
EXPECT_OK(sync_completion_wait(&unbound, ZX_TIME_INFINITE));
}
TEST(ClientBaseTestCase, Unbind) {
async::Loop loop(&kAsyncLoopConfigNoAttachToCurrentThread);
ASSERT_OK(loop.StartThread());
zx::channel local, remote;
ASSERT_OK(zx::channel::create(0, &local, &remote));
zx_handle_t local_handle = local.get();
sync_completion_t unbound;
TypeErasedOnUnboundFn on_unbound = [&](void*, UnboundReason reason, zx::channel channel) {
EXPECT_EQ(fidl::UnboundReason::kUnbind, reason);
EXPECT_EQ(local_handle, channel.get());
sync_completion_signal(&unbound);
};
auto* client = new TestClient(std::move(local), loop.dispatcher(), std::move(on_unbound));
// Unbind the client and wait for on_unbound to run.
client->Unbind();
EXPECT_OK(sync_completion_wait(&unbound, ZX_TIME_INFINITE));
}
TEST(ClientBaseTestCase, UnbindOnDestroy) {
async::Loop loop(&kAsyncLoopConfigNoAttachToCurrentThread);
ASSERT_OK(loop.StartThread());
zx::channel local, remote;
ASSERT_OK(zx::channel::create(0, &local, &remote));
zx_handle_t local_handle = local.get();
sync_completion_t unbound;
TypeErasedOnUnboundFn on_unbound = [&](void*, UnboundReason reason, zx::channel channel) {
EXPECT_EQ(fidl::UnboundReason::kUnbind, reason);
EXPECT_EQ(local_handle, channel.get());
sync_completion_signal(&unbound);
};
auto* client = new TestClient(std::move(local), loop.dispatcher(), std::move(on_unbound));
// Delete the client and wait for on_unbound to run.
delete client;
EXPECT_OK(sync_completion_wait(&unbound, ZX_TIME_INFINITE));
}
TEST(ClientBaseTestCase, BindingRefPreventsUnbind) {
async::Loop loop(&kAsyncLoopConfigNoAttachToCurrentThread);
ASSERT_OK(loop.StartThread());
zx::channel local, remote;
ASSERT_OK(zx::channel::create(0, &local, &remote));
zx_handle_t local_handle = local.get();
sync_completion_t unbound;
TypeErasedOnUnboundFn on_unbound = [&](void*, UnboundReason reason, zx::channel channel) {
EXPECT_EQ(fidl::UnboundReason::kUnbind, reason);
EXPECT_EQ(local_handle, channel.get());
sync_completion_signal(&unbound);
};
auto* client = new TestClient(std::move(local), loop.dispatcher(), std::move(on_unbound));
// Create a strong reference to the binding. Spawn a thread to trigger an Unbind().
auto binding = client->GetBinding();
std::thread([client] { client->Unbind(); }).detach();
// Yield to allow the other thread to run.
zx_nanosleep(0);
// unbound should not be signaled until the strong reference is released.
ASSERT_EQ(ZX_ERR_TIMED_OUT, sync_completion_wait(&unbound, 0));
binding.reset();
EXPECT_OK(sync_completion_wait(&unbound, ZX_TIME_INFINITE));
}
TEST(ClientBaseTestCase, ReleaseOutstandingTxnsOnDestroy) {
class ReleaseTestResponseContext : public ResponseContext {
public:
ReleaseTestResponseContext(sync_completion_t* done) : done_(done) {}
void OnError() {
sync_completion_signal(done_);
delete this;
}
sync_completion_t* done_;
};
async::Loop loop(&kAsyncLoopConfigNoAttachToCurrentThread);
ASSERT_OK(loop.StartThread());
zx::channel local, remote;
ASSERT_OK(zx::channel::create(0, &local, &remote));
auto* client = new TestClient(std::move(local), loop.dispatcher(), nullptr);
// Create and register a response context which will signal when deleted.
sync_completion_t done;
client->PrepareAsyncTxn(new ReleaseTestResponseContext(&done));
// Delete the client and ensure that the response context is deleted.
delete client;
EXPECT_OK(sync_completion_wait(&done, ZX_TIME_INFINITE));
}
} // namespace
} // namespace internal
} // namespace fidl