Fixes a concurrency error in groupcache. Previously, multiple Get

requests could simultaneously result in a load().  Only requests that
enter singleflight Do concurrently would be deduped, so it was possible for
populateCache to be called multiple times for the same key.  That
would result in overcounting nbytes, and eventually leading to a
livelock where nbytes > cacheBytes, but there were no entries in the
cache.

Change-Id: I5b304ce82041c1310c31b662486744e86509cc53
diff --git a/groupcache.go b/groupcache.go
index 637b063..40410a0 100644
--- a/groupcache.go
+++ b/groupcache.go
@@ -100,6 +100,7 @@
 		getter:     getter,
 		peers:      peers,
 		cacheBytes: cacheBytes,
+		loadGroup:  &singleflight.Group{},
 	}
 	if fn := newGroupHook; fn != nil {
 		fn(g)
@@ -163,12 +164,20 @@
 	// loadGroup ensures that each key is only fetched once
 	// (either locally or remotely), regardless of the number of
 	// concurrent callers.
-	loadGroup singleflight.Group
+	loadGroup flightGroup
 
 	// Stats are statistics on the group.
 	Stats Stats
 }
 
+// flightGroup is defined as an interface which flightgroup.Group
+// satisfies.  We define this so that we may test with an alternate
+// implementation.
+type flightGroup interface {
+	// Done is called when Do is done.
+	Do(key string, fn func() (interface{}, error)) (interface{}, error)
+}
+
 // Stats are per-group statistics.
 type Stats struct {
 	Gets           AtomicInt // any Get request, including from peers
@@ -225,6 +234,31 @@
 func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
 	g.Stats.Loads.Add(1)
 	viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
+		// Check the cache again because singleflight can only dedup calls
+		// that overlap concurrently.  It's possible for 2 concurrent
+		// requests to miss the cache, resulting in 2 load() calls.  An
+		// unfortunate goroutine scheduling would result in this callback
+		// being run twice, serially.  If we don't check the cache again,
+		// cache.nbytes would be incremented below even though there will
+		// be only one entry for this key.
+		//
+		// Consider the following serialized event ordering for two
+		// goroutines in which this callback gets called twice for hte
+		// same key:
+		// 1: Get("key")
+		// 2: Get("key")
+		// 1: lookupCache("key")
+		// 2: lookupCache("key")
+		// 1: load("key")
+		// 2: load("key")
+		// 1: loadGroup.Do("key", fn)
+		// 1: fn()
+		// 2: loadGroup.Do("key", fn)
+		// 2: fn()
+		if value, cacheHit := g.lookupCache(key); cacheHit {
+			g.Stats.CacheHits.Add(1)
+			return value, nil
+		}
 		g.Stats.LoadsDeduped.Add(1)
 		var value ByteView
 		var err error
diff --git a/groupcache_test.go b/groupcache_test.go
index 0ecd6bd..3a4ecc2 100644
--- a/groupcache_test.go
+++ b/groupcache_test.go
@@ -363,5 +363,85 @@
 	}
 }
 
+// orderedFlightGroup allows the caller to force the schedule of when
+// orig.Do will be called.  This is useful to serialize calls such
+// that singleflight cannot dedup them.
+type orderedFlightGroup struct {
+	mu     sync.Mutex
+	stage1 chan bool
+	stage2 chan bool
+	orig   flightGroup
+}
+
+func (g *orderedFlightGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
+	<-g.stage1
+	<-g.stage2
+	g.mu.Lock()
+	defer g.mu.Unlock()
+	return g.orig.Do(key, fn)
+}
+
+// TestNoDedup tests invariants on the cache size when singleflight is
+// unable to dedup calls.
+func TestNoDedup(t *testing.T) {
+	const testkey = "testkey"
+	const testval = "testval"
+	g := newGroup("testgroup", 1024, GetterFunc(func(_ Context, key string, dest Sink) error {
+		return dest.SetString(testval)
+	}), nil)
+
+	orderedGroup := &orderedFlightGroup{
+		stage1: make(chan bool),
+		stage2: make(chan bool),
+		orig:   g.loadGroup,
+	}
+	// Replace loadGroup with our wrapper so we can control when
+	// loadGroup.Do is entered for each concurrent request.
+	g.loadGroup = orderedGroup
+
+	// Issue two idential requests concurrently.  Since the cache is
+	// empty, it will miss.  Both will enter load(), but we will only
+	// allow one at a time to enter singleflight.Do, so the callback
+	// function will be called twice.
+	resc := make(chan string, 2)
+	for i := 0; i < 2; i++ {
+		go func() {
+			var s string
+			if err := g.Get(dummyCtx, testkey, StringSink(&s)); err != nil {
+				resc <- "ERROR:" + err.Error()
+				return
+			}
+			resc <- s
+		}()
+	}
+
+	// Ensure both goroutines have entered the Do routine.  This implies
+	// both concurrent requests have checked the cache, found it empty,
+	// and called load().
+	orderedGroup.stage1 <- true
+	orderedGroup.stage1 <- true
+	orderedGroup.stage2 <- true
+	orderedGroup.stage2 <- true
+
+	for i := 0; i < 2; i++ {
+		if s := <-resc; s != testval {
+			t.Errorf("result is %s want %s", s, testval)
+		}
+	}
+
+	const wantItems = 1
+	if g.mainCache.items() != wantItems {
+		t.Errorf("mainCache has %d items, want %d", g.mainCache.items(), wantItems)
+	}
+
+	// If the singleflight callback doesn't double-check the cache again
+	// upon entry, we would increment nbytes twice but the entry would
+	// only be in the cache once.
+	const wantBytes = int64(len(testkey) + len(testval))
+	if g.mainCache.nbytes != wantBytes {
+		t.Errorf("cache has %d bytes, want %d", g.mainCache.nbytes, wantBytes)
+	}
+}
+
 // TODO(bradfitz): port the Google-internal full integration test into here,
 // using HTTP requests instead of our RPC system.