Use fine grained locks while sending NUD probes

Previously when sending probe messages, we would hold a shared lock
which lead to deadlocks (due to synchronous packet loooping (e.g. pipe
and loopback link endpoints)) and lock contention.

Writing packets may be an expensive operation which could prevent other
goroutines from doing meaningful work if a shared lock is held while
writing packets.

This change upates the NUD timers to not hold shared locks while
sending packets.

PiperOrigin-RevId: 356048697

This is a cherry-pick of 1d0fbea. This changed required manual updates
as it does not include 56d1850 or 4eaf674.

Bug: 72285
Change-Id: I2d4be5d90de768cb36c0a360d5d5d50d29726bf5
diff --git a/pkg/tcpip/stack/neighbor_entry.go b/pkg/tcpip/stack/neighbor_entry.go
index 75afb30..15e7da1 100644
--- a/pkg/tcpip/stack/neighbor_entry.go
+++ b/pkg/tcpip/stack/neighbor_entry.go
@@ -70,6 +70,13 @@
 	Failed
 )
 
+type timer struct {
+	// done indicates to the timer that the timer was stopped.
+	done *bool
+
+	timer tcpip.Timer
+}
+
 // neighborEntry implements a neighbor entry's individual node behavior, as per
 // RFC 4861 section 7.3.3. Neighbor Unreachability Detection operates in
 // parallel with the sending of packets to a neighbor, necessitating the
@@ -99,7 +106,8 @@
 	onResolve []func(tcpip.LinkAddress, bool)
 
 	isRouter bool
-	job      *tcpip.Job
+
+	timer timer
 }
 
 // newNeighborEntry creates a neighbor cache entry starting at the default
@@ -138,6 +146,20 @@
 	}
 }
 
+type remainingCounter struct {
+	mu struct {
+		sync.Mutex
+
+		remaining uint32
+	}
+}
+
+func (r *remainingCounter) init(max uint32) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.mu.remaining = max
+}
+
 // notifyCompletionLocked notifies those waiting for address resolution, with
 // the link address if resolution completed successfully.
 //
@@ -183,13 +205,16 @@
 	}
 }
 
-// cancelJobLocked cancels the currently scheduled action, if there is one.
+// cancelTimerLocked cancels the currently scheduled action, if there is one.
 // Entries in Unknown, Stale, or Static state do not have a scheduled action.
 //
 // Precondition: e.mu MUST be locked.
-func (e *neighborEntry) cancelJobLocked() {
-	if job := e.job; job != nil {
-		job.Cancel()
+func (e *neighborEntry) cancelTimerLocked() {
+	if e.timer.timer != nil {
+		e.timer.timer.Stop()
+		*e.timer.done = true
+
+		e.timer = timer{}
 	}
 }
 
@@ -199,7 +224,7 @@
 func (e *neighborEntry) removeLocked() {
 	e.neigh.UpdatedAtNanos = e.nic.stack.clock.NowNanoseconds()
 	e.dispatchRemoveEventLocked()
-	e.cancelJobLocked()
+	e.cancelTimerLocked()
 	e.notifyCompletionLocked(false /* succeeded */)
 }
 
@@ -209,7 +234,7 @@
 //
 // Precondition: e.mu MUST be locked.
 func (e *neighborEntry) setStateLocked(next NeighborState) {
-	e.cancelJobLocked()
+	e.cancelTimerLocked()
 
 	prev := e.neigh.State
 	e.neigh.State = next
@@ -221,47 +246,90 @@
 		panic(fmt.Sprintf("should never transition to Incomplete with setStateLocked; neigh = %#v, prev state = %s", e.neigh, prev))
 
 	case Reachable:
-		e.job = e.nic.stack.newJob(&e.mu, func() {
-			e.setStateLocked(Stale)
-			e.dispatchChangeEventLocked()
-		})
-		e.job.Schedule(e.nudState.ReachableTime())
+		// Protected by e.mu.
+		done := false
 
-	case Delay:
-		e.job = e.nic.stack.newJob(&e.mu, func() {
-			e.setStateLocked(Probe)
-			e.dispatchChangeEventLocked()
-		})
-		e.job.Schedule(config.DelayFirstProbeTime)
+		e.timer = timer{
+			done: &done,
+			timer: e.nic.stack.Clock().AfterFunc(e.nudState.ReachableTime(), func() {
+				e.mu.Lock()
+				defer e.mu.Unlock()
 
-	case Probe:
-		var retryCounter uint32
-		var sendUnicastProbe func()
+				if done {
+					// The timer was stopped because the entry changed state.
+					return
+				}
 
-		sendUnicastProbe = func() {
-			if retryCounter == config.MaxUnicastProbes {
-				e.dispatchRemoveEventLocked()
-				e.setStateLocked(Failed)
-				return
-			}
-
-			if err := e.linkRes.LinkAddressRequest(e.neigh.Addr, "" /* localAddr */, e.neigh.LinkAddr, e.nic); err != nil {
-				e.dispatchRemoveEventLocked()
-				e.setStateLocked(Failed)
-				return
-			}
-
-			retryCounter++
-			e.job = e.nic.stack.newJob(&e.mu, sendUnicastProbe)
-			e.job.Schedule(config.RetransmitTimer)
+				e.setStateLocked(Stale)
+				e.dispatchChangeEventLocked()
+			}),
 		}
 
+	case Delay:
+		// Protected by e.mu.
+		done := false
+
+		e.timer = timer{
+			done: &done,
+			timer: e.nic.stack.Clock().AfterFunc(config.DelayFirstProbeTime, func() {
+				e.mu.Lock()
+				defer e.mu.Unlock()
+
+				if done {
+					// The timer was stopped because the entry changed state.
+					return
+				}
+
+				e.setStateLocked(Probe)
+				e.dispatchChangeEventLocked()
+			}),
+		}
+
+	case Probe:
+		// Protected by e.mu.
+		done := false
+
+		var remaining remainingCounter
+		remaining.init(config.MaxUnicastProbes)
+		addr := e.neigh.Addr
+		linkAddr := e.neigh.LinkAddr
+
 		// Send a probe in another gorountine to free this thread of execution
-		// for finishing the state transition. This is necessary to avoid
-		// deadlock where sending and processing probes are done synchronously,
-		// such as loopback and integration tests.
-		e.job = e.nic.stack.newJob(&e.mu, sendUnicastProbe)
-		e.job.Schedule(immediateDuration)
+		// for finishing the state transition. This is necessary to escape the
+		// currently held lock so we can send the probe message without holding
+		// a shared lock.
+		e.timer = timer{
+			done: &done,
+			timer: e.nic.stack.Clock().AfterFunc(0, func() {
+				// Okay to hold this lock while writing packets since we use a different
+				// lock per probe timer so there will not be any lock contention.
+				remaining.mu.Lock()
+				defer remaining.mu.Unlock()
+
+				var err *tcpip.Error
+				timedoutResolution := remaining.mu.remaining == 0
+				if !timedoutResolution {
+					err = e.linkRes.LinkAddressRequest(addr, "" /* localAddr */, linkAddr, e.nic)
+				}
+
+				e.mu.Lock()
+				defer e.mu.Unlock()
+
+				if done {
+					// The timer was stopped because the entry changed state.
+					return
+				}
+
+				if timedoutResolution || err != nil {
+					e.dispatchRemoveEventLocked()
+					e.setStateLocked(Failed)
+					return
+				}
+
+				remaining.mu.remaining--
+				e.timer.timer.Reset(config.RetransmitTimer)
+			}),
+		}
 
 	case Failed:
 		e.notifyCompletionLocked(false /* succeeded */)
@@ -291,66 +359,58 @@
 		e.neigh.UpdatedAtNanos = e.nic.stack.clock.NowNanoseconds()
 
 		e.dispatchAddEventLocked()
-
 		config := e.nudState.Config()
 
-		var retryCounter uint32
-		var sendMulticastProbe func()
+		// Protected by e.mu.
+		done := false
 
-		sendMulticastProbe = func() {
-			if retryCounter == config.MaxMulticastProbes {
-				// "If no Neighbor Advertisement is received after
-				// MAX_MULTICAST_SOLICIT solicitations, address resolution has failed.
-				// The sender MUST return ICMP destination unreachable indications with
-				// code 3 (Address Unreachable) for each packet queued awaiting address
-				// resolution." - RFC 4861 section 7.2.2
-				//
-				// There is no need to send an ICMP destination unreachable indication
-				// since the failure to resolve the address is expected to only occur
-				// on this node. Thus, redirecting traffic is currently not supported.
-				//
-				// "If the error occurs on a node other than the node originating the
-				// packet, an ICMP error message is generated. If the error occurs on
-				// the originating node, an implementation is not required to actually
-				// create and send an ICMP error packet to the source, as long as the
-				// upper-layer sender is notified through an appropriate mechanism
-				// (e.g. return value from a procedure call). Note, however, that an
-				// implementation may find it convenient in some cases to return errors
-				// to the sender by taking the offending packet, generating an ICMP
-				// error message, and then delivering it (locally) through the generic
-				// error-handling routines." - RFC 4861 section 2.1
-				e.dispatchRemoveEventLocked()
-				e.setStateLocked(Failed)
-				return
-			}
-
-			// As per RFC 4861 section 7.2.2:
-			//
-			//  If the source address of the packet prompting the solicitation is the
-			//  same as one of the addresses assigned to the outgoing interface, that
-			//  address SHOULD be placed in the IP Source Address of the outgoing
-			//  solicitation.
-			//
-			if err := e.linkRes.LinkAddressRequest(e.neigh.Addr, localAddr, "", e.nic); err != nil {
-				// There is no need to log the error here; the NUD implementation may
-				// assume a working link. A valid link should be the responsibility of
-				// the NIC/stack.LinkEndpoint.
-				e.dispatchRemoveEventLocked()
-				e.setStateLocked(Failed)
-				return
-			}
-
-			retryCounter++
-			e.job = e.nic.stack.newJob(&e.mu, sendMulticastProbe)
-			e.job.Schedule(config.RetransmitTimer)
-		}
+		var remaining remainingCounter
+		remaining.init(config.MaxMulticastProbes)
+		addr := e.neigh.Addr
 
 		// Send a probe in another gorountine to free this thread of execution
-		// for finishing the state transition. This is necessary to avoid
-		// deadlock where sending and processing probes are done synchronously,
-		// such as loopback and integration tests.
-		e.job = e.nic.stack.newJob(&e.mu, sendMulticastProbe)
-		e.job.Schedule(immediateDuration)
+		// for finishing the state transition. This is necessary to escape the
+		// currently held lock so we can send the probe message without holding
+		// a shared lock.
+		e.timer = timer{
+			done: &done,
+			timer: e.nic.stack.Clock().AfterFunc(0, func() {
+				// Okay to hold this lock while writing packets since we use a different
+				// lock per probe timer so there will not be any lock contention.
+				remaining.mu.Lock()
+				defer remaining.mu.Unlock()
+
+				var err *tcpip.Error
+				timedoutResolution := remaining.mu.remaining == 0
+				if !timedoutResolution {
+					// As per RFC 4861 section 7.2.2:
+					//
+					//  If the source address of the packet prompting the solicitation is
+					//  the same as one of the addresses assigned to the outgoing interface,
+					//  that address SHOULD be placed in the IP Source Address of the
+					//  outgoing solicitation.
+					//
+					err = e.linkRes.LinkAddressRequest(addr, localAddr, "" /* linkAddr */, e.nic)
+				}
+
+				e.mu.Lock()
+				defer e.mu.Unlock()
+
+				if done {
+					// The timer was stopped because the entry changed state.
+					return
+				}
+
+				if timedoutResolution || err != nil {
+					e.dispatchRemoveEventLocked()
+					e.setStateLocked(Failed)
+					return
+				}
+
+				remaining.mu.remaining--
+				e.timer.timer.Reset(config.RetransmitTimer)
+			}),
+		}
 
 	case Stale:
 		e.setStateLocked(Delay)