| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use anyhow::format_err; |
| use bt_rfcomm::frame::mux_commands::*; |
| use bt_rfcomm::frame::{CommandResponse, Frame, FrameData, FrameParseError, UIHData, UserData}; |
| use bt_rfcomm::{Role, ServerChannel, DLCI}; |
| use fidl_fuchsia_bluetooth::ErrorCode; |
| use fuchsia_async as fasync; |
| use fuchsia_bluetooth::types::{Channel, PeerId}; |
| use fuchsia_inspect as inspect; |
| use fuchsia_inspect_derive::{AttachError, Inspect}; |
| use futures::channel::{mpsc, oneshot}; |
| use futures::future::{BoxFuture, Shared}; |
| use futures::lock::Mutex; |
| use futures::{select, FutureExt, SinkExt, StreamExt}; |
| use packet_encoding::Encodable; |
| use std::collections::{hash_map::Entry, HashMap}; |
| use std::sync::Arc; |
| use tracing::{error, info, trace, warn}; |
| |
| /// RFCOMM channels used to communicate with profile clients. |
| pub mod channel; |
| |
| /// The multiplexer that manages RFCOMM channels for this session. |
| pub mod multiplexer; |
| |
| use self::{ |
| channel::{Credits, FlowControlMode, FlowControlledData}, |
| multiplexer::{SessionMultiplexer, SessionParameters}, |
| }; |
| use crate::rfcomm::inspect::SessionInspect; |
| use crate::rfcomm::types::{Error, SignaledTask}; |
| |
| /// A function used to relay an opened inbound RFCOMM channel to a local client. |
| type ChannelOpenedFn = Box< |
| dyn Fn(ServerChannel, Channel) -> BoxFuture<'static, Result<(), anyhow::Error>> + Send + Sync, |
| >; |
| |
| /// Represents the callback for a pending open channel request initiated by a local client. |
| type ChannelRequestFn = |
| Box<dyn FnOnce(Result<Channel, ErrorCode>) -> Result<(), anyhow::Error> + Send + Sync>; |
| |
| /// Maintains the set of outstanding frames that have been sent to the remote peer. |
| /// Provides an API for inserting and removing sent Frames that expect a response. |
| struct OutstandingFrames { |
| /// Outstanding command frames that have been sent to the remote peer and are awaiting |
| /// responses. These are non-UIH frames. Per GSM 7.10 5.4.4.1, there shall only be one |
| /// such outstanding command frame per DLCI. |
| commands: HashMap<DLCI, Frame>, |
| |
| /// Outstanding mux command frames that have been sent to the remote peer and are awaiting |
| /// responses. Per RFCOMM 5.5, there can be multiple outstanding mux command frames |
| /// awaiting responses. However, there can only be one of each type per DLCI. Some |
| /// MuxCommands are associated with a DLCI - we uniquely identify such a command by it's |
| /// optional DLCI and command type. See the `mux_commands` mod for more details. |
| mux_commands: HashMap<MuxCommandIdentifier, MuxCommand>, |
| } |
| |
| impl OutstandingFrames { |
| fn new() -> Self { |
| Self { commands: HashMap::new(), mux_commands: HashMap::new() } |
| } |
| |
| /// Potentially registers a new `frame` with the `OutstandingFrames` manager. Returns |
| /// true if the frame requires a response and is registered, false if no response |
| /// is needed, or an error if the frame was unable to be processed. |
| fn register_frame(&mut self, frame: &Frame) -> Result<bool, Error> { |
| // We don't care about Response frames as we don't expect a response for them. |
| if frame.command_response == CommandResponse::Response { |
| return Ok(false); |
| } |
| |
| // MuxCommands are a special case. Namely, there cannot be multiple outstanding |
| // MuxCommands of the same type on the same DLCI. Our implementation will not |
| // attempt to send duplicate MuxCommands. |
| if let FrameData::UnnumberedInfoHeaderCheck(UIHData::Mux(data)) = &frame.data { |
| return match self.mux_commands.entry(data.identifier()) { |
| Entry::Occupied(_) => Err(Error::Other(format_err!("MuxCommand outstanding"))), |
| Entry::Vacant(entry) => { |
| let _ = entry.insert(data.clone()); |
| Ok(true) |
| } |
| }; |
| } |
| |
| // There can be multiple outstanding user data frames as no response is required. |
| if let FrameData::UnnumberedInfoHeaderCheck(UIHData::User(_)) = &frame.data { |
| return Ok(false); |
| } |
| |
| // Otherwise, it's a non-UIH frame. We only care about frames that require a |
| // response (i.e Command frames with the P bit set). |
| // See GSM 5.4.4.1 and 5.4.4.2 for the exact interpretation of the poll_final bit. |
| if frame.poll_final { |
| if self.commands.contains_key(&frame.dlci) { |
| // There can only be one outstanding command frame with P/F = 1 per |
| // DLCI. |
| // TODO(https://fxbug.dev/42139132): Our implementation should never try to send |
| // more than one command frame on the same DLCI. However, it may make |
| // sense to make this more intelligent and queue for later. |
| return Err(Error::Other(format_err!("Command Frame outstanding"))); |
| } |
| let _ = self.commands.insert(frame.dlci, frame.clone()); |
| return Ok(true); |
| } |
| Ok(false) |
| } |
| |
| /// Attempts to find and remove the outstanding command frame associated with |
| /// the provided `dlci`. Returns None if no such frame exists. |
| fn remove_frame(&mut self, dlci: &DLCI) -> Option<Frame> { |
| self.commands.remove(dlci) |
| } |
| |
| /// Attempts to find and remove the outstanding MuxCommand associated with the |
| /// provided `key`. Returns None if no such MuxCommand exists. |
| fn remove_mux_command(&mut self, key: &MuxCommandIdentifier) -> Option<MuxCommand> { |
| self.mux_commands.remove(key) |
| } |
| } |
| |
| /// An RFCOMM Session that multiplexes multiple channels over a single channel. This object |
| /// handles the business logic for an RFCOMM Session. Namely, it parses and handles RFCOMM |
| /// frames, modifies the state and role of the Session, and multiplexes any opened |
| /// RFCOMM channels. |
| /// |
| /// `SessionInner::process_incoming_frames` is the data path for any incoming packets |
| /// received from the remote peer connected to this session. An owner of the `SessionInner` |
| /// should use `SessionInner::process_incoming_frames` to start processing the aforementioned |
| /// data. Any frames to be sent to the peer will be relayed using the `outgoing_frame_sender` |
| /// provided in `SessionInner::create`. |
| pub struct SessionInner { |
| /// The session multiplexer that manages the current state of the session and any opened |
| /// RFCOMM channels. |
| multiplexer: SessionMultiplexer, |
| /// Outstanding frames that have been sent to the remote peer and are awaiting responses. |
| outstanding_frames: OutstandingFrames, |
| /// Open channel requests that are waiting for either multiplexer startup, parameter |
| /// negotiation, or channel establishment to complete. |
| pending_channels: HashMap<ServerChannel, ChannelRequestFn>, |
| /// Sender used to relay outgoing frames to be sent to the remote peer. |
| outgoing_frame_sender: mpsc::Sender<Frame>, |
| /// The channel opened callback that is called anytime a new RFCOMM channel is opened. The |
| /// `SessionInner` will relay the client end of the channel to this closure. |
| channel_opened_fn: ChannelOpenedFn, |
| /// The inspect node for this object. |
| inspect: SessionInspect, |
| } |
| |
| impl Inspect for &mut SessionInner { |
| fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> { |
| self.inspect.iattach(parent, name)?; |
| self.multiplexer.iattach(self.inspect.node(), "multiplexer") |
| } |
| } |
| |
| impl SessionInner { |
| /// Creates and returns an RFCOMM SessionInner that represents a Session between this device |
| /// and a remote peer. |
| /// `outgoing_frame_sender` is used to relay RFCOMM frames to be sent to the remote peer. |
| /// `channel_opened_fn` is used by the `SessionInner` to relay peer-opened RFCOMM channels to |
| /// local clients. |
| fn create( |
| id: PeerId, |
| outgoing_frame_sender: mpsc::Sender<Frame>, |
| channel_opened_fn: ChannelOpenedFn, |
| ) -> Self { |
| Self { |
| multiplexer: SessionMultiplexer::create(), |
| outstanding_frames: OutstandingFrames::new(), |
| pending_channels: HashMap::new(), |
| outgoing_frame_sender, |
| channel_opened_fn, |
| inspect: SessionInspect::new(id), |
| } |
| } |
| |
| fn multiplexer(&mut self) -> &mut SessionMultiplexer { |
| &mut self.multiplexer |
| } |
| |
| fn role(&self) -> Role { |
| self.multiplexer.role() |
| } |
| |
| /// Returns true if credit-based flow control is enabled for this session. |
| fn credit_based_flow(&self) -> bool { |
| self.multiplexer.credit_based_flow() |
| } |
| |
| #[cfg(test)] |
| fn session_parameters(&self) -> SessionParameters { |
| self.multiplexer.parameters() |
| } |
| |
| #[cfg(test)] |
| fn session_parameters_negotiated(&self) -> bool { |
| self.multiplexer.parameters_negotiated() |
| } |
| |
| /// Establishes the SessionChannel for the provided `dlci`. |
| /// Returns true if establishment is successful. |
| async fn establish_session_channel(&mut self, dlci: DLCI) -> bool { |
| let server_channel = dlci.try_into().unwrap(); |
| let user_data_sender = self.outgoing_frame_sender.clone(); |
| match self.multiplexer().establish_session_channel(dlci, user_data_sender) { |
| Ok(channel) => { |
| let result = if dlci.initiator(self.role()).expect("should be valid") { |
| self.relay_outbound_channel_result_to_client(server_channel, Ok(channel)) |
| } else { |
| self.relay_inbound_channel_to_client(server_channel, channel).await |
| }; |
| if let Err(e) = result { |
| warn!("Couldn't relay channel to client: {e:?}"); |
| // Close the local end of the RFCOMM channel. |
| let _ = self.multiplexer().close_session_channel(&dlci); |
| return false; |
| } |
| trace!(%server_channel, %dlci, "Established RFCOMM channel"); |
| true |
| } |
| Err(e) => { |
| warn!(%server_channel, "Couldn't establish DLCI {dlci:?}: {e:?}"); |
| false |
| } |
| } |
| } |
| |
| /// Processes the pending open channel request for the provided `server_channel`. |
| async fn process_channel_pending_parameter_negotiation( |
| &mut self, |
| server_channel: ServerChannel, |
| ) -> Result<(), Error> { |
| if !self.multiplexer().started() { |
| return Err(Error::MultiplexerNotStarted); |
| } |
| |
| if let Some(channel_open_fn) = self.pending_channels.remove(&server_channel) { |
| if let Err(e) = self.open_remote_channel(server_channel, channel_open_fn).await { |
| warn!(%server_channel, "Error opening remote channel: {e:?}"); |
| } |
| } |
| Ok(()) |
| } |
| |
| /// Processes all of the pending open channel requests that are waiting for multiplexer startup |
| /// to complete. |
| async fn process_channels_pending_startup(&mut self) -> Result<(), Error> { |
| if !self.multiplexer().started() { |
| return Err(Error::MultiplexerNotStarted); |
| } |
| |
| let outstanding_channels = std::mem::take(&mut self.pending_channels); |
| for (server_channel, channel_open_fn) in outstanding_channels { |
| trace!(%server_channel, "Processing RFCOMM open channel request."); |
| if let Err(e) = self.open_remote_channel(server_channel, channel_open_fn).await { |
| warn!(%server_channel, "Error opening remote channel: {e:?}"); |
| } |
| } |
| Ok(()) |
| } |
| |
| /// Cancels all pending channel requests that are waiting for multiplexer startup, |
| /// parameter negotiation, or establishment to complete. |
| fn cancel_pending_channels(&mut self) { |
| let outstanding_requests = std::mem::take(&mut self.pending_channels); |
| for (_, callback) in outstanding_requests { |
| // Result of the callback irrelevant as it means there is an issue with the client. |
| let _ = callback(Err(ErrorCode::Canceled)); |
| } |
| } |
| |
| /// Finishes parameter negotiation for the Session with the provided `params` and |
| /// reserves the specified DLCI. |
| fn finish_parameter_negotiation(&mut self, params: &ParameterNegotiationParams) { |
| // Update the session-specific parameters - currently only credit-based flow control |
| // and max frame size are negotiated. |
| let requested_parameters = SessionParameters { |
| credit_based_flow: params.credit_based_flow(), |
| max_frame_size: usize::from(params.max_frame_size), |
| }; |
| let updated_parameters = self.multiplexer().negotiate_parameters(requested_parameters); |
| |
| // Reserve the DLCI if it doesn't exist. |
| let _ = self.multiplexer().find_or_create_session_channel(params.dlci); |
| |
| // Set the flow control method depending on the negotiated parameters. |
| let flow_control = if updated_parameters.credit_based_flow() { |
| // The credits provided in the peer's response `params` is our (local) credit count. |
| // `DEFAULT_INITIAL_CREDITS` is always assigned as the peer's (remote) credit count. |
| let credits = Credits::new( |
| usize::from(params.initial_credits), |
| usize::from(DEFAULT_INITIAL_CREDITS), |
| ); |
| FlowControlMode::CreditBased(credits) |
| } else { |
| FlowControlMode::None |
| }; |
| // The result is irrelevant because the DLCI was just created and can't be established |
| // already. Setting the initial credits should always succeed. |
| if let Err(e) = self.multiplexer().set_flow_control(params.dlci, flow_control) { |
| warn!("Setting flow control failed: {e:?}"); |
| } |
| } |
| |
| /// Relays the outbound `channel_result` for the provided `server_channel` to the local client |
| /// who requested it. Returns an error if delivery fails or if there is no such client. |
| fn relay_outbound_channel_result_to_client( |
| &mut self, |
| server_channel: ServerChannel, |
| channel_result: Result<Channel, ErrorCode>, |
| ) -> Result<(), Error> { |
| if let Some(callback) = self.pending_channels.remove(&server_channel) { |
| return callback(channel_result).map_err(|e| Error::Other(format_err!("{e:?}").into())); |
| } |
| Err(Error::Other(format_err!("No outstanding client for: {server_channel:?}").into())) |
| } |
| |
| /// Relays the inbound `channel` opened for the provided `server_channel` to the local clients |
| /// of the session. Returns the status of the delivery. |
| async fn relay_inbound_channel_to_client( |
| &self, |
| server_channel: ServerChannel, |
| channel: Channel, |
| ) -> Result<(), Error> { |
| (self.channel_opened_fn)(server_channel, channel) |
| .await |
| .map_err(|e| format_err!("{e:?}").into()) |
| } |
| |
| /// Attempts to initiate multiplexer startup by sending an SABM command over the |
| /// Mux Control DLCI. |
| async fn start_multiplexer(&mut self) -> Result<(), Error> { |
| if self.multiplexer().started() || self.role() == Role::Negotiating { |
| warn!(role = ?self.role(), "Multiplexer already started"); |
| return Err(Error::MultiplexerAlreadyStarted); |
| } |
| self.multiplexer().set_role(Role::Negotiating); |
| |
| // Send the SABM command to initiate mux startup with the remote peer. |
| self.send_sabm_command(DLCI::MUX_CONTROL_DLCI).await; |
| Ok(()) |
| } |
| |
| /// Attempts to initiate the parameter negotiation (PN) procedure as defined in RFCOMM 5.5.3 |
| /// for the given `dlci`. |
| async fn start_parameter_negotiation(&mut self, dlci: DLCI) -> Result<(), Error> { |
| if !self.multiplexer().started() { |
| warn!(role = ?self.role(), "ParameterNegotiation request before multiplexer startup"); |
| return Err(Error::MultiplexerNotStarted); |
| } |
| |
| let pn_params = ParameterNegotiationParams::default_command(dlci); |
| let pn_command = MuxCommand { |
| params: MuxCommandParams::ParameterNegotiation(pn_params), |
| command_response: CommandResponse::Command, |
| }; |
| let pn_frame = Frame::make_mux_command(self.role(), pn_command); |
| self.send_frame(pn_frame).await; |
| Ok(()) |
| } |
| |
| /// Cancels parameter negotiation for the provided `dlci`. The `dlci` should be a valid |
| /// user DLCI. |
| fn cancel_parameter_negotiation(&mut self, dlci: DLCI) { |
| if !dlci.is_user() { |
| return; |
| } |
| |
| // Notify the client of the canceled request. |
| let _ = self.relay_outbound_channel_result_to_client( |
| dlci.try_into().unwrap(), |
| Err(ErrorCode::Canceled), |
| ); |
| } |
| |
| /// Attempts to open an RFCOMM channel for the provided `server_channel`. The result |
| /// of the operation is relayed using the `channel_request_fn`. |
| async fn open_remote_channel( |
| &mut self, |
| server_channel: ServerChannel, |
| channel_request_fn: ChannelRequestFn, |
| ) -> Result<(), Error> { |
| // There can only be one outstanding request per `server_channel`. |
| if self.pending_channels.contains_key(&server_channel) { |
| let _ = channel_request_fn(Err(ErrorCode::Failed)); |
| return Err(Error::Other(format_err!("Request in progress").into())); |
| } |
| |
| // If the multiplexer has not started yet, save the open channel request and |
| // attempt to start the multiplexer. |
| if !self.multiplexer().started() { |
| let _ = self.pending_channels.insert(server_channel, channel_request_fn); |
| |
| // Only attempt to start the multiplexer if we're not already negotiating. |
| if self.multiplexer().role() == Role::Unassigned { |
| self.start_multiplexer().await?; |
| } |
| return Ok(()); |
| } |
| |
| // When opening a remote channel, the DLCI is formed by taking the ServerChannel |
| // and the opposite of our role. See RFCOMM 5.4. |
| let dlci = server_channel.to_dlci(self.role().opposite_role())?; |
| |
| // If the DLC parameters have not been negotiated yet, save the open channel |
| // request and attempt to negotiate the parameters. Per RFCOMM 5.5.3, PN should occur |
| // at least once before creation of the first DLC. This implementation chooses to do |
| // PN before the creation of every DLC. |
| if !self.multiplexer().dlc_parameters_negotiated(&dlci) { |
| let _ = self.pending_channels.insert(server_channel, channel_request_fn); |
| self.start_parameter_negotiation(dlci).await?; |
| return Ok(()); |
| } |
| |
| if self.multiplexer().dlci_established(&dlci) { |
| let _ = channel_request_fn(Err(ErrorCode::Canceled)); |
| return Err(Error::ChannelAlreadyEstablished(dlci)); |
| } |
| |
| // Otherwise, save the pending channel request and send the SABM Command to begin |
| // channel establishment. |
| let _ = self.pending_channels.insert(server_channel, channel_request_fn); |
| self.send_sabm_command(dlci).await; |
| Ok(()) |
| } |
| |
| /// Attempts to close the established RFCOMM Session with the remote peer by sending |
| /// the Disconnect command. |
| async fn close(&mut self) -> Result<(), Error> { |
| // There's nothing to close if the Session Multiplexer has not been started. |
| if !self.multiplexer().started() { |
| return Err(Error::MultiplexerNotStarted); |
| } |
| |
| // Send the disconnect command to the peer. The session will shut down when we receive |
| // the acknowledgement from the peer (either UA or DM response). |
| self.send_disc_command(DLCI::MUX_CONTROL_DLCI).await; |
| Ok(()) |
| } |
| |
| /// Attempts to send a Remote Line `status` update to the remote peer. The status is associated |
| /// with the established RFCOMM channel identified by the provided `server_channel` number. |
| /// |
| /// Returns Error if the multiplexer hasn't started or if there is no such established channel. |
| async fn send_remote_line_status( |
| &mut self, |
| server_channel: ServerChannel, |
| status: Option<RlsError>, |
| ) -> Result<(), Error> { |
| // It's invalid to report the line status if the multiplexer has not started. |
| if !self.multiplexer().started() { |
| return Err(Error::MultiplexerNotStarted); |
| } |
| |
| // The RLS command refers to the status of a local channel. Therefore, the DLCI is formed |
| // by taking the channel number and _our_ role. |
| let dlci = server_channel.to_dlci(self.role())?; |
| |
| // Updating the line status is only valid if the DLCI has been established. |
| if !self.multiplexer().dlci_established(&dlci) { |
| return Err(Error::ChannelNotEstablished(dlci)); |
| } |
| |
| self.send_remote_line_status_command(dlci, status).await; |
| Ok(()) |
| } |
| |
| /// Handles an SABM command over the given `dlci` and sends a response frame to the remote peer. |
| /// |
| /// There are two important cases: |
| /// 1) Mux Control DLCI - indicates request to start up the session multiplexer. |
| /// 2) User DLCI - indicates request to establish up an RFCOMM channel over the provided `dlci`. |
| async fn handle_sabm_command(&mut self, dlci: DLCI) { |
| trace!(%dlci, "Handling SABM"); |
| if dlci.is_mux_control() { |
| match &self.role() { |
| Role::Unassigned => { |
| // Remote device has requested to start up the multiplexer, respond positively |
| // and assume the Responder role. |
| match self.multiplexer().start(Role::Responder) { |
| Ok(_) => self.send_ua_response(dlci).await, |
| Err(e) => { |
| warn!("Mux startup failed: {e:?}"); |
| self.send_dm_response(dlci).await; |
| } |
| } |
| } |
| Role::Negotiating => { |
| // We're currently negotiating the multiplexer role. We should send a DM, and |
| // attempt to restart the multiplexer after a random interval. See RFCOMM 5.2.1 |
| self.send_dm_response(dlci).await |
| // TODO(https://fxbug.dev/42140184): We can improve this by restarting the multiplexer. |
| } |
| _role => { |
| // Remote device incorrectly trying to start up the multiplexer when it has |
| // already started. This is invalid - send a DM to respond negatively. |
| warn!("Received SABM when multiplexer already started"); |
| self.send_dm_response(dlci).await; |
| } |
| } |
| return; |
| } |
| |
| // Otherwise, it's a request to open a user channel. Attempt to establish the session |
| // channel for the given DLCI. If this fails, reply with a DM response for the `dlci`. |
| match dlci.validate(self.role()) { |
| Err(e) => { |
| warn!("Received SABM with invalid DLCI: {e:?}"); |
| self.send_dm_response(dlci).await; |
| } |
| Ok(_) => { |
| if self.establish_session_channel(dlci).await { |
| self.send_ua_response(dlci).await; |
| // After positively acknowledging the established channel. Report our |
| // current modem signals to indicate we are ready. |
| self.send_modem_status_command(dlci).await; |
| } else { |
| self.send_dm_response(dlci).await; |
| } |
| } |
| } |
| } |
| |
| /// Handles a multiplexer command over the Mux Control DLCI. Potentially sends a response |
| /// frame to the remote peer. Returns an error if the `mux_command` couldn't be handled. |
| async fn handle_mux_command(&mut self, mux_command: &MuxCommand) -> Result<(), Error> { |
| trace!("Handling MuxCommand: {:?}", mux_command); |
| |
| // For responses, validate that we were expecting the response and finish the operation. |
| if mux_command.command_response == CommandResponse::Response { |
| return match self.outstanding_frames.remove_mux_command(&mux_command.identifier()) { |
| Some(_) => { |
| match &mux_command.params { |
| MuxCommandParams::ParameterNegotiation(pn_response) => { |
| // Finish parameter negotiation based on remote peer's response. |
| self.finish_parameter_negotiation(&pn_response); |
| // Process the open channel request for the DLCI. The DLCI is guaranteed |
| // to be a valid user DLCI since we initiated the PN. |
| let server_channel = pn_response.dlci.try_into().unwrap(); |
| self.process_channel_pending_parameter_negotiation(server_channel) |
| .await?; |
| Ok(()) |
| } |
| MuxCommandParams::ModemStatus(_) |
| | MuxCommandParams::RemoteLineStatus(_) => Ok(()), |
| params => { |
| // We don't send any other mux commands so any such responses are |
| // unexpected and unhandled. |
| warn!("Received unexpected {params:?} response. Ignoring"); |
| Err(Error::Other(format_err!("Unexpected response").into())) |
| } |
| } |
| } |
| None => { |
| warn!("Received unexpected MuxCommand response: {:?}", mux_command); |
| Err(Error::Other(format_err!("Unexpected response").into())) |
| } |
| }; |
| } |
| |
| let mux_response = match &mux_command.params { |
| MuxCommandParams::ParameterNegotiation(pn_command) => { |
| if !pn_command.dlci.is_user() { |
| warn!("Received PN command over invalid DLCI: {:?}", pn_command.dlci); |
| self.send_dm_response(pn_command.dlci).await; |
| return Ok(()); |
| } |
| |
| // Update the session specific parameters. |
| self.finish_parameter_negotiation(&pn_command); |
| |
| // Reply back with the negotiated parameters as a response - most parameters are |
| // simply echoed. |
| // Session-wide parameters: Credit-based flow & max frame size are negotiated. |
| // DLC-specific parameters: Initial credit count is set to a default value. |
| let mut pn_response = pn_command.clone(); |
| let updated_parameters = self.multiplexer().parameters(); |
| pn_response.credit_based_flow_handshake = if updated_parameters.credit_based_flow() |
| { |
| CreditBasedFlowHandshake::SupportedResponse |
| } else { |
| CreditBasedFlowHandshake::Unsupported |
| }; |
| pn_response.max_frame_size = updated_parameters.max_frame_size() as u16; |
| pn_response.initial_credits = DEFAULT_INITIAL_CREDITS; |
| MuxCommandParams::ParameterNegotiation(pn_response) |
| } |
| MuxCommandParams::RemotePortNegotiation(command) => { |
| MuxCommandParams::RemotePortNegotiation(command.response()) |
| } |
| command => { |
| // All other Mux Commands can be echoed back. |
| command.clone() |
| } |
| }; |
| let response = |
| MuxCommand { params: mux_response, command_response: CommandResponse::Response }; |
| self.send_frame(Frame::make_mux_command(self.role(), response)).await; |
| Ok(()) |
| } |
| |
| /// Handles a Disconnect command over the provided `dlci`. Returns a flag indicating |
| /// session termination. |
| async fn handle_disconnect_command(&mut self, dlci: DLCI) -> bool { |
| trace!(%dlci, "Received Disconnect"); |
| |
| let terminate_session = if dlci.is_user() { |
| let pn_identifier = |
| MuxCommandIdentifier(Some(dlci), MuxCommandMarker::ParameterNegotiation); |
| // Peer rejected our request to negotiate parameters for the `dlci`. Cancel the |
| // PN and respond positively. See RFCOMM 5.5.3. |
| if self.outstanding_frames.remove_mux_command(&pn_identifier).is_some() { |
| self.cancel_parameter_negotiation(dlci); |
| self.send_ua_response(dlci).await; |
| return false; |
| } |
| |
| // Otherwise, it's a request to close the DLC. |
| if !self.multiplexer().close_session_channel(&dlci) { |
| warn!(%dlci, "Received Disc command for unopened DLCI"); |
| self.send_dm_response(dlci).await; |
| return false; |
| } |
| false |
| } else { |
| // Disconnect over the Mux Control DLCI; we should terminate the session. |
| true |
| }; |
| // The default response for Disconnect is a UA. See RFCOMM 5.2.2 and GSM 7.10 Section 5.3.4. |
| self.send_ua_response(dlci).await; |
| terminate_session |
| } |
| |
| /// Handles a received user `user_data` payload with optional `credits` and routes to the RFCOMM |
| /// channel specified by `dlci`. |
| /// |
| /// If routing fails, sends a DM response over the provided `dlci`. |
| async fn handle_user_data(&mut self, dlci: DLCI, user_data: UserData, credits: Option<u8>) { |
| // In general, UserData frames do not need to be acknowledged. |
| if let Err(e) = self |
| .multiplexer() |
| .receive_user_data(dlci, FlowControlledData { user_data, credits }) |
| .await |
| { |
| // If there was an error sending the user data for any reason, we reply with |
| // a DM to indicate failure. |
| warn!("Couldn't relay user data: {e:?}"); |
| self.send_dm_response(dlci).await; |
| } |
| } |
| |
| /// Handles an UnnumberedAcknowledgement response over the provided `dlci`. |
| /// Returns a flag indicating session termination. |
| async fn handle_ua_response(&mut self, dlci: DLCI) -> bool { |
| match self.outstanding_frames.remove_frame(&dlci).map(|frame| frame.data) { |
| Some(FrameData::SetAsynchronousBalancedMode) if dlci.is_mux_control() => { |
| // If we are not negotiating anymore, mux startup was either canceled |
| // or completed. No need to do anything. |
| if self.role() != Role::Negotiating { |
| trace!(role = ?self.role(), |
| "Received response when mux startup was either canceled or completed", |
| ); |
| return false; |
| } |
| // Otherwise, assume the initiator role and complete startup. Starting should never |
| // fail because we are guaranteed to be in the Negotiating role. |
| self.multiplexer().start(Role::Initiator).unwrap(); |
| // Process any pending channels awaiting startup. |
| let _ = self.process_channels_pending_startup().await; |
| } |
| Some(FrameData::SetAsynchronousBalancedMode) => { |
| // Positive acknowledgement to open a remote channel on a user DLCI. |
| if self.establish_session_channel(dlci).await { |
| // After successfully opening the RFCOMM channel, report our |
| // current Modem signals to indicate readiness. |
| self.send_modem_status_command(dlci).await; |
| } |
| } |
| Some(FrameData::Disconnect) if dlci.is_mux_control() => { |
| info!("Received UA response to Disconnect of RFCOMM Session. Shutting down..."); |
| return true; |
| } |
| // It's possible that we have received a UA response to an individual SessionChannel's |
| // Disconnect request. In this case, there is no further action needed as the individual |
| // channel has been closed. This is handled gracefully and logged. |
| Some(_) | None => warn!("Received unexpected UA response over DLCI: {:?}", dlci), |
| } |
| false |
| } |
| |
| /// Handles a DisconnectedMode response over the provided `dlci`. |
| /// Returns a flag indicating session termination. |
| async fn handle_dm_response(&mut self, dlci: DLCI) -> bool { |
| // See GSM 7.10 Section 5.5.3 for the usage of the DM response. |
| match self.outstanding_frames.remove_frame(&dlci).map(|frame| frame.data) { |
| Some(FrameData::SetAsynchronousBalancedMode) if dlci.is_mux_control() => { |
| // Peer rejected our request to start the Session multiplexer - reset and |
| // let the peer retry. |
| self.multiplexer().reset(); |
| } |
| Some(FrameData::SetAsynchronousBalancedMode) => { |
| // Peer rejected our request to open a user DLC - close the DLC, notify the client |
| // of failure, and let the peer retry. |
| let _ = self.multiplexer().close_session_channel(&dlci); |
| let _ = self.relay_outbound_channel_result_to_client( |
| dlci.try_into().unwrap(), |
| Err(ErrorCode::Canceled), |
| ); |
| } |
| Some(FrameData::Disconnect) if dlci.is_mux_control() => { |
| // A negative response to a Disconnect command typically means the peer is in |
| // a different logical state than us. This is fatal, and we should shut down |
| // the RFCOMM session. |
| info!("Received DM response to Disconnect of RFCOMM Session. Shutting down..."); |
| return true; |
| } |
| Some(frame_data) => warn!("Unexpected DM for {:?}", frame_data), |
| None => { |
| let pn_identifier = |
| MuxCommandIdentifier(Some(dlci), MuxCommandMarker::ParameterNegotiation); |
| // Special case: Peer rejected our request to negotiate parameters for the `dlci`. |
| // See RFCOMM 5.5.3. Cancel the PN. |
| if self.outstanding_frames.remove_mux_command(&pn_identifier).is_some() { |
| self.cancel_parameter_negotiation(dlci); |
| } |
| // Otherwise, it's possible that we have received a DM response to an individual |
| // SessionChannel's Disconnect request. There is no action needed as the channel |
| // is closed. |
| } |
| } |
| false |
| } |
| |
| /// Handles an incoming Frame received from the peer. Returns a flag indicating whether |
| /// the session should terminate, or an error if the frame was unable to be handled. |
| async fn handle_frame(&mut self, frame: Frame) -> Result<bool, Error> { |
| let (dlci, credits) = (frame.dlci, frame.credits); |
| match frame.data { |
| FrameData::SetAsynchronousBalancedMode => { |
| self.handle_sabm_command(dlci).await; |
| } |
| FrameData::UnnumberedAcknowledgement => { |
| return Ok(self.handle_ua_response(dlci).await); |
| } |
| FrameData::DisconnectedMode => { |
| return Ok(self.handle_dm_response(dlci).await); |
| } |
| FrameData::Disconnect => return Ok(self.handle_disconnect_command(dlci).await), |
| FrameData::UnnumberedInfoHeaderCheck(UIHData::Mux(data)) => { |
| self.handle_mux_command(&data).await?; |
| } |
| FrameData::UnnumberedInfoHeaderCheck(UIHData::User(data)) => { |
| self.handle_user_data(dlci, data, credits).await; |
| } |
| } |
| Ok(false) |
| } |
| |
| /// Handles the error case when parsing a frame and sends an optional response frame if |
| /// needed. |
| async fn handle_frame_parse_error(&mut self, e: FrameParseError) { |
| warn!("Error parsing frame: {e:?}"); |
| // Currently, the only frame parsing error that requires a response is the MuxCommand |
| // parsing error. |
| let FrameParseError::UnsupportedMuxCommandType(val) = e else { |
| return; |
| }; |
| let non_supported_response = Frame::make_mux_command( |
| self.role(), |
| MuxCommand { |
| params: MuxCommandParams::NonSupported(NonSupportedCommandParams { |
| cr_bit: true, |
| non_supported_command: val, |
| }), |
| command_response: CommandResponse::Response, |
| }, |
| ); |
| self.send_frame(non_supported_response).await; |
| } |
| |
| /// Sends a RLS command for the provided `dlci`. |
| async fn send_remote_line_status_command(&mut self, dlci: DLCI, status: Option<RlsError>) { |
| let mux_command = MuxCommand { |
| params: MuxCommandParams::RemoteLineStatus(RemoteLineStatusParams::new(dlci, status)), |
| command_response: CommandResponse::Command, |
| }; |
| self.send_frame(Frame::make_mux_command(self.role(), mux_command)).await; |
| } |
| |
| /// Sends a Modem Status command for the provided `dlci`. |
| async fn send_modem_status_command(&mut self, dlci: DLCI) { |
| let mux_command = MuxCommand { |
| params: MuxCommandParams::ModemStatus(ModemStatusParams::default(dlci)), |
| command_response: CommandResponse::Command, |
| }; |
| self.send_frame(Frame::make_mux_command(self.role(), mux_command)).await; |
| } |
| |
| /// Sends an SABM command over the provided `dlci`. |
| async fn send_sabm_command(&mut self, dlci: DLCI) { |
| self.send_frame(Frame::make_sabm_command(self.role(), dlci)).await |
| } |
| |
| /// Sends a UA response over the provided `dlci`. |
| async fn send_ua_response(&mut self, dlci: DLCI) { |
| self.send_frame(Frame::make_ua_response(self.role(), dlci)).await |
| } |
| |
| /// Sends a DM response over the provided `dlci`. |
| async fn send_dm_response(&mut self, dlci: DLCI) { |
| self.send_frame(Frame::make_dm_response(self.role(), dlci)).await |
| } |
| |
| /// Sends a Disc command over the provided `dlci`. |
| async fn send_disc_command(&mut self, dlci: DLCI) { |
| self.send_frame(Frame::make_disc_command(self.role(), dlci)).await |
| } |
| |
| /// Sends the `frame` to the remote peer using the `outgoing_frame_sender`. |
| async fn send_frame(&mut self, frame: Frame) { |
| // Potentially save the frame-to-be-sent in the OutstandingFrames manager. This |
| // bookkeeping step will allow us to easily match received responses to our sent |
| // commands. |
| if let Err(e) = self.outstanding_frames.register_frame(&frame) { |
| warn!("Couldn't send frame: {e:?}"); |
| return; |
| } |
| |
| // Result of this send doesn't matter since failure indicates peer disconnection. |
| let _ = self.outgoing_frame_sender.send(frame).await; |
| } |
| |
| /// Starts the data processing task for this RFCOMM Session. |
| /// `data_receiver` is a stream of incoming data packets received from the remote peer. |
| /// |
| /// The lifetime of this task is tied to the `data_receiver`. |
| async fn process_incoming_frames( |
| inner: Arc<Mutex<Self>>, |
| mut data_receiver: mpsc::Receiver<Vec<u8>>, |
| ) -> Result<(), anyhow::Error> { |
| while let Some(bytes) = data_receiver.next().await { |
| let mut w_inner = inner.lock().await; |
| match Frame::parse(w_inner.role().opposite_role(), w_inner.credit_based_flow(), &bytes) |
| { |
| Ok(f) => { |
| trace!("Parsed frame from peer: {f:?}"); |
| match w_inner.handle_frame(f).await { |
| Ok(true) => break, |
| Ok(false) => {} |
| Err(e) => warn!("Error handling RFCOMM frame: {e:?}"), |
| } |
| } |
| Err(e) => { |
| w_inner.handle_frame_parse_error(e).await; |
| } |
| }; |
| } |
| // The `data_receiver` has closed, indicating peer disconnection. |
| let mut w_inner = inner.lock().await; |
| w_inner.cancel_pending_channels(); |
| w_inner.inspect.disconnect(); |
| w_inner.outgoing_frame_sender.close_channel(); |
| Ok(()) |
| } |
| } |
| |
| /// The maximum number of concurrent frames that can be accepted by this Session. |
| /// This value is chosen arbitrarily, and is not defined in the GSM or RFCOMM specifications. |
| /// This value is chosen as a number significantly more than the expected number of RFCOMM frames in |
| /// a single transaction (typically, we expect to receive 1-2 RFCOMM frames from a remote device |
| /// before responding). |
| const MAX_CONCURRENT_FRAMES: usize = 100; |
| |
| /// An RFCOMM Session that multiplexes multiple channels over a single L2CAP channel. |
| /// |
| /// A `Session` is represented by a processing task which processes incoming bytes |
| /// from the remote peer. Any multiplexed RFCOMM channels will be delivered to the |
| /// `clients` of the Session. |
| #[derive(Inspect)] |
| pub struct Session { |
| _task: fasync::Task<()>, |
| #[inspect(forward)] |
| inner: Arc<Mutex<SessionInner>>, |
| /// Shared termination future. |
| terminated: Shared<BoxFuture<'static, ()>>, |
| } |
| |
| impl Session { |
| /// Creates a new RFCOMM Session with peer `id` over the `l2cap_channel`. Any multiplexed |
| /// RFCOMM channels will be relayed using the `channel_opened_callback`. |
| pub fn create( |
| id: PeerId, |
| l2cap_channel: Channel, |
| channel_opened_callback: ChannelOpenedFn, |
| ) -> Self { |
| // The `session_inner` relays outgoing packets (to be sent to the remote peer) to the |
| // `Session` using this mpsc::channel. |
| let (frames_to_peer_sender, frame_receiver) = mpsc::channel(MAX_CONCURRENT_FRAMES); |
| let session_inner = Arc::new(Mutex::new(SessionInner::create( |
| id, |
| frames_to_peer_sender, |
| channel_opened_callback, |
| ))); |
| let (termination_sender, receiver) = oneshot::channel(); |
| let terminated = receiver.map(|_| ()).boxed().shared(); |
| let _task = fasync::Task::spawn(Session::session_task( |
| id, |
| l2cap_channel, |
| session_inner.clone(), |
| frame_receiver, |
| termination_sender, |
| )); |
| Self { _task, inner: session_inner, terminated } |
| } |
| |
| /// Processing task that drives the work for an RFCOMM Session with a peer. |
| /// |
| /// 1) Drives the RFCOMM SessionInner task - this task is responsible for |
| /// RFCOMM related functionality: parsing & handling frames, modifying internal state, and |
| /// multiplexing RFCOMM channels. Any outgoing frames intended for the peer will be sent to |
| /// the `frame_receiver`. |
| /// 2) Drives the peer processing task which handles incoming packets from the `l2cap` channel. |
| /// This task will relay these received packets to the `session_inner`. The task also |
| /// receives packets from the `frame_receiver` and sends them to the remote peer. |
| /// |
| /// The lifetime of this task is tied to the provided `l2cap` channel. When the remote peer |
| /// disconnects, the `l2cap` channel will close, and therefore the task will terminate. |
| async fn session_task( |
| peer_id: PeerId, |
| l2cap: Channel, |
| session_inner: Arc<Mutex<SessionInner>>, |
| frame_receiver: mpsc::Receiver<Frame>, |
| termination_sender: oneshot::Sender<()>, |
| ) { |
| // `Session::peer_processing_task()` uses this mpsc::channel to relay data received from the |
| // peer to the `session_inner`. |
| let (data_sender, data_from_peer_receiver) = mpsc::channel(MAX_CONCURRENT_FRAMES); |
| |
| // Business logic of the RFCOMM session - parsing and handling frames, modifying the state |
| // of the session, and multiplexing RFCOMM channels. |
| let session_inner_task = |
| SessionInner::process_incoming_frames(session_inner, data_from_peer_receiver) |
| .boxed() |
| .fuse(); |
| // Processes packets of data to/from the remote peer. |
| let peer_processing_task = |
| Session::peer_processing_task(l2cap, frame_receiver, data_sender).boxed().fuse(); |
| |
| // If the `peer_processing_task` terminates first, then the peer disconnected unexpectedly. |
| // In this case, we expect the `session_inner_task` will clean up and terminate. |
| // If the `session_inner_task` terminates first then a Disconnect frame was received (in |
| // either direction). In this case, the finishing of the `session_inner_task` will result |
| // in the `peer_processing_task` to close. |
| let _ = futures::future::join(session_inner_task, peer_processing_task).await; |
| |
| // Session has finished; notify any subscribed clients. |
| info!(%peer_id, "Session with peer ended"); |
| let _ = termination_sender.send(()); |
| } |
| |
| /// Processes incoming data from the `l2cap_channel` connected to the remote peer and |
| /// relays it using the `data_sender`. |
| /// Processes frames-to-be-sent from the `pending_writes` queue and sends them to the |
| /// remote peer. |
| async fn peer_processing_task( |
| mut l2cap_channel: Channel, |
| mut pending_writes: mpsc::Receiver<Frame>, |
| mut data_sender: mpsc::Sender<Vec<u8>>, |
| ) { |
| loop { |
| select! { |
| incoming_bytes = l2cap_channel.next() => { |
| match incoming_bytes { |
| Some(Ok(bytes)) => { |
| trace!("Received packet from peer: {:?}", bytes); |
| if let Err(_) = data_sender.send(bytes).await { |
| warn!("Couldn't send bytes to RFCOMM Session task"); |
| } |
| }, |
| Some(Err(e)) => { |
| error!("Error reading bytes from l2cap channel: {e:?}"); |
| }, |
| None => { |
| info!("Peer closed L2CAP connection, exiting"); |
| return; |
| } |
| } |
| } |
| frame_to_be_written = pending_writes.next() => { |
| let Some(frame) = frame_to_be_written else { |
| // The SessionInner task finished and closed its end of the channel. |
| // This means a Disconnect frame was received (in either direction), and |
| // we can terminate. |
| trace!("SessionInner task finished -- closing toplevel task."); |
| return; |
| }; |
| trace!("Sending frame to peer: {frame:?}"); |
| let mut buf = vec![0; frame.encoded_len()]; |
| if let Err(e) = frame.encode(&mut buf[..]) { |
| warn!("Couldn't encode frame: {e:?}"); |
| continue; |
| } |
| trace!("Sending packet to peer: {buf:?}"); |
| // The result of this send is irrelevant as failure would indicate that the peer |
| // has disconnected. |
| let _ = l2cap_channel.as_ref().write(&buf); |
| } |
| complete => { return; } |
| } |
| } |
| } |
| |
| /// A test-only hook to initiate multiplexer startup without needing to request |
| /// to open a specific RFCOMM channel. |
| #[cfg(test)] |
| async fn initiate_multiplexer_startup(&self) { |
| let mut w_inner = self.inner.lock().await; |
| if let Err(e) = w_inner.start_multiplexer().await { |
| warn!("Couldn't start session multiplexer: {:?}", e); |
| } |
| } |
| |
| /// Requests to open a new RFCOMM channel for the provided `server_channel`. |
| pub async fn open_rfcomm_channel( |
| &self, |
| server_channel: ServerChannel, |
| channel_opened_cb: ChannelRequestFn, |
| ) { |
| let mut w_inner = self.inner.lock().await; |
| if let Err(e) = w_inner.open_remote_channel(server_channel, channel_opened_cb).await { |
| warn!(%server_channel, "Couldn't open RFCOMM channel: {e:?}"); |
| } |
| } |
| |
| /// Request to close the RFCOMM session. |
| pub async fn close(&self) { |
| let mut w_inner = self.inner.lock().await; |
| if let Err(e) = w_inner.close().await { |
| warn!("Couldn't close RFCOMM session: {e:?}"); |
| } |
| } |
| |
| /// Request to send an RLS frame to the remote peer. |
| pub async fn send_remote_line_status( |
| &self, |
| server_channel: ServerChannel, |
| status: Option<RlsError>, |
| ) { |
| let mut w_inner = self.inner.lock().await; |
| if let Err(e) = w_inner.send_remote_line_status(server_channel, status).await { |
| warn!("Couldn't close RFCOMM session: {e:?}"); |
| } |
| } |
| } |
| |
| impl SignaledTask for Session { |
| fn finished(&self) -> BoxFuture<'static, ()> { |
| self.terminated.clone().boxed() |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| |
| use assert_matches::assert_matches; |
| use async_utils::PollExt; |
| use diagnostics_assertions::{assert_data_tree, AnyProperty}; |
| use fuchsia_async as fasync; |
| use futures::{task::Poll, Future}; |
| use std::pin::pin; |
| |
| use crate::{rfcomm::session::multiplexer::ParameterNegotiationState, rfcomm::test_util::*}; |
| |
| /// Makes a DLC PN frame with arbitrary command parameters. |
| /// `command_response` indicates whether the frame should be a command or response. |
| /// `dlci` is the DLCI being negotiated. |
| /// `credit_flow` indicates whether credit-based flow control should be set or not. |
| /// `max_frame_size` indicates the max frame size to use for the PN. |
| fn make_dlc_pn_frame( |
| command_response: CommandResponse, |
| dlci: DLCI, |
| credit_flow: bool, |
| max_frame_size: u16, |
| ) -> Frame { |
| let credit_based_flow_handshake = match (credit_flow, command_response) { |
| (false, _) => CreditBasedFlowHandshake::Unsupported, |
| (true, CommandResponse::Command) => CreditBasedFlowHandshake::SupportedRequest, |
| (true, CommandResponse::Response) => CreditBasedFlowHandshake::SupportedResponse, |
| }; |
| let pn_command = MuxCommand { |
| params: MuxCommandParams::ParameterNegotiation(ParameterNegotiationParams { |
| dlci, |
| credit_based_flow_handshake, |
| priority: 12, |
| max_frame_size, |
| initial_credits: DEFAULT_INITIAL_CREDITS, |
| }), |
| command_response, |
| }; |
| Frame { |
| role: Role::Initiator, |
| dlci: DLCI::MUX_CONTROL_DLCI, |
| data: FrameData::UnnumberedInfoHeaderCheck(UIHData::Mux(pn_command)), |
| poll_final: false, |
| command_response, |
| credits: None, |
| } |
| } |
| |
| /// Creates and returns the SessionInner processing task. Uses a channel_opened_fn that |
| /// indiscriminately accepts all opened RFCOMM channels. |
| fn setup_session_task() -> (impl Future<Output = ()>, Channel) { |
| let id = PeerId(987); |
| let (local, remote) = Channel::create(); |
| let channel_opened_fn = Box::new(|_server_channel, _channel| async { Ok(()) }.boxed()); |
| let (frame_sender, frame_receiver) = mpsc::channel(0); |
| let session_inner = |
| Arc::new(Mutex::new(SessionInner::create(id, frame_sender, channel_opened_fn))); |
| let (sender, _receiver) = oneshot::channel(); |
| let session_fut = |
| Session::session_task(PeerId(1), local, session_inner, frame_receiver, sender); |
| |
| (session_fut, remote) |
| } |
| |
| /// Creates a ChannelOpenedFn that relays inbound RFCOMM channels using the `channel_sender`. |
| /// Tests should use the returned Receiver to assert on the delivery of opened RFCOMM channels. |
| fn create_inbound_relay() -> (ChannelOpenedFn, mpsc::Receiver<Result<Channel, ErrorCode>>) { |
| let (channel_sender, channel_receiver) = mpsc::channel(0); |
| let f = Box::new(move |_server_channel, channel| { |
| let mut sender = channel_sender.clone(); |
| async move { |
| assert!(sender.send(Ok(channel)).await.is_ok()); |
| Ok(()) |
| } |
| .boxed() |
| }); |
| (f, channel_receiver) |
| } |
| |
| /// Creates a ChannelRequestFn that relays outbound RFCOMM channels using the `channel_sender`. |
| /// Tests should use the returned Receiver to assert on delivery of outbound channels. |
| fn create_outbound_relay() -> (ChannelRequestFn, mpsc::Receiver<Result<Channel, ErrorCode>>) { |
| let (channel_sender, channel_receiver) = mpsc::channel(0); |
| let f = Box::new(move |channel: Result<Channel, ErrorCode>| { |
| let mut sender = channel_sender.clone(); |
| assert!(sender.try_send(channel).is_ok()); |
| Ok(()) |
| }); |
| (f, channel_receiver) |
| } |
| |
| /// Creates and returns 1) A SessionInner 2) A stream of outgoing frames to be sent to the |
| /// remote peer. Use this to validate SessionInner behavior and 3) A stream of opened RFCOMM |
| /// channels. Use this to validate channel establishment. |
| fn setup_session( |
| ) -> (SessionInner, mpsc::Receiver<Frame>, mpsc::Receiver<Result<Channel, ErrorCode>>) { |
| let id = PeerId(5); |
| let (channel_opened_fn, channel_receiver) = create_inbound_relay(); |
| let (outgoing_frame_sender, outgoing_frames) = mpsc::channel(0); |
| let session = SessionInner { |
| multiplexer: SessionMultiplexer::create(), |
| outstanding_frames: OutstandingFrames::new(), |
| pending_channels: HashMap::new(), |
| outgoing_frame_sender, |
| channel_opened_fn, |
| inspect: SessionInspect::new(id), |
| }; |
| (session, outgoing_frames, channel_receiver) |
| } |
| |
| /// Handles the provided `frame` and expects the `expected` frame data to be sent to |
| /// the provided `outgoing_frames` receiver. |
| #[track_caller] |
| fn handle_and_expect_frame( |
| exec: &mut fasync::TestExecutor, |
| session: &mut SessionInner, |
| outgoing_frames: &mut mpsc::Receiver<Frame>, |
| frame: Frame, |
| expected: FrameData, |
| ) { |
| let mut handle_fut = Box::pin(session.handle_frame(frame)); |
| exec.run_until_stalled(&mut handle_fut).expect_pending("waiting for outgoing frame"); |
| expect_frame(exec, outgoing_frames, expected, None); |
| assert!(exec.run_until_stalled(&mut handle_fut).is_ready()); |
| } |
| |
| /// Expects and returns the `channel` from the provided `receiver`. |
| #[track_caller] |
| fn expect_channel( |
| exec: &mut fasync::TestExecutor, |
| receiver: &mut mpsc::Receiver<Result<Channel, ErrorCode>>, |
| ) -> Channel { |
| let mut channel_fut = Box::pin(receiver.next()); |
| match exec.run_until_stalled(&mut channel_fut) { |
| Poll::Ready(Some(Ok(channel))) => channel, |
| x => panic!("Expected a channel but got {:?}", x), |
| } |
| } |
| |
| /// Expects a cancellation Error over the provided `receiver`. |
| #[track_caller] |
| fn expect_channel_error( |
| exec: &mut fasync::TestExecutor, |
| receiver: &mut mpsc::Receiver<Result<Channel, ErrorCode>>, |
| expected_error: ErrorCode, |
| ) { |
| let mut channel_fut = Box::pin(receiver.next()); |
| match exec.run_until_stalled(&mut channel_fut) { |
| Poll::Ready(Some(Err(e))) => assert_eq!(e, expected_error), |
| x => panic!("Expected ready error but got {:?}", x), |
| } |
| } |
| |
| #[test] |
| fn test_outstanding_frame_manager() { |
| let mut outstanding_frames = OutstandingFrames::new(); |
| |
| // Always sets poll_final = true, and therefore requires a response. |
| let sabm_command = Frame::make_sabm_command(Role::Unassigned, DLCI::MUX_CONTROL_DLCI); |
| assert_matches!(outstanding_frames.register_frame(&sabm_command), Ok(true)); |
| // Inserting the same frame on the same DLCI should be rejected since |
| // there is already one outstanding. |
| assert_matches!(outstanding_frames.register_frame(&sabm_command), Err(_)); |
| |
| // Inserting same type of frame, but different DLCI is OK. |
| let user_sabm = Frame::make_sabm_command(Role::Initiator, DLCI::try_from(3).unwrap()); |
| assert_matches!(outstanding_frames.register_frame(&user_sabm), Ok(true)); |
| |
| // Response frames shouldn't be registered. |
| let ua_response = Frame::make_ua_response(Role::Responder, DLCI::MUX_CONTROL_DLCI); |
| assert_matches!(outstanding_frames.register_frame(&ua_response), Ok(false)); |
| |
| // Random DLCI - no frame should exist. |
| let random_dlci = DLCI::try_from(8).unwrap(); |
| assert_eq!(outstanding_frames.remove_frame(&random_dlci), None); |
| // SABM command should be retrievable. |
| assert_eq!(outstanding_frames.remove_frame(&DLCI::MUX_CONTROL_DLCI), Some(sabm_command)); |
| |
| // User data frames shouldn't ever be registered as they require no response. In particular, |
| // the `poll_final` bit is redefined for UserData frames. |
| let user_data = Frame::make_user_data_frame( |
| Role::Initiator, |
| random_dlci, |
| UserData { information: vec![] }, |
| Some(10), // Random amount of credits |
| ); |
| assert_matches!(outstanding_frames.register_frame(&user_data), Ok(false)); |
| |
| // Two different MuxCommands on the same DLCI is OK. |
| let data = MuxCommand { |
| params: MuxCommandParams::FlowControlOff(FlowControlParams {}), |
| command_response: CommandResponse::Command, |
| }; |
| let mux_command = Frame::make_mux_command(Role::Initiator, data); |
| assert_matches!(outstanding_frames.register_frame(&mux_command), Ok(true)); |
| let data2 = MuxCommand { |
| params: MuxCommandParams::FlowControlOn(FlowControlParams {}), |
| command_response: CommandResponse::Command, |
| }; |
| let mux_command2 = Frame::make_mux_command(Role::Initiator, data2.clone()); |
| assert_matches!(outstanding_frames.register_frame(&mux_command2), Ok(true)); |
| // Removing MuxCommand is OK. |
| assert_eq!(outstanding_frames.remove_mux_command(&data2.identifier()), Some(data2)); |
| |
| // Same MuxCommand but on different DLCIs is OK. |
| let user_dlci1 = DLCI::try_from(10).unwrap(); |
| let data1 = MuxCommand { |
| params: MuxCommandParams::ParameterNegotiation( |
| ParameterNegotiationParams::default_command(user_dlci1), |
| ), |
| command_response: CommandResponse::Command, |
| }; |
| let mux_command1 = Frame::make_mux_command(Role::Initiator, data1); |
| assert_matches!(outstanding_frames.register_frame(&mux_command1), Ok(true)); |
| let user_dlci2 = DLCI::try_from(15).unwrap(); |
| let data2 = MuxCommand { |
| params: MuxCommandParams::ParameterNegotiation( |
| ParameterNegotiationParams::default_command(user_dlci2), |
| ), |
| command_response: CommandResponse::Command, |
| }; |
| let mux_command2 = Frame::make_mux_command(Role::Initiator, data2); |
| assert_matches!(outstanding_frames.register_frame(&mux_command2), Ok(true)); |
| } |
| |
| #[test] |
| fn test_session_inner_inspect() { |
| let mut exec = fasync::TestExecutor::new(); |
| let inspect = inspect::Inspector::default(); |
| |
| // Setup SessionInner with inspect. |
| let (data_sender, data_receiver) = mpsc::channel(0); |
| let (mut inner, _outgoing_frames, _inbound_channels) = setup_session(); |
| inner.iattach(inspect.root(), "session_test").expect("should attach to inspect tree"); |
| let session = Arc::new(Mutex::new(inner)); |
| // Default inspect tree. |
| assert_data_tree!(inspect, root: { |
| session_test: contains { |
| peer_id: AnyProperty, |
| connected: "Connected", |
| }, |
| }); |
| |
| // Run the Session task. |
| let mut session_task = |
| Box::pin(SessionInner::process_incoming_frames(session.clone(), data_receiver)); |
| exec.run_until_stalled(&mut session_task) |
| .expect_pending("shouldn't be done while data_sender is live"); |
| |
| // Simulate peer disconnection. |
| drop(data_sender); |
| assert_matches!(exec.run_until_stalled(&mut session_task), Poll::Ready(Ok(_))); |
| // Inspect when Session is not active. |
| assert_data_tree!(inspect, root: { |
| session_test: contains { |
| peer_id: AnyProperty, |
| connected: "Disconnected", |
| } |
| }); |
| } |
| |
| #[test] |
| fn test_register_l2cap_channel() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| let (processing_fut, remote) = setup_session_task(); |
| let mut processing_fut = pin!(processing_fut); |
| exec.run_until_stalled(&mut processing_fut) |
| .expect_pending("shouldn't be done while remote is live"); |
| |
| drop(remote); |
| exec.run_until_stalled(&mut processing_fut).expect("should be done"); |
| } |
| |
| #[test] |
| fn test_receiving_data_is_ok() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| let (processing_fut, remote) = setup_session_task(); |
| let mut processing_fut = pin!(processing_fut); |
| exec.run_until_stalled(&mut processing_fut) |
| .expect_pending("shouldn't be done while remote is live"); |
| |
| // Remote sends us some data. Even though this is an invalid Frame, |
| // the `processing_fut` should still be OK. |
| let frame_bytes = [0x03, 0x3F, 0x01, 0x1C]; |
| assert_eq!(remote.as_ref().write(&frame_bytes[..]), Ok(4)); |
| |
| exec.run_until_stalled(&mut processing_fut) |
| .expect_pending("shouldn't be done while remote is live"); |
| } |
| |
| #[test] |
| fn test_peer_disconnection_notifies_termination_future() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| let id = PeerId(992); |
| let (local, remote) = Channel::create(); |
| let channel_opened_fn = Box::new(|_server_channel, _channel| async { Ok(()) }.boxed()); |
| let session = Session::create(id, local, channel_opened_fn); |
| |
| // Session should still be active. |
| let mut closed_fut = session.finished(); |
| exec.run_until_stalled(&mut closed_fut) |
| .expect_pending("shouldn't be done while remote is live"); |
| |
| // Peer disconnects - the termination future should resolve. |
| drop(remote); |
| assert!(exec.run_until_stalled(&mut closed_fut).is_ready()); |
| |
| // Trying to check again if the Session has terminated is OK - should resolve immediately. |
| let mut closed_fut2 = session.finished(); |
| assert!(exec.run_until_stalled(&mut closed_fut2).is_ready()); |
| |
| // Although unlikely, checking after the Session has gone out of scope resolves immediately. |
| let mut closed_fut3 = session.finished(); |
| drop(session); |
| assert!(exec.run_until_stalled(&mut closed_fut3).is_ready()); |
| } |
| |
| #[test] |
| fn test_receiving_user_sabm_before_mux_startup_is_rejected() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| let (mut session, mut outgoing_frames, _rfcomm_channels) = setup_session(); |
| assert_eq!(session.role(), Role::Unassigned); |
| |
| // Expect a DM response due to user DLCI SABM before Mux DLCI SABM. |
| let sabm = Frame::make_sabm_command(Role::Initiator, DLCI::try_from(3).unwrap()); |
| handle_and_expect_frame( |
| &mut exec, |
| &mut session, |
| &mut outgoing_frames, |
| sabm, |
| FrameData::DisconnectedMode, |
| ); |
| } |
| |
| #[test] |
| fn test_receiving_mux_sabm_starts_multiplexer_with_ua_response() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| let (mut session, mut outgoing_frames, _rfcomm_channels) = setup_session(); |
| |
| // Remote sends us an SABM command - expect a positive UA response. |
| let sabm = Frame::make_sabm_command(Role::Unassigned, DLCI::MUX_CONTROL_DLCI); |
| handle_and_expect_frame( |
| &mut exec, |
| &mut session, |
| &mut outgoing_frames, |
| sabm, |
| FrameData::UnnumberedAcknowledgement, |
| ); |
| |
| // The multiplexer for this session should be started and assume the Responder role. |
| assert!(session.multiplexer().started()); |
| assert_eq!(session.role(), Role::Responder); |
| } |
| |
| #[test] |
| fn test_receiving_mux_sabm_after_mux_startup_is_rejected() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| let (mut session, mut outgoing_frames, _rfcomm_channels) = setup_session(); |
| assert!(session.multiplexer().start(Role::Responder).is_ok()); |
| |
| // Remote sends us a SABM command on the Mux Control DLCI after the multiplexer has |
| // already started. We expect to reject this with a DM response. |
| let sabm = Frame::make_sabm_command(Role::Initiator, DLCI::MUX_CONTROL_DLCI); |
| handle_and_expect_frame( |
| &mut exec, |
| &mut session, |
| &mut outgoing_frames, |
| sabm, |
| FrameData::DisconnectedMode, |
| ); |
| } |
| |
| #[test] |
| fn test_receiving_multiple_pn_commands_results_in_set_parameters() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| let (mut session, mut outgoing_frames, _rfcomm_channels) = setup_session(); |
| assert!(!session.session_parameters_negotiated()); |
| assert!(session.multiplexer().start(Role::Responder).is_ok()); |
| |
| // Remote initiates DLC PN over a random user DLCI - expect to reply with a DLCPN response. |
| let random_dlci = DLCI::try_from(3).unwrap(); |
| let dlcpn = make_dlc_pn_frame(CommandResponse::Command, random_dlci, false, 64); |
| let expected_response = |
| make_dlc_pn_frame(CommandResponse::Response, random_dlci, false, 64); |
| handle_and_expect_frame( |
| &mut exec, |
| &mut session, |
| &mut outgoing_frames, |
| dlcpn, |
| expected_response.data, |
| ); |
| |
| // The global session parameters should be set. |
| let expected_parameters = |
| SessionParameters { credit_based_flow: false, max_frame_size: 64 }; |
| assert_eq!(session.session_parameters(), expected_parameters); |
| |
| // Multiple DLC PN requests before a DLC is established is OK - new parameters. |
| let dlcpn = make_dlc_pn_frame(CommandResponse::Command, random_dlci, true, 11); |
| let expected_response = make_dlc_pn_frame(CommandResponse::Response, random_dlci, true, 11); |
| handle_and_expect_frame( |
| &mut exec, |
| &mut session, |
| &mut outgoing_frames, |
| dlcpn, |
| expected_response.data, |
| ); |
| |
| // The global session parameters should be updated. |
| let expected_parameters = SessionParameters { credit_based_flow: true, max_frame_size: 11 }; |
| assert_eq!(session.session_parameters(), expected_parameters); |
| } |
| |
| #[test] |
| fn test_dlcpn_renegotiation_does_not_update_parameters() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| // Create and start a SessionInner that relays any opened RFCOMM channels. |
| let (mut session, mut outgoing_frames, mut channel_receiver) = setup_session(); |
| assert!(session.multiplexer().start(Role::Responder).is_ok()); |
| |
| // Remote peer initiates DLC PN over a random DLCI - expect to reply with a PN. |
| let random_dlci = DLCI::try_from(3).unwrap(); |
| let dlcpn = make_dlc_pn_frame(CommandResponse::Command, random_dlci, true, 100); |
| let expected_response = |
| make_dlc_pn_frame(CommandResponse::Response, random_dlci, true, 100); |
| handle_and_expect_frame( |
| &mut exec, |
| &mut session, |
| &mut outgoing_frames, |
| dlcpn, |
| expected_response.data, |
| ); |
| |
| // The global session parameters should be set. |
| let expected_parameters = |
| SessionParameters { credit_based_flow: true, max_frame_size: 100 }; |
| assert_eq!(session.session_parameters(), expected_parameters); |
| |
| // Remote peer sends SABM over a user DLCI - this will establish the DLCI. |
| let generic_dlci = 6; |
| let user_dlci = DLCI::try_from(generic_dlci).unwrap(); |
| let user_sabm = Frame::make_sabm_command(Role::Initiator, user_dlci); |
| let _channel = { |
| let mut handle_fut = Box::pin(session.handle_frame(user_sabm)); |
| exec.run_until_stalled(&mut handle_fut).expect_pending("waiting for channel delivery"); |
| // We expect a channel to be delivered from the`channel_opened_fn`. |
| let c = expect_channel(&mut exec, &mut channel_receiver); |
| // Continue to run the `handle_frame` to process the result of the channel delivery. |
| exec.run_until_stalled(&mut handle_fut).expect_pending("waiting for outgoing frame"); |
| // We expect to respond to the peer with a positive UA. |
| expect_frame( |
| &mut exec, |
| &mut outgoing_frames, |
| FrameData::UnnumberedAcknowledgement, |
| Some(user_dlci), |
| ); |
| exec.run_until_stalled(&mut handle_fut).expect_pending("waiting for modem status"); |
| // We then expect to send our current Modem Signals to the peer. |
| expect_mux_command(&mut exec, &mut outgoing_frames, MuxCommandMarker::ModemStatus); |
| let _ = exec.run_until_stalled(&mut handle_fut).expect("should be done handling frame"); |
| c |
| }; |
| |
| // There should be an established DLC. |
| assert!(session.multiplexer().dlc_established()); |
| |
| // Remote tries to re-negotiate the session parameters, we expect to reply with |
| // a UIH PN response with the current session parameters (max_frame_size = 100). |
| let dlcpn = make_dlc_pn_frame(CommandResponse::Command, user_dlci, true, 60); |
| let expected_response = make_dlc_pn_frame(CommandResponse::Response, user_dlci, true, 100); |
| handle_and_expect_frame( |
| &mut exec, |
| &mut session, |
| &mut outgoing_frames, |
| dlcpn, |
| expected_response.data, |
| ); |
| |
| // The global session parameters should not be updated since the first DLC has |
| // already been established. |
| assert_eq!(session.session_parameters(), expected_parameters); |
| } |
| |
| #[test] |
| fn test_establish_dlci_request_relays_channel_to_channel_open_fn() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| // Create and start a SessionInner that relays any opened RFCOMM channels. |
| let (mut session, mut outgoing_frames, mut channel_receiver) = setup_session(); |
| assert!(session.multiplexer().start(Role::Responder).is_ok()); |
| |
| // Remote peer sends SABM over a user DLCI - we expect to reply with a UA response. |
| let random_dlci = 8; |
| let user_dlci = DLCI::try_from(random_dlci).unwrap(); |
| let user_sabm = Frame::make_sabm_command(Role::Initiator, user_dlci); |
| { |
| let mut handle_fut = Box::pin(session.handle_frame(user_sabm)); |
| exec.run_until_stalled(&mut handle_fut).expect_pending("should wait for channel"); |
| // We expect a channel to be delivered from the`channel_opened_fn`. |
| let _c = expect_channel(&mut exec, &mut channel_receiver); |
| // Continue to run the `handle_frame` to process the result of the channel delivery. |
| exec.run_until_stalled(&mut handle_fut) |
| .expect_pending("should wait for outgoing frame"); |
| // We expect to respond with a positive UA response. |
| expect_frame( |
| &mut exec, |
| &mut outgoing_frames, |
| FrameData::UnnumberedAcknowledgement, |
| Some(user_dlci), |
| ); |
| exec.run_until_stalled(&mut handle_fut).expect_pending("should wait for modem status"); |
| // After positively responding, we expect to send our current Modem Signals to indicate |
| // readiness. |
| expect_mux_command(&mut exec, &mut outgoing_frames, MuxCommandMarker::ModemStatus); |
| let _ = exec.run_until_stalled(&mut handle_fut).expect("finished handling frame"); |
| } |
| } |
| |
| #[test] |
| fn test_no_registered_clients_rejects_establish_dlci_request() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| // Create the session - set the channel_send_fn to unanimously reject |
| // channels, to simulate failure. |
| let (mut session, mut outgoing_frames, _rfcomm_channels) = setup_session(); |
| session.channel_opened_fn = |
| Box::new(|_, _channel| async { Err(format_err!("Always rejecting")) }.boxed()); |
| assert!(session.multiplexer().start(Role::Responder).is_ok()); |
| |
| // Remote peer sends SABM over a user DLCI - this should be rejected with a |
| // DM response frame because channel delivery failed. |
| let user_dlci = DLCI::try_from(6).unwrap(); |
| let user_sabm = Frame::make_sabm_command(Role::Initiator, user_dlci); |
| handle_and_expect_frame( |
| &mut exec, |
| &mut session, |
| &mut outgoing_frames, |
| user_sabm, |
| FrameData::DisconnectedMode, |
| ); |
| } |
| |
| #[test] |
| fn test_received_user_data_is_relayed_to_and_from_profile_client() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| // Create and start a SessionInner that relays any opened RFCOMM channels. |
| let (mut session, mut outgoing_frames, mut channel_receiver) = setup_session(); |
| assert!(session.multiplexer().start(Role::Responder).is_ok()); |
| |
| // Establish a user DLCI with an adequate amount of credits - the RFCOMM channel should |
| // be delivered to the channel receiver. |
| let user_dlci = DLCI::try_from(8).unwrap(); |
| let _ = session.multiplexer().find_or_create_session_channel(user_dlci); |
| assert!(session |
| .multiplexer() |
| .set_flow_control(user_dlci, FlowControlMode::CreditBased(Credits::new(100, 100))) |
| .is_ok()); |
| let mut profile_client_channel = { |
| let mut establish_fut = Box::pin(session.establish_session_channel(user_dlci)); |
| exec.run_until_stalled(&mut establish_fut).expect_pending("should wait for channel"); |
| let channel = expect_channel(&mut exec, &mut channel_receiver); |
| assert_eq!(channel.max_tx_size(), 666); // 672 (default max) - 6 |
| assert_matches!(exec.run_until_stalled(&mut establish_fut), Poll::Ready(true)); |
| channel |
| }; |
| |
| // Remote peer sends us user data. |
| let pattern = vec![0x00, 0x01, 0x02]; |
| { |
| let user_data_frame = Frame::make_user_data_frame( |
| Role::Initiator, |
| user_dlci, |
| UserData { information: pattern.clone() }, |
| Some(10), // Random amount of credits. |
| ); |
| let mut handle_fut = Box::pin(session.handle_frame(user_data_frame)); |
| assert!(exec.run_until_stalled(&mut handle_fut).is_ready()); |
| |
| // User data should be forwarded to the profile client channel. |
| match exec.run_until_stalled(&mut profile_client_channel.next()) { |
| Poll::Ready(Some(Ok(buf))) => { |
| assert_eq!(buf, pattern); |
| } |
| x => panic!("Expected user data but got {:?}", x), |
| } |
| } |
| |
| // Profile client responds with it's own data. |
| let response = vec![0x09, 0x08, 0x07, 0x06]; |
| assert_eq!(profile_client_channel.as_ref().write(&response), Ok(4)); |
| // The data should be processed by the SessionChannel, packed as a user data |
| // frame, and sent as an outgoing frame. |
| expect_user_data_frame( |
| &mut exec, |
| &mut outgoing_frames, |
| UserData { information: response }, |
| Some(156), // CREDIT_HIGH_WATER_MARK - (100 (initial credits) - 1 (received frames)) |
| ); |
| } |
| |
| #[test] |
| fn test_receiving_invalid_mux_command_results_in_non_supported_command() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| let (mut session, mut outgoing_frames, _rfcomm_channels) = setup_session(); |
| assert!(session.multiplexer().start(Role::Responder).is_ok()); |
| |
| let unsupported_command = 0xff; |
| let mut handle_fut = Box::pin(session.handle_frame_parse_error( |
| FrameParseError::UnsupportedMuxCommandType(unsupported_command), |
| )); |
| exec.run_until_stalled(&mut handle_fut).expect_pending("should wait for outgoing frame"); |
| |
| // We expect an NSC Frame response. |
| let expected = FrameData::UnnumberedInfoHeaderCheck(UIHData::Mux(MuxCommand { |
| params: MuxCommandParams::NonSupported(NonSupportedCommandParams { |
| cr_bit: true, |
| non_supported_command: unsupported_command, |
| }), |
| command_response: CommandResponse::Response, |
| })); |
| expect_frame(&mut exec, &mut outgoing_frames, expected, None); |
| assert!(exec.run_until_stalled(&mut handle_fut).is_ready()); |
| } |
| |
| #[test] |
| fn test_disconnect_over_user_dlci_closes_session_channel() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| // Create and start a SessionInner that relays any opened RFCOMM channel. |
| let (mut session, mut outgoing_frames, mut channel_receiver) = setup_session(); |
| assert!(session.multiplexer().start(Role::Responder).is_ok()); |
| |
| // Establish a random user DLCI. |
| let user_dlci = DLCI::try_from(6).unwrap(); |
| let _channel = { |
| let mut establish_fut = Box::pin(session.establish_session_channel(user_dlci)); |
| exec.run_until_stalled(&mut establish_fut).expect_pending("should wait for channel"); |
| let c = expect_channel(&mut exec, &mut channel_receiver); |
| assert_matches!(exec.run_until_stalled(&mut establish_fut), Poll::Ready(true)); |
| c |
| }; |
| assert!(session.multiplexer().dlc_established()); |
| |
| // Receive a disconnect command - should close the channel for the provided DLCI and |
| // respond with UA. |
| let disc = Frame::make_disc_command(Role::Initiator, user_dlci); |
| handle_and_expect_frame( |
| &mut exec, |
| &mut session, |
| &mut outgoing_frames, |
| disc.clone(), |
| FrameData::UnnumberedAcknowledgement, |
| ); |
| assert!(!session.multiplexer().dlci_established(&user_dlci)); |
| |
| // Receiving a disconnect again on the already-closed DLCI should result in a DM response. |
| handle_and_expect_frame( |
| &mut exec, |
| &mut session, |
| &mut outgoing_frames, |
| disc, |
| FrameData::DisconnectedMode, |
| ); |
| } |
| |
| #[fuchsia::test] |
| fn test_disconnect_over_mux_control_closes_session() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| let (session_fut, mut remote) = setup_session_task(); |
| let mut session_fut = pin!(session_fut); |
| exec.run_until_stalled(&mut session_fut) |
| .expect_pending("shouldn't be done while remote is live"); |
| |
| { |
| let remote_closed_fut = remote.closed(); |
| let mut remote_closed_fut = pin!(remote_closed_fut); |
| exec.run_until_stalled(&mut remote_closed_fut) |
| .expect_pending("shouldn't be done while remote is live"); |
| } |
| |
| // Remote sends SABM to start up session multiplexer. |
| let sabm = Frame::make_sabm_command(Role::Unassigned, DLCI::MUX_CONTROL_DLCI); |
| send_peer_frame(remote.as_ref(), sabm); |
| exec.run_until_stalled(&mut session_fut) |
| .expect_pending("shouldn't be done while remote is live"); |
| // Expect the outgoing acknowledgement for the SABM. |
| expect_frame_received_by_peer(&mut exec, &mut remote); |
| |
| // Remote sends us a disconnect frame over the Mux Control DLCI. |
| let disconnect = Frame::make_disc_command(Role::Initiator, DLCI::MUX_CONTROL_DLCI); |
| send_peer_frame(remote.as_ref(), disconnect); |
| // Once we process the disconnect, the session should terminate. |
| let _ = exec.run_until_stalled(&mut session_fut).expect("Session is done now"); |
| // Expect the outgoing acknowledgement for the disconnect. |
| expect_frame_received_by_peer(&mut exec, &mut remote); |
| |
| // Remote should be closed, since the session has terminated. |
| { |
| let remote_closed_fut = remote.closed(); |
| let mut remote_closed_fut = pin!(remote_closed_fut); |
| let _ = exec |
| .run_until_stalled(&mut remote_closed_fut) |
| .expect("L2CAP channel should be closed now"); |
| } |
| } |
| |
| #[test] |
| fn test_start_multiplexer() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| let (mut session, mut outgoing_frames, _rfcomm_channels) = setup_session(); |
| assert!(!session.multiplexer().started()); |
| |
| // Initiate multiplexer startup - we expect to send a SABM frame. |
| { |
| let mut start_mux_fut = Box::pin(session.start_multiplexer()); |
| exec.run_until_stalled(&mut start_mux_fut) |
| .expect_pending("should wait for outgoing frame"); |
| // The outgoing frame should be an SABM. |
| expect_frame( |
| &mut exec, |
| &mut outgoing_frames, |
| FrameData::SetAsynchronousBalancedMode, |
| Some(DLCI::MUX_CONTROL_DLCI), |
| ); |
| assert_matches!(exec.run_until_stalled(&mut start_mux_fut), Poll::Ready(Ok(_))); |
| } |
| |
| // Attempting to start the multiplexer while it's already starting should fail. |
| { |
| let mut start_mux_fut = Box::pin(session.start_multiplexer()); |
| assert_matches!(exec.run_until_stalled(&mut start_mux_fut), Poll::Ready(Err(_))); |
| } |
| |
| // Simulate peer responding positively with a UA - this should complete startup. |
| { |
| let mut handle_fut = |
| Box::pin(session.handle_frame(Frame::make_ua_response( |
| Role::Unassigned, |
| DLCI::MUX_CONTROL_DLCI, |
| ))); |
| assert!(exec.run_until_stalled(&mut handle_fut).is_ready()); |
| } |
| // Multiplexer startup should finish with the initiator role. |
| assert!(session.multiplexer().started()); |
| assert_eq!(session.role(), Role::Initiator); |
| } |
| |
| #[test] |
| fn test_peer_rejects_multiplexer_startup() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| let (mut session, mut outgoing_frames, _rfcomm_channels) = setup_session(); |
| assert!(!session.multiplexer().started()); |
| // Initiate multiplexer startup - we expect to send a SABM frame. |
| { |
| let mut start_mux_fut = Box::pin(session.start_multiplexer()); |
| exec.run_until_stalled(&mut start_mux_fut) |
| .expect_pending("should wait for outgoing frame"); |
| expect_frame( |
| &mut exec, |
| &mut outgoing_frames, |
| FrameData::SetAsynchronousBalancedMode, |
| Some(DLCI::MUX_CONTROL_DLCI), |
| ); |
| assert_matches!(exec.run_until_stalled(&mut start_mux_fut), Poll::Ready(Ok(_))); |
| } |
| // Simulate peer responding negatively with a DM - this should cancel startup. |
| { |
| let mut handle_fut = |
| Box::pin(session.handle_frame(Frame::make_dm_response( |
| Role::Unassigned, |
| DLCI::MUX_CONTROL_DLCI, |
| ))); |
| assert!(exec.run_until_stalled(&mut handle_fut).is_ready()); |
| } |
| // The multiplexer should not be started and should still be Unassigned. |
| assert!(!session.multiplexer().started()); |
| assert_eq!(session.role(), Role::Unassigned); |
| } |
| |
| #[test] |
| fn test_initiating_parameter_negotiation_expects_response() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| let (mut session, mut outgoing_frames, _rfcomm_channels) = setup_session(); |
| |
| // Attempting to negotiate parameters before mux startup should fail. |
| assert!(!session.multiplexer().started()); |
| let user_dlci = DLCI::try_from(3).unwrap(); |
| { |
| let mut pn_fut = Box::pin(session.start_parameter_negotiation(user_dlci)); |
| assert_matches!(exec.run_until_stalled(&mut pn_fut), Poll::Ready(Err(_))); |
| } |
| |
| assert!(session.multiplexer().start(Role::Initiator).is_ok()); |
| // Initiating PN should be OK now. Upon receiving response, the session parameters |
| // should get set. |
| { |
| let mut pn_fut = Box::pin(session.start_parameter_negotiation(user_dlci)); |
| exec.run_until_stalled(&mut pn_fut).expect_pending("should wait for outgoing frame"); |
| expect_mux_command( |
| &mut exec, |
| &mut outgoing_frames, |
| MuxCommandMarker::ParameterNegotiation, |
| ); |
| assert_matches!(exec.run_until_stalled(&mut pn_fut), Poll::Ready(Ok(_))); |
| } |
| // Simulate peer responding positively - the parameters should be negotiated. |
| { |
| let mut handle_fut = Box::pin(session.handle_frame(make_dlc_pn_frame( |
| CommandResponse::Response, |
| user_dlci, |
| true, // Peer supports credit-based flow control. |
| 100, // Peer supports max-frame-size of 100. |
| ))); |
| assert_matches!(exec.run_until_stalled(&mut handle_fut), Poll::Ready(Ok(_))); |
| } |
| assert!(session.multiplexer().parameters_negotiated()); |
| let expected_parameters = |
| SessionParameters { credit_based_flow: true, max_frame_size: 100 }; |
| assert_eq!(session.multiplexer().parameters(), expected_parameters); |
| } |
| |
| #[test] |
| fn test_peer_rejects_parameter_negotiation_with_dm() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| let (mut session, mut outgoing_frames, _rfcomm_channels) = setup_session(); |
| let (outbound_fn, mut outbound_channels) = create_outbound_relay(); |
| assert!(session.multiplexer().start(Role::Initiator).is_ok()); |
| |
| // Request to open a channel - should initiate parameter negotiation. |
| let server_channel = ServerChannel::try_from(13).unwrap(); |
| let user_dlci = server_channel.to_dlci(Role::Responder).unwrap(); |
| { |
| let mut open_fut = Box::pin(session.open_remote_channel(server_channel, outbound_fn)); |
| exec.run_until_stalled(&mut open_fut).expect_pending("should wait for outgoing frame"); |
| expect_mux_command( |
| &mut exec, |
| &mut outgoing_frames, |
| MuxCommandMarker::ParameterNegotiation, |
| ); |
| assert_matches!(exec.run_until_stalled(&mut open_fut), Poll::Ready(Ok(_))); |
| } |
| // Simulate peer responding negatively with a DM response. |
| { |
| let mut handle_fut = |
| Box::pin(session.handle_frame(Frame::make_dm_response(Role::Responder, user_dlci))); |
| assert_matches!(exec.run_until_stalled(&mut handle_fut), Poll::Ready(Ok(_))); |
| } |
| // The session-wide parameters should not be negotiated. |
| assert_eq!( |
| session.multiplexer().parameter_negotiation_state(), |
| ParameterNegotiationState::NotNegotiated |
| ); |
| // Client should be notified of cancellation. |
| expect_channel_error(&mut exec, &mut outbound_channels, ErrorCode::Canceled); |
| } |
| |
| #[test] |
| fn test_peer_rejects_parameter_negotiation_with_disc() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| let (mut session, mut outgoing_frames, _inbound_channels) = setup_session(); |
| let (outbound_fn, mut outbound_channels) = create_outbound_relay(); |
| assert!(session.multiplexer().start(Role::Initiator).is_ok()); |
| |
| // Request to open a channel - should initiate parameter negotiation. |
| let server_channel = ServerChannel::try_from(13).unwrap(); |
| let user_dlci = server_channel.to_dlci(Role::Responder).unwrap(); |
| { |
| let mut open_fut = Box::pin(session.open_remote_channel(server_channel, outbound_fn)); |
| exec.run_until_stalled(&mut open_fut).expect_pending("should wait for outgoing frame"); |
| expect_mux_command( |
| &mut exec, |
| &mut outgoing_frames, |
| MuxCommandMarker::ParameterNegotiation, |
| ); |
| assert_matches!(exec.run_until_stalled(&mut open_fut), Poll::Ready(Ok(_))); |
| } |
| // Simulate peer responding negatively with a Disconnect command - we expect to positively |
| // reply with a UA. |
| handle_and_expect_frame( |
| &mut exec, |
| &mut session, |
| &mut outgoing_frames, |
| Frame::make_disc_command(Role::Responder, user_dlci), |
| FrameData::UnnumberedAcknowledgement, |
| ); |
| // The session-wide parameters should not be negotiated. |
| assert_eq!( |
| session.multiplexer().parameter_negotiation_state(), |
| ParameterNegotiationState::NotNegotiated |
| ); |
| // Client should be notified of cancellation. |
| expect_channel_error(&mut exec, &mut outbound_channels, ErrorCode::Canceled); |
| } |
| |
| #[test] |
| fn test_open_channel_request_establishes_channel_after_mux_startup_and_pn() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| // Create and start a SessionInner that relays any opened RFCOMM channels. |
| let (mut session, mut outgoing_frames, _inbound_channels) = setup_session(); |
| let (outbound_fn, mut outbound_channels) = create_outbound_relay(); |
| |
| // Initiate an open RFCOMM channel request with a random valid ServerChannel. |
| let server_channel = ServerChannel::try_from(5).unwrap(); |
| { |
| let mut open_fut = Box::pin(session.open_remote_channel(server_channel, outbound_fn)); |
| exec.run_until_stalled(&mut open_fut).expect_pending("should wait for outgoing frame"); |
| // Since the multiplexer has not started, we first expect to send an SABM over the |
| // MUX Control DLCI to the remote peer. |
| expect_frame( |
| &mut exec, |
| &mut outgoing_frames, |
| FrameData::SetAsynchronousBalancedMode, |
| Some(DLCI::MUX_CONTROL_DLCI), |
| ); |
| assert_matches!(exec.run_until_stalled(&mut open_fut), Poll::Ready(Ok(_))); |
| } |
| { |
| // Simulate peer responding positively with a UA. |
| let mut handle_fut = Box::pin( |
| session |
| .handle_frame(Frame::make_ua_response(Role::Responder, DLCI::MUX_CONTROL_DLCI)), |
| ); |
| exec.run_until_stalled(&mut handle_fut) |
| .expect_pending("should wait for outgoing frame"); |
| // We then expect the session to initiate a Parameter Negotiation request, since |
| // the session has not negotiated parameters. |
| expect_mux_command( |
| &mut exec, |
| &mut outgoing_frames, |
| MuxCommandMarker::ParameterNegotiation, |
| ); |
| assert_matches!(exec.run_until_stalled(&mut handle_fut), Poll::Ready(Ok(_))); |
| } |
| |
| let expected_dlci = server_channel.to_dlci(Role::Responder).unwrap(); |
| { |
| // Simulate the peer's positive response to the DLC PN request. We then expect an |
| // outgoing SABM command to establish the user channel. |
| handle_and_expect_frame( |
| &mut exec, |
| &mut session, |
| &mut outgoing_frames, |
| make_dlc_pn_frame( |
| CommandResponse::Response, |
| expected_dlci, |
| true, // Supports credit-based flow control. |
| 100, // Supports max frame size of 100. |
| ), |
| FrameData::SetAsynchronousBalancedMode, // Outgoing SABM. |
| ); |
| } |
| |
| { |
| // Remote peer replies positively to the open channel request. |
| let mut handle_fut = Box::pin( |
| session.handle_frame(Frame::make_ua_response(Role::Responder, expected_dlci)), |
| ); |
| exec.run_until_stalled(&mut handle_fut).expect_pending("should wait for channel"); |
| // We then expect to open a local RFCOMM channel to be relayed to a profile client. |
| let _channel = expect_channel(&mut exec, &mut outbound_channels); |
| exec.run_until_stalled(&mut handle_fut) |
| .expect_pending("should wait for outgoing frame"); |
| // Upon successful channel delivery, we expect an outgoing ModemStatus frame to |
| // be sent. |
| expect_mux_command(&mut exec, &mut outgoing_frames, MuxCommandMarker::ModemStatus); |
| assert_matches!(exec.run_until_stalled(&mut handle_fut), Poll::Ready(Ok(_))); |
| } |
| |
| // The DLCI should be established. |
| assert!(session.multiplexer().dlci_established(&expected_dlci)); |
| } |
| |
| #[test] |
| fn test_open_channel_request_rejected_by_peer() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| // Start SessionInner - don't expect any relayed channels. |
| let (mut session, mut outgoing_frames, _inbound_channels) = setup_session(); |
| let (outbound_fn, mut outbound_channels) = create_outbound_relay(); |
| session.channel_opened_fn = |
| Box::new(|_, _channel| async { panic!("Don't expect channels!") }.boxed()); |
| assert!(session.multiplexer().start(Role::Initiator).is_ok()); |
| |
| let server_channel = ServerChannel::try_from(5).unwrap(); |
| let expected_dlci = server_channel.to_dlci(Role::Responder).unwrap(); |
| // Simulate PN finishing ahead of time so that we can directly test the rejection case. |
| session.finish_parameter_negotiation(&ParameterNegotiationParams::default_command( |
| expected_dlci, |
| )); |
| // Initiate an open RFCOMM channel request with a random valid ServerChannel. Expect |
| // an outgoing SABM. |
| { |
| let mut open_fut = Box::pin(session.open_remote_channel(server_channel, outbound_fn)); |
| exec.run_until_stalled(&mut open_fut).expect_pending("should wait for outgoing frame"); |
| expect_frame( |
| &mut exec, |
| &mut outgoing_frames, |
| FrameData::SetAsynchronousBalancedMode, |
| Some(expected_dlci), |
| ); |
| assert_matches!(exec.run_until_stalled(&mut open_fut), Poll::Ready(Ok(_))); |
| } |
| { |
| // Simulate peer responding negatively with a DM. |
| let mut handle_fut = Box::pin( |
| session.handle_frame(Frame::make_dm_response(Role::Responder, expected_dlci)), |
| ); |
| assert_matches!(exec.run_until_stalled(&mut handle_fut), Poll::Ready(Ok(_))); |
| } |
| // The DLCI should not be established due to peer rejection - client should be notified. |
| assert!(!session.multiplexer().dlci_established(&expected_dlci)); |
| expect_channel_error(&mut exec, &mut outbound_channels, ErrorCode::Canceled); |
| } |
| |
| #[test] |
| fn test_cancellation_during_open_channel_notifies_client() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| let (local, mut remote) = Channel::create(); |
| let (channel_open_fn, _inbound_channels) = create_inbound_relay(); |
| let session = Session::create(PeerId(42), local, channel_open_fn); |
| let (outbound_fn, mut outbound_channels) = create_outbound_relay(); |
| |
| // Local profile client requests to open an RFCOMM channel. |
| let server_channel = ServerChannel::try_from(2).unwrap(); |
| { |
| let mut open_fut = Box::pin(session.open_rfcomm_channel(server_channel, outbound_fn)); |
| assert!(exec.run_until_stalled(&mut open_fut).is_ready()); |
| } |
| |
| // Remote should receive an RFCOMM frame to start up the multiplexer. |
| expect_frame_received_by_peer(&mut exec, &mut remote); |
| // Remote responds positively. |
| let ua = Frame::make_ua_response(Role::Unassigned, DLCI::MUX_CONTROL_DLCI); |
| send_peer_frame(remote.as_ref(), ua); |
| |
| // Remote should receive an RFCOMM frame to negotiate parameters. |
| expect_frame_received_by_peer(&mut exec, &mut remote); |
| // Remote disconnects - run any background tasks to completion. |
| drop(remote); |
| let _ = exec.run_until_stalled(&mut futures::future::pending::<()>()); |
| // Client should be notified of cancellation. |
| expect_channel_error(&mut exec, &mut outbound_channels, ErrorCode::Canceled); |
| } |
| |
| #[fuchsia::test] |
| fn session_close_request_before_mux_startup_is_no_op() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| let (local, remote) = Channel::create(); |
| let (channel_open_fn, _inbound_channels) = create_inbound_relay(); |
| let session = Session::create(PeerId(52), local, channel_open_fn); |
| |
| // Some local client (via the RfcommTest API) requests to close the RFCOMM session. |
| { |
| let mut close_fut = Box::pin(session.close()); |
| assert!(exec.run_until_stalled(&mut close_fut).is_ready()); |
| } |
| |
| // Because the RFCOMM session hasn't been established yet, the close request is a |
| // no-op. The underlying L2CAP channel should still be open. |
| let mut channel_closed_fut = Box::pin(remote.closed()); |
| exec.run_until_stalled(&mut channel_closed_fut) |
| .expect_pending("shouldn't be done while session active"); |
| } |
| |
| #[fuchsia::test] |
| fn session_close_request_results_in_disconnect_frame_to_peer() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| let (local, mut remote) = Channel::create(); |
| let (channel_open_fn, _inbound_channels) = create_inbound_relay(); |
| let session = Session::create(PeerId(52), local, channel_open_fn); |
| |
| // Manually start the multiplexer. |
| { |
| let mut start_fut = Box::pin(session.initiate_multiplexer_startup()); |
| assert!(exec.run_until_stalled(&mut start_fut).is_ready()); |
| } |
| |
| // Expect outgoing SABM and simulate peer positive response. |
| expect_frame_received_by_peer(&mut exec, &mut remote); |
| let ua = Frame::make_ua_response(Role::Unassigned, DLCI::MUX_CONTROL_DLCI); |
| send_peer_frame(remote.as_ref(), ua); |
| let _ = exec.run_until_stalled(&mut futures::future::pending::<()>()); |
| |
| // Some client (presumably via the RfcommTest API) requests to close the RFCOMM session. |
| { |
| let mut close_fut = Box::pin(session.close()); |
| assert!(exec.run_until_stalled(&mut close_fut).is_ready()); |
| } |
| |
| // Remote should receive an RFCOMM frame to Disconnect. |
| expect_frame_received_by_peer(&mut exec, &mut remote); |
| { |
| // The session (and therefore L2CAP channel) should only close after the |
| // peer acknowledges. |
| let mut channel_closed_fut = Box::pin(remote.closed()); |
| exec.run_until_stalled(&mut channel_closed_fut) |
| .expect_pending("shouldn't finish while session active"); |
| } |
| // Remote responds positively. |
| let ua = Frame::make_ua_response(Role::Unassigned, DLCI::MUX_CONTROL_DLCI); |
| send_peer_frame(remote.as_ref(), ua); |
| |
| // At this point, the L2CAP channel should be closed. |
| let mut channel_closed_fut = Box::pin(remote.closed()); |
| assert!(exec.run_until_stalled(&mut channel_closed_fut).is_ready()); |
| } |
| |
| #[test] |
| fn test_open_multiple_channels_establishes_channels_after_acknowledgement() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| // Create and start a SessionInner that relays any opened RFCOMM channels. |
| let (mut session, mut outgoing_frames, _inbound_channels) = setup_session(); |
| let (outbound_fn, _outbound_channels1) = create_outbound_relay(); |
| let (outbound_fn2, _outbound_channels2) = create_outbound_relay(); |
| |
| // The session multiplexer has started. |
| assert!(session.multiplexer().start(Role::Responder).is_ok()); |
| |
| // Initiate an open RFCOMM channel request with a random valid ServerChannel. |
| let server_channel = ServerChannel::try_from(5).unwrap(); |
| let expected_dlci = server_channel.to_dlci(Role::Initiator).unwrap(); |
| { |
| let mut open_fut = Box::pin(session.open_remote_channel(server_channel, outbound_fn)); |
| exec.run_until_stalled(&mut open_fut).expect_pending("should wait for outgoing frame"); |
| // We expect the session to initiate a Parameter Negotiation request (UIH Frame), |
| // for the DLCI. We do this for every DLC. |
| expect_mux_command( |
| &mut exec, |
| &mut outgoing_frames, |
| MuxCommandMarker::ParameterNegotiation, |
| ); |
| assert_matches!(exec.run_until_stalled(&mut open_fut), Poll::Ready(Ok(_))); |
| } |
| |
| // Before the peer responds, we get another request to open a different RFCOMM channel. |
| let server_channel2 = ServerChannel::try_from(9).unwrap(); |
| let expected_dlci2 = server_channel2.to_dlci(Role::Initiator).unwrap(); |
| { |
| // We expect the session to initiate a Parameter Negotiation request (UIH Frame), |
| // for the DLCI. We do this for every DLC. |
| let mut open_fut = Box::pin(session.open_remote_channel(server_channel2, outbound_fn2)); |
| exec.run_until_stalled(&mut open_fut).expect_pending("should wait for outgoing frame"); |
| expect_mux_command( |
| &mut exec, |
| &mut outgoing_frames, |
| MuxCommandMarker::ParameterNegotiation, |
| ); |
| assert_matches!(exec.run_until_stalled(&mut open_fut), Poll::Ready(Ok(_))); |
| } |
| |
| // Simulate the peer's positive response to the first DLC PN request. We expect an |
| // outgoing SABM. |
| let pn_frame = make_dlc_pn_frame( |
| CommandResponse::Response, |
| expected_dlci, |
| true, // Supports credit-based flow control. |
| 100, // Supports max frame size of 100. |
| ); |
| handle_and_expect_frame( |
| &mut exec, |
| &mut session, |
| &mut outgoing_frames, |
| pn_frame, |
| FrameData::SetAsynchronousBalancedMode, |
| ); |
| // Simulate the peer's positive response to the second DLC PN request. We expect an |
| // outgoing SABM. |
| let pn_frame = make_dlc_pn_frame( |
| CommandResponse::Response, |
| expected_dlci2, |
| true, // Supports credit-based flow control. |
| 105, // Supports max frame size of 105. |
| ); |
| handle_and_expect_frame( |
| &mut exec, |
| &mut session, |
| &mut outgoing_frames, |
| pn_frame, |
| FrameData::SetAsynchronousBalancedMode, |
| ); |
| } |
| |
| #[test] |
| fn test_open_rfcomm_channel_relays_channel_to_callback() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| let (local, mut remote) = Channel::create(); |
| let (channel_open_fn, _inbound_channels) = create_inbound_relay(); |
| let session = Session::create(PeerId(321), local, channel_open_fn); |
| let (outbound_fn, mut outbound_channels) = create_outbound_relay(); |
| |
| // 1. Simulate local profile client requesting to open an RFCOMM channel. |
| let server_channel = ServerChannel::try_from(2).unwrap(); |
| let expected_dlci = server_channel.to_dlci(Role::Responder).unwrap(); |
| { |
| let mut open_fut = Box::pin(session.open_rfcomm_channel(server_channel, outbound_fn)); |
| assert!(exec.run_until_stalled(&mut open_fut).is_ready()); |
| } |
| |
| // 2. Remote should receive an RFCOMM frame to start up the multiplexer. |
| expect_frame_received_by_peer(&mut exec, &mut remote); |
| // 3. Remote responds positively. |
| let ua = Frame::make_ua_response(Role::Unassigned, DLCI::MUX_CONTROL_DLCI); |
| send_peer_frame(remote.as_ref(), ua); |
| |
| // 4. Remote should receive an RFCOMM frame to negotiate parameters. |
| expect_frame_received_by_peer(&mut exec, &mut remote); |
| // 5. Remote responds positively. |
| let pn_response = make_dlc_pn_frame(CommandResponse::Response, expected_dlci, true, 100); |
| send_peer_frame(remote.as_ref(), pn_response); |
| |
| // 6. Remote should receive an RFCOMM frame to establish the `expected_dlci`. |
| expect_frame_received_by_peer(&mut exec, &mut remote); |
| // 7. Remote responds positively. |
| let ua = Frame::make_ua_response(Role::Responder, expected_dlci); |
| send_peer_frame(remote.as_ref(), ua); |
| |
| // Mux startup, Parameter negotiation, and channel establishment are complete. The RFCOMM |
| // channel should be ready and relayed to the client. |
| let _channel = expect_channel(&mut exec, &mut outbound_channels); |
| |
| // Client trying to connect again on the same channel should fail immediately. |
| let (outbound_fn2, mut outbound_channels2) = create_outbound_relay(); |
| let mut open_fut = Box::pin(session.open_rfcomm_channel(server_channel, outbound_fn2)); |
| assert!(exec.run_until_stalled(&mut open_fut).is_ready()); |
| expect_channel_error(&mut exec, &mut outbound_channels2, ErrorCode::Canceled); |
| } |
| |
| #[test] |
| fn test_open_same_rfcomm_channel_fails() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| // Create and start a SessionInner that relays any opened RFCOMM channels. |
| let (mut session, mut outgoing_frames, _inbound_channels) = setup_session(); |
| assert!(session.multiplexer().start(Role::Responder).is_ok()); |
| let (outbound_fn, mut outbound_channels1) = create_outbound_relay(); |
| let (outbound_fn2, mut outbound_channels2) = create_outbound_relay(); |
| |
| // Initiate an open RFCOMM channel request. |
| let server_channel = ServerChannel::try_from(5).unwrap(); |
| { |
| let mut open_fut = Box::pin(session.open_remote_channel(server_channel, outbound_fn)); |
| exec.run_until_stalled(&mut open_fut).expect_pending("should wait for outgoing frame"); |
| // Expect to initiate PN. |
| expect_mux_command( |
| &mut exec, |
| &mut outgoing_frames, |
| MuxCommandMarker::ParameterNegotiation, |
| ); |
| assert_matches!(exec.run_until_stalled(&mut open_fut), Poll::Ready(Ok(_))); |
| } |
| // Before peer responds, client tries to request to open the same channel - failure. |
| let mut open_fut = Box::pin(session.open_remote_channel(server_channel, outbound_fn2)); |
| assert_matches!(exec.run_until_stalled(&mut open_fut), Poll::Ready(Err(_))); |
| // Client should be notified that the second request failed. |
| expect_channel_error(&mut exec, &mut outbound_channels2, ErrorCode::Failed); |
| // First request should still be alive - nothing relayed. |
| poll_stream(&mut exec, &mut outbound_channels1).expect_pending("nothing relayed") |
| } |
| |
| #[fuchsia::test] |
| fn send_rls_before_mux_startup_returns_error() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| let (mut session, _outgoing_frames, _inbound_channels) = setup_session(); |
| |
| // Initiate an RLS update command. |
| let server_channel = ServerChannel::try_from(3).unwrap(); |
| let mut rls_fut = Box::pin(session.send_remote_line_status(server_channel, None)); |
| match exec.run_until_stalled(&mut rls_fut) { |
| Poll::Ready(Err(Error::MultiplexerNotStarted)) => {} |
| x => panic!("Expected ready with error but got: {:?}", x), |
| } |
| } |
| |
| #[fuchsia::test] |
| fn send_rls_before_channel_establishment_returns_error() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| let our_role = Role::Responder; |
| let (mut session, _outgoing_frames, _inbound_channels) = setup_session(); |
| assert!(session.multiplexer().start(our_role).is_ok()); |
| |
| // Initiate an RLS update for some channel that has not been established. |
| let server_channel = ServerChannel::try_from(3).expect("should be valid server channel"); |
| let expected_dlci = server_channel.to_dlci(our_role).expect("should be valid dlci"); |
| |
| let mut rls_fut = Box::pin(session.send_remote_line_status(server_channel, None)); |
| match exec.run_until_stalled(&mut rls_fut) { |
| Poll::Ready(Err(Error::ChannelNotEstablished(dlci))) => { |
| assert_eq!(dlci, expected_dlci); |
| } |
| x => panic!("Expected ready with error but got: {:?}", x), |
| } |
| } |
| |
| #[fuchsia::test] |
| fn rls_command_received_by_peer() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| let our_role = Role::Responder; |
| let remote_peer_role = our_role.opposite_role(); |
| let (mut session, mut outgoing_frames, mut channel_receiver) = setup_session(); |
| assert!(session.multiplexer().start(our_role).is_ok()); |
| |
| // Remote peer wants to open an RFCOMM channel for some service provided by this device. |
| let server_channel_number = |
| ServerChannel::try_from(2).expect("valid server channel number"); |
| let expected_dlci = server_channel_number.to_dlci(our_role).expect("valid DLCI"); |
| |
| let user_sabm = Frame::make_sabm_command(remote_peer_role, expected_dlci); |
| let _rfcomm_channel = { |
| let mut handle_fut = Box::pin(session.handle_frame(user_sabm)); |
| exec.run_until_stalled(&mut handle_fut).expect_pending("should wait for channel"); |
| // We expect a channel to be delivered to the local client (e.g to the `channel_receiver`). |
| let _c = expect_channel(&mut exec, &mut channel_receiver); |
| // After successfully delivering the channel to a local client, we expect to notify the peer with |
| // a positive UA response. |
| exec.run_until_stalled(&mut handle_fut) |
| .expect_pending("should wait for outgoing frame"); |
| expect_frame( |
| &mut exec, |
| &mut outgoing_frames, |
| FrameData::UnnumberedAcknowledgement, |
| Some(expected_dlci), |
| ); |
| |
| // After positively responding, we expect to send our current Modem Signals to indicate |
| // readiness. |
| exec.run_until_stalled(&mut handle_fut) |
| .expect_pending("should wait for outgoing frame"); |
| expect_mux_command(&mut exec, &mut outgoing_frames, MuxCommandMarker::ModemStatus); |
| assert_matches!(exec.run_until_stalled(&mut handle_fut), Poll::Ready(Ok(false))); |
| _c |
| }; |
| |
| // An RFCOMM channel at `expected_dlci` has been established. Sending an RLS update to the peer |
| // should succeed. |
| let error_status = Some(RlsError::Overrun); |
| { |
| let mut rls_fut = |
| Box::pin(session.send_remote_line_status(server_channel_number, error_status)); |
| exec.run_until_stalled(&mut rls_fut).expect_pending("should wait for outgoing frame"); |
| expect_mux_command(&mut exec, &mut outgoing_frames, MuxCommandMarker::RemoteLineStatus); |
| assert_matches!(exec.run_until_stalled(&mut rls_fut), Poll::Ready(Ok(_))); |
| } |
| |
| // Peer would typically respond by echoing the RLS - nothing to be done thereafter. |
| let mux_command = MuxCommand { |
| params: MuxCommandParams::RemoteLineStatus(RemoteLineStatusParams::new( |
| expected_dlci, |
| error_status, |
| )), |
| command_response: CommandResponse::Response, |
| }; |
| let peer_rls = Frame::make_mux_command(remote_peer_role, mux_command); |
| { |
| let mut handle_fut = Box::pin(session.handle_frame(peer_rls)); |
| assert_matches!(exec.run_until_stalled(&mut handle_fut), Poll::Ready(Ok(false))); |
| } |
| } |
| } |