blob: 64683afcd498caa0f6ffafb84ebe386666c9052b [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{clock, error, inspect_util, metrics_util::tuf_error_as_update_tuf_client_event_code},
anyhow::anyhow,
cobalt_sw_delivery_registry as metrics,
fidl_fuchsia_pkg_ext::MirrorConfig,
fuchsia_async::{self as fasync, TimeoutExt as _},
fuchsia_cobalt::CobaltSender,
fuchsia_inspect::{self as inspect, Property},
fuchsia_inspect_contrib::inspectable::InspectableDebugString,
fuchsia_syslog::{fx_log_err, fx_log_info},
fuchsia_zircon as zx,
futures::{
future::{AbortHandle, Abortable, FutureExt as _, TryFutureExt as _},
lock::Mutex as AsyncMutex,
stream::StreamExt,
},
http_uri_ext::HttpUriExt as _,
std::{
sync::{Arc, Weak},
time::Duration,
},
tuf::{
error::Error as TufError,
interchange::Json,
metadata::{TargetDescription, TargetPath},
repository::{RepositoryProvider, RepositoryStorageProvider},
},
};
pub struct UpdatingTufClient {
client: tuf::client::Client<
Json,
Box<dyn RepositoryStorageProvider<Json> + Send>,
Box<dyn RepositoryProvider<Json> + Send>,
tuf::client::DefaultTranslator,
>,
/// Time that this repository was last successfully checked for an update, or None if the
/// repository has never successfully fetched target metadata.
last_update_successfully_checked_time: InspectableDebugString<Option<zx::Time>>,
/// `Some` if there is an AutoClient task, dropping it stops the task.
auto_client_aborter: Option<AbortHandleOnDrop>,
tuf_metadata_timeout: Duration,
inspect: UpdatingTufClientInspectState,
cobalt_sender: CobaltSender,
}
struct AbortHandleOnDrop {
abort_handle: AbortHandle,
}
impl Drop for AbortHandleOnDrop {
fn drop(&mut self) {
self.abort_handle.abort();
}
}
impl From<AbortHandle> for AbortHandleOnDrop {
fn from(abort_handle: AbortHandle) -> Self {
Self { abort_handle }
}
}
struct UpdatingTufClientInspectState {
/// Count of the number of times this repository failed to check for an update.
update_check_failure_count: inspect_util::Counter,
/// Count of the number of times this repository was successfully checked for an update.
update_check_success_count: inspect_util::Counter,
/// Count of the number of times this repository was successfully updated.
updated_count: inspect_util::Counter,
/// Version of the active root file.
root_version: inspect::UintProperty,
/// Version of the active timestamp file, or -1 if unknown.
timestamp_version: inspect::IntProperty,
/// Version of the active snapshot file, or -1 if unknown.
snapshot_version: inspect::IntProperty,
/// Version of the active targets file, or -1 if unknown.
targets_version: inspect::IntProperty,
_node: inspect::Node,
}
/// Result of updating metadata if stale.
pub enum UpdateResult {
/// Metadata update was skipped because it was not stale.
Deferred,
/// Local metadata is up to date with the remote.
UpToDate,
/// Some metadata was updated.
Updated,
}
#[derive(Debug)]
pub struct RepoVersions {
root: u32,
timestamp: Option<u32>,
snapshot: Option<u32>,
targets: Option<u32>,
}
impl UpdatingTufClient {
pub fn from_tuf_client_and_mirror_config(
client: tuf::client::Client<
Json,
Box<dyn RepositoryStorageProvider<Json> + Send>,
Box<dyn RepositoryProvider<Json> + Send>,
tuf::client::DefaultTranslator,
>,
config: Option<&MirrorConfig>,
tuf_metadata_timeout: Duration,
node: inspect::Node,
cobalt_sender: CobaltSender,
) -> Arc<AsyncMutex<Self>> {
let (auto_client_aborter, auto_client_node_and_registration) =
if config.map_or(false, |c| c.subscribe()) {
let (aborter, registration) = AbortHandle::new_pair();
let auto_client_node = node.create_child("auto_client");
(Some(aborter.into()), Some((auto_client_node, registration, config.unwrap())))
} else {
(None, None)
};
let root_version = client.root_version();
let ret = Arc::new(AsyncMutex::new(Self {
client,
last_update_successfully_checked_time: InspectableDebugString::new(
None,
&node,
"last_update_successfully_checked_time",
),
auto_client_aborter,
tuf_metadata_timeout,
inspect: UpdatingTufClientInspectState {
update_check_failure_count: inspect_util::Counter::new(
&node,
"update_check_failure_count",
),
update_check_success_count: inspect_util::Counter::new(
&node,
"update_check_success_count",
),
updated_count: inspect_util::Counter::new(&node, "updated_count"),
root_version: node.create_uint("root_version", root_version.into()),
timestamp_version: node.create_int("timestamp_version", -1),
snapshot_version: node.create_int("snapshot_version", -1),
targets_version: node.create_int("targets_version", -1),
_node: node,
},
cobalt_sender,
}));
if let Some((node, registration, mirror_config)) = auto_client_node_and_registration {
fasync::Task::local(
Abortable::new(
AutoClient::from_updating_client_and_auto_url(
Arc::downgrade(&ret),
mirror_config
.mirror_url()
.to_owned()
.extend_dir_with_path("auto")
// Safe because mirror_url has a scheme and "auto" is a valid path segment.
.unwrap()
.to_string(),
node,
)
.run(),
registration,
)
.map(|_| ()),
)
.detach();
}
ret
}
pub async fn fetch_target_description(
&mut self,
target: &TargetPath,
) -> Result<TargetDescription, TufError> {
self.client.fetch_target_description(target).await
}
/// Updates the tuf client metadata if it is considered to be stale, returning whether or not
/// updates were performed.
pub async fn update_if_stale(&mut self) -> Result<UpdateResult, error::TufOrTimeout> {
if self.is_stale() {
if self.update().await? {
Ok(UpdateResult::Updated)
} else {
Ok(UpdateResult::UpToDate)
}
} else {
Ok(UpdateResult::Deferred)
}
}
/// Provides the current known metadata versions.
pub fn metadata_versions(&self) -> RepoVersions {
RepoVersions {
root: self.client.root_version(),
timestamp: self.client.timestamp_version(),
snapshot: self.client.snapshot_version(),
targets: self.client.targets_version(),
}
}
fn is_stale(&self) -> bool {
if self.auto_client_aborter.is_none() {
return true;
}
if let Some(last_update_time) = *self.last_update_successfully_checked_time {
last_update_time + SUBSCRIBE_CACHE_STALE_TIMEOUT <= clock::now()
} else {
true
}
}
async fn update(&mut self) -> Result<bool, error::TufOrTimeout> {
let res = self
.client
.update()
.map_err(error::TufOrTimeout::Tuf)
.on_timeout(self.tuf_metadata_timeout, || Err(crate::error::TufOrTimeout::Timeout))
.await;
self.inspect.root_version.set(self.client.root_version().into());
self.inspect
.timestamp_version
.set(self.client.timestamp_version().map(|uint| uint.into()).unwrap_or(-1i64));
self.inspect
.snapshot_version
.set(self.client.snapshot_version().map(|uint| uint.into()).unwrap_or(-1i64));
self.inspect
.targets_version
.set(self.client.targets_version().map(|uint| uint.into()).unwrap_or(-1i64));
if let Ok(update_occurred) = &res {
self.last_update_successfully_checked_time.get_mut().replace(clock::now());
self.inspect.update_check_success_count.increment();
if *update_occurred {
self.inspect.updated_count.increment();
}
} else {
self.inspect.update_check_failure_count.increment();
}
self.cobalt_sender.log_event_count(
metrics::UPDATE_TUF_CLIENT_METRIC_ID,
match &res {
Ok(_) => metrics::UpdateTufClientMetricDimensionResult::Success,
Err(e) => tuf_error_as_update_tuf_client_event_code(&e),
},
0,
1,
);
res
}
}
pub const SUBSCRIBE_CACHE_STALE_TIMEOUT: zx::Duration = zx::Duration::from_minutes(5);
struct AutoClient {
updating_client: Weak<AsyncMutex<UpdatingTufClient>>,
auto_url: String,
inspect: AutoClientInspectState,
}
struct AutoClientInspectState {
connect_failure_count: inspect_util::Counter,
connect_success_count: inspect_util::Counter,
update_attempt_count: inspect_util::Counter,
_node: inspect::Node,
}
#[cfg(not(test))]
const AUTO_CLIENT_SSE_RECONNECT_DELAY: zx::Duration = zx::Duration::from_minutes(1);
#[cfg(test)]
const AUTO_CLIENT_SSE_RECONNECT_DELAY: zx::Duration = zx::Duration::from_minutes(0);
impl AutoClient {
fn from_updating_client_and_auto_url(
updating_client: Weak<AsyncMutex<UpdatingTufClient>>,
auto_url: String,
node: inspect::Node,
) -> Self {
Self {
updating_client,
auto_url,
inspect: AutoClientInspectState {
connect_failure_count: inspect_util::Counter::new(&node, "connect_failure_count"),
connect_success_count: inspect_util::Counter::new(&node, "connect_success_count"),
update_attempt_count: inspect_util::Counter::new(&node, "update_attempt_count"),
_node: node,
},
}
}
async fn run(self) {
loop {
fx_log_info!("AutoClient for {:?} connecting.", self.auto_url);
match self.connect().await {
Ok(sse_client) => match self.handle_sse(sse_client).await {
HandleSseEndState::Abort => {
return;
}
HandleSseEndState::Reconnect => (),
},
Err(e) => {
fx_log_err!(
"AutoClient for {:?} error connecting: {:#}",
self.auto_url,
anyhow!(e)
);
}
}
Self::wait_before_reconnecting().await;
}
}
async fn connect(&self) -> Result<http_sse::Client, http_sse::ClientConnectError> {
// The /auto protocol has no heartbeat, so, without TCP keepalive, a client cannot
// differentiate a repository that is not updating from a repository that has dropped
// the connection.
let mut tcp_options = fuchsia_hyper::TcpOptions::default();
tcp_options.keepalive_idle = Some(std::time::Duration::from_secs(15));
tcp_options.keepalive_interval = Some(std::time::Duration::from_secs(5));
tcp_options.keepalive_count = Some(2);
match http_sse::Client::connect(
fuchsia_hyper::new_https_client_from_tcp_options(tcp_options),
&self.auto_url,
)
.await
{
Ok(sse_client) => {
self.inspect.connect_success_count.increment();
Ok(sse_client)
}
Err(e) => {
self.inspect.connect_failure_count.increment();
Err(e)
}
}
}
async fn handle_sse(&self, mut sse_event_stream: http_sse::Client) -> HandleSseEndState {
while let Some(item) = sse_event_stream.next().await {
match item {
Ok(event) => {
if event.event_type() != "timestamp.json" {
fx_log_err!(
"AutoClient for {:?} ignoring unrecognized event: {:?}",
self.auto_url,
event
);
continue;
}
fx_log_info!(
"AutoClient for {:?} observed valid event: {:?}",
self.auto_url,
event
);
if let Some(updating_client) = self.updating_client.upgrade() {
self.inspect.update_attempt_count.increment();
if let Err(e) = updating_client.lock().await.update().await {
fx_log_err!(
"AutoClient for {:?} error updating TUF client: {:#}",
self.auto_url,
anyhow!(e)
);
}
} else {
return HandleSseEndState::Abort;
}
}
Err(e) => {
fx_log_err!(
"AutoClient for {:?} event stream read error: {:#}",
self.auto_url,
anyhow!(e)
);
return HandleSseEndState::Reconnect;
}
}
}
fx_log_err!("AutoClient for {:?} event stream closed.", self.auto_url);
HandleSseEndState::Reconnect
}
async fn wait_before_reconnecting() {
fasync::Timer::new(fasync::Time::after(AUTO_CLIENT_SSE_RECONNECT_DELAY)).await
}
}
impl Drop for AutoClient {
fn drop(&mut self) {
fx_log_info!("AutoClient for {:?} stopping.", self.auto_url);
}
}
enum HandleSseEndState {
Reconnect,
Abort,
}