blob: 3f1023f63604175db15c523a2634ea6bf1d2ff29 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Controls one link to another node over a zx socket.
use {
anyhow::Error,
byteorder::WriteBytesExt,
futures::{io::AsyncWriteExt, prelude::*},
overnet_core::{ConfigProducer, LinkIntroductionFacts, Router},
std::{sync::Arc, time::Duration},
stream_framer::{new_deframer, new_framer, Deframed, Format, ReadBytes},
};
pub fn run_stream_link<'a>(
node: Arc<Router>,
rx_bytes: &'a mut (dyn AsyncRead + Unpin + Send),
tx_bytes: &'a mut (dyn AsyncWrite + Unpin + Send),
introduction_facts: LinkIntroductionFacts,
config: ConfigProducer,
) -> impl 'a + Send + Future<Output = Result<(), Error>> {
let (mut link_sender, mut link_receiver) = node.new_link(introduction_facts, config);
drop(node);
let (mut framer, mut framer_read) = new_framer(LosslessBinary, 4096);
let (mut deframer_write, mut deframer) = new_deframer(LosslessBinary);
futures::future::try_join4(
async move {
loop {
let msg = framer_read.read().await?;
tx_bytes.write_all(&msg).await?;
tx_bytes.flush().await?;
}
},
async move {
let mut buf = [0u8; 4096];
loop {
let n = rx_bytes.read(&mut buf).await?;
if n == 0 {
return Ok::<_, Error>(());
}
deframer_write.write(&buf[..n]).await?;
}
},
async move {
loop {
if let ReadBytes::Framed(mut frame) = deframer.read().await? {
link_receiver.received_frame(frame.as_mut()).await;
}
}
},
async move {
while let Some(frame) = link_sender.next_send().await {
framer.write(frame.bytes()).await?;
}
Ok::<_, Error>(())
},
)
.map_ok(|((), (), (), ())| ())
}
/// Framing format that assumes a lossless underlying byte stream that can transport all 8 bits of a
/// byte.
struct LosslessBinary;
impl Format for LosslessBinary {
fn frame(&self, bytes: &[u8], outgoing: &mut Vec<u8>) -> Result<(), Error> {
if bytes.len() > (std::u16::MAX as usize) + 1 {
return Err(anyhow::format_err!(
"Packet length ({}) too long for stream framing",
bytes.len()
));
}
outgoing.write_u16::<byteorder::LittleEndian>((bytes.len() - 1) as u16)?;
outgoing.extend_from_slice(bytes);
Ok(())
}
fn deframe(&self, bytes: &[u8]) -> Result<Deframed, Error> {
if bytes.len() <= 3 {
return Ok(Deframed { frame: None, unframed_bytes: 0, new_start_pos: 0 });
}
let len = 1 + (u16::from_le_bytes([bytes[0], bytes[1]]) as usize);
if bytes.len() < 2 + len {
// Not enough bytes to deframe: done for now.
return Ok(Deframed { frame: None, unframed_bytes: 0, new_start_pos: 0 });
}
let frame = &bytes[2..2 + len];
return Ok(Deframed {
frame: Some(frame.to_vec()),
unframed_bytes: 0,
new_start_pos: 2 + len,
});
}
fn deframe_timeout(&self, _have_pending_bytes: bool) -> Option<Duration> {
None
}
}
#[cfg(test)]
mod test {
use super::*;
#[fuchsia_async::run(1, test)]
async fn simple_frame() -> Result<(), Error> {
let (mut framer_writer, mut framer_reader) = new_framer(LosslessBinary, 1024);
framer_writer.write(&[1, 2, 3, 4]).await?;
let (mut deframer_writer, mut deframer_reader) = new_deframer(LosslessBinary);
deframer_writer.write(framer_reader.read().await?.as_slice()).await?;
assert_eq!(deframer_reader.read().await?, ReadBytes::Framed(vec![1, 2, 3, 4]));
framer_writer.write(&[5, 6, 7, 8]).await?;
deframer_writer.write(framer_reader.read().await?.as_slice()).await?;
assert_eq!(deframer_reader.read().await?, ReadBytes::Framed(vec![5, 6, 7, 8]));
Ok(())
}
#[fuchsia_async::run(1, test)]
async fn large_frame() -> Result<(), Error> {
let big_slice = vec![0u8; 100000];
let (mut framer_writer, _framer_reader) = new_framer(LosslessBinary, 1024);
assert!(framer_writer.write(&big_slice).await.is_err());
Ok(())
}
}