blob: 69a7d236ba3eb28a26c5cdc69eff880dfff9940d [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//go:build !build_with_native_toolchain
package netstack
import (
"bytes"
"context"
"fmt"
"io"
"math"
"runtime"
"sort"
"strings"
"sync/atomic"
"syscall/zx"
"syscall/zx/fidl"
"syscall/zx/zxwait"
"time"
"unsafe"
"go.fuchsia.dev/fuchsia/src/connectivity/network/netstack/fidlconv"
"go.fuchsia.dev/fuchsia/src/connectivity/network/netstack/sync"
"go.fuchsia.dev/fuchsia/src/connectivity/network/netstack/tracing/trace"
"go.fuchsia.dev/fuchsia/src/connectivity/network/netstack/udp_serde"
"go.fuchsia.dev/fuchsia/src/lib/component"
syslog "go.fuchsia.dev/fuchsia/src/lib/syslog/go"
fnet "fidl/fuchsia/net"
"fidl/fuchsia/posix"
"fidl/fuchsia/posix/socket"
packetsocket "fidl/fuchsia/posix/socket/packet"
rawsocket "fidl/fuchsia/posix/socket/raw"
"fidl/fuchsia/unknown"
"gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/header"
"gvisor.dev/gvisor/pkg/tcpip/network/ipv4"
"gvisor.dev/gvisor/pkg/tcpip/network/ipv6"
"gvisor.dev/gvisor/pkg/tcpip/stack"
"gvisor.dev/gvisor/pkg/tcpip/transport"
"gvisor.dev/gvisor/pkg/tcpip/transport/icmp"
"gvisor.dev/gvisor/pkg/tcpip/transport/tcp"
"gvisor.dev/gvisor/pkg/tcpip/transport/udp"
"gvisor.dev/gvisor/pkg/waiter"
)
const (
	// NB: these constants are defined in gVisor as well, but not exported.
	ccReno  = "reno"
	ccCubic = "cubic"
	// Max values for sockopt TCP_KEEPIDLE and TCP_KEEPINTVL in Linux.
	//
	// https://github.com/torvalds/linux/blob/f2850dd5ee015bd7b77043f731632888887689c7/include/net/tcp.h#L156-L158
	maxTCPKeepIdle  = 32767
	maxTCPKeepIntvl = 32767
	maxTCPKeepCnt   = 127
	// Zircon socket signals used to reflect socket state to clients; each is
	// the zx.Signals form of the corresponding fuchsia.posix.socket constant.
	signalStreamIncoming        = zx.Signals(socket.SignalStreamIncoming)
	signalStreamConnected       = zx.Signals(socket.SignalStreamConnected)
	signalDatagramIncoming      = zx.Signals(socket.SignalDatagramIncoming)
	signalDatagramOutgoing      = zx.Signals(socket.SignalDatagramOutgoing)
	signalDatagramError         = zx.Signals(socket.SignalDatagramError)
	signalDatagramShutdownRead  = zx.Signals(socket.SignalDatagramShutdownRead)
	signalDatagramShutdownWrite = zx.Signals(socket.SignalDatagramShutdownWrite)
)
// optionalUint8ToInt unwraps a FIDL OptionalUint8: a present value is widened
// to int, an absent one yields the caller-supplied fallback, and a malformed
// union yields ErrInvalidOptionValue.
func optionalUint8ToInt(v socket.OptionalUint8, unset int) (int, tcpip.Error) {
	if v.Which() == socket.OptionalUint8Value {
		return int(v.Value), nil
	}
	if v.Which() == socket.OptionalUint8Unset {
		return unset, nil
	}
	return -1, &tcpip.ErrInvalidOptionValue{}
}
// optionalUint32ToInt unwraps a FIDL OptionalUint32: a present value is
// widened to int, an absent one yields the caller-supplied fallback, and a
// malformed union yields ErrInvalidOptionValue.
func optionalUint32ToInt(v socket.OptionalUint32, unset int) (int, tcpip.Error) {
	if v.Which() == socket.OptionalUint32Value {
		return int(v.Value), nil
	}
	if v.Which() == socket.OptionalUint32Unset {
		return unset, nil
	}
	return -1, &tcpip.ErrInvalidOptionValue{}
}
var _ io.Writer = (*socketWriter)(nil)

// socketWriter adapts a zircon socket to io.Writer, remembering the outcome
// of the most recent Write so callers can distinguish short writes later.
type socketWriter struct {
	socket zx.Socket
	// lastError records the error (possibly nil) from the most recent Write.
	lastError error
}
// Write copies p into the zircon socket. A short write with no underlying
// error is surfaced as ErrShouldWait so callers retry once the socket drains.
// The outcome is recorded in w.lastError.
func (w *socketWriter) Write(p []byte) (int, error) {
	written, err := w.socket.Write(p, 0)
	if err == nil {
		if written != len(p) {
			err = &zx.Error{Status: zx.ErrShouldWait}
		}
	}
	w.lastError = err
	return written, err
}
var _ tcpip.Payloader = (*socketReader)(nil)

// socketReader adapts a zircon socket to tcpip.Payloader, remembering the
// outcome of the most recent Read and the cumulative bytes consumed.
type socketReader struct {
	socket zx.Socket
	// lastError records the error (possibly nil) from the most recent Read.
	lastError error
	// readBytes keep track of how many bytes were read from the reader.
	readBytes int
}
// Read fills p from the zircon socket. A short read with no underlying error
// is surfaced as ErrShouldWait so callers retry when more data arrives. The
// outcome is recorded in r.lastError and r.readBytes is advanced.
func (r *socketReader) Read(p []byte) (int, error) {
	read, err := r.socket.Read(p, 0)
	if err == nil {
		if read != len(p) {
			err = &zx.Error{Status: zx.ErrShouldWait}
		}
	}
	r.lastError = err
	r.readBytes += read
	return read, err
}
// Len reports the number of bytes currently buffered in the zircon socket's
// receive buffer. An empty buffer (or a failed handle query) is recorded in
// r.lastError — as ErrShouldWait when the query succeeded with zero bytes.
func (r *socketReader) Len() int {
	var info zx.InfoSocket
	err := r.socket.Handle().GetInfo(zx.ObjectInfoSocket, unsafe.Pointer(&info), uint(unsafe.Sizeof(info)))
	available := 0
	if err == nil {
		available = info.RXBufAvailable
		if available == 0 {
			err = &zx.Error{Status: zx.ErrShouldWait}
		}
	}
	r.lastError = err
	return available
}
// transProtoToString renders a transport protocol number for logging; known
// protocols get their conventional names, anything else is shown in hex.
func transProtoToString(proto tcpip.TransportProtocolNumber) string {
	if proto == tcp.ProtocolNumber {
		return "TCP"
	}
	if proto == udp.ProtocolNumber {
		return "UDP"
	}
	return fmt.Sprintf("0x%x", proto)
}
// signaler reflects gVisor waiter events onto zircon signals on the peer
// handle, tracking what has already been asserted to avoid redundant updates.
type signaler struct {
	// supported is the set of events this signaler translates into signals.
	supported waiter.EventMask
	// eventsToSignals maps a set of waiter events to the zx signals to raise.
	eventsToSignals func(waiter.EventMask) zx.Signals
	// readiness queries the endpoint's current readiness for a set of events.
	readiness func(waiter.EventMask) waiter.EventMask
	// signalPeer applies (clear, set) signal masks to the peer handle; see
	// update for the argument order.
	signalPeer func(zx.Signals, zx.Signals) error
	mu struct {
		sync.Mutex
		// asserted is the event set currently reflected as peer signals.
		asserted waiter.EventMask
	}
}
// update re-reads the endpoint's readiness and adjusts the peer's signals to
// match, asserting newly-ready events and clearing ones no longer ready. It
// is a no-op when nothing changed since the previous call.
func (s *signaler) update() error {
	// Hold the lock so no concurrent readiness change races with our
	// read-modify-write of the asserted set.
	s.mu.Lock()
	defer s.mu.Unlock()
	// readiness may report events beyond those requested (e.g. gvisor's UDP
	// endpoint can return EventErr regardless of the mask:
	// https://github.com/google/gvisor/blob/8ee4a3f/pkg/tcpip/transport/udp/endpoint.go#L1252),
	// so mask down to the events we actually track.
	observed := s.readiness(s.supported) & s.supported
	if observed == s.mu.asserted {
		// Nothing changed since the last update.
		return nil
	}
	toSet := s.eventsToSignals(observed &^ s.mu.asserted)
	toClear := s.eventsToSignals(s.mu.asserted &^ observed)
	if toSet == 0 && toClear == 0 {
		return nil
	}
	if err := s.signalPeer(toClear, toSet); err != nil {
		return err
	}
	s.mu.asserted = observed
	return nil
}
// mustUpdate calls update and panics on any failure other than the benign
// handle-teardown races (bad handle / peer closed), which are ignored.
func (s *signaler) mustUpdate() {
	err := s.update()
	if err == nil {
		return
	}
	if zxErr, ok := err.(*zx.Error); ok {
		switch zxErr.Status {
		case zx.ErrBadHandle, zx.ErrPeerClosed:
			// The peer went away; there is nobody left to signal.
			return
		}
	}
	panic(err)
}
// streamSocketError latches the first terminal error observed on a stream
// socket so it can be delivered to the client exactly once. Methods with a
// "Locked" suffix require the caller to hold mu.
type streamSocketError struct {
	mu struct {
		sync.Mutex
		// ch delivers the latched error once; it is created with capacity 1,
		// sent the error, and closed, so later receives observe nil.
		ch <-chan tcpip.Error
		// err is the latched terminal error, if any.
		err tcpip.Error
	}
}
// setLocked latches err onto the socket. If a different error was already
// latched it must be one of the "soft" ICMP-derived unreachable errors;
// anything else indicates a logic bug and panics. Callers must hold s.mu.
func (s *streamSocketError) setLocked(err tcpip.Error) {
	previous := s.setLockedInner(err)
	if previous == nil || previous == err {
		return
	}
	switch previous.(type) {
	case *tcpip.ErrNetworkUnreachable, *tcpip.ErrHostUnreachable:
		// At the time of writing, these errors are triggered by inbound
		// ICMP Destination Unreachable messages; if another error is to
		// be set on the endpoint, treat that one as the true error
		// because ICMP Destination Unreachable messages must not be
		// treated as a hard error as required by RFC 1122:
		//
		// A Destination Unreachable message that is received with code
		// 0 (Net), 1 (Host), or 5 (Bad Source Route) may result from a
		// routing transient and MUST therefore be interpreted as only
		// a hint, not proof, that the specified destination is
		// unreachable [IP:11].
		//
		// https://datatracker.ietf.org/doc/html/rfc1122#page-40
	default:
		panic(fmt.Sprintf("previous=%#v err=%#v", previous, err))
	}
}
// setLockedInner latches err if it is one of the terminal error kinds and no
// error is latched yet, returning whatever was latched before (nil when err
// was just stored, or when err is not a terminal kind). Callers must hold
// s.mu.
func (s *streamSocketError) setLockedInner(err tcpip.Error) tcpip.Error {
	switch err.(type) {
	case
		*tcpip.ErrConnectionAborted,
		*tcpip.ErrConnectionRefused,
		*tcpip.ErrConnectionReset,
		*tcpip.ErrNetworkUnreachable,
		*tcpip.ErrHostUnreachable,
		*tcpip.ErrTimeout:
	default:
		// Non-terminal errors are not latched.
		return nil
	}
	previous := s.mu.err
	_ = syslog.DebugTf("setLockedInner", "previous=%#v err=%#v", previous, err)
	if previous != nil {
		return previous
	}
	// First terminal error: stage it in a closed single-element channel so it
	// can be consumed exactly once.
	ch := make(chan tcpip.Error, 1)
	ch <- err
	close(ch)
	s.mu.ch = ch
	s.mu.err = err
	return nil
}
// setConsumedLocked records err as already delivered to the client; since an
// error may be returned only once, this exists purely for its side effect of
// marking the error consumed for subsequent reads.
//
// This method is needed because gVisor sometimes double-reports errors:
//
// Double set: https://github.com/google/gvisor/blob/949c814/pkg/tcpip/transport/tcp/connect.go#L1357-L1361
//
// Retrieval: https://github.com/google/gvisor/blob/949c814/pkg/tcpip/transport/tcp/endpoint.go#L1273-L1281
func (s *streamSocketError) setConsumedLocked(err tcpip.Error) {
	ch := s.setConsumedLockedInner(err)
	if ch == nil {
		return
	}
	// The channel either yields the very error we consumed or nil (already
	// drained); anything else is a logic bug.
	if consumed := <-ch; consumed != nil && consumed != err {
		panic(fmt.Sprintf("consumed=%#v err=%#v", consumed, err))
	}
}
// setConsumedLockedInner latches err (if latchable) and, when err was newly
// latched, immediately drains it from the staging channel so it counts as
// consumed. The channel is returned so callers can decide whether to drain a
// pre-existing error themselves. Callers must hold s.mu.
func (s *streamSocketError) setConsumedLockedInner(err tcpip.Error) <-chan tcpip.Error {
	if previous := s.setLockedInner(err); previous != nil {
		// Was already set; let the caller decide whether to consume.
		return s.mu.ch
	}
	ch := s.mu.ch
	if ch == nil {
		return nil
	}
	// Wasn't set, became set. Consume since we're returning the error.
	consumed := <-ch
	if consumed != err {
		panic(fmt.Sprintf("consumed=%#v err=%#v", consumed, err))
	}
	return ch
}
// socketOptionStats is a marker interface implemented by the per-socket-kind
// option call-count structs below.
type socketOptionStats interface {
	isSocketOptionStats()
}
// NetworkSocketOptionStats counts invocations of each socket-option FIDL
// method shared by all network sockets; one counter per method.
type NetworkSocketOptionStats struct {
	// The following stats are defined on fuchsia.posix.socket/BaseSocket.
	SetReuseAddress    atomicUint32Stat
	GetReuseAddress    atomicUint32Stat
	GetError           atomicUint32Stat
	SetBroadcast       atomicUint32Stat
	GetBroadcast       atomicUint32Stat
	SetSendBuffer      atomicUint32Stat
	GetSendBuffer      atomicUint32Stat
	SetReceiveBuffer   atomicUint32Stat
	GetReceiveBuffer   atomicUint32Stat
	SetKeepAlive       atomicUint32Stat
	GetKeepAlive       atomicUint32Stat
	SetOutOfBandInline atomicUint32Stat
	GetOutOfBandInline atomicUint32Stat
	SetNoCheck         atomicUint32Stat
	GetNoCheck         atomicUint32Stat
	SetLinger          atomicUint32Stat
	GetLinger          atomicUint32Stat
	SetReusePort       atomicUint32Stat
	GetReusePort       atomicUint32Stat
	GetAcceptConn      atomicUint32Stat
	SetBindToDevice    atomicUint32Stat
	GetBindToDevice    atomicUint32Stat
	SetTimestamp       atomicUint32Stat
	GetTimestamp       atomicUint32Stat
	// The following stats are defined on fuchsia.posix.socket/BaseNetworkSocket.
	SetIpTypeOfService         atomicUint32Stat
	GetIpTypeOfService         atomicUint32Stat
	SetIpTtl                   atomicUint32Stat
	GetIpTtl                   atomicUint32Stat
	SetIpPacketInfo            atomicUint32Stat
	GetIpPacketInfo            atomicUint32Stat
	SetIpReceiveTypeOfService  atomicUint32Stat
	GetIpReceiveTypeOfService  atomicUint32Stat
	SetIpReceiveTtl            atomicUint32Stat
	GetIpReceiveTtl            atomicUint32Stat
	SetIpMulticastInterface    atomicUint32Stat
	GetIpMulticastInterface    atomicUint32Stat
	SetIpMulticastTtl          atomicUint32Stat
	GetIpMulticastTtl          atomicUint32Stat
	SetIpMulticastLoopback     atomicUint32Stat
	GetIpMulticastLoopback     atomicUint32Stat
	SetIpv6MulticastInterface  atomicUint32Stat
	GetIpv6MulticastInterface  atomicUint32Stat
	SetIpv6UnicastHops         atomicUint32Stat
	GetIpv6UnicastHops         atomicUint32Stat
	SetIpv6ReceiveHopLimit     atomicUint32Stat
	GetIpv6ReceiveHopLimit     atomicUint32Stat
	SetIpv6MulticastHops       atomicUint32Stat
	GetIpv6MulticastHops       atomicUint32Stat
	SetIpv6MulticastLoopback   atomicUint32Stat
	GetIpv6MulticastLoopback   atomicUint32Stat
	AddIpMembership            atomicUint32Stat
	DropIpMembership           atomicUint32Stat
	AddIpv6Membership          atomicUint32Stat
	DropIpv6Membership         atomicUint32Stat
	SetIpv6Only                atomicUint32Stat
	GetIpv6Only                atomicUint32Stat
	SetIpv6ReceiveTrafficClass atomicUint32Stat
	GetIpv6ReceiveTrafficClass atomicUint32Stat
	SetIpv6TrafficClass        atomicUint32Stat
	GetIpv6TrafficClass        atomicUint32Stat
	SetIpv6ReceivePacketInfo   atomicUint32Stat
	GetIpv6ReceivePacketInfo   atomicUint32Stat
}
var _ socketOptionStats = (*NetworkSocketOptionStats)(nil)

// isSocketOptionStats implements socketOptionStats.
func (s *NetworkSocketOptionStats) isSocketOptionStats() {
}
// StreamSocketSpecificOptionStats counts invocations of the TCP-specific
// socket-option FIDL methods; one counter per method.
type StreamSocketSpecificOptionStats struct {
	SetTcpNoDelay           atomicUint32Stat
	GetTcpNoDelay           atomicUint32Stat
	SetTcpMaxSegment        atomicUint32Stat
	GetTcpMaxSegment        atomicUint32Stat
	SetTcpCork              atomicUint32Stat
	GetTcpCork              atomicUint32Stat
	SetTcpKeepAliveIdle     atomicUint32Stat
	GetTcpKeepAliveIdle     atomicUint32Stat
	SetTcpKeepAliveInterval atomicUint32Stat
	GetTcpKeepAliveInterval atomicUint32Stat
	SetTcpKeepAliveCount    atomicUint32Stat
	GetTcpKeepAliveCount    atomicUint32Stat
	SetTcpSynCount          atomicUint32Stat
	GetTcpSynCount          atomicUint32Stat
	SetTcpLinger            atomicUint32Stat
	GetTcpLinger            atomicUint32Stat
	SetTcpDeferAccept       atomicUint32Stat
	GetTcpDeferAccept       atomicUint32Stat
	SetTcpWindowClamp       atomicUint32Stat
	GetTcpWindowClamp       atomicUint32Stat
	GetTcpInfo              atomicUint32Stat
	SetTcpQuickAck          atomicUint32Stat
	GetTcpQuickAck          atomicUint32Stat
	SetTcpCongestion        atomicUint32Stat
	GetTcpCongestion        atomicUint32Stat
	SetTcpUserTimeout       atomicUint32Stat
	GetTcpUserTimeout       atomicUint32Stat
}
// streamSocketOptionStats combines the shared network-socket counters with
// the TCP-specific ones for stream sockets.
type streamSocketOptionStats struct {
	*NetworkSocketOptionStats
	StreamSocketSpecificOptionStats
}

var _ socketOptionStats = (*streamSocketOptionStats)(nil)
// RawSocketSpecificOptionStats counts invocations of the raw-socket-specific
// socket-option FIDL methods; one counter per method.
type RawSocketSpecificOptionStats struct {
	SetIpHeaderIncluded atomicUint32Stat
	GetIpHeaderIncluded atomicUint32Stat
	SetIcmpv6Filter     atomicUint32Stat
	GetIcmpv6Filter     atomicUint32Stat
	SetIpv6Checksum     atomicUint32Stat
	GetIpv6Checksum     atomicUint32Stat
}
// rawSocketOptionStats combines the shared network-socket counters with the
// raw-socket-specific ones.
type rawSocketOptionStats struct {
	*NetworkSocketOptionStats
	RawSocketSpecificOptionStats
}

var _ socketOptionStats = (*rawSocketOptionStats)(nil)
// endpoint is the base structure that models all network sockets.
type endpoint struct {
	// wq is the waiter queue used to observe readiness events on ep.
	wq *waiter.Queue
	// ep is the underlying gVisor endpoint.
	ep tcpip.Endpoint

	mu struct {
		sync.RWMutex
		// refcount counts live references; decRef panics on double close.
		refcount uint32
		// sockOptTimestamp holds the SO_TIMESTAMP-style option state.
		sockOptTimestamp socket.TimestampOption
	}

	transProto tcpip.TransportProtocolNumber
	netProto   tcpip.NetworkProtocolNumber

	// key identifies this endpoint; semantics determined elsewhere in the file.
	key uint64

	ns *Netstack

	// sockOptStats counts socket-option FIDL method invocations.
	sockOptStats NetworkSocketOptionStats
}
// endpointWithMutators exposes FIDL methods that mutate the state of the
// endpoint. This division lets endpoints embed non-mutating endpoints while
// programmatically enforcing special handling for mutators (e.g. datagram
// sockets need to flush caches on every mutation).
type endpointWithMutators struct {
	// ep is the endpoint whose state the mutator methods operate on.
	ep *endpoint
}
// incRef takes an additional reference on the endpoint.
func (ep *endpoint) incRef() {
	ep.mu.Lock()
	defer ep.mu.Unlock()
	ep.mu.refcount++
}
// decRef drops a reference and reports whether the last reference was just
// released (i.e. the endpoint should now be closed). Dropping below zero is a
// double close and panics.
func (ep *endpoint) decRef() bool {
	ep.mu.Lock()
	wasZero := ep.mu.refcount == 0
	ep.mu.refcount--
	nowZero := ep.mu.refcount == 0
	ep.mu.Unlock()

	if wasZero {
		panic(fmt.Sprintf("%p: double close", ep))
	}
	return nowZero
}
// Bind implements fuchsia.posix.socket/BaseNetworkSocket.Bind by binding the
// underlying endpoint to the given address, logging the resulting local
// address on success.
func (ep *endpointWithMutators) Bind(_ fidl.Context, sockaddr fnet.SocketAddress) (socket.BaseNetworkSocketBindResult, error) {
	if err := ep.ep.ep.Bind(fidlconv.ToTCPIPFullAddress(sockaddr)); err != nil {
		return socket.BaseNetworkSocketBindResultWithErr(tcpipErrorToCode(err)), nil
	}
	// A freshly bound endpoint must report a local address; failure here is a
	// programming error.
	localAddr, err := ep.ep.ep.GetLocalAddress()
	if err != nil {
		panic(err)
	}
	_ = syslog.DebugTf("bind", "%p: local=%+v", ep, localAddr)
	return socket.BaseNetworkSocketBindResultWithResponse(socket.BaseNetworkSocketBindResponse{}), nil
}
// toTCPIPFullAddress converts a FIDL socket address to a tcpip.FullAddress,
// rejecting address lengths incompatible with the endpoint's network
// protocol. An empty address is always accepted.
func (ep *endpoint) toTCPIPFullAddress(address fnet.SocketAddress) (tcpip.FullAddress, tcpip.Error) {
	addr := fidlconv.ToTCPIPFullAddress(address)
	l := addr.Addr.Len()
	if l == 0 {
		return addr, nil
	}
	supported := false
	switch ep.netProto {
	case ipv4.ProtocolNumber:
		supported = l == header.IPv4AddressSize
	case ipv6.ProtocolNumber:
		// Only TCP over IPv6 insists on a full IPv6-length address here.
		supported = ep.transProto != tcp.ProtocolNumber || l == header.IPv6AddressSize
	default:
		panic(fmt.Sprintf("unknown network protocol number: %d", ep.netProto))
	}
	if !supported {
		_ = syslog.DebugTf("connect", "%p: unsupported address %s", ep, addr.Addr)
		return addr, &tcpip.ErrAddressFamilyNotSupported{}
	}
	return addr, nil
}
// connect initiates a connection on the underlying endpoint and logs the
// resulting local/remote address pair at debug level.
func (ep *endpoint) connect(addr tcpip.FullAddress) tcpip.Error {
	if err := ep.ep.Connect(addr); err != nil {
		return err
	}
	localAddr, err := ep.ep.GetLocalAddress()
	if err != nil {
		panic(err)
	}
	if remoteAddr, err := ep.ep.GetRemoteAddress(); err == nil {
		_ = syslog.DebugTf("connect", "%p: local=%+v, remote=%+v", ep, localAddr, remoteAddr)
	} else if _, ok := err.(*tcpip.ErrNotConnected); ok {
		// A not-connected remote is tolerated and logged as disconnected.
		_ = syslog.DebugTf("connect", "%p: local=%+v, remote=disconnected", ep, localAddr)
	} else {
		panic(err)
	}
	return nil
}
// Disconnect implements fuchsia.posix.socket/BaseNetworkSocket.Disconnect by
// dissolving the underlying endpoint's association.
func (ep *endpointWithMutators) Disconnect(fidl.Context) (socket.BaseNetworkSocketDisconnectResult, error) {
	if err := ep.ep.ep.Disconnect(); err != nil {
		return socket.BaseNetworkSocketDisconnectResultWithErr(tcpipErrorToCode(err)), nil
	}
	response := socket.BaseNetworkSocketDisconnectResponse{}
	return socket.BaseNetworkSocketDisconnectResultWithResponse(response), nil
}
// GetSockName reports the endpoint's local address in FIDL form.
func (ep *endpoint) GetSockName(fidl.Context) (socket.BaseNetworkSocketGetSockNameResult, error) {
	localAddr, err := ep.ep.GetLocalAddress()
	if err != nil {
		return socket.BaseNetworkSocketGetSockNameResultWithErr(tcpipErrorToCode(err)), nil
	}
	response := socket.BaseNetworkSocketGetSockNameResponse{
		Addr: fidlconv.ToNetSocketAddressWithProto(ep.netProto, localAddr),
	}
	return socket.BaseNetworkSocketGetSockNameResultWithResponse(response), nil
}
// GetPeerName reports the endpoint's remote address in FIDL form.
func (ep *endpoint) GetPeerName(fidl.Context) (socket.BaseNetworkSocketGetPeerNameResult, error) {
	remoteAddr, err := ep.ep.GetRemoteAddress()
	if err != nil {
		return socket.BaseNetworkSocketGetPeerNameResultWithErr(tcpipErrorToCode(err)), nil
	}
	response := socket.BaseNetworkSocketGetPeerNameResponse{
		Addr: fidlconv.ToNetSocketAddressWithProto(ep.netProto, remoteAddr),
	}
	return socket.BaseNetworkSocketGetPeerNameResultWithResponse(response), nil
}
// GetTimestamp reports the endpoint's timestamp option state.
func (ep *endpoint) GetTimestamp(fidl.Context) (socket.BaseSocketGetTimestampResult, error) {
	ep.sockOptStats.GetTimestamp.Add(1)
	ep.mu.RLock()
	defer ep.mu.RUnlock()
	response := socket.BaseSocketGetTimestampResponse{Value: ep.mu.sockOptTimestamp}
	return socket.BaseSocketGetTimestampResultWithResponse(response), nil
}
// setTimestamp stores the endpoint's timestamp option state.
func (ep *endpoint) setTimestamp(value socket.TimestampOption) {
	ep.mu.Lock()
	defer ep.mu.Unlock()
	ep.mu.sockOptTimestamp = value
}
// SetTimestamp implements fuchsia.posix.socket/BaseSocket.SetTimestamp.
func (ep *endpointWithMutators) SetTimestamp(_ fidl.Context, value socket.TimestampOption) (socket.BaseSocketSetTimestampResult, error) {
	ep.ep.sockOptStats.SetTimestamp.Add(1)
	ep.ep.setTimestamp(value)
	response := socket.BaseSocketSetTimestampResponse{}
	return socket.BaseSocketSetTimestampResultWithResponse(response), nil
}
// domain maps the endpoint's network protocol to the FIDL socket domain.
func (ep *endpoint) domain() (socket.Domain, tcpip.Error) {
	if ep.netProto == ipv4.ProtocolNumber {
		return socket.DomainIpv4, nil
	}
	if ep.netProto == ipv6.ProtocolNumber {
		return socket.DomainIpv6, nil
	}
	return 0, &tcpip.ErrNotSupported{}
}
// setBufferSize applies a user-requested send/receive buffer size: the
// request is doubled (overflow-safe) to account for per-packet overhead,
// clamped into the endpoint's [min, max] limits, and passed to set with
// notification enabled.
func setBufferSize(size uint64, set func(int64, bool), limits func() (min, max int64)) {
	// Clamp the unsigned request into int64 range before signed arithmetic.
	clamped := int64(math.MaxInt64)
	if size < math.MaxInt64 {
		clamped = int64(size)
	}

	// packetOverheadFactor is used to multiply the value provided by the user on
	// a setsockopt(2) for setting the send/receive buffer sizes sockets.
	const packetOverheadFactor = 2
	if clamped > math.MaxInt64/packetOverheadFactor {
		clamped = math.MaxInt64
	} else {
		clamped *= packetOverheadFactor
	}

	lo, hi := limits()
	if clamped > hi {
		clamped = hi
	}
	if clamped < lo {
		clamped = lo
	}
	set(clamped, true /* notify */)
}
// SetSendBuffer applies a clamped send buffer size to the endpoint.
func (ep *endpointWithMutators) SetSendBuffer(_ fidl.Context, size uint64) (socket.BaseSocketSetSendBufferResult, error) {
	ep.ep.sockOptStats.SetSendBuffer.Add(1)
	options := ep.ep.ep.SocketOptions()
	setBufferSize(size, options.SetSendBufferSize, options.SendBufferLimits)
	response := socket.BaseSocketSetSendBufferResponse{}
	return socket.BaseSocketSetSendBufferResultWithResponse(response), nil
}
// GetSendBuffer reports the endpoint's current send buffer size in bytes.
func (ep *endpoint) GetSendBuffer(fidl.Context) (socket.BaseSocketGetSendBufferResult, error) {
	ep.sockOptStats.GetSendBuffer.Add(1)
	response := socket.BaseSocketGetSendBufferResponse{
		ValueBytes: uint64(ep.ep.SocketOptions().GetSendBufferSize()),
	}
	return socket.BaseSocketGetSendBufferResultWithResponse(response), nil
}
// SetReceiveBuffer applies a clamped receive buffer size to the endpoint.
func (ep *endpointWithMutators) SetReceiveBuffer(_ fidl.Context, size uint64) (socket.BaseSocketSetReceiveBufferResult, error) {
	ep.ep.sockOptStats.SetReceiveBuffer.Add(1)
	options := ep.ep.ep.SocketOptions()
	setBufferSize(size, options.SetReceiveBufferSize, options.ReceiveBufferLimits)
	response := socket.BaseSocketSetReceiveBufferResponse{}
	return socket.BaseSocketSetReceiveBufferResultWithResponse(response), nil
}
// GetReceiveBuffer reports the endpoint's current receive buffer size in bytes.
func (ep *endpoint) GetReceiveBuffer(fidl.Context) (socket.BaseSocketGetReceiveBufferResult, error) {
	ep.sockOptStats.GetReceiveBuffer.Add(1)
	response := socket.BaseSocketGetReceiveBufferResponse{
		ValueBytes: uint64(ep.ep.SocketOptions().GetReceiveBufferSize()),
	}
	return socket.BaseSocketGetReceiveBufferResultWithResponse(response), nil
}
// SetReuseAddress toggles the endpoint's address-reuse option.
func (ep *endpointWithMutators) SetReuseAddress(_ fidl.Context, value bool) (socket.BaseSocketSetReuseAddressResult, error) {
	ep.ep.sockOptStats.SetReuseAddress.Add(1)
	opts := ep.ep.ep.SocketOptions()
	opts.SetReuseAddress(value)
	response := socket.BaseSocketSetReuseAddressResponse{}
	return socket.BaseSocketSetReuseAddressResultWithResponse(response), nil
}
// GetReuseAddress reports the endpoint's address-reuse option.
func (ep *endpoint) GetReuseAddress(fidl.Context) (socket.BaseSocketGetReuseAddressResult, error) {
	ep.sockOptStats.GetReuseAddress.Add(1)
	response := socket.BaseSocketGetReuseAddressResponse{
		Value: ep.ep.SocketOptions().GetReuseAddress(),
	}
	return socket.BaseSocketGetReuseAddressResultWithResponse(response), nil
}
// SetReusePort toggles the endpoint's port-reuse option.
func (ep *endpointWithMutators) SetReusePort(_ fidl.Context, value bool) (socket.BaseSocketSetReusePortResult, error) {
	ep.ep.sockOptStats.SetReusePort.Add(1)
	opts := ep.ep.ep.SocketOptions()
	opts.SetReusePort(value)
	response := socket.BaseSocketSetReusePortResponse{}
	return socket.BaseSocketSetReusePortResultWithResponse(response), nil
}
// GetReusePort reports the endpoint's port-reuse option.
func (ep *endpoint) GetReusePort(fidl.Context) (socket.BaseSocketGetReusePortResult, error) {
	ep.sockOptStats.GetReusePort.Add(1)
	response := socket.BaseSocketGetReusePortResponse{
		Value: ep.ep.SocketOptions().GetReusePort(),
	}
	return socket.BaseSocketGetReusePortResultWithResponse(response), nil
}
// GetAcceptConn reports whether the socket is a listening TCP socket.
func (ep *endpoint) GetAcceptConn(fidl.Context) (socket.BaseSocketGetAcceptConnResult, error) {
	ep.sockOptStats.GetAcceptConn.Add(1)
	// Only TCP endpoints can listen; short-circuit avoids querying State on
	// other transports.
	listening := ep.transProto == tcp.ProtocolNumber &&
		tcp.EndpointState(ep.ep.State()) == tcp.StateListen
	response := socket.BaseSocketGetAcceptConnResponse{Value: listening}
	return socket.BaseSocketGetAcceptConnResultWithResponse(response), nil
}
// SetBindToDevice binds the endpoint to the NIC whose name matches value; an
// empty name clears the binding, an unknown name yields ErrUnknownDevice.
func (ep *endpointWithMutators) SetBindToDevice(_ fidl.Context, value string) (socket.BaseSocketSetBindToDeviceResult, error) {
	ep.ep.sockOptStats.SetBindToDevice.Add(1)
	opts := ep.ep.ep.SocketOptions()
	bind := func() tcpip.Error {
		if value == "" {
			return opts.SetBindToDevice(0)
		}
		for id, info := range ep.ep.ns.stack.NICInfo() {
			if info.Name == value {
				return opts.SetBindToDevice(int32(id))
			}
		}
		return &tcpip.ErrUnknownDevice{}
	}
	if err := bind(); err != nil {
		return socket.BaseSocketSetBindToDeviceResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.BaseSocketSetBindToDeviceResultWithResponse(socket.BaseSocketSetBindToDeviceResponse{}), nil
}
// GetBindToDevice reports the name of the NIC the endpoint is bound to: empty
// when unbound, ENODEV when the bound NIC no longer resolves to a name.
func (ep *endpoint) GetBindToDevice(fidl.Context) (socket.BaseSocketGetBindToDeviceResult, error) {
	ep.sockOptStats.GetBindToDevice.Add(1)
	id := ep.ep.SocketOptions().GetBindToDevice()
	if id == 0 {
		// Not bound to a device; report an empty name.
		return socket.BaseSocketGetBindToDeviceResultWithResponse(socket.BaseSocketGetBindToDeviceResponse{}), nil
	}
	name := ep.ns.stack.FindNICNameFromID(tcpip.NICID(id))
	if len(name) == 0 {
		return socket.BaseSocketGetBindToDeviceResultWithErr(posix.ErrnoEnodev), nil
	}
	response := socket.BaseSocketGetBindToDeviceResponse{Value: name}
	return socket.BaseSocketGetBindToDeviceResultWithResponse(response), nil
}
// SetBindToInterfaceIndex binds the endpoint to the NIC with the given index;
// zero clears the binding, an unknown index yields ErrUnknownDevice.
func (ep *endpointWithMutators) SetBindToInterfaceIndex(ctx fidl.Context, value uint64) (socket.BaseSocketSetBindToInterfaceIndexResult, error) {
	id := tcpip.NICID(value)
	opts := ep.ep.ep.SocketOptions()
	bind := func() tcpip.Error {
		if value == 0 {
			return opts.SetBindToDevice(0)
		}
		if _, ok := ep.ep.ns.stack.NICInfo()[id]; ok {
			return opts.SetBindToDevice(int32(id))
		}
		return &tcpip.ErrUnknownDevice{}
	}
	if err := bind(); err != nil {
		return socket.BaseSocketSetBindToInterfaceIndexResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.BaseSocketSetBindToInterfaceIndexResultWithResponse(socket.BaseSocketSetBindToInterfaceIndexResponse{}), nil
}
// GetBindToInterfaceIndex reports the index of the NIC the endpoint is bound
// to (zero when unbound).
func (ep *endpoint) GetBindToInterfaceIndex(fidl.Context) (socket.BaseSocketGetBindToInterfaceIndexResult, error) {
	response := socket.BaseSocketGetBindToInterfaceIndexResponse{
		Value: uint64(ep.ep.SocketOptions().GetBindToDevice()),
	}
	return socket.BaseSocketGetBindToInterfaceIndexResultWithResponse(response), nil
}
// SetBroadcast toggles the endpoint's broadcast option.
func (ep *endpointWithMutators) SetBroadcast(_ fidl.Context, value bool) (socket.BaseSocketSetBroadcastResult, error) {
	ep.ep.sockOptStats.SetBroadcast.Add(1)
	opts := ep.ep.ep.SocketOptions()
	opts.SetBroadcast(value)
	response := socket.BaseSocketSetBroadcastResponse{}
	return socket.BaseSocketSetBroadcastResultWithResponse(response), nil
}
// GetBroadcast reports the endpoint's broadcast option.
func (ep *endpoint) GetBroadcast(fidl.Context) (socket.BaseSocketGetBroadcastResult, error) {
	ep.sockOptStats.GetBroadcast.Add(1)
	response := socket.BaseSocketGetBroadcastResponse{
		Value: ep.ep.SocketOptions().GetBroadcast(),
	}
	return socket.BaseSocketGetBroadcastResultWithResponse(response), nil
}
// SetKeepAlive toggles the endpoint's keep-alive option.
func (ep *endpointWithMutators) SetKeepAlive(_ fidl.Context, value bool) (socket.BaseSocketSetKeepAliveResult, error) {
	ep.ep.sockOptStats.SetKeepAlive.Add(1)
	opts := ep.ep.ep.SocketOptions()
	opts.SetKeepAlive(value)
	response := socket.BaseSocketSetKeepAliveResponse{}
	return socket.BaseSocketSetKeepAliveResultWithResponse(response), nil
}
// GetKeepAlive reports the endpoint's keep-alive option.
func (ep *endpoint) GetKeepAlive(fidl.Context) (socket.BaseSocketGetKeepAliveResult, error) {
	ep.sockOptStats.GetKeepAlive.Add(1)
	response := socket.BaseSocketGetKeepAliveResponse{
		Value: ep.ep.SocketOptions().GetKeepAlive(),
	}
	return socket.BaseSocketGetKeepAliveResultWithResponse(response), nil
}
// SetLinger configures the endpoint's linger option from an enabled flag and
// a timeout in whole seconds.
func (ep *endpointWithMutators) SetLinger(_ fidl.Context, linger bool, seconds uint32) (socket.BaseSocketSetLingerResult, error) {
	ep.ep.sockOptStats.SetLinger.Add(1)
	opt := tcpip.LingerOption{
		Enabled: linger,
		Timeout: time.Duration(seconds) * time.Second,
	}
	ep.ep.ep.SocketOptions().SetLinger(opt)
	response := socket.BaseSocketSetLingerResponse{}
	return socket.BaseSocketSetLingerResultWithResponse(response), nil
}
// GetLinger reports the endpoint's linger option, with the timeout truncated
// to whole seconds.
func (ep *endpoint) GetLinger(fidl.Context) (socket.BaseSocketGetLingerResult, error) {
	ep.sockOptStats.GetLinger.Add(1)
	opt := ep.ep.SocketOptions().GetLinger()
	response := socket.BaseSocketGetLingerResponse{
		Linger:     opt.Enabled,
		LengthSecs: uint32(opt.Timeout / time.Second),
	}
	return socket.BaseSocketGetLingerResultWithResponse(response), nil
}
// SetOutOfBandInline toggles the endpoint's out-of-band-inline option.
func (ep *endpointWithMutators) SetOutOfBandInline(_ fidl.Context, value bool) (socket.BaseSocketSetOutOfBandInlineResult, error) {
	ep.ep.sockOptStats.SetOutOfBandInline.Add(1)
	opts := ep.ep.ep.SocketOptions()
	opts.SetOutOfBandInline(value)
	response := socket.BaseSocketSetOutOfBandInlineResponse{}
	return socket.BaseSocketSetOutOfBandInlineResultWithResponse(response), nil
}
// GetOutOfBandInline reports the endpoint's out-of-band-inline option.
func (ep *endpoint) GetOutOfBandInline(fidl.Context) (socket.BaseSocketGetOutOfBandInlineResult, error) {
	ep.sockOptStats.GetOutOfBandInline.Add(1)
	response := socket.BaseSocketGetOutOfBandInlineResponse{
		Value: ep.ep.SocketOptions().GetOutOfBandInline(),
	}
	return socket.BaseSocketGetOutOfBandInlineResultWithResponse(response), nil
}
// SetNoCheck toggles the endpoint's no-checksum option.
func (ep *endpointWithMutators) SetNoCheck(_ fidl.Context, value bool) (socket.BaseSocketSetNoCheckResult, error) {
	ep.ep.sockOptStats.SetNoCheck.Add(1)
	opts := ep.ep.ep.SocketOptions()
	opts.SetNoChecksum(value)
	response := socket.BaseSocketSetNoCheckResponse{}
	return socket.BaseSocketSetNoCheckResultWithResponse(response), nil
}
// GetNoCheck reports the endpoint's no-checksum option.
func (ep *endpoint) GetNoCheck(fidl.Context) (socket.BaseSocketGetNoCheckResult, error) {
	ep.sockOptStats.GetNoCheck.Add(1)
	response := socket.BaseSocketGetNoCheckResponse{
		Value: ep.ep.SocketOptions().GetNoChecksum(),
	}
	return socket.BaseSocketGetNoCheckResultWithResponse(response), nil
}
// SetIpv6Only toggles the endpoint's v6-only option.
func (ep *endpointWithMutators) SetIpv6Only(_ fidl.Context, value bool) (socket.BaseNetworkSocketSetIpv6OnlyResult, error) {
	ep.ep.sockOptStats.SetIpv6Only.Add(1)
	opts := ep.ep.ep.SocketOptions()
	opts.SetV6Only(value)
	response := socket.BaseNetworkSocketSetIpv6OnlyResponse{}
	return socket.BaseNetworkSocketSetIpv6OnlyResultWithResponse(response), nil
}
// GetIpv6Only reports the endpoint's v6-only option.
func (ep *endpoint) GetIpv6Only(fidl.Context) (socket.BaseNetworkSocketGetIpv6OnlyResult, error) {
	ep.sockOptStats.GetIpv6Only.Add(1)
	response := socket.BaseNetworkSocketGetIpv6OnlyResponse{
		Value: ep.ep.SocketOptions().GetV6Only(),
	}
	return socket.BaseNetworkSocketGetIpv6OnlyResultWithResponse(response), nil
}
// SetIpv6TrafficClass sets the endpoint's IPv6 traffic class; an unset
// optional maps to 0.
func (ep *endpointWithMutators) SetIpv6TrafficClass(_ fidl.Context, value socket.OptionalUint8) (socket.BaseNetworkSocketSetIpv6TrafficClassResult, error) {
	ep.ep.sockOptStats.SetIpv6TrafficClass.Add(1)
	tclass, err := optionalUint8ToInt(value, 0)
	if err != nil {
		return socket.BaseNetworkSocketSetIpv6TrafficClassResultWithErr(tcpipErrorToCode(err)), nil
	}
	if err := ep.ep.ep.SetSockOptInt(tcpip.IPv6TrafficClassOption, tclass); err != nil {
		return socket.BaseNetworkSocketSetIpv6TrafficClassResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.BaseNetworkSocketSetIpv6TrafficClassResultWithResponse(socket.BaseNetworkSocketSetIpv6TrafficClassResponse{}), nil
}
// GetIpv6TrafficClass reports the endpoint's IPv6 traffic class.
func (ep *endpoint) GetIpv6TrafficClass(fidl.Context) (socket.BaseNetworkSocketGetIpv6TrafficClassResult, error) {
	ep.sockOptStats.GetIpv6TrafficClass.Add(1)
	tclass, err := ep.ep.GetSockOptInt(tcpip.IPv6TrafficClassOption)
	if err != nil {
		return socket.BaseNetworkSocketGetIpv6TrafficClassResultWithErr(tcpipErrorToCode(err)), nil
	}
	response := socket.BaseNetworkSocketGetIpv6TrafficClassResponse{Value: uint8(tclass)}
	return socket.BaseNetworkSocketGetIpv6TrafficClassResultWithResponse(response), nil
}
// SetIpv6MulticastInterface selects the NIC used for outgoing IPv6 multicast.
func (ep *endpointWithMutators) SetIpv6MulticastInterface(_ fidl.Context, value uint64) (socket.BaseNetworkSocketSetIpv6MulticastInterfaceResult, error) {
	ep.ep.sockOptStats.SetIpv6MulticastInterface.Add(1)
	opt := tcpip.MulticastInterfaceOption{NIC: tcpip.NICID(value)}
	if err := ep.ep.ep.SetSockOpt(&opt); err != nil {
		return socket.BaseNetworkSocketSetIpv6MulticastInterfaceResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.BaseNetworkSocketSetIpv6MulticastInterfaceResultWithResponse(socket.BaseNetworkSocketSetIpv6MulticastInterfaceResponse{}), nil
}
// GetIpv6MulticastInterface reports the NIC index used for outgoing IPv6
// multicast.
func (ep *endpoint) GetIpv6MulticastInterface(fidl.Context) (socket.BaseNetworkSocketGetIpv6MulticastInterfaceResult, error) {
	ep.sockOptStats.GetIpv6MulticastInterface.Add(1)
	var opt tcpip.MulticastInterfaceOption
	if err := ep.ep.GetSockOpt(&opt); err != nil {
		return socket.BaseNetworkSocketGetIpv6MulticastInterfaceResultWithErr(tcpipErrorToCode(err)), nil
	}
	response := socket.BaseNetworkSocketGetIpv6MulticastInterfaceResponse{Value: uint64(opt.NIC)}
	return socket.BaseNetworkSocketGetIpv6MulticastInterfaceResultWithResponse(response), nil
}
// SetIpv6MulticastHops sets the multicast hop limit; an unset optional maps
// to 1.
func (ep *endpointWithMutators) SetIpv6MulticastHops(_ fidl.Context, value socket.OptionalUint8) (socket.BaseNetworkSocketSetIpv6MulticastHopsResult, error) {
	ep.ep.sockOptStats.SetIpv6MulticastHops.Add(1)
	hops, err := optionalUint8ToInt(value, 1)
	if err != nil {
		return socket.BaseNetworkSocketSetIpv6MulticastHopsResultWithErr(tcpipErrorToCode(err)), nil
	}
	if err := ep.ep.ep.SetSockOptInt(tcpip.MulticastTTLOption, hops); err != nil {
		return socket.BaseNetworkSocketSetIpv6MulticastHopsResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.BaseNetworkSocketSetIpv6MulticastHopsResultWithResponse(socket.BaseNetworkSocketSetIpv6MulticastHopsResponse{}), nil
}
// GetIpv6MulticastHops reports the endpoint's multicast hop limit.
func (ep *endpoint) GetIpv6MulticastHops(fidl.Context) (socket.BaseNetworkSocketGetIpv6MulticastHopsResult, error) {
	ep.sockOptStats.GetIpv6MulticastHops.Add(1)
	hops, err := ep.ep.GetSockOptInt(tcpip.MulticastTTLOption)
	if err != nil {
		return socket.BaseNetworkSocketGetIpv6MulticastHopsResultWithErr(tcpipErrorToCode(err)), nil
	}
	response := socket.BaseNetworkSocketGetIpv6MulticastHopsResponse{Value: uint8(hops)}
	return socket.BaseNetworkSocketGetIpv6MulticastHopsResultWithResponse(response), nil
}
// SetIpv6UnicastHops sets the unicast hop limit; an unset optional maps to -1
// (endpoint-level "use the stack default").
func (ep *endpointWithMutators) SetIpv6UnicastHops(_ fidl.Context, value socket.OptionalUint8) (socket.BaseNetworkSocketSetIpv6UnicastHopsResult, error) {
	ep.ep.sockOptStats.SetIpv6UnicastHops.Add(1)
	hops, err := optionalUint8ToInt(value, -1)
	if err != nil {
		return socket.BaseNetworkSocketSetIpv6UnicastHopsResultWithErr(tcpipErrorToCode(err)), nil
	}
	if err := ep.ep.ep.SetSockOptInt(tcpip.IPv6HopLimitOption, hops); err != nil {
		return socket.BaseNetworkSocketSetIpv6UnicastHopsResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.BaseNetworkSocketSetIpv6UnicastHopsResultWithResponse(socket.BaseNetworkSocketSetIpv6UnicastHopsResponse{}), nil
}
// GetIpv6UnicastHops reports the endpoint's unicast hop limit, substituting
// the stack-wide default when the endpoint-level value is unset (-1).
func (ep *endpoint) GetIpv6UnicastHops(fidl.Context) (socket.BaseNetworkSocketGetIpv6UnicastHopsResult, error) {
	ep.sockOptStats.GetIpv6UnicastHops.Add(1)
	hops, err := ep.ep.GetSockOptInt(tcpip.IPv6HopLimitOption)
	if err != nil {
		return socket.BaseNetworkSocketGetIpv6UnicastHopsResultWithErr(tcpipErrorToCode(err)), nil
	}
	if hops == -1 {
		var defaultHopLimit tcpip.DefaultTTLOption
		if err := ep.ns.stack.NetworkProtocolOption(header.IPv6ProtocolNumber, &defaultHopLimit); err != nil {
			panic(fmt.Sprintf("stack.NetworkProtocolOption(header.IPv6ProtocolNumber, _): %s", err))
		}
		hops = int(defaultHopLimit)
	}
	response := socket.BaseNetworkSocketGetIpv6UnicastHopsResponse{Value: uint8(hops)}
	return socket.BaseNetworkSocketGetIpv6UnicastHopsResultWithResponse(response), nil
}
// SetIpv6MulticastLoopback toggles local delivery of outgoing multicast.
func (ep *endpointWithMutators) SetIpv6MulticastLoopback(_ fidl.Context, value bool) (socket.BaseNetworkSocketSetIpv6MulticastLoopbackResult, error) {
	ep.ep.sockOptStats.SetIpv6MulticastLoopback.Add(1)
	opts := ep.ep.ep.SocketOptions()
	opts.SetMulticastLoop(value)
	response := socket.BaseNetworkSocketSetIpv6MulticastLoopbackResponse{}
	return socket.BaseNetworkSocketSetIpv6MulticastLoopbackResultWithResponse(response), nil
}
// GetIpv6MulticastLoopback implements the IPV6_MULTICAST_LOOP getter.
func (ep *endpoint) GetIpv6MulticastLoopback(fidl.Context) (socket.BaseNetworkSocketGetIpv6MulticastLoopbackResult, error) {
	ep.sockOptStats.GetIpv6MulticastLoopback.Add(1)
	return socket.BaseNetworkSocketGetIpv6MulticastLoopbackResultWithResponse(socket.BaseNetworkSocketGetIpv6MulticastLoopbackResponse{
		Value: ep.ep.SocketOptions().GetMulticastLoop(),
	}), nil
}
// SetIpTtl implements the IP_TTL setter. An absent value (-1) restores the
// stack default (stored as 0 in the endpoint), while an explicit 0 is
// rejected with EINVAL.
func (ep *endpointWithMutators) SetIpTtl(_ fidl.Context, value socket.OptionalUint8) (socket.BaseNetworkSocketSetIpTtlResult, error) {
	ep.ep.sockOptStats.SetIpTtl.Add(1)
	ttl, err := optionalUint8ToInt(value, -1)
	if err != nil {
		return socket.BaseNetworkSocketSetIpTtlResultWithErr(tcpipErrorToCode(err)), nil
	}
	// NB: the two cases below are mutually exclusive, so checking 0 before -1
	// is equivalent to the natural ordering.
	if ttl == 0 {
		return socket.BaseNetworkSocketSetIpTtlResultWithErr(posix.ErrnoEinval), nil
	}
	if ttl == -1 {
		// Unset maps to default value
		ttl = 0
	}
	if err := ep.ep.ep.SetSockOptInt(tcpip.IPv4TTLOption, ttl); err != nil {
		return socket.BaseNetworkSocketSetIpTtlResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.BaseNetworkSocketSetIpTtlResultWithResponse(socket.BaseNetworkSocketSetIpTtlResponse{}), nil
}
// GetIpTtl implements the IP_TTL getter. A stored value of 0 means "use the
// default", which is resolved against the IPv4 protocol's configured TTL.
func (ep *endpoint) GetIpTtl(fidl.Context) (socket.BaseNetworkSocketGetIpTtlResult, error) {
	ep.sockOptStats.GetIpTtl.Add(1)
	ttl, err := ep.ep.GetSockOptInt(tcpip.IPv4TTLOption)
	if err != nil {
		return socket.BaseNetworkSocketGetIpTtlResultWithErr(tcpipErrorToCode(err)), nil
	}
	if ttl == 0 {
		var defaultTtl tcpip.DefaultTTLOption
		if err := ep.ns.stack.NetworkProtocolOption(header.IPv4ProtocolNumber, &defaultTtl); err != nil {
			panic(fmt.Sprintf("stack.NetworkProtocolOption(header.IPv4ProtocolNumber, _): %s", err))
		}
		ttl = int(defaultTtl)
	}
	return socket.BaseNetworkSocketGetIpTtlResultWithResponse(socket.BaseNetworkSocketGetIpTtlResponse{Value: uint8(ttl)}), nil
}
// SetIpMulticastTtl implements the IP_MULTICAST_TTL setter.
func (ep *endpointWithMutators) SetIpMulticastTtl(_ fidl.Context, value socket.OptionalUint8) (socket.BaseNetworkSocketSetIpMulticastTtlResult, error) {
	ep.ep.sockOptStats.SetIpMulticastTtl.Add(1)
	// Linux translates -1 (unset) to 1
	ttl, err := optionalUint8ToInt(value, 1)
	if err == nil {
		err = ep.ep.ep.SetSockOptInt(tcpip.MulticastTTLOption, ttl)
	}
	if err != nil {
		return socket.BaseNetworkSocketSetIpMulticastTtlResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.BaseNetworkSocketSetIpMulticastTtlResultWithResponse(socket.BaseNetworkSocketSetIpMulticastTtlResponse{}), nil
}
// GetIpMulticastTtl implements the IP_MULTICAST_TTL getter.
func (ep *endpoint) GetIpMulticastTtl(fidl.Context) (socket.BaseNetworkSocketGetIpMulticastTtlResult, error) {
	ep.sockOptStats.GetIpMulticastTtl.Add(1)
	ttl, err := ep.ep.GetSockOptInt(tcpip.MulticastTTLOption)
	if err != nil {
		return socket.BaseNetworkSocketGetIpMulticastTtlResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.BaseNetworkSocketGetIpMulticastTtlResultWithResponse(socket.BaseNetworkSocketGetIpMulticastTtlResponse{
		Value: uint8(ttl),
	}), nil
}
// SetIpMulticastInterface implements the IP_MULTICAST_IF setter, selecting
// the interface used for outgoing multicast packets.
func (ep *endpointWithMutators) SetIpMulticastInterface(_ fidl.Context, iface uint64, value fnet.Ipv4Address) (socket.BaseNetworkSocketSetIpMulticastInterfaceResult, error) {
	ep.ep.sockOptStats.SetIpMulticastInterface.Add(1)
	option := tcpip.MulticastInterfaceOption{
		NIC:           tcpip.NICID(iface),
		InterfaceAddr: fidlconv.ToTcpIpAddressDroppingUnspecifiedv4(value),
	}
	if err := ep.ep.ep.SetSockOpt(&option); err != nil {
		return socket.BaseNetworkSocketSetIpMulticastInterfaceResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.BaseNetworkSocketSetIpMulticastInterfaceResultWithResponse(socket.BaseNetworkSocketSetIpMulticastInterfaceResponse{}), nil
}
// GetIpMulticastInterface implements the IP_MULTICAST_IF getter. When the
// stored interface address is not IPv4-sized, the all-zero address is
// reported.
func (ep *endpoint) GetIpMulticastInterface(fidl.Context) (socket.BaseNetworkSocketGetIpMulticastInterfaceResult, error) {
	ep.sockOptStats.GetIpMulticastInterface.Add(1)
	var option tcpip.MulticastInterfaceOption
	if err := ep.ep.GetSockOpt(&option); err != nil {
		return socket.BaseNetworkSocketGetIpMulticastInterfaceResultWithErr(tcpipErrorToCode(err)), nil
	}
	var addr fnet.Ipv4Address
	// Copy the address out only when it is actually an IPv4 address.
	if option.InterfaceAddr.Len() == header.IPv4AddressSize {
		copy(addr.Addr[:], option.InterfaceAddr.AsSlice())
	}
	response := socket.BaseNetworkSocketGetIpMulticastInterfaceResponse{Value: addr}
	return socket.BaseNetworkSocketGetIpMulticastInterfaceResultWithResponse(response), nil
}
// SetIpMulticastLoopback implements the IP_MULTICAST_LOOP setter.
func (ep *endpointWithMutators) SetIpMulticastLoopback(_ fidl.Context, value bool) (socket.BaseNetworkSocketSetIpMulticastLoopbackResult, error) {
	ep.ep.sockOptStats.SetIpMulticastLoopback.Add(1)
	opts := ep.ep.ep.SocketOptions()
	opts.SetMulticastLoop(value)
	return socket.BaseNetworkSocketSetIpMulticastLoopbackResultWithResponse(socket.BaseNetworkSocketSetIpMulticastLoopbackResponse{}), nil
}
// GetIpMulticastLoopback implements the IP_MULTICAST_LOOP getter.
func (ep *endpoint) GetIpMulticastLoopback(fidl.Context) (socket.BaseNetworkSocketGetIpMulticastLoopbackResult, error) {
	ep.sockOptStats.GetIpMulticastLoopback.Add(1)
	return socket.BaseNetworkSocketGetIpMulticastLoopbackResultWithResponse(socket.BaseNetworkSocketGetIpMulticastLoopbackResponse{
		Value: ep.ep.SocketOptions().GetMulticastLoop(),
	}), nil
}
// SetIpTypeOfService implements the IP_TOS setter.
func (ep *endpointWithMutators) SetIpTypeOfService(_ fidl.Context, value uint8) (socket.BaseNetworkSocketSetIpTypeOfServiceResult, error) {
	ep.ep.sockOptStats.SetIpTypeOfService.Add(1)
	tos := int(value)
	if err := ep.ep.ep.SetSockOptInt(tcpip.IPv4TOSOption, tos); err != nil {
		return socket.BaseNetworkSocketSetIpTypeOfServiceResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.BaseNetworkSocketSetIpTypeOfServiceResultWithResponse(socket.BaseNetworkSocketSetIpTypeOfServiceResponse{}), nil
}
// GetIpTypeOfService implements the IP_TOS getter. Values outside the uint8
// range are reported as 0 rather than being truncated.
func (ep *endpoint) GetIpTypeOfService(fidl.Context) (socket.BaseNetworkSocketGetIpTypeOfServiceResult, error) {
	ep.sockOptStats.GetIpTypeOfService.Add(1)
	tos, err := ep.ep.GetSockOptInt(tcpip.IPv4TOSOption)
	if err != nil {
		return socket.BaseNetworkSocketGetIpTypeOfServiceResultWithErr(tcpipErrorToCode(err)), nil
	}
	var reported uint8
	if 0 <= tos && tos <= math.MaxUint8 {
		reported = uint8(tos)
	}
	return socket.BaseNetworkSocketGetIpTypeOfServiceResultWithResponse(socket.BaseNetworkSocketGetIpTypeOfServiceResponse{Value: reported}), nil
}
// AddIpMembership implements IP_ADD_MEMBERSHIP: join an IPv4 multicast group.
func (ep *endpointWithMutators) AddIpMembership(_ fidl.Context, membership socket.IpMulticastMembership) (socket.BaseNetworkSocketAddIpMembershipResult, error) {
	ep.ep.sockOptStats.AddIpMembership.Add(1)
	option := tcpip.AddMembershipOption{
		NIC:           tcpip.NICID(membership.Iface),
		InterfaceAddr: fidlconv.ToTcpIpAddressDroppingUnspecifiedv4(membership.LocalAddr),
		MulticastAddr: fidlconv.ToTcpIpAddressDroppingUnspecifiedv4(membership.McastAddr),
	}
	if err := ep.ep.ep.SetSockOpt(&option); err != nil {
		return socket.BaseNetworkSocketAddIpMembershipResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.BaseNetworkSocketAddIpMembershipResultWithResponse(socket.BaseNetworkSocketAddIpMembershipResponse{}), nil
}
// DropIpMembership implements IP_DROP_MEMBERSHIP: leave an IPv4 multicast
// group.
func (ep *endpointWithMutators) DropIpMembership(_ fidl.Context, membership socket.IpMulticastMembership) (socket.BaseNetworkSocketDropIpMembershipResult, error) {
	ep.ep.sockOptStats.DropIpMembership.Add(1)
	option := tcpip.RemoveMembershipOption{
		NIC:           tcpip.NICID(membership.Iface),
		InterfaceAddr: fidlconv.ToTcpIpAddressDroppingUnspecifiedv4(membership.LocalAddr),
		MulticastAddr: fidlconv.ToTcpIpAddressDroppingUnspecifiedv4(membership.McastAddr),
	}
	if err := ep.ep.ep.SetSockOpt(&option); err != nil {
		return socket.BaseNetworkSocketDropIpMembershipResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.BaseNetworkSocketDropIpMembershipResultWithResponse(socket.BaseNetworkSocketDropIpMembershipResponse{}), nil
}
// SetIpTransparent (IP_TRANSPARENT) is not supported; fail with ENOPROTOOPT.
func (*endpointWithMutators) SetIpTransparent(fidl.Context, bool) (socket.BaseNetworkSocketSetIpTransparentResult, error) {
	return socket.BaseNetworkSocketSetIpTransparentResultWithErr(posix.ErrnoEnoprotoopt), nil
}
// GetIpTransparent (IP_TRANSPARENT) is not supported; fail with ENOPROTOOPT.
func (*endpoint) GetIpTransparent(fidl.Context) (socket.BaseNetworkSocketGetIpTransparentResult, error) {
	return socket.BaseNetworkSocketGetIpTransparentResultWithErr(posix.ErrnoEnoprotoopt), nil
}
// SetIpReceiveOriginalDestinationAddress (IP_RECVORIGDSTADDR) is not
// supported; fail with ENOPROTOOPT.
func (*endpointWithMutators) SetIpReceiveOriginalDestinationAddress(fidl.Context, bool) (socket.BaseNetworkSocketSetIpReceiveOriginalDestinationAddressResult, error) {
	return socket.BaseNetworkSocketSetIpReceiveOriginalDestinationAddressResultWithErr(posix.ErrnoEnoprotoopt), nil
}
// GetIpReceiveOriginalDestinationAddress (IP_RECVORIGDSTADDR) is not
// supported; fail with ENOPROTOOPT.
func (*endpoint) GetIpReceiveOriginalDestinationAddress(fidl.Context) (socket.BaseNetworkSocketGetIpReceiveOriginalDestinationAddressResult, error) {
	return socket.BaseNetworkSocketGetIpReceiveOriginalDestinationAddressResultWithErr(posix.ErrnoEnoprotoopt), nil
}
// AddIpv6Membership implements IPV6_JOIN_GROUP: join an IPv6 multicast group.
// Unlike the IPv4 variant, the interface is selected by NIC id only.
func (ep *endpointWithMutators) AddIpv6Membership(_ fidl.Context, membership socket.Ipv6MulticastMembership) (socket.BaseNetworkSocketAddIpv6MembershipResult, error) {
	ep.ep.sockOptStats.AddIpv6Membership.Add(1)
	option := tcpip.AddMembershipOption{
		NIC:           tcpip.NICID(membership.Iface),
		MulticastAddr: fidlconv.ToTcpIpAddressDroppingUnspecifiedv6(membership.McastAddr),
	}
	if err := ep.ep.ep.SetSockOpt(&option); err != nil {
		return socket.BaseNetworkSocketAddIpv6MembershipResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.BaseNetworkSocketAddIpv6MembershipResultWithResponse(socket.BaseNetworkSocketAddIpv6MembershipResponse{}), nil
}
// DropIpv6Membership implements IPV6_LEAVE_GROUP: leave an IPv6 multicast
// group.
func (ep *endpointWithMutators) DropIpv6Membership(_ fidl.Context, membership socket.Ipv6MulticastMembership) (socket.BaseNetworkSocketDropIpv6MembershipResult, error) {
	ep.ep.sockOptStats.DropIpv6Membership.Add(1)
	option := tcpip.RemoveMembershipOption{
		NIC:           tcpip.NICID(membership.Iface),
		MulticastAddr: fidlconv.ToTcpIpAddressDroppingUnspecifiedv6(membership.McastAddr),
	}
	if err := ep.ep.ep.SetSockOpt(&option); err != nil {
		return socket.BaseNetworkSocketDropIpv6MembershipResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.BaseNetworkSocketDropIpv6MembershipResultWithResponse(socket.BaseNetworkSocketDropIpv6MembershipResponse{}), nil
}
// setIpv6ReceiveTrafficClass toggles the endpoint's receive-TCLASS option.
func (ep *endpoint) setIpv6ReceiveTrafficClass(value bool) {
	opts := ep.ep.SocketOptions()
	opts.SetReceiveTClass(value)
}
// SetIpv6ReceiveTrafficClass implements the IPV6_RECVTCLASS setter.
func (ep *endpointWithMutators) SetIpv6ReceiveTrafficClass(_ fidl.Context, value bool) (socket.BaseNetworkSocketSetIpv6ReceiveTrafficClassResult, error) {
	ep.ep.sockOptStats.SetIpv6ReceiveTrafficClass.Add(1)
	ep.ep.setIpv6ReceiveTrafficClass(value)
	response := socket.BaseNetworkSocketSetIpv6ReceiveTrafficClassResponse{}
	return socket.BaseNetworkSocketSetIpv6ReceiveTrafficClassResultWithResponse(response), nil
}
// GetIpv6ReceiveTrafficClass implements the IPV6_RECVTCLASS getter.
func (ep *endpoint) GetIpv6ReceiveTrafficClass(fidl.Context) (socket.BaseNetworkSocketGetIpv6ReceiveTrafficClassResult, error) {
	ep.sockOptStats.GetIpv6ReceiveTrafficClass.Add(1)
	return socket.BaseNetworkSocketGetIpv6ReceiveTrafficClassResultWithResponse(socket.BaseNetworkSocketGetIpv6ReceiveTrafficClassResponse{
		Value: ep.ep.SocketOptions().GetReceiveTClass(),
	}), nil
}
// setIpv6ReceiveHopLimit toggles the endpoint's receive-hop-limit option.
func (ep *endpoint) setIpv6ReceiveHopLimit(value bool) {
	opts := ep.ep.SocketOptions()
	opts.SetReceiveHopLimit(value)
}
// SetIpv6ReceiveHopLimit implements the IPV6_RECVHOPLIMIT setter.
func (ep *endpointWithMutators) SetIpv6ReceiveHopLimit(_ fidl.Context, value bool) (socket.BaseNetworkSocketSetIpv6ReceiveHopLimitResult, error) {
	ep.ep.sockOptStats.SetIpv6ReceiveHopLimit.Add(1)
	ep.ep.setIpv6ReceiveHopLimit(value)
	response := socket.BaseNetworkSocketSetIpv6ReceiveHopLimitResponse{}
	return socket.BaseNetworkSocketSetIpv6ReceiveHopLimitResultWithResponse(response), nil
}
// GetIpv6ReceiveHopLimit implements the IPV6_RECVHOPLIMIT getter.
func (ep *endpoint) GetIpv6ReceiveHopLimit(fidl.Context) (socket.BaseNetworkSocketGetIpv6ReceiveHopLimitResult, error) {
	ep.sockOptStats.GetIpv6ReceiveHopLimit.Add(1)
	return socket.BaseNetworkSocketGetIpv6ReceiveHopLimitResultWithResponse(socket.BaseNetworkSocketGetIpv6ReceiveHopLimitResponse{
		Value: ep.ep.SocketOptions().GetReceiveHopLimit(),
	}), nil
}
// setIpv6ReceivePacketInfo toggles the endpoint's IPv6 receive-packet-info
// option.
func (ep *endpoint) setIpv6ReceivePacketInfo(value bool) {
	opts := ep.ep.SocketOptions()
	opts.SetIPv6ReceivePacketInfo(value)
}
// SetIpv6ReceivePacketInfo implements the IPV6_RECVPKTINFO setter.
func (ep *endpointWithMutators) SetIpv6ReceivePacketInfo(_ fidl.Context, value bool) (socket.BaseNetworkSocketSetIpv6ReceivePacketInfoResult, error) {
	ep.ep.sockOptStats.SetIpv6ReceivePacketInfo.Add(1)
	ep.ep.setIpv6ReceivePacketInfo(value)
	response := socket.BaseNetworkSocketSetIpv6ReceivePacketInfoResponse{}
	return socket.BaseNetworkSocketSetIpv6ReceivePacketInfoResultWithResponse(response), nil
}
// GetIpv6ReceivePacketInfo implements the IPV6_RECVPKTINFO getter.
func (ep *endpoint) GetIpv6ReceivePacketInfo(fidl.Context) (socket.BaseNetworkSocketGetIpv6ReceivePacketInfoResult, error) {
	ep.sockOptStats.GetIpv6ReceivePacketInfo.Add(1)
	return socket.BaseNetworkSocketGetIpv6ReceivePacketInfoResultWithResponse(socket.BaseNetworkSocketGetIpv6ReceivePacketInfoResponse{
		Value: ep.ep.SocketOptions().GetIPv6ReceivePacketInfo(),
	}), nil
}
// GetOriginalDestination (SO_ORIGINAL_DST) is not supported; fail with
// ENOPROTOOPT.
func (*endpoint) GetOriginalDestination(fidl.Context) (socket.BaseNetworkSocketGetOriginalDestinationResult, error) {
	return socket.BaseNetworkSocketGetOriginalDestinationResultWithErr(posix.ErrnoEnoprotoopt), nil
}
// setIpReceiveTypeOfService toggles the endpoint's receive-TOS option.
func (ep *endpoint) setIpReceiveTypeOfService(value bool) {
	opts := ep.ep.SocketOptions()
	opts.SetReceiveTOS(value)
}
// SetIpReceiveTypeOfService implements the IP_RECVTOS setter.
func (ep *endpointWithMutators) SetIpReceiveTypeOfService(_ fidl.Context, value bool) (socket.BaseNetworkSocketSetIpReceiveTypeOfServiceResult, error) {
	ep.ep.sockOptStats.SetIpReceiveTypeOfService.Add(1)
	ep.ep.setIpReceiveTypeOfService(value)
	response := socket.BaseNetworkSocketSetIpReceiveTypeOfServiceResponse{}
	return socket.BaseNetworkSocketSetIpReceiveTypeOfServiceResultWithResponse(response), nil
}
// GetIpReceiveTypeOfService implements the IP_RECVTOS getter.
func (ep *endpoint) GetIpReceiveTypeOfService(fidl.Context) (socket.BaseNetworkSocketGetIpReceiveTypeOfServiceResult, error) {
	ep.sockOptStats.GetIpReceiveTypeOfService.Add(1)
	return socket.BaseNetworkSocketGetIpReceiveTypeOfServiceResultWithResponse(socket.BaseNetworkSocketGetIpReceiveTypeOfServiceResponse{
		Value: ep.ep.SocketOptions().GetReceiveTOS(),
	}), nil
}
// setIpReceiveTtl toggles the endpoint's receive-TTL option.
func (ep *endpoint) setIpReceiveTtl(value bool) {
	opts := ep.ep.SocketOptions()
	opts.SetReceiveTTL(value)
}
// SetIpReceiveTtl implements the IP_RECVTTL setter.
func (ep *endpointWithMutators) SetIpReceiveTtl(_ fidl.Context, value bool) (socket.BaseNetworkSocketSetIpReceiveTtlResult, error) {
	ep.ep.sockOptStats.SetIpReceiveTtl.Add(1)
	ep.ep.setIpReceiveTtl(value)
	response := socket.BaseNetworkSocketSetIpReceiveTtlResponse{}
	return socket.BaseNetworkSocketSetIpReceiveTtlResultWithResponse(response), nil
}
// GetIpReceiveTtl implements the IP_RECVTTL getter.
func (ep *endpoint) GetIpReceiveTtl(fidl.Context) (socket.BaseNetworkSocketGetIpReceiveTtlResult, error) {
	ep.sockOptStats.GetIpReceiveTtl.Add(1)
	return socket.BaseNetworkSocketGetIpReceiveTtlResultWithResponse(socket.BaseNetworkSocketGetIpReceiveTtlResponse{
		Value: ep.ep.SocketOptions().GetReceiveTTL(),
	}), nil
}
// SetIpPacketInfo implements the IP_PKTINFO setter.
func (ep *endpointWithMutators) SetIpPacketInfo(_ fidl.Context, value bool) (socket.BaseNetworkSocketSetIpPacketInfoResult, error) {
	ep.ep.sockOptStats.SetIpPacketInfo.Add(1)
	opts := ep.ep.ep.SocketOptions()
	opts.SetReceivePacketInfo(value)
	return socket.BaseNetworkSocketSetIpPacketInfoResultWithResponse(socket.BaseNetworkSocketSetIpPacketInfoResponse{}), nil
}
// GetIpPacketInfo implements the IP_PKTINFO getter.
func (ep *endpoint) GetIpPacketInfo(fidl.Context) (socket.BaseNetworkSocketGetIpPacketInfoResult, error) {
	ep.sockOptStats.GetIpPacketInfo.Add(1)
	return socket.BaseNetworkSocketGetIpPacketInfoResultWithResponse(socket.BaseNetworkSocketGetIpPacketInfoResponse{
		Value: ep.ep.SocketOptions().GetReceivePacketInfo(),
	}), nil
}
// SetMark (SO_MARK-style marks) is not supported; fail with EOPNOTSUPP.
func (*endpoint) SetMark(fidl.Context, socket.MarkDomain, socket.OptionalUint32) (socket.BaseSocketSetMarkResult, error) {
	return socket.BaseSocketSetMarkResultWithErr(posix.ErrnoEopnotsupp), nil
}
// GetMark (SO_MARK-style marks) is not supported; fail with EOPNOTSUPP.
func (*endpoint) GetMark(fidl.Context, socket.MarkDomain) (socket.BaseSocketGetMarkResult, error) {
	return socket.BaseSocketGetMarkResultWithErr(posix.ErrnoEopnotsupp), nil
}
// endpointWithSocket implements a network socket that uses a zircon socket for
// its data plane. This structure creates a pair of goroutines which are
// responsible for moving data and signals between the underlying
// tcpip.Endpoint and the zircon socket.
type endpointWithSocket struct {
	endpoint

	// local is the netstack's end of the zircon socket pair; peer is handed to
	// the client (see describe).
	local, peer zx.Socket

	// These channels enable coordination of orderly shutdown of loops, handles,
	// and endpoints. See the comment on `close` for more information.
	mu struct {
		sync.Mutex

		// loop{Read,Write}Done are signaled iff loop{Read,Write} have exited,
		// respectively.
		loopReadDone, loopWriteDone <-chan struct{}
	}

	// closing is signaled iff close has been called.
	closing chan struct{}

	// This is used to make sure that endpoint.close only cleans up its
	// resources once - the first time it was closed.
	closeOnce sync.Once

	// onHUpOnce ensures the hang-up handling in HUp runs at most once.
	onHUpOnce sync.Once

	// onHUp is used to register callback for closing events.
	onHUp waiter.Entry
}
// newEndpointWithSocket wraps `ep` in an endpointWithSocket backed by a fresh
// zircon socket pair of kind `socketType`, registers the new endpoint in the
// Netstack's endpoint map, and installs a hang-up callback (see HUp) on the
// endpoint's waiter queue.
func newEndpointWithSocket(
	ep tcpip.Endpoint,
	wq *waiter.Queue,
	transProto tcpip.TransportProtocolNumber,
	netProto tcpip.NetworkProtocolNumber,
	ns *Netstack,
	socketType uint32,
) (*endpointWithSocket, error) {
	localS, peerS, err := zx.NewSocket(socketType)
	if err != nil {
		return nil, err
	}
	eps := &endpointWithSocket{
		endpoint: endpoint{
			ep:         ep,
			wq:         wq,
			transProto: transProto,
			netProto:   netProto,
			ns:         ns,
		},
		local:   localS,
		peer:    peerS,
		closing: make(chan struct{}),
	}

	// Add the endpoint before registering callback for hangup event.
	// The callback could be called soon after registration, where the
	// endpoint is attempted to be removed from the map.
	//
	// Also note that stream socket initialization relies on `eps.sockOptStats`
	// being the socket option stats stored in the endpoints map, so both of
	// these places need to be changed at the same time.
	ns.onAddEndpoint(&eps.endpoint, &eps.sockOptStats)

	eps.onHUp = waiter.NewFunctionEntry(waiter.EventHUp, func(waiter.EventMask) {
		eps.HUp()
	})
	eps.wq.EventRegister(&eps.onHUp)

	return eps, nil
}
// HUp handles a hang-up event on the endpoint: it removes the endpoint from
// the Netstack's endpoint map and asynchronously unregisters the hang-up
// callback and closes the endpoint. Calls after the first are no-ops.
func (eps *endpointWithSocket) HUp() {
	eps.onHUpOnce.Do(func() {
		if !eps.endpoint.ns.onRemoveEndpoint(eps.endpoint.key) {
			_ = syslog.Errorf("endpoint map delete error, endpoint with key %d does not exist", eps.endpoint.key)
		}
		// Run this in a separate goroutine to avoid deadlock.
		//
		// The waiter.Queue lock is held by the caller of this callback.
		// close() blocks on completions of `loop*`, which
		// depend on acquiring waiter.Queue lock to unregister events.
		go func() {
			eps.wq.EventUnregister(&eps.onHUp)
			eps.close()
		}()
	})
}
// endpointWithEvent composes an endpoint whose state is conveyed to clients
// via signals asserted on a pair of zircon handles rather than via a zircon
// socket data plane (cf. endpointWithSocket).
type endpointWithEvent struct {
	endpoint
	endpointWithMutators
	nonStreamEndpoint

	// local is the handle the netstack signals on (see shutdown); peer is
	// duplicated and handed to the client (see describe).
	local, peer zx.Handle

	// entry is this endpoint's registration in the waiter queue; it is
	// unregistered when reads are shut down (see shutdown).
	entry waiter.Entry

	// pending recomputes the signals the client should observe; see the
	// mustUpdate call in GetError.
	pending signaler
}
// GetError implements SO_ERROR: it fetches the endpoint's last error and then
// refreshes the signals exposed to the client before replying.
func (ep *endpointWithEvent) GetError(fidl.Context) (socket.BaseSocketGetErrorResult, error) {
	ep.endpoint.sockOptStats.GetError.Add(1)
	err := ep.endpoint.ep.LastError()
	// Recompute pending signals after the error has been read.
	ep.pending.mustUpdate()
	if err == nil {
		return socket.BaseSocketGetErrorResultWithResponse(socket.BaseSocketGetErrorResponse{}), nil
	}
	return socket.BaseSocketGetErrorResultWithErr(tcpipErrorToCode(err)), nil
}
// describe duplicates the peer event handle for transfer to the client.
func (epe *endpointWithEvent) describe() (zx.Handle, error) {
	// TODO(https://fxbug.dev/42157659): The rights on this handle should be capped at the connection's.
	dup, err := epe.peer.Duplicate(zx.RightTransfer | zx.RightWait)
	_ = syslog.DebugTf("Describe", "%p: err=%v", epe, err)
	return dup, err
}
// Connect connects the non-stream endpoint to the given address.
func (epe *endpointWithEvent) Connect(_ fidl.Context, address fnet.SocketAddress) (socket.BaseNetworkSocketConnectResult, error) {
	err := epe.nonStreamEndpoint.connect(&epe.endpoint, address)
	if err != nil {
		return socket.BaseNetworkSocketConnectResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.BaseNetworkSocketConnectResultWithResponse(socket.BaseNetworkSocketConnectResponse{}), nil
}
// shutdown disables reading and/or writing on the endpoint per `how`.
//
// It returns a posix errno for protocol-level failures (surfaced to the
// client) and a non-nil error only for zircon handle failures. On success
// both are zero.
func (epe *endpointWithEvent) shutdown(how socket.ShutdownMode) (posix.Errno, error) {
	var signals zx.Signals
	var flags tcpip.ShutdownFlags
	if how&socket.ShutdownModeRead != 0 {
		signals |= signalDatagramShutdownRead
		flags |= tcpip.ShutdownRead
	}
	if how&socket.ShutdownModeWrite != 0 {
		signals |= signalDatagramShutdownWrite
		flags |= tcpip.ShutdownWrite
	}
	// Neither direction selected: invalid argument.
	if flags == 0 {
		return posix.ErrnoEinval, nil
	}
	if err := epe.endpoint.ep.Shutdown(flags); err != nil {
		return tcpipErrorToCode(err), nil
	}
	// Once reads are shut down, stop listening for incoming events.
	if flags&tcpip.ShutdownRead != 0 {
		epe.wq.EventUnregister(&epe.entry)
	}
	// Reflect the shutdown to the client via signals on its handle.
	if err := epe.local.SignalPeer(0, signals); err != nil {
		return 0, err
	}
	return 0, nil
}
// Shutdown terminates reads and/or writes on the socket per `how`,
// translating the internal (errno, error) pair from shutdown into a FIDL
// result.
func (epe *endpointWithEvent) Shutdown(_ fidl.Context, how socket.ShutdownMode) (socket.BaseNetworkSocketShutdownResult, error) {
	errno, err := epe.shutdown(how)
	_ = syslog.DebugTf("shutdown", "%p: how=%s errno=%s err=%v", epe, how, errno, err)
	switch {
	case err != nil:
		return socket.BaseNetworkSocketShutdownResult{}, err
	case errno != 0:
		return socket.BaseNetworkSocketShutdownResultWithErr(errno), nil
	default:
		return socket.BaseNetworkSocketShutdownResultWithResponse(socket.BaseNetworkSocketShutdownResponse{}), nil
	}
}
// localSignalClosing is asserted on the local socket handle when close has
// been called; waits on the handle always include it in their mask so loops
// can be interrupted.
const localSignalClosing = zx.SignalUser1

// close destroys the endpoint and releases associated resources.
//
// When called, close signals loopRead and loopWrite (via closing and
// local) to exit, and then blocks until its arguments are signaled. close
// is typically called with loop{Read,Write}Done.
//
// Note, calling close on an endpoint that has already been closed is safe as
// the cleanup work will only be done once.
func (eps *endpointWithSocket) close() {
	eps.closeOnce.Do(func() {
		// Interrupt waits on notification channels. Notification reads
		// are always combined with closing in a select statement.
		close(eps.closing)

		// Interrupt waits on endpoint.local. Handle waits always
		// include localSignalClosing.
		if err := eps.local.Handle().Signal(0, localSignalClosing); err != nil {
			panic(err)
		}

		// Grab the loop channels _after_ having closed `eps.closing` to avoid a
		// race in which the loops are allowed to start without guaranteeing that
		// this routine will wait for them to return.
		eps.mu.Lock()
		channels := []<-chan struct{}{
			eps.mu.loopReadDone,
			eps.mu.loopWriteDone,
		}
		eps.mu.Unlock()

		// The interruptions above cause our loops to exit. Wait until
		// they do before releasing resources they may be using.
		for _, ch := range channels {
			if ch != nil {
				// Drain until the loop closes its done-channel; a nil channel
				// means that loop was never started.
				for range ch {
				}
			}
		}

		if err := eps.local.Close(); err != nil {
			panic(err)
		}

		eps.endpoint.ep.Close()

		_ = syslog.DebugTf("close", "%p", eps)
	})
}
// Listen transitions the endpoint into the listening state with the given
// backlog and, exactly once, registers a waiter callback that refreshes the
// incoming-connection signal as connections become acceptable.
func (s *streamSocketImpl) Listen(_ fidl.Context, backlog int16) (socket.StreamSocketListenResult, error) {
	if backlog < 0 {
		backlog = 0
	}

	// Accept one more than the configured listen backlog to keep in parity with
	// Linux. Ref, because of missing equality check here:
	// https://github.com/torvalds/linux/blob/7acac4b3196/include/net/sock.h#L937
	//
	// Upcast from int16 to int before incrementing to avoid overflow when
	// backlog is math.MaxInt16. Note that int is always at least 32 bits as per
	// https://golang.google.cn/pkg/builtin/#int.
	if err := s.endpoint.ep.Listen(int(backlog) + 1); err != nil {
		return socket.StreamSocketListenResultWithErr(tcpipErrorToCode(err)), nil
	}

	// It is possible to call `listen` on a connected socket - such a call would
	// fail above, so we register the callback only in the success case to avoid
	// incorrectly handling events on connected sockets.
	s.sharedState.onListen.Do(func() {
		s.sharedState.pending.supported = waiter.EventIn
		var entry waiter.Entry
		cb := func() {
			err := s.sharedState.pending.update()
			switch err := err.(type) {
			case nil:
				return
			case *zx.Error:
				switch err.Status {
				case zx.ErrBadHandle, zx.ErrPeerClosed:
					// The endpoint is closing -- this is possible when an incoming
					// connection races with the listening endpoint being closed.
					go s.wq.EventUnregister(&entry)
					return
				}
			}
			panic(err)
		}
		entry = waiter.NewFunctionEntry(s.sharedState.pending.supported, func(waiter.EventMask) {
			cb()
		})
		s.wq.EventRegister(&entry)

		// We're registering after calling Listen, so we might've missed an event.
		// Call the callback once to check for already-present incoming
		// connections.
		cb()
	})

	_ = syslog.DebugTf("listen", "%p: backlog=%d", s, backlog)

	return socket.StreamSocketListenResultWithResponse(socket.StreamSocketListenResponse{}), nil
}
// startReadWriteLoops asserts the connected signal to the peer and spawns the
// loopRead and loopWrite goroutines, recording their done-channels under the
// lock so close can wait for them to exit. It is a no-op if the endpoint is
// already closing.
func (eps *endpointWithSocket) startReadWriteLoops(loopRead, loopWrite func(chan<- struct{})) {
	eps.mu.Lock()
	defer eps.mu.Unlock()
	select {
	case <-eps.closing:
		// close was already called; do not start loops it would never reap.
	default:
		if err := eps.local.Handle().SignalPeer(0, signalStreamConnected); err != nil {
			panic(err)
		}
		for _, m := range []struct {
			// done points at the field in eps.mu that close reads; it is
			// assigned before the goroutine starts so close always sees it.
			done *<-chan struct{}
			fn   func(chan<- struct{})
		}{
			{&eps.mu.loopReadDone, loopRead},
			{&eps.mu.loopWriteDone, loopWrite},
		} {
			ch := make(chan struct{})
			*m.done = ch
			go m.fn(ch)
		}
	}
}
// describe duplicates the peer end of the zircon socket for transfer to the
// client.
func (eps *endpointWithSocket) describe() (zx.Handle, error) {
	// TODO(https://fxbug.dev/42157659): The rights on this handle should be capped at the connection's.
	//
	// Named `h` (not `socket`) to avoid shadowing the imported socket package.
	h, err := eps.peer.Handle().Duplicate(zx.RightTransfer | zx.RightsIO | zx.RightWait | zx.RightInspect)
	_ = syslog.DebugTf("Describe", "%p: err=%v", eps, err)
	return h, err
}
// Connect initiates a connection to `address`. On success or on
// ErrConnectStarted (connect in progress), a one-time callback is armed that
// either starts the read/write loops once the handshake completes or hangs up
// the socket if it fails; the (possibly in-progress) error is then translated
// into the FIDL result.
func (s *streamSocketImpl) Connect(_ fidl.Context, address fnet.SocketAddress) (socket.BaseNetworkSocketConnectResult, error) {
	err := func() tcpip.Error {
		addr, err := s.endpoint.toTCPIPFullAddress(address)
		if err != nil {
			return err
		}
		// Hold the shared error lock across connect so the terminal-error
		// channel observed below is consistent with this connect attempt.
		s.sharedState.err.mu.Lock()
		err = s.endpoint.connect(addr)
		ch := s.sharedState.err.setConsumedLockedInner(err)
		s.sharedState.err.mu.Unlock()
		if err != nil {
			switch err.(type) {
			case *tcpip.ErrConnectStarted:
				localAddr, err := s.endpoint.ep.GetLocalAddress()
				if err != nil {
					panic(err)
				}
				_ = syslog.DebugTf("connect", "%p: started, local=%+v, addr=%+v", &s.endpoint, localAddr, address)
			case *tcpip.ErrConnectionAborted:
				// For TCP endpoints, gVisor Connect() returns this error when the
				// endpoint is in an error state and the specific error has already been
				// consumed.
				//
				// If the endpoint is in an error state, that means that loop{Read,Write}
				// must be shutting down, and the only way to consume the error correctly
				// is to get it from them.
				if ch == nil {
					panic(fmt.Sprintf("nil terminal error channel after Connect(%#v)=%#v", address, err))
				}
				if terminal := <-ch; terminal != nil {
					err = terminal
				}
			}
		}
		return err
	}()

	switch err.(type) {
	case *tcpip.ErrConnectStarted, nil:
		// It is possible to call `connect` on a listening socket - such a call
		// would fail above, so we register the callback only in the success case
		// to avoid incorrectly handling events on connected sockets.
		s.sharedState.onConnect.Do(func() {
			var (
				once  sync.Once
				entry waiter.Entry
			)
			cb := func(m waiter.EventMask) {
				once.Do(func() {
					// Unregister on a separate goroutine: the waiter.Queue lock may
					// be held by the caller of this callback (cf. HUp).
					go s.wq.EventUnregister(&entry)
					if m&waiter.EventErr == 0 {
						s.startReadWriteLoops(s.loopRead, s.loopWrite)
					} else {
						s.HUp()
					}
				})
			}
			entry = waiter.NewFunctionEntry(waiter.EventOut|waiter.EventErr, cb)
			s.wq.EventRegister(&entry)

			// We're registering after calling Connect, so we might've missed an
			// event. Call the callback once to check for an already-complete (even
			// with error) handshake.
			if m := s.endpoint.ep.Readiness(waiter.EventOut | waiter.EventErr); m != 0 {
				cb(m)
			}
		})
	}

	if err != nil {
		return socket.BaseNetworkSocketConnectResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.BaseNetworkSocketConnectResultWithResponse(socket.BaseNetworkSocketConnectResponse{}), nil
}
// accept dequeues one established connection from the listening endpoint and
// wraps it in a new streamSocketImpl. If wantAddr is set, the accepted
// connection's address is also returned. The first return value is a posix
// errno (0 on success); the final error reports non-posix failures.
func (s *streamSocketImpl) accept(wantAddr bool) (posix.Errno, *tcpip.FullAddress, streamSocketImpl, error) {
	var addr *tcpip.FullAddress
	if wantAddr {
		addr = new(tcpip.FullAddress)
	}
	ep, wq, err := s.endpoint.ep.Accept(addr)
	if err != nil {
		return tcpipErrorToCode(err), nil, streamSocketImpl{}, nil
	}
	{
		// A pending connection was consumed; refresh the signals advertised to
		// the client.
		if err := s.sharedState.pending.update(); err != nil {
			panic(err)
		}
	}
	switch localAddr, err := ep.GetLocalAddress(); err.(type) {
	case *tcpip.ErrNotConnected:
		// This should never happen as of writing as GetLocalAddress does not
		// actually return any errors. However, we handle the tcpip.ErrNotConnected
		// case now for the same reasons as mentioned below for the
		// ep.GetRemoteAddress case.
		_ = syslog.DebugTf("accept", "%p: disconnected", s)
	case nil:
		switch remoteAddr, err := ep.GetRemoteAddress(); err.(type) {
		case *tcpip.ErrNotConnected:
			// GetRemoteAddress returns a tcpip.ErrNotConnected error if ep is no
			// longer connected. This can happen if the endpoint was closed after the
			// call to Accept returned, but before this point. A scenario this was
			// actually witnessed was when a TCP RST was received after the call to
			// Accept returned, but before this point. If GetRemoteAddress returns
			// other (unexpected) errors, panic.
			_ = syslog.DebugTf("accept", "%p: local=%+v, disconnected", s, localAddr)
		case nil:
			_ = syslog.DebugTf("accept", "%p: local=%+v, remote=%+v", s, localAddr, remoteAddr)
		default:
			panic(err)
		}
	default:
		panic(err)
	}
	{
		eps, err := newEndpointWithSocket(ep, wq, s.transProto, s.netProto, s.endpoint.ns, zx.SocketStream)
		if err != nil {
			return 0, nil, streamSocketImpl{}, err
		}
		{
			// NB: `s` intentionally shadows the receiver from here on; it refers
			// to the newly accepted socket.
			s := makeStreamSocketImpl(eps)

			// NB: signal connectedness before handling any error below to ensure
			// correct interpretation in zxio.
			//
			// See //sdk/lib/zxio/socket.cc:stream_socket::wait_begin/wait_end for
			// details on how zxio infers the error code from asserted signals.
			s.sharedState.onConnect.Do(func() { s.startReadWriteLoops(s.loopRead, s.loopWrite) })

			// Check if the endpoint has already encountered an error since
			// our installed callback will not fire in this case.
			if s.endpoint.ep.Readiness(waiter.EventErr)&waiter.EventErr != 0 {
				s.HUp()
			}

			return 0, addr, s, nil
		}
	}
}
// handleZxSocketReadError contains handling logic for zx socket read errors.
// Returns true iff the error was found to be terminal.
//
// waitCallback is expected to block until one of the passed signals is
// asserted on the local socket handle and return the observed signal set
// (see the zxwait-based callback in loopWrite).
func (eps *endpointWithSocket) handleZxSocketReadError(err zx.Error, waitCallback func(zx.Signals) zx.Signals) bool {
	const sigs = zx.SignalSocketReadable | zx.SignalSocketPeerWriteDisabled | localSignalClosing
	switch err.Status {
	case zx.ErrShouldWait:
		obs := waitCallback(sigs)
		switch {
		case obs&zx.SignalSocketReadable != 0:
			// The client might have written some data into the socket.
			// Continue trying to read even if the signals show the client has
			// closed the socket.
			return false
		case obs&localSignalClosing != 0:
			// We're closing the endpoint.
			return true
		case obs&zx.SignalSocketPeerWriteDisabled != 0:
			// Fallthrough.
		default:
			panic(fmt.Sprintf("impossible signals observed: %b/%b", obs, sigs))
		}
		// NB: reached only in the SignalSocketPeerWriteDisabled case above;
		// treat it the same as ErrBadState below.
		fallthrough
	case zx.ErrBadState:
		// Reading has been disabled for this socket endpoint.
		// The zx socket will produce no more data to send, so shut down the
		// write side of the tcpip endpoint.
		switch err := eps.endpoint.ep.Shutdown(tcpip.ShutdownWrite); err.(type) {
		case nil, *tcpip.ErrNotConnected:
			// Shutdown can return ErrNotConnected if the endpoint was
			// connected but no longer is.
		default:
			panic(err)
		}
		return true
	default:
		panic(err)
	}
}
// handleEndpointWriteError contains handling logic for tcpip.Endpoint write errors.
// Returns true iff the error was found to be terminal.
func (eps *endpointWithSocket) handleEndpointWriteError(err tcpip.Error, transProto tcpip.TransportProtocolNumber) bool {
	switch err.(type) {
	case *tcpip.ErrClosedForSend:
		// Shut the endpoint down *only* if it is not already in an error
		// state; an endpoint in an error state will soon be fully closed down,
		// and shutting it down here would cause signals to be asserted twice,
		// which can produce races in the client.
		if eps.endpoint.ep.Readiness(waiter.EventErr)&waiter.EventErr == 0 {
			if err := eps.local.SetDisposition(0, zx.SocketDispositionWriteDisabled); err != nil {
				panic(err)
			}
			_ = syslog.DebugTf("zx_socket_set_disposition", "%p: disposition=0, disposition_peer=ZX_SOCKET_DISPOSITION_WRITE_DISABLED", eps)
		}
		return true
	case *tcpip.ErrNetworkUnreachable, *tcpip.ErrHostUnreachable, *tcpip.ErrInvalidEndpointState:
		// These errors can be returned when we attempt to write to an interface
		// that has been removed, or is in the process of removal. Since this
		// error is not propagated to the client and can occur in normal usage,
		// log a warning instead of an error.
		_ = syslog.Warnf("%s Endpoint.Write(): %s", transProtoToString(transProto), err)
		return false
	default:
		// Unexpected write error: log loudly but keep the loop alive.
		_ = syslog.Errorf("%s Endpoint.Write(): %s", transProtoToString(transProto), err)
		return false
	}
}
// loopWrite shuttles signals and data from the zircon socket to the tcpip.Endpoint.
func (s *streamSocketImpl) loopWrite(ch chan<- struct{}) {
	// Closing ch tells the spawner that this loop has terminated.
	defer close(ch)
	// Register for writability notifications so the loop can block when the
	// endpoint cannot accept more data.
	waitEntry, notifyCh := waiter.NewChannelEntry(waiter.EventOut)
	s.wq.EventRegister(&waitEntry)
	defer s.wq.EventUnregister(&waitEntry)
	reader := socketReader{
		socket: s.local,
	}
	for {
		reader.lastError = nil
		// Reset the number of read bytes so we can assert the exact number of bytes
		// read by ep.Write below.
		reader.readBytes = 0
		// Hold the error lock across the write so the stored error state is
		// updated atomically with the endpoint operation that produced it.
		s.sharedState.err.mu.Lock()
		trace.AsyncBegin("net", "fuchsia_posix_socket.streamSocket.transferTx", trace.AsyncID(uintptr(unsafe.Pointer(s))))
		n, err := s.endpoint.ep.Write(&reader, tcpip.WriteOptions{
			// We must write atomically in order to guarantee all the data fetched
			// from the zircon socket is consumed by the endpoint.
			Atomic: true,
		})
		trace.AsyncEnd("net", "fuchsia_posix_socket.streamSocket.transferTx", trace.AsyncID(uintptr(unsafe.Pointer(s))))
		s.sharedState.err.setLocked(err)
		s.sharedState.err.mu.Unlock()
		if n != int64(reader.readBytes) {
			panic(fmt.Sprintf("partial write into endpoint (%s); got %d, want %d", err, n, reader.readBytes))
		}
		// TODO(https://fxbug.dev/42110377): Handle all transport write errors.
		switch err.(type) {
		case nil, *tcpip.ErrBadBuffer:
			// The endpoint accepted the data; any failure pulling bytes out of
			// the zircon socket was recorded by the reader in lastError.
			switch err := reader.lastError.(type) {
			case nil:
				continue
			case *zx.Error:
				if s.handleZxSocketReadError(*err, func(sigs zx.Signals) zx.Signals {
					obs, err := zxwait.WaitContext(context.Background(), zx.Handle(s.local), sigs)
					if err != nil {
						panic(err)
					}
					return obs
				}) {
					return
				}
			default:
				panic(err)
			}
		case *tcpip.ErrNotConnected:
			// Write never returns ErrNotConnected except for endpoints that were
			// never connected. Such endpoints should never reach this loop.
			panic(fmt.Sprintf("connected endpoint returned %s", err))
		case *tcpip.ErrWouldBlock:
			// NB: we can't select on closing here because the socket may need to
			// delay terminating loopWrite to allow time for pending writes in the
			// zircon socket to be flushed to the gVisor endpoint, and eventually to
			// the peer. This may be needed when we have SO_LINGER enabled, or when
			// the zircon socket has pending bytes even when SO_LINGER is disabled.
			// See streamSocketImpl.close for more details.
			select {
			case <-s.unblockLoopWrite:
				return
			case <-notifyCh:
				continue
			}
		case *tcpip.ErrConnectionRefused:
			// TODO(https://fxbug.dev/42139897): Allow the socket to be reused for
			// another connection attempt to match Linux.
			return
		case *tcpip.ErrConnectionAborted, *tcpip.ErrConnectionReset, *tcpip.ErrNetworkUnreachable, *tcpip.ErrHostUnreachable:
			return
		case *tcpip.ErrTimeout:
			// The maximum duration of missing ACKs was reached, or the maximum
			// number of unacknowledged keepalives was reached.
			return
		default:
			if s.handleEndpointWriteError(err, tcp.ProtocolNumber) {
				return
			}
		}
	}
}
// handleEndpointReadError contains handling logic for tcpip.Endpoint read errors.
// Returns true iff the error was found to be terminal.
func (eps *endpointWithSocket) handleEndpointReadError(err tcpip.Error, inCh <-chan struct{}, transProto tcpip.TransportProtocolNumber) bool {
	// TODO(https://fxbug.dev/42110377): Handle all transport read errors.
	switch err.(type) {
	case *tcpip.ErrWouldBlock:
		// Nothing to read right now; block until the endpoint becomes
		// readable or the socket begins closing.
		select {
		case <-inCh:
			return false
		case <-eps.closing:
			// We're shutting down.
			return true
		}
	case *tcpip.ErrClosedForReceive:
		// Shut the endpoint down *only* if it is not already in an error
		// state; an endpoint in an error state will soon be fully closed down,
		// and shutting it down here would cause signals to be asserted twice,
		// which can produce races in the client.
		if eps.endpoint.ep.Readiness(waiter.EventErr)&waiter.EventErr == 0 {
			// Disable writes on the local half of the zircon socket: no more
			// data will be shuttled into it by this endpoint.
			if err := eps.local.SetDisposition(zx.SocketDispositionWriteDisabled, 0); err != nil {
				panic(err)
			}
			_ = syslog.DebugTf("zx_socket_set_disposition", "%p: disposition=ZX_SOCKET_DISPOSITION_WRITE_DISABLED, disposition_peer=0", eps)
		}
		return true
	default:
		// Unexpected error: log it but keep the loop running.
		_ = syslog.Errorf("%s Endpoint.Read(): %s", transProtoToString(transProto), err)
		return false
	}
}
// handleZxSocketWriteError handles the zircon socket write error contained in `err`, returning
// true iff the error was found to be terminal. If the `err` requires a socket wait, the
// `waitThreshold` parameter identifies the # of bytes to wait for.
func (eps *endpointWithSocket) handleZxSocketWriteError(err zx.Error, waitThreshold uint) bool {
	switch err.Status {
	case zx.ErrShouldWait:
		sigs := zx.SignalSocketWriteDisabled | localSignalClosing
		// A nonzero threshold means the caller needs room for a whole
		// payload; wait on the TX-threshold signal instead of plain
		// writability.
		performThresholdWait := waitThreshold != 0
		if performThresholdWait {
			sigs |= zx.SignalSocketWriteThreshold
			if status := zx.Sys_object_set_property(
				zx.Handle(eps.local),
				zx.PropSocketTXThreshold,
				unsafe.Pointer(&waitThreshold),
				uint(unsafe.Sizeof(waitThreshold)),
			); status != zx.ErrOk {
				panic(fmt.Sprintf("failed to set property %d with value %d on socket: %s", zx.PropSocketTXThreshold, waitThreshold, status))
			}
		} else {
			sigs |= zx.SignalSocketWritable
		}
		obs, err := zxwait.WaitContext(context.Background(), zx.Handle(eps.local), zx.Signals(sigs))
		if err != nil {
			panic(err)
		}
		switch {
		case obs&localSignalClosing != 0:
			// Should always be checked first so loop routines can terminate when
			// the socket is closed.
			return true
		case obs&zx.SignalSocketWriteThreshold != 0:
			if !performThresholdWait {
				panic(fmt.Sprintf("zxwait.WaitContext(_, _, %b) returned %b even though threshold wait not requested", sigs, zx.SignalSocketWriteThreshold))
			}
			return false
		case obs&zx.SignalSocketWritable != 0:
			if performThresholdWait {
				panic(fmt.Sprintf("zxwait.WaitContext(_, _, %b) returned %b even though threshold wait was requested", sigs, zx.SignalSocketWritable))
			}
			return false
		case obs&zx.SignalSocketWriteDisabled != 0:
			// Fallthrough into the ErrBadState case below: write-disabled is
			// handled identically to a bad-state error.
		default:
			panic(fmt.Sprintf("impossible signals observed: %b/%b", obs, sigs))
		}
		fallthrough
	case zx.ErrBadState:
		// Writing has been disabled for this zircon socket endpoint. This
		// happens when the client called the Shutdown FIDL method, with
		// socket.ShutdownModeRead: because the client will not read from this
		// zircon socket endpoint anymore, writes are disabled.
		//
		// Clients can't revert a shutdown, so we can terminate.
		return true
	default:
		panic(err)
	}
}
// loopRead shuttles signals and data from the tcpip.Endpoint to the zircon socket.
func (s *streamSocketImpl) loopRead(ch chan<- struct{}) {
	// Closing ch tells the spawner that this loop has terminated.
	defer close(ch)
	// Register for readability notifications so the loop can block until the
	// endpoint has data.
	inEntry, inCh := waiter.NewChannelEntry(waiter.EventIn)
	s.wq.EventRegister(&inEntry)
	defer s.wq.EventUnregister(&inEntry)
	writer := socketWriter{
		socket: s.local,
	}
	for {
		// Hold the error lock across the read so the stored error state is
		// updated atomically with the endpoint operation that produced it.
		s.sharedState.err.mu.Lock()
		trace.AsyncBegin("net", "fuchsia_posix_socket.streamSocket.transferRx", trace.AsyncID(uintptr(unsafe.Pointer(s))))
		res, err := s.endpoint.ep.Read(&writer, tcpip.ReadOptions{})
		trace.AsyncEnd("net", "fuchsia_posix_socket.streamSocket.transferRx", trace.AsyncID(uintptr(unsafe.Pointer(s))))
		s.sharedState.err.setLocked(err)
		s.sharedState.err.mu.Unlock()
		switch err.(type) {
		case *tcpip.ErrNotConnected:
			// Read never returns ErrNotConnected except for endpoints that were
			// never connected. Such endpoints should never be read from.
			panic(fmt.Sprintf("connected endpoint returned %s", err))
		case *tcpip.ErrTimeout:
			// At the time of writing, this error indicates that a TCP connection
			// has failed. This can occur during the TCP handshake if the peer
			// fails to respond to a SYN within 60 seconds, or if the retransmit
			// logic gives up after 60 seconds of missing ACKs from the peer, or if
			// the maximum number of unacknowledged keepalives is reached.
			//
			// The connection was alive but now is dead - this is equivalent to
			// having received a TCP RST.
			return
		case *tcpip.ErrConnectionRefused:
			// TODO(https://fxbug.dev/42139897): Allow the socket to be reused for
			// another connection attempt to match Linux.
			return
		case *tcpip.ErrConnectionAborted, *tcpip.ErrConnectionReset, *tcpip.ErrNetworkUnreachable, *tcpip.ErrHostUnreachable:
			return
		case nil, *tcpip.ErrBadBuffer:
			if err == nil {
				// Let the endpoint adjust its receive buffer based on how much
				// was just consumed.
				s.endpoint.ep.ModerateRecvBuf(res.Count)
			}
			// `tcpip.Endpoint.Read` returns a nil error if _anything_ was written
			// - even if the writer returned an error - we always want to handle
			// those errors.
			switch err := writer.lastError.(type) {
			case nil:
				continue
			case *zx.Error:
				// Pass zero `waitThreshold` since TCP can accept any nonzero
				// number of bytes.
				if s.handleZxSocketWriteError(*err, 0 /* waitThreshold */) {
					return
				}
			default:
				panic(err)
			}
		default:
			if s.handleEndpointReadError(err, inCh, tcp.ProtocolNumber) {
				return
			}
		}
	}
}
// shutdown translates a POSIX-style shutdown mode into zircon socket
// dispositions and, for reads, shuts down the tcpip endpoint. Returns a
// posix errno for client-visible failures and an error for transport
// failures.
func (eps *endpointWithSocket) shutdown(how socket.ShutdownMode) (posix.Errno, error) {
	var disposition, dispositionPeer uint32
	// Typically, a Shutdown(Write) is equivalent to disabling the writes on the
	// local zircon socket. Similarly, a Shutdown(Read) translates to
	// disabling the writes on the peer zircon socket.
	// Here, the client wants to shutdown its endpoint but we are calling
	// setDisposition on our local zircon socket (eg, the client's peer), which is
	// why disposition and dispositionPeer seem backwards.
	if how&socket.ShutdownModeRead != 0 {
		disposition = zx.SocketDispositionWriteDisabled
		how ^= socket.ShutdownModeRead
	}
	if how&socket.ShutdownModeWrite != 0 {
		dispositionPeer = zx.SocketDispositionWriteDisabled
		how ^= socket.ShutdownModeWrite
	}
	// Reject unknown mode bits and the empty mode.
	if how != 0 || (disposition == 0 && dispositionPeer == 0) {
		return posix.ErrnoEinval, nil
	}
	if h := eps.local.Handle(); !h.IsValid() {
		return tcpipErrorToCode(&tcpip.ErrNotConnected{}), nil
	}
	if err := eps.local.SetDisposition(disposition, dispositionPeer); err != nil {
		return 0, err
	}
	// Only handle a shutdown read on the endpoint here. Shutdown write is
	// performed by the loopWrite goroutine: it ensures that it happens *after*
	// all the buffered data in the zircon socket was read.
	if disposition&zx.SocketDispositionWriteDisabled != 0 {
		switch err := eps.endpoint.ep.Shutdown(tcpip.ShutdownRead); err.(type) {
		case nil, *tcpip.ErrNotConnected:
			// Shutdown can return ErrNotConnected if the endpoint was connected but
			// no longer is, in which case the loopRead is also expected to be
			// terminating.
			eps.mu.Lock()
			ch := eps.mu.loopReadDone
			eps.mu.Unlock()
			// Drain the channel to wait for loopRead to terminate before
			// returning to the client.
			if ch != nil {
				for range ch {
				}
			}
		default:
			panic(err)
		}
	}
	return 0, nil
}
// Shutdown implements the BaseNetworkSocket.Shutdown FIDL method by
// delegating to shutdown and mapping its (errno, error) pair into the FIDL
// result union.
func (eps *endpointWithSocket) Shutdown(_ fidl.Context, how socket.ShutdownMode) (socket.BaseNetworkSocketShutdownResult, error) {
	errno, err := eps.shutdown(how)
	_ = syslog.DebugTf("shutdown", "%p: how=%s errno=%s err=%v", eps, how, errno, err)
	switch {
	case err != nil:
		// Transport-level failure; propagate to the FIDL runtime.
		return socket.BaseNetworkSocketShutdownResult{}, err
	case errno != 0:
		// Client-visible POSIX error.
		return socket.BaseNetworkSocketShutdownResultWithErr(errno), nil
	default:
		return socket.BaseNetworkSocketShutdownResultWithResponse(socket.BaseNetworkSocketShutdownResponse{}), nil
	}
}
// destinationCache holds an eventpair whose peer is handed to clients; the
// pair is replaced (see reset) whenever socket state changes, which
// invalidates any destination metadata clients have cached against it.
type destinationCache struct {
	local, peer zx.Handle
}
// initialized reports whether the eventpair has been created. The handles
// are created together, so they are either both valid or both invalid.
func (d *destinationCache) initialized() bool {
	if d.local == zx.HandleInvalid {
		return false
	}
	return d.peer != zx.HandleInvalid
}
// reset replaces the eventpair with a freshly created one, closing the
// previous pair if it exists.
func (d *destinationCache) reset() {
	// Drop the previous pair (if any) before minting a new one.
	if d.initialized() {
		d.clear()
	}
	status := zx.Sys_eventpair_create(0, &d.local, &d.peer)
	if status != zx.ErrOk {
		panic(fmt.Sprintf("failed to create eventpairs with error: %s", status))
	}
}
// clear closes both halves of the eventpair, panicking on failure.
func (d *destinationCache) clear() {
	// TODO(https://fxbug.dev/42051658): Use a single syscall to close these eventpairs.
	closeOrPanic := func(name string, h *zx.Handle) {
		if err := h.Close(); err != nil {
			panic(fmt.Sprintf("%s.Close() = %s", name, err))
		}
	}
	closeOrPanic("local", &d.local)
	closeOrPanic("peer", &d.peer)
}
// cmsgCache holds an eventpair used to invalidate client-side caches of the
// socket's requested control messages, alongside the per-option request
// flags themselves.
type cmsgCache struct {
	local, peer zx.Handle
	// NOTE(review): these flags presumably mirror the control messages the
	// client last validated over FIDL — confirm against callers.
	ipTos bool
	ipTtl bool
	ipv6Tclass bool
	ipv6HopLimit bool
	ipv6PktInfo bool
	timestamp socket.TimestampOption
}
// initialized reports whether the eventpair has been created; the handles
// are created together, so checking either would suffice.
func (c *cmsgCache) initialized() bool {
	if c.local == zx.HandleInvalid {
		return false
	}
	return c.peer != zx.HandleInvalid
}
// reset replaces the cmsg eventpair with a freshly created one, closing the
// previous pair if it exists.
func (c *cmsgCache) reset() {
	// Hoist the trace ID so both trace calls share one computation.
	id := trace.AsyncID(uintptr(unsafe.Pointer(c)))
	trace.AsyncBegin("net", "fuchsia_posix_socket.cmsgCache.reset", id)
	defer trace.AsyncEnd("net", "fuchsia_posix_socket.cmsgCache.reset", id)
	if c.initialized() {
		c.clear()
	}
	status := zx.Sys_eventpair_create(0, &c.local, &c.peer)
	if status != zx.ErrOk {
		panic(fmt.Sprintf("failed to create cmsg eventpairs with error: %s", status))
	}
}
// clear closes both halves of the cmsg eventpair, panicking on failure.
func (c *cmsgCache) clear() {
	// TODO(https://fxbug.dev/42051658): Use a single syscall to close these eventpairs.
	closeOrPanic := func(name string, h *zx.Handle) {
		if err := h.Close(); err != nil {
			panic(fmt.Sprintf("%s.Close() = %s", name, err))
		}
	}
	closeOrPanic("local", &c.local)
	closeOrPanic("peer", &c.peer)
}
type nonStreamEndpoint struct{}
// connect translates the FIDL socket address into a tcpip full address and
// connects the endpoint to it.
func (*nonStreamEndpoint) connect(ep *endpoint, address fnet.SocketAddress) tcpip.Error {
	fullAddr, convErr := ep.toTCPIPFullAddress(address)
	if convErr != nil {
		return convErr
	}
	return ep.connect(fullAddr)
}
// datagramSocketError stores a pending asynchronous error and mirrors its
// presence to the client via a zircon signal on the socket's peer handle.
type datagramSocketError struct {
	mu struct {
		sync.Mutex
		// err is the stored-but-not-yet-consumed error, if any.
		err tcpip.Error
		// signalPeer asserts/clears client-visible signals; wired to the
		// local socket handle's SignalPeer in newDatagramSocketImpl.
		signalPeer func(zx.Signals, zx.Signals) error
	}
}
// set stores `err` for later consumption by the client iff it is an error
// that should be surfaced asynchronously (currently only
// *tcpip.ErrConnectionRefused). Returns true iff the error was stored; a
// stored error asserts the datagram error signal on the peer.
func (e *datagramSocketError) set(err tcpip.Error) bool {
	switch err.(type) {
	case nil:
		return false
	case *tcpip.ErrConnectionRefused:
		e.mu.Lock()
		defer e.mu.Unlock()
		if e.mu.err == nil {
			// First pending error: raise the signal so the client knows to
			// come collect it.
			e.setErrorSignalLocked(true)
		} else {
			// A pending error must never be silently replaced by a different
			// one; that would lose an error the client is owed.
			if e.mu.err != err {
				panic(fmt.Sprintf("overwriting error (%#v) with (%#v)", e.mu.err, err))
			}
		}
		e.mu.err = err
		return true
	default:
		return false
	}
}
// consume atomically takes and clears the pending error, deasserting the
// error signal if one was raised. Returns nil when no error is pending.
func (e *datagramSocketError) consume() tcpip.Error {
	e.mu.Lock()
	defer e.mu.Unlock()
	pending := e.mu.err
	if pending == nil {
		return nil
	}
	e.mu.err = nil
	e.setErrorSignalLocked(false)
	return pending
}
// setErrorSignalLocked asserts (setErr == true) or clears (setErr == false)
// the datagram error signal on the peer handle. Callers must hold e.mu.
func (e *datagramSocketError) setErrorSignalLocked(setErr bool) {
	toSet, toClear := zx.SignalNone, zx.SignalNone
	if setErr {
		toSet = signalDatagramError
	} else {
		toClear = signalDatagramError
	}
	if err := e.mu.signalPeer(toClear, toSet); err != nil {
		panic(err)
	}
}
// State shared across all copies of this socket. Collecting this state here lets us
// allocate only once during initialization.
type sharedDatagramSocketState struct {
	// entry is registered on the endpoint's wait queue to listen for
	// asynchronous errors (see newDatagramSocketImpl).
	entry waiter.Entry
	destinationCacheMu struct {
		sync.Mutex
		destinationCache destinationCache
	}
	cmsgCacheMu struct {
		sync.Mutex
		cmsgCache cmsgCache
	}
	// localEDrainedCond is broadcast by loopWrite whenever the local zircon
	// socket is found to be empty of pending payloads.
	localEDrainedCond sync.Cond
	// err holds asynchronous errors awaiting consumption by the client.
	err datagramSocketError
	// maxPayloadSize is the largest datagram payload this socket carries.
	maxPayloadSize uint32
}
// datagramSocketImpl serves the fuchsia.posix.socket/DatagramSocket protocol
// on top of an endpointWithSocket.
type datagramSocketImpl struct {
	*endpointWithSocket
	nonStreamEndpoint
	// cancel tears down this clone's FIDL serving context; assigned in
	// addConnection.
	cancel context.CancelFunc
	// sharedState is shared across all clones of this socket.
	sharedState *sharedDatagramSocketState
}
// Compile-time assertions that datagramSocketImpl serves the DatagramSocket
// FIDL protocol and listens for endpoint events.
var _ socket.DatagramSocketWithCtx = (*datagramSocketImpl)(nil)
var _ waiter.EventListener = (*datagramSocketImpl)(nil)
// Sizes of the udp_serde metadata preludes that precede each datagram
// payload exchanged over the zircon socket.
var (
	udpTxPreludeSize = udp_serde.TxUdpPreludeSize()
	udpRxPreludeSize = udp_serde.RxUdpPreludeSize()
)
// newDatagramSocketImpl creates a datagram socket backed by `ep`, wiring up
// the zircon socket pair, error signaling, client caches, and the read/write
// shuttling loops. Returns an error only if the underlying
// endpointWithSocket could not be created.
func newDatagramSocketImpl(ns *Netstack, netProto tcpip.NetworkProtocolNumber, ep tcpip.Endpoint, wq *waiter.Queue) (*datagramSocketImpl, error) {
	eps, err := newEndpointWithSocket(
		ep,
		wq,
		udp.ProtocolNumber,
		netProto,
		ns,
		zx.SocketDatagram,
	)
	if err != nil {
		return nil, err
	}
	// Set the TX threshold for the socket to the maximum size of a UDP datagram
	// that could be enqueued, so that the client can wait for that signal
	// knowing any valid datagram will be writable when it is reported.
	const maxUDPPayloadSize uint32 = header.UDPMaximumPacketSize - header.UDPMinimumSize
	threshold := uint(maxUDPPayloadSize + udpTxPreludeSize)
	if status := zx.Sys_object_set_property(
		zx.Handle(eps.peer),
		zx.PropSocketTXThreshold,
		unsafe.Pointer(&threshold),
		uint(unsafe.Sizeof(threshold)),
	); status != zx.ErrOk {
		panic(fmt.Sprintf("failed to set property %d with value %d on socket: %s", zx.PropSocketTXThreshold, threshold, status))
	}
	// Receive all control messages that might be passed to the client.
	eps.setTimestamp(socket.TimestampOptionNanosecond)
	eps.setIpReceiveTypeOfService(true)
	eps.setIpReceiveTtl(true)
	eps.setIpv6ReceiveTrafficClass(true)
	eps.setIpv6ReceiveHopLimit(true)
	eps.setIpv6ReceivePacketInfo(true)
	s := &datagramSocketImpl{
		endpointWithSocket: eps,
		sharedState: &sharedDatagramSocketState{
			maxPayloadSize: maxUDPPayloadSize,
		},
	}
	// Errors are surfaced to clients by signaling the peer of the local
	// socket handle.
	s.sharedState.err.mu.signalPeer = eps.local.Handle().SignalPeer
	// Listen for errors so we can signal the client.
	s.sharedState.entry.Init(s, waiter.EventErr)
	s.wq.EventRegister(&s.sharedState.entry)
	// Initialize caches.
	s.sharedState.destinationCacheMu.destinationCache.reset()
	s.sharedState.cmsgCacheMu.cmsgCache.reset()
	// Initialize CV used to drain the socket.
	s.sharedState.localEDrainedCond = sync.Cond{L: &sync.Mutex{}}
	// Datagram sockets should be readable/writeable immediately upon creation.
	s.startReadWriteLoops(s.loopRead, s.loopWrite)
	return s, nil
}
// Datagram sockets maintain multiple kinds of distributed state:
//
// 1. Payloads are enqueued by the client into a zircon socket and wait there
// until they are dequeued by the Netstack.
// 2. The client caches state locally in order to avoid having to validate
// payload metadata (e.g. destination address) over FIDL on each I/O.
//
// When the socket is mutated, we have to make sure that this state is handled correctly:
//
// 1. Payloads currently enqueued into the zircon socket should be processed with the
// state of the socket before mutation.
// 2. Client caches should be flushed in order to force clients to re-validate metadata
// against the socket state after mutation.
//
// This method should be used to carry out the above procedure for any method that might affect
// socket state.
func executeMutatorWithCacheFlushes[Res any, Err error](s *datagramSocketImpl, setter func(ewm endpointWithMutators) (Res, Err)) (Res, Err) {
	trace.AsyncBegin("net", "fuchsia_posix_socket.executeMutatorWithCacheFlushes", trace.AsyncID(uintptr(unsafe.Pointer(s))))
	defer trace.AsyncEnd("net", "fuchsia_posix_socket.executeMutatorWithCacheFlushes", trace.AsyncID(uintptr(unsafe.Pointer(s))))
	// Step 1 above: let already-enqueued payloads be processed under the
	// pre-mutation socket state.
	s.blockUntilSocketDrained()
	res, err := setter(endpointWithMutators{
		ep: &s.endpointWithSocket.endpoint,
	})
	// Step 2 above: replace the destination cache eventpair so clients
	// re-validate their cached metadata against the post-mutation state.
	s.sharedState.destinationCacheMu.Lock()
	defer s.sharedState.destinationCacheMu.Unlock()
	s.sharedState.destinationCacheMu.destinationCache.reset()
	return res, err
}
// loopRead shuttles signals and data from the tcpip.Endpoint to the zircon socket.
//
// Each delivered datagram is written to the zircon socket as a serialized
// metadata prelude followed by the payload. Closing ch tells the spawner
// that this loop has terminated.
func (s *datagramSocketImpl) loopRead(ch chan<- struct{}) {
	defer close(ch)
	// Register for readability notifications so the loop can block until the
	// endpoint has payloads to deliver.
	inEntry, inCh := waiter.NewChannelEntry(waiter.EventIn)
	s.wq.EventRegister(&inEntry)
	defer s.wq.EventUnregister(&inEntry)
	// Scratch buffer reused for every datagram: prelude, then payload.
	buf := make([]byte, udpRxPreludeSize+s.sharedState.maxPayloadSize)
	for {
		payloadBuf := buf[udpRxPreludeSize:]
		w := tcpip.SliceWriter(payloadBuf)
		trace.AsyncBegin("net", "fuchsia_posix_socket.datagramSocket.ep.Read", trace.AsyncID(uintptr(unsafe.Pointer(s))))
		res, err := s.ep.Read(&w, tcpip.ReadOptions{NeedRemoteAddr: true})
		trace.AsyncEnd("net", "fuchsia_posix_socket.datagramSocket.ep.Read", trace.AsyncID(uintptr(unsafe.Pointer(s))))
		// Errors stored for asynchronous consumption by the client do not
		// terminate the loop.
		if stored := s.sharedState.err.set(err); stored {
			continue
		}
		switch err.(type) {
		case nil:
		case *tcpip.ErrBadBuffer:
			// The buffer is sized for the maximum payload, so a short read
			// indicates a bug.
			//
			// NB: plain panic; fmt.Sprintf with no arguments is flagged by vet.
			panic("unexpected short read from UDP endpoint")
		default:
			if s.handleEndpointReadError(err, inCh, udp.ProtocolNumber) {
				return
			}
			continue
		}
		trace.AsyncBegin("net", "fuchsia_posix_socket.datagramSocket.SerializeRecvMsgMeta", trace.AsyncID(uintptr(unsafe.Pointer(s))))
		if err := udp_serde.SerializeRecvMsgMeta(s.netProto, res, buf[:udpRxPreludeSize]); err != nil {
			panic(fmt.Sprintf("serialization error: %s", err))
		}
		trace.AsyncEnd("net", "fuchsia_posix_socket.datagramSocket.SerializeRecvMsgMeta", trace.AsyncID(uintptr(unsafe.Pointer(s))))
		v := buf[:udpRxPreludeSize+uint32(res.Count)]
		// Retry the zircon socket write until the whole datagram is accepted
		// or the socket is found to be terminally unwritable.
		for {
			trace.AsyncBegin("net", "fuchsia_posix_socket.datagramSocket.socket.Write", trace.AsyncID(uintptr(unsafe.Pointer(s))))
			n, err := s.local.Write(v[:], 0)
			trace.AsyncEnd("net", "fuchsia_posix_socket.datagramSocket.socket.Write", trace.AsyncID(uintptr(unsafe.Pointer(s))))
			if err != nil {
				if err, ok := err.(*zx.Error); ok {
					// Wait for room for the entire datagram (len(v) bytes).
					if s.handleZxSocketWriteError(*err, uint(len(v))) {
						return
					}
					continue
				}
				panic(err)
			}
			// Datagrams must be written atomically.
			if n < len(v) {
				panic(fmt.Sprintf("unexpected short write on zx socket: %d/%d", n, len(v)))
			}
			break
		}
	}
}
// loopWrite shuttles datagrams from the zircon socket to the tcpip.Endpoint.
//
// Each datagram read from the zircon socket carries a serialized metadata
// prelude (destination address, control messages) followed by the payload.
// Closing ch tells the spawner that this loop has terminated.
func (s *datagramSocketImpl) loopWrite(ch chan<- struct{}) {
	defer close(ch)
	// Register for writability notifications so the loop can block while the
	// endpoint cannot accept more datagrams.
	waitEntry, notifyCh := waiter.NewChannelEntry(waiter.EventOut)
	s.wq.EventRegister(&waitEntry)
	defer s.wq.EventUnregister(&waitEntry)
	// Scratch buffer reused for every datagram: prelude, then payload.
	buf := make([]byte, udpTxPreludeSize+s.sharedState.maxPayloadSize)
	// The condition variable lock is held for the whole loop and dropped only
	// while blocked waiting for the zircon socket to become readable — which
	// is exactly when the socket is known to be drained.
	s.sharedState.localEDrainedCond.L.Lock()
	defer s.sharedState.localEDrainedCond.L.Unlock()
	for {
		v := buf
		trace.AsyncBegin("net", "fuchsia_posix_socket.datagramSocket.socket.Read", trace.AsyncID(uintptr(unsafe.Pointer(s))))
		n, err := s.local.Read(v, 0)
		trace.AsyncEnd("net", "fuchsia_posix_socket.datagramSocket.socket.Read", trace.AsyncID(uintptr(unsafe.Pointer(s))))
		if err != nil {
			if err, ok := err.(*zx.Error); ok {
				if s.handleZxSocketReadError(*err, func(sigs zx.Signals) zx.Signals {
					// FIDL methods on datagram sockets block until all of the payloads
					// in the contained zircon socket have been dequeued. When the
					// socket is readable (aka contains payloads), methods wait on
					// a condition variable until the next time it might be empty.
					//
					// Since this callback is invoked precisely when the socket has
					// been found to be empty (aka returns ErrShouldWait) wake up
					// any waiters.
					s.sharedState.localEDrainedCond.L.Unlock()
					s.sharedState.localEDrainedCond.Broadcast()
					sigs, err := zxwait.WaitContext(context.Background(), zx.Handle(s.local), sigs)
					s.sharedState.localEDrainedCond.L.Lock()
					if err != nil {
						panic(err)
					}
					return sigs
				}) {
					return
				}
				continue
			}
			panic(err)
		}
		v = v[:n]
		opts := tcpip.WriteOptions{
			// Datagrams are written in their entirety or not at all.
			Atomic: true,
		}
		trace.AsyncBegin("net", "fuchsia_posix_socket.datagramSocket.DeserializeSendMsgMeta", trace.AsyncID(uintptr(unsafe.Pointer(s))))
		addr, cmsgs, err := udp_serde.DeserializeSendMsgMeta(v[:udpTxPreludeSize])
		trace.AsyncEnd("net", "fuchsia_posix_socket.datagramSocket.DeserializeSendMsgMeta", trace.AsyncID(uintptr(unsafe.Pointer(s))))
		if err != nil {
			// Ideally, we'd like to close the socket so as to inform the client
			// that they're mishandling the ABI. This is complicated by the fact
			// that the zircon socket can be shared by multiple clients, and we
			// can't tell which client sent the bad payload. Instead, we just
			// log the error and drop the payload on the floor.
			_ = syslog.Errorf("error deserializing payload from socket: %s", err)
			continue
		}
		if errno := validateSendableControlMessages(&cmsgs); errno != 0 {
			// As above, we'd prefer to close the socket but instead just drop
			// the payload on the floor.
			//
			// NB: log the validation errno — `err` is necessarily nil on this
			// path (deserialization succeeded above).
			_ = syslog.Errorf("error validating control messages (%#v): %s", cmsgs, errno)
			continue
		}
		if addr != nil {
			opts.To = addr
		}
		opts.ControlMessages = cmsgs
		v = v[udpTxPreludeSize:]
		// Retry the endpoint write until the datagram is accepted, dropped
		// (stored asynchronous error), or the loop must terminate.
		for {
			var r bytes.Reader
			r.Reset(v)
			lenPrev := len(v)
			trace.AsyncBegin("net", "fuchsia_posix_socket.datagramSocket.ep.Write", trace.AsyncID(uintptr(unsafe.Pointer(s))))
			written, err := s.ep.Write(&r, opts)
			trace.AsyncEnd("net", "fuchsia_posix_socket.datagramSocket.ep.Write", trace.AsyncID(uintptr(unsafe.Pointer(s))))
			if stored := s.sharedState.err.set(err); stored {
				continue
			}
			if err == nil {
				if int(written) != lenPrev {
					panic(fmt.Sprintf("UDP disallows short writes; saw: %d/%d", written, lenPrev))
				}
			} else {
				switch err.(type) {
				case *tcpip.ErrWouldBlock:
					// Wait for the endpoint to become writable (or for the
					// socket to start closing) and retry the same payload.
					select {
					case <-notifyCh:
						continue
					case <-s.endpointWithSocket.closing:
						return
					}
				default:
					if s.handleEndpointWriteError(err, udp.ProtocolNumber) {
						return
					}
				}
			}
			break
		}
	}
}
// addConnection serves the DatagramSocket FIDL protocol on `channel` using a
// shallow clone of this socket, keeping the socket and endpoint reference
// counts in step with the number of live connections.
func (s *datagramSocketImpl) addConnection(_ fidl.Context, channel zx.Channel) {
	{
		// Shadow s with a copy so this clone carries its own cancel func
		// while still sharing the endpoint and sharedState pointers.
		sCopy := *s
		s := &sCopy
		s.ns.stats.SocketCount.Increment()
		s.endpoint.incRef()
		go func() {
			defer s.ns.stats.SocketCount.Decrement()
			ctx, cancel := context.WithCancel(context.Background())
			defer cancel()
			s.cancel = cancel
			defer func() {
				// Avoid double close when the peer calls Close and then hangs up.
				if ctx.Err() == nil {
					s.close()
				}
			}()
			stub := socket.DatagramSocketWithCtxStub{Impl: s}
			// Serve blocks until the channel closes or ctx is canceled.
			component.Serve(ctx, &stub, channel, component.ServeOptions{
				OnError: func(err error) {
					// NB: this protocol is not discoverable, so the bindings do not include its name.
					_ = syslog.WarnTf("fuchsia.posix.socket.DatagramSocket", "%s", err)
				},
			})
		}()
	}
}
// FIDL calls modifying socket options affecting packet egress. These calls must
// wait for the zircon socket to drain so as to avoid applying socket options
// to packets enqueued before the call.
func (s *datagramSocketImpl) Connect(_ fidl.Context, address fnet.SocketAddress) (socket.BaseNetworkSocketConnectResult, error) {
	// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
	return executeMutatorWithCacheFlushes(s, func(ewm endpointWithMutators) (socket.BaseNetworkSocketConnectResult, error) {
		if err := s.nonStreamEndpoint.connect(ewm.ep, address); err != nil {
			return socket.BaseNetworkSocketConnectResultWithErr(tcpipErrorToCode(err)), nil
		}
		return socket.BaseNetworkSocketConnectResultWithResponse(socket.BaseNetworkSocketConnectResponse{}), nil
	})
}
// Disconnect mutates connection state and so drains the socket and flushes
// client caches via executeMutatorWithCacheFlushes.
func (s *datagramSocketImpl) Disconnect(ctx fidl.Context) (socket.BaseNetworkSocketDisconnectResult, error) {
	// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
	return executeMutatorWithCacheFlushes(s, func(ewm endpointWithMutators) (socket.BaseNetworkSocketDisconnectResult, error) {
		return ewm.Disconnect(ctx)
	})
}
// SetIpTypeOfService affects egress packets and so goes through
// executeMutatorWithCacheFlushes.
func (s *datagramSocketImpl) SetIpTypeOfService(ctx fidl.Context, value uint8) (socket.BaseNetworkSocketSetIpTypeOfServiceResult, error) {
	// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
	return executeMutatorWithCacheFlushes(s, func(ewm endpointWithMutators) (socket.BaseNetworkSocketSetIpTypeOfServiceResult, error) {
		return ewm.SetIpTypeOfService(ctx, value)
	})
}
// The following setters all delegate to endpointWithMutators via
// executeMutatorWithCacheFlushes: enqueued payloads are processed under the
// pre-mutation socket state, and client caches are invalidated afterwards.
func (s *datagramSocketImpl) SetIpv6Only(ctx fidl.Context, value bool) (socket.BaseNetworkSocketSetIpv6OnlyResult, error) {
	// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
	return executeMutatorWithCacheFlushes(s, func(ewm endpointWithMutators) (socket.BaseNetworkSocketSetIpv6OnlyResult, error) {
		return ewm.SetIpv6Only(ctx, value)
	})
}
func (s *datagramSocketImpl) SetIpv6TrafficClass(ctx fidl.Context, value socket.OptionalUint8) (socket.BaseNetworkSocketSetIpv6TrafficClassResult, error) {
	// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
	return executeMutatorWithCacheFlushes(s, func(ewm endpointWithMutators) (socket.BaseNetworkSocketSetIpv6TrafficClassResult, error) {
		return ewm.SetIpv6TrafficClass(ctx, value)
	})
}
func (s *datagramSocketImpl) SetIpv6UnicastHops(ctx fidl.Context, value socket.OptionalUint8) (socket.BaseNetworkSocketSetIpv6UnicastHopsResult, error) {
	// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
	return executeMutatorWithCacheFlushes(s, func(ewm endpointWithMutators) (socket.BaseNetworkSocketSetIpv6UnicastHopsResult, error) {
		return ewm.SetIpv6UnicastHops(ctx, value)
	})
}
func (s *datagramSocketImpl) SetIpTtl(ctx fidl.Context, value socket.OptionalUint8) (socket.BaseNetworkSocketSetIpTtlResult, error) {
	// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
	return executeMutatorWithCacheFlushes(s, func(ewm endpointWithMutators) (socket.BaseNetworkSocketSetIpTtlResult, error) {
		return ewm.SetIpTtl(ctx, value)
	})
}
// Buffer/binding setters; these too affect egress and so drain the zircon
// socket and flush client caches via executeMutatorWithCacheFlushes.
func (s *datagramSocketImpl) SetSendBuffer(ctx fidl.Context, size uint64) (socket.BaseSocketSetSendBufferResult, error) {
	// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
	// TODO(https://fxbug.dev/42178052): Test synchronous semantics wrt packet sends.
	return executeMutatorWithCacheFlushes(s, func(ewm endpointWithMutators) (socket.BaseSocketSetSendBufferResult, error) {
		return ewm.SetSendBuffer(ctx, size)
	})
}
func (s *datagramSocketImpl) SetBindToDevice(ctx fidl.Context, value string) (socket.BaseSocketSetBindToDeviceResult, error) {
	// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
	// TODO(https://fxbug.dev/42178052): Test synchronous semantics wrt packet sends.
	return executeMutatorWithCacheFlushes(s, func(ewm endpointWithMutators) (socket.BaseSocketSetBindToDeviceResult, error) {
		return ewm.SetBindToDevice(ctx, value)
	})
}
func (s *datagramSocketImpl) SetBindToInterfaceIndex(ctx fidl.Context, value uint64) (socket.BaseSocketSetBindToInterfaceIndexResult, error) {
	return executeMutatorWithCacheFlushes(s, func(ewm endpointWithMutators) (socket.BaseSocketSetBindToInterfaceIndexResult, error) {
		return ewm.SetBindToInterfaceIndex(ctx, value)
	})
}
func (s *datagramSocketImpl) SetBroadcast(ctx fidl.Context, value bool) (socket.BaseSocketSetBroadcastResult, error) {
	// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
	return executeMutatorWithCacheFlushes(s, func(ewm endpointWithMutators) (socket.BaseSocketSetBroadcastResult, error) {
		return ewm.SetBroadcast(ctx, value)
	})
}
func (s *datagramSocketImpl) SetNoCheck(ctx fidl.Context, value bool) (socket.BaseSocketSetNoCheckResult, error) {
	// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
	// TODO(https://fxbug.dev/42178052): Test synchronous semantics wrt packet sends.
	return executeMutatorWithCacheFlushes(s, func(ewm endpointWithMutators) (socket.BaseSocketSetNoCheckResult, error) {
		return ewm.SetNoCheck(ctx, value)
	})
}
// Multicast egress setters, routed through executeMutatorWithCacheFlushes
// like the other egress-affecting options.
func (s *datagramSocketImpl) SetIpv6MulticastInterface(ctx fidl.Context, value uint64) (socket.BaseNetworkSocketSetIpv6MulticastInterfaceResult, error) {
	// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
	// TODO(https://fxbug.dev/42178052): Test synchronous semantics wrt packet sends.
	return executeMutatorWithCacheFlushes(s, func(ewm endpointWithMutators) (socket.BaseNetworkSocketSetIpv6MulticastInterfaceResult, error) {
		return ewm.SetIpv6MulticastInterface(ctx, value)
	})
}
func (s *datagramSocketImpl) SetIpv6MulticastHops(ctx fidl.Context, value socket.OptionalUint8) (socket.BaseNetworkSocketSetIpv6MulticastHopsResult, error) {
	// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
	// TODO(https://fxbug.dev/42178052): Test synchronous semantics wrt packet sends.
	return executeMutatorWithCacheFlushes(s, func(ewm endpointWithMutators) (socket.BaseNetworkSocketSetIpv6MulticastHopsResult, error) {
		return ewm.SetIpv6MulticastHops(ctx, value)
	})
}
func (s *datagramSocketImpl) SetIpv6MulticastLoopback(ctx fidl.Context, value bool) (socket.BaseNetworkSocketSetIpv6MulticastLoopbackResult, error) {
	// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
	// TODO(https://fxbug.dev/42178052): Test synchronous semantics wrt packet sends.
	return executeMutatorWithCacheFlushes(s, func(ewm endpointWithMutators) (socket.BaseNetworkSocketSetIpv6MulticastLoopbackResult, error) {
		return ewm.SetIpv6MulticastLoopback(ctx, value)
	})
}
func (s *datagramSocketImpl) SetIpMulticastTtl(ctx fidl.Context, value socket.OptionalUint8) (socket.BaseNetworkSocketSetIpMulticastTtlResult, error) {
	// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
	// TODO(https://fxbug.dev/42178052): Test synchronous semantics wrt packet sends.
	return executeMutatorWithCacheFlushes(s, func(ewm endpointWithMutators) (socket.BaseNetworkSocketSetIpMulticastTtlResult, error) {
		return ewm.SetIpMulticastTtl(ctx, value)
	})
}
// IPv4 multicast setters and membership management, routed through
// executeMutatorWithCacheFlushes like the other egress-affecting options.
func (s *datagramSocketImpl) SetIpMulticastInterface(ctx fidl.Context, iface uint64, value fnet.Ipv4Address) (socket.BaseNetworkSocketSetIpMulticastInterfaceResult, error) {
	// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
	// TODO(https://fxbug.dev/42178052): Test synchronous semantics wrt packet sends.
	return executeMutatorWithCacheFlushes(s, func(ewm endpointWithMutators) (socket.BaseNetworkSocketSetIpMulticastInterfaceResult, error) {
		return ewm.SetIpMulticastInterface(ctx, iface, value)
	})
}
func (s *datagramSocketImpl) SetIpMulticastLoopback(ctx fidl.Context, value bool) (socket.BaseNetworkSocketSetIpMulticastLoopbackResult, error) {
	// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
	// TODO(https://fxbug.dev/42178052): Test synchronous semantics wrt packet sends.
	return executeMutatorWithCacheFlushes(s, func(ewm endpointWithMutators) (socket.BaseNetworkSocketSetIpMulticastLoopbackResult, error) {
		return ewm.SetIpMulticastLoopback(ctx, value)
	})
}
func (s *datagramSocketImpl) AddIpMembership(ctx fidl.Context, membership socket.IpMulticastMembership) (socket.BaseNetworkSocketAddIpMembershipResult, error) {
	// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
	// TODO(https://fxbug.dev/42178052): Test synchronous semantics wrt packet sends.
	return executeMutatorWithCacheFlushes(s, func(ewm endpointWithMutators) (socket.BaseNetworkSocketAddIpMembershipResult, error) {
		return ewm.AddIpMembership(ctx, membership)
	})
}
func (s *datagramSocketImpl) DropIpMembership(ctx fidl.Context, membership socket.IpMulticastMembership) (socket.BaseNetworkSocketDropIpMembershipResult, error) {
	// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
	// TODO(https://fxbug.dev/42178052): Test synchronous semantics wrt packet sends.
	return executeMutatorWithCacheFlushes(s, func(ewm endpointWithMutators) (socket.BaseNetworkSocketDropIpMembershipResult, error) {
		return ewm.DropIpMembership(ctx, membership)
	})
}
// SetIpTransparent implements socket.BaseNetworkSocketWithCtx. The option is
// unsupported for this socket type; all calls are rejected with EOPNOTSUPP.
func (s *datagramSocketImpl) SetIpTransparent(_ fidl.Context, _ bool) (socket.BaseNetworkSocketSetIpTransparentResult, error) {
	result := socket.BaseNetworkSocketSetIpTransparentResultWithErr(posix.ErrnoEopnotsupp)
	return result, nil
}
// SetIpReceiveOriginalDestinationAddress implements
// socket.BaseNetworkSocketWithCtx. The option is unsupported for this socket
// type; all calls are rejected with EOPNOTSUPP.
func (s *datagramSocketImpl) SetIpReceiveOriginalDestinationAddress(_ fidl.Context, _ bool) (socket.BaseNetworkSocketSetIpReceiveOriginalDestinationAddressResult, error) {
	result := socket.BaseNetworkSocketSetIpReceiveOriginalDestinationAddressResultWithErr(posix.ErrnoEopnotsupp)
	return result, nil
}
// AddIpv6Membership implements socket.BaseNetworkSocketWithCtx.
//
// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
// TODO(https://fxbug.dev/42178052): Test synchronous semantics wrt packet sends.
func (s *datagramSocketImpl) AddIpv6Membership(ctx fidl.Context, membership socket.Ipv6MulticastMembership) (socket.BaseNetworkSocketAddIpv6MembershipResult, error) {
	mutator := func(ewm endpointWithMutators) (socket.BaseNetworkSocketAddIpv6MembershipResult, error) {
		return ewm.AddIpv6Membership(ctx, membership)
	}
	return executeMutatorWithCacheFlushes(s, mutator)
}
// DropIpv6Membership implements socket.BaseNetworkSocketWithCtx.
//
// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
// TODO(https://fxbug.dev/42178052): Test synchronous semantics wrt packet sends.
func (s *datagramSocketImpl) DropIpv6Membership(ctx fidl.Context, membership socket.Ipv6MulticastMembership) (socket.BaseNetworkSocketDropIpv6MembershipResult, error) {
	mutator := func(ewm endpointWithMutators) (socket.BaseNetworkSocketDropIpv6MembershipResult, error) {
		return ewm.DropIpv6Membership(ctx, membership)
	}
	return executeMutatorWithCacheFlushes(s, mutator)
}
// Bind implements socket.BaseNetworkSocketWithCtx.
//
// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
// TODO(https://fxbug.dev/42178052): Test synchronous semantics wrt packet sends.
func (s *datagramSocketImpl) Bind(ctx fidl.Context, sockaddr fnet.SocketAddress) (socket.BaseNetworkSocketBindResult, error) {
	mutator := func(ewm endpointWithMutators) (socket.BaseNetworkSocketBindResult, error) {
		return ewm.Bind(ctx, sockaddr)
	}
	return executeMutatorWithCacheFlushes(s, mutator)
}
// SetIpPacketInfo implements socket.BaseNetworkSocketWithCtx.
//
// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
func (s *datagramSocketImpl) SetIpPacketInfo(ctx fidl.Context, value bool) (socket.BaseNetworkSocketSetIpPacketInfoResult, error) {
	mutator := func(ewm endpointWithMutators) (socket.BaseNetworkSocketSetIpPacketInfoResult, error) {
		return ewm.SetIpPacketInfo(ctx, value)
	}
	return executeMutatorWithCacheFlushes(s, mutator)
}
// SetKeepAlive implements socket.BaseSocketWithCtx.
//
// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
// TODO(https://fxbug.dev/42178052): Test synchronous semantics wrt packet sends.
func (s *datagramSocketImpl) SetKeepAlive(ctx fidl.Context, value bool) (socket.BaseSocketSetKeepAliveResult, error) {
	mutator := func(ewm endpointWithMutators) (socket.BaseSocketSetKeepAliveResult, error) {
		return ewm.SetKeepAlive(ctx, value)
	}
	return executeMutatorWithCacheFlushes(s, mutator)
}
// SetLinger implements socket.BaseSocketWithCtx.
//
// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
func (s *datagramSocketImpl) SetLinger(ctx fidl.Context, linger bool, seconds uint32) (socket.BaseSocketSetLingerResult, error) {
	mutator := func(ewm endpointWithMutators) (socket.BaseSocketSetLingerResult, error) {
		return ewm.SetLinger(ctx, linger, seconds)
	}
	return executeMutatorWithCacheFlushes(s, mutator)
}
// SetOutOfBandInline implements socket.BaseSocketWithCtx.
//
// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
// TODO(https://fxbug.dev/42178052): Test synchronous semantics wrt packet sends.
func (s *datagramSocketImpl) SetOutOfBandInline(ctx fidl.Context, value bool) (socket.BaseSocketSetOutOfBandInlineResult, error) {
	mutator := func(ewm endpointWithMutators) (socket.BaseSocketSetOutOfBandInlineResult, error) {
		return ewm.SetOutOfBandInline(ctx, value)
	}
	return executeMutatorWithCacheFlushes(s, mutator)
}
// SetReceiveBuffer implements socket.BaseSocketWithCtx.
//
// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
func (s *datagramSocketImpl) SetReceiveBuffer(ctx fidl.Context, size uint64) (socket.BaseSocketSetReceiveBufferResult, error) {
	mutator := func(ewm endpointWithMutators) (socket.BaseSocketSetReceiveBufferResult, error) {
		return ewm.SetReceiveBuffer(ctx, size)
	}
	return executeMutatorWithCacheFlushes(s, mutator)
}
// SetReuseAddress implements socket.BaseSocketWithCtx.
//
// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
// TODO(https://fxbug.dev/42178052): Test synchronous semantics wrt packet sends.
func (s *datagramSocketImpl) SetReuseAddress(ctx fidl.Context, value bool) (socket.BaseSocketSetReuseAddressResult, error) {
	mutator := func(ewm endpointWithMutators) (socket.BaseSocketSetReuseAddressResult, error) {
		return ewm.SetReuseAddress(ctx, value)
	}
	return executeMutatorWithCacheFlushes(s, mutator)
}
// SetReusePort implements socket.BaseSocketWithCtx.
//
// TODO(https://fxbug.dev/42061146): Audit cache flushes after Fast UDP launches.
// TODO(https://fxbug.dev/42178052): Test synchronous semantics wrt packet sends.
func (s *datagramSocketImpl) SetReusePort(ctx fidl.Context, value bool) (socket.BaseSocketSetReusePortResult, error) {
	mutator := func(ewm endpointWithMutators) (socket.BaseSocketSetReusePortResult, error) {
		return ewm.SetReusePort(ctx, value)
	}
	return executeMutatorWithCacheFlushes(s, mutator)
}
// blockUntilSocketDrained blocks the goroutine until all of the payloads in the contained
// zircon socket have been dequeued. All control-path FIDL methods should call this method
// before modifying endpoint state in order to avoid racing with concurrently enqueued
// payloads.
func (s *datagramSocketImpl) blockUntilSocketDrained() {
	// TODO(https://fxbug.dev/42051639): Prevent ingress into the socket while draining.
	s.sharedState.localEDrainedCond.L.Lock()
	defer s.sharedState.localEDrainedCond.L.Unlock()
	for {
		// Deadline 0 is already in the past, so this wait acts as a
		// non-blocking poll for pending payloads.
		status := zx.Sys_object_wait_one(zx.Handle(s.local), zx.SignalSocketReadable, 0, nil)
		switch status {
		case zx.ErrOk:
			// Still readable: payloads remain. Wait to be signaled that the
			// drainer has made progress, then poll again.
			s.sharedState.localEDrainedCond.Wait()
			continue
		case zx.ErrTimedOut:
			// Not readable: the socket has been fully drained.
			return
		default:
			panic(fmt.Sprintf("wait for socket to become readable failed with status = %s", status))
		}
	}
}
// NotifyEvent implements waiter.EventListener; the event mask itself is
// unused — the callback only latches the endpoint's last error into the
// shared error slot so a later GetError can report it.
func (s *datagramSocketImpl) NotifyEvent(waiter.EventMask) {
	s.sharedState.err.set(s.endpoint.ep.LastError())
}
// GetError implements socket.BaseSocketWithCtx. Reading the pending error
// consumes it (SO_ERROR-style: a second call reports success).
func (s *datagramSocketImpl) GetError(fidl.Context) (socket.BaseSocketGetErrorResult, error) {
	s.sockOptStats.GetError.Add(1)
	err := s.sharedState.err.consume()
	if err == nil {
		return socket.BaseSocketGetErrorResultWithResponse(socket.BaseSocketGetErrorResponse{}), nil
	}
	return socket.BaseSocketGetErrorResultWithErr(tcpipErrorToCode(err)), nil
}
// Shutdown implements socket.BaseNetworkSocketWithCtx.
func (s *datagramSocketImpl) Shutdown(ctx fidl.Context, how socket.ShutdownMode) (socket.BaseNetworkSocketShutdownResult, error) {
	switch state := transport.DatagramEndpointState(s.ep.State()); state {
	case transport.DatagramEndpointStateConnected, transport.DatagramEndpointStateBound:
		result, err := s.endpointWithSocket.Shutdown(ctx, how)
		// A bound-but-unconnected socket that shuts down successfully still
		// reports ENOTCONN to the caller; errors from the endpoint shutdown
		// are propagated unchanged. NOTE(review): this presumably mirrors
		// POSIX ENOTCONN semantics for unconnected datagram sockets — confirm
		// against the conformance tests.
		if state == transport.DatagramEndpointStateBound && result.Which() != socket.BaseNetworkSocketShutdownResultErr {
			return socket.BaseNetworkSocketShutdownResultWithErr(tcpipErrorToCode(&tcpip.ErrNotConnected{})), nil
		}
		return result, err
	default:
		// Neither bound nor connected: nothing to shut down.
		return socket.BaseNetworkSocketShutdownResultWithErr(tcpipErrorToCode(&tcpip.ErrNotConnected{})), nil
	}
}
// FIDL calls accessing or modifying the set of requested control messages. Modifying calls
// must invalidate any clients that depend upon the (now-outdated) cache.

// GetTimestamp implements socket.BaseSocketWithCtx; it reports whether (and
// in which format) timestamp control messages are requested.
func (s *datagramSocketImpl) GetTimestamp(fidl.Context) (socket.BaseSocketGetTimestampResult, error) {
	s.sockOptStats.GetTimestamp.Add(1)
	s.sharedState.cmsgCacheMu.Lock()
	defer s.sharedState.cmsgCacheMu.Unlock()
	return socket.BaseSocketGetTimestampResultWithResponse(socket.BaseSocketGetTimestampResponse{Value: s.sharedState.cmsgCacheMu.cmsgCache.timestamp}), nil
}
// SetTimestamp implements socket.BaseSocketWithCtx. Updating the requested
// timestamp option invalidates clients' cached control-message state.
func (s *datagramSocketImpl) SetTimestamp(ctx fidl.Context, value socket.TimestampOption) (socket.BaseSocketSetTimestampResult, error) {
	s.sockOptStats.SetTimestamp.Add(1)
	s.sharedState.cmsgCacheMu.Lock()
	defer s.sharedState.cmsgCacheMu.Unlock()
	cache := &s.sharedState.cmsgCacheMu.cmsgCache
	cache.timestamp = value
	cache.reset()
	return socket.BaseSocketSetTimestampResultWithResponse(socket.BaseSocketSetTimestampResponse{}), nil
}
// GetIpReceiveTypeOfService implements socket.BaseNetworkSocketWithCtx; it
// reports whether the IP TOS control message is requested.
func (s *datagramSocketImpl) GetIpReceiveTypeOfService(fidl.Context) (socket.BaseNetworkSocketGetIpReceiveTypeOfServiceResult, error) {
	s.sockOptStats.GetIpReceiveTypeOfService.Add(1)
	s.sharedState.cmsgCacheMu.Lock()
	defer s.sharedState.cmsgCacheMu.Unlock()
	return socket.BaseNetworkSocketGetIpReceiveTypeOfServiceResultWithResponse(socket.BaseNetworkSocketGetIpReceiveTypeOfServiceResponse{Value: s.sharedState.cmsgCacheMu.cmsgCache.ipTos}), nil
}
// SetIpReceiveTypeOfService implements socket.BaseNetworkSocketWithCtx.
// Changing the requested IP TOS control message invalidates clients' cached
// control-message state.
func (s *datagramSocketImpl) SetIpReceiveTypeOfService(ctx fidl.Context, value bool) (socket.BaseNetworkSocketSetIpReceiveTypeOfServiceResult, error) {
	s.sockOptStats.SetIpReceiveTypeOfService.Add(1)
	s.sharedState.cmsgCacheMu.Lock()
	defer s.sharedState.cmsgCacheMu.Unlock()
	cache := &s.sharedState.cmsgCacheMu.cmsgCache
	cache.ipTos = value
	cache.reset()
	return socket.BaseNetworkSocketSetIpReceiveTypeOfServiceResultWithResponse(socket.BaseNetworkSocketSetIpReceiveTypeOfServiceResponse{}), nil
}
// GetIpReceiveTtl implements socket.BaseNetworkSocketWithCtx; it reports
// whether the IP TTL control message is requested.
func (s *datagramSocketImpl) GetIpReceiveTtl(fidl.Context) (socket.BaseNetworkSocketGetIpReceiveTtlResult, error) {
	s.sockOptStats.GetIpReceiveTtl.Add(1)
	s.sharedState.cmsgCacheMu.Lock()
	defer s.sharedState.cmsgCacheMu.Unlock()
	return socket.BaseNetworkSocketGetIpReceiveTtlResultWithResponse(socket.BaseNetworkSocketGetIpReceiveTtlResponse{Value: s.sharedState.cmsgCacheMu.cmsgCache.ipTtl}), nil
}
// SetIpReceiveTtl implements socket.BaseNetworkSocketWithCtx. Changing the
// requested IP TTL control message invalidates clients' cached
// control-message state.
func (s *datagramSocketImpl) SetIpReceiveTtl(ctx fidl.Context, value bool) (socket.BaseNetworkSocketSetIpReceiveTtlResult, error) {
	s.sockOptStats.SetIpReceiveTtl.Add(1)
	s.sharedState.cmsgCacheMu.Lock()
	defer s.sharedState.cmsgCacheMu.Unlock()
	cache := &s.sharedState.cmsgCacheMu.cmsgCache
	cache.ipTtl = value
	cache.reset()
	return socket.BaseNetworkSocketSetIpReceiveTtlResultWithResponse(socket.BaseNetworkSocketSetIpReceiveTtlResponse{}), nil
}
// GetIpv6ReceiveTrafficClass implements socket.BaseNetworkSocketWithCtx; it
// reports whether the IPv6 traffic-class control message is requested.
func (s *datagramSocketImpl) GetIpv6ReceiveTrafficClass(fidl.Context) (socket.BaseNetworkSocketGetIpv6ReceiveTrafficClassResult, error) {
	s.sockOptStats.GetIpv6ReceiveTrafficClass.Add(1)
	s.sharedState.cmsgCacheMu.Lock()
	defer s.sharedState.cmsgCacheMu.Unlock()
	return socket.BaseNetworkSocketGetIpv6ReceiveTrafficClassResultWithResponse(socket.BaseNetworkSocketGetIpv6ReceiveTrafficClassResponse{Value: s.sharedState.cmsgCacheMu.cmsgCache.ipv6Tclass}), nil
}
// SetIpv6ReceiveTrafficClass implements socket.BaseNetworkSocketWithCtx.
// Changing the requested IPv6 traffic-class control message invalidates
// clients' cached control-message state.
func (s *datagramSocketImpl) SetIpv6ReceiveTrafficClass(ctx fidl.Context, value bool) (socket.BaseNetworkSocketSetIpv6ReceiveTrafficClassResult, error) {
	s.sockOptStats.SetIpv6ReceiveTrafficClass.Add(1)
	s.sharedState.cmsgCacheMu.Lock()
	defer s.sharedState.cmsgCacheMu.Unlock()
	cache := &s.sharedState.cmsgCacheMu.cmsgCache
	cache.ipv6Tclass = value
	cache.reset()
	return socket.BaseNetworkSocketSetIpv6ReceiveTrafficClassResultWithResponse(socket.BaseNetworkSocketSetIpv6ReceiveTrafficClassResponse{}), nil
}
// GetIpv6ReceiveHopLimit implements socket.BaseNetworkSocketWithCtx; it
// reports whether the IPv6 hop-limit control message is requested.
func (s *datagramSocketImpl) GetIpv6ReceiveHopLimit(fidl.Context) (socket.BaseNetworkSocketGetIpv6ReceiveHopLimitResult, error) {
	s.sockOptStats.GetIpv6ReceiveHopLimit.Add(1)
	s.sharedState.cmsgCacheMu.Lock()
	defer s.sharedState.cmsgCacheMu.Unlock()
	return socket.BaseNetworkSocketGetIpv6ReceiveHopLimitResultWithResponse(socket.BaseNetworkSocketGetIpv6ReceiveHopLimitResponse{Value: s.sharedState.cmsgCacheMu.cmsgCache.ipv6HopLimit}), nil
}
// SetIpv6ReceiveHopLimit implements socket.BaseNetworkSocketWithCtx. Changing
// the requested IPv6 hop-limit control message invalidates clients' cached
// control-message state.
func (s *datagramSocketImpl) SetIpv6ReceiveHopLimit(ctx fidl.Context, value bool) (socket.BaseNetworkSocketSetIpv6ReceiveHopLimitResult, error) {
	s.sockOptStats.SetIpv6ReceiveHopLimit.Add(1)
	s.sharedState.cmsgCacheMu.Lock()
	defer s.sharedState.cmsgCacheMu.Unlock()
	cache := &s.sharedState.cmsgCacheMu.cmsgCache
	cache.ipv6HopLimit = value
	cache.reset()
	return socket.BaseNetworkSocketSetIpv6ReceiveHopLimitResultWithResponse(socket.BaseNetworkSocketSetIpv6ReceiveHopLimitResponse{}), nil
}
// SetIpv6ReceivePacketInfo implements socket.BaseNetworkSocketWithCtx.
// Changing the requested IPv6 packet-info control message invalidates
// clients' cached control-message state.
func (s *datagramSocketImpl) SetIpv6ReceivePacketInfo(ctx fidl.Context, value bool) (socket.BaseNetworkSocketSetIpv6ReceivePacketInfoResult, error) {
	s.sockOptStats.SetIpv6ReceivePacketInfo.Add(1)
	s.sharedState.cmsgCacheMu.Lock()
	defer s.sharedState.cmsgCacheMu.Unlock()
	cache := &s.sharedState.cmsgCacheMu.cmsgCache
	cache.ipv6PktInfo = value
	cache.reset()
	return socket.BaseNetworkSocketSetIpv6ReceivePacketInfoResultWithResponse(socket.BaseNetworkSocketSetIpv6ReceivePacketInfoResponse{}), nil
}
// GetIpv6ReceivePacketInfo implements socket.BaseNetworkSocketWithCtx; it
// reports whether the IPv6 packet-info control message is requested.
func (s *datagramSocketImpl) GetIpv6ReceivePacketInfo(fidl.Context) (socket.BaseNetworkSocketGetIpv6ReceivePacketInfoResult, error) {
	s.sockOptStats.GetIpv6ReceivePacketInfo.Add(1)
	s.sharedState.cmsgCacheMu.Lock()
	defer s.sharedState.cmsgCacheMu.Unlock()
	return socket.BaseNetworkSocketGetIpv6ReceivePacketInfoResultWithResponse(socket.BaseNetworkSocketGetIpv6ReceivePacketInfoResponse{Value: s.sharedState.cmsgCacheMu.cmsgCache.ipv6PktInfo}), nil
}
// Clone2 implements unknown.CloneableWithCtx: it serves a second FIDL
// connection for this socket over the provided channel.
func (s *datagramSocketImpl) Clone2(ctx fidl.Context, object unknown.CloneableWithCtxInterfaceRequest) error {
	s.addConnection(ctx, object.Channel)
	_ = syslog.DebugTf("Clone", "%p", s.endpointWithSocket)
	return nil
}
// close releases this connection's reference on the socket. The last
// reference tears everything down: waiter entry, destination and cmsg
// caches, the underlying endpoint-with-socket, and the peer handle.
func (s *datagramSocketImpl) close() {
	if s.decRef() {
		s.wq.EventUnregister(&s.sharedState.entry)
		s.sharedState.destinationCacheMu.destinationCache.clear()
		s.sharedState.cmsgCacheMu.cmsgCache.clear()
		s.endpointWithSocket.close()
		if err := s.peer.Close(); err != nil {
			panic(err)
		}
	}
	// Cancel unconditionally so this connection's serve loop stops even when
	// other clones still hold references.
	s.cancel()
}
// Close implements unknown.CloseableWithCtx; closing always succeeds from
// the client's perspective.
func (s *datagramSocketImpl) Close(fidl.Context) (unknown.CloseableCloseResult, error) {
	_ = syslog.DebugTf("Close", "%p", s)
	s.close()
	return unknown.CloseableCloseResultWithResponse(unknown.CloseableCloseResponse{}), nil
}
// Query implements unknown.QueryableWithCtx; it reports the protocol name
// backing this connection.
func (*datagramSocketImpl) Query(fidl.Context) ([]uint8, error) {
	name := socket.DatagramSocketProtocolName_
	return []byte(name), nil
}
// Describe implements socket.DatagramSocketWithCtx: it returns the client's
// end of the zircon socket together with the metadata (prelude) buffer sizes
// and the encoding protocol version needed to frame payloads on it.
func (s *datagramSocketImpl) Describe(fidl.Context) (socket.DatagramSocketDescribeResponse, error) {
	handle, err := s.describe()
	if err != nil {
		return socket.DatagramSocketDescribeResponse{}, err
	}
	var response socket.DatagramSocketDescribeResponse
	response.SetSocket(zx.Socket(handle))
	response.SetTxMetaBufSize(uint64(udpTxPreludeSize))
	response.SetRxMetaBufSize(uint64(udpRxPreludeSize))
	response.SetMetadataEncodingProtocolVersion(socket.UdpMetadataEncodingProtocolVersionZero)
	return response, nil
}
// GetInfo implements socket.BaseDatagramSocketWithCtx; it reports the
// socket's domain and protocol. Only UDP endpoints are expected here;
// anything else is a programming error.
func (s *datagramSocketImpl) GetInfo(fidl.Context) (socket.BaseDatagramSocketGetInfoResult, error) {
	domain, err := s.domain()
	if err != nil {
		return socket.BaseDatagramSocketGetInfoResultWithErr(tcpipErrorToCode(err)), nil
	}
	if s.transProto != udp.ProtocolNumber {
		panic(fmt.Sprintf("unhandled transport protocol: %d", s.transProto))
	}
	response := socket.BaseDatagramSocketGetInfoResponse{
		Domain: domain,
		Proto:  socket.DatagramSocketProtocolUdp,
	}
	return socket.BaseDatagramSocketGetInfoResultWithResponse(response), nil
}
// SendMsgPreflight implements socket.DatagramSocketWithCtx. It validates a
// destination (explicit or the connected address), asks the endpoint to
// preflight the write, and returns the destination, the maximum payload
// size, and a pair of eventpairs whose peer-closed signals tell the client
// when its cached route/socket state becomes stale.
func (s *datagramSocketImpl) SendMsgPreflight(_ fidl.Context, req socket.DatagramSocketSendMsgPreflightRequest) (socket.DatagramSocketSendMsgPreflightResult, error) {
	trace.AsyncBegin("net", "fuchsia_posix_socket.datagramSocket.SendMsgPreflight", trace.AsyncID(uintptr(unsafe.Pointer(s))))
	defer trace.AsyncEnd("net", "fuchsia_posix_socket.datagramSocket.SendMsgPreflight", trace.AsyncID(uintptr(unsafe.Pointer(s))))
	s.sharedState.destinationCacheMu.Lock()
	defer s.sharedState.destinationCacheMu.Unlock()
	// NB: It is important that the netstack-wide destination cache lock be held
	// for the duration of this method in order to avoid data races.
	//
	// For example, if an interface is removed after `Preflight` is called, but
	// before the netstack-wide event pair is duplicated to be sent to the
	// client, it's possible for the client to receive a valid eventpair for a
	// route through a NIC that has been removed from the stack.
	s.ns.destinationCacheMu.RLock()
	defer s.ns.destinationCacheMu.RUnlock()
	var addr tcpip.FullAddress
	// With no explicit destination, fall back to the connected remote address.
	useConnectedAddr := !req.HasTo()
	if useConnectedAddr {
		if connectedAddr, err := s.ep.GetRemoteAddress(); err == nil {
			addr = connectedAddr
		} else {
			// Not connected and no destination given: nothing to send to.
			return socket.DatagramSocketSendMsgPreflightResultWithErr(tcpipErrorToCode(&tcpip.ErrDestinationRequired{})), nil
		}
	} else {
		addr = fidlconv.ToTCPIPFullAddress(req.To)
		// An IPv6 destination on an IPv4-only endpoint is a family mismatch.
		if s.endpoint.netProto == ipv4.ProtocolNumber && addr.Addr.Len() == header.IPv6AddressSize {
			return socket.DatagramSocketSendMsgPreflightResultWithErr(tcpipErrorToCode(&tcpip.ErrAddressFamilyNotSupported{})), nil
		}
	}
	var writeOpts tcpip.WriteOptions
	writeOpts.To = &addr
	// Propagate a caller-supplied IPV6_PKTINFO so the preflight checks the
	// same options the subsequent sends will use.
	if req.HasIpv6Pktinfo() {
		writeOpts.ControlMessages = tcpip.SendableControlMessages{
			HasIPv6PacketInfo: true,
			IPv6PacketInfo: tcpip.IPv6PacketInfo{
				Addr: fidlconv.ToTcpIpAddressDroppingUnspecifiedv6(req.Ipv6Pktinfo.LocalAddr),
				NIC:  tcpip.NICID(req.Ipv6Pktinfo.Iface),
			},
		}
	}
	if epWithPreflight, ok := s.ep.(tcpip.EndpointWithPreflight); ok {
		if err := epWithPreflight.Preflight(writeOpts); err != nil {
			return socket.DatagramSocketSendMsgPreflightResultWithErr(tcpipErrorToCode(err)), nil
		}
	} else {
		panic("endpoint does not implement EndpointWithPreflight")
	}
	// The Netstack's destinationCache tracks the state of the route table and is invalidated
	// whenever the route table is modified.
	// TODO(https://fxbug.dev/42051659): Implement per-route caching invalidation.
	var nsEventPair zx.Handle
	if status := zx.Sys_handle_duplicate(s.ns.destinationCacheMu.destinationCache.peer, zx.RightsBasic, &nsEventPair); status != zx.ErrOk {
		return socket.DatagramSocketSendMsgPreflightResult{}, &zx.Error{Status: status, Text: "zx.EventPair"}
	}
	// The socket's destinationCache tracks the state of the socket itself and is invalidated
	// whenever a FIDL call modifies that state.
	var socketEventPair zx.Handle
	if status := zx.Sys_handle_duplicate(s.sharedState.destinationCacheMu.destinationCache.peer, zx.RightsBasic, &socketEventPair); status != zx.ErrOk {
		return socket.DatagramSocketSendMsgPreflightResult{}, &zx.Error{Status: status, Text: "zx.EventPair"}
	}
	response := socket.DatagramSocketSendMsgPreflightResponse{}
	// TODO(https://fxbug.dev/42175034): Compute MTU dynamically once `IP_DONTFRAG` is supported.
	response.SetMaximumSize(s.sharedState.maxPayloadSize)
	response.SetValidity([]zx.Handle{nsEventPair, socketEventPair})
	// Echo the resolved connected address back so the client can cache it.
	if useConnectedAddr {
		response.SetTo(fidlconv.ToNetSocketAddressWithProto(s.netProto, addr))
	}
	return socket.DatagramSocketSendMsgPreflightResultWithResponse(response), nil
}
// RecvMsgPostflight implements socket.DatagramSocketWithCtx. It reports the
// currently requested control messages plus an eventpair whose peer-closed
// signal tells the client when this cached view becomes stale.
func (s *datagramSocketImpl) RecvMsgPostflight(_ fidl.Context) (socket.DatagramSocketRecvMsgPostflightResult, error) {
	trace.AsyncBegin("net", "fuchsia_posix_socket.datagramSocket.RecvMsgPostflight", trace.AsyncID(uintptr(unsafe.Pointer(s))))
	defer trace.AsyncEnd("net", "fuchsia_posix_socket.datagramSocket.RecvMsgPostflight", trace.AsyncID(uintptr(unsafe.Pointer(s))))
	s.sharedState.cmsgCacheMu.Lock()
	defer s.sharedState.cmsgCacheMu.Unlock()
	cache := &s.sharedState.cmsgCacheMu.cmsgCache
	// Fold each requested boolean control message into the bitmask.
	var requests socket.CmsgRequests
	for _, opt := range []struct {
		requested bool
		flag      socket.CmsgRequests
	}{
		{cache.ipTos, socket.CmsgRequestsIpTos},
		{cache.ipTtl, socket.CmsgRequestsIpTtl},
		{cache.ipv6Tclass, socket.CmsgRequestsIpv6Tclass},
		{cache.ipv6HopLimit, socket.CmsgRequestsIpv6Hoplimit},
		{cache.ipv6PktInfo, socket.CmsgRequestsIpv6Pktinfo},
	} {
		if opt.requested {
			requests |= opt.flag
		}
	}
	var validity zx.Handle
	if status := zx.Sys_handle_duplicate(cache.peer, zx.RightsBasic, &validity); status != zx.ErrOk {
		return socket.DatagramSocketRecvMsgPostflightResult{}, &zx.Error{Status: status, Text: "zx.EventPair"}
	}
	var response socket.DatagramSocketRecvMsgPostflightResponse
	response.SetRequests(requests)
	response.SetTimestamp(cache.timestamp)
	response.SetValidity(validity)
	return socket.DatagramSocketRecvMsgPostflightResultWithResponse(response), nil
}
// synchronousDatagramSocket is the base for datagram sockets whose payloads
// travel over FIDL calls (as opposed to a zircon socket fast path).
type synchronousDatagramSocket struct {
	*endpointWithEvent
	// cancel stops the goroutine serving this connection's FIDL channel.
	cancel context.CancelFunc
}
// networkDatagramSocket layers network-address handling (FIDL address
// conversion and family checks) on top of synchronousDatagramSocket.
type networkDatagramSocket struct {
	synchronousDatagramSocket
}
// synchronousDatagramSocketImpl serves fuchsia.posix.socket/SynchronousDatagramSocket.
type synchronousDatagramSocketImpl struct {
	networkDatagramSocket
}
// Compile-time check that synchronousDatagramSocketImpl satisfies the FIDL protocol.
var _ socket.SynchronousDatagramSocketWithCtx = (*synchronousDatagramSocketImpl)(nil)
// Query implements unknown.QueryableWithCtx; it reports the protocol name
// backing this connection.
func (*synchronousDatagramSocketImpl) Query(fidl.Context) ([]byte, error) {
	name := socket.SynchronousDatagramSocketProtocolName_
	return []byte(name), nil
}
// Describe implements socket.SynchronousDatagramSocketWithCtx; it returns
// the client's end of the event used to signal socket readiness.
func (s *synchronousDatagramSocketImpl) Describe(fidl.Context) (socket.SynchronousDatagramSocketDescribeResponse, error) {
	event, err := s.describe()
	if err != nil {
		return socket.SynchronousDatagramSocketDescribeResponse{}, err
	}
	var response socket.SynchronousDatagramSocketDescribeResponse
	response.SetEvent(event)
	return response, nil
}
// socketControlMessagesToFIDL converts socket-level received control messages
// to their FIDL representation. The SO_TIMESTAMP request flag is read under
// the endpoint lock and released before building the response.
func (s *synchronousDatagramSocket) socketControlMessagesToFIDL(cmsg tcpip.ReceivableControlMessages) socket.SocketRecvControlData {
	s.mu.RLock()
	sockOptTimestamp := s.endpoint.mu.sockOptTimestamp
	s.mu.RUnlock()
	var controlData socket.SocketRecvControlData
	// The timestamp is always populated; Requested records whether the client
	// asked for it via SO_TIMESTAMP.
	controlData.SetTimestamp(socket.Timestamp{
		Nanoseconds: cmsg.Timestamp.UnixNano(),
		Requested:   sockOptTimestamp,
	})
	return controlData
}
// ipControlMessagesToFIDL converts IPv4-level received control messages
// (TOS, TTL) to their FIDL representation, setting only the fields that were
// present on the received packet.
func (s *synchronousDatagramSocket) ipControlMessagesToFIDL(cmsg tcpip.ReceivableControlMessages) socket.IpRecvControlData {
	var out socket.IpRecvControlData
	if cmsg.HasTOS {
		out.SetTos(cmsg.TOS)
	}
	if cmsg.HasTTL {
		out.SetTtl(cmsg.TTL)
	}
	return out
}
// ipv6ControlMessagesToFIDL converts IPv6-level received control messages
// (traffic class, hop limit, packet info) to their FIDL representation,
// setting only the fields that were present on the received packet.
func (s *synchronousDatagramSocket) ipv6ControlMessagesToFIDL(cmsg tcpip.ReceivableControlMessages) socket.Ipv6RecvControlData {
	var out socket.Ipv6RecvControlData
	if cmsg.HasTClass {
		out.SetTclass(uint8(cmsg.TClass))
	}
	if cmsg.HasHopLimit {
		out.SetHoplimit(uint8(cmsg.HopLimit))
	}
	if cmsg.HasIPv6PacketInfo {
		pktInfo := socket.Ipv6PktInfoRecvControlData{
			Iface:                 uint64(cmsg.IPv6PacketInfo.NIC),
			HeaderDestinationAddr: fidlconv.ToNetIpAddress(cmsg.IPv6PacketInfo.Addr).Ipv6,
		}
		out.SetPktinfo(pktInfo)
	}
	return out
}
// networkSocketControlMessagesToFIDL aggregates the socket-, IPv4-, and
// IPv6-level control message conversions, attaching each layer only when it
// produced a non-empty result.
func (s *synchronousDatagramSocket) networkSocketControlMessagesToFIDL(cmsg tcpip.ReceivableControlMessages) socket.NetworkSocketRecvControlData {
	var out socket.NetworkSocketRecvControlData
	if data := s.socketControlMessagesToFIDL(cmsg); data != (socket.SocketRecvControlData{}) {
		out.SetSocket(data)
	}
	if data := s.ipControlMessagesToFIDL(cmsg); data != (socket.IpRecvControlData{}) {
		out.SetIp(data)
	}
	if data := s.ipv6ControlMessagesToFIDL(cmsg); data != (socket.Ipv6RecvControlData{}) {
		out.SetIpv6(data)
	}
	return out
}
// controlMessagesToFIDL wraps the network-level control message conversion in
// the datagram-socket FIDL envelope, attaching it only when non-empty.
func (s *synchronousDatagramSocketImpl) controlMessagesToFIDL(cmsg tcpip.ReceivableControlMessages) socket.DatagramSocketRecvControlData {
	var out socket.DatagramSocketRecvControlData
	if network := s.networkSocketControlMessagesToFIDL(cmsg); network != (socket.NetworkSocketRecvControlData{}) {
		out.SetNetwork(network)
	}
	return out
}
// validateSendableControlMessages rejects control messages that cannot be
// sent: an explicit TTL of zero yields EINVAL. Returns 0 when valid.
func validateSendableControlMessages(cmsgs *tcpip.SendableControlMessages) posix.Errno {
	if !cmsgs.HasTTL || cmsgs.TTL != 0 {
		return 0
	}
	return posix.ErrnoEinval
}
// fnetworkControlDataToControlMessages copies network-level FIDL send-control
// data (IPv4 TTL; IPv6 hop limit and packet info) into out, leaving fields
// absent from the FIDL table untouched.
func fnetworkControlDataToControlMessages(in socket.NetworkSocketSendControlData, out *tcpip.SendableControlMessages) {
	if in.HasIp() {
		ip := in.GetIp()
		if ip.HasTtl() {
			out.HasTTL = true
			out.TTL = ip.GetTtl()
		}
	}
	if in.HasIpv6() {
		ipv6 := in.GetIpv6()
		if ipv6.HasHoplimit() {
			out.HasHopLimit = true
			out.HopLimit = ipv6.GetHoplimit()
		}
		if ipv6.HasPktinfo() {
			info := ipv6.GetPktinfo()
			out.HasIPv6PacketInfo = true
			out.IPv6PacketInfo = tcpip.IPv6PacketInfo{
				NIC:  tcpip.NICID(info.Iface),
				Addr: fidlconv.ToTcpIpAddressDroppingUnspecifiedv6(info.LocalAddr),
			}
		}
	}
}
// fidlDatagramControlDataToControlMessages unwraps the datagram-socket FIDL
// send-control envelope and delegates to the network-level conversion.
func fidlDatagramControlDataToControlMessages(in socket.DatagramSocketSendControlData, out *tcpip.SendableControlMessages) {
	if !in.HasNetwork() {
		return
	}
	fnetworkControlDataToControlMessages(in.GetNetwork(), out)
}
// close releases this connection's reference on the endpoint. The last
// reference closes both event handles, removes the endpoint from the
// netstack's endpoint map, and closes the underlying endpoint.
func (s *synchronousDatagramSocket) close() {
	if s.endpoint.decRef() {
		s.wq.EventUnregister(&s.entry)
		if err := s.local.Close(); err != nil {
			panic(fmt.Sprintf("local.Close() = %s", err))
		}
		if err := s.peer.Close(); err != nil {
			panic(fmt.Sprintf("peer.Close() = %s", err))
		}
		if !s.ns.onRemoveEndpoint(s.endpoint.key) {
			_ = syslog.Errorf("endpoint map delete error, endpoint with key %d does not exist", s.endpoint.key)
		}
		s.endpoint.ep.Close()
		_ = syslog.DebugTf("close", "%p", s.endpointWithEvent)
	}
	// Cancel unconditionally so this connection's serve loop stops even when
	// other clones still hold references.
	s.cancel()
}
// Close implements unknown.CloseableWithCtx; closing always succeeds from
// the client's perspective.
func (s *synchronousDatagramSocket) Close(fidl.Context) (unknown.CloseableCloseResult, error) {
	_ = syslog.DebugTf("Close", "%p", s.endpointWithEvent)
	s.close()
	return unknown.CloseableCloseResultWithResponse(unknown.CloseableCloseResponse{}), nil
}
// addConnection serves a new FIDL connection for this socket over channel.
func (s *synchronousDatagramSocketImpl) addConnection(_ fidl.Context, channel zx.Channel) {
	{
		// Shallow-copy the impl so each connection gets its own `cancel`
		// (assigned inside the embedded addConnection) while still sharing
		// the refcounted endpoint state through the embedded pointers.
		sCopy := *s
		s := &sCopy
		// NB: this protocol is not discoverable, so the bindings do not include its name.
		s.synchronousDatagramSocket.addConnection("fuchsia.posix.socket.SynchronousDatagramSocket", channel, &socket.SynchronousDatagramSocketWithCtxStub{Impl: s})
	}
}
// addConnection takes a reference on the endpoint and serves `stub` over
// `channel` on a new goroutine; `prefix` tags log messages from the serve
// loop. The reference is dropped (via close) when the channel is torn down.
func (s *synchronousDatagramSocket) addConnection(prefix string, channel zx.Channel, stub fidl.Stub) {
	s.ns.stats.SocketCount.Increment()
	s.endpoint.incRef()
	go func() {
		defer s.ns.stats.SocketCount.Decrement()
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		// Stash cancel so close() can stop this serve loop.
		s.cancel = cancel
		defer func() {
			// Avoid double close when the peer calls Close and then hangs up.
			if ctx.Err() == nil {
				s.close()
			}
		}()
		component.Serve(ctx, stub, channel, component.ServeOptions{
			OnError: func(err error) {
				// NB: this protocol is not discoverable, so the bindings do not include its name.
				_ = syslog.WarnTf(prefix, "%s", err)
			},
		})
	}()
}
// Clone2 implements unknown.CloneableWithCtx: it serves a second FIDL
// connection for this socket over the provided channel.
func (s *synchronousDatagramSocketImpl) Clone2(ctx fidl.Context, object unknown.CloneableWithCtxInterfaceRequest) error {
	s.addConnection(ctx, object.Channel)
	_ = syslog.DebugTf("Clone", "%p", s.endpointWithEvent)
	return nil
}
// recvMsg reads at most dataLen bytes from the endpoint and returns the
// payload alongside the endpoint's read result. The pending-signal state is
// refreshed after every read attempt.
func (s *synchronousDatagramSocket) recvMsg(opts tcpip.ReadOptions, dataLen uint32) ([]byte, tcpip.ReadResult, tcpip.Error) {
	var b bytes.Buffer
	dst := tcpip.LimitedWriter{
		W: &b,
		N: int64(dataLen),
	}
	trace.AsyncBegin("net", "fuchsia_posix_socket.synchronousDatagramSocket.ep.Read", trace.AsyncID(uintptr(unsafe.Pointer(s))))
	res, err := s.endpoint.ep.Read(&dst, opts)
	trace.AsyncEnd("net", "fuchsia_posix_socket.synchronousDatagramSocket.ep.Read", trace.AsyncID(uintptr(unsafe.Pointer(s))))
	// A zero-length read trips ErrBadBuffer in the endpoint, but an empty
	// read is a legitimate way to dequeue (and possibly truncate) a datagram.
	if _, ok := err.(*tcpip.ErrBadBuffer); ok && dataLen == 0 {
		err = nil
	}
	if err := s.pending.update(); err != nil {
		panic(err)
	}
	return b.Bytes(), res, err
}
// recvMsg reads a datagram and converts the result for FIDL: the remote
// address (when requested), the payload, the number of truncated bytes, and
// any received control messages.
func (s *networkDatagramSocket) recvMsg(wantAddr bool, dataLen uint32, peek bool) (fnet.SocketAddress, []byte, uint32, tcpip.ReceivableControlMessages, tcpip.Error) {
	opts := tcpip.ReadOptions{
		Peek:           peek,
		NeedRemoteAddr: wantAddr,
	}
	payload, res, err := s.synchronousDatagramSocket.recvMsg(opts, dataLen)
	if err != nil {
		return fnet.SocketAddress{}, nil, 0, tcpip.ReceivableControlMessages{}, err
	}
	var addr fnet.SocketAddress
	if wantAddr {
		addr = fidlconv.ToNetSocketAddressWithProto(s.netProto, res.RemoteAddr)
	}
	// Total-Count is the number of bytes that did not fit in the caller's buffer.
	truncated := uint32(res.Total - res.Count)
	return addr, payload, truncated, res.ControlMessages, nil
}
// RecvMsg implements socket.SynchronousDatagramSocketWithCtx: it dequeues (or
// peeks) one datagram and returns the address, payload, control data, and
// truncated byte count requested by the caller.
func (s *synchronousDatagramSocketImpl) RecvMsg(_ fidl.Context, wantAddr bool, dataLen uint32, wantControl bool, flags socket.RecvMsgFlags) (socket.SynchronousDatagramSocketRecvMsgResult, error) {
	trace.AsyncBegin("net", "fuchsia_posix_socket.synchronousDatagramSocket.RecvMsg", trace.AsyncID(uintptr(unsafe.Pointer(s))))
	defer trace.AsyncEnd("net", "fuchsia_posix_socket.synchronousDatagramSocket.RecvMsg", trace.AsyncID(uintptr(unsafe.Pointer(s))))
	peek := flags&socket.RecvMsgFlagsPeek != 0
	addr, data, truncated, cmsg, err := s.recvMsg(wantAddr, dataLen, peek)
	if err != nil {
		return socket.SynchronousDatagramSocketRecvMsgResultWithErr(tcpipErrorToCode(err)), nil
	}
	response := socket.SynchronousDatagramSocketRecvMsgResponse{
		Data:      data,
		Truncated: truncated,
	}
	if wantAddr {
		response.Addr = &addr
	}
	if wantControl {
		response.Control = s.controlMessagesToFIDL(cmsg)
	}
	return socket.SynchronousDatagramSocketRecvMsgResultWithResponse(response), nil
}
// sendMsg writes data to the endpoint with the given destination and control
// messages, returning the number of bytes written. On failure the
// pending-signal state is refreshed before propagating the error.
func (s *synchronousDatagramSocket) sendMsg(to *tcpip.FullAddress, data []uint8, cmsg tcpip.SendableControlMessages) (int64, tcpip.Error) {
	var r bytes.Reader
	r.Reset(data)
	trace.AsyncBegin("net", "fuchsia_posix_socket.synchronousDatagramSocket.ep.Write", trace.AsyncID(uintptr(unsafe.Pointer(s))))
	n, err := s.endpoint.ep.Write(&r, tcpip.WriteOptions{
		To:              to,
		ControlMessages: cmsg,
	})
	trace.AsyncEnd("net", "fuchsia_posix_socket.synchronousDatagramSocket.ep.Write", trace.AsyncID(uintptr(unsafe.Pointer(s))))
	if err != nil {
		// NOTE(review): signals are only refreshed on the error path here,
		// presumably because a successful datagram write does not change
		// readable/writable state — confirm if adding flow control.
		if err := s.pending.update(); err != nil {
			panic(err)
		}
		return 0, err
	}
	return n, nil
}
// sendMsg converts an optional FIDL destination address, rejects IPv6
// destinations on IPv4-only endpoints, and forwards the write to the
// embedded socket.
func (s *networkDatagramSocket) sendMsg(addr *fnet.SocketAddress, data []uint8, cmsg tcpip.SendableControlMessages) (int64, tcpip.Error) {
	var to *tcpip.FullAddress
	if addr != nil {
		full := fidlconv.ToTCPIPFullAddress(*addr)
		if s.endpoint.netProto == ipv4.ProtocolNumber && full.Addr.Len() == header.IPv6AddressSize {
			return 0, &tcpip.ErrAddressFamilyNotSupported{}
		}
		to = &full
	}
	return s.synchronousDatagramSocket.sendMsg(to, data, cmsg)
}
// SendMsg implements socket.SynchronousDatagramSocketWithCtx: it validates
// the caller's control messages and writes the payload to the endpoint,
// returning the number of bytes sent.
func (s *synchronousDatagramSocketImpl) SendMsg(_ fidl.Context, addr *fnet.SocketAddress, data []uint8, controlData socket.DatagramSocketSendControlData, _ socket.SendMsgFlags) (socket.SynchronousDatagramSocketSendMsgResult, error) {
	trace.AsyncBegin("net", "fuchsia_posix_socket.synchronousDatagramSocket.SendMsg", trace.AsyncID(uintptr(unsafe.Pointer(s))))
	defer trace.AsyncEnd("net", "fuchsia_posix_socket.synchronousDatagramSocket.SendMsg", trace.AsyncID(uintptr(unsafe.Pointer(s))))
	var cmsg tcpip.SendableControlMessages
	fidlDatagramControlDataToControlMessages(controlData, &cmsg)
	if errno := validateSendableControlMessages(&cmsg); errno != 0 {
		return socket.SynchronousDatagramSocketSendMsgResultWithErr(errno), nil
	}
	n, err := s.sendMsg(addr, data, cmsg)
	if err != nil {
		return socket.SynchronousDatagramSocketSendMsgResultWithErr(tcpipErrorToCode(err)), nil
	}
	response := socket.SynchronousDatagramSocketSendMsgResponse{Len: n}
	return socket.SynchronousDatagramSocketSendMsgResultWithResponse(response), nil
}
// GetInfo implements socket.BaseDatagramSocketWithCtx; it reports the
// socket's domain and protocol (UDP or ICMP echo). Any other transport is a
// programming error.
func (s *synchronousDatagramSocketImpl) GetInfo(fidl.Context) (socket.BaseDatagramSocketGetInfoResult, error) {
	domain, err := s.domain()
	if err != nil {
		return socket.BaseDatagramSocketGetInfoResultWithErr(tcpipErrorToCode(err)), nil
	}
	var proto socket.DatagramSocketProtocol
	switch s.transProto {
	case udp.ProtocolNumber:
		proto = socket.DatagramSocketProtocolUdp
	case header.ICMPv4ProtocolNumber, header.ICMPv6ProtocolNumber:
		proto = socket.DatagramSocketProtocolIcmpEcho
	default:
		panic(fmt.Sprintf("unhandled transport protocol: %d", s.transProto))
	}
	response := socket.BaseDatagramSocketGetInfoResponse{
		Domain: domain,
		Proto:  proto,
	}
	return socket.BaseDatagramSocketGetInfoResultWithResponse(response), nil
}
// State shared across all copies of this socket. Collecting this state here lets us
// allocate only once during initialization.
type sharedStreamSocketState struct {
	// onConnect is used to register callbacks for connected sockets.
	onConnect sync.Once
	// onListen is used to register callbacks for listening sockets.
	onListen sync.Once
	// err is used to store errors returned on the socket.
	err streamSocketError
	// pending maps endpoint readiness events onto zircon signals.
	pending signaler
	// sockOptStats counts socket-option FIDL calls on this socket.
	sockOptStats *streamSocketOptionStats
}
// streamSocketImpl serves fuchsia.posix.socket/StreamSocket on top of an
// endpoint backed by a zircon socket.
type streamSocketImpl struct {
	*endpointWithSocket
	endpointWithMutators
	// sharedState is shared across every clone of this socket.
	sharedState *sharedStreamSocketState
	// Used to unblock waiting to write to an endpoint after receiving
	// an error indicating that the endpoint's send buffer is full.
	unblockLoopWrite chan struct{}
	// cancel stops the goroutine serving this connection's FIDL channel.
	cancel context.CancelFunc
}
// Compile-time check that streamSocketImpl satisfies the FIDL protocol.
var _ socket.StreamSocketWithCtx = (*streamSocketImpl)(nil)
// makeStreamSocketImpl builds a streamSocketImpl around eps, swapping the
// netstack's endpoint-map entry to point at this socket's option-stats
// counters and wiring endpoint readiness events to the zircon stream signals.
func makeStreamSocketImpl(eps *endpointWithSocket) streamSocketImpl {
	sockOptStats := streamSocketOptionStats{
		NetworkSocketOptionStats: &eps.sockOptStats,
	}
	// NB: There are two possibilities when CompareAndSwap returns false:
	// 1. The key was not found in the map, which is valid as the endpoint
	// can be removed from the map between when it was added and here; or
	// 2. The value in the map is not equal to the value passed.
	//
	// Care needs to be taken to avoid the latter case: this means that the
	// value in the map must not be modified between when the endpoint is added
	// to the endpointsMap during `endpointWithSocket` initialization and here;
	// and also if the value stored in the map changes, the argument passed here
	// needs to be updated to match.
	_ = eps.ns.endpoints.CompareAndSwap(eps.key,
		endpointAndStats{
			ep:           eps.ep,
			sockOptStats: &eps.sockOptStats,
		}, endpointAndStats{
			ep:           eps.ep,
			sockOptStats: &sockOptStats,
		})
	return streamSocketImpl{
		endpointWithSocket:   eps,
		endpointWithMutators: endpointWithMutators{ep: &eps.endpoint},
		unblockLoopWrite:     make(chan struct{}),
		sharedState: &sharedStreamSocketState{
			pending: signaler{
				// Translate endpoint readiness into zircon signals; only
				// EventIn (incoming data/connection) is expected here.
				eventsToSignals: func(events waiter.EventMask) zx.Signals {
					signals := zx.SignalNone
					if events&waiter.EventIn != 0 {
						signals |= signalStreamIncoming
						events ^= waiter.EventIn
					}
					if events != 0 {
						panic(fmt.Sprintf("unexpected events=%b", events))
					}
					return signals
				},
				readiness:  eps.endpoint.ep.Readiness,
				signalPeer: eps.local.Handle().SignalPeer,
			},
			sockOptStats: &sockOptStats,
		},
	}
}
// newStreamSocket creates a fresh channel pair, begins serving the
// StreamSocket protocol for s on one end, and hands back the other end as
// the client interface.
func newStreamSocket(s streamSocketImpl) (socket.StreamSocketWithCtxInterface, error) {
	local, peer, err := zx.NewChannel(0)
	if err != nil {
		return socket.StreamSocketWithCtxInterface{}, err
	}
	s.addConnection(context.Background(), local)
	_ = syslog.DebugTf("NewStream", "%p", &s)
	return socket.StreamSocketWithCtxInterface{Channel: peer}, nil
}
// close drops this clone's reference on the shared endpoint. When the last
// reference is dropped, the underlying endpoint and the zircon socket peer
// are torn down, honoring SO_LINGER and coordinating with the write loop
// draining the zircon socket via unblockLoopWrite.
func (s *streamSocketImpl) close() {
	defer s.cancel()
	if s.endpoint.decRef() {
		linger := s.endpoint.ep.SocketOptions().GetLinger()
		doClose := func() {
			s.endpointWithSocket.close()
			if err := s.peer.Close(); err != nil {
				panic(err)
			}
		}
		clock := s.endpoint.ns.stack.Clock()
		if linger.Enabled {
			// `man 7 socket`:
			//
			// When enabled, a close(2) or shutdown(2) will not return until all
			// queued messages for the socket have been successfully sent or the
			// linger timeout has been reached. Otherwise, the call returns
			// immediately and the closing is done in the background. When the
			// socket is closed as part of exit(2), it always lingers in the
			// background.
			//
			// Thus we must allow linger-amount-of-time for pending writes to flush,
			// and do so synchronously if linger is enabled.
			clock.AfterFunc(linger.Timeout, func() { close(s.unblockLoopWrite) })
			doClose()
		} else {
			// Here be dragons.
			//
			// Normally, with linger disabled, the socket is immediately closed to
			// the application (accepting no more writes) and lingers for TCP_LINGER2
			// duration. However, because of the use of a zircon socket in front of
			// the netstack endpoint, we can't be sure that all writes have flushed
			// from the zircon socket to the netstack endpoint when we observe
			// `close(3)`. This in turn means we can't close the netstack endpoint
			// (which would start the TCP_LINGER2 timer), because there may still be
			// data pending in the zircon socket (e.g. when the netstack endpoint's
			// send buffer is full). We need *some* condition to break us out of this
			// deadlock.
			//
			// We pick TCP_LINGER2 somewhat arbitrarily. In the worst case, this
			// means that our true TCP linger time will be twice the configured
			// value, but this is the best we can do without rethinking the
			// interfaces.

			// If no data is in the buffer, close synchronously. This is an important
			// optimization that prevents flakiness when a socket is closed and
			// another socket is immediately bound to the port.
			if reader := (socketReader{socket: s.endpointWithSocket.local}); reader.Len() == 0 {
				// We close s.unblockLoopWrite as loopWrite may be currently blocked on
				// the select in the ErrWouldBlock handler.
				//
				// This can happen when a previous write exactly fills up the send
				// buffer in gVisor and then the next call to the endpoint's Write
				// method fails with ErrWouldBlock (even if the write payload/zircon
				// socket) is empty.
				close(s.unblockLoopWrite)
				doClose()
			} else {
				var linger tcpip.TCPLingerTimeoutOption
				if err := s.endpoint.ep.GetSockOpt(&linger); err != nil {
					panic(fmt.Sprintf("GetSockOpt(%T): %s", linger, err))
				}
				clock.AfterFunc(time.Duration(linger), func() { close(s.unblockLoopWrite) })
				go doClose()
			}
		}
	}
}
// Close implements fuchsia.unknown/Closeable for the stream socket; it
// always reports success after releasing this clone's reference.
func (s *streamSocketImpl) Close(fidl.Context) (unknown.CloseableCloseResult, error) {
	_ = syslog.DebugTf("Close", "%p", s)
	s.close()
	result := unknown.CloseableCloseResultWithResponse(unknown.CloseableCloseResponse{})
	return result, nil
}
// GetError implements SO_ERROR: it returns and consumes the pending
// asynchronous error on the socket, if any.
func (s *streamSocketImpl) GetError(fidl.Context) (socket.BaseSocketGetErrorResult, error) {
	s.sharedState.sockOptStats.GetError.Add(1)
	err := func() tcpip.Error {
		s.sharedState.err.mu.Lock()
		defer s.sharedState.err.mu.Unlock()
		// A non-nil channel takes precedence over the endpoint's last
		// error. NOTE(review): this receive assumes an error is always
		// available on ch when it is non-nil — confirm against
		// streamSocketError's invariants.
		if ch := s.sharedState.err.mu.ch; ch != nil {
			err := <-ch
			return err
		}
		err := s.endpoint.ep.LastError()
		// Mark the error as consumed so later observers see consistent state.
		s.sharedState.err.setConsumedLocked(err)
		return err
	}()
	_ = syslog.DebugTf("GetError", "%p: err=%#v", s, err)
	if err != nil {
		return socket.BaseSocketGetErrorResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.BaseSocketGetErrorResultWithResponse(socket.BaseSocketGetErrorResponse{}), nil
}
// addConnection serves the StreamSocket protocol on channel using a copy of
// s, taking an additional reference on the shared endpoint for the lifetime
// of the connection.
func (s *streamSocketImpl) addConnection(_ fidl.Context, channel zx.Channel) {
	{
		// Copy the impl so each served connection carries its own cancel
		// func while sharing the endpoint and sharedState through pointers.
		sCopy := *s
		s := &sCopy
		s.ns.stats.SocketCount.Increment()
		s.endpoint.incRef()
		go func() {
			defer s.ns.stats.SocketCount.Decrement()
			ctx, cancel := context.WithCancel(context.Background())
			defer cancel()
			s.cancel = cancel
			defer func() {
				// Avoid double close when the peer calls Close and then hangs up.
				if ctx.Err() == nil {
					s.close()
				}
			}()
			stub := socket.StreamSocketWithCtxStub{Impl: s}
			component.Serve(ctx, &stub, channel, component.ServeOptions{
				OnError: func(err error) {
					// NB: this protocol is not discoverable, so the bindings do not include its name.
					_ = syslog.WarnTf("fuchsia.posix.socket.StreamSocket", "%s", err)
				},
			})
		}()
	}
}
// Clone2 begins serving the StreamSocket protocol on the provided channel,
// sharing the underlying endpoint with this socket.
func (s *streamSocketImpl) Clone2(ctx fidl.Context, object unknown.CloneableWithCtxInterfaceRequest) error {
	channel := object.Channel
	s.addConnection(ctx, channel)
	_ = syslog.DebugTf("Clone", "%p", s.endpointWithSocket)
	return nil
}
// Query returns the discoverable protocol name for StreamSocket.
func (*streamSocketImpl) Query(fidl.Context) ([]uint8, error) {
	name := socket.StreamSocketProtocolName_
	return []byte(name), nil
}
// Describe returns a duplicate of the zircon socket handle backing this
// stream socket.
func (s *streamSocketImpl) Describe(fidl.Context) (socket.StreamSocketDescribeResponse, error) {
	handle, err := s.describe()
	if err != nil {
		return socket.StreamSocketDescribeResponse{}, err
	}
	var response socket.StreamSocketDescribeResponse
	response.SetSocket(zx.Socket(handle))
	return response, nil
}
// Accept dequeues a pending connection and returns it as a new StreamSocket
// channel; wantAddr controls whether the peer address is also returned.
func (s *streamSocketImpl) Accept(_ fidl.Context, wantAddr bool) (socket.StreamSocketAcceptResult, error) {
	{
		// NB: `s` is shadowed here by the accepted socket's impl.
		code, addr, s, err := s.accept(wantAddr)
		if err != nil {
			return socket.StreamSocketAcceptResult{}, err
		}
		if code != 0 {
			return socket.StreamSocketAcceptResultWithErr(code), nil
		}
		streamSocketInterface, err := newStreamSocket(s)
		if err != nil {
			return socket.StreamSocketAcceptResult{}, err
		}
		// TODO(https://fxbug.dev/42146533): this copies a lock; avoid this when FIDL bindings are better.
		response := socket.StreamSocketAcceptResponse{
			S: streamSocketInterface,
		}
		if addr != nil {
			sockaddr := fidlconv.ToNetSocketAddressWithProto(s.netProto, *addr)
			response.Addr = &sockaddr
		}
		return socket.StreamSocketAcceptResultWithResponse(response), nil
	}
}
// GetInfo reports the domain and stream protocol this socket was created
// with. Panics on a transport protocol this type can never hold.
func (s *streamSocketImpl) GetInfo(fidl.Context) (socket.StreamSocketGetInfoResult, error) {
	domain, err := s.domain()
	if err != nil {
		return socket.StreamSocketGetInfoResultWithErr(tcpipErrorToCode(err)), nil
	}
	// TCP is the only stream transport protocol supported.
	if s.transProto != tcp.ProtocolNumber {
		panic(fmt.Sprintf("unhandled transport protocol: %d", s.transProto))
	}
	return socket.StreamSocketGetInfoResultWithResponse(socket.StreamSocketGetInfoResponse{
		Domain: domain,
		Proto:  socket.StreamSocketProtocolTcp,
	}), nil
}
// SetTcpNoDelay implements TCP_NODELAY; gVisor stores the inverse as the
// "delay" option, so the value is negated before being applied.
func (s *streamSocketImpl) SetTcpNoDelay(_ fidl.Context, value bool) (socket.StreamSocketSetTcpNoDelayResult, error) {
	s.sharedState.sockOptStats.SetTcpNoDelay.Add(1)
	opts := s.endpoint.ep.SocketOptions()
	opts.SetDelayOption(!value)
	return socket.StreamSocketSetTcpNoDelayResultWithResponse(socket.StreamSocketSetTcpNoDelayResponse{}), nil
}
// GetTcpNoDelay reports TCP_NODELAY, which is the negation of gVisor's
// stored "delay" option.
func (s *streamSocketImpl) GetTcpNoDelay(fidl.Context) (socket.StreamSocketGetTcpNoDelayResult, error) {
	s.sharedState.sockOptStats.GetTcpNoDelay.Add(1)
	delayed := s.endpoint.ep.SocketOptions().GetDelayOption()
	response := socket.StreamSocketGetTcpNoDelayResponse{Value: !delayed}
	return socket.StreamSocketGetTcpNoDelayResultWithResponse(response), nil
}
// SetTcpCork implements the TCP_CORK socket option setter.
func (s *streamSocketImpl) SetTcpCork(_ fidl.Context, value bool) (socket.StreamSocketSetTcpCorkResult, error) {
	s.sharedState.sockOptStats.SetTcpCork.Add(1)
	opts := s.endpoint.ep.SocketOptions()
	opts.SetCorkOption(value)
	return socket.StreamSocketSetTcpCorkResultWithResponse(socket.StreamSocketSetTcpCorkResponse{}), nil
}
// GetTcpCork implements the TCP_CORK socket option getter.
func (s *streamSocketImpl) GetTcpCork(fidl.Context) (socket.StreamSocketGetTcpCorkResult, error) {
	s.sharedState.sockOptStats.GetTcpCork.Add(1)
	response := socket.StreamSocketGetTcpCorkResponse{
		Value: s.endpoint.ep.SocketOptions().GetCorkOption(),
	}
	return socket.StreamSocketGetTcpCorkResultWithResponse(response), nil
}
// SetTcpQuickAck implements the TCP_QUICKACK socket option setter.
func (s *streamSocketImpl) SetTcpQuickAck(_ fidl.Context, value bool) (socket.StreamSocketSetTcpQuickAckResult, error) {
	s.sharedState.sockOptStats.SetTcpQuickAck.Add(1)
	opts := s.endpoint.ep.SocketOptions()
	opts.SetQuickAck(value)
	return socket.StreamSocketSetTcpQuickAckResultWithResponse(socket.StreamSocketSetTcpQuickAckResponse{}), nil
}
// GetTcpQuickAck implements the TCP_QUICKACK socket option getter.
func (s *streamSocketImpl) GetTcpQuickAck(fidl.Context) (socket.StreamSocketGetTcpQuickAckResult, error) {
	s.sharedState.sockOptStats.GetTcpQuickAck.Add(1)
	response := socket.StreamSocketGetTcpQuickAckResponse{
		Value: s.endpoint.ep.SocketOptions().GetQuickAck(),
	}
	return socket.StreamSocketGetTcpQuickAckResultWithResponse(response), nil
}
// SetTcpMaxSegment implements the TCP_MAXSEG socket option setter.
func (s *streamSocketImpl) SetTcpMaxSegment(_ fidl.Context, valueBytes uint32) (socket.StreamSocketSetTcpMaxSegmentResult, error) {
	s.sharedState.sockOptStats.SetTcpMaxSegment.Add(1)
	err := s.endpoint.ep.SetSockOptInt(tcpip.MaxSegOption, int(valueBytes))
	if err != nil {
		return socket.StreamSocketSetTcpMaxSegmentResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.StreamSocketSetTcpMaxSegmentResultWithResponse(socket.StreamSocketSetTcpMaxSegmentResponse{}), nil
}
// GetTcpMaxSegment implements the TCP_MAXSEG socket option getter.
func (s *streamSocketImpl) GetTcpMaxSegment(fidl.Context) (socket.StreamSocketGetTcpMaxSegmentResult, error) {
	s.sharedState.sockOptStats.GetTcpMaxSegment.Add(1)
	mss, err := s.endpoint.ep.GetSockOptInt(tcpip.MaxSegOption)
	if err != nil {
		return socket.StreamSocketGetTcpMaxSegmentResultWithErr(tcpipErrorToCode(err)), nil
	}
	response := socket.StreamSocketGetTcpMaxSegmentResponse{ValueBytes: uint32(mss)}
	return socket.StreamSocketGetTcpMaxSegmentResultWithResponse(response), nil
}
// SetTcpKeepAliveIdle implements TCP_KEEPIDLE; values outside
// [1, maxTCPKeepIdle] seconds are rejected with EINVAL, mirroring Linux.
// https://github.com/torvalds/linux/blob/f2850dd5ee015bd7b77043f731632888887689c7/net/ipv4/tcp.c#L2991
func (s *streamSocketImpl) SetTcpKeepAliveIdle(_ fidl.Context, valueSecs uint32) (socket.StreamSocketSetTcpKeepAliveIdleResult, error) {
	s.sharedState.sockOptStats.SetTcpKeepAliveIdle.Add(1)
	if valueSecs < 1 || valueSecs > maxTCPKeepIdle {
		return socket.StreamSocketSetTcpKeepAliveIdleResultWithErr(posix.ErrnoEinval), nil
	}
	idle := tcpip.KeepaliveIdleOption(time.Duration(valueSecs) * time.Second)
	if err := s.endpoint.ep.SetSockOpt(&idle); err != nil {
		return socket.StreamSocketSetTcpKeepAliveIdleResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.StreamSocketSetTcpKeepAliveIdleResultWithResponse(socket.StreamSocketSetTcpKeepAliveIdleResponse{}), nil
}
// GetTcpKeepAliveIdle implements the TCP_KEEPIDLE getter, reported in
// whole seconds.
func (s *streamSocketImpl) GetTcpKeepAliveIdle(fidl.Context) (socket.StreamSocketGetTcpKeepAliveIdleResult, error) {
	s.sharedState.sockOptStats.GetTcpKeepAliveIdle.Add(1)
	var idle tcpip.KeepaliveIdleOption
	if err := s.endpoint.ep.GetSockOpt(&idle); err != nil {
		return socket.StreamSocketGetTcpKeepAliveIdleResultWithErr(tcpipErrorToCode(err)), nil
	}
	response := socket.StreamSocketGetTcpKeepAliveIdleResponse{
		ValueSecs: uint32(time.Duration(idle).Seconds()),
	}
	return socket.StreamSocketGetTcpKeepAliveIdleResultWithResponse(response), nil
}
// SetTcpKeepAliveInterval implements TCP_KEEPINTVL; values outside
// [1, maxTCPKeepIntvl] seconds are rejected with EINVAL, mirroring Linux.
// https://github.com/torvalds/linux/blob/f2850dd5ee015bd7b77043f731632888887689c7/net/ipv4/tcp.c#L3008
func (s *streamSocketImpl) SetTcpKeepAliveInterval(_ fidl.Context, valueSecs uint32) (socket.StreamSocketSetTcpKeepAliveIntervalResult, error) {
	s.sharedState.sockOptStats.SetTcpKeepAliveInterval.Add(1)
	if valueSecs < 1 || valueSecs > maxTCPKeepIntvl {
		return socket.StreamSocketSetTcpKeepAliveIntervalResultWithErr(posix.ErrnoEinval), nil
	}
	interval := tcpip.KeepaliveIntervalOption(time.Duration(valueSecs) * time.Second)
	if err := s.endpoint.ep.SetSockOpt(&interval); err != nil {
		return socket.StreamSocketSetTcpKeepAliveIntervalResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.StreamSocketSetTcpKeepAliveIntervalResultWithResponse(socket.StreamSocketSetTcpKeepAliveIntervalResponse{}), nil
}
// GetTcpKeepAliveInterval implements the TCP_KEEPINTVL getter, reported in
// whole seconds.
func (s *streamSocketImpl) GetTcpKeepAliveInterval(fidl.Context) (socket.StreamSocketGetTcpKeepAliveIntervalResult, error) {
	s.sharedState.sockOptStats.GetTcpKeepAliveInterval.Add(1)
	var interval tcpip.KeepaliveIntervalOption
	if err := s.endpoint.ep.GetSockOpt(&interval); err != nil {
		return socket.StreamSocketGetTcpKeepAliveIntervalResultWithErr(tcpipErrorToCode(err)), nil
	}
	response := socket.StreamSocketGetTcpKeepAliveIntervalResponse{
		ValueSecs: uint32(time.Duration(interval).Seconds()),
	}
	return socket.StreamSocketGetTcpKeepAliveIntervalResultWithResponse(response), nil
}
// SetTcpKeepAliveCount implements TCP_KEEPCNT; counts outside
// [1, maxTCPKeepCnt] are rejected with EINVAL, mirroring Linux.
// https://github.com/torvalds/linux/blob/f2850dd5ee015bd7b77043f731632888887689c7/net/ipv4/tcp.c#L3014
func (s *streamSocketImpl) SetTcpKeepAliveCount(_ fidl.Context, value uint32) (socket.StreamSocketSetTcpKeepAliveCountResult, error) {
	s.sharedState.sockOptStats.SetTcpKeepAliveCount.Add(1)
	switch {
	case value < 1, value > maxTCPKeepCnt:
		return socket.StreamSocketSetTcpKeepAliveCountResultWithErr(posix.ErrnoEinval), nil
	}
	err := s.endpoint.ep.SetSockOptInt(tcpip.KeepaliveCountOption, int(value))
	if err != nil {
		return socket.StreamSocketSetTcpKeepAliveCountResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.StreamSocketSetTcpKeepAliveCountResultWithResponse(socket.StreamSocketSetTcpKeepAliveCountResponse{}), nil
}
// GetTcpKeepAliveCount implements the TCP_KEEPCNT socket option getter.
func (s *streamSocketImpl) GetTcpKeepAliveCount(fidl.Context) (socket.StreamSocketGetTcpKeepAliveCountResult, error) {
	s.sharedState.sockOptStats.GetTcpKeepAliveCount.Add(1)
	count, err := s.endpoint.ep.GetSockOptInt(tcpip.KeepaliveCountOption)
	if err != nil {
		return socket.StreamSocketGetTcpKeepAliveCountResultWithErr(tcpipErrorToCode(err)), nil
	}
	response := socket.StreamSocketGetTcpKeepAliveCountResponse{Value: uint32(count)}
	return socket.StreamSocketGetTcpKeepAliveCountResultWithResponse(response), nil
}
// SetTcpUserTimeout implements the TCP_USER_TIMEOUT socket option setter
// (milliseconds).
func (s *streamSocketImpl) SetTcpUserTimeout(_ fidl.Context, valueMillis uint32) (socket.StreamSocketSetTcpUserTimeoutResult, error) {
	s.sharedState.sockOptStats.SetTcpUserTimeout.Add(1)
	timeout := tcpip.TCPUserTimeoutOption(time.Duration(valueMillis) * time.Millisecond)
	if err := s.endpoint.ep.SetSockOpt(&timeout); err != nil {
		return socket.StreamSocketSetTcpUserTimeoutResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.StreamSocketSetTcpUserTimeoutResultWithResponse(socket.StreamSocketSetTcpUserTimeoutResponse{}), nil
}
// GetTcpUserTimeout implements the TCP_USER_TIMEOUT getter, reported in
// whole milliseconds.
func (s *streamSocketImpl) GetTcpUserTimeout(fidl.Context) (socket.StreamSocketGetTcpUserTimeoutResult, error) {
	s.sharedState.sockOptStats.GetTcpUserTimeout.Add(1)
	var timeout tcpip.TCPUserTimeoutOption
	if err := s.endpoint.ep.GetSockOpt(&timeout); err != nil {
		return socket.StreamSocketGetTcpUserTimeoutResultWithErr(tcpipErrorToCode(err)), nil
	}
	response := socket.StreamSocketGetTcpUserTimeoutResponse{
		ValueMillis: uint32(time.Duration(timeout).Milliseconds()),
	}
	return socket.StreamSocketGetTcpUserTimeoutResultWithResponse(response), nil
}
// SetTcpCongestion implements the TCP_CONGESTION socket option setter.
func (s *streamSocketImpl) SetTcpCongestion(_ fidl.Context, value socket.TcpCongestionControl) (socket.StreamSocketSetTcpCongestionResult, error) {
	s.sharedState.sockOptStats.SetTcpCongestion.Add(1)
	var name string
	switch value {
	case socket.TcpCongestionControlReno:
		name = ccReno
	case socket.TcpCongestionControlCubic:
		name = ccCubic
	default:
		// Linux returns ENOENT when an invalid congestion
		// control algorithm is specified.
		return socket.StreamSocketSetTcpCongestionResultWithErr(posix.ErrnoEnoent), nil
	}
	opt := tcpip.CongestionControlOption(name)
	if err := s.endpoint.ep.SetSockOpt(&opt); err != nil {
		return socket.StreamSocketSetTcpCongestionResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.StreamSocketSetTcpCongestionResultWithResponse(socket.StreamSocketSetTcpCongestionResponse{}), nil
}
// GetTcpCongestion implements the TCP_CONGESTION socket option getter.
func (s *streamSocketImpl) GetTcpCongestion(fidl.Context) (socket.StreamSocketGetTcpCongestionResult, error) {
	s.sharedState.sockOptStats.GetTcpCongestion.Add(1)
	var opt tcpip.CongestionControlOption
	if err := s.endpoint.ep.GetSockOpt(&opt); err != nil {
		return socket.StreamSocketGetTcpCongestionResultWithErr(tcpipErrorToCode(err)), nil
	}
	switch string(opt) {
	case ccReno:
		return socket.StreamSocketGetTcpCongestionResultWithResponse(socket.StreamSocketGetTcpCongestionResponse{Value: socket.TcpCongestionControlReno}), nil
	case ccCubic:
		return socket.StreamSocketGetTcpCongestionResultWithResponse(socket.StreamSocketGetTcpCongestionResponse{Value: socket.TcpCongestionControlCubic}), nil
	default:
		// The endpoint reports an algorithm this API cannot express.
		return socket.StreamSocketGetTcpCongestionResultWithErr(posix.ErrnoEopnotsupp), nil
	}
}
// SetTcpDeferAccept implements the TCP_DEFER_ACCEPT socket option setter
// (seconds).
func (s *streamSocketImpl) SetTcpDeferAccept(_ fidl.Context, valueSecs uint32) (socket.StreamSocketSetTcpDeferAcceptResult, error) {
	s.sharedState.sockOptStats.SetTcpDeferAccept.Add(1)
	deferAccept := tcpip.TCPDeferAcceptOption(time.Duration(valueSecs) * time.Second)
	if err := s.endpoint.ep.SetSockOpt(&deferAccept); err != nil {
		return socket.StreamSocketSetTcpDeferAcceptResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.StreamSocketSetTcpDeferAcceptResultWithResponse(socket.StreamSocketSetTcpDeferAcceptResponse{}), nil
}
// GetTcpDeferAccept implements the TCP_DEFER_ACCEPT getter, reported in
// whole seconds.
func (s *streamSocketImpl) GetTcpDeferAccept(fidl.Context) (socket.StreamSocketGetTcpDeferAcceptResult, error) {
	s.sharedState.sockOptStats.GetTcpDeferAccept.Add(1)
	var deferAccept tcpip.TCPDeferAcceptOption
	if err := s.endpoint.ep.GetSockOpt(&deferAccept); err != nil {
		return socket.StreamSocketGetTcpDeferAcceptResultWithErr(tcpipErrorToCode(err)), nil
	}
	response := socket.StreamSocketGetTcpDeferAcceptResponse{
		ValueSecs: uint32(time.Duration(deferAccept).Seconds()),
	}
	return socket.StreamSocketGetTcpDeferAcceptResultWithResponse(response), nil
}
// GetTcpInfo implements TCP_INFO, translating gVisor's TCPInfoOption into
// the FIDL TcpInfo table. Unknown congestion-control or endpoint states are
// programmer errors and panic.
func (s *streamSocketImpl) GetTcpInfo(fidl.Context) (socket.StreamSocketGetTcpInfoResult, error) {
	s.sharedState.sockOptStats.GetTcpInfo.Add(1)
	var value tcpip.TCPInfoOption
	if err := s.endpoint.ep.GetSockOpt(&value); err != nil {
		return socket.StreamSocketGetTcpInfoResultWithErr(tcpipErrorToCode(err)), nil
	}
	var info socket.TcpInfo
	// Map gVisor's congestion-control state to the FIDL enum.
	info.SetCaState(func() socket.TcpCongestionControlState {
		switch state := value.CcState; state {
		case tcpip.Open:
			return socket.TcpCongestionControlStateOpen
		case tcpip.RTORecovery:
			return socket.TcpCongestionControlStateLoss
		case tcpip.FastRecovery, tcpip.SACKRecovery:
			return socket.TcpCongestionControlStateRecovery
		case tcpip.Disorder:
			return socket.TcpCongestionControlStateDisorder
		default:
			panic(fmt.Sprintf("unknown congestion control state: %d", state))
		}
	}())
	// Map gVisor's endpoint state to the FIDL TCP state enum.
	info.SetState(func() socket.TcpState {
		switch state := tcp.EndpointState(value.State); state {
		case tcp.StateEstablished:
			return socket.TcpStateEstablished
		case tcp.StateSynSent:
			return socket.TcpStateSynSent
		case tcp.StateSynRecv:
			return socket.TcpStateSynRecv
		case tcp.StateFinWait1:
			return socket.TcpStateFinWait1
		case tcp.StateFinWait2:
			return socket.TcpStateFinWait2
		case tcp.StateTimeWait:
			return socket.TcpStateTimeWait
		case tcp.StateClose:
			return socket.TcpStateClose
		case tcp.StateCloseWait:
			return socket.TcpStateCloseWait
		case tcp.StateLastAck:
			return socket.TcpStateLastAck
		case tcp.StateListen:
			return socket.TcpStateListen
		case tcp.StateClosing:
			return socket.TcpStateClosing
		// Endpoint states internal to netstack.
		case tcp.StateInitial, tcp.StateBound, tcp.StateConnecting, tcp.StateError:
			return socket.TcpStateClose
		default:
			panic(fmt.Sprintf("unknown state: %d", state))
		}
	}())
	info.SetRtoUsec(uint32(value.RTO.Microseconds()))
	info.SetRttUsec(uint32(value.RTT.Microseconds()))
	info.SetRttVarUsec(uint32(value.RTTVar.Microseconds()))
	info.SetSndSsthresh(value.SndSsthresh)
	info.SetSndCwnd(value.SndCwnd)
	info.SetReorderSeen(value.ReorderSeen)
	return socket.StreamSocketGetTcpInfoResultWithResponse(socket.StreamSocketGetTcpInfoResponse{
		Info: info,
	}), nil
}
// SetTcpSynCount implements the TCP_SYNCNT socket option setter.
func (s *streamSocketImpl) SetTcpSynCount(_ fidl.Context, value uint32) (socket.StreamSocketSetTcpSynCountResult, error) {
	s.sharedState.sockOptStats.SetTcpSynCount.Add(1)
	err := s.endpoint.ep.SetSockOptInt(tcpip.TCPSynCountOption, int(value))
	if err != nil {
		return socket.StreamSocketSetTcpSynCountResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.StreamSocketSetTcpSynCountResultWithResponse(socket.StreamSocketSetTcpSynCountResponse{}), nil
}
// GetTcpSynCount implements the TCP_SYNCNT socket option getter.
func (s *streamSocketImpl) GetTcpSynCount(fidl.Context) (socket.StreamSocketGetTcpSynCountResult, error) {
	s.sharedState.sockOptStats.GetTcpSynCount.Add(1)
	count, err := s.endpoint.ep.GetSockOptInt(tcpip.TCPSynCountOption)
	if err != nil {
		return socket.StreamSocketGetTcpSynCountResultWithErr(tcpipErrorToCode(err)), nil
	}
	response := socket.StreamSocketGetTcpSynCountResponse{Value: uint32(count)}
	return socket.StreamSocketGetTcpSynCountResultWithResponse(response), nil
}
// SetTcpWindowClamp implements the TCP_WINDOW_CLAMP socket option setter.
func (s *streamSocketImpl) SetTcpWindowClamp(_ fidl.Context, value uint32) (socket.StreamSocketSetTcpWindowClampResult, error) {
	s.sharedState.sockOptStats.SetTcpWindowClamp.Add(1)
	err := s.endpoint.ep.SetSockOptInt(tcpip.TCPWindowClampOption, int(value))
	if err != nil {
		return socket.StreamSocketSetTcpWindowClampResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.StreamSocketSetTcpWindowClampResultWithResponse(socket.StreamSocketSetTcpWindowClampResponse{}), nil
}
// GetTcpWindowClamp implements the TCP_WINDOW_CLAMP socket option getter.
func (s *streamSocketImpl) GetTcpWindowClamp(fidl.Context) (socket.StreamSocketGetTcpWindowClampResult, error) {
	s.sharedState.sockOptStats.GetTcpWindowClamp.Add(1)
	clamp, err := s.endpoint.ep.GetSockOptInt(tcpip.TCPWindowClampOption)
	if err != nil {
		return socket.StreamSocketGetTcpWindowClampResultWithErr(tcpipErrorToCode(err)), nil
	}
	response := socket.StreamSocketGetTcpWindowClampResponse{Value: uint32(clamp)}
	return socket.StreamSocketGetTcpWindowClampResultWithResponse(response), nil
}
// SetTcpLinger implements the TCP_LINGER2 socket option setter; an unset
// optional value maps to -1 before conversion to a timeout.
func (s *streamSocketImpl) SetTcpLinger(_ fidl.Context, valueSecs socket.OptionalUint32) (socket.StreamSocketSetTcpLingerResult, error) {
	s.sharedState.sockOptStats.SetTcpLinger.Add(1)
	secs, err := optionalUint32ToInt(valueSecs, -1)
	if err != nil {
		return socket.StreamSocketSetTcpLingerResultWithErr(tcpipErrorToCode(err)), nil
	}
	timeout := tcpip.TCPLingerTimeoutOption(time.Duration(secs) * time.Second)
	if err := s.endpoint.ep.SetSockOpt(&timeout); err != nil {
		return socket.StreamSocketSetTcpLingerResultWithErr(tcpipErrorToCode(err)), nil
	}
	return socket.StreamSocketSetTcpLingerResultWithResponse(socket.StreamSocketSetTcpLingerResponse{}), nil
}
// GetTcpLinger implements the TCP_LINGER2 socket option getter; a zero
// timeout is reported as unset.
func (s *streamSocketImpl) GetTcpLinger(fidl.Context) (socket.StreamSocketGetTcpLingerResult, error) {
	s.sharedState.sockOptStats.GetTcpLinger.Add(1)
	var timeout tcpip.TCPLingerTimeoutOption
	if err := s.endpoint.ep.GetSockOpt(&timeout); err != nil {
		return socket.StreamSocketGetTcpLingerResultWithErr(tcpipErrorToCode(err)), nil
	}
	valueSecs := socket.OptionalUint32WithUnset(socket.Empty{})
	if secs := time.Duration(timeout) / time.Second; secs != 0 {
		valueSecs = socket.OptionalUint32WithValue(uint32(secs))
	}
	return socket.StreamSocketGetTcpLingerResultWithResponse(socket.StreamSocketGetTcpLingerResponse{
		ValueSecs: valueSecs,
	}), nil
}
// onAddEndpoint registers e in the netstack's endpoints map under a freshly
// allocated non-zero key and records its creation in the stats. On the
// (unlikely) key collision after counter wrap-around, the endpoint is left
// out of the map (e.key stays 0) and an error is logged.
func (ns *Netstack) onAddEndpoint(e *endpoint, sockOptStats socketOptionStats) {
	ns.stats.SocketsCreated.Increment()
	var key uint64
	// Reserve key value 0 to indicate that the endpoint was never
	// added to the endpoints map.
	for key == 0 {
		key = atomic.AddUint64(&ns.endpoints.nextKey, 1)
	}
	// Check if the key exists in the map already. The key is a uint64 value
	// and we skip adding the endpoint to the map in the unlikely wrap around
	// case for now.
	if stats, loaded := ns.endpoints.LoadOrStore(key, endpointAndStats{
		ep:           e.ep,
		sockOptStats: sockOptStats,
	}); loaded {
		var info stack.TransportEndpointInfo
		switch t := stats.ep.Info().(type) {
		case *stack.TransportEndpointInfo:
			info = *t
		}
		_ = syslog.Errorf("endpoint map store error, key %d exists for endpoint %+v", key, info)
	} else {
		e.key = key
	}
}
// onRemoveEndpoint removes the endpoint registered under key from the
// endpoints map, folding its socket-option stats into the running maxima.
// It reports whether an entry was actually deleted; key 0 means the
// endpoint was never added.
func (ns *Netstack) onRemoveEndpoint(key uint64) bool {
	ns.stats.SocketsDestroyed.Increment()
	if key == 0 {
		return false
	}
	stats, deleted := ns.endpoints.LoadAndDelete(key)
	// Update the max socket option stats if any of this endpoint's stats are
	// larger. Making the update only when sockets close and when inspect data
	// is queried minimizes overhead.
	ns.stats.MaxSocketOptionStats.updateMax(stats.sockOptStats)
	return deleted
}
// providerImpl serves fuchsia.posix.socket/Provider, the factory protocol
// through which new sockets are created.
type providerImpl struct {
	ns *Netstack
}
// Compile-time check that providerImpl implements the Provider protocol.
var _ socket.ProviderWithCtx = (*providerImpl)(nil)
// toTransProtoStream maps a stream-socket protocol to its transport protocol
// number; anything other than TCP yields EPROTONOSUPPORT.
func toTransProtoStream(_ socket.Domain, proto socket.StreamSocketProtocol) (posix.Errno, tcpip.TransportProtocolNumber) {
	if proto == socket.StreamSocketProtocolTcp {
		return 0, tcp.ProtocolNumber
	}
	return posix.ErrnoEprotonosupport, 0
}
// toTransProtoDatagram maps a datagram-socket protocol (and, for ICMP echo,
// the domain) to its transport protocol number; unsupported combinations
// yield EPROTONOSUPPORT.
func toTransProtoDatagram(domain socket.Domain, proto socket.DatagramSocketProtocol) (posix.Errno, tcpip.TransportProtocolNumber) {
	switch proto {
	case socket.DatagramSocketProtocolUdp:
		return 0, udp.ProtocolNumber
	case socket.DatagramSocketProtocolIcmpEcho:
		// ICMP echo endpoints are domain-specific.
		if domain == socket.DomainIpv4 {
			return 0, icmp.ProtocolNumber4
		}
		if domain == socket.DomainIpv6 {
			return 0, icmp.ProtocolNumber6
		}
	}
	return posix.ErrnoEprotonosupport, 0
}
// toNetProto maps a socket domain to its network protocol number;
// unsupported domains yield EPFNOSUPPORT.
func toNetProto(domain socket.Domain) (posix.Errno, tcpip.NetworkProtocolNumber) {
	if domain == socket.DomainIpv4 {
		return 0, ipv4.ProtocolNumber
	}
	if domain == socket.DomainIpv6 {
		return 0, ipv6.ProtocolNumber
	}
	return posix.ErrnoEpfnosupport, 0
}
// makeSynchronousDatagramSocket wraps ep in a synchronousDatagramSocket,
// creating the zircon eventpair used to surface readiness signals and
// registering a waiter entry that forwards endpoint events to the peer
// handle's signals.
func makeSynchronousDatagramSocket(ep tcpip.Endpoint, netProto tcpip.NetworkProtocolNumber, transProto tcpip.TransportProtocolNumber, wq *waiter.Queue, ns *Netstack) (synchronousDatagramSocket, error) {
	var localE, peerE zx.Handle
	if status := zx.Sys_eventpair_create(0, &localE, &peerE); status != zx.ErrOk {
		return synchronousDatagramSocket{}, &zx.Error{Status: status, Text: "zx.EventPair"}
	}
	s := synchronousDatagramSocket{
		endpointWithEvent: &endpointWithEvent{
			endpoint: endpoint{
				ep:         ep,
				wq:         wq,
				transProto: transProto,
				netProto:   netProto,
				ns:         ns,
			},
			local: localE,
			peer:  peerE,
			pending: signaler{
				supported: waiter.EventIn | waiter.EventErr,
				// Map endpoint readiness events to zircon signals; only
				// EventIn and EventErr are expected (see supported above).
				eventsToSignals: func(events waiter.EventMask) zx.Signals {
					signals := zx.SignalNone
					if events&waiter.EventIn != 0 {
						signals |= signalDatagramIncoming
						events ^= waiter.EventIn
					}
					if events&waiter.EventErr != 0 {
						signals |= signalDatagramError
						events ^= waiter.EventErr
					}
					if events != 0 {
						panic(fmt.Sprintf("unexpected events=%b", events))
					}
					return signals
				},
				readiness:  ep.Readiness,
				signalPeer: localE.SignalPeer,
			},
		},
	}
	s.endpointWithEvent.endpointWithMutators.ep = &s.endpointWithEvent.endpoint
	// Re-evaluate pending signals whenever the endpoint reports an event.
	s.entry = waiter.NewFunctionEntry(s.pending.supported, func(waiter.EventMask) {
		if err := s.pending.update(); err != nil {
			panic(err)
		}
	})
	s.wq.EventRegister(&s.entry)
	return s, nil
}
func (sp *providerImpl) DatagramSocketDeprecated(ctx fidl.Context, domain socket.Domain, proto socket.DatagramSocketProtocol) (socket.ProviderDatagramSocketDeprecatedResult, error) {
code, netProto := toNetProto(domain)
if code != 0 {
return socket.ProviderDatagramSocketDeprecatedResultWithErr(code), nil
}
code, transProto := toTransProtoDatagram(domain, proto)
if code != 0 {
return socket.ProviderDatagramSocketDeprecatedResultWithErr(code), nil
}
wq := new(waiter.Queue)
var ep tcpip.Endpoint
{
var err tcpip.Error
ep, err = sp.ns.stack.NewEndpoint(transProto, netProto, wq)
if err != nil {
return socket.ProviderDatagramSocketDeprecatedResultWithErr(tcpipErrorToCode(err)), nil
}
}
synchronousDatagramSocket, err := makeSynchronousDatagramSocket(ep, netProto, transProto, wq, sp.ns)
if err != nil {
return socket.ProviderDatagramSocketDeprecatedResult{}, err
}
s := synchronousDatagramSocketImpl{
networkDatagramSocket: networkDatagramSocket{
synchronousDatagramSocket: synchronousDatagramSocket,
},
}
localC, peerC, err := zx.NewChannel(0)
if err != nil {
return socket.ProviderDatagramSocketDeprecatedResult{}, err
}