blob: 81ff7291d60da003c9c7464054fdca2435f823c0 [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
capability_source::{AggregateCapabilityProvider, CapabilitySourceInterface},
component_instance::{
ComponentInstanceInterface, ResolvedInstanceInterface, WeakComponentInstanceInterface,
},
error::RoutingError,
router::{
find_matching_expose, CapabilityVisitor, ErrorNotFoundInChild, Expose, ExposeVisitor,
Offer, OfferVisitor, RoutingStrategy, Sources,
},
DebugRouteMapper,
},
async_trait::async_trait,
cm_rust::{
CapabilityName, ExposeDecl, ExposeDeclCommon, ExposeServiceDecl, OfferServiceDecl,
ServiceDecl,
},
derivative::Derivative,
from_enum::FromEnum,
moniker::{ChildMoniker, ChildMonikerBase},
std::sync::Arc,
};
/// Provides capabilities exposed by children in a collection.
///
/// Given a collection and the name of a capability, this provider returns a list of children
/// within the collection that expose the capability, and routes to a particular child's exposed
/// capability with that name.
///
/// This is used during collection routing to aggregate service instances across
/// all children within the collection.
#[derive(Derivative)]
#[derivative(Clone(bound = "E: Clone, S: Clone, V: Clone, M: Clone"))]
pub(super) struct CollectionCapabilityProvider<C: ComponentInstanceInterface, U, O, E, S, V, M> {
pub router: RoutingStrategy<U, O, Expose<E>>,
/// Component that contains the collection.
pub collection_component: WeakComponentInstanceInterface<C>,
/// Name of the collection within `collection_component`.
pub collection_name: String,
/// Name of the capability as exposed by children in the collection.
pub capability_name: CapabilityName,
pub sources: S,
pub visitor: V,
pub mapper: M,
}
#[async_trait]
impl<C, U, O, E, S, V, M> AggregateCapabilityProvider<C>
for CollectionCapabilityProvider<C, U, O, E, S, V, M>
where
C: ComponentInstanceInterface + 'static,
U: Send + Sync + 'static,
O: Send + Sync + 'static,
E: ExposeDeclCommon
+ ErrorNotFoundInChild
+ FromEnum<ExposeDecl>
+ Into<ExposeDecl>
+ Clone
+ 'static,
S: Sources + 'static,
V: ExposeVisitor<ExposeDecl = E>,
V: CapabilityVisitor<CapabilityDecl = S::CapabilityDecl>,
V: Clone + Send + Sync + 'static,
M: DebugRouteMapper + Send + Sync + Clone + 'static,
{
/// Returns a list of instances of capabilities in this provider.
///
/// Instances correspond to the names of children in the collection that expose the capability
/// with the name `capability_name`.
///
/// In the case of service capabilities, they are *not* instances inside that service, but
/// rather service capabilities with the same name that are exposed by different children.
async fn list_instances(&self) -> Result<Vec<String>, RoutingError> {
let mut instances = Vec::new();
let component = self.collection_component.upgrade()?;
let components: Vec<(ChildMoniker, Arc<C>)> =
component.lock_resolved_state().await?.children_in_collection(&self.collection_name);
for (moniker, child_component) in components {
let child_exposes = child_component.lock_resolved_state().await.map(|c| c.exposes());
match child_exposes {
Ok(child_exposes) => {
if find_matching_expose::<E>(&self.capability_name, &child_exposes).is_some() {
instances.push(moniker.name().to_string())
}
}
// Ignore errors. One misbehaving component should not affect the entire collection.
Err(_) => {}
}
}
Ok(instances)
}
/// Returns a `CapabilitySourceInterface` to a capability exposed by a child.
///
/// `instance` is the name of the child that exposes the capability, as returned by
/// `list_instances`.
async fn route_instance(
&self,
instance: &str,
) -> Result<CapabilitySourceInterface<C>, RoutingError> {
let collection_component = self.collection_component.upgrade()?;
let (child_moniker, child_component): (ChildMoniker, Arc<C>) = {
collection_component
.lock_resolved_state()
.await?
.children_in_collection(&self.collection_name)
.into_iter()
.find(|child| child.0.name() == instance)
.ok_or_else(|| RoutingError::OfferFromChildInstanceNotFound {
child_moniker: ChildMoniker::new(
instance.to_string(),
Some(self.collection_name.clone()),
),
moniker: collection_component.abs_moniker().clone(),
capability_id: self.capability_name.clone().into(),
})?
};
let expose_decl: E = {
let child_exposes = child_component.lock_resolved_state().await?.exposes();
find_matching_expose(&self.capability_name, &child_exposes).cloned().ok_or_else(
|| {
E::error_not_found_in_child(
collection_component.abs_moniker().clone(),
child_moniker,
self.capability_name.clone(),
)
},
)?
};
self.router
.route_from_expose(
expose_decl,
child_component,
self.sources.clone(),
&mut self.visitor.clone(),
&mut self.mapper.clone(),
)
.await
}
fn clone_boxed(&self) -> Box<dyn AggregateCapabilityProvider<C>> {
Box::new(self.clone())
}
}
#[derive(Derivative)]
#[derivative(Clone(bound = "S: Clone, M: Clone, V: Clone"))]
pub(super) struct AggregateServiceProvider<C: ComponentInstanceInterface, U, S, M, V> {
router: RoutingStrategy<U, Offer<OfferServiceDecl>, Expose<ExposeServiceDecl>>,
/// Component that offered the aggregate service
component: WeakComponentInstanceInterface<C>,
/// List of offer decl to follow for routing each service provider used in the overall aggregation
offer_decls: Vec<OfferServiceDecl>,
sources: S,
visitor: V,
mapper: M,
}
impl<C, U, S, M, V> AggregateServiceProvider<C, U, S, M, V>
where
C: ComponentInstanceInterface + 'static,
U: Send + Sync + 'static,
S: Sources<CapabilityDecl = ServiceDecl> + 'static,
V: OfferVisitor<OfferDecl = OfferServiceDecl>
+ ExposeVisitor<ExposeDecl = ExposeServiceDecl>
+ CapabilityVisitor<CapabilityDecl = ServiceDecl>,
V: Send + Sync + Clone + 'static,
M: DebugRouteMapper + Send + Sync + Clone + 'static,
{
pub(super) fn new(
offer_service_decls: Vec<OfferServiceDecl>,
component: WeakComponentInstanceInterface<C>,
router: RoutingStrategy<U, Offer<OfferServiceDecl>, Expose<ExposeServiceDecl>>,
sources: S,
visitor: V,
mapper: M,
) -> Self {
let single_instance_decls: Vec<OfferServiceDecl> = offer_service_decls
.iter()
.map(|o| {
o.source_instance_filter.iter().flatten().map(move |instance_name| {
// Create a service decl for each filtered service instance so that when the aggregate
// capability routes an instance each entry can ignore the component name.
let mut single_instance_decl = o.clone();
single_instance_decl.source_instance_filter = Some(vec![instance_name.clone()]);
single_instance_decl
})
})
.flatten()
.collect();
Self { router, offer_decls: single_instance_decls, sources, visitor, component, mapper }
}
}
#[async_trait]
impl<C, U, S, M, V> AggregateCapabilityProvider<C> for AggregateServiceProvider<C, U, S, M, V>
where
C: ComponentInstanceInterface + 'static,
U: Send + Sync + 'static,
S: Sources<CapabilityDecl = ServiceDecl> + 'static,
V: OfferVisitor<OfferDecl = OfferServiceDecl>
+ ExposeVisitor<ExposeDecl = ExposeServiceDecl>
+ CapabilityVisitor<CapabilityDecl = ServiceDecl>,
V: Send + Sync + Clone + 'static,
M: DebugRouteMapper + Send + Sync + Clone + 'static,
{
async fn list_instances(&self) -> Result<Vec<String>, RoutingError> {
// Returns the offer source string as the instance name. e.g child instance name, collection name, "parent", "framework", etc.
Ok(self
.offer_decls
.iter()
.flat_map(|o| o.clone().source_instance_filter.unwrap_or(Vec::new()))
.collect())
}
async fn route_instance(
&self,
instance: &str,
) -> Result<CapabilitySourceInterface<C>, RoutingError> {
for offer_decl in &self.offer_decls {
if offer_decl
.source_instance_filter
.as_ref()
.map(|allowed_instances| allowed_instances.contains(&instance.to_string()))
.unwrap_or(false)
{
return self
.router
.route_from_offer(
offer_decl.clone(),
self.component.upgrade().map_err(|e| {
RoutingError::unsupported_route_source(format!(
"error upgrading aggregation point component {}",
e
))
})?,
self.sources.clone(),
&mut self.visitor.clone(),
&mut self.mapper.clone(),
)
.await;
}
}
Err(RoutingError::unsupported_route_source(format!("instance '{}' not found", instance)))
}
fn clone_boxed(&self) -> Box<dyn AggregateCapabilityProvider<C>> {
Box::new(self.clone())
}
}