| //! An example of offloading work to a thread pool instead of doing work on the |
| //! main event loop. |
| //! |
| //! In this example the server will act as a form of echo server except that |
| //! it'll echo back gzip-compressed data. Each connected client will have the |
| //! data written streamed back as the compressed version is available, and all |
| //! compressing will occur on a thread pool rather than the main event loop. |
| //! |
| //! You can preview this example with in one terminal: |
| //! |
| //! cargo run --example compress |
| //! |
| //! and in another terminal; |
| //! |
| //! echo test | cargo run --example connect 127.0.0.1:8080 | gunzip |
| //! |
| //! The latter command will need to be tweaked for non-unix-like shells, but |
| //! you can also redirect the stdout of the `connect` program to a file |
| //! and then decompress that. |
| |
| extern crate futures; |
| extern crate futures_cpupool; |
| extern crate flate2; |
| extern crate tokio_core; |
| extern crate tokio_io; |
| |
| use std::io; |
| use std::env; |
| use std::net::SocketAddr; |
| |
| use futures::{Future, Stream, Poll}; |
| use futures_cpupool::CpuPool; |
| use tokio_core::net::{TcpListener, TcpStream}; |
| use tokio_core::reactor::Core; |
| use tokio_io::{AsyncRead, AsyncWrite}; |
| use flate2::write::GzEncoder; |
| |
| fn main() { |
| // As with many other examples, parse our CLI arguments and prepare the |
| // reactor. |
| let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); |
| let addr = addr.parse::<SocketAddr>().unwrap(); |
| let mut core = Core::new().unwrap(); |
| let handle = core.handle(); |
| let socket = TcpListener::bind(&addr, &handle).unwrap(); |
| println!("Listening on: {}", addr); |
| |
| // This is where we're going to offload our computationally heavy work |
| // (compressing) to. Here we just use a convenience constructor to create a |
| // pool of threads equal to the number of CPUs we have. |
| let pool = CpuPool::new_num_cpus(); |
| |
| // The compress logic will happen in the function below, but everything's |
| // still a future! Each client is spawned to concurrently get processed. |
| let server = socket.incoming().for_each(move |(socket, addr)| { |
| handle.spawn(compress(socket, &pool).then(move |result| { |
| match result { |
| Ok((r, w)) => println!("{}: compressed {} bytes to {}", addr, r, w), |
| Err(e) => println!("{}: failed when compressing: {}", addr, e), |
| } |
| Ok(()) |
| })); |
| Ok(()) |
| }); |
| |
| core.run(server).unwrap(); |
| } |
| |
| /// The main workhorse of this example. This'll compress all data read from |
| /// `socket` on the `pool` provided, writing it back out to `socket` as it's |
| /// available. |
| fn compress(socket: TcpStream, pool: &CpuPool) |
| -> Box<Future<Item = (u64, u64), Error = io::Error>> |
| { |
| use tokio_io::io; |
| |
| // The general interface that `CpuPool` provides is that we'll *spawn a |
| // future* onto it. All execution of the future will occur on the `CpuPool` |
| // and we'll get back a handle representing the completed value of the |
| // future. In essence it's our job here to create a future that represents |
| // compressing `socket`, and then we'll simply spawn it at the very end. |
| // |
| // Here we exploit the fact that `TcpStream` itself is `Send` in this |
| // function as well. That is, we can read/write the TCP stream on any |
| // thread, and we'll get notifications about it being ready from the reactor |
| // thread. |
| // |
| // Otherwise this is the same as the echo server except that after splitting |
| // we apply some encoding to one side, followed by a `shutdown` when we're |
| // done to ensure that all gz footers are written. |
| let (read, write) = socket.split(); |
| let write = Count { io: write, amt: 0 }; |
| let write = GzEncoder::new(write, flate2::Compression::Best); |
| let process = io::copy(read, write).and_then(|(amt, _read, write)| { |
| io::shutdown(write).map(move |io| (amt, io.get_ref().amt)) |
| }); |
| |
| // Spawn the future so is executes entirely on the thread pool here |
| Box::new(pool.spawn(process)) |
| } |
| |
| struct Count<T> { |
| io: T, |
| amt: u64, |
| } |
| |
| impl<T: io::Write> io::Write for Count<T> { |
| fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
| let n = self.io.write(buf)?; |
| self.amt += n as u64; |
| Ok(n) |
| } |
| |
| fn flush(&mut self) -> io::Result<()> { |
| self.io.flush() |
| } |
| } |
| |
| impl<T: AsyncWrite> AsyncWrite for Count<T> { |
| fn shutdown(&mut self) -> Poll<(), io::Error> { |
| self.io.shutdown() |
| } |
| } |