| // Copyright 2018 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| // +build fuchsia |
| |
| // Package zxwait implements a Zircon port waiter compatible with goroutines. |
| // |
| // The function Wait can be used to wait on a handle without consuming |
| // an OS thread or resorting to event-driven programming. |
| package zxwait |
| |
| import ( |
| "sync" |
| "sync/atomic" |
| "syscall/zx" |
| |
| _ "unsafe" // for go:linkname |
| ) |
| |
| // Wait waits for signals on handle. |
| // |
| // The goroutine that calls Wait is parked until a signal is observed or the |
| // handle is closed. No OS thread is tied up while Wait is blocked. |
| // |
| // Semantically it is equivalent to calling the WaitOne method on a zx.Handle. |
| // However it is not implemented with zx_object_wait_one, instead it uses |
| // zx_object_wait_async and a port to wait for signals. |
| func Wait(handle zx.Handle, signals zx.Signals, timeout zx.Time) (observed zx.Signals, err error) { |
| // TODO: support finite timeouts. |
| if timeout != zx.TimensecInfinite { |
| if status := zx.Sys_object_wait_one(handle, signals, timeout, &observed); status != zx.ErrOk { |
| return observed, zx.Error{Status: status, Text: "zxwait.Wait"} |
| } |
| return observed, nil |
| } |
| sysWaiterOnce.Do(sysWaiterInit) |
| return sysWaiter.Wait(handle, signals) |
| } |
| |
| //go:linkname gopark runtime.gopark |
| func gopark(unlockf func(g uintptr, waiting *waitingG) bool, waitng *waitingG, reason string, traceEv byte, traceskip int) |
| |
// goready marks the goroutine identified by g (as captured in unlockf)
// runnable again, resuming it after a gopark.
//
//go:linkname goready runtime.goready
func goready(g uintptr, traceskip int)
| |
// waitingG is used to track a parked g waiting on a signal.
//
// A waitingG is represented by a unique key, which is also
// its positional index in the waiter's all slice. This key
// is not a GCed pointer, so it can be safely passed to the
// kernel and returned at a later time.
type waitingG struct {
	key uint64 // index into all, stable identifier passed to kernel
	g   uintptr // parked goroutine, stored by unlockf; read by dequeue via atomics
	obs uint32 // zx.Signals observed by dequeue, read by Wait after unparking
	ready uintptr // 0 until either unlockf stores g or dequeue stores obs; settled by CAS
}
| |
// A waiter is a zircon port that parks goroutines waiting on signals.
//
// Currently there is only one instance of waiter per process, stored
// in sysWaiter. It is however a self-contained object and multiple
// of them can safely exist concurrently.
type waiter struct {
	port zx.Port // port all waits are registered on; drained by dequeue

	mu   sync.Mutex  // guards free and all
	free []*waitingG // recycled waitingG slots available for reuse
	all  []*waitingG // every waitingG ever created, indexed by its key
}
| |
| func newWaiter() *waiter { |
| port, err := zx.NewPort(0) |
| if err != nil { |
| panic(err) // misuse of system call, no useful recovery |
| } |
| w := &waiter{ |
| port: port, |
| } |
| go w.dequeue() |
| return w |
| } |
| |
| var sysWaiterOnce sync.Once |
| var sysWaiter *waiter |
| |
| func sysWaiterInit() { |
| sysWaiter = newWaiter() |
| } |
| |
// dequeue is a dedicated goroutine waiting on the waiter's port.
//
// For each packet it looks up the waitingG by key, publishes the observed
// signals, and then races unlockf on the ready flag to decide whether the
// waiting goroutine ever parked and therefore needs a goready.
func (w *waiter) dequeue() {
	var pkt zx.Packet
	for {
		err := w.port.Wait(&pkt, zx.TimensecInfinite)
		if err != nil {
			panic(err)
		}

		// The packet key is the waitingG's index into all.
		w.mu.Lock()
		waiting := w.all[pkt.Hdr.Key]
		w.mu.Unlock()

		obs := pkt.Signal().Observed

		// Publish the observed signals before settling the ready flag,
		// so they are visible to Wait as soon as it resumes.
		atomic.StoreUint32(&waiting.obs, uint32(obs))
		if atomic.CompareAndSwapUintptr(&waiting.ready, 0, 1) {
			// We beat unlockf, goroutine never parked.
			continue
		}
		// We were beaten by unlockf, so waiting.g has a real value.
		g := atomic.LoadUintptr(&waiting.g)
		goready(g, 0)
	}
}
| |
| // Wait waits for signals on handle. |
| // |
| // See the package function Wait for more commentary. |
| func (w *waiter) Wait(handle zx.Handle, signals zx.Signals) (observed zx.Signals, err error) { |
| var waiting *waitingG |
| |
| w.mu.Lock() |
| if len(w.free) == 0 { |
| waiting = &waitingG{ |
| key: uint64(len(w.all)), |
| } |
| w.all = append(w.all, waiting) |
| } else { |
| waiting = w.free[len(w.free)-1] |
| w.free = w.free[:len(w.free)-1] |
| } |
| w.mu.Unlock() |
| |
| var pkt zx.Packet |
| pkt.Hdr.Key = waiting.key |
| pkt.Signal().Trigger = signals |
| |
| const ZX_WAIT_ASYNC_ONCE = 0 |
| if err = w.port.WaitAsync(handle, pkt.Hdr.Key, signals, ZX_WAIT_ASYNC_ONCE); err != nil { |
| return 0, err |
| } |
| |
| const traceEv = 24 |
| gopark(w.unlockf, waiting, "zxwait", traceEv, 0) |
| |
| obs := atomic.LoadUint32(&waiting.obs) |
| |
| *waiting = waitingG{key: waiting.key} // clear waitingG for reuse |
| |
| w.mu.Lock() |
| w.free = append(w.free, waiting) |
| w.mu.Unlock() |
| |
| return zx.Signals(obs), nil |
| } |
| |
// unlockf is passed as a callback to gopark.
//
// Reporting true will park the goroutine until goready is called.
//
// Reporting false will immediately start running the goroutine
// again, and nothing else should call goready.
//
// This method is called without a 'g', so it can do very little.
// Avoid the stack. Don't use anything with 'g'-based runtime
// support (like sync.Mutex). Do as little as possible.
func (w *waiter) unlockf(g uintptr, waiting *waitingG) bool {
	// Record the g first, so dequeue can load it after losing the CAS.
	atomic.StoreUintptr(&waiting.g, g)

	// If we can set ready, then this executed before dequeue
	// and the goroutine will be parked.
	//
	// If we cannot set it, then dequeue has already run and
	// waiting.obs is set, so do not park the goroutine.
	return atomic.CompareAndSwapUintptr(&waiting.ready, 0, 1)
}