// Source blob: 980e6b2b7b603c1a1643c0c78b9cc4b1cc3abd3c
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::Container;
use anyhow::{Context as _, Error};
use fidl::endpoints::{ControlHandle, RequestStream, ServerEnd};
use fuchsia_async::{
DurationExt, {self as fasync},
};
use futures::channel::oneshot;
use futures::{
AsyncReadExt, AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt, TryStreamExt, pin_mut,
select,
};
use starnix_core::execution::{create_init_child_process, execute_task_with_prerun_result};
use starnix_core::fs::devpts::create_main_and_replica;
use starnix_core::fs::fuchsia::create_fuchsia_pipe;
use starnix_core::task::dynamic_thread_spawner::SpawnRequestBuilder;
use starnix_core::task::{CurrentTask, ExitStatus, Kernel, LockedAndTask, ProcessEntryRef};
use starnix_core::vfs::buffers::{VecInputBuffer, VecOutputBuffer};
use starnix_core::vfs::file_server::serve_file_at;
use starnix_core::vfs::socket::VsockSocket;
use starnix_core::vfs::{FdFlags, FileHandle};
use starnix_logging::{log_error, log_warn};
use starnix_modules_framebuffer::Framebuffer;
use starnix_sync::{Locked, Unlocked};
use starnix_task_command::TaskCommand;
use starnix_uapi::auth::Credentials;
use starnix_uapi::errors::Errno;
use starnix_uapi::open_flags::OpenFlags;
use starnix_uapi::signals::UncheckedSignal;
use starnix_uapi::{errno, error, uapi};
use std::ffi::CString;
use std::ops::DerefMut;
use {
fidl_fuchsia_component_runner as frunner, fidl_fuchsia_element as felement,
fidl_fuchsia_io as fio, fidl_fuchsia_memory_attribution as fattribution,
fidl_fuchsia_posix as fposix, fidl_fuchsia_starnix_binder as fbinder,
fidl_fuchsia_starnix_container as fstarcontainer,
};
use super::start_component;
/// Serves the container's root directory (`/`) over `server_end`.
///
/// The root is opened read-only on behalf of `system_task`, and the resulting file is
/// handed to `serve_file_at` together with root credentials and the channel taken from
/// `server_end`.
///
/// # Errors
///
/// Fails if `/` cannot be opened or if `serve_file_at` reports an error.
pub fn expose_root(
    locked: &mut Locked<Unlocked>,
    system_task: &CurrentTask,
    server_end: ServerEnd<fio::DirectoryMarker>,
) -> Result<(), Error> {
    let root_dir = system_task.open_file(locked, "/".into(), OpenFlags::RDONLY)?;
    let channel = server_end.into_channel();
    serve_file_at(channel.into(), system_task, &root_dir, Credentials::root())?;
    Ok(())
}
/// Serves `fuchsia.component.runner/ComponentRunner` requests from `request_stream`.
///
/// Requests are processed concurrently with no limit. A `Start` request is forwarded to
/// `start_component`; a failure there is logged and does not stop the server. Unknown
/// methods are logged and otherwise ignored.
///
/// # Errors
///
/// Returns the stream's error, converted to `anyhow::Error`, if reading a request fails.
pub async fn serve_component_runner(
    request_stream: frunner::ComponentRunnerRequestStream,
    system_task: &CurrentTask,
) -> Result<(), Error> {
    request_stream
        .try_for_each_concurrent(None, |request| async {
            match request {
                frunner::ComponentRunnerRequest::_UnknownMethod { ordinal, .. } => {
                    log_warn!("Unknown ComponentRunner request: {ordinal}");
                }
                frunner::ComponentRunnerRequest::Start { start_info, controller, .. } => {
                    let started = start_component(start_info, controller, system_task).await;
                    if let Err(e) = started {
                        log_error!("failed to start component: {:?}", e);
                    }
                }
            }
            Ok(())
        })
        .await?;
    Ok(())
}
/// Converts an optional FIDL console window size into a `uapi::winsize`.
///
/// When `window_size` is `None`, the default `uapi::winsize` is returned.
fn to_winsize(window_size: Option<fstarcontainer::ConsoleWindowSize>) -> uapi::winsize {
    match window_size {
        Some(size) => uapi::winsize {
            ws_row: size.rows,
            ws_col: size.cols,
            ws_xpixel: size.x_pixels,
            ws_ypixel: size.y_pixels,
        },
        None => uapi::winsize::default(),
    }
}
/// Spawns `payload.binary_path` as a new init child process attached to a fresh pty, and
/// bridges that pty to the `console_in`/`console_out` sockets from `payload`.
///
/// Inside the task's pre-run step the binary is exec'ed, a pty pair is created with the
/// requested window size, and the replica end is installed as file descriptors 0, 1 and 2.
///
/// # Returns
///
/// * `Ok(Ok(exit_code))` when the task's result is `ExitStatus::Exit(exit_code)`.
/// * `Ok(Err(SpawnConsoleError::Canceled))` for any other task result.
/// * `Ok(Err(SpawnConsoleError::InvalidArgs))` when `console_in`, `console_out` or
///   `binary_path` is missing from `payload`.
///
/// # Errors
///
/// Returns `Err` if the path, an argument or an environment entry contains an interior NUL
/// byte, if creating or starting the task fails, or if the exit-status channel is dropped
/// before a result is delivered.
async fn spawn_console(
    kernel: &Kernel,
    payload: fstarcontainer::ControllerSpawnConsoleRequest,
) -> Result<Result<u8, fstarcontainer::SpawnConsoleError>, Error> {
    let (Some(console_in), Some(console_out), Some(binary_path)) =
        (payload.console_in, payload.console_out, payload.binary_path)
    else {
        return Ok(Err(fstarcontainer::SpawnConsoleError::InvalidArgs));
    };

    let binary_path = CString::new(binary_path)?;
    let argv = payload
        .argv
        .unwrap_or_default()
        .into_iter()
        .map(CString::new)
        .collect::<Result<Vec<_>, _>>()?;
    let environ = payload
        .environ
        .unwrap_or_default()
        .into_iter()
        .map(CString::new)
        .collect::<Result<Vec<_>, _>>()?;
    let window_size = to_winsize(payload.window_size);

    let init_task = create_init_child_process(
        kernel.kthreads.unlocked_for_async().deref_mut(),
        &kernel.weak_self.upgrade().expect("Kernel must still be alive"),
        TaskCommand::new(binary_path.as_bytes()),
        Credentials::with_ids(0, 0),
        None,
    )?;

    // The completion callback reports the task's outcome through this channel.
    let (exit_sender, exit_receiver) = oneshot::channel();
    let pty = execute_task_with_prerun_result(
        kernel.kthreads.unlocked_for_async().deref_mut(),
        init_task,
        move |locked, current_task| {
            let executable =
                current_task.open_file(locked, binary_path.as_bytes().into(), OpenFlags::RDONLY)?;
            current_task.exec(locked, executable, binary_path, argv, environ)?;
            let (pty, pts) = create_main_and_replica(locked, &current_task, window_size)?;
            // The replica end must land on stdin, stdout and stderr, in that order.
            let fd_flags = FdFlags::empty();
            assert_eq!(0, current_task.add_file(locked, pts.clone(), fd_flags)?.raw());
            assert_eq!(1, current_task.add_file(locked, pts.clone(), fd_flags)?.raw());
            assert_eq!(2, current_task.add_file(locked, pts, fd_flags)?.raw());
            Ok(pty)
        },
        move |result| {
            let outcome = match result {
                Ok(ExitStatus::Exit(exit_code)) => Ok(exit_code),
                _ => Err(fstarcontainer::SpawnConsoleError::Canceled),
            };
            // The receiver may already be gone; nothing useful can be done about that here.
            let _ = exit_sender.send(outcome);
        },
        None,
    )?;

    if let Err(e) = forward_to_pty(kernel, console_in, console_out, pty) {
        log_error!("failed to forward to terminal {:?}", e);
    }

    let exit = exit_receiver.await?;
    Ok(exit)
}
/// Collects a `VmoReference` for every file descriptor, across all thread-group leaders,
/// whose backing memory object has koid `koid`.
///
/// Thread groups whose leader cannot be upgraded or whose `live()` state is unavailable,
/// descriptors that cannot be resolved, and files that cannot provide readable memory are
/// skipped. A file whose memory info cannot be queried is skipped with a warning rather
/// than panicking, so a diagnostic request cannot bring the kernel down.
fn collect_vmo_references(
    system_task: &CurrentTask,
    koid: u64,
) -> Vec<fstarcontainer::VmoReference> {
    let kernel = system_task.kernel();
    // Collect the thread groups up front; the `pids` read guard is a temporary that is
    // released at the end of this statement, before any file table is inspected.
    let thread_groups = kernel.pids.read().get_thread_groups().collect::<Vec<_>>();

    let mut references = vec![];
    for thread_group in thread_groups {
        let Some(leader) = system_task.get_task(thread_group.leader).upgrade() else {
            continue;
        };
        let Ok(live) = leader.live() else {
            continue;
        };
        let files = &live.files;
        for fd in files.get_all_fds() {
            let Ok(file) = files.get(fd) else {
                continue;
            };
            if let Ok(memory) = file.get_memory(
                kernel.kthreads.unlocked_for_async().deref_mut(),
                system_task,
                None,
                starnix_core::mm::ProtectionFlags::READ,
            ) {
                let Ok(info) = memory.info() else {
                    log_warn!("failed to get memory info for fd {}", fd.raw());
                    continue;
                };
                if info.koid.raw_koid() != koid {
                    continue;
                }
                let process_name = thread_group.process.get_name().unwrap_or_default();
                references.push(fstarcontainer::VmoReference {
                    process_name: Some(process_name.to_string()),
                    pid: Some(leader.get_pid() as u64),
                    fd: Some(fd.raw()),
                    koid: Some(koid),
                    ..Default::default()
                });
            }
        }
    }
    references
}

/// Serves `fuchsia.starnix.container/Controller` requests from `request_stream`.
///
/// Requests are handled concurrently with no limit:
///
/// * `VsockConnect` bridges the supplied socket to the vsock listener on `port`; requests
///   missing a field and connection failures are logged.
/// * `SpawnConsole` runs `spawn_console` and replies with its result.
/// * `GetVmoReferences` replies with every file descriptor that refers to the requested
///   koid. A request without a koid is logged and its responder is dropped without a reply.
/// * `GetJobHandle` replies with a duplicate of the default job handle.
/// * `SendSignal` delivers a signal to the process `pid`.
/// * `SetSyscallLogFilter` / `ClearSyscallLogFilters` update the kernel's syscall log filters.
/// * Unknown methods are logged and otherwise ignored.
///
/// # Errors
///
/// Returns an error, which stops the whole stream, if reading a request fails, or if
/// `spawn_console` or sending its reply fails. Failures to send any other reply are ignored.
pub async fn serve_container_controller(
    request_stream: fstarcontainer::ControllerRequestStream,
    system_task: &CurrentTask,
) -> Result<(), Error> {
    request_stream
        .map_err(Error::from)
        .try_for_each_concurrent(None, |event| async {
            match event {
                fstarcontainer::ControllerRequest::VsockConnect {
                    payload:
                        fstarcontainer::ControllerVsockConnectRequest { port, bridge_socket, .. },
                    ..
                } => {
                    let Some(port) = port else {
                        log_warn!("vsock connection missing port");
                        return Ok(());
                    };
                    let Some(bridge_socket) = bridge_socket else {
                        log_warn!("vsock connection missing bridge_socket");
                        return Ok(());
                    };
                    connect_to_vsock(port, bridge_socket, system_task).await.unwrap_or_else(|e| {
                        log_error!("failed to connect to vsock {:?}", e);
                    });
                }
                fstarcontainer::ControllerRequest::SpawnConsole { payload, responder } => {
                    responder.send(spawn_console(system_task.kernel(), payload).await?)?;
                }
                fstarcontainer::ControllerRequest::GetVmoReferences { payload, responder } => {
                    if let Some(koid) = payload.koid {
                        let references = collect_vmo_references(system_task, koid);
                        let _ =
                            responder.send(&fstarcontainer::ControllerGetVmoReferencesResponse {
                                references: Some(references),
                                ..Default::default()
                            });
                    } else {
                        // No reply is sent in this case; the responder is dropped at the end
                        // of this arm.
                        log_warn!("GetVmoReferences request missing koid");
                    }
                }
                fstarcontainer::ControllerRequest::GetJobHandle { responder } => {
                    let _result = responder.send(fstarcontainer::ControllerGetJobHandleResponse {
                        job: Some(
                            fuchsia_runtime::job_default()
                                .duplicate(zx::Rights::SAME_RIGHTS)
                                .expect("Failed to dup handle"),
                        ),
                        ..Default::default()
                    });
                }
                fstarcontainer::ControllerRequest::SendSignal {
                    payload:
                        fstarcontainer::ControllerSendSignalRequest {
                            pid: Some(pid),
                            signal: Some(signal),
                            ..
                        },
                    responder,
                } => {
                    let pids = system_task.kernel().pids.read();
                    if let Some(ProcessEntryRef::Process(target_thread_group)) =
                        pids.get_process(pid)
                    {
                        #[allow(
                            clippy::undocumented_unsafe_blocks,
                            reason = "Force documented unsafe blocks in Starnix"
                        )]
                        match unsafe {
                            target_thread_group.send_signal_unchecked_debug(
                                system_task,
                                UncheckedSignal::new(signal),
                            )
                        } {
                            Ok(_) => {
                                let _result = responder.send(Ok(()));
                            }
                            Err(_) => {
                                let _result =
                                    responder.send(Err(fstarcontainer::SignalError::InvalidSignal));
                            }
                        };
                    } else {
                        let _result =
                            responder.send(Err(fstarcontainer::SignalError::InvalidTarget));
                    };
                }
                // The request did not contain both a signal and a target pid. This arm must
                // stay after the fully-populated `SendSignal` arm above.
                fstarcontainer::ControllerRequest::SendSignal { responder, .. } => {
                    log_error!("malformed SendSignal request");
                    let _result = responder.send(Err(fstarcontainer::SignalError::InvalidTarget));
                }
                fstarcontainer::ControllerRequest::SetSyscallLogFilter { payload, responder } => {
                    if let Some(process_name) = payload.process_name {
                        system_task.kernel().add_syscall_log_filter(&process_name);
                        let _ = responder.send(Ok(()));
                    } else {
                        let _ = responder.send(Err(
                            fstarcontainer::SetSyscallLogFilterError::MissingProcessName,
                        ));
                    }
                }
                fstarcontainer::ControllerRequest::ClearSyscallLogFilters { responder } => {
                    system_task.kernel().clear_syscall_log_filters();
                    let _ = responder.send();
                }
                fstarcontainer::ControllerRequest::_UnknownMethod { ordinal, .. } => {
                    log_warn!("Unknown Controller request: {ordinal}");
                }
            }
            Ok(())
        })
        .await
}
/// Connects `bridge_socket` to the socket registered on vsock `port`.
///
/// The kernel's default abstract vsock namespace is polled every 100ms until a lookup for
/// `port` succeeds. There is no timeout: this future stays pending for as long as nothing
/// is registered on `port`. Once a socket is found, `bridge_socket` is wrapped in a
/// non-blocking read/write pipe and passed to the socket's `remote_connection`.
///
/// # Errors
///
/// Fails if the pipe cannot be created, if the socket found for `port` is not a
/// `VsockSocket`, or if `remote_connection` fails.
async fn connect_to_vsock(
    port: u32,
    bridge_socket: fidl::Socket,
    system_task: &CurrentTask,
) -> Result<(), Error> {
    let socket = loop {
        if let Ok(socket) = system_task.kernel().default_abstract_vsock_namespace.lookup(&port) {
            break socket;
        };
        fasync::Timer::new(fasync::MonotonicDuration::from_millis(100).after_now()).await;
    };

    let pipe = create_fuchsia_pipe(
        system_task.kernel().kthreads.unlocked_for_async().deref_mut(),
        system_task,
        bridge_socket,
        OpenFlags::RDWR | OpenFlags::NONBLOCK,
    )?;

    // Report an unexpected socket type as an error instead of panicking: the caller already
    // logs failures from this function, and a panic here would take the kernel down.
    socket
        .downcast_socket::<VsockSocket>()
        .context("socket registered on vsock port is not a VsockSocket")?
        .remote_connection(
            system_task.kernel().kthreads.unlocked_for_async().deref_mut(),
            &socket,
            system_task,
            pipe,
        )?;
    Ok(())
}
/// Starts bidirectional forwarding between the console sockets and `pty`.
///
/// Two spawn requests are handed to `kernel.kthreads.spawner()`:
/// - "forward-to-pty-in" reads from `console_in` and writes each chunk to `pty`;
/// - "forward-to-pty-out" reads from `pty` and writes each chunk to `console_out`.
///
/// Each loop stops when a read returns zero bytes or when any read or write fails. The
/// loops' results are bound to `_result` and discarded, so forwarding errors are not
/// reported to the caller; as written, this function always returns `Ok(())`.
fn forward_to_pty(
kernel: &Kernel,
console_in: fidl::Socket,
console_out: fidl::Socket,
pty: FileHandle,
) -> Result<(), Error> {
// Matches fuchsia.io.Transfer capacity, somewhat arbitrarily.
const BUFFER_CAPACITY: usize = 8192;
let mut rx = fuchsia_async::Socket::from_socket(console_in);
let mut tx = fuchsia_async::Socket::from_socket(console_out);
// Each direction gets its own handle to the pty: the clone feeds the input direction and
// the original (`pty_source`, below) feeds the output direction.
let pty_sink = pty.clone();
// Input direction: console_in -> pty. The socket read is awaited; the pty write is a
// plain synchronous call.
let closure = async move |locked_and_task: LockedAndTask<'_>| {
let _result: Result<(), Error> = (async || {
let mut buffer = vec![0u8; BUFFER_CAPACITY];
loop {
let bytes = rx.read(&mut buffer[..]).await?;
// A zero-length read ends this direction (presumably the peer closed `console_in`).
if bytes == 0 {
return Ok(());
}
// NOTE(review): the value returned by `write` is not inspected, so a short write
// would go unnoticed here; confirm whether the pty can write partially.
pty_sink.write(
&mut locked_and_task.unlocked(),
locked_and_task.current_task(),
&mut VecInputBuffer::new(&buffer[..bytes]),
)?;
}
})()
.await;
};
let req = SpawnRequestBuilder::new()
.with_debug_name("forward-to-pty-in")
.with_async_closure(closure)
.build();
kernel.kthreads.spawner().spawn_from_request(req);
let pty_source = pty;
// Output direction: pty -> console_out. This closure is synchronous, so it drives the
// async socket write on a `LocalExecutor` it creates itself; the pty read is a plain
// synchronous call made inside that async block.
let closure = move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
let _result: Result<(), Error> =
fasync::LocalExecutor::default().run_singlethreaded(async {
let mut buffer = VecOutputBuffer::new(BUFFER_CAPACITY);
loop {
// Reuse the same output buffer for every chunk.
buffer.reset();
let bytes = pty_source.read(locked, current_task, &mut buffer)?;
// A zero-length read ends this direction.
if bytes == 0 {
return Ok(());
}
tx.write_all(buffer.data()).await?;
}
});
};
let req = SpawnRequestBuilder::new()
.with_debug_name("forward-to-pty-out")
.with_sync_closure(closure)
.build();
kernel.kthreads.spawner().spawn_from_request(req);
Ok(())
}
/// Serves `fuchsia.element/GraphicalPresenter` requests from `request_stream`, one at a
/// time.
///
/// A `PresentView` request carrying a viewport creation token is forwarded to the kernel's
/// framebuffer and acknowledged with `Ok(())`. A request without a token is answered with
/// `PresentViewError::InvalidArgs`. Failures to send a reply are ignored.
///
/// # Errors
///
/// Returns an error, and stops serving, if reading a request fails or if the framebuffer
/// cannot be obtained from `kernel`.
pub async fn serve_graphical_presenter(
    mut request_stream: felement::GraphicalPresenterRequestStream,
    kernel: &Kernel,
) -> Result<(), Error> {
    while let Some(request) = request_stream.next().await {
        match request.context("reading graphical presenter request")? {
            felement::GraphicalPresenterRequest::PresentView { view_spec, responder, .. } => {
                let Some(token) = view_spec.viewport_creation_token else {
                    let _ = responder.send(Err(felement::PresentViewError::InvalidArgs));
                    continue;
                };
                let framebuffer =
                    Framebuffer::get(kernel).context("getting framebuffer from kernel")?;
                framebuffer.present_view(token);
                let _ = responder.send(Ok(()));
            }
        }
    }
    Ok(())
}
/// Serves the memory attribution provider for the Kernel ELF component.
///
/// An observer is created from `container` before the returned future is first polled.
/// The future then hands every `Get` responder to that observer. An unknown method is
/// logged and the channel is shut down with an `INVALID_ARGS` epitaph.
///
/// The future resolves to `Ok(())` when the stream ends, or to an error as soon as reading
/// a request fails.
pub fn serve_memory_attribution_provider_elfkernel(
    mut request_stream: fattribution::ProviderRequestStream,
    container: &Container,
) -> impl Future<Output = Result<(), Error>> {
    let observer = container.new_memory_attribution_observer(request_stream.control_handle());
    async move {
        loop {
            let Some(request) = request_stream.try_next().await? else {
                break;
            };
            match request {
                fattribution::ProviderRequest::_UnknownMethod {
                    ordinal, control_handle, ..
                } => {
                    log_error!("Invalid request to AttributionProvider: {ordinal}");
                    control_handle.shutdown_with_epitaph(zx::Status::INVALID_ARGS);
                }
                fattribution::ProviderRequest::Get { responder } => {
                    observer.next(responder);
                }
            }
        }
        Ok(())
    }
}
/// Serves the memory attribution provider for the Container component.
pub fn serve_memory_attribution_provider_container(
mut request_stream: fattribution::ProviderRequestStream,
kernel: &Kernel,
) -> impl Future<Output = ()> + use<> {
let observer = kernel.new_memory_attribution_observer(request_stream.control_handle());
async move {
while let Some(event) = request_stream
.try_next()
.await
.inspect_err(|err| {
log_warn!("Error while serving container memory attribution: {:?}", err)
})
.ok()
.flatten()
{
match event {
fattribution::ProviderRequest::Get { responder } => {
observer.next(responder);
}
fattribution::ProviderRequest::_UnknownMethod {
ordinal, control_handle, ..
} => {
log_error!("Invalid request to AttributionProvider: {ordinal}");
control_handle.shutdown_with_epitaph(zx::Status::INVALID_ARGS);
}
}
}
}
}
/// Resolves to the output of whichever of `f1` and `f2` completes first.
///
/// Both futures are fused and pinned locally; the one that did not complete is dropped
/// when this function returns.
async fn select_first<O>(f1: impl Future<Output = O>, f2: impl Future<Output = O>) -> O {
    let first = f1.fuse();
    let second = f2.fuse();
    pin_mut!(first, second);
    select! {
        output = first => output,
        output = second => output,
    }
}
/// Serve the LutexController protocol.
pub async fn serve_lutex_controller(
request_stream: fbinder::LutexControllerRequestStream,
current_task: &CurrentTask,
) -> Result<(), Error> {
let kernel = current_task.kernel();
request_stream
.map_err(Error::from)
.try_for_each_concurrent(None, |event| async move {
match event {
fbinder::LutexControllerRequest::WaitBitset { payload, responder } => {
let deadline_and_receiver = (|| {
let mut unlocked = kernel.kthreads.unlocked_for_async();
let vmo = payload.vmo.ok_or_else(|| errno!(EINVAL))?;
let offset = payload.offset.ok_or_else(|| errno!(EINVAL))?;
let value = payload.value.ok_or_else(|| errno!(EINVAL))?;
let mask = payload.mask.unwrap_or(u32::MAX);
let deadline = payload.deadline.map(zx::MonotonicInstant::from_nanos);
kernel
.shared_futexes
.external_wait(&mut unlocked, vmo.into(), offset, value, mask)
.map(|(event, receiver)| (deadline, event, receiver))
})();
let result = match deadline_and_receiver {
Ok((deadline, event, receiver)) => {
// We construct a specific `wait_fut` to explicitly bind the lifecycle
// of the `event` to the duration of this wait operation. If the FIDL
// client disconnects (or the future is otherwise dropped/cancelled),
// the `event` is immediately dropped. This zeros the strong reference
// count on the `InterruptibleEvent`, signaling to the `FutexTable`
// that this external waiter is now stale and should be garbage
// collected on its next cleanup pass.
let wait_fut = async move {
let _event = event;
let receiver = receiver.map_err(|_| errno!(EINTR));
if let Some(deadline) = deadline {
let timer =
fasync::Timer::new(deadline).map(|_| error!(ETIMEDOUT));
select_first(timer, receiver).await
} else {
receiver.await
}
};
wait_fut.await
}
Err(e) => Err(e),
};
let result = result.map_err(|e: Errno| {
fposix::Errno::from_primitive(e.code.error_code() as i32)
.unwrap_or(fposix::Errno::Einval)
});
responder
.send(result)
.context("Unable to send LutexControllerRequest::WaitBitset response")?;
}
fbinder::LutexControllerRequest::WakeBitset { payload, responder } => {
let result = (|| {
let mut unlocked = kernel.kthreads.unlocked_for_async();
let vmo = payload.vmo.ok_or_else(|| errno!(EINVAL))?;
let offset = payload.offset.ok_or_else(|| errno!(EINVAL))?;
let count = payload.count.ok_or_else(|| errno!(EINVAL))?;
let mask = payload.mask.unwrap_or(u32::MAX);
kernel.shared_futexes.external_wake(
&mut unlocked,
vmo.into(),
offset,
count as usize,
mask,
)
})();
let result = result
.map(|count| fbinder::WakeResponse {
count: Some(count as u64),
..fbinder::WakeResponse::default()
})
.map_err(|e: Errno| {
fposix::Errno::from_primitive(e.code.error_code() as i32)
.unwrap_or(fposix::Errno::Einval)
});
responder
.send(result)
.context("Unable to send LutexControllerRequest::WakeBitset response")?;
}
fbinder::LutexControllerRequest::_UnknownMethod { ordinal, .. } => {
log_warn!("Unknown LutexController ordinal: {}", ordinal);
}
}
Ok(())
})
.await
.context("failed fbinder::LutexController request")
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use fidl::HandleBased;
    use starnix_core::testing::*;
    use {fidl_fuchsia_posix as fposix, zx};

    /// Duplicates `vmo` with the same rights, panicking if duplication fails.
    fn duplicate_vmo(vmo: &zx::Vmo) -> zx::Vmo {
        vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).expect("duplicate vmo")
    }

    /// Exercises `serve_lutex_controller` end to end: a wait on a mismatched value, a wait
    /// that times out, and a wait that stays pending until a matching wake arrives.
    #[fuchsia::test]
    async fn lutex_controller_test() {
        spawn_kernel_and_run(async |mut _locked, current_task| {
            // Signals that the spawned future ran to completion.
            let (done_sender, done_receiver) = oneshot::channel::<()>();
            current_task.kernel.kthreads.spawn_future(
                {
                    let kernel = current_task.kernel.clone();
                    move || async move {
                        let (lutex_controller, stream) = fidl::endpoints::create_proxy_and_stream::<
                            fbinder::LutexControllerMarker,
                        >();

                        // Spawn the server
                        let server_fut =
                            serve_lutex_controller(stream, kernel.kthreads.system_task());

                        let client_fut = async move {
                            const VMO_SIZE: usize = 4 * 1024;
                            let vmo = zx::Vmo::create(VMO_SIZE as u64).expect("Vmo::create");

                            // Wait on an incorrect value.
                            let mismatched = lutex_controller
                                .wait_bitset(fbinder::WaitBitsetRequest {
                                    vmo: Some(duplicate_vmo(&vmo)),
                                    offset: Some(0),
                                    value: Some(1),
                                    ..Default::default()
                                })
                                .await
                                .expect("got_answer");
                            assert_matches!(mismatched, Err(fposix::Errno::Eagain));

                            // Wait with a timeout
                            let timed_out = lutex_controller
                                .wait_bitset(fbinder::WaitBitsetRequest {
                                    vmo: Some(duplicate_vmo(&vmo)),
                                    offset: Some(0),
                                    value: Some(0),
                                    deadline: Some(0),
                                    ..Default::default()
                                })
                                .await
                                .expect("got_answer");
                            assert_matches!(timed_out, Err(fposix::Errno::Etimedout));

                            let mut pending_wait = Box::pin(lutex_controller.wait_bitset(
                                fbinder::WaitBitsetRequest {
                                    vmo: Some(duplicate_vmo(&vmo)),
                                    offset: Some(0),
                                    value: Some(0),
                                    deadline: None,
                                    ..Default::default()
                                },
                            ));
                            // The wait is correct, the future should stay pending until a wake.
                            assert!(futures::poll!(&mut pending_wait).is_pending());

                            let woken: fbinder::WakeResponse = lutex_controller
                                .wake_bitset(fbinder::WakeBitsetRequest {
                                    vmo: Some(duplicate_vmo(&vmo)),
                                    offset: Some(0),
                                    count: Some(1),
                                    mask: None,
                                    ..Default::default()
                                })
                                .await
                                .expect("wake_answer")
                                .expect("wake_response");
                            assert_eq!(woken.count, Some(1));

                            // The wait should now return.
                            assert!(pending_wait.await.expect("await_answer").is_ok());
                        };

                        let (server_res, _) = futures::join!(server_fut, client_fut);
                        server_res.expect("server failed");
                        let _ = done_sender.send(());
                    }
                },
                "lutex_controller_test",
            );
            done_receiver.await.expect("test failed");
        })
        .await;
    }
}