blob: 47cf6c25584bf4008d42deebcec62cf73bb5ed6c [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "peridot/bin/sessionmgr/agent_runner/agent_runner.h"
#include <map>
#include <set>
#include <utility>
#include <lib/async/cpp/task.h>
#include <lib/async/default.h>
#include <lib/fsl/vmo/strings.h>
#include <lib/fxl/functional/make_copyable.h>
#include "peridot/bin/sessionmgr/agent_runner/agent_context_impl.h"
#include "peridot/bin/sessionmgr/agent_runner/agent_runner_storage_impl.h"
#include "peridot/bin/sessionmgr/storage/constants_and_utils.h"
#include "peridot/lib/fidl/array_to_string.h"
#include "peridot/lib/fidl/json_xdr.h"
namespace modular {
constexpr zx::duration kTeardownTimeout = zx::sec(3);
AgentRunner::AgentRunner(
fuchsia::sys::Launcher* const launcher,
MessageQueueManager* const message_queue_manager,
fuchsia::ledger::internal::LedgerRepository* const ledger_repository,
AgentRunnerStorage* const agent_runner_storage,
fuchsia::auth::TokenManager* const token_manager,
fuchsia::modular::UserIntelligenceProvider* const
user_intelligence_provider,
EntityProviderRunner* const entity_provider_runner)
: launcher_(launcher),
message_queue_manager_(message_queue_manager),
ledger_repository_(ledger_repository),
agent_runner_storage_(agent_runner_storage),
token_manager_(token_manager),
user_intelligence_provider_(user_intelligence_provider),
entity_provider_runner_(entity_provider_runner),
terminating_(std::make_shared<bool>(false)) {
agent_runner_storage_->Initialize(this, [] {});
}
AgentRunner::~AgentRunner() = default;
void AgentRunner::Connect(
fidl::InterfaceRequest<fuchsia::modular::AgentProvider> request) {
agent_provider_bindings_.AddBinding(this, std::move(request));
}
void AgentRunner::Teardown(const std::function<void()>& callback) {
// No new agents will be scheduled to run.
*terminating_ = true;
FXL_LOG(INFO) << "AgentRunner::Teardown() " << running_agents_.size()
<< " agents";
// No agents were running, we are good to go.
if (running_agents_.empty()) {
callback();
return;
}
// This is called when agents are done being removed
auto called = std::make_shared<bool>(false);
auto termination_callback = [this, called,
callback](const bool from_timeout) {
if (*called) {
return;
}
*called = true;
if (from_timeout) {
FXL_LOG(ERROR) << "AgentRunner::Teardown() timed out";
}
callback();
};
for (auto& it : running_agents_) {
// The running agent will call |AgentRunner::RemoveAgent()| to remove itself
// from the agent runner. When all agents are done being removed,
// |termination_callback| will be executed.
it.second->StopForTeardown([this, termination_callback]() {
if (running_agents_.empty()) {
termination_callback(/* from_timeout= */ false);
}
});
}
async::PostDelayedTask(async_get_default_dispatcher(),
[termination_callback] {
termination_callback(/* from_timeout= */ true);
},
kTeardownTimeout);
}
void AgentRunner::EnsureAgentIsRunning(const std::string& agent_url,
const std::function<void()>& done) {
auto agent_it = running_agents_.find(agent_url);
if (agent_it != running_agents_.end()) {
if (agent_it->second->state() == AgentContextImpl::State::TERMINATING) {
run_agent_callbacks_[agent_url].push_back(done);
return;
}
// fuchsia::modular::Agent is already running, so we can issue the callback
// immediately.
done();
return;
}
run_agent_callbacks_[agent_url].push_back(done);
RunAgent(agent_url);
}
void AgentRunner::RunAgent(const std::string& agent_url) {
// Start the agent and issue all callbacks.
ComponentContextInfo component_info = {message_queue_manager_, this,
ledger_repository_,
entity_provider_runner_};
AgentContextInfo info = {component_info, launcher_, token_manager_,
user_intelligence_provider_};
fuchsia::modular::AppConfig agent_config;
agent_config.url = agent_url;
FXL_CHECK(running_agents_
.emplace(agent_url, std::make_unique<AgentContextImpl>(
info, std::move(agent_config)))
.second);
auto run_callbacks_it = run_agent_callbacks_.find(agent_url);
if (run_callbacks_it != run_agent_callbacks_.end()) {
for (auto& callback : run_callbacks_it->second) {
callback();
}
run_agent_callbacks_.erase(agent_url);
}
UpdateWatchers();
}
void AgentRunner::ConnectToAgent(
const std::string& requestor_url, const std::string& agent_url,
fidl::InterfaceRequest<fuchsia::sys::ServiceProvider>
incoming_services_request,
fidl::InterfaceRequest<fuchsia::modular::AgentController>
agent_controller_request) {
// Drop all new requests if AgentRunner is terminating.
if (*terminating_) {
return;
}
pending_agent_connections_[agent_url].push_back(
{requestor_url, std::move(incoming_services_request),
std::move(agent_controller_request)});
EnsureAgentIsRunning(agent_url, [this, agent_url] {
// If the agent was terminating and has restarted, forwarding connections
// here is redundant, since it was already forwarded earlier.
ForwardConnectionsToAgent(agent_url);
});
}
void AgentRunner::ConnectToEntityProvider(
const std::string& agent_url,
fidl::InterfaceRequest<fuchsia::modular::EntityProvider>
entity_provider_request,
fidl::InterfaceRequest<fuchsia::modular::AgentController>
agent_controller_request) {
// Drop all new requests if AgentRunner is terminating.
if (*terminating_) {
return;
}
pending_entity_provider_connections_[agent_url] = {
std::move(entity_provider_request), std::move(agent_controller_request)};
EnsureAgentIsRunning(agent_url, [this, agent_url] {
auto it = pending_entity_provider_connections_.find(agent_url);
FXL_DCHECK(it != pending_entity_provider_connections_.end());
running_agents_[agent_url]->NewEntityProviderConnection(
std::move(it->second.entity_provider_request),
std::move(it->second.agent_controller_request));
pending_entity_provider_connections_.erase(it);
});
}
void AgentRunner::RemoveAgent(const std::string agent_url) {
running_agents_.erase(agent_url);
if (*terminating_) {
return;
}
UpdateWatchers();
// At this point, if there are pending requests to start the agent (because
// the previous one was in a terminating state), we can start it up again.
if (run_agent_callbacks_.find(agent_url) != run_agent_callbacks_.end()) {
RunAgent(agent_url);
}
}
void AgentRunner::ForwardConnectionsToAgent(const std::string& agent_url) {
// Did we hold onto new connections as the previous one was exiting?
auto found_it = pending_agent_connections_.find(agent_url);
if (found_it != pending_agent_connections_.end()) {
AgentContextImpl* agent = running_agents_[agent_url].get();
for (auto& pending_connection : found_it->second) {
agent->NewAgentConnection(
pending_connection.requestor_url,
std::move(pending_connection.incoming_services_request),
std::move(pending_connection.agent_controller_request));
}
pending_agent_connections_.erase(found_it);
}
}
void AgentRunner::ScheduleTask(const std::string& agent_url,
fuchsia::modular::TaskInfo task_info) {
AgentRunnerStorage::TriggerInfo data;
data.agent_url = agent_url;
data.task_id = task_info.task_id;
if (task_info.trigger_condition.is_message_on_queue()) {
data.queue_name = task_info.trigger_condition.message_on_queue();
data.task_type = AgentRunnerStorage::TriggerInfo::TYPE_QUEUE_MESSAGE;
} else if (task_info.trigger_condition.is_queue_deleted()) {
data.queue_token = task_info.trigger_condition.queue_deleted();
data.task_type = AgentRunnerStorage::TriggerInfo::TYPE_QUEUE_DELETION;
} else if (task_info.trigger_condition.is_alarm_in_seconds()) {
data.task_type = AgentRunnerStorage::TriggerInfo::TYPE_ALARM;
data.alarm_in_seconds = task_info.trigger_condition.alarm_in_seconds();
} else {
// Not a defined trigger condition.
FXL_NOTREACHED();
}
if (task_info.persistent) {
// |AgentRunnerStorageImpl::WriteTask| eventually calls |AddedTask()| after
// this trigger information has been added to the ledger via a ledger page
// watching mechanism.
agent_runner_storage_->WriteTask(agent_url, data, [](bool) {});
} else {
AddedTask(MakeTriggerKey(agent_url, data.task_id), data);
}
}
void AgentRunner::AddedTask(const std::string& key,
AgentRunnerStorage::TriggerInfo data) {
switch (data.task_type) {
case AgentRunnerStorage::TriggerInfo::TYPE_QUEUE_MESSAGE:
ScheduleMessageQueueNewMessageTask(data.agent_url, data.task_id,
data.queue_name);
break;
case AgentRunnerStorage::TriggerInfo::TYPE_QUEUE_DELETION:
ScheduleMessageQueueDeletionTask(data.agent_url, data.task_id,
data.queue_token);
break;
case AgentRunnerStorage::TriggerInfo::TYPE_ALARM:
ScheduleAlarmTask(data.agent_url, data.task_id, data.alarm_in_seconds,
true);
break;
}
task_by_ledger_key_[key] = std::make_pair(data.agent_url, data.task_id);
UpdateWatchers();
}
void AgentRunner::DeletedTask(const std::string& key) {
auto data = task_by_ledger_key_.find(key);
if (data == task_by_ledger_key_.end()) {
// Never scheduled, nothing to delete.
return;
}
DeleteMessageQueueTask(data->second.first, data->second.second);
DeleteAlarmTask(data->second.first, data->second.second);
task_by_ledger_key_.erase(key);
UpdateWatchers();
}
void AgentRunner::DeleteMessageQueueTask(const std::string& agent_url,
const std::string& task_id) {
auto agent_it = watched_queues_.find(agent_url);
if (agent_it == watched_queues_.end()) {
return;
}
auto& agent_map = agent_it->second;
auto task_id_it = agent_map.find(task_id);
if (task_id_it == agent_map.end()) {
return;
}
// The specific type of message queue task identified by |task_id| is not
// available, so explicitly clean up both types.
message_queue_manager_->DropMessageWatcher(kAgentComponentNamespace,
agent_url, task_id_it->second);
message_queue_manager_->DropDeletionWatcher(kAgentComponentNamespace,
agent_url, task_id_it->second);
watched_queues_[agent_url].erase(task_id);
if (watched_queues_[agent_url].empty()) {
watched_queues_.erase(agent_url);
}
}
void AgentRunner::DeleteAlarmTask(const std::string& agent_url,
const std::string& task_id) {
auto agent_it = running_alarms_.find(agent_url);
if (agent_it == running_alarms_.end()) {
return;
}
auto& agent_map = agent_it->second;
auto task_id_it = agent_map.find(task_id);
if (task_id_it == agent_map.end()) {
return;
}
running_alarms_[agent_url].erase(task_id);
if (running_alarms_[agent_url].empty()) {
running_alarms_.erase(agent_url);
}
}
void AgentRunner::ScheduleMessageQueueDeletionTask(
const std::string& agent_url, const std::string& task_id,
const std::string& queue_token) {
auto found_it = watched_queues_.find(agent_url);
if (found_it != watched_queues_.end()) {
if (found_it->second.count(task_id) != 0) {
if (found_it->second[task_id] == queue_token) {
// This means that we are already watching the message queue.
// Do nothing.
return;
}
// We were watching some other queue for this task_id. Stop watching.
message_queue_manager_->DropMessageWatcher(
kAgentComponentNamespace, agent_url, found_it->second[task_id]);
}
} else {
bool inserted = false;
std::tie(found_it, inserted) = watched_queues_.emplace(
agent_url, std::map<std::string, std::string>());
FXL_DCHECK(inserted);
}
found_it->second[task_id] = queue_token;
message_queue_manager_->RegisterDeletionWatcher(
kAgentComponentNamespace, agent_url, queue_token,
[this, agent_url, task_id, terminating = terminating_] {
// If agent runner is terminating or has already terminated, do not run
// any new tasks.
if (*terminating) {
return;
}
EnsureAgentIsRunning(agent_url, [agent_url, task_id, this] {
running_agents_[agent_url]->NewTask(task_id);
});
});
}
void AgentRunner::ScheduleMessageQueueNewMessageTask(
const std::string& agent_url, const std::string& task_id,
const std::string& queue_name) {
auto found_it = watched_queues_.find(agent_url);
if (found_it != watched_queues_.end()) {
if (found_it->second.count(task_id) != 0) {
if (found_it->second[task_id] == queue_name) {
// This means that we are already watching the message queue.
// Do nothing.
return;
}
// We were watching some other queue for this task_id. Stop watching.
message_queue_manager_->DropMessageWatcher(
kAgentComponentNamespace, agent_url, found_it->second[task_id]);
}
} else {
bool inserted = false;
std::tie(found_it, inserted) = watched_queues_.emplace(
agent_url, std::map<std::string, std::string>());
FXL_DCHECK(inserted);
}
found_it->second[task_id] = queue_name;
auto terminating = terminating_;
message_queue_manager_->RegisterMessageWatcher(
kAgentComponentNamespace, agent_url, queue_name,
[this, agent_url, task_id, terminating] {
// If agent runner is terminating or has already terminated, do not run
// any new tasks.
if (*terminating) {
return;
}
EnsureAgentIsRunning(agent_url, [agent_url, task_id, this] {
running_agents_[agent_url]->NewTask(task_id);
});
});
}
void AgentRunner::ScheduleAlarmTask(const std::string& agent_url,
const std::string& task_id,
const uint32_t alarm_in_seconds,
const bool is_new_request) {
auto found_it = running_alarms_.find(agent_url);
if (found_it != running_alarms_.end()) {
if (found_it->second.count(task_id) != 0 && is_new_request) {
// We are already running a task with the same task_id. We might
// just have to update the alarm frequency.
found_it->second[task_id] = alarm_in_seconds;
return;
}
} else {
bool inserted = false;
std::tie(found_it, inserted) =
running_alarms_.emplace(agent_url, std::map<std::string, uint32_t>());
FXL_DCHECK(inserted);
}
found_it->second[task_id] = alarm_in_seconds;
auto terminating = terminating_;
async::PostDelayedTask(
async_get_default_dispatcher(),
[this, agent_url, task_id, terminating] {
// If agent runner is terminating, do not run any new tasks.
if (*terminating) {
return;
}
// Stop the alarm if entry not found.
auto found_it = running_alarms_.find(agent_url);
if (found_it == running_alarms_.end()) {
return;
}
if (found_it->second.count(task_id) == 0) {
return;
}
EnsureAgentIsRunning(agent_url, [agent_url, task_id, found_it, this]() {
running_agents_[agent_url]->NewTask(task_id);
ScheduleAlarmTask(agent_url, task_id, found_it->second[task_id],
false);
});
},
zx::sec(alarm_in_seconds));
}
void AgentRunner::DeleteTask(const std::string& agent_url,
const std::string& task_id) {
// This works for non-persistent tasks too since
// |AgentRunnerStorageImpl::DeleteTask| handles missing keys in ledger
// gracefully.
agent_runner_storage_->DeleteTask(agent_url, task_id, [](bool) {});
}
std::vector<std::string> AgentRunner::GetAllAgents() {
// A set of all agents that are either running or scheduled to be run.
std::set<std::string> agents;
for (auto const& it : running_agents_) {
agents.insert(it.first);
}
for (auto const& it : watched_queues_) {
agents.insert(it.first);
}
for (auto const& it : running_alarms_) {
agents.insert(it.first);
}
std::vector<std::string> agent_urls;
for (auto const& it : agents) {
agent_urls.push_back(it);
}
return agent_urls;
}
void AgentRunner::UpdateWatchers() {
if (*terminating_) {
return;
}
for (auto& watcher : agent_provider_watchers_.ptrs()) {
(*watcher)->OnUpdate(GetAllAgents());
}
}
void AgentRunner::Watch(
fidl::InterfaceHandle<fuchsia::modular::AgentProviderWatcher> watcher) {
auto ptr = watcher.Bind();
// 1. Send this watcher the current list of agents.
ptr->OnUpdate(GetAllAgents());
// 2. Add this watcher to a set that is updated when a new list of agents is
// available.
agent_provider_watchers_.AddInterfacePtr(std::move(ptr));
}
} // namespace modular