Convert dhcpd to futures 0.3
Change-Id: I3a091dd25a02903bdfc1d8124e5b60b615f840cc
diff --git a/bin/dhcpd/src/main.rs b/bin/dhcpd/src/main.rs
index ea6a831..368871d 100644
--- a/bin/dhcpd/src/main.rs
+++ b/bin/dhcpd/src/main.rs
@@ -1,6 +1,8 @@
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+
+#![feature(futures_api)]
#![deny(warnings)]
extern crate dhcp;
@@ -16,7 +18,7 @@
use dhcp::protocol::{Message, SERVER_PORT};
use dhcp::server::Server;
use failure::{Error, Fail, ResultExt};
-use futures::future::{self, Loop};
+use futures::future;
use futures::prelude::*;
use getopts::Options;
use std::env;
@@ -45,8 +47,8 @@
let lease_expiration_handler = define_lease_expiration_handler_future(server.clone());
println!("dhcpd: starting server");
- exec.run_singlethreaded(msg_handling_loop.join(lease_expiration_handler))
- .context("failed to start event loop")?;
+ exec.run_singlethreaded(msg_handling_loop.try_join(lease_expiration_handler))
+ .map_err(|e| e.context("failed to start event loop"))?;
println!("dhcpd: server shutting down");
Ok(())
}
@@ -71,8 +73,9 @@
fn define_msg_handling_loop_future(
sock: UdpSocket, server: Arc<Mutex<Server>>,
-) -> impl Future<Item = UdpSocket, Error = Error> {
- future::loop_fn(sock, move |sock| {
+) -> impl Future<Output = Result<UdpSocket, Error>> {
+ stream::repeat(()).map(Ok).try_fold(sock, move |sock, ()| {
+ //future::loop_fn(sock, move |sock| {
// Cloning the server ARC and then moving the value into the first and_then() call
// allows the server value to live multiple loop iterations.
let server = server.clone();
@@ -82,17 +85,17 @@
.and_then(move |(sock, buf, received, addr)| {
println!("dhcpd: received {} bytes", received);
match Message::from_buffer(&buf) {
- None => Err(failure::err_msg("unable to parse buffer")),
+ None => future::ready(Err(failure::err_msg("unable to parse buffer"))),
Some(msg) => {
println!("dhcpd: msg parsed {:?}", msg);
// This call should not block because the server is single-threaded.
match server.lock().unwrap().dispatch(msg) {
- None => Err(failure::err_msg("invalid message")),
+ None => future::ready(Err(failure::err_msg("invalid message"))),
Some(response) => {
println!("dhcpd: msg dispatched to server {:?}", response);
let response_buffer = response.serialize();
println!("dhcpd: response serialized");
- Ok((sock, response_buffer, addr))
+ future::ready(Ok((sock, response_buffer, addr)))
}
}
}
@@ -103,21 +106,19 @@
.and_then(|sock| {
println!("dhcpd: response sent");
println!("dhcpd: continuing event loop");
- Ok(Loop::Continue(sock))
+ future::ready(Ok(sock))
})
})
}
fn define_lease_expiration_handler_future(
server: Arc<Mutex<Server>>,
-) -> impl Future<Item = (), Error = Error> {
- let expiration_interval: Interval<Error> = Interval::new(EXPIRATION_INTERVAL_SECS.seconds());
+) -> impl Future<Output = Result<(), Error>> {
+ let expiration_interval = Interval::new(EXPIRATION_INTERVAL_SECS.seconds());
expiration_interval
- .for_each(move |()| {
+ .map(move |()| {
println!("dhcpd: interval timer fired");
server.lock().unwrap().release_expired_leases();
println!("dhcpd: expired leases released");
- Ok(())
- }).map(|_| ())
- .map_err(|e| e.context("interval timer failed").into())
+ }).map(|_| Ok(())).try_collect::<()>()
}