blob: 6c09e26396858df0870d87e7834f7456ee23aa0e [file] [log] [blame]
/*
* Copyright (C) 2023 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "watchdog.h"
#include <chrono>
#include <cstdint>
#include <optional>
#include <thread>
#include <vector>
#include <android-base/logging.h>
struct watcher {
watcher(const char* id, const struct storage_msg* request);
void SetState(const char* new_state);
void LogTimeout();
void LogFinished();
const char* id_;
uint32_t cmd_;
uint32_t op_id_;
uint32_t flags_;
const char* state_;
using clock = std::chrono::high_resolution_clock;
clock::time_point start_;
clock::time_point state_change_;
std::chrono::milliseconds Elapsed(clock::time_point end);
bool triggered_;
};
watcher::watcher(const char* id, const struct storage_msg* request)
: id_(id), state_(nullptr), triggered_(false) {
cmd_ = request->cmd;
op_id_ = request->op_id;
flags_ = request->flags;
start_ = clock::now();
state_change_ = start_;
}
void watcher::SetState(const char* new_state) {
state_ = new_state;
state_change_ = clock::now();
}
void watcher::LogTimeout() {
if (!triggered_) {
triggered_ = true;
LOG(ERROR) << "Storageproxyd watchdog triggered: " << id_ << " cmd: " << cmd_
<< " op_id: " << op_id_ << " flags: " << flags_;
}
if (state_) {
LOG(ERROR) << "...elapsed: " << Elapsed(clock::now()).count() << "ms (" << state_ << " "
<< Elapsed(state_change_).count() << "ms)";
} else {
LOG(ERROR) << "...elapsed: " << Elapsed(clock::now()).count() << "ms";
}
}
void watcher::LogFinished() {
if (triggered_) {
LOG(ERROR) << "...completed: " << Elapsed(clock::now()).count() << "ms";
}
}
std::chrono::milliseconds watcher::Elapsed(watcher::clock::time_point end) {
return std::chrono::duration_cast<std::chrono::milliseconds>(end - start_);
}
namespace {
class Watchdog {
private:
static constexpr std::chrono::milliseconds kDefaultTimeoutMs = std::chrono::milliseconds(500);
static constexpr std::chrono::milliseconds kMaxTimeoutMs = std::chrono::seconds(10);
public:
Watchdog() : watcher_(), done_(false) {}
~Watchdog();
struct watcher* RegisterWatch(const char* id, const struct storage_msg* request);
void AddProgress(struct watcher* watcher, const char* state);
void UnRegisterWatch(struct watcher* watcher);
private:
// Syncronizes access to watcher_ and watcher_change_ between the main
// thread and watchdog loop thread. watcher_ may only be modified by the
// main thread; the watchdog loop is read-only.
std::mutex watcher_mutex_;
std::unique_ptr<struct watcher> watcher_;
std::condition_variable watcher_change_;
std::thread watchdog_thread_;
bool done_;
void WatchdogLoop();
void LogWatchdogTriggerLocked();
};
Watchdog gWatchdog;
} // Anonymous namespace
// Assumes that caller is single-threaded. If we want to use this from a
// multi-threaded context we need to ensure that the watchdog thread is
// initialized safely once and accessing an existing watcher is done while the
// watcher lock is held.
struct watcher* Watchdog::RegisterWatch(const char* id, const struct storage_msg* request) {
if (!watchdog_thread_.joinable()) {
watchdog_thread_ = std::thread(&Watchdog::WatchdogLoop, this);
}
if (watcher_) {
LOG(ERROR) << "Replacing registered watcher " << watcher_->id_;
UnRegisterWatch(watcher_.get());
}
struct watcher* ret = nullptr;
{
std::unique_lock<std::mutex> watcherLock(watcher_mutex_);
watcher_ = std::make_unique<struct watcher>(id, request);
ret = watcher_.get();
}
watcher_change_.notify_one();
return ret;
}
void Watchdog::UnRegisterWatch(struct watcher* watcher) {
{
std::lock_guard<std::mutex> watcherLock(watcher_mutex_);
if (!watcher_) {
LOG(ERROR) << "Cannot unregister watcher, no watcher registered";
return;
}
if (watcher_.get() != watcher) {
LOG(ERROR) << "Unregistering watcher that doesn't match current watcher";
}
watcher_->LogFinished();
watcher_.reset(nullptr);
}
watcher_change_.notify_one();
}
void Watchdog::AddProgress(struct watcher* watcher, const char* state) {
std::lock_guard<std::mutex> watcherLock(watcher_mutex_);
if (watcher_.get() != watcher) {
LOG(ERROR) << "Watcher was not registered, cannot log progress: " << state;
return;
}
watcher->SetState(state);
}
void Watchdog::WatchdogLoop() {
std::unique_lock<std::mutex> lock(watcher_mutex_);
std::chrono::milliseconds timeout = kDefaultTimeoutMs;
while (!done_) {
// wait for a watch to be registered
watcher_change_.wait(lock, [this] { return !!watcher_; });
// wait for the timeout or unregistration
timeout = kDefaultTimeoutMs;
do {
if (!watcher_change_.wait_for(lock, timeout, [this] { return !watcher_; })) {
watcher_->LogTimeout();
timeout = std::min(timeout * 2, kMaxTimeoutMs);
}
} while (!!watcher_);
}
}
Watchdog::~Watchdog() {
{
std::lock_guard<std::mutex> watcherLock(watcher_mutex_);
watcher_.reset(nullptr);
done_ = true;
}
watcher_change_.notify_one();
if (watchdog_thread_.joinable()) {
watchdog_thread_.join();
}
}
struct watcher* watch_start(const char* id, const struct storage_msg* request) {
return gWatchdog.RegisterWatch(id, request);
}
void watch_progress(struct watcher* watcher, const char* state) {
gWatchdog.AddProgress(watcher, state);
}
void watch_finish(struct watcher* watcher) {
gWatchdog.UnRegisterWatch(watcher);
}