| // Copyright 2016 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| // +build fuchsia |
| |
| // Package dispatcher provides a mechanism to dispatch callbacks to readable handles |
| package dispatcher |
| |
| import ( |
| "sync" |
| |
| "syscall/mx" |
| ) |
| |
| // A DispatcherCallback is called when the handle is readable or closed. |
| // Passing '0' to this callback as a handle indicates that the handle has been closed. |
| // |
| // A return value of err == nil causes another callback on the same handle. |
| // A return value of ErrNoWork causes the message draining to end. |
| // A return value of ErrDisconnectNoCallback disconnects the handler WITHOUT a callback. |
| // Any other error as a return value disconnects the handler WITH a final callback on an empty handle. |
| type DispatcherCallback func(h mx.Handle, callback interface{}, cookie int64) error |
| |
| type handler struct { |
| h mx.Handle |
| flags uint32 |
| callback interface{} |
| cookie int64 |
| } |
| |
// Bit flags stored in handler.flags.
const (
	// flagDisconnected marks a handler that has been disconnected and must
	// no longer be used to process packets.
	flagDisconnected = 1 << iota
)
| |
| // Dispatcher can read messages from a handle and assign them to a callback. |
| type Dispatcher struct { |
| port *mx.Port |
| callback DispatcherCallback |
| |
| mu sync.Mutex |
| handlers map[uint64]*handler |
| nextHandlerID uint64 |
| } |
| |
| // Custom dispatcher errors |
| var ( |
| // If returned from dispatcher callback, indicates that the dispatcher should immediately |
| // disconnect with no additional callbacks. |
| ErrDisconnectNoCallback = mx.Error{Status: 1, Text: "mxio.Dispatcher"} |
| // If returned from dispatcher callback, indicates that there are no more messages to read. |
| ErrNoWork = mx.Error{Status: -9999, Text: "mxio.Dispatcher.NoWork"} |
| // If returned from dispatcher callback, indicates that the message was handed to another |
| // server. |
| ErrIndirect = mx.Error{Status: -9998, Text: "mxio.Dispatcher.Indirect"} |
| ) |
| |
| // New creates a new dispatcher. |
| func New(callback DispatcherCallback) (*Dispatcher, error) { |
| port, err := mx.NewPort(1) |
| if err != nil { |
| return nil, err |
| } |
| return &Dispatcher{ |
| port: port, |
| callback: callback, |
| handlers: make(map[uint64]*handler), |
| }, nil |
| } |
| |
| // Closes a dispatcher, shutting down all handles added to the dispatcher and preventing additional |
| // handles from being added. |
| func (d *Dispatcher) close() { |
| d.port.Close() |
| } |
| |
| func (d *Dispatcher) destroyHandler(hr *handler, handlerID uint64, doCloseCallback bool) { |
| if doCloseCallback { |
| d.callback(0, hr.callback, hr.cookie) |
| } |
| d.mu.Lock() |
| delete(d.handlers, handlerID) |
| d.mu.Unlock() |
| } |
| |
| // Unbind a handler from the dispatcher to prevent further messages from arriving. |
| func (d *Dispatcher) disconnectHandler(hr *handler, handlerID uint64, doCloseCallback bool) { |
| // close handle, so we get no further messages |
| hr.h.Close() |
| |
| // Send a "SignalPortSignaled" close message to shut down the handler |
| var signal mx.PacketSignal |
| signal.Hdr.Key = handlerID |
| if doCloseCallback { |
| signal.Observed = mx.SignalPortSignaled |
| } |
| d.port.Queue(&signal) |
| hr.flags |= flagDisconnected |
| } |
| |
| // Serve launches a dispatcher, which will continue serving 'forever'. |
| // The dispatcher waits on all handlers bound to it, and calls the callback of these handles as soon |
| // as any of them become readable or closed. |
| func (d *Dispatcher) Serve() { |
| for { |
| // Wait for any handler to be ready |
| var packet mx.Packet |
| err := d.port.Wait(&packet, mx.TimensecInfinite) |
| if err != nil { |
| d.close() |
| return |
| } |
| handlerID := packet.Hdr.Key |
| d.mu.Lock() |
| hr, ok := d.handlers[handlerID] |
| d.mu.Unlock() |
| if !ok { |
| // The handler is concurrently being closed by another thread |
| continue |
| } |
| if (hr.flags & flagDisconnected) != 0 { |
| // Do not process packets from disconnected handlers. |
| // Ignore all events until a "destroy" packet is received. |
| if packet.Hdr.Type == mx.PortPacketTypeUser { |
| d.destroyHandler(hr, handlerID, (packet.Signal().Observed&mx.SignalPortSignaled) != 0) |
| } |
| continue |
| } |
| // Read everything we can from this single handler |
| if (packet.Signal().Observed & mx.SignalChannelReadable) != 0 { |
| if err := d.callback(hr.h, hr.callback, hr.cookie); err != nil { |
| if err != ErrNoWork { |
| d.disconnectHandler(hr, handlerID, err != nil) |
| continue |
| } |
| } |
| // Requeue the handler |
| err := d.port.WaitAsync(hr.h, handlerID, mx.SignalChannelReadable|mx.SignalChannelPeerClosed, mx.PortWaitAsyncOnce) |
| if err != nil { |
| d.mu.Lock() |
| delete(d.handlers, handlerID) |
| d.mu.Unlock() |
| } |
| |
| } |
| // We haven't yet disconnected from the handler. Should we? |
| if (packet.Signal().Observed & mx.SignalChannelPeerClosed) != 0 { |
| d.disconnectHandler(hr, handlerID, true) |
| } |
| } |
| } |
| |
| // AddHandler adds a handle to the dispatcher, with an associated callback. The callback will be |
| // called by the dispatcher server, as soon as h becomes readable or closed. |
| func (d *Dispatcher) AddHandler(h mx.Handle, callback interface{}, cookie int64) error { |
| hr := &handler{ |
| h: h, |
| flags: 0, |
| callback: callback, |
| cookie: cookie, |
| } |
| |
| d.mu.Lock() |
| var handlerID uint64 |
| for { |
| // Find a handlerID which is not already in use |
| handlerID = d.nextHandlerID |
| d.nextHandlerID++ |
| if _, ok := d.handlers[handlerID]; !ok { |
| break |
| } |
| } |
| d.handlers[handlerID] = hr |
| d.mu.Unlock() |
| |
| err := d.port.WaitAsync(h, handlerID, mx.SignalChannelReadable|mx.SignalChannelPeerClosed, mx.PortWaitAsyncOnce) |
| if err != nil { |
| d.mu.Lock() |
| delete(d.handlers, handlerID) |
| d.mu.Unlock() |
| } |
| return err |
| } |