[bt][hfp] Implement Call Waiting Notifications.

If call waiting notifications are enabled, send a
CCWA unsolicited response to the peer when a call
enters the incoming waiting state.

Test: Added unit tests
Bug: 64558

Change-Id: Ife37c1bdf7f070a2f0a5cd1fd67b8de45d05b1c0
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/513640
Fuchsia-Auto-Submit: Jeff Belgum <belgum@google.com>
Commit-Queue: Jeff Belgum <belgum@google.com>
Reviewed-by: Matthew Kehrt <kehrt@google.com>
diff --git a/src/connectivity/bluetooth/profiles/bt-hfp-audio-gateway/src/peer/calls.rs b/src/connectivity/bluetooth/profiles/bt-hfp-audio-gateway/src/peer/calls.rs
index 2216427..b783229 100644
--- a/src/connectivity/bluetooth/profiles/bt-hfp-audio-gateway/src/peer/calls.rs
+++ b/src/connectivity/bluetooth/profiles/bt-hfp-audio-gateway/src/peer/calls.rs
@@ -351,6 +351,12 @@
             .map(|(idx, call)| Call::new(idx, call.number.clone(), call.state, call.direction))
     }
 
+    /// Get the Call that has been waiting the longest. Returns None if there are no waiting calls.
+    pub fn waiting(&self) -> Option<Call> {
+        self.oldest_by_state(CallState::IncomingWaiting)
+            .map(|(idx, call)| Call::new(idx, call.number.clone(), call.state, call.direction))
+    }
+
     /// Answer the call that has been in the IncomingRinging state the longest.
     /// Returns an Error if there are no calls in the IncomingRinging state.
     pub fn answer(&mut self) -> Result<(), CallError> {
@@ -711,6 +717,7 @@
         let expected = CallIndicators {
             call: indicators::Call::None,
             callsetup: indicators::CallSetup::Incoming,
+            callwaiting: false,
             callheld: indicators::CallHeld::None,
         };
         assert_eq!(calls.indicators(), expected);
diff --git a/src/connectivity/bluetooth/profiles/bt-hfp-audio-gateway/src/peer/task.rs b/src/connectivity/bluetooth/profiles/bt-hfp-audio-gateway/src/peer/task.rs
index 28cae83..436275a8 100644
--- a/src/connectivity/bluetooth/profiles/bt-hfp-audio-gateway/src/peer/task.rs
+++ b/src/connectivity/bluetooth/profiles/bt-hfp-audio-gateway/src/peer/task.rs
@@ -298,6 +298,11 @@
                 // A new call state has been received from the call service
                 update = self.calls.select_next_some() => {
                     self.ringer.ring(self.calls.should_ring());
+                    if update.callwaiting {
+                        if let Some(call) = self.calls.waiting() {
+                            self.call_waiting_update(call).await;
+                        }
+                    }
                     for status in update.to_vec() {
                         self.phone_status_update(status).await;
                     }
@@ -361,6 +366,16 @@
         self.connection.receive_ag_request(ProcedureMarker::Ring, AgUpdate::Ring(call)).await;
     }
 
+    /// Request to send a Call Waiting Notification.
+    async fn call_waiting_update(&mut self, call: Call) {
+        self.connection
+            .receive_ag_request(
+                ProcedureMarker::CallWaitingNotifications,
+                AgUpdate::CallWaiting(call),
+            )
+            .await;
+    }
+
     /// Request to send the phone `status` by initiating the Phone Status Indicator
     /// procedure.
     async fn phone_status_update(&mut self, status: AgIndicator) {
@@ -426,21 +441,30 @@
         at_commands::{self as at, SerDe},
         fidl_fuchsia_bluetooth_bredr::{ProfileMarker, ProfileRequestStream},
         fidl_fuchsia_bluetooth_hfp::{
-            CallState, PeerHandlerMarker, PeerHandlerRequest, SignalStrength,
+            CallState, PeerHandlerMarker, PeerHandlerRequest, PeerHandlerRequestStream,
+            PeerHandlerWaitForCallResponder, SignalStrength,
         },
         fuchsia_async as fasync,
         fuchsia_bluetooth::types::Channel,
-        futures::{future::ready, pin_mut, stream::FusedStream, SinkExt},
+        futures::{
+            future::ready,
+            pin_mut,
+            stream::{FusedStream, Stream},
+            SinkExt,
+        },
         proptest::prelude::*,
     };
 
     use crate::{
-        peer::service_level_connection::{
-            tests::{
-                create_and_initialize_slc, expect_data_received_by_peer, expect_peer_ready,
-                serialize_at_response,
+        peer::{
+            calls::Number,
+            service_level_connection::{
+                tests::{
+                    create_and_initialize_slc, expect_data_received_by_peer, expect_peer_ready,
+                    serialize_at_response,
+                },
+                SlcState,
             },
-            SlcState,
         },
         protocol::{
             features::HfFeatures,
@@ -739,6 +763,65 @@
         assert!(result.is_ready());
     }
 
+    /// Transform `stream` into a Stream of WaitForCall responders.
+    #[track_caller]
+    async fn wait_for_call_stream(
+        stream: PeerHandlerRequestStream,
+    ) -> impl Stream<Item = PeerHandlerWaitForCallResponder> {
+        filtered_stream(stream, |item| match item {
+            PeerHandlerRequest::WaitForCall { responder } => Ok(responder),
+            x => Err(x),
+        })
+        .await
+    }
+
+    /// Transform `stream` into a Stream of `T`.
+    ///
+    /// `f` is a function that takes a PeerHandlerRequest and either returns an Ok if the request
+    /// is relevant to the test or Err if the request irrelevant.
+    ///
+    /// This test helper function can be used in the common case where the test interacts
+    /// with a particular kind of PeerHandlerRequest.
+    ///
+    /// Initial setup of the handler is done, then a filtered stream is produced which
+    /// outputs items based on the result of `f`. Ok return values from `f` are returned from the
+    /// `filtered_stream`. Err return values from `f` are not returned from the `filtered_stream`.
+    /// Instead they are stored within `filtered_stream` until `filtered_stream` is dropped
+    /// so that they do not cause the underlying fidl channel to be closed.
+    #[track_caller]
+    async fn filtered_stream<T>(
+        mut stream: PeerHandlerRequestStream,
+        f: impl Fn(PeerHandlerRequest) -> Result<T, PeerHandlerRequest>,
+    ) -> impl Stream<Item = T> {
+        // Send the network information immediately so the peer can make progress.
+        match stream.next().await {
+            Some(Ok(PeerHandlerRequest::WatchNetworkInformation { responder })) => {
+                responder
+                    .send(NetworkInformation::EMPTY)
+                    .expect("Successfully send network information");
+            }
+            x => panic!("Expected watch network information request: {:?}", x),
+        };
+
+        // A vec to hold all the stream items we don't care about for this test.
+        let mut junk_drawer = vec![];
+
+        // Filter out all items, placing them in the junk_drawer.
+        stream.filter_map(move |item| {
+            let item = match item {
+                Ok(item) => match f(item) {
+                    Ok(t) => Some(t),
+                    Err(x) => {
+                        junk_drawer.push(x);
+                        None
+                    }
+                },
+                _ => None,
+            };
+            ready(item)
+        })
+    }
+
     #[test]
     fn call_updates_update_ringer_state() {
         // Set up the executor, peer, and background call manager task
@@ -752,35 +835,11 @@
         let (connection, mut remote) = create_and_initialize_slc(state);
         let (peer, mut sender, receiver, _profile) = setup_peer_task(Some(connection));
 
-        let (proxy, mut stream) =
+        let (proxy, stream) =
             fidl::endpoints::create_proxy_and_stream::<PeerHandlerMarker>().unwrap();
 
         fasync::Task::local(async move {
-            // Send the network information immediately so the peer can make progress.
-            match stream.next().await {
-                Some(Ok(PeerHandlerRequest::WatchNetworkInformation { responder })) => {
-                    responder
-                        .send(NetworkInformation::EMPTY)
-                        .expect("Successfully send network information");
-                }
-                x => panic!("Expected watch network information request: {:?}", x),
-            };
-
-            // A vec to hold all the stream items we don't care about for this test.
-            let mut junk_drawer = vec![];
-
-            // Filter out all items that are irrelevant to this particular test, placing them in
-            // the junk_drawer.
-            let mut stream = stream.filter_map(move |item| {
-                let item = match item {
-                    Ok(PeerHandlerRequest::WaitForCall { responder }) => Some(responder),
-                    x => {
-                        junk_drawer.push(x);
-                        None
-                    }
-                };
-                ready(item)
-            });
+            let mut stream = wait_for_call_stream(stream).await;
 
             // Send the incoming call
             let responder = stream.next().await.unwrap();
@@ -789,10 +848,6 @@
                 .send(client_end, "1234567", CallState::IncomingRinging)
                 .expect("Successfully send call information");
 
-            // Expect to send the Call Setup indicator to the peer.
-            let expected_data1 = vec![AgIndicator::CallSetup(1).into()];
-            expect_data_received_by_peer(&mut remote, expected_data1).await;
-
             // Call manager should collect all further requests, without responding.
             stream.collect::<Vec<_>>().await;
         })
@@ -808,6 +863,10 @@
         let result = exec.run_until_stalled(&mut run_fut);
         assert!(result.is_pending());
 
+        // Expect to send the Call Setup indicator to the peer.
+        let expected_data = vec![AgIndicator::CallSetup(1).into()];
+        exec.run_singlethreaded(expect_data_received_by_peer(&mut remote, expected_data));
+
         // Drop the peer task sender to force the PeerTask's run future to complete
         drop(sender);
         let task = exec.run_until_stalled(&mut run_fut).expect("run_fut to complete");
@@ -888,4 +947,60 @@
         // Since we (the AG) received a valid HF indicator, we expect to send an OK back to the peer.
         expect_peer_ready(&mut exec, &mut remote, Some(serialize_at_response(at::Response::Ok)));
     }
+
+    #[test]
+    fn call_updates_produce_call_waiting() {
+        // Set up the executor, peer, and background call manager task
+        let mut exec = fasync::Executor::new().unwrap();
+
+        let raw_number = "1234567";
+        let number = Number::from(raw_number);
+        let expected_ccwa =
+            vec![at::success(at::Success::Ccwa { ty: number.type_(), number: number.into() })];
+        let expected_ciev = vec![AgIndicator::CallSetup(1).into()];
+
+        // Setup the peer task with the specified SlcState to enable indicator events.
+        let state = SlcState {
+            call_waiting_notifications: true,
+            ag_indicator_events_reporting: AgIndicatorsReporting::new_enabled(),
+            ..SlcState::default()
+        };
+        let (connection, mut remote) = create_and_initialize_slc(state);
+        let (peer, mut sender, receiver, _profile) = setup_peer_task(Some(connection));
+
+        let (proxy, stream) =
+            fidl::endpoints::create_proxy_and_stream::<PeerHandlerMarker>().unwrap();
+
+        fasync::Task::local(async move {
+            let mut stream = wait_for_call_stream(stream).await;
+
+            // Send the incoming waiting call
+            let responder = stream.next().await.unwrap();
+            let (client_end, _call_stream) = fidl::endpoints::create_request_stream().unwrap();
+            responder
+                .send(client_end, raw_number, CallState::IncomingWaiting)
+                .expect("Successfully send call information");
+
+            // Call manager should collect all further requests, without responding.
+            stream.collect::<Vec<_>>().await;
+        })
+        .detach();
+
+        // Pass in the client end connected to the call manager
+        let result = exec.run_singlethreaded(sender.send(PeerRequest::Handle(proxy)));
+        assert!(result.is_ok());
+
+        // Run the PeerTask until it has no more work to do.
+        let run_fut = peer.run(receiver);
+        pin_mut!(run_fut);
+        let result = exec.run_until_stalled(&mut run_fut);
+        assert!(result.is_pending());
+
+        exec.run_singlethreaded(expect_data_received_by_peer(&mut remote, expected_ccwa));
+        exec.run_singlethreaded(expect_data_received_by_peer(&mut remote, expected_ciev));
+
+        // Drop the peer task sender to force the PeerTask's run future to complete
+        drop(sender);
+        exec.run_until_stalled(&mut run_fut).expect("run_fut to complete");
+    }
 }
diff --git a/src/connectivity/bluetooth/profiles/bt-hfp-audio-gateway/src/procedure.rs b/src/connectivity/bluetooth/profiles/bt-hfp-audio-gateway/src/procedure.rs
index d29f358..12df6cb 100644
--- a/src/connectivity/bluetooth/profiles/bt-hfp-audio-gateway/src/procedure.rs
+++ b/src/connectivity/bluetooth/profiles/bt-hfp-audio-gateway/src/procedure.rs
@@ -442,6 +442,8 @@
     CurrentCalls(Vec<Call>),
     /// The information of an IncomingRinging call.
     Ring(Call),
+    /// The information of an IncomingRinging call.
+    CallWaiting(Call),
 }
 
 impl From<AgUpdate> for ProcedureRequest {
@@ -512,6 +514,12 @@
                     }),
                 ]
             }
+            AgUpdate::CallWaiting(call) => {
+                vec![at::success(at::Success::Ccwa {
+                    ty: call.number.type_(),
+                    number: call.number.into(),
+                })]
+            }
         }
         .into()
     }
diff --git a/src/connectivity/bluetooth/profiles/bt-hfp-audio-gateway/src/procedure/call_waiting_notifications.rs b/src/connectivity/bluetooth/profiles/bt-hfp-audio-gateway/src/procedure/call_waiting_notifications.rs
index 9cce169..51e5d69 100644
--- a/src/connectivity/bluetooth/profiles/bt-hfp-audio-gateway/src/procedure/call_waiting_notifications.rs
+++ b/src/connectivity/bluetooth/profiles/bt-hfp-audio-gateway/src/procedure/call_waiting_notifications.rs
@@ -11,6 +11,8 @@
 /// The HF may disable or enable the Call Waiting Notifications via this
 /// procedure. See HFP v1.8, Section 4.21.
 ///
+/// The AG may send Call Waiting Notifications via this procedure. See HFP v1.8, Section 4.22
+///
 /// This procedure is implemented from the perspective of the AG. Namely, outgoing `requests`
 /// typically request information about the current state of the AG, to be sent to the remote
 /// peer acting as the HF.
@@ -43,6 +45,21 @@
         }
     }
 
+    fn ag_update(&mut self, update: AgUpdate, state: &mut SlcState) -> ProcedureRequest {
+        match (self.terminated, update) {
+            (false, AgUpdate::CallWaiting(call)) => {
+                // Only send the update if call waiting notifications are enabled for the SLC.
+                self.terminated = true;
+                if state.call_waiting_notifications {
+                    AgUpdate::CallWaiting(call).into()
+                } else {
+                    ProcedureRequest::None
+                }
+            }
+            (_, update) => ProcedureRequest::Error(ProcedureError::UnexpectedAg(update)),
+        }
+    }
+
     fn is_terminated(&self) -> bool {
         self.terminated
     }
diff --git a/src/connectivity/bluetooth/profiles/bt-hfp-audio-gateway/src/protocol/indicators.rs b/src/connectivity/bluetooth/profiles/bt-hfp-audio-gateway/src/protocol/indicators.rs
index f5aa041..c5be040 100644
--- a/src/connectivity/bluetooth/profiles/bt-hfp-audio-gateway/src/protocol/indicators.rs
+++ b/src/connectivity/bluetooth/profiles/bt-hfp-audio-gateway/src/protocol/indicators.rs
@@ -441,15 +441,19 @@
     pub call: Call,
     pub callsetup: CallSetup,
     pub callheld: CallHeld,
+    /// There is at least one call in the IncomingWaiting state. `callwaiting` is distinct from
+    /// other fields in that it doesn't map to a specific CIEV phone status indicator.
+    pub callwaiting: bool,
 }
 
 impl CallIndicators {
     /// Find CallIndicators based on all items in `iter`.
-    pub fn find(iter: impl Iterator<Item = CallState> + Clone) -> Self {
+    pub fn find(mut iter: impl Iterator<Item = CallState> + Clone) -> Self {
         let call = Call::find(iter.clone());
         let callsetup = CallSetup::find(iter.clone());
-        let callheld = CallHeld::find(iter);
-        CallIndicators { call, callsetup, callheld }
+        let callheld = CallHeld::find(iter.clone());
+        let callwaiting = iter.any(|c| c == CallState::IncomingWaiting);
+        CallIndicators { call, callsetup, callheld, callwaiting }
     }
 
     /// A list of all the statuses that have changed between `other` and self.
@@ -465,6 +469,9 @@
         if other.callheld != self.callheld {
             changes.callheld = Some(self.callheld);
         }
+        if self.callwaiting && !other.callwaiting {
+            changes.callwaiting = true;
+        }
         changes
     }
 }
@@ -474,12 +481,17 @@
     pub call: Option<Call>,
     pub callsetup: Option<CallSetup>,
     pub callheld: Option<CallHeld>,
+    /// Indicates whether there is a call that has changed to the CallWaiting state in this update.
+    pub callwaiting: bool,
 }
 
 impl CallIndicatorsUpdates {
-    /// Returns true if all fields are `None`
+    /// Returns true if all fields are `None` or false.
     pub fn is_empty(&self) -> bool {
-        self.call.is_none() && self.callsetup.is_none() && self.callheld.is_none()
+        self.call.is_none()
+            && self.callsetup.is_none()
+            && self.callheld.is_none()
+            && !self.callwaiting
     }
 
     /// Returns a Vec of all updated indicators. This vec is ordered by Indicator index.
@@ -799,6 +811,7 @@
         let expected = CallIndicators {
             call: Call::Some,
             callsetup: CallSetup::Incoming,
+            callwaiting: false,
             callheld: CallHeld::Held,
         };
         assert_eq!(ind, expected);
@@ -832,6 +845,23 @@
             ..CallIndicatorsUpdates::default()
         };
         assert_eq!(b.difference(a), expected);
+
+        let a = CallIndicators::default();
+        let b = CallIndicators { callsetup: CallSetup::Incoming, callwaiting: true, ..a };
+        let expected = CallIndicatorsUpdates {
+            callsetup: Some(CallSetup::Incoming),
+            callwaiting: true,
+            ..CallIndicatorsUpdates::default()
+        };
+        assert_eq!(b.difference(a), expected);
+
+        // reverse: going from b to a.
+        let expected = CallIndicatorsUpdates {
+            callsetup: Some(CallSetup::None),
+            callwaiting: false,
+            ..CallIndicatorsUpdates::default()
+        };
+        assert_eq!(a.difference(b), expected);
     }
 
     #[test]
@@ -847,6 +877,10 @@
         assert!(!updates.is_empty());
 
         let mut updates = CallIndicatorsUpdates::default();
+        updates.callwaiting = true;
+        assert!(!updates.is_empty());
+
+        let mut updates = CallIndicatorsUpdates::default();
         updates.callheld = Some(CallHeld::Held);
         assert!(!updates.is_empty());
     }
diff --git a/src/connectivity/lib/at-commands/definitions/hfp.at b/src/connectivity/lib/at-commands/definitions/hfp.at
index 21e75b6..508fad6 100644
--- a/src/connectivity/lib/at-commands/definitions/hfp.at
+++ b/src/connectivity/lib/at-commands/definitions/hfp.at
@@ -175,3 +175,7 @@
 # Hang Up
 # HFP 1.8 4.34.2
 command { AT+CHUP }
+
+# Call Waiting Notification
+# HFP 1.8 4.34.2
+response { CCWA: number: String, ty: Integer }