// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <stdint.h>
#include <sys/select.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstring>
#include <memory>
#include <vector>
#include "gtest/gtest.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
#include <grpc/grpc.h>
#include "src/core/lib/config/config_vars.h"
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/port.h"
// IWYU pragma: no_include <arpa/inet.h>
// IWYU pragma: no_include <ratio>
// This test won't work except with posix sockets enabled
#ifdef GRPC_POSIX_SOCKET_EV
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <poll.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <unistd.h>
#include "absl/status/status.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include "src/core/lib/event_engine/common_closures.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/event_poller_posix_default.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/strerror.h"
#include "test/core/event_engine/posix/posix_engine_test_utils.h"
#include "test/core/util/port.h"
static gpr_mu g_mu;
static std::shared_ptr<grpc_event_engine::experimental::PosixEventPoller>
g_event_poller;
// Buffer size used to send and receive data.
// 1024 is the smallest value accepted when setting the TCP send and receive
// buffer sizes.
#define BUF_SIZE 1024
// Max number of connections pending to be accepted by listen().
#define MAX_NUM_FD 1024
// Client write buffer size
#define CLIENT_WRITE_BUF_SIZE 10
// Total number of times that the client fills up the write buffer
#define CLIENT_TOTAL_WRITE_CNT 3
namespace grpc_event_engine {
namespace experimental {
using namespace std::chrono_literals;
namespace {
absl::Status SetSocketSendBuf(int fd, int buffer_size_bytes) {
return 0 == setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buffer_size_bytes,
sizeof(buffer_size_bytes))
? absl::OkStatus()
: absl::Status(absl::StatusCode::kInternal,
grpc_core::StrError(errno).c_str());
}
// Create a test socket with the right properties for testing.
// port is the TCP port to listen or connect to.
// Returns a socket FD and a sockaddr_in6.
void CreateTestSocket(int port, int* socket_fd, struct sockaddr_in6* sin) {
int fd;
int one = 1;
int buffer_size_bytes = BUF_SIZE;
int flags;
fd = socket(AF_INET6, SOCK_STREAM, 0);
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
  // Reset the socket send buffer to the minimal size so that it fills up
  // quickly and triggers notify_on_write.
EXPECT_TRUE(SetSocketSendBuf(fd, buffer_size_bytes).ok());
// Make fd non-blocking.
flags = fcntl(fd, F_GETFL, 0);
EXPECT_EQ(fcntl(fd, F_SETFL, flags | O_NONBLOCK), 0);
*socket_fd = fd;
// Use local address for test.
memset(sin, 0, sizeof(struct sockaddr_in6));
sin->sin6_family = AF_INET6;
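  // Writing a 1 into the last byte of sin6_addr produces the IPv6 loopback
  // address ::1.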
(reinterpret_cast<char*>(&sin->sin6_addr))[15] = 1;
EXPECT_TRUE(port >= 0 && port < 65536);
sin->sin6_port = htons(static_cast<uint16_t>(port));
}
// =======An upload server to test notify_on_read===========
// The server simply reads and counts a stream of bytes.
// An upload server.
typedef struct {
EventHandle* em_fd; // listening fd
ssize_t read_bytes_total; // total number of received bytes
int done; // set to 1 when a server finishes serving
PosixEngineClosure* listen_closure;
} server;
void ServerInit(server* sv) {
sv->read_bytes_total = 0;
sv->done = 0;
}
// An upload session.
// Created when a new upload request arrives in the server.
typedef struct {
server* sv; // not owned by a single session
EventHandle* em_fd; // fd to read upload bytes
char read_buf[BUF_SIZE]; // buffer to store upload bytes
PosixEngineClosure* session_read_closure;
} session;
// Called when an upload session can be safely shut down.
// Close the session FD and start shutting down the listen FD.
void SessionShutdownCb(session* se, bool /*success*/) {
server* sv = se->sv;
se->em_fd->OrphanHandle(nullptr, nullptr, "a");
gpr_free(se);
  // Start shutting down the listen fd.
sv->em_fd->ShutdownHandle(
absl::Status(absl::StatusCode::kUnknown, "SessionShutdownCb"));
}
// Called when data becomes readable in a session.
void SessionReadCb(session* se, absl::Status status) {
int fd = se->em_fd->WrappedFd();
ssize_t read_once = 0;
ssize_t read_total = 0;
if (!status.ok()) {
SessionShutdownCb(se, true);
return;
}
do {
read_once = read(fd, se->read_buf, BUF_SIZE);
if (read_once > 0) read_total += read_once;
} while (read_once > 0);
se->sv->read_bytes_total += read_total;
  // read() returns 0 to indicate that the TCP connection was closed by the
  // client. (read(fd, read_buf, 0) would also return 0, but it is never
  // called with a zero length here.) It is also possible to read nothing
  // because of a spurious edge event or because the data has already been
  // drained; in that case read() returns -1 and sets errno to EAGAIN.
if (read_once == 0) {
SessionShutdownCb(se, true);
} else if (read_once == -1) {
EXPECT_EQ(errno, EAGAIN);
// An edge triggered event is cached in the kernel until next poll.
// In the current single thread implementation, SessionReadCb is called
// in the polling thread, such that polling only happens after this
// callback, and will catch read edge event if data is available again
// before notify_on_read.
se->session_read_closure = PosixEngineClosure::TestOnlyToClosure(
[se](absl::Status status) { SessionReadCb(se, status); });
se->em_fd->NotifyOnRead(se->session_read_closure);
}
}
// Called when the listen FD can be safely shut down. Close the listen FD
// and signal that the server can be shut down.
void ListenShutdownCb(server* sv) {
sv->em_fd->OrphanHandle(nullptr, nullptr, "b");
gpr_mu_lock(&g_mu);
sv->done = 1;
g_event_poller->Kick();
gpr_mu_unlock(&g_mu);
}
// Called when a new TCP connection request arrives on the listening port.
void ListenCb(server* sv, absl::Status status) {
int fd;
int flags;
session* se;
struct sockaddr_storage ss;
socklen_t slen = sizeof(ss);
EventHandle* listen_em_fd = sv->em_fd;
if (!status.ok()) {
ListenShutdownCb(sv);
return;
}
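  // Retry accept() if it is interrupted by a signal.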
do {
fd = accept(listen_em_fd->WrappedFd(),
reinterpret_cast<struct sockaddr*>(&ss), &slen);
} while (fd < 0 && errno == EINTR);
if (fd < 0 && errno == EAGAIN) {
sv->listen_closure = PosixEngineClosure::TestOnlyToClosure(
[sv](absl::Status status) { ListenCb(sv, status); });
listen_em_fd->NotifyOnRead(sv->listen_closure);
return;
} else if (fd < 0) {
gpr_log(GPR_ERROR, "Failed to acceot a connection, returned error: %s",
grpc_core::StrError(errno).c_str());
}
EXPECT_GE(fd, 0);
EXPECT_LT(fd, FD_SETSIZE);
flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
se = static_cast<session*>(gpr_malloc(sizeof(*se)));
se->sv = sv;
se->em_fd = g_event_poller->CreateHandle(fd, "listener", false);
se->session_read_closure = PosixEngineClosure::TestOnlyToClosure(
[se](absl::Status status) { SessionReadCb(se, status); });
se->em_fd->NotifyOnRead(se->session_read_closure);
sv->listen_closure = PosixEngineClosure::TestOnlyToClosure(
[sv](absl::Status status) { ListenCb(sv, status); });
listen_em_fd->NotifyOnRead(sv->listen_closure);
}
// Start a test server; return the TCP listening port bound to listen_fd.
// ListenCb() is registered to be interested in reading from listen_fd.
// When a connection request arrives, ListenCb() is called to accept it.
int ServerStart(server* sv) {
int port = grpc_pick_unused_port_or_die();
int fd;
struct sockaddr_in6 sin;
socklen_t addr_len;
CreateTestSocket(port, &fd, &sin);
addr_len = sizeof(sin);
EXPECT_EQ(bind(fd, (struct sockaddr*)&sin, addr_len), 0);
EXPECT_EQ(getsockname(fd, (struct sockaddr*)&sin, &addr_len), 0);
port = ntohs(sin.sin6_port);
EXPECT_EQ(listen(fd, MAX_NUM_FD), 0);
sv->em_fd = g_event_poller->CreateHandle(fd, "server", false);
sv->listen_closure = PosixEngineClosure::TestOnlyToClosure(
[sv](absl::Status status) { ListenCb(sv, status); });
sv->em_fd->NotifyOnRead(sv->listen_closure);
return port;
}
// ===An upload client to test notify_on_write===
// An upload client.
typedef struct {
EventHandle* em_fd;
char write_buf[CLIENT_WRITE_BUF_SIZE];
ssize_t write_bytes_total;
// Number of times that the client fills up the write buffer and calls
// notify_on_write to schedule another write.
int client_write_cnt;
int done;
PosixEngineClosure* write_closure;
} client;
void ClientInit(client* cl) {
memset(cl->write_buf, 0, sizeof(cl->write_buf));
cl->write_bytes_total = 0;
cl->client_write_cnt = 0;
cl->done = 0;
}
// Called when a client upload session is ready to shut down.
void ClientSessionShutdownCb(client* cl) {
cl->em_fd->OrphanHandle(nullptr, nullptr, "c");
gpr_mu_lock(&g_mu);
cl->done = 1;
g_event_poller->Kick();
gpr_mu_unlock(&g_mu);
}
// Write as much as possible, then register notify_on_write.
void ClientSessionWrite(client* cl, absl::Status status) {
int fd = cl->em_fd->WrappedFd();
ssize_t write_once = 0;
if (!status.ok()) {
ClientSessionShutdownCb(cl);
return;
}
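  // Fill up the kernel send buffer; the loop ends once write() fails with
  // EAGAIN.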
do {
write_once = write(fd, cl->write_buf, CLIENT_WRITE_BUF_SIZE);
if (write_once > 0) cl->write_bytes_total += write_once;
} while (write_once > 0);
EXPECT_EQ(errno, EAGAIN);
gpr_mu_lock(&g_mu);
if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
cl->write_closure = PosixEngineClosure::TestOnlyToClosure(
[cl](absl::Status status) { ClientSessionWrite(cl, status); });
cl->client_write_cnt++;
gpr_mu_unlock(&g_mu);
cl->em_fd->NotifyOnWrite(cl->write_closure);
} else {
gpr_mu_unlock(&g_mu);
ClientSessionShutdownCb(cl);
}
}
// Start a client to send a stream of bytes.
void ClientStart(client* cl, int port) {
int fd;
struct sockaddr_in6 sin;
CreateTestSocket(port, &fd, &sin);
if (connect(fd, reinterpret_cast<struct sockaddr*>(&sin), sizeof(sin)) ==
-1) {
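    // The socket is non-blocking (see CreateTestSocket), so connect()
    // normally returns EINPROGRESS; poll for writability to wait until the
    // handshake completes.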
if (errno == EINPROGRESS) {
struct pollfd pfd;
pfd.fd = fd;
pfd.events = POLLOUT;
pfd.revents = 0;
if (poll(&pfd, 1, -1) == -1) {
gpr_log(GPR_ERROR, "poll() failed during connect; errno=%d", errno);
abort();
}
} else {
grpc_core::Crash(
absl::StrFormat("Failed to connect to the server (errno=%d)", errno));
}
}
cl->em_fd = g_event_poller->CreateHandle(fd, "client", false);
ClientSessionWrite(cl, absl::OkStatus());
}
// Wait for the signal to shut down the client and the server.
void WaitAndShutdown(server* sv, client* cl) {
Poller::WorkResult result;
gpr_mu_lock(&g_mu);
while (!sv->done || !cl->done) {
gpr_mu_unlock(&g_mu);
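    // Use an effectively infinite timeout; the Kick() issued from the
    // shutdown callbacks wakes the poller once both sides are done.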
result = g_event_poller->Work(24h, []() {});
ASSERT_FALSE(result == Poller::WorkResult::kDeadlineExceeded);
gpr_mu_lock(&g_mu);
}
gpr_mu_unlock(&g_mu);
}
class EventPollerTest : public ::testing::Test {
void SetUp() override {
engine_ =
std::make_unique<grpc_event_engine::experimental::PosixEventEngine>();
EXPECT_NE(engine_, nullptr);
scheduler_ =
std::make_unique<grpc_event_engine::experimental::TestScheduler>(
engine_.get());
EXPECT_NE(scheduler_, nullptr);
g_event_poller = MakeDefaultPoller(scheduler_.get());
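    // MakeDefaultPoller returns nullptr when no poller is available for the
    // configured poll strategy; each test checks g_event_poller and skips
    // itself in that case.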
engine_ = PosixEventEngine::MakeTestOnlyPosixEventEngine(g_event_poller);
EXPECT_NE(engine_, nullptr);
scheduler_->ChangeCurrentEventEngine(engine_.get());
if (g_event_poller != nullptr) {
gpr_log(GPR_INFO, "Using poller: %s", g_event_poller->Name().c_str());
}
}
void TearDown() override {
if (g_event_poller != nullptr) {
g_event_poller->Shutdown();
}
}
public:
TestScheduler* Scheduler() { return scheduler_.get(); }
private:
std::shared_ptr<grpc_event_engine::experimental::PosixEventEngine> engine_;
std::unique_ptr<grpc_event_engine::experimental::TestScheduler> scheduler_;
};
// Test grpc_fd. Start an upload server and client, upload a stream of bytes
// from the client to the server, and verify that the total number of sent
// bytes is equal to the total number of received bytes.
TEST_F(EventPollerTest, TestEventPollerHandle) {
server sv;
client cl;
int port;
if (g_event_poller == nullptr) {
return;
}
ServerInit(&sv);
port = ServerStart(&sv);
ClientInit(&cl);
ClientStart(&cl, port);
WaitAndShutdown(&sv, &cl);
EXPECT_EQ(sv.read_bytes_total, cl.write_bytes_total);
}
typedef struct FdChangeData {
void (*cb_that_ran)(struct FdChangeData*, absl::Status);
} FdChangeData;
void InitChangeData(FdChangeData* fdc) { fdc->cb_that_ran = nullptr; }
void DestroyChangeData(FdChangeData* /*fdc*/) {}
void FirstReadCallback(FdChangeData* fdc, absl::Status /*status*/) {
gpr_mu_lock(&g_mu);
fdc->cb_that_ran = FirstReadCallback;
g_event_poller->Kick();
gpr_mu_unlock(&g_mu);
}
void SecondReadCallback(FdChangeData* fdc, absl::Status /*status*/) {
gpr_mu_lock(&g_mu);
fdc->cb_that_ran = SecondReadCallback;
g_event_poller->Kick();
gpr_mu_unlock(&g_mu);
}
// Test that changing the callback we use for notify_on_read actually works.
// Note that we have two different but almost identical callbacks above -- the
// point is to have two different function pointers and two different data
// pointers and make sure that changing both really works.
TEST_F(EventPollerTest, TestEventPollerHandleChange) {
EventHandle* em_fd;
FdChangeData a, b;
int flags;
int sv[2];
char data;
ssize_t result;
if (g_event_poller == nullptr) {
return;
}
PosixEngineClosure* first_closure = PosixEngineClosure::TestOnlyToClosure(
[a = &a](absl::Status status) { FirstReadCallback(a, status); });
PosixEngineClosure* second_closure = PosixEngineClosure::TestOnlyToClosure(
[b = &b](absl::Status status) { SecondReadCallback(b, status); });
InitChangeData(&a);
InitChangeData(&b);
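  // Create a connected pair of local sockets: sv[0] is watched by the
  // poller, and writes to sv[1] generate read events on sv[0].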
EXPECT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, sv), 0);
flags = fcntl(sv[0], F_GETFL, 0);
EXPECT_EQ(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK), 0);
flags = fcntl(sv[1], F_GETFL, 0);
EXPECT_EQ(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK), 0);
em_fd =
g_event_poller->CreateHandle(sv[0], "TestEventPollerHandleChange", false);
EXPECT_NE(em_fd, nullptr);
// Register the first callback, then make its FD readable
em_fd->NotifyOnRead(first_closure);
data = 0;
result = write(sv[1], &data, 1);
EXPECT_EQ(result, 1);
// And now wait for it to run.
auto poller_work = [](FdChangeData* fdc) {
Poller::WorkResult result;
gpr_mu_lock(&g_mu);
while (fdc->cb_that_ran == nullptr) {
gpr_mu_unlock(&g_mu);
result = g_event_poller->Work(24h, []() {});
ASSERT_FALSE(result == Poller::WorkResult::kDeadlineExceeded);
gpr_mu_lock(&g_mu);
}
};
poller_work(&a);
EXPECT_EQ(a.cb_that_ran, FirstReadCallback);
gpr_mu_unlock(&g_mu);
// And drain the socket so we can generate a new read edge
result = read(sv[0], &data, 1);
EXPECT_EQ(result, 1);
// Now register a second callback with distinct change data, and do the same
// thing again.
em_fd->NotifyOnRead(second_closure);
data = 0;
result = write(sv[1], &data, 1);
EXPECT_EQ(result, 1);
// And now wait for it to run.
poller_work(&b);
// Except now we verify that SecondReadCallback ran instead.
EXPECT_EQ(b.cb_that_ran, SecondReadCallback);
gpr_mu_unlock(&g_mu);
em_fd->OrphanHandle(nullptr, nullptr, "d");
DestroyChangeData(&a);
DestroyChangeData(&b);
close(sv[1]);
}
std::atomic<int> kTotalActiveWakeupFdHandles{0};
// A helper class representing one file descriptor. It is implemented using
// a WakeupFd. It registers itself with the poller and waits to be notified
// of read events. Upon receiving a read event, it (1) processes it,
// (2) registers to be notified of the next read event and (3) schedules
// generation of the next read event. The Fd orphans itself after processing
// a specified number of read events.
class WakeupFdHandle : public grpc_core::DualRefCounted<WakeupFdHandle> {
public:
WakeupFdHandle(int num_wakeups, Scheduler* scheduler,
PosixEventPoller* poller)
: num_wakeups_(num_wakeups),
scheduler_(scheduler),
poller_(poller),
on_read_(
PosixEngineClosure::ToPermanentClosure([this](absl::Status status) {
EXPECT_TRUE(status.ok());
status = ReadPipe();
if (!status.ok()) {
                // Rarely, the epoll1 poller may generate an EPOLLHUP, which
                // is a spurious wakeup. The poll based poller is also likely
                // to generate many spurious wakeups because of the level
                // triggered nature of poll(). In such cases, do not change
                // the number of wakeups received.
EXPECT_EQ(status, absl::InternalError("Spurious Wakeup"));
handle_->NotifyOnRead(on_read_);
return;
}
if (--num_wakeups_ == 0) {
// This should invoke the registered NotifyOnRead callbacks with
// the shutdown error. When those callbacks call Unref(), the
// WakeupFdHandle should call OrphanHandle in the Unref() method
// implementation.
handle_->ShutdownHandle(absl::InternalError("Shutting down"));
Unref();
} else {
handle_->NotifyOnRead(on_read_);
Ref().release();
// Schedule next wakeup to trigger the registered NotifyOnRead
// callback.
scheduler_->Run(SelfDeletingClosure::Create([this]() {
// Send next wakeup.
EXPECT_TRUE(wakeup_fd_->Wakeup().ok());
Unref();
}));
}
})) {
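    // Take a weak ref on behalf of the OrphanHandle callback; it is
    // released there (see Orphan()) after the handle has been closed.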
WeakRef().release();
++kTotalActiveWakeupFdHandles;
EXPECT_GT(num_wakeups_, 0);
EXPECT_NE(scheduler_, nullptr);
EXPECT_NE(poller_, nullptr);
wakeup_fd_ = *PipeWakeupFd::CreatePipeWakeupFd();
handle_ = poller_->CreateHandle(wakeup_fd_->ReadFd(), "test", false);
EXPECT_NE(handle_, nullptr);
handle_->NotifyOnRead(on_read_);
// Send a wakeup initially.
EXPECT_TRUE(wakeup_fd_->Wakeup().ok());
}
~WakeupFdHandle() override { delete on_read_; }
void Orphan() override {
// Once the handle has orphaned itself, decrement
// kTotalActiveWakeupFdHandles. Once all handles have orphaned themselves,
// send a Kick to the poller.
handle_->OrphanHandle(
PosixEngineClosure::TestOnlyToClosure(
[poller = poller_, wakeupfd_handle = this](absl::Status status) {
EXPECT_TRUE(status.ok());
if (--kTotalActiveWakeupFdHandles == 0) {
poller->Kick();
}
wakeupfd_handle->WeakUnref();
}),
nullptr, "");
}
private:
absl::Status ReadPipe() {
char buf[128];
ssize_t r;
int total_bytes_read = 0;
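    // Drain the pipe completely. A readable event that delivers no bytes is
    // reported as a spurious wakeup so the caller can simply re-arm
    // NotifyOnRead.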
for (;;) {
r = read(wakeup_fd_->ReadFd(), buf, sizeof(buf));
if (r > 0) {
total_bytes_read += r;
continue;
}
if (r == 0) return absl::OkStatus();
switch (errno) {
case EAGAIN:
return total_bytes_read > 0 ? absl::OkStatus()
: absl::InternalError("Spurious Wakeup");
case EINTR:
continue;
default:
return absl::Status(
absl::StatusCode::kInternal,
absl::StrCat("read: ", grpc_core::StrError(errno)));
}
}
}
int num_wakeups_;
Scheduler* scheduler_;
PosixEventPoller* poller_;
PosixEngineClosure* on_read_;
std::unique_ptr<WakeupFd> wakeup_fd_;
EventHandle* handle_;
};
// A helper class to create Fds and drive the polling for these Fds. It
// repeatedly calls the Work(..) method on the poller to get pending events,
// then schedules another parallel Work(..) instantiation and processes these
// pending events. This continues until all Fds have orphaned themselves.
class Worker : public grpc_core::DualRefCounted<Worker> {
public:
Worker(Scheduler* scheduler, PosixEventPoller* poller, int num_handles,
int num_wakeups_per_handle)
: scheduler_(scheduler), poller_(poller) {
handles_.reserve(num_handles);
for (int i = 0; i < num_handles; i++) {
handles_.push_back(
new WakeupFdHandle(num_wakeups_per_handle, scheduler_, poller_));
}
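    // This weak ref pairs with the WeakUnref() in Wait(); it keeps the
    // Worker alive until Orphan() has fired the notification.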
WeakRef().release();
}
void Orphan() override { signal.Notify(); }
void Start() {
// Start executing Work(..).
scheduler_->Run([this]() { Work(); });
}
void Wait() {
signal.WaitForNotification();
WeakUnref();
}
private:
void Work() {
auto result = g_event_poller->Work(24h, [this]() {
// Schedule next work instantiation immediately and take a Ref for
// the next instantiation.
Ref().release();
scheduler_->Run([this]() { Work(); });
});
ASSERT_TRUE(result == Poller::WorkResult::kOk ||
result == Poller::WorkResult::kKicked);
// Corresponds to the Ref taken for the current instantiation. If the
// result was Poller::WorkResult::kKicked, then the next work instantiation
// would not have been scheduled and the poll_again callback should have
// been deleted.
Unref();
}
Scheduler* scheduler_;
PosixEventPoller* poller_;
grpc_core::Notification signal;
std::vector<WakeupFdHandle*> handles_;
};
// This test creates kNumHandles file descriptors and sends
// kNumWakeupsPerHandle separate read events to each of them. The Fds use the
// NotifyOnRead API to wait for a read event, and upon receiving one they
// process it immediately and schedule the wait for the next read event. A new
// read event is also generated for each fd in parallel after the previous one
// is processed.
TEST_F(EventPollerTest, TestMultipleHandles) {
static constexpr int kNumHandles = 100;
static constexpr int kNumWakeupsPerHandle = 100;
if (g_event_poller == nullptr) {
return;
}
Worker* worker = new Worker(Scheduler(), g_event_poller.get(), kNumHandles,
kNumWakeupsPerHandle);
worker->Start();
worker->Wait();
}
} // namespace
} // namespace experimental
} // namespace grpc_event_engine
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
gpr_mu_init(&g_mu);
auto poll_strategy = grpc_core::ConfigVars::Get().PollStrategy();
auto strings = absl::StrSplit(poll_strategy, ',');
if (std::find(strings.begin(), strings.end(), "none") != strings.end()) {
// Skip the test entirely if poll strategy is none.
return 0;
}
// TODO(ctiller): EventEngine temporarily needs grpc to be initialized first
// until we clear out the iomgr shutdown code.
grpc_init();
int r = RUN_ALL_TESTS();
grpc_shutdown();
return r;
}
#else // GRPC_POSIX_SOCKET_EV
int main(int argc, char** argv) { return 1; }
#endif // GRPC_POSIX_SOCKET_EV