blob: 0302e7028970047c9f1f7c1607bd55b44fd385ca [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use fuchsia_async as fasync;
use fuchsia_syslog::fx_log_err;
use failure::{err_msg, Error};
use fidl::encoding::OutOfLine;
use fidl::endpoints::{RequestStream, ServerEnd};
use futures::{TryFutureExt, TryStreamExt};
use parking_lot::Mutex;
use std::sync::Arc;
use fidl_fuchsia_stash::{StoreAccessorMarker, StoreAccessorRequest, StoreAccessorRequestStream};
use crate::accessor;
use crate::store;
macro_rules! shutdown_on_error {
($x:expr, $y:expr) => {
match $x {
Ok(_) => (),
Err(e) => {
fx_log_err!("error encountered: {:?}", e);
$y.shutdown();
}
}
};
}
/// Instance represents a single instance of the stash service, handling requests from a single
/// client.
#[derive(Clone)]
pub struct Instance {
pub client_name: Option<String>,
pub enable_bytes: bool,
pub store_manager: Arc<Mutex<store::StoreManager>>,
}
impl Instance {
/// identify must be called once at initial connection, to establish which namespace in the
/// store to use. Right now this is honor-system based for preventing clients from accessing
/// each other's data, but once component monikers are implemented they will take the place of
/// this function call.
pub fn identify(&mut self, name: String) -> Result<(), Error> {
if let Some(name) = self.client_name.as_ref() {
return Err(err_msg(format!("client attempted to identify twice: {}", name)));
}
self.client_name = Some(name);
Ok(())
}
/// creates a new accessor for interacting with the store.
pub fn create_accessor(
&mut self,
read_only: bool,
server_end: ServerEnd<StoreAccessorMarker>,
) -> Result<(), Error> {
let mut acc = accessor::Accessor::new(
self.store_manager.clone(),
self.enable_bytes,
read_only,
self.client_name
.clone()
.ok_or(err_msg("identify has not been called"))?,
);
let server_chan = fasync::Channel::from_channel(server_end.into_channel())?;
fasync::spawn(async move {
let mut stream = StoreAccessorRequestStream::from_channel(server_chan);
while let Some(req) = await!(stream.try_next())? {
match req {
StoreAccessorRequest::GetValue { key, responder } => {
match acc.get_value(&key) {
Ok(mut res) => responder.send(res.as_mut().map(OutOfLine))?,
Err(e) => {
fx_log_err!("error encountered: {:?}", e);
responder.control_handle().shutdown();
}
}
}
StoreAccessorRequest::SetValue {
key,
val,
control_handle,
} => shutdown_on_error!(acc.set_value(key, val), control_handle),
StoreAccessorRequest::DeleteValue {
key,
control_handle,
} => shutdown_on_error!(acc.delete_value(key), control_handle),
StoreAccessorRequest::ListPrefix {
prefix,
it,
control_handle: _,
} => acc.list_prefix(prefix, it),
StoreAccessorRequest::GetPrefix {
prefix,
it,
control_handle,
} => shutdown_on_error!(acc.get_prefix(prefix, it), control_handle),
StoreAccessorRequest::DeletePrefix {
prefix,
control_handle,
} => shutdown_on_error!(acc.delete_prefix(prefix), control_handle),
StoreAccessorRequest::Commit {
control_handle
} => shutdown_on_error!(acc.commit(), control_handle),
}
}
Ok(())
}.unwrap_or_else(|e: failure::Error| fx_log_err!("error running accessor interface: {:?}", e)));
Ok(())
}
}