fix: fixed events buffering, now no event is lost with multiple listeners (fixes #266)

This commit is contained in:
evilsocket 2018-05-12 12:16:15 +02:00
parent e9fa015962
commit 9f0b4a9502
No known key found for this signature in database
GPG key ID: 1564D7F30393A456
3 changed files with 25 additions and 21 deletions

View file

@ -24,7 +24,6 @@ type RestAPI struct {
keyFile string keyFile string
useWebsocket bool useWebsocket bool
upgrader websocket.Upgrader upgrader websocket.Upgrader
eventListener <-chan session.Event
quit chan bool quit chan bool
} }
@ -34,7 +33,6 @@ func NewRestAPI(s *session.Session) *RestAPI {
server: &http.Server{}, server: &http.Server{},
quit: make(chan bool), quit: make(chan bool),
useWebsocket: false, useWebsocket: false,
eventListener: s.Events.Listen(),
upgrader: websocket.Upgrader{ upgrader: websocket.Upgrader{
ReadBufferSize: 1024, ReadBufferSize: 1024,
WriteBufferSize: 1024, WriteBufferSize: 1024,

View file

@ -68,6 +68,8 @@ func (api *RestAPI) streamWriter(ws *websocket.Conn, w http.ResponseWriter, r *h
log.Debug("Listening for events and streaming to ws endpoint ...") log.Debug("Listening for events and streaming to ws endpoint ...")
pingTicker := time.NewTicker(pingPeriod) pingTicker := time.NewTicker(pingPeriod)
listener := session.I.Events.Listen()
defer session.I.Events.Unlisten(listener)
for { for {
select { select {
@ -75,7 +77,7 @@ func (api *RestAPI) streamWriter(ws *websocket.Conn, w http.ResponseWriter, r *h
if err := api.sendPing(ws); err != nil { if err := api.sendPing(ws); err != nil {
return return
} }
case event := <-api.eventListener: case event := <-listener:
if err := api.streamEvent(ws, event); err != nil { if err := api.streamEvent(ws, event); err != nil {
return return
} }

View file

@ -62,6 +62,19 @@ func (p *EventPool) Listen() <-chan Event {
return l return l
} }
func (p *EventPool) Unlisten(listener <-chan Event) {
p.Lock()
defer p.Unlock()
for i, l := range p.listeners {
if l == listener {
close(l)
p.listeners = append(p.listeners[:i], p.listeners[i+1:]...)
return
}
}
}
func (p *EventPool) SetSilent(s bool) { func (p *EventPool) SetSilent(s bool) {
p.Lock() p.Lock()
defer p.Unlock() defer p.Unlock()
@ -83,16 +96,7 @@ func (p *EventPool) Add(tag string, data interface{}) {
// broadcast the event to every listener // broadcast the event to every listener
for _, l := range p.listeners { for _, l := range p.listeners {
select { l <- e
case l <- e:
// NOTE: Without this 'default', errors in sending the event
// to the listener would not empty the channel, therefore
// all operations would be stuck at some point (after the first
// event if not buffered or after the first N events if buffered)
//
// See https://github.com/bettercap/bettercap/issues/198
default:
}
} }
} }