blob: 72d398992d6ac0be41c67342936b5b456ee7e64d [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
capability_source::{
AggregateInstance, AggregateMember, AnonymizedAggregateCapabilityProvider,
CapabilitySource, ComponentCapability, FilteredAggregateCapabilityProvider,
FilteredAggregateCapabilityRouteData,
},
component_instance::{
ComponentInstanceInterface, ExtendedInstanceInterface, ResolvedInstanceInterface,
WeakComponentInstanceInterface,
},
error::RoutingError,
legacy_router::{
self, CapabilityVisitor, ErrorNotFoundFromParent, ErrorNotFoundInChild, ExposeVisitor,
OfferVisitor, RouteBundle, Sources,
},
mapper::NoopRouteMapper,
},
async_trait::async_trait,
cm_rust::{ExposeDecl, NameMapping, OfferDecl, OfferServiceDecl},
cm_types::Name,
derivative::Derivative,
futures::future::BoxFuture,
moniker::ChildName,
std::collections::HashSet,
};
/// Provides capabilities exposed by an anonymized aggregates.
///
/// Given a set of collections and static children and the name of a capability, this provider
/// returns a list of children within them that expose the capability, and routes to a particular
/// child's exposed capability with that name.
///
/// This is used during collection routing from anonymized aggregate service instances.
#[derive(Derivative)]
#[derivative(Clone(bound = "V: Clone"))]
pub(super) struct AnonymizedAggregateServiceProvider<C: ComponentInstanceInterface, V> {
/// Component that defines the aggregate.
pub containing_component: WeakComponentInstanceInterface<C>,
/// The members relative to `containing_component` that make up the aggregate.
pub members: Vec<AggregateMember>,
/// Name of the capability as exposed by children in the collection.
pub capability_name: Name,
pub capability_type: cm_rust::CapabilityTypeName,
pub sources: Sources,
pub visitor: V,
}
#[async_trait]
impl<C, V> AnonymizedAggregateCapabilityProvider<C> for AnonymizedAggregateServiceProvider<C, V>
where
C: ComponentInstanceInterface + 'static,
V: OfferVisitor,
V: ExposeVisitor,
V: CapabilityVisitor,
V: Clone + Send + Sync + 'static,
{
/// Returns a list of instances contributing capabilities to this provider.
///
/// In the case of service capabilities, they are *not* instances inside that service, but
/// rather service capabilities with the same name that are exposed by different components.
async fn list_instances(&self) -> Result<Vec<AggregateInstance>, RoutingError> {
let mut instances = Vec::new();
let component = self.containing_component.upgrade()?;
let mut child_components = vec![];
let mut parent = None;
let mut include_self = false;
{
let resolved_state = component.lock_resolved_state().await?;
for s in &self.members {
match s {
AggregateMember::Child(child_name) => {
if let Some(child) = resolved_state.get_child(&child_name) {
child_components.push((child_name.clone(), child.clone()));
}
}
AggregateMember::Collection(collection) => {
child_components.extend(resolved_state.children_in_collection(&collection));
}
AggregateMember::Parent => {
if let Ok(p) = component.try_get_parent() {
match p {
ExtendedInstanceInterface::AboveRoot(_) => {
// Services from above parent are not supported.
}
ExtendedInstanceInterface::Component(p) => {
parent = Some(p);
}
}
}
}
AggregateMember::Self_ => {
include_self = true;
}
}
}
}
for (child_name, child_component) in child_components {
let child_exposes = child_component.lock_resolved_state().await.map(|c| c.exposes());
match child_exposes {
Ok(child_exposes) => {
if let Some(_) = legacy_router::find_matching_exposes(
self.capability_type,
&self.capability_name,
&child_exposes,
) {
instances.push(AggregateInstance::Child(child_name.clone()));
}
}
// Ignore errors. One misbehaving component should not affect the entire collection.
Err(_) => {}
}
}
if let Some(parent) = parent {
let parent_offers = parent.lock_resolved_state().await.map(|c| c.offers());
match parent_offers {
Ok(parent_offers) => {
let child_moniker = component.child_moniker().expect("ChildName should exist");
if let Some(_) = legacy_router::find_matching_offers(
self.capability_type,
&self.capability_name,
&child_moniker,
&parent_offers,
) {
instances.push(AggregateInstance::Parent);
}
}
// Ignore errors. One misbehaving component should not affect the entire collection.
Err(_) => {}
}
}
if include_self {
instances.push(AggregateInstance::Self_);
}
Ok(instances)
}
/// Returns a `CapabilitySource` to a capability exposed by a child.
///
/// `instance` is the name of the child that exposes the capability, as returned by
/// `list_instances`.
async fn route_instance(
&self,
instance: &AggregateInstance,
) -> Result<CapabilitySource<C>, RoutingError> {
match instance {
AggregateInstance::Child(name) => self.route_child_instance(&name).await,
AggregateInstance::Parent => self.route_parent_instance().await,
AggregateInstance::Self_ => self.route_self_instance().await,
}
}
fn clone_boxed(&self) -> Box<dyn AnonymizedAggregateCapabilityProvider<C>> {
Box::new(self.clone())
}
}
impl<C, V> AnonymizedAggregateServiceProvider<C, V>
where
C: ComponentInstanceInterface + 'static,
V: OfferVisitor,
V: ExposeVisitor,
V: CapabilityVisitor,
V: Clone + Send + Sync + 'static,
{
async fn route_child_instance(
&self,
instance: &ChildName,
) -> Result<CapabilitySource<C>, RoutingError> {
let containing_component = self.containing_component.upgrade()?;
let child_component =
containing_component.lock_resolved_state().await?.get_child(instance).ok_or_else(
|| RoutingError::OfferFromChildInstanceNotFound {
child_moniker: instance.clone(),
moniker: containing_component.moniker().clone(),
capability_id: self.capability_name.clone().into(),
},
)?;
let child_exposes = child_component.lock_resolved_state().await?.exposes();
let child_exposes = legacy_router::find_matching_exposes(
self.capability_type,
&self.capability_name,
&child_exposes,
)
.ok_or_else(|| {
ExposeDecl::error_not_found_in_child(
containing_component.moniker().clone(),
instance.clone(),
self.capability_name.clone(),
)
})?;
legacy_router::route_from_expose(
child_exposes,
child_component,
self.sources.clone(),
&mut self.visitor.clone(),
&mut NoopRouteMapper,
)
.await
}
async fn route_parent_instance(&self) -> Result<CapabilitySource<C>, RoutingError> {
let containing_component = self.containing_component.upgrade()?;
let parent = match containing_component.try_get_parent().map_err(|_| {
RoutingError::OfferFromParentNotFound {
moniker: containing_component.moniker().clone(),
capability_id: self.capability_name.clone().into(),
}
})? {
ExtendedInstanceInterface::AboveRoot(_) => {
return Err(RoutingError::unsupported_route_source("service above parent"));
}
ExtendedInstanceInterface::Component(p) => p,
};
let child_moniker = containing_component.child_moniker().expect("ChildName should exist");
let parent_offers = parent.lock_resolved_state().await?.offers();
let parent_offers = legacy_router::find_matching_offers(
self.capability_type,
&self.capability_name,
&child_moniker,
&parent_offers,
)
.ok_or_else(|| {
OfferDecl::error_not_found_from_parent(
containing_component.moniker().clone(),
self.capability_name.clone(),
)
})?;
legacy_router::route_from_offer(
parent_offers,
parent,
self.sources.clone(),
&mut self.visitor.clone(),
&mut NoopRouteMapper,
)
.await
}
async fn route_self_instance(&self) -> Result<CapabilitySource<C>, RoutingError> {
let containing_component = self.containing_component.upgrade()?;
legacy_router::route_from_self_by_name(
&self.capability_name,
containing_component,
self.sources.clone(),
&mut self.visitor.clone(),
&mut NoopRouteMapper,
)
.await
}
}
#[derive(Derivative)]
#[derivative(Clone(bound = ""))]
pub(super) struct OfferFilteredServiceProvider<C: ComponentInstanceInterface> {
/// Component that offered the filtered service
component: WeakComponentInstanceInterface<C>,
/// The service capability
capability: ComponentCapability,
/// The service offer that has a filter.
offer_decl: OfferServiceDecl,
}
impl<C> OfferFilteredServiceProvider<C>
where
C: ComponentInstanceInterface + 'static,
{
pub(super) fn new(
offer_decl: OfferServiceDecl,
component: WeakComponentInstanceInterface<C>,
capability: ComponentCapability,
) -> Self {
Self { offer_decl, component, capability }
}
}
impl<C> FilteredAggregateCapabilityProvider<C> for OfferFilteredServiceProvider<C>
where
C: ComponentInstanceInterface + 'static,
{
fn route_instances(
&self,
) -> Vec<BoxFuture<'_, Result<FilteredAggregateCapabilityRouteData<C>, RoutingError>>> {
let capability_source = CapabilitySource::Component {
capability: self.capability.clone(),
component: self.component.clone(),
};
let instance_filter = get_instance_filter(&self.offer_decl);
let fut = async move {
Ok(FilteredAggregateCapabilityRouteData { capability_source, instance_filter })
};
// Without the explicit type, this does not compile
let mut out: Vec<
BoxFuture<'_, Result<FilteredAggregateCapabilityRouteData<C>, RoutingError>>,
> = vec![];
out.push(Box::pin(fut));
out
}
fn clone_boxed(&self) -> Box<dyn FilteredAggregateCapabilityProvider<C>> {
Box::new(self.clone())
}
}
fn get_instance_filter(offer_decl: &OfferServiceDecl) -> Vec<NameMapping> {
let renamed_instances = offer_decl.renamed_instances.as_ref().unwrap_or_else(|| {
static EMPTY_VEC: Vec<NameMapping> = vec![];
&EMPTY_VEC
});
if !renamed_instances.is_empty() {
let source_instance_filter: HashSet<_> = offer_decl
.source_instance_filter
.as_ref()
.unwrap_or_else(|| {
static EMPTY_VEC: Vec<String> = vec![];
&EMPTY_VEC
})
.iter()
.map(|s| s.as_str())
.collect();
renamed_instances
.clone()
.into_iter()
.filter_map(|m| {
if source_instance_filter.is_empty()
|| source_instance_filter.contains(&m.target_name.as_str())
{
Some(NameMapping { source_name: m.source_name, target_name: m.target_name })
} else {
None
}
})
.collect()
} else {
offer_decl
.source_instance_filter
.clone()
.unwrap_or_default()
.into_iter()
.map(|n| NameMapping { source_name: n.clone(), target_name: n })
.collect()
}
}
#[derive(Derivative)]
#[derivative(Clone(bound = "V: Clone"))]
pub(super) struct OfferAggregateServiceProvider<C: ComponentInstanceInterface, V> {
/// Component that offered the aggregate service
component: WeakComponentInstanceInterface<C>,
/// List of offer decl to follow for routing each service provider used in the overall aggregation
offer_decls: Vec<OfferServiceDecl>,
sources: Sources,
visitor: V,
}
impl<C, V> OfferAggregateServiceProvider<C, V>
where
C: ComponentInstanceInterface + 'static,
V: OfferVisitor + ExposeVisitor + CapabilityVisitor,
V: Send + Sync + Clone + 'static,
{
pub(super) fn new(
offer_decls: Vec<OfferServiceDecl>,
component: WeakComponentInstanceInterface<C>,
sources: Sources,
visitor: V,
) -> Self {
Self { offer_decls, sources, visitor, component }
}
}
impl<C, V> FilteredAggregateCapabilityProvider<C> for OfferAggregateServiceProvider<C, V>
where
C: ComponentInstanceInterface + 'static,
V: OfferVisitor + ExposeVisitor + CapabilityVisitor,
V: Send + Sync + Clone + 'static,
{
fn route_instances(
&self,
) -> Vec<BoxFuture<'_, Result<FilteredAggregateCapabilityRouteData<C>, RoutingError>>> {
// Without the explicit type, this does not compile
let mut out: Vec<
BoxFuture<'_, Result<FilteredAggregateCapabilityRouteData<C>, RoutingError>>,
> = vec![];
for offer_decl in &self.offer_decls {
let instance_filter = get_instance_filter(offer_decl);
if instance_filter.is_empty() {
continue;
}
let mut offer_decl = offer_decl.clone();
offer_decl.source_instance_filter = None;
offer_decl.renamed_instances = None;
let offer_decl = OfferDecl::Service(offer_decl);
let fut = async {
let component = self.component.upgrade().map_err(|e| {
RoutingError::unsupported_route_source(format!(
"error upgrading aggregation point component {}",
e
))
})?;
let capability_source = legacy_router::route_from_offer(
RouteBundle::from_offer(offer_decl),
component,
self.sources.clone(),
&mut self.visitor.clone(),
&mut NoopRouteMapper,
)
.await?;
Ok(FilteredAggregateCapabilityRouteData { capability_source, instance_filter })
};
out.push(Box::pin(fut));
}
out
}
fn clone_boxed(&self) -> Box<dyn FilteredAggregateCapabilityProvider<C>> {
Box::new(self.clone())
}
}
#[cfg(test)]
mod tests {
use super::*;
use cm_rust::{Availability, OfferSource};
use cm_rust_testing::*;
#[test]
fn test_get_instance_filter() {
fn get_instance_filter(
source_instance_filter: Option<Vec<String>>,
renamed_instances: Option<Vec<NameMapping>>,
) -> Vec<NameMapping> {
super::get_instance_filter(&OfferServiceDecl {
source: OfferSource::Parent,
source_name: "foo".parse().unwrap(),
source_dictionary: Default::default(),
target: offer_target_static_child("a"),
target_name: "bar".parse().unwrap(),
source_instance_filter,
renamed_instances,
availability: Availability::Required,
})
}
assert_eq!(get_instance_filter(None, None), vec![]);
assert_eq!(get_instance_filter(Some(vec![]), Some(vec![])), vec![]);
let same_name_map = vec![
NameMapping { source_name: "a".into(), target_name: "a".into() },
NameMapping { source_name: "b".into(), target_name: "b".into() },
];
assert_eq!(get_instance_filter(Some(vec!["a".into(), "b".into()]), None), same_name_map);
assert_eq!(
get_instance_filter(Some(vec!["a".into(), "b".into()]), Some(vec![])),
same_name_map
);
let renamed_map = vec![
NameMapping { source_name: "one".into(), target_name: "a".into() },
NameMapping { source_name: "two".into(), target_name: "b".into() },
];
assert_eq!(get_instance_filter(None, Some(renamed_map.clone())), renamed_map);
assert_eq!(get_instance_filter(Some(vec![]), Some(renamed_map.clone())), renamed_map);
assert_eq!(
get_instance_filter(Some(vec!["b".into()]), Some(renamed_map.clone())),
vec![NameMapping { source_name: "two".into(), target_name: "b".into() }]
);
}
}