[netstack3] Begin implementing POSIX sockets API

This change starts laying the groudwork for the POSIX sockets API. It
implements the fuchsia.posix.socket.Provider API, returning a zircon
channel which implements the fuchsia.posix.socket.Control interface. So
far, only the Describe method is implemented, allowing a caller to get
the underlying zircon socket.

Change-Id: I3c0bc89ebce89e3b85e915e103b98c4db912cd8c
diff --git a/src/connectivity/network/netstack3/BUILD.gn b/src/connectivity/network/netstack3/BUILD.gn
index 3d9b4a8..8c2cafa 100644
--- a/src/connectivity/network/netstack3/BUILD.gn
+++ b/src/connectivity/network/netstack3/BUILD.gn
@@ -32,6 +32,7 @@
     "//third_party/rust_crates:log",
     "//third_party/rust_crates:rand",
     "//zircon/public/fidl/fuchsia-hardware-ethernet:fuchsia-hardware-ethernet-rustc",
+    "//zircon/public/fidl/fuchsia-io:fuchsia-io-rustc",
     "//zircon/public/fidl/fuchsia-net:fuchsia-net-rustc",
     "//zircon/public/fidl/fuchsia-net-stack:fuchsia-net-stack-rustc",
     "//zircon/public/fidl/fuchsia-posix-socket:fuchsia-posix-socket-rustc",
diff --git a/src/connectivity/network/netstack3/src/eventloop/integration_tests.rs b/src/connectivity/network/netstack3/src/eventloop/integration_tests.rs
index 413c101..69498b4 100644
--- a/src/connectivity/network/netstack3/src/eventloop/integration_tests.rs
+++ b/src/connectivity/network/netstack3/src/eventloop/integration_tests.rs
@@ -4,6 +4,7 @@
 
 use failure::{format_err, Error, ResultExt};
 use fidl::encoding::Decodable;
+use fidl_fuchsia_io as fidl_io;
 use fidl_fuchsia_netemul_network as net;
 use fidl_fuchsia_netemul_sandbox as sandbox;
 use fuchsia_async as fasync;
@@ -90,6 +91,18 @@
         Ok(stack)
     }
 
+    fn connect_socket_provider(&self) -> Result<fidl_fuchsia_posix_socket::ProviderProxy, Error> {
+        let (stack, rs) = fidl::endpoints::create_proxy_and_stream::<
+            fidl_fuchsia_posix_socket::ProviderMarker,
+        >()?;
+        let events =
+            self.event_sender.clone().sink_map_err(|e| panic!("event sender error: {}", e));
+        fasync::spawn_local(
+            rs.map_ok(Event::FidlSocketProviderEvent).map_err(|_| ()).forward(events).map(|_| ()),
+        );
+        Ok(stack)
+    }
+
     async fn wait_for_interface_online(&mut self, if_id: u64) {
         if let Some(status) = self.data.lock().unwrap().device_status_cache.get(&if_id) {
             if status.contains(EthernetStatus::ONLINE) {
@@ -947,6 +960,43 @@
     );
 }
 
+#[fasync::run_singlethreaded(test)]
+async fn test_get_socket() {
+    let mut t = await!(TestSetupBuilder::new().add_endpoint().add_empty_stack().build()).unwrap();
+    let test_stack = t.get(0);
+    let socket_provider = test_stack.connect_socket_provider().unwrap();
+    let socket_response = await!(test_stack.run_future(socket_provider.socket(
+        libc::AF_INET as i16,
+        libc::SOCK_DGRAM as i16,
+        0,
+    )))
+        .expect("Socket call succeeds");
+    assert_eq!(socket_response.0, 0);
+}
+
+#[fasync::run_singlethreaded(test)]
+async fn test_socket_describe() {
+    let mut t = await!(TestSetupBuilder::new().add_endpoint().add_empty_stack().build()).unwrap();
+    let test_stack = t.get(0);
+    let socket_provider = test_stack.connect_socket_provider().unwrap();
+    let socket_response = await!(test_stack.run_future(socket_provider.socket(
+        libc::AF_INET as i16,
+        libc::SOCK_DGRAM as i16,
+        0,
+    )))
+        .expect("Socket call succeeds");
+    assert_eq!(socket_response.0, 0);
+    let info = await!(test_stack.run_future(
+        socket_response.1.expect("Socket returns a channel").into_proxy().unwrap().describe(),
+    ))
+        .expect("Describe call succeeds");
+    match info {
+        fidl_io::NodeInfo::Socket(_) => (),
+        _ => panic!("Socket Describe call did not return Node of type Socket"),
+    }
+}
+
+#[fasync::run_singlethreaded(test)]
 async fn test_main_loop() {
     let (event_sender, evt_rcv) = futures::channel::mpsc::unbounded();
     let mut event_loop = EventLoop::new_with_channels(event_sender.clone(), evt_rcv);
diff --git a/src/connectivity/network/netstack3/src/eventloop/mod.rs b/src/connectivity/network/netstack3/src/eventloop/mod.rs
index 39ae8b9..cfa9e76 100644
--- a/src/connectivity/network/netstack3/src/eventloop/mod.rs
+++ b/src/connectivity/network/netstack3/src/eventloop/mod.rs
@@ -75,6 +75,7 @@
 
 #[cfg(test)]
 mod integration_tests;
+mod socket;
 mod timers;
 mod util;
 
@@ -85,12 +86,14 @@
 use std::convert::TryFrom;
 use std::fs::File;
 use std::marker::PhantomData;
+use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
 use failure::{bail, format_err, Error};
-use fidl::endpoints::{RequestStream, ServiceMarker};
+use fidl::endpoints::{ClientEnd, RequestStream, ServiceMarker};
 use fidl_fuchsia_hardware_ethernet as fidl_ethernet;
 use fidl_fuchsia_hardware_ethernet_ext::{EthernetInfo, EthernetStatus, MacAddress};
+use fidl_fuchsia_io;
 use fidl_fuchsia_net as fidl_net;
 use fidl_fuchsia_net_stack as fidl_net_stack;
 use fidl_fuchsia_net_stack::{
@@ -102,6 +105,7 @@
     StackGetForwardingTableResponder, StackGetInterfaceInfoResponder, StackListInterfacesResponder,
     StackMarker, StackRequest, StackRequestStream,
 };
+use fidl_fuchsia_posix_socket as psocket;
 use fidl_fuchsia_posix_socket::ProviderRequest;
 use futures::channel::mpsc;
 use futures::future::{AbortHandle, Abortable};
@@ -111,7 +115,7 @@
 use integration_tests::TestEvent;
 use log::{debug, error, info, trace};
 use net_types::ethernet::Mac;
-use net_types::ip::{AddrSubnet, AddrSubnetEither, Subnet, SubnetEither};
+use net_types::ip::{AddrSubnet, AddrSubnetEither, IpAddr, IpVersion, Subnet, SubnetEither};
 use packet::{Buf, BufferMut, Serializer};
 use rand::{rngs::OsRng, Rng};
 use std::convert::TryInto;
@@ -229,7 +233,9 @@
     /// A request from the fuchsia.net.stack.Stack FIDL interface.
     FidlStackEvent(StackRequest),
     /// A request from the fuchsia.posix.socket.Provider FIDL interface.
-    FidlSocketProviderEvent(ProviderRequest),
+    FidlSocketProviderEvent(psocket::ProviderRequest),
+    /// A request from the fuchsia.posix.socket.Control FIDL interface.
+    FidlSocketControlEvent((Arc<Mutex<socket::SocketControlWorkerInner>>, psocket::ControlRequest)),
     /// An event from an ethernet interface. Either a status change or a frame.
     EthEvent((BindingId, eth::Event)),
     /// An indication that an ethernet device is ready to be used.
@@ -317,6 +323,9 @@
             Some(Event::FidlSocketProviderEvent(req)) => {
                 await!(self.handle_fidl_socket_provider_request(req));
             }
+            Some(Event::FidlSocketControlEvent((sock, req))) => {
+                sock.lock().unwrap().handle_request(self, req);
+            }
             Some(Event::EthEvent((id, eth::Event::StatusChanged))) => {
                 info!("device {:?} status changed signal", id);
                 // We need to call get_status even if we don't use the output, since calling it
@@ -379,8 +388,49 @@
         Ok(())
     }
 
-    async fn handle_fidl_socket_provider_request(&mut self, req: ProviderRequest) {
-        // TODO(wesleyac)
+    async fn handle_fidl_socket_provider_request(&mut self, req: psocket::ProviderRequest) {
+        match req {
+            psocket::ProviderRequest::Socket { domain, type_, protocol, responder } => {
+                let domain = i32::from(domain);
+                let nonblock = i32::from(type_) & libc::SOCK_NONBLOCK != 0;
+                let type_ = i32::from(type_) & !(libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC);
+                let net_proto = match domain {
+                    libc::AF_INET => IpVersion::V4,
+                    libc::AF_INET6 => IpVersion::V6,
+                    _ => {
+                        responder.send(libc::EAFNOSUPPORT as i16, None);
+                        return;
+                    }
+                };
+                let trans_proto = match i32::from(type_) {
+                    libc::SOCK_DGRAM => socket::TransProto::UDP,
+                    libc::SOCK_STREAM => socket::TransProto::TCP,
+                    _ => {
+                        responder.send(libc::EAFNOSUPPORT as i16, None);
+                        return;
+                    }
+                };
+
+                if let Ok((c0, c1)) = zx::Channel::create() {
+                    let worker = socket::SocketControlWorker::new(
+                        psocket::ControlRequestStream::from_channel(
+                            fasync::Channel::from_channel(c0).unwrap(),
+                        ),
+                        net_proto,
+                        trans_proto,
+                        nonblock,
+                    );
+                    if let Ok(worker) = worker {
+                        worker.spawn(self.ctx.dispatcher().event_send.clone());
+                        responder.send(0, Some(ClientEnd::new(c1)));
+                    } else {
+                        responder.send(libc::ENOBUFS as i16, None);
+                    }
+                } else {
+                    responder.send(libc::ENOBUFS as i16, None);
+                }
+            }
+        }
     }
 
     async fn handle_fidl_stack_request(&mut self, req: StackRequest) {
diff --git a/src/connectivity/network/netstack3/src/eventloop/socket.rs b/src/connectivity/network/netstack3/src/eventloop/socket.rs
new file mode 100644
index 0000000..fd7e52d
--- /dev/null
+++ b/src/connectivity/network/netstack3/src/eventloop/socket.rs
@@ -0,0 +1,130 @@
+use crate::devices::BindingId;
+use crate::eventloop::{Event, EventLoop};
+use failure::Error;
+use fidl_fuchsia_posix_socket as psocket;
+use fuchsia_async as fasync;
+use fuchsia_zircon::{self as zx, prelude::HandleBased};
+use futures::channel::mpsc;
+use futures::{TryFutureExt, TryStreamExt};
+use log::error;
+use net_types::ip::{AddrSubnet, AddrSubnetEither, IpAddr, IpVersion, Subnet, SubnetEither};
+use std::sync::{Arc, Mutex};
+
+pub struct SocketControlWorker {
+    events: psocket::ControlRequestStream,
+    inner: Arc<Mutex<SocketControlWorkerInner>>,
+}
+
+#[derive(Debug)]
+pub struct SocketControlWorkerInner {
+    local_socket: zx::Socket,
+    peer_socket: zx::Socket,
+    info: SocketControlInfo,
+}
+
+#[derive(Debug)]
+pub enum SocketControlInfo {
+    Unbound(UnboundSocket),
+    Bound(SocketWorker),
+}
+
+#[derive(Debug)]
+pub struct UnboundSocket {
+    net_proto: IpVersion,    // TODO(wesleyac): Pull into type?
+    trans_proto: TransProto, // TODO(wesleyac): Pull into type?
+    nonblock: bool,
+}
+
+#[derive(Debug)]
+pub enum TransProto {
+    UDP,
+    TCP,
+}
+
+impl SocketControlWorker {
+    pub fn new(
+        events: psocket::ControlRequestStream,
+        net_proto: IpVersion,
+        trans_proto: TransProto,
+        nonblock: bool,
+    ) -> Result<Self, ()> {
+        let sockopt = match trans_proto {
+            TransProto::UDP => zx::SocketOpts::DATAGRAM,
+            TransProto::TCP => zx::SocketOpts::STREAM,
+        };
+        let (local_socket, peer_socket) = zx::Socket::create(sockopt).map_err(|_| ())?;
+        Ok(Self {
+            events,
+            inner: Arc::new(Mutex::new(SocketControlWorkerInner {
+                local_socket,
+                peer_socket,
+                info: SocketControlInfo::Unbound(UnboundSocket {
+                    net_proto,
+                    trans_proto,
+                    nonblock,
+                }),
+            })),
+        })
+    }
+
+    pub fn spawn(mut self, sender: mpsc::UnboundedSender<Event>) {
+        fasync::spawn_local(
+            async move {
+                while let Some(evt) = await!(self.events.try_next())? {
+                    sender.unbounded_send(Event::FidlSocketControlEvent((
+                        Arc::clone(&self.inner),
+                        evt,
+                    )));
+                }
+                Ok(())
+            }
+                .unwrap_or_else(|e: Error| error!("{:?}", e)),
+        );
+    }
+}
+
+impl SocketControlWorkerInner {
+    pub fn handle_request(&mut self, event_loop: &mut EventLoop, req: psocket::ControlRequest) {
+        match req {
+            psocket::ControlRequest::Clone { .. } => {}
+            psocket::ControlRequest::Close { .. } => {}
+            psocket::ControlRequest::Describe { responder } => {
+                let peer = self.peer_socket.duplicate_handle(zx::Rights::SAME_RIGHTS);
+                if let Ok(peer) = peer {
+                    let mut info =
+                        fidl_fuchsia_io::NodeInfo::Socket(fidl_fuchsia_io::Socket { socket: peer });
+                    responder.send(&mut info);
+                }
+                // If the call to duplicate_handle fails, we have no choice but to drop the
+                // responder and close the channel, since Describe must be infallible.
+            }
+            psocket::ControlRequest::Sync { .. } => {}
+            psocket::ControlRequest::GetAttr { .. } => {}
+            psocket::ControlRequest::SetAttr { .. } => {}
+            psocket::ControlRequest::Ioctl { .. } => {}
+            psocket::ControlRequest::Bind { .. } => {}
+            psocket::ControlRequest::Connect { .. } => {}
+            psocket::ControlRequest::Listen { .. } => {}
+            psocket::ControlRequest::Accept { .. } => {}
+            psocket::ControlRequest::GetSockName { .. } => {}
+            psocket::ControlRequest::GetPeerName { .. } => {}
+            psocket::ControlRequest::SetSockOpt { .. } => {}
+            psocket::ControlRequest::GetSockOpt { .. } => {}
+            psocket::ControlRequest::IoctlPosix { .. } => {}
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct SocketWorker {
+    address: IpAddr,
+    port: u16,
+    nic: BindingId,
+    trans_proto: TransProto,
+}
+
+impl SocketWorker {
+    pub fn spawn(mut self) {
+        unimplemented!()
+    }
+}