blob: 7b865ae8ce7188b6de7cdb2b54cf4356ff453c58 [file] [log] [blame]
// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package fidl
import (
"sync"
"syscall/zx"
"syscall/zx/internal/context"
)
// ServiceRequest is an abstraction over a FIDL interface request which is
// intended to be used as part of service discovery.
//
// An implementation pairs a service name with the channel on which that
// service is being requested.
type ServiceRequest interface {
	// Name returns the name of the service being requested.
	Name() string
	// ToChannel returns the underlying channel of the ServiceRequest.
	ToChannel() zx.Channel
}
// InterfaceRequest is a wrapper type around a channel and represents the server side
// of a FIDL interface, to be sent to a server as a request.
type InterfaceRequest struct {
	// Channel is the embedded channel endpoint being wrapped.
	zx.Channel
}
// NewInterfaceRequest creates a fresh channel and hands back its two endpoints
// already wrapped in their FIDL roles, which saves generated code from doing
// the type casts itself: the first endpoint becomes the InterfaceRequest (the
// server side) and the second becomes the ChannelProxy (the client side).
//
// If the channel cannot be created, the zero InterfaceRequest, a nil proxy and
// the error are returned.
func NewInterfaceRequest() (InterfaceRequest, *ChannelProxy, error) {
	serverEnd, clientEnd, err := newChannel(0)
	if err != nil {
		return InterfaceRequest{}, nil, err
	}
	request := InterfaceRequest{Channel: serverEnd}
	proxy := &ChannelProxy{Channel: clientEnd}
	return request, proxy, nil
}
// Proxy represents the client side of a FIDL interface.
//
// ChannelProxy is the channel-backed implementation in this file; the method
// descriptions below follow its behavior.
type Proxy interface {
	// IsValid reports whether the proxy is backed by a valid handle.
	IsValid() bool
	// Send sends req with the given ordinal without expecting a response.
	Send(ordinal uint64, req Message) error
	// Recv waits for an event with the given ordinal and decodes it into resp.
	Recv(ordinal uint64, resp Message) error
	// Call sends req with the given ordinal, synchronously waits for the
	// response and decodes it into resp.
	Call(ordinal uint64, req Message, resp Message) error
}
// DispatchArgs bundles the arguments passed to Stub.Dispatch.
type DispatchArgs struct {
	// Ctx is the Context supplied to the dispatched method.
	// NOTE(review): the Context type is declared outside this view — confirm
	// its exact role against the callers of Dispatch.
	Ctx Context
	// Ordinal is the message ordinal used to select the method implementation.
	Ordinal uint64
	// Bytes is the message data, as bytes, that is transformed into the
	// method's arguments.
	Bytes []byte
	// HandleInfos presumably holds the handles that accompany Bytes — TODO
	// confirm against the code that populates DispatchArgs.
	HandleInfos []zx.HandleInfo
}
// Stub represents a generated type which wraps the server-side implementation of a
// FIDL interface.
//
// It contains logic which is able to dispatch into the correct implementation given
// the incoming message ordinal and its data.
type Stub interface {
	// Dispatch dispatches into the appropriate method implementation for a FIDL
	// interface by using the ordinal.
	//
	// It also takes the data as bytes and transforms it into arguments usable by
	// the method implementation. It then optionally returns a response if the
	// method has a response, in which case, the boolean return value is true.
	//
	// The ordinal and the data are both carried in args (see DispatchArgs).
	Dispatch(args DispatchArgs) (Message, bool, error)
}
// readResult holds one message read from the channel by ChannelProxy.recv
// until the waiter it is destined for consumes it.
type readResult struct {
	// header is the decoded message header.
	header MessageHeader
	objBytes []byte // decoded bytes (excluding header)
	handleInfos []zx.HandleInfo // decoded handles
	// Full objects retrieved from the byte and handle pools; they back objBytes
	// and handleInfos and must be returned to their pools once the result has
	// been consumed.
	poolBytes []byte
	poolHandleInfos []zx.HandleInfo
}
// ChannelProxy is a Proxy that is backed by a channel.
//
// The bookkeeping state is initialized lazily through initOnce, so a
// ChannelProxy built as a struct literal with only Channel set is ready to use.
type ChannelProxy struct {
	// Channel is the underlying channel endpoint for this interface.
	zx.Channel
	// mu groups the state shared between concurrent callers together with the
	// mutex that guards it.
	mu struct {
		sync.Mutex
		// lastTxid is the counter from which Call derives transaction ids.
		lastTxid uint32
		// results holds messages that were read for a non-zero txid and have
		// not been consumed yet, keyed by txid.
		results map[uint32]readResult
		// 0-txid corresponds to event messages that lack request/response pair.
		// Multiple of these can be read before they are processed and there needs
		// to be a place to store them.
		zeroTxidResults []readResult
		// err is the first error hit while reading from the channel; once set,
		// recv and Call return it instead of reading again.
		err error
		// readerElected is true while one goroutine is reading from the channel
		// on behalf of every waiter.
		readerElected bool
		// callsAwaitingReply is the set of txids registered by Call for which no
		// response has been read yet.
		callsAwaitingReply map[uint32]struct{}
	}
	// cond is waited on by goroutines that find a reader already elected and is
	// broadcast after each read attempt; its Locker is mu's mutex (set in init).
	cond sync.Cond
	// initOnce ensures init runs exactly once.
	initOnce sync.Once
}
// init prepares the proxy's bookkeeping state: it allocates the maps that
// track pending calls and unread results, and binds the condition variable to
// the proxy's mutex. It is meant to be run through p.initOnce.
func (p *ChannelProxy) init() {
	p.mu.callsAwaitingReply = map[uint32]struct{}{}
	p.mu.results = map[uint32]readResult{}
	p.cond.L = &p.mu.Mutex
}
// Compile-time check that *ChannelProxy satisfies the Proxy interface.
var _ Proxy = (*ChannelProxy)(nil)
// IsValid reports whether the handle of the underlying channel is valid.
func (p *ChannelProxy) IsValid() bool {
	handle := p.Channel.Handle()
	return handle.IsValid()
}
// EpitaphBody is the body of an epitaph message. ChannelProxy decodes it when
// it reads a message with transaction id 0 and ordinal EpitaphOrdinal, and
// reports Error as the status of the resulting error.
//
// Definitions taken from https://cs.opensource.google/fuchsia/fuchsia/+/main:zircon/system/public/zircon/fidl.h;l=626-646;drc=5fa0d1e94a4f52e3e902692ff181f5df2d737f13
//
// TODO(https://fxbug.dev/42057003): Reference the wire format documentation when
// it is correct.
type EpitaphBody struct {
	// The blank field only carries the struct-level FIDL layout tags.
	_ struct{} `fidl:"s" fidl_size_v2:"4" fidl_alignment_v2:"4"`
	// Error is the status carried by the epitaph.
	Error zx.Status `fidl_offset_v2:"0"`
}
// mEpitaphBody is the marshaler for EpitaphBody, created once when the package
// is initialized.
var mEpitaphBody = MustCreateMarshaler(EpitaphBody{})

// Marshaler returns the shared marshaler for EpitaphBody.
func (*EpitaphBody) Marshaler() Marshaler {
	return mEpitaphBody
}
// Assert that *EpitaphBody implements the Message interface.
var _ Message = (*EpitaphBody)(nil)

// EpitaphOrdinal is the ordinal that, together with transaction id 0, marks a
// received message as an epitaph. Passing it as the expected ordinal to
// (*ChannelProxy).recv makes recv skip buffered results and read until an
// error (such as an epitaph) is produced.
const EpitaphOrdinal = 0xFFFFFFFFFFFFFFFF
// Send writes req to the channel under the given ordinal as a one-way message;
// no response is awaited.
//
// If the write fails because the peer has closed, Send drains the channel
// looking for an epitaph and returns the error produced by that read instead.
// On any error the channel is closed before the error is returned.
func (p *ChannelProxy) Send(ordinal uint64, req Message) error {
	// Messages that do not require a response are written with transaction id 0.
	// (See https://fuchsia.dev/fuchsia-src/reference/fidl/language/wire-format#transactional-messages).
	result := p.send(0, ordinal, req)
	if zxErr, ok := result.(*zx.Error); ok && zxErr.Status == zx.ErrPeerClosed {
		// The peer may have left data behind (perhaps an epitaph); try to read it.
		result = p.recv(0, EpitaphOrdinal, nil)
	}
	if result != nil {
		// TODO: the error from closing the channel is currently dropped.
		_ = channelClose(&p.Channel)
	}
	return result
}
// send encodes a header carrying txid and ordinal followed by req, and writes
// the result to the channel.
//
// The encode buffers come from the message pools and are handed back when send
// returns. A marshaling failure is returned without touching the channel; the
// write itself is issued through withRetryContext with the writable and
// peer-closed signals.
func (p *ChannelProxy) send(txid uint32, ordinal uint64, req Message) error {
	header := NewCtx().NewHeader()
	header.Txid, header.Ordinal = txid, ordinal

	reqBytes := messageBytesPool.Get().([]byte)
	reqHandles := messageHandleDispositionsPool.Get().([]zx.HandleDisposition)
	defer messageBytesPool.Put(reqBytes)
	defer messageHandleDispositionsPool.Put(reqHandles)

	numBytes, numHandles, err := MarshalHeaderThenMessage(&header, req, reqBytes[:], reqHandles[:])
	if err != nil {
		return err
	}

	// Push the encoded message onto the channel.
	write := func() error {
		return channelWriteEtc(&p.Channel, reqBytes[:numBytes], reqHandles[:numHandles], 0)
	}
	return withRetryContext(context.Background(), write, *p.Channel.Handle(), zx.SignalChannelWritable, zx.SignalChannelPeerClosed)
}
// Recv blocks until an event is available and decodes it into resp. The event
// must carry the given ordinal.
//
// If receiving fails, the channel is closed and the error is returned.
func (p *ChannelProxy) Recv(ordinal uint64, resp Message) error {
	// Events are not part of a request/response pair, so they are looked up
	// under transaction id 0.
	// (See https://fuchsia.dev/fuchsia-src/reference/fidl/language/wire-format#transactional-messages).
	if err := p.recv(0, ordinal, resp); err != nil {
		// TODO: the error from closing the channel is currently dropped.
		_ = channelClose(&p.Channel)
		return err
	}
	return nil
}
// consume finishes processing of a buffered message: it verifies that the
// message carries the expected ordinal and, when resp is non-nil, decodes the
// body and handles into resp.
//
// The pooled buffers backing the result are handed back to their pools when
// consume returns, whether or not it succeeds, so res must not be used
// afterwards.
func (res *readResult) consume(ordinal uint64, resp Message) error {
	defer messageBytesPool.Put(res.poolBytes)
	defer messageHandleInfosPool.Put(res.poolHandleInfos)

	if got := res.header.Ordinal; got != ordinal {
		return newExpectError(ErrUnexpectedOrdinal, ordinal, got)
	}
	if resp == nil {
		// The caller only wanted the message taken off the queue.
		return nil
	}
	return Unmarshal(res.header.NewCtx(), res.objBytes, res.handleInfos, resp)
}
// read reads one message from the channel into the supplied pooled buffers and
// decodes its header into header.
//
// The read is issued through withRetryContext with the readable and
// peer-closed signals. On success it returns the message body (the bytes that
// follow the header) and the received handle infos; both alias the supplied
// buffers, which remain owned by the caller in every case.
//
// An error is returned if the read fails, if the message is too short to
// contain a header, or if the header fails to decode or to validate.
func (p *ChannelProxy) read(header *MessageHeader, poolBytes []byte, poolHandleInfos []zx.HandleInfo) ([]byte, []zx.HandleInfo, error) {
	var nb, nh uint32
	if err := withRetryContext(context.Background(), func() error {
		var err error
		nb, nh, err = channelReadEtc(&p.Channel, poolBytes[:], poolHandleInfos[:], 0)
		return err
	}, *p.Channel.Handle(), zx.SignalChannelReadable, zx.SignalChannelPeerClosed); err != nil {
		return nil, nil, err
	}
	objBytes := poolBytes[:nb]
	handleInfos := poolHandleInfos[:nh]
	// A message shorter than the header must be rejected explicitly. Without
	// this check, objBytes[:MessageHeaderSize] would silently reslice into stale
	// bytes left in the pooled buffer (capacity permits it), and
	// objBytes[MessageHeaderSize:] below would panic with slice bounds out of
	// range.
	if len(objBytes) < MessageHeaderSize {
		return nil, nil, &zx.Error{Status: zx.ErrInvalidArgs, Text: "fidl: message shorter than header"}
	}
	if err := Unmarshal(header.NewCtx(), objBytes[:MessageHeaderSize], nil, header); err != nil {
		return nil, nil, err
	}
	if err := header.ValidateWireFormat(); err != nil {
		return nil, nil, err
	}
	return objBytes[MessageHeaderSize:], handleInfos, nil
}
// recv waits for the message identified by txid, checks that it carries
// ordinal and decodes it into resp (when resp is non-nil).
//
// A txid of 0 selects the oldest buffered zero-txid message; any other txid
// selects the response stored under that id. When ordinal is EpitaphOrdinal
// the buffered results are skipped entirely, so recv keeps reading until an
// error (for example a received epitaph) is recorded and then returns it.
//
// At most one goroutine reads from the channel at a time. The goroutine that
// finds no reader elected becomes the reader, releases the lock for the
// duration of the read, files the message under its txid and wakes the other
// waiters. The first read failure is stored in p.mu.err and returned to every
// current and future caller.
func (p *ChannelProxy) recv(txid uint32, ordinal uint64, resp Message) error {
	p.initOnce.Do(p.init)
	p.mu.Lock()
	defer p.mu.Unlock()
	for {
		if ordinal != EpitaphOrdinal {
			// Check if a result for this txid already exists and handle it.
			if txid == 0 {
				if len(p.mu.zeroTxidResults) != 0 {
					res := p.mu.zeroTxidResults[0]
					// Zero the popped slot so the references it holds are released;
					// reslicing alone keeps the backing array's element alive.
					p.mu.zeroTxidResults[0] = readResult{}
					p.mu.zeroTxidResults = p.mu.zeroTxidResults[1:]
					return res.consume(ordinal, resp)
				}
			} else {
				if res, ok := p.mu.results[txid]; ok {
					delete(p.mu.results, txid)
					return res.consume(ordinal, resp)
				}
			}
		}
		// Check if the channel already errored.
		if p.mu.err != nil {
			return p.mu.err
		}
		// If another goroutine is reading from the channel, wait for it to finish.
		if p.mu.readerElected {
			// Wait releases the mutex while blocked and reacquires it before
			// returning; the loop then re-checks all of the state above.
			p.cond.Wait()
			continue
		}
		// This goroutine is the reader, so read from the channel.
		p.mu.readerElected = true
		poolBytes := messageBytesPool.Get().([]byte)
		poolHandleInfos := messageHandleInfosPool.Get().([]zx.HandleInfo)
		var header MessageHeader
		if err := func() error {
			// Drop the lock while reading so other goroutines can register
			// calls or start waiting; readerElected keeps them from reading
			// concurrently.
			p.mu.Unlock()
			objBytes, handleInfos, err := p.read(&header, poolBytes, poolHandleInfos)
			p.mu.Lock()
			if err != nil {
				return err
			}
			if header.Txid == 0 && header.Ordinal == EpitaphOrdinal {
				// This is an epitaph.
				var epitaph EpitaphBody
				// NB: this does not use (*readResult).consume to avoid
				// releasing the pooled buffers twice.
				if err := Unmarshal(header.NewCtx(), objBytes, handleInfos, &epitaph); err != nil {
					return err
				}
				// An epitaph is surfaced as an error carrying its status.
				return &zx.Error{Status: epitaph.Error, Text: "received epitaph"}
			}
			result := readResult{
				header: header,
				objBytes: objBytes,
				handleInfos: handleInfos,
				poolBytes: poolBytes,
				poolHandleInfos: poolHandleInfos,
			}
			if header.Txid == 0 {
				// Zero-txid messages are queued in arrival order.
				p.mu.zeroTxidResults = append(p.mu.zeroTxidResults, result)
			} else {
				// A response must be unique for its txid and must match a call
				// that is still awaiting its reply.
				if _, ok := p.mu.results[header.Txid]; ok {
					return newValueError(ErrDuplicateTxidReceived, header.Txid)
				}
				if _, ok := p.mu.callsAwaitingReply[header.Txid]; !ok {
					return newValueError(ErrResponseWithoutRequest, header.Txid)
				}
				p.mu.results[header.Txid] = result
				delete(p.mu.callsAwaitingReply, header.Txid)
			}
			return nil
		}(); err != nil {
			// The message was not stored, so its buffers go straight back to
			// the pools, and the error is recorded for every caller.
			messageBytesPool.Put(poolBytes)
			messageHandleInfosPool.Put(poolHandleInfos)
			p.mu.err = err
			// NB: we do not reset p.mu.results because each element there has a
			// sitting waiter that will consume the result.
			for _, res := range p.mu.zeroTxidResults {
				messageBytesPool.Put(res.poolBytes)
				messageHandleInfosPool.Put(res.poolHandleInfos)
			}
			p.mu.zeroTxidResults = nil
			p.mu.callsAwaitingReply = nil
		}
		// Step down as reader and wake all waiters; the next loop iteration
		// either consumes a stored result or returns the recorded error.
		p.mu.readerElected = false
		p.cond.Broadcast()
	}
}
// Call performs a two-way FIDL call: it writes req to the channel under the
// given ordinal with a freshly reserved transaction id, blocks until the
// matching response arrives and decodes that response into resp.
//
// If the write fails because the peer has closed, Call drains the channel
// looking for an epitaph and returns the error produced by that read instead.
// On any error the channel is closed before the error is returned.
func (p *ChannelProxy) Call(ordinal uint64, req Message, resp Message) error {
	p.initOnce.Do(p.init)

	// reserveTxid picks the next transaction id and records that a reply to it
	// is outstanding, all under the proxy lock.
	reserveTxid := func() (uint32, error) {
		p.mu.Lock()
		defer p.mu.Unlock()

		if p.mu.err != nil {
			return 0, p.mu.err
		}
		// Transaction id 1 through 0x7fffffff are reserved for userspace, so
		// keep advancing the counter until the masked value is non-zero.
		// (See https://fuchsia.dev/fuchsia-src/reference/fidl/language/wire-format#transactional-messages).
		var txid uint32
		for txid == 0 {
			p.mu.lastTxid++
			txid = p.mu.lastTxid & 0x7fffffff
		}
		if _, pending := p.mu.callsAwaitingReply[txid]; pending {
			return 0, newValueError(ErrDuplicateTxidWaiting, txid)
		}
		p.mu.callsAwaitingReply[txid] = struct{}{}
		return txid, nil
	}

	// roundTrip reserves an id, sends the request and waits for the response.
	roundTrip := func() error {
		txid, err := reserveTxid()
		if err != nil {
			return err
		}
		sendErr := p.send(txid, ordinal, req)
		if sendErr == nil {
			return p.recv(txid, ordinal, resp)
		}
		if zxErr, ok := sendErr.(*zx.Error); ok && zxErr.Status == zx.ErrPeerClosed {
			// The peer may have left data behind (perhaps an epitaph); try to read it.
			return p.recv(0, EpitaphOrdinal, nil)
		}
		return sendErr
	}

	result := roundTrip()
	if result != nil {
		// TODO: the error from closing the channel is currently dropped.
		_ = channelClose(&p.Channel)
	}
	return result
}