mirror of
https://github.com/bettercap/bettercap
synced 2025-07-16 10:03:39 -07:00
misc: decoupled session record loading to external package
This commit is contained in:
parent
b743b26dde
commit
126cb7febf
78 changed files with 2063 additions and 490 deletions
|
@ -10,6 +10,8 @@ import (
|
|||
"github.com/bettercap/bettercap/session"
|
||||
"github.com/bettercap/bettercap/tls"
|
||||
|
||||
"github.com/bettercap/recording"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/gorilla/websocket"
|
||||
|
||||
|
@ -35,7 +37,7 @@ type RestAPI struct {
|
|||
replaying bool
|
||||
recordFileName string
|
||||
recordWait *sync.WaitGroup
|
||||
record *Record
|
||||
record *recording.Archive
|
||||
recStarted time.Time
|
||||
recStopped time.Time
|
||||
}
|
||||
|
|
|
@ -7,6 +7,8 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/bettercap/recording"
|
||||
|
||||
"github.com/evilsocket/islazy/fs"
|
||||
)
|
||||
|
||||
|
@ -45,7 +47,7 @@ func (mod *RestAPI) recorder() {
|
|||
mod.recTime = 0
|
||||
mod.recording = true
|
||||
mod.replaying = false
|
||||
mod.record = NewRecord(mod.recordFileName, nil)
|
||||
mod.record = recording.New(mod.recordFileName)
|
||||
|
||||
mod.Info("started recording to %s (clock %s) ...", mod.recordFileName, clock)
|
||||
|
||||
|
|
|
@ -5,6 +5,8 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/bettercap/recording"
|
||||
|
||||
"github.com/evilsocket/islazy/fs"
|
||||
)
|
||||
|
||||
|
@ -38,7 +40,7 @@ func (mod *RestAPI) startReplay(filename string) (err error) {
|
|||
mod.Info("loading %s ...", mod.recordFileName)
|
||||
|
||||
start := time.Now()
|
||||
mod.record, err = LoadRecord(mod.recordFileName, func(progress float64) {
|
||||
mod.record, err = recording.Load(mod.recordFileName, func(progress float64, done int, total int) {
|
||||
mod.State.Store("load_progress", progress)
|
||||
})
|
||||
if err != nil {
|
||||
|
|
|
@ -1,296 +0,0 @@
|
|||
package api_rest
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/evilsocket/islazy/fs"
|
||||
"github.com/kr/binarydist"
|
||||
)
|
||||
|
||||
type patch []byte
|
||||
type frame []byte
|
||||
|
||||
type progressCallback func(done int)
|
||||
|
||||
type RecordEntry struct {
|
||||
sync.Mutex
|
||||
|
||||
Data []byte `json:"data"`
|
||||
Cur []byte `json:"-"`
|
||||
States []patch `json:"states"`
|
||||
NumStates int `json:"-"`
|
||||
CurState int `json:"-"`
|
||||
|
||||
frames []frame
|
||||
progress progressCallback
|
||||
}
|
||||
|
||||
func NewRecordEntry(progress progressCallback) *RecordEntry {
|
||||
return &RecordEntry{
|
||||
Data: nil,
|
||||
Cur: nil,
|
||||
States: make([]patch, 0),
|
||||
NumStates: 0,
|
||||
CurState: 0,
|
||||
frames: nil,
|
||||
progress: progress,
|
||||
}
|
||||
}
|
||||
|
||||
func (e *RecordEntry) AddState(state []byte) error {
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
|
||||
// set reference state
|
||||
if e.Data == nil {
|
||||
e.Data = state
|
||||
} else {
|
||||
// create a patch
|
||||
oldReader := bytes.NewReader(e.Cur)
|
||||
newReader := bytes.NewReader(state)
|
||||
writer := new(bytes.Buffer)
|
||||
|
||||
if err := binarydist.Diff(oldReader, newReader, writer); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
e.States = append(e.States, patch(writer.Bytes()))
|
||||
e.NumStates++
|
||||
e.CurState = 0
|
||||
}
|
||||
e.Cur = state
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *RecordEntry) Reset() {
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
e.Cur = e.Data
|
||||
e.NumStates = len(e.States)
|
||||
e.CurState = 0
|
||||
}
|
||||
|
||||
func (e *RecordEntry) Compile() error {
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
|
||||
// reset the state
|
||||
e.Cur = e.Data
|
||||
e.NumStates = len(e.States)
|
||||
e.CurState = 0
|
||||
e.frames = make([]frame, e.NumStates+1)
|
||||
|
||||
// first is the master frame
|
||||
e.frames[0] = frame(e.Data)
|
||||
// precompute frames so they can be accessed by index
|
||||
for i := 0; i < e.NumStates; i++ {
|
||||
patch := e.States[i]
|
||||
oldReader := bytes.NewReader(e.Cur)
|
||||
patchReader := bytes.NewReader(patch)
|
||||
newWriter := new(bytes.Buffer)
|
||||
|
||||
if err := binarydist.Patch(oldReader, newWriter, patchReader); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
e.Cur = newWriter.Bytes()
|
||||
e.frames[i+1] = e.Cur
|
||||
|
||||
e.progress(1)
|
||||
}
|
||||
|
||||
e.progress(1)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *RecordEntry) Frames() int {
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
// master + sub states
|
||||
return e.NumStates + 1
|
||||
}
|
||||
|
||||
func (e *RecordEntry) CurFrame() int {
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
return e.CurState + 1
|
||||
}
|
||||
|
||||
func (e *RecordEntry) SetFrom(from int) {
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
e.CurState = from
|
||||
}
|
||||
|
||||
func (e *RecordEntry) Over() bool {
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
return e.CurState > e.NumStates
|
||||
}
|
||||
|
||||
func (e *RecordEntry) Next() []byte {
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
cur := e.CurState
|
||||
e.CurState++
|
||||
return e.frames[cur]
|
||||
}
|
||||
|
||||
func (e *RecordEntry) TimeOf(idx int) time.Time {
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
|
||||
buf := e.frames[idx]
|
||||
frame := make(map[string]interface{})
|
||||
|
||||
if err := json.Unmarshal(buf, &frame); err != nil {
|
||||
fmt.Printf("%v\n", err)
|
||||
return time.Time{}
|
||||
} else if tm, err := time.Parse(time.RFC3339, frame["polled_at"].(string)); err != nil {
|
||||
fmt.Printf("%v\n", err)
|
||||
return time.Time{}
|
||||
} else {
|
||||
return tm
|
||||
}
|
||||
}
|
||||
|
||||
func (e *RecordEntry) StartedAt() time.Time {
|
||||
return e.TimeOf(0)
|
||||
}
|
||||
|
||||
func (e *RecordEntry) StoppedAt() time.Time {
|
||||
return e.TimeOf(e.NumStates)
|
||||
}
|
||||
|
||||
func (e *RecordEntry) Duration() time.Duration {
|
||||
return e.StoppedAt().Sub(e.StartedAt())
|
||||
}
|
||||
|
||||
type RecordLoadProgress func(p float64)
|
||||
|
||||
// the Record object represents a recorded session
|
||||
type Record struct {
|
||||
sync.Mutex
|
||||
|
||||
fileName string `json:"-"`
|
||||
done int `json:"-"`
|
||||
total int `json:"-"`
|
||||
progress float64 `json:"-"`
|
||||
onProgress RecordLoadProgress `json:"-"`
|
||||
Session *RecordEntry `json:"session"`
|
||||
Events *RecordEntry `json:"events"`
|
||||
}
|
||||
|
||||
func NewRecord(fileName string, cb RecordLoadProgress) *Record {
|
||||
r := &Record{
|
||||
fileName: fileName,
|
||||
onProgress: cb,
|
||||
}
|
||||
|
||||
r.Session = NewRecordEntry(r.onPartialProgress)
|
||||
r.Events = NewRecordEntry(r.onPartialProgress)
|
||||
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Record) onPartialProgress(done int) {
|
||||
r.done += done
|
||||
r.progress = float64(r.done) / float64(r.total) * 100.0
|
||||
r.onProgress(r.progress)
|
||||
}
|
||||
|
||||
func LoadRecord(fileName string, cb RecordLoadProgress) (*Record, error) {
|
||||
if !fs.Exists(fileName) {
|
||||
return nil, fmt.Errorf("%s does not exist", fileName)
|
||||
}
|
||||
|
||||
compressed, err := ioutil.ReadFile(fileName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error while reading %s: %s", fileName, err)
|
||||
}
|
||||
|
||||
decompress, err := gzip.NewReader(bytes.NewReader(compressed))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error while reading gzip file %s: %s", fileName, err)
|
||||
}
|
||||
defer decompress.Close()
|
||||
|
||||
raw, err := ioutil.ReadAll(decompress)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error while decompressing %s: %s", fileName, err)
|
||||
}
|
||||
|
||||
rec := &Record{}
|
||||
|
||||
decoder := json.NewDecoder(bytes.NewReader(raw))
|
||||
if err = decoder.Decode(rec); err != nil {
|
||||
return nil, fmt.Errorf("error while parsing %s: %s", fileName, err)
|
||||
}
|
||||
|
||||
rec.Session.NumStates = len(rec.Session.States)
|
||||
rec.Session.progress = rec.onPartialProgress
|
||||
rec.Events.NumStates = len(rec.Events.States)
|
||||
rec.Events.progress = rec.onPartialProgress
|
||||
rec.fileName = fileName
|
||||
rec.total = rec.Session.NumStates + rec.Events.NumStates + 2
|
||||
rec.progress = 0.0
|
||||
rec.done = 0
|
||||
rec.onProgress = cb
|
||||
|
||||
// reset state and precompute frames
|
||||
if err = rec.Session.Compile(); err != nil {
|
||||
return nil, err
|
||||
} else if err = rec.Events.Compile(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return rec, nil
|
||||
}
|
||||
|
||||
func (r *Record) NewState(session []byte, events []byte) error {
|
||||
if err := r.Session.AddState(session); err != nil {
|
||||
return err
|
||||
} else if err := r.Events.AddState(events); err != nil {
|
||||
return err
|
||||
}
|
||||
return r.Flush()
|
||||
}
|
||||
|
||||
func (r *Record) save() error {
|
||||
buf := new(bytes.Buffer)
|
||||
encoder := json.NewEncoder(buf)
|
||||
|
||||
if err := encoder.Encode(r); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
data := buf.Bytes()
|
||||
|
||||
compressed := new(bytes.Buffer)
|
||||
compress := gzip.NewWriter(compressed)
|
||||
|
||||
if _, err := compress.Write(data); err != nil {
|
||||
return err
|
||||
} else if err = compress.Flush(); err != nil {
|
||||
return err
|
||||
} else if err = compress.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return ioutil.WriteFile(r.fileName, compressed.Bytes(), os.ModePerm)
|
||||
}
|
||||
|
||||
func (r *Record) Flush() error {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
return r.save()
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue