| //! This implements "anonymous" sockets, that do not correspond to anything on the host system and |
| //! are entirely implemented inside Miri. |
| //! We also use the same infrastructure to implement unnamed pipes. |
| |
| use std::cell::{Cell, OnceCell, RefCell}; |
| use std::collections::VecDeque; |
| use std::io; |
| use std::io::{ErrorKind, Read}; |
| |
| use rustc_abi::Size; |
| |
| use crate::concurrency::VClock; |
| use crate::shims::unix::fd::{FileDescriptionRef, WeakFileDescriptionRef}; |
| use crate::shims::unix::linux::epoll::{EpollReadyEvents, EvalContextExt as _}; |
| use crate::shims::unix::*; |
| use crate::*; |
| |
/// Upper bound, in bytes, on how much unread data one socketpair/pipe buffer may hold.
///
/// The exact value is arbitrary since real systems let it be configured. 208 KiB was chosen;
/// NOTE(review): this appears to match a common Linux default socket buffer size — confirm.
const MAX_SOCKETPAIR_BUFFER_CAPACITY: usize = 208 * 1024;
| |
/// One end of a pair of connected unnamed sockets.
///
/// The same type also represents either end of an unnamed pipe created by `pipe2`: the read
/// end owns a `readbuf`, while the write end has none and writes into its peer's `readbuf`.
#[derive(Debug)]
struct AnonSocket {
    /// The buffer we are reading from, or `None` if this is the writing end of a pipe.
    /// (In that case, the peer FD will be the reading end of that pipe.)
    readbuf: Option<RefCell<Buffer>>,
    /// The `AnonSocket` file descriptor that is our "peer", and that holds the buffer we are
    /// writing to. This is a weak reference because the other side may be closed before us; all
    /// future writes will then trigger EPIPE.
    /// It is set exactly once, right after both ends have been created.
    peer_fd: OnceCell<WeakFileDescriptionRef>,
    /// Set to `true` when the peer is closed while its `readbuf` still holds unread data, i.e.
    /// data that we wrote to it has been lost. This is reported as `EPOLLERR` on this end.
    peer_lost_data: Cell<bool>,
    /// Whether this end was created in non-blocking mode (`SOCK_NONBLOCK` / `O_NONBLOCK`).
    /// Reads and writes that would block then fail with `ErrorKind::WouldBlock` instead.
    is_nonblock: bool,
}
| |
/// Data that has been written to one end but not yet read from it, together with the
/// vector clock used to synchronize readers with earlier writers.
#[derive(Debug)]
struct Buffer {
    /// Pending bytes in FIFO order: writes append at the back, reads consume from the front.
    buf: VecDeque<u8>,
    /// Join of the releasing clocks of all writes into this buffer; readers acquire it.
    clock: VClock,
}
| |
| impl Buffer { |
| fn new() -> Self { |
| Buffer { buf: VecDeque::new(), clock: VClock::default() } |
| } |
| } |
| |
| impl AnonSocket { |
| fn peer_fd(&self) -> &WeakFileDescriptionRef { |
| self.peer_fd.get().unwrap() |
| } |
| } |
| |
| impl FileDescription for AnonSocket { |
| fn name(&self) -> &'static str { |
| "socketpair" |
| } |
| |
| fn get_epoll_ready_events<'tcx>(&self) -> InterpResult<'tcx, EpollReadyEvents> { |
| // We only check the status of EPOLLIN, EPOLLOUT, EPOLLHUP and EPOLLRDHUP flags. |
| // If other event flags need to be supported in the future, the check should be added here. |
| |
| let mut epoll_ready_events = EpollReadyEvents::new(); |
| |
| // Check if it is readable. |
| if let Some(readbuf) = &self.readbuf { |
| if !readbuf.borrow().buf.is_empty() { |
| epoll_ready_events.epollin = true; |
| } |
| } else { |
| // Without a read buffer, reading never blocks, so we are always ready. |
| epoll_ready_events.epollin = true; |
| } |
| |
| // Check if is writable. |
| if let Some(peer_fd) = self.peer_fd().upgrade() { |
| if let Some(writebuf) = &peer_fd.downcast::<AnonSocket>().unwrap().readbuf { |
| let data_size = writebuf.borrow().buf.len(); |
| let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size); |
| if available_space != 0 { |
| epoll_ready_events.epollout = true; |
| } |
| } else { |
| // Without a write buffer, writing never blocks. |
| epoll_ready_events.epollout = true; |
| } |
| } else { |
| // Peer FD has been closed. This always sets both the RDHUP and HUP flags |
| // as we do not support `shutdown` that could be used to partially close the stream. |
| epoll_ready_events.epollrdhup = true; |
| epoll_ready_events.epollhup = true; |
| // Since the peer is closed, even if no data is available reads will return EOF and |
| // writes will return EPIPE. In other words, they won't block, so we mark this as ready |
| // for read and write. |
| epoll_ready_events.epollin = true; |
| epoll_ready_events.epollout = true; |
| // If there is data lost in peer_fd, set EPOLLERR. |
| if self.peer_lost_data.get() { |
| epoll_ready_events.epollerr = true; |
| } |
| } |
| interp_ok(epoll_ready_events) |
| } |
| |
| fn close<'tcx>( |
| self: Box<Self>, |
| _communicate_allowed: bool, |
| ecx: &mut MiriInterpCx<'tcx>, |
| ) -> InterpResult<'tcx, io::Result<()>> { |
| if let Some(peer_fd) = self.peer_fd().upgrade() { |
| // If the current readbuf is non-empty when the file description is closed, |
| // notify the peer that data lost has happened in current file description. |
| if let Some(readbuf) = &self.readbuf { |
| if !readbuf.borrow().buf.is_empty() { |
| peer_fd.downcast::<AnonSocket>().unwrap().peer_lost_data.set(true); |
| } |
| } |
| // Notify peer fd that close has happened, since that can unblock reads and writes. |
| ecx.check_and_update_readiness(&peer_fd)?; |
| } |
| interp_ok(Ok(())) |
| } |
| |
| fn read<'tcx>( |
| &self, |
| _self_ref: &FileDescriptionRef, |
| _communicate_allowed: bool, |
| ptr: Pointer, |
| len: usize, |
| dest: &MPlaceTy<'tcx>, |
| ecx: &mut MiriInterpCx<'tcx>, |
| ) -> InterpResult<'tcx> { |
| let mut bytes = vec![0; len]; |
| |
| // Always succeed on read size 0. |
| if len == 0 { |
| return ecx.return_read_success(ptr, &bytes, 0, dest); |
| } |
| |
| let Some(readbuf) = &self.readbuf else { |
| // FIXME: This should return EBADF, but there's no nice way to do that as there's no |
| // corresponding ErrorKind variant. |
| throw_unsup_format!("reading from the write end of a pipe"); |
| }; |
| let mut readbuf = readbuf.borrow_mut(); |
| if readbuf.buf.is_empty() { |
| if self.peer_fd().upgrade().is_none() { |
| // Socketpair with no peer and empty buffer. |
| // 0 bytes successfully read indicates end-of-file. |
| return ecx.return_read_success(ptr, &bytes, 0, dest); |
| } else { |
| if self.is_nonblock { |
| // Non-blocking socketpair with writer and empty buffer. |
| // https://linux.die.net/man/2/read |
| // EAGAIN or EWOULDBLOCK can be returned for socket, |
| // POSIX.1-2001 allows either error to be returned for this case. |
| // Since there is no ErrorKind for EAGAIN, WouldBlock is used. |
| return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest); |
| } else { |
| // Blocking socketpair with writer and empty buffer. |
| // FIXME: blocking is currently not supported |
| throw_unsup_format!("socketpair/pipe/pipe2 read: blocking isn't supported yet"); |
| } |
| } |
| } |
| |
| // Synchronize with all previous writes to this buffer. |
| // FIXME: this over-synchronizes; a more precise approach would be to |
| // only sync with the writes whose data we will read. |
| ecx.acquire_clock(&readbuf.clock); |
| |
| // Do full read / partial read based on the space available. |
| // Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior. |
| let actual_read_size = readbuf.buf.read(&mut bytes).unwrap(); |
| |
| // Need to drop before others can access the readbuf again. |
| drop(readbuf); |
| |
| // A notification should be provided for the peer file description even when it can |
| // only write 1 byte. This implementation is not compliant with the actual Linux kernel |
| // implementation. For optimization reasons, the kernel will only mark the file description |
| // as "writable" when it can write more than a certain number of bytes. Since we |
| // don't know what that *certain number* is, we will provide a notification every time |
| // a read is successful. This might result in our epoll emulation providing more |
| // notifications than the real system. |
| if let Some(peer_fd) = self.peer_fd().upgrade() { |
| ecx.check_and_update_readiness(&peer_fd)?; |
| } |
| |
| ecx.return_read_success(ptr, &bytes, actual_read_size, dest) |
| } |
| |
| fn write<'tcx>( |
| &self, |
| _self_ref: &FileDescriptionRef, |
| _communicate_allowed: bool, |
| ptr: Pointer, |
| len: usize, |
| dest: &MPlaceTy<'tcx>, |
| ecx: &mut MiriInterpCx<'tcx>, |
| ) -> InterpResult<'tcx> { |
| // Always succeed on write size 0. |
| // ("If count is zero and fd refers to a file other than a regular file, the results are not specified.") |
| if len == 0 { |
| return ecx.return_write_success(0, dest); |
| } |
| |
| // We are writing to our peer's readbuf. |
| let Some(peer_fd) = self.peer_fd().upgrade() else { |
| // If the upgrade from Weak to Rc fails, it indicates that all read ends have been |
| // closed. |
| return ecx.set_last_error_and_return(ErrorKind::BrokenPipe, dest); |
| }; |
| |
| let Some(writebuf) = &peer_fd.downcast::<AnonSocket>().unwrap().readbuf else { |
| // FIXME: This should return EBADF, but there's no nice way to do that as there's no |
| // corresponding ErrorKind variant. |
| throw_unsup_format!("writing to the reading end of a pipe"); |
| }; |
| let mut writebuf = writebuf.borrow_mut(); |
| let data_size = writebuf.buf.len(); |
| let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size); |
| if available_space == 0 { |
| if self.is_nonblock { |
| // Non-blocking socketpair with a full buffer. |
| return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest); |
| } else { |
| // Blocking socketpair with a full buffer. |
| throw_unsup_format!("socketpair/pipe/pipe2 write: blocking isn't supported yet"); |
| } |
| } |
| // Remember this clock so `read` can synchronize with us. |
| ecx.release_clock(|clock| { |
| writebuf.clock.join(clock); |
| }); |
| // Do full write / partial write based on the space available. |
| let actual_write_size = len.min(available_space); |
| let bytes = ecx.read_bytes_ptr_strip_provenance(ptr, Size::from_bytes(len))?; |
| writebuf.buf.extend(&bytes[..actual_write_size]); |
| |
| // Need to stop accessing peer_fd so that it can be notified. |
| drop(writebuf); |
| |
| // Notification should be provided for peer fd as it became readable. |
| // The kernel does this even if the fd was already readable before, so we follow suit. |
| ecx.check_and_update_readiness(&peer_fd)?; |
| |
| ecx.return_write_success(actual_write_size, dest) |
| } |
| } |
| |
| impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {} |
| pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { |
| /// For more information on the arguments see the socketpair manpage: |
| /// <https://linux.die.net/man/2/socketpair> |
| fn socketpair( |
| &mut self, |
| domain: &OpTy<'tcx>, |
| type_: &OpTy<'tcx>, |
| protocol: &OpTy<'tcx>, |
| sv: &OpTy<'tcx>, |
| ) -> InterpResult<'tcx, Scalar> { |
| let this = self.eval_context_mut(); |
| |
| let domain = this.read_scalar(domain)?.to_i32()?; |
| let mut flags = this.read_scalar(type_)?.to_i32()?; |
| let protocol = this.read_scalar(protocol)?.to_i32()?; |
| let sv = this.deref_pointer(sv)?; |
| |
| let mut is_sock_nonblock = false; |
| |
| // Interpret the flag. Every flag we recognize is "subtracted" from `flags`, so |
| // if there is anything left at the end, that's an unsupported flag. |
| if this.tcx.sess.target.os == "linux" { |
| // SOCK_NONBLOCK only exists on Linux. |
| let sock_nonblock = this.eval_libc_i32("SOCK_NONBLOCK"); |
| let sock_cloexec = this.eval_libc_i32("SOCK_CLOEXEC"); |
| if flags & sock_nonblock == sock_nonblock { |
| is_sock_nonblock = true; |
| flags &= !sock_nonblock; |
| } |
| if flags & sock_cloexec == sock_cloexec { |
| flags &= !sock_cloexec; |
| } |
| } |
| |
| // Fail on unsupported input. |
| // AF_UNIX and AF_LOCAL are synonyms, so we accept both in case |
| // their values differ. |
| if domain != this.eval_libc_i32("AF_UNIX") && domain != this.eval_libc_i32("AF_LOCAL") { |
| throw_unsup_format!( |
| "socketpair: domain {:#x} is unsupported, only AF_UNIX \ |
| and AF_LOCAL are allowed", |
| domain |
| ); |
| } else if flags != this.eval_libc_i32("SOCK_STREAM") { |
| throw_unsup_format!( |
| "socketpair: type {:#x} is unsupported, only SOCK_STREAM, \ |
| SOCK_CLOEXEC and SOCK_NONBLOCK are allowed", |
| flags |
| ); |
| } else if protocol != 0 { |
| throw_unsup_format!( |
| "socketpair: socket protocol {protocol} is unsupported, \ |
| only 0 is allowed", |
| ); |
| } |
| |
| // Generate file descriptions. |
| let fds = &mut this.machine.fds; |
| let fd0 = fds.new_ref(AnonSocket { |
| readbuf: Some(RefCell::new(Buffer::new())), |
| peer_fd: OnceCell::new(), |
| peer_lost_data: Cell::new(false), |
| is_nonblock: is_sock_nonblock, |
| }); |
| let fd1 = fds.new_ref(AnonSocket { |
| readbuf: Some(RefCell::new(Buffer::new())), |
| peer_fd: OnceCell::new(), |
| peer_lost_data: Cell::new(false), |
| is_nonblock: is_sock_nonblock, |
| }); |
| |
| // Make the file descriptions point to each other. |
| fd0.downcast::<AnonSocket>().unwrap().peer_fd.set(fd1.downgrade()).unwrap(); |
| fd1.downcast::<AnonSocket>().unwrap().peer_fd.set(fd0.downgrade()).unwrap(); |
| |
| // Insert the file description to the fd table, generating the file descriptors. |
| let sv0 = fds.insert(fd0); |
| let sv1 = fds.insert(fd1); |
| |
| // Return socketpair file descriptors to the caller. |
| let sv0 = Scalar::from_int(sv0, sv.layout.size); |
| let sv1 = Scalar::from_int(sv1, sv.layout.size); |
| this.write_scalar(sv0, &sv)?; |
| this.write_scalar(sv1, &sv.offset(sv.layout.size, sv.layout, this)?)?; |
| |
| interp_ok(Scalar::from_i32(0)) |
| } |
| |
| fn pipe2( |
| &mut self, |
| pipefd: &OpTy<'tcx>, |
| flags: Option<&OpTy<'tcx>>, |
| ) -> InterpResult<'tcx, Scalar> { |
| let this = self.eval_context_mut(); |
| |
| let pipefd = this.deref_pointer_as(pipefd, this.machine.layouts.i32)?; |
| let mut flags = match flags { |
| Some(flags) => this.read_scalar(flags)?.to_i32()?, |
| None => 0, |
| }; |
| |
| let cloexec = this.eval_libc_i32("O_CLOEXEC"); |
| let o_nonblock = this.eval_libc_i32("O_NONBLOCK"); |
| |
| // Interpret the flag. Every flag we recognize is "subtracted" from `flags`, so |
| // if there is anything left at the end, that's an unsupported flag. |
| let mut is_nonblock = false; |
| if flags & o_nonblock == o_nonblock { |
| is_nonblock = true; |
| flags &= !o_nonblock; |
| } |
| // As usual we ignore CLOEXEC. |
| if flags & cloexec == cloexec { |
| flags &= !cloexec; |
| } |
| if flags != 0 { |
| throw_unsup_format!("unsupported flags in `pipe2`"); |
| } |
| |
| // Generate file descriptions. |
| // pipefd[0] refers to the read end of the pipe. |
| let fds = &mut this.machine.fds; |
| let fd0 = fds.new_ref(AnonSocket { |
| readbuf: Some(RefCell::new(Buffer::new())), |
| peer_fd: OnceCell::new(), |
| peer_lost_data: Cell::new(false), |
| is_nonblock, |
| }); |
| let fd1 = fds.new_ref(AnonSocket { |
| readbuf: None, |
| peer_fd: OnceCell::new(), |
| peer_lost_data: Cell::new(false), |
| is_nonblock, |
| }); |
| |
| // Make the file descriptions point to each other. |
| fd0.downcast::<AnonSocket>().unwrap().peer_fd.set(fd1.downgrade()).unwrap(); |
| fd1.downcast::<AnonSocket>().unwrap().peer_fd.set(fd0.downgrade()).unwrap(); |
| |
| // Insert the file description to the fd table, generating the file descriptors. |
| let pipefd0 = fds.insert(fd0); |
| let pipefd1 = fds.insert(fd1); |
| |
| // Return file descriptors to the caller. |
| let pipefd0 = Scalar::from_int(pipefd0, pipefd.layout.size); |
| let pipefd1 = Scalar::from_int(pipefd1, pipefd.layout.size); |
| this.write_scalar(pipefd0, &pipefd)?; |
| this.write_scalar(pipefd1, &pipefd.offset(pipefd.layout.size, pipefd.layout, this)?)?; |
| |
| interp_ok(Scalar::from_i32(0)) |
| } |
| } |