From 624fed69bb06db015fdac83da458e059bc2aff3e Mon Sep 17 00:00:00 2001 From: evilsocket Date: Sun, 11 Feb 2018 04:06:10 +0100 Subject: [PATCH] misc: small fix or general refactoring i did not bother commenting --- packets/queue.go | 127 ++++++++++++++++++++++++----------------------- 1 file changed, 66 insertions(+), 61 deletions(-) diff --git a/packets/queue.go b/packets/queue.go index 052e32cd..5611eaee 100644 --- a/packets/queue.go +++ b/packets/queue.go @@ -71,78 +71,82 @@ func NewQueue(iface *bnet.Endpoint) (*Queue, error) { return q, nil } +func (q *Queue) trackProtocols(pkt gopacket.Packet) { + // gather protocols stats + pktLayers := pkt.Layers() + for _, layer := range pktLayers { + proto := layer.LayerType().String() + if proto == "DecodeFailure" || proto == "Payload" || proto == "Ethernet" { + continue + } + + q.Lock() + if _, found := q.Protos[proto]; found == false { + q.Protos[proto] = 1 + } else { + q.Protos[proto] += 1 + } + q.Unlock() + } +} + +func (q *Queue) trackActivity(eth *layers.Ethernet, ip4 *layers.IPv4, address net.IP, pktSize uint64) { + q.Lock() + defer q.Unlock() + + // detrmine direction + isSent := bytes.Compare(address, ip4.SrcIP) == 0 + // push to activity channel + q.Activities <- Activity{ + IP: address, + MAC: eth.SrcMAC, + Source: isSent, + } + + // initialize or update stats + addr := address.String() + if _, found := q.Traffic[addr]; found == false { + if isSent { + q.Traffic[addr] = &Traffic{Sent: pktSize} + } else { + q.Traffic[addr] = &Traffic{Received: pktSize} + } + } else { + if isSent { + q.Traffic[addr].Sent += pktSize + } else { + q.Traffic[addr].Received += pktSize + } + } +} + func (q *Queue) worker() { for pkt := range q.source.Packets() { if q.active == false { return } + q.trackProtocols(pkt) + pktSize := uint64(len(pkt.Data())) atomic.AddUint64(&q.PktReceived, 1) atomic.AddUint64(&q.Received, pktSize) - // gather protocols stats - pktLayers := pkt.Layers() - for _, layer := range pktLayers { - proto := layer.LayerType().String() - if proto == "DecodeFailure" || proto == "Payload" || proto == "Ethernet" { - continue - } - - q.Lock() - if _, found := q.Protos[proto]; found == false { - q.Protos[proto] = 1 - } else { - q.Protos[proto] += 1 - } - q.Unlock() - } - - // check for new ipv4 endpoints + // decode eth and ipv4 layers leth := pkt.Layer(layers.LayerTypeEthernet) lip4 := pkt.Layer(layers.LayerTypeIPv4) - if leth != nil && lip4 != nil { eth := leth.(*layers.Ethernet) ip4 := lip4.(*layers.IPv4) + // coming from our network if bytes.Compare(q.iface.IP, ip4.SrcIP) != 0 && q.iface.Net.Contains(ip4.SrcIP) { - q.Lock() - q.Activities <- Activity{ - IP: ip4.SrcIP, - MAC: eth.SrcMAC, - Source: true, - } - - addr := ip4.SrcIP.String() - if _, found := q.Traffic[addr]; found == false { - q.Traffic[addr] = &Traffic{ - Sent: pktSize, - } - } else { - q.Traffic[addr].Sent += pktSize - } - q.Unlock() + q.trackActivity(eth, ip4, ip4.SrcIP, pktSize) } - + // coming to our network if bytes.Compare(q.iface.IP, ip4.DstIP) != 0 && q.iface.Net.Contains(ip4.DstIP) { - q.Lock() - q.Activities <- Activity{ - IP: ip4.DstIP, - MAC: eth.SrcMAC, - Source: false, - } - - addr := ip4.DstIP.String() - if _, found := q.Traffic[addr]; found == false { - q.Traffic[addr] = &Traffic{ - Received: pktSize, - } - } else { - q.Traffic[addr].Received += pktSize - } - q.Unlock() + q.trackActivity(eth, ip4, ip4.DstIP, pktSize) } } } @@ -152,17 +156,18 @@ func (q *Queue) Send(raw []byte) error { q.Lock() defer q.Unlock() - if q.active { - err := q.handle.WritePacketData(raw) - if err == nil { - q.Sent += uint64(len(raw)) - } else { - q.Errors += 1 - } - return err - } else { + if q.active == false { return fmt.Errorf("Packet queue is not active.") } + + if err := q.handle.WritePacketData(raw); err != nil { + q.Errors += 1 + return err + } else { + q.Sent += uint64(len(raw)) + } + + return nil } func (q *Queue) Stop() {