[archivist] A pipeline for LoWPAN

This CL doesn't introduce the routing. We can route this protocol from
the archivist to whichever component will consume the data in a
follow-up.

Bug: 102985
Tested: archivist-integration-tests

Change-Id: I31e0b6e0ab49075f1fffb87be8ebec123d0b3db8
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/692768
Fuchsia-Auto-Submit: Miguel Flores <miguelfrde@google.com>
Reviewed-by: Christopher Johnson <crjohns@google.com>
Commit-Queue: Auto-Submit <auto-submit@fuchsia-infra.iam.gserviceaccount.com>
diff --git a/src/diagnostics/archivist/meta/archivist.cml b/src/diagnostics/archivist/meta/archivist.cml
index 051a34c..71e306b 100644
--- a/src/diagnostics/archivist/meta/archivist.cml
+++ b/src/diagnostics/archivist/meta/archivist.cml
@@ -13,6 +13,7 @@
             protocol: [
                 "fuchsia.diagnostics.FeedbackArchiveAccessor",
                 "fuchsia.diagnostics.LegacyMetricsArchiveAccessor",
+                "fuchsia.diagnostics.LoWPANArchiveAccessor",
             ],
         },
     ],
@@ -41,6 +42,7 @@
             protocol: [
                 "fuchsia.diagnostics.FeedbackArchiveAccessor",
                 "fuchsia.diagnostics.LegacyMetricsArchiveAccessor",
+                "fuchsia.diagnostics.LoWPANArchiveAccessor",
             ],
             from: "self",
         },
diff --git a/src/diagnostics/archivist/src/archivist.rs b/src/diagnostics/archivist/src/archivist.rs
index e9f171c..21b8d78 100644
--- a/src/diagnostics/archivist/src/archivist.rs
+++ b/src/diagnostics/archivist/src/archivist.rs
@@ -103,9 +103,10 @@
         let pipelines_node = component::inspector().root().create_child("pipelines");
         let pipelines_path = Path::new(&archivist_configuration.pipelines_path);
         let pipelines = vec![
-            Pipeline::feedback(diagnostics_repo.clone(), &pipelines_path, &pipelines_node),
-            Pipeline::legacy_metrics(diagnostics_repo.clone(), &pipelines_path, &pipelines_node),
-            Pipeline::all_access(diagnostics_repo.clone(), &pipelines_path, &pipelines_node),
+            Pipeline::feedback(diagnostics_repo.clone(), pipelines_path, &pipelines_node),
+            Pipeline::legacy_metrics(diagnostics_repo.clone(), pipelines_path, &pipelines_node),
+            Pipeline::lowpan(diagnostics_repo.clone(), pipelines_path, &pipelines_node),
+            Pipeline::all_access(diagnostics_repo.clone(), pipelines_path, &pipelines_node),
         ];
         component::inspector().root().record(pipelines_node);
 
diff --git a/src/diagnostics/archivist/src/constants.rs b/src/diagnostics/archivist/src/constants.rs
index ffe8c61..d93ba8b 100644
--- a/src/diagnostics/archivist/src/constants.rs
+++ b/src/diagnostics/archivist/src/constants.rs
@@ -28,6 +28,11 @@
 pub const LEGACY_METRICS_ARCHIVE_ACCESSOR_NAME: &str =
     "fuchsia.diagnostics.LegacyMetricsArchiveAccessor";
 
+/// Name used by clients to connect to the lowpan diagnostics protocol.
+/// This protocol applies static selectors configured under config/data/lowpan to
+/// inspect exfiltration.
+pub const LOWPAN_ARCHIVE_ACCESSOR_NAME: &str = "fuchsia.diagnostics.LoWPANArchiveAccessor";
+
 /// The maximum number of Inspect files that can be simultaneously snapshotted and formatted per
 /// reader.
 pub const MAXIMUM_SIMULTANEOUS_SNAPSHOTS_PER_READER: usize = 4;
diff --git a/src/diagnostics/archivist/src/pipeline.rs b/src/diagnostics/archivist/src/pipeline.rs
index 6964cda..acb909a 100644
--- a/src/diagnostics/archivist/src/pipeline.rs
+++ b/src/diagnostics/archivist/src/pipeline.rs
@@ -115,6 +115,19 @@
         Self::new(parameters, data_repo, pipelines_path, parent_node)
     }
 
+    /// Creates a pipeline for LoWPAN metrics. This applies static selectors configured
+    /// under config/data/lowpan to inspect exfiltration.
+    pub fn lowpan(data_repo: DataRepo, pipelines_path: &Path, parent_node: &inspect::Node) -> Self {
+        let parameters = PipelineParameters {
+            has_config: true,
+            name: "lowpan",
+            empty_behavior: configs::EmptyBehavior::Disable,
+            protocol_name: constants::LOWPAN_ARCHIVE_ACCESSOR_NAME,
+            moniker_rewriter: Some(MonikerRewriter::new()),
+        };
+        Self::new(parameters, data_repo, pipelines_path, parent_node)
+    }
+
     #[cfg(test)]
     pub fn for_test(static_selectors: Option<Vec<Selector>>, data_repo: DataRepo) -> Self {
         Pipeline {
diff --git a/src/diagnostics/archivist/tests/v2/BUILD.gn b/src/diagnostics/archivist/tests/v2/BUILD.gn
index dfbd0da..e4bc4db 100644
--- a/src/diagnostics/archivist/tests/v2/BUILD.gn
+++ b/src/diagnostics/archivist/tests/v2/BUILD.gn
@@ -71,13 +71,13 @@
   inputs = [
     "test_data/empty_result_golden.json",
     "test_data/unified_reader_all_golden.json",
-    "test_data/feedback_reader_all_golden.json",
+    "test_data/pipeline_reader_all_golden.json",
     "test_data/memory_monitor_v2_moniker_golden.json",
     "test_data/memory_monitor_legacy_moniker_golden.json",
     "test_data/unified_reader_full_filter_golden.json",
-    "test_data/feedback_reader_nonoverlapping_selectors_golden.json",
+    "test_data/pipeline_reader_nonoverlapping_selectors_golden.json",
     "test_data/unified_reader_single_value_golden.json",
-    "test_data/feedback_reader_single_value_golden.json",
+    "test_data/pipeline_reader_single_value_golden.json",
   ]
 }
 
@@ -147,6 +147,17 @@
   deps = [ "//src/diagnostics/archivist:bin" ]
 }
 
+fuchsia_component_manifest("with-lowpan-filtering-manifest") {
+  manifest = "meta/archivist_for_integration.cml"
+  component_name = "archivist-with-lowpan-filtering"
+}
+
+fuchsia_component("archivist-with-lowpan-filtering") {
+  cm_label = ":with-lowpan-filtering-manifest"
+  testonly = true
+  deps = [ "//src/diagnostics/archivist:bin" ]
+}
+
 fuchsia_component_manifest("with-feedback-filtering-disabled-manifest") {
   manifest = "meta/archivist_for_integration.cml"
   component_name = "archivist-with-feedback-filtering-disabled"
@@ -195,6 +206,11 @@
   values = with_legacy_metrics_filtering
 }
 
+fuchsia_structured_config_values("archivist-with-lowpan-filtering-config") {
+  cm_label = ":with-lowpan-filtering-manifest"
+  values = with_lowpan_filtering
+}
+
 fuchsia_structured_config_values(
     "archivist-with-feedback-filtering-disabled-config") {
   cm_label = ":with-feedback-filtering-disabled-manifest"
@@ -208,6 +224,12 @@
   ]
 }
 
+resource("filter_lowpan") {
+  sources = [ "configs/pipelines/static_selectors.cfg" ]
+  outputs =
+      [ "data/config/pipelines/lowpan_filtered/lowpan/static_selectors.cfg" ]
+}
+
 resource("do_not_filter_feedback") {
   sources = [ "configs/pipelines/DISABLE_FILTERING.txt" ]
   outputs = [ "data/config/pipelines/feedback_filtering_disabled/feedback/DISABLE_FILTERING.txt" ]
@@ -233,11 +255,14 @@
     ":archivist-with-legacy-metrics-filtering",
     ":archivist-with-legacy-metrics-filtering-config",
     ":archivist-with-log-connector-config",
+    ":archivist-with-lowpan-filtering",
+    ":archivist-with-lowpan-filtering-config",
     ":archivist-with-small-caches",
     ":archivist-with-small-caches-config",
     ":configure_legacy_metrics_pipeline",
     ":do_not_filter_feedback",
     ":filter_feedback",
+    ":filter_lowpan",
     "components",
     "//src/diagnostics/iquery/test/test_component",
   ]
diff --git a/src/diagnostics/archivist/tests/v2/configs.gni b/src/diagnostics/archivist/tests/v2/configs.gni
index 94a90eb..f599d87 100644
--- a/src/diagnostics/archivist/tests/v2/configs.gni
+++ b/src/diagnostics/archivist/tests/v2/configs.gni
@@ -30,6 +30,11 @@
   pipelines_path = "/pkg/data/config/pipelines/feedback_filtered"
 }
 
+with_lowpan_filtering = {
+  forward_variables_from(archivist_for_integration_config, "*")
+  pipelines_path = "/pkg/data/config/pipelines/lowpan_filtered"
+}
+
 with_legacy_metrics_filtering = {
   forward_variables_from(archivist_for_integration_config, "*")
   pipelines_path = "/pkg/data/config/pipelines/legacy_metrics_filtered"
diff --git a/src/diagnostics/archivist/tests/v2/meta/test.shard.cml b/src/diagnostics/archivist/tests/v2/meta/test.shard.cml
index e042268..ff301ef 100644
--- a/src/diagnostics/archivist/tests/v2/meta/test.shard.cml
+++ b/src/diagnostics/archivist/tests/v2/meta/test.shard.cml
@@ -7,6 +7,7 @@
             protocol: [
                 "fuchsia.diagnostics.FeedbackArchiveAccessor",
                 "fuchsia.diagnostics.LegacyMetricsArchiveAccessor",
+                "fuchsia.diagnostics.LoWPANArchiveAccessor",
             ],
         },
     ],
@@ -22,6 +23,7 @@
             protocol: [
                 "fuchsia.diagnostics.FeedbackArchiveAccessor",
                 "fuchsia.diagnostics.LegacyMetricsArchiveAccessor",
+                "fuchsia.diagnostics.LoWPANArchiveAccessor",
             ],
             from: "self",
         },
diff --git a/src/diagnostics/archivist/tests/v2/src/constants.rs b/src/diagnostics/archivist/tests/v2/src/constants.rs
index a647f37..beadfe3 100644
--- a/src/diagnostics/archivist/tests/v2/src/constants.rs
+++ b/src/diagnostics/archivist/tests/v2/src/constants.rs
@@ -3,6 +3,7 @@
 // found in the LICENSE file.
 
 pub const ARCHIVIST_WITH_FEEDBACK_FILTERING: &str = "#meta/archivist-with-feedback-filtering.cm";
+pub const ARCHIVIST_WITH_LOWPAN_FILTERING: &str = "#meta/archivist-with-lowpan-filtering.cm";
 pub const ARCHIVIST_WITH_FEEDBACK_FILTERING_DISABLED: &str =
     "#meta/archivist-with-feedback-filtering-disabled.cm";
 pub const ARCHIVIST_WITH_SMALL_CACHES: &str = "#meta/archivist-with-small-caches.cm";
diff --git a/src/diagnostics/archivist/tests/v2/src/inspect/reader.rs b/src/diagnostics/archivist/tests/v2/src/inspect/reader.rs
index 6cbca03..63892f4 100644
--- a/src/diagnostics/archivist/tests/v2/src/inspect/reader.rs
+++ b/src/diagnostics/archivist/tests/v2/src/inspect/reader.rs
@@ -4,6 +4,7 @@
 
 use crate::{constants::*, test_topology};
 use anyhow::Error;
+use archivist_lib::constants;
 use diagnostics_reader::{
     assert_data_tree, assert_json_diff, AnyProperty, ArchiveReader, DiagnosticsHierarchy, Inspect,
 };
@@ -28,12 +29,12 @@
         include_str!("../../test_data/unified_reader_all_golden.json");
     static ref UNIFIED_FULL_FILTER_GOLDEN: &'static str =
         include_str!("../../test_data/unified_reader_full_filter_golden.json");
-    static ref FEEDBACK_SINGLE_VALUE_GOLDEN: &'static str =
-        include_str!("../../test_data/feedback_reader_single_value_golden.json");
-    static ref FEEDBACK_ALL_GOLDEN: &'static str =
-        include_str!("../../test_data/feedback_reader_all_golden.json");
-    static ref FEEDBACK_NONOVERLAPPING_SELECTORS_GOLDEN: &'static str =
-        include_str!("../../test_data/feedback_reader_nonoverlapping_selectors_golden.json");
+    static ref PIPELINE_SINGLE_VALUE_GOLDEN: &'static str =
+        include_str!("../../test_data/pipeline_reader_single_value_golden.json");
+    static ref PIPELINE_ALL_GOLDEN: &'static str =
+        include_str!("../../test_data/pipeline_reader_all_golden.json");
+    static ref PIPELINE_NONOVERLAPPING_SELECTORS_GOLDEN: &'static str =
+        include_str!("../../test_data/pipeline_reader_nonoverlapping_selectors_golden.json");
     static ref MEMORY_MONITOR_V2_MONIKER_GOLDEN: &'static str =
         include_str!("../../test_data/memory_monitor_v2_moniker_golden.json");
     static ref MEMORY_MONITOR_LEGACY_MONIKER_GOLDEN: &'static str =
@@ -271,21 +272,21 @@
     // First, retrieve all of the information in our realm to make sure that everything
     // we expect is present.
     let accessor = connect_to_feedback_accessor(&instance);
-    retrieve_and_validate_results(accessor, Vec::new(), &FEEDBACK_ALL_GOLDEN, 3).await;
+    retrieve_and_validate_results(accessor, Vec::new(), &PIPELINE_ALL_GOLDEN, 3).await;
 
     // Then verify that from the expected data, we can retrieve one specific value.
     let accessor = connect_to_feedback_accessor(&instance);
     retrieve_and_validate_results(
         accessor,
         vec!["test_component:*:lazy-*"],
-        &FEEDBACK_SINGLE_VALUE_GOLDEN,
+        &PIPELINE_SINGLE_VALUE_GOLDEN,
         3,
     )
     .await;
 
     // Then verify that subtree selection retrieves all trees under and including root.
     let accessor = connect_to_feedback_accessor(&instance);
-    retrieve_and_validate_results(accessor, vec!["test_component:root"], &FEEDBACK_ALL_GOLDEN, 3)
+    retrieve_and_validate_results(accessor, vec!["test_component:root"], &PIPELINE_ALL_GOLDEN, 3)
         .await;
 
     // Then verify that client selectors dont override the static selectors provided
@@ -294,12 +295,12 @@
     retrieve_and_validate_results(
         accessor,
         vec![r#"test_component:root:array\:0x15"#],
-        &FEEDBACK_NONOVERLAPPING_SELECTORS_GOLDEN,
+        &PIPELINE_NONOVERLAPPING_SELECTORS_GOLDEN,
         3,
     )
     .await;
 
-    assert!(feedback_pipeline_is_filtered(instance, 3).await);
+    assert!(pipeline_is_filtered(instance, 3, constants::FEEDBACK_ARCHIVE_ACCESSOR_NAME).await);
 
     Ok(())
 }
@@ -316,7 +317,7 @@
         .expect("add child a");
 
     let instance = builder.build().await.expect("create instance");
-    assert!(!feedback_pipeline_is_filtered(instance, 3).await);
+    assert!(!pipeline_is_filtered(instance, 3, constants::FEEDBACK_ARCHIVE_ACCESSOR_NAME).await);
 
     Ok(())
 }
@@ -332,7 +333,56 @@
 
     let instance = builder.build().await.expect("create instance");
 
-    assert!(!feedback_pipeline_is_filtered(instance, 3).await);
+    assert!(!pipeline_is_filtered(instance, 3, constants::FEEDBACK_ARCHIVE_ACCESSOR_NAME).await);
+
+    Ok(())
+}
+
+#[fuchsia::test]
+async fn lowpan_canonical_reader_test() -> Result<(), Error> {
+    let (builder, test_realm) = test_topology::create(test_topology::Options {
+        archivist_url: ARCHIVIST_WITH_LOWPAN_FILTERING,
+    })
+    .await
+    .expect("create base topology");
+    test_topology::add_eager_child(&test_realm, "test_component", IQUERY_TEST_COMPONENT_URL)
+        .await
+        .expect("add child a");
+
+    let instance = builder.build().await.expect("create instance");
+
+    // First, retrieve all of the information in our realm to make sure that everything
+    // we expect is present.
+    let accessor = connect_to_lowpan_accessor(&instance);
+    retrieve_and_validate_results(accessor, Vec::new(), &PIPELINE_ALL_GOLDEN, 3).await;
+
+    // Then verify that from the expected data, we can retrieve one specific value.
+    let accessor = connect_to_lowpan_accessor(&instance);
+    retrieve_and_validate_results(
+        accessor,
+        vec!["test_component:*:lazy-*"],
+        &PIPELINE_SINGLE_VALUE_GOLDEN,
+        3,
+    )
+    .await;
+
+    // Then verify that subtree selection retrieves all trees under and including root.
+    let accessor = connect_to_lowpan_accessor(&instance);
+    retrieve_and_validate_results(accessor, vec!["test_component:root"], &PIPELINE_ALL_GOLDEN, 3)
+        .await;
+
+    // Then verify that client selectors dont override the static selectors provided
+    // to the archivist.
+    let accessor = connect_to_lowpan_accessor(&instance);
+    retrieve_and_validate_results(
+        accessor,
+        vec![r#"test_component:root:array\:0x15"#],
+        &PIPELINE_NONOVERLAPPING_SELECTORS_GOLDEN,
+        3,
+    )
+    .await;
+
+    assert!(pipeline_is_filtered(instance, 3, constants::LOWPAN_ARCHIVE_ACCESSOR_NAME).await);
 
     Ok(())
 }
@@ -355,6 +405,15 @@
         .unwrap()
 }
 
+fn connect_to_lowpan_accessor(instance: &RealmInstance) -> ArchiveAccessorProxy {
+    instance
+        .root
+        .connect_to_named_protocol_at_exposed_dir::<ArchiveAccessorMarker>(
+            "fuchsia.diagnostics.LoWPANArchiveAccessor",
+        )
+        .unwrap()
+}
+
 // Loop indefinitely snapshotting the archive until we get the expected number of
 // hierarchies, and then validate that the ordered json represetionation of these hierarchies
 // matches the golden file.
@@ -427,19 +486,18 @@
     serde_json::from_str(&sorted_results_json_string).unwrap()
 }
 
-async fn feedback_pipeline_is_filtered(
+async fn pipeline_is_filtered(
     instance: RealmInstance,
     expected_results_count: usize,
+    accessor_name: &str,
 ) -> bool {
-    let feedback_archive_accessor = instance
+    let archive_accessor = instance
         .root
-        .connect_to_named_protocol_at_exposed_dir::<ArchiveAccessorMarker>(
-            "fuchsia.diagnostics.FeedbackArchiveAccessor",
-        )
+        .connect_to_named_protocol_at_exposed_dir::<ArchiveAccessorMarker>(accessor_name)
         .unwrap();
 
-    let feedback_results = ArchiveReader::new()
-        .with_archive(feedback_archive_accessor)
+    let pipeline_results = ArchiveReader::new()
+        .with_archive(archive_accessor)
         .with_minimum_schema_count(expected_results_count)
         .snapshot_raw::<Inspect>()
         .await
@@ -455,5 +513,5 @@
         .await
         .expect("got result");
 
-    process_results_for_comparison(feedback_results) != process_results_for_comparison(all_results)
+    process_results_for_comparison(pipeline_results) != process_results_for_comparison(all_results)
 }
diff --git a/src/diagnostics/archivist/tests/v2/src/test_topology.rs b/src/diagnostics/archivist/tests/v2/src/test_topology.rs
index 8bc03f1..364d77d 100644
--- a/src/diagnostics/archivist/tests/v2/src/test_topology.rs
+++ b/src/diagnostics/archivist/tests/v2/src/test_topology.rs
@@ -46,6 +46,7 @@
         .capability(Capability::protocol_by_name(
             "fuchsia.diagnostics.LegacyMetricsArchiveAccessor",
         ))
+        .capability(Capability::protocol_by_name("fuchsia.diagnostics.LoWPANArchiveAccessor"))
         .capability(Capability::protocol_by_name("fuchsia.diagnostics.LogSettings"))
         .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
         .capability(Capability::protocol_by_name("fuchsia.logger.Log"));
diff --git a/src/diagnostics/archivist/tests/v2/test_data/feedback_reader_all_golden.json b/src/diagnostics/archivist/tests/v2/test_data/pipeline_reader_all_golden.json
similarity index 100%
rename from src/diagnostics/archivist/tests/v2/test_data/feedback_reader_all_golden.json
rename to src/diagnostics/archivist/tests/v2/test_data/pipeline_reader_all_golden.json
diff --git a/src/diagnostics/archivist/tests/v2/test_data/feedback_reader_nonoverlapping_selectors_golden.json b/src/diagnostics/archivist/tests/v2/test_data/pipeline_reader_nonoverlapping_selectors_golden.json
similarity index 100%
rename from src/diagnostics/archivist/tests/v2/test_data/feedback_reader_nonoverlapping_selectors_golden.json
rename to src/diagnostics/archivist/tests/v2/test_data/pipeline_reader_nonoverlapping_selectors_golden.json
diff --git a/src/diagnostics/archivist/tests/v2/test_data/feedback_reader_single_value_golden.json b/src/diagnostics/archivist/tests/v2/test_data/pipeline_reader_single_value_golden.json
similarity index 100%
rename from src/diagnostics/archivist/tests/v2/test_data/feedback_reader_single_value_golden.json
rename to src/diagnostics/archivist/tests/v2/test_data/pipeline_reader_single_value_golden.json