blob: 8ce2b685d3d5f7e661572523713b6c57a9b71f31 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/virtualization/bin/vmm/virtio_vsock.h"
#include <lib/async/cpp/task.h>
#include <lib/async/task.h>
#include <lib/fit/defer.h>
#include <lib/stdcompat/bit.h>
#include <zircon/status.h>
#include <zircon/syscalls/object.h>
#include <zircon/types.h>
namespace {

using ::fuchsia::virtualization::HostVsockConnector_Connect_Result;

// Maximum number of unprocessed control packets the guest is allowed to cause
// us to generate before we stop emitting packets.
//
// In normal operation, this limit should never be reached: we only enqueue at
// most one outgoing packet per incoming packet, and the virtio protocol
// requires the guest to process received packets prior to sending any more.
constexpr size_t kMaxQueuedPackets = 10'000;

// Build a VIRTIO_VSOCK_OP_RST control packet for the connection identified by
// |key| and hand it to |queue| for delivery to the guest.
void SendResetPacket(VsockSendQueue& queue, const ConnectionKey& key) {
  virtio_vsock_hdr_t reset{};
  reset.src_cid = key.local_cid;
  reset.dst_cid = key.remote_cid;
  reset.src_port = key.local_port;
  reset.dst_port = key.remote_port;
  reset.type = VIRTIO_VSOCK_TYPE_STREAM;
  reset.op = VIRTIO_VSOCK_OP_RST;
  queue.Write(reset);
}

}  // namespace
// Pop descriptors off |queue| until one is found that is large enough to hold
// a vsock header and has the requested |writable| mode.
//
// A well-behaved guest only ever supplies conforming descriptors; anything
// else is logged, returned to the guest unused, and skipped.
std::optional<VsockChain> VsockChain::FromQueue(VirtioQueue* queue, bool writable) {
  uint16_t head;
  while (queue->NextAvail(&head) == ZX_OK) {
    VirtioDescriptor desc;
    if (zx_status_t status = queue->ReadDesc(head, &desc); status != ZX_OK) {
      FX_LOGS(WARNING) << "Failed to read descriptor from queue: " << status;
      queue->Return(head, /*len=*/0);
      continue;
    }
    // Ensure it has the correct read/write mode.
    if (desc.writable != writable) {
      FX_LOGS(ERROR) << "Descriptor is not " << (writable ? "writable" : "readable");
      queue->Return(head, /*len=*/0);
      continue;
    }
    // Ensure it is big enough to hold at least the packet header.
    if (desc.len < sizeof(virtio_vsock_hdr_t)) {
      FX_LOGS(ERROR) << "Descriptor is too small";
      queue->Return(head, /*len=*/0);
      continue;
    }
    return VsockChain(queue, head, desc);
  }
  // The queue has no (conforming) descriptors available.
  return std::nullopt;
}
// Detach this chain from its queue without returning the descriptor.
// Called after ownership has been transferred away (move) or the descriptor
// has already been handed back via Return().
void VsockChain::Release() {
  queue_ = nullptr;
  index_ = 0;
}
VsockChain::VsockChain(VsockChain&& other) noexcept { *this = std::move(other); }
// A chain must be handed back to its queue via Return() (or moved from)
// before destruction; destroying an owning chain is a programming error.
VsockChain::~VsockChain() {
  FX_CHECK(queue_ == nullptr) << "VsockChain was destroyed without Return() being called.";
}
// Move-assign: take over |other|'s descriptor and leave |other| detached.
//
// Guards against self-move: without the check, Release() would clear our own
// freshly-copied state, silently dropping ownership of the descriptor so it
// would never be returned to the guest.
VsockChain& VsockChain::operator=(VsockChain&& other) noexcept {
  if (this != &other) {
    queue_ = other.queue_;
    desc_ = other.desc_;
    index_ = other.index_;
    other.Release();
  }
  return *this;
}
// View the start of the descriptor's buffer as a vsock packet header.
// Safe because FromQueue() only accepts descriptors of at least header size.
virtio_vsock_hdr_t* VsockChain::header() const {
  FX_DCHECK(desc_.len >= sizeof(virtio_vsock_hdr_t));
  return static_cast<virtio_vsock_hdr_t*>(desc_.addr);
}
// Hand the descriptor back to the guest, reporting |used| bytes written,
// and detach this object from the queue.
void VsockChain::Return(uint32_t used) {
  queue_->Return(index_, used);
  Release();
}
// Hash a ConnectionKey: pack each (cid, port) pair into 64 bits, then mix
// the remote half with a rotation so local/remote halves don't cancel out.
size_t ConnectionKey::Hash::operator()(const ConnectionKey& key) const {
  const size_t local = (static_cast<size_t>(key.local_cid) << 32) | key.local_port;
  const size_t remote = (static_cast<size_t>(key.remote_cid) << 32) | key.remote_port;
  return local ^ cpp20::rotl(remote, 16);
}
VsockSendQueue::VsockSendQueue(VirtioQueue* queue) : queue_(queue) {}
// Begin a direct write into the guest's RX queue.
//
// A chain is only handed out once every previously buffered packet has been
// flushed, which preserves packet ordering.
std::optional<VsockChain> VsockSendQueue::StartWrite() {
  if (Drain()) {
    return VsockChain::FromQueue(queue_, /*writable=*/true);
  }
  return std::nullopt;
}
// Deliver |header| to the guest: directly when the backlog is empty and an
// RX descriptor is available, otherwise by appending it to the backlog.
void VsockSendQueue::Write(const virtio_vsock_hdr_t& header) {
  const bool sent_directly = Drain() && TryWritePacket(header);
  if (!sent_directly) {
    send_buffer_.emplace_back(header);
  }
}
// Flush buffered packets in FIFO order. Returns true once the buffer is
// empty; false when the guest ran out of RX descriptors first.
bool VsockSendQueue::Drain() {
  for (; !send_buffer_.empty(); send_buffer_.pop_front()) {
    if (!TryWritePacket(send_buffer_.front())) {
      return false;
    }
  }
  return true;
}
// Attempt to copy |packet| into a single guest RX descriptor.
// Returns false, with no side effects, when no descriptor is available.
bool VsockSendQueue::TryWritePacket(const virtio_vsock_hdr_t& packet) {
  if (std::optional<VsockChain> chain = VsockChain::FromQueue(queue_, /*writable=*/true);
      chain.has_value()) {
    *chain->header() = packet;
    chain->Return(/*used=*/sizeof(virtio_vsock_hdr_t));
    return true;
  }
  return false;
}
// We take a |queue_callback| to decouple the connection from the device. This
// allows a connection to wait on a Virtio queue and update the device state,
// without having direct access to the device.
//
// |accept_callback| may be null (guest-initiated connections). When set, it
// is invoked exactly once: with success from Accept(), or with
// ZX_ERR_CONNECTION_REFUSED from the destructor if the connection is torn
// down before the guest accepts.
VirtioVsock::Connection::Connection(
    const ConnectionKey& key, zx::socket socket, async_dispatcher_t* dispatcher,
    fuchsia::virtualization::GuestVsockAcceptor::AcceptCallback accept_callback,
    fit::closure queue_callback)
    : dispatcher_(dispatcher),
      accept_callback_(std::move(accept_callback)),
      queue_callback_(std::move(queue_callback)),
      key_(key),
      socket_(std::move(socket)) {}
VirtioVsock::Connection::~Connection() {
  // An unconsumed accept callback means the guest never accepted this
  // connection; tell the waiting connector it was refused.
  if (accept_callback_) {
    accept_callback_(fuchsia::virtualization::GuestVsockAcceptor_Accept_Result::WithErr(
        ZX_ERR_CONNECTION_REFUSED));
  }
  // We must cancel the async wait before the socket is destroyed.
  rx_wait_.Cancel();
  tx_wait_.Cancel();
}
// Factory for Connection; see the constructor for parameter semantics.
std::unique_ptr<VirtioVsock::Connection> VirtioVsock::Connection::Create(
    const ConnectionKey& key, zx::socket socket, async_dispatcher_t* dispatcher,
    fuchsia::virtualization::GuestVsockAcceptor::AcceptCallback accept_callback,
    fit::closure queue_callback) {
  // Using `new` to allow access to private constructor.
  return std::unique_ptr<VirtioVsock::Connection>(new VirtioVsock::Connection(
      key, std::move(socket), dispatcher, std::move(accept_callback), std::move(queue_callback)));
}
// Arm the socket waiters for this connection.
//
// Both directions funnel into OnReady(), which inspects the observed signals
// to decide what action to take. Returns the status of the initial receive
// wait.
zx_status_t VirtioVsock::Connection::Init() {
  auto on_ready = [this](async_dispatcher_t* dispatcher, async::Wait* wait, zx_status_t status,
                         const zx_packet_signal_t* signal) { OnReady(status, signal); };
  rx_wait_.set_object(socket_.get());
  rx_wait_.set_trigger(ZX_SOCKET_READABLE | ZX_SOCKET_PEER_WRITE_DISABLED |
                       ZX_SOCKET_WRITE_DISABLED | ZX_SOCKET_PEER_CLOSED);
  rx_wait_.set_handler(on_ready);
  tx_wait_.set_object(socket_.get());
  tx_wait_.set_trigger(ZX_SOCKET_WRITABLE);
  tx_wait_.set_handler(on_ready);
  return WaitOnReceive();
}
// Handle the guest accepting a host-initiated connection.
//
// If no acceptor is outstanding this is a spurious response, so the
// connection is reset instead.
zx_status_t VirtioVsock::Connection::Accept() {
  if (!accept_callback_) {
    UpdateOp(VIRTIO_VSOCK_OP_RST);
    return ZX_OK;
  }
  // Move into the read/write state and let the connector know that the
  // connection is ready.
  UpdateOp(VIRTIO_VSOCK_OP_RW);
  accept_callback_(fuchsia::virtualization::GuestVsockAcceptor_Accept_Result::WithResponse({}));
  accept_callback_ = nullptr;
  return WaitOnReceive();
}
// Connection state machine:
//
// ------------- --------------
// |CREDIT_UPDATE| | ANY_STATE |
// ------------- --------------
// /|\ | | |
// | | | |
// | \|/ \|/ \|/
// ------- -------- ------- -------- -----
// |REQUEST|--->|RESPONSE|--->| RW |<---|SHUTDOWN|--->|RESET|
// ------- -------- -------- -------- -----
// | /|\
// | |
// \|/ |
// -------------
// |CREDIT_REQUEST|
// -------------
// Transition the connection state machine (see diagram above) to |new_op|.
//
// Invalid transitions log an error and force the connection into RST.
// Guarded by |op_update_mutex_| since Mux and Demux may race to update it.
void VirtioVsock::Connection::UpdateOp(uint16_t new_op) {
  std::lock_guard<std::mutex> lock(op_update_mutex_);
  if (new_op == op_) {
    return;
  }
  switch (new_op) {
    // SHUTDOWN and RST are reachable from any state.
    case VIRTIO_VSOCK_OP_SHUTDOWN:
    case VIRTIO_VSOCK_OP_RST:
      op_ = new_op;
      return;
    case VIRTIO_VSOCK_OP_CREDIT_REQUEST:
    case VIRTIO_VSOCK_OP_CREDIT_UPDATE:
      if (op_ == VIRTIO_VSOCK_OP_RW) {
        op_ = new_op;
        return;
      }
      if (op_ == VIRTIO_VSOCK_OP_RESPONSE) {
        // NOTE: This is an invalid state.
        // We end up here when Mux and Demux race to update the state, and vsock
        // has essentially 'not yet completed connecting client' while trying to
        // 'report available credit'.
        // Do not update the op_ field here, as we risk that side handling
        // RESPONSE will never accept the client.
        FX_LOGS(INFO) << "Ignoring premature machine state change.";
        return;
      }
      break;
    case VIRTIO_VSOCK_OP_RW:
      switch (op_) {
        // SHUTDOWN -> RW only valid if one of the streams is still active.
        case VIRTIO_VSOCK_OP_SHUTDOWN:
          if (flags_ == VIRTIO_VSOCK_FLAG_SHUTDOWN_BOTH) {
            break;
          }
          [[fallthrough]];
        case VIRTIO_VSOCK_OP_RESPONSE:
        case VIRTIO_VSOCK_OP_CREDIT_REQUEST:
        case VIRTIO_VSOCK_OP_CREDIT_UPDATE:
          op_ = new_op;
          return;
      }
      break;
    case VIRTIO_VSOCK_OP_RESPONSE:
      if (op_ == VIRTIO_VSOCK_OP_REQUEST) {
        op_ = new_op;
        return;
      }
      break;
    // No transitions to REQUEST allowed, but this is the initial state of the
    // connection object.
    case VIRTIO_VSOCK_OP_REQUEST:
    default:
      break;
  }
  FX_LOGS(ERROR) << "Invalid state transition from " << op_ << " to " << new_op
                 << "; resetting connection";
  op_ = VIRTIO_VSOCK_OP_RST;
}
// Number of bytes the peer can still accept from us.
// See 5.10.6.3 Buffer Space Management, from the Virtio Socket Device 1.1 spec.
uint32_t VirtioVsock::Connection::PeerFree() const {
  FX_DCHECK(tx_cnt_ >= peer_fwd_cnt_);
  const uint32_t bytes_in_flight = tx_cnt_ - peer_fwd_cnt_;
  FX_DCHECK(peer_buf_alloc_ >= bytes_in_flight);
  return peer_buf_alloc_ - bytes_in_flight;
}
// Refresh peer credit from the buf_alloc/fwd_cnt fields of a received header.
void VirtioVsock::Connection::ReadCredit(virtio_vsock_hdr_t* header) {
  SetCredit(header->buf_alloc, header->fwd_cnt);
}
// Update peer credit state.
//
// A report implying we have sent more than the peer ever acknowledged, or
// more than its buffer could hold, is inconsistent — reset the connection.
void VirtioVsock::Connection::SetCredit(uint32_t buf_alloc, uint32_t fwd_cnt) {
  const bool inconsistent = tx_cnt_ < fwd_cnt || buf_alloc < (tx_cnt_ - fwd_cnt);
  if (inconsistent) {
    UpdateOp(VIRTIO_VSOCK_OP_RST);
    return;
  }
  peer_buf_alloc_ = buf_alloc;
  peer_fwd_cnt_ = fwd_cnt;
}
// Start waiting for the socket to become writable, unless a wait is already
// pending or no handler has been installed (Init() not yet run).
zx_status_t VirtioVsock::Connection::WaitOnTransmit() {
  const bool can_begin = tx_wait_.has_handler() && !tx_wait_.is_pending();
  return can_begin ? tx_wait_.Begin(dispatcher_) : ZX_OK;
}
// Start waiting for readable/shutdown signals on the socket, unless a wait
// is already pending or no handler has been installed (Init() not yet run).
zx_status_t VirtioVsock::Connection::WaitOnReceive() {
  const bool can_begin = rx_wait_.has_handler() && !rx_wait_.is_pending();
  return can_begin ? rx_wait_.Begin(dispatcher_) : ZX_OK;
}
// Socket signal handler shared by rx_wait_ and tx_wait_.
//
// Depending on the observed signals, either schedules the connection for
// transmitting to the guest (data, shutdown, or credit update) via
// |queue_callback_|, or does nothing. Shutdown-related signals are removed
// from the rx trigger once handled so they do not fire repeatedly.
void VirtioVsock::Connection::OnReady(zx_status_t status, const zx_packet_signal_t* signal) {
  if (status != ZX_OK) {
    FX_LOGS(ERROR) << "Failed while waiting on socket " << status;
    return;
  }
  // If the socket is readable and our peer has buffer space, wait on the
  // Virtio receive queue. Do this before checking for peer closed so that
  // we first send any remaining data in the socket.
  if (signal->observed & ZX_SOCKET_READABLE && PeerFree() > 0) {
    queue_callback_();
    return;
  }
  // If the socket has been partially or fully closed, wait on the Virtio
  // receive queue.
  if (signal->observed &
      (ZX_SOCKET_PEER_CLOSED | ZX_SOCKET_PEER_WRITE_DISABLED | ZX_SOCKET_WRITE_DISABLED)) {
    zx_signals_t signals = rx_wait_.trigger();
    if (signal->observed & ZX_SOCKET_PEER_CLOSED) {
      // The peer closed the socket, therefore we move to sending a full
      // connection shutdown.
      UpdateOp(VIRTIO_VSOCK_OP_SHUTDOWN);
      flags_ |= VIRTIO_VSOCK_FLAG_SHUTDOWN_BOTH;
      // Stop watching for this signal; the shutdown is now in flight.
      rx_wait_.set_trigger(signals & ~ZX_SOCKET_PEER_CLOSED);
    } else {
      if (signal->observed & ZX_SOCKET_PEER_WRITE_DISABLED &&
          !(flags_ & VIRTIO_VSOCK_FLAG_SHUTDOWN_RECV)) {
        // The peer disabled reading, therefore we move to sending a partial
        // connection shutdown.
        UpdateOp(VIRTIO_VSOCK_OP_SHUTDOWN);
        flags_ |= VIRTIO_VSOCK_FLAG_SHUTDOWN_RECV;
        rx_wait_.set_trigger(signals & ~ZX_SOCKET_PEER_WRITE_DISABLED);
      }
      if (signal->observed & ZX_SOCKET_WRITE_DISABLED &&
          !(flags_ & VIRTIO_VSOCK_FLAG_SHUTDOWN_SEND)) {
        // The peer disabled writing, therefore we move to sending a partial
        // connection shutdown.
        UpdateOp(VIRTIO_VSOCK_OP_SHUTDOWN);
        flags_ |= VIRTIO_VSOCK_FLAG_SHUTDOWN_SEND;
        rx_wait_.set_trigger(signals & ~ZX_SOCKET_WRITE_DISABLED);
      }
    }
    queue_callback_();
    return;
  }
  // If the socket is writable and we last reported the buffer as full, send a
  // credit update message to the guest indicating buffer space is now
  // available.
  if (reported_buf_avail_ == 0 && signal->observed & ZX_SOCKET_WRITABLE) {
    UpdateOp(VIRTIO_VSOCK_OP_CREDIT_UPDATE);
    queue_callback_();
  }
}
// Fill |header|'s credit fields from the host socket's buffer state.
//
// Returns ZX_ERR_UNAVAILABLE when the socket buffer is completely full,
// meaning the caller should retry once the socket becomes writable again.
zx_status_t VirtioVsock::Connection::WriteCredit(virtio_vsock_hdr_t* header) {
  zx_info_socket_t info;
  if (zx_status_t status = socket_.get_info(ZX_INFO_SOCKET, &info, sizeof(info), nullptr, nullptr);
      status != ZX_OK) {
    return status;
  }
  header->buf_alloc = static_cast<uint32_t>(info.tx_buf_max);
  header->fwd_cnt = static_cast<uint32_t>(rx_cnt_ - info.tx_buf_size);
  // Remember how much space we reported; OnReady() uses a zero value here to
  // know a credit update is owed once the buffer drains.
  reported_buf_avail_ = info.tx_buf_max - info.tx_buf_size;
  return reported_buf_avail_ == 0 ? ZX_ERR_UNAVAILABLE : ZX_OK;
}
// Apply guest-requested shutdown |flags| to the host socket:
// SHUTDOWN_SEND disables our side's writes, SHUTDOWN_RECV disables the
// peer's writes.
zx_status_t VirtioVsock::Connection::Shutdown(uint32_t flags) {
  const uint32_t disposition =
      (flags & VIRTIO_VSOCK_FLAG_SHUTDOWN_SEND) ? ZX_SOCKET_DISPOSITION_WRITE_DISABLED : 0;
  const uint32_t disposition_peer =
      (flags & VIRTIO_VSOCK_FLAG_SHUTDOWN_RECV) ? ZX_SOCKET_DISPOSITION_WRITE_DISABLED : 0;
  return socket_.set_disposition(disposition, disposition_peer);
}
// Advance |desc| past the leading vsock header so it describes only the
// payload region. |header| points at the start of the descriptor's buffer.
static zx_status_t setup_desc_chain(VirtioQueue* queue, virtio_vsock_hdr_t* header,
                                    VirtioDescriptor* desc) {
  // Payload starts immediately after the header within the same buffer.
  desc->addr = header + 1;
  desc->len -= sizeof(*header);
  // If the descriptor was only large enough for the header, read the next
  // descriptor, if there is one.
  if (desc->len == 0 && desc->has_next) {
    return queue->ReadDesc(desc->next, desc);
  }
  return ZX_OK;
}
// Copy pending data from the host socket into the guest's descriptor chain.
//
// Transfers at most PeerFree() bytes so we never exceed the credit the guest
// has advertised. On return, |*used| and |header->len| hold the number of
// payload bytes written into the chain.
zx_status_t VirtioVsock::Connection::Read(VirtioQueue* queue, virtio_vsock_hdr_t* header,
                                          const VirtioDescriptor& desc, uint32_t* used) {
  VirtioDescriptor next = desc;
  zx_status_t status = setup_desc_chain(queue, header, &next);
  while (status == ZX_OK) {
    // Cap each read by the peer's remaining buffer space.
    size_t len = std::min(next.len, PeerFree());
    size_t actual;
    status = socket_.read(0, next.addr, len, &actual);
    if (status != ZX_OK) {
      break;
    }
    *used += actual;
    tx_cnt_ += actual;
    // Stop when credit is exhausted, the chain ends, or the socket ran dry
    // (short read).
    if (PeerFree() == 0 || !next.has_next || actual < next.len) {
      break;
    }
    status = queue->ReadDesc(next.next, &next);
  }
  header->len = *used;
  return status;
}
// Copy payload bytes from the guest's descriptor chain into the host socket.
//
// |header->len| is decremented as bytes are consumed. A failed or short
// socket write resets the connection rather than failing the device, since a
// guest honoring our advertised credit can never overrun the socket buffer.
zx_status_t VirtioVsock::Connection::Write(VirtioQueue* queue, virtio_vsock_hdr_t* header,
                                           const VirtioDescriptor& desc) {
  VirtioDescriptor next = desc;
  zx_status_t status = setup_desc_chain(queue, header, &next);
  while (status == ZX_OK) {
    uint32_t len = std::min(next.len, header->len);
    size_t actual;
    status = socket_.write(0, next.addr, len, &actual);
    if (status != ZX_OK || actual < len) {
      // If we've failed to write just reset the connection. Note that it
      // should not be possible to receive a ZX_ERR_SHOULD_WAIT here if
      // the guest is honoring our credit messages that describe socket
      // buffer space.
      UpdateOp(VIRTIO_VSOCK_OP_RST);
      return ZX_OK;
    }
    rx_cnt_ += actual;
    header->len -= actual;
    reported_buf_avail_ -= actual;
    // Stop when the reported buffer space is consumed, the chain ends, or
    // the packet's payload has been fully written.
    if (reported_buf_avail_ == 0 || !next.has_next || header->len == 0) {
      return ZX_OK;
    }
    status = queue->ReadDesc(next.next, &next);
  }
  return status;
}
// Construct the vsock device and, when |context| is provided, publish the
// GuestVsockEndpoint service into the component's outgoing directory.
// The send queue wraps the guest's RX virtqueue (host -> guest direction).
VirtioVsock::VirtioVsock(sys::ComponentContext* context, const PhysMem& phys_mem,
                         async_dispatcher_t* dispatcher)
    : VirtioInprocessDevice("Virtio Vsock", phys_mem, 0 /* device_features */),
      dispatcher_(dispatcher),
      rx_queue_wait_(this, rx_queue()->event(), VirtioQueue::SIGNAL_QUEUE_AVAIL),
      tx_queue_wait_(this, tx_queue()->event(), VirtioQueue::SIGNAL_QUEUE_AVAIL),
      send_queue_(rx_queue()) {
  config_.guest_cid = fuchsia::virtualization::DEFAULT_GUEST_CID;
  if (context) {
    context->outgoing()->AddPublicService(
        fidl::InterfaceRequestHandler<fuchsia::virtualization::GuestVsockEndpoint>(
            fit::bind_member(this, &VirtioVsock::Bind)));
  }
}
// Handle an incoming GuestVsockEndpoint interface request.
void VirtioVsock::Bind(
    fidl::InterfaceRequest<fuchsia::virtualization::GuestVsockEndpoint> request) {
  // Construct a request handler that posts a task to the VirtioVsock
  // dispatcher. VirtioVsock is not thread-safe and we must ensure that all
  // interactions with the endpoint binding set occur on the same thread.
  //
  // This handler will run on the initial thread, but other interactions run
  // on the "vsock-handler" thread. So we post a task to the dispatcher of the
  // async loop running on that thread.
  async::PostTask(dispatcher_, [this, request = std::move(request)]() mutable {
    endpoint_bindings_.AddBinding(this, std::move(request), dispatcher_);
  });
}
// The guest's current context ID. Guarded by the device config mutex since
// SetContextId() may update it from another thread.
uint32_t VirtioVsock::guest_cid() const {
  std::lock_guard<std::mutex> lock(device_config_.mutex);
  return static_cast<uint32_t>(config_.guest_cid);
}
bool VirtioVsock::HasConnection(uint32_t src_cid, uint32_t src_port, uint32_t dst_port) const {
ConnectionKey key{src_cid, src_port, guest_cid(), dst_port};
std::lock_guard<std::mutex> lock(mutex_);
return connections_.find(key) != connections_.end();
}
// Assign the guest's CID and wire up the host-side connector/acceptor pair.
// Also begins listening for outgoing packets from the driver (TX queue).
void VirtioVsock::SetContextId(
    uint32_t cid, fidl::InterfaceHandle<fuchsia::virtualization::HostVsockConnector> connector,
    fidl::InterfaceRequest<fuchsia::virtualization::GuestVsockAcceptor> acceptor) {
  {
    std::lock_guard<std::mutex> lock(device_config_.mutex);
    config_.guest_cid = cid;
  }
  acceptor_bindings_.AddBinding(this, std::move(acceptor), dispatcher_);
  FX_CHECK(connector_.Bind(std::move(connector), dispatcher_) == ZX_OK);
  // Start waiting for incoming packets from the driver.
  zx_status_t status = tx_queue_wait_.Begin(dispatcher_);
  // ZX_ERR_ALREADY_EXISTS just means the wait is already armed.
  if (status != ZX_OK && status != ZX_ERR_ALREADY_EXISTS) {
    FX_LOGS(ERROR) << "Failed to wait on virtio TX queue: " << zx_status_get_string(status);
  }
}
// Forward a host-initiated connection request to the guest.
//
// Creates a Connection in its initial REQUEST state; |callback| is held by
// the connection and answered when the guest accepts, or answered with
// ZX_ERR_CONNECTION_REFUSED by the connection's destructor if it is torn
// down first.
void VirtioVsock::Accept(uint32_t src_cid, uint32_t src_port, uint32_t port, zx::socket socket,
                         fuchsia::virtualization::GuestVsockAcceptor::AcceptCallback callback) {
  if (HasConnection(src_cid, src_port, port)) {
    callback(
        fuchsia::virtualization::GuestVsockAcceptor_Accept_Result::WithErr(ZX_ERR_ALREADY_BOUND));
    return;
  }
  ConnectionKey key{src_cid, src_port, guest_cid(), port};
  // The queue callback schedules this connection for transmit processing.
  auto conn =
      Connection::Create(key, std::move(socket), dispatcher_, std::move(callback), [this, key] {
        std::lock_guard<std::mutex> lock(mutex_);
        WaitOnQueueLocked(key);
      });
  if (!conn) {
    callback(fuchsia::virtualization::GuestVsockAcceptor_Accept_Result::WithErr(
        ZX_ERR_CONNECTION_REFUSED));
    return;
  }
  // From here on out the |conn| destructor will handle connection refusal upon
  // deletion.
  std::lock_guard<std::mutex> lock(mutex_);
  AddConnectionLocked(key, std::move(conn));
}
// Completion handler for a guest-initiated connection that was forwarded to
// the host-side connector.
//
// On failure a RST is sent to the guest. On success a Connection in the
// RESPONSE state is registered and primed with the guest's initial credit
// (|buf_alloc|/|fwd_cnt| from the original REQUEST packet).
void VirtioVsock::ConnectCallback(ConnectionKey key, zx_status_t status, zx::socket socket,
                                  uint32_t buf_alloc, uint32_t fwd_cnt) {
  // If the connection request resulted in an error, send a reset.
  if (status != ZX_OK) {
    std::lock_guard<std::mutex> lock(mutex_);
    SendResetPacket(send_queue_, key);
    return;
  }
  // Create a new connection object to track this virtio socket.
  std::unique_ptr<VirtioVsock::Connection> new_conn =
      Connection::Create(key, std::move(socket), dispatcher_, nullptr, [this, key] {
        std::lock_guard<std::mutex> lock(mutex_);
        WaitOnQueueLocked(key);
      });
  if (!new_conn) {
    std::lock_guard<std::mutex> lock(mutex_);
    SendResetPacket(send_queue_, key);
    return;
  }
  // Keep a raw pointer so the connection can be initialized outside the lock
  // after ownership moves into the map.
  Connection* conn = new_conn.get();
  {
    std::lock_guard<std::mutex> lock(mutex_);
    zx_status_t add_status = AddConnectionLocked(key, std::move(new_conn));
    if (add_status != ZX_OK) {
      return;
    }
  }
  conn->UpdateOp(VIRTIO_VSOCK_OP_RESPONSE);
  status = conn->Init();
  if (status != ZX_OK) {
    FX_LOGS(ERROR) << "Failed to setup connection " << status;
  }
  conn->SetCredit(buf_alloc, fwd_cnt);
}
// Register |conn| under |key| and schedule it for transmit processing.
// Fails with ZX_ERR_ALREADY_EXISTS if the key is already in use.
zx_status_t VirtioVsock::AddConnectionLocked(ConnectionKey key, std::unique_ptr<Connection> conn) {
  const bool inserted = connections_.emplace(key, std::move(conn)).second;
  if (!inserted) {
    FX_LOGS(ERROR) << "Connection already exists";
    return ZX_ERR_ALREADY_EXISTS;
  }
  WaitOnQueueLocked(key);
  return ZX_OK;
}
// Look up the connection for |key|; returns nullptr when none exists.
VirtioVsock::Connection* VirtioVsock::GetConnectionLocked(ConnectionKey key) {
  if (auto it = connections_.find(key); it != connections_.end()) {
    return it->second.get();
  }
  return nullptr;
}
// Destroy the connection for |key|, notifying all bound endpoints that the
// connection has shut down. The key must refer to a live connection.
void VirtioVsock::RemoveConnectionLocked(ConnectionKey key) {
  // Find the connection.
  auto it = connections_.find(key);
  FX_CHECK(it != connections_.end()) << "Attempted to erase unknown connection.";
  // Notify endpoints that it has been terminated.
  for (auto& binding : endpoint_bindings_.bindings()) {
    binding->events().OnShutdown(key.local_cid, key.local_port, guest_cid(), key.remote_port);
  }
  // Remove the connection.
  connections_.erase(it);
}
// Mark |key| as having data to send and arm the RX-queue waiter so Mux()
// runs once the guest makes receive descriptors available.
//
// If the wait cannot be armed the connection can never make progress, so it
// is removed.
void VirtioVsock::WaitOnQueueLocked(ConnectionKey key) {
  zx_status_t status = rx_queue_wait_.Begin(dispatcher_);
  // ZX_ERR_ALREADY_EXISTS just means the wait is already armed.
  if (status != ZX_OK && status != ZX_ERR_ALREADY_EXISTS) {
    // Log the symbolic status name for consistency with the rest of the
    // file's status logging.
    FX_LOGS(ERROR) << "Failed to wait on queue: " << zx_status_get_string(status);
    RemoveConnectionLocked(key);
    return;
  }
  readable_.insert(key);
}
// Produce one outgoing packet for this connection into |header| and |desc|.
//
// Fills in the common header and current credit, then acts on the current
// op: REQUEST/RESPONSE/CREDIT_UPDATE emit pure control packets, RW reads
// socket payload into the chain, SHUTDOWN carries the shutdown flags, and
// RST asks the caller (via ZX_ERR_STOP) to drop the connection.
zx_status_t VirtioVsock::Connection::Transmit(VirtioQueue* queue, virtio_vsock_hdr_t* header,
                                              const VirtioDescriptor& desc, uint32_t* used) {
  // Write out the header.
  *header = {
      .src_cid = key_.local_cid,
      .dst_cid = key_.remote_cid,
      .src_port = key_.local_port,
      .dst_port = key_.remote_port,
      .type = VIRTIO_VSOCK_TYPE_STREAM,
      .op = op(),
  };
  // If reading was shutdown, but we're still receiving a read request, send
  // a connection reset.
  if (op() == VIRTIO_VSOCK_OP_RW && flags_ & VIRTIO_VSOCK_FLAG_SHUTDOWN_RECV) {
    UpdateOp(VIRTIO_VSOCK_OP_RST);
    FX_LOGS(ERROR) << "Receive was shutdown";
  }
  zx_status_t write_status = WriteCredit(header);
  switch (write_status) {
    case ZX_OK:
      break;
    case ZX_ERR_UNAVAILABLE: {
      // Socket buffer is full: retry once it becomes writable again.
      zx_status_t status = WaitOnTransmit();
      if (status != ZX_OK) {
        return ZX_ERR_STOP;
      }
      break;
    }
    default:
      UpdateOp(VIRTIO_VSOCK_OP_RST);
      FX_LOGS(ERROR) << "Failed to write credit " << write_status;
      break;
  }
  switch (op()) {
    case VIRTIO_VSOCK_OP_REQUEST:
      // We are sending a connection request, therefore we move to waiting
      // for response.
      UpdateOp(VIRTIO_VSOCK_OP_RESPONSE);
      return ZX_OK;
    case VIRTIO_VSOCK_OP_RESPONSE:
    case VIRTIO_VSOCK_OP_CREDIT_UPDATE:
      // We are sending a response or credit update, therefore we move to ready
      // to read/write.
      UpdateOp(VIRTIO_VSOCK_OP_RW);
      return ZX_OK;
    case VIRTIO_VSOCK_OP_RW:
      // We are reading from the socket.
      return Read(queue, header, desc, used);
    case VIRTIO_VSOCK_OP_SHUTDOWN:
      header->flags = flags_;
      if (header->flags == VIRTIO_VSOCK_FLAG_SHUTDOWN_BOTH) {
        // We are sending a full connection shutdown, therefore we move to
        // waiting for a connection reset.
        UpdateOp(VIRTIO_VSOCK_OP_RST);
      } else {
        // One side of the connection is still active, therefore we move to
        // ready to read/write.
        UpdateOp(VIRTIO_VSOCK_OP_RW);
      }
      return ZX_OK;
    default:
    case VIRTIO_VSOCK_OP_RST:
      // We are sending a connection reset, therefore remove the connection.
      header->op = VIRTIO_VSOCK_OP_RST;
      return ZX_ERR_STOP;
  }
}
// Transmit pending data for the connection identified by |key|.
//
// Returns false when the guest's RX queue had no descriptor available (the
// caller should stop iterating and wait for more descriptors); returns true
// otherwise, including when the connection no longer exists or was removed.
bool VirtioVsock::ProcessReadyConnection(ConnectionKey key) {
  // Get the connection associated with the key.
  Connection* conn = GetConnectionLocked(key);
  if (conn == nullptr) {
    // Connection vanished since being queued; nothing to send.
    return true;
  }
  // Read an available chain.
  std::optional<VsockChain> chain = send_queue_.StartWrite();
  if (!chain.has_value()) {
    return false;
  }
  // Attempt to transmit data.
  uint32_t used = 0;
  zx_status_t transmit_status = conn->Transmit(rx_queue(), chain->header(), chain->desc(), &used);
  // |used| counts payload only; the header always precedes it in the chain.
  chain->Return(/*used=*/used + sizeof(virtio_vsock_hdr_t));
  // If the connection has been closed, remove it.
  if (transmit_status == ZX_ERR_STOP) {
    RemoveConnectionLocked(key);
    return true;
  }
  // Notify when the connection next has data pending.
  zx_status_t wait_status = conn->WaitOnReceive();
  if (wait_status != ZX_OK) {
    RemoveConnectionLocked(key);
  }
  return true;
}
// RX-queue handler: the guest has made receive descriptors available.
//
// Flushes any buffered control packets first, then services the connections
// queued in |readable_| until either side runs out (no more descriptors, or
// no more ready connections).
void VirtioVsock::Mux(async_dispatcher_t*, async::WaitBase*, zx_status_t status,
                      const zx_packet_signal_t*) {
  if (status != ZX_OK) {
    FX_LOGS(ERROR) << "Error while waiting on virtio RX queue: " << zx_status_get_string(status);
    return;
  }
  std::lock_guard<std::mutex> lock(mutex_);
  // Send any buffered control packets.
  send_queue_.Drain();
  // Process all connections that are ready to transmit, until we run out
  // of connections or descriptors in the guest's RX queue.
  for (auto i = readable_.begin(), end = readable_.end(); i != end; i = readable_.erase(i)) {
    bool continue_sending = ProcessReadyConnection(/*key=*/*i);
    if (!continue_sending || is_send_queue_full()) {
      // Stop without erasing the current key: it still has data to send.
      break;
    }
  }
  // If we still have queued packets or connections waiting to send,
  // wait on more descriptors to arrive.
  if (!readable_.empty() || send_queue_.buffered_packets() > 0) {
    zx_status_t status = rx_queue_wait_.Begin(dispatcher_);
    if (status != ZX_OK && status != ZX_ERR_ALREADY_EXISTS) {
      FX_LOGS(ERROR) << "Failed to wait on RX queue: " << status;
    }
  }
}
// Rewrite |header| in place into a full (both directions) shutdown packet.
static void set_shutdown(virtio_vsock_hdr_t* header) {
  header->op = VIRTIO_VSOCK_OP_SHUTDOWN;
  header->flags = VIRTIO_VSOCK_FLAG_SHUTDOWN_BOTH;
}
// Process one incoming packet from the guest on an established connection.
//
// Returns ZX_ERR_STOP when the connection should be removed, any other
// failure to tear the connection down, and ZX_OK otherwise. May rewrite
// |header| (via set_shutdown) before dispatching on its op.
zx_status_t VirtioVsock::Connection::Receive(VirtioQueue* queue, virtio_vsock_hdr_t* header,
                                             const VirtioDescriptor& desc) {
  // If we are getting a connection request for a connection that already
  // exists, then the driver is in a bad state and the connection should be
  // shut down.
  if (header->op == VIRTIO_VSOCK_OP_REQUEST) {
    set_shutdown(header);
    FX_LOGS(ERROR) << "Connection request for an existing connection";
  }
  // We are receiving a write, but send was shutdown.
  if (op() == VIRTIO_VSOCK_OP_RW && flags_ & VIRTIO_VSOCK_FLAG_SHUTDOWN_SEND) {
    set_shutdown(header);
    FX_LOGS(ERROR) << "Send was shutdown";
  }
  ReadCredit(header);
  switch (header->op) {
    case VIRTIO_VSOCK_OP_RESPONSE: {
      // The guest accepted our connection request; finish setting up.
      zx_status_t status = Init();
      if (status != ZX_OK) {
        FX_LOGS(ERROR) << "Failed to setup connection " << status;
        return status;
      }
      return Accept();
    }
    case VIRTIO_VSOCK_OP_RW:
      // We are writing to the socket.
      return Write(queue, header, desc);
    case VIRTIO_VSOCK_OP_CREDIT_UPDATE:
      // Credit update is handled outside of this function.
      return ZX_OK;
    case VIRTIO_VSOCK_OP_CREDIT_REQUEST:
      // We received a credit request, therefore we move to sending a credit
      // update.
      UpdateOp(VIRTIO_VSOCK_OP_CREDIT_UPDATE);
      return ZX_OK;
    case VIRTIO_VSOCK_OP_RST:
      // We received a connection reset, therefore remove the connection.
      return ZX_ERR_STOP;
    default:
      // Unknown op: treat it as a full shutdown of the connection.
      header->flags = VIRTIO_VSOCK_FLAG_SHUTDOWN_BOTH;
      [[fallthrough]];
    case VIRTIO_VSOCK_OP_SHUTDOWN:
      if (header->flags == VIRTIO_VSOCK_FLAG_SHUTDOWN_BOTH) {
        // We received a full connection shutdown, therefore we move to sending
        // a connection reset.
        UpdateOp(VIRTIO_VSOCK_OP_RST);
        return ZX_OK;
      } else if (header->flags != 0) {
        return Shutdown(header->flags);
      } else {
        FX_LOGS(ERROR) << "Connection shutdown with no shutdown flags set";
        return ZX_ERR_BAD_STATE;
      }
  }
}
// Dispatch one packet received from the guest's TX queue.
//
// Routes the packet to its existing connection if one is tracked; otherwise
// forwards REQUEST packets to the host connector, and answers anything else
// with a reset (except spurious resets themselves).
void VirtioVsock::ProcessIncomingPacket(const VsockChain& chain) {
  virtio_vsock_hdr_t* header = chain.header();
  // Note the key is from the host's perspective: local = packet destination.
  ConnectionKey key{
      .local_cid = static_cast<uint32_t>(header->dst_cid),
      .local_port = header->dst_port,
      .remote_cid = guest_cid(),
      .remote_port = header->src_port,
  };
  // Reject packets with unknown socket types.
  if (header->type != VIRTIO_VSOCK_TYPE_STREAM) {
    FX_LOGS(ERROR) << "Guest sent socket packet with unknown type 0x" << std::hex << header->type;
    SendResetPacket(send_queue_, key);
    return;
  }
  // If the source CID does not match guest CID, then the driver is in a
  // bad state and the request should be ignored.
  if (header->src_cid != guest_cid()) {
    FX_LOGS(ERROR) << "Source CID does not match guest CID";
    return;
  }
  // Fetch the connection associated with this packet.
  Connection* conn = GetConnectionLocked(key);
  if (conn != nullptr) {
    // Process the packet.
    zx_status_t status = conn->Receive(tx_queue(), header, chain.desc());
    if (status != ZX_OK) {
      RemoveConnectionLocked(key);
      return;
    }
    // If the connection immediately needs to send an outgoing packet, add
    // the connection to the send queue.
    if (conn->op() == VIRTIO_VSOCK_OP_RST || conn->op() == VIRTIO_VSOCK_OP_CREDIT_UPDATE) {
      WaitOnQueueLocked(key);
      return;
    }
    // Wake up again when the connection next contains data.
    status = conn->WaitOnTransmit();
    if (status != ZX_OK) {
      RemoveConnectionLocked(key);
    }
    return;
  }
  // If we have a connector, handle new incoming connections.
  if (header->op == VIRTIO_VSOCK_OP_REQUEST && connector_) {
    // Capture the guest's initial credit so ConnectCallback can seed the
    // connection with it once the host side answers.
    connector_->Connect(static_cast<uint32_t>(header->src_cid), header->src_port,
                        static_cast<uint32_t>(header->dst_cid), header->dst_port,
                        [this, key, buf_alloc = header->buf_alloc,
                         fwd_cnt = header->fwd_cnt](HostVsockConnector_Connect_Result result) {
                          if (result.is_response()) {
                            ConnectCallback(key, ZX_OK, std::move(result.response().socket),
                                            buf_alloc, fwd_cnt);
                          } else {
                            ConnectCallback(key, result.err(), zx::socket(), buf_alloc, fwd_cnt);
                          }
                        });
    return;
  }
  // Otherwise, reject the packet by sending a reset, unless the spurious
  // packet was a reset itself.
  FX_LOGS(WARNING) << "Received spurious packet from guest";
  if (header->op != VIRTIO_VSOCK_OP_RST) {
    SendResetPacket(send_queue_, key);
  }
}
// TX-queue handler: the guest has packets for the device to process.
//
// Drains the guest's TX queue, but stops early once our outgoing backlog
// reaches kMaxQueuedPackets to bound memory use against an unresponsive
// guest.
void VirtioVsock::Demux(async_dispatcher_t*, async::WaitBase*, zx_status_t status,
                        const zx_packet_signal_t*) {
  if (status != ZX_OK) {
    FX_LOGS(ERROR) << "Error while waiting on virtio TX queue: " << zx_status_get_string(status);
    return;
  }
  std::lock_guard<std::mutex> lock(mutex_);
  // If our outgoing queue is full, abort.
  //
  // Processing more incoming packets may cause even more outgoing packets to be
  // generated, and at this point the guest is unreasonably behind.
  if (is_send_queue_full()) {
    FX_LOGS(WARNING) << "Guest " << guest_cid()
                     << " not responding to sent vsock packets. Stopping receive.";
    return;
  }
  // Process all packets in the guest's TX queue.
  do {
    std::optional<VsockChain> chain = VsockChain::FromQueue(tx_queue(), /*writable=*/false);
    if (!chain.has_value()) {
      break;
    }
    ProcessIncomingPacket(*chain);
    // Incoming descriptors are read-only; we never write into them.
    chain->Return(/*used=*/0);
  } while (!is_send_queue_full());
  // Schedule this function to be called again next time the queue receives
  // a packet.
  status = tx_queue_wait_.Begin(dispatcher_);
  if (status != ZX_OK && status != ZX_ERR_ALREADY_EXISTS) {
    FX_LOGS(ERROR) << "Failed to wait on TX queue: " << status;
  }
}
// True once the backlog of unsent outgoing packets reaches kMaxQueuedPackets,
// at which point Mux/Demux stop generating more.
bool VirtioVsock::is_send_queue_full() const {
  return send_queue_.buffered_packets() >= kMaxQueuedPackets;
}