| package networkdb |
| |
| import ( |
| "fmt" |
| "log" |
| "net" |
| "os" |
| "strconv" |
| "sync/atomic" |
| "testing" |
| "time" |
| |
| "github.com/docker/docker/pkg/stringid" |
| "github.com/docker/go-events" |
| "github.com/hashicorp/memberlist" |
| "github.com/sirupsen/logrus" |
| "gotest.tools/v3/assert" |
| is "gotest.tools/v3/assert/cmp" |
| "gotest.tools/v3/poll" |
| ) |
| |
// dbPort is the next TCP port to hand out to a test NetworkDB instance.
// It is advanced atomically (see createNetworkDBInstances) so instances
// created by concurrently running tests never bind the same port.
var dbPort int32 = 10000
| |
| func TestMain(m *testing.M) { |
| os.WriteFile("/proc/sys/net/ipv6/conf/lo/disable_ipv6", []byte{'0', '\n'}, 0644) |
| logrus.SetLevel(logrus.ErrorLevel) |
| os.Exit(m.Run()) |
| } |
| |
| func launchNode(t *testing.T, conf Config) *NetworkDB { |
| t.Helper() |
| db, err := New(&conf) |
| assert.NilError(t, err) |
| return db |
| } |
| |
| func createNetworkDBInstances(t *testing.T, num int, namePrefix string, conf *Config) []*NetworkDB { |
| t.Helper() |
| var dbs []*NetworkDB |
| for i := 0; i < num; i++ { |
| localConfig := *conf |
| localConfig.Hostname = fmt.Sprintf("%s%d", namePrefix, i+1) |
| localConfig.NodeID = stringid.TruncateID(stringid.GenerateRandomID()) |
| localConfig.BindPort = int(atomic.AddInt32(&dbPort, 1)) |
| db := launchNode(t, localConfig) |
| if i != 0 { |
| assert.Check(t, db.Join([]string{fmt.Sprintf("localhost:%d", db.config.BindPort-1)})) |
| } |
| |
| dbs = append(dbs, db) |
| } |
| |
| // Wait till the cluster creation is successful |
| check := func(t poll.LogT) poll.Result { |
| // Check that the cluster is properly created |
| for i := 0; i < num; i++ { |
| if num != len(dbs[i].ClusterPeers()) { |
| return poll.Continue("%s:Waiting for cluster peers to be established", dbs[i].config.Hostname) |
| } |
| } |
| return poll.Success() |
| } |
| poll.WaitOn(t, check, poll.WithDelay(2*time.Second), poll.WithTimeout(20*time.Second)) |
| |
| return dbs |
| } |
| |
| func closeNetworkDBInstances(t *testing.T, dbs []*NetworkDB) { |
| t.Helper() |
| log.Print("Closing DB instances...") |
| for _, db := range dbs { |
| db.Close() |
| } |
| } |
| |
| func (db *NetworkDB) verifyNodeExistence(t *testing.T, node string, present bool) { |
| t.Helper() |
| for i := 0; i < 80; i++ { |
| db.RLock() |
| _, ok := db.nodes[node] |
| db.RUnlock() |
| if present && ok { |
| return |
| } |
| |
| if !present && !ok { |
| return |
| } |
| |
| time.Sleep(50 * time.Millisecond) |
| } |
| |
| t.Errorf("%v(%v): Node existence verification for node %s failed", db.config.Hostname, db.config.NodeID, node) |
| } |
| |
| func (db *NetworkDB) verifyNetworkExistence(t *testing.T, node string, id string, present bool) { |
| t.Helper() |
| |
| const sleepInterval = 50 * time.Millisecond |
| var maxRetries int64 |
| if dl, ok := t.Deadline(); ok { |
| maxRetries = int64(time.Until(dl) / sleepInterval) |
| } else { |
| maxRetries = 80 |
| } |
| for i := int64(0); i < maxRetries; i++ { |
| db.RLock() |
| nn, nnok := db.networks[node] |
| if nnok { |
| n, ok := nn[id] |
| var leaving bool |
| if ok { |
| leaving = n.leaving |
| } |
| db.RUnlock() |
| if present && ok { |
| return |
| } |
| |
| if !present && |
| ((ok && leaving) || |
| !ok) { |
| return |
| } |
| } else { |
| db.RUnlock() |
| } |
| |
| time.Sleep(sleepInterval) |
| } |
| |
| t.Error("Network existence verification failed") |
| } |
| |
| func (db *NetworkDB) verifyEntryExistence(t *testing.T, tname, nid, key, value string, present bool) { |
| t.Helper() |
| n := 80 |
| for i := 0; i < n; i++ { |
| v, err := db.GetEntry(tname, nid, key) |
| if present && err == nil && string(v) == value { |
| return |
| } |
| if err != nil && !present { |
| return |
| } |
| |
| time.Sleep(50 * time.Millisecond) |
| } |
| |
| t.Errorf("Entry existence verification test failed for %v(%v)", db.config.Hostname, db.config.NodeID) |
| } |
| |
| func testWatch(t *testing.T, ch chan events.Event, ev interface{}, tname, nid, key, value string) { |
| t.Helper() |
| select { |
| case rcvdEv := <-ch: |
| assert.Check(t, is.Equal(fmt.Sprintf("%T", rcvdEv), fmt.Sprintf("%T", ev))) |
| switch typ := rcvdEv.(type) { |
| case CreateEvent: |
| assert.Check(t, is.Equal(tname, typ.Table)) |
| assert.Check(t, is.Equal(nid, typ.NetworkID)) |
| assert.Check(t, is.Equal(key, typ.Key)) |
| assert.Check(t, is.Equal(value, string(typ.Value))) |
| case UpdateEvent: |
| assert.Check(t, is.Equal(tname, typ.Table)) |
| assert.Check(t, is.Equal(nid, typ.NetworkID)) |
| assert.Check(t, is.Equal(key, typ.Key)) |
| assert.Check(t, is.Equal(value, string(typ.Value))) |
| case DeleteEvent: |
| assert.Check(t, is.Equal(tname, typ.Table)) |
| assert.Check(t, is.Equal(nid, typ.NetworkID)) |
| assert.Check(t, is.Equal(key, typ.Key)) |
| } |
| case <-time.After(time.Second): |
| t.Fail() |
| return |
| } |
| } |
| |
| func TestNetworkDBSimple(t *testing.T) { |
| dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig()) |
| closeNetworkDBInstances(t, dbs) |
| } |
| |
| func TestNetworkDBJoinLeaveNetwork(t *testing.T) { |
| dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig()) |
| |
| err := dbs[0].JoinNetwork("network1") |
| assert.NilError(t, err) |
| |
| dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true) |
| |
| err = dbs[0].LeaveNetwork("network1") |
| assert.NilError(t, err) |
| |
| dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", false) |
| closeNetworkDBInstances(t, dbs) |
| } |
| |
| func TestNetworkDBJoinLeaveNetworks(t *testing.T) { |
| dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig()) |
| |
| n := 10 |
| for i := 1; i <= n; i++ { |
| err := dbs[0].JoinNetwork(fmt.Sprintf("network0%d", i)) |
| assert.NilError(t, err) |
| } |
| |
| for i := 1; i <= n; i++ { |
| err := dbs[1].JoinNetwork(fmt.Sprintf("network1%d", i)) |
| assert.NilError(t, err) |
| } |
| |
| for i := 1; i <= n; i++ { |
| dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, fmt.Sprintf("network0%d", i), true) |
| } |
| |
| for i := 1; i <= n; i++ { |
| dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, fmt.Sprintf("network1%d", i), true) |
| } |
| |
| for i := 1; i <= n; i++ { |
| err := dbs[0].LeaveNetwork(fmt.Sprintf("network0%d", i)) |
| assert.NilError(t, err) |
| } |
| |
| for i := 1; i <= n; i++ { |
| err := dbs[1].LeaveNetwork(fmt.Sprintf("network1%d", i)) |
| assert.NilError(t, err) |
| } |
| |
| for i := 1; i <= n; i++ { |
| dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, fmt.Sprintf("network0%d", i), false) |
| } |
| |
| for i := 1; i <= n; i++ { |
| dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, fmt.Sprintf("network1%d", i), false) |
| } |
| |
| closeNetworkDBInstances(t, dbs) |
| } |
| |
| func TestNetworkDBCRUDTableEntry(t *testing.T) { |
| dbs := createNetworkDBInstances(t, 3, "node", DefaultConfig()) |
| |
| err := dbs[0].JoinNetwork("network1") |
| assert.NilError(t, err) |
| |
| dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true) |
| |
| err = dbs[1].JoinNetwork("network1") |
| assert.NilError(t, err) |
| |
| err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value")) |
| assert.NilError(t, err) |
| |
| dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true) |
| dbs[2].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false) |
| |
| err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value")) |
| assert.NilError(t, err) |
| |
| dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true) |
| |
| err = dbs[0].DeleteEntry("test_table", "network1", "test_key") |
| assert.NilError(t, err) |
| |
| dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false) |
| |
| closeNetworkDBInstances(t, dbs) |
| } |
| |
| func TestNetworkDBCRUDTableEntries(t *testing.T) { |
| dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig()) |
| |
| err := dbs[0].JoinNetwork("network1") |
| assert.NilError(t, err) |
| |
| dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true) |
| |
| err = dbs[1].JoinNetwork("network1") |
| assert.NilError(t, err) |
| |
| dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true) |
| |
| n := 10 |
| for i := 1; i <= n; i++ { |
| err = dbs[0].CreateEntry("test_table", "network1", |
| fmt.Sprintf("test_key0%d", i), |
| []byte(fmt.Sprintf("test_value0%d", i))) |
| assert.NilError(t, err) |
| } |
| |
| for i := 1; i <= n; i++ { |
| err = dbs[1].CreateEntry("test_table", "network1", |
| fmt.Sprintf("test_key1%d", i), |
| []byte(fmt.Sprintf("test_value1%d", i))) |
| assert.NilError(t, err) |
| } |
| |
| for i := 1; i <= n; i++ { |
| dbs[0].verifyEntryExistence(t, "test_table", "network1", |
| fmt.Sprintf("test_key1%d", i), |
| fmt.Sprintf("test_value1%d", i), true) |
| assert.NilError(t, err) |
| } |
| |
| for i := 1; i <= n; i++ { |
| dbs[1].verifyEntryExistence(t, "test_table", "network1", |
| fmt.Sprintf("test_key0%d", i), |
| fmt.Sprintf("test_value0%d", i), true) |
| assert.NilError(t, err) |
| } |
| |
| // Verify deletes |
| for i := 1; i <= n; i++ { |
| err = dbs[0].DeleteEntry("test_table", "network1", |
| fmt.Sprintf("test_key0%d", i)) |
| assert.NilError(t, err) |
| } |
| |
| for i := 1; i <= n; i++ { |
| err = dbs[1].DeleteEntry("test_table", "network1", |
| fmt.Sprintf("test_key1%d", i)) |
| assert.NilError(t, err) |
| } |
| |
| for i := 1; i <= n; i++ { |
| dbs[0].verifyEntryExistence(t, "test_table", "network1", |
| fmt.Sprintf("test_key1%d", i), "", false) |
| assert.NilError(t, err) |
| } |
| |
| for i := 1; i <= n; i++ { |
| dbs[1].verifyEntryExistence(t, "test_table", "network1", |
| fmt.Sprintf("test_key0%d", i), "", false) |
| assert.NilError(t, err) |
| } |
| |
| closeNetworkDBInstances(t, dbs) |
| } |
| |
| func TestNetworkDBNodeLeave(t *testing.T) { |
| dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig()) |
| |
| err := dbs[0].JoinNetwork("network1") |
| assert.NilError(t, err) |
| |
| err = dbs[1].JoinNetwork("network1") |
| assert.NilError(t, err) |
| |
| err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value")) |
| assert.NilError(t, err) |
| |
| dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true) |
| |
| dbs[0].Close() |
| dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false) |
| dbs[1].Close() |
| } |
| |
| func TestNetworkDBWatch(t *testing.T) { |
| dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig()) |
| err := dbs[0].JoinNetwork("network1") |
| assert.NilError(t, err) |
| |
| err = dbs[1].JoinNetwork("network1") |
| assert.NilError(t, err) |
| |
| ch, cancel := dbs[1].Watch("", "", "") |
| |
| err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value")) |
| assert.NilError(t, err) |
| |
| testWatch(t, ch.C, CreateEvent{}, "test_table", "network1", "test_key", "test_value") |
| |
| err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value")) |
| assert.NilError(t, err) |
| |
| testWatch(t, ch.C, UpdateEvent{}, "test_table", "network1", "test_key", "test_updated_value") |
| |
| err = dbs[0].DeleteEntry("test_table", "network1", "test_key") |
| assert.NilError(t, err) |
| |
| testWatch(t, ch.C, DeleteEvent{}, "test_table", "network1", "test_key", "") |
| |
| cancel() |
| closeNetworkDBInstances(t, dbs) |
| } |
| |
| func TestNetworkDBBulkSync(t *testing.T) { |
| dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig()) |
| |
| err := dbs[0].JoinNetwork("network1") |
| assert.NilError(t, err) |
| |
| dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true) |
| |
| n := 1000 |
| for i := 1; i <= n; i++ { |
| err = dbs[0].CreateEntry("test_table", "network1", |
| fmt.Sprintf("test_key0%d", i), |
| []byte(fmt.Sprintf("test_value0%d", i))) |
| assert.NilError(t, err) |
| } |
| |
| err = dbs[1].JoinNetwork("network1") |
| assert.NilError(t, err) |
| |
| dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true) |
| |
| for i := 1; i <= n; i++ { |
| dbs[1].verifyEntryExistence(t, "test_table", "network1", |
| fmt.Sprintf("test_key0%d", i), |
| fmt.Sprintf("test_value0%d", i), true) |
| assert.NilError(t, err) |
| } |
| |
| closeNetworkDBInstances(t, dbs) |
| } |
| |
| func TestNetworkDBCRUDMediumCluster(t *testing.T) { |
| n := 5 |
| |
| dbs := createNetworkDBInstances(t, n, "node", DefaultConfig()) |
| |
| for i := 0; i < n; i++ { |
| for j := 0; j < n; j++ { |
| if i == j { |
| continue |
| } |
| |
| dbs[i].verifyNodeExistence(t, dbs[j].config.NodeID, true) |
| } |
| } |
| |
| for i := 0; i < n; i++ { |
| err := dbs[i].JoinNetwork("network1") |
| assert.NilError(t, err) |
| } |
| |
| for i := 0; i < n; i++ { |
| for j := 0; j < n; j++ { |
| dbs[i].verifyNetworkExistence(t, dbs[j].config.NodeID, "network1", true) |
| } |
| } |
| |
| err := dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value")) |
| assert.NilError(t, err) |
| |
| for i := 1; i < n; i++ { |
| dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true) |
| } |
| |
| err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value")) |
| assert.NilError(t, err) |
| |
| for i := 1; i < n; i++ { |
| dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true) |
| } |
| |
| err = dbs[0].DeleteEntry("test_table", "network1", "test_key") |
| assert.NilError(t, err) |
| |
| for i := 1; i < n; i++ { |
| dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false) |
| } |
| |
| for i := 1; i < n; i++ { |
| _, err = dbs[i].GetEntry("test_table", "network1", "test_key") |
| assert.Check(t, is.ErrorContains(err, "")) |
| assert.Check(t, is.Contains(err.Error(), "deleted and pending garbage collection"), err) |
| } |
| |
| closeNetworkDBInstances(t, dbs) |
| } |
| |
| func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) { |
| dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig()) |
| |
| dbChangeWitness := func(db *NetworkDB) func(network string, expectNodeCount int) { |
| staleNetworkTime := db.networkClock.Time() |
| return func(network string, expectNodeCount int) { |
| check := func(t poll.LogT) poll.Result { |
| networkTime := db.networkClock.Time() |
| if networkTime <= staleNetworkTime { |
| return poll.Continue("network time is stale, no change registered yet.") |
| } |
| count := -1 |
| db.Lock() |
| if nodes, ok := db.networkNodes[network]; ok { |
| count = len(nodes) |
| } |
| db.Unlock() |
| if count != expectNodeCount { |
| return poll.Continue("current number of nodes is %d, expect %d.", count, expectNodeCount) |
| } |
| return poll.Success() |
| } |
| t.Helper() |
| poll.WaitOn(t, check, poll.WithTimeout(3*time.Second), poll.WithDelay(5*time.Millisecond)) |
| } |
| } |
| |
| // Single node Join/Leave |
| witness0 := dbChangeWitness(dbs[0]) |
| err := dbs[0].JoinNetwork("network1") |
| assert.NilError(t, err) |
| witness0("network1", 1) |
| |
| witness0 = dbChangeWitness(dbs[0]) |
| err = dbs[0].LeaveNetwork("network1") |
| assert.NilError(t, err) |
| witness0("network1", 0) |
| |
| // Multiple nodes Join/Leave |
| witness0, witness1 := dbChangeWitness(dbs[0]), dbChangeWitness(dbs[1]) |
| err = dbs[0].JoinNetwork("network1") |
| assert.NilError(t, err) |
| |
| err = dbs[1].JoinNetwork("network1") |
| assert.NilError(t, err) |
| |
| // Wait for the propagation on db[0] |
| dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true) |
| witness0("network1", 2) |
| if n, ok := dbs[0].networks[dbs[0].config.NodeID]["network1"]; !ok || n.leaving { |
| t.Fatalf("The network should not be marked as leaving:%t", n.leaving) |
| } |
| |
| // Wait for the propagation on db[1] |
| dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true) |
| witness1("network1", 2) |
| if n, ok := dbs[1].networks[dbs[1].config.NodeID]["network1"]; !ok || n.leaving { |
| t.Fatalf("The network should not be marked as leaving:%t", n.leaving) |
| } |
| |
| // Try a quick leave/join |
| witness0, witness1 = dbChangeWitness(dbs[0]), dbChangeWitness(dbs[1]) |
| err = dbs[0].LeaveNetwork("network1") |
| assert.NilError(t, err) |
| err = dbs[0].JoinNetwork("network1") |
| assert.NilError(t, err) |
| |
| dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true) |
| witness0("network1", 2) |
| |
| dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true) |
| witness1("network1", 2) |
| |
| closeNetworkDBInstances(t, dbs) |
| } |
| |
| func TestNetworkDBGarbageCollection(t *testing.T) { |
| keysWriteDelete := 5 |
| config := DefaultConfig() |
| config.reapEntryInterval = 30 * time.Second |
| config.StatsPrintPeriod = 15 * time.Second |
| |
| dbs := createNetworkDBInstances(t, 3, "node", config) |
| |
| // 2 Nodes join network |
| err := dbs[0].JoinNetwork("network1") |
| assert.NilError(t, err) |
| |
| err = dbs[1].JoinNetwork("network1") |
| assert.NilError(t, err) |
| |
| for i := 0; i < keysWriteDelete; i++ { |
| err = dbs[i%2].CreateEntry("testTable", "network1", "key-"+strconv.Itoa(i), []byte("value")) |
| assert.NilError(t, err) |
| } |
| time.Sleep(time.Second) |
| for i := 0; i < keysWriteDelete; i++ { |
| err = dbs[i%2].DeleteEntry("testTable", "network1", "key-"+strconv.Itoa(i)) |
| assert.NilError(t, err) |
| } |
| for i := 0; i < 2; i++ { |
| dbs[i].Lock() |
| assert.Check(t, is.Equal(int64(keysWriteDelete), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries number should match") |
| dbs[i].Unlock() |
| } |
| |
| // from this point the timer for the garbage collection started, wait 5 seconds and then join a new node |
| time.Sleep(5 * time.Second) |
| |
| err = dbs[2].JoinNetwork("network1") |
| assert.NilError(t, err) |
| for i := 0; i < 3; i++ { |
| dbs[i].Lock() |
| assert.Check(t, is.Equal(int64(keysWriteDelete), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries number should match") |
| dbs[i].Unlock() |
| } |
| // at this point the entries should had been all deleted |
| time.Sleep(30 * time.Second) |
| for i := 0; i < 3; i++ { |
| dbs[i].Lock() |
| assert.Check(t, is.Equal(int64(0), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries should had been garbage collected") |
| dbs[i].Unlock() |
| } |
| |
| // make sure that entries are not coming back |
| time.Sleep(15 * time.Second) |
| for i := 0; i < 3; i++ { |
| dbs[i].Lock() |
| assert.Check(t, is.Equal(int64(0), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries should had been garbage collected") |
| dbs[i].Unlock() |
| } |
| |
| closeNetworkDBInstances(t, dbs) |
| } |
| |
| func TestFindNode(t *testing.T) { |
| dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig()) |
| |
| dbs[0].nodes["active"] = &node{Node: memberlist.Node{Name: "active"}} |
| dbs[0].failedNodes["failed"] = &node{Node: memberlist.Node{Name: "failed"}} |
| dbs[0].leftNodes["left"] = &node{Node: memberlist.Node{Name: "left"}} |
| |
| // active nodes is 2 because the testing node is in the list |
| assert.Check(t, is.Len(dbs[0].nodes, 2)) |
| assert.Check(t, is.Len(dbs[0].failedNodes, 1)) |
| assert.Check(t, is.Len(dbs[0].leftNodes, 1)) |
| |
| n, currState, m := dbs[0].findNode("active") |
| assert.Check(t, n != nil) |
| assert.Check(t, is.Equal("active", n.Name)) |
| assert.Check(t, is.Equal(nodeActiveState, currState)) |
| assert.Check(t, m != nil) |
| // delete the entry manually |
| delete(m, "active") |
| |
| // test if can be still find |
| n, currState, m = dbs[0].findNode("active") |
| assert.Check(t, is.Nil(n)) |
| assert.Check(t, is.Equal(nodeNotFound, currState)) |
| assert.Check(t, is.Nil(m)) |
| |
| n, currState, m = dbs[0].findNode("failed") |
| assert.Check(t, n != nil) |
| assert.Check(t, is.Equal("failed", n.Name)) |
| assert.Check(t, is.Equal(nodeFailedState, currState)) |
| assert.Check(t, m != nil) |
| |
| // find and remove |
| n, currState, m = dbs[0].findNode("left") |
| assert.Check(t, n != nil) |
| assert.Check(t, is.Equal("left", n.Name)) |
| assert.Check(t, is.Equal(nodeLeftState, currState)) |
| assert.Check(t, m != nil) |
| delete(m, "left") |
| |
| n, currState, m = dbs[0].findNode("left") |
| assert.Check(t, is.Nil(n)) |
| assert.Check(t, is.Equal(nodeNotFound, currState)) |
| assert.Check(t, is.Nil(m)) |
| |
| closeNetworkDBInstances(t, dbs) |
| } |
| |
| func TestChangeNodeState(t *testing.T) { |
| dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig()) |
| |
| dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1"}} |
| dbs[0].nodes["node2"] = &node{Node: memberlist.Node{Name: "node2"}} |
| dbs[0].nodes["node3"] = &node{Node: memberlist.Node{Name: "node3"}} |
| |
| // active nodes is 4 because the testing node is in the list |
| assert.Check(t, is.Len(dbs[0].nodes, 4)) |
| |
| n, currState, m := dbs[0].findNode("node1") |
| assert.Check(t, n != nil) |
| assert.Check(t, is.Equal(nodeActiveState, currState)) |
| assert.Check(t, is.Equal("node1", n.Name)) |
| assert.Check(t, m != nil) |
| |
| // node1 to failed |
| dbs[0].changeNodeState("node1", nodeFailedState) |
| |
| n, currState, m = dbs[0].findNode("node1") |
| assert.Check(t, n != nil) |
| assert.Check(t, is.Equal(nodeFailedState, currState)) |
| assert.Check(t, is.Equal("node1", n.Name)) |
| assert.Check(t, m != nil) |
| assert.Check(t, time.Duration(0) != n.reapTime) |
| |
| // node1 back to active |
| dbs[0].changeNodeState("node1", nodeActiveState) |
| |
| n, currState, m = dbs[0].findNode("node1") |
| assert.Check(t, n != nil) |
| assert.Check(t, is.Equal(nodeActiveState, currState)) |
| assert.Check(t, is.Equal("node1", n.Name)) |
| assert.Check(t, m != nil) |
| assert.Check(t, is.Equal(time.Duration(0), n.reapTime)) |
| |
| // node1 to left |
| dbs[0].changeNodeState("node1", nodeLeftState) |
| dbs[0].changeNodeState("node2", nodeLeftState) |
| dbs[0].changeNodeState("node3", nodeLeftState) |
| |
| n, currState, m = dbs[0].findNode("node1") |
| assert.Check(t, n != nil) |
| assert.Check(t, is.Equal(nodeLeftState, currState)) |
| assert.Check(t, is.Equal("node1", n.Name)) |
| assert.Check(t, m != nil) |
| assert.Check(t, time.Duration(0) != n.reapTime) |
| |
| n, currState, m = dbs[0].findNode("node2") |
| assert.Check(t, n != nil) |
| assert.Check(t, is.Equal(nodeLeftState, currState)) |
| assert.Check(t, is.Equal("node2", n.Name)) |
| assert.Check(t, m != nil) |
| assert.Check(t, time.Duration(0) != n.reapTime) |
| |
| n, currState, m = dbs[0].findNode("node3") |
| assert.Check(t, n != nil) |
| assert.Check(t, is.Equal(nodeLeftState, currState)) |
| assert.Check(t, is.Equal("node3", n.Name)) |
| assert.Check(t, m != nil) |
| assert.Check(t, time.Duration(0) != n.reapTime) |
| |
| // active nodes is 1 because the testing node is in the list |
| assert.Check(t, is.Len(dbs[0].nodes, 1)) |
| assert.Check(t, is.Len(dbs[0].failedNodes, 0)) |
| assert.Check(t, is.Len(dbs[0].leftNodes, 3)) |
| |
| closeNetworkDBInstances(t, dbs) |
| } |
| |
| func TestNodeReincarnation(t *testing.T) { |
| dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig()) |
| |
| dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1", Addr: net.ParseIP("192.168.1.1")}} |
| dbs[0].leftNodes["node2"] = &node{Node: memberlist.Node{Name: "node2", Addr: net.ParseIP("192.168.1.2")}} |
| dbs[0].failedNodes["node3"] = &node{Node: memberlist.Node{Name: "node3", Addr: net.ParseIP("192.168.1.3")}} |
| |
| // active nodes is 2 because the testing node is in the list |
| assert.Check(t, is.Len(dbs[0].nodes, 2)) |
| assert.Check(t, is.Len(dbs[0].failedNodes, 1)) |
| assert.Check(t, is.Len(dbs[0].leftNodes, 1)) |
| |
| dbs[0].Lock() |
| b := dbs[0].purgeReincarnation(&memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")}) |
| assert.Check(t, b) |
| dbs[0].nodes["node4"] = &node{Node: memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")}} |
| |
| b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.2")}) |
| assert.Check(t, b) |
| dbs[0].nodes["node5"] = &node{Node: memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.1")}} |
| |
| b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.3")}) |
| assert.Check(t, b) |
| dbs[0].nodes["node6"] = &node{Node: memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.1")}} |
| |
| b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.10")}) |
| assert.Check(t, !b) |
| |
| // active nodes is 1 because the testing node is in the list |
| assert.Check(t, is.Len(dbs[0].nodes, 4)) |
| assert.Check(t, is.Len(dbs[0].failedNodes, 0)) |
| assert.Check(t, is.Len(dbs[0].leftNodes, 3)) |
| |
| dbs[0].Unlock() |
| closeNetworkDBInstances(t, dbs) |
| } |
| |
| func TestParallelCreate(t *testing.T) { |
| dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig()) |
| |
| startCh := make(chan int) |
| doneCh := make(chan error) |
| var success int32 |
| for i := 0; i < 20; i++ { |
| go func() { |
| <-startCh |
| err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value")) |
| if err == nil { |
| atomic.AddInt32(&success, 1) |
| } |
| doneCh <- err |
| }() |
| } |
| |
| close(startCh) |
| |
| for i := 0; i < 20; i++ { |
| <-doneCh |
| } |
| close(doneCh) |
| // Only 1 write should have succeeded |
| assert.Check(t, is.Equal(int32(1), success)) |
| |
| closeNetworkDBInstances(t, dbs) |
| } |
| |
| func TestParallelDelete(t *testing.T) { |
| dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig()) |
| |
| err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value")) |
| assert.NilError(t, err) |
| |
| startCh := make(chan int) |
| doneCh := make(chan error) |
| var success int32 |
| for i := 0; i < 20; i++ { |
| go func() { |
| <-startCh |
| err := dbs[0].DeleteEntry("testTable", "testNetwork", "key") |
| if err == nil { |
| atomic.AddInt32(&success, 1) |
| } |
| doneCh <- err |
| }() |
| } |
| |
| close(startCh) |
| |
| for i := 0; i < 20; i++ { |
| <-doneCh |
| } |
| close(doneCh) |
| // Only 1 write should have succeeded |
| assert.Check(t, is.Equal(int32(1), success)) |
| |
| closeNetworkDBInstances(t, dbs) |
| } |
| |
// TestNetworkDBIslands simulates a cluster partition and recovery: the
// three bootstrap nodes leave cleanly, then come back with fresh NodeIDs
// on the same IP:port, and the remaining two nodes must reconnect to
// them through the periodic rejoin routine (no explicit Join is issued
// by the returning nodes).
func TestNetworkDBIslands(t *testing.T) {
	// pollTimeout bounds each poll.WaitOn by the test deadline (when one
	// is set), capped at 120 seconds.
	pollTimeout := func() time.Duration {
		const defaultTimeout = 120 * time.Second
		dl, ok := t.Deadline()
		if !ok {
			return defaultTimeout
		}
		if d := time.Until(dl); d <= defaultTimeout {
			return d
		}
		return defaultTimeout
	}

	logrus.SetLevel(logrus.DebugLevel)
	conf := DefaultConfig()
	// Shorten durations to speed up test execution.
	conf.rejoinClusterDuration = conf.rejoinClusterDuration / 10
	conf.rejoinClusterInterval = conf.rejoinClusterInterval / 10
	dbs := createNetworkDBInstances(t, 5, "node", conf)

	// Get the node IP used currently.
	node := dbs[0].nodes[dbs[0].config.NodeID]
	baseIPStr := node.Addr.String()
	// Nodes 0, 1, and 2 are going to be the 3 bootstrap nodes.
	members := []string{fmt.Sprintf("%s:%d", baseIPStr, dbs[0].config.BindPort),
		fmt.Sprintf("%s:%d", baseIPStr, dbs[1].config.BindPort),
		fmt.Sprintf("%s:%d", baseIPStr, dbs[2].config.BindPort)}
	// Rejoining updates each node's list of bootstrap members.
	for i := 3; i < 5; i++ {
		t.Logf("Re-joining: %d", i)
		assert.Check(t, dbs[i].Join(members))
	}

	// The 3 bootstrap nodes now leave cleanly; the other 2 nodes must
	// record them as "left" (not "failed").
	for i := 0; i < 3; i++ {
		logrus.Infof("node %d leaving", i)
		dbs[i].Close()
	}

	checkDBs := make(map[string]*NetworkDB)
	for i := 3; i < 5; i++ {
		db := dbs[i]
		checkDBs[db.config.Hostname] = db
	}

	// Give the system time to propagate the leave messages and free up
	// the ports.
	check := func(t poll.LogT) poll.Result {
		// Verify that the nodes are actually all gone and marked appropriately.
		// Converged instances are removed from checkDBs so they are not
		// re-checked on the next poll iteration.
		for name, db := range checkDBs {
			db.RLock()
			if (len(db.leftNodes) != 3) || (len(db.failedNodes) != 0) {
				for name := range db.leftNodes {
					t.Logf("%s: Node %s left", db.config.Hostname, name)
				}
				for name := range db.failedNodes {
					t.Logf("%s: Node %s failed", db.config.Hostname, name)
				}
				db.RUnlock()
				return poll.Continue("%s:Waiting for all nodes to cleanly leave, left: %d, failed nodes: %d", name, len(db.leftNodes), len(db.failedNodes))
			}
			db.RUnlock()
			t.Logf("%s: OK", name)
			delete(checkDBs, name)
		}
		return poll.Success()
	}
	poll.WaitOn(t, check, poll.WithDelay(time.Second), poll.WithTimeout(pollTimeout()))

	// Spawn the first 3 nodes again with different NodeIDs but the same IP:port.
	for i := 0; i < 3; i++ {
		logrus.Infof("node %d coming back", i)
		conf := *dbs[i].config
		conf.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
		dbs[i] = launchNode(t, conf)
	}

	// Give the reconnect routine time to run (it runs periodically).
	check = func(t poll.LogT) poll.Result {
		// Verify that the cluster is fully connected again. Note that the
		// 3 returning nodes did not issue any explicit join.
		for i := 0; i < 5; i++ {
			db := dbs[i]
			db.RLock()
			if len(db.nodes) != 5 {
				db.RUnlock()
				return poll.Continue("%s:Waiting to connect to all nodes", dbs[i].config.Hostname)
			}
			if len(db.failedNodes) != 0 {
				db.RUnlock()
				return poll.Continue("%s:Waiting for 0 failedNodes", dbs[i].config.Hostname)
			}
			if i < 3 {
				// Nodes 0-2 restarted with fresh IDs, so they track no left nodes.
				if len(db.leftNodes) != 0 {
					db.RUnlock()
					return poll.Continue("%s:Waiting to have no leftNodes", dbs[i].config.Hostname)
				}
			} else {
				// Nodes 3 and 4 still remember the 3 old identities that left.
				if len(db.leftNodes) != 3 {
					db.RUnlock()
					return poll.Continue("%s:Waiting to have 3 leftNodes", dbs[i].config.Hostname)
				}
			}
			db.RUnlock()
		}
		return poll.Success()
	}
	poll.WaitOn(t, check, poll.WithDelay(time.Second), poll.WithTimeout(pollTimeout()))
	closeNetworkDBInstances(t, dbs)
}