// blob: df92757d8cb957b1398bccdd132d70f3ed0e825a [file] [log] [blame]
// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build fuchsia
package dispatch
import (
"strconv"
"sync"
"syscall/zx"
)
// WaitResult is the value a Handler returns to the dispatcher. It determines
// whether the wait should be re-queued on the port after the handler has run.
type WaitResult int
const (
// WaitFinished means the wait is complete; after a successful dispatch the
// dispatcher removes the wait's bookkeeping entry.
WaitFinished WaitResult = iota
// WaitAgain asks the dispatcher to re-arm the wait on the same handle and
// signals. It is only accepted when the handler was called with zx.ErrOk;
// returning it for any other status makes assertWaitResult panic.
WaitAgain
)
// WaitID identifies the wait registered by a particular call to BeginWait.
// The dispatcher also passes it to the port as the packet key, which is how
// Serve maps an incoming packet back to its wait. An ID stops being valid once
// its wait is dequeued (cancelled, finished, or dropped at shutdown), so
// callers should not keep using an ID after that point.
type WaitID uint64
// WaitFlags is a bit set of options that modify how a wait behaves. Flags are
// combined with bitwise OR and supplied to BeginWait.
type WaitFlags uint32
const (
// HandleShutdown requests that the wait's handler be invoked with
// zx.ErrCanceled and a nil packet when the dispatcher is closed while the
// wait is still registered.
HandleShutdown WaitFlags = 1 << 0 // Calls the callback on shutdown.
)
// Handler is the callback that will be called when the wait is complete.
//
// It receives the dispatcher that is invoking it, a status, and the signal
// packet that completed the wait. On a normal completion the status is
// zx.ErrOk and the packet is non-nil. When the dispatcher reports a failure
// instead (zx.ErrCanceled at shutdown, or the error status from a failed
// re-arm) the packet is nil.
//
// The returned WaitResult tells the dispatcher whether to re-queue the wait.
// WaitAgain is only permitted when the status is zx.ErrOk; any other
// combination causes a panic in assertWaitResult.
type Handler func(*Dispatcher, zx.Status, *zx.PacketSignal) WaitResult
// waitContext is a bookkeeping structure for in-progress waits. One is created
// per successful BeginWait and stored in Dispatcher.objects under its WaitID.
type waitContext struct {
// object is the handle being waited on; it is needed again to re-arm or
// cancel the wait on the port.
object zx.Handle
// callback is the handler to invoke when the wait completes.
callback Handler
// trigger is the set of signals the wait was registered for.
trigger zx.Signals
// flags holds the options given to BeginWait (e.g. HandleShutdown).
flags WaitFlags
}
// Dispatcher waits for signals on handles through a single port and invokes
// the handler registered for each wait when its packet arrives.
type Dispatcher struct {
// port is the underlying port used to wait on signals and dispatch.
port zx.Port
// mu guards the bookkeeping fields below (objects, nextWaitID, handling
// and shutdown).
mu sync.RWMutex
// objects is a map that manages wait contexts, keyed by wait ID.
objects map[WaitID]*waitContext
// nextWaitID is the candidate from which getWaitID starts searching for an
// unused ID.
nextWaitID WaitID
// handling is the set of wait IDs whose handlers are currently being
// run. It is also protected by mu.
handling map[WaitID]struct{}
// shutdown is whether the dispatcher has shut down.
shutdown bool
}
// NewDispatcher creates a dispatcher backed by a freshly created port, with
// empty wait bookkeeping. It returns the error from zx.NewPort unchanged if
// the port cannot be created.
func NewDispatcher() (*Dispatcher, error) {
	p, err := zx.NewPort(0)
	if err != nil {
		return nil, err
	}

	d := &Dispatcher{
		port:     p,
		objects:  make(map[WaitID]*waitContext),
		handling: make(map[WaitID]struct{}),
	}
	return d, nil
}
// assertWaitResult panics when a handler returned a result that is not
// allowed for the status it was called with. WaitFinished is always allowed;
// WaitAgain is allowed only when status is zx.ErrOk. The panic message carries
// the numeric result and status.
func assertWaitResult(result WaitResult, status zx.Status) {
	switch {
	case result == WaitFinished:
		// Finishing is valid regardless of status.
		return
	case result == WaitAgain && status == zx.ErrOk:
		// Re-queuing is only meaningful after a successful wait.
		return
	}
	panic("expected " + strconv.Itoa(int(result)) + " for status " + strconv.Itoa(int(status)))
}
// Close shuts the dispatcher down. It closes the underlying port, drops every
// registered wait, and invokes the handler of each wait that was registered
// with HandleShutdown, passing zx.ErrCanceled and a nil packet. Once Close has
// marked the dispatcher as shut down, BeginWait fails with zx.ErrBadState.
//
// Handlers are invoked only after d.mu has been released. They receive the
// dispatcher as an argument, and d.mu is not reentrant, so running them under
// the lock would deadlock any handler that calls back into BeginWait or
// CancelWait.
//
// Calling Close on a dispatcher that is already shut down is a no-op. Serve
// depends on this: it calls Close whenever its port wait fails, which is also
// what happens after another goroutine has closed the dispatcher.
func (d *Dispatcher) Close() {
	d.mu.Lock()
	if d.shutdown {
		d.mu.Unlock()
		return
	}
	// Mark shutdown before releasing the lock so that no new wait can be
	// registered while the shutdown handlers are running.
	d.shutdown = true
	d.port.Close()

	// Snapshot the handlers that asked to be told about shutdown; the map
	// itself is discarded below.
	var notify []Handler
	for _, obj := range d.objects {
		if obj != nil && (obj.flags&HandleShutdown) != 0 {
			notify = append(notify, obj.callback)
		}
	}
	d.objects = make(map[WaitID]*waitContext)
	d.mu.Unlock()

	for _, callback := range notify {
		// The result is only validated; we won't be waiting again.
		result := callback(d, zx.ErrCanceled, nil)
		assertWaitResult(result, zx.ErrCanceled)
	}
}
// getWaitID returns a wait ID that is not currently registered in d.objects
// and advances d.nextWaitID past it. Because the counter always moves beyond
// the returned value, the ID of a wait that has finished or been cancelled is
// not handed out again right away; a value can only recur after the counter
// wraps around. The loop skips any candidate that is still registered, which
// keeps IDs unique even after such a wrap.
//
// The caller must hold d.mu for writing.
func (d *Dispatcher) getWaitID() WaitID {
	for {
		id := d.nextWaitID
		d.nextWaitID++
		if _, inUse := d.objects[id]; !inUse {
			return id
		}
	}
}
// BeginWait registers a wait on handle h for the signals t and queues it on
// the dispatcher's port. The flags f are stored with the wait, and the handler
// c is called when the wait completes.
//
// It returns the ID assigned to the wait. If the dispatcher has already shut
// down, it returns zx.ErrBadState. If queueing on the port fails, the wait is
// deregistered again and the port's error is returned.
func (d *Dispatcher) BeginWait(h zx.Handle, t zx.Signals, f WaitFlags, c Handler) (WaitID, error) {
	d.mu.Lock()
	defer d.mu.Unlock()

	// A closed dispatcher accepts no new waits.
	if d.shutdown {
		return 0, zx.Error{Status: zx.ErrBadState}
	}

	// Record the wait first so it is visible by the time a packet can arrive.
	waitID := d.getWaitID()
	entry := &waitContext{object: h, callback: c, trigger: t, flags: f}
	d.objects[waitID] = entry

	// Queue the one-shot wait on the port, keyed by the wait ID.
	if err := d.port.WaitAsync(h, uint64(waitID), t, zx.PortWaitAsyncOnce); err != nil {
		// Nothing was queued, so roll back the registration.
		delete(d.objects, waitID)
		return waitID, err
	}
	return waitID, nil
}
// CancelWait cancels the wait with the given WaitID and removes its
// bookkeeping entry.
//
// It returns zx.ErrNotFound if no wait is registered under id, or if the
// wait's handler is currently running (the wait is then not queued on the
// port, so there is nothing to cancel). Otherwise it returns the result of
// cancelling on the port; the entry is removed whether or not that succeeds.
func (d *Dispatcher) CancelWait(id WaitID) error {
	d.mu.Lock()
	defer d.mu.Unlock()

	entry := d.objects[id]
	_, busy := d.handling[id]
	if entry == nil || busy {
		return zx.Error{Status: zx.ErrNotFound}
	}

	// Deferred calls run in last-in, first-out order, so this removal happens
	// before the deferred unlock above, i.e. while mu is still held.
	defer delete(d.objects, id)

	return d.port.Cancel(entry.object, uint64(id))
}
// dispatch runs the handler of wait id with the signals delivered by the port
// and then either re-arms or retires the wait, depending on the handler's
// result.
//
// While the handler runs, id is recorded in d.handling so that CancelWait
// reports zx.ErrNotFound instead of trying to cancel a wait that is not
// queued on the port. The entry stays in d.objects during that time.
func (d *Dispatcher) dispatch(id WaitID, wc *waitContext, signals *zx.PacketSignal) {
	d.mu.Lock()
	// The caller looked wc up and then released the lock, so the wait may have
	// been deregistered (CancelWait, Close) in between. Only run the handler
	// if this exact wait is still registered under id.
	if d.objects[id] != wc {
		d.mu.Unlock()
		return
	}
	// Mark the wait as being handled before invoking the handler.
	d.handling[id] = struct{}{}
	d.mu.Unlock()

	// Call the handler without holding the lock so it may use the dispatcher.
	result := wc.callback(d, zx.ErrOk, signals)
	assertWaitResult(result, zx.ErrOk)

	switch result {
	case WaitAgain:
		// Re-arm the wait under the same ID, as the handler requested. The ID
		// is still registered in d.objects, so it cannot have been given to
		// another wait in the meantime.
		d.mu.Lock()
		delete(d.handling, id)
		err := d.port.WaitAsync(wc.object, uint64(id), wc.trigger, zx.PortWaitAsyncOnce)
		if err != nil {
			// Nothing is queued on the port any more, so deregister the wait,
			// just as BeginWait does when queueing fails. Otherwise the entry
			// would stay in d.objects indefinitely.
			delete(d.objects, id)
		}
		d.mu.Unlock()

		// If we fail to re-arm, notify the handler of what happened.
		if err != nil {
			zxErr := err.(zx.Error)
			result = wc.callback(d, zxErr.Status, nil)
			assertWaitResult(result, zxErr.Status)
		}
	case WaitFinished:
		// The handler is done with this wait; drop all bookkeeping for it.
		d.mu.Lock()
		delete(d.handling, id)
		delete(d.objects, id)
		d.mu.Unlock()
	}
}
// Serve loops, blocking on the port for the next packet and dispatching it to
// the handler of the wait whose ID matches the packet key. Packets whose wait
// is no longer registered, and packets that are not single-signal packets, are
// ignored. Serve returns only when waiting on the port fails, in which case it
// closes the dispatcher first.
func (d *Dispatcher) Serve() {
	for {
		// Block until the port delivers a packet.
		var pkt zx.Packet
		if err := d.port.Wait(&pkt, zx.TimensecInfinite); err != nil {
			d.Close()
			return
		}

		// The packet key is the ID the wait was registered under.
		key := WaitID(pkt.Hdr.Key)
		d.mu.RLock()
		entry := d.objects[key]
		d.mu.RUnlock()

		// The wait may have been removed while its packet sat in the queue.
		if entry == nil {
			continue
		}
		// Only single-signal packets are dispatched.
		if pkt.Hdr.Type != zx.PortPacketTypeSignalOne {
			continue
		}
		d.dispatch(key, entry, pkt.Signal())
	}
}