blob: d1a8f54e39c53e453127a5d7cdd500a54b15dbe2 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/developer/debug/shared/message_loop_poll.h"
#include <lib/syslog/cpp/macros.h>
#include <poll.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include "src/lib/files/eintr_wrapper.h"
namespace debug {
struct MessageLoopPoll::WatchInfo {
int fd = -1;
WatchMode mode = WatchMode::kReadWrite;
FDWatcher watcher;
};
MessageLoopPoll::MessageLoopPoll() {
if (!CreateLocalNonBlockingPipe(&wakeup_pipe_out_, &wakeup_pipe_in_))
fprintf(stderr, "Can't create pipe.\n");
}
MessageLoopPoll::~MessageLoopPoll() = default;
bool MessageLoopPoll::Init(std::string* error_message) {
if (!MessageLoop::Init(error_message))
return false;
wakeup_pipe_watch_ =
WatchFD(WatchMode::kRead, wakeup_pipe_out_.get(), [this](int fd, bool readable, bool, bool) {
if (!readable) {
return;
}
FX_DCHECK(fd == wakeup_pipe_out_.get());
// Remove and discard the wakeup byte.
char buf;
auto nread = HANDLE_EINTR(read(wakeup_pipe_out_.get(), &buf, 1));
FX_DCHECK(nread == 1);
// This is just here to wake us up and run the loop again. We don't need to
// actually respond to the data.
});
return true;
}
void MessageLoopPoll::Cleanup() {
// Force unregister our watch before cleaning up current MessageLoop.
wakeup_pipe_watch_ = WatchHandle();
// Delay calling watches_.clear() until the very end, because the destructors of the FDWatchers,
// timers and tasks may call StopWatching() and expect the key is still there.
std::vector<FDWatcher> to_delete;
to_delete.reserve(watches_.size());
for (auto& [key, info] : watches_) {
to_delete.push_back(std::move(info.watcher));
}
to_delete.clear();
MessageLoop::Cleanup();
// Do this at last.
watches_.clear();
}
MessageLoop::WatchHandle MessageLoopPoll::WatchFD(WatchMode mode, int fd, FDWatcher watcher) {
// The dispatch code for watch callbacks requires this be called on the
// same thread as the message loop is.
FX_DCHECK(Current() == static_cast<MessageLoop*>(this));
WatchInfo info;
info.fd = fd;
info.mode = mode;
info.watcher = std::move(watcher);
// The reason this function must be called on the message loop thread is that
// otherwise adding a new watch would require synchronously breaking out of
// the existing poll() call to add the new handle and then resuming it.
int watch_id = next_watch_id_;
next_watch_id_++;
watches_[watch_id] = std::move(info);
return WatchHandle(this, watch_id);
}
int MessageLoopPoll::GetNextWatchId() {
int watch_id = next_watch_id_;
next_watch_id_++;
return watch_id;
}
uint64_t MessageLoopPoll::GetMonotonicNowNS() const {
struct timespec ts;
int ret = clock_gettime(CLOCK_MONOTONIC, &ts);
FX_DCHECK(!ret);
return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
}
void MessageLoopPoll::RunImpl() {
std::vector<pollfd> poll_vect;
std::vector<int> map_indices;
while (!should_quit()) {
// This could be optimized to avoid recomputing every time.
ConstructFDMapping(&poll_vect, &map_indices);
FX_DCHECK(!poll_vect.empty());
FX_DCHECK(poll_vect.size() == map_indices.size());
int poll_timeout;
uint64_t delay = DelayNS();
if (delay == MessageLoop::kMaxDelay) {
poll_timeout = -1;
} else {
delay += 999999;
delay /= 1000000;
poll_timeout = static_cast<int>(delay);
}
int res = poll(poll_vect.data(), static_cast<nfds_t>(poll_vect.size()), poll_timeout);
FX_DCHECK(res >= 0 || errno == EINTR) << "poll() failed: " << strerror(errno);
for (size_t i = 0; i < poll_vect.size(); i++) {
if (poll_vect[i].revents)
OnHandleSignaled(poll_vect[i].fd, poll_vect[i].revents, map_indices[i]);
}
// Process one pending task. If there are more set us to wake up again.
// ProcessPendingTask must be called with the lock held.
std::lock_guard<std::mutex> guard(mutex_);
if (ProcessPendingTask())
SetHasTasks();
}
}
void MessageLoopPoll::StopWatching(int id) {
// The dispatch code for watch callbacks requires this be called on the
// same thread as the message loop is.
FX_DCHECK(Current() == this);
std::lock_guard<std::mutex> guard(mutex_);
auto found = watches_.find(id);
if (found == watches_.end()) {
FX_NOTREACHED();
return;
}
watches_.erase(found);
}
void MessageLoopPoll::SetHasTasks() {
// Wake up the poll() by writing to the pipe.
char buf = 0;
auto written = HANDLE_EINTR(write(wakeup_pipe_in_.get(), &buf, 1));
FX_DCHECK(written == 1 || errno == EAGAIN);
}
void MessageLoopPoll::ConstructFDMapping(std::vector<pollfd>* poll_vect,
std::vector<int>* map_indices) const {
// The watches_ vector is not threadsafe.
FX_DCHECK(Current() == this);
poll_vect->resize(watches_.size());
map_indices->resize(watches_.size());
size_t i = 0;
for (const auto& pair : watches_) {
pollfd& pfd = (*poll_vect)[i];
pfd.fd = pair.second.fd;
pfd.events = 0;
if (pair.second.mode == WatchMode::kRead || pair.second.mode == WatchMode::kReadWrite)
pfd.events |= POLLIN;
if (pair.second.mode == WatchMode::kWrite || pair.second.mode == WatchMode::kReadWrite)
pfd.events |= POLLOUT;
pfd.revents = 0;
(*map_indices)[i] = pair.first;
i++;
}
}
bool MessageLoopPoll::HasWatch(int watch_id) { return watches_.find(watch_id) != watches_.end(); }
void MessageLoopPoll::OnHandleSignaled(int fd, short events, int watch_id) {
// The watches_ vector is not threadsafe.
FX_DCHECK(Current() == this);
// Handle could have been just closed. Since all signaled handles are
// notified for one call to poll(), a previous callback could have removed
// a watch.
if (!HasWatch(watch_id))
return;
// We obtain the watch info and see what kind of signal we received.
auto& watch_info = watches_[watch_id];
FX_DCHECK(fd == watch_info.fd);
bool error = (events & POLLERR) || (events & POLLHUP) || (events & POLLNVAL);
#if defined(POLLRDHUP) // Mac doesn't have this.
error = error || (events & POLLRDHUP);
#endif
bool readable = !!(events & POLLIN);
bool writable = !!(events & POLLOUT);
watch_info.watcher(fd, readable, writable, error);
// watch_info might be invalid because watcher could have called StopWatching().
}
} // namespace debug