blob: 7ba168cb1262b3010ce46ab1288920dddb996841 [file] [log] [blame] [edit]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/developer/shell/mirror/server.h"
#include <condition_variable>
#include <mutex>
#include <thread>
#include <sdk/lib/syslog/cpp/macros.h>
#include "src/developer/shell/mirror/common.h"
namespace shell::mirror {
#define PRINT_FLUSH(...) \
printf(__VA_ARGS__); \
fflush(stdout);
// SocketConnection --------------------------------------------------------------------------------
uint64_t SocketConnection::global_id_ = 0;
SocketConnection::~SocketConnection() {}
Err SocketConnection::Accept(debug_ipc::MessageLoop* main_thread_loop, int server_fd) {
sockaddr_in6 addr;
memset(&addr, 0, sizeof(addr));
socklen_t addrlen = sizeof(addr);
fbl::unique_fd client(
accept4(server_fd, reinterpret_cast<sockaddr*>(&addr), &addrlen, SOCK_NONBLOCK));
if (!client.is_valid()) {
return Err(kConnection, "Couldn't accept connection.");
}
main_thread_loop->PostTask(
FROM_HERE, [this, client = std::move(client),
server_loop = debug_ipc::MessageLoop::Current()]() mutable {
if (!buffer_.Init(std::move(client))) {
FX_LOGS(ERROR) << "Error waiting for data.";
debug_ipc::MessageLoop::Current()->QuitNow();
return;
}
buffer_.set_data_available_callback(
[buffer = &buffer_, path = this->server_->GetPath(), server_loop, connection = this]() {
debug_ipc::StreamBuffer stream = buffer->stream();
char buf[32];
size_t len = stream.Read(buf, 32);
if (len >= strlen(remote_commands::kQuitCommand) &&
strncmp(remote_commands::kQuitCommand, buf, len) == 0) {
debug_ipc::MessageLoop::Current()->QuitNow();
server_loop->QuitNow();
return;
}
if (len >= strlen(remote_commands::kFilesCommand) &&
strncmp(remote_commands::kFilesCommand, buf, len) == 0) {
Update update(&stream, &path);
update.SendUpdates();
} else {
FX_LOGS(ERROR) << "Unrecognized command from socket";
}
connection->UnregisterAndDestroy();
});
#if 0
// TODO(jeremymanson): Should probably do something sensible here.
buffer_.set_error_callback([]() {
// Do this on Ctrl-C.
});
#endif
});
PRINT_FLUSH("Accepted connection.\n");
connected_ = true;
return Err();
}
void SocketConnection::UnregisterAndDestroy() { server_->RemoveConnection(this); }
// SocketServer --------------------------------------------------------------------------------
void SocketServer::Run(ConnectionConfig config) {
// Wait for one connection.
PRINT_FLUSH("Waiting on port %d for shell connections...\n", config.port);
config_ = config;
connection_monitor_ = debug_ipc::MessageLoop::Current()->WatchFD(
debug_ipc::MessageLoop::WatchMode::kRead, server_socket_.get(), this);
}
Err SocketServer::RunInLoop(ConnectionConfig config, debug_ipc::FileLineFunction from_here,
fit::closure inited_fn) {
std::string init_error_message;
// This loop manages incoming connections, and runs in this thread.
debug_ipc::PlatformMessageLoop server_loop;
if (!server_loop.Init(&init_error_message)) {
return Err(kInit, init_error_message);
}
// Do appropriate init and start accepting connections.
uint16_t port = static_cast<uint16_t>(config.port);
Err err = Init(&port);
if (!err.ok()) {
server_loop.Cleanup();
return err;
}
config.port = port;
config.message_loop = &server_loop;
Run(std::move(config));
server_loop.PostTask(from_here, std::move(inited_fn));
server_loop.Run();
// Shut down the individual connections associated with the message loop.
auto it = this->connections_.begin();
while (it != this->connections_.end()) {
it = this->connections_.begin();
this->RemoveConnection(it->get());
}
// Shutdown.
// Stop monitoring for new connections (otherwise the destructor complains).
this->connection_monitor_.StopWatching();
server_loop.Cleanup();
return Err();
}
Err SocketServer::Init(uint16_t* port) {
constexpr uint8_t kMaxAttempts = 6;
for (uint8_t attempt = 0; attempt < kMaxAttempts; attempt++) {
server_socket_.reset(socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP));
if (!server_socket_.is_valid()) {
return Err(kConnection, "Could not create socket: " + std::string(strerror(errno)));
}
// Bind to local address.
struct sockaddr_in6 addr;
memset(&addr, 0, sizeof(addr));
addr.sin6_family = AF_INET6;
addr.sin6_addr = in6addr_any;
addr.sin6_port = htons(*port);
if (bind(server_socket_.get(), reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0) {
if (attempt != 0 || attempt == kMaxAttempts - 1) {
// Either we tried a designated port in the first iteration, we tried 0 in the first
// iteration and it couldn't give us anything, or we've tried as many times as we could.
std::string msg = "Could not bind socket: ";
msg += strerror(errno);
return Err(kConnection, msg);
} else {
// We're looping - just try again with another port
close(server_socket_.get());
server_socket_.release();
*port = 0;
continue;
}
}
if (*port != 0) {
break;
}
// If port wasn't assigned, we want to get one assigned automatically. We passed 0 in, which
// means bind() will give you an unused ephemeral port, which is unbound.
// Figure out which port was granted, close it, and then pretend it's the real port. Because
// this is a bit racy (someone else might try to grab the port), do it up to |attempts| times.
struct sockaddr_in6 addr_out;
socklen_t out_length = sizeof(addr_out);
if ((getsockname(server_socket_.get(), reinterpret_cast<sockaddr*>(&addr), &out_length) < 0) ||
out_length != sizeof(addr_out)) {
std::string msg = "Could not get info for socket: ";
msg += strerror(errno);
return Err(kConnection, msg);
}
*port = addr_out.sin6_port;
close(server_socket_.get());
server_socket_.release();
}
if (listen(server_socket_.get(), 1) < 0) {
std::string msg = "Could not listen on socket: ";
msg += strerror(errno);
return Err(kConnection, msg);
}
return Err();
}
void SocketServer::OnFDReady(int fd, bool readable, bool writeable, bool err) {
if (!readable) {
return;
}
// insert() returns a pair <iterator, bool>
auto p = this->connections_.insert(std::make_unique<SocketConnection>(this));
if (!p.second) {
FX_LOGS(ERROR) << "Internal error";
return;
}
Err error = p.first->get()->Accept(config_.message_loop, fd);
if (!error.ok()) {
FX_LOGS(INFO) << error.msg;
return;
}
PRINT_FLUSH("Connection established.\n");
}
// SocketServer --------------------------------------------------------------------------------
Err Update::SendUpdates() {
for (auto it_entry = std::filesystem::recursive_directory_iterator(path_);
it_entry != std::filesystem::recursive_directory_iterator(); ++it_entry) {
const std::string filename = std::filesystem::absolute(it_entry->path());
files_.AddFile(filename.c_str());
}
std::vector<char> dumped_files;
if (files_.DumpFiles(&dumped_files) != 0) {
std::string msg = "Could not dump files: " + std::string(strerror(errno));
return Err(kWrite, msg);
}
stream_->Write(dumped_files);
return Err();
}
} // namespace shell::mirror