diff --git a/modules/api_rest.go b/modules/api_rest.go index 3033a488..28c504f7 100644 --- a/modules/api_rest.go +++ b/modules/api_rest.go @@ -33,6 +33,7 @@ func NewRestAPI(s *session.Session) *RestAPI { server: &http.Server{}, quit: make(chan bool), useWebsocket: false, + eventListener: s.Events.Listen(), upgrader: websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, diff --git a/modules/api_rest_controller.go b/modules/api_rest_controller.go index d6eee401..53480ea8 100644 --- a/modules/api_rest_controller.go +++ b/modules/api_rest_controller.go @@ -5,6 +5,7 @@ import ( "encoding/json" "net/http" "strconv" + "strings" "time" "github.com/bettercap/bettercap/log" @@ -81,6 +82,33 @@ func (api *RestAPI) runSessionCommand(w http.ResponseWriter, r *http.Request) { } } +func (api *RestAPI) streamEvent(ws *websocket.Conn, event session.Event) error { + msg, err := json.Marshal(event) + if err != nil { + log.Error("Error while creating websocket message: %s", err) + return err + } + + ws.SetWriteDeadline(time.Now().Add(writeWait)) + if err := ws.WriteMessage(websocket.TextMessage, msg); err != nil { + if !strings.Contains(err.Error(), "closed connection") { + log.Error("Error while writing websocket message: %s", err) + return err + } + } + + return nil +} + +func (api *RestAPI) sendPing(ws *websocket.Conn) error { + ws.SetWriteDeadline(time.Now().Add(writeWait)) + if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil { + log.Error("Error while writing websocket ping message: %s", err) + return err + } + return nil +} + func (api *RestAPI) streamWriter(ws *websocket.Conn, w http.ResponseWriter, r *http.Request) { defer ws.Close() @@ -88,17 +116,9 @@ func (api *RestAPI) streamWriter(ws *websocket.Conn, w http.ResponseWriter, r *h events := session.I.Events.Sorted() n := len(events) if n > 0 { - log.Info("Sending %d events.", n) + log.Debug("Sending %d events.", n) for _, event := range events { - msg, err := json.Marshal(event) - if err != nil { - log.Error("Error while creating websocket message: %s", err) - return - } - - ws.SetWriteDeadline(time.Now().Add(writeWait)) - if err := ws.WriteMessage(websocket.TextMessage, msg); err != nil { - log.Error("Error while writing websocket message: %s", err) + if err := api.streamEvent(ws, event); err != nil { return } } @@ -106,38 +126,22 @@ func (api *RestAPI) streamWriter(ws *websocket.Conn, w http.ResponseWriter, r *h session.I.Events.Clear() - log.Info("Listening for events and streaming to ws endpoint ...") - - api.eventListener = api.Session.Events.Listen() + log.Debug("Listening for events and streaming to ws endpoint ...") pingTicker := time.NewTicker(pingPeriod) for { select { case <-pingTicker.C: - ws.SetWriteDeadline(time.Now().Add(writeWait)) - log.Info("Ping") - if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil { - log.Error("Error while writing websocket ping message: %s", err) + if err := api.sendPing(ws); err != nil { return } case event := <-api.eventListener: - log.Info("Event") - msg, err := json.Marshal(event) - if err != nil { - log.Error("Error while creating websocket message: %s", err) - continue - } - - ws.SetWriteDeadline(time.Now().Add(writeWait)) - if err := ws.WriteMessage(websocket.TextMessage, msg); err != nil { - log.Error("Error while writing websocket message: %s", err) + if err := api.streamEvent(ws, event); err != nil { return } - log.Info("Sent") - case <-api.quit: - log.Info("Quit") + log.Info("Stopping websocket events streamer ...") return } } @@ -151,7 +155,7 @@ func (api *RestAPI) streamReader(ws *websocket.Conn) { for { _, _, err := ws.ReadMessage() if err != nil { - log.Info("Closing reader.") + log.Debug("Closing websocket reader.") break } } @@ -169,7 +173,7 @@ func (api *RestAPI) showEvents(w http.ResponseWriter, r *http.Request) { return } - log.Info("Websocket streaming started for %s", r.RemoteAddr) + log.Debug("Websocket streaming started for %s", r.RemoteAddr) go api.streamWriter(ws, w, r) api.streamReader(ws) diff --git a/network/ble_device.go b/network/ble_device.go index 50bf097a..40a72199 100644 --- a/network/ble_device.go +++ b/network/ble_device.go @@ -4,6 +4,7 @@ package network import ( + "encoding/json" "time" "github.com/bettercap/gatt" @@ -11,10 +12,18 @@ import ( type BLEDevice struct { LastSeen time.Time - Device gatt.Peripheral Vendor string - Advertisement *gatt.Advertisement RSSI int + Device gatt.Peripheral + Advertisement *gatt.Advertisement +} + +type bleDeviceJSON struct { + LastSeen time.Time `json:"last_seen"` + Name string `json:"name"` + MAC string `json:"mac"` + Vendor string `json:"vendor"` + RSSI int `json:"rssi"` } func NewBLEDevice(p gatt.Peripheral, a *gatt.Advertisement, rssi int) *BLEDevice { @@ -26,3 +35,15 @@ func NewBLEDevice(p gatt.Peripheral, a *gatt.Advertisement, rssi int) *BLEDevice RSSI: rssi, } } + +func (d *BLEDevice) MarshalJSON() ([]byte, error) { + doc := bleDeviceJSON{ + LastSeen: d.LastSeen, + Name: d.Device.Name(), + MAC: d.Device.ID(), + Vendor: d.Vendor, + RSSI: d.RSSI, + } + + return json.Marshal(doc) +} diff --git a/session/events.go b/session/events.go index cfd4f605..24fe1899 100644 --- a/session/events.go +++ b/session/events.go @@ -57,7 +57,7 @@ func NewEventPool(debug bool, silent bool) *EventPool { func (p *EventPool) Listen() <-chan Event { p.Lock() defer p.Unlock() - l := make(chan Event) + l := make(chan Event, 1) p.listeners = append(p.listeners, l) return l } @@ -77,12 +77,16 @@ func (p *EventPool) SetDebug(d bool) { func (p *EventPool) Add(tag string, data interface{}) { p.Lock() defer p.Unlock() + e := NewEvent(tag, data) p.events = append([]Event{e}, p.events...) // broadcast the event to every listener for _, l := range p.listeners { - l <- e + select { + case l <- e: + default: + } } }