| package overlay |
| |
| import ( |
| "fmt" |
| "net" |
| "strings" |
| "time" |
| |
| "github.com/Sirupsen/logrus" |
| "github.com/hashicorp/serf/serf" |
| ) |
| |
| type ovNotify struct { |
| action string |
| ep *endpoint |
| nw *network |
| } |
| |
| type logWriter struct{} |
| |
| func (l *logWriter) Write(p []byte) (int, error) { |
| str := string(p) |
| |
| switch { |
| case strings.Contains(str, "[WARN]"): |
| logrus.Warn(str) |
| case strings.Contains(str, "[DEBUG]"): |
| logrus.Debug(str) |
| case strings.Contains(str, "[INFO]"): |
| logrus.Info(str) |
| case strings.Contains(str, "[ERR]"): |
| logrus.Error(str) |
| } |
| |
| return len(p), nil |
| } |
| |
| func (d *driver) serfInit() error { |
| var err error |
| |
| config := serf.DefaultConfig() |
| config.Init() |
| config.MemberlistConfig.BindAddr = d.advertiseAddress |
| |
| d.eventCh = make(chan serf.Event, 4) |
| config.EventCh = d.eventCh |
| config.UserCoalescePeriod = 1 * time.Second |
| config.UserQuiescentPeriod = 50 * time.Millisecond |
| |
| config.LogOutput = &logWriter{} |
| config.MemberlistConfig.LogOutput = config.LogOutput |
| |
| s, err := serf.Create(config) |
| if err != nil { |
| return fmt.Errorf("failed to create cluster node: %v", err) |
| } |
| defer func() { |
| if err != nil { |
| s.Shutdown() |
| } |
| }() |
| |
| d.serfInstance = s |
| |
| d.notifyCh = make(chan ovNotify) |
| d.exitCh = make(chan chan struct{}) |
| |
| go d.startSerfLoop(d.eventCh, d.notifyCh, d.exitCh) |
| return nil |
| } |
| |
| func (d *driver) serfJoin(neighIP string) error { |
| if neighIP == "" { |
| return fmt.Errorf("no neighbor to join") |
| } |
| if _, err := d.serfInstance.Join([]string{neighIP}, true); err != nil { |
| return fmt.Errorf("Failed to join the cluster at neigh IP %s: %v", |
| neighIP, err) |
| } |
| return nil |
| } |
| |
| func (d *driver) notifyEvent(event ovNotify) { |
| ep := event.ep |
| |
| ePayload := fmt.Sprintf("%s %s %s %s", event.action, ep.addr.IP.String(), |
| net.IP(ep.addr.Mask).String(), ep.mac.String()) |
| eName := fmt.Sprintf("jl %s %s %s", d.serfInstance.LocalMember().Addr.String(), |
| event.nw.id, ep.id) |
| |
| if err := d.serfInstance.UserEvent(eName, []byte(ePayload), true); err != nil { |
| logrus.Errorf("Sending user event failed: %v\n", err) |
| } |
| } |
| |
| func (d *driver) processEvent(u serf.UserEvent) { |
| logrus.Debugf("Received user event name:%s, payload:%s LTime:%d \n", u.Name, |
| string(u.Payload), uint64(u.LTime)) |
| |
| var dummy, action, vtepStr, nid, eid, ipStr, maskStr, macStr string |
| if _, err := fmt.Sscan(u.Name, &dummy, &vtepStr, &nid, &eid); err != nil { |
| fmt.Printf("Failed to scan name string: %v\n", err) |
| } |
| |
| if _, err := fmt.Sscan(string(u.Payload), &action, |
| &ipStr, &maskStr, &macStr); err != nil { |
| fmt.Printf("Failed to scan value string: %v\n", err) |
| } |
| |
| logrus.Debugf("Parsed data = %s/%s/%s/%s/%s/%s\n", nid, eid, vtepStr, ipStr, maskStr, macStr) |
| |
| mac, err := net.ParseMAC(macStr) |
| if err != nil { |
| logrus.Errorf("Failed to parse mac: %v\n", err) |
| } |
| |
| if d.serfInstance.LocalMember().Addr.String() == vtepStr { |
| return |
| } |
| |
| switch action { |
| case "join": |
| d.peerAdd(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, net.ParseIP(vtepStr), |
| true, false, false, false) |
| case "leave": |
| d.peerDelete(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, net.ParseIP(vtepStr), true) |
| } |
| } |
| |
| func (d *driver) processQuery(q *serf.Query) { |
| logrus.Debugf("Received query name:%s, payload:%s\n", q.Name, |
| string(q.Payload)) |
| |
| var nid, ipStr string |
| if _, err := fmt.Sscan(string(q.Payload), &nid, &ipStr); err != nil { |
| fmt.Printf("Failed to scan query payload string: %v\n", err) |
| } |
| |
| peerMac, peerIPMask, vtep, err := d.peerDbSearch(nid, net.ParseIP(ipStr)) |
| if err != nil { |
| return |
| } |
| |
| logrus.Debugf("Sending peer query resp mac %s, mask %s, vtep %s", peerMac, net.IP(peerIPMask), vtep) |
| q.Respond([]byte(fmt.Sprintf("%s %s %s", peerMac.String(), net.IP(peerIPMask).String(), vtep.String()))) |
| } |
| |
| func (d *driver) resolvePeer(nid string, peerIP net.IP) (net.HardwareAddr, net.IPMask, net.IP, error) { |
| if d.serfInstance == nil { |
| return nil, nil, nil, fmt.Errorf("could not resolve peer: serf instance not initialized") |
| } |
| |
| qPayload := fmt.Sprintf("%s %s", string(nid), peerIP.String()) |
| resp, err := d.serfInstance.Query("peerlookup", []byte(qPayload), nil) |
| if err != nil { |
| return nil, nil, nil, fmt.Errorf("resolving peer by querying the cluster failed: %v", err) |
| } |
| |
| respCh := resp.ResponseCh() |
| select { |
| case r := <-respCh: |
| var macStr, maskStr, vtepStr string |
| if _, err := fmt.Sscan(string(r.Payload), &macStr, &maskStr, &vtepStr); err != nil { |
| return nil, nil, nil, fmt.Errorf("bad response %q for the resolve query: %v", string(r.Payload), err) |
| } |
| |
| mac, err := net.ParseMAC(macStr) |
| if err != nil { |
| return nil, nil, nil, fmt.Errorf("failed to parse mac: %v", err) |
| } |
| |
| logrus.Debugf("Received peer query response, mac %s, vtep %s, mask %s", macStr, vtepStr, maskStr) |
| return mac, net.IPMask(net.ParseIP(maskStr).To4()), net.ParseIP(vtepStr), nil |
| |
| case <-time.After(time.Second): |
| return nil, nil, nil, fmt.Errorf("timed out resolving peer by querying the cluster") |
| } |
| } |
| |
| func (d *driver) startSerfLoop(eventCh chan serf.Event, notifyCh chan ovNotify, |
| exitCh chan chan struct{}) { |
| |
| for { |
| select { |
| case notify, ok := <-notifyCh: |
| if !ok { |
| break |
| } |
| |
| d.notifyEvent(notify) |
| case ch, ok := <-exitCh: |
| if !ok { |
| break |
| } |
| |
| if err := d.serfInstance.Leave(); err != nil { |
| logrus.Errorf("failed leaving the cluster: %v\n", err) |
| } |
| |
| d.serfInstance.Shutdown() |
| close(ch) |
| return |
| case e, ok := <-eventCh: |
| if !ok { |
| break |
| } |
| |
| if e.EventType() == serf.EventQuery { |
| d.processQuery(e.(*serf.Query)) |
| break |
| } |
| |
| u, ok := e.(serf.UserEvent) |
| if !ok { |
| break |
| } |
| d.processEvent(u) |
| } |
| } |
| } |
| |
| func (d *driver) isSerfAlive() bool { |
| d.Lock() |
| serfInstance := d.serfInstance |
| d.Unlock() |
| if serfInstance == nil || serfInstance.State() != serf.SerfAlive { |
| return false |
| } |
| return true |
| } |