// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Go's distribution tools attempt to compile everything; this file
// depends on types that do not compile on non-Fuchsia platforms.
//go:build fuchsia
// +build fuchsia

// Package zxwait implements a Zircon port waiter compatible with goroutines.
//
// The function WaitContext can be used to wait on a handle without
// consuming an OS thread or resorting to event-driven programming.
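//
// A minimal usage sketch (the handle h is illustrative, and the channel
// signal names are assumed from syscall/zx; any valid zx.Handle and
// zx.Signals mask will do):
//
//	obs, err := zxwait.WaitContext(ctx, h,
//		zx.SignalChannelReadable|zx.SignalChannelPeerClosed)
//	if err != nil {
//		// ctx was canceled or the wait failed.
//	}
//	if obs&zx.SignalChannelReadable != 0 {
//		// h has a message ready to read.
//	}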
package zxwait

import (
	"strconv"
	"sync"
	"sync/atomic"
	"syscall/zx"
	"syscall/zx/internal/context"
	_ "unsafe" // for go:linkname
)

// WaitContext waits until any of signals is asserted on handle, without
// consuming an OS thread. If ctx is canceled while waiting, WaitContext
// returns ctx.Err().
func WaitContext(ctx context.Context, handle zx.Handle, signals zx.Signals) (zx.Signals, error) {
	sysWaiterOnce.Do(sysWaiterInit)
	return sysWaiter.Wait(ctx, handle, signals)
}

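// WithRetryContext calls fn, retrying as long as fn fails with
// zx.ErrShouldWait. Between attempts it waits until handle asserts
// ready (try fn again) or closed (fail with zx.ErrPeerClosed), or
// until ctx is canceled.
//
// A hedged usage sketch (the socket s and the readSocket helper are
// hypothetical; the socket signal names are assumed from syscall/zx):
//
//	err := zxwait.WithRetryContext(ctx, func() error {
//		return readSocket(s)
//	}, zx.Handle(s), zx.SignalSocketReadable, zx.SignalSocketPeerClosed)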
func WithRetryContext(ctx context.Context, fn func() error, handle zx.Handle, ready, closed zx.Signals) error {
	signals := ready | closed
	for {
		err := fn()
		if err, ok := err.(*zx.Error); ok && err.Status == zx.ErrShouldWait {
			obs, err := WaitContext(ctx, handle, signals)
			if err != nil {
				return err
			}
			if obs&ready != 0 {
				continue
			}
			if obs&closed != 0 {
				return &zx.Error{Status: zx.ErrPeerClosed}
			}
			panic("unexpected signal mask " + strconv.FormatUint(uint64(obs), 2) + " (expected " + strconv.FormatUint(uint64(signals), 2) + ")")
		}
		return err
	}
}

// gopark and goready are runtime internals, reached via go:linkname;
// they park the calling goroutine and mark a parked goroutine runnable
// again, respectively.
//
//go:linkname gopark runtime.gopark
func gopark(unlockf func(g uintptr, waiting *waitingG) bool, waiting *waitingG, reason uint8, traceEv byte, traceskip int)

//go:linkname goready runtime.goready
func goready(g uintptr, traceskip int)

// preparingG marks a waitingG whose wait is still being set up; see
// unlockf and dequeue.
const preparingG = 1

// waitingG is used to track a parked g waiting for a signal.
//
// A waitingG is represented by a unique key, which is also its positional
// index in the waiter's all slice. This key is not a GCed pointer, so it
// can be safely passed to the kernel and returned at a later time.
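//
// A waitingG is recycled through the waiter's free list once its wait
// completes, so the all slice only grows to the high-water mark of
// concurrent waiters.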
type waitingG struct {
	// key is an index into the waiter's all slice. It is never modified.
	key uint64

	// g holds preparingG while the wait is being set up, the parked
	// goroutine once unlockf runs, and 0 after dequeue claims the wakeup.
	g uintptr

	// done, if non-nil, is closed by dequeue instead of waking g; it is
	// used by waits that also select on context cancellation.
	done chan<- struct{}
	// obs is the observed signal set, written by dequeue before the
	// waiter is woken.
	obs zx.Signals
}

// A waiter is a zircon port that parks goroutines waiting on signals.
//
// Currently there is only one instance of waiter per process, stored
// in sysWaiter. It is, however, a self-contained object, and multiple
// waiters can safely exist concurrently.
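//
// Each wait registers its key with the port via WaitAsync; a single
// dequeue goroutine services the port and wakes the parked goroutine
// (or closes its done channel) when a packet arrives.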
type waiter struct {
	port zx.Port

	mu struct {
		sync.Mutex
		free []*waitingG
		all  []*waitingG

		allByHandle map[zx.Handle]map[*waitingG]struct{}
	}
}

func init() {
	port, err := zx.NewPort(0)
	if err != nil {
		panic(err) // misuse of system call, no useful recovery
	}
	sysWaiter.port = port
	sysWaiter.mu.allByHandle = make(map[zx.Handle]map[*waitingG]struct{})
	zx.SetZXWaitCloseFn(sysWaiter.close)
}

var sysWaiterOnce sync.Once
var sysWaiter waiter

func sysWaiterInit() {
	go sysWaiter.dequeue()
}

// dequeue runs in a dedicated goroutine, waiting on the waiter's port
// and delivering each packet to the waitingG identified by its key.
func (w *waiter) dequeue() {
	var pkt zx.Packet
	for {
		if err := w.port.Wait(&pkt, zx.TimensecInfinite); err != nil {
			panic(err)
		}

		w.mu.Lock()
		waiting := w.mu.all[pkt.Hdr.Key]
		w.mu.Unlock()

		switch pkt.Hdr.Type {
		case zx.PortPacketTypeUser:
			// User packets are only enqueued by close; report the
			// handle as closed.
			waiting.obs = zx.SignalHandleClosed
		default:
			waiting.obs = pkt.Signal().Observed
		}

		if done := waiting.done; done != nil {
			close(done)
		} else {
			switch g := atomic.SwapUintptr(&waiting.g, 0); g {
			case 0, preparingG:
				// No goroutine is parked yet; after this swap,
				// unlockf's CAS fails and the goroutine keeps
				// running instead of parking.
			default:
				goready(g, 0)
			}
		}
	}
}

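// close cancels any pending port waits on handle, queues a user packet
// to wake each canceled waiter, and then calls cb, which is expected to
// perform the actual close. It is registered with the zx package via
// zx.SetZXWaitCloseFn in init.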
func (w *waiter) close(handle zx.Handle, cb func(zx.Handle) error) error {
	w.mu.Lock()
	defer w.mu.Unlock()
	for waiting := range w.mu.allByHandle[handle] {
		switch status := zx.Sys_port_cancel(zx.Handle(w.port), handle, waiting.key); status {
		case zx.ErrOk:
			if err := w.port.Queue(&zx.Packet{
				Hdr: zx.PacketHeader{
					Key:  waiting.key,
					Type: zx.PortPacketTypeUser,
				},
			}); err != nil {
				return err
			}
		case zx.ErrNotFound:
			// Nobody is waiting, no need to notify.
		default:
			return &zx.Error{Status: status, Text: "zx.Port.Cancel"}
		}
	}
	return cb(handle)
}

// Wait waits for signals on handle.
//
// See the package-level function WaitContext for more commentary.
func (w *waiter) Wait(ctx context.Context, handle zx.Handle, signals zx.Signals) (zx.Signals, error) {
	var waiting *waitingG

	w.mu.Lock()
	if len(w.mu.free) == 0 {
		waiting = &waitingG{
			key: uint64(len(w.mu.all)),
		}
		w.mu.all = append(w.mu.all, waiting)
	} else {
		waiting = w.mu.free[len(w.mu.free)-1]
		w.mu.free = w.mu.free[:len(w.mu.free)-1]
	}
	waiting.g = preparingG
	m, ok := w.mu.allByHandle[handle]
	if !ok {
		m = make(map[*waitingG]struct{})
		w.mu.allByHandle[handle] = m
	}
	var wait func() (zx.Signals, error)
	if done := ctx.Done(); done != nil {
		ch := make(chan struct{})
		waiting.done = ch
		wait = func() (zx.Signals, error) {
			select {
			case <-ch:
				// Wait complete.
				return waiting.obs, nil
			case <-done:
				// Context canceled.
				switch status := zx.Sys_port_cancel(zx.Handle(w.port), handle, waiting.key); status {
				case zx.ErrOk:
					return 0, ctx.Err()
				case zx.ErrNotFound:
					// We lost the race: dequeue has already claimed
					// this wait, so block until the result arrives.
					<-ch
					return waiting.obs, nil
				default:
					return 0, &zx.Error{Status: status, Text: "zx.Port.Cancel"}
				}
			}
		}
	} else {
		waiting.done = nil
	}
	m[waiting] = struct{}{}
	// waiting must be fully initialized before a wakeup in dequeue is
	// possible, which is the case once WaitAsync has been called and the
	// mutex has been released.
	err := w.port.WaitAsync(handle, waiting.key, signals, zx.PortWaitAsyncOnce)
	w.mu.Unlock()

	defer func() {
		w.mu.Lock()
		delete(m, waiting)
		if len(m) == 0 {
			delete(w.mu.allByHandle, handle)
		}
		w.mu.free = append(w.mu.free, waiting)
		w.mu.Unlock()
	}()

	if err != nil {
		return 0, err
	}

	if wait != nil {
		return wait()
	}

	// These values must match the runtime's waitReasonIOWait and
	// traceEvGoBlockSelect constants.
	const waitReasonIOWait = 2
	const traceEvGoBlockSelect = 24
	gopark(w.unlockf, waiting, waitReasonIOWait, traceEvGoBlockSelect, 0)

	obs := waiting.obs

	if obs == zx.SignalHandleClosed {
		return obs, &zx.Error{Status: zx.ErrCanceled, Text: "zxwait.Wait"}
	}
	return obs, nil
}

// unlockf is passed as a callback to gopark.
//
// Reporting true will park the goroutine until goready is called.
//
// Reporting false will immediately start running the goroutine
// again, and nothing else should call goready.
//
// This method is called without a 'g', so it can do very little.
// Avoid the stack. Don't use anything with 'g'-based runtime
// support (like sync.Mutex). Do as little as possible.
func (w *waiter) unlockf(g uintptr, waiting *waitingG) bool {
	// If the CAS succeeds, unlockf ran before dequeue claimed the wait,
	// and the goroutine parks until dequeue calls goready.
	//
	// If it fails, dequeue has already run and waiting.obs is set, so
	// do not park the goroutine.
	return atomic.CompareAndSwapUintptr(&waiting.g, preparingG, g)
}