| /* |
| * |
| * Copyright 2020 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package load |
| |
| import ( |
| "fmt" |
| "sort" |
| "sync" |
| "testing" |
| |
| "github.com/google/go-cmp/cmp" |
| "github.com/google/go-cmp/cmp/cmpopts" |
| ) |
| |
| var ( |
| dropCategories = []string{"drop_for_real", "drop_for_fun"} |
| localities = []string{"locality-A", "locality-B"} |
| errTest = fmt.Errorf("test error") |
| ) |
| |
| // rpcData wraps the RPC counts and load data to be pushed to the store. |
| type rpcData struct { |
| start, success, failure int |
| serverData map[string]float64 // Will be reported with successful RPCs. |
| } |
| |
| // TestDrops spawns a bunch of goroutines which report drop data. After the |
| // goroutines have exited, the test dumps the stats from the Store and makes |
| // sure they are as expected. |
| func TestDrops(t *testing.T) { |
| var ( |
| drops = map[string]int{ |
| dropCategories[0]: 30, |
| dropCategories[1]: 40, |
| } |
| wantStoreData = &Data{ |
| TotalDrops: 70, |
| Drops: map[string]uint64{ |
| dropCategories[0]: 30, |
| dropCategories[1]: 40, |
| }, |
| } |
| ) |
| |
| ls := perClusterStore{} |
| var wg sync.WaitGroup |
| for category, count := range drops { |
| for i := 0; i < count; i++ { |
| wg.Add(1) |
| go func(c string) { |
| ls.CallDropped(c) |
| wg.Done() |
| }(category) |
| } |
| } |
| wg.Wait() |
| |
| gotStoreData := ls.stats() |
| if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval")); diff != "" { |
| t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff) |
| } |
| } |
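|
| // exampleDropReporting is an illustrative sketch, not used by any test, of |
| // the drop-reporting pattern exercised in TestDrops: every dropped RPC is |
| // recorded under a category via CallDropped, and a later stats() call |
| // aggregates the per-category counts into Data.Drops and Data.TotalDrops. |
| // The category names below are arbitrary placeholders. |
| func exampleDropReporting() *Data { |
| var ls perClusterStore |
| // Record two drops for one category and one for another. |
| ls.CallDropped("rate_limited") |
| ls.CallDropped("rate_limited") |
| ls.CallDropped("circuit_broken") |
| // Per the behavior verified in TestDrops, the returned Data should have |
| // TotalDrops: 3 and Drops: map[string]uint64{"rate_limited": 2, |
| // "circuit_broken": 1}. |
| return ls.stats() |
| } |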
| |
| // TestLocalityStats spawns a bunch of goroutines which report rpc and load |
| // data. After the goroutines have exited, the test dumps the stats from the |
| // Store and makes sure they are as expected. |
| func TestLocalityStats(t *testing.T) { |
| var ( |
| localityData = map[string]rpcData{ |
| localities[0]: { |
| start: 40, |
| success: 20, |
| failure: 10, |
| serverData: map[string]float64{"net": 1, "disk": 2, "cpu": 3, "mem": 4}, |
| }, |
| localities[1]: { |
| start: 80, |
| success: 40, |
| failure: 20, |
| serverData: map[string]float64{"net": 1, "disk": 2, "cpu": 3, "mem": 4}, |
| }, |
| } |
| wantStoreData = &Data{ |
| LocalityStats: map[string]LocalityData{ |
| localities[0]: { |
| RequestStats: RequestData{Succeeded: 20, Errored: 10, InProgress: 10}, |
| LoadStats: map[string]ServerLoadData{ |
| "net": {Count: 20, Sum: 20}, |
| "disk": {Count: 20, Sum: 40}, |
| "cpu": {Count: 20, Sum: 60}, |
| "mem": {Count: 20, Sum: 80}, |
| }, |
| }, |
| localities[1]: { |
| RequestStats: RequestData{Succeeded: 40, Errored: 20, InProgress: 20}, |
| LoadStats: map[string]ServerLoadData{ |
| "net": {Count: 40, Sum: 40}, |
| "disk": {Count: 40, Sum: 80}, |
| "cpu": {Count: 40, Sum: 120}, |
| "mem": {Count: 40, Sum: 160}, |
| }, |
| }, |
| }, |
| } |
| ) |
| |
| ls := perClusterStore{} |
| var wg sync.WaitGroup |
| for locality, data := range localityData { |
| wg.Add(data.start) |
| for i := 0; i < data.start; i++ { |
| go func(l string) { |
| ls.CallStarted(l) |
| wg.Done() |
| }(locality) |
| } |
| // The calls to CallStarted() need to happen before the other calls are |
| // made. Hence the wait here. |
| wg.Wait() |
| |
| wg.Add(data.success) |
| for i := 0; i < data.success; i++ { |
| go func(l string, serverData map[string]float64) { |
| ls.CallFinished(l, nil) |
| for n, d := range serverData { |
| ls.CallServerLoad(l, n, d) |
| } |
| wg.Done() |
| }(locality, data.serverData) |
| } |
| wg.Add(data.failure) |
| for i := 0; i < data.failure; i++ { |
| go func(l string) { |
| ls.CallFinished(l, errTest) |
| wg.Done() |
| }(locality) |
| } |
| wg.Wait() |
| } |
| |
| gotStoreData := ls.stats() |
| if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval")); diff != "" { |
| t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff) |
| } |
| } |
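|
| // exampleRPCLifecycle is an illustrative sketch, not invoked by any test, of |
| // the per-RPC reporting sequence exercised in TestLocalityStats: CallStarted |
| // when the RPC begins, CallFinished when it completes (with a nil or non-nil |
| // error), and one CallServerLoad per backend metric for a successful RPC. |
| // Each CallServerLoad adds 1 to that metric's Count and the reported value |
| // to its Sum. The locality and metric names below are placeholders. |
| func exampleRPCLifecycle(ls *perClusterStore) { |
| const locality = "example-locality" |
|
| // A successful RPC: started, finished without error, and annotated with |
| // the server-reported load for two metrics. |
| ls.CallStarted(locality) |
| ls.CallFinished(locality, nil) |
| ls.CallServerLoad(locality, "cpu", 0.5) |
| ls.CallServerLoad(locality, "mem", 0.25) |
|
| // A failed RPC: started and finished with an error; no server load is |
| // reported for it. |
| ls.CallStarted(locality) |
| ls.CallFinished(locality, errTest) |
| } |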
| |
| func TestResetAfterStats(t *testing.T) { |
| // Push a bunch of drops, RPC stats, and server load stats, and leave the |
| // in-progress RPC count non-zero. Dump the stats and verify that they are |
| // as expected. Then push the same set of loads again, dump once more, and |
| // verify the new expected values. |
| var ( |
| drops = map[string]int{ |
| dropCategories[0]: 30, |
| dropCategories[1]: 40, |
| } |
| localityData = map[string]rpcData{ |
| localities[0]: { |
| start: 40, |
| success: 20, |
| failure: 10, |
| serverData: map[string]float64{"net": 1, "disk": 2, "cpu": 3, "mem": 4}, |
| }, |
| localities[1]: { |
| start: 80, |
| success: 40, |
| failure: 20, |
| serverData: map[string]float64{"net": 1, "disk": 2, "cpu": 3, "mem": 4}, |
| }, |
| } |
| wantStoreData = &Data{ |
| TotalDrops: 70, |
| Drops: map[string]uint64{ |
| dropCategories[0]: 30, |
| dropCategories[1]: 40, |
| }, |
| LocalityStats: map[string]LocalityData{ |
| localities[0]: { |
| RequestStats: RequestData{Succeeded: 20, Errored: 10, InProgress: 10}, |
| LoadStats: map[string]ServerLoadData{ |
| "net": {Count: 20, Sum: 20}, |
| "disk": {Count: 20, Sum: 40}, |
| "cpu": {Count: 20, Sum: 60}, |
| "mem": {Count: 20, Sum: 80}, |
| }, |
| }, |
| localities[1]: { |
| RequestStats: RequestData{Succeeded: 40, Errored: 20, InProgress: 20}, |
| LoadStats: map[string]ServerLoadData{ |
| "net": {Count: 40, Sum: 40}, |
| "disk": {Count: 40, Sum: 80}, |
| "cpu": {Count: 40, Sum: 120}, |
| "mem": {Count: 40, Sum: 160}, |
| }, |
| }, |
| }, |
| } |
| ) |
| |
| reportLoad := func(ls *perClusterStore) { |
| for category, count := range drops { |
| for i := 0; i < count; i++ { |
| ls.CallDropped(category) |
| } |
| } |
| for locality, data := range localityData { |
| for i := 0; i < data.start; i++ { |
| ls.CallStarted(locality) |
| } |
| for i := 0; i < data.success; i++ { |
| ls.CallFinished(locality, nil) |
| for n, d := range data.serverData { |
| ls.CallServerLoad(locality, n, d) |
| } |
| } |
| for i := 0; i < data.failure; i++ { |
| ls.CallFinished(locality, errTest) |
| } |
| } |
| } |
| |
| ls := perClusterStore{} |
| reportLoad(&ls) |
| gotStoreData := ls.stats() |
| if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval")); diff != "" { |
| t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff) |
| } |
| |
| // The above call to stats() should have reset all load reports except the |
| // in-progress RPC count. We are now going to push the same load data into |
| // the store, so we should expect to see twice the count for in-progress RPCs. |
| for _, l := range localities { |
| ls := wantStoreData.LocalityStats[l] |
| ls.RequestStats.InProgress *= 2 |
| wantStoreData.LocalityStats[l] = ls |
| } |
| reportLoad(&ls) |
| gotStoreData = ls.stats() |
| if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval")); diff != "" { |
| t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff) |
| } |
| } |
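|
| // exampleStatsReset is an illustrative sketch, not run by any test, of the |
| // reset-on-read behavior verified in TestResetAfterStats above: a stats() |
| // call clears the drop and completed-RPC counters, while RPCs that are |
| // still in progress continue to be reported by subsequent reads. The |
| // category and locality names below are placeholders. |
| func exampleStatsReset() (first, second *Data) { |
| var ls perClusterStore |
| ls.CallDropped("some_category") |
| ls.CallStarted("some_locality") // Deliberately left in progress. |
|
| first = ls.stats() // Reports the drop and an in-progress count of 1. |
| second = ls.stats() // The drop is gone; the in-progress count remains 1. |
| return first, second |
| } |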
| |
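| // sortDataSlice is a cmp option which sorts a []*Data by (Cluster, Service) |
| // before comparison, so that the unspecified ordering of the slice returned |
| // by Store.Stats() does not produce spurious diffs in the tests below. |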
| var sortDataSlice = cmp.Transformer("SortDataSlice", func(in []*Data) []*Data { |
| out := append([]*Data(nil), in...) // Copy input to avoid mutating it |
| sort.Slice(out, |
| func(i, j int) bool { |
| if out[i].Cluster < out[j].Cluster { |
| return true |
| } |
| if out[i].Cluster == out[j].Cluster { |
| return out[i].Service < out[j].Service |
| } |
| return false |
| }, |
| ) |
| return out |
| }) |
| |
| // TestStoreStats verifies that all loads are returned for the requested |
| // clusters, and that all clusters are reported when no cluster is specified. |
| func TestStoreStats(t *testing.T) { |
| var ( |
| testClusters = []string{"c0", "c1", "c2"} |
| testServices = []string{"s0", "s1"} |
| testLocality = "test-locality" |
| ) |
| |
| store := NewStore() |
| for _, c := range testClusters { |
| for _, s := range testServices { |
| store.PerCluster(c, s).CallStarted(testLocality) |
| store.PerCluster(c, s).CallServerLoad(testLocality, "abc", 123) |
| store.PerCluster(c, s).CallDropped("dropped") |
| store.PerCluster(c, s).CallFinished(testLocality, nil) |
| } |
| } |
| |
| wantC0 := []*Data{ |
| { |
| Cluster: "c0", Service: "s0", |
| TotalDrops: 1, Drops: map[string]uint64{"dropped": 1}, |
| LocalityStats: map[string]LocalityData{ |
| "test-locality": { |
| RequestStats: RequestData{Succeeded: 1}, |
| LoadStats: map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}}, |
| }, |
| }, |
| }, |
| { |
| Cluster: "c0", Service: "s1", |
| TotalDrops: 1, Drops: map[string]uint64{"dropped": 1}, |
| LocalityStats: map[string]LocalityData{ |
| "test-locality": { |
| RequestStats: RequestData{Succeeded: 1}, |
| LoadStats: map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}}, |
| }, |
| }, |
| }, |
| } |
| // Call Stats with just "c0". This should return data for "c0" only, and |
| // leave the data for the other clusters untouched. |
| gotC0 := store.Stats([]string{"c0"}) |
| if diff := cmp.Diff(wantC0, gotC0, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval"), sortDataSlice); diff != "" { |
| t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff) |
| } |
| |
| wantOther := []*Data{ |
| { |
| Cluster: "c1", Service: "s0", |
| TotalDrops: 1, Drops: map[string]uint64{"dropped": 1}, |
| LocalityStats: map[string]LocalityData{ |
| "test-locality": { |
| RequestStats: RequestData{Succeeded: 1}, |
| LoadStats: map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}}, |
| }, |
| }, |
| }, |
| { |
| Cluster: "c1", Service: "s1", |
| TotalDrops: 1, Drops: map[string]uint64{"dropped": 1}, |
| LocalityStats: map[string]LocalityData{ |
| "test-locality": { |
| RequestStats: RequestData{Succeeded: 1}, |
| LoadStats: map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}}, |
| }, |
| }, |
| }, |
| { |
| Cluster: "c2", Service: "s0", |
| TotalDrops: 1, Drops: map[string]uint64{"dropped": 1}, |
| LocalityStats: map[string]LocalityData{ |
| "test-locality": { |
| RequestStats: RequestData{Succeeded: 1}, |
| LoadStats: map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}}, |
| }, |
| }, |
| }, |
| { |
| Cluster: "c2", Service: "s1", |
| TotalDrops: 1, Drops: map[string]uint64{"dropped": 1}, |
| LocalityStats: map[string]LocalityData{ |
| "test-locality": { |
| RequestStats: RequestData{Succeeded: 1}, |
| LoadStats: map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}}, |
| }, |
| }, |
| }, |
| } |
| // Call Stats with a nil slice. This should return data for all the |
| // remaining clusters, and not include "c0", whose data was cleared by the |
| // previous call. |
| gotOther := store.Stats(nil) |
| if diff := cmp.Diff(wantOther, gotOther, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval"), sortDataSlice); diff != "" { |
| t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff) |
| } |
| } |
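|
| // exampleStoreUsage is an illustrative sketch, not exercised by the tests, |
| // of the Store-level API covered by TestStoreStats: loads are reported |
| // against a (cluster, service) pair via PerCluster, and Stats either filters |
| // by cluster name or, when given nil, returns (and clears) the data for |
| // every cluster that has something to report. The cluster, service, and |
| // drop category names below are placeholders. |
| func exampleStoreUsage() []*Data { |
| store := NewStore() |
| store.PerCluster("cluster-a", "service-a").CallDropped("overload") |
| store.PerCluster("cluster-b", "service-b").CallDropped("overload") |
|
| // Ask for just "cluster-a"; "cluster-b"'s data is left untouched. |
| _ = store.Stats([]string{"cluster-a"}) |
|
| // A nil filter returns whatever remains, here only "cluster-b"'s data. |
| return store.Stats(nil) |
| } |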
| |
| // TestStoreStatsEmptyDataNotReported verifies that a cluster with no load |
| // to report does not have its data appended to the slice returned by |
| // Stats(). |
| func TestStoreStatsEmptyDataNotReported(t *testing.T) { |
| var ( |
| testServices = []string{"s0", "s1"} |
| testLocality = "test-locality" |
| ) |
| |
| store := NewStore() |
| // "c0"'s RPCs all finish with success. |
| for _, s := range testServices { |
| store.PerCluster("c0", s).CallStarted(testLocality) |
| store.PerCluster("c0", s).CallFinished(testLocality, nil) |
| } |
| // "c1"'s RPCs never finish (always inprocess). |
| for _, s := range testServices { |
| store.PerCluster("c1", s).CallStarted(testLocality) |
| } |
| |
| want0 := []*Data{ |
| { |
| Cluster: "c0", Service: "s0", |
| LocalityStats: map[string]LocalityData{ |
| "test-locality": {RequestStats: RequestData{Succeeded: 1}}, |
| }, |
| }, |
| { |
| Cluster: "c0", Service: "s1", |
| LocalityStats: map[string]LocalityData{ |
| "test-locality": {RequestStats: RequestData{Succeeded: 1}}, |
| }, |
| }, |
| { |
| Cluster: "c1", Service: "s0", |
| LocalityStats: map[string]LocalityData{ |
| "test-locality": {RequestStats: RequestData{InProgress: 1}}, |
| }, |
| }, |
| { |
| Cluster: "c1", Service: "s1", |
| LocalityStats: map[string]LocalityData{ |
| "test-locality": {RequestStats: RequestData{InProgress: 1}}, |
| }, |
| }, |
| } |
| // Call Stats with a nil slice. This should return data for all the |
| // clusters. |
| got0 := store.Stats(nil) |
| if diff := cmp.Diff(want0, got0, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval"), sortDataSlice); diff != "" { |
| t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff) |
| } |
| |
| want1 := []*Data{ |
| { |
| Cluster: "c1", Service: "s0", |
| LocalityStats: map[string]LocalityData{ |
| "test-locality": {RequestStats: RequestData{InProgress: 1}}, |
| }, |
| }, |
| { |
| Cluster: "c1", Service: "s1", |
| LocalityStats: map[string]LocalityData{ |
| "test-locality": {RequestStats: RequestData{InProgress: 1}}, |
| }, |
| }, |
| } |
| // Call Stats with a nil slice again. This should return data only for |
| // "c1": "c0"'s data was cleared by the previous call, while "c1" still has |
| // in-progress RPCs to report. |
| got1 := store.Stats(nil) |
| if diff := cmp.Diff(want1, got1, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval"), sortDataSlice); diff != "" { |
| t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff) |
| } |
| } |
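|
| // exampleEmptyClusterOmitted is an illustrative sketch, not run by any test, |
| // of the behavior verified in TestStoreStatsEmptyDataNotReported: a cluster |
| // whose counters have all been reset, and which has no in-progress RPCs, |
| // does not appear at all in the slice returned by Stats. The names below |
| // are placeholders. |
| func exampleEmptyClusterOmitted() []*Data { |
| store := NewStore() |
| store.PerCluster("cluster-x", "service-x").CallDropped("dropped") |
|
| // The first read returns "cluster-x"'s data and resets it. |
| _ = store.Stats(nil) |
|
| // Nothing is left to report, so the second read contains no entry for |
| // "cluster-x" at all. |
| return store.Stats(nil) |
| } |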