blob: 57dfe2eccbf927aae9df78367b105d0bbaaf5dd1 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use fidl_fuchsia_net_name as name;
use futures::{Stream, TryFutureExt};
/// Creates a stream of [`name::DnsServer_`] from watching the server
/// configuration provided by `proxy`.
pub fn new_dns_server_stream(
proxy: name::DnsServerWatcherProxy,
) -> impl Stream<Item = Result<Vec<name::DnsServer_>, fidl::Error>> {
futures::stream::try_unfold(proxy, |proxy| {
proxy.watch_servers().map_ok(move |item| Some((item, proxy)))
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_util::*;
use fuchsia_async as fasync;
use futures::lock::Mutex;
use futures::{FutureExt, StreamExt, TryStreamExt};
use std::collections::VecDeque;
use std::sync::Arc;
struct MockDnsServerWatcher {
configs: VecDeque<Vec<name::DnsServer_>>,
pending_request: Option<name::DnsServerWatcherWatchServersResponder>,
}
impl MockDnsServerWatcher {
fn new() -> Self {
Self { configs: VecDeque::new(), pending_request: None }
}
fn push_config(&mut self, config: Vec<name::DnsServer_>) {
match self.pending_request.take() {
Some(req) => {
let () =
req.send(&mut config.into_iter()).expect("Failed to fulfill FIDL request");
}
None => self.configs.push_back(config),
}
}
async fn serve(
watcher: Arc<Mutex<Self>>,
rs: name::DnsServerWatcherRequestStream,
) -> Result<(), fidl::Error> {
rs.try_for_each(move |r| {
let watcher = watcher.clone();
async move {
match r {
name::DnsServerWatcherRequest::WatchServers { responder } => {
let mut w = watcher.lock().await;
if w.pending_request.is_some() {
panic!("No more than 1 pending requests allowed");
}
if let Some(config) = w.configs.pop_front() {
responder
.send(&mut config.into_iter())
.expect("Failed to fulfill FIDL request");
} else {
w.pending_request = Some(responder)
}
}
}
Ok(())
}
})
.await
}
}
#[fasync::run_singlethreaded(test)]
async fn test_dns_server_stream() {
let watcher = Arc::new(Mutex::new(MockDnsServerWatcher::new()));
let (proxy, rs) =
fidl::endpoints::create_proxy_and_stream::<name::DnsServerWatcherMarker>().unwrap();
let (serve_fut, abort_handle) =
futures::future::abortable(MockDnsServerWatcher::serve(watcher.clone(), rs));
let (serve_result, mut stream) = futures::future::join(serve_fut, async move {
let mut stream = new_dns_server_stream(proxy);
assert!(stream.next().now_or_never().is_none());
assert!(stream.next().now_or_never().is_none());
{
let mut w = watcher.lock().await;
w.push_config(vec![to_discovered_server(DYNAMIC_SERVER_A)]);
w.push_config(vec![to_discovered_server(DYNAMIC_SERVER_B)]);
}
let nxt = stream
.next()
.await
.expect("Stream ended unexpectedly")
.expect("FIDL error occurred");
assert_eq!(nxt, vec![to_discovered_server(DYNAMIC_SERVER_A)]);
let nxt = stream
.next()
.await
.expect("Stream ended unexpectedly")
.expect("FIDL error occurred");
assert_eq!(nxt, vec![to_discovered_server(DYNAMIC_SERVER_B)]);
// Abort the serving future so join will end.
abort_handle.abort();
stream
})
.await;
let _aborted = serve_result.expect_err("Future must've been aborted");
let _fidl_error: fidl::Error = stream
.next()
.await
.expect("Stream must yield a final value")
.expect_err("Stream must yield an error");
assert!(stream.next().await.is_none(), "Stream must end after error");
}
}