blob: 9aa3295e3d19554755ce654b9f65eaef5e5e0ff6 [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
at_commands as at,
task::{Context, Poll},
fuchsia_zircon as zx,
channel::mpsc::{self, Receiver, Sender},
stream::{FusedStream, Stream, StreamExt},
AsyncWrite, AsyncWriteExt,
log::{info, warn},
std::{collections::HashMap, collections::VecDeque, io::Cursor},
use crate::{
AgUpdate, InformationRequest, Procedure, ProcedureError, ProcedureMarker, ProcedureRequest,
features::{AgFeatures, HfFeatures},
indicators::{AgIndicators, AgIndicatorsReporting, HfIndicators},
/// The maximum number of concurrent procedures currently supported by this SLC.
/// This value is chosen as a number significantly more than the total number of procedures
/// supported by this implementation.
/// An update that can be received from either the Audio Gateway (AG) or the Hands Free (HF).
#[derive(Debug, Clone)]
pub enum Command {
/// A command received from the AG.
/// A command received from the HF.
impl From<AgUpdate> for Command {
fn from(src: AgUpdate) -> Self {
impl From<at::Command> for Command {
fn from(src: at::Command) -> Self {
/// The state associated with this service level connection.
#[derive(Clone, Debug, Default)]
pub struct SlcState {
/// Whether the channel has been initialized with the SLCI Procedure.
pub initialized: bool,
/// The features of the AG.
pub ag_features: AgFeatures,
/// The features of the HF.
pub hf_features: HfFeatures,
/// The codecs supported by the HF.
pub hf_supported_codecs: Option<Vec<u32>>,
/// The indicators supported by the HF and its current status.
pub hf_indicators: HfIndicators,
/// The current AG indicator events reporting state.
pub ag_indicator_events_reporting: AgIndicatorsReporting,
/// The current indicator status of the AG.
pub ag_indicator_status: AgIndicators,
/// The format used when representing the network operator name on the AG.
pub ag_network_operator_name_format: Option<at::NetworkOperatorNameFormat>,
/// Use AG Extended Error Codes.
pub extended_errors: bool,
/// Enable call waiting notifications.
pub call_waiting_notifications: bool,
/// Enable call line identification notifications during incoming calls.
pub call_line_ident_notifications: bool,
impl SlcState {
/// Returns true if both peers support the Codec Negotiation state.
pub fn codec_negotiation(&self) -> bool {
&& self.hf_features.contains(HfFeatures::CODEC_NEGOTIATION)
/// Returns true if both peers support Three-way calling.
pub fn three_way_calling(&self) -> bool {
&& self.ag_features.contains(AgFeatures::THREE_WAY_CALLING)
/// Returns true if both peers support HF Indicators.
pub fn hf_indicators(&self) -> bool {
&& self.ag_features.contains(AgFeatures::HF_INDICATORS)
/// Provides an API for managing data packets to be sent over the provided `channel`.
/// - Provides a way to queue data to be sent.
/// - Provides a way to drain and asynchronously send the queued data to the remote.
/// - Provides a stream implementation to read bytes received from the remote and send
/// any queued bytes.
struct DataController<T: Stream + AsyncWrite> {
/// The underlying channel representing the connection with the remote.
channel: T,
/// Bytes that are buffered to be sent to the remote.
buffer: Vec<u8>,
/// Cursor on the first buffer waiting indicating the next byte to be written.
buffer_cursor: usize,
/// Flag indicating whether the `channel` needs to be flushed or not.
needs_flush: bool,
impl<T: Stream + AsyncWrite + Unpin> DataController<T> {
fn new(channel: T) -> Self {
Self { channel, buffer: Vec::new(), buffer_cursor: 0, needs_flush: false }
/// Adds the provided `bytes` to the send queue.
fn queue_data(&mut self, mut bytes: Vec<u8>) {
self.buffer.append(&mut bytes);
/// Write all queued data to the `channel` - returns Error if writing fails.
async fn send_queued(&mut self) -> Result<(), zx::Status> {
let bytes = std::mem::take(&mut self.buffer);
let result =;
if let Ok(_) = result {
info!("Sent {:?}", String::from_utf8_lossy(&bytes));
self.buffer_cursor = 0;
self.needs_flush = false;
/// Attempts to send any queued data to the `channel`.
/// Returns Error if writing to or flushing the channel fails, OK otherwise.
fn try_send_queued(&mut self, cx: &mut Context<'_>) -> Result<(), zx::Status> {
while !self.buffer.is_empty() {
let cursor = self.buffer_cursor;
match Pin::new(&mut, &self.buffer[cursor..]) {
Poll::Pending => {
// Unable to write, try again later.
return Ok(());
Poll::Ready(Err(e)) => {
warn!("Error writing bytes to channel: {:?}", e);
return Err(e.into());
Poll::Ready(Ok(written)) => {
self.buffer_cursor = cursor + written;
// If we've finished writing the entire buffer, we are ready to flush.
if self.buffer_cursor >= self.buffer.len() {
// Reset the pointer to the front of the buffer as we've finished writing.
self.buffer = Vec::new();
self.buffer_cursor = 0;
self.needs_flush = true;
// Attempt to flush
if self.needs_flush {
match Pin::new(&mut {
Poll::Ready(Ok(())) => self.needs_flush = false,
Poll::Ready(Err(e)) => return Err(e.into()),
Poll::Pending => (),
impl<T> Stream for DataController<T>
T: AsyncWrite + Stream<Item = Result<Vec<u8>, zx::Status>> + FusedStream + Unpin,
type Item = T::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.is_terminated() {
panic!("Cannot poll a terminated stream");
// Before reading on the channel, try to send any buffered messages.
if let Err(e) = self.try_send_queued(cx) {
return Poll::Ready(Some(Err(e)));
// Check for any data received from the peer.
impl<T> FusedStream for DataController<T>
T: AsyncWrite + Stream<Item = Result<Vec<u8>, zx::Status>> + FusedStream + Unpin,
fn is_terminated(&self) -> bool {
/// A connection between two peers that shares synchronized state and acts as the control plane for
/// HFP. See HFP v1.8, 4.2 for more information.
pub struct ServiceLevelConnection {
/// The underlying RFCOMM connection connecting the peers.
connection: Option<DataController<Channel>>,
/// The current state associated with this connection.
state: SlcState,
/// The current active procedures serviced by this SLC.
procedures: HashMap<ProcedureMarker, Box<dyn Procedure>>,
/// Queued AG requests waiting for the SLC to be initialized.
requests_pending_initialization: VecDeque<(ProcedureMarker, AgUpdate)>,
/// The sender used to relay updates to the stream implementation.
sender: Sender<InformationRequest>,
/// The receiver polled by the stream implementation producing requests for more information
/// from the HFP component.
receiver: Receiver<InformationRequest>,
impl ServiceLevelConnection {
/// Create a new, unconnected `ServiceLevelConnection`.
pub fn new() -> Self {
let (sender, receiver) = mpsc::channel(MAX_CONCURRENT_PROCEDURES);
Self {
connection: None,
state: SlcState::default(),
procedures: HashMap::new(),
requests_pending_initialization: VecDeque::new(),
/// Returns `true` if an active connection exists between the peers.
pub fn connected(&self) -> bool {
self.connection.as_ref().map(|ch| !ch.is_terminated()).unwrap_or(false)
/// Returns `true` if the channel has been initialized - namely the SLCI procedure has
/// been completed for the connected channel.
pub fn initialized(&self) -> bool {
self.connected() && self.state.initialized
/// Returns `true` if the provided `procedure` is currently active.
fn is_active(&self, procedure: &ProcedureMarker) -> bool {
/// Connect using the provided `channel`.
pub fn connect(&mut self, channel: Channel) {
// Reset the internal state before connecting the new `channel` to avoid processing
// stale procedure requests.
self.connection = Some(DataController::new(channel));
/// Connects and initializes the provided `channel` with `state`.
/// This method should be used in integration-style tests in order to bypass the
/// back-and-forth needed to complete the SLC Initialization procedure.
pub fn initialize_at_state(&mut self, channel: Channel, state: SlcState) {
self.state = state;
/// Sets the channel status to initialized.
/// Note: This should only be called when the SLCI Procedure has successfully finished
/// or in testing scenarios.
fn set_initialized(&mut self) {
self.state.initialized = true;
pub fn network_operator_name_format(&self) -> &Option<at::NetworkOperatorNameFormat> {
/// Close the service level connection and reset the state.
fn reset(&mut self) {
*self = Self::new();
/// Adds the AT `message` to the queue of outgoing data packets.
/// Returns Error if serialization fails, OK otherwise.
fn queue_message_to_peer(&mut self, message: at::Response) -> Result<(), at::SerializeError> {
let mut bytes = Vec::new();
message.serialize(&mut bytes)?;
if let Some(connection) = &mut self.connection {
/// Attempts to send any queued messages to the peer via the RFCOMM `connection`.
/// Returns Error if there was an error sending the queued messages.
async fn send_queued(&mut self) -> Result<(), zx::Status> {
if let Some(connection) = &mut self.connection {
/// Garbage collects the provided `procedure` and returns true if it has terminated.
fn check_and_cleanup_procedure(&mut self, procedure: &ProcedureMarker) -> bool {
let is_terminated = self.procedures.get(procedure).map_or(false, |p| p.is_terminated());
if is_terminated {
// Special case of the SLCI Procedure - once this is complete, the channel is
// considered initialized.
if *procedure == ProcedureMarker::SlcInitialization {
/// Handles an error received as a result of operating a procedure.
fn procedure_error(&mut self, error: ProcedureError) {
// TODO( Right now, we only send an Error AT response to the peer.
// Different error cases may warrant different SLC behavior. For example, errors
// before the SLC is initialized may warrant complete channel shutdown. Fix this method
// to make the error handling policy decisions.
info!("Error in procedure update: {:?}", error);
if let Err(err) = self.queue_message_to_peer(at::Response::Error) {
warn!("Unable to serialize AT error response with {:}", err);
/// Handle a procedure update.
/// Returns a potential request for more information if the update cannot be directly
/// handled by the SLC.
fn procedure_request(&mut self, request: ProcedureRequest) -> Option<InformationRequest> {
match request {
ProcedureRequest::None => {}
ProcedureRequest::Error(e) => {
ProcedureRequest::SendMessages(messages) => {
// Messages to be sent to the peer via the Service Level RFCOMM Connection.
for message in messages {
if let Err(err) = self.queue_message_to_peer(message) {
warn!("Unable to serialize AT response with {:}", err);
ProcedureRequest::Info(req) => return Some(req),
/// Consume and handle a command received from the local device.
/// This method:
/// 1) Attempts to drive the procedure with the `command`. SLCI updates are handled immediately.
/// Any non-SLCI updates received before the SLC has been initialized will be queued and
/// processed FIFO after initialization.
/// 2) Handles any subsequent request from (1) - sending any bytes to the peer
/// if needed or queueing up information requests to be consumed by the internal receiver.
pub async fn receive_ag_request(&mut self, id: ProcedureMarker, command: AgUpdate) {
// Non-SLCI requests received before initialization will be queued for later.
// If there are still outstanding requests pending initialization, queue the request to
// be processed after to maintain ordering of events.
if id != ProcedureMarker::SlcInitialization {
if !self.initialized() || !self.requests_pending_initialization.is_empty() {
self.requests_pending_initialization.push_back((id, command));
let request = self.handle_command(id, command.into());
// If the command requires more information, relay the request to the stream implementation.
// Otherwise, the command was handled and we should attempt to send any queued packets.
let info_request = match self.procedure_request(request) {
Some(r) => r,
None => {
// TODO( Propagate this error to PeerTask to be handled.
if let Err(e) = self.send_queued().await {
warn!("Error sending queued messages: {:?}", e);
if let Err(e) = self.sender.try_send(info_request) {
warn!("Couldn't relay procedure info request to internal receiver: {:?}", e);
/// Consume `bytes` from the peer (HF) and handle the command.
/// Returns the an optional request for more information if the SLC requires input
/// from the HFP component.
fn receive_data(&mut self, bytes: &mut Vec<u8>) -> Option<InformationRequest> {
let request = self.receive_data_internal(bytes);
/// Consume bytes from the peer (HF), producing a parsed at::Command from the bytes and
/// handling it. Internal helper method for `Self::receive_data`.
/// Returns the request from handling the command.
fn receive_data_internal(&mut self, bytes: &mut Vec<u8>) -> ProcedureRequest {
// Parse the byte buffer into a HF message.
let parse_result = at::Command::deserialize(&mut Cursor::new(bytes));
if let Err(err) = parse_result {
warn!("Received unparseable AT command: {:?}", err);
return ProcedureError::UnparsableHf(err).into();
let command = parse_result.unwrap();
info!("Received {:?}", command);
// Attempt to match the received command to a procedure.
let procedure_id = match self.match_command_to_procedure(&command) {
Ok(id) => id,
Err(e) => return e.into(),
// Handle the received HF commend.
self.handle_command(procedure_id, command.into())
/// Handles the provided `command`:
/// - Progresses the matched procedure with the `command`.
/// - Garbage collects the procedure if completed.
/// Returns the request from progressing the procedure.
fn handle_command(
&mut self,
procedure_id: ProcedureMarker,
command: Command,
) -> ProcedureRequest {
// Progress the procedure with the message.
let request = match command {
Command::Hf(cmd) => self.hf_message(procedure_id, cmd),
Command::Ag(cmd) => self.ag_message(procedure_id, cmd),
// Potentially clean up the procedure if this was the last stage. Procedures that
// have been cleaned up cannot require additional responses, as this would violate
// the `Procedure::is_terminated()` guarantee.
if self.check_and_cleanup_procedure(&procedure_id) && request.requires_response() {
return ProcedureError::UnexpectedRequest.into();
/// Matches the incoming message to a procedure. Returns the procedure identifier
/// for the given `command` or Error if the command couldn't be matched.
fn match_command_to_procedure(
command: &at::Command,
) -> Result<ProcedureMarker, ProcedureError> {
// If we haven't initialized the SLC yet, the only valid procedure to match is
// the SLCI Procedure.
if !self.state.initialized {
return Ok(ProcedureMarker::SlcInitialization);
// Otherwise, try to match it to a procedure - it must be a non SLCI command since
// the channel has already been initialized.
match ProcedureMarker::match_command(command) {
Ok(ProcedureMarker::SlcInitialization) => {
warn!("Received unexpected SLCI command after SLC initialization: {:?}", command);
res => res,
/// Updates the the procedure specified by the `marker` with the received AG `message`.
/// Initializes the procedure if it is not already in progress.
/// Returns the request associated with the `message`.
pub fn ag_message(&mut self, marker: ProcedureMarker, message: AgUpdate) -> ProcedureRequest {
.ag_update(message, &mut self.state)
/// Updates the the procedure specified by the `marker` with the received HF `message`.
/// Initializes the procedure if it is not already in progress.
/// Returns the request associated with the `message`.
pub fn hf_message(
&mut self,
marker: ProcedureMarker,
message: at::Command,
) -> ProcedureRequest {
.hf_update(message, &mut self.state)
/// Helper function to process any requests that are pending SLC initialization.
fn process_requests_pending_initialization(
&mut self,
cx: &mut Context<'_>,
) -> Option<Result<InformationRequest, ProcedureError>> {
if self.initialized() {
while let Some((marker, request)) = self.requests_pending_initialization.pop_front() {
let request = self.handle_command(marker, request.into());
let info_req = self.procedure_request(request);
match info_req {
Some(info_req) => return Some(Ok(info_req)),
None => {
if let Some(conn) = &mut self.connection {
if let Err(e) = conn.try_send_queued(cx) {
return Some(Err(e.into()));
/// Helper function to poll the internal channel for information requests from any procedures.
fn poll_next_procedure_update(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<<Self as Stream>::Item>> {
match self.receiver.poll_next_unpin(cx) {
Poll::Ready(Some(request)) => Poll::Ready(Some(Ok(request))),
Poll::Ready(None) => {
info!("Internal procedure update channel closed unexpectedly");
Poll::Pending => Poll::Pending,
/// Helper function to poll the RFCOMM channel for messages from the remote peer.
fn poll_next_data_update(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<<Self as Stream>::Item>> {
loop {
if let Some(conn) = &mut self.connection {
match conn.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(mut data))) => {
let request = self.receive_data(&mut data);
// If the SLC requires more information, bubble it up.
// Otherwise, try to send any queued data as a result of self.receive_data()
// and continue the loop to register a waker for the next read.
match request {
Some(info_req) => return Poll::Ready(Some(Ok(info_req))),
None => {
if let Some(conn) = &mut self.connection {
if let Err(e) = conn.try_send_queued(cx) {
return Poll::Ready(Some(Err(e.into())));
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e.into()))),
Poll::Ready(None) => {
return Poll::Ready(None);
Poll::Pending => break,
} else {
impl Stream for ServiceLevelConnection {
type Item = Result<InformationRequest, ProcedureError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.is_terminated() {
panic!("Cannot poll a terminated stream");
// Try to send any queued data in the RFCOMM channel.
if let Some(conn) = &mut self.connection {
if let Err(e) = conn.try_send_queued(cx) {
return Poll::Ready(Some(Err(e.into())));
// Try to process any local procedure requests pending channel initialization.
match self.process_requests_pending_initialization(cx) {
None => {}
request => return Poll::Ready(request),
// Check for any local procedural updates.
match self.poll_next_procedure_update(cx) {
Poll::Pending => {}
request => return request,
// Check for any data received from the connection.
impl FusedStream for ServiceLevelConnection {
fn is_terminated(&self) -> bool {
pub(crate) mod tests {
use {
features::{AgFeatures, HfFeatures},
fuchsia_async as fasync,
/// Builds and returns a connected service level connection. Returns the SLC and
/// the remote end of the channel.
fn create_and_connect_slc() -> (ServiceLevelConnection, Channel) {
let mut slc = ServiceLevelConnection::new();
let (local, remote) = Channel::create();
(slc, remote)
/// Builds and returns a service level connection that is connected and initialized with
/// the provided `state`.
/// Returns the SLC and the remote end of the channel.
pub fn create_and_initialize_slc(state: SlcState) -> (ServiceLevelConnection, Channel) {
let mut connection = ServiceLevelConnection::new();
let (local, remote) = Channel::create();
connection.initialize_at_state(local, state);
(connection, remote)
/// Expects the provided `expected` AT data to be received by the `remote` channel.
pub async fn expect_data_received_by_peer(remote: &mut Channel, expected: Vec<at::Response>) {
for expected_at in expected {
let mut bytes = Vec::new();
assert_matches!(remote.read_datagram(&mut bytes).await, Ok(_));
let actual =
at::Response::deserialize(&mut Cursor::new(bytes)).expect("valid response");
assert_eq!(actual, expected_at);
/// Expects a message to be received by the peer. If provided, validates the contents
/// of the received message.
fn expect_peer_ready(
exec: &mut fasync::Executor,
remote: &mut Channel,
expected: Option<Vec<u8>>,
) {
let mut vec = Vec::new();
let actual_bytes = {
let mut remote_fut = Box::pin(remote.read_datagram(&mut vec));
match exec.run_until_stalled(&mut remote_fut) {
Poll::Ready(Ok(bytes)) => bytes,
x => panic!("Expected ready but got: {:?}", x),
if let Some(expected) = expected {
let expected_bytes = expected.len();
assert_eq!(actual_bytes, expected_bytes);
assert_eq!(vec, expected);
/// Expects nothing to be received by the `remote` peer.
fn expect_peer_pending(exec: &mut fasync::Executor, remote: &mut Channel) {
let mut vec = Vec::new();
let mut remote_fut = Box::pin(remote.read_datagram(&mut vec));
assert_matches!(exec.run_until_stalled(&mut remote_fut), Poll::Pending);
/// Serializes the AT Response into a byte buffer.
fn serialize_at_response(response: at::Response) -> Vec<u8> {
let mut buf = Vec::new();
response.serialize(&mut buf).expect("serialization is ok");
/// Simulates the HFP component responding to the `slc` with the provided `update`.
fn do_ag_update(
exec: &mut fasync::Executor,
slc: &mut ServiceLevelConnection,
marker: ProcedureMarker,
update: AgUpdate,
) {
let mut fut = Box::pin(slc.receive_ag_request(marker, update));
assert_matches!(exec.run_until_stalled(&mut fut), Poll::Ready(()));
async fn connected_state_before_and_after_connect() {
let mut slc = ServiceLevelConnection::new();
let (_left, right) = Channel::create();
async fn slc_stream_produces_items() {
let (mut slc, mut remote) = create_and_connect_slc();
let actual_request = match {
Some(Ok(r)) => r,
x => panic!("Unexpected stream item: {:?}", x),
// The BRSF should start the SLCI procedure.
assert_matches!(actual_request, InformationRequest::GetAgFeatures { .. });
async fn slc_stream_terminated() {
let (mut slc, remote) = create_and_connect_slc();
assert_matches!(, None);
// TODO( Re-enable this test after error handling policies are implemented.
fn unexpected_command_before_initialization_closes_channel() {
let mut exec = fasync::Executor::new().unwrap();
let (mut slc, remote) = create_and_connect_slc();
// Peer sends an unexpected AT command.
let unexpected = format!("AT+CIND=?\r").into_bytes();
let _ = remote.as_ref().write(&unexpected);
// No requests should be received on the stream.
assert_matches!(exec.run_until_stalled(&mut, Poll::Pending);
// Channel should be disconnected now.
/// Tests that the SLC is resilient to a new connection being established while there
/// is an existing one with outstanding procedures. The SLC should be completely reset
/// and any outstanding procedures should be terminated.
async fn new_connection_when_outstanding_procedure_terminates_procedure() {
let (mut slc, remote) = create_and_connect_slc();
// Peer sends us HF features - we expect a request for the AG features on the
// SLC stream.
let slci_marker = ProcedureMarker::SlcInitialization;
let features = HfFeatures::THREE_WAY_CALLING;
let command = format!("AT+BRSF={}\r", features.bits()).into_bytes();
let _ = remote.as_ref().write(&command);
match {
Some(Ok(InformationRequest::GetAgFeatures { .. })) => {}
x => panic!("Expected a GetAgFeatures request but got: {:?}", x),
// At this point, the SLC Initialization procedure should be in progress.
// A new connection comes through.
let (local2, _remote2) = Channel::create();
// The old `remote` end should be closed, the SLCI procedure should no longer be in
// progress.
assert_matches!(remote.closed().await, Ok(()));
fn completing_slc_init_procedure_initializes_channel() {
let mut exec = fasync::Executor::new().unwrap();
let (mut slc, mut remote) = create_and_connect_slc();
let slci_marker = ProcedureMarker::SlcInitialization;
// Peer sends us HF features - we expect a request for the AG features on the
// SLC stream.
let features = HfFeatures::THREE_WAY_CALLING;
let command1 = format!("AT+BRSF={}\r", features.bits()).into_bytes();
let _ = remote.as_ref().write(&command1);
let response_fn1 = {
match exec.run_until_stalled(&mut {
Poll::Ready(Some(Ok(InformationRequest::GetAgFeatures { response }))) => response,
x => panic!("Expected GetAgFeatures but got: {:?}", x),
// At this point, the SLC Initialization procedure should be in progress.
// Simulate local response with AG Features - expect these to be sent to the peer.
let features = AgFeatures::empty();
do_ag_update(&mut exec, &mut slc, slci_marker, response_fn1(features));
expect_peer_ready(&mut exec, &mut remote, None);
// No further requests - waiting on peer response.
assert_matches!(exec.run_until_stalled(&mut, Poll::Pending);
// Peer sends us an HF supported indicators request - since the SLC can handle the request,
// we expect no item in the SLC stream. The response should directly be sent to the peer.
let command2 = format!("AT+CIND=?\r").into_bytes();
let _ = remote.as_ref().write(&command2);
assert_matches!(exec.run_until_stalled(&mut, Poll::Pending);
expect_peer_ready(&mut exec, &mut remote, None);
// Peer requests the indicator status. Since this status is not managed by the SLC, we
// expect a stream item to get the information.
let command3 = format!("AT+CIND?\r").into_bytes();
let _ = remote.as_ref().write(&command3);
let response_fn2 = {
match exec.run_until_stalled(&mut {
Poll::Ready(Some(Ok(InformationRequest::GetAgIndicatorStatus { response }))) => {
x => panic!("Expected GetAgFeatures but got: {:?}", x),
// Simulate local response with the AG indicators status - expect this to go to the peer.
do_ag_update(&mut exec, &mut slc, slci_marker, response_fn2(AgIndicators::default()));
expect_peer_ready(&mut exec, &mut remote, None);
// No further requests - waiting on peer response.
assert_matches!(exec.run_until_stalled(&mut, Poll::Pending);
// Peer requests to enable the Indicator Status update in the AG - since the SLC can
// handle the request, we expect no item in the SLC stream, and the response should directly
// be sent to the peer.
let command4 = format!("AT+CMER=3,0,0,1\r").into_bytes();
let _ = remote.as_ref().write(&command4);
assert_matches!(exec.run_until_stalled(&mut, Poll::Pending);
expect_peer_ready(&mut exec, &mut remote, None);
// The SLC should be considered initialized and the SLCI Procedure is done.
fn slci_command_after_initialization_returns_error() {
let _exec = fasync::Executor::new().unwrap();
let (mut slc, _remote) = create_and_connect_slc();
// Bypass the SLCI procedure by setting the channel to initialized.
// Receiving an AT command associated with the SLCI procedure thereafter should
// be an error.
let cmd1 = at::Command::Brsf { features: HfFeatures::empty().bits() as i64 };
let cmd2 = at::Command::CindTest {};
async fn locally_initiated_phone_status_procedure_returns_message() {
// Bypass the SLCI procedure by setting the channel to initialized and enable indicator
// reporting.
let state = SlcState {
ag_indicator_events_reporting: AgIndicatorsReporting::new_enabled(),
let (mut slc, mut remote) = create_and_initialize_slc(state);
// Local device wants to initiate a phone status update.
let expected_marker = ProcedureMarker::PhoneStatus;
let status = AgIndicator::BatteryLevel(2);
slc.receive_ag_request(expected_marker, status.into()).await;
// We expect the PhoneStatus Procedure to be initiated and an outgoing message to the peer
// with the status update.
let expected_messages = vec![at::Response::Success(at::Success::Ciev {
value: 2,
expect_data_received_by_peer(&mut remote, expected_messages).await;
// Since status updates require no response, the procedure should be terminated.
fn ag_updates_are_queued_until_slc_initialization() {
let mut exec = fasync::Executor::new().unwrap();
let (mut slc, mut remote) = create_and_connect_slc();
// Receiving a Ag request to send the phone status update should result in no action
// because the SLC is not initialized yet.
let status1 = AgIndicator::Call(0);
let expected1 = at::Response::Success(at::Success::Ciev {
value: 0i64,
do_ag_update(&mut exec, &mut slc, ProcedureMarker::PhoneStatus, status1.into());
assert_matches!(exec.run_until_stalled(&mut, Poll::Pending);
expect_peer_pending(&mut exec, &mut remote);
// Peer sends us HF features - we expect a request for the AG features on the
// SLC stream and the SLCI procedure should begin.
let features = HfFeatures::THREE_WAY_CALLING;
let command1 = format!("AT+BRSF={}\r", features.bits()).into_bytes();
let _ = remote.as_ref().write(&command1);
// Simulate local response with AG Features - expect these to be sent to the peer.
let ag_features_update = {
match exec.run_until_stalled(&mut {
Poll::Ready(Some(Ok(InformationRequest::GetAgFeatures { response }))) => {
x => panic!("Expected GetAgFeatures but got: {:?}", x),
do_ag_update(&mut exec, &mut slc, ProcedureMarker::SlcInitialization, ag_features_update);
expect_peer_ready(&mut exec, &mut remote, None);
// Receiving another phone status amidst the SLCI procedure should be saved for later.
let status2 = AgIndicator::CallHeld(1);
let expected2 = at::Response::Success(at::Success::Ciev {
value: 1i64,
do_ag_update(&mut exec, &mut slc, ProcedureMarker::PhoneStatus, status2.into());
assert_matches!(exec.run_until_stalled(&mut, Poll::Pending);
expect_peer_pending(&mut exec, &mut remote);
// Peer continues the SLCI procedure.
let command2 = format!("AT+CIND=?\r").into_bytes();
let _ = remote.as_ref().write(&command2);
assert_matches!(exec.run_until_stalled(&mut, Poll::Pending);
expect_peer_ready(&mut exec, &mut remote, None);
let command3 = format!("AT+CIND?\r").into_bytes();
let _ = remote.as_ref().write(&command3);
let ag_indicators = {
match exec.run_until_stalled(&mut {
Poll::Ready(Some(Ok(InformationRequest::GetAgIndicatorStatus { response }))) => {
x => panic!("Expected GetAgFeatures but got: {:?}", x),
// Simulate local response with AG indicators status - expect this to go to the peer.
do_ag_update(&mut exec, &mut slc, ProcedureMarker::SlcInitialization, ag_indicators);
expect_peer_ready(&mut exec, &mut remote, None);
assert_matches!(exec.run_until_stalled(&mut, Poll::Pending);
// Peer requests to enable the Indicator Status update in the AG.
let command4 = format!("AT+CMER=3,0,0,1\r").into_bytes();
let _ = remote.as_ref().write(&command4);
assert_matches!(exec.run_until_stalled(&mut, Poll::Pending);
expect_peer_ready(&mut exec, &mut remote, None);
// At this point, the mandatory portion of the SLCI procedure is complete. There are no optional
// steps since we responded with an empty set of AgFeatures.
// A third request to send a PhoneStatus update _after_ SLCI completes is OK. This should
// only be processed after any queued requests so that the peer gets the updates in order.
let status3 = AgIndicator::CallHeld(0);
let expected3 = at::Response::Success(at::Success::Ciev {
value: 0i64,
do_ag_update(&mut exec, &mut slc, ProcedureMarker::PhoneStatus, status3.into());
// The next time the SLC stream is polled, we expect the updates to be processed.
// Since these are phone status updates, we expect the one-shot procedures to send data
// to the peer and therefore no SLC stream items.
assert_matches!(exec.run_until_stalled(&mut, Poll::Pending);
expect_peer_ready(&mut exec, &mut remote, Some(serialize_at_response(expected1)));
expect_peer_ready(&mut exec, &mut remote, Some(serialize_at_response(expected2)));
expect_peer_ready(&mut exec, &mut remote, Some(serialize_at_response(expected3)));
async fn rfcomm_connection_stream_produces_items() {
let (local, remote) = Channel::create();
let mut connection = DataController::new(local);
let data1 = vec![0x01, 0x02, 0x03, 0x04];
let _ = remote.as_ref().write(&data1);
assert_matches!(, Some(Ok(buf)) if buf == data1);
let data2 = vec![0x01];
let _ = remote.as_ref().write(&data2);
assert_matches!(, Some(Ok(buf)) if buf == data2);
assert_matches!(, None);
fn queued_packets_get_sent_to_connection() {
let mut exec = fasync::Executor::new().unwrap();
let (local, mut remote) = Channel::create();
let mut connection = DataController::new(local);
let mut data1 = vec![0x00, 0x02, 0x04, 0x06, 0x08];
let mut data2 = vec![0x10, 0x11, 0x12];
// Queueing the message shouldn't have any impact on it being sent to the peer.
expect_peer_pending(&mut exec, &mut remote);
// Polling the connection stream should result in messages sent to the peer. Since the
// peer hasn't sent any data to us, we don't expect any stream items.
assert_matches!(exec.run_until_stalled(&mut, Poll::Pending);
data1.append(&mut data2);
expect_peer_ready(&mut exec, &mut remote, Some(data1));
let data3 = vec![0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9];
// Explicitly sending the queued data is OK.
let mut send_fut = Box::pin(connection.send_queued());
assert_matches!(exec.run_until_stalled(&mut send_fut), Poll::Ready(Ok(())));
expect_peer_ready(&mut exec, &mut remote, Some(data3));
// Polling the stream thereafter should have no effect. No duplicate messages.
assert_matches!(exec.run_until_stalled(&mut, Poll::Pending);
expect_peer_pending(&mut exec, &mut remote);
async fn read_error_result_is_propagated_to_stream() {
// Close the local end of the channel so that local reads and remote writes
// fail.
let (local, remote) = Channel::create();
let mut connection = DataController::new(local);
// Remote writing to us should fail.
let bytes = vec![0x00, 0x03];
assert_matches!(remote.as_ref().write(&bytes), Err(zx::Status::BAD_STATE));
// A local read should also fail - the error should be propagated to the stream.
assert_matches!(, Some(Err(zx::Status::BAD_STATE)));
async fn write_error_result_is_propagated_to_stream() {
// Close the remote end of the channel so that remote reads and local writes
// fail.
let (local, _remote) = Channel::create();
let mut connection = DataController::new(local);
// Queue some data to be sent to the remote.
let bytes = vec![0x00, 0x03];
// When the stream is polled, the attempted write of `bytes` should fail, and the
// error should be propagated via the stream.
assert_matches!(, Some(Err(zx::Status::IO)));
// Trying to explicitly write the bytes should also fail.
assert_matches!(connection.send_queued().await, Err(zx::Status::IO));