| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| crate::keys::{KeyEntry, ParseKeyError}, |
| anyhow::Context as _, |
| async_trait::async_trait, |
| fidl::prelude::*, |
| fidl_fuchsia_ssh::{ |
| AuthorizedKeysRequest, AuthorizedKeysRequestStream, KeyEvent, KeyEventType, |
| KeyWatcherRequest, KeyWatcherRequestStream, SshAuthorizedKeyEntry, |
| }, |
| fuchsia_async as fasync, fuchsia_zircon as zx, |
| futures::{ |
| channel::mpsc, |
| lock::{Mutex, MutexGuard}, |
| prelude::*, |
| }, |
| std::{ |
| fs::{File, OpenOptions}, |
| io::{BufRead, BufReader, Read, Seek, SeekFrom, Write}, |
| ops::DerefMut, |
| path::Path, |
| sync::Arc, |
| }, |
| thiserror::Error, |
| tracing::*, |
| }; |
| |
| #[derive(Error, Debug)] |
| pub enum SshKeyManagerError { |
| #[error("i/o error")] |
| IoError(#[source] std::io::Error), |
| #[error("Invalid Key")] |
| InvalidKey( |
| #[source] |
| #[from] |
| ParseKeyError, |
| ), |
| #[error("File changed while transaction was completed")] |
| TransactionInvalidated, |
| #[error("Error starting watcher")] |
| WatcherInit, |
| } |
| |
| impl SshKeyManagerError { |
| pub fn to_zx_status(&self) -> zx::Status { |
| match self { |
| SshKeyManagerError::IoError(_) => zx::Status::IO, |
| SshKeyManagerError::InvalidKey(_) => zx::Status::INVALID_ARGS, |
| SshKeyManagerError::TransactionInvalidated => zx::Status::INTERRUPTED_RETRY, |
| SshKeyManagerError::WatcherInit => zx::Status::INTERNAL, |
| } |
| } |
| } |
| |
| /// A transaction represents an atomic set of modifications to the authorized_keys file. |
| pub struct Transaction<'a, T: Read + Seek + Write + Send> { |
| file: MutexGuard<'a, T>, |
| start_state: Vec<KeyEntry>, |
| end_state: Vec<KeyEntry>, |
| actions: Vec<KeyEvent>, |
| } |
| |
| #[async_trait] |
| pub trait EventSender: Sync + Send { |
| /// Send the given events to all watchers. |
| async fn send_events(&self, events: Vec<KeyEvent>); |
| } |
| |
| impl<'a, T: Read + Write + Seek + Send> Transaction<'a, T> { |
| async fn new(file: MutexGuard<'a, T>) -> Result<Transaction<'a, T>, SshKeyManagerError> { |
| let mut txn = Transaction { file, start_state: vec![], end_state: vec![], actions: vec![] }; |
| let keys = txn.load_keys().await?; |
| txn.start_state = keys.clone(); |
| txn.end_state = keys; |
| Ok(txn) |
| } |
| |
| /// Load the current set of keys from the disk, silently skipping any invalid lines. |
| async fn load_keys(&mut self) -> Result<Vec<KeyEntry>, SshKeyManagerError> { |
| let file = self.file.deref_mut(); |
| file.seek(SeekFrom::Start(0)).map_err(SshKeyManagerError::IoError)?; |
| let reader = BufReader::new(file); |
| let mut result = vec![]; |
| for line in reader.lines() { |
| let line = line.map_err(SshKeyManagerError::IoError)?; |
| if line.starts_with('#') || line.is_empty() { |
| continue; |
| } |
| |
| let entry = match line.parse::<KeyEntry>() { |
| Ok(entry) => entry, |
| Err(e) => { |
| warn!(%e, %line, "Poorly formatted line in authorized_keys"); |
| continue; |
| } |
| }; |
| result.push(entry); |
| } |
| Ok(result) |
| } |
| |
| /// Write the final set of keys from this transaction to the disk. |
| async fn write_keys(&mut self) -> Result<(), std::io::Error> { |
| let file = self.file.deref_mut(); |
| file.seek(SeekFrom::Start(0))?; |
| |
| file.write_all( |
| "# This file is auto-generated by ssh-key-manager, edits may be overwritten.\n" |
| .as_bytes(), |
| )?; |
| for entry in self.end_state.iter() { |
| file.write_all(entry.to_string().as_bytes())?; |
| file.write_all(b"\n")?; |
| } |
| |
| Ok(()) |
| } |
| |
| fn get_keys(&self) -> Vec<KeyEntry> { |
| self.end_state.clone() |
| } |
| |
| /// Add a key to the transaction. The key is not actually persisted until |
| /// `commit()` is called. |
| pub fn add_key(&mut self, key: String) -> Result<(), SshKeyManagerError> { |
| let entry = key.parse::<KeyEntry>()?; |
| self.end_state.push(entry); |
| self.actions.push(KeyEvent { |
| event: KeyEventType::Added, |
| key: Some(Box::new(SshAuthorizedKeyEntry { key })), |
| }); |
| |
| Ok(()) |
| } |
| |
| /// Commit all the changes in this `Transaction` to the disk, or fail if the |
| /// underlying authorized_keys file was modified since the transaction |
| /// started. |
| pub async fn commit(mut self, manager: &dyn EventSender) -> Result<(), SshKeyManagerError> { |
| // We have to deal with the possibility that something else touched |
| // authorized_keys, even if it wasn't us. This isn't going to be perfect |
| // -- there's no way of guaranteeing that nothing will happen between |
| // `load_keys()` and `write_keys()` below, but this is better than |
| // nothing. |
| let current_state = self.load_keys().await?; |
| if current_state != self.start_state { |
| // Refuse to write out. |
| return Err(SshKeyManagerError::TransactionInvalidated); |
| } |
| |
| self.write_keys().await.map_err(SshKeyManagerError::IoError)?; |
| manager.send_events(self.actions).await; |
| Ok(()) |
| } |
| } |
| |
| struct KeyEventSender { |
| sender: mpsc::UnboundedSender<KeyEvent>, |
| _task: fasync::Task<Result<mpsc::UnboundedReceiver<KeyEvent>, anyhow::Error>>, |
| finished: Arc<Mutex<bool>>, |
| } |
| |
| impl KeyEventSender { |
| pub fn new( |
| mut stream: KeyWatcherRequestStream, |
| existing: Vec<KeyEntry>, |
| ) -> Result<Self, anyhow::Error> { |
| let (sender, mut receiver) = mpsc::unbounded::<KeyEvent>(); |
| let finished = Arc::new(Mutex::new(false)); |
| let clone = Arc::clone(&finished); |
| let task = fasync::Task::spawn(async move { |
| while let Ok(Some(req)) = stream.try_next().await { |
| let KeyWatcherRequest::Next { responder } = req; |
| let mut event = receiver.next().await.unwrap(); |
| responder.send(&mut event)?; |
| } |
| *clone.lock().await = true; |
| Ok(receiver) |
| }); |
| |
| for i in existing.into_iter().map(|item| KeyEvent { |
| event: KeyEventType::Existing, |
| key: Some(Box::new(SshAuthorizedKeyEntry { key: item.to_string() })), |
| }) { |
| sender.unbounded_send(i)?; |
| } |
| |
| sender.unbounded_send(KeyEvent { event: KeyEventType::FinishedExisting, key: None })?; |
| |
| Ok(KeyEventSender { sender, _task: task, finished }) |
| } |
| |
| pub fn send_event(&self, event: KeyEvent) -> Result<(), anyhow::Error> { |
| self.sender.unbounded_send(event)?; |
| Ok(()) |
| } |
| |
| pub async fn is_done(&self) -> bool { |
| *self.finished.lock().await |
| } |
| } |
| |
| pub struct SshKeyManager<T: Read + Write + Seek + Send> { |
| authorized_keys: Mutex<T>, |
| /// List of registered watchers for key events. This mutex must always be acquired before |
| /// authorized_keys. |
| watchers: Mutex<Vec<KeyEventSender>>, |
| } |
| |
| impl SshKeyManager<File> { |
| pub fn new(keys_path: impl AsRef<Path>) -> Result<SshKeyManager<File>, SshKeyManagerError> { |
| let file = OpenOptions::new() |
| .read(true) |
| .write(true) |
| .open(keys_path) |
| .map_err(SshKeyManagerError::IoError)?; |
| |
| Ok(SshKeyManager::with_file(file)) |
| } |
| } |
| |
| impl<T: Read + Write + Seek + Send> SshKeyManager<T> { |
| pub fn with_file(file: T) -> SshKeyManager<T> { |
| SshKeyManager { authorized_keys: Mutex::new(file), watchers: Mutex::new(vec![]) } |
| } |
| |
| /// Begin a `Transaction`. |
| async fn begin<'a>(&'a self) -> Result<Transaction<'a, T>, SshKeyManagerError> { |
| Transaction::new(self.authorized_keys.lock().await).await |
| } |
| |
| /// Add a key to the authorized keys list. |
| async fn add_key(&self, key: SshAuthorizedKeyEntry) -> Result<(), SshKeyManagerError> { |
| let mut txn = self.begin().await?; |
| txn.add_key(key.key)?; |
| txn.commit(self).await?; |
| Ok(()) |
| } |
| |
| /// Start sending new `WatchEvent`s to the given stream. |
| async fn start_watch(&self, stream: KeyWatcherRequestStream) -> Result<(), SshKeyManagerError> { |
| let mut watcher_list = self.watchers.lock().await; |
| let existing = self.get_cur_keys().await?; |
| let watcher = |
| KeyEventSender::new(stream, existing).map_err(|_| SshKeyManagerError::WatcherInit)?; |
| watcher_list.push(watcher); |
| Ok(()) |
| } |
| |
| #[cfg(test)] |
| /// Used for testing to make sure that all watchers have received all the |
| /// `KeyEvent`s. |
| pub async fn assert_all_watchers_flushed(self) { |
| let watcher_list = self.watchers.into_inner(); |
| let mut errors = std::collections::HashMap::new(); |
| for (i, mut watcher) in watcher_list.into_iter().enumerate() { |
| watcher.sender.flush().await.expect("flush ok"); |
| let mut receiver = watcher._task.await.expect("watcher task finishes successfully"); |
| let mut leftovers = vec![]; |
| while let Ok(Some(event)) = receiver.try_next() { |
| leftovers.push(event); |
| } |
| if !leftovers.is_empty() { |
| errors.insert(i, leftovers); |
| } |
| } |
| |
| if !errors.is_empty() { |
| let mut message = |
| "The following watchers did not handle all of their events:\n".to_owned(); |
| for (k, v) in errors.into_iter() { |
| message = format!("{} {}: leftover events: {:?}\n", message, k, v); |
| } |
| panic!("{}", message); |
| } |
| } |
| |
| async fn get_cur_keys(&self) -> Result<Vec<KeyEntry>, SshKeyManagerError> { |
| let txn = self.begin().await?; |
| Ok(txn.get_keys()) |
| } |
| |
| pub async fn handle_requests( |
| &self, |
| mut stream: AuthorizedKeysRequestStream, |
| ) -> Result<(), anyhow::Error> { |
| while let Some(req) = stream.try_next().await.context("Getting request")? { |
| match req { |
| AuthorizedKeysRequest::AddKey { key, responder } => { |
| responder.send(&mut self.add_key(key).await.map_err(|e| { |
| warn!(?e, "Error while adding key"); |
| e.to_zx_status().into_raw() |
| }))?; |
| } |
| AuthorizedKeysRequest::WatchKeys { watcher, .. } => { |
| let (stream, handle) = watcher.into_stream_and_control_handle()?; |
| let _ = self.start_watch(stream).await.map_err(|e| { |
| warn!(?e, "Error while starting watch"); |
| handle.shutdown_with_epitaph(e.to_zx_status()); |
| }); |
| } |
| AuthorizedKeysRequest::RemoveKey { responder, .. } => { |
| responder.send(&mut Err(zx::Status::NOT_SUPPORTED.into_raw()))?; |
| } |
| }; |
| } |
| |
| Ok(()) |
| } |
| } |
| |
| #[async_trait] |
| impl<T: Read + Write + Seek + Send> EventSender for SshKeyManager<T> { |
| async fn send_events(&self, events: Vec<KeyEvent>) { |
| let mut watchers = self.watchers.lock().await; |
| let mut cur_watchers = vec![]; |
| std::mem::swap(watchers.deref_mut(), &mut cur_watchers); |
| // Clean up the watchers list as we go. |
| for watcher in cur_watchers.into_iter() { |
| if watcher.is_done().await { |
| continue; |
| } |
| for event in events.iter() { |
| if let Err(e) = watcher.send_event(event.clone()) { |
| warn!(?e, ?event, "Failed to send event"); |
| } |
| } |
| watchers.push(watcher); |
| } |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use { |
| super::*, |
| fidl_fuchsia_ssh::{ |
| AuthorizedKeysMarker, AuthorizedKeysProxy, KeyWatcherMarker, KeyWatcherProxy, |
| }, |
| std::{io::Cursor, sync::Arc}, |
| }; |
| |
| struct MockEventSender { |
| expected_events: Mutex<Vec<KeyEvent>>, |
| } |
| impl MockEventSender { |
| pub fn new() -> MockEventSender { |
| MockEventSender { expected_events: Mutex::new(vec![]) } |
| } |
| |
| pub async fn add_key(&self, key: String, txn: &mut Transaction<'_, Cursor<Vec<u8>>>) { |
| self.expected_events.lock().await.push(KeyEvent { |
| event: KeyEventType::Added, |
| key: Some(Box::new(SshAuthorizedKeyEntry { key: key.clone() })), |
| }); |
| txn.add_key(key).expect("add key ok"); |
| } |
| |
| pub async fn finish(&self) { |
| let expected_events = self.expected_events.lock().await; |
| if expected_events.len() != 0 { |
| panic!("Missing events that we expected to receive: {:?}", *expected_events); |
| } |
| } |
| } |
| #[async_trait] |
| impl EventSender for MockEventSender { |
| async fn send_events(&self, events: Vec<KeyEvent>) { |
| let mut expected_events = self.expected_events.lock().await; |
| for e in events.into_iter() { |
| let expected = expected_events.pop(); |
| assert_eq!(expected, Some(e), "expected != actual"); |
| } |
| } |
| } |
| |
| fn make_empty_key_file() -> Mutex<Cursor<Vec<u8>>> { |
| let cursor = Cursor::new(vec![]); |
| Mutex::new(cursor) |
| } |
| |
| fn make_key_file(content: &str) -> Mutex<Cursor<Vec<u8>>> { |
| let cursor = Cursor::new(content.as_bytes().to_vec()); |
| Mutex::new(cursor) |
| } |
| |
| fn expect_has_keys(file: Mutex<Cursor<Vec<u8>>>, expected: Vec<&str>) { |
| let actual = String::from_utf8(file.into_inner().into_inner()) |
| .expect("valid unicode") |
| .split('\n') |
| .filter(|line| !line.starts_with('#') && !line.is_empty()) |
| .map(|line| line.to_owned()) |
| .collect::<Vec<String>>(); |
| |
| let expected = expected.into_iter().map(|e| e.to_owned()).collect::<Vec<String>>(); |
| |
| assert_eq!(actual, expected); |
| } |
| |
| const TEST_KEY: &str = "ssh-rsa abcdefg"; |
| |
| #[fuchsia::test] |
| async fn test_add_key() { |
| let file = make_empty_key_file(); |
| let mut txn = Transaction::new(file.lock().await).await.expect("start txn okay"); |
| let sender = MockEventSender::new(); |
| sender.add_key(TEST_KEY.to_owned(), &mut txn).await; |
| txn.commit(&sender).await.expect("commit okay"); |
| |
| expect_has_keys(file, vec![TEST_KEY]); |
| sender.finish().await; |
| } |
| |
| #[fuchsia::test] |
| async fn test_modify_inflight() { |
| let mut file = make_empty_key_file(); |
| let sender = MockEventSender::new(); |
| // This is OK because the place that actually holds `file` is the transaction. |
| // Doing this lets us keep a mutable reference to the file, so that we can modify it after |
| // the transaction reads the initial state. |
| let sneaky_ref: &'static mut Cursor<Vec<u8>> = |
| unsafe { std::mem::transmute(file.get_mut()) }; |
| let mut txn = Transaction::new(file.lock().await).await.expect("start txn okay"); |
| sneaky_ref.write_all("ssh-rsa this-is-a-new-key".as_bytes()).expect("write ok"); |
| // Don't use sender here as we expect this to fail. |
| txn.add_key(TEST_KEY.to_owned()).expect("add key ok"); |
| txn.commit(&sender).await.expect_err("commit fails"); |
| expect_has_keys(file, vec!["ssh-rsa this-is-a-new-key"]); |
| sender.finish().await; |
| } |
| |
| #[fuchsia::test] |
| async fn test_invalid_keyfile() { |
| let file = make_key_file("INVALID KEY"); |
| let sender = MockEventSender::new(); |
| let mut txn = Transaction::new(file.lock().await).await.expect("start txn okay"); |
| sender.add_key(TEST_KEY.to_owned(), &mut txn).await; |
| txn.commit(&sender).await.expect("commit ok"); |
| expect_has_keys(file, vec![TEST_KEY]); |
| sender.finish().await; |
| } |
| |
| /// Helper used to make sure the SshKeyManager sends the correct watch events. |
| struct SshKeyManagerTestEnv { |
| expected_events: Vec<KeyEvent>, |
| manager: Arc<SshKeyManager<Cursor<Vec<u8>>>>, |
| manager_proxy: AuthorizedKeysProxy, |
| watcher_proxy: KeyWatcherProxy, |
| _task: fasync::Task<Result<(), anyhow::Error>>, |
| } |
| |
| impl SshKeyManagerTestEnv { |
| fn new(keys: &[&str]) -> SshKeyManagerTestEnv { |
| // Set up the SshKeyManager using a fake file. |
| let file = Cursor::new(keys.join("\n").as_bytes().to_vec()); |
| let manager = Arc::new(SshKeyManager::with_file(file)); |
| |
| // Populate the expected events list with the initial set of lines in the file. |
| let mut expected_events = vec![]; |
| for key in keys { |
| if let Ok(_) = key.parse::<KeyEntry>() { |
| let event = KeyEvent { |
| event: KeyEventType::Existing, |
| key: Some(Box::new(SshAuthorizedKeyEntry { key: key.to_string() })), |
| }; |
| expected_events.push(event); |
| } |
| } |
| expected_events.push(KeyEvent { event: KeyEventType::FinishedExisting, key: None }); |
| |
| // Set up the proxies and start handling requests. |
| let (proxy, stream) = |
| fidl::endpoints::create_proxy_and_stream::<AuthorizedKeysMarker>().unwrap(); |
| let manager_clone = manager.clone(); |
| let task = |
| fasync::Task::spawn(async move { manager_clone.handle_requests(stream).await }); |
| let (watcher, remote) = fidl::endpoints::create_proxy::<KeyWatcherMarker>().unwrap(); |
| proxy.watch_keys(remote).expect("send watch OK"); |
| |
| SshKeyManagerTestEnv { |
| expected_events, |
| manager, |
| manager_proxy: proxy, |
| watcher_proxy: watcher, |
| _task: task, |
| } |
| } |
| |
| async fn add_key(&mut self, key: &str) -> Result<(), zx::Status> { |
| self.manager_proxy |
| .add_key(&mut SshAuthorizedKeyEntry { key: key.to_owned() }) |
| .await |
| .unwrap() |
| .map_err(zx::Status::from_raw)?; |
| self.expected_events.push(KeyEvent { |
| event: KeyEventType::Added, |
| key: Some(Box::new(SshAuthorizedKeyEntry { key: key.to_owned() })), |
| }); |
| Ok(()) |
| } |
| |
| /// Validate that the events received by the `KeyWatcher` are as |
| /// expected. This assumes that any watchers _NOT_ owned by the |
| /// `SshKeyManagerTestEnv` have been flushed. |
| async fn validate(self) { |
| let mut received = vec![]; |
| for _expected in self.expected_events.iter() { |
| let actual = self.watcher_proxy.next().await.expect("watcher next() succeeds"); |
| received.push(actual); |
| } |
| // Make sure we got what we expected. |
| assert_eq!(received, self.expected_events); |
| |
| // Now we want to make sure that there are no extra events waiting for us. |
| // We can't do this easily without risking the test hanging if there are no extra |
| // events. |
| // Order is important here. We have to close manager_proxy, so that self._task will |
| // finish, so that self.manager is the only reference left to the |
| // SshKeyManagerInstance. |
| // Then, we need to close the watcher_proxy so that assert_all_watchers_flushed() can |
| // take the receiver from each watcher task, which it then uses to make sure there are |
| // no events left in the buffer. |
| std::mem::drop(self.manager_proxy); |
| let _ = self._task.await; |
| let manager = Arc::try_unwrap(self.manager).unwrap_or_else(|_| panic!("arc is evil")); |
| std::mem::drop(self.watcher_proxy); |
| manager.assert_all_watchers_flushed().await; |
| } |
| } |
| |
| #[fuchsia::test] |
| async fn test_manager_existing_keys() { |
| let env = SshKeyManagerTestEnv::new(&[ |
| "ssh-rsa key comment", |
| "ssh-ed25519 key2 comment2", |
| "# a line that's a comment", |
| "invalid line", |
| ]); |
| |
| env.validate().await; |
| } |
| |
| #[fuchsia::test] |
| async fn test_manager_add_keys() { |
| let mut env = SshKeyManagerTestEnv::new(&[ |
| "ssh-rsa key comment", |
| "ssh-ed25519 key2 comment2", |
| "# a line that's a comment", |
| "invalid line", |
| ]); |
| |
| env.add_key("invalid key").await.expect_err("Adding key fails"); |
| env.add_key("ssh-dss old-bad-key deprecated and removed") |
| .await |
| .expect_err("Adding DSA key fails"); |
| env.add_key("ssh-rsa valid_key valid key line").await.expect("Adding key succeeds"); |
| |
| env.validate().await; |
| } |
| |
| #[fuchsia::test] |
| async fn test_manager_shows_new_watcher_right_state() { |
| let mut env = SshKeyManagerTestEnv::new(&[]); |
| env.add_key("ssh-rsa valid_key").await.expect("Adding key succeeds"); |
| let (proxy, remote) = fidl::endpoints::create_proxy::<KeyWatcherMarker>().unwrap(); |
| env.manager_proxy.watch_keys(remote).unwrap(); |
| |
| // The added key should be "existing" now. |
| assert_eq!(proxy.next().await.unwrap().event, KeyEventType::Existing); |
| assert_eq!(proxy.next().await.unwrap().event, KeyEventType::FinishedExisting); |
| std::mem::drop(proxy); |
| |
| env.validate().await; |
| } |
| |
| #[fuchsia::test] |
| async fn test_manager_stops_sending_events_when_watcher_closed() { |
| let mut env = SshKeyManagerTestEnv::new(&[]); |
| env.add_key("ssh-rsa valid_key").await.expect("Adding key succeeds"); |
| let (proxy, remote) = fidl::endpoints::create_proxy::<KeyWatcherMarker>().unwrap(); |
| env.manager_proxy.watch_keys(remote).unwrap(); |
| |
| assert_eq!(proxy.next().await.unwrap().event, KeyEventType::Existing); |
| assert_eq!(proxy.next().await.unwrap().event, KeyEventType::FinishedExisting); |
| std::mem::drop(proxy); |
| env.add_key("ssh-rsa valid_key").await.expect("Adding another key succeeds"); |
| |
| // Validate should succeed, since the watcher should've been dropped from the |
| // SshKeyManager's watcher list. |
| env.validate().await; |
| } |
| } |