| // Copyright 2018 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package fidl |
| |
| import ( |
| "sync" |
| "syscall/zx" |
| "syscall/zx/internal/context" |
| ) |
| |
// ServiceRequest is an abstraction over a FIDL interface request which is
// intended to be used as part of service discovery.
//
// It pairs the name of a requested service with the channel over which that
// service is expected to be served.
type ServiceRequest interface {
	// Name returns the name of the service being requested.
	Name() string

	// ToChannel returns the underlying channel of the ServiceRequest.
	ToChannel() zx.Channel
}
| |
// InterfaceRequest is a wrapper type around a channel and represents the server side
// of a FIDL interface, to be sent to a server as a request.
//
// The channel is embedded, so its methods are available directly on the
// InterfaceRequest value.
type InterfaceRequest struct {
	zx.Channel
}
| |
| // NewInterfaceRequest generates two sides of a channel with one layer of |
| // type casts out of the way to minimize the amount of generated code. Semantically, |
| // the two sides of the channel represent the interface request and the client |
| // side of the interface (the proxy). It returns an error on failure. |
| func NewInterfaceRequest() (InterfaceRequest, *ChannelProxy, error) { |
| h0, h1, err := newChannel(0) |
| if err != nil { |
| return InterfaceRequest{}, nil, err |
| } |
| return InterfaceRequest{Channel: h0}, &ChannelProxy{Channel: h1}, nil |
| } |
| |
// Proxy represents the client side of a FIDL interface.
type Proxy interface {
	// IsValid reports whether the proxy is backed by a valid endpoint.
	IsValid() bool
	// Send sends req with the given ordinal without waiting for a response.
	Send(ordinal uint64, req Message) error
	// Recv waits for an event with the given ordinal and decodes it into resp.
	Recv(ordinal uint64, resp Message) error
	// Call sends req with the given ordinal, synchronously waits for the
	// response and decodes it into resp.
	Call(ordinal uint64, req Message, resp Message) error
}
| |
// DispatchArgs bundles the arguments passed to Stub.Dispatch for a single
// incoming message.
type DispatchArgs struct {
	// Ctx is the Context to dispatch with.
	// NOTE(review): presumably the marshaling context derived from the
	// message header — confirm against the callers of Dispatch.
	Ctx Context
	// Ordinal is the message ordinal used to select the method implementation.
	Ordinal uint64
	// Bytes is the message data to be transformed into method arguments.
	// NOTE(review): whether the header has already been stripped is not
	// visible here — confirm against the callers of Dispatch.
	Bytes []byte
	// HandleInfos holds the handles that accompany the message.
	HandleInfos []zx.HandleInfo
}
| |
// Stub represents a generated type which wraps the server-side implementation of a
// FIDL interface.
//
// It contains logic which is able to dispatch into the correct implementation given
// the incoming message ordinal and its data.
type Stub interface {
	// Dispatch dispatches into the appropriate method implementation for a FIDL
	// interface by using the ordinal carried in args.
	//
	// It also takes the data as bytes and transforms it into arguments usable by
	// the method implementation. It then optionally returns a response if the
	// method has a response, in which case the boolean return value is true.
	// A non-nil error reports that dispatching failed.
	Dispatch(args DispatchArgs) (Message, bool, error)
}
| |
// readResult is a message that has been read off the channel but not yet
// consumed by the goroutine waiting for it.
type readResult struct {
	header      MessageHeader
	objBytes    []byte          // decoded bytes (excluding header)
	handleInfos []zx.HandleInfo // decoded handles
	// Full buffers retrieved from the byte and handle pools. objBytes and
	// handleInfos are sub-slices of these; they are kept so that the complete
	// buffers can be returned to the pools once the result is consumed.
	poolBytes       []byte
	poolHandleInfos []zx.HandleInfo
}
| |
// ChannelProxy is a Proxy that is backed by a channel.
//
// Its internal state is initialized lazily through initOnce, so a
// ChannelProxy built with only the Channel field set is usable. It must not
// be copied after first use because it contains a mutex and a condition
// variable.
type ChannelProxy struct {
	// Channel is the underlying channel endpoint for this interface.
	zx.Channel

	// mu guards the bookkeeping shared between concurrent callers.
	mu struct {
		sync.Mutex
		// lastTxid is the counter from which Call derives transaction ids.
		lastTxid uint32
		// results holds responses that have been read from the channel, keyed
		// by transaction id, until the matching caller consumes them.
		results map[uint32]readResult
		// 0-txid corresponds to event messages that lack request/response pair.
		// Multiple of these can be read before they are processed and there needs
		// to be a place to store them.
		zeroTxidResults []readResult
		// err is the first error hit while reading from the channel. Once set
		// it is returned to every caller that has no stored result to consume.
		err error
		// readerElected is true while some goroutine is reading from the
		// channel on behalf of all waiters.
		readerElected bool
		// callsAwaitingReply is the set of transaction ids registered by Call
		// whose responses have not been read yet.
		callsAwaitingReply map[uint32]struct{}
	}
	// cond is broadcast after every read attempt so that waiters re-check
	// whether their result (or an error) has arrived. Its Locker is mu.
	cond sync.Cond
	// initOnce guards the lazy initialization performed by init.
	initOnce sync.Once
}
| |
| func (p *ChannelProxy) init() { |
| p.cond.L = &p.mu.Mutex |
| p.mu.results = make(map[uint32]readResult) |
| p.mu.callsAwaitingReply = make(map[uint32]struct{}) |
| } |
| |
| // Assert that ChannelProxy implements the Proxy interface. |
| var _ Proxy = &ChannelProxy{} |
| |
| // IsValid returns true if the underlying channel is a valid handle. |
| func (p *ChannelProxy) IsValid() bool { |
| return p.Channel.Handle().IsValid() |
| } |
| |
// EpitaphBody is the body of an epitaph message. Its single field carries the
// status reported by the peer.
//
// Definitions taken from https://cs.opensource.google/fuchsia/fuchsia/+/main:zircon/system/public/zircon/fidl.h;l=626-646;drc=5fa0d1e94a4f52e3e902692ff181f5df2d737f13
//
// TODO(https://fxbug.dev/105740): Reference the wire format documentation when
// it is correct.
type EpitaphBody struct {
	_ struct{} `fidl:"s" fidl_size_v2:"4" fidl_alignment_v2:"4"`
	// Error is the status carried by the epitaph.
	Error zx.Status `fidl_offset_v2:"0"`
}
| |
// mEpitaphBody is the Marshaler for EpitaphBody, created once at package
// initialization.
// NOTE(review): MustCreateMarshaler presumably panics on an unsupported type,
// per the Must naming convention — confirm.
var mEpitaphBody = MustCreateMarshaler(EpitaphBody{})

// Marshaler returns the package-level Marshaler for EpitaphBody.
func (*EpitaphBody) Marshaler() Marshaler {
	return mEpitaphBody
}

// Assert that *EpitaphBody implements the Message interface.
var _ Message = (*EpitaphBody)(nil)
| |
// EpitaphOrdinal is the ordinal that identifies an epitaph message. The
// receive path in this file treats a message carrying this ordinal and a zero
// transaction id as an epitaph.
const EpitaphOrdinal = 0xFFFFFFFFFFFFFFFF
| |
| // Send sends the request over the channel with the specified ordinal |
| // without a response. |
| func (p *ChannelProxy) Send(ordinal uint64, req Message) error { |
| err := func() error { |
| // Messages that do not require a response should be written with transaction id 0. |
| // (See https://fuchsia.dev/fuchsia-src/reference/fidl/language/wire-format#transactional-messages). |
| err := p.send(0, ordinal, req) |
| if err, ok := err.(*zx.Error); ok && err.Status == zx.ErrPeerClosed { |
| // There may still be data on the channel (perhaps an epitaph); try to read it. |
| return p.recv(0, EpitaphOrdinal, nil) |
| } |
| return err |
| }() |
| if err != nil { |
| // TODO: do something with this error? |
| _ = channelClose(&p.Channel) |
| } |
| return err |
| } |
| |
| func (p *ChannelProxy) send(txid uint32, ordinal uint64, req Message) error { |
| reqHeader := NewCtx().NewHeader() |
| |
| reqHeader.Txid = txid |
| reqHeader.Ordinal = ordinal |
| |
| respb := messageBytesPool.Get().([]byte) |
| resphd := messageHandleDispositionsPool.Get().([]zx.HandleDisposition) |
| |
| defer messageBytesPool.Put(respb) |
| defer messageHandleDispositionsPool.Put(resphd) |
| |
| nb, nh, err := MarshalHeaderThenMessage(&reqHeader, req, respb[:], resphd[:]) |
| if err != nil { |
| return err |
| } |
| |
| // Write the encoded bytes to the channel. |
| return withRetryContext(context.Background(), func() error { |
| return channelWriteEtc(&p.Channel, respb[:nb], resphd[:nh], 0) |
| }, *p.Channel.Handle(), zx.SignalChannelWritable, zx.SignalChannelPeerClosed) |
| } |
| |
| // Recv waits for an event and writes the response into the response. |
| func (p *ChannelProxy) Recv(ordinal uint64, resp Message) error { |
| // Messages that do not require a response should be written with transaction id 0. |
| // (See https://fuchsia.dev/fuchsia-src/reference/fidl/language/wire-format#transactional-messages). |
| err := p.recv(0, ordinal, resp) |
| if err != nil { |
| // TODO: do something with this error? |
| _ = channelClose(&p.Channel) |
| } |
| return err |
| } |
| |
| func (res *readResult) consume(ordinal uint64, resp Message) error { |
| defer messageBytesPool.Put(res.poolBytes) |
| defer messageHandleInfosPool.Put(res.poolHandleInfos) |
| |
| if res.header.Ordinal != ordinal { |
| return newExpectError(ErrUnexpectedOrdinal, ordinal, res.header.Ordinal) |
| } |
| |
| var err error |
| if resp != nil { |
| err = Unmarshal(res.header.NewCtx(), res.objBytes, res.handleInfos, resp) |
| } |
| |
| return err |
| } |
| |
| func (p *ChannelProxy) read(header *MessageHeader, poolBytes []byte, poolHandleInfos []zx.HandleInfo) ([]byte, []zx.HandleInfo, error) { |
| var nb, nh uint32 |
| if err := withRetryContext(context.Background(), func() error { |
| var err error |
| nb, nh, err = channelReadEtc(&p.Channel, poolBytes[:], poolHandleInfos[:], 0) |
| return err |
| }, *p.Channel.Handle(), zx.SignalChannelReadable, zx.SignalChannelPeerClosed); err != nil { |
| return nil, nil, err |
| } |
| objBytes := poolBytes[:nb] |
| handleInfos := poolHandleInfos[:nh] |
| |
| err := Unmarshal(header.NewCtx(), objBytes[:MessageHeaderSize], nil, header) |
| if err != nil { |
| return nil, nil, err |
| } |
| if err := header.ValidateWireFormat(); err != nil { |
| return nil, nil, err |
| } |
| return objBytes[MessageHeaderSize:], handleInfos, nil |
| } |
| |
// recv waits for the message identified by txid (0 selects the next queued
// event) and consumes it into resp, checking that it carries the expected
// ordinal.
//
// Any number of goroutines may wait at once, but only one at a time reads
// from the channel: the "elected reader". Whatever the reader reads is stored
// in p.mu.results or p.mu.zeroTxidResults, and all waiters are then woken to
// check whether their own message has arrived. The mutex is released while
// the reader is blocked in p.read.
//
// When ordinal is EpitaphOrdinal the stored results are never consulted, so
// the call keeps reading until an error (such as a received epitaph or a
// closed peer) is recorded, and that error is returned.
//
// The first read error is stored in p.mu.err and returned to every waiter
// that has no stored result left to consume.
func (p *ChannelProxy) recv(txid uint32, ordinal uint64, resp Message) error {
	p.initOnce.Do(p.init)

	p.mu.Lock()
	defer p.mu.Unlock()

	for {
		if ordinal != EpitaphOrdinal {
			// Check if a result for this txid already exists and handle it.
			if txid == 0 {
				if len(p.mu.zeroTxidResults) != 0 {
					// Events are consumed in the order they were read.
					res := p.mu.zeroTxidResults[0]
					// Reset the lost element to release references it holds, since
					// slicing does not release the underlying memory.
					p.mu.zeroTxidResults[0] = readResult{}
					p.mu.zeroTxidResults = p.mu.zeroTxidResults[1:]
					return res.consume(ordinal, resp)
				}
			} else {
				if res, ok := p.mu.results[txid]; ok {
					delete(p.mu.results, txid)
					return res.consume(ordinal, resp)
				}
			}
		}

		// Check if the channel already errored. This comes after the lookup
		// above so that results read before the failure can still be consumed.
		if p.mu.err != nil {
			return p.mu.err
		}

		// If another goroutine is reading from the channel, wait for it to finish.
		if p.mu.readerElected {
			p.cond.Wait()
			continue
		}

		// This goroutine is the reader, so read from the channel.
		p.mu.readerElected = true

		poolBytes := messageBytesPool.Get().([]byte)
		poolHandleInfos := messageHandleInfosPool.Get().([]zx.HandleInfo)
		var header MessageHeader
		if err := func() error {
			// Drop the lock for the duration of the read so that other
			// goroutines can register calls or queue up as waiters.
			p.mu.Unlock()
			objBytes, handleInfos, err := p.read(&header, poolBytes, poolHandleInfos)
			p.mu.Lock()

			if err != nil {
				return err
			}

			if header.Txid == 0 && header.Ordinal == EpitaphOrdinal {
				// This is an epitaph.
				var epitaph EpitaphBody
				// NB: this does not use (*readResult).consume to avoid
				// releasing the pooled buffers twice.
				if err := Unmarshal(header.NewCtx(), objBytes, handleInfos, &epitaph); err != nil {
					return err
				}
				// An epitaph is surfaced as an error carrying the peer's status.
				return &zx.Error{Status: epitaph.Error, Text: "received epitaph"}
			}

			// From here on the pooled buffers belong to the stored result and
			// are returned to the pools when it is consumed.
			result := readResult{
				header:          header,
				objBytes:        objBytes,
				handleInfos:     handleInfos,
				poolBytes:       poolBytes,
				poolHandleInfos: poolHandleInfos,
			}

			if header.Txid == 0 {
				p.mu.zeroTxidResults = append(p.mu.zeroTxidResults, result)
			} else {
				// A response must match exactly one registered call that has
				// not received its response yet.
				if _, ok := p.mu.results[header.Txid]; ok {
					return newValueError(ErrDuplicateTxidReceived, header.Txid)
				}
				if _, ok := p.mu.callsAwaitingReply[header.Txid]; !ok {
					return newValueError(ErrResponseWithoutRequest, header.Txid)
				}
				p.mu.results[header.Txid] = result
				delete(p.mu.callsAwaitingReply, header.Txid)
			}
			return nil
		}(); err != nil {
			// On every error path the buffers were not handed to a stored
			// result, so they are returned to the pools here.
			messageBytesPool.Put(poolBytes)
			messageHandleInfosPool.Put(poolHandleInfos)

			p.mu.err = err
			// NB: we do not reset p.mu.results because each element there has a
			// sitting waiter that will consume the result.
			for _, res := range p.mu.zeroTxidResults {
				messageBytesPool.Put(res.poolBytes)
				messageHandleInfosPool.Put(res.poolHandleInfos)
			}
			p.mu.zeroTxidResults = nil
			p.mu.callsAwaitingReply = nil
		}

		// Step down as reader and wake every waiter so that each re-checks
		// for its result or for the recorded error.
		p.mu.readerElected = false
		p.cond.Broadcast()
	}
}
| |
| // Call sends the request over the channel with the specified ordinal |
| // and synchronously waits for a response. It then writes the response into the |
| // response. |
| func (p *ChannelProxy) Call(ordinal uint64, req Message, resp Message) error { |
| p.initOnce.Do(p.init) |
| |
| err := func() error { |
| var txid uint32 |
| if err := func() error { |
| p.mu.Lock() |
| defer p.mu.Unlock() |
| |
| if err := p.mu.err; err != nil { |
| return err |
| } |
| |
| for { |
| p.mu.lastTxid++ |
| // Transaction id 1 through 0x7fffffff are reserved for userspace. |
| // (See https://fuchsia.dev/fuchsia-src/reference/fidl/language/wire-format#transactional-messages). |
| if id := p.mu.lastTxid & 0x7fffffff; id != 0 { |
| txid = id |
| break |
| } |
| } |
| if _, ok := p.mu.callsAwaitingReply[txid]; ok { |
| return newValueError(ErrDuplicateTxidWaiting, txid) |
| } |
| p.mu.callsAwaitingReply[txid] = struct{}{} |
| return nil |
| }(); err != nil { |
| return err |
| } |
| |
| if err := p.send(txid, ordinal, req); err != nil { |
| if err, ok := err.(*zx.Error); ok && err.Status == zx.ErrPeerClosed { |
| // There may still be data on the channel (perhaps an epitaph); try to read it. |
| return p.recv(0, EpitaphOrdinal, nil) |
| } |
| return err |
| } |
| return p.recv(txid, ordinal, resp) |
| }() |
| if err != nil { |
| // TODO: do something with this error? |
| _ = channelClose(&p.Channel) |
| } |
| return err |
| } |