blob: 4fdd3365c96158853f3427d3e611ab7dd17dc59c [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/developer/debug/shared/message_loop_poll.h"
#include <fcntl.h>
#include <lib/syslog/cpp/macros.h>
#include <poll.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include "src/lib/files/eintr_wrapper.h"
#include "src/lib/fxl/build_config.h"
namespace debug_ipc {
namespace {
#if !defined(OS_LINUX)
bool SetCloseOnExec(int fd) {
const int flags = fcntl(fd, F_GETFD);
if (flags == -1)
return false;
if (flags & FD_CLOEXEC)
return true;
if (HANDLE_EINTR(fcntl(fd, F_SETFD, flags | FD_CLOEXEC)) == -1)
return false;
return true;
}
bool SetNonBlocking(int fd) {
const int flags = fcntl(fd, F_GETFL);
if (flags == -1)
return false;
if (flags & O_NONBLOCK)
return true;
if (HANDLE_EINTR(fcntl(fd, F_SETFL, flags | O_NONBLOCK)) == -1)
return false;
return true;
}
#endif
// Creates a nonblocking temporary pipe pipe and assigns the two ends of it to
// the two out parameters. Returns true on success.
bool CreateLocalNonBlockingPipe(fbl::unique_fd* out_end, fbl::unique_fd* in_end) {
#if defined(OS_LINUX)
int fds[2];
if (pipe2(fds, O_CLOEXEC | O_NONBLOCK) != 0)
return false;
out_end->reset(fds[0]);
in_end->reset(fds[1]);
return true;
#else
int fds[2];
if (pipe(fds) != 0)
return false;
fbl::unique_fd fd_out(fds[0]);
fbl::unique_fd fd_in(fds[1]);
if (!SetCloseOnExec(fd_out.get()))
return false;
if (!SetCloseOnExec(fd_in.get()))
return false;
if (!SetNonBlocking(fd_out.get()))
return false;
if (!SetNonBlocking(fd_in.get()))
return false;
*out_end = std::move(fd_out);
*in_end = std::move(fd_in);
return true;
#endif
}
} // namespace
struct MessageLoopPoll::WatchInfo {
int fd = 0;
WatchMode mode = WatchMode::kReadWrite;
FDWatcher* watcher = nullptr;
};
MessageLoopPoll::MessageLoopPoll() {
if (!CreateLocalNonBlockingPipe(&wakeup_pipe_out_, &wakeup_pipe_in_))
fprintf(stderr, "Can't create pipe.\n");
}
MessageLoopPoll::~MessageLoopPoll() = default;
bool MessageLoopPoll::Init(std::string* error_message) {
if (!MessageLoop::Init(error_message))
return false;
wakeup_pipe_watch_ = WatchFD(WatchMode::kRead, wakeup_pipe_out_.get(), this);
return true;
}
void MessageLoopPoll::Cleanup() {
// Force unregister out watch before cleaning up current MessageLoop.
wakeup_pipe_watch_ = WatchHandle();
MessageLoop::Cleanup();
}
MessageLoop::WatchHandle MessageLoopPoll::WatchFD(WatchMode mode, int fd, FDWatcher* watcher) {
// The dispatch code for watch callbacks requires this be called on the
// same thread as the message loop is.
FX_DCHECK(Current() == static_cast<MessageLoop*>(this));
WatchInfo info;
info.fd = fd;
info.mode = mode;
info.watcher = watcher;
// The reason this function must be called on the message loop thread is that
// otherwise adding a new watch would require synchronously breaking out of
// the existing poll() call to add the new handle and then resuming it.
int watch_id = next_watch_id_;
next_watch_id_++;
watches_[watch_id] = info;
return WatchHandle(this, watch_id);
}
uint64_t MessageLoopPoll::GetMonotonicNowNS() const {
struct timespec ts;
int ret = clock_gettime(CLOCK_MONOTONIC, &ts);
FX_DCHECK(!ret);
return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
}
void MessageLoopPoll::RunImpl() {
std::vector<pollfd> poll_vect;
std::vector<int> map_indices;
while (!should_quit()) {
// This could be optimized to avoid recomputing every time.
ConstructFDMapping(&poll_vect, &map_indices);
FX_DCHECK(!poll_vect.empty());
FX_DCHECK(poll_vect.size() == map_indices.size());
int poll_timeout;
uint64_t delay = DelayNS();
if (delay == MessageLoop::kMaxDelay) {
poll_timeout = -1;
} else {
delay += 999999;
delay /= 1000000;
poll_timeout = static_cast<int>(delay);
}
int res = poll(&poll_vect[0], static_cast<nfds_t>(poll_vect.size()), poll_timeout);
FX_DCHECK(res >= 0 || errno == EINTR) << "poll() failed: " << strerror(errno);
for (size_t i = 0; i < poll_vect.size(); i++) {
if (poll_vect[i].revents)
OnHandleSignaled(poll_vect[i].fd, poll_vect[i].revents, map_indices[i]);
}
// Process one pending task. If there are more set us to wake up again.
// ProcessPendingTask must be called with the lock held.
std::lock_guard<std::mutex> guard(mutex_);
if (ProcessPendingTask())
SetHasTasks();
}
}
void MessageLoopPoll::StopWatching(int id) {
// The dispatch code for watch callbacks requires this be called on the
// same thread as the message loop is.
FX_DCHECK(Current() == this);
std::lock_guard<std::mutex> guard(mutex_);
auto found = watches_.find(id);
if (found == watches_.end()) {
FX_NOTREACHED();
return;
}
watches_.erase(found);
}
void MessageLoopPoll::OnFDReady(int fd, bool readable, bool, bool) {
if (!readable) {
return;
}
FX_DCHECK(fd == wakeup_pipe_out_.get());
// Remove and discard the wakeup byte.
char buf;
auto nread = HANDLE_EINTR(read(wakeup_pipe_out_.get(), &buf, 1));
FX_DCHECK(nread == 1);
// This is just here to wake us up and run the loop again. We don't need to
// actually respond to the data.
}
void MessageLoopPoll::SetHasTasks() {
// Wake up the poll() by writing to the pipe.
char buf = 0;
auto written = HANDLE_EINTR(write(wakeup_pipe_in_.get(), &buf, 1));
FX_DCHECK(written == 1 || errno == EAGAIN);
}
void MessageLoopPoll::ConstructFDMapping(std::vector<pollfd>* poll_vect,
std::vector<int>* map_indices) const {
// The watches_ vector is not threadsafe.
FX_DCHECK(Current() == this);
poll_vect->resize(watches_.size());
map_indices->resize(watches_.size());
size_t i = 0;
for (const auto& pair : watches_) {
pollfd& pfd = (*poll_vect)[i];
pfd.fd = pair.second.fd;
pfd.events = 0;
if (pair.second.mode == WatchMode::kRead || pair.second.mode == WatchMode::kReadWrite)
pfd.events |= POLLIN;
if (pair.second.mode == WatchMode::kWrite || pair.second.mode == WatchMode::kReadWrite)
pfd.events |= POLLOUT;
pfd.revents = 0;
(*map_indices)[i] = pair.first;
i++;
}
}
bool MessageLoopPoll::HasWatch(int watch_id) {
auto it = watches_.find(watch_id);
if (it == watches_.end())
return false;
return true;
}
void MessageLoopPoll::OnHandleSignaled(int fd, short events, int watch_id) {
// The watches_ vector is not threadsafe.
FX_DCHECK(Current() == this);
// Handle could have been just closed. Since all signaled handles are
// notified for one call to poll(), a previous callback could have removed
// a watch.
if (!HasWatch(watch_id))
return;
// We obtain the watch info and see what kind of signal we received.
auto it = watches_.find(watch_id);
const auto& watch_info = it->second;
FX_DCHECK(fd == watch_info.fd);
bool error = (events & POLLERR) || (events & POLLHUP) || (events & POLLNVAL);
#if defined(POLLRDHUP) // Mac doesn't have this.
error = error || (events & POLLRDHUP);
#endif
bool readable = !!(events & POLLIN);
bool writable = !!(events & POLLOUT);
if (HasWatch(watch_id))
watch_info.watcher->OnFDReady(fd, readable, writable, error);
}
} // namespace debug_ipc