A production-ready accept loop needs two things: error handling and a limit on the number of simultaneous connections.
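There are two kinds of errors in an accept loop: per-connection errors and resource shortages. After a per-connection error the next connection can be accepted immediately; after a resource shortage it makes sense to pause before accepting again.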
Here is an example of a per-connection error (printed in normal and debug mode):

```
Error: Connection reset by peer (os error 104)
Error: Os { code: 104, kind: ConnectionReset, message: "Connection reset by peer" }
```
And the following is the most common example of a resource shortage error:
```
Error: Too many open files (os error 24)
Error: Os { code: 24, kind: Other, message: "Too many open files" }
```
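The two lines in each example above correspond to Rust's `{}` (normal) and `{:?}` (debug) formatting of a `std::io::Error`. The following is a minimal sketch of how such output is produced; the `render` helper is a hypothetical name used only for illustration, and the error is built from an `ErrorKind` rather than an OS error code, so its exact text differs from the OS messages shown above.

```rust
use std::io;

/// Hypothetical helper: formats an error in normal (`{}`) and debug (`{:?}`) mode.
fn render(e: &io::Error) -> (String, String) {
    (format!("Error: {}", e), format!("Error: {:?}", e))
}

fn main() {
    // Built from an `ErrorKind` so the sketch does not depend on
    // platform-specific error numbers such as 104 or 24.
    let e = io::Error::from(io::ErrorKind::ConnectionReset);
    let (normal, debug) = render(&e);
    println!("{}", normal);
    println!("{}", debug);
}
```

The debug form is usually the more useful one in logs, because it includes the error kind in addition to the message.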
To test your application for these errors, try the following (this works on Unix-like systems only).
Lower the open-file limit and start the application:
```
$ ulimit -n 100
$ cargo run --example your_app
   Compiling your_app v0.1.0 (/work)
    Finished dev [unoptimized + debuginfo] target(s) in 5.47s
     Running `target/debug/examples/your_app`
Server is listening on: http://127.0.0.1:1234
```
Then, in another console, run the `wrk` benchmark tool:
```
$ wrk -c 1000 http://127.0.0.1:1234
Running 10s test @ http://127.0.0.1:1234
  2 threads and 1000 connections
$ telnet localhost 1234
Trying ::1...
Connected to localhost.
```
It is important to check the following things:

1. It is possible to connect to the application again once the load has stopped (that is, after `wrk`). This is what `telnet` does in the example above; make sure it prints `Connected to <hostname>`.
2. The `Too many open files` error is logged in the appropriate log. This requires setting the “maximum number of simultaneous connections” parameter (see below) of your application to a value greater than `100` for this example.

For applications that do not speak HTTP, use a benchmark tool appropriate for the protocol if possible, and set the number of connections accordingly. For example, `redis-benchmark` has a `-c` parameter for that, if you implement the Redis protocol.
Alternatively, you can still use `wrk`; just make sure that the connection is not closed immediately. If it is, put a temporary delay before handing the connection to the protocol handler, like this:
```rust
use std::time::Duration;
use async_std::{
    net::{TcpListener, TcpStream, ToSocketAddrs},
    prelude::*,
    task,
};

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
    let listener = TcpListener::bind(addr).await?;
    let mut incoming = listener.incoming();
    while let Some(stream) = incoming.next().await {
        let stream = stream?; // simplified error handling for this test-only snippet
        task::spawn(async move {
            // Temporary delay. It runs inside the spawned task,
            // so it does not block the accept loop itself.
            task::sleep(Duration::from_secs(10)).await;
            connection_loop(stream).await;
        });
    }
    Ok(())
}

// Placeholder for your protocol handler.
async fn connection_loop(_stream: TcpStream) {}
```
Here is what a basic accept loop could look like:

```rust
use std::time::Duration;
use async_std::{
    net::{TcpListener, ToSocketAddrs},
    prelude::*,
    task,
};

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
    let listener = TcpListener::bind(addr).await?;
    let mut incoming = listener.incoming();
    while let Some(result) = incoming.next().await {
        let stream = match result {
            // Per-connection error: accept the next connection immediately.
            // `is_connection_error` must return true for per-connection errors.
            Err(ref e) if is_connection_error(e) => continue,
            // Resource shortage: log the error, then pause before accepting again.
            Err(e) => {
                eprintln!("Error: {}. Pausing for 500ms.", e);
                task::sleep(Duration::from_millis(500)).await;
                continue;
            }
            Ok(s) => s,
        };
        // body
    }
    Ok(())
}
```
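The loop above relies on an `is_connection_error` helper that the text does not define. The following is a minimal sketch of one possible definition, assuming that per-connection errors are the ones reported as `ConnectionRefused`, `ConnectionAborted` or `ConnectionReset`. That set of kinds is an assumption made for illustration, not something the text specifies.

```rust
use std::io;

/// Returns true for errors that concern a single connection only.
/// Assumption: the kinds listed here are an illustrative choice;
/// adjust the set to the platforms you target.
fn is_connection_error(e: &io::Error) -> bool {
    matches!(
        e.kind(),
        io::ErrorKind::ConnectionRefused
            | io::ErrorKind::ConnectionAborted
            | io::ErrorKind::ConnectionReset
    )
}

fn main() {
    let reset = io::Error::from(io::ErrorKind::ConnectionReset);
    let other = io::Error::new(io::ErrorKind::Other, "too many open files");
    println!("reset is per-connection: {}", is_connection_error(&reset));
    println!("other is per-connection: {}", is_connection_error(&other));
}
```

Everything that does not match is treated as a resource shortage by the loop, which errs on the side of pausing rather than spinning.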
Be sure to test your application.
The `async-listen` crate has a helper to achieve this task:
```rust
use std::{io, time::Duration};
use async_std::{
    net::{TcpListener, ToSocketAddrs},
    prelude::*,
};
use async_listen::{error_hint, ListenExt};

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
    let listener = TcpListener::bind(addr).await?;
    let mut incoming = listener
        .incoming()
        .log_warnings(log_accept_error) // 1
        .handle_errors(Duration::from_millis(500));
    while let Some(socket) = incoming.next().await { // 2
        // body
    }
    Ok(())
}

fn log_accept_error(e: &io::Error) {
    eprintln!("Error: {}. Listener paused for 0.5s. {}", e, error_hint(e)) // 3
}
```
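1. Logs resource shortages (`async-listen` calls them warnings). If you use the `log` crate or any other logging crate in your app, this should go to the log.
2. The stream yields sockets without the `Result` wrapper after `handle_errors`, because all errors are already handled.
3. The hint returned by `error_hint` is printed together with the error.

Be sure to test your application.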
Even if you've applied everything described in the Handling Errors section, there is still a problem.
Let's imagine you have a server that needs to open a file to process a client request. At some point, you might encounter the following situation:
1. The listener gets a `Too many open files` error, so it sleeps.
2. Opening a file to serve a request fails with the same `Too many open files` error, until some other client drops a connection.

There are many more possible situations; this is just a small illustration that limiting the number of connections is very useful. Generally, it's one of the ways to control the resources used by a server and to avoid some kinds of denial-of-service (DoS) attacks.
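One way to picture a connection limit is a counter of live connections paired with a guard that gives its slot back when it is dropped. The sketch below uses only the standard library; `ConnectionLimit` and `Slot` are hypothetical names invented for this illustration and are not the API or implementation of any particular crate.

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

/// Hypothetical limiter: tracks how many connections are currently alive.
struct ConnectionLimit {
    active: Arc<AtomicUsize>,
    max: usize,
}

/// Hypothetical guard: holds one slot and frees it when dropped.
struct Slot {
    active: Arc<AtomicUsize>,
}

impl ConnectionLimit {
    fn new(max: usize) -> Self {
        ConnectionLimit { active: Arc::new(AtomicUsize::new(0)), max }
    }

    /// Reserves a slot, or returns `None` when the limit is reached.
    fn try_acquire(&self) -> Option<Slot> {
        let mut current = self.active.load(Ordering::Acquire);
        loop {
            if current >= self.max {
                return None;
            }
            // Increment only if nobody changed the counter in the meantime.
            match self.active.compare_exchange(
                current,
                current + 1,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return Some(Slot { active: self.active.clone() }),
                Err(actual) => current = actual,
            }
        }
    }

    /// Number of slots currently held.
    fn active(&self) -> usize {
        self.active.load(Ordering::Acquire)
    }
}

impl Drop for Slot {
    fn drop(&mut self) {
        // The connection is gone, so its slot becomes available again.
        self.active.fetch_sub(1, Ordering::AcqRel);
    }
}

fn main() {
    let limit = ConnectionLimit::new(2);
    let a = limit.try_acquire();
    let b = limit.try_acquire();
    assert!(a.is_some() && b.is_some());
    assert!(limit.try_acquire().is_none()); // limit reached
    drop(a); // a connection closes
    assert!(limit.try_acquire().is_some()); // a slot is free again
    println!("active connections: {}", limit.active());
}
```

A real accept loop would normally wait for a free slot instead of refusing the connection, so that the listener simply stops accepting while the limit is reached.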
`async-listen` crate

Limiting the maximum number of simultaneous connections with `async-listen` looks like the following:
```rust
use std::{io, time::Duration};
use async_std::{
    net::{TcpListener, TcpStream, ToSocketAddrs},
    prelude::*,
    task,
};
use async_listen::{error_hint, ListenExt, Token};

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
    let listener = TcpListener::bind(addr).await?;
    let mut incoming = listener
        .incoming()
        .log_warnings(log_accept_error)
        .handle_errors(Duration::from_millis(500)) // 1
        .backpressure(100);
    while let Some((token, stream)) = incoming.next().await { // 2
        task::spawn(async move {
            connection_loop(&token, stream).await; // 3
        });
    }
    Ok(())
}

async fn connection_loop(_token: &Token, stream: TcpStream) { // 4
    // ...
}

fn log_accept_error(e: &io::Error) {
    eprintln!("Error: {}. Listener paused for 0.5s. {}", e, error_hint(e));
}
```
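1. Errors are handled first, because the `backpressure` helper expects a stream of `TcpStream` rather than `Result`.
2. The stream now yields a token together with each socket. The token is what the `backpressure` helper counts: once a token is dropped, a new connection can be accepted.
3. Passing a reference to the token into the connection loop keeps the token alive for as long as the connection is being handled.
4. The token itself is not used inside the function, hence the name `_token`.
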
Be sure to test this behavior.