[netstack] Remove hardError

The purpose of this structure is to leak errors out of loop{Read,Write};
it has hardly anything to do with being "hard". Its behavior is also
incorrect: these errors are expected to be consumed once they have been
returned to the user, which was not being done before this change.

Change-Id: I73dfd6fc1c694387d5b28b9d6f2b6d9936470173
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/525351
Commit-Queue: Auto-Submit <auto-submit@fuchsia-infra.iam.gserviceaccount.com>
Fuchsia-Auto-Submit: Tamir Duberstein <tamird@google.com>
Reviewed-by: Arthur Sfez <asfez@google.com>
Reviewed-by: Bruno Dal Bo <brunodalbo@google.com>
Reviewed-by: Mithun Iyer <iyerm@google.com>
diff --git a/src/connectivity/network/netstack/fuchsia_posix_socket.go b/src/connectivity/network/netstack/fuchsia_posix_socket.go
index bfc1fe8..5cd5000 100644
--- a/src/connectivity/network/netstack/fuchsia_posix_socket.go
+++ b/src/connectivity/network/netstack/fuchsia_posix_socket.go
@@ -213,13 +213,6 @@
 	panic(err)
 }
 
-type hardError struct {
-	mu struct {
-		sync.Mutex
-		err tcpip.Error
-	}
-}
-
 // endpoint is the base structure that models all network sockets.
 type endpoint struct {
 	// TODO(https://fxbug.dev/37419): Remove TransitionalBase after methods landed.
@@ -243,9 +236,7 @@
 
 	pending signaler
 
-	// gVisor stack clears the hard error on the endpoint on a read, so,
-	// save the error when returned by gVisor endpoint calls.
-	hardError hardError
+	terminal chan tcpip.Error
 }
 
 func (ep *endpoint) incRef() {
@@ -266,24 +257,6 @@
 	return doClose
 }
 
-// storeAndRetrieveLocked evaluates if the input error is a "hard
-// error" (one which puts the endpoint in an unrecoverable error state) and
-// stores it. Returns the pre-existing hard error if it was already set or the
-// new value if changed.
-//
-// Must be called with he.mu held.
-func (he *hardError) storeAndRetrieveLocked(err tcpip.Error) tcpip.Error {
-	if he.mu.err == nil {
-		switch err.(type) {
-		case *tcpip.ErrConnectionAborted, *tcpip.ErrConnectionReset,
-			*tcpip.ErrNetworkUnreachable, *tcpip.ErrNoRoute, *tcpip.ErrTimeout,
-			*tcpip.ErrConnectionRefused:
-			he.mu.err = err
-		}
-	}
-	return he.mu.err
-}
-
 func (ep *endpoint) Sync(fidl.Context) (int32, error) {
 	_ = syslog.DebugTf("Sync", "%p", ep)
 
@@ -334,31 +307,28 @@
 		}
 	}
 
-	{
-		// Acquire hard error lock across ep calls to avoid races and store the
-		// hard error deterministically.
-		ep.hardError.mu.Lock()
-		err := ep.ep.Connect(addr)
-		hardError := ep.hardError.storeAndRetrieveLocked(err)
-		ep.hardError.mu.Unlock()
-		if err != nil {
-			switch err.(type) {
-			case *tcpip.ErrConnectStarted:
-				localAddr, err := ep.ep.GetLocalAddress()
-				if err != nil {
-					panic(err)
-				}
-				_ = syslog.DebugTf("connect", "%p: started, local=%+v, addr=%+v", ep, localAddr, addr)
-			// For TCP endpoints, gVisor Connect() returns this error when the endpoint
-			// is in an error state and the hard error state has already been read from the
-			// endpoint via other APIs. Apply the saved hard error state here.
-			case *tcpip.ErrConnectionAborted:
-				if hardError != nil {
-					err = hardError
-				}
+	if err := ep.ep.Connect(addr); err != nil {
+		switch err.(type) {
+		case *tcpip.ErrConnectStarted:
+			localAddr, err := ep.ep.GetLocalAddress()
+			if err != nil {
+				panic(err)
 			}
-			return socket.BaseSocketConnectResultWithErr(tcpipErrorToCode(err)), nil
+			_ = syslog.DebugTf("connect", "%p: started, local=%+v, addr=%+v", ep, localAddr, addr)
+		case *tcpip.ErrConnectionAborted:
+			// For TCP endpoints, gVisor Connect() returns this error when the
+			// endpoint is in an error state and the specific error has already been
+			// consumed.
+			//
+			// If the endpoint is in an error state, that means that loop{Read,Write}
+			// must be shutting down, and the only way to consume the error correctly
+			// is to get it from them.
+			terminal := <-ep.terminal
+			if terminal != nil {
+				err = terminal
+			}
 		}
+		return socket.BaseSocketConnectResultWithErr(tcpipErrorToCode(err)), nil
 	}
 
 	{
@@ -442,7 +412,7 @@
 		ep.mu.Unlock()
 	} else {
 		var err tcpip.Error
-		val, err = GetSockOpt(ep.ep, ep.ns, &ep.hardError, ep.netProto, ep.transProto, level, optName)
+		val, err = GetSockOpt(ep.ep, ep.ns, ep.terminal, ep.netProto, ep.transProto, level, optName)
 		if level == C.SOL_SOCKET && optName == C.SO_ERROR {
 			ep.pending.mustUpdate()
 		}
@@ -492,9 +462,14 @@
 }
 
 func (ep *endpoint) GetError(fidl.Context) (socket.BaseSocketGetErrorResult, error) {
-	ep.hardError.mu.Lock()
-	err := ep.hardError.storeAndRetrieveLocked(ep.ep.LastError())
-	ep.hardError.mu.Unlock()
+	err := func() tcpip.Error {
+		select {
+		case err := <-ep.terminal:
+			return err
+		default:
+			return ep.ep.LastError()
+		}
+	}()
 	ep.pending.mustUpdate()
 	if err != nil {
 		return socket.BaseSocketGetErrorResultWithErr(tcpipErrorToCode(err)), nil
@@ -933,7 +908,8 @@
 
 		// loop{Read,Write,Poll}Done are signaled iff loop{Read,Write,Poll} have
 		// exited, respectively.
-		loopReadDone, loopWriteDone, loopPollDone <-chan struct{}
+		loopReadDone, loopWriteDone <-chan tcpip.Error
+		loopPollDone                <-chan struct{}
 	}
 
 	// closing is signaled iff close has been called.
@@ -975,6 +951,7 @@
 				readiness:  ep.Readiness,
 				signalPeer: localS.Handle().SignalPeer,
 			},
+			terminal: make(chan tcpip.Error, 1),
 		},
 		local:   localS,
 		peer:    peerS,
@@ -1113,29 +1090,48 @@
 		// race in which the loops are allowed to start without guaranteeing that
 		// this routine will wait for them to return.
 		eps.mu.Lock()
+		errChannels := map[string]<-chan tcpip.Error{
+			"loopRead":  eps.mu.loopReadDone,
+			"loopWrite": eps.mu.loopWriteDone,
+		}
 		channels := []<-chan struct{}{
-			eps.mu.loopReadDone,
-			eps.mu.loopWriteDone,
 			eps.mu.loopPollDone,
 		}
 		eps.mu.Unlock()
 
 		// The interruptions above cause our loops to exit. Wait until
 		// they do before releasing resources they may be using.
+		var terminalError tcpip.Error
+		for name, ch := range errChannels {
+			if ch != nil {
+				for err := range ch {
+					_ = syslog.DebugTf("close", "%p: %s=%#v", eps, name, err)
+					switch err.(type) {
+					case nil:
+					case *tcpip.ErrClosedForReceive:
+					case *tcpip.ErrClosedForSend:
+					default:
+						if terminalError != nil {
+							panic(fmt.Sprintf("terminalError=%#v err=%#v", terminalError, err))
+						}
+						terminalError = err
+					}
+				}
+			}
+		}
 		for _, ch := range channels {
 			if ch != nil {
-				<-ch
+				for range ch {
+				}
 			}
 		}
 
-		// The gVisor endpoint could have got a hard error after the
-		// read/write loops have ended, check for that here.
-		eps.endpoint.hardError.mu.Lock()
-		err := eps.endpoint.hardError.storeAndRetrieveLocked(eps.ep.LastError())
-		eps.endpoint.hardError.mu.Unlock()
-		// Signal the client about hard errors that require special errno
+		eps.endpoint.terminal <- terminalError
+		close(eps.endpoint.terminal)
+
+		// Signal the client about errors that require special errno
 		// handling by the client for read/write calls.
-		switch err.(type) {
+		switch terminalError.(type) {
 		case *tcpip.ErrConnectionRefused:
 			if err := eps.local.Handle().SignalPeer(0, zxsocket.SignalConnectionRefused); err != nil {
 				panic(fmt.Sprintf("Handle().SignalPeer(0, zxsocket.SignalConnectionRefused) = %s", err))
@@ -1232,13 +1228,13 @@
 			panic(err)
 		}
 		for _, m := range []struct {
-			done *<-chan struct{}
-			fn   func(chan<- struct{})
+			done *<-chan tcpip.Error
+			fn   func(chan<- tcpip.Error)
 		}{
 			{&eps.mu.loopReadDone, eps.loopRead},
 			{&eps.mu.loopWriteDone, eps.loopWrite},
 		} {
-			ch := make(chan struct{})
+			ch := make(chan tcpip.Error, 1)
 			*m.done = ch
 			go m.fn(ch)
 		}
@@ -1349,13 +1345,7 @@
 
 		// Check if the endpoint has already encountered an error since
 		// our installed callback will not fire in this case.
-		//
-		// Acquire hard error lock across ep calls to avoid races and store the
-		// hard error deterministically.
-		eps.endpoint.hardError.mu.Lock()
-		hardError := eps.endpoint.hardError.storeAndRetrieveLocked(eps.ep.LastError())
-		eps.endpoint.hardError.mu.Unlock()
-		if hardError != nil {
+		if ep.Readiness(waiter.EventErr)&waiter.EventErr != 0 {
 			eps.HUp()
 		}
 
@@ -1364,240 +1354,228 @@
 }
 
 // loopWrite shuttles signals and data from the zircon socket to the tcpip.Endpoint.
-func (eps *endpointWithSocket) loopWrite(ch chan<- struct{}) {
+func (eps *endpointWithSocket) loopWrite(ch chan<- tcpip.Error) {
 	defer close(ch)
 
 	const sigs = zx.SignalSocketReadable | zx.SignalSocketPeerWriteDisabled | localSignalClosing
 
-	waitEntry, notifyCh := waiter.NewChannelEntry(nil)
-	eps.wq.EventRegister(&waitEntry, waiter.EventOut)
-	defer eps.wq.EventUnregister(&waitEntry)
+	ch <- func() tcpip.Error {
+		waitEntry, notifyCh := waiter.NewChannelEntry(nil)
+		eps.wq.EventRegister(&waitEntry, waiter.EventOut)
+		defer eps.wq.EventUnregister(&waitEntry)
 
-	reader := socketReader{
-		socket: eps.local,
-	}
-	for {
-		reader.lastError = nil
-		reader.lastRead = 0
-
-		// Acquire hard error lock across ep calls to avoid races and store the
-		// hard error deterministically.
-		eps.hardError.mu.Lock()
-		n, err := eps.ep.Write(&reader, tcpip.WriteOptions{
-			// We must write atomically in order to guarantee all the data fetched
-			// from the zircon socket is consumed by the endpoint.
-			Atomic: true,
-		})
-		hardError := eps.hardError.storeAndRetrieveLocked(err)
-		eps.hardError.mu.Unlock()
-		if n != int64(reader.lastRead) {
-			panic(fmt.Sprintf("partial write into endpoint (%s); got %d, want %d", err, n, reader.lastRead))
+		reader := socketReader{
+			socket: eps.local,
 		}
-		// TODO(https://fxbug.dev/35006): Handle all transport write errors.
-		switch err.(type) {
-		case nil, *tcpip.ErrBadBuffer:
-			switch err := reader.lastError.(type) {
-			case nil:
-				continue
-			case *zx.Error:
-				switch err.Status {
-				case zx.ErrShouldWait:
-					obs, err := zxwait.Wait(zx.Handle(eps.local), sigs, zx.TimensecInfinite)
-					if err != nil {
+		for {
+			reader.lastError = nil
+			reader.lastRead = 0
+
+			n, err := eps.ep.Write(&reader, tcpip.WriteOptions{
+				// We must write atomically in order to guarantee all the data fetched
+				// from the zircon socket is consumed by the endpoint.
+				Atomic: true,
+			})
+			if n != int64(reader.lastRead) {
+				panic(fmt.Sprintf("partial write into endpoint (%s); got %d, want %d", err, n, reader.lastRead))
+			}
+			// TODO(https://fxbug.dev/35006): Handle all transport write errors.
+			switch err.(type) {
+			case nil, *tcpip.ErrBadBuffer:
+				switch err := reader.lastError.(type) {
+				case nil:
+					continue
+				case *zx.Error:
+					switch err.Status {
+					case zx.ErrShouldWait:
+						obs, err := zxwait.Wait(zx.Handle(eps.local), sigs, zx.TimensecInfinite)
+						if err != nil {
+							panic(err)
+						}
+						switch {
+						case obs&zx.SignalSocketReadable != 0:
+							// The client might have written some data into the socket.
+							// Always go around the loop again and try to read, even if the
+							// signals show the client has closed the socket.
+							continue
+						case obs&localSignalClosing != 0:
+							// We're shutting down.
+							return nil
+						case obs&zx.SignalSocketPeerWriteDisabled != 0:
+							// Fallthrough.
+						default:
+							panic(fmt.Sprintf("impossible signals observed: %b/%b", obs, sigs))
+						}
+						fallthrough
+					case zx.ErrBadState:
+						// Reading has been disabled for this socket endpoint.
+						switch err := eps.ep.Shutdown(tcpip.ShutdownWrite); err.(type) {
+						case nil, *tcpip.ErrNotConnected:
+							// Shutdown can return ErrNotConnected if the endpoint was
+							// connected but no longer is.
+						default:
+							panic(err)
+						}
+						return nil
+					}
+				}
+				panic(err)
+			case *tcpip.ErrNotConnected:
+				// Write never returns ErrNotConnected except for endpoints that were
+				// never connected. Such endpoints should never reach this loop.
+				panic(fmt.Sprintf("connected endpoint returned %s", err))
+			case *tcpip.ErrWouldBlock:
+				// NB: we can't select on closing here because the client may have
+				// written some data into the buffer and then immediately closed the
+				// socket.
+				//
+				// We must wait until the linger timeout.
+				select {
+				case <-eps.linger:
+					return nil
+				case <-notifyCh:
+					continue
+				}
+			case *tcpip.ErrConnectionRefused:
+				// TODO(https://fxbug.dev/61594): Allow the socket to be reused for
+				// another connection attempt to match Linux.
+				return err
+			case *tcpip.ErrClosedForSend:
+				// Shut the zircon socket down *only* if the endpoint is not already in
+				// an error state; an endpoint in an error state will soon be fully
+				// closed down, and shutting the socket down here would cause signals
+				// to be asserted twice, which can produce races in the client.
+				if eps.ep.Readiness(waiter.EventErr)&waiter.EventErr == 0 {
+					if err := eps.local.Shutdown(zx.SocketShutdownRead); err != nil {
 						panic(err)
 					}
-					switch {
-					case obs&zx.SignalSocketReadable != 0:
-						// The client might have written some data into the socket. Always
-						// continue to the loop below and try to read even if the signals
-						// show the client has closed the socket.
-						continue
-					case obs&localSignalClosing != 0:
-						// We're shutting down.
-						return
-					case obs&zx.SignalSocketPeerWriteDisabled != 0:
-						// Fallthrough.
-					default:
-						panic(fmt.Sprintf("impossible signals observed: %b/%b", obs, sigs))
-					}
-					fallthrough
-				case zx.ErrBadState:
-					// Reading has been disabled for this socket endpoint.
-					switch err := eps.ep.Shutdown(tcpip.ShutdownWrite); err.(type) {
-					case nil, *tcpip.ErrNotConnected:
-						// Shutdown can return ErrNotConnected if the endpoint was
-						// connected but no longer is.
-					default:
-						panic(err)
-					}
-					return
+					_ = syslog.DebugTf("zx_socket_shutdown", "%p: ZX_SOCKET_SHUTDOWN_READ", eps)
 				}
+				return err
+			case *tcpip.ErrConnectionAborted, *tcpip.ErrConnectionReset, *tcpip.ErrNetworkUnreachable, *tcpip.ErrNoRoute:
+				return err
+			case *tcpip.ErrTimeout:
+				// The maximum duration of missing ACKs was reached, or the maximum
+				// number of unacknowledged keepalives was reached.
+				return err
+			default:
+				_ = syslog.Errorf("TCP Endpoint.Write(): %s", err)
 			}
-			panic(err)
-		case *tcpip.ErrNotConnected:
-			// Write never returns ErrNotConnected except for endpoints that were
-			// never connected. Such endpoints should never reach this loop.
-			panic(fmt.Sprintf("connected endpoint returned %s", err))
-		case *tcpip.ErrWouldBlock:
-			// NB: we can't select on closing here because the client may have
-			// written some data into the buffer and then immediately closed the
-			// socket.
-			//
-			// We must wait until the linger timeout.
-			select {
-			case <-eps.linger:
-				return
-			case <-notifyCh:
-				continue
-			}
-		case *tcpip.ErrConnectionRefused:
-			// Connection refused is a "hard error" that may be observed on either the
-			// read or write loops.
-			// TODO(https://fxbug.dev/61594): Allow the socket to be reused for
-			// another connection attempt to match Linux.
-			return
-		case *tcpip.ErrClosedForSend:
-			// Closed for send can be issued when the endpoint is in an error state,
-			// which is encoded by the presence of a hard error having been
-			// observed.
-			// To avoid racing signals with the closing caused by a hard error,
-			// we won't signal here if a hard error is already observed.
-			if hardError == nil {
-				if err := eps.local.Shutdown(zx.SocketShutdownRead); err != nil {
-					panic(err)
-				}
-				_ = syslog.DebugTf("zx_socket_shutdown", "%p: ZX_SOCKET_SHUTDOWN_READ", eps)
-			}
-			return
-		case *tcpip.ErrConnectionAborted, *tcpip.ErrConnectionReset, *tcpip.ErrNetworkUnreachable, *tcpip.ErrNoRoute:
-			return
-		case *tcpip.ErrTimeout:
-			// The maximum duration of missing ACKs was reached, or the maximum
-			// number of unacknowledged keepalives was reached.
-			return
-		default:
-			_ = syslog.Errorf("TCP Endpoint.Write(): %s", err)
 		}
-	}
+	}()
 }
 
 // loopRead shuttles signals and data from the tcpip.Endpoint to the zircon socket.
-func (eps *endpointWithSocket) loopRead(ch chan<- struct{}) {
+func (eps *endpointWithSocket) loopRead(ch chan<- tcpip.Error) {
 	defer close(ch)
 
-	inEntry, inCh := waiter.NewChannelEntry(nil)
-	eps.wq.EventRegister(&inEntry, waiter.EventIn)
-	defer eps.wq.EventUnregister(&inEntry)
-
 	const sigs = zx.SignalSocketWritable | zx.SignalSocketWriteDisabled | localSignalClosing
 
-	writer := socketWriter{
-		socket: eps.local,
-	}
-	for {
-		// Acquire hard error lock across ep calls to avoid races and store the
-		// hard error deterministically.
-		eps.hardError.mu.Lock()
-		res, err := eps.ep.Read(&writer, tcpip.ReadOptions{})
-		hardError := eps.hardError.storeAndRetrieveLocked(err)
-		eps.hardError.mu.Unlock()
-		// TODO(https://fxbug.dev/35006): Handle all transport read errors.
-		switch err.(type) {
-		case *tcpip.ErrNotConnected:
-			// Read never returns ErrNotConnected except for endpoints that were
-			// never connected. Such endpoints should never reach this loop.
-			panic(fmt.Sprintf("connected endpoint returned %s", err))
-		case *tcpip.ErrTimeout:
-			// At the time of writing, this error indicates that a TCP connection
-			// has failed. This can occur during the TCP handshake if the peer
-			// fails to respond to a SYN within 60 seconds, or if the retransmit
-			// logic gives up after 60 seconds of missing ACKs from the peer, or if
-			// the maximum number of unacknowledged keepalives is reached.
-			//
-			// The connection was alive but now is dead - this is equivalent to
-			// having received a TCP RST.
-			return
-		case *tcpip.ErrConnectionRefused:
-			// Connection refused is a "hard error" that may be observed on either the
-			// read or write loops.
-			// TODO(https://fxbug.dev/61594): Allow the socket to be reused for
-			// another connection attempt to match Linux.
-			return
-		case *tcpip.ErrWouldBlock:
-			select {
-			case <-inCh:
-				continue
-			case <-eps.closing:
-				// We're shutting down.
-				return
-			}
-		case *tcpip.ErrClosedForReceive:
-			// Closed for receive can be issued when the endpoint is in an error
-			// state, which is encoded by the presence of a hard error having been
-			// observed.
-			// To avoid racing signals with the closing caused by a hard error,
-			// we won't signal here if a hard error is already observed.
-			if hardError == nil {
-				if err := eps.local.Shutdown(zx.SocketShutdownWrite); err != nil {
-					panic(err)
-				}
-				_ = syslog.DebugTf("zx_socket_shutdown", "%p: ZX_SOCKET_SHUTDOWN_WRITE", eps)
-			}
-			return
-		case *tcpip.ErrConnectionAborted, *tcpip.ErrConnectionReset, *tcpip.ErrNetworkUnreachable, *tcpip.ErrNoRoute:
-			return
-		case nil, *tcpip.ErrBadBuffer:
-			if err == nil {
-				eps.ep.ModerateRecvBuf(res.Count)
-			}
-			// `tcpip.Endpoint.Read` returns a nil error if _anything_ was written -
-			// even if the writer returned an error - we always want to handle those
-			// errors.
-			switch err := writer.lastError.(type) {
-			case nil:
-				continue
-			case *zx.Error:
-				switch err.Status {
-				case zx.ErrShouldWait:
-					obs, err := zxwait.Wait(zx.Handle(eps.local), sigs, zx.TimensecInfinite)
-					if err != nil {
-						panic(err)
-					}
-					switch {
-					case obs&zx.SignalSocketWritable != 0:
-						continue
-					case obs&localSignalClosing != 0:
-						// We're shutting down.
-						return
-					case obs&zx.SignalSocketWriteDisabled != 0:
-						// Fallthrough.
-					default:
-						panic(fmt.Sprintf("impossible signals observed: %b/%b", obs, sigs))
-					}
-					fallthrough
-				case zx.ErrBadState:
-					// Writing has been disabled for this socket endpoint.
-					switch err := eps.ep.Shutdown(tcpip.ShutdownRead); err.(type) {
-					case nil:
-					case *tcpip.ErrNotConnected:
-						// An ErrNotConnected while connected is expected if there
-						// is pending data to be read and the connection has been
-						// reset by the other end of the endpoint. The endpoint will
-						// allow the pending data to be read without error but will
-						// return ErrNotConnected if Shutdown is called. Otherwise
-						// this is unexpected, panic.
-						_ = syslog.InfoTf("loopRead", "%p: client shutdown a closed endpoint; ep info: %#v", eps, eps.endpoint.ep.Info())
-					default:
-						panic(err)
-					}
-					return
-				}
-			}
-			panic(err)
-		default:
-			_ = syslog.Errorf("Endpoint.Read(): %s", err)
+	ch <- func() tcpip.Error {
+		inEntry, inCh := waiter.NewChannelEntry(nil)
+		eps.wq.EventRegister(&inEntry, waiter.EventIn)
+		defer eps.wq.EventUnregister(&inEntry)
+
+		writer := socketWriter{
+			socket: eps.local,
 		}
-	}
+		for {
+			res, err := eps.ep.Read(&writer, tcpip.ReadOptions{})
+			// TODO(https://fxbug.dev/35006): Handle all transport read errors.
+			switch err.(type) {
+			case *tcpip.ErrNotConnected:
+				// Read never returns ErrNotConnected except for endpoints that were
+				// never connected. Such endpoints should never reach this loop.
+				panic(fmt.Sprintf("connected endpoint returned %s", err))
+			case *tcpip.ErrTimeout:
+				// At the time of writing, this error indicates that a TCP connection
+				// has failed. This can occur during the TCP handshake if the peer
+				// fails to respond to a SYN within 60 seconds, or if the retransmit
+				// logic gives up after 60 seconds of missing ACKs from the peer, or if
+				// the maximum number of unacknowledged keepalives is reached.
+				//
+				// The connection was alive but now is dead - this is equivalent to
+				// having received a TCP RST.
+				return err
+			case *tcpip.ErrConnectionRefused:
+				// TODO(https://fxbug.dev/61594): Allow the socket to be reused for
+				// another connection attempt to match Linux.
+				return err
+			case *tcpip.ErrWouldBlock:
+				select {
+				case <-inCh:
+					continue
+				case <-eps.closing:
+					// We're shutting down.
+					return nil
+				}
+			case *tcpip.ErrClosedForReceive:
+				// Shut the zircon socket down *only* if the endpoint is not already in
+				// an error state; an endpoint in an error state will soon be fully
+				// closed down, and shutting the socket down here would cause signals
+				// to be asserted twice, which can produce races in the client.
+				if eps.ep.Readiness(waiter.EventErr)&waiter.EventErr == 0 {
+					if err := eps.local.Shutdown(zx.SocketShutdownWrite); err != nil {
+						panic(err)
+					}
+					_ = syslog.DebugTf("zx_socket_shutdown", "%p: ZX_SOCKET_SHUTDOWN_WRITE", eps)
+				}
+				return err
+			case *tcpip.ErrConnectionAborted, *tcpip.ErrConnectionReset, *tcpip.ErrNetworkUnreachable, *tcpip.ErrNoRoute:
+				return err
+			case nil, *tcpip.ErrBadBuffer:
+				if err == nil {
+					eps.ep.ModerateRecvBuf(res.Count)
+				}
+				// `tcpip.Endpoint.Read` returns a nil error if _anything_ was written
+				// - even if the writer returned an error - we always want to handle
+				// those errors.
+				switch err := writer.lastError.(type) {
+				case nil:
+					continue
+				case *zx.Error:
+					switch err.Status {
+					case zx.ErrShouldWait:
+						obs, err := zxwait.Wait(zx.Handle(eps.local), sigs, zx.TimensecInfinite)
+						if err != nil {
+							panic(err)
+						}
+						switch {
+						case obs&zx.SignalSocketWritable != 0:
+							continue
+						case obs&localSignalClosing != 0:
+							// We're shutting down.
+							return nil
+						case obs&zx.SignalSocketWriteDisabled != 0:
+							// Fallthrough.
+						default:
+							panic(fmt.Sprintf("impossible signals observed: %b/%b", obs, sigs))
+						}
+						fallthrough
+					case zx.ErrBadState:
+						// Writing has been disabled for this socket endpoint.
+						switch err := eps.ep.Shutdown(tcpip.ShutdownRead); err.(type) {
+						case nil:
+						case *tcpip.ErrNotConnected:
+							// An ErrNotConnected while connected is expected if there
+							// is pending data to be read and the connection has been
+							// reset by the other end of the endpoint. The endpoint will
+							// allow the pending data to be read without error but will
+							// return ErrNotConnected if Shutdown is called. Any other
+							// error is unexpected and panics in the default case below.
+							_ = syslog.InfoTf("loopRead", "%p: client shutdown a closed endpoint; ep info: %#v", eps, eps.endpoint.ep.Info())
+						default:
+							panic(err)
+						}
+						return nil
+					}
+				}
+				panic(err)
+			default:
+				_ = syslog.Errorf("Endpoint.Read(): %s", err)
+			}
+		}
+	}()
 }
 
 type datagramSocketImpl struct {
diff --git a/src/connectivity/network/netstack/netstack_test.go b/src/connectivity/network/netstack/netstack_test.go
index 690732f..a3dc933 100644
--- a/src/connectivity/network/netstack/netstack_test.go
+++ b/src/connectivity/network/netstack/netstack_test.go
@@ -272,18 +272,30 @@
 	defer eps.close()
 
 	eps.mu.Lock()
+	errChannels := []struct {
+		ch   <-chan tcpip.Error
+		name string
+	}{
+		{ch: eps.mu.loopReadDone, name: "loopReadDone"},
+		{ch: eps.mu.loopWriteDone, name: "loopWriteDone"},
+	}
 	channels := []struct {
 		ch   <-chan struct{}
 		name string
 	}{
 		{ch: eps.closing, name: "closing"},
-		{ch: eps.mu.loopReadDone, name: "loopReadDone"},
-		{ch: eps.mu.loopWriteDone, name: "loopWriteDone"},
 		{ch: eps.mu.loopPollDone, name: "loopPollDone"},
 	}
 	eps.mu.Unlock()
 
 	// Check starting conditions.
+	for _, ch := range errChannels {
+		select {
+		case err := <-ch.ch:
+			t.Errorf("%s cleaned up prematurely: %#v", ch.name, err)
+		default:
+		}
+	}
 	for _, ch := range channels {
 		select {
 		case <-ch.ch:
@@ -354,6 +366,13 @@
 	}
 
 	// There's still a referent.
+	for _, ch := range errChannels {
+		select {
+		case err := <-ch.ch:
+			t.Errorf("%s cleaned up prematurely: %#v", ch.name, err)
+		default:
+		}
+	}
 	for _, ch := range channels {
 		select {
 		case <-ch.ch:
@@ -383,6 +402,18 @@
 	// Give a generous timeout for the closed channel to be detected.
 	timeout := make(chan struct{})
 	time.AfterFunc(5*time.Second, func() { close(timeout) })
+	for _, ch := range errChannels {
+		if ch.ch != nil {
+			select {
+			case err := <-ch.ch:
+				if err != nil {
+					t.Error(err)
+				}
+			case <-timeout:
+				t.Errorf("%s not cleaned up", ch.name)
+			}
+		}
+	}
 	for _, ch := range channels {
 		if ch.ch != nil {
 			select {
@@ -476,13 +507,18 @@
 	}
 
 	eps.mu.Lock()
+	errChannels := []struct {
+		ch   <-chan tcpip.Error
+		name string
+	}{
+		{ch: eps.mu.loopReadDone, name: "loopReadDone"},
+		{ch: eps.mu.loopWriteDone, name: "loopWriteDone"},
+	}
 	channels := []struct {
 		ch   <-chan struct{}
 		name string
 	}{
 		{ch: eps.closing, name: "closing"},
-		{ch: eps.mu.loopReadDone, name: "loopReadDone"},
-		{ch: eps.mu.loopWriteDone, name: "loopWriteDone"},
 		{ch: eps.mu.loopPollDone, name: "loopPollDone"},
 	}
 	eps.mu.Unlock()
@@ -490,6 +526,23 @@
 	// Give a generous timeout for the closed channel to be detected.
 	timeout := make(chan struct{})
 	time.AfterFunc(5*time.Second, func() { close(timeout) })
+	for _, ch := range errChannels {
+		if ch.ch != nil {
+			select {
+			case err := <-ch.ch:
+				switch err.(type) {
+				case nil:
+				case *tcpip.ErrClosedForReceive:
+				case *tcpip.ErrClosedForSend:
+				case *tcpip.ErrConnectionReset:
+				default:
+					t.Error(err)
+				}
+			case <-timeout:
+				t.Errorf("%s not cleaned up", ch.name)
+			}
+		}
+	}
 	for _, ch := range channels {
 		if ch.ch != nil {
 			select {
diff --git a/src/connectivity/network/netstack/socket_conv.go b/src/connectivity/network/netstack/socket_conv.go
index 1870299..ce97e9c 100644
--- a/src/connectivity/network/netstack/socket_conv.go
+++ b/src/connectivity/network/netstack/socket_conv.go
@@ -78,10 +78,10 @@
 	return 0
 }
 
-func GetSockOpt(ep tcpip.Endpoint, ns *Netstack, he *hardError, netProto tcpip.NetworkProtocolNumber, transProto tcpip.TransportProtocolNumber, level, name int16) (interface{}, tcpip.Error) {
+func GetSockOpt(ep tcpip.Endpoint, ns *Netstack, terminal <-chan tcpip.Error, netProto tcpip.NetworkProtocolNumber, transProto tcpip.TransportProtocolNumber, level, name int16) (interface{}, tcpip.Error) {
 	switch level {
 	case C.SOL_SOCKET:
-		return getSockOptSocket(ep, ns, he, netProto, transProto, name)
+		return getSockOptSocket(ep, ns, terminal, netProto, transProto, name)
 
 	case C.SOL_TCP:
 		return getSockOptTCP(ep, name)
@@ -105,7 +105,7 @@
 	return nil, &tcpip.ErrUnknownProtocol{}
 }
 
-func getSockOptSocket(ep tcpip.Endpoint, ns *Netstack, he *hardError, netProto tcpip.NetworkProtocolNumber, transProto tcpip.TransportProtocolNumber, name int16) (interface{}, tcpip.Error) {
+func getSockOptSocket(ep tcpip.Endpoint, ns *Netstack, terminal <-chan tcpip.Error, netProto tcpip.NetworkProtocolNumber, transProto tcpip.TransportProtocolNumber, name int16) (interface{}, tcpip.Error) {
 	switch name {
 	case C.SO_TYPE:
 		switch transProto {
@@ -142,9 +142,14 @@
 		}
 
 	case C.SO_ERROR:
-		he.mu.Lock()
-		err := he.storeAndRetrieveLocked(ep.LastError())
-		he.mu.Unlock()
+		err := func() tcpip.Error {
+			select {
+			case err := <-terminal:
+				return err
+			default:
+				return ep.LastError()
+			}
+		}()
 		if err == nil {
 			return int32(0), nil
 		}
diff --git a/src/connectivity/network/tests/bsdsocket_test.cc b/src/connectivity/network/tests/bsdsocket_test.cc
index 321a739..d4f8a60 100644
--- a/src/connectivity/network/tests/bsdsocket_test.cc
+++ b/src/connectivity/network/tests/bsdsocket_test.cc
@@ -1529,7 +1529,7 @@
       << strerror(errno);
 #else
             -1);
-  ASSERT_EQ(errno, ECONNREFUSED) << strerror(errno);
+  ASSERT_EQ(errno, ECONNABORTED) << strerror(errno);
 #endif
 
   ASSERT_EQ(close(listener.release()), 0) << strerror(errno);
@@ -2408,6 +2408,13 @@
   socklen_t addrlen = AddrLen();
   ASSERT_EQ(connect(sock().get(), reinterpret_cast<const struct sockaddr*>(&any), addrlen), -1);
   ASSERT_EQ(errno, ECONNREFUSED) << strerror(errno);
+
+  // The error should have been consumed.
+  int err;
+  socklen_t optlen = sizeof(err);
+  ASSERT_EQ(getsockopt(sock().get(), SOL_SOCKET, SO_ERROR, &err, &optlen), 0) << strerror(errno);
+  ASSERT_EQ(optlen, sizeof(err));
+  ASSERT_EQ(err, 0) << strerror(err);
 }
 
 TEST_P(AnyAddrDatagramSocketTest, Connect) {
@@ -3221,9 +3228,10 @@
     EXPECT_GE(n, 0) << strerror(errno);
     EXPECT_EQ(n, 1);
     EXPECT_EQ(pfd.revents, POLLIN | POLLHUP | POLLERR);
+
     char c;
     EXPECT_EQ(ioMethod.executeIO(fd, &c, sizeof(c)), -1);
-    EXPECT_EQ(errno, expected_errno);
+    EXPECT_EQ(errno, expected_errno) << strerror(errno) << " vs " << strerror(expected_errno);
 
     bool isWrite = ioMethod.isWrite();
 #if !defined(__Fuchsia__)