blob: f96b9411fd0f50f5d4848b6fc4513cc7d4c55bec [file] [log] [blame]
// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build fuchsia
// Package zxwait implements a Zircon port waiter compatible with goroutines.
//
// The function Wait can be used to wait on a handle without consuming
// an OS thread or resorting to event-driven programming.
package zxwait
import (
"sync"
"sync/atomic"
"syscall/zx"
_ "unsafe" // for go:linkname
)
// Wait blocks the calling goroutine until one of signals is observed on
// handle, and reports the signals that were observed.
//
// With an infinite timeout the goroutine is parked by the process-wide
// waiter (created on first use), so no OS thread is tied up while Wait is
// blocked. That path is built on zx_object_wait_async and a port rather than
// zx_object_wait_one, but is meant to be semantically equivalent to calling
// the WaitOne method on a zx.Handle.
//
// With any other timeout Wait falls back to zx.Sys_object_wait_one; a
// status other than zx.ErrOk is returned as a zx.Error together with
// whatever the call stored in observed.
func Wait(handle zx.Handle, signals zx.Signals, timeout zx.Time) (observed zx.Signals, err error) {
	if timeout == zx.TimensecInfinite {
		sysWaiterOnce.Do(sysWaiterInit)
		return sysWaiter.Wait(handle, signals)
	}

	// TODO: support finite timeouts.
	status := zx.Sys_object_wait_one(handle, signals, timeout, &observed)
	if status == zx.ErrOk {
		return observed, nil
	}
	return observed, zx.Error{Status: status, Text: "zxwait.Wait"}
}
// gopark is the runtime's gopark, reached through go:linkname.
//
// waiter.Wait calls it to park the calling goroutine. The unlockf callback
// decides the outcome: reporting true leaves the goroutine parked until
// goready is called, reporting false lets it resume immediately (see the
// comment on waiter.unlockf).
//
// NOTE(review): the parameter types used here (uintptr, *waitingG) are
// local stand-ins for the runtime's own types — confirm they still match
// runtime.gopark whenever the toolchain is updated.
//
//go:linkname gopark runtime.gopark
func gopark(unlockf func(g uintptr, waiting *waitingG) bool, waitng *waitingG, reason string, traceEv byte, traceskip int)
// goready is the runtime's goready, reached through go:linkname.
//
// dequeue calls it with the g value recorded by unlockf in order to wake
// a goroutine that was parked by gopark.
//
//go:linkname goready runtime.goready
func goready(g uintptr, traceskip int)
// waitingG is used to track a parked g waiting on a signal.
//
// A waitingG is represented by a unique key, which is also
// its positional index in the waiter's all slice. This key
// is not a GCed pointer, so it can be safely passed to the
// kernel and returned at a later time.
//
// The g, obs and ready fields are accessed with sync/atomic by
// dequeue and unlockf. Both of them try to flip ready from 0 to 1
// with a compare-and-swap, and whichever loses knows the other
// side has already published its field.
type waitingG struct {
key uint64 // index into all, stable identifier passed to kernel
g uintptr // g of the waiting goroutine; stored by unlockf, passed to goready by dequeue
obs uint32 // zx.Signals observed; stored by dequeue, read back by waiter.Wait
ready uintptr // set when g is set by unlockf or obs set by dequeue
}
// A waiter is a zircon port that parks goroutines waiting on signals.
//
// Currently there is only one instance of waiter per process, stored
// in sysWaiter. It is however a self-contained object and multiple
// of them can safely exist concurrently.
type waiter struct {
port zx.Port // port that Wait registers async waits on and that dequeue blocks on
mu sync.Mutex // guards free and all
free []*waitingG // waitingGs available for reuse; Wait pops from and pushes to the end
all []*waitingG // every waitingG created so far, indexed by waitingG.key
}
// newWaiter returns a waiter backed by a newly created zircon port and
// starts the dequeue goroutine that services that port.
//
// It panics if the port cannot be created.
func newWaiter() *waiter {
	p, err := zx.NewPort(0)
	if err != nil {
		// misuse of system call, no useful recovery
		panic(err)
	}

	w := new(waiter)
	w.port = p
	go w.dequeue()
	return w
}
// sysWaiter is the waiter used by the package function Wait. It is created
// lazily: Wait runs sysWaiterInit through sysWaiterOnce before using it.
var (
	sysWaiterOnce sync.Once
	sysWaiter     *waiter
)

// sysWaiterInit creates sysWaiter. It is intended to be run only through
// sysWaiterOnce.
func sysWaiterInit() { sysWaiter = newWaiter() }
// dequeue is the body of the dedicated goroutine that waits on the
// waiter's port.
//
// For every packet it looks up the waitingG named by the packet key,
// publishes the observed signals, and then races unlockf for the ready
// flag to decide whether the waiting goroutine has to be woken.
//
// It never returns, and panics if waiting on the port fails.
func (w *waiter) dequeue() {
	var packet zx.Packet
	for {
		if err := w.port.Wait(&packet, zx.TimensecInfinite); err != nil {
			panic(err)
		}

		// The all slice may be appended to concurrently by Wait.
		w.mu.Lock()
		entry := w.all[packet.Hdr.Key]
		w.mu.Unlock()

		// Publish the result before touching ready, so whoever sees
		// ready set also sees obs.
		atomic.StoreUint32(&entry.obs, uint32(packet.Signal().Observed))

		if !atomic.CompareAndSwapUintptr(&entry.ready, 0, 1) {
			// We were beaten by unlockf, so entry.g has a real value
			// and the goroutine is parked: wake it.
			goready(atomic.LoadUintptr(&entry.g), 0)
		}
		// Otherwise we beat unlockf and the goroutine never parked.
	}
}
// Wait waits for signals on handle.
//
// It claims a waitingG (reusing one from the free list when possible),
// registers a one-shot async wait on the waiter's port keyed by that
// waitingG, and parks the calling goroutine until dequeue has published the
// observed signals, which are then returned.
//
// If the async wait cannot be registered, the error from WaitAsync is
// returned with observed set to 0, and the claimed waitingG is handed back
// to the free list so that failed calls do not leak slots in w.all.
//
// See the package function Wait for more commentary.
func (w *waiter) Wait(handle zx.Handle, signals zx.Signals) (observed zx.Signals, err error) {
	var waiting *waitingG

	w.mu.Lock()
	if n := len(w.free); n == 0 {
		// Nothing to reuse: mint a new entry whose key is its index in all.
		waiting = &waitingG{
			key: uint64(len(w.all)),
		}
		w.all = append(w.all, waiting)
	} else {
		waiting = w.free[n-1]
		w.free = w.free[:n-1]
	}
	w.mu.Unlock()

	const ZX_WAIT_ASYNC_ONCE = 0
	if err = w.port.WaitAsync(handle, waiting.key, signals, ZX_WAIT_ASYNC_ONCE); err != nil {
		// The wait was not registered (assuming a failed WaitAsync queues
		// nothing on the port), so dequeue will never see this key and the
		// still-pristine entry can be recycled right away.
		w.mu.Lock()
		w.free = append(w.free, waiting)
		w.mu.Unlock()
		return 0, err
	}

	// NOTE(review): 24 is presumably the runtime's traceEvGoBlockSync —
	// confirm against the runtime's trace event table.
	const traceEv = 24
	gopark(w.unlockf, waiting, "zxwait", traceEv, 0)

	// Either dequeue woke us or unlockf declined to park; in both cases
	// dequeue has already stored obs.
	obs := atomic.LoadUint32(&waiting.obs)

	*waiting = waitingG{key: waiting.key} // clear waitingG for reuse
	w.mu.Lock()
	w.free = append(w.free, waiting)
	w.mu.Unlock()

	return zx.Signals(obs), nil
}
// unlockf is passed as a callback to gopark.
//
// g is the goroutine being parked and waiting is the waitingG that
// waiter.Wait handed to gopark for it.
//
// Reporting true will park the goroutine until goready is called.
//
// Reporting false will immediately start running the goroutine
// again, and nothing else should call goready.
//
// This method is called without a 'g', so it can do very little.
// Avoid the stack. Don't use anything with 'g'-based runtime
// support (like sync.Mutex). Do as little as possible.
func (w *waiter) unlockf(g uintptr, waiting *waitingG) bool {
// Record g before touching ready: dequeue only reads waiting.g
// after it has lost the compare-and-swap on ready below.
atomic.StoreUintptr(&waiting.g, g)
// If we can set ready, then this executed before dequeue
// and the goroutine will be parked.
//
// If we cannot set it, then dequeue has already run and
// waiting.obs is set, so do not park the goroutine.
return atomic.CompareAndSwapUintptr(&waiting.ready, 0, 1)
}