| package raft |
| |
| import ( |
| "fmt" |
| "sync" |
| ) |
| |
| type waitItem struct { |
| // channel to wait up the waiter |
| ch chan interface{} |
| // callback which is called synchronously when the wait is triggered |
| cb func() |
| // callback which is called to cancel a waiter |
| cancel func() |
| } |
| |
| type wait struct { |
| l sync.Mutex |
| m map[uint64]waitItem |
| } |
| |
| func newWait() *wait { |
| return &wait{m: make(map[uint64]waitItem)} |
| } |
| |
| func (w *wait) register(id uint64, cb func(), cancel func()) <-chan interface{} { |
| w.l.Lock() |
| defer w.l.Unlock() |
| _, ok := w.m[id] |
| if !ok { |
| ch := make(chan interface{}, 1) |
| w.m[id] = waitItem{ch: ch, cb: cb, cancel: cancel} |
| return ch |
| } |
| panic(fmt.Sprintf("duplicate id %x", id)) |
| } |
| |
| func (w *wait) trigger(id uint64, x interface{}) bool { |
| w.l.Lock() |
| waitItem, ok := w.m[id] |
| delete(w.m, id) |
| w.l.Unlock() |
| if ok { |
| if waitItem.cb != nil { |
| waitItem.cb() |
| } |
| waitItem.ch <- x |
| return true |
| } |
| return false |
| } |
| |
| func (w *wait) cancel(id uint64) { |
| w.l.Lock() |
| waitItem, ok := w.m[id] |
| delete(w.m, id) |
| w.l.Unlock() |
| if ok { |
| if waitItem.cancel != nil { |
| waitItem.cancel() |
| } |
| close(waitItem.ch) |
| } |
| } |
| |
| func (w *wait) cancelAll() { |
| w.l.Lock() |
| defer w.l.Unlock() |
| |
| for id, waitItem := range w.m { |
| delete(w.m, id) |
| if waitItem.cancel != nil { |
| waitItem.cancel() |
| } |
| close(waitItem.ch) |
| } |
| } |