blob: fd6007e38ff43e7ff2c66ef279019ce47a65a172 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#![recursion_limit = "256"]
use {
anyhow::{anyhow, Context, Error},
fidl::encoding::Decodable,
fidl::endpoints::create_request_stream,
fidl_fuchsia_bluetooth_bredr::*,
fuchsia_async::{self as fasync, futures::select},
fuchsia_bluetooth::types::{Channel, PeerId, Uuid},
fuchsia_component::client::connect_to_service,
futures::{
channel::{
mpsc::{channel, SendError},
oneshot,
},
FutureExt, Sink, SinkExt, Stream, StreamExt, TryStreamExt,
},
parking_lot::RwLock,
rustyline::{error::ReadlineError, CompletionType, Config, Editor},
std::{convert::TryFrom, sync::Arc, thread},
};
use crate::{
commands::{Cmd, CmdHelper, ReplControl},
rfcomm::{handle_profile_events, rfcomm_channel_task, spp_service_definition},
types::*,
};
mod commands;
mod rfcomm;
mod types;
/// Prompt to be shown for tool's REPL
pub static PROMPT: &str = "\x1b[34mprofile>\x1b[0m ";
/// Escape code to clear the pty line on which the cursor is located.
/// Used when evented output is intermingled with the REPL prompt.
pub static RESET_LINE: &str = "\x1b[2K\r";
fn channels(state: Arc<RwLock<ProfileState>>) {
for (chan_id, chan) in state.read().l2cap_channels.map() {
print!(
"Channel:\n Id: {}\n Mode: {:?}\n Max Tx Sdu Size: {}\n",
chan_id, chan.mode, chan.max_tx_sdu_size
);
}
}
fn channel_mode_from_str(s: &str) -> Result<ChannelMode, Error> {
match s {
"basic" | "b" => Ok(ChannelMode::Basic),
"ertm" | "e" => Ok(ChannelMode::EnhancedRetransmission),
s => Err(anyhow!("Invalid channel mode {}", s)),
}
}
fn security_requirements_from_str(s: &str) -> Result<Option<SecurityRequirements>, Error> {
match s {
"none" => Ok(None),
"auth" => Ok(Some(SecurityRequirements {
authentication_required: Some(true),
secure_connections_required: None,
..SecurityRequirements::EMPTY
})),
"sc" => Ok(Some(SecurityRequirements {
authentication_required: None,
secure_connections_required: Some(true),
..SecurityRequirements::EMPTY
})),
"auth-sc" => Ok(Some(SecurityRequirements {
authentication_required: Some(true),
secure_connections_required: Some(true),
..SecurityRequirements::EMPTY
})),
s => Err(anyhow!("Invalid security requirements {}", s)),
}
}
/// Listen on the control event channel for new events.
async fn connection_receiver(
mut connection_requests: ConnectionReceiverRequestStream,
end_ad_receiver: oneshot::Receiver<()>,
state: Arc<RwLock<ProfileState>>,
service_id: u32,
) -> Result<(), Error> {
let tag = "ConnectionReceiver";
let mut end_ad_receiver = end_ad_receiver.fuse();
loop {
select! {
request = connection_requests.try_next() => {
let event = match request {
Err(e) => return Err(anyhow!("{} error: {:?}", tag, e)),
Ok(None) => return Err(anyhow!("{} channel closed", tag)),
Ok(Some(event)) => event,
};
let ConnectionReceiverRequest::Connected { channel, .. } = event;
let socket = channel.socket.ok_or(anyhow!("{}: missing socket", tag))?;
let mode = channel.channel_mode.ok_or(anyhow!("{}: missing channel mode", tag))?;
let max_tx_sdu_size = channel.max_tx_sdu_size.ok_or(anyhow!("{}: missing max tx sdu", tag))?;
let chan_id = state.write().l2cap_channels.insert(L2capChannel { socket, mode, max_tx_sdu_size });
print!("{} Channel Connected to service {}:\n Channel:\n Id: {}\n Mode: {:?}\n Max Tx Sdu Size: {}\n", RESET_LINE, service_id, chan_id, mode, max_tx_sdu_size);
},
_ = end_ad_receiver => break,
complete => break,
}
}
print!("{} ConnectionReceiver closed for service {}", RESET_LINE, service_id);
let args = vec![format!("{}", service_id)];
remove_service(state, &args)
}
fn setup_rfcomm(profile_svc: &ProfileProxy, state: Arc<RwLock<ProfileState>>) -> Result<(), Error> {
let service_defs = vec![spp_service_definition()];
let (connect_client, connect_requests) =
create_request_stream().context("ConnectionReceiver creation")?;
let _ = profile_svc
.advertise(&mut service_defs.into_iter(), ChannelParameters::new_empty(), connect_client)
.check()?;
let (results_client, results_requests) =
create_request_stream().context("SearchResults creation")?;
profile_svc.search(ServiceClassProfileIdentifier::SerialPort, &[], results_client)?;
// Spawn the RFCOMM task to handle profile events.
let state_clone = state.clone();
let rfcomm_task = fasync::Task::spawn(async move {
if let Err(e) = handle_profile_events(state_clone, connect_requests, results_requests).await
{
println!("Rfcomm task ended: {:?}", e);
}
});
state.write().rfcomm.service = Some(rfcomm_task);
println!("Successfully added RFCOMM service advertisement and search");
Ok(())
}
async fn advertise(
profile_svc: &ProfileProxy,
state: Arc<RwLock<ProfileState>>,
args: &Vec<String>,
) -> Result<(), Error> {
if args.len() != 3 {
println!("Error: invalid number of arguments");
println!("{}", Cmd::help_msg());
return Ok(());
}
let psm = args[0].parse::<u16>().map_err(|_| anyhow!("psm must be a 16-bit integer"))?;
let channel_mode =
channel_mode_from_str(args[1].as_ref()).map_err(|_| anyhow!("invalid channel mode"))?;
let max_rx_sdu_size = args[2]
.parse::<u16>()
.map_err(|_| anyhow!("max-rx-sdu-size must be an integer i the range 0 - 65535"))?;
let params = ChannelParameters {
channel_mode: Some(channel_mode),
max_rx_sdu_size: Some(max_rx_sdu_size),
security_requirements: None,
..ChannelParameters::EMPTY
};
let audio_sink_uuid = Uuid::new16(0x110B); // Audio Sink
let svc_defs = vec![ServiceDefinition {
service_class_uuids: Some(vec![audio_sink_uuid.into()]),
protocol_descriptor_list: Some(vec![ProtocolDescriptor {
protocol: ProtocolIdentifier::L2Cap,
params: vec![DataElement::Uint16(psm)],
}]),
..ServiceDefinition::new_empty()
}];
let (connect_client, connect_requests) =
create_request_stream().context("ConnectionReceiver creation")?;
let _ = profile_svc.advertise(&mut svc_defs.into_iter(), params, connect_client).check()?;
let (end_ad_sender, end_ad_receiver) = oneshot::channel::<()>();
let service_id = state.write().services.insert(SdpService {
advertisement_stopper: end_ad_sender,
params: ChannelParameters {
channel_mode: Some(channel_mode),
max_rx_sdu_size: Some(max_rx_sdu_size),
security_requirements: None,
..ChannelParameters::EMPTY
},
});
print!("Advertised Service Id: {}\n", service_id);
let request_handler_fut =
connection_receiver(connect_requests, end_ad_receiver, state.clone(), service_id);
fasync::Task::spawn(async move {
if let Err(e) = request_handler_fut.await {
print!("{} ConnectionReceiver ended with error: {:?}", RESET_LINE, e);
}
})
.detach();
Ok(())
}
fn remove_service(state: Arc<RwLock<ProfileState>>, args: &Vec<String>) -> Result<(), Error> {
if args.len() != 1 {
return Err(anyhow!("Invalid number of arguments"));
}
let service_id =
args[0].parse::<u32>().map_err(|_| anyhow!("service-id must be a positive number"))?;
state.write().services.remove(&service_id).ok_or(anyhow!("Unknown service"))?;
Ok(())
}
fn services(state: Arc<RwLock<ProfileState>>) {
for (id, service) in &state.read().services {
print!(
"Service:\n Id: {}\n Mode: {:?}, Max Rx Sdu Size: {}",
id,
service.params.channel_mode.unwrap(),
service.params.max_rx_sdu_size.unwrap()
);
}
}
async fn connect_l2cap(
profile_svc: &ProfileProxy,
state: Arc<RwLock<ProfileState>>,
args: &Vec<String>,
) -> Result<(), Error> {
if args.len() != 5 {
return Err(anyhow!("Invalid number of arguments"));
}
let peer_id: PeerId = args[0].parse()?;
let psm = args[1].parse::<u16>().map_err(|_| anyhow!("Psm must be [0, 65535]"))?;
let channel_mode = channel_mode_from_str(args[2].as_ref())?;
let max_rx_sdu_size =
args[3].parse::<u16>().map_err(|_| anyhow!("max-sdu-size must be [0, 65535]"))?;
let security_requirements = security_requirements_from_str(args[4].as_ref())?;
let params = ChannelParameters {
channel_mode: Some(channel_mode),
max_rx_sdu_size: Some(max_rx_sdu_size),
security_requirements,
..ChannelParameters::EMPTY
};
let channel = match profile_svc
.connect(
&mut peer_id.into(),
&mut ConnectParameters::L2cap(L2capParameters {
psm: Some(psm),
parameters: Some(params),
..L2capParameters::new_empty()
}),
)
.await?
{
Err(e) => return Err(anyhow!("Could not connect to {}: {:?}", peer_id, e)),
Ok(channel) => channel,
};
let mode = match channel.channel_mode {
Some(m) => m,
None => return Err(anyhow!("Missing channel mode in response")),
};
let max_tx_sdu_size = match channel.max_tx_sdu_size {
Some(s) => s,
None => return Err(anyhow!("Missing max tx sdu size in response")),
};
let chan_id = match channel.socket {
Some(socket) => {
state.write().l2cap_channels.insert(L2capChannel { socket, mode, max_tx_sdu_size })
}
None => {
println!("Error: failed to receive a socket");
return Ok(());
}
};
print!(
"Channel:\n Id: {}\n Mode: {:?}\n Max Tx Sdu Size: {}\n",
chan_id, mode, max_tx_sdu_size
);
Ok(())
}
/// Attempts to make an outbound RFCOMM connection to the peer specified in `args`.
async fn connect_rfcomm(
profile_svc: &ProfileProxy,
state: Arc<RwLock<ProfileState>>,
args: &Vec<String>,
) -> Result<(), Error> {
if args.len() != 2 {
return Err(anyhow!("Invalid number of arguments"));
}
let peer_id: PeerId = args[0].parse()?;
let server_channel =
args[1].parse::<u8>().map_err(|_| anyhow!("Server channel must be a u8"))?;
let server_channel = ServerChannelNumber::try_from(server_channel)?;
let res = profile_svc
.connect(
&mut peer_id.into(),
&mut ConnectParameters::Rfcomm(RfcommParameters {
channel: Some(server_channel.0),
..RfcommParameters::EMPTY
}),
)
.await?;
let channel = res.map_err(|e| anyhow!("Error establishing channel: {:?}", e))?;
let channel = Channel::try_from(channel).unwrap();
let receiver = state.write().rfcomm.create_channel(server_channel);
fasync::Task::spawn(rfcomm_channel_task(server_channel, state.clone(), channel, receiver))
.detach();
println!("Established outbound Rfcomm channel: {}", server_channel.0);
Ok(())
}
fn disconnect_l2cap(state: Arc<RwLock<ProfileState>>, args: &Vec<String>) -> Result<(), Error> {
if args.len() != 1 {
return Err(anyhow!("Invalid number of arguments"));
}
let chan_id = args[0].parse::<u32>().map_err(|_| anyhow!("channel-id must be an integer"))?;
match state.write().l2cap_channels.remove(&chan_id) {
Some(_) => println!("Channel {} disconnected", chan_id),
None => println!("No channel with id {} exists", chan_id),
}
Ok(())
}
fn disconnect_rfcomm(state: Arc<RwLock<ProfileState>>, args: &Vec<String>) -> Result<(), Error> {
if args.len() != 1 {
return Err(anyhow!("Invalid number of arguments"));
}
let server_channel =
args[0].parse::<u8>().map_err(|_| anyhow!("Server channel must be a u8"))?;
let server_channel = ServerChannelNumber::try_from(server_channel)?;
if state.write().rfcomm.remove_channel(server_channel) {
println!("RFCOMM Channel {:?} disconnected", server_channel);
} else {
println!("No RFCOMM channel with id {:?} exists", server_channel);
}
Ok(())
}
fn write_l2cap(state: Arc<RwLock<ProfileState>>, args: &Vec<String>) -> Result<(), Error> {
if args.len() != 2 {
return Err(anyhow!("Invalid number of arguments"));
}
let chan_id = args[0].parse::<u32>().map_err(|_| anyhow!("channel-id must be an integer"))?;
let bytes = args[1].as_bytes();
let num_bytes = match state.read().l2cap_channels.map().get(&chan_id) {
Some(chan) => chan.socket.write(bytes).map_err(|_| anyhow!("error writing data"))?,
None => return Err(anyhow!("No channel with id {} exists", chan_id)),
};
println!("{} bytes written", num_bytes);
Ok(())
}
/// Sends a user data payload to the ServerChannelNumber specified in `args`.
fn write_rfcomm(state: Arc<RwLock<ProfileState>>, args: &Vec<String>) -> Result<(), Error> {
if args.len() != 2 {
return Err(anyhow!("Invalid number of arguments"));
}
let server_channel =
args[0].parse::<u8>().map_err(|_| anyhow!("Server channel must be a u8"))?;
let server_channel = ServerChannelNumber::try_from(server_channel)?;
let bytes = args[1].as_bytes().to_vec();
state.write().rfcomm.send_user_data(server_channel, bytes)
}
fn cleanup(state: Arc<RwLock<ProfileState>>) {
state.write().reset();
}
enum ParsedCmd {
Valid(Cmd, Vec<String>),
Empty,
}
/// Parse a single raw input command from a user into (command type, argument list)
fn parse_cmd(line: String) -> Result<ParsedCmd, Error> {
let components: Vec<_> = line.trim().split_whitespace().collect();
match components.split_first() {
Some((raw_cmd, args)) => match raw_cmd.parse() {
Ok(cmd) => {
let args = args.into_iter().map(|s| s.to_string()).collect();
Ok(ParsedCmd::Valid(cmd, args))
}
Err(_) => Err(anyhow!("\"{}\" is not a valid command", raw_cmd)),
},
None => Ok(ParsedCmd::Empty),
}
}
async fn handle_cmd(
profile_svc: &ProfileProxy,
state: Arc<RwLock<ProfileState>>,
cmd: Cmd,
args: Vec<String>,
) -> Result<ReplControl, Error> {
match cmd {
Cmd::Advertise => advertise(profile_svc, state.clone(), &args).await?,
Cmd::RemoveService => remove_service(state.clone(), &args)?,
Cmd::Services => services(state.clone()),
Cmd::Channels => channels(state.clone()),
Cmd::ConnectL2cap => connect_l2cap(profile_svc, state.clone(), &args).await?,
Cmd::ConnectRfcomm => connect_rfcomm(profile_svc, state.clone(), &args).await?,
Cmd::DisconnectL2cap => disconnect_l2cap(state.clone(), &args)?,
Cmd::DisconnectRfcomm => disconnect_rfcomm(state.clone(), &args)?,
Cmd::SetupRfcomm => setup_rfcomm(profile_svc, state.clone())?,
Cmd::WriteL2cap => write_l2cap(state.clone(), &args)?,
Cmd::WriteRfcomm => write_rfcomm(state.clone(), &args)?,
Cmd::Help => println!("{}", Cmd::help_msg()),
Cmd::Exit | Cmd::Quit => {
cleanup(state.clone());
return Ok(ReplControl::Break);
}
};
Ok(ReplControl::Continue)
}
/// Generates a rustyline `Editor` in a separate thread to manage user input. This input is returned
/// as a `Stream` of lines entered by the user.
///
/// The thread exits and the `Stream` is exhausted when an error occurs on stdin or the user
/// sends a ctrl-c or ctrl-d sequence.
///
/// Because rustyline shares control over output to the screen with other parts of the system, a
/// `Sink` is passed to the caller to send acknowledgements that a command has been processed and
/// that rustyline should handle the next line of input.
fn cmd_stream() -> (impl Stream<Item = String>, impl Sink<(), Error = SendError>) {
let (mut cmd_sender, cmd_receiver) = channel(512);
let (ack_sender, mut ack_receiver) = channel(512);
thread::spawn(move || -> Result<(), Error> {
let mut exec = fasync::Executor::new().context("error creating readline event loop")?;
let fut = async {
let config = Config::builder()
.auto_add_history(true)
.history_ignore_space(true)
.completion_type(CompletionType::List)
.build();
let mut rl = Editor::<CmdHelper>::with_config(config);
rl.set_helper(Some(CmdHelper::new()));
loop {
let readline = rl.readline(PROMPT);
match readline {
Ok(line) => {
cmd_sender.try_send(line)?;
}
Err(ReadlineError::Eof) | Err(ReadlineError::Interrupted) => {
return Ok(());
}
Err(e) => {
println!("Error: {:?}", e);
return Err(e.into());
}
}
// Wait for processing thread to finish evaluating last command.
if ack_receiver.next().await == None {
return Ok(());
}
}
};
exec.run_singlethreaded(fut)
});
(cmd_receiver, ack_sender)
}
/// Wait for raw commands from rustyline thread, and then parse and handle them.
async fn run_repl(
profile_svc: ProfileProxy,
state: Arc<RwLock<ProfileState>>,
) -> Result<(), Error> {
// `cmd_stream` blocks on input in a separate thread and passes commands and acks back to
// the main thread via async channels.
let (mut commands, mut acks) = cmd_stream();
while let Some(raw_cmd) = commands.next().await {
match parse_cmd(raw_cmd) {
Ok(ParsedCmd::Valid(cmd, args)) => {
match handle_cmd(&profile_svc, state.clone(), cmd, args).await {
Ok(ReplControl::Continue) => {}
Ok(ReplControl::Break) => break,
Err(e) => println!("Error handling command: {}", e),
}
}
Ok(ParsedCmd::Empty) => {}
Err(err) => println!("Error parsing command: {}", err),
}
// Notify readline loop that command has been evaluated.
acks.send(()).await?
}
Ok(())
}
#[fasync::run_singlethreaded]
async fn main() -> Result<(), Error> {
let profile_svc = connect_to_service::<ProfileMarker>()
.context("failed to connect to bluetooth profile service")?;
let state = Arc::new(RwLock::new(ProfileState::new()));
run_repl(profile_svc, state.clone()).await
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn disconnect_l2cap_channel_succeeds() {
let state = Arc::new(RwLock::new(ProfileState::new()));
let (s, _) = fidl::Socket::create(fidl::SocketOpts::STREAM).unwrap();
assert_eq!(
0,
state.write().l2cap_channels.insert(L2capChannel {
socket: s,
mode: ChannelMode::Basic,
max_tx_sdu_size: 672
})
);
assert_eq!(1, state.read().l2cap_channels.map().len());
let args = vec!["0".to_string()];
assert!(disconnect_l2cap(state.clone(), &args).is_ok());
assert!(state.read().l2cap_channels.map().is_empty());
// Disconnecting an already disconnected channel should not fail.
// (It should only print a message)
assert!(disconnect_l2cap(state.clone(), &args).is_ok());
}
#[test]
fn disconnect_rfcomm_channel_succeeds() {
let state = Arc::new(RwLock::new(ProfileState::new()));
let server_channel = ServerChannelNumber(10);
let _receiver = state.write().rfcomm.create_channel(server_channel);
let args = vec!["10".to_string()];
assert!(disconnect_rfcomm(state.clone(), &args).is_ok());
// Disconnecting an already disconnected channel should not fail.
// (It should only print a message)
assert!(disconnect_rfcomm(state.clone(), &args).is_ok());
}
}