| package utils |
| |
| import ( |
| "sync" |
| "time" |
| ) |
| |
| func NewJSONMessagePublisher() *JSONMessagePublisher { |
| return &JSONMessagePublisher{} |
| } |
| |
| type JSONMessageListener chan<- JSONMessage |
| |
| type JSONMessagePublisher struct { |
| m sync.RWMutex |
| subscribers []JSONMessageListener |
| } |
| |
| func (p *JSONMessagePublisher) Subscribe(l JSONMessageListener) { |
| p.m.Lock() |
| p.subscribers = append(p.subscribers, l) |
| p.m.Unlock() |
| } |
| |
| func (p *JSONMessagePublisher) SubscribersCount() int { |
| p.m.RLock() |
| count := len(p.subscribers) |
| p.m.RUnlock() |
| return count |
| } |
| |
| // Unsubscribe closes and removes the specified listener from the list of |
| // previously registed ones. |
| // It returns a boolean value indicating if the listener was successfully |
| // found, closed and unregistered. |
| func (p *JSONMessagePublisher) Unsubscribe(l JSONMessageListener) bool { |
| p.m.Lock() |
| defer p.m.Unlock() |
| |
| for i, subscriber := range p.subscribers { |
| if subscriber == l { |
| close(l) |
| p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...) |
| return true |
| } |
| } |
| return false |
| } |
| |
| func (p *JSONMessagePublisher) Publish(m JSONMessage) { |
| p.m.RLock() |
| for _, subscriber := range p.subscribers { |
| // We give each subscriber a 100ms time window to receive the event, |
| // after which we move to the next. |
| select { |
| case subscriber <- m: |
| case <-time.After(100 * time.Millisecond): |
| } |
| } |
| p.m.RUnlock() |
| } |