[netstack] Remove hardError
The purpose of this structure is to leak errors out of loop{Read,Write};
it has hardly anything to do with being "hard". It is also incorrect;
these errors are expected to be consumed after being returned to the
user, which was not being done before this change.
Change-Id: I73dfd6fc1c694387d5b28b9d6f2b6d9936470173
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/525351
Commit-Queue: Auto-Submit <auto-submit@fuchsia-infra.iam.gserviceaccount.com>
Fuchsia-Auto-Submit: Tamir Duberstein <tamird@google.com>
Reviewed-by: Arthur Sfez <asfez@google.com>
Reviewed-by: Bruno Dal Bo <brunodalbo@google.com>
Reviewed-by: Mithun Iyer <iyerm@google.com>
diff --git a/src/connectivity/network/netstack/fuchsia_posix_socket.go b/src/connectivity/network/netstack/fuchsia_posix_socket.go
index bfc1fe8..5cd5000 100644
--- a/src/connectivity/network/netstack/fuchsia_posix_socket.go
+++ b/src/connectivity/network/netstack/fuchsia_posix_socket.go
@@ -213,13 +213,6 @@
panic(err)
}
-type hardError struct {
- mu struct {
- sync.Mutex
- err tcpip.Error
- }
-}
-
// endpoint is the base structure that models all network sockets.
type endpoint struct {
// TODO(https://fxbug.dev/37419): Remove TransitionalBase after methods landed.
@@ -243,9 +236,7 @@
pending signaler
- // gVisor stack clears the hard error on the endpoint on a read, so,
- // save the error when returned by gVisor endpoint calls.
- hardError hardError
+ terminal chan tcpip.Error
}
func (ep *endpoint) incRef() {
@@ -266,24 +257,6 @@
return doClose
}
-// storeAndRetrieveLocked evaluates if the input error is a "hard
-// error" (one which puts the endpoint in an unrecoverable error state) and
-// stores it. Returns the pre-existing hard error if it was already set or the
-// new value if changed.
-//
-// Must be called with he.mu held.
-func (he *hardError) storeAndRetrieveLocked(err tcpip.Error) tcpip.Error {
- if he.mu.err == nil {
- switch err.(type) {
- case *tcpip.ErrConnectionAborted, *tcpip.ErrConnectionReset,
- *tcpip.ErrNetworkUnreachable, *tcpip.ErrNoRoute, *tcpip.ErrTimeout,
- *tcpip.ErrConnectionRefused:
- he.mu.err = err
- }
- }
- return he.mu.err
-}
-
func (ep *endpoint) Sync(fidl.Context) (int32, error) {
_ = syslog.DebugTf("Sync", "%p", ep)
@@ -334,31 +307,28 @@
}
}
- {
- // Acquire hard error lock across ep calls to avoid races and store the
- // hard error deterministically.
- ep.hardError.mu.Lock()
- err := ep.ep.Connect(addr)
- hardError := ep.hardError.storeAndRetrieveLocked(err)
- ep.hardError.mu.Unlock()
- if err != nil {
- switch err.(type) {
- case *tcpip.ErrConnectStarted:
- localAddr, err := ep.ep.GetLocalAddress()
- if err != nil {
- panic(err)
- }
- _ = syslog.DebugTf("connect", "%p: started, local=%+v, addr=%+v", ep, localAddr, addr)
- // For TCP endpoints, gVisor Connect() returns this error when the endpoint
- // is in an error state and the hard error state has already been read from the
- // endpoint via other APIs. Apply the saved hard error state here.
- case *tcpip.ErrConnectionAborted:
- if hardError != nil {
- err = hardError
- }
+ if err := ep.ep.Connect(addr); err != nil {
+ switch err.(type) {
+ case *tcpip.ErrConnectStarted:
+ localAddr, err := ep.ep.GetLocalAddress()
+ if err != nil {
+ panic(err)
}
- return socket.BaseSocketConnectResultWithErr(tcpipErrorToCode(err)), nil
+ _ = syslog.DebugTf("connect", "%p: started, local=%+v, addr=%+v", ep, localAddr, addr)
+ case *tcpip.ErrConnectionAborted:
+ // For TCP endpoints, gVisor Connect() returns this error when the
+ // endpoint is in an error state and the specific error has already been
+ // consumed.
+ //
+ // If the endpoint is in an error state, that means that loop{Read,Write}
+ // must be shutting down, and the only way to consume the error correctly
+ // is to get it from them.
+ terminal := <-ep.terminal
+ if terminal != nil {
+ err = terminal
+ }
}
+ return socket.BaseSocketConnectResultWithErr(tcpipErrorToCode(err)), nil
}
{
@@ -442,7 +412,7 @@
ep.mu.Unlock()
} else {
var err tcpip.Error
- val, err = GetSockOpt(ep.ep, ep.ns, &ep.hardError, ep.netProto, ep.transProto, level, optName)
+ val, err = GetSockOpt(ep.ep, ep.ns, ep.terminal, ep.netProto, ep.transProto, level, optName)
if level == C.SOL_SOCKET && optName == C.SO_ERROR {
ep.pending.mustUpdate()
}
@@ -492,9 +462,14 @@
}
func (ep *endpoint) GetError(fidl.Context) (socket.BaseSocketGetErrorResult, error) {
- ep.hardError.mu.Lock()
- err := ep.hardError.storeAndRetrieveLocked(ep.ep.LastError())
- ep.hardError.mu.Unlock()
+ err := func() tcpip.Error {
+ select {
+ case err := <-ep.terminal:
+ return err
+ default:
+ return ep.ep.LastError()
+ }
+ }()
ep.pending.mustUpdate()
if err != nil {
return socket.BaseSocketGetErrorResultWithErr(tcpipErrorToCode(err)), nil
@@ -933,7 +908,8 @@
// loop{Read,Write,Poll}Done are signaled iff loop{Read,Write,Poll} have
// exited, respectively.
- loopReadDone, loopWriteDone, loopPollDone <-chan struct{}
+ loopReadDone, loopWriteDone <-chan tcpip.Error
+ loopPollDone <-chan struct{}
}
// closing is signaled iff close has been called.
@@ -975,6 +951,7 @@
readiness: ep.Readiness,
signalPeer: localS.Handle().SignalPeer,
},
+ terminal: make(chan tcpip.Error, 1),
},
local: localS,
peer: peerS,
@@ -1113,29 +1090,48 @@
// race in which the loops are allowed to start without guaranteeing that
// this routine will wait for them to return.
eps.mu.Lock()
+ errChannels := map[string]<-chan tcpip.Error{
+ "loopRead": eps.mu.loopReadDone,
+ "loopWrite": eps.mu.loopWriteDone,
+ }
channels := []<-chan struct{}{
- eps.mu.loopReadDone,
- eps.mu.loopWriteDone,
eps.mu.loopPollDone,
}
eps.mu.Unlock()
// The interruptions above cause our loops to exit. Wait until
// they do before releasing resources they may be using.
+ var terminalError tcpip.Error
+ for name, ch := range errChannels {
+ if ch != nil {
+ for err := range ch {
+ _ = syslog.DebugTf("close", "%p: %s=%#v", eps, name, err)
+ switch err.(type) {
+ case nil:
+ case *tcpip.ErrClosedForReceive:
+ case *tcpip.ErrClosedForSend:
+ default:
+ if terminalError != nil {
+ panic(fmt.Sprintf("terminalError=%#v err=%#v", terminalError, err))
+ }
+ terminalError = err
+ }
+ }
+ }
+ }
for _, ch := range channels {
if ch != nil {
- <-ch
+ for range ch {
+ }
}
}
- // The gVisor endpoint could have got a hard error after the
- // read/write loops have ended, check for that here.
- eps.endpoint.hardError.mu.Lock()
- err := eps.endpoint.hardError.storeAndRetrieveLocked(eps.ep.LastError())
- eps.endpoint.hardError.mu.Unlock()
- // Signal the client about hard errors that require special errno
+ eps.endpoint.terminal <- terminalError
+ close(eps.endpoint.terminal)
+
+ // Signal the client about errors that require special errno
// handling by the client for read/write calls.
- switch err.(type) {
+ switch terminalError.(type) {
case *tcpip.ErrConnectionRefused:
if err := eps.local.Handle().SignalPeer(0, zxsocket.SignalConnectionRefused); err != nil {
panic(fmt.Sprintf("Handle().SignalPeer(0, zxsocket.SignalConnectionRefused) = %s", err))
@@ -1232,13 +1228,13 @@
panic(err)
}
for _, m := range []struct {
- done *<-chan struct{}
- fn func(chan<- struct{})
+ done *<-chan tcpip.Error
+ fn func(chan<- tcpip.Error)
}{
{&eps.mu.loopReadDone, eps.loopRead},
{&eps.mu.loopWriteDone, eps.loopWrite},
} {
- ch := make(chan struct{})
+ ch := make(chan tcpip.Error, 1)
*m.done = ch
go m.fn(ch)
}
@@ -1349,13 +1345,7 @@
// Check if the endpoint has already encountered an error since
// our installed callback will not fire in this case.
- //
- // Acquire hard error lock across ep calls to avoid races and store the
- // hard error deterministically.
- eps.endpoint.hardError.mu.Lock()
- hardError := eps.endpoint.hardError.storeAndRetrieveLocked(eps.ep.LastError())
- eps.endpoint.hardError.mu.Unlock()
- if hardError != nil {
+ if ep.Readiness(waiter.EventErr)&waiter.EventErr != 0 {
eps.HUp()
}
@@ -1364,240 +1354,228 @@
}
// loopWrite shuttles signals and data from the zircon socket to the tcpip.Endpoint.
-func (eps *endpointWithSocket) loopWrite(ch chan<- struct{}) {
+func (eps *endpointWithSocket) loopWrite(ch chan<- tcpip.Error) {
defer close(ch)
const sigs = zx.SignalSocketReadable | zx.SignalSocketPeerWriteDisabled | localSignalClosing
- waitEntry, notifyCh := waiter.NewChannelEntry(nil)
- eps.wq.EventRegister(&waitEntry, waiter.EventOut)
- defer eps.wq.EventUnregister(&waitEntry)
+ ch <- func() tcpip.Error {
+ waitEntry, notifyCh := waiter.NewChannelEntry(nil)
+ eps.wq.EventRegister(&waitEntry, waiter.EventOut)
+ defer eps.wq.EventUnregister(&waitEntry)
- reader := socketReader{
- socket: eps.local,
- }
- for {
- reader.lastError = nil
- reader.lastRead = 0
-
- // Acquire hard error lock across ep calls to avoid races and store the
- // hard error deterministically.
- eps.hardError.mu.Lock()
- n, err := eps.ep.Write(&reader, tcpip.WriteOptions{
- // We must write atomically in order to guarantee all the data fetched
- // from the zircon socket is consumed by the endpoint.
- Atomic: true,
- })
- hardError := eps.hardError.storeAndRetrieveLocked(err)
- eps.hardError.mu.Unlock()
- if n != int64(reader.lastRead) {
- panic(fmt.Sprintf("partial write into endpoint (%s); got %d, want %d", err, n, reader.lastRead))
+ reader := socketReader{
+ socket: eps.local,
}
- // TODO(https://fxbug.dev/35006): Handle all transport write errors.
- switch err.(type) {
- case nil, *tcpip.ErrBadBuffer:
- switch err := reader.lastError.(type) {
- case nil:
- continue
- case *zx.Error:
- switch err.Status {
- case zx.ErrShouldWait:
- obs, err := zxwait.Wait(zx.Handle(eps.local), sigs, zx.TimensecInfinite)
- if err != nil {
+ for {
+ reader.lastError = nil
+ reader.lastRead = 0
+
+ n, err := eps.ep.Write(&reader, tcpip.WriteOptions{
+ // We must write atomically in order to guarantee all the data fetched
+ // from the zircon socket is consumed by the endpoint.
+ Atomic: true,
+ })
+ if n != int64(reader.lastRead) {
+ panic(fmt.Sprintf("partial write into endpoint (%s); got %d, want %d", err, n, reader.lastRead))
+ }
+ // TODO(https://fxbug.dev/35006): Handle all transport write errors.
+ switch err.(type) {
+ case nil, *tcpip.ErrBadBuffer:
+ switch err := reader.lastError.(type) {
+ case nil:
+ continue
+ case *zx.Error:
+ switch err.Status {
+ case zx.ErrShouldWait:
+ obs, err := zxwait.Wait(zx.Handle(eps.local), sigs, zx.TimensecInfinite)
+ if err != nil {
+ panic(err)
+ }
+ switch {
+ case obs&zx.SignalSocketReadable != 0:
+ // The client might have written some data into the socket.
+ // Always continue to the loop below and try to read even if the
+ // signals show the client has closed the socket.
+ continue
+ case obs&localSignalClosing != 0:
+ // We're shutting down.
+ return nil
+ case obs&zx.SignalSocketPeerWriteDisabled != 0:
+ // Fallthrough.
+ default:
+ panic(fmt.Sprintf("impossible signals observed: %b/%b", obs, sigs))
+ }
+ fallthrough
+ case zx.ErrBadState:
+ // Reading has been disabled for this socket endpoint.
+ switch err := eps.ep.Shutdown(tcpip.ShutdownWrite); err.(type) {
+ case nil, *tcpip.ErrNotConnected:
+ // Shutdown can return ErrNotConnected if the endpoint was
+ // connected but no longer is.
+ default:
+ panic(err)
+ }
+ return nil
+ }
+ }
+ panic(err)
+ case *tcpip.ErrNotConnected:
+ // Write never returns ErrNotConnected except for endpoints that were
+ // never connected. Such endpoints should never reach this loop.
+ panic(fmt.Sprintf("connected endpoint returned %s", err))
+ case *tcpip.ErrWouldBlock:
+ // NB: we can't select on closing here because the client may have
+ // written some data into the buffer and then immediately closed the
+ // socket.
+ //
+ // We must wait until the linger timeout.
+ select {
+ case <-eps.linger:
+ return nil
+ case <-notifyCh:
+ continue
+ }
+ case *tcpip.ErrConnectionRefused:
+ // TODO(https://fxbug.dev/61594): Allow the socket to be reused for
+ // another connection attempt to match Linux.
+ return err
+ case *tcpip.ErrClosedForSend:
+ // Shut the endpoint down *only* if it is not already in an error
+ // state; an endpoint in an error state will soon be fully closed down,
+ // and shutting it down here would cause signals to be asserted twice,
+ // which can produce races in the client.
+ if eps.ep.Readiness(waiter.EventErr)&waiter.EventErr == 0 {
+ if err := eps.local.Shutdown(zx.SocketShutdownRead); err != nil {
panic(err)
}
- switch {
- case obs&zx.SignalSocketReadable != 0:
- // The client might have written some data into the socket. Always
- // continue to the loop below and try to read even if the signals
- // show the client has closed the socket.
- continue
- case obs&localSignalClosing != 0:
- // We're shutting down.
- return
- case obs&zx.SignalSocketPeerWriteDisabled != 0:
- // Fallthrough.
- default:
- panic(fmt.Sprintf("impossible signals observed: %b/%b", obs, sigs))
- }
- fallthrough
- case zx.ErrBadState:
- // Reading has been disabled for this socket endpoint.
- switch err := eps.ep.Shutdown(tcpip.ShutdownWrite); err.(type) {
- case nil, *tcpip.ErrNotConnected:
- // Shutdown can return ErrNotConnected if the endpoint was
- // connected but no longer is.
- default:
- panic(err)
- }
- return
+ _ = syslog.DebugTf("zx_socket_shutdown", "%p: ZX_SOCKET_SHUTDOWN_READ", eps)
}
+ return err
+ case *tcpip.ErrConnectionAborted, *tcpip.ErrConnectionReset, *tcpip.ErrNetworkUnreachable, *tcpip.ErrNoRoute:
+ return err
+ case *tcpip.ErrTimeout:
+ // The maximum duration of missing ACKs was reached, or the maximum
+ // number of unacknowledged keepalives was reached.
+ return err
+ default:
+ _ = syslog.Errorf("TCP Endpoint.Write(): %s", err)
}
- panic(err)
- case *tcpip.ErrNotConnected:
- // Write never returns ErrNotConnected except for endpoints that were
- // never connected. Such endpoints should never reach this loop.
- panic(fmt.Sprintf("connected endpoint returned %s", err))
- case *tcpip.ErrWouldBlock:
- // NB: we can't select on closing here because the client may have
- // written some data into the buffer and then immediately closed the
- // socket.
- //
- // We must wait until the linger timeout.
- select {
- case <-eps.linger:
- return
- case <-notifyCh:
- continue
- }
- case *tcpip.ErrConnectionRefused:
- // Connection refused is a "hard error" that may be observed on either the
- // read or write loops.
- // TODO(https://fxbug.dev/61594): Allow the socket to be reused for
- // another connection attempt to match Linux.
- return
- case *tcpip.ErrClosedForSend:
- // Closed for send can be issued when the endpoint is in an error state,
- // which is encoded by the presence of a hard error having been
- // observed.
- // To avoid racing signals with the closing caused by a hard error,
- // we won't signal here if a hard error is already observed.
- if hardError == nil {
- if err := eps.local.Shutdown(zx.SocketShutdownRead); err != nil {
- panic(err)
- }
- _ = syslog.DebugTf("zx_socket_shutdown", "%p: ZX_SOCKET_SHUTDOWN_READ", eps)
- }
- return
- case *tcpip.ErrConnectionAborted, *tcpip.ErrConnectionReset, *tcpip.ErrNetworkUnreachable, *tcpip.ErrNoRoute:
- return
- case *tcpip.ErrTimeout:
- // The maximum duration of missing ACKs was reached, or the maximum
- // number of unacknowledged keepalives was reached.
- return
- default:
- _ = syslog.Errorf("TCP Endpoint.Write(): %s", err)
}
- }
+ }()
}
// loopRead shuttles signals and data from the tcpip.Endpoint to the zircon socket.
-func (eps *endpointWithSocket) loopRead(ch chan<- struct{}) {
+func (eps *endpointWithSocket) loopRead(ch chan<- tcpip.Error) {
defer close(ch)
- inEntry, inCh := waiter.NewChannelEntry(nil)
- eps.wq.EventRegister(&inEntry, waiter.EventIn)
- defer eps.wq.EventUnregister(&inEntry)
-
const sigs = zx.SignalSocketWritable | zx.SignalSocketWriteDisabled | localSignalClosing
- writer := socketWriter{
- socket: eps.local,
- }
- for {
- // Acquire hard error lock across ep calls to avoid races and store the
- // hard error deterministically.
- eps.hardError.mu.Lock()
- res, err := eps.ep.Read(&writer, tcpip.ReadOptions{})
- hardError := eps.hardError.storeAndRetrieveLocked(err)
- eps.hardError.mu.Unlock()
- // TODO(https://fxbug.dev/35006): Handle all transport read errors.
- switch err.(type) {
- case *tcpip.ErrNotConnected:
- // Read never returns ErrNotConnected except for endpoints that were
- // never connected. Such endpoints should never reach this loop.
- panic(fmt.Sprintf("connected endpoint returned %s", err))
- case *tcpip.ErrTimeout:
- // At the time of writing, this error indicates that a TCP connection
- // has failed. This can occur during the TCP handshake if the peer
- // fails to respond to a SYN within 60 seconds, or if the retransmit
- // logic gives up after 60 seconds of missing ACKs from the peer, or if
- // the maximum number of unacknowledged keepalives is reached.
- //
- // The connection was alive but now is dead - this is equivalent to
- // having received a TCP RST.
- return
- case *tcpip.ErrConnectionRefused:
- // Connection refused is a "hard error" that may be observed on either the
- // read or write loops.
- // TODO(https://fxbug.dev/61594): Allow the socket to be reused for
- // another connection attempt to match Linux.
- return
- case *tcpip.ErrWouldBlock:
- select {
- case <-inCh:
- continue
- case <-eps.closing:
- // We're shutting down.
- return
- }
- case *tcpip.ErrClosedForReceive:
- // Closed for receive can be issued when the endpoint is in an error
- // state, which is encoded by the presence of a hard error having been
- // observed.
- // To avoid racing signals with the closing caused by a hard error,
- // we won't signal here if a hard error is already observed.
- if hardError == nil {
- if err := eps.local.Shutdown(zx.SocketShutdownWrite); err != nil {
- panic(err)
- }
- _ = syslog.DebugTf("zx_socket_shutdown", "%p: ZX_SOCKET_SHUTDOWN_WRITE", eps)
- }
- return
- case *tcpip.ErrConnectionAborted, *tcpip.ErrConnectionReset, *tcpip.ErrNetworkUnreachable, *tcpip.ErrNoRoute:
- return
- case nil, *tcpip.ErrBadBuffer:
- if err == nil {
- eps.ep.ModerateRecvBuf(res.Count)
- }
- // `tcpip.Endpoint.Read` returns a nil error if _anything_ was written -
- // even if the writer returned an error - we always want to handle those
- // errors.
- switch err := writer.lastError.(type) {
- case nil:
- continue
- case *zx.Error:
- switch err.Status {
- case zx.ErrShouldWait:
- obs, err := zxwait.Wait(zx.Handle(eps.local), sigs, zx.TimensecInfinite)
- if err != nil {
- panic(err)
- }
- switch {
- case obs&zx.SignalSocketWritable != 0:
- continue
- case obs&localSignalClosing != 0:
- // We're shutting down.
- return
- case obs&zx.SignalSocketWriteDisabled != 0:
- // Fallthrough.
- default:
- panic(fmt.Sprintf("impossible signals observed: %b/%b", obs, sigs))
- }
- fallthrough
- case zx.ErrBadState:
- // Writing has been disabled for this socket endpoint.
- switch err := eps.ep.Shutdown(tcpip.ShutdownRead); err.(type) {
- case nil:
- case *tcpip.ErrNotConnected:
- // An ErrNotConnected while connected is expected if there
- // is pending data to be read and the connection has been
- // reset by the other end of the endpoint. The endpoint will
- // allow the pending data to be read without error but will
- // return ErrNotConnected if Shutdown is called. Otherwise
- // this is unexpected, panic.
- _ = syslog.InfoTf("loopRead", "%p: client shutdown a closed endpoint; ep info: %#v", eps, eps.endpoint.ep.Info())
- default:
- panic(err)
- }
- return
- }
- }
- panic(err)
- default:
- _ = syslog.Errorf("Endpoint.Read(): %s", err)
+ ch <- func() tcpip.Error {
+ inEntry, inCh := waiter.NewChannelEntry(nil)
+ eps.wq.EventRegister(&inEntry, waiter.EventIn)
+ defer eps.wq.EventUnregister(&inEntry)
+
+ writer := socketWriter{
+ socket: eps.local,
}
- }
+ for {
+ res, err := eps.ep.Read(&writer, tcpip.ReadOptions{})
+ // TODO(https://fxbug.dev/35006): Handle all transport read errors.
+ switch err.(type) {
+ case *tcpip.ErrNotConnected:
+ // Read never returns ErrNotConnected except for endpoints that were
+ // never connected. Such endpoints should never reach this loop.
+ panic(fmt.Sprintf("connected endpoint returned %s", err))
+ case *tcpip.ErrTimeout:
+ // At the time of writing, this error indicates that a TCP connection
+ // has failed. This can occur during the TCP handshake if the peer
+ // fails to respond to a SYN within 60 seconds, or if the retransmit
+ // logic gives up after 60 seconds of missing ACKs from the peer, or if
+ // the maximum number of unacknowledged keepalives is reached.
+ //
+ // The connection was alive but now is dead - this is equivalent to
+ // having received a TCP RST.
+ return err
+ case *tcpip.ErrConnectionRefused:
+ // TODO(https://fxbug.dev/61594): Allow the socket to be reused for
+ // another connection attempt to match Linux.
+ return err
+ case *tcpip.ErrWouldBlock:
+ select {
+ case <-inCh:
+ continue
+ case <-eps.closing:
+ // We're shutting down.
+ return nil
+ }
+ case *tcpip.ErrClosedForReceive:
+ // Shut the endpoint down *only* if it is not already in an error
+ // state; an endpoint in an error state will soon be fully closed down,
+ // and shutting it down here would cause signals to be asserted twice,
+ // which can produce races in the client.
+ if eps.ep.Readiness(waiter.EventErr)&waiter.EventErr == 0 {
+ if err := eps.local.Shutdown(zx.SocketShutdownWrite); err != nil {
+ panic(err)
+ }
+ _ = syslog.DebugTf("zx_socket_shutdown", "%p: ZX_SOCKET_SHUTDOWN_WRITE", eps)
+ }
+ return err
+ case *tcpip.ErrConnectionAborted, *tcpip.ErrConnectionReset, *tcpip.ErrNetworkUnreachable, *tcpip.ErrNoRoute:
+ return err
+ case nil, *tcpip.ErrBadBuffer:
+ if err == nil {
+ eps.ep.ModerateRecvBuf(res.Count)
+ }
+ // `tcpip.Endpoint.Read` returns a nil error if _anything_ was written
+ // - even if the writer returned an error - we always want to handle
+ // those errors.
+ switch err := writer.lastError.(type) {
+ case nil:
+ continue
+ case *zx.Error:
+ switch err.Status {
+ case zx.ErrShouldWait:
+ obs, err := zxwait.Wait(zx.Handle(eps.local), sigs, zx.TimensecInfinite)
+ if err != nil {
+ panic(err)
+ }
+ switch {
+ case obs&zx.SignalSocketWritable != 0:
+ continue
+ case obs&localSignalClosing != 0:
+ // We're shutting down.
+ return nil
+ case obs&zx.SignalSocketWriteDisabled != 0:
+ // Fallthrough.
+ default:
+ panic(fmt.Sprintf("impossible signals observed: %b/%b", obs, sigs))
+ }
+ fallthrough
+ case zx.ErrBadState:
+ // Writing has been disabled for this socket endpoint.
+ switch err := eps.ep.Shutdown(tcpip.ShutdownRead); err.(type) {
+ case nil:
+ case *tcpip.ErrNotConnected:
+ // An ErrNotConnected while connected is expected if there
+ // is pending data to be read and the connection has been
+ // reset by the other end of the endpoint. The endpoint will
+ // allow the pending data to be read without error but will
+ // return ErrNotConnected if Shutdown is called. Otherwise
+ // this is unexpected, panic.
+ _ = syslog.InfoTf("loopRead", "%p: client shutdown a closed endpoint; ep info: %#v", eps, eps.endpoint.ep.Info())
+ default:
+ panic(err)
+ }
+ return nil
+ }
+ }
+ panic(err)
+ default:
+ _ = syslog.Errorf("Endpoint.Read(): %s", err)
+ }
+ }
+ }()
}
type datagramSocketImpl struct {
diff --git a/src/connectivity/network/netstack/netstack_test.go b/src/connectivity/network/netstack/netstack_test.go
index 690732f..a3dc933 100644
--- a/src/connectivity/network/netstack/netstack_test.go
+++ b/src/connectivity/network/netstack/netstack_test.go
@@ -272,18 +272,30 @@
defer eps.close()
eps.mu.Lock()
+ errChannels := []struct {
+ ch <-chan tcpip.Error
+ name string
+ }{
+ {ch: eps.mu.loopReadDone, name: "loopReadDone"},
+ {ch: eps.mu.loopWriteDone, name: "loopWriteDone"},
+ }
channels := []struct {
ch <-chan struct{}
name string
}{
{ch: eps.closing, name: "closing"},
- {ch: eps.mu.loopReadDone, name: "loopReadDone"},
- {ch: eps.mu.loopWriteDone, name: "loopWriteDone"},
{ch: eps.mu.loopPollDone, name: "loopPollDone"},
}
eps.mu.Unlock()
// Check starting conditions.
+ for _, ch := range errChannels {
+ select {
+ case err := <-ch.ch:
+ t.Errorf("%s cleaned up prematurely: %#v", ch.name, err)
+ default:
+ }
+ }
for _, ch := range channels {
select {
case <-ch.ch:
@@ -354,6 +366,13 @@
}
// There's still a referent.
+ for _, ch := range errChannels {
+ select {
+ case err := <-ch.ch:
+ t.Errorf("%s cleaned up prematurely: %#v", ch.name, err)
+ default:
+ }
+ }
for _, ch := range channels {
select {
case <-ch.ch:
@@ -383,6 +402,18 @@
// Give a generous timeout for the closed channel to be detected.
timeout := make(chan struct{})
time.AfterFunc(5*time.Second, func() { close(timeout) })
+ for _, ch := range errChannels {
+ if ch.ch != nil {
+ select {
+ case err := <-ch.ch:
+ if err != nil {
+ t.Error(err)
+ }
+ case <-timeout:
+ t.Errorf("%s not cleaned up", ch.name)
+ }
+ }
+ }
for _, ch := range channels {
if ch.ch != nil {
select {
@@ -476,13 +507,18 @@
}
eps.mu.Lock()
+ errChannels := []struct {
+ ch <-chan tcpip.Error
+ name string
+ }{
+ {ch: eps.mu.loopReadDone, name: "loopReadDone"},
+ {ch: eps.mu.loopWriteDone, name: "loopWriteDone"},
+ }
channels := []struct {
ch <-chan struct{}
name string
}{
{ch: eps.closing, name: "closing"},
- {ch: eps.mu.loopReadDone, name: "loopReadDone"},
- {ch: eps.mu.loopWriteDone, name: "loopWriteDone"},
{ch: eps.mu.loopPollDone, name: "loopPollDone"},
}
eps.mu.Unlock()
@@ -490,6 +526,23 @@
// Give a generous timeout for the closed channel to be detected.
timeout := make(chan struct{})
time.AfterFunc(5*time.Second, func() { close(timeout) })
+ for _, ch := range errChannels {
+ if ch.ch != nil {
+ select {
+ case err := <-ch.ch:
+ switch err.(type) {
+ case nil:
+ case *tcpip.ErrClosedForReceive:
+ case *tcpip.ErrClosedForSend:
+ case *tcpip.ErrConnectionReset:
+ default:
+ t.Error(err)
+ }
+ case <-timeout:
+ t.Errorf("%s not cleaned up", ch.name)
+ }
+ }
+ }
for _, ch := range channels {
if ch.ch != nil {
select {
diff --git a/src/connectivity/network/netstack/socket_conv.go b/src/connectivity/network/netstack/socket_conv.go
index 1870299..ce97e9c 100644
--- a/src/connectivity/network/netstack/socket_conv.go
+++ b/src/connectivity/network/netstack/socket_conv.go
@@ -78,10 +78,10 @@
return 0
}
-func GetSockOpt(ep tcpip.Endpoint, ns *Netstack, he *hardError, netProto tcpip.NetworkProtocolNumber, transProto tcpip.TransportProtocolNumber, level, name int16) (interface{}, tcpip.Error) {
+func GetSockOpt(ep tcpip.Endpoint, ns *Netstack, terminal <-chan tcpip.Error, netProto tcpip.NetworkProtocolNumber, transProto tcpip.TransportProtocolNumber, level, name int16) (interface{}, tcpip.Error) {
switch level {
case C.SOL_SOCKET:
- return getSockOptSocket(ep, ns, he, netProto, transProto, name)
+ return getSockOptSocket(ep, ns, terminal, netProto, transProto, name)
case C.SOL_TCP:
return getSockOptTCP(ep, name)
@@ -105,7 +105,7 @@
return nil, &tcpip.ErrUnknownProtocol{}
}
-func getSockOptSocket(ep tcpip.Endpoint, ns *Netstack, he *hardError, netProto tcpip.NetworkProtocolNumber, transProto tcpip.TransportProtocolNumber, name int16) (interface{}, tcpip.Error) {
+func getSockOptSocket(ep tcpip.Endpoint, ns *Netstack, terminal <-chan tcpip.Error, netProto tcpip.NetworkProtocolNumber, transProto tcpip.TransportProtocolNumber, name int16) (interface{}, tcpip.Error) {
switch name {
case C.SO_TYPE:
switch transProto {
@@ -142,9 +142,14 @@
}
case C.SO_ERROR:
- he.mu.Lock()
- err := he.storeAndRetrieveLocked(ep.LastError())
- he.mu.Unlock()
+ err := func() tcpip.Error {
+ select {
+ case err := <-terminal:
+ return err
+ default:
+ return ep.LastError()
+ }
+ }()
if err == nil {
return int32(0), nil
}
diff --git a/src/connectivity/network/tests/bsdsocket_test.cc b/src/connectivity/network/tests/bsdsocket_test.cc
index 321a739..d4f8a60 100644
--- a/src/connectivity/network/tests/bsdsocket_test.cc
+++ b/src/connectivity/network/tests/bsdsocket_test.cc
@@ -1529,7 +1529,7 @@
<< strerror(errno);
#else
-1);
- ASSERT_EQ(errno, ECONNREFUSED) << strerror(errno);
+ ASSERT_EQ(errno, ECONNABORTED) << strerror(errno);
#endif
ASSERT_EQ(close(listener.release()), 0) << strerror(errno);
@@ -2408,6 +2408,13 @@
socklen_t addrlen = AddrLen();
ASSERT_EQ(connect(sock().get(), reinterpret_cast<const struct sockaddr*>(&any), addrlen), -1);
ASSERT_EQ(errno, ECONNREFUSED) << strerror(errno);
+
+ // The error should have been consumed.
+ int err;
+ socklen_t optlen = sizeof(err);
+ ASSERT_EQ(getsockopt(sock().get(), SOL_SOCKET, SO_ERROR, &err, &optlen), 0) << strerror(errno);
+ ASSERT_EQ(optlen, sizeof(err));
+ ASSERT_EQ(err, 0) << strerror(err);
}
TEST_P(AnyAddrDatagramSocketTest, Connect) {
@@ -3221,9 +3228,10 @@
EXPECT_GE(n, 0) << strerror(errno);
EXPECT_EQ(n, 1);
EXPECT_EQ(pfd.revents, POLLIN | POLLHUP | POLLERR);
+
char c;
EXPECT_EQ(ioMethod.executeIO(fd, &c, sizeof(c)), -1);
- EXPECT_EQ(errno, expected_errno);
+ EXPECT_EQ(errno, expected_errno) << strerror(errno) << " vs " << strerror(expected_errno);
bool isWrite = ioMethod.isWrite();
#if !defined(__Fuchsia__)