diff --git a/integration/test.sh b/integration/test.sh index 707a613c6..5045fb315 100755 --- a/integration/test.sh +++ b/integration/test.sh @@ -30,7 +30,7 @@ EOT mkdir files-$i pushd files-$i >/dev/null - ../genfiles -maxexp 21 -files 4000 + ../genfiles -maxexp 21 -files 400 ../md5r > ../md5-$i popd >/dev/null done diff --git a/main.go b/main.go index 68f326844..3d8b29232 100644 --- a/main.go +++ b/main.go @@ -58,6 +58,7 @@ type AdvancedOptions struct { LimitRate int `long:"send-rate" description:"Rate limit for outgoing data" default:"0" value-name:"KBPS"` ScanInterval time.Duration `long:"scan-intv" description:"Repository scan interval" default:"60s" value-name:"INTV"` ConnInterval time.Duration `long:"conn-intv" description:"Node reconnect interval" default:"60s" value-name:"INTV"` + MaxChangeBW int `long:"max-change-bw" description:"Max change bandwidth per file" default:"1e6" value-name:"MB/s"` } var opts Options @@ -166,7 +167,7 @@ func main() { } ensureDir(dir, -1) - m := model.NewModel(dir) + m := model.NewModel(dir, opts.Advanced.MaxChangeBW) for _, t := range opts.Debug.TraceModel { m.Trace(t) } diff --git a/model/model.go b/model/model.go index 7fafd53f5..0b710eff7 100644 --- a/model/model.go +++ b/model/model.go @@ -46,9 +46,7 @@ type Model struct { trace map[string]bool - fileLastChanged map[string]time.Time - fileWasSuppressed map[string]int - fmut sync.Mutex // protects fileLastChanged and fileWasSuppressed + sup suppressor parallellRequests int limitRequestRate chan struct{} @@ -79,20 +77,19 @@ var ( // NewModel creates and starts a new model. The model starts in read-only mode, // where it sends index information to connected peers and responds to requests // for file data without altering the local repository in any way. -func NewModel(dir string) *Model { +func NewModel(dir string, maxChangeBw int) *Model { m := &Model{ - dir: dir, - global: make(map[string]File), - local: make(map[string]File), - remote: make(map[string]map[string]File), - protoConn: make(map[string]Connection), - rawConn: make(map[string]io.Closer), - lastIdxBcast: time.Now(), - trace: make(map[string]bool), - fileLastChanged: make(map[string]time.Time), - fileWasSuppressed: make(map[string]int), - fq: NewFileQueue(), - dq: make(chan File), + dir: dir, + global: make(map[string]File), + local: make(map[string]File), + remote: make(map[string]map[string]File), + protoConn: make(map[string]Connection), + rawConn: make(map[string]io.Closer), + lastIdxBcast: time.Now(), + trace: make(map[string]bool), + sup: suppressor{threshold: int64(maxChangeBw)}, + fq: NewFileQueue(), + dq: make(chan File), } go m.broadcastIndexLoop() @@ -391,7 +388,6 @@ func (m *Model) Request(nodeID, name string, offset int64, size uint32, hash []b } // ReplaceLocal replaces the local repository index with the given list of files. -// Change suppression is applied to files changing too often. func (m *Model) ReplaceLocal(fs []File) { var updated bool var newLocal = make(map[string]File) @@ -512,30 +508,6 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) { } } -func (m *Model) shouldSuppressChange(name string) bool { - m.fmut.Lock() - sup := shouldSuppressChange(m.fileLastChanged[name], m.fileWasSuppressed[name]) - if sup { - m.fileWasSuppressed[name]++ - } else { - m.fileWasSuppressed[name] = 0 - m.fileLastChanged[name] = time.Now() - } - m.fmut.Unlock() - return sup -} - -func shouldSuppressChange(lastChange time.Time, numChanges int) bool { - sinceLast := time.Since(lastChange) - if sinceLast > maxFileHoldTimeS*time.Second { - return false - } - if sinceLast < time.Duration((numChanges+2)*minFileHoldTimeS)*time.Second { - return true - } - return false -} - // ProtocolIndex returns the current local index in protocol data types. // Must be called with the read lock held. func (m *Model) ProtocolIndex() []protocol.FileInfo { diff --git a/model/model_test.go b/model/model_test.go index 8262891a3..8e9c0496f 100644 --- a/model/model_test.go +++ b/model/model_test.go @@ -356,31 +356,6 @@ func TestRequest(t *testing.T) { } } -func TestSuppression(t *testing.T) { - var testdata = []struct { - lastChange time.Time - hold int - result bool - }{ - {time.Unix(0, 0), 0, false}, // First change - {time.Now().Add(-1 * time.Second), 0, true}, // Changed once one second ago, suppress - {time.Now().Add(-119 * time.Second), 0, true}, // Changed once 119 seconds ago, suppress - {time.Now().Add(-121 * time.Second), 0, false}, // Changed once 121 seconds ago, permit - - {time.Now().Add(-179 * time.Second), 1, true}, // Suppressed once 179 seconds ago, suppress again - {time.Now().Add(-181 * time.Second), 1, false}, // Suppressed once 181 seconds ago, permit - - {time.Now().Add(-599 * time.Second), 99, true}, // Suppressed lots of times, last allowed 599 seconds ago, suppress again - {time.Now().Add(-601 * time.Second), 99, false}, // Suppressed lots of times, last allowed 601 seconds ago, permit - } - - for i, tc := range testdata { - if shouldSuppressChange(tc.lastChange, tc.hold) != tc.result { - t.Errorf("Incorrect result for test #%d: %v", i, tc) - } - } -} - func TestIgnoreWithUnknownFlags(t *testing.T) { m := NewModel("testdata") fs, _ := m.Walk(false) diff --git a/model/suppressor.go b/model/suppressor.go new file mode 100644 index 000000000..0eb43963c --- /dev/null +++ b/model/suppressor.go @@ -0,0 +1,72 @@ +package model + +import ( + "sync" + "time" +) + +const ( + MAX_CHANGE_HISTORY = 4 +) + +type change struct { + size int64 + when time.Time +} + +type changeHistory struct { + changes []change + next int64 + prevSup bool +} + +type suppressor struct { + sync.Mutex + changes map[string]changeHistory + threshold int64 // bytes/s +} + +func (h changeHistory) bandwidth(t time.Time) int64 { + if len(h.changes) == 0 { + return 0 + } + + var t0 = h.changes[0].when + if t == t0 { + return 0 + } + + var bw float64 + for _, c := range h.changes { + bw += float64(c.size) + } + return int64(bw / t.Sub(t0).Seconds()) +} + +func (h *changeHistory) append(size int64, t time.Time) { + c := change{size, t} + if len(h.changes) == MAX_CHANGE_HISTORY { + h.changes = h.changes[1:MAX_CHANGE_HISTORY] + } + h.changes = append(h.changes, c) +} + +func (s *suppressor) suppress(name string, size int64, t time.Time) (bool, bool) { + s.Lock() + + if s.changes == nil { + s.changes = make(map[string]changeHistory) + } + h := s.changes[name] + sup := h.bandwidth(t) > s.threshold + prevSup := h.prevSup + h.prevSup = sup + if !sup { + h.append(size, t) + } + s.changes[name] = h + + s.Unlock() + + return sup, prevSup +} diff --git a/model/suppressor_test.go b/model/suppressor_test.go new file mode 100644 index 000000000..58fc370f0 --- /dev/null +++ b/model/suppressor_test.go @@ -0,0 +1,113 @@ +package model + +import ( + "testing" + "time" +) + +func TestSuppressor(t *testing.T) { + s := suppressor{threshold: 10000} + t0 := time.Now() + + t1 := t0 + sup, prev := s.suppress("foo", 10000, t1) + if sup { + t.Fatal("Never suppress first change") + } + if prev { + t.Fatal("Incorrect prev status") + } + + // bw is 10000 / 10 = 1000 + t1 = t0.Add(10 * time.Second) + if bw := s.changes["foo"].bandwidth(t1); bw != 1000 { + t.Error("Incorrect bw %d", bw) + } + sup, prev = s.suppress("foo", 10000, t1) + if sup { + t.Fatal("Should still be fine") + } + if prev { + t.Fatal("Incorrect prev status") + } + + // bw is (10000 + 10000) / 11 = 1818 + t1 = t0.Add(11 * time.Second) + if bw := s.changes["foo"].bandwidth(t1); bw != 1818 { + t.Error("Incorrect bw %d", bw) + } + sup, prev = s.suppress("foo", 100500, t1) + if sup { + t.Fatal("Should still be fine") + } + if prev { + t.Fatal("Incorrect prev status") + } + + // bw is (10000 + 10000 + 100500) / 12 = 10041 + t1 = t0.Add(12 * time.Second) + if bw := s.changes["foo"].bandwidth(t1); bw != 10041 { + t.Error("Incorrect bw %d", bw) + } + sup, prev = s.suppress("foo", 10000000, t1) // value will be ignored + if !sup { + t.Fatal("Should be over threshold") + } + if prev { + t.Fatal("Incorrect prev status") + } + + // bw is (10000 + 10000 + 100500) / 15 = 8033 + t1 = t0.Add(15 * time.Second) + if bw := s.changes["foo"].bandwidth(t1); bw != 8033 { + t.Error("Incorrect bw %d", bw) + } + sup, prev = s.suppress("foo", 10000000, t1) + if sup { + t.Fatal("Should be Ok") + } + if !prev { + t.Fatal("Incorrect prev status") + } +} + +func TestHistory(t *testing.T) { + h := changeHistory{} + + t0 := time.Now() + h.append(40, t0) + + if l := len(h.changes); l != 1 { + t.Errorf("Incorrect history length %d", l) + } + if s := h.changes[0].size; s != 40 { + t.Errorf("Incorrect first record size %d", s) + } + + for i := 1; i < MAX_CHANGE_HISTORY; i++ { + h.append(int64(40+i), t0.Add(time.Duration(i)*time.Second)) + } + + if l := len(h.changes); l != MAX_CHANGE_HISTORY { + t.Errorf("Incorrect history length %d", l) + } + if s := h.changes[0].size; s != 40 { + t.Errorf("Incorrect first record size %d", s) + } + if s := h.changes[MAX_CHANGE_HISTORY-1].size; s != 40+MAX_CHANGE_HISTORY-1 { + t.Errorf("Incorrect last record size %d", s) + } + + h.append(999, t0.Add(time.Duration(999)*time.Second)) + + if l := len(h.changes); l != MAX_CHANGE_HISTORY { + t.Errorf("Incorrect history length %d", l) + } + if s := h.changes[0].size; s != 41 { + t.Errorf("Incorrect first record size %d", s) + } + if s := h.changes[MAX_CHANGE_HISTORY-1].size; s != 999 { + t.Errorf("Incorrect last record size %d", s) + } + +} diff --git a/model/walk.go b/model/walk.go index 1898c8752..1806e2884 100644 --- a/model/walk.go +++ b/model/walk.go @@ -126,9 +126,12 @@ func (m *Model) walkAndHashFiles(res *[]File, ign map[string][]string) filepath. } *res = append(*res, lf) } else { - if m.shouldSuppressChange(rn) { + if cur, prev := m.sup.suppress(rn, info.Size(), time.Now()); cur { if m.trace["file"] { - log.Println("FILE: SUPPRESS:", rn, m.fileWasSuppressed[rn], time.Since(m.fileLastChanged[rn])) + log.Printf("FILE: SUPPRESS: %q change bw over threshold", rn) + } + if !prev { + log.Printf("INFO: Changes to %q are being temporarily suppressed because it changes too frequently.", rn) } if ok { @@ -137,6 +140,8 @@ func (m *Model) walkAndHashFiles(res *[]File, ign map[string][]string) filepath. *res = append(*res, lf) } return nil + } else if prev && !cur { + log.Printf("INFO: Changes to %q are no longer suppressed.", rn) } if m.trace["file"] {