blob: 72e8dc771fe05b3f3a882b5946eb30723421d912 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use fuchsia_async as fasync;
use failure::{err_msg, Error};
use fidl::encoding::OutOfLine;
use fidl::endpoints::{RequestStream, ServerEnd};
use fidl_fuchsia_stash::{StoreAccessorMarker, StoreAccessorRequest, StoreAccessorRequestStream};
use fuchsia_syslog::fx_log_err;
use futures::lock::Mutex;
use futures::{TryFutureExt, TryStreamExt};
use std::sync::Arc;
use crate::accessor;
use crate::store;
/// State for one client connection to the stash service.
///
/// Each `Instance` handles the requests of exactly one client. It is cheap to clone: the
/// store manager is held behind an `Arc`, so clones share the same underlying store.
#[derive(Clone)]
pub struct Instance {
    /// Name the client identified itself with. `None` until `identify` has succeeded; it is
    /// handed to every accessor created afterwards.
    pub client_name: Option<String>,
    /// Forwarded unchanged to every accessor this instance creates.
    /// NOTE(review): presumably toggles support for byte values in the store — confirm
    /// against `accessor::Accessor`.
    pub enable_bytes: bool,
    /// Shared handle to the store manager; cloned into each accessor.
    pub store_manager: Arc<Mutex<store::StoreManager>>,
}
impl Instance {
    /// Records the name this client identifies itself with.
    ///
    /// identify must be called once at initial connection, to establish which namespace in the
    /// store to use. Right now this is honor-system based for preventing clients from accessing
    /// each other's data, but once component monikers are implemented they will take the place of
    /// this function call.
    ///
    /// # Errors
    ///
    /// Returns an error if a name has already been recorded for this instance. The previously
    /// recorded name is kept, and it is that name (not the newly supplied one) that appears in
    /// the error message.
    pub fn identify(&mut self, name: String) -> Result<(), Error> {
        if let Some(existing) = self.client_name.as_ref() {
            return Err(err_msg(format!("client attempted to identify twice: {}", existing)));
        }
        self.client_name = Some(name);
        Ok(())
    }

    /// Creates a new accessor for interacting with the store and starts serving it.
    ///
    /// The accessor is built from this instance's store manager, `enable_bytes` flag and client
    /// name, plus the given `read_only` flag. A task is then spawned that reads
    /// `StoreAccessorRequest`s from `server_end` and dispatches each one to the accessor until
    /// the request stream ends.
    ///
    /// If reading a request, handling it, or sending a response fails, the spawned task stops
    /// serving the channel and logs the error; that failure is not reported to the caller of
    /// this function.
    ///
    /// # Errors
    ///
    /// Returns an error if `identify` has not been called yet, or if `server_end` cannot be
    /// converted into an async channel.
    pub fn create_accessor(
        &mut self,
        read_only: bool,
        server_end: ServerEnd<StoreAccessorMarker>,
    ) -> Result<(), Error> {
        // `ok_or_else` builds the error lazily, so no error value is constructed on the
        // success path.
        let client_name =
            self.client_name.clone().ok_or_else(|| err_msg("identify has not been called"))?;
        let mut acc = accessor::Accessor::new(
            self.store_manager.clone(),
            self.enable_bytes,
            read_only,
            client_name,
        );

        let server_chan = fasync::Channel::from_channel(server_end.into_channel())?;
        fasync::spawn(
            async move {
                let mut stream = StoreAccessorRequestStream::from_channel(server_chan);
                while let Some(req) = await!(stream.try_next())? {
                    match req {
                        StoreAccessorRequest::GetValue { key, responder } => {
                            let mut res = await!(acc.get_value(&key))?;
                            responder.send(res.as_mut().map(OutOfLine))?;
                        }
                        StoreAccessorRequest::SetValue { key, val, .. } => {
                            await!(acc.set_value(key, val))?
                        }
                        StoreAccessorRequest::DeleteValue { key, .. } => {
                            await!(acc.delete_value(key))?
                        }
                        StoreAccessorRequest::ListPrefix { prefix, it, .. } => {
                            // NOTE(review): unlike the other arms there is no `?` here;
                            // presumably `list_prefix` returns `()` rather than a `Result` —
                            // confirm against `accessor::Accessor`.
                            await!(acc.list_prefix(prefix, it))
                        }
                        StoreAccessorRequest::GetPrefix { prefix, it, .. } => {
                            await!(acc.get_prefix(prefix, it))?
                        }
                        StoreAccessorRequest::DeletePrefix { prefix, .. } => {
                            await!(acc.delete_prefix(prefix))?
                        }
                        StoreAccessorRequest::Commit { .. } => await!(acc.commit())?,
                    }
                }
                Ok(())
            }
                // The spawned task has no caller to return an error to, so log it instead.
                .unwrap_or_else(|e: failure::Error| {
                    fx_log_err!("error running accessor interface: {:?}", e)
                }),
        );
        Ok(())
    }
}