// blob: afd7f8c4f2168c23f039cf74db85d2e826d3c1c2 [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package netstack
import (
"context"
"errors"
"fmt"
"syscall/zx/fidl"
"go.fuchsia.dev/fuchsia/src/connectivity/network/netstack/sync"
"fidl/fuchsia/net/multicast/admin"
"gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/stack"
)
// bufferCapacity is the fixed number of slots in a multicastEventBuffer. It is
// taken from admin.MaxRoutingEvents.
const bufferCapacity = int(admin.MaxRoutingEvents)
// multicastEvent pairs a routing event with the packet context that was passed
// to the dispatcher callback which produced it.
type multicastEvent struct {
// event is the admin.RoutingEvent carried by this entry.
event admin.RoutingEvent
// context is the stack.MulticastPacketContext received alongside the event.
context stack.MulticastPacketContext
}
// Compile-time check that *multicastEventDispatcher implements
// stack.MulticastForwardingEventDispatcher.
var _ stack.MulticastForwardingEventDispatcher = (*multicastEventDispatcher)(nil)
// multicastEventDispatcher buffers the multicast routing events reported
// through the stack.MulticastForwardingEventDispatcher callbacks and hands
// them out one at a time via nextMulticastEvent, which blocks while the buffer
// is empty.
//
// TODO(https://fxbug.dev/42162431): Extract the common logic for implementing
// hanging gets.
type multicastEventDispatcher struct {
// cancelServe is invoked when nextMulticastEvent is called while another
// call is already hanging.
cancelServe context.CancelFunc
// ready is signalled by onEvent, without blocking, when an event is enqueued
// while a nextMulticastEvent call is hanging.
ready chan struct{}
// mu guards the fields declared inside it.
mu struct {
sync.Mutex
// isHanging is true while a nextMulticastEvent call is waiting for an event.
isHanging bool
// eventBuffer holds the events that have not been returned yet.
eventBuffer multicastEventBuffer
}
}
// multicastEventBuffer is a fixed-capacity circular queue of multicastEvents.
// Once it holds bufferCapacity events, enqueue overwrites the oldest queued
// event and counts the overwrite in numDroppedEvents.
type multicastEventBuffer struct {
// data is the backing storage; positions in it are computed modulo
// bufferCapacity, so it must hold bufferCapacity elements.
data []multicastEvent
// index is the position in data of the oldest queued event.
index int
// size is the number of events currently queued.
size int
// numDroppedEvents counts the events overwritten since the last successful
// dequeue, which reports the count and resets it to zero.
numDroppedEvents uint64
}
// isFull reports whether the buffer already holds bufferCapacity events, in
// which case the next enqueue overwrites the oldest one.
func (m *multicastEventBuffer) isFull() bool {
	freeSlots := bufferCapacity - m.size
	return freeSlots == 0
}
// isEmpty reports whether no events are currently queued.
func (m *multicastEventBuffer) isEmpty() bool {
	queued := m.size
	return queued == 0
}
// enqueue appends event to the queue. When the buffer is full, the oldest
// queued event is overwritten instead and the drop counter is incremented.
func (m *multicastEventBuffer) enqueue(event multicastEvent) {
	if !m.isFull() {
		// There is room: store the event in the slot just past the newest
		// entry.
		slot := m.physicalIndex(m.size)
		m.size++
		m.data[slot] = event
		return
	}

	// No room: the newest event takes over the oldest event's slot, and the
	// following slot becomes the new oldest entry.
	m.data[m.index] = event
	m.index = m.physicalIndex(1)
	m.numDroppedEvents++
}
// dequeue removes and returns the oldest queued event together with the number
// of events dropped since the previous successful dequeue. The final result is
// false, and the other results are zero values, when the buffer is empty; the
// drop counter is left untouched in that case.
func (m *multicastEventBuffer) dequeue() (multicastEvent, uint64, bool) {
	if m.isEmpty() {
		return multicastEvent{}, 0, false
	}

	oldest := m.data[m.index]

	// Report the accumulated drops once, then start counting afresh.
	dropped := m.numDroppedEvents
	m.numDroppedEvents = 0

	// Advance the head past the entry that was just taken.
	m.size--
	m.index = m.physicalIndex(1)

	return oldest, dropped, true
}
// physicalIndex maps pos, an offset relative to the oldest queued event, to a
// position in data, wrapping around at bufferCapacity.
func (m *multicastEventBuffer) physicalIndex(pos int) int {
	unwrapped := m.index + pos
	return unwrapped % bufferCapacity
}
// newMulticastEventDispatcher returns a dispatcher with an empty event buffer
// of bufferCapacity slots and a ready channel that can hold a single pending
// wake-up. cancelServe is stored so that nextMulticastEvent can invoke it.
func newMulticastEventDispatcher(cancelServe context.CancelFunc) *multicastEventDispatcher {
	d := new(multicastEventDispatcher)
	d.cancelServe = cancelServe
	d.ready = make(chan struct{}, 1)
	d.mu.eventBuffer.data = make([]multicastEvent, bufferCapacity)
	return d
}
// onEvent queues event and, if a nextMulticastEvent call was hanging at the
// time the event was queued, signals the ready channel without blocking.
func (m *multicastEventDispatcher) onEvent(event multicastEvent) {
	m.mu.Lock()
	m.mu.eventBuffer.enqueue(event)
	waiterHanging := m.mu.isHanging
	m.mu.Unlock()

	if !waiterHanging {
		return
	}

	// The send is attempted after the mutex is released and never blocks: if
	// the channel cannot accept it right now, the signal is skipped.
	select {
	case m.ready <- struct{}{}:
	default:
	}
}
// nextMulticastEvent returns the next queued multicast event along with the
// number of events that were dropped immediately before it.
//
// If no events are ready, then blocks until one is ready. If this method is
// called while another invocation of the method is blocking, then onClose and
// the dispatcher's cancelServe are invoked, in that order, and an error is
// returned. Additionally, returns an error wrapping ctx.Err() if the context
// is cancelled while blocking.
func (m *multicastEventDispatcher) nextMulticastEvent(ctx fidl.Context, onClose func()) (multicastEvent, uint64, error) {
m.mu.Lock()
// Every return path below is reached with the mutex held, so this deferred
// unlock is always balanced.
defer m.mu.Unlock()
// Only one hanging call is allowed at a time. Note that onClose and
// cancelServe run while the mutex is held.
if m.mu.isHanging {
onClose()
m.cancelServe()
return multicastEvent{}, 0, errors.New("request to hanging nextMulticastEvent already in-flight")
}
for {
if event, numDroppedEvents, ok := m.mu.eventBuffer.dequeue(); ok {
return event, numDroppedEvents, nil
}
// Mark the call as hanging before releasing the mutex: onEvent reads
// isHanging under the same mutex, so any event enqueued from here on
// results in a signal on ready.
m.mu.isHanging = true
m.mu.Unlock()
var err error
// A receive on ready is only a hint that an event may be available; the
// loop checks the buffer again and waits once more if it is still empty.
select {
case <-m.ready:
case <-ctx.Done():
err = fmt.Errorf("context cancelled during hanging get: %w", ctx.Err())
}
// Re-acquire the mutex before touching isHanging; this also re-balances
// the deferred unlock for the error return below.
m.mu.Lock()
m.mu.isHanging = false
if err != nil {
return multicastEvent{}, 0, err
}
}
}
// OnMissingRoute queues a MissingRoute routing event carrying the given packet
// context.
func (m *multicastEventDispatcher) OnMissingRoute(context stack.MulticastPacketContext) {
	m.onEvent(multicastEvent{
		event:   admin.RoutingEventWithMissingRoute(admin.Empty{}),
		context: context,
	})
}
// OnUnexpectedInputInterface queues a WrongInputInterface routing event that
// records expectedInputInterface and carries the given packet context.
func (m *multicastEventDispatcher) OnUnexpectedInputInterface(context stack.MulticastPacketContext, expectedInputInterface tcpip.NICID) {
	payload := admin.WrongInputInterface{}
	payload.SetExpectedInputInterface(uint64(expectedInputInterface))

	m.onEvent(multicastEvent{
		event:   admin.RoutingEventWithWrongInputInterface(payload),
		context: context,
	})
}