blob: 13f52f9cb5e700a0411bf6dad0845aa1caa8fb2d [file] [log] [blame]
// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Go's distribution tools attempt to compile everything; this file
// depends on types that don't compile in not-Fuchsia.
//go:build fuchsia
// +build fuchsia
// Package zxwait implements a Zircon port waiter compatible with goroutines.
//
// The function Wait can be used to wait on a handle without consuming
// an OS thread or resorting to event-driven programming.
package zxwait
import (
"strconv"
"sync"
"sync/atomic"
"syscall/zx"
"syscall/zx/internal/context"
_ "unsafe" // for go:linkname
)
// WaitContext waits for signals on handle.
func WaitContext(ctx context.Context, handle zx.Handle, signals zx.Signals) (zx.Signals, error) {
sysWaiterOnce.Do(sysWaiterInit)
return sysWaiter.Wait(ctx, handle, signals)
}
func WithRetryContext(ctx context.Context, fn func() error, handle zx.Handle, ready, closed zx.Signals) error {
signals := ready | closed
for {
err := fn()
if err, ok := err.(*zx.Error); ok && err.Status == zx.ErrShouldWait {
obs, err := WaitContext(
ctx,
handle,
signals,
)
if err != nil {
return err
}
if obs&ready != 0 {
continue
}
if obs&closed != 0 {
return &zx.Error{Status: zx.ErrPeerClosed}
}
panic("unexpected signal mask " + strconv.FormatUint(uint64(obs), 2) + " (expected " + strconv.FormatUint(uint64(signals), 2) + ")")
}
return err
}
}
//go:linkname gopark runtime.gopark
func gopark(unlockf func(g uintptr, waiting *waitingG) bool, waitng *waitingG, reason uint8, traceEv byte, traceskip int)
//go:linkname goready runtime.goready
func goready(g uintptr, traceskip int)
const preparingG = 1
// waitingG is used to track a parked g waiting for a signal.
//
// A waitingG is represented by a unique key, which is also its positional
// index in the waiter's all slice. This key is not a GCed pointer, so it can
// be safely passed to the kernel and returned at a later time.
type waitingG struct {
// key is an index into the waiter's all slice. It is never modified.
key uint64
g uintptr
done chan<- struct{}
obs zx.Signals
}
// A waiter is a zircon port that parks goroutines waiting on signals.
//
// Currently there is only one instance of waiter per process, stored
// in sysWaiter. It is however a self-contained object and multiple
// of them can safely exist concurrently.
type waiter struct {
port zx.Port
mu struct {
sync.Mutex
free []*waitingG
all []*waitingG
allByHandle map[zx.Handle]map[*waitingG]struct{}
}
}
func init() {
port, err := zx.NewPort(0)
if err != nil {
panic(err) // misuse of system call, no useful recovery
}
sysWaiter.port = port
sysWaiter.mu.allByHandle = make(map[zx.Handle]map[*waitingG]struct{})
zx.SetZXWaitCloseFn(sysWaiter.close)
}
var sysWaiterOnce sync.Once
var sysWaiter waiter
func sysWaiterInit() {
go sysWaiter.dequeue()
}
// dequeue is a dedicated goroutine to waiting on the waiter's port.
func (w *waiter) dequeue() {
var pkt zx.Packet
for {
if err := w.port.Wait(&pkt, zx.TimensecInfinite); err != nil {
panic(err)
}
w.mu.Lock()
waiting := w.mu.all[pkt.Hdr.Key]
w.mu.Unlock()
switch pkt.Hdr.Type {
case zx.PortPacketTypeUser:
waiting.obs = zx.SignalHandleClosed
default:
waiting.obs = pkt.Signal().Observed
}
if done := waiting.done; done != nil {
close(done)
} else {
switch g := atomic.SwapUintptr(&waiting.g, 0); g {
case 0, preparingG:
default:
goready(g, 0)
}
}
}
}
func (w *waiter) close(handle zx.Handle, cb func(zx.Handle) error) error {
w.mu.Lock()
defer w.mu.Unlock()
for waiting := range w.mu.allByHandle[handle] {
switch status := zx.Sys_port_cancel(zx.Handle(w.port), handle, waiting.key); status {
case zx.ErrOk:
if err := w.port.Queue(&zx.Packet{
Hdr: zx.PacketHeader{
Key: waiting.key,
Type: zx.PortPacketTypeUser,
},
}); err != nil {
return err
}
case zx.ErrNotFound:
// Nobody is waiting, no need to notify.
default:
return &zx.Error{Status: status, Text: "zx.Port.Cancel"}
}
}
return cb(handle)
}
// Wait waits for signals on handle.
//
// See the package function Wait for more commentary.
func (w *waiter) Wait(ctx context.Context, handle zx.Handle, signals zx.Signals) (zx.Signals, error) {
var waiting *waitingG
w.mu.Lock()
if len(w.mu.free) == 0 {
waiting = &waitingG{
key: uint64(len(w.mu.all)),
}
w.mu.all = append(w.mu.all, waiting)
} else {
waiting = w.mu.free[len(w.mu.free)-1]
w.mu.free = w.mu.free[:len(w.mu.free)-1]
}
waiting.g = preparingG
m, ok := w.mu.allByHandle[handle]
if !ok {
m = make(map[*waitingG]struct{})
w.mu.allByHandle[handle] = m
}
var wait func() (zx.Signals, error)
if done := ctx.Done(); done != nil {
ch := make(chan struct{})
waiting.done = ch
wait = func() (zx.Signals, error) {
select {
case <-ch:
// Wait complete.
return waiting.obs, nil
case <-done:
// Context canceled.
switch status := zx.Sys_port_cancel(zx.Handle(w.port), handle, waiting.key); status {
case zx.ErrOk:
return 0, ctx.Err()
case zx.ErrNotFound:
// We lost the race.
<-ch
return waiting.obs, nil
default:
return 0, &zx.Error{Status: status, Text: "zx.Port.Cancel"}
}
}
}
} else {
waiting.done = nil
}
m[waiting] = struct{}{}
// waiting must be fully initialized before a wakeup in dequeue is possible -
// after the call to wait_async and when the mutex is not held.
err := w.port.WaitAsync(handle, waiting.key, signals, zx.PortWaitAsyncOnce)
w.mu.Unlock()
defer func() {
w.mu.Lock()
delete(m, waiting)
if len(m) == 0 {
delete(w.mu.allByHandle, handle)
}
w.mu.free = append(w.mu.free, waiting)
w.mu.Unlock()
}()
if err != nil {
return 0, err
}
if wait != nil {
return wait()
}
const waitReasonIOWait = 2
const traceEvGoBlockSelect = 24
gopark(w.unlockf, waiting, waitReasonIOWait, traceEvGoBlockSelect, 0)
obs := waiting.obs
return obs, func() error {
if obs == zx.SignalHandleClosed {
return &zx.Error{Status: zx.ErrCanceled, Text: "zxwait.Wait"}
}
return nil
}()
}
// unlockf is passed as a callback to gopark.
//
// Reporting true will park the goroutine until goready is called.
//
// Reporting false will immediately start running the goroutine
// again, and nothing else should call goready.
//
// This method is called without a 'g', so it can do very little.
// Avoid the stack. Don't use anything with 'g'-based runtime
// support (like sync.Mutex). Do as little as possible.
func (w *waiter) unlockf(g uintptr, waiting *waitingG) bool {
// If we can set ready, then this executed before dequeue
// and the goroutine will be parked.
//
// If we cannot set it, then dequeue has already run and
// waiting.obs is set, so do not park the goroutine.
return atomic.CompareAndSwapUintptr(&waiting.g, preparingG, g)
}