blob: 327d9591b552690b3b66892190fab3781e811e41 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
anyhow::{anyhow, Context as _, Error},
cobalt_client::traits::AsEventCode as _,
cobalt_sw_delivery_registry as metrics,
fidl_fuchsia_pkg::{LocalMirrorMarker, LocalMirrorProxy, PackageCacheMarker},
fuchsia_async as fasync,
fuchsia_cobalt::{CobaltConnector, CobaltSender, ConnectionType},
fuchsia_component::{client::connect_to_service, server::ServiceFs},
fuchsia_inspect as inspect,
fuchsia_syslog::{self, fx_log_err, fx_log_info},
fuchsia_trace as trace,
futures::{prelude::*, stream::FuturesUnordered},
itertools::Itertools as _,
parking_lot::RwLock,
std::{
io,
sync::Arc,
time::{Duration, Instant},
},
system_image::CachePackages,
};
mod args;
mod cache;
mod clock;
mod config;
mod error;
mod experiment;
mod font_package_manager;
mod inspect_util;
mod metrics_util;
mod ota_channel;
mod queue;
mod repository;
mod repository_manager;
mod repository_service;
mod resolver_service;
mod rewrite_manager;
mod rewrite_service;
#[cfg(test)]
mod test_util;
use crate::{
args::Args,
cache::PackageCache,
config::Config,
experiment::Experiments,
font_package_manager::{FontPackageManager, FontPackageManagerBuilder},
ota_channel::ChannelInspectState,
repository_manager::{RepositoryManager, RepositoryManagerBuilder},
repository_service::RepositoryService,
resolver_service::ResolverServiceInspectState,
rewrite_manager::{RewriteManager, RewriteManagerBuilder},
rewrite_service::RewriteService,
};
// FIXME: allow for multiple threads and sendable futures once repo updates support it.
// FIXME(43342): trace durations assume they start and end on the same thread, but since the
// package resolver's executor is multi-threaded, a trace duration that includes an 'await' may not
// end on the same thread it starts on, resulting in invalid trace events.
// const SERVER_THREADS: usize = 2;
// TODO(61367): increase the concurrency limit back to 5 once blobfs memory usage during OTAs is
// under control
const MAX_CONCURRENT_BLOB_FETCHES: usize = 1;
const MAX_CONCURRENT_PACKAGE_FETCHES: usize = 5;
// Each fetch_blob call emits an event, and a system update fetches about 1,000 blobs in about a
// minute.
const COBALT_CONNECTOR_BUFFER_SIZE: usize = 1000;
const STATIC_REPO_DIR: &str = "/config/data/repositories";
const DYNAMIC_REPO_PATH: &str = "/data/repositories.json";
const STATIC_RULES_PATH: &str = "/config/data/rewrites.json";
const DYNAMIC_RULES_PATH: &str = "/data/rewrites.json";
const STATIC_FONT_REGISTRY_PATH: &str = "/config/data/font_packages.json";
// Repository size is currently 100 KB. Allowing for 10x growth and assuming a
// 4,096 B/s minimum bandwidth (the default minimum bandwidth used by rust-tuf
// HttpRepository) results in a duration of (10 * 100,000 B) / (4,096 B/s) = 244 seconds.
// Round to the minute boundary to make it more clear when reconstructing logs
// that there is a designed timeout involved.
// TODO(fxbug.dev/62300) replace with granular deadlines in rust-tuf.
const DEFAULT_TUF_METADATA_DEADLINE: Duration = Duration::from_secs(240);
const DEFAULT_BLOB_NETWORK_DEADLINE: Duration = Duration::from_secs(30);
pub fn main() -> Result<(), Error> {
let startup_time = Instant::now();
fuchsia_syslog::init_with_tags(&["pkg-resolver"]).expect("can't init logger");
fuchsia_trace_provider::trace_provider_create_with_fdio();
fx_log_info!("starting package resolver");
let mut executor = fasync::Executor::new().context("error creating executor")?;
executor.run_singlethreaded(main_inner_async(startup_time, argh::from_env()))
}
async fn main_inner_async(startup_time: Instant, args: Args) -> Result<(), Error> {
let config = Config::load_from_config_data_or_default();
let pkg_cache =
connect_to_service::<PackageCacheMarker>().context("error connecting to package cache")?;
let local_mirror = if args.allow_local_mirror {
Some(
connect_to_service::<LocalMirrorMarker>()
.context("error connecting to local mirror")?,
)
} else {
None
};
let pkgfs_install = pkgfs::install::Client::open_from_namespace()
.context("error connecting to pkgfs/install")?;
let pkgfs_needs =
pkgfs::needs::Client::open_from_namespace().context("error connecting to pkgfs/needs")?;
let cache = PackageCache::new(pkg_cache, pkgfs_install, pkgfs_needs);
let base_package_index =
Arc::new(cache.base_package_index().await.context("failed to load base package index")?);
// The list of cache packages from the system image, not to be confused with the PackageCache.
let system_cache_list = Arc::new(load_system_cache_list().await);
let inspector = fuchsia_inspect::Inspector::new();
let channel_inspect_state =
ChannelInspectState::new(inspector.root().create_child("omaha_channel"));
let experiment_state = experiment::State::new(inspector.root().create_child("experiments"));
let experiment_state = Arc::new(RwLock::new(experiment_state));
let experiments = Arc::clone(&experiment_state).into();
let futures = FuturesUnordered::new();
let (mut cobalt_sender, cobalt_fut) =
CobaltConnector { buffer_size: COBALT_CONNECTOR_BUFFER_SIZE }
.serve(ConnectionType::project_id(metrics::PROJECT_ID));
futures.push(cobalt_fut.boxed_local());
let font_package_manager = Arc::new(load_font_package_manager(cobalt_sender.clone()));
let repo_manager = Arc::new(RwLock::new(load_repo_manager(
inspector.root().create_child("repository_manager"),
experiments,
&config,
cobalt_sender.clone(),
local_mirror.clone(),
args.tuf_metadata_deadline_seconds,
)));
let rewrite_manager = Arc::new(RwLock::new(
load_rewrite_manager(
inspector.root().create_child("rewrite_manager"),
Arc::clone(&repo_manager),
&config,
&channel_inspect_state,
cobalt_sender.clone(),
)
.await,
));
let (blob_fetch_queue, blob_fetcher) = crate::cache::make_blob_fetch_queue(
inspector.root().create_child("blob_fetcher"),
cache.clone(),
MAX_CONCURRENT_BLOB_FETCHES,
repo_manager.read().stats(),
cobalt_sender.clone(),
local_mirror,
args.blob_network_deadline_seconds,
);
futures.push(blob_fetch_queue.boxed_local());
let resolver_service_inspect_state = Arc::new(ResolverServiceInspectState::from_node(
inspector.root().create_child("resolver_service"),
));
let (package_fetch_queue, package_fetcher) = resolver_service::make_package_fetch_queue(
cache.clone(),
Arc::clone(&base_package_index),
Arc::clone(&system_cache_list),
Arc::clone(&repo_manager),
Arc::clone(&rewrite_manager),
blob_fetcher,
MAX_CONCURRENT_PACKAGE_FETCHES,
Arc::clone(&resolver_service_inspect_state),
);
futures.push(package_fetch_queue.boxed_local());
let package_fetcher = Arc::new(package_fetcher);
let resolver_cb = {
let cache = cache.clone();
let repo_manager = Arc::clone(&repo_manager);
let rewrite_manager = Arc::clone(&rewrite_manager);
let package_fetcher = Arc::clone(&package_fetcher);
let base_package_index = Arc::clone(&base_package_index);
let system_cache_list = Arc::clone(&system_cache_list);
let cobalt_sender = cobalt_sender.clone();
let resolver_service_inspect = Arc::clone(&resolver_service_inspect_state);
move |stream| {
fasync::Task::local(
resolver_service::run_resolver_service(
cache.clone(),
Arc::clone(&repo_manager),
Arc::clone(&rewrite_manager),
Arc::clone(&package_fetcher),
Arc::clone(&base_package_index),
Arc::clone(&system_cache_list),
stream,
cobalt_sender.clone(),
Arc::clone(&resolver_service_inspect),
)
.unwrap_or_else(|e| fx_log_err!("failed to spawn_local {:#}", anyhow!(e))),
)
.detach()
}
};
let font_resolver_fb = {
let cache = cache.clone();
let package_fetcher = Arc::clone(&package_fetcher);
let cobalt_sender = cobalt_sender.clone();
move |stream| {
fasync::Task::local(
resolver_service::run_font_resolver_service(
Arc::clone(&font_package_manager),
cache.clone(),
Arc::clone(&package_fetcher),
stream,
cobalt_sender.clone(),
)
.unwrap_or_else(|e| {
fx_log_err!("Failed to spawn_local font_resolver_service {:#}", anyhow!(e))
}),
)
.detach()
}
};
let repo_cb = move |stream| {
let repo_manager = Arc::clone(&repo_manager);
fasync::Task::local(
async move {
let mut repo_service = RepositoryService::new(repo_manager);
repo_service.run(stream).await
}
.unwrap_or_else(|e| fx_log_err!("error encountered: {:#}", anyhow!(e))),
)
.detach()
};
let rewrite_cb = move |stream| {
let mut rewrite_service = RewriteService::new(Arc::clone(&rewrite_manager));
fasync::Task::local(
async move { rewrite_service.handle_client(stream).await }
.unwrap_or_else(|e| fx_log_err!("while handling rewrite client {:#}", anyhow!(e))),
)
.detach()
};
let mut fs = ServiceFs::new();
fs.dir("svc")
.add_fidl_service(resolver_cb)
.add_fidl_service(font_resolver_fb)
.add_fidl_service(repo_cb)
.add_fidl_service(rewrite_cb);
inspector.serve(&mut fs)?;
fs.take_and_serve_directory_handle()?;
futures.push(fs.collect().boxed_local());
cobalt_sender.log_elapsed_time(
metrics::PKG_RESOLVER_STARTUP_DURATION_METRIC_ID,
0,
Instant::now().duration_since(startup_time).as_micros() as i64,
);
trace::instant!("app", "startup", trace::Scope::Process);
futures.collect::<()>().await;
Ok(())
}
fn load_repo_manager(
node: inspect::Node,
experiments: Experiments,
config: &Config,
mut cobalt_sender: CobaltSender,
local_mirror: Option<LocalMirrorProxy>,
tuf_metadata_deadline: Duration,
) -> RepositoryManager {
// report any errors we saw, but don't error out because otherwise we won't be able
// to update the system.
let dynamic_repo_path =
if config.enable_dynamic_configuration() { Some(DYNAMIC_REPO_PATH) } else { None };
match RepositoryManagerBuilder::new(dynamic_repo_path, experiments)
.unwrap_or_else(|(builder, err)| {
fx_log_err!("error loading dynamic repo config: {:#}", anyhow!(err));
builder
})
.tuf_metadata_deadline(tuf_metadata_deadline)
.with_local_mirror(local_mirror)
.inspect_node(node)
.load_static_configs_dir(STATIC_REPO_DIR)
{
Ok(builder) => {
cobalt_sender.log_event_count(
metrics::REPOSITORY_MANAGER_LOAD_STATIC_CONFIGS_METRIC_ID,
metrics::RepositoryManagerLoadStaticConfigsMetricDimensionResult::Success
.as_event_code(),
0,
1
);
builder
}
Err((builder, errs)) => {
for err in errs {
let dimension_result: metrics::RepositoryManagerLoadStaticConfigsMetricDimensionResult
= (&err).into();
cobalt_sender.log_event_count(
metrics::REPOSITORY_MANAGER_LOAD_STATIC_CONFIGS_METRIC_ID,
dimension_result.as_event_code(),
0,
1
);
match &err {
crate::repository_manager::LoadError::Io { path: _, error }
if error.kind() == io::ErrorKind::NotFound =>
{
fx_log_info!("no statically configured repositories present");
}
_ => fx_log_err!("error loading static repo config: {:#}", anyhow!(err)),
};
}
builder
}
}.cobalt_sender(cobalt_sender)
.build()
}
async fn load_rewrite_manager(
node: inspect::Node,
repo_manager: Arc<RwLock<RepositoryManager>>,
config: &Config,
channel_inspect_state: &ChannelInspectState,
cobalt_sender: CobaltSender,
) -> RewriteManager {
let dynamic_rules_path =
if config.enable_dynamic_configuration() { Some(DYNAMIC_RULES_PATH) } else { None };
let builder = RewriteManagerBuilder::new(dynamic_rules_path)
.unwrap_or_else(|(builder, err)| {
if err.kind() != io::ErrorKind::NotFound {
fx_log_err!(
"unable to load dynamic rewrite rules from disk, using defaults: {:#}",
anyhow!(err)
);
}
builder
})
.inspect_node(node)
.static_rules_path(STATIC_RULES_PATH)
.unwrap_or_else(|(builder, err)| {
if err.kind() != io::ErrorKind::NotFound {
fx_log_err!("unable to load static rewrite rules from disk: {:#}", anyhow!(err));
}
builder
});
// If we have a channel in vbmeta or sysconfig, we don't want to load the dynamic
// configs. Instead, we'll construct a unique rule for that channel.
match crate::ota_channel::create_rewrite_rule_for_ota_channel(
&channel_inspect_state,
&repo_manager.read(),
cobalt_sender.clone(),
)
.await
{
Ok(Some(rule)) => {
fx_log_info!("Created rewrite rule for ota channel: {:?}", rule);
builder.replace_dynamic_rules(vec![rule]).build()
}
Ok(None) => {
fx_log_info!("No ota channel present, so not creating rewrite rule.");
builder.build()
}
Err(err) => {
fx_log_err!("Failed to create rewrite rule for ota channel with error, falling back to defaults. {:#}", anyhow!(err));
builder.build()
}
}
}
fn load_font_package_manager(mut cobalt_sender: CobaltSender) -> FontPackageManager {
match FontPackageManagerBuilder::new().add_registry_file(STATIC_FONT_REGISTRY_PATH) {
Ok(builder) => {
cobalt_sender.log_event_count(
metrics::FONT_MANAGER_LOAD_STATIC_REGISTRY_METRIC_ID,
metrics::FontManagerLoadStaticRegistryMetricDimensionResult::Success
.as_event_code(),
0,
1
);
builder
}
Err((builder, errs)) => {
let err_str = format!("{:#}", errs
.into_iter()
.filter_map(|err| {
let dimension_result: metrics::FontManagerLoadStaticRegistryMetricDimensionResult = (&err).into();
cobalt_sender.log_event_count(
metrics::FONT_MANAGER_LOAD_STATIC_REGISTRY_METRIC_ID,
dimension_result.as_event_code(),
0,
1
);
if err.is_not_found() {
fx_log_info!("no font package registry present: {:#}", anyhow!(err));
None
} else {
Some(anyhow!(err))
}
})
.format("; "));
if !err_str.is_empty() {
fx_log_err!("error(s) loading font package registry: {}", err_str);
}
builder
}
}
.build()
}
// Read in the list of cache_packages from /system/data/cache_packages.
// If we can't load it for some reason, return an empty cache.
async fn load_system_cache_list() -> system_image::CachePackages {
let system_image = pkgfs::system::Client::open_from_namespace();
// Default to empty cache.
let empty = CachePackages::from_entries(vec![]);
let system_image = match system_image {
Ok(s) => s,
Err(e) => {
fx_log_err!("failed to open system image: {:#}", anyhow!(e));
return empty;
}
};
let cache_file = system_image.open_file("data/cache_packages").await;
let cache_file = match cache_file {
Ok(f) => f,
Err(e) => {
fx_log_err!("failed to open data/cache_packages: {:#}", anyhow!(e));
return empty;
}
};
let cache_list = CachePackages::deserialize(cache_file);
match cache_list {
Ok(cl) => cl,
Err(e) => {
fx_log_err!("error opening package cache: {:#}", anyhow!(e));
empty
}
}
}