| package zk |
| |
| import ( |
| "bufio" |
| "bytes" |
| "fmt" |
| "io/ioutil" |
| "math/big" |
| "net" |
| "regexp" |
| "strconv" |
| "time" |
| ) |
| |
| // FLWSrvr is a FourLetterWord helper function. In particular, this function pulls the srvr output |
| // from the zookeeper instances and parses the output. A slice of *ServerStats structs are returned |
| // as well as a boolean value to indicate whether this function processed successfully. |
| // |
| // If the boolean value is false there was a problem. If the *ServerStats slice is empty or nil, |
| // then the error happened before we started to obtain 'srvr' values. Otherwise, one of the |
| // servers had an issue and the "Error" value in the struct should be inspected to determine |
| // which server had the issue. |
| func FLWSrvr(servers []string, timeout time.Duration) ([]*ServerStats, bool) { |
| // different parts of the regular expression that are required to parse the srvr output |
| var ( |
| zrVer = `^Zookeeper version: ([A-Za-z0-9\.\-]+), built on (\d\d/\d\d/\d\d\d\d \d\d:\d\d [A-Za-z0-9:\+\-]+)` |
| zrLat = `^Latency min/avg/max: (\d+)/(\d+)/(\d+)` |
| zrNet = `^Received: (\d+).*\n^Sent: (\d+).*\n^Connections: (\d+).*\n^Outstanding: (\d+)` |
| zrState = `^Zxid: (0x[A-Za-z0-9]+).*\n^Mode: (\w+).*\n^Node count: (\d+)` |
| ) |
| |
| // build the regex from the pieces above |
| re, err := regexp.Compile(fmt.Sprintf(`(?m:\A%v.*\n%v.*\n%v.*\n%v)`, zrVer, zrLat, zrNet, zrState)) |
| |
| if err != nil { |
| return nil, false |
| } |
| |
| imOk := true |
| servers = FormatServers(servers) |
| ss := make([]*ServerStats, len(servers)) |
| |
| for i := range ss { |
| response, err := fourLetterWord(servers[i], "srvr", timeout) |
| |
| if err != nil { |
| ss[i] = &ServerStats{Error: err} |
| imOk = false |
| continue |
| } |
| |
| match := re.FindAllStringSubmatch(string(response), -1)[0][1:] |
| |
| if match == nil { |
| err := fmt.Errorf("unable to parse fields from zookeeper response (no regex matches)") |
| ss[i] = &ServerStats{Error: err} |
| imOk = false |
| continue |
| } |
| |
| // determine current server |
| var srvrMode Mode |
| switch match[10] { |
| case "leader": |
| srvrMode = ModeLeader |
| case "follower": |
| srvrMode = ModeFollower |
| case "standalone": |
| srvrMode = ModeStandalone |
| default: |
| srvrMode = ModeUnknown |
| } |
| |
| buildTime, err := time.Parse("01/02/2006 15:04 MST", match[1]) |
| |
| if err != nil { |
| ss[i] = &ServerStats{Error: err} |
| imOk = false |
| continue |
| } |
| |
| parsedInt, err := strconv.ParseInt(match[9], 0, 64) |
| |
| if err != nil { |
| ss[i] = &ServerStats{Error: err} |
| imOk = false |
| continue |
| } |
| |
| // the ZxID value is an int64 with two int32s packed inside |
| // the high int32 is the epoch (i.e., number of leader elections) |
| // the low int32 is the counter |
| epoch := int32(parsedInt >> 32) |
| counter := int32(parsedInt & 0xFFFFFFFF) |
| |
| // within the regex above, these values must be numerical |
| // so we can avoid useless checking of the error return value |
| minLatency, _ := strconv.ParseInt(match[2], 0, 64) |
| avgLatency, _ := strconv.ParseInt(match[3], 0, 64) |
| maxLatency, _ := strconv.ParseInt(match[4], 0, 64) |
| recv, _ := strconv.ParseInt(match[5], 0, 64) |
| sent, _ := strconv.ParseInt(match[6], 0, 64) |
| cons, _ := strconv.ParseInt(match[7], 0, 64) |
| outs, _ := strconv.ParseInt(match[8], 0, 64) |
| ncnt, _ := strconv.ParseInt(match[11], 0, 64) |
| |
| ss[i] = &ServerStats{ |
| Sent: sent, |
| Received: recv, |
| NodeCount: ncnt, |
| MinLatency: minLatency, |
| AvgLatency: avgLatency, |
| MaxLatency: maxLatency, |
| Connections: cons, |
| Outstanding: outs, |
| Epoch: epoch, |
| Counter: counter, |
| BuildTime: buildTime, |
| Mode: srvrMode, |
| Version: match[0], |
| } |
| } |
| |
| return ss, imOk |
| } |
| |
| // FLWRuok is a FourLetterWord helper function. In particular, this function |
| // pulls the ruok output from each server. |
| func FLWRuok(servers []string, timeout time.Duration) []bool { |
| servers = FormatServers(servers) |
| oks := make([]bool, len(servers)) |
| |
| for i := range oks { |
| response, err := fourLetterWord(servers[i], "ruok", timeout) |
| |
| if err != nil { |
| continue |
| } |
| |
| if bytes.Equal(response[:4], []byte("imok")) { |
| oks[i] = true |
| } |
| } |
| return oks |
| } |
| |
| // FLWCons is a FourLetterWord helper function. In particular, this function |
| // pulls the ruok output from each server. |
| // |
| // As with FLWSrvr, the boolean value indicates whether one of the requests had |
| // an issue. The Clients struct has an Error value that can be checked. |
| func FLWCons(servers []string, timeout time.Duration) ([]*ServerClients, bool) { |
| var ( |
| zrAddr = `^ /((?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?):(?:\d+))\[\d+\]` |
| zrPac = `\(queued=(\d+),recved=(\d+),sent=(\d+),sid=(0x[A-Za-z0-9]+),lop=(\w+),est=(\d+),to=(\d+),` |
| zrSesh = `lcxid=(0x[A-Za-z0-9]+),lzxid=(0x[A-Za-z0-9]+),lresp=(\d+),llat=(\d+),minlat=(\d+),avglat=(\d+),maxlat=(\d+)\)` |
| ) |
| |
| re, err := regexp.Compile(fmt.Sprintf("%v%v%v", zrAddr, zrPac, zrSesh)) |
| |
| if err != nil { |
| return nil, false |
| } |
| |
| servers = FormatServers(servers) |
| sc := make([]*ServerClients, len(servers)) |
| imOk := true |
| |
| for i := range sc { |
| response, err := fourLetterWord(servers[i], "cons", timeout) |
| |
| if err != nil { |
| sc[i] = &ServerClients{Error: err} |
| imOk = false |
| continue |
| } |
| |
| scan := bufio.NewScanner(bytes.NewReader(response)) |
| |
| var clients []*ServerClient |
| |
| for scan.Scan() { |
| line := scan.Bytes() |
| |
| if len(line) == 0 { |
| continue |
| } |
| |
| m := re.FindAllStringSubmatch(string(line), -1) |
| |
| if m == nil { |
| err := fmt.Errorf("unable to parse fields from zookeeper response (no regex matches)") |
| sc[i] = &ServerClients{Error: err} |
| imOk = false |
| continue |
| } |
| |
| match := m[0][1:] |
| |
| queued, _ := strconv.ParseInt(match[1], 0, 64) |
| recvd, _ := strconv.ParseInt(match[2], 0, 64) |
| sent, _ := strconv.ParseInt(match[3], 0, 64) |
| sid, _ := strconv.ParseInt(match[4], 0, 64) |
| est, _ := strconv.ParseInt(match[6], 0, 64) |
| timeout, _ := strconv.ParseInt(match[7], 0, 32) |
| lresp, _ := strconv.ParseInt(match[10], 0, 64) |
| llat, _ := strconv.ParseInt(match[11], 0, 32) |
| minlat, _ := strconv.ParseInt(match[12], 0, 32) |
| avglat, _ := strconv.ParseInt(match[13], 0, 32) |
| maxlat, _ := strconv.ParseInt(match[14], 0, 32) |
| |
| // zookeeper returns a value, '0xffffffffffffffff', as the |
| // Lzxid for PING requests in the 'cons' output. |
| // unfortunately, in Go that is an invalid int64 and is not represented |
| // as -1. |
| // However, converting the string value to a big.Int and then back to |
| // and int64 properly sets the value to -1 |
| lzxid, ok := new(big.Int).SetString(match[9], 0) |
| |
| var errVal error |
| |
| if !ok { |
| errVal = fmt.Errorf("failed to convert lzxid value to big.Int") |
| imOk = false |
| } |
| |
| lcxid, ok := new(big.Int).SetString(match[8], 0) |
| |
| if !ok && errVal == nil { |
| errVal = fmt.Errorf("failed to convert lcxid value to big.Int") |
| imOk = false |
| } |
| |
| clients = append(clients, &ServerClient{ |
| Queued: queued, |
| Received: recvd, |
| Sent: sent, |
| SessionID: sid, |
| Lcxid: lcxid.Int64(), |
| Lzxid: lzxid.Int64(), |
| Timeout: int32(timeout), |
| LastLatency: int32(llat), |
| MinLatency: int32(minlat), |
| AvgLatency: int32(avglat), |
| MaxLatency: int32(maxlat), |
| Established: time.Unix(est, 0), |
| LastResponse: time.Unix(lresp, 0), |
| Addr: match[0], |
| LastOperation: match[5], |
| Error: errVal, |
| }) |
| } |
| |
| sc[i] = &ServerClients{Clients: clients} |
| } |
| |
| return sc, imOk |
| } |
| |
| func fourLetterWord(server, command string, timeout time.Duration) ([]byte, error) { |
| conn, err := net.DialTimeout("tcp", server, timeout) |
| |
| if err != nil { |
| return nil, err |
| } |
| |
| // the zookeeper server should automatically close this socket |
| // once the command has been processed, but better safe than sorry |
| defer conn.Close() |
| |
| conn.SetWriteDeadline(time.Now().Add(timeout)) |
| |
| _, err = conn.Write([]byte(command)) |
| |
| if err != nil { |
| return nil, err |
| } |
| |
| conn.SetReadDeadline(time.Now().Add(timeout)) |
| |
| resp, err := ioutil.ReadAll(conn) |
| |
| if err != nil { |
| return nil, err |
| } |
| |
| return resp, nil |
| } |