blob: 6f9b22d0f7ded2cc366f16b10d0cf8043ee3ed06 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use anyhow::{format_err, Context as _, Error};
use fidl::endpoints::create_endpoints;
use fidl_fuchsia_bluetooth_avdtp_test::*;
use fuchsia_async as fasync;
use fuchsia_sync::RwLock;
use futures::channel::mpsc::{channel, SendError};
use futures::{try_join, FutureExt, Sink, SinkExt, Stream, StreamExt, TryStreamExt};
use rustyline::error::ReadlineError;
use rustyline::{CompletionType, Config, EditMode, Editor};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::pin::pin;
use std::sync::Arc;
use std::thread;
use tracing::info;
use crate::commands::{Cmd, CmdHelper, ReplControl};
use crate::types::{PeerFactoryMap, CLEAR_LINE, PROMPT};
mod commands;
mod types;
// TODO( Spawn listener for PeerEventStream to delete peer from map
// when a peer disconnects from service.
async fn peer_manager_listener(
avdtp_svc: &PeerManagerProxy,
mut stream: PeerManagerEventStream,
peer_map: Arc<RwLock<PeerFactoryMap>>,
) -> Result<(), Error> {
while let Some(evt) = stream.try_next().await? {
print!("{}", CLEAR_LINE);
match evt {
PeerManagerEvent::OnPeerConnected { peer_id } => {
let (client, server) = create_endpoints::<PeerControllerMarker>();
let peer = client.into_proxy().expect("Error: Couldn't obtain peer client proxy");
match peer_map.write().entry(peer_id.value.to_string()) {
Entry::Occupied(mut entry) => {
let _ = entry.insert(peer);
println!("Known peer connected with id: {}", peer_id.value.to_string())
Entry::Vacant(entry) => {
let _ = entry.insert(peer);
"Inserted device into PeerFactoryMap with id: {}",
// Establish channel with the given peer_id and server endpoint.
let _ = avdtp_svc.get_peer(&peer_id, server);
info!("Getting peer with peer_id: {}", peer_id.value.to_string());
async fn set_configuration<'a>(
args: &'a [&'a str],
peer_map: Arc<RwLock<PeerFactoryMap>>,
) -> Result<String, Error> {
match get_controller_from_args(args, Cmd::SetConfig, peer_map) {
Ok(peer_controller) => {
let _ = peer_controller.set_configuration().await;
Ok(String::from("Set configuration command"))
Err(e) => Err(e),
async fn get_configuration<'a>(
args: &'a [&'a str],
peer_map: Arc<RwLock<PeerFactoryMap>>,
) -> Result<String, Error> {
match get_controller_from_args(args, Cmd::GetConfig, peer_map) {
Ok(peer_controller) => {
let _ = peer_controller.get_configuration().await;
Ok(String::from("Get configuration command"))
Err(e) => Err(e),
async fn get_capabilities<'a>(
args: &'a [&'a str],
peer_map: Arc<RwLock<PeerFactoryMap>>,
) -> Result<String, Error> {
match get_controller_from_args(args, Cmd::GetCapabilities, peer_map) {
Ok(peer_controller) => {
let _ = peer_controller.get_capabilities().await;
Ok(String::from("Get capabilities command"))
Err(e) => Err(e),
async fn get_all_capabilities<'a>(
args: &'a [&'a str],
peer_map: Arc<RwLock<PeerFactoryMap>>,
) -> Result<String, Error> {
match get_controller_from_args(args, Cmd::GetAllCapabilities, peer_map) {
Ok(peer_controller) => {
let _ = peer_controller.get_all_capabilities().await;
Ok(String::from("Get all capabilities command"))
Err(e) => Err(e),
async fn reconfigure<'a>(
args: &'a [&'a str],
peer_map: Arc<RwLock<PeerFactoryMap>>,
) -> Result<String, Error> {
match get_controller_from_args(args, Cmd::Reconfigure, peer_map) {
Ok(peer_controller) => {
let _ = peer_controller.reconfigure_stream().await;
Ok(String::from("Reconfigure command"))
Err(e) => Err(e),
async fn suspend<'a>(
args: &'a [&'a str],
peer_map: Arc<RwLock<PeerFactoryMap>>,
) -> Result<String, Error> {
match get_controller_from_args(args, Cmd::Suspend, peer_map) {
Ok(peer_controller) => {
let _ = peer_controller.suspend_stream().await;
Ok(String::from("Suspend command"))
Err(e) => Err(e),
async fn suspend_reconfigure<'a>(
args: &'a [&'a str],
peer_map: Arc<RwLock<PeerFactoryMap>>,
) -> Result<String, Error> {
match get_controller_from_args(args, Cmd::SuspendReconfigure, peer_map) {
Ok(peer_controller) => {
let _ = peer_controller.suspend_and_reconfigure().await;
Ok(String::from("Suspend and reconfigure command"))
Err(e) => Err(e),
async fn release_stream<'a>(
args: &'a [&'a str],
peer_map: Arc<RwLock<PeerFactoryMap>>,
) -> Result<String, Error> {
match get_controller_from_args(args, Cmd::ReleaseStream, peer_map) {
Ok(peer_controller) => {
let _ = peer_controller.release_stream().await;
Ok(String::from("Release stream command"))
Err(e) => Err(e),
async fn establish_stream<'a>(
args: &'a [&'a str],
peer_map: Arc<RwLock<PeerFactoryMap>>,
) -> Result<String, Error> {
match get_controller_from_args(args, Cmd::EstablishStream, peer_map) {
Ok(peer_controller) => {
let _ = peer_controller.establish_stream().await;
Ok(String::from("Establish stream command"))
Err(e) => Err(e),
async fn abort_stream<'a>(
args: &'a [&'a str],
peer_map: Arc<RwLock<PeerFactoryMap>>,
) -> Result<String, Error> {
match get_controller_from_args(args, Cmd::AbortStream, peer_map) {
Ok(peer_controller) => {
let _ = peer_controller.abort_stream().await;
Ok(String::from("Abort stream command"))
Err(e) => Err(e),
async fn start_stream<'a>(
args: &'a [&'a str],
peer_map: Arc<RwLock<PeerFactoryMap>>,
) -> Result<String, Error> {
match get_controller_from_args(args, Cmd::StartStream, peer_map) {
Ok(peer_controller) => {
let _ = peer_controller.start_stream().await;
Ok(String::from("Start stream command"))
Err(e) => Err(e),
fn get_controller_from_args<'a>(
args: &'a [&'a str],
cmd: Cmd,
peer_map: Arc<RwLock<PeerFactoryMap>>,
) -> Result<PeerControllerProxy, Error> {
if args.len() < 1 {
return Err(format_err!("usage: {}", cmd.cmd_help()));
// Handle the case where user wants to understand command usage.
if args[0] == "help" {
return Err(format_err!("{}", cmd.cmd_help()));
let id = args[0];
match peer_map.write().entry(id.to_string()) {
Entry::Occupied(entry) => Ok(entry.get().clone()),
Entry::Vacant(_) => {
Err(format_err!("No peer with id: {:?}.\n Usage: {}", id, cmd.cmd_help()))
fn handle_help<'a>(args: &'a [&'a str]) -> Result<String, Error> {
if args.len() == 0 {
} else {
match args[0].parse::<Cmd>() {
Err(_) => Err(format_err!("Unsupported command: {}", args[0])),
Ok(cmd) => Ok(cmd.cmd_help().to_string()),
/// Handle a single raw input command from a user and indicate whether the command should
/// result in continuation or breaking of the read evaluate print loop.
async fn handle_cmd<'a>(
peer_map: Arc<RwLock<PeerFactoryMap>>,
line: String,
) -> Result<ReplControl, Error> {
let components: Vec<_> = line.trim().split_whitespace().collect();
if components.is_empty() {
return Ok(ReplControl::Continue);
let (raw_cmd, args) = components.split_first().expect("not empty");
let cmd = raw_cmd.parse();
let res = match cmd {
Ok(Cmd::AbortStream) => abort_stream(args, peer_map).await,
Ok(Cmd::EstablishStream) => establish_stream(args, peer_map).await,
Ok(Cmd::ReleaseStream) => release_stream(args, peer_map).await,
Ok(Cmd::StartStream) => start_stream(args, peer_map).await,
Ok(Cmd::SetConfig) => set_configuration(args, peer_map).await,
Ok(Cmd::GetConfig) => get_configuration(args, peer_map).await,
Ok(Cmd::GetCapabilities) => get_capabilities(args, peer_map).await,
Ok(Cmd::GetAllCapabilities) => get_all_capabilities(args, peer_map).await,
Ok(Cmd::Reconfigure) => reconfigure(args, peer_map).await,
Ok(Cmd::Suspend) => suspend(args, peer_map).await,
Ok(Cmd::SuspendReconfigure) => suspend_reconfigure(args, peer_map).await,
Ok(Cmd::Help) => handle_help(args),
Ok(Cmd::Exit) | Ok(Cmd::Quit) => return Ok(ReplControl::Break),
Err(_) => {
info!("{} is not a valid command.", raw_cmd);
Ok(format!("\"{}\" is not a valid command", raw_cmd))
if res != "" {
println!("{}", res);
/// Generates a rustyline `Editor` in a separate thread to manage user input. This input is returned
/// as a `Stream` of lines entered by the user.
/// The thread exits and the `Stream` is exhausted when an error occurs on stdin or the user
/// sends a ctrl-c or ctrl-d sequence.
/// Because rustyline shares control over output to the screen with other parts of the system, a
/// `Sink` is passed to the caller to send acknowledgements that a command has been processed and
/// that rustyline should handle the next line of input.
fn cmd_stream() -> (impl Stream<Item = String>, impl Sink<(), Error = SendError>) {
// Editor thread and command processing thread must be synchronized so that output
// is printed in the correct order.
let (mut cmd_sender, cmd_receiver) = channel(512);
let (ack_sender, mut ack_receiver) = channel(512);
let _ = thread::spawn(move || -> Result<(), Error> {
let mut exec = fasync::LocalExecutor::new();
let fut = async {
let config = Config::builder()
let mut rl: Editor<CmdHelper> = Editor::with_config(config);
loop {
let readline = rl.readline(PROMPT);
match readline {
Ok(line) => {
Err(ReadlineError::Eof) | Err(ReadlineError::Interrupted) => {
return Ok(());
Err(e) => {
println!("Error: {:?}", e);
return Err(e.into());
// wait until processing thread is finished evaluating the last command
// before running the next loop in the repl
if {
return Ok(());
(cmd_receiver, ack_sender)
/// REPL execution
async fn run_repl<'a>(peer_map: Arc<RwLock<PeerFactoryMap>>) -> Result<(), Error> {
// `cmd_stream` blocks on input in a separate thread and passes commands and acks back to
// the main thread via async channels.
let (mut commands, mut acks) = cmd_stream();
while let Some(cmd) = {
match handle_cmd(peer_map.clone(), cmd).await {
Ok(ReplControl::Continue) => {}
Ok(ReplControl::Break) => {
Err(e) => {
println!("Error handling command: {}", e);
async fn main() -> Result<(), Error> {
let avdtp_svc = fuchsia_component::client::connect_to_protocol::<PeerManagerMarker>()
.context("Failed to connect to AVDTP Peer Manager")?;
// Get the list of currently connected peers.
// Since we are spinning up bt-a2dp locally, there should be no connected peers.
let _peers = avdtp_svc.connected_peers().await?;
let evt_stream = avdtp_svc.take_event_stream();
let peer_map: Arc<RwLock<PeerFactoryMap>> = Arc::new(RwLock::new(HashMap::new()));
let event_fut = peer_manager_listener(&avdtp_svc, evt_stream, peer_map.clone()).fuse();
let repl_fut = run_repl(peer_map.clone()).fuse();
let event_fut = pin!(event_fut);
let repl_fut = pin!(repl_fut);
try_join!(event_fut, repl_fut).map(|((), ())| ())
mod tests {
use super::*;
use fidl::endpoints::create_proxy;
/// Tests parsing input arguments works successfully.
/// Success case: The id provided exists, and the PeerControllerProxy is correctly returned.
/// Error case: Invalid id, Error string + help message returned.
async fn test_get_controller_from_args() {
let mut peer_map = HashMap::new();
let (client_proxy, _server) =
create_proxy::<PeerControllerMarker>().expect("Failed to create peer endpoint");
let supported_id = "123";
let _ = peer_map.insert(supported_id.to_string(), client_proxy.clone());
let peer_map_obj = Arc::new(RwLock::new(peer_map));
let invalid_id_args = ["2"];
let res =
get_controller_from_args(&invalid_id_args, Cmd::StartStream, peer_map_obj.clone());
let valid_id_args = [supported_id];
let res = get_controller_from_args(&valid_id_args, Cmd::StartStream, peer_map_obj);
/// Tests parsing the help command works as intended.
fn test_handle_help() {
let invalid_cmd = ["foobar"];
let res = handle_help(&invalid_cmd);
res.map_err(|e| format!("{:?}", e)),
Err("Unsupported command: foobar".to_string())
let generic_help = [];
let res = handle_help(&generic_help);
let cmd_help = ["set-configuration"];
let res = handle_help(&cmd_help);