[tel][ril] Service providing the Fuchsia RIL for a QMI-based modem

Test: fx run-test ril_qmi_bin_test_rustc

Change-Id: I48ec7edbfdff6f4434148d6e9ceb131af3663592
diff --git a/bin/telephony/ril-qmi/BUILD.gn b/bin/telephony/ril-qmi/BUILD.gn
new file mode 100644
index 0000000..cc46247
--- /dev/null
+++ b/bin/telephony/ril-qmi/BUILD.gn
@@ -0,0 +1,61 @@
+# Copyright 2018 The Fuchsia Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import("//build/rust/rustc_binary.gni")
+import("//build/package.gni")
+
+rustc_binary("bin") {
+  name = "ril_qmi"
+  edition = "2018"
+  with_unit_tests = true
+
+  deps = [
+    "//garnet/lib/telephony/qmi-protocol",
+    "//garnet/public/fidl/fuchsia.telephony.ril:fuchsia.telephony.ril-rustc",
+    "//garnet/public/lib/fidl/rust/fidl",
+    "//garnet/public/rust/fuchsia-app",
+    "//garnet/public/rust/fuchsia-async",
+    "//garnet/public/rust/fuchsia-syslog",
+    "//garnet/public/rust/fuchsia-zircon",
+    "//third_party/rust-crates/rustc_deps:bytes",
+    "//third_party/rust-crates/rustc_deps:failure",
+    "//third_party/rust-crates/rustc_deps:futures-preview",
+    "//third_party/rust-crates/rustc_deps:log",
+    "//third_party/rust-crates/rustc_deps:parking_lot",
+    "//third_party/rust-crates/rustc_deps:pretty_assertions",
+    "//third_party/rust-crates/rustc_deps:slab",
+  ]
+}
+
+package("ril-qmi") {
+  deps = [
+    ":bin",
+  ]
+
+  binary = "rust_crates/ril_qmi"
+
+  meta = [
+    {
+      path = rebase_path("meta/ril-qmi.cmx")
+      dest = "ril-qmi.cmx"
+    },
+  ]
+}
+
+package("ril-qmi-tests") {
+  testonly = true
+
+  package_name = "ril-qmi-tests"
+
+  deps = [
+    ":bin"
+  ]
+
+  tests = [
+    {
+      name = "ril_qmi_bin_test_rustc"
+      dest = "ril-qmi-tests"
+    }
+  ]
+}
diff --git a/bin/telephony/ril-qmi/meta/ril-qmi.cmx b/bin/telephony/ril-qmi/meta/ril-qmi.cmx
new file mode 100644
index 0000000..e11774e
--- /dev/null
+++ b/bin/telephony/ril-qmi/meta/ril-qmi.cmx
@@ -0,0 +1,10 @@
+{
+    "program": {
+        "binary": "bin/app"
+    },
+    "sandbox": {
+        "services": [
+          "fuchsia.logger.LogSink"
+        ]
+    }
+}
diff --git a/bin/telephony/ril-qmi/src/client.rs b/bin/telephony/ril-qmi/src/client.rs
new file mode 100644
index 0000000..2fbf6e2
--- /dev/null
+++ b/bin/telephony/ril-qmi/src/client.rs
@@ -0,0 +1,254 @@
+// Copyright 2018 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+    bytes::{BufMut, BytesMut},
+    crate::errors::QmuxError,
+    crate::transport::{ClientId, SvcId},
+    crate::transport::{QmiResponse, QmiTransport},
+    fidl::endpoints::ServerEnd,
+    fidl_fuchsia_telephony_ril::NetworkConnectionMarker,
+    fuchsia_syslog::macros::*,
+    parking_lot::RwLock,
+    qmi_protocol::{Decodable, Encodable, QmiResult},
+    std::collections::HashMap,
+    std::fmt::Debug,
+    std::ops::Deref,
+    std::pin::Unpin,
+    std::sync::Arc,
+};
+
+#[derive(Debug)]
+pub struct ClientSvcMap(RwLock<HashMap<SvcId, ClientId>>);
+
+impl Default for ClientSvcMap {
+    fn default() -> Self {
+        let mut m = HashMap::new();
+        // Requests for IDs occur on the CTL service (ID 0),
+        // this default mapping allows the client to request an ID
+        // without a unique client just for this functionality
+        m.insert(SvcId(0), ClientId(0));
+        ClientSvcMap(RwLock::new(m))
+    }
+}
+impl Deref for ClientSvcMap {
+    type Target = RwLock<HashMap<SvcId, ClientId>>;
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+#[derive(Debug)]
+pub struct Connection {
+    pub conn: ServerEnd<NetworkConnectionMarker>,
+    pub pkt_handle: u32,
+}
+
+#[derive(Debug)]
+pub struct QmiClient {
+    inner: Arc<QmiTransport>,
+    clients: ClientSvcMap,
+    pub data_conn: Option<Connection>,
+}
+
+impl Unpin for QmiClient {}
+
+impl QmiClient {
+    pub fn new(inner: Arc<QmiTransport>) -> Self {
+        QmiClient {
+            inner: inner,
+            clients: ClientSvcMap::default(),
+            data_conn: None,
+        }
+    }
+
+    /// Send a QMI message and allocate the client IDs for the service
+    /// if they have not yet been
+    pub async fn send_msg<'a, E: Encodable + 'a, D: Decodable + Debug>(
+        &'a self, msg: E,
+    ) -> Result<QmiResult<D>, QmuxError> {
+        let svc_id = SvcId(msg.svc_id());
+        let mut need_id = false;
+        {
+            let map = self.clients.read();
+            // allocate a client id for this service
+            if map.get(&svc_id).is_none() {
+                need_id = true;
+            }
+        }
+        if need_id {
+            use qmi_protocol::CTL::{GetClientIdReq, GetClientIdResp};
+            fx_log_info!("allocating a client ID for service: {}", svc_id.0);
+            let resp: QmiResult<GetClientIdResp> =
+                await!(self.send_msg_actual(GetClientIdReq::new(svc_id.0)))?;
+            let client_id_resp = resp.unwrap(); // TODO from trait for QmiError to QmuxError
+            let mut map = self.clients.write();
+            assert_eq!(client_id_resp.svc_type, svc_id.0);
+            map.insert(svc_id, ClientId(client_id_resp.client_id));
+        }
+        Ok(await!(self.send_msg_actual(msg))?)
+    }
+
+    fn get_client_id(&self, svc_id: SvcId) -> ClientId {
+        let clients = self.clients.read();
+        *clients
+            .get(&svc_id)
+            .expect("Precondition of calling get_client_id is to have verified an ID is allocated")
+    }
+
+    /// Send a QMI message without checking if a client ID has been allocated for the service
+    async fn send_msg_actual<'a, E: Encodable + 'a, D: Decodable + Debug>(
+        &'a self, msg: E,
+    ) -> Result<QmiResult<D>, QmuxError> {
+        fx_log_info!("Sending a structured QMI message");
+
+        let svc_id = SvcId(msg.svc_id());
+        let client_id = self.get_client_id(svc_id);
+
+        let tx_id = self.inner.register_interest(svc_id, client_id);
+
+        let mut msg_buf = BytesMut::new();
+        let (payload_bytes, payload_len) = msg.to_bytes();
+        // QMI header
+        msg_buf.put_u8(0x01); // magic QMI number
+                              // 2 bytes total length
+        msg_buf.put_u16_le(
+            payload_len
+                           + 3 /* flags */
+                           + 2 /* length byte length */
+                           // additional length is bytes not captured in the payload length
+                           // They cannot be calculated there because multi-payload SDUs may
+                           // exist
+                           + 1 /* sdu control flag */
+                           + msg.transaction_id_len() as u16,
+        );
+
+        // 1 byte control flag
+        msg_buf.put_u8(0x00);
+        // 1 byte svc flag
+        msg_buf.put_u8(svc_id.0);
+        // 1 byte client id
+        msg_buf.put_u8(client_id.0);
+
+        // SDU
+        // 1 byte control flag
+        msg_buf.put_u8(0x00);
+        // 1 or 2 byte transaction ID
+        match msg.transaction_id_len() {
+            1 => msg_buf.put_u8(tx_id.0 as u8 + 1), // we know it's one byte
+            2 => msg_buf.put_u16_le(tx_id.0 + 1),
+            _ => panic!(
+                "Unknown transaction ID length. Please add client support or fix the message \
+                 definitions"
+            ),
+        }
+        // add the payload to the buffer
+        msg_buf.extend(payload_bytes);
+
+        let bytes = msg_buf.freeze();
+
+        if let Some(ref transport) = self.inner.transport_channel {
+            if transport.is_closed() {
+                fx_log_err!("Transport channel to modem is closed");
+            }
+            transport
+                .write(bytes.as_ref(), &mut Vec::new())
+                .map_err(QmuxError::ClientWrite)?
+        }
+
+        let resp = await!(QmiResponse {
+            client_id: client_id,
+            svc_id: svc_id,
+            tx_id: tx_id,
+            transport: Some(self.inner.clone())
+        })?;
+
+        let buf = std::io::Cursor::new(resp.bytes());
+        let decoded = D::from_bytes(buf);
+        Ok(decoded)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use {
+        fuchsia_async::{self as fasync, TimeoutExt},
+        fuchsia_zircon::{self as zx, DurationNum},
+        futures::{FutureExt, TryFutureExt},
+        parking_lot::Mutex,
+        pretty_assertions::assert_eq,
+        qmi_protocol::QmiError,
+        std::io,
+    };
+
+    #[test]
+    #[should_panic]
+    fn no_client() {
+        use qmi_protocol::WDA;
+        let mut executor = fasync::Executor::new().unwrap();
+        let modem = Arc::new(Mutex::new(crate::QmiModem::new()));
+        let modem_lock = modem.lock();
+        let sender = async {
+            let client = await!(modem_lock.create_client()).unwrap();
+            // Panic should occur here. No valid channel to send message on!
+            let _: Result<WDA::SetDataFormatResp, QmiError> = await!(client
+                .send_msg(WDA::SetDataFormatReq::new(None, Some(0x01)))
+                .map_err(|e| io::Error::new(
+                    io::ErrorKind::Other,
+                    &*format!("fidl error: {:?}", e)
+                )))
+            .unwrap();
+        };
+        executor.run_singlethreaded(sender);
+    }
+
+    #[test]
+    fn request_id() {
+        use qmi_protocol::CTL;
+        const EXPECTED: &[u8] = &[1, 15, 0, 0, 0, 0, 0, 1, 34, 0, 4, 0, 1, 1, 0, 66];
+
+        let mut executor = fasync::Executor::new().unwrap();
+
+        let (client_end, server_end) = zx::Channel::create().unwrap();
+        let client_end = fasync::Channel::from_channel(client_end).unwrap();
+        let trans = Arc::new(QmiTransport::new(client_end));
+        let modem = Arc::new(Mutex::new(crate::QmiModem::new_with_transport(trans)));
+
+        let server = fasync::Channel::from_channel(server_end).unwrap();
+        let mut buffer = zx::MessageBuf::new();
+
+        let modem_lock = modem.lock();
+
+        let receiver = async {
+            await!(server.recv_msg(&mut buffer)).expect("failed to recv msg");
+            assert_eq!(EXPECTED, buffer.bytes());
+            let bytes = &[
+                1, 15, 0, 0, 0, 0, 1, 1, 34, 0, 12, 0, 0, 4, 0, 0, 0, 0, 0, 1, 0, 0, 42, 6,
+            ];
+            let _ = server
+                .write(bytes, &mut vec![])
+                .expect("Server channel write failed");
+        };
+
+        let receiver = receiver.on_timeout(1000.millis().after_now(), || {
+            panic!("did not receiver message in time!")
+        });
+
+        let sender = async {
+            let client = await!(modem_lock.create_client()).unwrap();
+            let resp: QmiResult<CTL::GetClientIdResp> =
+                await!(client.send_msg_actual(CTL::GetClientIdReq::new(0x42))).unwrap();
+            let msg = resp.unwrap();
+            assert_eq!(msg.svc_type, 42);
+            assert_eq!(msg.client_id, 6);
+        };
+
+        let sender = sender.on_timeout(1000.millis().after_now(), || {
+            panic!("did not receive response in time!")
+        });
+
+        executor.run_singlethreaded(receiver.join(sender));
+    }
+}
diff --git a/bin/telephony/ril-qmi/src/errors.rs b/bin/telephony/ril-qmi/src/errors.rs
new file mode 100644
index 0000000..f1d0b13
--- /dev/null
+++ b/bin/telephony/ril-qmi/src/errors.rs
@@ -0,0 +1,43 @@
+// Copyright 2018 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+//! Qmux Errors, the transport layer for QMI-based modems
+use {failure::Fail, fuchsia_zircon as zx};
+
+#[derive(Debug, Fail)]
+pub enum QmuxError {
+    /// An endpoint encountered an IO error reading a response from a channel.
+    #[fail(
+        display = "A FIDL client encountered an IO error reading a response from a channel: {}",
+        _0
+    )]
+    ClientRead(#[cause] zx::Status),
+
+    /// A endpoint encountered an IO error writing a request to a channel.
+    #[fail(
+        display = "A FIDL client encountered an IO error writing a request into a channel: {}",
+        _0
+    )]
+    ClientWrite(#[cause] zx::Status),
+
+    /// Invalid buffer.
+    #[fail(display = "Invalid QMI buffer contents")]
+    Invalid,
+
+    /// A future was polled after it had already completed.
+    #[fail(display = "A QMI future was polled after it had already completed.")]
+    PollAfterCompletion,
+
+    /// A Service or Client has not been initilialized, but a transaction for it exists
+    #[fail(display = "A Transaction for an non-existant Service/Client pair")]
+    InvalidSvcOrClient,
+
+    /// No Client.
+    #[fail(display = "Failed to negotatiate creating a client with the modem")]
+    NoClient,
+
+    /// No Transport.
+    #[fail(display = "No channel to communicate with transport layer")]
+    NoTransport,
+}
diff --git a/bin/telephony/ril-qmi/src/main.rs b/bin/telephony/ril-qmi/src/main.rs
new file mode 100644
index 0000000..993f6a1
--- /dev/null
+++ b/bin/telephony/ril-qmi/src/main.rs
@@ -0,0 +1,253 @@
+// Copyright 2018 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#![deny(warnings)]
+#![feature(async_await, await_macro, futures_api, pin, arbitrary_self_types)]
+
+use {
+    crate::client::QmiClient,
+    crate::errors::QmuxError,
+    crate::transport::QmiTransport,
+    failure::{Error, ResultExt},
+    fidl::endpoints::{ClientEnd, RequestStream, ServerEnd, ServiceMarker},
+    fidl_fuchsia_telephony_ril::*,
+    fuchsia_app::server::ServicesServer,
+    fuchsia_async as fasync,
+    fuchsia_syslog::{self as syslog, macros::*},
+    fuchsia_zircon as zx,
+    futures::lock::Mutex,
+    futures::{TryFutureExt, TryStreamExt},
+    qmi_protocol::QmiResult,
+    qmi_protocol::*,
+    std::sync::Arc,
+};
+
+mod client;
+mod errors;
+mod transport;
+
+type QmiModemPtr = Arc<Mutex<QmiModem>>;
+
+pub struct QmiModem {
+    inner: Option<Arc<QmiTransport>>,
+}
+
+impl QmiModem {
+    pub fn new() -> Self {
+        QmiModem { inner: None }
+    }
+
+    pub fn new_with_transport(transport: Arc<QmiTransport>) -> Self {
+        QmiModem {
+            inner: Some(transport),
+        }
+    }
+
+    pub fn connected(&self) -> bool {
+        // TODO add aditional logic for checking transport_channel open
+        self.inner.is_some()
+    }
+
+    pub fn connect_transport(&mut self, chan: zx::Channel) -> bool {
+        fx_log_info!("Connecting the transport");
+        if self.connected() {
+            fx_log_err!("Attempted to connect more than one transport");
+            return false;
+        }
+        match fasync::Channel::from_channel(chan) {
+            Ok(chan) => {
+                if chan.is_closed() {
+                    fx_log_err!("The transport channel is not open");
+                    return false;
+                }
+                self.inner = Some(Arc::new(QmiTransport::new(chan)));
+                true
+            }
+            Err(_) => {
+                fx_log_err!("Failed to convert a zircon channel to a fasync one");
+                false
+            }
+        }
+    }
+
+    pub async fn create_client(&self) -> Result<QmiClient, Error> {
+        fx_log_info!("Client connecting...");
+        if let Some(ref inner) = self.inner {
+            let transport_inner = inner.clone();
+            let client = QmiClient::new(transport_inner);
+            Ok(client)
+        } else {
+            Err(QmuxError::NoTransport.into())
+        }
+    }
+}
+
+type ClientPtr = Arc<Mutex<Option<QmiClient>>>;
+
+/// needs to be called before all requests that use a client
+async fn setup_client<'a>(modem: QmiModemPtr, client_ptr: ClientPtr) {
+    let mut client_lock = await!(client_ptr.lock());
+    if client_lock.is_none() {
+        let modem_lock = await!(modem.lock());
+        match await!(modem_lock.create_client()) {
+            Ok(alloced_client) => *client_lock = Some(alloced_client),
+            Err(e) => {
+                fx_log_err!("Failed to allocated a client: {}", e);
+            }
+        }
+    };
+}
+
+struct FrilService;
+impl FrilService {
+    pub fn spawn(modem: QmiModemPtr, chan: fasync::Channel) {
+        let client = Arc::new(Mutex::new(None));
+        let server = RadioInterfaceLayerRequestStream::from_channel(chan)
+            .try_for_each(move |req| Self::handle_request(modem.clone(), client.clone(), req))
+            .unwrap_or_else(|e| fx_log_err!("Error running {:?}", e));
+        fasync::spawn(server);
+    }
+
+    async fn handle_request(
+        modem: QmiModemPtr, client_ptr: ClientPtr, request: RadioInterfaceLayerRequest,
+    ) -> Result<(), fidl::Error> {
+        // TODO(bwb) after component model v2, switch to on channel setup and
+        // deprecated ConnectTransport method
+        match request {
+            RadioInterfaceLayerRequest::ConnectTransport { .. } => (), // does not need a client setup
+            _ => await!(setup_client(modem.clone(), client_ptr.clone())),
+        }
+
+        match request {
+            RadioInterfaceLayerRequest::ConnectTransport { channel, responder } => {
+                let mut lock = await!(modem.lock());
+                let status = lock.connect_transport(channel);
+                fx_log_info!("Connecting the service to the transport driver: {}", status);
+                if let Ok(client) = await!(lock.create_client()) {
+                    let resp802: QmiResult<WDA::SetDataFormatResp> =
+                        await!(client.send_msg(WDA::SetDataFormatReq::new(None, Some(0x01))))
+                            .unwrap();
+                    fx_log_info!("802 Resp: {:?}", resp802);
+                }
+                return responder.send(status);
+            }
+            RadioInterfaceLayerRequest::GetNetworkSettings { responder } => {
+                match *await!(client_ptr.lock()) {
+                    Some(ref mut client) => {
+                        // TODO find out how to structure this u32 in a readable way
+                        let resp: QmiResult<WDS::GetCurrentSettingsResp> =
+                            await!(client.send_msg(WDS::GetCurrentSettingsReq::new(58160)))
+                                .unwrap();
+                        match resp {
+                            Ok(packet) => responder.send(
+                                &mut GetNetworkSettingsReturn::Settings(NetworkSettings {
+                                    ip_v4_addr: packet.ipv4_addr.unwrap(),
+                                    ip_v4_dns: packet.ipv4_dns.unwrap(),
+                                    ip_v4_subnet: packet.ipv4_subnet.unwrap(),
+                                    ip_v4_gateway: packet.ipv4_gateway.unwrap(),
+                                    mtu: packet.mtu.unwrap(),
+                                }),
+                            )?,
+                            Err(e) => {
+                                fx_log_err!("Error network: {:?}", e);
+                                // TODO different error
+                                responder
+                                    .send(&mut GetNetworkSettingsReturn::Error(RilError::NoRadio))?
+                            }
+                        }
+                    }
+                    None => {
+                        responder.send(&mut GetNetworkSettingsReturn::Error(RilError::NoRadio))?
+                    }
+                }
+            }
+            RadioInterfaceLayerRequest::StartNetwork { apn, responder } => {
+                match *await!(client_ptr.lock()) {
+                    Some(ref mut client) => {
+                        let resp: QmiResult<WDS::StartNetworkInterfaceResp> =
+                            await!(client
+                                .send_msg(WDS::StartNetworkInterfaceReq::new(Some(apn), Some(4))))
+                            .unwrap();
+                        match resp {
+                            Ok(packet) => {
+                                let (server_chan, client_chan) = zx::Channel::create().unwrap();
+                                let server_end =
+                                    ServerEnd::<NetworkConnectionMarker>::new(server_chan.into());
+                                client.data_conn = Some(client::Connection {
+                                    pkt_handle: packet.packet_data_handle,
+                                    conn: server_end,
+                                });
+                                let client_end =
+                                    ClientEnd::<NetworkConnectionMarker>::new(client_chan.into());
+                                responder.send(&mut StartNetworkReturn::Conn(client_end))?
+                            }
+                            Err(e) => {
+                                fx_log_info!("error network: {:?}", e);
+                                // TODO different error
+                                responder.send(&mut StartNetworkReturn::Error(RilError::NoRadio))?
+                            }
+                        }
+                    }
+                    None => responder.send(&mut StartNetworkReturn::Error(RilError::NoRadio))?,
+                }
+            }
+            RadioInterfaceLayerRequest::GetDeviceIdentity { responder } => {
+                match *await!(client_ptr.lock()) {
+                    Some(ref mut client) => {
+                        let resp: QmiResult<
+                            DMS::GetDeviceSerialNumbersResp,
+                        > = await!(client.send_msg(DMS::GetDeviceSerialNumbersReq::new())).unwrap();
+                        responder.send(&mut GetDeviceIdentityReturn::Imei(resp.unwrap().imei))?
+                    }
+                    None => {
+                        responder.send(&mut GetDeviceIdentityReturn::Error(RilError::NoRadio))?
+                    }
+                }
+            }
+            RadioInterfaceLayerRequest::RadioPowerStatus { responder } => {
+                match *await!(client_ptr.lock()) {
+                    Some(ref mut client) => {
+                        let resp: DMS::GetOperatingModeResp =
+                            await!(client.send_msg(DMS::GetOperatingModeReq::new()))
+                                .unwrap()
+                                .unwrap();
+                        if resp.operating_mode == 0x00 {
+                            responder
+                                .send(&mut RadioPowerStatusReturn::Result(RadioPowerState::On))?
+                        } else {
+                            responder
+                                .send(&mut RadioPowerStatusReturn::Result(RadioPowerState::Off))?
+                        }
+                    }
+                    None => {
+                        responder.send(&mut RadioPowerStatusReturn::Error(RilError::NoRadio))?
+                    }
+                }
+            }
+        }
+        Ok(())
+    }
+}
+
+fn main() -> Result<(), Error> {
+    syslog::init_with_tags(&["ril-qmi"]).expect("Can't init logger");
+    fx_log_info!("Starting ril-qmi...");
+
+    let mut executor = fasync::Executor::new().context("Error creating executor")?;
+
+    let modem = Arc::new(Mutex::new(QmiModem::new()));
+
+    let server = ServicesServer::new()
+        .add_service((
+            RadioInterfaceLayerMarker::NAME,
+            move |chan: fasync::Channel| {
+                fx_log_info!("New client connecting to the Fuchsia RIL");
+                FrilService::spawn(modem.clone(), chan)
+            },
+        ))
+        .start()
+        .context("Error starting QMI modem service")?;
+
+    executor.run_singlethreaded(server)
+}
diff --git a/bin/telephony/ril-qmi/src/transport.rs b/bin/telephony/ril-qmi/src/transport.rs
new file mode 100644
index 0000000..10ac3b7
--- /dev/null
+++ b/bin/telephony/ril-qmi/src/transport.rs
@@ -0,0 +1,283 @@
+// Copyright 2018 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use bytes::Buf;
+use crate::errors::QmuxError;
+use fuchsia_async as fasync;
+use fuchsia_zircon as zx;
+use futures::{
+    task::{LocalWaker, Waker},
+    Future, Poll,
+};
+use parking_lot::Mutex;
+use slab::Slab;
+use std::collections::HashMap;
+use std::io::Cursor;
+use std::pin::{Pin, Unpin};
+use std::sync::Arc;
+
+/// A client ID indicating the endpoint
+#[derive(Default, Debug, Copy, Clone, PartialEq, Eq, Hash)]
+pub struct ClientId(pub u8);
+
+/// A service id for the QMI service
+#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
+pub struct SvcId(pub u8);
+
+/// A message interest id.
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub struct TxId(pub u16);
+
+impl TxId {
+    fn as_raw_id(&self) -> usize {
+        self.0 as usize
+    }
+}
+
+/// A future which polls for the response to a client message.
+#[must_use]
+#[derive(Debug)]
+pub struct QmiResponse {
+    pub client_id: ClientId,
+    pub svc_id: SvcId,
+    pub tx_id: TxId,
+    // `None` if the message response has been recieved
+    pub transport: Option<Arc<QmiTransport>>,
+}
+
+impl Unpin for QmiResponse {}
+
+impl Future for QmiResponse {
+    type Output = Result<zx::MessageBuf, QmuxError>;
+    fn poll(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Self::Output> {
+        let this = &mut *self;
+        let transport = this
+            .transport
+            .as_ref()
+            .ok_or(QmuxError::PollAfterCompletion)?;
+        transport.poll_recv_msg_response(this.client_id, this.svc_id, this.tx_id, waker)
+    }
+}
+
+impl Drop for QmiResponse {
+    fn drop(&mut self) {
+        if let Some(transport) = &self.transport {
+            transport.deregister_interest(self.svc_id, self.client_id, self.tx_id);
+            transport.wake_any();
+        }
+    }
+}
+
+/// An enum reprenting either a resolved message interest or a task on which to alert
+/// that a response message has arrived.
+#[derive(Debug)]
+enum MessageInterest {
+    /// A new `MessageInterest`
+    WillPoll,
+    /// A task is waiting to receive a response, and can be awoken with `Waker`.
+    Waiting(Waker),
+    /// A message has been received, and a task will poll to receive it.
+    Received(zx::MessageBuf),
+    /// A message has not been received, but the person interested in the response
+    /// no longer cares about it, so the message should be discared upon arrival.
+    Discard,
+}
+
+impl MessageInterest {
+    /// Check if a message has been received.
+    fn is_received(&self) -> bool {
+        if let MessageInterest::Received(_) = *self {
+            return true;
+        }
+        false
+    }
+
+    fn unwrap_received(self) -> zx::MessageBuf {
+        if let MessageInterest::Received(buf) = self {
+            return buf;
+        }
+        panic!("EXPECTED received message")
+    }
+}
+
+#[derive(Debug)]
+pub struct QmuxHeader {
+    pub length: u16,
+    pub ctrl_flags: u8,
+    pub svc_type: u8,
+    pub client_id: u8,
+    // general service header
+    pub svc_ctrl_flags: u8,
+    pub transaction_id: u16, // TODO this needs to be u16 for anything not a CTL
+}
+
+pub fn parse_qmux_header<T: Buf>(mut buf: T) -> Result<(QmuxHeader, T), QmuxError> {
+    // QMUX headers start with 0x01
+    if 0x01 != buf.get_u8() {
+        return Err(QmuxError::Invalid);
+    }
+    let length = buf.get_u16_le();
+    let ctrl_flags = buf.get_u8();
+    let svc_type = buf.get_u8();
+    let client_id = buf.get_u8();
+    let svc_ctrl_flags;
+    let transaction_id;
+    // TODO(bwb): Consider passing these paramaters in from the Decodable trait'd object,
+    // more generic than a hardcode for CTL interfaces
+    if svc_type == 0x00 {
+        svc_ctrl_flags = buf.get_u8();
+        // ctl service is one byte
+        transaction_id = buf.get_u8() as u16;
+    } else {
+        svc_ctrl_flags = buf.get_u8() >> 1;
+        transaction_id = buf.get_u16_le();
+        // The bits for the ctrl flags are shifted by one in non CTL messages
+    }
+    Ok((
+        QmuxHeader {
+            length,
+            ctrl_flags,
+            svc_type,
+            client_id,
+            svc_ctrl_flags,
+            transaction_id,
+        },
+        buf,
+    ))
+}
+
+/// Shared transport channel
+#[derive(Debug)]
+pub struct QmiTransport {
+    pub transport_channel: Option<fasync::Channel>,
+    message_interests: Mutex<HashMap<(SvcId, ClientId), Slab<MessageInterest>>>,
+}
+
+impl QmiTransport {
+    pub fn new(chan: fasync::Channel) -> Self {
+        QmiTransport {
+            transport_channel: Some(chan),
+            message_interests: Mutex::new(HashMap::new()),
+        }
+    }
+
+    pub fn register_interest(&self, svc_id: SvcId, client_id: ClientId) -> TxId {
+        let mut lock = self.message_interests.lock();
+        let interests = lock
+            .entry((svc_id, client_id))
+            .or_insert(Slab::<MessageInterest>::new());
+        let id = interests.insert(MessageInterest::WillPoll);
+        TxId(id as u16)
+    }
+
+    pub fn deregister_interest(&self, svc_id: SvcId, client_id: ClientId, tx_id: TxId) {
+        let mut lock = self.message_interests.lock();
+        let id = tx_id.as_raw_id();
+        if let Some(ref mut interests) = lock.get_mut(&(svc_id, client_id)) {
+            if interests.contains(id) {
+                if interests[id].is_received() {
+                    interests.remove(id as usize);
+                } else {
+                    interests[id] = MessageInterest::Discard;
+                }
+            }
+        }
+    }
+
+    // Wakes up an arbitrary task that has begun polling on the channel so that
+    // it will call recv_all and be registered as the new channel reader.
+    fn wake_any(&self) {
+        let lock = self.message_interests.lock();
+        // any service/client will do
+        for (_, message_interest_map) in lock.iter() {
+            // any TxId will do
+            for (_, message_interest) in message_interest_map.iter() {
+                if let MessageInterest::Waiting(waker) = message_interest {
+                    waker.wake();
+                    return;
+                }
+            }
+        }
+        // TODO use client code from fidl for event/indication inspiration
+    }
+
+    /// Poll for the receipt of any response message or an event.
+    ///
+    /// Returns whether or not the channel is closed.
+    fn recv_all(&self, waker: &LocalWaker) -> Result<bool, QmuxError> {
+        if let Some(ref transport_channel) = self.transport_channel {
+            if transport_channel.is_closed() {
+                return Ok(true);
+            }
+            loop {
+                let mut buf = zx::MessageBuf::new();
+                match transport_channel.recv_from(&mut buf, waker) {
+                    Poll::Ready(Ok(())) => {}
+                    Poll::Ready(Err(zx::Status::PEER_CLOSED)) => return Ok(true),
+                    Poll::Ready(Err(e)) => return Err(QmuxError::ClientRead(e)),
+                    Poll::Pending => return Ok(false),
+                }
+                let buf = Cursor::new(buf.bytes());
+                let (header, buf) = parse_qmux_header(buf)?;
+
+                // TODO add indication support here, only handles responses for now
+                // This is a response for ONLY the CTL interface, will need indication support
+                // just throw them away for now
+                if header.svc_ctrl_flags != 0x01 {
+                    continue;
+                }
+
+                let mut mi = self.message_interests.lock();
+                if let Some(ref mut interest_slab) =
+                    mi.get_mut(&(SvcId(header.svc_type), ClientId(header.client_id)))
+                {
+                    let tx_id = TxId(header.transaction_id.into());
+                    let raw_tx_id = tx_id.as_raw_id() - 1;
+                    if let Some(&MessageInterest::Discard) = (*interest_slab).get(raw_tx_id) {
+                        interest_slab.remove(raw_tx_id);
+                    } else if let Some(entry) = interest_slab.get_mut(raw_tx_id) {
+                        let dst: Vec<u8> = buf.collect::<Vec<u8>>();
+                        let new_buf = zx::MessageBuf::new_with(dst, Vec::new());
+                        let old_entry =
+                            std::mem::replace(entry, MessageInterest::Received(new_buf));
+                        if let MessageInterest::Waiting(waker) = old_entry {
+                            waker.wake();
+                        }
+                    }
+                }
+            }
+        } else {
+            return Ok(false);
+        }
+    }
+
+    pub fn poll_recv_msg_response(
+        &self, client_id: ClientId, svc_id: SvcId, txid: TxId, waker: &LocalWaker,
+    ) -> Poll<Result<zx::MessageBuf, QmuxError>> {
+        let is_closed = self.recv_all(waker)?;
+        let mut mi = self.message_interests.lock();
+        let message_interests: &mut Slab<MessageInterest> = mi
+            .get_mut(&(svc_id, client_id))
+            .ok_or(QmuxError::InvalidSvcOrClient)?;
+        if message_interests
+            .get(txid.as_raw_id())
+            .expect("Polled unregistered interest")
+            .is_received()
+        {
+            let buf = message_interests.remove(txid.as_raw_id()).unwrap_received();
+            Poll::Ready(Ok(buf))
+        } else {
+            // Set the current waker to be notified when a response arrives.
+            *message_interests
+                .get_mut(txid.as_raw_id())
+                .expect("Polled unregistered interest") =
+                MessageInterest::Waiting(waker.clone().into_waker());
+            if is_closed {
+                Poll::Ready(Err(QmuxError::ClientRead(zx::Status::PEER_CLOSED)))
+            } else {
+                Poll::Pending
+            }
+        }
+    }
+}
diff --git a/packages/prod/telephony b/packages/prod/telephony
index 1aa7682..8d215f6 100644
--- a/packages/prod/telephony
+++ b/packages/prod/telephony
@@ -1,5 +1,7 @@
 {
   "packages": [
+    "//garnet/bin/telephony/ril-qmi",
+    "//garnet/bin/telephony/ril-qmi:ril-qmi-tests",
     "//garnet/drivers/telephony/qmi-usb-transport"
   ]
 }