blob: d02d9b45b5c653879d11a03f15b59fd0be6bc6a3 [file] [log] [blame]
// Copyright 2025 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use fidl::endpoints::create_sync_proxy;
use starnix_core::device::DeviceOps;
use starnix_core::task::{
CurrentTask, EventHandler, SignalHandler, SignalHandlerInner, WaitCanceler, Waiter,
};
use starnix_core::vfs::{FileObject, FileObjectState, FileOps, InputBufferExt, NamespaceNode};
use starnix_core::{fileops_impl_noop_sync, fileops_impl_seekable};
use starnix_logging::{impossible_error, log_error, log_warn};
use starnix_sync::{FileOpsCore, Locked};
use starnix_uapi::device_type::DeviceType;
use starnix_uapi::error;
use starnix_uapi::errors::{EIO, Errno, errno};
use starnix_uapi::open_flags::OpenFlags;
use starnix_uapi::vfs::FdEvents;
use std::sync::Arc;
use zx::{AsHandleRef, HandleBased, Rights, WaitResult};
use {fidl_fuchsia_hardware_google_nanohub as fnanohub, fidl_fuchsia_starnix_runner as frunner};
use fuchsia_runtime;
/// Device whose `open` produces nanohub data channel files.
///
/// Both proxies are held behind `Arc`, so cloning the device shares the same
/// underlying connections rather than creating new ones.
#[derive(Clone)]
pub struct DataChannelDevice {
/// Synchronous connection to the Starnix runner `Manager` protocol; a clone
/// of this `Arc` is handed to every file opened through the device.
manager: Arc<frunner::ManagerSynchronousProxy>,
/// Proxy to the nanohub data channel service, asked on each open for a new
/// unbound waitable data channel.
service_proxy: Arc<fnanohub::StarnixDataChannelServiceProxy>,
}
impl DataChannelDevice {
    /// Builds a device around `service_proxy` and a fresh synchronous
    /// connection to the Starnix runner `Manager` protocol.
    ///
    /// # Panics
    ///
    /// Panics if the connection to the runner `Manager` protocol cannot be
    /// created.
    pub fn new(service_proxy: fnanohub::StarnixDataChannelServiceProxy) -> Self {
        let runner =
            fuchsia_component::client::connect_to_protocol_sync::<frunner::ManagerMarker>()
                .expect("failed to create runner proxy");
        Self { manager: Arc::new(runner), service_proxy: Arc::new(service_proxy) }
    }
}
impl DeviceOps for DataChannelDevice {
    /// Opens a new data channel file.
    ///
    /// Requests an unbound waitable channel from the nanohub service and wraps
    /// it, together with a shared handle to the runner manager, in a
    /// [`DataChannelFile`].
    ///
    /// # Errors
    ///
    /// Returns `EIO` when the service cannot provide an unbound channel, and
    /// forwards any error from [`DataChannelFile::new`].
    fn open(
        &self,
        _locked: &mut Locked<FileOpsCore>,
        _current_task: &CurrentTask,
        _id: DeviceType,
        _node: &NamespaceNode,
        _flags: OpenFlags,
    ) -> Result<Box<dyn FileOps>, Errno> {
        let unbound = match self.service_proxy.connect_to_waitable_sync() {
            Ok(proxy) => Arc::new(proxy),
            Err(_) => return Err(errno!(EIO, "Failed to get unbound data channel device")),
        };
        let file = DataChannelFile::new(unbound, Arc::clone(&self.manager))?;
        Ok(Box::new(file))
    }
}
/// Open file backed by a bound nanohub waitable data channel.
pub struct DataChannelFile {
/// Runner `Manager` connection, used to register this file's event as a wake
/// source on creation and to remove it again on close.
manager: Arc<frunner::ManagerSynchronousProxy>,
/// Synchronous client for the bound data channel; `read` and `write` are
/// forwarded to it.
waitable_client: Arc<fnanohub::WaitableDataChannelSynchronousProxy>,
// Event used to determine when data is available to read or write.
// Duplicates of it are handed to the data channel (at bind time) and to the
// runner (as the wake source handle).
event: Arc<zx::Event>,
}
impl DataChannelFile {
    /// Creates a file by binding `unbound_waitable_client` to a new waitable
    /// data channel.
    ///
    /// A fresh event is created and duplicated twice: one duplicate is sent to
    /// the data channel in the bind request, the other is registered with the
    /// runner `manager` as the "nanohub-datachannel" wake source, watching
    /// `SIGNAL_WAKELOCK`.
    ///
    /// # Errors
    ///
    /// Returns `EIO` if any handle cannot be duplicated, if the wake source
    /// cannot be registered, or if binding the channel fails. When binding
    /// fails after the wake source was registered, the wake source is removed
    /// again (best-effort) so it does not outlive the failed open.
    pub fn new(
        unbound_waitable_client: Arc<fnanohub::UnboundWaitableDataChannelSynchronousProxy>,
        manager: Arc<frunner::ManagerSynchronousProxy>,
    ) -> Result<Self, Errno> {
        let event = zx::Event::create();
        let event_dup = event.duplicate_handle(Rights::SAME_RIGHTS).map_err(|e| {
            log_error!("Failed to duplicate event handle: {:?}", e);
            Errno::new(EIO)
        })?;
        let wake_source_event = event.duplicate_handle(Rights::SAME_RIGHTS).map_err(|e| {
            log_error!("Failed to duplicate event handle for wake source: {:?}", e);
            Errno::new(EIO)
        })?;
        // Treat a failed job duplication like the event duplications above:
        // report EIO to the caller instead of panicking.
        let container_job =
            fuchsia_runtime::job_default().duplicate(Rights::SAME_RIGHTS).map_err(|e| {
                log_error!("Failed to duplicate container job handle: {:?}", e);
                Errno::new(EIO)
            })?;
        manager
            .add_wake_source(frunner::ManagerAddWakeSourceRequest {
                container_job: Some(container_job),
                name: Some("nanohub-datachannel".to_string()),
                handle: Some(wake_source_event.into_handle()),
                signals: Some((zx::Signals::from_bits_truncate(fnanohub::SIGNAL_WAKELOCK)).bits()),
                ..Default::default()
            })
            .map_err(|e| errno!(EIO, e))?;

        let (data_channel_proxy, server_end) =
            create_sync_proxy::<fnanohub::WaitableDataChannelMarker>();
        let req = fnanohub::UnboundWaitableDataChannelBindRequest {
            server: Some(server_end),
            event: Some(event_dup),
            ..Default::default()
        };
        // Flatten the transport error and the protocol error into one result
        // so both failure modes share the cleanup below.
        let bind_result = unbound_waitable_client
            .bind(req, zx::MonotonicInstant::INFINITE)
            .map_err(|e| errno!(EIO, e))
            .and_then(|response| response.map_err(|e| errno!(EIO, e)));
        if let Err(e) = bind_result {
            // No file will exist, so `close` will never run; undo the wake
            // source registration here.
            Self::unregister_wake_source(&manager, &event);
            return Err(e);
        }

        Ok(DataChannelFile {
            manager,
            waitable_client: Arc::new(data_channel_proxy),
            event: Arc::new(event),
        })
    }

    /// Best-effort removal of the wake source registered for `event`.
    /// Failures are logged and otherwise ignored.
    fn unregister_wake_source(manager: &frunner::ManagerSynchronousProxy, event: &zx::Event) {
        let container_job = match fuchsia_runtime::job_default().duplicate(Rights::SAME_RIGHTS) {
            Ok(job) => job,
            Err(e) => {
                log_error!("Failed to duplicate container job handle: {:?}", e);
                return;
            }
        };
        let handle = match event.duplicate_handle(Rights::SAME_RIGHTS) {
            Ok(dup) => dup.into_handle(),
            Err(e) => {
                log_error!("Failed to duplicate event handle: {:?}", e);
                return;
            }
        };
        let request = frunner::ManagerRemoveWakeSourceRequest {
            container_job: Some(container_job),
            handle: Some(handle),
            ..Default::default()
        };
        if let Err(e) = manager.remove_wake_source(request) {
            log_error!("Failed to remove wake source: {:?}", e);
        }
    }
}
impl FileOps for DataChannelFile {
fileops_impl_seekable!();
fileops_impl_noop_sync!();
/// Removes this file's wake source from the runner when the file is closed.
///
/// Closing is best-effort: a failure to duplicate a handle or to reach the
/// runner is logged and the close still completes, rather than panicking.
fn close(
    self: Box<Self>,
    _locked: &mut Locked<FileOpsCore>,
    _file: &FileObjectState,
    _current_task: &CurrentTask,
) {
    let event = match self.event.duplicate_handle(Rights::SAME_RIGHTS) {
        Ok(event) => event,
        Err(e) => {
            log_error!("Failed to duplicate event: {:?}", e);
            return;
        }
    };
    let container_job = match fuchsia_runtime::job_default().duplicate(Rights::SAME_RIGHTS) {
        Ok(job) => job,
        Err(e) => {
            log_error!("Failed to dup handle: {:?}", e);
            return;
        }
    };
    let request = frunner::ManagerRemoveWakeSourceRequest {
        container_job: Some(container_job),
        handle: Some(event.into_handle()),
        ..Default::default()
    };
    // Include the cause in the log so a failed removal can be diagnosed.
    if let Err(e) = self.manager.remove_wake_source(request) {
        log_error!("Failed to remove wake source: {:?}", e);
    }
}
fn read(
&self,
locked: &mut Locked<FileOpsCore>,
file: &FileObject,
current_task: &CurrentTask,
_offset: usize,
data: &mut dyn starnix_core::vfs::OutputBuffer,
) -> Result<usize, Errno> {
file.blocking_op(locked, current_task, FdEvents::POLLIN | FdEvents::POLLHUP, None, |_| {
match self
.waitable_client
.read(zx::MonotonicInstant::INFINITE)
.map_err(|e| errno!(EIO, e))?
{
Ok(response) => {
// Keep the wake lease alive until the data has been processed.
// The lease is dropped at the end of this scope.
if let Some(d) = response.data {
if d.len() > data.available() {
log_warn!("Data returned by datachannel too large for buffer");
// We will drop data in this case.
}
data.write(&d)
} else {
Ok(0)
}
}
Err(zx::sys::ZX_ERR_SHOULD_WAIT) => error!(EAGAIN),
Err(e) => error!(EIO, e),
}
})
}
/// Writes up to `MAX_MESSAGE_SIZE` bytes from `data` to the data channel.
///
/// Runs as a blocking operation waiting on `POLLOUT | POLLHUP`. A
/// `ZX_ERR_NO_RESOURCES` reply from the channel becomes `EAGAIN`; any other
/// failure, including a transport error, becomes `EIO`. On success the
/// number of bytes taken from `data` is returned. The file offset is ignored.
fn write(
    &self,
    locked: &mut Locked<FileOpsCore>,
    file: &FileObject,
    current_task: &CurrentTask,
    _offset: usize,
    data: &mut dyn starnix_core::vfs::InputBuffer,
) -> Result<usize, Errno> {
    let payload = data.read_to_vec_limited(fnanohub::MAX_MESSAGE_SIZE as usize)?;
    let len = payload.len();
    // Build the request once. The closure below may run several times while
    // blocking, and the FIDL call only borrows the request, so the payload
    // does not need to be cloned per attempt.
    let request =
        fnanohub::WaitableDataChannelWriteRequest { data: Some(payload), ..Default::default() };
    file.blocking_op(locked, current_task, FdEvents::POLLOUT | FdEvents::POLLHUP, None, |_| {
        match self
            .waitable_client
            .write(&request, zx::MonotonicInstant::INFINITE)
            .map_err(|e| errno!(EIO, e))?
        {
            Ok(_) => Ok(len),
            Err(e) if e == zx::sys::ZX_ERR_NO_RESOURCES => error!(EAGAIN, e),
            Err(e) => error!(EIO, e),
        }
    })
}
fn query_events(
&self,
_locked: &mut Locked<FileOpsCore>,
_file: &FileObject,
_current_task: &CurrentTask,
) -> Result<FdEvents, Errno> {
let current_events = self.event.wait_handle(
zx::Signals::from_bits_truncate(fnanohub::SIGNAL_READABLE | fnanohub::SIGNAL_WRITABLE)
| zx::Signals::CHANNEL_PEER_CLOSED,
zx::MonotonicInstant::INFINITE_PAST,
);
match current_events {
WaitResult::Ok(signals) => Ok(get_events_from_signals(signals)),
WaitResult::TimedOut(_) => Ok(FdEvents::empty()),
WaitResult::Canceled(_) => {
error!(EAGAIN)
}
WaitResult::Err(e) => Err(impossible_error(e)),
}
}
/// Registers `waiter` to be woken when the file's event asserts the Zircon
/// signals corresponding to `events`.
///
/// Observed signals are translated back to `FdEvents` with
/// `get_events_from_signals` before `handler` is invoked.
///
/// # Panics
///
/// Panics if the waiter fails to register for the Zircon signals.
fn wait_async(
    &self,
    _locked: &mut Locked<FileOpsCore>,
    _file: &FileObject,
    _current_task: &CurrentTask,
    waiter: &Waiter,
    events: FdEvents,
    handler: EventHandler,
) -> Option<WaitCanceler> {
    let watched_signals = get_signals_from_events(events);
    let port_waiter = waiter
        .wake_on_zircon_signals(
            &self.event.as_handle_ref(),
            watched_signals,
            SignalHandler {
                inner: SignalHandlerInner::ZxHandle(get_events_from_signals),
                event_handler: handler,
                err_code: None,
            },
        )
        .unwrap();
    Some(WaitCanceler::new_port(port_waiter))
}
}
/// Translates poll events into the Zircon signals to wait for on the data
/// channel event: `POLLIN` → readable, `POLLOUT` → writable,
/// `POLLHUP` → peer closed. Other events are ignored.
fn get_signals_from_events(events: FdEvents) -> zx::Signals {
    let table = [
        (FdEvents::POLLIN, zx::Signals::from_bits_truncate(fnanohub::SIGNAL_READABLE)),
        (FdEvents::POLLOUT, zx::Signals::from_bits_truncate(fnanohub::SIGNAL_WRITABLE)),
        (FdEvents::POLLHUP, zx::Signals::CHANNEL_PEER_CLOSED),
    ];
    table
        .into_iter()
        .filter_map(|(event, signal)| events.contains(event).then_some(signal))
        .fold(zx::Signals::empty(), |acc, signal| acc | signal)
}
/// Translates Zircon signals observed on the data channel event into poll
/// events: readable → `POLLIN`, writable → `POLLOUT`,
/// peer closed → `POLLHUP`. Other signals are ignored.
fn get_events_from_signals(signals: zx::Signals) -> FdEvents {
    let table = [
        (zx::Signals::from_bits_truncate(fnanohub::SIGNAL_READABLE), FdEvents::POLLIN),
        (zx::Signals::from_bits_truncate(fnanohub::SIGNAL_WRITABLE), FdEvents::POLLOUT),
        (zx::Signals::CHANNEL_PEER_CLOSED, FdEvents::POLLHUP),
    ];
    table
        .into_iter()
        .filter_map(|(signal, event)| signals.contains(signal).then_some(event))
        .fold(FdEvents::empty(), |acc, event| acc | event)
}