Convert dhcpd to futures 0.3

Change-Id: I3a091dd25a02903bdfc1d8124e5b60b615f840cc
diff --git a/bin/dhcpd/src/main.rs b/bin/dhcpd/src/main.rs
index ea6a831..368871d 100644
--- a/bin/dhcpd/src/main.rs
+++ b/bin/dhcpd/src/main.rs
@@ -1,6 +1,8 @@
 // Copyright 2018 The Fuchsia Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
+
+#![feature(futures_api)]
 #![deny(warnings)]
 
 extern crate dhcp;
@@ -16,7 +18,7 @@
 use dhcp::protocol::{Message, SERVER_PORT};
 use dhcp::server::Server;
 use failure::{Error, Fail, ResultExt};
-use futures::future::{self, Loop};
+use futures::future;
 use futures::prelude::*;
 use getopts::Options;
 use std::env;
@@ -45,8 +47,8 @@
     let lease_expiration_handler = define_lease_expiration_handler_future(server.clone());
 
     println!("dhcpd: starting server");
-    exec.run_singlethreaded(msg_handling_loop.join(lease_expiration_handler))
-        .context("failed to start event loop")?;
+    exec.run_singlethreaded(msg_handling_loop.try_join(lease_expiration_handler))
+        .map_err(|e| e.context("failed to start event loop"))?;
     println!("dhcpd: server shutting down");
     Ok(())
 }
@@ -71,8 +73,9 @@
 
 fn define_msg_handling_loop_future(
     sock: UdpSocket, server: Arc<Mutex<Server>>,
-) -> impl Future<Item = UdpSocket, Error = Error> {
-    future::loop_fn(sock, move |sock| {
+) -> impl Future<Output = Result<UdpSocket, Error>> {
+    stream::repeat(()).map(Ok).try_fold(sock, move |sock, ()| {
+    // Stands in for futures 0.1 `loop_fn`: fold over an endless stream, carrying `sock` along.
         // Cloning the server ARC and then moving the value into the first and_then() call
         // allows the server value to live multiple loop iterations.
         let server = server.clone();
@@ -82,17 +85,17 @@
             .and_then(move |(sock, buf, received, addr)| {
                 println!("dhcpd: received {} bytes", received);
                 match Message::from_buffer(&buf) {
-                    None => Err(failure::err_msg("unable to parse buffer")),
+                    None => future::ready(Err(failure::err_msg("unable to parse buffer"))),
                     Some(msg) => {
                         println!("dhcpd: msg parsed {:?}", msg);
                         // This call should not block because the server is single-threaded.
                         match server.lock().unwrap().dispatch(msg) {
-                            None => Err(failure::err_msg("invalid message")),
+                            None => future::ready(Err(failure::err_msg("invalid message"))),
                             Some(response) => {
                                 println!("dhcpd: msg dispatched to server {:?}", response);
                                 let response_buffer = response.serialize();
                                 println!("dhcpd: response serialized");
-                                Ok((sock, response_buffer, addr))
+                                future::ready(Ok((sock, response_buffer, addr)))
                             }
                         }
                     }
@@ -103,21 +106,19 @@
             .and_then(|sock| {
                 println!("dhcpd: response sent");
                 println!("dhcpd: continuing event loop");
-                Ok(Loop::Continue(sock))
+                future::ready(Ok(sock))
             })
     })
 }
 
 fn define_lease_expiration_handler_future(
     server: Arc<Mutex<Server>>,
-) -> impl Future<Item = (), Error = Error> {
-    let expiration_interval: Interval<Error> = Interval::new(EXPIRATION_INTERVAL_SECS.seconds());
+) -> impl Future<Output = Result<(), Error>> {
+    let expiration_interval = Interval::new(EXPIRATION_INTERVAL_SECS.seconds());
     expiration_interval
-        .for_each(move |()| {
+        .map(move |()| {
             println!("dhcpd: interval timer fired");
             server.lock().unwrap().release_expired_leases();
             println!("dhcpd: expired leases released");
-            Ok(())
-        }).map(|_| ())
-        .map_err(|e| e.context("interval timer failed").into())
+        }).map(|_| Ok(())).try_collect::<()>()
 }