blob: f55b2cc286eb667f0d9276fab48e1882832b04c2 [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package netstack
import (
"context"
"errors"
"fmt"
"syscall/zx/fidl"
"go.fuchsia.dev/fuchsia/src/connectivity/network/netstack/sync"
"fidl/fuchsia/net/multicast/admin"
"gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/stack"
tcpipstack "gvisor.dev/gvisor/pkg/tcpip/stack"
)
const bufferCapacity = int(admin.MaxRoutingEvents)
type multicastEvent struct {
event admin.RoutingEvent
context stack.MulticastPacketContext
}
var _ stack.MulticastForwardingEventDispatcher = (*multicastEventDispatcher)(nil)
// TODO(https://fxbug.dev/42162431): Extract the common logic for implementing
// hanging gets.
type multicastEventDispatcher struct {
cancelServe context.CancelFunc
ready chan struct{}
mu struct {
sync.Mutex
isHanging bool
eventBuffer multicastEventBuffer
}
}
type multicastEventBuffer struct {
data []multicastEvent
index int
size int
numDroppedEvents uint64
}
func (m *multicastEventBuffer) isFull() bool {
return m.size == bufferCapacity
}
func (m *multicastEventBuffer) isEmpty() bool {
return m.size == 0
}
func (m *multicastEventBuffer) enqueue(event multicastEvent) {
if m.isFull() {
m.data[m.index] = event
m.index = m.physicalIndex(1)
m.numDroppedEvents++
} else {
m.size++
m.data[m.physicalIndex(m.size-1)] = event
}
}
func (m *multicastEventBuffer) dequeue() (multicastEvent, uint64, bool) {
if m.isEmpty() {
return multicastEvent{}, 0, false
}
val := m.data[m.index]
m.index = m.physicalIndex(1)
m.size--
droppedEvents := m.numDroppedEvents
m.numDroppedEvents = 0
return val, droppedEvents, true
}
func (m *multicastEventBuffer) physicalIndex(pos int) int {
return (m.index + pos) % bufferCapacity
}
func newMulticastEventDispatcher(cancelServe context.CancelFunc) *multicastEventDispatcher {
dispatcher := &multicastEventDispatcher{
cancelServe: cancelServe,
ready: make(chan struct{}, 1),
}
dispatcher.mu.eventBuffer = multicastEventBuffer{data: make([]multicastEvent, bufferCapacity)}
return dispatcher
}
func (m *multicastEventDispatcher) onEvent(event multicastEvent) {
m.mu.Lock()
m.mu.eventBuffer.enqueue(event)
isHanging := m.mu.isHanging
m.mu.Unlock()
if isHanging {
select {
case m.ready <- struct{}{}:
default:
}
}
}
// nextMulticastEvent returns the next queued multicast event along with the
// number of events that were dropped immediately before it.
//
// If no events are ready, then blocks until one is ready. If this method is
// called while another invocation of the method is blocking, then an error is
// returned and the onClose callback is invoked. Additionally, returns an error
// if the context is cancelled.
func (m *multicastEventDispatcher) nextMulticastEvent(ctx fidl.Context, onClose func()) (multicastEvent, uint64, error) {
m.mu.Lock()
defer m.mu.Unlock()
if m.mu.isHanging {
onClose()
m.cancelServe()
return multicastEvent{}, 0, errors.New("request to hanging nextMulticastEvent already in-flight")
}
for {
if event, numDroppedEvents, ok := m.mu.eventBuffer.dequeue(); ok {
return event, numDroppedEvents, nil
}
m.mu.isHanging = true
m.mu.Unlock()
var err error
select {
case <-m.ready:
case <-ctx.Done():
err = fmt.Errorf("context cancelled during hanging get: %w", ctx.Err())
}
m.mu.Lock()
m.mu.isHanging = false
if err != nil {
return multicastEvent{}, 0, err
}
}
}
func (m *multicastEventDispatcher) OnMissingRoute(context stack.MulticastPacketContext) {
event := multicastEvent{
event: admin.RoutingEventWithMissingRoute(admin.Empty{}),
context: context,
}
m.onEvent(event)
}
func (m *multicastEventDispatcher) OnUnexpectedInputInterface(context stack.MulticastPacketContext, expectedInputInterface tcpip.NICID) {
var wrongInputInterface admin.WrongInputInterface
wrongInputInterface.SetExpectedInputInterface(uint64(expectedInputInterface))
event := multicastEvent{
event: admin.RoutingEventWithWrongInputInterface(wrongInputInterface),
context: context,
}
m.onEvent(event)
}
// MulticastRouteHasDuplicateOutputs returns true if the given multicast route
// lists the same output interface multiple times.
func MulticastRouteHasDuplicateOutputs(m *tcpipstack.MulticastRoute) bool {
// NB: Search for duplicates by doing a naive n^2 comparison. This is
// expected to be more performant than other approaches (e.g.
// sort + dedup, or collecting into a hash map) given how small the vec
// is expected to be.
for i, outgoingInterface := range m.OutgoingInterfaces {
// NB: Only check the interfaces that occur after the current interface.
// The interfaces before this one were checked in previous iterations.
for _, otherOutgoingInterface := range m.OutgoingInterfaces[i+1:] {
if outgoingInterface.ID == otherOutgoingInterface.ID {
return true
}
}
}
return false
}