| // Copyright 2022 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use crate::logs::error::LogsError; |
| use crate::logs::repository::LogsRepository; |
| use fidl::endpoints::{ControlHandle, DiscoverableProtocolMarker}; |
| use futures::channel::mpsc; |
| use futures::StreamExt; |
| use std::sync::Arc; |
| use tracing::warn; |
| use {fidl_fuchsia_diagnostics as fdiagnostics, fuchsia_async as fasync}; |
| |
| pub struct LogSettingsServer { |
| /// The repository holding the logs. |
| logs_repo: Arc<LogsRepository>, |
| |
| /// Sender which is used to close the stream of Log connections after ingestion of logsink |
| /// completes. |
| task_sender: mpsc::UnboundedSender<fasync::Task<()>>, |
| |
| /// Task draining the receiver for the `task_sender`s. |
| _drain_listeners_task: fasync::Task<()>, |
| } |
| |
| impl LogSettingsServer { |
| pub fn new(logs_repo: Arc<LogsRepository>) -> Self { |
| let (task_sender, rcv) = mpsc::unbounded(); |
| Self { |
| logs_repo, |
| task_sender, |
| _drain_listeners_task: fasync::Task::spawn(async move { |
| rcv.for_each_concurrent(None, |rx| rx).await; |
| }), |
| } |
| } |
| |
| /// Spawn a task to handle requests from components reading the shared log. |
| pub fn spawn(&self, stream: fdiagnostics::LogSettingsRequestStream) { |
| let logs_repo = Arc::clone(&self.logs_repo); |
| if let Err(e) = self.task_sender.unbounded_send(fasync::Task::spawn(async move { |
| if let Err(e) = Self::handle_requests(logs_repo, stream).await { |
| warn!("error handling Log requests: {}", e); |
| } |
| })) { |
| warn!("Couldn't queue listener task: {:?}", e); |
| } |
| } |
| |
| pub async fn handle_requests( |
| logs_repo: Arc<LogsRepository>, |
| mut stream: fdiagnostics::LogSettingsRequestStream, |
| ) -> Result<(), LogsError> { |
| let connection_id = logs_repo.new_interest_connection(); |
| while let Some(request) = stream.next().await { |
| let request = request.map_err(|source| LogsError::HandlingRequests { |
| protocol: fdiagnostics::LogSettingsMarker::PROTOCOL_NAME, |
| source, |
| })?; |
| match request { |
| fdiagnostics::LogSettingsRequest::RegisterInterest { control_handle, .. } => { |
| warn!("fuchsia.diagnostics/LogSettings.RegisterInterest is not supported; closing the channel"); |
| control_handle.shutdown(); |
| } |
| fdiagnostics::LogSettingsRequest::SetInterest { selectors, responder } => { |
| logs_repo.update_logs_interest(connection_id, selectors); |
| responder.send().ok(); |
| } |
| } |
| } |
| logs_repo.finish_interest_connection(connection_id); |
| |
| Ok(()) |
| } |
| } |