blob: b40e1ab53b32a91079aaafc12d362a97925852f3 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Datagram socket bindings.
use std::{
convert::{Infallible as Never, TryInto as _},
fmt::Debug,
hash::Hash,
num::{NonZeroU16, NonZeroU64, NonZeroU8, TryFromIntError},
ops::ControlFlow,
};
use either::Either;
use fidl_fuchsia_net as fnet;
use fidl_fuchsia_posix as fposix;
use fidl_fuchsia_posix_socket as fposix_socket;
use derivative::Derivative;
use explicit::ResultExt as _;
use fidl::endpoints::RequestStream as _;
use fuchsia_async as fasync;
use fuchsia_zircon::{self as zx, prelude::HandleBased as _, Peered as _};
use net_types::{
ip::{GenericOverIp, Ip, IpInvariant, IpVersion, Ipv4, Ipv4Addr, Ipv6},
MulticastAddr, SpecifiedAddr, ZonedAddr,
};
use netstack3_core::{
device::{DeviceId, WeakDeviceId},
error::{LocalAddressError, NotSupportedError, SocketError},
icmp,
ip::{IpSockCreateAndSendError, IpSockSendError},
socket::{
self as core_socket, ConnInfo, ConnectError, ExpectedConnError, ExpectedUnboundError,
ListenerInfo, MulticastInterfaceSelector, MulticastMembershipInterfaceSelector,
NotDualStackCapableError, SetDualStackEnabledError, SetMulticastMembershipError,
ShutdownType, SocketInfo,
},
sync::Mutex as CoreMutex,
udp, IpExt,
};
use packet::{Buf, BufferMut};
use tracing::{error, trace, warn};
use crate::bindings::{
socket::{
queue::{BodyLen, MessageQueue},
worker::{self, SocketWorker},
},
trace_duration,
util::{
DeviceNotFoundError, IntoCore as _, IntoFidl, RemoveResourceResultExt as _,
TryFromFidlWithContext, TryIntoCore, TryIntoCoreWithContext, TryIntoFidl,
TryIntoFidlWithContext,
},
BindingId, BindingsCtx, Ctx,
};
use super::{
IntoErrno, IpSockAddrExt, SockAddr, SocketWorkerProperties, ZXSIO_SIGNAL_INCOMING,
ZXSIO_SIGNAL_OUTGOING,
};
/// The types of supported datagram protocols.
#[derive(Debug)]
pub(crate) enum DatagramProtocol {
Udp,
IcmpEcho,
}
/// A minimal abstraction over transport protocols that allows bindings-side state to be stored.
pub(crate) trait Transport<I: Ip>: Debug + Sized + Send + Sync + 'static {
const PROTOCOL: DatagramProtocol;
/// Whether the Transport Protocol supports dualstack sockets.
const SUPPORTS_DUALSTACK: bool;
type SocketId: Hash + Eq + Debug + Send + Sync + Clone;
/// Match Linux and implicitly map IPv4 addresses to IPv6 addresses for
/// dual-stack capable protocols.
fn maybe_map_sock_addr(addr: fnet::SocketAddress) -> fnet::SocketAddress {
match (I::VERSION, addr, Self::SUPPORTS_DUALSTACK) {
(IpVersion::V6, fnet::SocketAddress::Ipv4(v4_addr), true) => {
let port = v4_addr.port();
let address = v4_addr.addr().to_ipv6_mapped();
fnet::SocketAddress::Ipv6(fnet::Ipv6SocketAddress::new(
Some(ZonedAddr::Unzoned(address).into()),
port,
))
}
(_, _, _) => addr,
}
}
fn external_data(id: &Self::SocketId) -> &DatagramSocketExternalData<I>;
#[cfg(test)]
fn collect_all_sockets(ctx: &mut Ctx) -> Vec<Self::SocketId>;
}
/// Bindings data held by datagram sockets.
#[derive(Debug)]
pub(crate) struct DatagramSocketExternalData<I: Ip> {
message_queue: CoreMutex<MessageQueue<AvailableMessage<I>>>,
}
/// A special case of TryFrom that avoids the associated error type in generic contexts.
pub(crate) trait OptionFromU16: Sized {
fn from_u16(_: u16) -> Option<Self>;
}
pub(crate) struct LocalAddress<I: Ip, D, L> {
address: Option<core_socket::StrictlyZonedAddr<I::Addr, SpecifiedAddr<I::Addr>, D>>,
identifier: Option<L>,
}
pub(crate) struct RemoteAddress<I: Ip, D, R> {
address: core_socket::StrictlyZonedAddr<I::Addr, SpecifiedAddr<I::Addr>, D>,
identifier: R,
}
/// An abstraction over transport protocols that allows generic manipulation of Core state.
pub(crate) trait TransportState<I: Ip>: Transport<I> + Send + Sync + 'static {
type ConnectError: IntoErrno;
type ListenError: IntoErrno;
type DisconnectError: IntoErrno;
type SetSocketDeviceError: IntoErrno;
type SetMulticastMembershipError: IntoErrno;
type MulticastInterfaceError: IntoErrno;
type SetReuseAddrError: IntoErrno;
type SetReusePortError: IntoErrno;
type ShutdownError: IntoErrno;
type SetIpTransparentError: IntoErrno;
type LocalIdentifier: OptionFromU16 + Into<u16> + Send;
type RemoteIdentifier: From<u16> + Into<u16> + Send;
type SocketInfo: IntoFidl<LocalAddress<I, WeakDeviceId<BindingsCtx>, Self::LocalIdentifier>>
+ TryIntoFidl<RemoteAddress<I, WeakDeviceId<BindingsCtx>, u16>, Error = fposix::Errno>;
type SendError: IntoErrno;
type SendToError: IntoErrno;
fn create_unbound(
ctx: &mut Ctx,
external_data: DatagramSocketExternalData<I>,
) -> Self::SocketId;
fn connect(
ctx: &mut Ctx,
id: &Self::SocketId,
remote_ip: Option<ZonedAddr<SpecifiedAddr<I::Addr>, DeviceId<BindingsCtx>>>,
remote_id: Self::RemoteIdentifier,
) -> Result<(), Self::ConnectError>;
fn bind(
ctx: &mut Ctx,
id: &Self::SocketId,
addr: Option<ZonedAddr<SpecifiedAddr<I::Addr>, DeviceId<BindingsCtx>>>,
port: Option<Self::LocalIdentifier>,
) -> Result<(), Self::ListenError>;
fn disconnect(ctx: &mut Ctx, id: &Self::SocketId) -> Result<(), Self::DisconnectError>;
fn shutdown(
ctx: &mut Ctx,
id: &Self::SocketId,
which: ShutdownType,
) -> Result<(), Self::ShutdownError>;
fn get_shutdown(ctx: &mut Ctx, id: &Self::SocketId) -> Option<ShutdownType>;
fn get_socket_info(ctx: &mut Ctx, id: &Self::SocketId) -> Self::SocketInfo;
async fn close(ctx: &mut Ctx, id: Self::SocketId);
fn set_socket_device(
ctx: &mut Ctx,
id: &Self::SocketId,
device: Option<&DeviceId<BindingsCtx>>,
) -> Result<(), Self::SetSocketDeviceError>;
fn get_bound_device(ctx: &mut Ctx, id: &Self::SocketId) -> Option<WeakDeviceId<BindingsCtx>>;
fn set_dual_stack_enabled(
ctx: &mut Ctx,
id: &Self::SocketId,
enabled: bool,
) -> Result<(), SetDualStackEnabledError>;
fn get_dual_stack_enabled(
ctx: &mut Ctx,
id: &Self::SocketId,
) -> Result<bool, NotDualStackCapableError>;
fn set_reuse_addr(
ctx: &mut Ctx,
id: &Self::SocketId,
reuse_addr: bool,
) -> Result<(), Self::SetReuseAddrError>;
fn get_reuse_addr(ctx: &mut Ctx, id: &Self::SocketId) -> bool;
fn set_reuse_port(
ctx: &mut Ctx,
id: &Self::SocketId,
reuse_port: bool,
) -> Result<(), Self::SetReusePortError>;
fn get_reuse_port(ctx: &mut Ctx, id: &Self::SocketId) -> bool;
fn set_multicast_membership(
ctx: &mut Ctx,
id: &Self::SocketId,
multicast_group: MulticastAddr<I::Addr>,
interface: MulticastMembershipInterfaceSelector<I::Addr, DeviceId<BindingsCtx>>,
want_membership: bool,
) -> Result<(), Self::SetMulticastMembershipError>;
fn set_unicast_hop_limit(
ctx: &mut Ctx,
id: &Self::SocketId,
hop_limit: Option<NonZeroU8>,
ip_version: IpVersion,
) -> Result<(), NotDualStackCapableError>;
fn set_multicast_hop_limit(
ctx: &mut Ctx,
id: &Self::SocketId,
hop_limit: Option<NonZeroU8>,
ip_version: IpVersion,
) -> Result<(), NotDualStackCapableError>;
fn get_unicast_hop_limit(
ctx: &mut Ctx,
id: &Self::SocketId,
ip_version: IpVersion,
) -> Result<NonZeroU8, NotDualStackCapableError>;
fn get_multicast_hop_limit(
ctx: &mut Ctx,
id: &Self::SocketId,
ip_version: IpVersion,
) -> Result<NonZeroU8, NotDualStackCapableError>;
fn set_ip_transparent(
ctx: &mut Ctx,
id: &Self::SocketId,
value: bool,
) -> Result<(), Self::SetIpTransparentError>;
fn get_ip_transparent(ctx: &mut Ctx, id: &Self::SocketId) -> bool;
fn set_multicast_interface(
ctx: &mut Ctx,
id: &Self::SocketId,
interface: Option<&DeviceId<BindingsCtx>>,
ip_version: IpVersion,
) -> Result<(), Self::MulticastInterfaceError>;
fn get_multicast_interface(
ctx: &mut Ctx,
id: &Self::SocketId,
ip_version: IpVersion,
) -> Result<Option<WeakDeviceId<BindingsCtx>>, Self::MulticastInterfaceError>;
fn send<B: BufferMut>(
ctx: &mut Ctx,
id: &Self::SocketId,
body: B,
) -> Result<(), Self::SendError>;
fn send_to<B: BufferMut>(
ctx: &mut Ctx,
id: &Self::SocketId,
remote: (
Option<ZonedAddr<SpecifiedAddr<I::Addr>, DeviceId<BindingsCtx>>>,
Self::RemoteIdentifier,
),
body: B,
) -> Result<(), Self::SendToError>;
}
#[derive(Debug)]
pub(crate) enum Udp {}
type UdpSocketId<I> = udp::UdpSocketId<I, WeakDeviceId<BindingsCtx>, BindingsCtx>;
impl<I: IpExt> Transport<I> for Udp {
const PROTOCOL: DatagramProtocol = DatagramProtocol::Udp;
const SUPPORTS_DUALSTACK: bool = true;
type SocketId = UdpSocketId<I>;
fn external_data(id: &Self::SocketId) -> &DatagramSocketExternalData<I> {
id.external_data()
}
#[cfg(test)]
fn collect_all_sockets(ctx: &mut Ctx) -> Vec<Self::SocketId> {
net_types::map_ip_twice!(I, IpInvariant(ctx), |IpInvariant(ctx)| ctx
.api()
.udp::<I>()
.collect_all_sockets())
}
}
impl OptionFromU16 for NonZeroU16 {
fn from_u16(t: u16) -> Option<Self> {
Self::new(t)
}
}
#[netstack3_core::context_ip_bounds(I, BindingsCtx)]
impl<I> TransportState<I> for Udp
where
I: IpExt,
{
type ConnectError = ConnectError;
type ListenError = Either<ExpectedUnboundError, LocalAddressError>;
type DisconnectError = ExpectedConnError;
type ShutdownError = ExpectedConnError;
type SetSocketDeviceError = SocketError;
type SetMulticastMembershipError = SetMulticastMembershipError;
type MulticastInterfaceError = NotDualStackCapableError;
type SetReuseAddrError = ExpectedUnboundError;
type SetReusePortError = ExpectedUnboundError;
type SetIpTransparentError = Never;
type LocalIdentifier = NonZeroU16;
type RemoteIdentifier = udp::UdpRemotePort;
type SocketInfo = SocketInfo<I::Addr, WeakDeviceId<BindingsCtx>>;
type SendError = Either<udp::SendError, fposix::Errno>;
type SendToError = Either<LocalAddressError, udp::SendToError>;
fn create_unbound(
ctx: &mut Ctx,
external_data: DatagramSocketExternalData<I>,
) -> Self::SocketId {
ctx.api().udp().create_with(external_data)
}
fn connect(
ctx: &mut Ctx,
id: &Self::SocketId,
remote_ip: Option<ZonedAddr<SpecifiedAddr<<I as Ip>::Addr>, DeviceId<BindingsCtx>>>,
remote_id: Self::RemoteIdentifier,
) -> Result<(), Self::ConnectError> {
ctx.api().udp().connect(id, remote_ip, remote_id)
}
fn bind(
ctx: &mut Ctx,
id: &Self::SocketId,
addr: Option<ZonedAddr<SpecifiedAddr<<I as Ip>::Addr>, DeviceId<BindingsCtx>>>,
port: Option<Self::LocalIdentifier>,
) -> Result<(), Self::ListenError> {
ctx.api().udp().listen(id, addr, port)
}
fn disconnect(ctx: &mut Ctx, id: &Self::SocketId) -> Result<(), Self::DisconnectError> {
ctx.api().udp().disconnect(id)
}
fn shutdown(
ctx: &mut Ctx,
id: &Self::SocketId,
which: ShutdownType,
) -> Result<(), Self::ShutdownError> {
ctx.api().udp().shutdown(id, which)
}
fn get_shutdown(ctx: &mut Ctx, id: &Self::SocketId) -> Option<ShutdownType> {
ctx.api().udp().get_shutdown(id)
}
fn get_socket_info(ctx: &mut Ctx, id: &Self::SocketId) -> Self::SocketInfo {
ctx.api().udp().get_info(id)
}
async fn close(ctx: &mut Ctx, id: Self::SocketId) {
let weak = id.downgrade();
let DatagramSocketExternalData { message_queue: _ } = ctx
.api()
.udp()
.close(id)
.map_deferred(|d| d.into_future("udp socket", &weak))
.into_future()
.await;
}
fn set_socket_device(
ctx: &mut Ctx,
id: &Self::SocketId,
device: Option<&DeviceId<BindingsCtx>>,
) -> Result<(), Self::SetSocketDeviceError> {
ctx.api().udp().set_device(id, device)
}
fn get_bound_device(ctx: &mut Ctx, id: &Self::SocketId) -> Option<WeakDeviceId<BindingsCtx>> {
ctx.api().udp().get_bound_device(id)
}
fn set_reuse_addr(
ctx: &mut Ctx,
id: &Self::SocketId,
reuse_addr: bool,
) -> Result<(), Self::SetReusePortError> {
ctx.api().udp().set_posix_reuse_addr(id, reuse_addr).inspect_err(|_| {
warn!("tried to set SO_REUSEADDR on a bound socket; see https://fxbug.dev/42051599")
})
}
fn set_reuse_port(
ctx: &mut Ctx,
id: &Self::SocketId,
reuse_port: bool,
) -> Result<(), Self::SetReusePortError> {
ctx.api().udp().set_posix_reuse_port(id, reuse_port).inspect_err(|_| {
warn!("tried to set SO_REUSEPORT on a bound socket; see https://fxbug.dev/42051599")
})
}
fn set_dual_stack_enabled(
ctx: &mut Ctx,
id: &Self::SocketId,
enabled: bool,
) -> Result<(), SetDualStackEnabledError> {
ctx.api().udp().set_dual_stack_enabled(id, enabled)
}
fn get_dual_stack_enabled(
ctx: &mut Ctx,
id: &Self::SocketId,
) -> Result<bool, NotDualStackCapableError> {
ctx.api().udp().get_dual_stack_enabled(id)
}
fn get_reuse_addr(ctx: &mut Ctx, id: &Self::SocketId) -> bool {
ctx.api().udp().get_posix_reuse_addr(id)
}
fn get_reuse_port(ctx: &mut Ctx, id: &Self::SocketId) -> bool {
ctx.api().udp().get_posix_reuse_port(id)
}
fn set_multicast_membership(
ctx: &mut Ctx,
id: &Self::SocketId,
multicast_group: MulticastAddr<I::Addr>,
interface: MulticastMembershipInterfaceSelector<I::Addr, DeviceId<BindingsCtx>>,
want_membership: bool,
) -> Result<(), Self::SetMulticastMembershipError> {
ctx.api().udp().set_multicast_membership(id, multicast_group, interface, want_membership)
}
fn set_unicast_hop_limit(
ctx: &mut Ctx,
id: &Self::SocketId,
hop_limit: Option<NonZeroU8>,
ip_version: IpVersion,
) -> Result<(), NotDualStackCapableError> {
ctx.api().udp().set_unicast_hop_limit(id, hop_limit, ip_version)
}
fn set_multicast_hop_limit(
ctx: &mut Ctx,
id: &Self::SocketId,
hop_limit: Option<NonZeroU8>,
ip_version: IpVersion,
) -> Result<(), NotDualStackCapableError> {
ctx.api().udp().set_multicast_hop_limit(id, hop_limit, ip_version)
}
fn get_unicast_hop_limit(
ctx: &mut Ctx,
id: &Self::SocketId,
ip_version: IpVersion,
) -> Result<NonZeroU8, NotDualStackCapableError> {
ctx.api().udp().get_unicast_hop_limit(id, ip_version)
}
fn get_multicast_hop_limit(
ctx: &mut Ctx,
id: &Self::SocketId,
ip_version: IpVersion,
) -> Result<NonZeroU8, NotDualStackCapableError> {
ctx.api().udp().get_multicast_hop_limit(id, ip_version)
}
fn set_ip_transparent(
ctx: &mut Ctx,
id: &Self::SocketId,
value: bool,
) -> Result<(), Self::SetIpTransparentError> {
Ok(ctx.api().udp().set_transparent(id, value))
}
fn get_ip_transparent(ctx: &mut Ctx, id: &Self::SocketId) -> bool {
ctx.api().udp().get_transparent(id)
}
fn set_multicast_interface(
ctx: &mut Ctx,
id: &Self::SocketId,
interface: Option<&DeviceId<BindingsCtx>>,
ip_version: IpVersion,
) -> Result<(), Self::MulticastInterfaceError> {
ctx.api().udp().set_multicast_interface(id, interface, ip_version)
}
fn get_multicast_interface(
ctx: &mut Ctx,
id: &Self::SocketId,
ip_version: IpVersion,
) -> Result<Option<WeakDeviceId<BindingsCtx>>, Self::MulticastInterfaceError> {
ctx.api().udp().get_multicast_interface(id, ip_version)
}
fn send<B: BufferMut>(
ctx: &mut Ctx,
id: &Self::SocketId,
body: B,
) -> Result<(), Self::SendError> {
ctx.api()
.udp()
.send(id, body)
.map_err(|e| e.map_right(|ExpectedConnError| fposix::Errno::Edestaddrreq))
}
fn send_to<B: BufferMut>(
ctx: &mut Ctx,
id: &Self::SocketId,
(remote_ip, remote_port): (
Option<ZonedAddr<SpecifiedAddr<I::Addr>, DeviceId<BindingsCtx>>>,
Self::RemoteIdentifier,
),
body: B,
) -> Result<(), Self::SendToError> {
ctx.api().udp().send_to(id, remote_ip, remote_port, body)
}
}
impl<I: IpExt> DatagramSocketExternalData<I> {
pub(crate) fn receive_udp<B: BufferMut>(
&self,
device_id: &DeviceId<BindingsCtx>,
(dst_ip, dst_port): (<I>::Addr, NonZeroU16),
(src_ip, src_port): (<I>::Addr, Option<NonZeroU16>),
body: &B,
) {
self.message_queue.lock().receive(AvailableMessage {
interface_id: device_id.bindings_id().id,
source_addr: src_ip,
source_port: src_port.map_or(0, NonZeroU16::get),
destination_addr: dst_ip,
destination_port: dst_port.get(),
timestamp: fasync::Time::now(),
data: body.as_ref().to_vec(),
})
}
}
#[derive(Debug)]
pub(crate) enum IcmpEcho {}
type IcmpSocketId<I> = icmp::IcmpSocketId<I, WeakDeviceId<BindingsCtx>, BindingsCtx>;
impl<I: IpExt> Transport<I> for IcmpEcho {
const PROTOCOL: DatagramProtocol = DatagramProtocol::IcmpEcho;
const SUPPORTS_DUALSTACK: bool = false;
type SocketId = IcmpSocketId<I>;
fn external_data(id: &Self::SocketId) -> &DatagramSocketExternalData<I> {
id.external_data()
}
#[cfg(test)]
fn collect_all_sockets(ctx: &mut Ctx) -> Vec<Self::SocketId> {
net_types::map_ip_twice!(I, IpInvariant(ctx), |IpInvariant(ctx)| ctx
.api()
.icmp_echo::<I>()
.collect_all_sockets())
}
}
impl OptionFromU16 for u16 {
fn from_u16(t: u16) -> Option<Self> {
Some(t)
}
}
#[netstack3_core::context_ip_bounds(I, BindingsCtx)]
impl<I> TransportState<I> for IcmpEcho
where
I: IpExt,
{
type ConnectError = ConnectError;
type ListenError = Either<ExpectedUnboundError, LocalAddressError>;
type DisconnectError = ExpectedConnError;
type ShutdownError = ExpectedConnError;
type SetSocketDeviceError = SocketError;
type SetMulticastMembershipError = NotSupportedError;
type MulticastInterfaceError = NotSupportedError;
type SetReuseAddrError = NotSupportedError;
type SetReusePortError = NotSupportedError;
type SetIpTransparentError = NotSupportedError;
type LocalIdentifier = NonZeroU16;
type RemoteIdentifier = u16;
type SocketInfo = SocketInfo<I::Addr, WeakDeviceId<BindingsCtx>>;
type SendError = core_socket::SendError<packet_formats::error::ParseError>;
type SendToError = either::Either<
LocalAddressError,
core_socket::SendToError<packet_formats::error::ParseError>,
>;
fn create_unbound(
ctx: &mut Ctx,
external_data: DatagramSocketExternalData<I>,
) -> Self::SocketId {
ctx.api().icmp_echo().create_with(external_data)
}
fn connect(
ctx: &mut Ctx,
id: &Self::SocketId,
remote_ip: Option<ZonedAddr<SpecifiedAddr<I::Addr>, DeviceId<BindingsCtx>>>,
remote_id: Self::RemoteIdentifier,
) -> Result<(), Self::ConnectError> {
ctx.api().icmp_echo().connect(id, remote_ip, remote_id)
}
fn bind(
ctx: &mut Ctx,
id: &Self::SocketId,
addr: Option<ZonedAddr<SpecifiedAddr<<I as Ip>::Addr>, DeviceId<BindingsCtx>>>,
port: Option<Self::LocalIdentifier>,
) -> Result<(), Self::ListenError> {
ctx.api().icmp_echo().bind(id, addr, port)
}
fn disconnect(ctx: &mut Ctx, id: &Self::SocketId) -> Result<(), Self::DisconnectError> {
ctx.api().icmp_echo().disconnect(id)
}
fn shutdown(
ctx: &mut Ctx,
id: &Self::SocketId,
which: ShutdownType,
) -> Result<(), Self::ShutdownError> {
ctx.api().icmp_echo().shutdown(id, which)
}
fn get_shutdown(ctx: &mut Ctx, id: &Self::SocketId) -> Option<ShutdownType> {
ctx.api().icmp_echo().get_shutdown(id)
}
fn get_socket_info(ctx: &mut Ctx, id: &Self::SocketId) -> Self::SocketInfo {
ctx.api().icmp_echo().get_info(id)
}
async fn close(ctx: &mut Ctx, id: Self::SocketId) {
let weak = id.downgrade();
let DatagramSocketExternalData { message_queue: _ } = ctx
.api()
.icmp_echo()
.close(id)
.map_deferred(|d| d.into_future("icmp socket", &weak))
.into_future()
.await;
}
fn set_socket_device(
ctx: &mut Ctx,
id: &Self::SocketId,
device: Option<&DeviceId<BindingsCtx>>,
) -> Result<(), Self::SetSocketDeviceError> {
ctx.api().icmp_echo().set_device(id, device)
}
fn get_bound_device(ctx: &mut Ctx, id: &Self::SocketId) -> Option<WeakDeviceId<BindingsCtx>> {
ctx.api().icmp_echo().get_bound_device(id)
}
fn set_dual_stack_enabled(
_ctx: &mut Ctx,
_id: &Self::SocketId,
_enabled: bool,
) -> Result<(), SetDualStackEnabledError> {
// NB: Despite ICMP's lack of support for dual stack operations, Linux
// allows the `IPV6_V6ONLY` socket option to be set/unset. Here we
// disallow setting the option, which more accurately reflects that ICMP
// sockets do not support dual stack operations.
return Err(SetDualStackEnabledError::NotCapable);
}
fn get_dual_stack_enabled(
_ctx: &mut Ctx,
_id: &Self::SocketId,
) -> Result<bool, NotDualStackCapableError> {
match I::VERSION {
IpVersion::V4 => Err(NotDualStackCapableError),
// NB: Despite ICMP's lack of support for dual stack operations,
// Linux allows the `IPV6_V6ONLY` socket option to be set/unset.
// Here we always report that the dual stack operations are
// disabled, which more accurately reflects that ICMP sockets do not
// support dual stack operations.
IpVersion::V6 => Ok(false),
}
}
fn set_reuse_addr(
_ctx: &mut Ctx,
_id: &Self::SocketId,
_reuse_addr: bool,
) -> Result<(), Self::SetReuseAddrError> {
Err(NotSupportedError)
}
fn get_reuse_addr(_ctx: &mut Ctx, _id: &Self::SocketId) -> bool {
false
}
fn set_reuse_port(
_ctx: &mut Ctx,
_id: &Self::SocketId,
_reuse_port: bool,
) -> Result<(), Self::SetReusePortError> {
Err(NotSupportedError)
}
fn get_reuse_port(_ctx: &mut Ctx, _id: &Self::SocketId) -> bool {
false
}
fn set_multicast_membership(
_ctx: &mut Ctx,
_id: &Self::SocketId,
_multicast_group: MulticastAddr<I::Addr>,
_interface: MulticastMembershipInterfaceSelector<I::Addr, DeviceId<BindingsCtx>>,
_want_membership: bool,
) -> Result<(), Self::SetMulticastMembershipError> {
Err(NotSupportedError)
}
fn set_unicast_hop_limit(
ctx: &mut Ctx,
id: &Self::SocketId,
hop_limit: Option<NonZeroU8>,
ip_version: IpVersion,
) -> Result<(), NotDualStackCapableError> {
// Disallow updates when the hop limit's version doesn't match the
// socket's version. This matches Linux's behavior for IPv4 sockets, but
// diverges from Linux's behavior for IPv6 sockets. Rejecting updates to
// the IPv4 TTL for IPv6 sockets more accurately reflects that ICMP
// sockets do not support dual stack operations.
if I::VERSION != ip_version {
return Err(NotDualStackCapableError);
}
Ok(ctx.api().icmp_echo().set_unicast_hop_limit(id, hop_limit))
}
fn set_multicast_hop_limit(
ctx: &mut Ctx,
id: &Self::SocketId,
hop_limit: Option<NonZeroU8>,
ip_version: IpVersion,
) -> Result<(), NotDualStackCapableError> {
// Disallow updates when the hop limit's version doesn't match the
// socket's version. This matches Linux's behavior for IPv4 sockets, but
// diverges from Linux's behavior for IPv6 sockets. Rejecting updates to
// the IPv4 TTL for IPv6 sockets more accurately reflects that ICMP
// sockets do not support dual stack operations.
if I::VERSION != ip_version {
return Err(NotDualStackCapableError);
}
Ok(ctx.api().icmp_echo().set_multicast_hop_limit(id, hop_limit))
}
fn get_unicast_hop_limit(
ctx: &mut Ctx,
id: &Self::SocketId,
ip_version: IpVersion,
) -> Result<NonZeroU8, NotDualStackCapableError> {
// Disallow fetching the hop limit when its version doesn't match the
// socket's version. This matches Linux's behavior for IPv4 sockets, but
// diverges from Linux's behavior for IPv6 sockets. Rejecting fetches of
// the IPv4 TTL for IPv6 sockets more accurately reflects that ICMP
// sockets do not support dual stack operations.
if I::VERSION != ip_version {
return Err(NotDualStackCapableError);
}
Ok(ctx.api().icmp_echo().get_unicast_hop_limit(id))
}
fn get_multicast_hop_limit(
ctx: &mut Ctx,
id: &Self::SocketId,
ip_version: IpVersion,
) -> Result<NonZeroU8, NotDualStackCapableError> {
// Disallow fetching the hop limit when its version doesn't match the
// socket's version. This matches Linux's behavior for IPv4 sockets, but
// diverges from Linux's behavior for IPv6 sockets. Rejecting fetches of
// the IPv4 TTL for IPv6 sockets more accurately reflects that ICMP
// sockets do not support dual stack operations.
if I::VERSION != ip_version {
return Err(NotDualStackCapableError);
}
Ok(ctx.api().icmp_echo().get_multicast_hop_limit(id))
}
fn set_multicast_interface(
_ctx: &mut Ctx,
_id: &Self::SocketId,
_interface: Option<&DeviceId<BindingsCtx>>,
_ip_version: IpVersion,
) -> Result<(), NotSupportedError> {
Err(NotSupportedError)
}
fn get_multicast_interface(
_ctx: &mut Ctx,
_id: &Self::SocketId,
_ip_version: IpVersion,
) -> Result<Option<WeakDeviceId<BindingsCtx>>, Self::MulticastInterfaceError> {
Err(NotSupportedError)
}
fn set_ip_transparent(
_ctx: &mut Ctx,
_id: &Self::SocketId,
_value: bool,
) -> Result<(), Self::SetIpTransparentError> {
Err(NotSupportedError)
}
fn get_ip_transparent(_ctx: &mut Ctx, _id: &Self::SocketId) -> bool {
false
}
fn send<B: BufferMut>(
ctx: &mut Ctx,
id: &Self::SocketId,
body: B,
) -> Result<(), Self::SendError> {
ctx.api().icmp_echo().send(id, body)
}
fn send_to<B: BufferMut>(
ctx: &mut Ctx,
id: &Self::SocketId,
(remote_ip, _remote_id): (
Option<ZonedAddr<SpecifiedAddr<I::Addr>, DeviceId<BindingsCtx>>>,
Self::RemoteIdentifier,
),
body: B,
) -> Result<(), Self::SendToError> {
ctx.api().icmp_echo().send_to(id, remote_ip, body)
}
}
impl<E> IntoErrno for core_socket::SendError<E> {
fn into_errno(self) -> fposix::Errno {
match self {
core_socket::SendError::NotConnected => fposix::Errno::Edestaddrreq,
core_socket::SendError::NotWriteable => fposix::Errno::Epipe,
core_socket::SendError::IpSock(err) => err.into_errno(),
core_socket::SendError::SerializeError(_e) => fposix::Errno::Einval,
}
}
}
impl<E> IntoErrno for core_socket::SendToError<E> {
fn into_errno(self) -> fposix::Errno {
match self {
core_socket::SendToError::NotWriteable => fposix::Errno::Epipe,
core_socket::SendToError::Zone(err) => err.into_errno(),
// NB: Mapping MTU to EMSGSIZE is different from the impl on
// `IpSockSendError` which maps to EINVAL instead.
core_socket::SendToError::CreateAndSend(IpSockCreateAndSendError::Send(
IpSockSendError::Mtu,
)) => fposix::Errno::Emsgsize,
core_socket::SendToError::CreateAndSend(IpSockCreateAndSendError::Send(
IpSockSendError::Unroutable(err),
)) => err.into_errno(),
core_socket::SendToError::CreateAndSend(IpSockCreateAndSendError::Create(err)) => {
err.into_errno()
}
core_socket::SendToError::RemoteUnexpectedlyMapped => fposix::Errno::Enetunreach,
core_socket::SendToError::RemoteUnexpectedlyNonMapped => fposix::Errno::Eafnosupport,
core_socket::SendToError::SerializeError(_e) => fposix::Errno::Einval,
}
}
}
impl<I: IpExt> DatagramSocketExternalData<I> {
pub(crate) fn receive_icmp_echo_reply<B: BufferMut>(
&self,
device: &DeviceId<BindingsCtx>,
src_ip: I::Addr,
dst_ip: I::Addr,
id: u16,
data: B,
) {
tracing::debug!("Received ICMP echo reply in binding: {:?}, id: {id}", I::VERSION);
self.message_queue.lock().receive(AvailableMessage {
source_addr: src_ip,
source_port: 0,
interface_id: device.bindings_id().id,
destination_addr: dst_ip,
destination_port: id,
timestamp: fasync::Time::now(),
data: data.as_ref().to_vec(),
})
}
}
#[derive(Debug, Derivative)]
#[derivative(Clone(bound = ""))]
struct AvailableMessage<I: Ip> {
interface_id: BindingId,
source_addr: I::Addr,
source_port: u16,
destination_addr: I::Addr,
destination_port: u16,
timestamp: fasync::Time,
data: Vec<u8>,
}
impl<I: Ip> BodyLen for AvailableMessage<I> {
fn body_len(&self) -> usize {
self.data.len()
}
}
/// IP extension providing separate types of bindings data for IPv4 and IPv6.
trait BindingsDataIpExt: Ip {
/// The version specific bindings data.
///
/// [`Ipv4BindingsData`] for IPv4, and [`Ipv6BindingsData`] for IPv6.
type VersionSpecificData: Default
+ Send
+ GenericOverIp<Self, Type = Self::VersionSpecificData>
+ GenericOverIp<Ipv4, Type = Ipv4BindingsData>
+ GenericOverIp<Ipv6, Type = Ipv6BindingsData>;
}
impl BindingsDataIpExt for Ipv4 {
type VersionSpecificData = Ipv4BindingsData;
}
impl BindingsDataIpExt for Ipv6 {
type VersionSpecificData = Ipv6BindingsData;
}
/// Datagram bindings data specific to IPv4 sockets.
#[derive(Default)]
struct Ipv4BindingsData {
// NB: At the moment, IPv4 sockets don't need to hold any unique data.
}
impl<I: Ip + BindingsDataIpExt> GenericOverIp<I> for Ipv4BindingsData {
type Type = I::VersionSpecificData;
}
/// Datagram bindings data specific to IPv6 sockets.
#[derive(Default)]
struct Ipv6BindingsData {
// Corresponds to the IPV6_RECVPKTINFO socket option.
recv_pkt_info: bool,
}
impl<I: Ip + BindingsDataIpExt> GenericOverIp<I> for Ipv6BindingsData {
type Type = I::VersionSpecificData;
}
#[derive(Debug)]
struct BindingData<I: BindingsDataIpExt, T: Transport<I>> {
peer_event: zx::EventPair,
info: SocketControlInfo<I, T>,
/// The bindings data specific to `I`.
version_specific_data: I::VersionSpecificData,
/// If true, return the original received destination address in the control data. This is
/// modified using the SetIpReceiveOriginalDestinationAddress method (a.k.a. IP_RECVORIGDSTADDR)
/// and is useful for transparent sockets (IP_TRANSPARENT).
ip_receive_original_destination_address: bool,
/// SO_TIMESTAMP, SO_TIMESTAMPNS state.
timestamp_option: fposix_socket::TimestampOption,
/// `IP_MULTICAST_IF` option. It can be set separately from `IPV6_MULTICAST_IF`.
ipv4_multicast_if_addr: Option<SpecifiedAddr<Ipv4Addr>>,
}
impl<I, T> BindingData<I, T>
where
I: IpExt + IpSockAddrExt + BindingsDataIpExt,
T: Transport<Ipv4>,
T: Transport<Ipv6>,
T: TransportState<I>,
{
/// Creates a new `BindingData`.
fn new(ctx: &mut Ctx, properties: SocketWorkerProperties) -> Self {
let (local_event, peer_event) = zx::EventPair::create();
// signal peer that OUTGOING is available.
// TODO(brunodalbo): We're currently not enforcing any sort of
// flow-control for outgoing datagrams. That'll get fixed once we
// limit the number of in flight datagrams per socket (i.e. application
// buffers).
if let Err(e) = local_event.signal_peer(zx::Signals::NONE, ZXSIO_SIGNAL_OUTGOING) {
error!("socket failed to signal peer: {:?}", e);
}
let external_data = DatagramSocketExternalData {
message_queue: CoreMutex::new(MessageQueue::new(local_event)),
};
let id = T::create_unbound(ctx, external_data);
Self {
peer_event,
info: SocketControlInfo { _properties: properties, id },
version_specific_data: I::VersionSpecificData::default(),
ip_receive_original_destination_address: false,
timestamp_option: fposix_socket::TimestampOption::Disabled,
ipv4_multicast_if_addr: None,
}
}
}
/// Information on socket control plane.
#[derive(Debug)]
pub(crate) struct SocketControlInfo<I: Ip, T: Transport<I>> {
_properties: SocketWorkerProperties,
id: T::SocketId,
}
pub(super) fn spawn_worker(
domain: fposix_socket::Domain,
proto: fposix_socket::DatagramSocketProtocol,
ctx: crate::bindings::Ctx,
events: fposix_socket::SynchronousDatagramSocketRequestStream,
properties: SocketWorkerProperties,
spawner: &worker::ProviderScopedSpawner<crate::bindings::util::TaskWaitGroupSpawner>,
) -> Result<(), fposix::Errno> {
match (domain, proto) {
(fposix_socket::Domain::Ipv4, fposix_socket::DatagramSocketProtocol::Udp) => {
spawner.spawn(SocketWorker::serve_stream_with(
ctx,
BindingData::<Ipv4, Udp>::new,
properties,
events,
(),
spawner.clone(),
));
Ok(())
}
(fposix_socket::Domain::Ipv6, fposix_socket::DatagramSocketProtocol::Udp) => {
spawner.spawn(SocketWorker::serve_stream_with(
ctx,
BindingData::<Ipv6, Udp>::new,
properties,
events,
(),
spawner.clone(),
));
Ok(())
}
(fposix_socket::Domain::Ipv4, fposix_socket::DatagramSocketProtocol::IcmpEcho) => {
spawner.spawn(SocketWorker::serve_stream_with(
ctx,
BindingData::<Ipv4, IcmpEcho>::new,
properties,
events,
(),
spawner.clone(),
));
Ok(())
}
(fposix_socket::Domain::Ipv6, fposix_socket::DatagramSocketProtocol::IcmpEcho) => {
spawner.spawn(SocketWorker::serve_stream_with(
ctx,
BindingData::<Ipv6, IcmpEcho>::new,
properties,
events,
(),
spawner.clone(),
));
Ok(())
}
}
}
impl worker::CloseResponder for fposix_socket::SynchronousDatagramSocketCloseResponder {
fn send(self, arg: Result<(), i32>) -> Result<(), fidl::Error> {
fposix_socket::SynchronousDatagramSocketCloseResponder::send(self, arg)
}
}
impl<I, T> worker::SocketWorkerHandler for BindingData<I, T>
where
I: IpExt + IpSockAddrExt + BindingsDataIpExt,
T: Transport<Ipv4>,
T: Transport<Ipv6>,
T: TransportState<I>,
T: Send + Sync + 'static,
DeviceId<BindingsCtx>: TryFromFidlWithContext<NonZeroU64, Error = DeviceNotFoundError>,
WeakDeviceId<BindingsCtx>: TryIntoFidlWithContext<NonZeroU64, Error = DeviceNotFoundError>,
{
type Request = fposix_socket::SynchronousDatagramSocketRequest;
type RequestStream = fposix_socket::SynchronousDatagramSocketRequestStream;
type CloseResponder = fposix_socket::SynchronousDatagramSocketCloseResponder;
type SetupArgs = ();
type Spawner = ();
fn handle_request(
&mut self,
ctx: &mut Ctx,
request: Self::Request,
_spawners: &worker::TaskSpawnerCollection<()>,
) -> ControlFlow<Self::CloseResponder, Option<Self::RequestStream>> {
RequestHandler { ctx, data: self }.handle_request(request)
}
async fn close(self, ctx: &mut Ctx) {
let id = self.info.id;
T::close(ctx, id).await;
}
}
/// A borrow into a [`SocketWorker`]'s state.
struct RequestHandler<'a, I: BindingsDataIpExt, T: Transport<I>> {
ctx: &'a mut crate::bindings::Ctx,
data: &'a mut BindingData<I, T>,
}
impl<'a, I, T> RequestHandler<'a, I, T>
where
I: IpExt + IpSockAddrExt + BindingsDataIpExt,
T: Transport<Ipv4>,
T: Transport<Ipv6>,
T: TransportState<I>,
T: Send + Sync + 'static,
DeviceId<BindingsCtx>: TryFromFidlWithContext<NonZeroU64, Error = DeviceNotFoundError>,
WeakDeviceId<BindingsCtx>: TryIntoFidlWithContext<NonZeroU64, Error = DeviceNotFoundError>,
{
fn handle_request(
mut self,
request: fposix_socket::SynchronousDatagramSocketRequest,
) -> ControlFlow<
fposix_socket::SynchronousDatagramSocketCloseResponder,
Option<fposix_socket::SynchronousDatagramSocketRequestStream>,
> {
// On Error, logs the `Errno` with additional debugging context.
//
// Implemented as a macro to avoid erasing the callsite information.
macro_rules! maybe_log_error {
($operation:expr, $result:expr) => {
match $result {
Ok(_) => {}
Err(errno) => crate::bindings::socket::log_errno!(
errno,
"{:?} {} failed to handle {}: {:?}",
<T as Transport<I>>::PROTOCOL,
I::NAME,
$operation,
errno
),
}
};
}
type Request = fposix_socket::SynchronousDatagramSocketRequest;
match request {
Request::Describe { responder } => responder
.send(self.describe())
.unwrap_or_else(|e| error!("failed to respond: {e:?}")),
Request::Connect { addr, responder } => {
let result = self.connect(addr);
maybe_log_error!("connect", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::Disconnect { responder } => {
let result = self.disconnect();
maybe_log_error!("disconnect", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::Clone2 { request, control_handle: _ } => {
let channel = fidl::AsyncChannel::from_channel(request.into_channel());
let stream =
fposix_socket::SynchronousDatagramSocketRequestStream::from_channel(channel);
return ControlFlow::Continue(Some(stream));
}
Request::Close { responder } => {
return ControlFlow::Break(responder);
}
Request::Bind { addr, responder } => {
let result = self.bind(addr);
maybe_log_error!("bind", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::Query { responder } => {
responder
.send(fposix_socket::SYNCHRONOUS_DATAGRAM_SOCKET_PROTOCOL_NAME.as_bytes())
.unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::GetSockName { responder } => {
let result = self.get_sock_name();
maybe_log_error!("get_sock_name", &result);
responder
.send(result.as_ref().map_err(|e| *e))
.unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::GetPeerName { responder } => {
let result = self.get_peer_name();
maybe_log_error!("get_peer_name", &result);
responder
.send(result.as_ref().map_err(|e| *e))
.unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::Shutdown { mode, responder } => {
let result = self.shutdown(mode);
maybe_log_error!("shutdown", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"))
}
Request::RecvMsg { want_addr, data_len, want_control, flags, responder } => {
let result = self.recv_msg(want_addr, data_len as usize, want_control, flags);
maybe_log_error!("recvmsg", &result);
responder
.send(match result {
Ok((ref addr, ref data, ref control, truncated)) => {
Ok((addr.as_ref(), data.as_slice(), control, truncated))
}
Err(err) => Err(err),
})
.unwrap_or_else(|e| error!("failed to respond: {e:?}"))
}
Request::SendMsg { addr, data, control: _, flags: _, responder } => {
// TODO(https://fxbug.dev/42094933): handle control.
let result = self.send_msg(addr.map(|addr| *addr), data);
maybe_log_error!("sendmsg", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::GetInfo { responder } => {
let result = self.get_sock_info();
maybe_log_error!("get_info", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"))
}
Request::GetTimestamp { responder } => {
let result = self.get_timestamp_option();
maybe_log_error!("get_timestamp", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"))
}
Request::SetTimestamp { value, responder } => {
let result = self.set_timestamp_option(value);
maybe_log_error!("set_timestamp", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"))
}
Request::GetOriginalDestination { responder } => {
responder
.send(Err(fposix::Errno::Enoprotoopt))
.unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::GetError { responder } => {
tracing::debug!("syncudp::GetError is not implemented, returning Ok");
// Pretend that we don't have any errors to report.
// TODO(https://fxbug.dev/322214321): Actually implement SO_ERROR.
responder.send(Ok(())).unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::SetSendBuffer { value_bytes: _, responder } => {
// TODO(https://fxbug.dev/42074004): Actually implement SetSendBuffer.
//
// Currently, UDP sending in Netstack3 is synchronous, so it's not clear what a
// sensible implementation would look like.
responder.send(Ok(())).unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::GetSendBuffer { responder } => {
// TODO(https://fxbug.dev/42074004): Actually implement SetSendBuffer.
// Until then we return the default buffer size used on Linux.
const DEFAULT_SEND_BUFFER: u64 = 212992;
responder
.send(Ok(DEFAULT_SEND_BUFFER))
.unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::SetReceiveBuffer { value_bytes, responder } => {
responder
.send({
self.set_max_receive_buffer_size(value_bytes);
Ok(())
})
.unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::GetReceiveBuffer { responder } => {
responder
.send(Ok(self.get_max_receive_buffer_size()))
.unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::SetReuseAddress { value, responder } => {
let result = self.set_reuse_addr(value);
maybe_log_error!("set_reuse_addr", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::GetReuseAddress { responder } => {
responder
.send(Ok(self.get_reuse_addr()))
.unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::SetReusePort { value, responder } => {
let result = self.set_reuse_port(value);
maybe_log_error!("set_reuse_port", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::GetReusePort { responder } => {
responder
.send(Ok(self.get_reuse_port()))
.unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::GetAcceptConn { responder } => {
respond_not_supported!("syncudp::GetAcceptConn", responder)
}
Request::SetBindToDevice { value, responder } => {
let identifier = (!value.is_empty()).then_some(value.as_str());
let result = self.bind_to_device(identifier);
maybe_log_error!("set_bind_to_device", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::GetBindToDevice { responder } => {
let result = self.get_bound_device();
maybe_log_error!("get_bind_to_device", &result);
responder
.send(match result {
Ok(ref d) => Ok(d.as_deref().unwrap_or("")),
Err(e) => Err(e),
})
.unwrap_or_else(|e| error!("failed to respond: {e:?}"))
}
Request::SetBindToInterfaceIndex { value, responder } => {
let result = self.bind_to_device_index(value);
maybe_log_error!("set_bind_to_if_index", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::GetBindToInterfaceIndex { responder } => {
let result = self.get_bound_device_index();
maybe_log_error!("get_bind_to_if_index", &result);
responder
.send(match result {
Ok(d) => Ok(d.map(|d| d.get()).unwrap_or(0)),
Err(e) => Err(e),
})
.unwrap_or_else(|e| error!("failed to respond: {e:?}"))
}
Request::SetBroadcast { value, responder } => {
// We allow a no-op since the core does not yet support limiting
// broadcast packets. Until we implement this, we leave this as a
// no-op so that applications needing to send broadcast packets may
// make progress.
//
// TODO(https://fxbug.dev/42077065): Actually implement SO_BROADCAST.
let response = if value { Ok(()) } else { Err(fposix::Errno::Eopnotsupp) };
responder.send(response).unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::GetBroadcast { responder } => {
respond_not_supported!("syncudp::GetBroadcast", responder)
}
Request::SetKeepAlive { value: _, responder } => {
respond_not_supported!("syncudp::SetKeepAlive", responder)
}
Request::GetKeepAlive { responder } => {
respond_not_supported!("syncudp::GetKeepAlive", responder)
}
Request::SetLinger { linger: _, length_secs: _, responder } => {
respond_not_supported!("syncudp::SetLinger", responder)
}
Request::GetLinger { responder } => {
tracing::debug!("syncudp::GetLinger is not supported, returning Ok((false, 0))");
responder
.send(Ok((false, 0)))
.unwrap_or_else(|e| error!("failed to respond: {e:?}"))
}
Request::SetOutOfBandInline { value: _, responder } => {
respond_not_supported!("syncudp::SetOutOfBandInline", responder)
}
Request::GetOutOfBandInline { responder } => {
respond_not_supported!("syncudp::GetOutOfBandInline", responder)
}
Request::SetNoCheck { value: _, responder } => {
respond_not_supported!("syncudp::value", responder)
}
Request::GetNoCheck { responder } => {
respond_not_supported!("syncudp::GetNoCheck", responder)
}
Request::SetIpv6Only { value, responder } => {
let result = self.set_dual_stack_enabled(!value);
maybe_log_error!("set_ipv6_only", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::GetIpv6Only { responder } => {
let result = self.get_dual_stack_enabled().map(|enabled| !enabled);
maybe_log_error!("get_ipv6_only", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::SetIpv6TrafficClass { value: _, responder } => {
respond_not_supported!("syncudp::SetIpv6TrafficClass", responder)
}
Request::GetIpv6TrafficClass { responder } => {
respond_not_supported!("syncudp::GetIpv6TrafficClass", responder)
}
Request::SetIpv6MulticastInterface { value, responder } => {
let result = self.set_multicast_interface_ipv6(NonZeroU64::new(value));
maybe_log_error!("set_ipv6_multicast_interface", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"))
}
Request::GetIpv6MulticastInterface { responder } => {
let result = self
.get_multicast_interface_ipv6()
.map(|v| v.map(NonZeroU64::get).unwrap_or(0));
maybe_log_error!("get_ipv6_multicast_interface", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"))
}
Request::SetIpv6UnicastHops { value, responder } => {
let result = self.set_unicast_hop_limit(Ipv6::VERSION, value);
maybe_log_error!("set_ipv6_unicast_hops", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"))
}
Request::GetIpv6UnicastHops { responder } => {
let result = self.get_unicast_hop_limit(Ipv6::VERSION);
maybe_log_error!("get_ipv6_unicast_hops", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"))
}
Request::SetIpv6MulticastHops { value, responder } => {
let result = self.set_multicast_hop_limit(Ipv6::VERSION, value);
maybe_log_error!("set_ipv6_multicast_hops", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"))
}
Request::GetIpv6MulticastHops { responder } => {
let result = self.get_multicast_hop_limit(Ipv6::VERSION);
maybe_log_error!("get_ipv6_multicast_hops", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"))
}
Request::SetIpv6MulticastLoopback { value, responder } => {
// TODO(https://fxbug.dev/42058186): add support for
// looping back sent packets.
responder
.send((!value).then_some(()).ok_or(fposix::Errno::Enoprotoopt))
.unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::GetIpv6MulticastLoopback { responder } => {
respond_not_supported!("syncudp::GetIpv6MulticastLoopback", responder)
}
Request::SetIpTtl { value, responder } => {
let result = self.set_unicast_hop_limit(Ipv4::VERSION, value);
maybe_log_error!("set_ip_ttl", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"))
}
Request::GetIpTtl { responder } => {
let result = self.get_unicast_hop_limit(Ipv4::VERSION);
maybe_log_error!("get_ip_ttl", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"))
}
Request::SetIpMulticastTtl { value, responder } => {
let result = self.set_multicast_hop_limit(Ipv4::VERSION, value);
maybe_log_error!("set_ip_multicast_ttl", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"))
}
Request::GetIpMulticastTtl { responder } => {
let result = self.get_multicast_hop_limit(Ipv4::VERSION);
maybe_log_error!("get_ip_multicast_ttl", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"))
}
Request::SetIpMulticastInterface { iface, address, responder } => {
let result = self.set_multicast_interface_ipv4(NonZeroU64::new(iface), address);
maybe_log_error!("set_multicast_interface", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"))
}
Request::GetIpMulticastInterface { responder } => {
let result = self.get_multicast_interface_ipv4();
maybe_log_error!("get_ip_multicast_interface", &result);
responder
.send(result.as_ref().map_err(|e| e.clone()))
.unwrap_or_else(|e| error!("failed to respond: {e:?}"))
}
Request::SetIpMulticastLoopback { value, responder } => {
// TODO(https://fxbug.dev/42058186): add support for
// looping back sent packets.
responder
.send((!value).then_some(()).ok_or(fposix::Errno::Enoprotoopt))
.unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::GetIpMulticastLoopback { responder } => {
respond_not_supported!("syncudp::GetIpMulticastLoopback", responder)
}
Request::SetIpTypeOfService { value: _, responder } => {
respond_not_supported!("syncudp::SetIpTypeOfService", responder)
}
Request::GetIpTypeOfService { responder } => {
respond_not_supported!("syncudp::GetIpTypeOfService", responder)
}
Request::AddIpMembership { membership, responder } => {
let result = self.set_multicast_membership(membership, true);
maybe_log_error!("add_ip_membership", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::DropIpMembership { membership, responder } => {
let result = self.set_multicast_membership(membership, false);
maybe_log_error!("drop_ip_membership", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::SetIpTransparent { value, responder } => {
let result = self.set_ip_transparent(value).map_err(IntoErrno::into_errno);
maybe_log_error!("set_ip_transparent", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::GetIpTransparent { responder } => {
responder
.send(Ok(self.get_ip_transparent()))
.unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::SetIpReceiveOriginalDestinationAddress { value, responder } => {
self.data.ip_receive_original_destination_address = value;
responder.send(Ok(())).unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::GetIpReceiveOriginalDestinationAddress { responder } => {
responder
.send(Ok(self.data.ip_receive_original_destination_address))
.unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::AddIpv6Membership { membership, responder } => {
let result = self.set_multicast_membership(membership, true);
maybe_log_error!("add_ipv6_membership", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::DropIpv6Membership { membership, responder } => {
let result = self.set_multicast_membership(membership, false);
maybe_log_error!("drop_ipv6_membership", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::SetIpv6ReceiveTrafficClass { value: _, responder } => {
respond_not_supported!("syncudp::SetIpv6ReceiveTrafficClass", responder)
}
Request::GetIpv6ReceiveTrafficClass { responder } => {
respond_not_supported!("syncudp::GetIpv6ReceiveTrafficClass", responder)
}
Request::SetIpv6ReceiveHopLimit { value: _, responder } => {
respond_not_supported!("syncudp::SetIpv6ReceiveHopLimit", responder)
}
Request::GetIpv6ReceiveHopLimit { responder } => {
respond_not_supported!("syncudp::GetIpv6ReceiveHopLimit", responder)
}
Request::SetIpReceiveTypeOfService { value: _, responder } => {
respond_not_supported!("syncudp::SetIpReceiveTypeOfService", responder)
}
Request::GetIpReceiveTypeOfService { responder } => {
respond_not_supported!("syncudp::GetIpReceiveTypeOfService", responder)
}
Request::SetIpv6ReceivePacketInfo { value, responder } => {
let result = self.set_ipv6_recv_pkt_info(value);
maybe_log_error!("set_ipv6_recv_pkt_info", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::GetIpv6ReceivePacketInfo { responder } => {
let result = self.get_ipv6_recv_pkt_info();
maybe_log_error!("get_ipv6_recv_pkt_info", &result);
responder.send(result).unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::SetIpReceiveTtl { value: _, responder } => {
respond_not_supported!("syncudp::SetIpReceiveTtl", responder)
}
Request::GetIpReceiveTtl { responder } => {
respond_not_supported!("syncudp::GetIpReceiveTtl", responder)
}
Request::SetIpPacketInfo { value: _, responder } => {
tracing::debug!("syncudp::SetIpPacketInfo is not supported, returning Ok(())");
responder.send(Ok(())).unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
Request::GetIpPacketInfo { responder } => {
respond_not_supported!("syncudp::GetIpPacketInfo", responder)
}
Request::SetMark { domain: _, mark: _, responder } => {
// TODO(https://fxbug.dev/337134565): Implement socket marks.
responder
.send(Err(fposix::Errno::Eopnotsupp))
.unwrap_or_else(|e| error!("failed to respond: {e:?}"))
}
Request::GetMark { domain: _, responder } => {
// TODO(https://fxbug.dev/337134565): Implement socket marks.
responder
.send(Err(fposix::Errno::Eopnotsupp))
.unwrap_or_else(|e| error!("failed to respond: {e:?}"))
}
}
ControlFlow::Continue(None)
}
fn describe(&self) -> fposix_socket::SynchronousDatagramSocketDescribeResponse {
let peer = self
.data
.peer_event
.duplicate_handle(
// The peer doesn't need to be able to signal, just receive signals,
// so attenuate that right when duplicating.
zx::Rights::BASIC,
)
.expect("failed to duplicate");
fposix_socket::SynchronousDatagramSocketDescribeResponse {
event: Some(peer),
..Default::default()
}
}
fn external_data(&self) -> &DatagramSocketExternalData<I> {
T::external_data(&self.data.info.id)
}
fn get_max_receive_buffer_size(&self) -> u64 {
self.external_data()
.message_queue
.lock()
.max_available_messages_size()
.try_into()
.unwrap_or(u64::MAX)
}
fn set_max_receive_buffer_size(&mut self, max_bytes: u64) {
let max_bytes = max_bytes.try_into().ok_checked::<TryFromIntError>().unwrap_or(usize::MAX);
self.external_data().message_queue.lock().set_max_available_messages_size(max_bytes)
}
/// Handles a [POSIX socket connect request].
///
/// [POSIX socket connect request]: fposix_socket::SynchronousDatagramSocketRequest::Connect
fn connect(self, addr: fnet::SocketAddress) -> Result<(), fposix::Errno> {
let Self { ctx, data: BindingData { info: SocketControlInfo { id, .. }, .. } } = self;
let sockaddr =
I::SocketAddress::from_sock_addr(<T as Transport<I>>::maybe_map_sock_addr(addr))?;
trace!("connect sockaddr: {:?}", sockaddr);
let (remote_addr, remote_port) =
sockaddr.try_into_core_with_ctx(ctx.bindings_ctx()).map_err(IntoErrno::into_errno)?;
T::connect(ctx, id, remote_addr, remote_port.into()).map_err(IntoErrno::into_errno)?;
Ok(())
}
/// Handles a [POSIX socket bind request].
///
/// [POSIX socket bind request]: fposix_socket::SynchronousDatagramSocketRequest::Bind
fn bind(self, addr: fnet::SocketAddress) -> Result<(), fposix::Errno> {
// Match Linux and return `Einval` when asked to bind an IPv6 socket to
// an Ipv4 address. This Errno is unique to bind.
let sockaddr = match (I::VERSION, &addr) {
(IpVersion::V6, fnet::SocketAddress::Ipv4(_)) => Err(fposix::Errno::Einval),
(_, _) => I::SocketAddress::from_sock_addr(addr),
}?;
trace!("bind sockaddr: {:?}", sockaddr);
let Self { ctx, data: BindingData { info: SocketControlInfo { id, .. }, .. } } = self;
let (sockaddr, port) =
TryFromFidlWithContext::try_from_fidl_with_ctx(ctx.bindings_ctx(), sockaddr)
.map_err(IntoErrno::into_errno)?;
let local_port = T::LocalIdentifier::from_u16(port);
T::bind(ctx, id, sockaddr, local_port).map_err(IntoErrno::into_errno)?;
Ok(())
}
/// Handles a [POSIX socket disconnect request].
///
/// [POSIX socket connect request]: fposix_socket::SynchronousDatagramSocketRequest::Disconnect
fn disconnect(self) -> Result<(), fposix::Errno> {
trace!("disconnect socket");
let Self { ctx, data: BindingData { info: SocketControlInfo { id, .. }, .. } } = self;
T::disconnect(ctx, id).map_err(IntoErrno::into_errno)?;
Ok(())
}
/// Handles a [POSIX socket get_sock_name request].
///
/// [POSIX socket get_sock_name request]: fposix_socket::SynchronousDatagramSocketRequest::GetSockName
fn get_sock_name(self) -> Result<fnet::SocketAddress, fposix::Errno> {
let Self { ctx, data: BindingData { info: SocketControlInfo { id, .. }, .. } } = self;
let l: LocalAddress<_, _, _> = T::get_socket_info(ctx, id).into_fidl();
l.try_into_fidl_with_ctx(ctx.bindings_ctx()).map(SockAddr::into_sock_addr)
}
/// Handles a [POSIX socket get_info request].
///
/// [POSIX socket get_info request]: fposix_socket::SynchronousDatagramSocketRequest::GetInfo
fn get_sock_info(
self,
) -> Result<(fposix_socket::Domain, fposix_socket::DatagramSocketProtocol), fposix::Errno> {
let domain = match I::VERSION {
IpVersion::V4 => fposix_socket::Domain::Ipv4,
IpVersion::V6 => fposix_socket::Domain::Ipv6,
};
let protocol = match <T as Transport<I>>::PROTOCOL {
DatagramProtocol::Udp => fposix_socket::DatagramSocketProtocol::Udp,
DatagramProtocol::IcmpEcho => fposix_socket::DatagramSocketProtocol::IcmpEcho,
};
Ok((domain, protocol))
}
/// Handles a [POSIX socket get_peer_name request].
///
/// [POSIX socket get_peer_name request]: fposix_socket::SynchronousDatagramSocketRequest::GetPeerName
fn get_peer_name(self) -> Result<fnet::SocketAddress, fposix::Errno> {
let Self { ctx, data: BindingData { info: SocketControlInfo { id, .. }, .. } } = self;
T::get_socket_info(ctx, id).try_into_fidl().and_then(|r: RemoteAddress<_, _, _>| {
r.try_into_fidl_with_ctx(ctx.bindings_ctx()).map(SockAddr::into_sock_addr)
})
}
fn recv_msg(
self,
want_addr: bool,
data_len: usize,
want_control: bool,
recv_flags: fposix_socket::RecvMsgFlags,
) -> Result<
(Option<fnet::SocketAddress>, Vec<u8>, fposix_socket::DatagramSocketRecvControlData, u32),
fposix::Errno,
> {
trace_duration!(c"datagram::recv_msg");
let Self {
ctx,
data:
BindingData {
info: SocketControlInfo { id, .. },
version_specific_data,
ip_receive_original_destination_address,
timestamp_option,
..
},
} = self;
let front = {
let mut messages = <T as Transport<I>>::external_data(id).message_queue.lock();
if recv_flags.contains(fposix_socket::RecvMsgFlags::PEEK) {
messages.peek().cloned()
} else {
messages.pop()
}
};
let AvailableMessage {
interface_id,
source_addr,
source_port,
destination_addr,
destination_port,
timestamp,
mut data,
} = match front {
None => {
// This is safe from races only because the setting of the
// shutdown flag can only be done by the worker executing this
// code. Otherwise, a packet being delivered, followed by
// another thread setting the shutdown flag, then this check
// executing, could result in a race that causes this this code
// to signal EOF with a packet still waiting.
let shutdown = T::get_shutdown(ctx, id);
return match shutdown {
Some(ShutdownType::Receive | ShutdownType::SendAndReceive) => {
// Return empty data to signal EOF.
Ok((
None,
Vec::new(),
fposix_socket::DatagramSocketRecvControlData::default(),
0,
))
}
None | Some(ShutdownType::Send) => Err(fposix::Errno::Eagain),
};
}
Some(front) => front,
};
let addr = want_addr.then(|| {
I::SocketAddress::new(
SpecifiedAddr::new(source_addr).map(|a| {
core_socket::StrictlyZonedAddr::new_with_zone(a, || interface_id).into_inner()
}),
source_port,
)
.into_sock_addr()
});
let truncated = data.len().saturating_sub(data_len);
data.truncate(data_len);
let mut network: Option<fposix_socket::NetworkSocketRecvControlData> = None;
if want_control {
let mut ip = None;
if *ip_receive_original_destination_address {
ip.get_or_insert_with(|| fposix_socket::IpRecvControlData::default())
.original_destination_address = Some(
I::SocketAddress::new(
SpecifiedAddr::new(destination_addr).map(|a| ZonedAddr::Unzoned(a).into()),
destination_port,
)
.into_sock_addr(),
);
}
let IpInvariant(ipv6_control_data) = I::map_ip(
(version_specific_data, destination_addr, IpInvariant(interface_id)),
|(Ipv4BindingsData {}, _ipv4_dst_addr, _interface_id)| IpInvariant(None),
|(Ipv6BindingsData { recv_pkt_info }, ipv6_dst_addr, IpInvariant(interface_id))| {
let mut ipv6_control_data = None;
if *recv_pkt_info {
ipv6_control_data
.get_or_insert_with(|| fposix_socket::Ipv6RecvControlData::default())
.pktinfo = Some(fposix_socket::Ipv6PktInfoRecvControlData {
iface: interface_id.into(),
header_destination_addr: ipv6_dst_addr.into_fidl(),
})
}
// TODO(https://fxbug.dev/326102014): Support SOL_IPV6, IPV6_RECVTCLASS.
// TODO(https://fxbug.dev/326102020): Support SOL_IPV6, IPV6_RECVHOPLIMIT.
IpInvariant(ipv6_control_data)
},
);
let timestamp =
(*timestamp_option != fposix_socket::TimestampOption::Disabled).then(|| {
fposix_socket::Timestamp {
nanoseconds: timestamp.into_nanos(),
requested: *timestamp_option,
}
});
if let Some(ip) = ip {
network.get_or_insert_with(Default::default).ip = Some(ip);
}
if let Some(ipv6_control_data) = ipv6_control_data {
network.get_or_insert_with(Default::default).ipv6 = Some(ipv6_control_data);
}
if let Some(timestamp) = timestamp {
network
.get_or_insert_with(Default::default)
.socket
.get_or_insert_with(Default::default)
.timestamp = Some(timestamp);
};
};
let control_data =
fposix_socket::DatagramSocketRecvControlData { network, ..Default::default() };
Ok((addr, data, control_data, truncated.try_into().unwrap_or(u32::MAX)))
}
fn send_msg(
self,
addr: Option<fnet::SocketAddress>,
data: Vec<u8>,
) -> Result<i64, fposix::Errno> {
trace_duration!(c"datagram::send_msg");
let remote_addr = addr
.map(|addr| {
I::SocketAddress::from_sock_addr(<T as Transport<I>>::maybe_map_sock_addr(addr))
})
.transpose()?;
let Self { ctx, data: BindingData { info: SocketControlInfo { id, .. }, .. } } = self;
let remote = remote_addr
.map(|remote_addr| {
let (remote_addr, port) =
TryFromFidlWithContext::try_from_fidl_with_ctx(ctx.bindings_ctx(), remote_addr)
.map_err(IntoErrno::into_errno)?;
Ok((remote_addr, port.into()))
})
.transpose()?;
let len = data.len() as i64;
let body = Buf::new(data, ..);
match remote {
Some(remote) => T::send_to(ctx, id, remote, body).map_err(|e| e.into_errno()),
None => T::send(ctx, id, body).map_err(|e| e.into_errno()),
}
.map(|()| len)
}
fn bind_to_device_id(self, device: Option<DeviceId<BindingsCtx>>) -> Result<(), fposix::Errno> {
let Self { ctx, data: BindingData { info: SocketControlInfo { id, .. }, .. } } = self;
T::set_socket_device(ctx, id, device.as_ref()).map_err(IntoErrno::into_errno)
}
fn bind_to_device(self, device: Option<&str>) -> Result<(), fposix::Errno> {
let Self { ctx, .. } = &self;
let device = device
.map(|name| {
ctx.bindings_ctx().devices.get_device_by_name(name).ok_or(fposix::Errno::Enodev)
})
.transpose()?;
self.bind_to_device_id(device)
}
fn bind_to_device_index(self, device: u64) -> Result<(), fposix::Errno> {
let Self { ctx, .. } = &self;
// If `device` is 0, then this will clear the bound device.
let device = NonZeroU64::new(device)
.map(|index| ctx.bindings_ctx().devices.get_core_id(index).ok_or(fposix::Errno::Enodev))
.transpose()?;
self.bind_to_device_id(device)
}
fn get_bound_device_id(self) -> Result<Option<DeviceId<BindingsCtx>>, fposix::Errno> {
// NB: Ensure that we do not return a device that was removed from the
// stack. This matches Linux behavior.
let Self { ctx, data: BindingData { info: SocketControlInfo { id, .. }, .. } } = self;
let device = match T::get_bound_device(ctx, id) {
None => return Ok(None),
Some(d) => d,
};
device.upgrade().ok_or(fposix::Errno::Enodev).map(Some)
}
fn get_bound_device(self) -> Result<Option<String>, fposix::Errno> {
Ok(self.get_bound_device_id()?.map(|core_id| core_id.bindings_id().name.clone()))
}
fn get_bound_device_index(self) -> Result<Option<NonZeroU64>, fposix::Errno> {
Ok(self.get_bound_device_id()?.map(|core_id| core_id.bindings_id().id))
}
fn set_dual_stack_enabled(self, enabled: bool) -> Result<(), fposix::Errno> {
let Self { ctx, data: BindingData { info: SocketControlInfo { id, .. }, .. } } = self;
T::set_dual_stack_enabled(ctx, id, enabled).map_err(IntoErrno::into_errno)
}
fn get_dual_stack_enabled(self) -> Result<bool, fposix::Errno> {
let Self { ctx, data: BindingData { info: SocketControlInfo { id, .. }, .. } } = self;
T::get_dual_stack_enabled(ctx, id).map_err(IntoErrno::into_errno)
}
fn set_ipv6_recv_pkt_info(self, new: bool) -> Result<(), fposix::Errno> {
let correct_ip_version: Option<()> = I::map_ip(
&mut self.data.version_specific_data,
|_v4| None,
|Ipv6BindingsData { recv_pkt_info: old }| {
*old = new;
Some(())
},
);
correct_ip_version.ok_or(fposix::Errno::Enoprotoopt)
}
fn get_ipv6_recv_pkt_info(self) -> Result<bool, fposix::Errno> {
let correct_ip_version: Option<bool> = I::map_ip(
&self.data.version_specific_data,
|_v4| None,
|Ipv6BindingsData { recv_pkt_info }| Some(*recv_pkt_info),
);
correct_ip_version.ok_or(fposix::Errno::Eopnotsupp)
}
fn set_reuse_addr(self, reuse_addr: bool) -> Result<(), fposix::Errno> {
let Self { ctx, data: BindingData { info: SocketControlInfo { id, .. }, .. } } = self;
T::set_reuse_addr(ctx, id, reuse_addr).map_err(IntoErrno::into_errno)
}
fn get_reuse_addr(self) -> bool {
let Self { ctx, data: BindingData { info: SocketControlInfo { id, .. }, .. } } = self;
T::get_reuse_addr(ctx, id)
}
fn set_reuse_port(self, reuse_port: bool) -> Result<(), fposix::Errno> {
let Self { ctx, data: BindingData { info: SocketControlInfo { id, .. }, .. } } = self;
T::set_reuse_port(ctx, id, reuse_port).map_err(IntoErrno::into_errno)
}
fn get_reuse_port(self) -> bool {
let Self { ctx, data: BindingData { info: SocketControlInfo { id, .. }, .. } } = self;
T::get_reuse_port(ctx, id)
}
fn shutdown(self, how: fposix_socket::ShutdownMode) -> Result<(), fposix::Errno> {
let Self { data: BindingData { info: SocketControlInfo { id, .. }, .. }, ctx } = self;
let how = ShutdownType::from_send_receive(
how.contains(fposix_socket::ShutdownMode::WRITE),
how.contains(fposix_socket::ShutdownMode::READ),
)
.ok_or(fposix::Errno::Einval)?;
T::shutdown(ctx, id, how).map_err(IntoErrno::into_errno)?;
match how {
ShutdownType::Receive | ShutdownType::SendAndReceive => {
// Make sure to signal the peer so any ongoing call to
// receive that is waiting for a signal will poll again.
if let Err(e) = <T as Transport<I>>::external_data(id)
.message_queue
.lock()
.local_event()
.signal_peer(zx::Signals::NONE, ZXSIO_SIGNAL_INCOMING)
{
error!("Failed to signal peer when shutting down: {:?}", e);
}
}
ShutdownType::Send => (),
}
Ok(())
}
fn set_multicast_membership<
M: TryIntoCore<(
MulticastAddr<I::Addr>,
Option<MulticastInterfaceSelector<I::Addr, NonZeroU64>>,
)>,
>(
self,
membership: M,
want_membership: bool,
) -> Result<(), fposix::Errno>
where
M::Error: IntoErrno,
{
let (multicast_group, interface) =
membership.try_into_core().map_err(IntoErrno::into_errno)?;
let interface = interface
.map_or(MulticastMembershipInterfaceSelector::AnyInterfaceWithRoute, Into::into);
let Self { ctx, data: BindingData { info: SocketControlInfo { id, .. }, .. } } = self;
let interface =
interface.try_into_core_with_ctx(ctx.bindings_ctx()).map_err(IntoErrno::into_errno)?;
T::set_multicast_membership(ctx, id, multicast_group, interface, want_membership)
.map_err(IntoErrno::into_errno)
}
fn set_unicast_hop_limit(
self,
ip_version: IpVersion,
hop_limit: fposix_socket::OptionalUint8,
) -> Result<(), fposix::Errno> {
let hop_limit: Option<u8> = hop_limit.into_core();
let hop_limit =
hop_limit.map(|u| NonZeroU8::new(u).ok_or(fposix::Errno::Einval)).transpose()?;
let Self { ctx, data: BindingData { info: SocketControlInfo { id, .. }, .. } } = self;
T::set_unicast_hop_limit(ctx, id, hop_limit, ip_version).map_err(IntoErrno::into_errno)
}
fn set_multicast_hop_limit(
self,
ip_version: IpVersion,
hop_limit: fposix_socket::OptionalUint8,
) -> Result<(), fposix::Errno> {
let hop_limit: Option<u8> = hop_limit.into_core();
// TODO(https://fxbug.dev/42059735): Support setting a multicast hop limit
// of 0.
let hop_limit =
hop_limit.map(|u| NonZeroU8::new(u).ok_or(fposix::Errno::Einval)).transpose()?;
let Self { ctx, data: BindingData { info: SocketControlInfo { id, .. }, .. } } = self;
T::set_multicast_hop_limit(ctx, id, hop_limit, ip_version).map_err(IntoErrno::into_errno)
}
fn set_multicast_interface_ipv4(
self,
interface: Option<NonZeroU64>,
addr: fnet::Ipv4Address,
) -> Result<(), fposix::Errno> {
// Multicast interface for IPv4 multicast packets can be selected by
// the IP address. Linux also uses the specified address as the source
// address for outgoing multicast packets. Our implementation of
// IP_MULTICAST_IF diverges from Linux: the address is used only to
// select the interface and is saved to return from `getsockopts()`.
let Self {
ctx,
data: BindingData { info: SocketControlInfo { id, .. }, ipv4_multicast_if_addr, .. },
} = self;
let addr: Option<SpecifiedAddr<Ipv4Addr>> = SpecifiedAddr::new(addr.into_core());
let device_id = match (interface, addr) {
// If both the interface index and the address are specified then
// we use the index to select the interface. The address still
// saved to return it in the future.
(Some(index), _) => Some(
// `setsockopt(IP_MULTICAST_IF)` is supposed to fail with
// `EADDRNOTAVAIL` when the IP or the interface index is invalid. This is
// different from `IPV6_MULTICAST_IF`.
TryFromFidlWithContext::try_from_fidl_with_ctx(ctx.bindings_ctx(), index)
.map_err(|_| fposix::Errno::Eaddrnotavail)?,
),
(None, Some(addr)) => {
let device = ctx
.bindings_ctx()
.devices
.with_devices(|devices| devices.cloned().collect::<Vec<_>>())
.into_iter()
.find(|device| {
let mut ip_found = false;
ctx.api().device_ip::<Ipv4>().for_each_assigned_ip_addr_subnet(
device,
|ip_subnet| {
if ip_subnet.addr() == addr {
ip_found = true;
}
},
);
ip_found
})
.ok_or(fposix::Errno::Eaddrnotavail)?;
Some(device)
}
(None, None) => None,
};
T::set_multicast_interface(ctx, id, device_id.as_ref(), Ipv4::VERSION)
.map_err(IntoErrno::into_errno)
.inspect(|_| *ipv4_multicast_if_addr = addr)
}
fn get_multicast_interface_ipv4(self) -> Result<fnet::Ipv4Address, fposix::Errno> {
let Self { data: BindingData { ipv4_multicast_if_addr, .. }, .. } = self;
Ok(ipv4_multicast_if_addr.map(Into::into).unwrap_or(Ipv4::UNSPECIFIED_ADDRESS).into_fidl())
}
fn set_multicast_interface_ipv6(
self,
interface: Option<NonZeroU64>,
) -> Result<(), fposix::Errno> {
let Self { ctx, data: BindingData { info: SocketControlInfo { id, .. }, .. } } = self;
let interface = interface
.map(|index| TryFromFidlWithContext::try_from_fidl_with_ctx(ctx.bindings_ctx(), index))
.transpose()
.map_err(IntoErrno::into_errno)?;
T::set_multicast_interface(ctx, id, interface.as_ref(), Ipv6::VERSION)
.map_err(IntoErrno::into_errno)
}
fn get_multicast_interface_ipv6(self) -> Result<Option<NonZeroU64>, fposix::Errno> {
let Self { ctx, data: BindingData { info: SocketControlInfo { id, .. }, .. } } = self;
T::get_multicast_interface(ctx, id, Ipv6::VERSION)
.map_err(IntoErrno::into_errno)
.map_err(|e| match e {
// `getsockopt()` should fail with `EOPNOTSUPP` instead of `ENOPROTOOPT`.
fposix::Errno::Enoprotoopt => fposix::Errno::Eopnotsupp,
e => e,
})?
.map(|id| id.try_into_fidl_with_ctx(ctx.bindings_ctx()))
.transpose()
.map_err(IntoErrno::into_errno)
}
fn get_unicast_hop_limit(self, ip_version: IpVersion) -> Result<u8, fposix::Errno> {
let Self { ctx, data: BindingData { info: SocketControlInfo { id, .. }, .. } } = self;
T::get_unicast_hop_limit(ctx, id, ip_version)
.map(NonZeroU8::get)
.map_err(IntoErrno::into_errno)
}
fn get_multicast_hop_limit(self, ip_version: IpVersion) -> Result<u8, fposix::Errno> {
let Self { ctx, data: BindingData { info: SocketControlInfo { id, .. }, .. } } = self;
T::get_multicast_hop_limit(ctx, id, ip_version)
.map(NonZeroU8::get)
.map_err(IntoErrno::into_errno)
}
fn set_ip_transparent(self, value: bool) -> Result<(), T::SetIpTransparentError> {
let Self { ctx, data: BindingData { info: SocketControlInfo { id, .. }, .. } } = self;
T::set_ip_transparent(ctx, id, value)
}
fn get_ip_transparent(self) -> bool {
let Self { ctx, data: BindingData { info: SocketControlInfo { id, .. }, .. } } = self;
T::get_ip_transparent(ctx, id)
}
fn get_timestamp_option(self) -> Result<fposix_socket::TimestampOption, fposix::Errno> {
Ok(self.data.timestamp_option)
}
fn set_timestamp_option(
self,
value: fposix_socket::TimestampOption,
) -> Result<(), fposix::Errno> {
self.data.timestamp_option = value;
Ok(())
}
}
impl IntoErrno for ExpectedUnboundError {
fn into_errno(self) -> fposix::Errno {
let ExpectedUnboundError = self;
fposix::Errno::Einval
}
}
impl IntoErrno for ExpectedConnError {
fn into_errno(self) -> fposix::Errno {
let ExpectedConnError = self;
fposix::Errno::Enotconn
}
}
impl IntoErrno for NotSupportedError {
fn into_errno(self) -> fposix::Errno {
fposix::Errno::Eopnotsupp
}
}
impl<I: Ip, D> IntoFidl<LocalAddress<I, D, NonZeroU16>> for SocketInfo<I::Addr, D> {
fn into_fidl(self) -> LocalAddress<I, D, NonZeroU16> {
let (local_ip, local_identifier) = match self {
Self::Unbound => (None, None),
Self::Listener(ListenerInfo { local_ip, local_identifier }) => {
(local_ip, Some(local_identifier))
}
Self::Connected(ConnInfo {
local_ip,
local_identifier,
remote_ip: _,
remote_identifier: _,
}) => (Some(local_ip), Some(local_identifier)),
};
LocalAddress { address: local_ip, identifier: local_identifier }
}
}
impl<I: Ip, D> TryIntoFidl<RemoteAddress<I, D, u16>> for SocketInfo<I::Addr, D> {
type Error = fposix::Errno;
fn try_into_fidl(self) -> Result<RemoteAddress<I, D, u16>, Self::Error> {
match self {
Self::Unbound | Self::Listener(_) => Err(fposix::Errno::Enotconn),
Self::Connected(ConnInfo {
local_ip: _,
local_identifier: _,
remote_ip,
remote_identifier,
}) => {
if remote_identifier == 0 {
// Match Linux and report `ENOTCONN` for requests to
// 'get_peername` when the connection's remote port is 0 for
// both UDP and ICMP Echo sockets.
Err(fposix::Errno::Enotconn)
} else {
Ok(RemoteAddress { address: remote_ip, identifier: remote_identifier })
}
}
}
}
}
impl<I: IpSockAddrExt, D, L: Into<u16>> TryIntoFidlWithContext<I::SocketAddress>
for LocalAddress<I, D, L>
where
D: TryIntoFidlWithContext<NonZeroU64, Error = DeviceNotFoundError>,
{
type Error = fposix::Errno;
fn try_into_fidl_with_ctx<Ctx: crate::bindings::util::ConversionContext>(
self,
ctx: &Ctx,
) -> Result<I::SocketAddress, Self::Error> {
let Self { address, identifier } = self;
(address, identifier.map_or(0, Into::into))
.try_into_fidl_with_ctx(ctx)
.map_err(IntoErrno::into_errno)
}
}
impl<I: IpSockAddrExt, D, R: Into<u16>> TryIntoFidlWithContext<I::SocketAddress>
for RemoteAddress<I, D, R>
where
D: TryIntoFidlWithContext<NonZeroU64, Error = DeviceNotFoundError>,
{
type Error = fposix::Errno;
fn try_into_fidl_with_ctx<Ctx: crate::bindings::util::ConversionContext>(
self,
ctx: &Ctx,
) -> Result<I::SocketAddress, Self::Error> {
let Self { address, identifier } = self;
(Some(address), identifier.into())
.try_into_fidl_with_ctx(ctx)
.map_err(IntoErrno::into_errno)
}
}
#[cfg(test)]
mod tests {
use super::*;
use assert_matches::assert_matches;
use fidl::endpoints::{Proxy, ServerEnd};
use fuchsia_async as fasync;
use fuchsia_zircon::{self as zx, AsHandleRef};
use futures::StreamExt;
use packet::Serializer as _;
use packet_formats::icmp::IcmpIpExt;
use crate::bindings::{
integration_tests::{
test_ep_name, StackSetupBuilder, TestSetup, TestSetupBuilder, TestStack,
},
socket::{queue::MIN_OUTSTANDING_APPLICATION_MESSAGES_SIZE, testutil::TestSockAddr},
};
use net_types::{
ip::{IpAddr, IpAddress},
Witness as _,
};
async fn prepare_test<A: TestSockAddr>(
proto: fposix_socket::DatagramSocketProtocol,
) -> (TestSetup, fposix_socket::SynchronousDatagramSocketProxy, zx::EventPair) {
// Setup the test with two endpoints, one in `A`'s domain, and the other
// in `A::DifferentDomain` (e.g. IPv4 and IPv6).
let mut t = TestSetupBuilder::new()
.add_endpoint()
.add_endpoint()
.add_stack(
StackSetupBuilder::new()
.add_named_endpoint(test_ep_name(1), Some(A::config_addr_subnet()))
.add_named_endpoint(
test_ep_name(2),
Some(A::DifferentDomain::config_addr_subnet()),
),
)
.build()
.await;
let (proxy, event) = get_socket_and_event::<A>(t.get(0), proto).await;
(t, proxy, event)
}
async fn get_socket<A: TestSockAddr>(
test_stack: &mut TestStack,
proto: fposix_socket::DatagramSocketProtocol,
) -> fposix_socket::SynchronousDatagramSocketProxy {
let socket_provider = test_stack.connect_socket_provider();
let response = socket_provider
.datagram_socket(A::DOMAIN, proto)
.await
.unwrap()
.expect("Socket succeeds");
match response {
fposix_socket::ProviderDatagramSocketResponse::SynchronousDatagramSocket(sock) => {
fposix_socket::SynchronousDatagramSocketProxy::new(fasync::Channel::from_channel(
sock.into_channel(),
))
}
// TODO(https://fxrev.dev/99905): Implement Fast UDP sockets in Netstack3.
fposix_socket::ProviderDatagramSocketResponse::DatagramSocket(sock) => {
let _: fidl::endpoints::ClientEnd<fposix_socket::DatagramSocketMarker> = sock;
panic!("expected SynchronousDatagramSocket, found DatagramSocket")
}
}
}
async fn get_socket_and_event<A: TestSockAddr>(
test_stack: &mut TestStack,
proto: fposix_socket::DatagramSocketProtocol,
) -> (fposix_socket::SynchronousDatagramSocketProxy, zx::EventPair) {
let ctlr = get_socket::<A>(test_stack, proto).await;
let fposix_socket::SynchronousDatagramSocketDescribeResponse { event, .. } =
ctlr.describe().await.expect("describe succeeds");
(ctlr, event.expect("Socket describe contains event"))
}
macro_rules! declare_tests {
($test_fn:ident, icmp $(#[$icmp_attributes:meta])*) => {
mod $test_fn {
use super::*;
#[fasync::run_singlethreaded(test)]
async fn udp_v4() {
$test_fn::<fnet::Ipv4SocketAddress, Udp>(
fposix_socket::DatagramSocketProtocol::Udp,
)
.await
}
#[fasync::run_singlethreaded(test)]
async fn udp_v6() {
$test_fn::<fnet::Ipv6SocketAddress, Udp>(
fposix_socket::DatagramSocketProtocol::Udp,
)
.await
}
$(#[$icmp_attributes])*
#[fasync::run_singlethreaded(test)]
async fn icmp_v4() {
$test_fn::<fnet::Ipv4SocketAddress, IcmpEcho>(
fposix_socket::DatagramSocketProtocol::IcmpEcho,
)
.await
}
$(#[$icmp_attributes])*
#[fasync::run_singlethreaded(test)]
async fn icmp_v6() {
$test_fn::<fnet::Ipv6SocketAddress, IcmpEcho>(
fposix_socket::DatagramSocketProtocol::IcmpEcho,
)
.await
}
}
};
($test_fn:ident) => {
declare_tests!($test_fn, icmp);
};
}
#[fixture::teardown(TestSetup::shutdown)]
async fn connect_failure<A: TestSockAddr, T>(proto: fposix_socket::DatagramSocketProtocol) {
let (t, proxy, _event) = prepare_test::<A>(proto).await;
// Pass a socket address of the wrong domain, which should fail for IPv4
// but pass for IPv6 on UDP (as it's implicitly converted to an
// IPv4-mapped-IPv6 address).
let res = proxy
.connect(&A::DifferentDomain::create(A::DifferentDomain::REMOTE_ADDR, 1010))
.await
.unwrap();
match (proto, <<<A as SockAddr>::AddrType as IpAddress>::Version as Ip>::VERSION) {
(fposix_socket::DatagramSocketProtocol::Udp, IpVersion::V6) => {
assert_eq!(res, Ok(()));
// NB: The socket is connected in the IPv4 stack; disconnect it
// so that we can connect it in the IPv6 stack below.
proxy.disconnect().await.unwrap().expect("disconnect should succeed");
}
(_, _) => assert_eq!(res, Err(fposix::Errno::Eafnosupport)),
}
// Pass a zero port. UDP and ICMP both allow it.
let res = proxy.connect(&A::create(A::LOCAL_ADDR, 0)).await.unwrap();
assert_eq!(res, Ok(()));
// Pass an unreachable address (tests error forwarding from `create_connection`).
let res = proxy
.connect(&A::create(A::UNREACHABLE_ADDR, 1010))
.await
.unwrap()
.expect_err("connect fails");
assert_eq!(res, fposix::Errno::Enetunreach);
t
}
declare_tests!(connect_failure);
#[fixture::teardown(TestSetup::shutdown)]
async fn connect<A: TestSockAddr, T>(proto: fposix_socket::DatagramSocketProtocol) {
let (t, proxy, _event) = prepare_test::<A>(proto).await;
let () = proxy
.connect(&A::create(A::REMOTE_ADDR, 200))
.await
.unwrap()
.expect("connect succeeds");
// Can connect again to a different remote should succeed.
let () = proxy
.connect(&A::create(A::REMOTE_ADDR_2, 200))
.await
.unwrap()
.expect("connect suceeds");
t
}
declare_tests!(connect);
#[fixture::teardown(TestSetup::shutdown)]
async fn connect_loopback<A: TestSockAddr, T>(proto: fposix_socket::DatagramSocketProtocol) {
let (t, proxy, _event) = prepare_test::<A>(proto).await;
let () = proxy
.connect(&A::create(
<<A::AddrType as IpAddress>::Version as Ip>::LOOPBACK_ADDRESS.get(),
200,
))
.await
.unwrap()
.expect("connect succeeds");
t
}
declare_tests!(connect_loopback);
#[fixture::teardown(TestSetup::shutdown)]
async fn connect_any<A: TestSockAddr, T>(proto: fposix_socket::DatagramSocketProtocol) {
// Pass an unspecified remote address. This should be treated as the
// loopback address.
let (t, proxy, _event) = prepare_test::<A>(proto).await;
const PORT: u16 = 1010;
let () = proxy
.connect(&A::create(<A::AddrType as IpAddress>::Version::UNSPECIFIED_ADDRESS, PORT))
.await
.unwrap()
.unwrap();
assert_eq!(
proxy.get_peer_name().await.unwrap().unwrap(),
A::create(<A::AddrType as IpAddress>::Version::LOOPBACK_ADDRESS.get(), PORT)
);
t
}
declare_tests!(connect_any);
#[fixture::teardown(TestSetup::shutdown)]
async fn bind<A: TestSockAddr, T>(proto: fposix_socket::DatagramSocketProtocol) {
let (mut t, socket, _event) = prepare_test::<A>(proto).await;
let stack = t.get(0);
// Can bind to local address.
let () = socket.bind(&A::create(A::LOCAL_ADDR, 200)).await.unwrap().expect("bind succeeds");
// Can't bind again (to another port).
let res =
socket.bind(&A::create(A::LOCAL_ADDR, 201)).await.unwrap().expect_err("bind fails");
assert_eq!(res, fposix::Errno::Einval);
// Can bind another socket to a different port.
let socket = get_socket::<A>(stack, proto).await;
let () = socket.bind(&A::create(A::LOCAL_ADDR, 201)).await.unwrap().expect("bind succeeds");
// Can bind to unspecified address in a different port.
let socket = get_socket::<A>(stack, proto).await;
let () = socket
.bind(&A::create(<A::AddrType as IpAddress>::Version::UNSPECIFIED_ADDRESS, 202))
.await
.unwrap()
.expect("bind succeeds");
t
}
declare_tests!(bind);
#[fixture::teardown(TestSetup::shutdown)]
async fn bind_then_connect<A: TestSockAddr, T>(proto: fposix_socket::DatagramSocketProtocol) {
let (t, socket, _event) = prepare_test::<A>(proto).await;
// Can bind to local address.
let () = socket.bind(&A::create(A::LOCAL_ADDR, 200)).await.unwrap().expect("bind suceeds");
let () = socket
.connect(&A::create(A::REMOTE_ADDR, 1010))
.await
.unwrap()
.expect("connect succeeds");
t
}
declare_tests!(bind_then_connect);
#[fixture::teardown(TestSetup::shutdown)]
async fn connect_then_disconnect<A: TestSockAddr, T>(
proto: fposix_socket::DatagramSocketProtocol,
) {
let (t, socket, _event) = prepare_test::<A>(proto).await;
let remote_addr = A::create(A::REMOTE_ADDR, 1010);
let () = socket.connect(&remote_addr).await.unwrap().expect("connect succeeds");
assert_eq!(
socket.get_peer_name().await.unwrap().expect("get_peer_name should suceed"),
remote_addr
);
let () = socket.disconnect().await.unwrap().expect("disconnect succeeds");
assert_eq!(
socket.get_peer_name().await.unwrap().expect_err("alice getpeername fails"),
fposix::Errno::Enotconn
);
t
}
/// ICMP echo sockets require the buffer to be a valid ICMP echo request,
/// this function performs transformations allowing the majority of the
/// sending logic to be common with UDP.
fn prepare_buffer_to_send<A: TestSockAddr>(
proto: fposix_socket::DatagramSocketProtocol,
buf: Vec<u8>,
) -> Vec<u8>
where
<A::AddrType as IpAddress>::Version: IcmpIpExt,
{
match proto {
fposix_socket::DatagramSocketProtocol::Udp => buf,
fposix_socket::DatagramSocketProtocol::IcmpEcho => Buf::new(buf, ..)
.encapsulate(packet_formats::icmp::IcmpPacketBuilder::<
<A::AddrType as IpAddress>::Version,
_,
>::new(
<<A::AddrType as IpAddress>::Version as Ip>::LOOPBACK_ADDRESS.get(),
<<A::AddrType as IpAddress>::Version as Ip>::LOOPBACK_ADDRESS.get(),
packet_formats::icmp::IcmpUnusedCode,
packet_formats::icmp::IcmpEchoRequest::new(0, 1),
))
.serialize_vec_outer()
.unwrap()
.into_inner()
.into_inner(),
}
}
/// ICMP echo sockets receive a buffer that is an ICMP echo reply, this
/// function performs transformations allowing the majority of the receiving
/// logic to be common with UDP.
fn expected_buffer_to_receive<A: TestSockAddr>(
proto: fposix_socket::DatagramSocketProtocol,
buf: Vec<u8>,
id: u16,
src_ip: A::AddrType,
dst_ip: A::AddrType,
) -> Vec<u8>
where
<A::AddrType as IpAddress>::Version: IcmpIpExt,
{
match proto {
fposix_socket::DatagramSocketProtocol::Udp => buf,
fposix_socket::DatagramSocketProtocol::IcmpEcho => Buf::new(buf, ..)
.encapsulate(packet_formats::icmp::IcmpPacketBuilder::<
<A::AddrType as IpAddress>::Version,
_,
>::new(
src_ip,
dst_ip,
packet_formats::icmp::IcmpUnusedCode,
packet_formats::icmp::IcmpEchoReply::new(id, 1),
))
.serialize_vec_outer()
.unwrap()
.into_inner()
.into_inner(),
}
}
declare_tests!(connect_then_disconnect);
/// Tests a simple UDP setup with a client and a server, where the client
/// can send data to the server and the server receives it.
// TODO(https://fxbug.dev/42124055): this test is incorrect for ICMP sockets. At the time of this
// writing it crashes before reaching the wrong parts, but we will need to specialize the body
// of this test for ICMP before calling the feature complete.
#[fixture::teardown(TestSetup::shutdown)]
async fn hello<A: TestSockAddr, T>(proto: fposix_socket::DatagramSocketProtocol)
where
<A::AddrType as IpAddress>::Version: IcmpIpExt,
{
// We create two stacks, Alice (server listening on LOCAL_ADDR:200), and
// Bob (client, bound on REMOTE_ADDR:300). After setup, Bob connects to
// Alice and sends a datagram. Finally, we verify that Alice receives
// the datagram.
let mut t = TestSetupBuilder::new()
.add_endpoint()
.add_endpoint()
.add_stack(
StackSetupBuilder::new()
.add_named_endpoint(test_ep_name(1), Some(A::config_addr_subnet())),
)
.add_stack(
StackSetupBuilder::new()
.add_named_endpoint(test_ep_name(2), Some(A::config_addr_subnet_remote())),
)
.build()
.await;
let alice = t.get(0);
let (alice_socket, alice_events) = get_socket_and_event::<A>(alice, proto).await;
// Verify that Alice has no local or peer addresses bound
assert_eq!(
alice_socket.get_sock_name().await.unwrap().unwrap(),
A::new(None, 0).into_sock_addr(),
);
assert_eq!(
alice_socket.get_peer_name().await.unwrap().expect_err("alice getpeername fails"),
fposix::Errno::Enotconn
);
// Setup Alice as a server, bound to LOCAL_ADDR:200
println!("Configuring alice...");
let () = alice_socket
.bind(&A::create(A::LOCAL_ADDR, 200))
.await
.unwrap()
.expect("alice bind suceeds");
// Verify that Alice is listening on the local socket, but still has no
// peer socket
assert_eq!(
alice_socket.get_sock_name().await.unwrap().expect("alice getsockname succeeds"),
A::create(A::LOCAL_ADDR, 200)
);
assert_eq!(
alice_socket.get_peer_name().await.unwrap().expect_err("alice getpeername should fail"),
fposix::Errno::Enotconn
);
// check that alice has no data to read, and it'd block waiting for
// events:
assert_eq!(
alice_socket
.recv_msg(false, 2048, false, fposix_socket::RecvMsgFlags::empty())
.await
.unwrap()
.expect_err("Reading from alice should fail"),
fposix::Errno::Eagain
);
assert_eq!(
alice_events
.wait_handle(ZXSIO_SIGNAL_INCOMING, zx::Time::from_nanos(0))
.expect_err("Alice incoming event should not be signaled"),
zx::Status::TIMED_OUT
);
// Setup Bob as a client, bound to REMOTE_ADDR:300
println!("Configuring bob...");
let bob = t.get(1);
let (bob_socket, bob_events) = get_socket_and_event::<A>(bob, proto).await;
let () = bob_socket
.bind(&A::create(A::REMOTE_ADDR, 300))
.await
.unwrap()
.expect("bob bind suceeds");
// Verify that Bob is listening on the local socket, but has no peer
// socket
assert_eq!(
bob_socket.get_sock_name().await.unwrap().expect("bob getsockname suceeds"),
A::create(A::REMOTE_ADDR, 300)
);
assert_eq!(
bob_socket
.get_peer_name()
.await
.unwrap()
.expect_err("get peer name should fail before connected"),
fposix::Errno::Enotconn
);
// Connect Bob to Alice on LOCAL_ADDR:200
println!("Connecting bob to alice...");
let () = bob_socket
.connect(&A::create(A::LOCAL_ADDR, 200))
.await
.unwrap()
.expect("Connect succeeds");
// Verify that Bob still has the right local socket name.
assert_eq!(
bob_socket.get_sock_name().await.unwrap().expect("bob getsockname suceeds"),
A::create(A::REMOTE_ADDR, 300)
);
// Verify that Bob has the peer socket set correctly
assert_eq!(
bob_socket.get_peer_name().await.unwrap().expect("bob getpeername suceeds"),
A::create(A::LOCAL_ADDR, 200)
);
// We don't care which signals are on, only that SIGNAL_OUTGOING is, we
// can ignore the return value.
let _signals = bob_events
.wait_handle(ZXSIO_SIGNAL_OUTGOING, zx::Time::from_nanos(0))
.expect("Bob outgoing event should be signaled");
// Send datagram from Bob's socket.
println!("Writing datagram to bob");
let body = "Hello".as_bytes();
let to_send = prepare_buffer_to_send::<A>(proto, body.to_vec());
assert_eq!(
bob_socket
.send_msg(
None,
&to_send,
&fposix_socket::DatagramSocketSendControlData::default(),
fposix_socket::SendMsgFlags::empty()
)
.await
.unwrap()
.expect("sendmsg suceeds"),
to_send.len() as i64
);
let (events, socket, port, expected_src_ip) = match proto {
fposix_socket::DatagramSocketProtocol::Udp => {
(&alice_events, &alice_socket, 300, A::REMOTE_ADDR)
}
fposix_socket::DatagramSocketProtocol::IcmpEcho => {
(&bob_events, &bob_socket, 0, A::LOCAL_ADDR)
}
};
println!("Waiting for signals");
assert_eq!(
fasync::OnSignals::new(events, ZXSIO_SIGNAL_INCOMING).await,
Ok(ZXSIO_SIGNAL_INCOMING | ZXSIO_SIGNAL_OUTGOING)
);
let to_recv = expected_buffer_to_receive::<A>(
proto,
body.to_vec(),
300,
A::LOCAL_ADDR,
A::REMOTE_ADDR,
);
let (from, data, _, truncated) = socket
.recv_msg(true, 2048, false, fposix_socket::RecvMsgFlags::empty())
.await
.unwrap()
.expect("recvmsg suceeeds");
let source = A::from_sock_addr(*from.expect("socket address returned"))
.expect("bad socket address return");
assert_eq!(source.addr(), expected_src_ip);
assert_eq!(source.port(), port);
assert_eq!(truncated, 0);
assert_eq!(&data[..], to_recv);
t
}
declare_tests!(hello);
#[fixture::teardown(TestSetup::shutdown)]
#[test_case::test_matrix(
[
fposix_socket::Domain::Ipv4,
fposix_socket::Domain::Ipv6,
],
[
fposix_socket::DatagramSocketProtocol::Udp,
fposix_socket::DatagramSocketProtocol::IcmpEcho,
]
)]
#[fasync::run_singlethreaded(test)]
async fn socket_describe(
domain: fposix_socket::Domain,
proto: fposix_socket::DatagramSocketProtocol,
) {
let mut t = TestSetupBuilder::new().add_endpoint().add_empty_stack().build().await;
let test_stack = t.get(0);
let socket_provider = test_stack.connect_socket_provider();
let response = socket_provider
.datagram_socket(domain, proto)
.await
.unwrap()
.expect("Socket call succeeds");
let socket = match response {
fposix_socket::ProviderDatagramSocketResponse::SynchronousDatagramSocket(sock) => sock,
// TODO(https://fxrev.dev/99905): Implement Fast UDP sockets in Netstack3.
fposix_socket::ProviderDatagramSocketResponse::DatagramSocket(sock) => {
let _: fidl::endpoints::ClientEnd<fposix_socket::DatagramSocketMarker> = sock;
panic!("expected SynchronousDatagramSocket, found DatagramSocket")
}
};
let fposix_socket::SynchronousDatagramSocketDescribeResponse { event, .. } =
socket.into_proxy().unwrap().describe().await.expect("Describe call succeeds");
let _: zx::EventPair = event.expect("Describe call returns event");
t
}
#[fixture::teardown(TestSetup::shutdown)]
#[test_case::test_matrix(
[
fposix_socket::Domain::Ipv4,
fposix_socket::Domain::Ipv6,
],
[
fposix_socket::DatagramSocketProtocol::Udp,
fposix_socket::DatagramSocketProtocol::IcmpEcho,
]
)]
#[fasync::run_singlethreaded(test)]
async fn socket_get_info(
domain: fposix_socket::Domain,
proto: fposix_socket::DatagramSocketProtocol,
) {
let mut t = TestSetupBuilder::new().add_endpoint().add_empty_stack().build().await;
let test_stack = t.get(0);
let socket_provider = test_stack.connect_socket_provider();
let response = socket_provider
.datagram_socket(domain, proto)
.await
.unwrap()
.expect("Socket call succeeds");
let socket = match response {
fposix_socket::ProviderDatagramSocketResponse::SynchronousDatagramSocket(sock) => sock,
// TODO(https://fxrev.dev/99905): Implement Fast UDP sockets in Netstack3.
fposix_socket::ProviderDatagramSocketResponse::DatagramSocket(sock) => {
let _: fidl::endpoints::ClientEnd<fposix_socket::DatagramSocketMarker> = sock;
panic!("expected SynchronousDatagramSocket, found DatagramSocket")
}
};
let info = socket.into_proxy().unwrap().get_info().await.expect("get_info call succeeds");
assert_eq!(info, Ok((domain, proto)));
t
}
fn socket_clone(
socket: &fposix_socket::SynchronousDatagramSocketProxy,
) -> fposix_socket::SynchronousDatagramSocketProxy {
let (client, server) =
fidl::endpoints::create_proxy::<fposix_socket::SynchronousDatagramSocketMarker>()
.expect("create proxy");
let server = ServerEnd::new(server.into_channel());
let () = socket.clone2(server).expect("socket clone");
client
}
type IpFromSockAddr<A> = <<A as SockAddr>::AddrType as IpAddress>::Version;
#[fixture::teardown(TestSetup::shutdown)]
async fn clone<A: TestSockAddr, T>(proto: fposix_socket::DatagramSocketProtocol)
where
<A::AddrType as IpAddress>::Version: IcmpIpExt,
T: Transport<Ipv4>,
T: Transport<Ipv6>,
T: Transport<<A::AddrType as IpAddress>::Version>,
{
let mut t = TestSetupBuilder::new()
.add_endpoint()
.add_endpoint()
.add_stack(
StackSetupBuilder::new()
.add_named_endpoint(test_ep_name(1), Some(A::config_addr_subnet())),
)
.add_stack(
StackSetupBuilder::new()
.add_named_endpoint(test_ep_name(2), Some(A::config_addr_subnet_remote())),
)
.build()
.await;
let (alice_socket, alice_events) = get_socket_and_event::<A>(t.get(0), proto).await;
let alice_cloned = socket_clone(&alice_socket);
let fposix_socket::SynchronousDatagramSocketDescribeResponse { event: alice_event, .. } =
alice_cloned.describe().await.expect("Describe call succeeds");
let _: zx::EventPair = alice_event.expect("Describe call returns event");
let () = alice_socket
.bind(&A::create(A::LOCAL_ADDR, 200))
.await
.unwrap()
.expect("failed to bind for alice");
// We should be able to read that back from the cloned socket.
assert_eq!(
alice_cloned.get_sock_name().await.unwrap().expect("failed to getsockname for alice"),
A::create(A::LOCAL_ADDR, 200)
);
let (bob_socket, bob_events) = get_socket_and_event::<A>(t.get(1), proto).await;
let bob_cloned = socket_clone(&bob_socket);
let () = bob_cloned
.bind(&A::create(A::REMOTE_ADDR, 200))
.await
.unwrap()
.expect("failed to bind for bob");
// We should be able to read that back from the original socket.
assert_eq!(
bob_socket.get_sock_name().await.unwrap().expect("failed to getsockname for bob"),
A::create(A::REMOTE_ADDR, 200)
);
let body = "Hello".as_bytes();
let to_send = prepare_buffer_to_send::<A>(proto, body.to_vec());
assert_eq!(
alice_socket
.send_msg(
Some(&A::create(A::REMOTE_ADDR, 200)),
&to_send,
&fposix_socket::DatagramSocketSendControlData::default(),
fposix_socket::SendMsgFlags::empty()
)
.await
.unwrap()
.expect("failed to send_msg"),
to_send.len() as i64
);
let (cloned_events, cloned_socket, expected_from) = match proto {
fposix_socket::DatagramSocketProtocol::Udp => {
(&bob_events, &bob_cloned, A::create(A::LOCAL_ADDR, 200))
}
fposix_socket::DatagramSocketProtocol::IcmpEcho => {
(&alice_events, &alice_cloned, A::create(A::REMOTE_ADDR, 0))
}
};
assert_eq!(
fasync::OnSignals::new(cloned_events, ZXSIO_SIGNAL_INCOMING).await,
Ok(ZXSIO_SIGNAL_INCOMING | ZXSIO_SIGNAL_OUTGOING)
);
// Receive from the cloned socket.
let (from, data, _, truncated) = cloned_socket
.recv_msg(true, 2048, false, fposix_socket::RecvMsgFlags::empty())
.await
.unwrap()
.expect("failed to recv_msg");
let to_recv = expected_buffer_to_receive::<A>(
proto,
body.to_vec(),
200,
A::REMOTE_ADDR,
A::LOCAL_ADDR,
);
assert_eq!(&data[..], to_recv);
assert_eq!(truncated, 0);
assert_eq!(from.map(|a| *a), Some(expected_from));
// The data have already been received on the cloned socket
assert_eq!(
cloned_socket
.recv_msg(false, 2048, false, fposix_socket::RecvMsgFlags::empty())
.await
.unwrap()
.expect_err("Reading from bob should fail"),
fposix::Errno::Eagain
);
match proto {
fposix_socket::DatagramSocketProtocol::Udp => {
// Close the socket should not invalidate the cloned socket.
let () = bob_socket
.close()
.await
.expect("FIDL error")
.map_err(zx::Status::from_raw)
.expect("close failed");
assert_eq!(
bob_cloned
.send_msg(
Some(&A::create(A::LOCAL_ADDR, 200)),
&body,
&fposix_socket::DatagramSocketSendControlData::default(),
fposix_socket::SendMsgFlags::empty()
)
.await
.unwrap()
.expect("failed to send_msg"),
body.len() as i64
);
let () = alice_cloned
.close()
.await
.expect("FIDL error")
.map_err(zx::Status::from_raw)
.expect("close failed");
assert_eq!(
fasync::OnSignals::new(&alice_events, ZXSIO_SIGNAL_INCOMING).await,
Ok(ZXSIO_SIGNAL_INCOMING | ZXSIO_SIGNAL_OUTGOING)
);
let (from, data, _, truncated) = alice_socket
.recv_msg(true, 2048, false, fposix_socket::RecvMsgFlags::empty())
.await
.unwrap()
.expect("failed to recv_msg");
assert_eq!(&data[..], body);
assert_eq!(truncated, 0);
assert_eq!(from.map(|a| *a), Some(A::create(A::REMOTE_ADDR, 200)));
// Make sure the sockets are still in the stack.
for i in 0..2 {
t.get(i).with_ctx(|ctx| {
assert_matches!(
&<T as Transport<IpFromSockAddr<A>>>::collect_all_sockets(ctx)[..],
[_]
);
});
}
let () = alice_socket
.close()
.await
.expect("FIDL error")
.map_err(zx::Status::from_raw)
.expect("close failed");
let () = bob_cloned
.close()
.await
.expect("FIDL error")
.map_err(zx::Status::from_raw)
.expect("close failed");
// But the sockets should have gone here.
for i in 0..2 {
t.get(i).with_ctx(|ctx| {
assert_matches!(
&<T as Transport<IpFromSockAddr<A>>>::collect_all_sockets(ctx)[..],
[]
);
});
}
}
fposix_socket::DatagramSocketProtocol::IcmpEcho => {
// For ICMP sockets, the sending and receiving socket are the
// the same socket, so the above test for UDP will not apply -
// closing alice_socket and bob_cloned will keep both sockets
// alive, but closing bob_socket and bob_cloned will actually
// close bob. There is no interesting behavior to test for a
// closed socket.
}
}
t
}
declare_tests!(clone);
#[fixture::teardown(TestSetup::shutdown)]
async fn close_twice<A: TestSockAddr, T>(proto: fposix_socket::DatagramSocketProtocol)
where
T: Transport<Ipv4>,
T: Transport<Ipv6>,
T: Transport<<A::AddrType as IpAddress>::Version>,
{
// Make sure we cannot close twice from the same channel so that we
// maintain the correct refcount.
let mut t = TestSetupBuilder::new().add_endpoint().add_empty_stack().build().await;
let test_stack = t.get(0);
let socket = get_socket::<A>(test_stack, proto).await;
let cloned = socket_clone(&socket);
let () = socket
.close()
.await
.expect("FIDL error")
.map_err(zx::Status::from_raw)
.expect("close failed");
let _: fidl::Error = socket
.close()
.await
.expect_err("should not be able to close the socket twice on the same channel");
assert!(socket.into_channel().unwrap().is_closed());
// Since we still hold the cloned socket, the binding_data shouldn't be
// empty
test_stack.with_ctx(|ctx| {
assert_matches!(
&<T as Transport<IpFromSockAddr<A>>>::collect_all_sockets(ctx)[..],
[_]
);
});
let () = cloned
.close()
.await
.expect("FIDL error")
.map_err(zx::Status::from_raw)
.expect("close failed");
// Now it should become empty
test_stack.with_ctx(|ctx| {
assert_matches!(&<T as Transport<IpFromSockAddr<A>>>::collect_all_sockets(ctx)[..], []);
});
t
}
declare_tests!(close_twice);
#[fixture::teardown(TestSetup::shutdown)]
async fn implicit_close<A: TestSockAddr, T>(proto: fposix_socket::DatagramSocketProtocol)
where
T: Transport<Ipv4>,
T: Transport<Ipv6>,
T: Transport<<A::AddrType as IpAddress>::Version>,
{
let mut t = TestSetupBuilder::new().add_endpoint().add_empty_stack().build().await;
let test_stack = t.get(0);
let cloned = {
let socket = get_socket::<A>(test_stack, proto).await;
socket_clone(&socket)
// socket goes out of scope indicating an implicit close.
};
// Using an explicit close here.
let () = cloned
.close()
.await
.expect("FIDL error")
.map_err(zx::Status::from_raw)
.expect("close failed");
// No socket should be there now.
test_stack.with_ctx(|ctx| {
assert_matches!(&<T as Transport<IpFromSockAddr<A>>>::collect_all_sockets(ctx)[..], []);
});
t
}
declare_tests!(implicit_close);
#[fixture::teardown(TestSetup::shutdown)]
async fn invalid_clone_args<A: TestSockAddr, T>(proto: fposix_socket::DatagramSocketProtocol)
where
T: Transport<Ipv4>,
T: Transport<Ipv6>,
T: Transport<<A::AddrType as IpAddress>::Version>,
{
let mut t = TestSetupBuilder::new().add_endpoint().add_empty_stack().build().await;
let test_stack = t.get(0);
let socket = get_socket::<A>(test_stack, proto).await;
let () = socket
.close()
.await
.expect("FIDL error")
.map_err(zx::Status::from_raw)
.expect("close failed");
// make sure we don't leak anything.
test_stack.with_ctx(|ctx| {
assert_matches!(&<T as Transport<IpFromSockAddr<A>>>::collect_all_sockets(ctx)[..], []);
});
t
}
declare_tests!(invalid_clone_args);
#[fixture::teardown(TestSetup::shutdown)]
async fn shutdown<A: TestSockAddr, T>(proto: fposix_socket::DatagramSocketProtocol) {
let mut t = TestSetupBuilder::new()
.add_endpoint()
.add_stack(
StackSetupBuilder::new()
.add_named_endpoint(test_ep_name(1), Some(A::config_addr_subnet())),
)
.build()
.await;
let (socket, events) = get_socket_and_event::<A>(t.get(0), proto).await;
let local = A::create(A::LOCAL_ADDR, 200);
let remote = A::create(A::REMOTE_ADDR, 300);
assert_eq!(
socket
.shutdown(fposix_socket::ShutdownMode::WRITE)
.await
.unwrap()
.expect_err("should not shutdown an unconnected socket"),
fposix::Errno::Enotconn,
);
let () = socket.bind(&local).await.unwrap().expect("failed to bind");
assert_eq!(
socket
.shutdown(fposix_socket::ShutdownMode::WRITE)
.await
.unwrap()
.expect_err("should not shutdown an unconnected socket"),
fposix::Errno::Enotconn,
);
let () = socket.connect(&remote).await.unwrap().expect("failed to connect");
assert_eq!(
socket
.shutdown(fposix_socket::ShutdownMode::empty())
.await
.unwrap()
.expect_err("invalid args"),
fposix::Errno::Einval
);
// Cannot send
let body = "Hello".as_bytes();
let () = socket
.shutdown(fposix_socket::ShutdownMode::WRITE)
.await
.unwrap()
.expect("failed to shutdown");
assert_eq!(
socket
.send_msg(
None,
&body,
&fposix_socket::DatagramSocketSendControlData::default(),
fposix_socket::SendMsgFlags::empty()
)
.await
.unwrap()
.expect_err("writing to an already-shutdown socket should fail"),
fposix::Errno::Epipe,
);
let invalid_addr = A::create(A::REMOTE_ADDR, 0);
let errno = match proto {
fposix_socket::DatagramSocketProtocol::Udp => fposix::Errno::Einval,
fposix_socket::DatagramSocketProtocol::IcmpEcho => fposix::Errno::Epipe,
};
assert_eq!(
socket
.send_msg(
Some(&invalid_addr),
&body,
&fposix_socket::DatagramSocketSendControlData::default(),
fposix_socket::SendMsgFlags::empty()
)
.await
.unwrap()
.expect_err("writing to port 0 should fail"),
errno
);
let left = async {
assert_eq!(
fasync::OnSignals::new(&events, ZXSIO_SIGNAL_INCOMING).await,
Ok(ZXSIO_SIGNAL_INCOMING | ZXSIO_SIGNAL_OUTGOING)
);
};
let right = async {
let () = socket
.shutdown(fposix_socket::ShutdownMode::READ)
.await
.unwrap()
.expect("failed to shutdown");
let (_, data, _, _) = socket
.recv_msg(false, 2048, false, fposix_socket::RecvMsgFlags::empty())
.await
.unwrap()
.expect("recvmsg should return empty data");
assert!(data.is_empty());
};
let ((), ()) = futures::future::join(left, right).await;
let () = socket
.shutdown(fposix_socket::ShutdownMode::READ)
.await
.unwrap()
.expect("failed to shutdown the socket twice");
let () = socket
.shutdown(fposix_socket::ShutdownMode::WRITE)
.await
.unwrap()
.expect("failed to shutdown the socket twice");
let () = socket
.shutdown(fposix_socket::ShutdownMode::READ | fposix_socket::ShutdownMode::WRITE)
.await
.unwrap()
.expect("failed to shutdown the socket twice");
t
}
declare_tests!(shutdown);
#[fixture::teardown(TestSetup::shutdown)]
async fn set_receive_buffer_after_delivery<
A: TestSockAddr,
T: Transport<<A::AddrType as IpAddress>::Version> + Transport<Ipv4> + Transport<Ipv6>,
>(
proto: fposix_socket::DatagramSocketProtocol,
) where
<A::AddrType as IpAddress>::Version: IcmpIpExt,
{
let mut t = TestSetupBuilder::new().add_stack(StackSetupBuilder::new()).build().await;
let (socket, _events) = get_socket_and_event::<A>(t.get(0), proto).await;
let addr =
A::create(<<A::AddrType as IpAddress>::Version as Ip>::LOOPBACK_ADDRESS.get(), 200);
socket.bind(&addr).await.unwrap().expect("bind should succeed");
const SENT_PACKETS: u8 = 10;
for i in 0..SENT_PACKETS {
let buf = prepare_buffer_to_send::<A>(
proto,
vec![i; MIN_OUTSTANDING_APPLICATION_MESSAGES_SIZE],
);
let sent: usize = socket
.send_msg(
Some(&addr),
&buf,
&fposix_socket::DatagramSocketSendControlData::default(),
fposix_socket::SendMsgFlags::empty(),
)
.await
.unwrap()
.expect("send_msg should succeed")
.try_into()
.unwrap();
assert_eq!(sent, buf.len());
}
// Wait for all packets to be delivered before changing the buffer size.
let stack = t.get(0);
let has_all_delivered = |messages: &MessageQueue<_>| {
messages.available_messages().len() == usize::from(SENT_PACKETS)
};
loop {
let all_delivered = stack.with_ctx(|ctx| {
let socket = <T as Transport<IpFromSockAddr<A>>>::collect_all_sockets(ctx)
.into_iter()
.next()
.unwrap();
let external_data = <T as Transport<IpFromSockAddr<A>>>::external_data(&socket);
let message_queue = external_data.message_queue.lock();
has_all_delivered(&message_queue)
});
if all_delivered {
break;
}
// Give other futures on the same executor a chance to run. In a
// single-threaded context, without the yield, this future would
// always be able to re-lock the stack after unlocking, and so no
// other future would make progress.
futures_lite::future::yield_now().await;
}
// Use a buffer size of 0, which will be substituted with the minimum size.
let () =
socket.set_receive_buffer(0).await.unwrap().expect("set buffer size should succeed");
let rx_count = futures::stream::unfold(socket, |socket| async {
let result = socket
.recv_msg(false, u32::MAX, false, fposix_socket::RecvMsgFlags::empty())
.await
.unwrap();
match result {
Ok((addr, data, control, size)) => {
let _: (
Option<Box<fnet::SocketAddress>>,
fposix_socket::DatagramSocketRecvControlData,
u32,
) = (addr, control, size);
Some((data, socket))
}
Err(fposix::Errno::Eagain) => None,
Err(e) => panic!("unexpected error: {:?}", e),
}
})
.enumerate()
.map(|(i, data)| {
assert_eq!(
&data,
&expected_buffer_to_receive::<A>(
proto,
vec![u8::try_from(i).unwrap(); MIN_OUTSTANDING_APPLICATION_MESSAGES_SIZE],
200,
<<A::AddrType as IpAddress>::Version as Ip>::LOOPBACK_ADDRESS.get(),
<<A::AddrType as IpAddress>::Version as Ip>::LOOPBACK_ADDRESS.get(),
)
)
})
.count()
.await;
assert_eq!(rx_count, usize::from(SENT_PACKETS));
t
}
declare_tests!(set_receive_buffer_after_delivery);
#[fixture::teardown(TestSetup::shutdown)]
async fn send_recv_loopback_peek<A: TestSockAddr, T>(
proto: fposix_socket::DatagramSocketProtocol,
) where
<A::AddrType as IpAddress>::Version: IcmpIpExt,
{
let (t, proxy, _event) = prepare_test::<A>(proto).await;
let addr =
A::create(<<A::AddrType as IpAddress>::Version as Ip>::LOOPBACK_ADDRESS.get(), 100);
let () = proxy.bind(&addr).await.unwrap().expect("bind succeeds");
let () = proxy.connect(&addr).await.unwrap().expect("connect succeeds");
const DATA: &[u8] = &[1, 2, 3, 4, 5];
let to_send = prepare_buffer_to_send::<A>(proto, DATA.to_vec());
assert_eq!(
usize::try_from(
proxy
.send_msg(
None,
&to_send,
&fposix_socket::DatagramSocketSendControlData::default(),
fposix_socket::SendMsgFlags::empty()
)
.await
.unwrap()
.expect("send_msg should succeed"),
)
.unwrap(),
to_send.len()
);
// First try receiving the message with PEEK set.
let (_addr, data, _control, truncated) = loop {
match proxy
.recv_msg(false, u32::MAX, false, fposix_socket::RecvMsgFlags::PEEK)
.await
.unwrap()
{
Ok(peek) => break peek,
Err(fposix::Errno::Eagain) => {
// The sent datagram hasn't been received yet, so check for
// it again in a moment.
continue;
}
Err(e) => panic!("unexpected error: {e:?}"),
}
};
let expected = expected_buffer_to_receive::<A>(
proto,
DATA.to_vec(),
100,
<<A::AddrType as IpAddress>::Version as Ip>::LOOPBACK_ADDRESS.get(),
<<A::AddrType as IpAddress>::Version as Ip>::LOOPBACK_ADDRESS.get(),
);
assert_eq!(truncated, 0);
assert_eq!(data.as_slice(), expected,);
// Now that the message has for sure been received, it can be retrieved
// without checking for Eagain.
let (_addr, data, _control, truncated) = proxy
.recv_msg(false, u32::MAX, false, fposix_socket::RecvMsgFlags::empty())
.await
.unwrap()
.expect("recv should succeed");
assert_eq!(truncated, 0);
assert_eq!(data.as_slice(), expected);
t
}
declare_tests!(send_recv_loopback_peek);
// TODO(https://fxbug.dev/42174378): add a syscall test to exercise this
// behavior.
#[fixture::teardown(TestSetup::shutdown)]
async fn multicast_join_receive<A: TestSockAddr, T>(
proto: fposix_socket::DatagramSocketProtocol,
) {
let (mut t, proxy, event) = prepare_test::<A>(proto).await;
let mcast_addr = <<A::AddrType as IpAddress>::Version as Ip>::MULTICAST_SUBNET.network();
let id = t.get(0).get_endpoint_id(1);
match mcast_addr.into() {
IpAddr::V4(mcast_addr) => {
proxy.add_ip_membership(&fposix_socket::IpMulticastMembership {
mcast_addr: mcast_addr.into_fidl(),
iface: id.get(),
local_addr: fnet::Ipv4Address { addr: [0; 4] },
})
}
IpAddr::V6(mcast_addr) => {
proxy.add_ipv6_membership(&fposix_socket::Ipv6MulticastMembership {
mcast_addr: mcast_addr.into_fidl(),
iface: id.get(),
})
}
}
.await
.unwrap()
.expect("add membership should succeed");
const PORT: u16 = 100;
const DATA: &[u8] = &[1, 2, 3, 4, 5];
let () = proxy
.bind(&A::create(
<<A::AddrType as IpAddress>::Version as Ip>::UNSPECIFIED_ADDRESS,
PORT,
))
.await
.unwrap()
.expect("bind succeeds");
assert_eq!(
usize::try_from(
proxy
.send_msg(
Some(&A::create(mcast_addr, PORT)),
DATA,
&fposix_socket::DatagramSocketSendControlData::default(),
fposix_socket::SendMsgFlags::empty()
)
.await
.unwrap()
.expect("send_msg should succeed"),
)
.unwrap(),
DATA.len()
);
let _signals = event
.wait_handle(ZXSIO_SIGNAL_INCOMING, zx::Time::INFINITE)
.expect("socket should receive");
let (_addr, data, _control, truncated) = proxy
.recv_msg(false, u32::MAX, false, fposix_socket::RecvMsgFlags::empty())
.await
.unwrap()
.expect("recv should succeed");
assert_eq!(truncated, 0);
assert_eq!(data.as_slice(), DATA);
t
}
declare_tests!(
multicast_join_receive,
icmp #[should_panic = "Eopnotsupp"]
);
#[fixture::teardown(TestSetup::shutdown)]
async fn set_get_hop_limit_unicast<A: TestSockAddr, T>(
proto: fposix_socket::DatagramSocketProtocol,
) {
let (t, proxy, _event) = prepare_test::<A>(proto).await;
const HOP_LIMIT: u8 = 200;
match <<A::AddrType as IpAddress>::Version as Ip>::VERSION {
IpVersion::V4 => proxy.set_ip_multicast_ttl(&Some(HOP_LIMIT).into_fidl()),
IpVersion::V6 => proxy.set_ipv6_multicast_hops(&Some(HOP_LIMIT).into_fidl()),
}
.await
.unwrap()
.expect("set hop limit should succeed");
assert_eq!(
match <<A::AddrType as IpAddress>::Version as Ip>::VERSION {
IpVersion::V4 => proxy.get_ip_multicast_ttl(),
IpVersion::V6 => proxy.get_ipv6_multicast_hops(),
}
.await
.unwrap()
.expect("get hop limit should succeed"),
HOP_LIMIT
);
t
}
declare_tests!(set_get_hop_limit_unicast);
#[fixture::teardown(TestSetup::shutdown)]
async fn set_get_hop_limit_multicast<A: TestSockAddr, T>(
proto: fposix_socket::DatagramSocketProtocol,
) {
let (t, proxy, _event) = prepare_test::<A>(proto).await;
const HOP_LIMIT: u8 = 200;
match <<A::AddrType as IpAddress>::Version as Ip>::VERSION {
IpVersion::V4 => proxy.set_ip_ttl(&Some(HOP_LIMIT).into_fidl()),
IpVersion::V6 => proxy.set_ipv6_unicast_hops(&Some(HOP_LIMIT).into_fidl()),
}
.await
.unwrap()
.expect("set hop limit should succeed");
assert_eq!(
match <<A::AddrType as IpAddress>::Version as Ip>::VERSION {
IpVersion::V4 => proxy.get_ip_ttl(),
IpVersion::V6 => proxy.get_ipv6_unicast_hops(),
}
.await
.unwrap()
.expect("get hop limit should succeed"),
HOP_LIMIT
);
t
}
declare_tests!(set_get_hop_limit_multicast);
#[fixture::teardown(TestSetup::shutdown)]
async fn set_hop_limit_wrong_type<A: TestSockAddr, T>(
proto: fposix_socket::DatagramSocketProtocol,
) {
let (t, proxy, _event) = prepare_test::<A>(proto).await;
const HOP_LIMIT: u8 = 200;
let (multicast_result, unicast_result) =
match <<A::AddrType as IpAddress>::Version as Ip>::VERSION {
IpVersion::V4 => (
proxy.set_ipv6_multicast_hops(&Some(HOP_LIMIT).into_fidl()).await.unwrap(),
proxy.set_ipv6_unicast_hops(&Some(HOP_LIMIT).into_fidl()).await.unwrap(),
),
IpVersion::V6 => (
proxy.set_ip_multicast_ttl(&Some(HOP_LIMIT).into_fidl()).await.unwrap(),
proxy.set_ip_ttl(&Some(HOP_LIMIT).into_fidl()).await.unwrap(),
),
};
match (proto, <<A::AddrType as IpAddress>::Version as Ip>::VERSION) {
// UDPv6 is a dualstack capable protocol, so it allows setting the
// TTL of IPv6 sockets.
(fposix_socket::DatagramSocketProtocol::Udp, IpVersion::V6) => {
assert_matches!(multicast_result, Ok(_));
assert_matches!(unicast_result, Ok(_));
}
// All other [protocol, ip_version] are not dualstack capable.
(_, _) => {
assert_matches!(multicast_result, Err(_));
assert_matches!(unicast_result, Err(_));
}
}
t
}
declare_tests!(set_hop_limit_wrong_type);
#[fixture::teardown(TestSetup::shutdown)]
async fn get_hop_limit_wrong_type<A: TestSockAddr, T>(
proto: fposix_socket::DatagramSocketProtocol,
) {
let (t, proxy, _event) = prepare_test::<A>(proto).await;
let (multicast_result, unicast_result) =
match <<A::AddrType as IpAddress>::Version as Ip>::VERSION {
IpVersion::V4 => (
proxy.get_ipv6_multicast_hops().await.unwrap(),
proxy.get_ipv6_unicast_hops().await.unwrap(),
),
IpVersion::V6 => {
(proxy.get_ip_multicast_ttl().await.unwrap(), proxy.get_ip_ttl().await.unwrap())
}
};
match (proto, <<A::AddrType as IpAddress>::Version as Ip>::VERSION) {
// UDPv6 is a dualstack capable protocol, so it allows getting the
// TTL of IPv6 sockets.
(fposix_socket::DatagramSocketProtocol::Udp, IpVersion::V6) => {
assert_matches!(multicast_result, Ok(_));
assert_matches!(unicast_result, Ok(_));
}
// All other [protocol, ip_version] are not dualstack capable.
(_, _) => {
assert_matches!(multicast_result, Err(_));
assert_matches!(unicast_result, Err(_));
}
}
t
}
declare_tests!(get_hop_limit_wrong_type);
#[fixture::teardown(TestSetup::shutdown)]
async fn receive_original_destination_address<A: TestSockAddr, T>(
proto: fposix_socket::DatagramSocketProtocol,
) {
// Follow the same steps as the hello test above: Create two stacks, Alice (server listening
// on LOCAL_ADDR:200), and Bob (client, bound on REMOTE_ADDR:300).
let mut t = TestSetupBuilder::new()
.add_endpoint()
.add_endpoint()
.add_stack(
StackSetupBuilder::new()
.add_named_endpoint(test_ep_name(1), Some(A::config_addr_subnet())),
)
.add_stack(
StackSetupBuilder::new()
.add_named_endpoint(test_ep_name(2), Some(A::config_addr_subnet_remote())),
)
.build()
.await;
let alice = t.get(0);
let (alice_socket, alice_events) = get_socket_and_event::<A>(alice, proto).await;
// Setup Alice as a server, bound to LOCAL_ADDR:200
println!("Configuring alice...");
let () = alice_socket
.bind(&A::create(A::LOCAL_ADDR, 200))
.await
.unwrap()
.expect("alice bind suceeds");
// Setup Bob as a client, bound to REMOTE_ADDR:300
println!("Configuring bob...");
let bob = t.get(1);
let bob_socket = get_socket::<A>(bob, proto).await;
let () = bob_socket
.bind(&A::create(A::REMOTE_ADDR, 300))
.await
.unwrap()
.expect("bob bind suceeds");
// Connect Bob to Alice on LOCAL_ADDR:200
println!("Connecting bob to alice...");
let () = bob_socket
.connect(&A::create(A::LOCAL_ADDR, 200))
.await
.unwrap()
.expect("Connect succeeds");
// Send datagram from Bob's socket.
println!("Writing datagram to bob");
let body = "Hello".as_bytes();
assert_eq!(
bob_socket
.send_msg(
None,
&body,
&fposix_socket::DatagramSocketSendControlData::default(),
fposix_socket::SendMsgFlags::empty()
)
.await
.unwrap()
.expect("sendmsg suceeds"),
body.len() as i64
);
// Wait for datagram to arrive on Alice's socket:
println!("Waiting for signals");
assert_eq!(
fasync::OnSignals::new(&alice_events, ZXSIO_SIGNAL_INCOMING).await,
Ok(ZXSIO_SIGNAL_INCOMING | ZXSIO_SIGNAL_OUTGOING)
);
// Check the option is currently false.
assert!(!alice_socket
.get_ip_receive_original_destination_address()
.await
.expect("get_ip_receive_original_destination_address (FIDL) failed")
.expect("get_ip_receive_original_destination_address failed"),);
alice_socket
.set_ip_receive_original_destination_address(true)
.await
.expect("set_ip_receive_original_destination_address (FIDL) failed")
.expect("set_ip_receive_original_destination_address failed");
// The option should now be reported as set.
assert!(alice_socket
.get_ip_receive_original_destination_address()
.await
.expect("get_ip_receive_original_destination_address (FIDL) failed")
.expect("get_ip_receive_original_destination_address failed"),);
assert_matches!(
alice_socket
.recv_msg(false, 2048, true, fposix_socket::RecvMsgFlags::empty())
.await
.unwrap()
.expect("recvmsg suceeeds"),
(
_,
_,
fposix_socket::DatagramSocketRecvControlData {
network:
Some(fposix_socket::NetworkSocketRecvControlData {
ip:
Some(fposix_socket::IpRecvControlData {
original_destination_address: Some(addr),
..
}),
..
}),
..
},
_,
) => {
let addr = A::from_sock_addr(addr).expect("bad socket address return");
assert_eq!(addr.addr(), A::LOCAL_ADDR);
assert_eq!(addr.port(), 200);
}
);
// Turn it off.
alice_socket
.set_ip_receive_original_destination_address(false)
.await
.expect("set_ip_receive_original_destination_address (FIDL) failed")
.expect("set_ip_receive_original_destination_address failed");
assert!(!alice_socket
.get_ip_receive_original_destination_address()
.await
.expect("get_ip_receive_original_destination_address (FIDL) failed")
.expect("get_ip_receive_original_destination_address failed"),);
assert_eq!(
bob_socket
.send_msg(
None,
&body,
&fposix_socket::DatagramSocketSendControlData::default(),
fposix_socket::SendMsgFlags::empty()
)
.await
.unwrap()
.expect("sendmsg suceeds"),
body.len() as i64
);
// Wait for datagram to arrive on Alice's socket:
println!("Waiting for signals");
assert_eq!(
fasync::OnSignals::new(&alice_events, ZXSIO_SIGNAL_INCOMING).await,
Ok(ZXSIO_SIGNAL_INCOMING | ZXSIO_SIGNAL_OUTGOING)
);
assert_matches!(
alice_socket
.recv_msg(false, 2048, true, fposix_socket::RecvMsgFlags::empty())
.await
.unwrap()
.expect("recvmsg suceeeds"),
(_, _, fposix_socket::DatagramSocketRecvControlData { network: None, .. }, _)
);
t
}
declare_tests!(
receive_original_destination_address,
icmp #[ignore] // ICMP sockets' send/recv are different from what UDP
// does, i.e., alice doesn't receive what bob sends, but rather bob
// receives the echo reply for the echo request they send. If we need
// this option for ICMP sockets, we should write a dedicated test for
// ICMP.
);
}