blob: e9e388a60482cc6bf778b339204bbdfcbffcb31d [file] [log] [blame]
// Copyright 2018 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package unix contains the implementation of Unix endpoints.
package unix
import (
"sync"
"sync/atomic"
"github.com/google/netstack/ilist"
"github.com/google/netstack/tcpip"
"github.com/google/netstack/tcpip/buffer"
"github.com/google/netstack/tcpip/transport/queue"
"github.com/google/netstack/waiter"
)
// initialLimit is the starting limit for the socket buffers.
const initialLimit = 16 * 1024
// A SockType is a type (as opposed to family) of sockets. These are enumerated
// in the syscall package as syscall.SOCK_* constants.
type SockType int
const (
// SockStream corresponds to syscall.SOCK_STREAM.
SockStream SockType = 1
// SockDgram corresponds to syscall.SOCK_DGRAM.
SockDgram SockType = 2
// SockRaw corresponds to syscall.SOCK_RAW.
SockRaw SockType = 3
// SockSeqpacket corresponds to syscall.SOCK_SEQPACKET.
SockSeqpacket SockType = 5
)
// A RightsControlMessage is a control message containing FDs.
type RightsControlMessage interface {
// Clone returns a copy of the RightsControlMessage.
Clone() RightsControlMessage
// Release releases any resources owned by the RightsControlMessage.
Release()
}
// A CredentialsControlMessage is a control message containing Unix credentials.
type CredentialsControlMessage interface {
// Equals returns true iff the two messages are equal.
Equals(CredentialsControlMessage) bool
}
// A ControlMessages represents a collection of socket control messages.
//
// +stateify savable
type ControlMessages struct {
// Rights is a control message containing FDs.
Rights RightsControlMessage
// Credentials is a control message containing Unix credentials.
Credentials CredentialsControlMessage
}
// Empty returns true iff the ControlMessages does not contain either
// credentials or rights.
func (c *ControlMessages) Empty() bool {
return c.Rights == nil && c.Credentials == nil
}
// Clone clones both the credentials and the rights.
func (c *ControlMessages) Clone() ControlMessages {
cm := ControlMessages{}
if c.Rights != nil {
cm.Rights = c.Rights.Clone()
}
cm.Credentials = c.Credentials
return cm
}
// Release releases both the credentials and the rights.
func (c *ControlMessages) Release() {
if c.Rights != nil {
c.Rights.Release()
}
*c = ControlMessages{}
}
// Endpoint is the interface implemented by Unix transport protocol
// implementations that expose functionality like sendmsg, recvmsg, connect,
// etc. to Unix socket implementations.
type Endpoint interface {
Credentialer
waiter.Waitable
// Close puts the endpoint in a closed state and frees all resources
// associated with it.
Close()
// RecvMsg reads data and a control message from the endpoint. This method
// does not block if there is no data pending.
//
// creds indicates if credential control messages are requested by the
// caller. This is useful for determining if control messages can be
// coalesced. creds is a hint and can be safely ignored by the
// implementation if no coalescing is possible. It is fine to return
// credential control messages when none were requested or to not return
// credential control messages when they were requested.
//
// numRights is the number of SCM_RIGHTS FDs requested by the caller. This
// is useful if one must allocate a buffer to receive a SCM_RIGHTS message
// or determine if control messages can be coalesced. numRights is a hint
// and can be safely ignored by the implementation if the number of
// available SCM_RIGHTS FDs is known and no coalescing is possible. It is
// fine for the returned number of SCM_RIGHTS FDs to be either higher or
// lower than the requested number.
//
// If peek is true, no data should be consumed from the Endpoint. Any and
// all data returned from a peek should be available in the next call to
// RecvMsg.
//
// recvLen is the number of bytes copied into data.
//
// msgLen is the length of the read message consumed for datagram Endpoints.
// msgLen is always the same as recvLen for stream Endpoints.
RecvMsg(data [][]byte, creds bool, numRights uintptr, peek bool, addr *tcpip.FullAddress) (recvLen, msgLen uintptr, cm ControlMessages, err *tcpip.Error)
// SendMsg writes data and a control message to the endpoint's peer.
// This method does not block if the data cannot be written.
//
// SendMsg does not take ownership of any of its arguments on error.
SendMsg([][]byte, ControlMessages, BoundEndpoint) (uintptr, *tcpip.Error)
// Connect connects this endpoint directly to another.
//
// This should be called on the client endpoint, and the (bound)
// endpoint passed in as a parameter.
//
// The error codes are the same as Connect.
Connect(server BoundEndpoint) *tcpip.Error
// Shutdown closes the read and/or write end of the endpoint connection
// to its peer.
Shutdown(flags tcpip.ShutdownFlags) *tcpip.Error
// Listen puts the endpoint in "listen" mode, which allows it to accept
// new connections.
Listen(backlog int) *tcpip.Error
// Accept returns a new endpoint if a peer has established a connection
// to an endpoint previously set to listen mode. This method does not
// block if no new connections are available.
//
// The returned Queue is the wait queue for the newly created endpoint.
Accept() (Endpoint, *tcpip.Error)
// Bind binds the endpoint to a specific local address and port.
// Specifying a NIC is optional.
//
// An optional commit function will be executed atomically with respect
// to binding the endpoint. If this returns an error, the bind will not
// occur and the error will be propagated back to the caller.
Bind(address tcpip.FullAddress, commit func() *tcpip.Error) *tcpip.Error
// Type return the socket type, typically either SockStream, SockDgram
// or SockSeqpacket.
Type() SockType
// GetLocalAddress returns the address to which the endpoint is bound.
GetLocalAddress() (tcpip.FullAddress, *tcpip.Error)
// GetRemoteAddress returns the address to which the endpoint is
// connected.
GetRemoteAddress() (tcpip.FullAddress, *tcpip.Error)
// SetSockOpt sets a socket option. opt should be one of the tcpip.*Option
// types.
SetSockOpt(opt interface{}) *tcpip.Error
// GetSockOpt gets a socket option. opt should be a pointer to one of the
// tcpip.*Option types.
GetSockOpt(opt interface{}) *tcpip.Error
}
// A Credentialer is a socket or endpoint that supports the SO_PASSCRED socket
// option.
type Credentialer interface {
// Passcred returns whether or not the SO_PASSCRED socket option is
// enabled on this end.
Passcred() bool
// ConnectedPasscred returns whether or not the SO_PASSCRED socket option
// is enabled on the connected end.
ConnectedPasscred() bool
}
// A BoundEndpoint is a unix endpoint that can be connected to.
type BoundEndpoint interface {
// BidirectionalConnect establishes a bi-directional connection between two
// unix endpoints in an all-or-nothing manner. If an error occurs during
// connecting, the state of neither endpoint should be modified.
//
// In order for an endpoint to establish such a bidirectional connection
// with a BoundEndpoint, the endpoint calls the BidirectionalConnect method
// on the BoundEndpoint and sends a representation of itself (the
// ConnectingEndpoint) and a callback (returnConnect) to receive the
// connection information (Receiver and ConnectedEndpoint) upon a
// successful connect. The callback should only be called on a successful
// connect.
//
// For a connection attempt to be successful, the ConnectingEndpoint must
// be unconnected and not listening and the BoundEndpoint whose
// BidirectionalConnect method is being called must be listening.
//
// This method will return tcpip.ErrConnectionRefused on endpoints with a
// type that isn't SockStream or SockSeqpacket.
BidirectionalConnect(ep ConnectingEndpoint, returnConnect func(Receiver, ConnectedEndpoint)) *tcpip.Error
// UnidirectionalConnect establishes a write-only connection to a unix
// endpoint.
//
// An endpoint which calls UnidirectionalConnect and supports it itself must
// not hold its own lock when calling UnidirectionalConnect.
//
// This method will return tcpip.ErrConnectionRefused on a non-SockDgram
// endpoint.
UnidirectionalConnect() (ConnectedEndpoint, *tcpip.Error)
// Release releases any resources held by the BoundEndpoint. It must be
// called before dropping all references to a BoundEndpoint returned by a
// function.
Release()
}
// message represents a message passed over a Unix domain socket.
//
// +stateify savable
type message struct {
ilist.Entry
// Data is the Message payload.
Data buffer.View
// Control is auxiliary control message data that goes along with the
// data.
Control ControlMessages
// Address is the bound address of the endpoint that sent the message.
//
// If the endpoint that sent the message is not bound, the Address is
// the empty string.
Address tcpip.FullAddress
}
// Length returns number of bytes stored in the Message.
func (m *message) Length() int64 {
return int64(len(m.Data))
}
// Release releases any resources held by the Message.
func (m *message) Release() {
m.Control.Release()
}
func (m *message) Peek() queue.Entry {
return &message{Data: m.Data, Control: m.Control.Clone(), Address: m.Address}
}
// A Receiver can be used to receive Messages.
type Receiver interface {
// Recv receives a single message. This method does not block.
//
// See Endpoint.RecvMsg for documentation on shared arguments.
//
// notify indicates if RecvNotify should be called.
Recv(data [][]byte, creds bool, numRights uintptr, peek bool) (recvLen, msgLen uintptr, cm ControlMessages, source tcpip.FullAddress, notify bool, err *tcpip.Error)
// RecvNotify notifies the Receiver of a successful Recv. This must not be
// called while holding any endpoint locks.
RecvNotify()
// CloseRecv prevents the receiving of additional Messages.
//
// After CloseRecv is called, CloseNotify must also be called.
CloseRecv()
// CloseNotify notifies the Receiver of recv being closed. This must not be
// called while holding any endpoint locks.
CloseNotify()
// Readable returns if messages should be attempted to be received. This
// includes when read has been shutdown.
Readable() bool
// RecvQueuedSize returns the total amount of data currently receivable.
// RecvQueuedSize should return -1 if the operation isn't supported.
RecvQueuedSize() int64
// RecvMaxQueueSize returns maximum value for RecvQueuedSize.
// RecvMaxQueueSize should return -1 if the operation isn't supported.
RecvMaxQueueSize() int64
// Release releases any resources owned by the Receiver. It should be
// called before droping all references to a Receiver.
Release()
}
// queueReceiver implements Receiver for datagram sockets.
//
// +stateify savable
type queueReceiver struct {
readQueue *queue.Queue
}
// Recv implements Receiver.Recv.
func (q *queueReceiver) Recv(data [][]byte, creds bool, numRights uintptr, peek bool) (uintptr, uintptr, ControlMessages, tcpip.FullAddress, bool, *tcpip.Error) {
var m queue.Entry
var notify bool
var err *tcpip.Error
if peek {
m, err = q.readQueue.Peek()
} else {
m, notify, err = q.readQueue.Dequeue()
}
if err != nil {
return 0, 0, ControlMessages{}, tcpip.FullAddress{}, false, err
}
msg := m.(*message)
src := []byte(msg.Data)
var copied uintptr
for i := 0; i < len(data) && len(src) > 0; i++ {
n := copy(data[i], src)
copied += uintptr(n)
src = src[n:]
}
return copied, uintptr(len(msg.Data)), msg.Control, msg.Address, notify, nil
}
// RecvNotify implements Receiver.RecvNotify.
func (q *queueReceiver) RecvNotify() {
q.readQueue.WriterQueue.Notify(waiter.EventOut)
}
// CloseNotify implements Receiver.CloseNotify.
func (q *queueReceiver) CloseNotify() {
q.readQueue.ReaderQueue.Notify(waiter.EventIn)
q.readQueue.WriterQueue.Notify(waiter.EventOut)
}
// CloseRecv implements Receiver.CloseRecv.
func (q *queueReceiver) CloseRecv() {
q.readQueue.Close()
}
// Readable implements Receiver.Readable.
func (q *queueReceiver) Readable() bool {
return q.readQueue.IsReadable()
}
// RecvQueuedSize implements Receiver.RecvQueuedSize.
func (q *queueReceiver) RecvQueuedSize() int64 {
return q.readQueue.QueuedSize()
}
// RecvMaxQueueSize implements Receiver.RecvMaxQueueSize.
func (q *queueReceiver) RecvMaxQueueSize() int64 {
return q.readQueue.MaxQueueSize()
}
// Release implements Receiver.Release.
func (*queueReceiver) Release() {}
// streamQueueReceiver implements Receiver for stream sockets.
//
// +stateify savable
type streamQueueReceiver struct {
queueReceiver
mu sync.Mutex
buffer []byte
control ControlMessages
addr tcpip.FullAddress
}
func vecCopy(data [][]byte, buf []byte) (uintptr, [][]byte, []byte) {
var copied uintptr
for len(data) > 0 && len(buf) > 0 {
n := copy(data[0], buf)
copied += uintptr(n)
buf = buf[n:]
data[0] = data[0][n:]
if len(data[0]) == 0 {
data = data[1:]
}
}
return copied, data, buf
}
// Readable implements Receiver.Readable.
func (q *streamQueueReceiver) Readable() bool {
q.mu.Lock()
bl := len(q.buffer)
r := q.readQueue.IsReadable()
q.mu.Unlock()
// We're readable if we have data in our buffer or if the queue receiver is
// readable.
return bl > 0 || r
}
// RecvQueuedSize implements Receiver.RecvQueuedSize.
func (q *streamQueueReceiver) RecvQueuedSize() int64 {
q.mu.Lock()
bl := len(q.buffer)
qs := q.readQueue.QueuedSize()
q.mu.Unlock()
return int64(bl) + qs
}
// RecvMaxQueueSize implements Receiver.RecvMaxQueueSize.
func (q *streamQueueReceiver) RecvMaxQueueSize() int64 {
// The RecvMaxQueueSize() is the readQueue's MaxQueueSize() plus the largest
// message we can buffer which is also the largest message we can receive.
return 2 * q.readQueue.MaxQueueSize()
}
// Recv implements Receiver.Recv.
func (q *streamQueueReceiver) Recv(data [][]byte, wantCreds bool, numRights uintptr, peek bool) (uintptr, uintptr, ControlMessages, tcpip.FullAddress, bool, *tcpip.Error) {
q.mu.Lock()
defer q.mu.Unlock()
var notify bool
// If we have no data in the endpoint, we need to get some.
if len(q.buffer) == 0 {
// Load the next message into a buffer, even if we are peeking. Peeking
// won't consume the message, so it will be still available to be read
// the next time Recv() is called.
m, n, err := q.readQueue.Dequeue()
if err != nil {
return 0, 0, ControlMessages{}, tcpip.FullAddress{}, false, err
}
notify = n
msg := m.(*message)
q.buffer = []byte(msg.Data)
q.control = msg.Control
q.addr = msg.Address
}
var copied uintptr
if peek {
// Don't consume control message if we are peeking.
c := q.control.Clone()
// Don't consume data since we are peeking.
copied, data, _ = vecCopy(data, q.buffer)
return copied, copied, c, q.addr, notify, nil
}
// Consume data and control message since we are not peeking.
copied, data, q.buffer = vecCopy(data, q.buffer)
// Save the original state of q.control.
c := q.control
// Remove rights from q.control and leave behind just the creds.
q.control.Rights = nil
if !wantCreds {
c.Credentials = nil
}
if c.Rights != nil && numRights == 0 {
c.Rights.Release()
c.Rights = nil
}
haveRights := c.Rights != nil
// If we have more capacity for data and haven't received any usable
// rights.
//
// Linux never coalesces rights control messages.
for !haveRights && len(data) > 0 {
// Get a message from the readQueue.
m, n, err := q.readQueue.Dequeue()
if err != nil {
// We already got some data, so ignore this error. This will
// manifest as a short read to the user, which is what Linux
// does.
break
}
notify = notify || n
msg := m.(*message)
q.buffer = []byte(msg.Data)
q.control = msg.Control
q.addr = msg.Address
if wantCreds {
if (q.control.Credentials == nil) != (c.Credentials == nil) {
// One message has credentials, the other does not.
break
}
if q.control.Credentials != nil && c.Credentials != nil && !q.control.Credentials.Equals(c.Credentials) {
// Both messages have credentials, but they don't match.
break
}
}
if numRights != 0 && c.Rights != nil && q.control.Rights != nil {
// Both messages have rights.
break
}
var cpd uintptr
cpd, data, q.buffer = vecCopy(data, q.buffer)
copied += cpd
if cpd == 0 {
// data was actually full.
break
}
if q.control.Rights != nil {
// Consume rights.
if numRights == 0 {
q.control.Rights.Release()
} else {
c.Rights = q.control.Rights
haveRights = true
}
q.control.Rights = nil
}
}
return copied, copied, c, q.addr, notify, nil
}
// A ConnectedEndpoint is an Endpoint that can be used to send Messages.
type ConnectedEndpoint interface {
// Passcred implements Endpoint.Passcred.
Passcred() bool
// GetLocalAddress implements Endpoint.GetLocalAddress.
GetLocalAddress() (tcpip.FullAddress, *tcpip.Error)
// Send sends a single message. This method does not block.
//
// notify indicates if SendNotify should be called.
Send(data [][]byte, controlMessages ControlMessages, from tcpip.FullAddress) (n uintptr, notify bool, err *tcpip.Error)
// SendNotify notifies the ConnectedEndpoint of a successful Send. This
// must not be called while holding any endpoint locks.
SendNotify()
// CloseSend prevents the sending of additional Messages.
//
// After CloseSend is call, CloseNotify must also be called.
CloseSend()
// CloseNotify notifies the ConnectedEndpoint of send being closed. This
// must not be called while holding any endpoint locks.
CloseNotify()
// Writable returns if messages should be attempted to be sent. This
// includes when write has been shutdown.
Writable() bool
// EventUpdate lets the ConnectedEndpoint know that event registrations
// have changed.
EventUpdate()
// SendQueuedSize returns the total amount of data currently queued for
// sending. SendQueuedSize should return -1 if the operation isn't
// supported.
SendQueuedSize() int64
// SendMaxQueueSize returns maximum value for SendQueuedSize.
// SendMaxQueueSize should return -1 if the operation isn't supported.
SendMaxQueueSize() int64
// Release releases any resources owned by the ConnectedEndpoint. It should
// be called before droping all references to a ConnectedEndpoint.
Release()
}
// +stateify savable
type connectedEndpoint struct {
// endpoint represents the subset of the Endpoint functionality needed by
// the connectedEndpoint. It is implemented by both connectionedEndpoint
// and connectionlessEndpoint and allows the use of types which don't
// fully implement Endpoint.
endpoint interface {
// Passcred implements Endpoint.Passcred.
Passcred() bool
// GetLocalAddress implements Endpoint.GetLocalAddress.
GetLocalAddress() (tcpip.FullAddress, *tcpip.Error)
// Type implements Endpoint.Type.
Type() SockType
}
writeQueue *queue.Queue
}
// Passcred implements ConnectedEndpoint.Passcred.
func (e *connectedEndpoint) Passcred() bool {
return e.endpoint.Passcred()
}
// GetLocalAddress implements ConnectedEndpoint.GetLocalAddress.
func (e *connectedEndpoint) GetLocalAddress() (tcpip.FullAddress, *tcpip.Error) {
return e.endpoint.GetLocalAddress()
}
// Send implements ConnectedEndpoint.Send.
func (e *connectedEndpoint) Send(data [][]byte, controlMessages ControlMessages, from tcpip.FullAddress) (uintptr, bool, *tcpip.Error) {
var l int
for _, d := range data {
l += len(d)
}
// Discard empty stream packets. Since stream sockets don't preserve
// message boundaries, sending zero bytes is a no-op. In Linux, the
// receiver actually uses a zero-length receive as an indication that the
// stream was closed.
if l == 0 && e.endpoint.Type() == SockStream {
controlMessages.Release()
return 0, false, nil
}
v := make([]byte, 0, l)
for _, d := range data {
v = append(v, d...)
}
notify, err := e.writeQueue.Enqueue(&message{Data: buffer.View(v), Control: controlMessages, Address: from})
return uintptr(l), notify, err
}
// SendNotify implements ConnectedEndpoint.SendNotify.
func (e *connectedEndpoint) SendNotify() {
e.writeQueue.ReaderQueue.Notify(waiter.EventIn)
}
// CloseNotify implements ConnectedEndpoint.CloseNotify.
func (e *connectedEndpoint) CloseNotify() {
e.writeQueue.ReaderQueue.Notify(waiter.EventIn)
e.writeQueue.WriterQueue.Notify(waiter.EventOut)
}
// CloseSend implements ConnectedEndpoint.CloseSend.
func (e *connectedEndpoint) CloseSend() {
e.writeQueue.Close()
}
// Writable implements ConnectedEndpoint.Writable.
func (e *connectedEndpoint) Writable() bool {
return e.writeQueue.IsWritable()
}
// EventUpdate implements ConnectedEndpoint.EventUpdate.
func (*connectedEndpoint) EventUpdate() {}
// SendQueuedSize implements ConnectedEndpoint.SendQueuedSize.
func (e *connectedEndpoint) SendQueuedSize() int64 {
return e.writeQueue.QueuedSize()
}
// SendMaxQueueSize implements ConnectedEndpoint.SendMaxQueueSize.
func (e *connectedEndpoint) SendMaxQueueSize() int64 {
return e.writeQueue.MaxQueueSize()
}
// Release implements ConnectedEndpoint.Release.
func (*connectedEndpoint) Release() {}
// baseEndpoint is an embeddable unix endpoint base used in both the connected and connectionless
// unix domain socket Endpoint implementations.
//
// Not to be used on its own.
//
// +stateify savable
type baseEndpoint struct {
*waiter.Queue
// passcred specifies whether SCM_CREDENTIALS socket control messages are
// enabled on this endpoint. Must be accessed atomically.
passcred int32
// Mutex protects the below fields.
sync.Mutex
// receiver allows Messages to be received.
receiver Receiver
// connected allows messages to be sent and state information about the
// connected endpoint to be read.
connected ConnectedEndpoint
// path is not empty if the endpoint has been bound,
// or may be used if the endpoint is connected.
path string
}
// EventRegister implements waiter.Waitable.EventRegister.
func (e *baseEndpoint) EventRegister(we *waiter.Entry, mask waiter.EventMask) {
e.Queue.EventRegister(we, mask)
e.Lock()
if e.connected != nil {
e.connected.EventUpdate()
}
e.Unlock()
}
// EventUnregister implements waiter.Waitable.EventUnregister.
func (e *baseEndpoint) EventUnregister(we *waiter.Entry) {
e.Queue.EventUnregister(we)
e.Lock()
if e.connected != nil {
e.connected.EventUpdate()
}
e.Unlock()
}
// Passcred implements Credentialer.Passcred.
func (e *baseEndpoint) Passcred() bool {
return atomic.LoadInt32(&e.passcred) != 0
}
// ConnectedPasscred implements Credentialer.ConnectedPasscred.
func (e *baseEndpoint) ConnectedPasscred() bool {
e.Lock()
defer e.Unlock()
return e.connected != nil && e.connected.Passcred()
}
func (e *baseEndpoint) setPasscred(pc bool) {
if pc {
atomic.StoreInt32(&e.passcred, 1)
} else {
atomic.StoreInt32(&e.passcred, 0)
}
}
// Connected implements ConnectingEndpoint.Connected.
func (e *baseEndpoint) Connected() bool {
return e.receiver != nil && e.connected != nil
}
// RecvMsg reads data and a control message from the endpoint.
func (e *baseEndpoint) RecvMsg(data [][]byte, creds bool, numRights uintptr, peek bool, addr *tcpip.FullAddress) (uintptr, uintptr, ControlMessages, *tcpip.Error) {
e.Lock()
if e.receiver == nil {
e.Unlock()
return 0, 0, ControlMessages{}, tcpip.ErrNotConnected
}
recvLen, msgLen, cms, a, notify, err := e.receiver.Recv(data, creds, numRights, peek)
e.Unlock()
if err != nil {
return 0, 0, ControlMessages{}, err
}
if notify {
e.receiver.RecvNotify()
}
if addr != nil {
*addr = a
}
return recvLen, msgLen, cms, nil
}
// SendMsg writes data and a control message to the endpoint's peer.
// This method does not block if the data cannot be written.
func (e *baseEndpoint) SendMsg(data [][]byte, c ControlMessages, to BoundEndpoint) (uintptr, *tcpip.Error) {
e.Lock()
if !e.Connected() {
e.Unlock()
return 0, tcpip.ErrNotConnected
}
if to != nil {
e.Unlock()
return 0, tcpip.ErrAlreadyConnected
}
n, notify, err := e.connected.Send(data, c, tcpip.FullAddress{Addr: tcpip.Address(e.path)})
e.Unlock()
if err != nil {
return 0, err
}
if notify {
e.connected.SendNotify()
}
return n, nil
}
// SetSockOpt sets a socket option. Currently not supported.
func (e *baseEndpoint) SetSockOpt(opt interface{}) *tcpip.Error {
switch v := opt.(type) {
case tcpip.PasscredOption:
e.setPasscred(v != 0)
return nil
}
return nil
}
// GetSockOpt implements tcpip.Endpoint.GetSockOpt.
func (e *baseEndpoint) GetSockOpt(opt interface{}) *tcpip.Error {
switch o := opt.(type) {
case tcpip.ErrorOption:
return nil
case *tcpip.SendQueueSizeOption:
e.Lock()
if !e.Connected() {
e.Unlock()
return tcpip.ErrNotConnected
}
qs := tcpip.SendQueueSizeOption(e.connected.SendQueuedSize())
e.Unlock()
if qs < 0 {
return tcpip.ErrQueueSizeNotSupported
}
*o = qs
return nil
case *tcpip.ReceiveQueueSizeOption:
e.Lock()
if !e.Connected() {
e.Unlock()
return tcpip.ErrNotConnected
}
qs := tcpip.ReceiveQueueSizeOption(e.receiver.RecvQueuedSize())
e.Unlock()
if qs < 0 {
return tcpip.ErrQueueSizeNotSupported
}
*o = qs
return nil
case *tcpip.PasscredOption:
if e.Passcred() {
*o = tcpip.PasscredOption(1)
} else {
*o = tcpip.PasscredOption(0)
}
return nil
case *tcpip.SendBufferSizeOption:
e.Lock()
if !e.Connected() {
e.Unlock()
return tcpip.ErrNotConnected
}
qs := tcpip.SendBufferSizeOption(e.connected.SendMaxQueueSize())
e.Unlock()
if qs < 0 {
return tcpip.ErrQueueSizeNotSupported
}
*o = qs
return nil
case *tcpip.ReceiveBufferSizeOption:
e.Lock()
if e.receiver == nil {
e.Unlock()
return tcpip.ErrNotConnected
}
qs := tcpip.ReceiveBufferSizeOption(e.receiver.RecvMaxQueueSize())
e.Unlock()
if qs < 0 {
return tcpip.ErrQueueSizeNotSupported
}
*o = qs
return nil
}
return tcpip.ErrUnknownProtocolOption
}
// Shutdown closes the read and/or write end of the endpoint connection to its
// peer.
func (e *baseEndpoint) Shutdown(flags tcpip.ShutdownFlags) *tcpip.Error {
e.Lock()
if !e.Connected() {
e.Unlock()
return tcpip.ErrNotConnected
}
if flags&tcpip.ShutdownRead != 0 {
e.receiver.CloseRecv()
}
if flags&tcpip.ShutdownWrite != 0 {
e.connected.CloseSend()
}
e.Unlock()
if flags&tcpip.ShutdownRead != 0 {
e.receiver.CloseNotify()
}
if flags&tcpip.ShutdownWrite != 0 {
e.connected.CloseNotify()
}
return nil
}
// GetLocalAddress returns the bound path.
func (e *baseEndpoint) GetLocalAddress() (tcpip.FullAddress, *tcpip.Error) {
e.Lock()
defer e.Unlock()
return tcpip.FullAddress{Addr: tcpip.Address(e.path)}, nil
}
// GetRemoteAddress returns the local address of the connected endpoint (if
// available).
func (e *baseEndpoint) GetRemoteAddress() (tcpip.FullAddress, *tcpip.Error) {
e.Lock()
c := e.connected
e.Unlock()
if c != nil {
return c.GetLocalAddress()
}
return tcpip.FullAddress{}, tcpip.ErrNotConnected
}
// Release implements BoundEndpoint.Release.
func (*baseEndpoint) Release() {}