blob: f164702337b8c3161d557b1315cc5c2980ab77d6 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "garnet/lib/debug_ipc/helper/message_loop_poll.h"
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>
#include "garnet/public/lib/fxl/build_config.h"
#include "garnet/public/lib/fxl/files/eintr_wrapper.h"
namespace debug_ipc {
namespace {
#if !defined(OS_LINUX)
// Ensures the FD_CLOEXEC descriptor flag is set on |fd|. Returns true when the
// flag is set on exit (including when it was already set) and false when
// either fcntl() call fails.
bool SetCloseOnExec(int fd) {
  const int current = fcntl(fd, F_GETFD);
  if (current == -1)
    return false;

  // Already close-on-exec: skip the second fcntl() call.
  if ((current & FD_CLOEXEC) != 0)
    return true;

  return HANDLE_EINTR(fcntl(fd, F_SETFD, current | FD_CLOEXEC)) != -1;
}
// Ensures the O_NONBLOCK file status flag is set on |fd|. Returns true when
// the flag is set on exit (including when it was already set) and false when
// either fcntl() call fails.
bool SetNonBlocking(int fd) {
  const int current = fcntl(fd, F_GETFL);
  if (current == -1)
    return false;

  // Already non-blocking: skip the second fcntl() call.
  if ((current & O_NONBLOCK) != 0)
    return true;

  return HANDLE_EINTR(fcntl(fd, F_SETFL, current | O_NONBLOCK)) != -1;
}
#endif
// Creates a nonblocking temporary pipe and assigns the two ends of it to
// the two out parameters. Returns true on success.
// |out_end| receives fds[0] (the end data is read from) and |in_end| receives
// fds[1] (the end data is written to). Both descriptors are close-on-exec and
// non-blocking on success. On failure the out parameters are left untouched
// and false is returned.
bool CreateLocalNonBlockingPipe(fxl::UniqueFD* out_end, fxl::UniqueFD* in_end) {
  int pipe_fds[2];
#if defined(OS_LINUX)
  // pipe2() applies both flags at creation time, so no fcntl() calls are
  // needed on this path.
  if (pipe2(pipe_fds, O_CLOEXEC | O_NONBLOCK) != 0)
    return false;

  out_end->reset(pipe_fds[0]);
  in_end->reset(pipe_fds[1]);
  return true;
#else
  if (pipe(pipe_fds) != 0)
    return false;

  // Wrap the raw descriptors immediately so they are owned by UniqueFD objects
  // on every return path below.
  fxl::UniqueFD read_end(pipe_fds[0]);
  fxl::UniqueFD write_end(pipe_fds[1]);

  // Short-circuit evaluation keeps the original order: close-on-exec on both
  // ends first, then non-blocking on both ends, stopping at the first failure.
  const bool configured =
      SetCloseOnExec(read_end.get()) && SetCloseOnExec(write_end.get()) &&
      SetNonBlocking(read_end.get()) && SetNonBlocking(write_end.get());
  if (!configured)
    return false;

  *out_end = std::move(read_end);
  *in_end = std::move(write_end);
  return true;
#endif
}
} // namespace
// Bookkeeping for one registered file descriptor watch. Instances are stored
// in watches_ keyed by watch ID.
struct MessageLoopPoll::WatchInfo {
  // File descriptor handed to poll().
  int fd{0};

  // Which readiness conditions (read, write or both) are requested.
  WatchMode mode{WatchMode::kReadWrite};

  // Receiver of the OnFD* notifications for |fd|. Stored as a raw pointer.
  FDWatcher* watcher{nullptr};
};
// Creates the wakeup pipe used to break out of poll(). A failure is only
// reported on stderr; construction still completes.
MessageLoopPoll::MessageLoopPoll() {
  const bool pipe_created =
      CreateLocalNonBlockingPipe(&wakeup_pipe_out_, &wakeup_pipe_in_);
  if (pipe_created)
    return;

  fprintf(stderr, "Can't create pipe.\n");
}
// Defaulted: no explicit teardown is performed here beyond destroying the
// members.
MessageLoopPoll::~MessageLoopPoll() = default;
// Runs the base-class initialization, then registers this object as the
// read-watcher of the wakeup pipe so that bytes written by SetHasTasks() are
// delivered to OnFDReadable().
void MessageLoopPoll::Init() {
  MessageLoop::Init();

  const int wakeup_fd = wakeup_pipe_out_.get();
  wakeup_pipe_watch_ = WatchFD(WatchMode::kRead, wakeup_fd, this);
}
// Releases the wakeup-pipe watch registered by Init() and then runs the
// base-class cleanup. The order of the two statements is significant.
void MessageLoopPoll::Cleanup() {
// Force unregister our watch before cleaning up current MessageLoop.
wakeup_pipe_watch_ = WatchHandle();
MessageLoop::Cleanup();
}
// Registers |watcher| to be notified about |fd| according to |mode| and
// returns a handle wrapping the newly allocated watch ID.
//
// Must be called on the message loop's own thread: the watch dispatch code
// depends on it, and adding a watch from elsewhere would require synchronously
// breaking out of the in-progress poll() call to add the new handle and then
// resuming it.
MessageLoop::WatchHandle MessageLoopPoll::WatchFD(WatchMode mode, int fd,
                                                  FDWatcher* watcher) {
  FXL_DCHECK(Current() == static_cast<MessageLoop*>(this));

  // Watch IDs are handed out sequentially.
  const int watch_id = next_watch_id_++;

  // Fill the map slot in place; all three fields are overwritten.
  WatchInfo& slot = watches_[watch_id];
  slot.fd = fd;
  slot.mode = mode;
  slot.watcher = watcher;

  return WatchHandle(this, watch_id);
}
// Main loop body: until should_quit() reports true, rebuilds the pollfd array
// from the current watches, blocks in poll() with no timeout, and dispatches
// every descriptor that came back with a non-zero revents.
void MessageLoopPoll::RunImpl() {
  // Reused across iterations; ConstructFDMapping() resizes and refills both.
  std::vector<pollfd> poll_fds;
  std::vector<size_t> watch_ids;  // watch_ids[i] is the watch for poll_fds[i].

  while (!should_quit()) {
    // This could be optimized to avoid recomputing every time.
    ConstructFDMapping(&poll_fds, &watch_ids);
    FXL_DCHECK(!poll_fds.empty());
    FXL_DCHECK(poll_fds.size() == watch_ids.size());

    // NOTE(review): the return value of poll() is not inspected. A failed call
    // simply falls through to the scan below and the loop polls again.
    poll(poll_fds.data(), static_cast<nfds_t>(poll_fds.size()), -1);

    for (size_t i = 0; i < poll_fds.size(); i++) {
      const pollfd& entry = poll_fds[i];
      if (!entry.revents)
        continue;
      OnHandleSignaled(entry.fd, entry.revents, watch_ids[i]);
    }
  }
}
// Removes the watch registered under |id|. Asking to remove an ID that is not
// registered trips FXL_NOTREACHED() and otherwise does nothing.
void MessageLoopPoll::StopWatching(int id) {
  // The dispatch code for watch callbacks requires this be called on the
  // same thread as the message loop is.
  FXL_DCHECK(Current() == this);

  std::lock_guard<std::mutex> guard(mutex_);

  const auto entry = watches_.find(id);
  if (entry != watches_.end()) {
    watches_.erase(entry);
    return;
  }

  FXL_NOTREACHED();
}
// FDWatcher callback for the wakeup pipe. Consumes one wakeup byte, runs
// ProcessPendingTask() once, and re-arms the wakeup when that call returns
// true.
void MessageLoopPoll::OnFDReadable(int fd) {
  FXL_DCHECK(fd == wakeup_pipe_out_.get());

  // Drain exactly one wakeup byte; its value is irrelevant.
  char discarded;
  int bytes_read = HANDLE_EINTR(read(wakeup_pipe_out_.get(), &discarded, 1));
  FXL_DCHECK(bytes_read == 1);

  // ProcessPendingTask must be called with the lock held.
  std::lock_guard<std::mutex> guard(mutex_);
  const bool wake_again = ProcessPendingTask();
  if (wake_again)
    SetHasTasks();
}
// Wakes up a poll() blocked in RunImpl() by writing a single byte to the
// wakeup pipe. The byte is consumed by OnFDReadable().
void MessageLoopPoll::SetHasTasks() {
  const char wakeup_byte = 0;
  int bytes_written =
      HANDLE_EINTR(write(wakeup_pipe_in_.get(), &wakeup_byte, 1));

  // EAGAIN is tolerated: the pipe is non-blocking, so this indicates the write
  // could not proceed without blocking (presumably wakeup bytes are already
  // queued).
  FXL_DCHECK(bytes_written == 1 || errno == EAGAIN);
}
void MessageLoopPoll::ConstructFDMapping(
std::vector<pollfd>* poll_vect, std::vector<size_t>* map_indices) const {
// The watches_ vector is not threadsafe.
FXL_DCHECK(Current() == this);
poll_vect->resize(watches_.size());
map_indices->resize(watches_.size());
size_t i = 0;
for (const auto& pair : watches_) {
pollfd& pfd = (*poll_vect)[i];
pfd.fd = pair.second.fd;
pfd.events = 0;
if (pair.second.mode == WatchMode::kRead ||
pair.second.mode == WatchMode::kReadWrite)
pfd.events |= POLLIN;
if (pair.second.mode == WatchMode::kWrite ||
pair.second.mode == WatchMode::kReadWrite)
pfd.events |= POLLOUT;
pfd.revents = 0;
(*map_indices)[i] = pair.first;
i++;
}
}
// Dispatches the poll() result for a single descriptor to its watcher.
//
// |fd| and |events| are the fd/revents pair reported by poll(); |watch_id| is
// the ID under which the watch was registered. Notifications are issued in the
// order readable, writable, error. Any callback may remove the watch, so the
// registration is looked up again before each subsequent notification and
// dispatch stops as soon as the watch is gone.
//
// A leftover debugging early return (marked "ERASEME") used to skip the error
// notification whenever a readable or writable notification had been sent,
// which also made the re-lookup in the error branch unreachable. It has been
// removed so that error/hangup conditions reported together with POLLIN or
// POLLOUT reach the watcher.
void MessageLoopPoll::OnHandleSignaled(int fd, short events, int watch_id) {
  // The watches_ container is not threadsafe.
  FXL_DCHECK(Current() == this);

  auto found = watches_.find(watch_id);
  if (found == watches_.end()) {
    // Handle could have been just closed. Since all signaled handles are
    // notified for one call to poll(), a previous callback could have removed
    // a watch.
    return;
  }
  FXL_DCHECK(fd == found->second.fd);

  // Since notifications can cause the watcher to be removed, this flag tracks
  // if anything has been issued and therefore we should re-check the watcher
  // registration before dereferencing anything.
  bool sent_notification = false;

  if (events & POLLIN) {
    FXL_DCHECK(found->second.mode == WatchMode::kRead ||
               found->second.mode == WatchMode::kReadWrite);
    found->second.watcher->OnFDReadable(fd);
    sent_notification = true;
  }

  if (events & POLLOUT) {
    if (sent_notification) {
      found = watches_.find(watch_id);
      if (found == watches_.end())
        return;
    }
    FXL_DCHECK(found->second.mode == WatchMode::kWrite ||
               found->second.mode == WatchMode::kReadWrite);
    found->second.watcher->OnFDWritable(fd);
    sent_notification = true;
  }

  if ((events & POLLERR) || (events & POLLHUP) || (events & POLLNVAL)
#if defined(POLLRDHUP)  // Mac doesn't have this.
      || (events & POLLRDHUP)
#endif
  ) {
    if (sent_notification) {
      found = watches_.find(watch_id);
      if (found == watches_.end())
        return;
    }
    found->second.watcher->OnFDError(fd);
  }
}
} // namespace debug_ipc