blob: 2bcf132486e41c41d5432afd39022a867439e4ca [file] [log] [blame] [edit]
use {TryRead, TryWrite};
use std::mem;
use mio::*;
use std::io;
use mio::deprecated::{EventLoop, Handler};
use mio::deprecated::unix::{PipeReader, PipeWriter};
use std::process::{Command, Stdio, Child};
struct SubprocessClient {
stdin: Option<PipeWriter>,
stdout: Option<PipeReader>,
stderr: Option<PipeReader>,
stdin_token : Token,
stdout_token : Token,
stderr_token : Token,
output : Vec<u8>,
output_stderr : Vec<u8>,
input : Vec<u8>,
input_offset : usize,
buf : [u8; 65536],
}
// Sends a message and expects to receive the same exact message, one at a time
impl SubprocessClient {
fn new(stdin: Option<PipeWriter>, stdout : Option<PipeReader>, stderr : Option<PipeReader>, data : &[u8]) -> SubprocessClient {
SubprocessClient {
stdin: stdin,
stdout: stdout,
stderr: stderr,
stdin_token : Token(0),
stdout_token : Token(1),
stderr_token : Token(2),
output : Vec::<u8>::new(),
output_stderr : Vec::<u8>::new(),
buf : [0; 65536],
input : data.to_vec(),
input_offset : 0,
}
}
fn readable(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> {
let mut eof = false;
match self.stdout {
None => unreachable!(),
Some (ref mut stdout) => match stdout.try_read(&mut self.buf[..]) {
Ok(None) => {
}
Ok(Some(r)) => {
if r == 0 {
eof = true;
} else {
self.output.extend(&self.buf[0..r]);
}
}
Err(e) => {
return Err(e);
}
}
};
if eof {
drop(self.stdout.take());
match self.stderr {
None => event_loop.shutdown(),
Some(_) => {},
}
}
return Ok(());
}
fn readable_stderr(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> {
let mut eof = false;
match self.stderr {
None => unreachable!(),
Some(ref mut stderr) => match stderr.try_read(&mut self.buf[..]) {
Ok(None) => {
}
Ok(Some(r)) => {
if r == 0 {
eof = true;
} else {
self.output_stderr.extend(&self.buf[0..r]);
}
}
Err(e) => {
return Err(e);
}
}
};
if eof {
drop(self.stderr.take());
match self.stdout {
None => event_loop.shutdown(),
Some(_) => {},
}
}
return Ok(());
}
fn writable(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> {
let mut ok = true;
match self.stdin {
None => unreachable!(),
Some(ref mut stdin) => match stdin.try_write(&(&self.input)[self.input_offset..]) {
Ok(None) => {
},
Ok(Some(r)) => {
if r == 0 {
ok = false;
} else {
self.input_offset += r;
}
},
Err(_) => {
ok = false;
},
}
}
if self.input_offset == self.input.len() || !ok {
drop(self.stdin.take());
match self.stderr {
None => match self.stdout {
None => event_loop.shutdown(),
Some(_) => {},
},
Some(_) => {},
}
}
return Ok(());
}
}
impl Handler for SubprocessClient {
type Timeout = usize;
type Message = ();
fn ready(&mut self, event_loop: &mut EventLoop<SubprocessClient>, token: Token,
_: Ready) {
if token == self.stderr_token {
let _x = self.readable_stderr(event_loop);
} else {
let _x = self.readable(event_loop);
}
if token == self.stdin_token {
let _y = self.writable(event_loop);
}
}
}
const TEST_DATA : [u8; 1024 * 4096] = [42; 1024 * 4096];
pub fn subprocess_communicate(mut process : Child, input : &[u8]) -> (Vec<u8>, Vec<u8>) {
let mut event_loop = EventLoop::<SubprocessClient>::new().unwrap();
let stdin : Option<PipeWriter>;
let stdin_exists : bool;
match process.stdin {
None => stdin_exists = false,
Some(_) => stdin_exists = true,
}
if stdin_exists {
match PipeWriter::from_stdin(process.stdin.take().unwrap()) {
Err(e) => panic!(e),
Ok(pipe) => stdin = Some(pipe),
}
} else {
stdin = None;
}
let stdout_exists : bool;
let stdout : Option<PipeReader>;
match process.stdout {
None => stdout_exists = false,
Some(_) => stdout_exists = true,
}
if stdout_exists {
match PipeReader::from_stdout(process.stdout.take().unwrap()) {
Err(e) => panic!(e),
Ok(pipe) => stdout = Some(pipe),
}
} else {
stdout = None;
}
let stderr_exists : bool;
let stderr : Option<PipeReader>;
match process.stderr {
None => stderr_exists = false,
Some(_) => stderr_exists = true,
}
if stderr_exists {
match PipeReader::from_stderr(process.stderr.take().unwrap()) {
Err(e) => panic!(e),
Ok(pipe) => stderr = Some(pipe),
}
} else {
stderr = None
}
let mut subprocess = SubprocessClient::new(stdin,
stdout,
stderr,
input);
match subprocess.stdout {
Some(ref sub_stdout) => event_loop.register(sub_stdout, subprocess.stdout_token, Ready::readable(),
PollOpt::level()).unwrap(),
None => {},
}
match subprocess.stderr {
Some(ref sub_stderr) => event_loop.register(sub_stderr, subprocess.stderr_token, Ready::readable(),
PollOpt::level()).unwrap(),
None => {},
}
// Connect to the server
match subprocess.stdin {
Some (ref sub_stdin) => event_loop.register(sub_stdin, subprocess.stdin_token, Ready::writable(),
PollOpt::level()).unwrap(),
None => {},
}
// Start the event loop
event_loop.run(&mut subprocess).unwrap();
let _ = process.wait();
let ret_stdout = mem::replace(&mut subprocess.output, Vec::<u8>::new());
let ret_stderr = mem::replace(&mut subprocess.output_stderr, Vec::<u8>::new());
return (ret_stdout, ret_stderr);
}
#[test]
fn test_subprocess_pipe() {
let process =
Command::new("/bin/cat")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn().unwrap();
let (ret_stdout, ret_stderr) = subprocess_communicate(process, &TEST_DATA[..]);
assert_eq!(TEST_DATA.len(), ret_stdout.len());
assert_eq!(0usize, ret_stderr.len());
let mut i : usize = 0;
for item in TEST_DATA.iter() {
assert_eq!(*item, ret_stdout[i]);
i += 1;
}
}