From 21df0e50e5320b7c6c3d4bcc1f8f4f4aad77687c Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 23 Nov 2014 09:22:18 +0100 Subject: [PATCH 1/2] Refactor stats * channel instead of callback func * cumulate Stats for slow receivers --- archiver.go | 71 ++++++++++++++++++++++++++++++---------- cmd/khepri/cmd_backup.go | 46 ++++++++++++++++++-------- 2 files changed, 85 insertions(+), 32 deletions(-) diff --git a/archiver.go b/archiver.go index 1ce7180bc..0bd35b8e2 100644 --- a/archiver.go +++ b/archiver.go @@ -6,6 +6,7 @@ import ( "os" "path/filepath" "sync" + "time" "github.com/fd0/khepri/backend" "github.com/fd0/khepri/chunker" @@ -15,6 +16,8 @@ import ( const ( maxConcurrentFiles = 32 maxConcurrentBlobs = 32 + + statTimeout = 20 * time.Millisecond ) type Archiver struct { @@ -32,10 +35,11 @@ type Archiver struct { Error func(dir string, fi os.FileInfo, err error) error Filter func(item string, fi os.FileInfo) bool - ScannerUpdate func(stats Stats) - SaveUpdate func(stats Stats) + ScannerStats chan Stats + SaveStats chan Stats - sum sync.Mutex // for SaveUpdate + statsMutex sync.Mutex + updateStats Stats } type Stats struct { @@ -45,6 +49,13 @@ type Stats struct { Bytes uint64 } +func (s *Stats) Add(other Stats) { + s.Bytes += other.Bytes + s.Directories += other.Directories + s.Files += other.Files + s.Other += other.Other +} + func NewArchiver(be backend.Server, key *Key) (*Archiver, error) { var err error arch := &Archiver{ @@ -67,8 +78,6 @@ func NewArchiver(be backend.Server, key *Key) (*Archiver, error) { arch.Error = func(string, os.FileInfo, error) error { return err } // allow all files arch.Filter = func(string, os.FileInfo) bool { return true } - // do nothing - arch.ScannerUpdate = func(Stats) {} arch.bl = NewBlobList() arch.ch, err = NewContentHandler(be, key) @@ -85,11 +94,31 @@ func NewArchiver(be backend.Server, key *Key) (*Archiver, error) { return arch, nil } -func (arch *Archiver) saveUpdate(stats Stats) { - if arch.SaveUpdate != nil { - arch.sum.Lock() - defer arch.sum.Unlock() - arch.SaveUpdate(stats) +func (arch *Archiver) update(ch chan Stats, stats Stats) { + if ch == nil { + return + } + + // load old stats from global state + arch.statsMutex.Lock() + stats.Add(arch.updateStats) + arch.updateStats = Stats{} + arch.statsMutex.Unlock() + + // try to send stats through the channel, with a timeout + timeout := time.After(statTimeout) + + select { + case ch <- stats: + break + case _ = <-timeout: + + // save cumulated stats to global state + arch.statsMutex.Lock() + arch.updateStats.Add(stats) + arch.statsMutex.Unlock() + + break } } @@ -140,7 +169,7 @@ func (arch *Archiver) SaveFile(node *Node) error { return err } - arch.saveUpdate(Stats{Bytes: blob.Size}) + arch.update(arch.SaveStats, Stats{Bytes: blob.Size}) blobs = Blobs{blob} } else { @@ -169,7 +198,7 @@ func (arch *Archiver) SaveFile(node *Node) error { panic(err) } - arch.saveUpdate(Stats{Bytes: blob.Size}) + arch.update(arch.SaveStats, Stats{Bytes: blob.Size}) arch.blobToken <- token ch <- blob }(resCh) @@ -240,12 +269,15 @@ func (arch *Archiver) loadTree(dir string) (*Tree, error) { } } - arch.ScannerUpdate(arch.Stats) + arch.update(arch.ScannerStats, arch.Stats) return &tree, nil } func (arch *Archiver) LoadTree(path string) (*Tree, error) { + // reset global stats + arch.updateStats = Stats{} + fi, err := os.Lstat(path) if err != nil { return nil, arrar.Annotatef(err, "Lstat(%q)", path) @@ -259,7 +291,7 @@ func (arch *Archiver) LoadTree(path string) (*Tree, error) { if node.Type != "dir" { arch.Stats.Files = 1 arch.Stats.Bytes = node.Size - arch.ScannerUpdate(arch.Stats) + arch.update(arch.ScannerStats, arch.Stats) return &Tree{node}, nil } @@ -269,7 +301,7 @@ func (arch *Archiver) LoadTree(path string) (*Tree, error) { return nil, arrar.Annotate(err, "loadTree()") } - arch.ScannerUpdate(arch.Stats) + arch.update(arch.ScannerStats, arch.Stats) return &Tree{node}, nil } @@ -284,7 +316,7 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) { return Blob{}, err } node.Subtree = b.ID - arch.saveUpdate(Stats{Directories: 1}) + arch.update(arch.SaveStats, Stats{Directories: 1}) } else if node.Type == "file" && len(node.Content) == 0 { // start goroutine wg.Add(1) @@ -299,10 +331,10 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) { // TODO: handle error arch.SaveFile(n) - arch.saveUpdate(Stats{Files: 1}) + arch.update(arch.SaveStats, Stats{Files: 1}) }(node) } else { - arch.saveUpdate(Stats{Other: 1}) + arch.update(arch.SaveStats, Stats{Other: 1}) } } @@ -317,6 +349,9 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) { } func (arch *Archiver) Snapshot(dir string, t *Tree) (*Snapshot, backend.ID, error) { + // reset global stats + arch.updateStats = Stats{} + sn := NewSnapshot(dir) blob, err := arch.saveTree(t) diff --git a/cmd/khepri/cmd_backup.go b/cmd/khepri/cmd_backup.go index 8ede5da32..82cab39aa 100644 --- a/cmd/khepri/cmd_backup.go +++ b/cmd/khepri/cmd_backup.go @@ -60,11 +60,18 @@ func commandBackup(be backend.Server, key *khepri.Key, args []string) error { fmt.Printf("scanning %s\n", target) if terminal.IsTerminal(int(os.Stdout.Fd())) { - arch.ScannerUpdate = func(stats khepri.Stats) { - fmt.Printf("\r%6d directories, %6d files, %14s", stats.Directories, stats.Files, format_bytes(stats.Bytes)) - } + ch := make(chan khepri.Stats, 5) + arch.ScannerStats = ch + + go func(ch <-chan khepri.Stats) { + for stats := range ch { + fmt.Printf("\r%6d directories, %6d files, %14s", stats.Directories, stats.Files, format_bytes(stats.Bytes)) + } + }(ch) } + fmt.Printf("done\n") + // TODO: add filter // arch.Filter = func(dir string, fi os.FileInfo) bool { // return true @@ -80,18 +87,23 @@ func commandBackup(be backend.Server, key *khepri.Key, args []string) error { stats := khepri.Stats{} if terminal.IsTerminal(int(os.Stdout.Fd())) { - arch.SaveUpdate = func(s khepri.Stats) { - stats.Files += s.Files - stats.Directories += s.Directories - stats.Other += s.Other - stats.Bytes += s.Bytes + ch := make(chan khepri.Stats, 5) + arch.SaveStats = ch - fmt.Printf("\r%3.2f%% %d/%d directories, %d/%d files, %s/%s", - float64(stats.Bytes)/float64(arch.Stats.Bytes)*100, - stats.Directories, arch.Stats.Directories, - stats.Files, arch.Stats.Files, - format_bytes(stats.Bytes), format_bytes(arch.Stats.Bytes)) - } + go func(ch <-chan khepri.Stats) { + for s := range ch { + stats.Files += s.Files + stats.Directories += s.Directories + stats.Other += s.Other + stats.Bytes += s.Bytes + + fmt.Printf("\r%3.2f%% %d/%d directories, %d/%d files, %s/%s", + float64(stats.Bytes)/float64(arch.Stats.Bytes)*100, + stats.Directories, arch.Stats.Directories, + stats.Files, arch.Stats.Files, + format_bytes(stats.Bytes), format_bytes(arch.Stats.Bytes)) + } + }(ch) } start := time.Now() @@ -100,6 +112,12 @@ func commandBackup(be backend.Server, key *khepri.Key, args []string) error { fmt.Fprintf(os.Stderr, "error: %v\n", err) } + if terminal.IsTerminal(int(os.Stdout.Fd())) { + // close channels so that the goroutines terminate + close(arch.SaveStats) + close(arch.ScannerStats) + } + fmt.Printf("\nsnapshot %s saved: %v\n", id, sn) duration := time.Now().Sub(start) fmt.Printf("duration: %s, %.2fMiB/s\n", duration, float64(arch.Stats.Bytes)/float64(duration/time.Second)/(1<<20)) From 7e26567b8b1add1451e2dfee130953a933b7ccdb Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 23 Nov 2014 12:05:43 +0100 Subject: [PATCH 2/2] Pretty status for backup --- cmd/khepri/cmd_backup.go | 73 ++++++++++++++++++++++++++++++---------- 1 file changed, 55 insertions(+), 18 deletions(-) diff --git a/cmd/khepri/cmd_backup.go b/cmd/khepri/cmd_backup.go index 82cab39aa..97b50243f 100644 --- a/cmd/khepri/cmd_backup.go +++ b/cmd/khepri/cmd_backup.go @@ -17,18 +17,30 @@ func format_bytes(c uint64) string { switch { case c > 1<<40: - return fmt.Sprintf("%.3f TiB", b/(1<<40)) + return fmt.Sprintf("%.3fTiB", b/(1<<40)) case c > 1<<30: - return fmt.Sprintf("%.3f GiB", b/(1<<30)) + return fmt.Sprintf("%.3fGiB", b/(1<<30)) case c > 1<<20: - return fmt.Sprintf("%.3f MiB", b/(1<<20)) + return fmt.Sprintf("%.3fMiB", b/(1<<20)) case c > 1<<10: - return fmt.Sprintf("%.3f KiB", b/(1<<10)) + return fmt.Sprintf("%.3fKiB", b/(1<<10)) default: - return fmt.Sprintf("%d B", c) + return fmt.Sprintf("%dB", c) } } +func format_duration(sec uint64) string { + hours := sec / 3600 + sec -= hours * 3600 + min := sec / 60 + sec -= min * 60 + if hours > 0 { + return fmt.Sprintf("%d:%02d:%02d", hours, min, sec) + } + + return fmt.Sprintf("%d:%02d", min, sec) +} + func print_tree2(indent int, t *khepri.Tree) { for _, node := range *t { if node.Tree != nil { @@ -60,7 +72,7 @@ func commandBackup(be backend.Server, key *khepri.Key, args []string) error { fmt.Printf("scanning %s\n", target) if terminal.IsTerminal(int(os.Stdout.Fd())) { - ch := make(chan khepri.Stats, 5) + ch := make(chan khepri.Stats, 20) arch.ScannerStats = ch go func(ch <-chan khepri.Stats) { @@ -86,27 +98,52 @@ func commandBackup(be backend.Server, key *khepri.Key, args []string) error { fmt.Printf("\r%6d directories, %6d files, %14s\n", arch.Stats.Directories, arch.Stats.Files, format_bytes(arch.Stats.Bytes)) stats := khepri.Stats{} + start := time.Now() if terminal.IsTerminal(int(os.Stdout.Fd())) { - ch := make(chan khepri.Stats, 5) + ch := make(chan khepri.Stats, 20) arch.SaveStats = ch - go func(ch <-chan khepri.Stats) { - for s := range ch { - stats.Files += s.Files - stats.Directories += s.Directories - stats.Other += s.Other - stats.Bytes += s.Bytes + ticker := time.NewTicker(time.Second) + var eta, bps uint64 - fmt.Printf("\r%3.2f%% %d/%d directories, %d/%d files, %s/%s", + go func(ch <-chan khepri.Stats) { + + status := func(d time.Duration) { + fmt.Printf("\r[%s] %3.2f%% %s/s %s / %s ETA %s", + format_duration(uint64(d/time.Second)), float64(stats.Bytes)/float64(arch.Stats.Bytes)*100, - stats.Directories, arch.Stats.Directories, - stats.Files, arch.Stats.Files, - format_bytes(stats.Bytes), format_bytes(arch.Stats.Bytes)) + format_bytes(bps), + format_bytes(stats.Bytes), format_bytes(arch.Stats.Bytes), + format_duration(eta)) + } + + defer ticker.Stop() + for { + select { + case s, ok := <-ch: + if !ok { + return + } + stats.Files += s.Files + stats.Directories += s.Directories + stats.Other += s.Other + stats.Bytes += s.Bytes + + status(time.Since(start)) + case <-ticker.C: + d := time.Since(start) + bps = stats.Bytes * uint64(time.Second) / uint64(d) + + if bps > 0 { + eta = (arch.Stats.Bytes - stats.Bytes) / bps + } + + status(d) + } } }(ch) } - start := time.Now() sn, id, err := arch.Snapshot(target, t) if err != nil { fmt.Fprintf(os.Stderr, "error: %v\n", err)