blob: 9bcfdfb4b37b33e829e56800ddd6749066e12f67 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::rewrite_manager::RewriteManager,
failure::{Error, ResultExt},
fidl::endpoints::ServerEnd,
fidl_fuchsia_amber::{self, ControlMarker as AmberMarker, ControlProxy as AmberProxy},
fidl_fuchsia_io::{self, DirectoryMarker},
fidl_fuchsia_pkg::{
PackageCacheProxy, PackageResolverRequest, PackageResolverRequestStream, UpdatePolicy,
},
fidl_fuchsia_pkg_ext::BlobId,
fuchsia_async as fasync,
fuchsia_component::client::connect_to_service,
fuchsia_syslog::{fx_log_err, fx_log_info, fx_log_warn},
fuchsia_uri::pkg_uri::PkgUri,
fuchsia_zircon::{Channel, MessageBuf, Signals, Status},
futures::prelude::*,
lazy_static::lazy_static,
log::{info, warn},
parking_lot::RwLock,
regex::Regex,
std::sync::Arc,
};
lazy_static! {
// The error amber returns if it could not find the merkle for this package.
static ref PACKAGE_NOT_FOUND_RE: Regex =
Regex::new("^merkle not found for package ").unwrap();
// The error amber returns if it could resolve a merkle for this package, but it couldn't
// download the package.
static ref UNAVAILABLE_RE: Regex =
Regex::new("^not found in \\d+ active sources").unwrap();
}
pub async fn run_resolver_service(
rewrites: Arc<RwLock<RewriteManager>>,
mut amber: AmberProxy,
cache: PackageCacheProxy,
mut stream: PackageResolverRequestStream,
) -> Result<(), Error> {
let mut should_reconnect = false;
while let Some(event) = await!(stream.try_next())? {
let PackageResolverRequest::Resolve {
package_uri,
selectors,
update_policy,
dir,
responder,
} = event;
if should_reconnect {
info!("Reconnecting to amber");
amber = connect_to_service::<AmberMarker>().context("error connecting to amber")?;
should_reconnect = false;
}
let status =
await!(resolve(&rewrites, &amber, &cache, package_uri, selectors, update_policy, dir));
// TODO this is an overbroad error type for this, make it more accurate
if let Err(Status::INTERNAL) = &status {
warn!("Resolution had an internal error, will reconnect to amber on next request.");
should_reconnect = true;
}
responder.send(Status::from(status).into_raw())?;
}
Ok(())
}
/// Resolve the package.
///
/// FIXME: at the moment, we are proxying to Amber to resolve a package name and variant to a
/// merkleroot. Because of this, we cant' implement the update policy, so we just ignore it.
async fn resolve<'a>(
rewrites: &'a Arc<RwLock<RewriteManager>>,
amber: &'a AmberProxy,
cache: &'a PackageCacheProxy,
pkg_uri: String,
selectors: Vec<String>,
_update_policy: UpdatePolicy,
dir_request: ServerEnd<DirectoryMarker>,
) -> Result<(), Status> {
let uri = PkgUri::parse(&pkg_uri).map_err(|err| {
fx_log_err!("failed to parse package uri {:?}: {}", pkg_uri, err);
Err(Status::INVALID_ARGS)
})?;
let was_fuchsia_host = uri.host() == "fuchsia.com";
let uri = rewrites.read().rewrite(uri);
// FIXME: at the moment only the fuchsia.com host is supported.
if !was_fuchsia_host && uri.host() != "fuchsia.com" {
fx_log_err!("package uri's host is currently unsupported: {}", uri);
return Err(Status::INVALID_ARGS);
}
// While the fuchsia-pkg:// spec doesn't require a package name, we do.
let name = uri.name().ok_or_else(|| {
fx_log_err!("package uri is missing a package name: {}", uri);
Err(Status::INVALID_ARGS)
})?;
// While the fuchsia-pkg:// spec allows resource paths, the package resolver should not be
// given one.
if uri.resource().is_some() {
fx_log_err!("package uri should not contain a resource name: {}", uri);
return Err(Status::INVALID_ARGS);
}
// FIXME: need to implement selectors.
if !selectors.is_empty() {
fx_log_warn!("resolve does not support selectors yet");
}
// FIXME: use the package cache to fetch the package instead of amber.
// Ask amber to cache the package.
let chan = await!(amber.get_update_complete(&name, uri.variant(), uri.package_hash()))
.map_err(|err| {
fx_log_err!("error communicating with amber: {:?}", err);
Status::INTERNAL
})?;
let merkle = await!(wait_for_update_to_complete(chan, &uri)).map_err(|err| {
fx_log_err!("error when waiting for amber to complete: {:?}", err);
err
})?;
fx_log_info!(
"resolved {} as {} with the selectors {:?} to {}",
pkg_uri,
uri,
selectors,
merkle
);
await!(cache.open(&mut merkle.into(), &mut selectors.iter().map(|s| s.as_str()), dir_request))
.map_err(|err| {
fx_log_err!("error opening {}: {:?}", merkle, err);
Status::INTERNAL
})?;
Ok(())
}
async fn wait_for_update_to_complete(chan: Channel, uri: &PkgUri) -> Result<BlobId, Status> {
let mut buf = MessageBuf::new();
let sigs = await!(fasync::OnSignals::new(
&chan,
Signals::CHANNEL_PEER_CLOSED | Signals::CHANNEL_READABLE
))?;
if sigs.contains(Signals::CHANNEL_READABLE) {
chan.read(&mut buf)?;
let buf = buf.split().0;
if sigs.contains(Signals::USER_0) {
let msg = String::from_utf8_lossy(&buf);
if PACKAGE_NOT_FOUND_RE.is_match(&msg) {
fx_log_info!("package {} was not found: {}", uri, msg);
return Err(Status::NOT_FOUND);
}
if UNAVAILABLE_RE.is_match(&msg) {
fx_log_info!("package {} is currently unavailable: {}", uri, msg);
return Err(Status::UNAVAILABLE);
}
fx_log_err!("error installing package {}: {}", uri, msg);
return Err(Status::INTERNAL);
}
let merkle = match String::from_utf8(buf) {
Ok(merkle) => merkle,
Err(err) => {
let merkle = String::from_utf8_lossy(err.as_bytes());
fx_log_err!("{:?} is not a valid UTF-8 encoded merkleroot: {:?}", merkle, err);
return Err(Status::INTERNAL);
}
};
let merkle = match merkle.parse() {
Ok(merkle) => merkle,
Err(err) => {
fx_log_err!("{:?} is not a valid merkleroot: {:?}", merkle, err);
return Err(Status::INTERNAL);
}
};
Ok(merkle)
} else {
fx_log_err!("response channel closed unexpectedly");
Err(Status::INTERNAL)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::rewrite_manager::{tests::make_rule_config, RewriteManagerBuilder};
use failure::Error;
use fidl::endpoints::{self, ServerEnd};
use fidl_fuchsia_amber::ControlRequest;
use fidl_fuchsia_io::DirectoryProxy;
use fidl_fuchsia_pkg::{self, PackageCacheProxy, PackageCacheRequest, UpdatePolicy};
use files_async;
use fuchsia_async as fasync;
use fuchsia_zircon::{Channel, Peered, Signals, Status};
use std::cell::RefCell;
use std::collections::HashMap;
use std::fs::{self, File};
use std::io;
use std::path::Path;
use std::rc::Rc;
use std::str;
use tempfile::TempDir;
struct Package {
name: String,
variant: String,
merkle: String,
kind: PackageKind,
}
enum PackageKind {
Ok,
Error(String),
}
impl Package {
fn new(name: &str, variant: &str, merkle: &str, kind: PackageKind) -> Self {
Self {
name: name.to_string(),
variant: variant.to_string(),
merkle: merkle.to_string(),
kind: kind,
}
}
}
struct MockAmber {
packages: HashMap<(String, String), Package>,
pkgfs: Rc<TempDir>,
channels: Vec<Channel>,
}
impl MockAmber {
fn new(packages: Vec<Package>, pkgfs: Rc<TempDir>) -> MockAmber {
let mut package_map = HashMap::new();
for package in packages {
package_map.insert((package.name.clone(), package.variant.clone()), package);
}
MockAmber { packages: package_map, pkgfs, channels: vec![] }
}
fn get_update_complete(&mut self, req: ControlRequest) -> Result<(), Error> {
match req {
ControlRequest::GetUpdateComplete { name, version, responder, .. } => {
let (s, c) = Channel::create().unwrap();
let mut handles = vec![];
let variant = version.unwrap_or_else(|| "0".to_string());
if let Some(package) = self.packages.get(&(name, variant)) {
match package.kind {
PackageKind::Ok => {
// Create blob dir with a single file.
let blob_path = self.pkgfs.path().join(&package.merkle);
if let Err(e) = fs::create_dir(&blob_path) {
if e.kind() != io::ErrorKind::AlreadyExists {
return Err(e.into());
}
}
let blob_file = blob_path.join(format!("{}_file", package.merkle));
fs::write(&blob_file, "hello")?;
s.write(package.merkle.as_bytes(), &mut handles)?;
}
PackageKind::Error(ref msg) => {
// Package not found, signal error.
s.signal_peer(Signals::NONE, Signals::USER_0)?;
s.write(msg.as_bytes(), &mut handles)?;
}
}
} else {
// Package not found, signal error.
s.signal_peer(Signals::NONE, Signals::USER_0)?;
s.write("merkle not found for package ".as_bytes(), &mut handles)?;
}
self.channels.push(s);
responder.send(c).expect("failed to send response");
}
_ => {}
}
Ok(())
}
}
struct MockPackageCache {
pkgfs: DirectoryProxy,
}
impl MockPackageCache {
fn new(pkgfs: Rc<TempDir>) -> Result<MockPackageCache, Error> {
let f = File::open(pkgfs.path())?;
let pkgfs =
DirectoryProxy::new(fasync::Channel::from_channel(fdio::clone_channel(&f)?)?);
Ok(MockPackageCache { pkgfs })
}
fn open(&self, req: PackageCacheRequest) {
// Forward request to pkgfs directory.
// FIXME: this is a bit of a hack but there isn't a formal way to convert a Directory
// request into a Node request.
match req {
PackageCacheRequest::Open { meta_far_blob_id, dir, responder, .. } => {
let node_request = ServerEnd::new(dir.into_channel());
let flags =
fidl_fuchsia_io::OPEN_RIGHT_READABLE | fidl_fuchsia_io::OPEN_FLAG_DIRECTORY;
let merkle = BlobId::from(meta_far_blob_id.merkle_root).to_string();
let status = match self.pkgfs.open(flags, 0, &merkle, node_request) {
Ok(()) => Status::OK,
Err(e) => {
eprintln!("Cache lookup failed: {}", e);
Status::INTERNAL
}
};
responder.send(status.into_raw()).expect("failed to send response");
}
_ => {}
}
}
}
struct ResolveTest {
rewrite_manager: Arc<RwLock<RewriteManager>>,
amber_proxy: AmberProxy,
cache_proxy: PackageCacheProxy,
pkgfs: Rc<TempDir>,
}
impl ResolveTest {
fn new(
rewrite_manager: Arc<RwLock<RewriteManager>>,
packages: Vec<Package>,
) -> ResolveTest {
let pkgfs = Rc::new(TempDir::new().expect("failed to create tmp dir"));
let amber = Rc::new(RefCell::new(MockAmber::new(packages, pkgfs.clone())));
let amber_proxy: AmberProxy = endpoints::spawn_local_stream_handler(move |req| {
let amber = amber.clone();
async move {
amber.borrow_mut().get_update_complete(req).expect("amber failed");
}
})
.expect("failed to spawn handler");
let cache =
Rc::new(MockPackageCache::new(pkgfs.clone()).expect("failed to create cache"));
let cache_proxy: PackageCacheProxy =
endpoints::spawn_local_stream_handler(move |req| {
let cache = cache.clone();
async move {
cache.open(req);
}
})
.expect("failed to spawn handler");
ResolveTest { rewrite_manager, amber_proxy, cache_proxy, pkgfs }
}
fn check_dir(&self, dir_path: &Path, want_files: &Vec<String>) {
let mut files: Vec<String> = fs::read_dir(&dir_path)
.expect("could not read dir")
.into_iter()
.map(|entry| {
entry
.expect("get directory entry")
.file_name()
.to_str()
.expect("valid utf8")
.into()
})
.collect();
files.sort_unstable();
assert_eq!(&files, want_files);
}
async fn check_dir_async<'a>(
&'a self,
dir: &'a DirectoryProxy,
want_files: &'a Vec<String>,
) {
let entries = await!(files_async::readdir(dir)).expect("could not read dir");
let mut files: Vec<_> = entries.into_iter().map(|f| f.name).collect();
files.sort_unstable();
assert_eq!(&files, want_files);
}
async fn check_amber_update<'a>(
&'a self,
name: &'a str,
variant: Option<&'a str>,
merkle: Option<&'a str>,
expected_res: Result<String, Status>,
) {
let chan = await!(self.amber_proxy.get_update_complete(name, variant, merkle))
.expect("error communicating with amber");
let expected_res = expected_res.map(|r| r.parse().expect("could not parse blob"));
let path = match variant {
None => format!("/{}", name),
Some(variant) => format!("/{}/{}", name, variant),
};
let uri =
PkgUri::new_package("fuchsia.com".to_string(), path, merkle.map(|s| s.to_string()))
.unwrap();
let res = await!(wait_for_update_to_complete(chan, &uri));
assert_eq!(res, expected_res);
}
async fn run_resolve<'a>(
&'a self,
uri: &'a str,
expected_res: Result<Vec<String>, Status>,
) {
let selectors = vec![];
let update_policy = UpdatePolicy { fetch_if_absent: true, allow_old_versions: false };
let (package_dir_c, package_dir_s) = Channel::create().unwrap();
let res = await!(resolve(
&self.rewrite_manager,
&self.amber_proxy,
&self.cache_proxy,
uri.to_string(),
selectors,
update_policy,
ServerEnd::new(package_dir_s),
));
if res.is_ok() {
let expected_files = expected_res.as_ref().unwrap();
let dir_proxy =
DirectoryProxy::new(fasync::Channel::from_channel(package_dir_c).unwrap());
await!(self.check_dir_async(&dir_proxy, expected_files));
}
assert_eq!(res, expected_res.map(|_s| ()), "unexpected result for {}", uri);
}
}
fn gen_merkle(c: char) -> String {
(0..64).map(|_| c).collect()
}
fn gen_merkle_file(c: char) -> String {
format!("{}_file", gen_merkle(c))
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_mock_amber() {
let dynamic_rule_config = make_rule_config(vec![]);
let rewrite_manager = Arc::new(RwLock::new(
RewriteManagerBuilder::new(&dynamic_rule_config).unwrap().build(),
));
let packages = vec![
Package::new("foo", "0", &gen_merkle('a'), PackageKind::Ok),
Package::new("bar", "stable", &gen_merkle('b'), PackageKind::Ok),
Package::new("baz", "stable", &gen_merkle('c'), PackageKind::Ok),
Package::new("buz", "0", &gen_merkle('c'), PackageKind::Ok),
];
let test = ResolveTest::new(rewrite_manager, packages);
// Name
await!(test.check_amber_update("foo", None, None, Ok(gen_merkle('a'))));
// Name and variant
await!(test.check_amber_update("bar", Some("stable"), None, Ok(gen_merkle('b'))));
// Name, variant, and merkle
let merkle = gen_merkle('c');
await!(test.check_amber_update("baz", Some("stable"), Some(&merkle), Ok(gen_merkle('c'))));
// Nonexistent package
await!(test.check_amber_update("nonexistent", None, None, Err(Status::NOT_FOUND)));
// no merkle('d') since we didn't ask to update "buz".
test.check_dir(test.pkgfs.path(), &vec![gen_merkle('a'), gen_merkle('b'), gen_merkle('c')]);
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_resolve_package() {
let dynamic_rule_config = make_rule_config(vec![]);
let rewrite_manager = Arc::new(RwLock::new(
RewriteManagerBuilder::new(&dynamic_rule_config).unwrap().build(),
));
let packages = vec![
Package::new("foo", "0", &gen_merkle('a'), PackageKind::Ok),
Package::new("bar", "stable", &gen_merkle('b'), PackageKind::Ok),
];
let test = ResolveTest::new(rewrite_manager, packages);
// Package name
await!(test.run_resolve("fuchsia-pkg://fuchsia.com/foo", Ok(vec![gen_merkle_file('a')]),));
// Package name and variant
await!(test
.run_resolve("fuchsia-pkg://fuchsia.com/bar/stable", Ok(vec![gen_merkle_file('b')]),));
// Package name, variant, and merkle
let url = format!("fuchsia-pkg://fuchsia.com/bar/stable?hash={}", gen_merkle('b'));
await!(test.run_resolve(&url, Ok(vec![gen_merkle_file('b')],)));
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_resolve_package_error() {
let dynamic_rule_config = make_rule_config(vec![]);
let rewrite_manager = Arc::new(RwLock::new(
RewriteManagerBuilder::new(&dynamic_rule_config).unwrap().build(),
));
let packages = vec![
Package::new("foo", "stable", &gen_merkle('a'), PackageKind::Ok),
Package::new(
"unavailable",
"0",
&gen_merkle('a'),
PackageKind::Error("not found in 1 active sources. last error: ".to_string()),
),
];
let test = ResolveTest::new(rewrite_manager, packages);
// Missing package
await!(test.run_resolve("fuchsia-pkg://fuchsia.com/foo/beta", Err(Status::NOT_FOUND)));
// Unavailable package
await!(
test.run_resolve("fuchsia-pkg://fuchsia.com/unavailable/0", Err(Status::UNAVAILABLE))
);
// Bad package URI
await!(test.run_resolve("fuchsia-pkg://fuchsia.com/foo!", Err(Status::INVALID_ARGS)));
// No package name
await!(test.run_resolve("fuchsia-pkg://fuchsia.com", Err(Status::INVALID_ARGS)));
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_resolve_package_unknown_host() {
let rules = vec![fuchsia_uri_rewrite::Rule::new(
"example.com".to_owned(),
"fuchsia.com".to_owned(),
"/foo/".to_owned(),
"/foo/".to_owned(),
)
.unwrap()];
let dynamic_rule_config = make_rule_config(vec![]);
let rewrite_manager = Arc::new(RwLock::new(
RewriteManagerBuilder::new(&dynamic_rule_config).unwrap().static_rules(rules).build(),
));
let packages = vec![
Package::new("foo", "0", &gen_merkle('a'), PackageKind::Ok),
Package::new("bar", "stable", &gen_merkle('b'), PackageKind::Ok),
];
let test = ResolveTest::new(rewrite_manager, packages);
await!(test.run_resolve("fuchsia-pkg://example.com/foo/0", Ok(vec![gen_merkle_file('a')]),));
await!(test.run_resolve("fuchsia-pkg://fuchsia.com/foo/0", Ok(vec![gen_merkle_file('a')]),));
await!(test.run_resolve("fuchsia-pkg://example.com/bar/stable", Err(Status::INVALID_ARGS)));
await!(test
.run_resolve("fuchsia-pkg://fuchsia.com/bar/stable", Ok(vec![gen_merkle_file('b')]),));
}
}