blob: 62dd7440239797d2f8213563b89fb23d8d8db073 [file] [log] [blame]
//! The main event loop which performs I/O on the pseudoterminal.
use std::borrow::Cow;
use std::collections::VecDeque;
use std::fmt::{self, Display, Formatter};
use std::fs::File;
use std::io::{self, ErrorKind, Read, Write};
use std::num::NonZeroUsize;
use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::Instant;
use log::error;
use polling::{Event as PollingEvent, Events, PollMode};
use crate::event::{self, Event, EventListener, WindowSize};
use crate::sync::FairMutex;
use crate::term::Term;
use crate::vte::ansi;
use crate::{thread, tty};
/// Max bytes to read from the PTY before forced terminal synchronization.
///
/// The value is `0x10_0000` bytes (1 MiB). It is used as the size of the read buffer allocated
/// by the event loop thread, and as the threshold at which `pty_read` stops trying to take the
/// terminal lock opportunistically and blocks on it instead.
pub(crate) const READ_BUFFER_SIZE: usize = 0x10_0000;
/// Max bytes to read from the PTY while the terminal is locked.
///
/// The value is `u16::MAX` (65535). `pty_read` leaves its read loop once at least this many
/// bytes have been parsed during a single call.
const MAX_LOCKED_READ: usize = u16::MAX as usize;
/// Messages that may be sent to the `EventLoop`.
///
/// Messages are queued on the loop's channel and handled when the loop drains that channel.
#[derive(Debug)]
pub enum Msg {
/// Data that should be written to the PTY.
///
/// The bytes are appended to the loop's write queue.
Input(Cow<'static, [u8]>),
/// Indicates that the `EventLoop` should shut down, as Alacritty is shutting down.
///
/// Draining stops at this message and the event loop exits.
Shutdown,
/// Instruction to resize the PTY.
///
/// The new size is forwarded to the PTY's `on_resize` handler.
Resize(WindowSize),
}
/// The main event loop.
///
/// Handles all the PTY I/O and runs the PTY parser which updates terminal
/// state.
pub struct EventLoop<T: tty::EventedPty, U: EventListener> {
/// Poller the loop waits on; shared with every `EventLoopSender` so senders can wake the loop.
poll: Arc<polling::Poller>,
/// The pseudoterminal that is read from, written to and resized.
pty: T,
/// Receiving end of the message channel.
rx: PeekableReceiver<Msg>,
/// Sending end of the message channel, cloned into each `EventLoopSender`.
tx: Sender<Msg>,
/// Terminal state which the parser updates with the bytes read from the PTY.
terminal: Arc<FairMutex<Term<U>>>,
/// Listener notified about wakeups and child exit.
event_proxy: U,
/// When set, the PTY is drained after the child exits instead of calling `exit` on the terminal.
hold: bool,
/// When set, every byte read from the PTY is also written to `./alacritty.recording`.
ref_test: bool,
}
impl<T, U> EventLoop<T, U>
where
T: tty::EventedPty + event::OnResize + Send + 'static,
U: EventListener + Send + 'static,
{
/// Create a new event loop.
///
/// Builds the message channel and the poller used to wait for PTY and channel activity. No
/// thread is started and nothing is registered with the poller here; that happens in `spawn`.
///
/// # Errors
///
/// Returns the underlying I/O error when the poller cannot be created.
pub fn new(
    terminal: Arc<FairMutex<Term<U>>>,
    event_proxy: U,
    pty: T,
    hold: bool,
    ref_test: bool,
) -> io::Result<EventLoop<T, U>> {
    let (sender, receiver) = mpsc::channel();
    let poller = Arc::new(polling::Poller::new()?);

    let event_loop = EventLoop {
        poll: poller,
        pty,
        rx: PeekableReceiver::new(receiver),
        tx: sender,
        terminal,
        event_proxy,
        hold,
        ref_test,
    };

    Ok(event_loop)
}
pub fn channel(&self) -> EventLoopSender {
EventLoopSender { sender: self.tx.clone(), poller: self.poll.clone() }
}
/// Drain the channel.
///
/// Input is appended to the write queue in `state` and resize requests are forwarded to the
/// PTY, until no message is pending.
///
/// Returns `false` when a shutdown message was received.
fn drain_recv_channel(&mut self, state: &mut State) -> bool {
    loop {
        match self.rx.recv() {
            // Nothing left to process.
            None => return true,
            // Stop immediately; later messages are left unhandled.
            Some(Msg::Shutdown) => return false,
            Some(Msg::Input(bytes)) => state.write_list.push_back(bytes),
            Some(Msg::Resize(size)) => self.pty.on_resize(size),
        }
    }
}
/// Read pending bytes from the PTY and feed them to the terminal parser.
///
/// Bytes are read into `buf` and parsed once the terminal lock is held. While the lock is
/// unavailable, reading continues and the bytes accumulate in `buf`. Once `READ_BUFFER_SIZE`
/// bytes are waiting, the call blocks on the lock instead. The lock, once taken, is kept until
/// the function returns.
///
/// `buf` is the scratch buffer for the read data; the event loop passes a buffer of
/// `READ_BUFFER_SIZE` bytes. When `writer` is `Some`, it receives a copy of all bytes that get
/// parsed. A failure to write that copy panics.
///
/// An `Event::Wakeup` is sent when bytes were processed, unless the parser reports at least as
/// many synchronized bytes as were processed.
///
/// # Errors
///
/// Returns any PTY read error other than `Interrupted` or `WouldBlock`.
#[inline]
fn pty_read<X>(
&mut self,
state: &mut State,
buf: &mut [u8],
mut writer: Option<&mut X>,
) -> io::Result<()>
where
X: Write,
{
// Bytes sitting in `buf` which were read but not parsed yet.
let mut unprocessed = 0;
// Total number of bytes handed to the parser during this call.
let mut processed = 0;
// Reserve the next terminal lock for PTY reading.
let _terminal_lease = Some(self.terminal.lease());
// Terminal guard, acquired lazily on the first successful lock attempt.
let mut terminal = None;
loop {
// Read from the PTY.
match self.pty.reader().read(&mut buf[unprocessed..]) {
// This is received on Windows/macOS when no more data is readable from the PTY.
Ok(0) if unprocessed == 0 => break,
Ok(got) => unprocessed += got,
Err(err) => match err.kind() {
ErrorKind::Interrupted | ErrorKind::WouldBlock => {
// Go back to polling if we're caught up on parsing and the PTY would block.
if unprocessed == 0 {
break;
}
},
_ => return Err(err),
},
}
// Attempt to lock the terminal.
let terminal = match &mut terminal {
Some(terminal) => terminal,
None => terminal.insert(match self.terminal.try_lock_unfair() {
// Force block if we are at the buffer size limit.
None if unprocessed >= READ_BUFFER_SIZE => self.terminal.lock_unfair(),
// Lock is busy: keep the buffered bytes and try to read more in the meantime.
None => continue,
Some(terminal) => terminal,
}),
};
// Write a copy of the bytes to the ref test file.
if let Some(writer) = &mut writer {
writer.write_all(&buf[..unprocessed]).unwrap();
}
// Parse the incoming bytes.
for byte in &buf[..unprocessed] {
state.parser.advance(&mut **terminal, *byte);
}
processed += unprocessed;
unprocessed = 0;
// Assure we're not blocking the terminal too long unnecessarily.
if processed >= MAX_LOCKED_READ {
break;
}
}
// Queue terminal redraw unless all processed bytes were synchronized.
if state.parser.sync_bytes_count() < processed && processed > 0 {
self.event_proxy.send_event(Event::Wakeup);
}
Ok(())
}
/// Write as much queued input to the PTY as it currently accepts.
///
/// Entries are taken from `state` in FIFO order and written until each one is complete. When
/// the PTY stops accepting data (`Ok(0)`, `Interrupted` or `WouldBlock`), the partially written
/// entry is stored back in `state` so a later call resumes where this one stopped.
///
/// Entries with nothing left to write, which in practice means empty payloads, are skipped.
/// Writing an empty slice reports `Ok(0)`, which would otherwise be mistaken for a stalled PTY
/// and leave the entry blocking the queue forever.
///
/// # Errors
///
/// Returns any write error other than `Interrupted` or `WouldBlock`. The entry which failed is
/// stored back in `state` before returning.
#[inline]
fn pty_write(&mut self, state: &mut State) -> io::Result<()> {
    state.ensure_next();

    'write_many: while let Some(mut current) = state.take_current() {
        // Never hand an empty slice to the writer, its `Ok(0)` is indistinguishable from a PTY
        // which cannot take more data right now.
        if current.finished() {
            state.goto_next();
            continue 'write_many;
        }

        'write_one: loop {
            match self.pty.writer().write(current.remaining_bytes()) {
                // The PTY accepted nothing; keep the entry and retry on the next writable event.
                Ok(0) => {
                    state.set_current(Some(current));
                    break 'write_many;
                },
                Ok(n) => {
                    current.advance(n);
                    if current.finished() {
                        state.goto_next();
                        break 'write_one;
                    }
                },
                Err(err) => {
                    // Preserve the write progress regardless of the error kind.
                    state.set_current(Some(current));
                    match err.kind() {
                        ErrorKind::Interrupted | ErrorKind::WouldBlock => break 'write_many,
                        _ => return Err(err),
                    }
                },
            }
        }
    }

    Ok(())
}
/// Run the event loop on a new thread named "PTY reader".
///
/// The thread registers the PTY with the poller and then repeatedly waits for PTY events,
/// channel messages or a synchronized update timeout. It stops on a shutdown message, on child
/// exit, or on a polling, read or write error. The loop and its `State` are handed back through
/// the returned `JoinHandle`.
///
/// # Panics
///
/// The spawned thread panics if the ref test recording file cannot be created, or if
/// re-registering the PTY with the poller fails.
pub fn spawn(mut self) -> JoinHandle<(Self, State)> {
thread::spawn_named("PTY reader", move || {
let mut state = State::default();
// Scratch buffer shared by all PTY reads of this thread.
let mut buf = [0u8; READ_BUFFER_SIZE];
let poll_opts = PollMode::Level;
// Start with read interest only; write interest is toggled on demand further down.
let mut interest = PollingEvent::readable(0);
// Register TTY through EventedRW interface.
// NOTE(review): the safety contract of `register` is not visible in this file; presumably
// the PTY must stay valid while registered and be deregistered afterwards. Confirm against
// `tty::EventedPty::register`.
if let Err(err) = unsafe { self.pty.register(&self.poll, interest, poll_opts) } {
error!("Event loop registration error: {}", err);
return (self, state);
}
let mut events = Events::with_capacity(NonZeroUsize::new(1024).unwrap());
// Recording of all PTY output, only created for ref tests.
let mut pipe = if self.ref_test {
Some(File::create("./alacritty.recording").expect("create alacritty recording"))
} else {
None
};
'event_loop: loop {
// Wakeup the event loop when a synchronized update timeout was reached.
let handler = state.parser.sync_timeout();
let timeout =
handler.sync_timeout().map(|st| st.saturating_duration_since(Instant::now()));
events.clear();
if let Err(err) = self.poll.wait(&mut events, timeout) {
match err.kind() {
ErrorKind::Interrupted => continue,
_ => {
error!("Event loop polling error: {}", err);
break 'event_loop;
},
}
}
// Handle synchronized update timeout.
// The wait returned without PTY events and no message is queued.
if events.is_empty() && self.rx.peek().is_none() {
state.parser.stop_sync(&mut *self.terminal.lock());
self.event_proxy.send_event(Event::Wakeup);
continue;
}
// Handle channel events, if there are any.
if !self.drain_recv_channel(&mut state) {
break;
}
for event in events.iter() {
match event.key {
tty::PTY_CHILD_EVENT_TOKEN => {
if let Some(tty::ChildEvent::Exited(code)) = self.pty.next_child_event()
{
if let Some(code) = code {
self.event_proxy.send_event(Event::ChildExit(code));
}
if self.hold {
// With hold enabled, make sure the PTY is drained.
let _ = self.pty_read(&mut state, &mut buf, pipe.as_mut());
} else {
// Without hold, shutdown the terminal.
self.terminal.lock().exit();
}
self.event_proxy.send_event(Event::Wakeup);
break 'event_loop;
}
},
tty::PTY_READ_WRITE_TOKEN => {
if event.is_interrupt() {
// Don't try to do I/O on a dead PTY.
continue;
}
if event.readable {
if let Err(err) = self.pty_read(&mut state, &mut buf, pipe.as_mut())
{
// On Linux, a `read` on the master side of a PTY can fail
// with `EIO` if the client side hangs up. In that case,
// just loop back round for the inevitable `Exited` event.
// This sucks, but checking the process is either racy or
// blocking.
#[cfg(target_os = "linux")]
if err.raw_os_error() == Some(libc::EIO) {
continue;
}
error!("Error reading from PTY in event loop: {}", err);
break 'event_loop;
}
}
if event.writable {
if let Err(err) = self.pty_write(&mut state) {
error!("Error writing to PTY in event loop: {}", err);
break 'event_loop;
}
}
},
// Events with any other key are ignored.
_ => (),
}
}
// Register write interest if necessary.
let needs_write = state.needs_write();
if needs_write != interest.writable {
interest.writable = needs_write;
// Re-register with new interest.
self.pty.reregister(&self.poll, interest, poll_opts).unwrap();
}
}
// The evented instances are not dropped here so deregister them explicitly.
let _ = self.pty.deregister(&self.poll);
(self, state)
})
}
}
/// Helper type which tracks how much of a buffer has been written.
struct Writing {
/// The complete payload to write.
source: Cow<'static, [u8]>,
/// Number of bytes, counted from the start of `source`, which were written already.
written: usize,
}
/// Wrapper around an `EventLoopSender` which forwards PTY input and resize requests to the
/// event loop.
pub struct Notifier(pub EventLoopSender);
impl event::Notify for Notifier {
    /// Queue `bytes` for writing to the PTY.
    ///
    /// Empty payloads are dropped and a failure to deliver the message is ignored.
    fn notify<B>(&self, bytes: B)
    where
        B: Into<Cow<'static, [u8]>>,
    {
        let payload: Cow<'static, [u8]> = bytes.into();

        // Terminal hangs if we send 0 bytes through.
        if !payload.is_empty() {
            let _ = self.0.send(Msg::Input(payload));
        }
    }
}
impl event::OnResize for Notifier {
    /// Ask the event loop to resize the PTY to `window_size`.
    ///
    /// A failure to deliver the request is ignored.
    fn on_resize(&mut self, window_size: WindowSize) {
        let request = Msg::Resize(window_size);
        let _ = self.0.send(request);
    }
}
/// Error returned when a message could not be handed to the event loop.
#[derive(Debug)]
pub enum EventLoopSendError {
/// Error polling the event loop.
///
/// The message was queued, but notifying the poller failed.
Io(io::Error),
/// Error sending a message to the event loop.
///
/// The message could not be queued on the channel; it is carried inside the error.
Send(mpsc::SendError<Msg>),
}
impl Display for EventLoopSendError {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
EventLoopSendError::Io(err) => err.fmt(f),
EventLoopSendError::Send(err) => err.fmt(f),
}
}
}
impl std::error::Error for EventLoopSendError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
EventLoopSendError::Io(err) => err.source(),
EventLoopSendError::Send(err) => err.source(),
}
}
}
/// Cloneable handle for sending messages to the event loop.
#[derive(Clone)]
pub struct EventLoopSender {
/// Channel on which messages are queued.
sender: Sender<Msg>,
/// Poller which gets notified after a message was queued.
poller: Arc<polling::Poller>,
}
impl EventLoopSender {
    /// Queue `msg` for the event loop and notify its poller.
    ///
    /// # Errors
    ///
    /// Returns `EventLoopSendError::Send` when the message cannot be queued; the poller is not
    /// notified in that case. Returns `EventLoopSendError::Io` when the message was queued but
    /// notifying the poller failed.
    pub fn send(&self, msg: Msg) -> Result<(), EventLoopSendError> {
        if let Err(send_err) = self.sender.send(msg) {
            return Err(EventLoopSendError::Send(send_err));
        }

        match self.poller.notify() {
            Ok(()) => Ok(()),
            Err(io_err) => Err(EventLoopSendError::Io(io_err)),
        }
    }
}
/// All of the mutable state needed to run the event loop.
///
/// Contains list of items to write, current write state, etc. Anything that
/// would otherwise be mutated on the `EventLoop` goes here.
#[derive(Default)]
pub struct State {
/// Payloads waiting to be written to the PTY, oldest first.
write_list: VecDeque<Cow<'static, [u8]>>,
/// Payload which is currently being written, if any.
writing: Option<Writing>,
/// Parser fed with the bytes read from the PTY.
parser: ansi::Processor,
}
impl State {
    /// Load the next queued payload, unless one is already in flight.
    #[inline]
    fn ensure_next(&mut self) {
        match self.writing {
            Some(_) => {},
            None => self.goto_next(),
        }
    }

    /// Replace the in-flight payload with the oldest queued one, or with nothing when the
    /// queue is empty.
    #[inline]
    fn goto_next(&mut self) {
        let next = self.write_list.pop_front();
        self.writing = next.map(Writing::new);
    }

    /// Remove and return the in-flight payload.
    #[inline]
    fn take_current(&mut self) -> Option<Writing> {
        std::mem::take(&mut self.writing)
    }

    /// Whether there is anything left to write, in flight or queued.
    #[inline]
    fn needs_write(&self) -> bool {
        !(self.writing.is_none() && self.write_list.is_empty())
    }

    /// Store `new` as the in-flight payload.
    #[inline]
    fn set_current(&mut self, new: Option<Writing>) {
        self.writing = new;
    }
}
impl Writing {
    /// Start tracking `c` with nothing written yet.
    #[inline]
    fn new(c: Cow<'static, [u8]>) -> Writing {
        let written = 0;
        Writing { source: c, written }
    }

    /// Record that another `n` bytes were written.
    #[inline]
    fn advance(&mut self, n: usize) {
        self.written = self.written + n;
    }

    /// The part of the payload which still has to be written.
    #[inline]
    fn remaining_bytes(&self) -> &[u8] {
        let start = self.written;
        &self.source[start..]
    }

    /// Whether the whole payload was written.
    #[inline]
    fn finished(&self) -> bool {
        self.source.len() <= self.written
    }
}
/// Channel receiver which can look at the next message without consuming it.
struct PeekableReceiver<T> {
    /// The wrapped receiver.
    rx: Receiver<T>,
    /// Message which was taken off the channel by `peek` but not yet handed out by `recv`.
    peeked: Option<T>,
}

impl<T> PeekableReceiver<T> {
    /// Wrap `rx` with an empty peek slot.
    fn new(rx: Receiver<T>) -> Self {
        Self { rx, peeked: None }
    }

    /// Borrow the next message without consuming it.
    ///
    /// Never blocks. Returns `None` when no message is available, including when the channel
    /// is disconnected.
    fn peek(&mut self) -> Option<&T> {
        if self.peeked.is_none() {
            if let Ok(msg) = self.rx.try_recv() {
                self.peeked = Some(msg);
            }
        }

        self.peeked.as_ref()
    }

    /// Take the next message, handing out a previously peeked one first.
    ///
    /// Never blocks. Returns `None` when no message is available.
    ///
    /// # Panics
    ///
    /// Panics when nothing was peeked and the channel is disconnected.
    fn recv(&mut self) -> Option<T> {
        if let Some(msg) = self.peeked.take() {
            return Some(msg);
        }

        match self.rx.try_recv() {
            Ok(msg) => Some(msg),
            Err(TryRecvError::Empty) => None,
            Err(TryRecvError::Disconnected) => panic!("event loop channel closed"),
        }
    }
}