blob: 04ddfbee6edf895be561b6ee04e63f41409defc7 [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::keys::{KeyEntry, ParseKeyError},
anyhow::Context as _,
async_trait::async_trait,
fidl::prelude::*,
fidl_fuchsia_ssh::{
AuthorizedKeysRequest, AuthorizedKeysRequestStream, KeyEvent, KeyEventType,
KeyWatcherRequest, KeyWatcherRequestStream, SshAuthorizedKeyEntry,
},
fuchsia_async as fasync, fuchsia_zircon as zx,
futures::{
channel::mpsc,
lock::{Mutex, MutexGuard},
prelude::*,
},
std::{
fs::{File, OpenOptions},
io::{BufRead, BufReader, Read, Seek, SeekFrom, Write},
ops::DerefMut,
path::Path,
sync::Arc,
},
thiserror::Error,
tracing::*,
};
#[derive(Error, Debug)]
pub enum SshKeyManagerError {
#[error("i/o error")]
IoError(#[source] std::io::Error),
#[error("Invalid Key")]
InvalidKey(
#[source]
#[from]
ParseKeyError,
),
#[error("File changed while transaction was completed")]
TransactionInvalidated,
#[error("Error starting watcher")]
WatcherInit,
}
impl SshKeyManagerError {
    /// Translate this error into the `zx::Status` code that represents it.
    pub fn to_zx_status(&self) -> zx::Status {
        match self {
            Self::IoError(_) => zx::Status::IO,
            Self::InvalidKey(_) => zx::Status::INVALID_ARGS,
            Self::TransactionInvalidated => zx::Status::INTERRUPTED_RETRY,
            Self::WatcherInit => zx::Status::INTERNAL,
        }
    }
}
/// A batch of modifications to the authorized_keys file that is applied as a
/// unit when committed.
pub struct Transaction<'a, T>
where
    T: Read + Seek + Write + Send,
{
    /// Guard over the key file; held for as long as the transaction exists.
    file: MutexGuard<'a, T>,
    /// Keys that were in the file when the transaction was created.
    start_state: Vec<KeyEntry>,
    /// Keys that will be written out when the transaction commits.
    end_state: Vec<KeyEntry>,
    /// Events describing the changes made so far, sent out after a successful commit.
    actions: Vec<KeyEvent>,
}
/// Destination for the `KeyEvent`s produced by a committed `Transaction`.
#[async_trait]
pub trait EventSender: Send + Sync {
    /// Deliver `events` to every watcher.
    async fn send_events(&self, events: Vec<KeyEvent>);
}
impl<'a, T: Read + Write + Seek + Send> Transaction<'a, T> {
    /// Start a transaction over the locked key file.
    ///
    /// The keys currently in the file are read once and used both as the
    /// baseline for conflict detection in `commit()` and as the starting point
    /// for the set of keys that will be written back.
    ///
    /// # Errors
    /// Returns `SshKeyManagerError::IoError` if the file cannot be read.
    async fn new(file: MutexGuard<'a, T>) -> Result<Transaction<'a, T>, SshKeyManagerError> {
        let mut txn = Transaction { file, start_state: vec![], end_state: vec![], actions: vec![] };
        let keys = txn.load_keys().await?;
        txn.start_state = keys.clone();
        txn.end_state = keys;
        Ok(txn)
    }

    /// Load the current set of keys from the disk, silently skipping any invalid lines.
    ///
    /// Blank lines and lines starting with `#` are ignored; lines that fail to
    /// parse are logged and skipped.
    ///
    /// # Errors
    /// Returns `SshKeyManagerError::IoError` if seeking or reading fails.
    async fn load_keys(&mut self) -> Result<Vec<KeyEntry>, SshKeyManagerError> {
        let file = self.file.deref_mut();
        // Always read from the beginning, regardless of where earlier I/O left the cursor.
        file.seek(SeekFrom::Start(0)).map_err(SshKeyManagerError::IoError)?;
        let reader = BufReader::new(file);
        let mut result = vec![];
        for line in reader.lines() {
            let line = line.map_err(SshKeyManagerError::IoError)?;
            if line.starts_with('#') || line.is_empty() {
                continue;
            }
            match line.parse::<KeyEntry>() {
                Ok(entry) => result.push(entry),
                Err(e) => {
                    warn!(%e, %line, "Poorly formatted line in authorized_keys");
                }
            }
        }
        Ok(result)
    }

    /// Write the final set of keys from this transaction to the disk.
    ///
    /// The file is rewritten from offset zero: a generated header comment
    /// followed by one line per key. `T` offers no way to truncate, so if the
    /// new content is shorter than what was there before (for example because
    /// comments or unparseable lines were dropped), the leftover tail is
    /// overwritten with newlines. `load_keys()` skips blank lines, so stale
    /// bytes can never be read back as keys. The writer is flushed before
    /// returning.
    async fn write_keys(&mut self) -> Result<(), std::io::Error> {
        let file = self.file.deref_mut();
        // Remember how long the old content was so the stale tail can be blanked out below.
        let old_len = file.seek(SeekFrom::End(0))?;
        file.seek(SeekFrom::Start(0))?;
        file.write_all(
            "# This file is auto-generated by ssh-key-manager, edits may be overwritten.\n"
                .as_bytes(),
        )?;
        for entry in self.end_state.iter() {
            file.write_all(entry.to_string().as_bytes())?;
            file.write_all(b"\n")?;
        }
        let new_len = file.seek(SeekFrom::Current(0))?;
        if new_len < old_len {
            let mut padding = std::io::repeat(b'\n').take(old_len - new_len);
            std::io::copy(&mut padding, &mut *file)?;
        }
        file.flush()
    }

    /// Return a copy of the keys as they would be after this transaction commits.
    fn get_keys(&self) -> Vec<KeyEntry> {
        self.end_state.clone()
    }

    /// Add a key to the transaction. The key is not actually persisted until
    /// `commit()` is called.
    ///
    /// # Errors
    /// Returns `SshKeyManagerError::InvalidKey` if `key` does not parse as a
    /// `KeyEntry`; the transaction is left unchanged in that case.
    pub fn add_key(&mut self, key: String) -> Result<(), SshKeyManagerError> {
        let entry = key.parse::<KeyEntry>()?;
        self.end_state.push(entry);
        self.actions.push(KeyEvent {
            event: KeyEventType::Added,
            key: Some(Box::new(SshAuthorizedKeyEntry { key })),
        });
        Ok(())
    }

    /// Commit all the changes in this `Transaction` to the disk, or fail if the
    /// underlying authorized_keys file was modified since the transaction
    /// started.
    ///
    /// On success the accumulated events are handed to `manager`. The file
    /// guard is still held while the events are being sent.
    ///
    /// # Errors
    /// * `SshKeyManagerError::TransactionInvalidated` if the keys on disk differ
    ///   from those read when the transaction began; nothing is written.
    /// * `SshKeyManagerError::IoError` if reading or writing the file fails.
    pub async fn commit(mut self, manager: &dyn EventSender) -> Result<(), SshKeyManagerError> {
        // We have to deal with the possibility that something else touched
        // authorized_keys, even if it wasn't us. This isn't going to be perfect
        // -- there's no way of guaranteeing that nothing will happen between
        // `load_keys()` and `write_keys()` below, but this is better than
        // nothing.
        let current_state = self.load_keys().await?;
        if current_state != self.start_state {
            // Refuse to write out.
            return Err(SshKeyManagerError::TransactionInvalidated);
        }
        self.write_keys().await.map_err(SshKeyManagerError::IoError)?;
        manager.send_events(self.actions).await;
        Ok(())
    }
}
/// State kept for a single registered key watcher.
struct KeyEventSender {
/// Producer side of the queue of events waiting to be delivered to the watcher.
sender: mpsc::UnboundedSender<KeyEvent>,
/// Task that answers the watcher's `Next` requests with queued events. When it
/// completes successfully it yields the queue's receiver.
_task: fasync::Task<Result<mpsc::UnboundedReceiver<KeyEvent>, anyhow::Error>>,
/// Set to true by the task after its request loop has ended.
finished: Arc<Mutex<bool>>,
}
impl KeyEventSender {
pub fn new(
mut stream: KeyWatcherRequestStream,
existing: Vec<KeyEntry>,
) -> Result<Self, anyhow::Error> {
let (sender, mut receiver) = mpsc::unbounded::<KeyEvent>();
let finished = Arc::new(Mutex::new(false));
let clone = Arc::clone(&finished);
let task = fasync::Task::spawn(async move {
while let Ok(Some(req)) = stream.try_next().await {
let KeyWatcherRequest::Next { responder } = req;
let mut event = receiver.next().await.unwrap();
responder.send(&mut event)?;
}
*clone.lock().await = true;
Ok(receiver)
});
for i in existing.into_iter().map(|item| KeyEvent {
event: KeyEventType::Existing,
key: Some(Box::new(SshAuthorizedKeyEntry { key: item.to_string() })),
}) {
sender.unbounded_send(i)?;
}
sender.unbounded_send(KeyEvent { event: KeyEventType::FinishedExisting, key: None })?;
Ok(KeyEventSender { sender, _task: task, finished })
}
pub fn send_event(&self, event: KeyEvent) -> Result<(), anyhow::Error> {
self.sender.unbounded_send(event)?;
Ok(())
}
pub async fn is_done(&self) -> bool {
*self.finished.lock().await
}
}
/// Owns the authorized_keys file and the set of watchers interested in changes to it.
pub struct SshKeyManager<T>
where
    T: Read + Write + Seek + Send,
{
    /// The backing key file; every access goes through this lock.
    authorized_keys: Mutex<T>,
    /// List of registered watchers for key events. This mutex must always be acquired before
    /// authorized_keys.
    watchers: Mutex<Vec<KeyEventSender>>,
}
impl SshKeyManager<File> {
    /// Open the file at `keys_path` for reading and writing and build a manager around it.
    ///
    /// # Errors
    /// Returns `SshKeyManagerError::IoError` if the file cannot be opened.
    pub fn new(keys_path: impl AsRef<Path>) -> Result<SshKeyManager<File>, SshKeyManagerError> {
        let mut options = OpenOptions::new();
        options.read(true).write(true);
        match options.open(keys_path) {
            Ok(file) => Ok(SshKeyManager::with_file(file)),
            Err(e) => Err(SshKeyManagerError::IoError(e)),
        }
    }
}
impl<T: Read + Write + Seek + Send> SshKeyManager<T> {
    /// Build a manager around an already-open key file, with no watchers registered.
    pub fn with_file(file: T) -> SshKeyManager<T> {
        SshKeyManager { authorized_keys: Mutex::new(file), watchers: Mutex::new(vec![]) }
    }

    /// Begin a `Transaction`.
    ///
    /// Waits for the authorized_keys lock; the returned transaction holds it
    /// until it is committed or dropped.
    async fn begin<'a>(&'a self) -> Result<Transaction<'a, T>, SshKeyManagerError> {
        Transaction::new(self.authorized_keys.lock().await).await
    }

    /// Add a key to the authorized keys list.
    ///
    /// NOTE(review): `commit()` notifies watchers while the transaction still
    /// holds the authorized_keys lock, so this path takes authorized_keys and
    /// then watchers -- the reverse of `start_watch()` and of the order
    /// documented on the `watchers` field. Confirm this cannot deadlock when
    /// requests are served concurrently.
    ///
    /// # Errors
    /// Propagates parse, conflict, and I/O errors from the transaction.
    async fn add_key(&self, key: SshAuthorizedKeyEntry) -> Result<(), SshKeyManagerError> {
        let mut txn = self.begin().await?;
        txn.add_key(key.key)?;
        txn.commit(self).await?;
        Ok(())
    }

    /// Start sending new `WatchEvent`s to the given stream.
    ///
    /// The watchers lock is held while the current keys are read and the new
    /// watcher is registered, so events sent through `send_events()` cannot
    /// slip in between the snapshot and the registration.
    ///
    /// # Errors
    /// Returns `SshKeyManagerError::WatcherInit` if the watcher cannot be
    /// created, or the error from reading the current keys.
    async fn start_watch(&self, stream: KeyWatcherRequestStream) -> Result<(), SshKeyManagerError> {
        let mut watcher_list = self.watchers.lock().await;
        let existing = self.get_cur_keys().await?;
        let watcher =
            KeyEventSender::new(stream, existing).map_err(|_| SshKeyManagerError::WatcherInit)?;
        watcher_list.push(watcher);
        Ok(())
    }

    /// Used for testing to make sure that all watchers have received all the
    /// `KeyEvent`s.
    ///
    /// Waits for every watcher task to finish, then panics with a list of the
    /// watchers (in registration order) that still had undelivered events.
    #[cfg(test)]
    pub async fn assert_all_watchers_flushed(self) {
        let watcher_list = self.watchers.into_inner();
        // (watcher index, undelivered events); a Vec keeps the report in a stable order.
        let mut errors = vec![];
        for (i, mut watcher) in watcher_list.into_iter().enumerate() {
            watcher.sender.flush().await.expect("flush ok");
            let mut receiver = watcher._task.await.expect("watcher task finishes successfully");
            let mut leftovers = vec![];
            while let Ok(Some(event)) = receiver.try_next() {
                leftovers.push(event);
            }
            if !leftovers.is_empty() {
                errors.push((i, leftovers));
            }
        }
        if !errors.is_empty() {
            let mut message =
                "The following watchers did not handle all of their events:\n".to_owned();
            for (k, v) in errors {
                message.push_str(&format!(" {}: leftover events: {:?}\n", k, v));
            }
            panic!("{}", message);
        }
    }

    /// Read the keys currently stored in the file.
    async fn get_cur_keys(&self) -> Result<Vec<KeyEntry>, SshKeyManagerError> {
        let txn = self.begin().await?;
        Ok(txn.get_keys())
    }

    /// Serve `AuthorizedKeys` requests from `stream` until it ends.
    ///
    /// * `AddKey` replies with the outcome of adding the key, with failures
    ///   mapped to raw `zx::Status` values.
    /// * `WatchKeys` registers a watcher; if that fails, the watcher channel is
    ///   shut down with the matching status as its epitaph.
    /// * `RemoveKey` always replies with `NOT_SUPPORTED`.
    ///
    /// # Errors
    /// Returns an error if reading a request or sending a reply fails.
    pub async fn handle_requests(
        &self,
        mut stream: AuthorizedKeysRequestStream,
    ) -> Result<(), anyhow::Error> {
        while let Some(req) = stream.try_next().await.context("Getting request")? {
            match req {
                AuthorizedKeysRequest::AddKey { key, responder } => {
                    let mut result = self.add_key(key).await.map_err(|e| {
                        warn!(?e, "Error while adding key");
                        e.to_zx_status().into_raw()
                    });
                    responder.send(&mut result)?;
                }
                AuthorizedKeysRequest::WatchKeys { watcher, .. } => {
                    let (stream, handle) = watcher.into_stream_and_control_handle()?;
                    // A failed watch only affects this watcher; keep serving other requests.
                    if let Err(e) = self.start_watch(stream).await {
                        warn!(?e, "Error while starting watch");
                        handle.shutdown_with_epitaph(e.to_zx_status());
                    }
                }
                AuthorizedKeysRequest::RemoveKey { responder, .. } => {
                    responder.send(&mut Err(zx::Status::NOT_SUPPORTED.into_raw()))?;
                }
            };
        }
        Ok(())
    }
}
#[async_trait]
impl<T: Read + Write + Seek + Send> EventSender for SshKeyManager<T> {
    /// Queue every event for each watcher that has not finished, dropping
    /// finished watchers from the list along the way.
    async fn send_events(&self, events: Vec<KeyEvent>) {
        let mut watchers = self.watchers.lock().await;
        // Empty the list, then put back only the watchers that are still active.
        let previous = std::mem::take(watchers.deref_mut());
        for watcher in previous {
            if !watcher.is_done().await {
                for event in &events {
                    if let Err(e) = watcher.send_event(event.clone()) {
                        warn!(?e, ?event, "Failed to send event");
                    }
                }
                watchers.push(watcher);
            }
        }
    }
}
#[cfg(test)]
mod tests {
use {
super::*,
fidl_fuchsia_ssh::{
AuthorizedKeysMarker, AuthorizedKeysProxy, KeyWatcherMarker, KeyWatcherProxy,
},
std::{io::Cursor, sync::Arc},
};
/// Test double for `EventSender` that checks the events it is sent against a
/// list of expectations.
struct MockEventSender {
// Events that have been announced via `add_key()` but not yet received.
expected_events: Mutex<Vec<KeyEvent>>,
}
impl MockEventSender {
    /// Create a mock that expects no events.
    pub fn new() -> MockEventSender {
        MockEventSender { expected_events: Mutex::new(Vec::new()) }
    }

    /// Add `key` to `txn` and record the `Added` event the commit should produce.
    pub async fn add_key(&self, key: String, txn: &mut Transaction<'_, Cursor<Vec<u8>>>) {
        let expected = KeyEvent {
            event: KeyEventType::Added,
            key: Some(Box::new(SshAuthorizedKeyEntry { key: key.clone() })),
        };
        self.expected_events.lock().await.push(expected);
        txn.add_key(key).expect("add key ok");
    }

    /// Panic if any expected event was never received.
    pub async fn finish(&self) {
        let outstanding = self.expected_events.lock().await;
        assert!(
            outstanding.is_empty(),
            "Missing events that we expected to receive: {:?}",
            *outstanding
        );
    }
}
#[async_trait]
impl EventSender for MockEventSender {
    /// Check each received event against the expectations, oldest first.
    ///
    /// Expectations are consumed from the front so that a transaction carrying
    /// several events is compared in the order the events were recorded.
    /// Expectations not matched by this call are kept for later calls.
    async fn send_events(&self, events: Vec<KeyEvent>) {
        let mut expected_events = self.expected_events.lock().await;
        let mut remaining = std::mem::take(&mut *expected_events).into_iter();
        for e in events {
            assert_eq!(remaining.next(), Some(e), "expected != actual");
        }
        *expected_events = remaining.collect();
    }
}
/// Build an in-memory key file with no contents.
fn make_empty_key_file() -> Mutex<Cursor<Vec<u8>>> {
    Mutex::new(Cursor::new(Vec::new()))
}
/// Build an in-memory key file whose initial contents are `content`.
fn make_key_file(content: &str) -> Mutex<Cursor<Vec<u8>>> {
    let bytes = content.as_bytes().to_owned();
    Mutex::new(Cursor::new(bytes))
}
/// Assert that the non-comment, non-blank lines of `file` are exactly `expected`, in order.
fn expect_has_keys(file: Mutex<Cursor<Vec<u8>>>, expected: Vec<&str>) {
    let bytes = file.into_inner().into_inner();
    let text = String::from_utf8(bytes).expect("valid unicode");
    let actual: Vec<String> = text
        .split('\n')
        .filter(|line| !(line.starts_with('#') || line.is_empty()))
        .map(str::to_owned)
        .collect();
    let expected: Vec<String> = expected.into_iter().map(String::from).collect();
    assert_eq!(actual, expected);
}
/// Key line added by the transaction tests.
const TEST_KEY: &str = "ssh-rsa abcdefg";
/// Adding a key to an empty file writes the key and emits one `Added` event.
#[fuchsia::test]
async fn test_add_key() {
    let sender = MockEventSender::new();
    let file = make_empty_key_file();
    let guard = file.lock().await;
    let mut txn = Transaction::new(guard).await.expect("start txn okay");
    sender.add_key(TEST_KEY.to_owned(), &mut txn).await;
    txn.commit(&sender).await.expect("commit okay");
    expect_has_keys(file, vec![TEST_KEY]);
    sender.finish().await;
}
#[fuchsia::test]
async fn test_modify_inflight() {
let mut file = make_empty_key_file();
let sender = MockEventSender::new();
// This is OK because the place that actually holds `file` is the transaction.
// Doing this lets us keep a mutable reference to the file, so that we can modify it after
// the transaction reads the initial state.
let sneaky_ref: &'static mut Cursor<Vec<u8>> =
unsafe { std::mem::transmute(file.get_mut()) };
let mut txn = Transaction::new(file.lock().await).await.expect("start txn okay");
sneaky_ref.write_all("ssh-rsa this-is-a-new-key".as_bytes()).expect("write ok");
// Don't use sender here as we expect this to fail.
txn.add_key(TEST_KEY.to_owned()).expect("add key ok");
txn.commit(&sender).await.expect_err("commit fails");
expect_has_keys(file, vec!["ssh-rsa this-is-a-new-key"]);
sender.finish().await;
}
/// An unparseable line already in the file is dropped when a new key is committed.
#[fuchsia::test]
async fn test_invalid_keyfile() {
    let sender = MockEventSender::new();
    let file = make_key_file("INVALID KEY");
    let guard = file.lock().await;
    let mut txn = Transaction::new(guard).await.expect("start txn okay");
    sender.add_key(TEST_KEY.to_owned(), &mut txn).await;
    txn.commit(&sender).await.expect("commit ok");
    expect_has_keys(file, vec![TEST_KEY]);
    sender.finish().await;
}
/// Helper used to make sure the SshKeyManager sends the correct watch events.
struct SshKeyManagerTestEnv {
// Events the environment's own watcher should receive, in order.
expected_events: Vec<KeyEvent>,
// The manager under test, backed by an in-memory file.
manager: Arc<SshKeyManager<Cursor<Vec<u8>>>>,
// Client end used to send `AuthorizedKeys` requests to the manager.
manager_proxy: AuthorizedKeysProxy,
// Client end of the watcher registered when the environment is created.
watcher_proxy: KeyWatcherProxy,
// Task running `handle_requests()` for `manager_proxy`.
_task: fasync::Task<Result<(), anyhow::Error>>,
}
impl SshKeyManagerTestEnv {
fn new(keys: &[&str]) -> SshKeyManagerTestEnv {
// Set up the SshKeyManager using a fake file.
let file = Cursor::new(keys.join("\n").as_bytes().to_vec());
let manager = Arc::new(SshKeyManager::with_file(file));
// Populate the expected events list with the initial set of lines in the file.
let mut expected_events = vec![];
for key in keys {
if let Ok(_) = key.parse::<KeyEntry>() {
let event = KeyEvent {
event: KeyEventType::Existing,
key: Some(Box::new(SshAuthorizedKeyEntry { key: key.to_string() })),
};
expected_events.push(event);
}
}
expected_events.push(KeyEvent { event: KeyEventType::FinishedExisting, key: None });
// Set up the proxies and start handling requests.
let (proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<AuthorizedKeysMarker>().unwrap();
let manager_clone = manager.clone();
let task =
fasync::Task::spawn(async move { manager_clone.handle_requests(stream).await });
let (watcher, remote) = fidl::endpoints::create_proxy::<KeyWatcherMarker>().unwrap();
proxy.watch_keys(remote).expect("send watch OK");
SshKeyManagerTestEnv {
expected_events,
manager,
manager_proxy: proxy,
watcher_proxy: watcher,
_task: task,
}
}
async fn add_key(&mut self, key: &str) -> Result<(), zx::Status> {
self.manager_proxy
.add_key(&mut SshAuthorizedKeyEntry { key: key.to_owned() })
.await
.unwrap()
.map_err(zx::Status::from_raw)?;
self.expected_events.push(KeyEvent {
event: KeyEventType::Added,
key: Some(Box::new(SshAuthorizedKeyEntry { key: key.to_owned() })),
});
Ok(())
}
/// Validate that the events received by the `KeyWatcher` are as
/// expected. This assumes that any watchers _NOT_ owned by the
/// `SshKeyManagerTestEnv` have been flushed.
async fn validate(self) {
let mut received = vec![];
for _expected in self.expected_events.iter() {
let actual = self.watcher_proxy.next().await.expect("watcher next() succeeds");
received.push(actual);
}
// Make sure we got what we expected.
assert_eq!(received, self.expected_events);
// Now we want to make sure that there are no extra events waiting for us.
// We can't do this easily without risking the test hanging if there are no extra
// events.
// Order is important here. We have to close manager_proxy, so that self._task will
// finish, so that self.manager is the only reference left to the
// SshKeyManagerInstance.
// Then, we need to close the watcher_proxy so that assert_all_watchers_flushed() can
// take the receiver from each watcher task, which it then uses to make sure there are
// no events left in the buffer.
std::mem::drop(self.manager_proxy);
let _ = self._task.await;
let manager = Arc::try_unwrap(self.manager).unwrap_or_else(|_| panic!("arc is evil"));
std::mem::drop(self.watcher_proxy);
manager.assert_all_watchers_flushed().await;
}
}
/// A new watcher is told about the valid keys already in the file and nothing else.
#[fuchsia::test]
async fn test_manager_existing_keys() {
    let initial_lines = [
        "ssh-rsa key comment",
        "ssh-ed25519 key2 comment2",
        "# a line that's a comment",
        "invalid line",
    ];
    SshKeyManagerTestEnv::new(&initial_lines).validate().await;
}
/// Rejected keys produce no events; an accepted key produces an `Added` event.
#[fuchsia::test]
async fn test_manager_add_keys() {
    let initial_lines = [
        "ssh-rsa key comment",
        "ssh-ed25519 key2 comment2",
        "# a line that's a comment",
        "invalid line",
    ];
    let mut env = SshKeyManagerTestEnv::new(&initial_lines);

    let malformed = env.add_key("invalid key").await;
    malformed.expect_err("Adding key fails");

    let dsa = env.add_key("ssh-dss old-bad-key deprecated and removed").await;
    dsa.expect_err("Adding DSA key fails");

    let valid = env.add_key("ssh-rsa valid_key valid key line").await;
    valid.expect("Adding key succeeds");

    env.validate().await;
}
/// A watcher registered after a key was added sees that key as `Existing`.
#[fuchsia::test]
async fn test_manager_shows_new_watcher_right_state() {
    let mut env = SshKeyManagerTestEnv::new(&[]);
    env.add_key("ssh-rsa valid_key").await.expect("Adding key succeeds");

    let (late_watcher, remote) = fidl::endpoints::create_proxy::<KeyWatcherMarker>().unwrap();
    env.manager_proxy.watch_keys(remote).unwrap();

    // The added key should be "existing" now.
    let first = late_watcher.next().await.unwrap();
    assert_eq!(first.event, KeyEventType::Existing);
    let second = late_watcher.next().await.unwrap();
    assert_eq!(second.event, KeyEventType::FinishedExisting);

    drop(late_watcher);
    env.validate().await;
}
/// Once a watcher's channel is closed, later events are no longer queued for it.
#[fuchsia::test]
async fn test_manager_stops_sending_events_when_watcher_closed() {
    let mut env = SshKeyManagerTestEnv::new(&[]);
    env.add_key("ssh-rsa valid_key").await.expect("Adding key succeeds");

    let (short_lived, remote) = fidl::endpoints::create_proxy::<KeyWatcherMarker>().unwrap();
    env.manager_proxy.watch_keys(remote).unwrap();
    let first = short_lived.next().await.unwrap();
    assert_eq!(first.event, KeyEventType::Existing);
    let second = short_lived.next().await.unwrap();
    assert_eq!(second.event, KeyEventType::FinishedExisting);
    drop(short_lived);

    env.add_key("ssh-rsa valid_key").await.expect("Adding another key succeeds");
    // Validate should succeed, since the watcher should've been dropped from the
    // SshKeyManager's watcher list.
    env.validate().await;
}
}