blob: 2966e376642f929cb016d3081176777bd3c21839 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use fidl::encoding::OutOfLine;
use fidl_fuchsia_ledger_cloud::{
CloudProviderRequest, CloudProviderRequestStream, CommitPack, DeviceSetRequest,
DeviceSetRequestStream, DeviceSetWatcherProxy, PageCloudRequest, PageCloudRequestStream,
PageCloudWatcherProxy, Status,
};
use futures::future;
use futures::future::{FutureExt, LocalFutureObj};
use futures::prelude::*;
use futures::select;
use futures::stream::FuturesUnordered;
use std::cell::{Cell, RefCell};
use std::convert::{Into, TryFrom};
use std::rc::Rc;
use crate::serialization::*;
use crate::state::*;
use crate::utils::{FutureOrEmpty, Signal, SignalWatcher};
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConnectionState {
Connected,
Disconnected,
}
/// Shared data accessible by any connection derived from a CloudSession.
pub struct CloudSessionShared {
pub storage: Rc<RefCell<Cloud>>,
network_state: Cell<ConnectionState>,
network_state_signal: RefCell<Signal>,
}
impl CloudSessionShared {
pub fn new(storage: Rc<RefCell<Cloud>>) -> CloudSessionShared {
CloudSessionShared {
storage,
network_state: Cell::new(ConnectionState::Connected),
network_state_signal: RefCell::new(Signal::new()),
}
}
pub fn get_network_state(&self) -> ConnectionState {
self.network_state.get()
}
#[allow(unused)]
pub fn set_network_state(&self, new_state: ConnectionState) {
if new_state == self.network_state.get() {
return;
};
self.network_state.set(new_state);
self.network_state_signal.borrow_mut().signal_and_rearm()
}
pub fn watch_network_state(&self) -> SignalWatcher {
self.network_state_signal.borrow().watch()
}
}
/// The state of a DeviceSet connection.
struct DeviceSetSession {
/// Shared state.
shared: Rc<CloudSessionShared>,
/// Stream of requests.
requests: stream::Fuse<DeviceSetRequestStream>,
/// If a watcher is set, contains the future from storage that
/// completes when the cloud is erased, and the watcher to signal
/// in that case.
watcher: Option<(future::Fuse<DeviceSetWatcher>, DeviceSetWatcherProxy)>,
/// Signals network state changes.
network_watcher: SignalWatcher,
}
type DeviceSetSessionFuture = LocalFutureObj<'static, ()>;
impl DeviceSetSession {
fn new(shared: Rc<CloudSessionShared>, requests: DeviceSetRequestStream) -> DeviceSetSession {
let network_watcher = shared.watch_network_state();
DeviceSetSession { shared, requests: requests.fuse(), watcher: None, network_watcher }
}
fn handle_request_disconnected(&mut self, req: DeviceSetRequest) -> Result<(), fidl::Error> {
match req {
DeviceSetRequest::CheckFingerprint { responder, .. } => {
responder.send(Status::NetworkError)
}
DeviceSetRequest::SetFingerprint { responder, .. } => {
responder.send(Status::NetworkError)
}
DeviceSetRequest::Erase { responder } => responder.send(Status::NetworkError),
DeviceSetRequest::SetWatcher { responder, watcher, .. } => {
responder.send(Status::NetworkError)?;
let proxy = watcher.into_proxy()?;
proxy.on_error(Status::NetworkError)
}
}
}
fn handle_request(&mut self, req: DeviceSetRequest) -> Result<(), fidl::Error> {
let mut storage = self.shared.storage.borrow_mut();
let device_set = storage.get_device_set();
match req {
DeviceSetRequest::CheckFingerprint { fingerprint, responder } => responder.send(
CloudError::status(device_set.check_fingerprint(&Fingerprint::from(fingerprint))),
),
DeviceSetRequest::SetFingerprint { fingerprint, responder } => {
device_set.set_fingerprint(Fingerprint::from(fingerprint));
responder.send(Status::Ok)
}
DeviceSetRequest::Erase { responder } => {
device_set.erase();
responder.send(Status::Ok)
}
DeviceSetRequest::SetWatcher { fingerprint, watcher: watcher_channel, responder } => {
let proxy = watcher_channel.into_proxy()?;
match device_set.watch(&Fingerprint::from(fingerprint)) {
Err(e) => {
responder.send(Status::from(e))?;
proxy.on_error(Status::NotFound)
}
Ok(fut) => {
self.watcher.replace((fut.fuse(), proxy));
responder.send(Status::Ok)
}
}
}
}
}
async fn handle_requests(mut self) -> Result<(), fidl::Error> {
loop {
select! {
_ = &mut self.network_watcher => {
self.network_watcher = self.shared.watch_network_state();
if self.shared.get_network_state() == ConnectionState::Disconnected {
if let Some((_watcher, proxy)) = self.watcher.take() {
proxy.on_error(Status::NetworkError)?
}
}
},
req = self.requests.try_next() =>
match req? {
None => return Ok(()),
Some(req) => match self.shared.get_network_state() {
ConnectionState::Disconnected => self.handle_request_disconnected(req)?,
_ => self.handle_request(req)?,
}
},
_ = FutureOrEmpty(self.watcher.as_mut().map(|(w, _)| w)) => {
// self.watcher cannot be None here.
let (_, proxy) = self.watcher.take().unwrap();
proxy.on_cloud_erased()?
}
}
}
}
/// Runs the device set.
fn run(self) -> DeviceSetSessionFuture {
LocalFutureObj::new(Box::new(self.handle_requests().map(|_| ())))
}
}
/// A future corresponding to a connection to a PageWatcher.
type PageWatcherFuture = LocalFutureObj<'static, ()>;
/// The state of a PageCloud connection.
struct PageSession {
/// Shared data.
shared: Rc<CloudSessionShared>,
/// Id of the page.
page_id: PageId,
/// The stream of requests on this connection.
requests: stream::Fuse<PageCloudRequestStream>,
/// If a watcher is set, the future that completes when the watcher disconnects.
watcher: Option<(future::Fuse<PageWatcherFuture>, PageCloudWatcherProxy)>,
/// Signals changes in network state.
network_watcher: SignalWatcher,
}
/// The type of the future returned by `PageSession::run`.
type PageSessionFuture = LocalFutureObj<'static, ()>;
impl PageSession {
fn new(
shared: Rc<CloudSessionShared>,
page_id: PageId,
requests: PageCloudRequestStream,
) -> PageSession {
let network_watcher = shared.watch_network_state();
PageSession { shared, page_id, requests: requests.fuse(), watcher: None, network_watcher }
}
/// State machine for the page watcher.
/// A page watcher connection is at any point:
/// - waiting on new commits in storage
/// - waiting for the watcher to acknowledge previously sent commits.
async fn run_page_watcher(
shared: Rc<CloudSessionShared>,
page_id: PageId,
mut position: Token,
proxy: PageCloudWatcherProxy,
) {
loop {
let fut = shared.storage.borrow_mut().get_page(page_id.clone()).watch(position);
if let Some(fut) = fut {
await!(fut).expect("Cloud state destoyed before PageSession");
}
let mut exclusive_storage = shared.storage.borrow_mut();
if let Some((next_position, commits)) =
exclusive_storage.get_page(page_id.clone()).get_commits(position)
{
position = next_position;
let buf = Commit::serialize_vec(commits);
// Release the storage before await-ing.
std::mem::drop(exclusive_storage);
match await!(
proxy.on_new_commits(&mut CommitPack { buffer: buf }, &mut position.into(),)
) {
Ok(()) => {}
Err(_) => return (), // Assume the connection closed.
}
}
}
}
fn handle_request(&mut self, request: PageCloudRequest) -> Result<(), fidl::Error> {
let mut storage = self.shared.storage.borrow_mut();
let page = storage.get_page(self.page_id.clone());
match request {
PageCloudRequest::AddCommits { commits, responder } => {
match Commit::deserialize_vec(commits.buffer) {
Err(e) => responder.send(Status::from(e)),
Ok(commits) => responder.send(CloudError::status(page.add_commits(commits))),
}
}
PageCloudRequest::GetCommits { min_position_token, responder } => {
match Token::try_from(min_position_token) {
Err(_) => responder.send(Status::ParseError, None, None),
Ok(position) => {
let (position, commits) = match page.get_commits(position) {
None => (None, Vec::new()),
Some((position, commits)) => (Some(position), commits),
};
let buf = Commit::serialize_vec(commits);
// This must live until the end of the call
let mut position = position.map(Token::into);
let position = position.as_mut().map(OutOfLine);
responder.send(
Status::Ok,
Some(OutOfLine(&mut CommitPack { buffer: buf })),
position,
)
}
}
}
PageCloudRequest::AddObject { id, buffer, responder } => {
let mut data = Vec::new();
match read_buffer(buffer, &mut data) {
Err(_) => responder.send(Status::ArgumentError),
Ok(()) => responder.send(CloudError::status(
page.add_object(ObjectId::from(id), Object { data }),
)),
}
}
PageCloudRequest::GetObject { id, responder } => {
match page.get_object(&ObjectId(id.clone())) {
Err(e) => responder.send(Status::from(e), None),
Ok(obj) => responder.send(
Status::Ok,
Some(OutOfLine(
&mut write_buffer(obj.data.as_slice()).expect("Failed to write buffer"),
)),
),
}
}
PageCloudRequest::SetWatcher {
min_position_token,
watcher: watcher_channel,
responder,
} => match Token::try_from(min_position_token) {
Err(_) => responder.send(Status::ParseError),
Ok(position) => {
let proxy = watcher_channel.into_proxy()?;
let watcher = Self::run_page_watcher(
Rc::clone(&self.shared),
self.page_id.clone(),
position,
proxy.clone(),
);
let watcher = LocalFutureObj::new(Box::new(watcher)).fuse();
self.watcher.replace((watcher, proxy));
responder.send(Status::Ok)
}
},
}
}
fn handle_request_disconnected(
&mut self,
request: PageCloudRequest,
) -> Result<(), fidl::Error> {
match request {
PageCloudRequest::AddCommits { responder, .. } => responder.send(Status::NetworkError),
PageCloudRequest::GetCommits { responder, .. } => {
responder.send(Status::NetworkError, None, None)
}
PageCloudRequest::AddObject { responder, .. } => responder.send(Status::NetworkError),
PageCloudRequest::GetObject { responder, .. } => {
responder.send(Status::NetworkError, None)
}
PageCloudRequest::SetWatcher { responder, watcher, .. } => {
// Ledger seems to require that we do not send an error here, but on the watcher instead.
responder.send(Status::Ok)?;
watcher.into_proxy()?.on_error(Status::NetworkError)
}
}
}
async fn handle_requests(mut self) -> Result<(), fidl::Error> {
loop {
select! {
_ = &mut self.network_watcher => {
self.network_watcher = self.shared.watch_network_state();
if self.shared.get_network_state() == ConnectionState::Disconnected {
if let Some((_watcher, proxy)) = self.watcher.take() {
// Ignoring errors because they should only close the proxy connection.
let _ = proxy.on_error(Status::NetworkError);
}
}
},
req = self.requests.try_next() => {
match req? {
None => return Ok(()),
Some(req) => match self.shared.get_network_state() {
ConnectionState::Disconnected => self.handle_request_disconnected(req)?,
_ => self.handle_request(req)?
}
}
},
() = FutureOrEmpty(self.watcher.as_mut().map(|(w,_)| w)) => {
// The watcher has been disconnected.
self.watcher.take();
}
}
}
}
fn run(self) -> PageSessionFuture {
LocalFutureObj::new(Box::new(self.handle_requests().map(|_| ())))
}
}
/// Holds the state of a PageCloud connection.
pub struct CloudSession {
/// Shared CloudSession data.
shared: Rc<CloudSessionShared>,
/// The stream of incoming requests.
requests: stream::Fuse<CloudProviderRequestStream>,
/// Futures for each active DeviceSet connection.
device_sets: FuturesUnordered<DeviceSetSessionFuture>,
/// Futures for each active PageCloud connection.
pages: FuturesUnordered<PageSessionFuture>,
}
pub type CloudSessionFuture = LocalFutureObj<'static, ()>;
impl CloudSession {
pub fn new(state: Rc<CloudSessionShared>, stream: CloudProviderRequestStream) -> CloudSession {
CloudSession {
shared: state,
requests: stream.fuse(),
device_sets: FuturesUnordered::new(),
pages: FuturesUnordered::new(),
}
}
fn handle_request(&mut self, req: CloudProviderRequest) -> Result<(), fidl::Error> {
match req {
CloudProviderRequest::GetDeviceSet { device_set: device_set_channel, responder } => {
let stream = device_set_channel.into_stream()?;
self.device_sets.push(DeviceSetSession::new(Rc::clone(&self.shared), stream).run());
responder.send(Status::Ok)
}
CloudProviderRequest::GetPageCloud { app_id, page_id, page_cloud, responder } => {
let stream = page_cloud.into_stream()?;
let page_id = PageId::from(app_id, page_id);
self.pages.push(PageSession::new(Rc::clone(&self.shared), page_id, stream).run());
responder.send(Status::Ok)
}
}
}
async fn handle_requests(mut self) -> Result<(), fidl::Error> {
loop {
select! {
_ = self.device_sets.next() => {},
_ = self.pages.next() => {},
req = self.requests.try_next() =>
match req? {
Some(req) => self.handle_request(req)?,
None => return Ok(())
}
}
}
}
pub fn run(self) -> CloudSessionFuture {
LocalFutureObj::new(Box::new(self.handle_requests().map(|_| ())))
}
}
#[cfg(test)]
mod tests {
use fidl::endpoints::create_endpoints;
use fidl_fuchsia_ledger_cloud::{
DeviceSetMarker, DeviceSetWatcherMarker, DeviceSetWatcherRequest, PageCloudMarker,
PageCloudWatcherMarker, PageCloudWatcherRequest, Status,
};
use fuchsia_async as fasync;
use pin_utils::pin_mut;
use std::rc::Rc;
use super::*;
#[test]
fn page_cloud_disconnection() {
let mut exec = fasync::Executor::new().unwrap();
let (client, server) = create_endpoints::<PageCloudMarker>().unwrap();
let stream = server.into_stream().unwrap();
let server_state = Rc::new(CloudSessionShared::new(Rc::new(RefCell::new(Cloud::new()))));
let server_fut =
PageSession::new(Rc::clone(&server_state), PageId(vec![], vec![]), stream).run();
fasync::spawn_local(server_fut);
let waiting_on_watcher = Rc::new(Cell::new(false));
let waiting_on_watcher_clone = Rc::clone(&waiting_on_watcher);
let proxy = client.into_proxy().unwrap();
let client_fut = async move {
let (status, _, _) = await!(proxy.get_commits(None)).unwrap();
assert_eq!(status, Status::Ok);
let (watcher_client, watcher_server) =
create_endpoints::<PageCloudWatcherMarker>().unwrap();
let status = await!(proxy.set_watcher(None, watcher_client)).unwrap();
assert_eq!(status, Status::Ok);
// The watcher will stay still until the cloud provider gets disconnected.
let mut watcher_stream = watcher_server.into_stream().unwrap();
waiting_on_watcher_clone.set(true);
let message = await!(watcher_stream.try_next()).unwrap();
match message {
Some(PageCloudWatcherRequest::OnError { status: Status::NetworkError, .. }) => {}
_ => assert!(false),
};
waiting_on_watcher_clone.set(false);
let message = await!(watcher_stream.try_next()).unwrap();
assert!(message.is_none());
// Requests return NetworkError.
let (status, _, _) = await!(proxy.get_commits(None)).unwrap();
assert_eq!(status, Status::NetworkError);
let (watcher_client, watcher_server) =
create_endpoints::<PageCloudWatcherMarker>().unwrap();
let status = await!(proxy.set_watcher(None, watcher_client)).unwrap();
assert_eq!(status, Status::Ok);
// Setting a watcher returns Ok, but the watcher is immediately closed with a network error.
let mut watcher_stream = watcher_server.into_stream().unwrap();
let message = await!(watcher_stream.try_next()).unwrap();
match message {
Some(PageCloudWatcherRequest::OnError { status: Status::NetworkError, .. }) => {}
_ => assert!(false),
};
let message = await!(watcher_stream.try_next()).unwrap();
assert!(message.is_none());
};
pin_mut!(client_fut);
assert!(exec.run_until_stalled(&mut client_fut).is_pending());
assert!(waiting_on_watcher.get());
server_state.set_network_state(ConnectionState::Disconnected);
assert!(exec.run_until_stalled(&mut client_fut).is_ready());
}
#[test]
fn device_set_disconnection() {
let mut exec = fasync::Executor::new().unwrap();
let (client, server) = create_endpoints::<DeviceSetMarker>().unwrap();
let stream = server.into_stream().unwrap();
let server_state = Rc::new(CloudSessionShared::new(Rc::new(RefCell::new(Cloud::new()))));
let server_fut = DeviceSetSession::new(Rc::clone(&server_state), stream).run();
fasync::spawn_local(server_fut);
let waiting_on_watcher = Rc::new(Cell::new(false));
let waiting_on_watcher_clone = Rc::clone(&waiting_on_watcher);
let proxy = client.into_proxy().unwrap();
let client_fut = async move {
let fingerprint: Vec<u8> = vec![1, 2, 3];
let status =
await!(proxy.set_fingerprint(&mut fingerprint.clone().into_iter())).unwrap();
assert_eq!(status, Status::Ok);
let (watcher_client, watcher_server) =
create_endpoints::<DeviceSetWatcherMarker>().unwrap();
let status =
await!(proxy.set_watcher(&mut fingerprint.clone().into_iter(), watcher_client))
.unwrap();
assert_eq!(status, Status::Ok);
// The watcher will stay still until the cloud provider gets disconnected.
let mut watcher_stream = watcher_server.into_stream().unwrap();
waiting_on_watcher_clone.set(true);
let message = await!(watcher_stream.try_next()).unwrap();
match message {
Some(DeviceSetWatcherRequest::OnError { status: Status::NetworkError, .. }) => {}
_ => assert!(false),
};
waiting_on_watcher_clone.set(false);
let message = await!(watcher_stream.try_next()).unwrap();
assert!(message.is_none());
// Requests return NetworkError.
let status =
await!(proxy.set_fingerprint(&mut fingerprint.clone().into_iter())).unwrap();
assert_eq!(status, Status::NetworkError);
let (watcher_client, watcher_server) =
create_endpoints::<DeviceSetWatcherMarker>().unwrap();
let status =
await!(proxy.set_watcher(&mut fingerprint.clone().into_iter(), watcher_client))
.unwrap();
assert_eq!(status, Status::NetworkError);
// The watcher also gets NetworkError.
let mut watcher_stream = watcher_server.into_stream().unwrap();
let message = await!(watcher_stream.try_next()).unwrap();
match message {
Some(DeviceSetWatcherRequest::OnError { status: Status::NetworkError, .. }) => {}
_ => assert!(false),
};
let message = await!(watcher_stream.try_next()).unwrap();
assert!(message.is_none());
};
pin_mut!(client_fut);
assert!(exec.run_until_stalled(&mut client_fut).is_pending());
assert!(waiting_on_watcher.get());
server_state.set_network_state(ConnectionState::Disconnected);
assert!(exec.run_until_stalled(&mut client_fut).is_ready());
}
}