syscall/mx: Upgrade ports (and dispatcher) to ports v2
Change-Id: Ibd8a9763def9c578d5400c6698e3eedc8f3e8d68
diff --git a/src/syscall/mx/handle.go b/src/syscall/mx/handle.go
index ef86834..c1fa228 100644
--- a/src/syscall/mx/handle.go
+++ b/src/syscall/mx/handle.go
@@ -236,6 +236,59 @@
return c0, c1, nil
}
+const (
+ PortWaitAsyncOnce = iota
+ PortWaitAsyncRepeating
+)
+
+const (
+ PortPacketTypeUser = iota
+ PortPacketTypeSignalOne
+ PortPacketTypeSignalRepeating
+)
+
+const (
+ PacketHeaderSize = uint(unsafe.Sizeof(PacketHeader{}))
+ PacketMaxSize = uint(unsafe.Sizeof(Packet{}))
+)
+
+type PacketHeader struct {
+ Key uint64
+ Type uint32
+ Extra uint32
+}
+
+// Packet describes the unit of communication via an MXIO protocol
+type Packet struct {
+ Hdr PacketHeader
+ Bytes [32]uint8
+}
+
+func (p *Packet) Signal() *PacketSignal {
+ return (*PacketSignal)(unsafe.Pointer(p))
+}
+
+type PortPacket interface {
+ Header() *PacketHeader
+ Length() int
+}
+
+// PacketSignal describes packets of type PortPacketTypeSignal
+type PacketSignal struct {
+ Hdr PacketHeader
+ Trigger Signals
+ Observed Signals
+ Count uint64
+}
+
+func (s *PacketSignal) Header() *PacketHeader {
+ return &s.Hdr
+}
+
+func (s *PacketSignal) Length() int {
+ return int(unsafe.Sizeof(*s))
+}
+
type Port struct {
Handle Handle
}
@@ -246,15 +299,18 @@
return err
}
-func (p *Port) Queue(packet []byte) error {
- if status := Sys_port_queue(p.Handle, unsafe.Pointer(&packet[0]), uint(len(packet))); status != ErrOk {
+// Queue a PortPacket, which may have a varying size.
+func (p *Port) Queue(packet PortPacket) error {
+ if status := Sys_port_queue(p.Handle, unsafe.Pointer(packet.Header()), uint(packet.Length())); status != ErrOk {
return Error{Status: status, Text: "mx.Port.Queue"}
}
return nil
}
-func (p *Port) Wait(packet []byte, deadline Time) error {
- if status := Sys_port_wait(p.Handle, deadline, unsafe.Pointer(&packet[0]), uint(len(packet))); status != ErrOk {
+// Wait for a Packet, which must be as large as the largest
+// possible packet that may be received.
+func (p *Port) Wait(packet *Packet, deadline Time) error {
+ if status := Sys_port_wait(p.Handle, deadline, unsafe.Pointer(packet), 0); status != ErrOk {
return Error{Status: status, Text: "mx.Port.Wait"}
}
return nil
@@ -267,6 +323,20 @@
return nil
}
+func (p *Port) Cancel(source Handle, key uint64) error {
+ if status := Sys_port_cancel(p.Handle, source, key); status != ErrOk {
+ return Error{Status: status, Text: "mx.Port.Cancel"}
+ }
+ return nil
+}
+
+func (p *Port) WaitAsync(handle Handle, key uint64, signals Signals, options uint32) error {
+ if status := Sys_object_wait_async(handle, p.Handle, key, signals, options); status != ErrOk {
+ return Error{Status: status, Text: "mxio.Port.WaitAsync"}
+ }
+ return nil
+}
+
func NewPort(options uint32) (*Port, error) {
p := new(Port)
if status := Sys_port_create(options, &p.Handle); status != ErrOk {
diff --git a/src/syscall/mx/mxio/dispatcher/dispatcher.go b/src/syscall/mx/mxio/dispatcher/dispatcher.go
index 3526ce1..f25be9d 100644
--- a/src/syscall/mx/mxio/dispatcher/dispatcher.go
+++ b/src/syscall/mx/mxio/dispatcher/dispatcher.go
@@ -11,7 +11,6 @@
"sync"
"syscall/mx"
- "syscall/mx/mxio"
)
// A DispatcherCallback is called when the handle is readable or closed.
@@ -38,7 +37,7 @@
// Dispatcher can read messages from a handle and assign them to a callback.
type Dispatcher struct {
- ioport mx.Handle
+ port *mx.Port
callback DispatcherCallback
mu sync.Mutex
@@ -60,12 +59,12 @@
// New creates a new dispatcher.
func New(callback DispatcherCallback) (*Dispatcher, error) {
- ioport, err := mxio.IoPortCreate(0)
+ port, err := mx.NewPort(1)
if err != nil {
return nil, err
}
return &Dispatcher{
- ioport: ioport,
+ port: port,
callback: callback,
handlers: make(map[uint64]*handler),
}, nil
@@ -74,7 +73,7 @@
// Closes a dispatcher, shutting down all handles added to the dispatcher and preventing additional
// handles from being added.
func (d *Dispatcher) close() {
- d.ioport.Close()
+ d.port.Close()
}
func (d *Dispatcher) destroyHandler(hr *handler, handlerID uint64, doCloseCallback bool) {
@@ -92,12 +91,12 @@
hr.h.Close()
// Send a "SignalPortSignaled" close message to shut down the handler
- var packet mxio.Packet
- packet.Hdr.Key = handlerID
+ var signal mx.PacketSignal
+ signal.Hdr.Key = handlerID
if doCloseCallback {
- packet.Signals = mx.SignalPortSignaled
+ signal.Observed = mx.SignalPortSignaled
}
- mxio.IoPortQueue(d.ioport, &packet, mxio.PacketSize)
+ d.port.Queue(&signal)
hr.flags |= flagDisconnected
}
@@ -107,7 +106,8 @@
func (d *Dispatcher) Serve() {
for {
// Wait for any handler to be ready
- packet, err := mxio.IoPortWait(d.ioport, mx.TimensecInfinite, mxio.PacketSize)
+ var packet mx.Packet
+ err := d.port.Wait(&packet, mx.TimensecInfinite)
if err != nil {
d.close()
return
@@ -123,22 +123,30 @@
if (hr.flags & flagDisconnected) != 0 {
// Do not process packets from disconnected handlers.
// Ignore all events until a "destroy" packet is received.
- if packet.Hdr.PacketType == mxio.PortPacketTypeUser {
- d.destroyHandler(hr, handlerID, (packet.Signals&mx.SignalPortSignaled) != 0)
+ if packet.Hdr.Type == mx.PortPacketTypeUser {
+ d.destroyHandler(hr, handlerID, (packet.Signal().Observed&mx.SignalPortSignaled) != 0)
}
continue
}
// Read everything we can from this single handler
- if (packet.Signals & mx.SignalChannelReadable) != 0 {
+ if (packet.Signal().Observed & mx.SignalChannelReadable) != 0 {
if err := d.callback(hr.h, hr.callback, hr.cookie); err != nil {
if err != ErrNoWork {
d.disconnectHandler(hr, handlerID, err != nil)
continue
}
}
+ // Requeue the handler
+ err := d.port.WaitAsync(hr.h, handlerID, mx.SignalChannelReadable|mx.SignalChannelPeerClosed, mx.PortWaitAsyncOnce)
+ if err != nil {
+ d.mu.Lock()
+ delete(d.handlers, handlerID)
+ d.mu.Unlock()
+ }
+
}
// We haven't yet disconnected from the handler. Should we?
- if (packet.Signals & mx.SignalChannelPeerClosed) != 0 {
+ if (packet.Signal().Observed & mx.SignalChannelPeerClosed) != 0 {
d.disconnectHandler(hr, handlerID, true)
}
}
@@ -167,7 +175,7 @@
d.handlers[handlerID] = hr
d.mu.Unlock()
- err := mxio.IoPortBind(d.ioport, handlerID, h, mx.SignalChannelReadable|mx.SignalChannelPeerClosed)
+ err := d.port.WaitAsync(h, handlerID, mx.SignalChannelReadable|mx.SignalChannelPeerClosed, mx.PortWaitAsyncOnce)
if err != nil {
d.mu.Lock()
delete(d.handlers, handlerID)
diff --git a/src/syscall/mx/mxio/mxio.go b/src/syscall/mx/mxio/mxio.go
index c42b092..b25c76e 100644
--- a/src/syscall/mx/mxio/mxio.go
+++ b/src/syscall/mx/mxio/mxio.go
@@ -8,8 +8,6 @@
package mxio
import (
- "unsafe"
-
"syscall/mx"
)
@@ -30,7 +28,6 @@
}
const (
- PacketSize = uint(unsafe.Sizeof(Packet{}))
// MaxHandles is the maximum number of handles which can be transferred in an mxio op.
MaxHandles = 3
// ChunkSize is the largest number of bytes which can be used in a single low-level op.
@@ -39,29 +36,6 @@
IoctlMaxInput = 1024
)
-const (
- PortPacketTypeKernel = iota
- PortPacketTypeIosn
- PortPacketTypeUser
- PortPacketTypeException
-)
-
-// PacketHeader describes the following packet
-type PacketHeader struct {
- Key uint64
- PacketType uint32
- Extra uint32
-}
-
-// Packet describes the unit of communication via an MXIO protocol
-type Packet struct {
- Hdr PacketHeader
- Timestamp mx.Time
- Bytes uintptr
- Signals mx.Signals
- Reserved uint32
-}
-
type Protocol uint32
// All available MXIO protocols
@@ -126,43 +100,6 @@
var IoctlDevmgrUnmountFS = IoctlNum(IoctlKindDefault, IoctlFamilyDevmgr, 1)
var IoctlDevmgrGetTokenFS = IoctlNum(IoctlKindGetHandle, IoctlFamilyDevmgr, 5)
-// TODO: these IoPort* functions have been superceeded by mx.Port. Remove them.
-
-// IoPortCreate wraps the corresponding syscall
-func IoPortCreate(options uint32) (mx.Handle, error) {
- var h mx.Handle
- if status := mx.Sys_port_create(options, &h); status != mx.ErrOk {
- return 0, mx.Error{Status: status, Text: "mxio.IoPort"}
- }
- return h, nil
-}
-
-// IoPortQueue wraps the corresponding syscall
-func IoPortQueue(handle mx.Handle, packet *Packet, size uint) error {
- if status := mx.Sys_port_queue(handle, unsafe.Pointer(packet), size); status != mx.ErrOk {
- return mx.Error{Status: status, Text: "mxio.IoPort"}
- }
- return nil
-}
-
-// IoPortWait wraps the corresponding syscall
-func IoPortWait(handle mx.Handle, deadline mx.Time, size uint) (*Packet, error) {
- var packet Packet
- status := mx.Sys_port_wait(handle, deadline, unsafe.Pointer(&packet), size)
- if status != mx.ErrOk {
- return nil, mx.Error{Status: status, Text: "mxio.IoPort"}
- }
- return &packet, nil
-}
-
-// IoPortBind wraps the corresponding syscall
-func IoPortBind(handle mx.Handle, key uint64, source mx.Handle, signals mx.Signals) error {
- if status := mx.Sys_port_bind(handle, key, source, signals); status != mx.ErrOk {
- return mx.Error{Status: status, Text: "mxio.IoPort"}
- }
- return nil
-}
-
type Vnattr struct {
Valid uint32
Mode Vtype