blob: 3976071dd160053af24d2a179f3ea22c461b01d9 [file] [log] [blame] [edit]
use {localhost, sleep_ms, TryRead, TryWrite};
use mio::*;
use mio::deprecated::{EventLoop, EventLoopBuilder, Handler};
use mio::net::{TcpListener, TcpStream};
use std::collections::LinkedList;
use slab;
use std::{io, thread};
use std::time::Duration;
// Don't touch the connection slab
const SERVER: Token = Token(10_000_000);
const CLIENT: Token = Token(10_000_001);
#[cfg(windows)]
const N: usize = 10_000;
#[cfg(unix)]
const N: usize = 1_000_000;
struct EchoConn {
sock: TcpStream,
token: Option<Token>,
count: usize,
buf: Vec<u8>
}
type Slab<T> = slab::Slab<T, Token>;
impl EchoConn {
fn new(sock: TcpStream) -> EchoConn {
let mut ec =
EchoConn {
sock: sock,
token: None,
buf: Vec::with_capacity(22),
count: 0
};
unsafe { ec.buf.set_len(22) };
ec
}
fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
event_loop.reregister(&self.sock, self.token.unwrap(),
Ready::readable(),
PollOpt::edge() | PollOpt::oneshot())
}
fn readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
loop {
match self.sock.try_read(&mut self.buf[..]) {
Ok(None) => {
break;
}
Ok(Some(_)) => {
self.count += 1;
if self.count % 10000 == 0 {
info!("Received {} messages", self.count);
}
if self.count == N {
event_loop.shutdown();
}
}
Err(_) => {
break;
}
};
}
event_loop.reregister(&self.sock, self.token.unwrap(), Ready::readable(), PollOpt::edge() | PollOpt::oneshot())
}
}
struct EchoServer {
sock: TcpListener,
conns: Slab<EchoConn>
}
impl EchoServer {
fn accept(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
debug!("server accepting socket");
let sock = self.sock.accept().unwrap().0;
let conn = EchoConn::new(sock,);
let tok = self.conns.insert(conn)
.ok().expect("could not add connection to slab");
// Register the connection
self.conns[tok].token = Some(tok);
event_loop.register(&self.conns[tok].sock, tok, Ready::readable(),
PollOpt::edge() | PollOpt::oneshot())
.ok().expect("could not register socket with event loop");
Ok(())
}
fn conn_readable(&mut self, event_loop: &mut EventLoop<Echo>,
tok: Token) -> io::Result<()> {
debug!("server conn readable; tok={:?}", tok);
self.conn(tok).readable(event_loop)
}
fn conn_writable(&mut self, event_loop: &mut EventLoop<Echo>,
tok: Token) -> io::Result<()> {
debug!("server conn writable; tok={:?}", tok);
self.conn(tok).writable(event_loop)
}
fn conn<'a>(&'a mut self, tok: Token) -> &'a mut EchoConn {
&mut self.conns[tok]
}
}
struct EchoClient {
sock: TcpStream,
backlog: LinkedList<String>,
token: Token,
count: u32
}
// Sends a message and expects to receive the same exact message, one at a time
impl EchoClient {
fn new(sock: TcpStream, tok: Token) -> EchoClient {
EchoClient {
sock: sock,
backlog: LinkedList::new(),
token: tok,
count: 0
}
}
fn readable(&mut self, _event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
Ok(())
}
fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
debug!("client socket writable");
while self.backlog.len() > 0 {
match self.sock.try_write(self.backlog.front().unwrap().as_bytes()) {
Ok(None) => {
break;
}
Ok(Some(_)) => {
self.backlog.pop_front();
self.count += 1;
if self.count % 10000 == 0 {
info!("Sent {} messages", self.count);
}
}
Err(e) => { debug!("not implemented; client err={:?}", e); break; }
}
}
if self.backlog.len() > 0 {
event_loop.reregister(&self.sock, self.token, Ready::writable(),
PollOpt::edge() | PollOpt::oneshot()).unwrap();
}
Ok(())
}
}
struct Echo {
server: EchoServer,
client: EchoClient,
}
impl Echo {
fn new(srv: TcpListener, client: TcpStream) -> Echo {
Echo {
server: EchoServer {
sock: srv,
conns: Slab::with_capacity(128),
},
client: EchoClient::new(client, CLIENT),
}
}
}
impl Handler for Echo {
type Timeout = usize;
type Message = String;
fn ready(&mut self, event_loop: &mut EventLoop<Echo>, token: Token,
events: Ready) {
if events.is_readable() {
match token {
SERVER => self.server.accept(event_loop).unwrap(),
CLIENT => self.client.readable(event_loop).unwrap(),
i => self.server.conn_readable(event_loop, i).unwrap()
}
}
if events.is_writable() {
match token {
SERVER => panic!("received writable for token 0"),
CLIENT => self.client.writable(event_loop).unwrap(),
_ => self.server.conn_writable(event_loop, token).unwrap()
}
}
}
fn notify(&mut self, event_loop: &mut EventLoop<Echo>, msg: String) {
match self.client.sock.try_write(msg.as_bytes()) {
Ok(Some(n)) => {
self.client.count += 1;
if self.client.count % 10000 == 0 {
info!("Sent {} bytes: count {}", n, self.client.count);
}
},
_ => {
self.client.backlog.push_back(msg);
event_loop.reregister(
&self.client.sock,
self.client.token,
Ready::writable(),
PollOpt::edge() | PollOpt::oneshot()).unwrap();
}
}
}
}
#[test]
pub fn test_echo_server() {
debug!("Starting TEST_ECHO_SERVER");
let mut b = EventLoopBuilder::new();
b.notify_capacity(1_048_576)
.messages_per_tick(64)
.timer_tick(Duration::from_millis(100))
.timer_wheel_size(1_024)
.timer_capacity(65_536);
let mut event_loop = b.build().unwrap();
let addr = localhost();
let srv = TcpListener::bind(&addr).unwrap();
info!("listen for connections");
event_loop.register(&srv, SERVER, Ready::readable(),
PollOpt::edge() | PollOpt::oneshot()).unwrap();
let sock = TcpStream::connect(&addr).unwrap();
// Connect to the server
event_loop.register(&sock, CLIENT, Ready::writable(),
PollOpt::edge() | PollOpt::oneshot()).unwrap();
let chan = event_loop.channel();
let go = move || {
let mut i = N;
sleep_ms(1_000);
let message = "THIS IS A TEST MESSAGE".to_string();
while i > 0 {
chan.send(message.clone()).unwrap();
i -= 1;
if i % 10000 == 0 {
info!("Enqueued {} messages", N - i);
}
}
};
let t = thread::spawn(go);
// Start the event loop
event_loop.run(&mut Echo::new(srv, sock)).unwrap();
t.join().unwrap();
}