blob: 5f4b14b1aec7143f95db74d75b41975086aabfd8 [file] [log] [blame] [edit]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use starnix_lock::{Mutex, MutexGuard};
use std::{convert::TryInto, sync::Arc};
use crate::{
fs::{
buffers::{
InputBuffer, InputBufferCallback, MessageQueue, OutputBuffer, OutputBufferCallback,
},
default_fcntl, default_ioctl, fileops_impl_nonseekable, CacheMode, FdEvents, FileHandle,
FileObject, FileOps, FileSystem, FileSystemHandle, FileSystemOps, FileSystemOptions,
FsNodeInfo, FsStr, MountInfo, SpecialNode,
},
mm::{MemoryAccessorExt, PAGE_SIZE},
signals::{send_signal, SignalInfo},
syscalls::{
errno, error, mode, statfs, uapi, Errno, OpenFlags, SyscallArg, SyscallResult, UserAddress,
UserRef, FIONREAD, F_GETPIPE_SZ, F_SETPIPE_SZ, PIPEFS_MAGIC, SIGPIPE, SUCCESS,
},
task::{CurrentTask, EventHandler, Kernel, WaitCanceler, WaitQueue, Waiter},
};
/// Block size reported (through `FsNodeInfo::blksize`) for pipe nodes.
const ATOMIC_IO_BYTES: u16 = 4 * 1024;
/// Largest capacity, in bytes, that F_SETPIPE_SZ accepts (1 MiB). From pipe.go in gVisor.
const PIPE_MAX_SIZE: usize = 1 << 20;

/// Rounds `value` up to the nearest multiple of `increment`.
///
/// `increment` must be a non-zero power of two: the result is computed with a bit mask, which is
/// only a correct rounding for such increments.
fn round_up(value: usize, increment: usize) -> usize {
    let mask = increment - 1;
    (value + mask) & !mask
}
/// State shared by every open end of a single pipe.
///
/// Holds the buffered data, the wait queue used to signal readiness changes, and bookkeeping
/// about the reader and writer ends that have been opened.
#[derive(Debug)]
pub struct Pipe {
    /// Data written to the pipe that has not been read yet.
    messages: MessageQueue,
    /// Waiters notified of readiness changes (e.g. POLLIN, POLLOUT, POLLHUP) on this pipe.
    waiters: WaitQueue,
    /// The number of open readers.
    reader_count: usize,
    /// Whether the pipe has ever had a reader.
    had_reader: bool,
    /// The number of open writers.
    writer_count: usize,
    /// Whether the pipe has ever had a writer.
    had_writer: bool,
}
impl Default for Pipe {
    /// Builds a pipe with no readers or writers whose buffer holds 16 pages of data.
    fn default() -> Self {
        // The default size of a pipe is 16 pages.
        const DEFAULT_CAPACITY_IN_PAGES: usize = 16;
        let capacity = *PAGE_SIZE as usize * DEFAULT_CAPACITY_IN_PAGES;
        Self {
            messages: MessageQueue::new(capacity),
            waiters: Default::default(),
            reader_count: 0,
            had_reader: false,
            writer_count: 0,
            had_writer: false,
        }
    }
}
/// A shareable, lockable handle to a [`Pipe`].
///
/// Each `PipeFileObject` returned by `Pipe::open` keeps a clone of the handle it was opened from.
pub type PipeHandle = Arc<Mutex<Pipe>>;
impl Pipe {
    /// Creates a new, empty pipe behind a shareable [`PipeHandle`].
    pub fn new() -> PipeHandle {
        Arc::new(Mutex::new(Pipe::default()))
    }

    /// Opens a new end of `pipe` with the access mode in `flags` and returns its file operations.
    ///
    /// A readable open registers a reader and a writable open registers a writer. The first time
    /// the pipe ever gains a reader (resp. a writer), waiters are notified with POLLOUT
    /// (resp. POLLIN).
    ///
    /// # Errors
    ///
    /// Returns `ENXIO` for a non-blocking open for writing while no reader is open.
    pub fn open(pipe: &Arc<Mutex<Self>>, flags: OpenFlags) -> Result<Box<dyn FileOps>, Errno> {
        let mut events = FdEvents::empty();
        let mut pipe_locked = pipe.lock();
        if flags.can_read() {
            if !pipe_locked.had_reader {
                events |= FdEvents::POLLOUT;
            }
            pipe_locked.add_reader();
        }
        if flags.can_write() {
            // https://man7.org/linux/man-pages/man2/open.2.html says:
            //
            // ENXIO  O_NONBLOCK | O_WRONLY is set, the named file is a FIFO,
            //        and no process has the FIFO open for reading.
            if flags.contains(OpenFlags::NONBLOCK) && pipe_locked.reader_count == 0 {
                assert!(!flags.can_read()); // Otherwise we would have called add_reader() above.
                return error!(ENXIO);
            }
            if !pipe_locked.had_writer {
                events |= FdEvents::POLLIN;
            }
            pipe_locked.add_writer();
        }
        if events != FdEvents::empty() {
            pipe_locked.waiters.notify_fd_events(events);
        }
        Ok(Box::new(PipeFileObject { pipe: Arc::clone(pipe) }))
    }

    /// Increments the reader count for this pipe by 1.
    pub fn add_reader(&mut self) {
        self.reader_count += 1;
        self.had_reader = true;
    }

    /// Increments the writer count for this pipe by 1.
    pub fn add_writer(&mut self) {
        self.writer_count += 1;
        self.had_writer = true;
    }

    /// Returns the total number of bytes the pipe can buffer.
    fn capacity(&self) -> usize {
        self.messages.capacity()
    }

    /// Resizes the pipe buffer to `requested_capacity`, rounded up to a whole number of pages
    /// (and never below one page).
    ///
    /// # Errors
    ///
    /// Returns `EINVAL` if `requested_capacity` exceeds `PIPE_MAX_SIZE`; any error from
    /// `MessageQueue::set_capacity` is propagated unchanged.
    fn set_capacity(&mut self, mut requested_capacity: usize) -> Result<(), Errno> {
        if requested_capacity > PIPE_MAX_SIZE {
            return error!(EINVAL);
        }
        let page_size = *PAGE_SIZE as usize;
        if requested_capacity < page_size {
            requested_capacity = page_size;
        }
        requested_capacity = round_up(requested_capacity, page_size);
        self.messages.set_capacity(requested_capacity)
    }

    /// Returns whether a read would make progress: either data is buffered, or every writer that
    /// was ever opened has closed (so a read reports end-of-file).
    fn is_readable(&self) -> bool {
        !self.messages.is_empty() || (self.writer_count == 0 && self.had_writer)
    }

    /// Returns whether the pipe can accommodate at least part of a message of length `data_size`.
    fn is_writable(&self, data_size: usize) -> bool {
        let available_capacity = self.messages.available_capacity();
        // POSIX requires that a write of at most PIPE_BUF bytes be atomic, so such a write needs
        // room for all of it. Larger writes may be split, but they still need at least one free
        // byte: otherwise a full pipe would be reported writable and a writer (or splice) would
        // "succeed" after moving zero bytes.
        self.had_reader
            && (available_capacity >= data_size
                || (available_capacity > 0 && data_size > uapi::PIPE_BUF as usize))
    }

    /// Reads buffered data into `data`, returning the number of bytes read.
    ///
    /// # Errors
    ///
    /// Returns `EAGAIN` when the pipe is empty but could still receive data.
    pub fn read(&mut self, data: &mut dyn OutputBuffer) -> Result<usize, Errno> {
        // If there isn't any data to read from the pipe, then the behavior
        // depends on whether there are any open writers. If there is an
        // open writer, then we return EAGAIN, to signal that the callers
        // should wait for the writer to write something into the pipe.
        // Otherwise, we'll fall through the rest of this function and
        // return that we have read zero bytes, which will let the caller
        // know that they're done reading the pipe.
        if !self.is_readable() {
            return error!(EAGAIN);
        }
        self.messages.read_stream(data).map(|info| info.bytes_read)
    }

    /// Writes as much of `data` as the pipe currently accepts, returning the number of bytes
    /// written.
    ///
    /// # Errors
    ///
    /// * `EAGAIN` if the pipe has never had a reader, or cannot currently accept the write
    ///   (see `is_writable`).
    /// * `EPIPE` if every reader has closed; SIGPIPE is also sent to `current_task`.
    pub fn write(
        &mut self,
        current_task: &CurrentTask,
        data: &mut dyn InputBuffer,
    ) -> Result<usize, Errno> {
        if !self.had_reader {
            return error!(EAGAIN);
        }
        if self.reader_count == 0 {
            send_signal(current_task, SignalInfo::default(SIGPIPE));
            return error!(EPIPE);
        }
        if !self.is_writable(data.available()) {
            return error!(EAGAIN);
        }
        self.messages.write_stream(data, None, &mut vec![])
    }

    /// Computes the readiness events for an end of the pipe opened with `flags`.
    ///
    /// Read side: POLLIN while data is buffered, POLLHUP once every writer that was opened has
    /// closed. Write side: POLLOUT while at least one byte fits, POLLERR once every reader that
    /// was opened has closed.
    fn query_events(&self, flags: OpenFlags) -> FdEvents {
        let mut events = FdEvents::empty();
        if flags.can_read() && self.is_readable() {
            let writer_closed = self.writer_count == 0 && self.had_writer;
            let has_data = !self.messages.is_empty();
            if writer_closed {
                events |= FdEvents::POLLHUP;
            }
            if !writer_closed || has_data {
                events |= FdEvents::POLLIN;
            }
        }
        if flags.can_write() {
            if self.is_writable(1) {
                events |= FdEvents::POLLOUT;
            }
            // POLLERR is reported whenever the read end is gone, even if the pipe is full;
            // otherwise a writer polling a full pipe would never learn that all readers closed.
            if self.reader_count == 0 && self.had_reader {
                events |= FdEvents::POLLERR;
            }
        }
        events
    }

    /// Handles the pipe-specific fcntl commands; everything else goes to `default_fcntl`.
    ///
    /// F_GETPIPE_SZ returns the current capacity. F_SETPIPE_SZ resizes the buffer (see
    /// `set_capacity`) and returns the resulting capacity.
    fn fcntl(
        &mut self,
        _file: &FileObject,
        _current_task: &CurrentTask,
        cmd: u32,
        arg: u64,
    ) -> Result<SyscallResult, Errno> {
        match cmd {
            F_GETPIPE_SZ => Ok(self.capacity().into()),
            F_SETPIPE_SZ => {
                self.set_capacity(arg as usize)?;
                Ok(self.capacity().into())
            }
            _ => default_fcntl(cmd),
        }
    }

    /// Handles FIONREAD by writing the number of buffered bytes, as an `i32`, to the user address
    /// in `arg`; every other request goes to `default_ioctl`.
    ///
    /// # Errors
    ///
    /// Returns `EINVAL` if the buffered byte count does not fit in an `i32`, and propagates
    /// failures writing to user memory.
    fn ioctl(
        &self,
        file: &FileObject,
        current_task: &CurrentTask,
        request: u32,
        arg: SyscallArg,
    ) -> Result<SyscallResult, Errno> {
        let user_addr = UserAddress::from(arg);
        match request {
            FIONREAD => {
                let addr = UserRef::<i32>::new(user_addr);
                let value: i32 = self.messages.len().try_into().map_err(|_| errno!(EINVAL))?;
                current_task.write_object(addr, &value)?;
                Ok(SUCCESS)
            }
            _ => default_ioctl(file, current_task, request, arg),
        }
    }

    /// Wakes waiters after data was consumed: space may now be available for writers.
    fn notify_read(&self) {
        self.waiters.notify_fd_events(FdEvents::POLLOUT);
    }

    /// Wakes waiters after data was produced: readers may now make progress.
    fn notify_write(&self) {
        self.waiters.notify_fd_events(FdEvents::POLLIN);
    }
}
/// Creates an anonymous pipe and returns its two ends as `(read_end, write_end)`.
///
/// The read end comes first and the write end second, which is the order sys_pipe2() expects.
/// Both ends are opened on one FIFO node (mode 0600) created in the kernel's pipe filesystem.
pub fn new_pipe(current_task: &CurrentTask) -> Result<(FileHandle, FileHandle), Errno> {
    let node = pipe_fs(current_task.kernel()).create_node(SpecialNode, |id| {
        let mut info = FsNodeInfo::new(id, mode!(IFIFO, 0o600), current_task.as_fscred());
        info.blksize = ATOMIC_IO_BYTES.into();
        info
    });
    let open_end = |flags: OpenFlags| -> Result<FileHandle, Errno> {
        let ops = node.open(current_task, &MountInfo::detached(), flags, false)?;
        Ok(FileObject::new_anonymous(ops, Arc::clone(&node), flags))
    };
    let read_end = open_end(OpenFlags::RDONLY)?;
    let write_end = open_end(OpenFlags::WRONLY)?;
    Ok((read_end, write_end))
}
/// The filesystem in which `new_pipe` creates anonymous pipe nodes.
struct PipeFs;
impl FileSystemOps for PipeFs {
    /// Reports default filesystem statistics tagged with `PIPEFS_MAGIC`.
    fn statfs(&self, _fs: &FileSystem, _current_task: &CurrentTask) -> Result<statfs, Errno> {
        let stats = statfs::default(PIPEFS_MAGIC);
        Ok(stats)
    }

    /// The filesystem's name.
    fn name(&self) -> &'static FsStr {
        b"pipe"
    }
}
/// Returns the kernel's pipe filesystem, creating it (uncached, default options) on first use.
fn pipe_fs(kernel: &Arc<Kernel>) -> &FileSystemHandle {
    let create = || {
        let options = FileSystemOptions::default();
        FileSystem::new(kernel, CacheMode::Uncached, PipeFs, options)
    };
    kernel.pipe_fs.get_or_init(create)
}
/// The file operations for one open end (read, write, or both) of a pipe.
pub struct PipeFileObject {
    /// The pipe state shared with every other open end of the same pipe.
    pipe: Arc<Mutex<Pipe>>,
}
/// File operations for an open end of a pipe. Every operation locks the shared [`Pipe`] and
/// delegates to it; which direction applies is decided by the file's open flags.
impl FileOps for PipeFileObject {
    fileops_impl_nonseekable!();

    /// Releases this end of the pipe: decrements the reader and/or writer count according to the
    /// file's open flags and notifies waiters whose readiness may have changed.
    ///
    /// # Panics
    ///
    /// Panics if the matching count is already zero (unbalanced open/close accounting).
    fn close(&self, file: &FileObject) {
        let mut events = FdEvents::empty();
        let mut pipe = self.pipe.lock();
        let flags = file.flags();
        if flags.can_read() {
            assert!(pipe.reader_count > 0);
            pipe.reader_count -= 1;
            if pipe.reader_count == 0 {
                // Wake writers so they can observe that nobody is reading anymore.
                events |= FdEvents::POLLOUT;
            }
        }
        if flags.can_write() {
            assert!(pipe.writer_count > 0);
            pipe.writer_count -= 1;
            if pipe.writer_count == 0 {
                if pipe.reader_count > 0 {
                    // Remaining readers should see the hang-up from the last writer.
                    events |= FdEvents::POLLHUP;
                }
                if !pipe.messages.is_empty() {
                    events |= FdEvents::POLLIN;
                }
            }
        }
        if events != FdEvents::empty() {
            pipe.waiters.notify_fd_events(events);
        }
    }

    /// Reads from the pipe into `data`.
    ///
    /// `Pipe::read` returns EAGAIN while the pipe is empty but still has a writer; `blocking_op`
    /// then waits for POLLIN or POLLHUP (for a blocking file) and retries. Writers are notified
    /// whenever bytes were consumed.
    fn read(
        &self,
        file: &FileObject,
        current_task: &CurrentTask,
        offset: usize,
        data: &mut dyn OutputBuffer,
    ) -> Result<usize, Errno> {
        // Pipes are not seekable, so the offset is always 0.
        debug_assert!(offset == 0);
        file.blocking_op(current_task, FdEvents::POLLIN | FdEvents::POLLHUP, None, || {
            let mut pipe = self.pipe.lock();
            let actual = pipe.read(data)?;
            if actual > 0 {
                pipe.notify_read();
            }
            Ok(actual)
        })
    }

    /// Writes `data` into the pipe and returns the number of bytes written.
    ///
    /// The closure handed to `blocking_op` returns EAGAIN until all of `data` has been consumed,
    /// so a blocking write waits for POLLOUT and retries until everything is written. If some
    /// bytes were written before an error occurred, the error is dropped and the partial count
    /// is returned instead.
    fn write(
        &self,
        file: &FileObject,
        current_task: &CurrentTask,
        offset: usize,
        data: &mut dyn InputBuffer,
    ) -> Result<usize, Errno> {
        debug_assert!(offset == 0);
        // `bytes_read()` below is used as the total written, so it must start from zero.
        debug_assert!(data.bytes_read() == 0);
        let result = file.blocking_op(current_task, FdEvents::POLLOUT, None, || {
            let mut pipe = self.pipe.lock();
            let offset_before = data.bytes_read();
            let bytes_written = pipe.write(current_task, data)?;
            debug_assert!(data.bytes_read() - offset_before == bytes_written);
            if bytes_written > 0 {
                pipe.notify_write();
            }
            // Not everything fit yet: ask blocking_op to wait for space and try again.
            if data.available() > 0 {
                return error!(EAGAIN);
            }
            Ok(())
        });
        let bytes_written = data.bytes_read();
        if bytes_written == 0 {
            // We can only return an error if no data was actually sent. If partial data was
            // sent, swallow the error and return how much was sent.
            result?;
        }
        Ok(bytes_written)
    }

    /// Registers `waiter` for `events` on the pipe's wait queue, after removing POLLIN if this end
    /// cannot read and POLLOUT if it cannot write.
    fn wait_async(
        &self,
        file: &FileObject,
        _current_task: &CurrentTask,
        waiter: &Waiter,
        mut events: FdEvents,
        handler: EventHandler,
    ) -> Option<WaitCanceler> {
        let flags = file.flags();
        if !flags.can_read() {
            events.remove(FdEvents::POLLIN);
        }
        if !flags.can_write() {
            events.remove(FdEvents::POLLOUT);
        }
        Some(self.pipe.lock().waiters.wait_async_fd_events(waiter, events, handler))
    }

    /// Returns the pipe's current readiness as seen through this end's open flags.
    fn query_events(
        &self,
        file: &FileObject,
        _current_task: &CurrentTask,
    ) -> Result<FdEvents, Errno> {
        Ok(self.pipe.lock().query_events(file.flags()))
    }

    /// Forwards fcntl to the shared pipe (handles F_GETPIPE_SZ / F_SETPIPE_SZ).
    fn fcntl(
        &self,
        file: &FileObject,
        current_task: &CurrentTask,
        cmd: u32,
        arg: u64,
    ) -> Result<SyscallResult, Errno> {
        self.pipe.lock().fcntl(file, current_task, cmd, arg)
    }

    /// Forwards ioctl to the shared pipe (handles FIONREAD).
    fn ioctl(
        &self,
        file: &FileObject,
        current_task: &CurrentTask,
        request: u32,
        arg: SyscallArg,
    ) -> Result<SyscallResult, Errno> {
        self.pipe.lock().ioctl(file, current_task, request, arg)
    }
}
/// An OutputBuffer that will write the data to `pipe`.
///
/// Each chunk produced through `write_each` or `zero` is appended to the pipe as a new message.
#[derive(Debug)]
struct SpliceOutputBuffer<'a> {
    /// The destination pipe, borrowed mutably (through its lock guard) for the buffer's lifetime.
    pipe: &'a mut Pipe,
    /// Total number of bytes this buffer was created to accept.
    len: usize,
    /// Number of bytes that can still be written.
    available: usize,
}
impl<'a> OutputBuffer for SpliceOutputBuffer<'a> {
fn write_each(&mut self, callback: &mut OutputBufferCallback<'_>) -> Result<usize, Errno> {
let mut bytes = vec![0; self.available];
let result = callback(&mut bytes)?;
bytes.truncate(result);
if result > 0 {
self.pipe.messages.write_message(bytes.into());
self.pipe.notify_write();
self.available -= result;
}
Ok(result)
}
fn available(&self) -> usize {
self.available
}
fn bytes_written(&self) -> usize {
self.len - self.available
}
fn zero(&mut self) -> Result<usize, Errno> {
let bytes = vec![0; self.available];
let len = bytes.len();
if len > 0 {
self.pipe.messages.write_message(bytes.into());
self.pipe.notify_write();
self.available -= len;
}
Ok(len)
}
}
/// An InputBuffer that will read the data from `pipe`.
///
/// Data is peeked from the front of the pipe's message queue and only removed from the pipe when
/// it is advanced past.
#[derive(Debug)]
struct SpliceInputBuffer<'a> {
    /// The source pipe, borrowed mutably (through its lock guard) for the buffer's lifetime.
    pipe: &'a mut Pipe,
    /// Total number of bytes this buffer was created to provide.
    len: usize,
    /// Number of bytes not yet consumed.
    available: usize,
}
impl<'a> InputBuffer for SpliceInputBuffer<'a> {
    /// Offers the pipe's queued messages, front to back, to `callback` without removing them,
    /// up to `available` bytes in total. Stops as soon as the callback accepts fewer bytes than
    /// it was offered.
    ///
    /// # Errors
    ///
    /// Returns `EINVAL` if the callback reports consuming more bytes than it was given, and
    /// propagates any error returned by the callback.
    fn peek_each(&mut self, callback: &mut InputBufferCallback<'_>) -> Result<usize, Errno> {
        let mut read = 0;
        let mut available = self.available;
        for message in self.pipe.messages.messages() {
            // Once the budget is spent there is nothing more to offer; do not hand the callback
            // empty slices for the remaining messages.
            if available == 0 {
                break;
            }
            let to_read = std::cmp::min(available, message.len());
            let result = callback(&message.data.bytes()[0..to_read])?;
            if result > to_read {
                return error!(EINVAL);
            }
            read += result;
            available -= result;
            if result != to_read {
                break;
            }
        }
        Ok(read)
    }

    fn available(&self) -> usize {
        self.available
    }

    fn bytes_read(&self) -> usize {
        self.len - self.available
    }

    /// Consumes every remaining byte and returns how many there were.
    ///
    /// The bytes are removed from the underlying pipe (via `advance`), not merely dropped from
    /// this buffer's accounting; otherwise a consumer that only drains its input would report
    /// the data as consumed while it stays queued in the pipe.
    fn drain(&mut self) -> usize {
        let drained = self.available;
        self.advance(drained).expect("advancing by exactly `available` bytes cannot fail");
        drained
    }

    /// Removes the next `length` bytes from the pipe, splitting a message if needed, and wakes
    /// writers once done.
    ///
    /// # Errors
    ///
    /// Returns `EINVAL` if `length` exceeds the bytes still available.
    fn advance(&mut self, mut length: usize) -> Result<(), Errno> {
        if length == 0 {
            return Ok(());
        }
        if length > self.available {
            return error!(EINVAL);
        }
        self.available -= length;
        while let Some(mut message) = self.pipe.messages.read_message() {
            if let Some(data) = message.data.split_off(length) {
                // Some data is left in the message. Push it back.
                self.pipe.messages.write_front(data.into());
            }
            length -= message.len();
            if length == 0 {
                self.pipe.notify_read();
                return Ok(());
            }
        }
        // `available` never exceeds the bytes queued in the (locked) pipe.
        unreachable!("pipe held fewer bytes than the splice buffer expected");
    }
}
impl PipeFileObject {
/// Returns the result of `pregen` and a lock on pipe, once `condition` returns true, ensuring
/// `pregen` is run before the pipe is locked.
///
/// This will wait on `events` if the file is opened in blocking mode. If the file is opened in
/// not blocking mode and `condition` is not realized, this will return EAGAIN.
fn wait_for_condition<'a, F, G, V>(
&'a self,
current_task: &CurrentTask,
file: &FileHandle,
condition: F,
pregen: G,
events: FdEvents,
) -> Result<(V, MutexGuard<'a, Pipe>), Errno>
where
F: Fn(&Pipe) -> bool,
G: Fn() -> Result<V, Errno>,
{
file.blocking_op(current_task, events, None, || {
let other = pregen()?;
let pipe = self.pipe.lock();
if condition(&pipe) {
Ok((other, pipe))
} else {
error!(EAGAIN)
}
})
}
/// Lock the pipe for reading, after having run `pregen`.
fn lock_pipe_for_reading_with<'a, G, V>(
&'a self,
current_task: &CurrentTask,
file: &FileHandle,
pregen: G,
non_blocking: bool,
) -> Result<(V, MutexGuard<'a, Pipe>), Errno>
where
G: Fn() -> Result<V, Errno>,
{
if non_blocking {
let other = pregen()?;
let pipe = self.pipe.lock();
if !pipe.is_readable() {
return error!(EAGAIN);
}
Ok((other, pipe))
} else {
self.wait_for_condition(
current_task,
file,
|pipe| pipe.is_readable(),
pregen,
FdEvents::POLLIN | FdEvents::POLLHUP,
)
}
}
fn lock_pipe_for_reading<'a>(
&'a self,
current_task: &CurrentTask,
file: &FileHandle,
non_blocking: bool,
) -> Result<MutexGuard<'a, Pipe>, Errno> {
self.lock_pipe_for_reading_with(current_task, file, || Ok(()), non_blocking).map(|(_, l)| l)
}
/// Lock the pipe for writing, after having run `pregen`.
fn lock_pipe_for_writing_with<'a, G, V>(
&'a self,
current_task: &CurrentTask,
file: &FileHandle,
pregen: G,
non_blocking: bool,
len: usize,
) -> Result<(V, MutexGuard<'a, Pipe>), Errno>
where
G: Fn() -> Result<V, Errno>,
{
if non_blocking {
let other = pregen()?;
let pipe = self.pipe.lock();
if !pipe.is_writable(len) {
return error!(EAGAIN);
}
Ok((other, pipe))
} else {
self.wait_for_condition(
current_task,
file,
|pipe| pipe.is_writable(len),
pregen,
FdEvents::POLLOUT,
)
}
}
fn lock_pipe_for_writing<'a>(
&'a self,
current_task: &CurrentTask,
file: &FileHandle,
non_blocking: bool,
len: usize,
) -> Result<MutexGuard<'a, Pipe>, Errno> {
self.lock_pipe_for_writing_with(current_task, file, || Ok(()), non_blocking, len)
.map(|(_, l)| l)
}
/// Splice from this pipe to the `to` pipe.
fn splice_to_pipe(from: &mut Pipe, to: &mut Pipe, len: usize) -> Result<usize, Errno> {
if len == 0 {
return Ok(0);
}
let len = std::cmp::min(
len,
std::cmp::min(from.messages.len(), to.messages.available_capacity()),
);
let mut left = len;
while let Some(mut message) = from.messages.read_message() {
if let Some(data) = message.data.split_off(left) {
// Some data is left in the message. Push it back.
from.messages.write_front(data.into());
}
left -= message.len();
to.messages.write_message(message);
if left == 0 {
from.notify_read();
to.notify_write();
return Ok(len);
}
}
panic!();
}
/// splice from the given file handle to this pipe.
pub fn splice_from(
&self,
current_task: &CurrentTask,
self_file: &FileHandle,
from: &FileHandle,
offset: Option<usize>,
len: usize,
non_blocking: bool,
) -> Result<usize, Errno> {
// If both ends are pipes, locks on pipes must be taken such that the lock on this object
// is always taken before the lock on the other.
if let Some(pipe_file_object) = from.downcast_file::<PipeFileObject>() {
let (mut write_pipe, mut read_pipe) = pipe_file_object.lock_pipe_for_reading_with(
current_task,
from,
|| self.lock_pipe_for_writing(current_task, self_file, non_blocking, len),
non_blocking,
)?;
return Self::splice_to_pipe(&mut read_pipe, &mut write_pipe, len);
}
let mut pipe = self.lock_pipe_for_writing(current_task, self_file, non_blocking, len)?;
let len = std::cmp::min(len, pipe.messages.available_capacity());
let mut buffer = SpliceOutputBuffer { pipe: &mut pipe, len, available: len };
from.read_raw(current_task, offset.unwrap_or(0), &mut buffer)
}
pub fn splice_to(
&self,
current_task: &CurrentTask,
self_file: &FileHandle,
to: &FileHandle,
offset: Option<usize>,
len: usize,
non_blocking: bool,
) -> Result<usize, Errno> {
// If both ends are pipes, locks on pipes must be taken such that the lock on this object
// is always taken before the lock on the other.
if let Some(pipe_file_object) = to.downcast_file::<PipeFileObject>() {
let (mut read_pipe, mut write_pipe) = pipe_file_object.lock_pipe_for_writing_with(
current_task,
to,
|| self.lock_pipe_for_reading(current_task, self_file, non_blocking),
non_blocking,
len,
)?;
return Self::splice_to_pipe(&mut read_pipe, &mut write_pipe, len);
}
let mut pipe = self.lock_pipe_for_reading(current_task, self_file, non_blocking)?;
let len = std::cmp::min(len, pipe.messages.len());
let mut buffer = SpliceInputBuffer { pipe: &mut pipe, len, available: len };
to.write_raw(current_task, offset.unwrap_or(0), &mut buffer)
}
}