blob: 438cfd02a31fd99907393d81e2ac563416e64b92 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//go:build !build_with_native_toolchain
// +build !build_with_native_toolchain
package netstack
import (
"bytes"
"context"
"errors"
"fmt"
"sort"
"syscall/zx"
"syscall/zx/fidl"
"go.fuchsia.dev/fuchsia/src/connectivity/network/netstack/sync"
"go.fuchsia.dev/fuchsia/src/connectivity/network/netstack/time"
"go.fuchsia.dev/fuchsia/src/lib/component"
syslog "go.fuchsia.dev/fuchsia/src/lib/syslog/go"
"fidl/fuchsia/net"
"fidl/fuchsia/net/interfaces"
"gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/stack"
)
const watcherProtocolName = "fuchsia.net.interfaces/Watcher"
func initialProperties(ifs *ifState, name string) interfaces.Properties {
var p interfaces.Properties
p.SetId(uint64(ifs.nicid))
p.SetName(name)
if ifs.endpoint.Capabilities()&stack.CapabilityLoopback != 0 {
p.SetDeviceClass(interfaces.DeviceClassWithLoopback(interfaces.Empty{}))
} else if ifs.controller != nil {
p.SetDeviceClass(interfaces.DeviceClassWithDevice(ifs.controller.DeviceClass()))
} else {
panic(fmt.Sprintf("can't extract DeviceClass from non-loopback NIC %d(%s) with nil controller", ifs.nicid, name))
}
p.SetOnline(false)
p.SetHasDefaultIpv4Route(false)
p.SetHasDefaultIpv6Route(false)
p.SetAddresses([]interfaces.Address{})
return p
}
var _ interfaces.WatcherWithCtx = (*interfaceWatcherImpl)(nil)
type interfaceWatcherImpl struct {
cancelServe context.CancelFunc
ready chan struct{}
mu struct {
sync.Mutex
isHanging bool
queue []interfaces.Event
}
}
const maxInterfaceWatcherQueueLen = 128
func (wi *interfaceWatcherImpl) onEvent(e interfaces.Event) {
wi.mu.Lock()
if len(wi.mu.queue) >= maxInterfaceWatcherQueueLen {
_ = syslog.ErrorTf(watcherProtocolName, "too many unconsumed events (client may not be calling Watch as frequently as possible): %d, max: %d", len(wi.mu.queue), maxInterfaceWatcherQueueLen)
wi.cancelServe()
} else {
wi.mu.queue = append(wi.mu.queue, e)
}
queueLen := len(wi.mu.queue)
isHanging := wi.mu.isHanging
wi.mu.Unlock()
if queueLen > 0 && isHanging {
select {
case wi.ready <- struct{}{}:
default:
}
}
}
func cmpSubnet(ifAddr1 net.Subnet, ifAddr2 net.Subnet) int {
switch ifAddr1.Addr.Which() {
case net.IpAddressIpv4:
if ifAddr2.Addr.Which() == net.IpAddressIpv6 {
return -1
}
if diff := bytes.Compare(ifAddr1.Addr.Ipv4.Addr[:], ifAddr2.Addr.Ipv4.Addr[:]); diff != 0 {
return diff
}
case net.IpAddressIpv6:
if ifAddr2.Addr.Which() == net.IpAddressIpv4 {
return 1
}
if diff := bytes.Compare(ifAddr1.Addr.Ipv6.Addr[:], ifAddr2.Addr.Ipv6.Addr[:]); diff != 0 {
return diff
}
}
if ifAddr1.PrefixLen < ifAddr2.PrefixLen {
return -1
} else if ifAddr1.PrefixLen > ifAddr2.PrefixLen {
return 1
}
return 0
}
func (wi *interfaceWatcherImpl) Watch(ctx fidl.Context) (interfaces.Event, error) {
wi.mu.Lock()
defer wi.mu.Unlock()
if wi.mu.isHanging {
wi.cancelServe()
return interfaces.Event{}, errors.New("not allowed to call Watcher.Watch when a call is already pending")
}
for {
if len(wi.mu.queue) > 0 {
event := wi.mu.queue[0]
wi.mu.queue = wi.mu.queue[1:]
return event, nil
}
wi.mu.isHanging = true
wi.mu.Unlock()
var err error
select {
case <-wi.ready:
case <-ctx.Done():
err = fmt.Errorf("cancelled: %w", ctx.Err())
}
wi.mu.Lock()
wi.mu.isHanging = false
if err != nil {
return interfaces.Event{}, err
}
}
}
var _ interfaces.StateWithCtx = (*interfaceStateImpl)(nil)
type interfaceStateImpl struct {
watcherChan chan<- interfaces.WatcherWithCtxInterfaceRequest
}
func (si *interfaceStateImpl) GetWatcher(_ fidl.Context, _ interfaces.WatcherOptions, watcher interfaces.WatcherWithCtxInterfaceRequest) error {
si.watcherChan <- watcher
return nil
}
type interfaceEvent interface {
isInterfaceEvent()
}
type interfaceAdded interfaces.Properties
var _ interfaceEvent = (*interfaceAdded)(nil)
func (interfaceAdded) isInterfaceEvent() {}
type interfaceRemoved tcpip.NICID
var _ interfaceEvent = (*interfaceRemoved)(nil)
func (interfaceRemoved) isInterfaceEvent() {}
type onlineChanged struct {
nicid tcpip.NICID
online bool
}
var _ interfaceEvent = (*onlineChanged)(nil)
func (onlineChanged) isInterfaceEvent() {}
type defaultRouteChanged struct {
nicid tcpip.NICID
hasDefaultIPv4Route *bool
hasDefaultIPv6Route *bool
}
var _ interfaceEvent = (*defaultRouteChanged)(nil)
func (defaultRouteChanged) isInterfaceEvent() {}
type addressAdded struct {
nicid tcpip.NICID
subnet net.Subnet
// TODO(https://fxbug.dev/97731): Remove this once address assignment
// state is tracked.
//
// If true, receiver should panic if this event contains an address that
// already exists.
strict bool
}
var _ interfaceEvent = (*addressAdded)(nil)
func (addressAdded) isInterfaceEvent() {}
type addressRemoved struct {
nicid tcpip.NICID
subnet net.Subnet
// TODO(https://fxbug.dev/97731): Remove this once address assignment
// state is tracked.
//
// If true, receiver should panic if this event contains an address that
// doesn't exist.
strict bool
}
var _ interfaceEvent = (*addressRemoved)(nil)
func (addressRemoved) isInterfaceEvent() {}
type validUntilChanged struct {
nicid tcpip.NICID
subnet net.Subnet
validUntil time.Time
}
var _ interfaceEvent = (*validUntilChanged)(nil)
func (validUntilChanged) isInterfaceEvent() {}
func interfaceWatcherEventLoop(eventChan <-chan interfaceEvent, watcherChan <-chan interfaces.WatcherWithCtxInterfaceRequest) {
watchers := make(map[*interfaceWatcherImpl]struct{})
propertiesMap := make(map[tcpip.NICID]interfaces.Properties)
watcherClosedChan := make(chan *interfaceWatcherImpl)
for {
select {
case e := <-eventChan:
switch event := e.(type) {
case interfaceAdded:
added := interfaces.Properties(event)
if !added.HasId() {
panic(fmt.Sprintf("interface added event with no ID: %#v", event))
}
nicid := tcpip.NICID(added.GetId())
if properties, ok := propertiesMap[nicid]; ok {
panic(fmt.Sprintf("interface %#v already exists but duplicate added event received: %#v", properties, event))
}
propertiesMap[nicid] = added
for w := range watchers {
w.onEvent(interfaces.EventWithAdded(added))
}
case interfaceRemoved:
removed := tcpip.NICID(event)
if _, ok := propertiesMap[removed]; !ok {
panic(fmt.Sprintf("unknown interface NIC=%d removed", removed))
continue
}
delete(propertiesMap, removed)
for w := range watchers {
w.onEvent(interfaces.EventWithRemoved(uint64(removed)))
}
case defaultRouteChanged:
properties, ok := propertiesMap[event.nicid]
// TODO(https://fxbug.dev/95468): Change to panic once interface properties
// are guaranteed to not change after an interface is removed.
if !ok {
_ = syslog.WarnTf(watcherProtocolName, "default route changed event for unknown interface: %#v", event)
break
}
// TODO(https://fxbug.dev/95574): Once these events are only emitted when
// the presence of a default route has actually changed, panic if the event
// disagrees with our view of the world.
var changes interfaces.Properties
if event.hasDefaultIPv4Route != nil && properties.GetHasDefaultIpv4Route() != *event.hasDefaultIPv4Route {
properties.SetHasDefaultIpv4Route(*event.hasDefaultIPv4Route)
changes.SetHasDefaultIpv4Route(*event.hasDefaultIPv4Route)
}
if event.hasDefaultIPv6Route != nil && properties.GetHasDefaultIpv6Route() != *event.hasDefaultIPv6Route {
properties.SetHasDefaultIpv6Route(*event.hasDefaultIPv6Route)
changes.SetHasDefaultIpv6Route(*event.hasDefaultIPv6Route)
}
if changes.HasHasDefaultIpv4Route() || changes.HasHasDefaultIpv6Route() {
propertiesMap[event.nicid] = properties
changes.SetId(uint64(event.nicid))
for w := range watchers {
w.onEvent(interfaces.EventWithChanged(changes))
}
}
case onlineChanged:
properties, ok := propertiesMap[event.nicid]
// TODO(https://fxbug.dev/95468): Change to panic once interface properties
// are guaranteed to not change after an interface is removed.
if !ok {
_ = syslog.WarnTf(watcherProtocolName, "online changed event for unknown interface: %#v", event)
break
}
if event.online == properties.GetOnline() {
// This assertion is possible because the event is always emitted under a
// lock (so cannot race against itself), and the event is only emitted when
// there is an actual change to the boolean value.
panic(fmt.Sprintf("online changed event for interface with properties %#v with no actual change", properties))
}
properties.SetOnline(event.online)
propertiesMap[event.nicid] = properties
var changes interfaces.Properties
changes.SetId(uint64(event.nicid))
changes.SetOnline(event.online)
for w := range watchers {
w.onEvent(interfaces.EventWithChanged(changes))
}
case addressAdded:
properties, ok := propertiesMap[event.nicid]
// TODO(https://fxbug.dev/95468): Change to panic once interface properties
// are guaranteed to not change after an interface is removed.
if !ok {
_ = syslog.WarnTf(watcherProtocolName, "address added event for unknown interface: %#v", event)
break
}
addresses := properties.GetAddresses()
// Addresses are sorted by subnet.
i := sort.Search(len(addresses), func(i int) bool {
return cmpSubnet(event.subnet, addresses[i].GetAddr()) <= 0
})
if i < len(addresses) && cmpSubnet(event.subnet, addresses[i].GetAddr()) == 0 {
// TODO(https://fxbug.dev/97731): Panic if we receive duplicate DAD success
// within the same link once address assignment state is tracked.
if event.strict {
panic(fmt.Sprintf("duplicate address added event: %#v", event))
} else {
_ = syslog.WarnTf(watcherProtocolName, "address added event for already-assigned address: %#v", event)
break
}
}
addresses = append(addresses, interfaces.Address{})
copy(addresses[i+1:], addresses[i:])
newAddr := &addresses[i]
newAddr.SetAddr(event.subnet)
newAddr.SetValidUntil(int64(zx.TimensecInfinite))
properties.SetAddresses(addresses)
propertiesMap[event.nicid] = properties
var changes interfaces.Properties
changes.SetId(uint64(event.nicid))
changes.SetAddresses(append([]interfaces.Address(nil), addresses...))
for w := range watchers {
w.onEvent(interfaces.EventWithChanged(changes))
}
case addressRemoved:
properties, ok := propertiesMap[event.nicid]
// TODO(https://fxbug.dev/95468): Change to panic once interface properties
// are guaranteed to not change after an interface is removed.
if !ok {
_ = syslog.WarnTf(watcherProtocolName, "address removed event for unknown interface: %#v", event)
break
}
addresses := properties.GetAddresses()
// Addresses are sorted by subnet.
i := sort.Search(len(addresses), func(i int) bool {
return cmpSubnet(event.subnet, addresses[i].GetAddr()) <= 0
})
if i == len(addresses) || cmpSubnet(event.subnet, addresses[i].GetAddr()) != 0 {
// TODO(https://fxbug.dev/97731): Panic when the address being removed
// isn't assigned or tentative when `event.strict` is true.
_ = syslog.WarnTf(watcherProtocolName, "address removed event for non-existent address: %#v", event)
break
}
addresses = append(addresses[:i], addresses[i+1:]...)
properties.SetAddresses(addresses)
propertiesMap[event.nicid] = properties
var changes interfaces.Properties
changes.SetId(uint64(event.nicid))
changes.SetAddresses(append([]interfaces.Address(nil), addresses...))
for w := range watchers {
w.onEvent(interfaces.EventWithChanged(changes))
}
case validUntilChanged:
properties, ok := propertiesMap[event.nicid]
// TODO(https://fxbug.dev/95468): Change to panic once interface properties
// are guaranteed to not change after an interface is removed.
if !ok {
_ = syslog.WarnTf(watcherProtocolName, "address validUntil changed event for unknown interface: %#v", event)
break
}
addresses := properties.GetAddresses()
// Addresses are sorted by subnet.
i := sort.Search(len(addresses), func(i int) bool {
return cmpSubnet(event.subnet, addresses[i].GetAddr()) <= 0
})
if i == len(addresses) || cmpSubnet(event.subnet, addresses[i].GetAddr()) != 0 {
// TODO(https://fxbug.dev/96130): Change this to panic once DHCPv4 client
// is guaranteed to not send this event if the address is missing.
_ = syslog.ErrorTf(watcherProtocolName, "validUntil changed event for non-existent address: %#v", event)
break
}
if time.Monotonic(addresses[i].GetValidUntil()) != event.validUntil {
addresses[i].SetValidUntil(event.validUntil.MonotonicNano())
var changes interfaces.Properties
changes.SetId(uint64(event.nicid))
changes.SetAddresses(append([]interfaces.Address(nil), addresses...))
for w := range watchers {
w.onEvent(interfaces.EventWithChanged(changes))
}
}
}
case watcher := <-watcherChan:
ctx, cancel := context.WithCancel(context.Background())
impl := interfaceWatcherImpl{
ready: make(chan struct{}, 1),
cancelServe: cancel,
}
impl.mu.queue = make([]interfaces.Event, 0, maxInterfaceWatcherQueueLen)
for _, properties := range propertiesMap {
// Must make a deep copy of the addresses so that updates to the slice
// don't accidentally change the event added to the queue.
properties.SetAddresses(append([]interfaces.Address(nil), properties.GetAddresses()...))
impl.mu.queue = append(impl.mu.queue, interfaces.EventWithExisting(properties))
}
impl.mu.queue = append(impl.mu.queue, interfaces.EventWithIdle(interfaces.Empty{}))
watchers[&impl] = struct{}{}
go func() {
component.Serve(ctx, &interfaces.WatcherWithCtxStub{Impl: &impl}, watcher.Channel, component.ServeOptions{
Concurrent: true,
OnError: func(err error) {
_ = syslog.WarnTf(watcherProtocolName, "%s", err)
},
})
watcherClosedChan <- &impl
}()
case watcherClosed := <-watcherClosedChan:
delete(watchers, watcherClosed)
}
}
}