From f645306a18772e5479f49202d6347edf4f94a6dd Mon Sep 17 00:00:00 2001
From: Matthew Holt
Date: Fri, 10 Aug 2018 23:34:37 -0600
Subject: [PATCH] Initial implementation of JSON progress reporter

---
 cmd/restic/cmd_backup.go         |  72 ++++-
 cmd/restic/global.go             |   2 +-
 internal/ui/backup.go            |  10 +-
 internal/ui/jsonstatus/status.go | 443 +++++++++++++++++++++++++++++++
 4 files changed, 511 insertions(+), 16 deletions(-)
 create mode 100644 internal/ui/jsonstatus/status.go

diff --git a/cmd/restic/cmd_backup.go b/cmd/restic/cmd_backup.go
index dff1d31db..7677c1915 100644
--- a/cmd/restic/cmd_backup.go
+++ b/cmd/restic/cmd_backup.go
@@ -5,6 +5,7 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"io"
 	"io/ioutil"
 	"os"
 	"path/filepath"
@@ -23,6 +24,7 @@ import (
 	"github.com/restic/restic/internal/restic"
 	"github.com/restic/restic/internal/textfile"
 	"github.com/restic/restic/internal/ui"
+	"github.com/restic/restic/internal/ui/jsonstatus"
 	"github.com/restic/restic/internal/ui/termstatus"
 )
 
@@ -395,15 +397,43 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
 
 	var t tomb.Tomb
 
-	if gopts.verbosity >= 2 {
+	if gopts.verbosity >= 2 && !gopts.JSON {
 		term.Print("open repository\n")
 	}
+
 	repo, err := OpenRepository(gopts)
 	if err != nil {
 		return err
 	}
 
-	p := ui.NewBackup(term, gopts.verbosity)
+	type ArchiveProgressReporter interface {
+		CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration)
+		StartFile(filename string)
+		CompleteBlob(filename string, bytes uint64)
+		ScannerError(item string, fi os.FileInfo, err error) error
+		ReportTotal(item string, s archiver.ScanStats)
+		SetMinUpdatePause(d time.Duration)
+		Run(ctx context.Context) error
+		Error(item string, fi os.FileInfo, err error) error
+		Finish()
+
+		// ui.StdioWrapper
+		Stdout() io.WriteCloser
+		Stderr() io.WriteCloser
+
+		// ui.Message
+		E(msg string, args ...interface{})
+		P(msg string, args ...interface{})
+		V(msg string, args ...interface{})
+		VV(msg string, args ...interface{})
+	}
+
+	var p ArchiveProgressReporter
+	if gopts.JSON {
+		p = jsonstatus.NewBackup(term, gopts.verbosity)
+	} else {
+		p = ui.NewBackup(term, gopts.verbosity)
+	}
 
 	// use the terminal for stdout/stderr
 	prevStdout, prevStderr := gopts.stdout, gopts.stderr
@@ -418,13 +448,15 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
 			if fps > 60 {
 				fps = 60
 			}
-			p.MinUpdatePause = time.Second / time.Duration(fps)
+			p.SetMinUpdatePause(time.Second / time.Duration(fps))
 		}
 	}
 
 	t.Go(func() error { return p.Run(t.Context(gopts.ctx)) })
 
-	p.V("lock repository")
+	if !gopts.JSON {
+		p.V("lock repository")
+	}
 	lock, err := lockRepo(repo)
 	defer unlockRepo(lock)
 	if err != nil {
@@ -443,7 +475,9 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
 		return err
 	}
 
-	p.V("load index files")
+	if !gopts.JSON {
+		p.V("load index files")
+	}
 	err = repo.LoadIndex(gopts.ctx)
 	if err != nil {
 		return err
@@ -454,7 +488,7 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
 		return err
 	}
 
-	if parentSnapshotID != nil {
+	if !gopts.JSON && parentSnapshotID != nil {
 		p.V("using parent snapshot %v\n", parentSnapshotID.Str())
 	}
 
@@ -478,7 +512,9 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
 
 	var targetFS fs.FS = fs.Local{}
 	if opts.Stdin {
-		p.V("read data from stdin")
+		if !gopts.JSON {
+			p.V("read data from stdin")
+		}
 		targetFS = &fs.Reader{
 			ModTime:    timeStamp,
 			Name:       opts.StdinFilename,
@@ -494,7 +530,9 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
 	sc.Error = p.ScannerError
 	sc.Result = p.ReportTotal
 
-	p.V("start scan on %v", targets)
+	if !gopts.JSON {
+		p.V("start scan on %v", targets)
+	}
 	t.Go(func() error { return sc.Scan(t.Context(gopts.ctx), targets) })
 
 	arch := archiver.New(repo, targetFS, archiver.Options{})
@@ -502,7 +540,7 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
 	arch.Select = selectFilter
 	arch.WithAtime = opts.WithAtime
 	arch.Error = p.Error
-	arch.CompleteItem = p.CompleteItemFn
+	arch.CompleteItem = p.CompleteItem
 	arch.StartFile = p.StartFile
 	arch.CompleteBlob = p.CompleteBlob
 
@@ -521,10 +559,14 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
 	uploader := archiver.IndexUploader{
 		Repository: repo,
 		Start: func() {
-			p.VV("uploading intermediate index")
+			if !gopts.JSON {
+				p.VV("uploading intermediate index")
+			}
 		},
 		Complete: func(id restic.ID) {
-			p.V("uploaded intermediate index %v", id.Str())
+			if !gopts.JSON {
+				p.V("uploaded intermediate index %v", id.Str())
+			}
 		},
 	}
 
@@ -532,14 +574,18 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
 		return uploader.Upload(gopts.ctx, t.Context(gopts.ctx), 30*time.Second)
 	})
 
-	p.V("start backup on %v", targets)
+	if !gopts.JSON {
+		p.V("start backup on %v", targets)
+	}
 	_, id, err := arch.Snapshot(gopts.ctx, targets, snapshotOpts)
 	if err != nil {
 		return errors.Fatalf("unable to save snapshot: %v", err)
 	}
 
 	p.Finish()
-	p.P("snapshot %s saved\n", id.Str())
+	if !gopts.JSON {
+		p.P("snapshot %s saved\n", id.Str())
+	}
 
 	// cleanly shutdown all running goroutines
 	t.Kill(nil)
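
With the changes above, `restic backup --json` emits one JSON object per line on stdout. A consumer can decode the stream and dispatch on the message_type field, as in this minimal sketch (the envelope type and the field subset it decodes are illustrative, not part of the patch; the full schemas are the statusUpdate, errorUpdate, verboseUpdate, and summaryOutput structs at the bottom of internal/ui/jsonstatus/status.go below):

    package main

    import (
        "encoding/json"
        "fmt"
        "os"
    )

    // envelope carries only the fields this consumer needs for dispatch.
    // It is an illustrative consumer-side type, not part of the patch.
    type envelope struct {
        MessageType string  `json:"message_type"`
        PercentDone float64 `json:"percent_done"`
        Item        string  `json:"item"`
    }

    func main() {
        // e.g. restic backup --json /data | this-program
        dec := json.NewDecoder(os.Stdin)
        for {
            var msg envelope
            if err := dec.Decode(&msg); err != nil {
                return // io.EOF once restic exits
            }
            switch msg.MessageType {
            case "status":
                fmt.Printf("%.1f%% done\n", msg.PercentDone*100)
            case "error", "verbose_status", "summary":
                fmt.Println(msg.MessageType, msg.Item)
            }
        }
    }
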
diff --git a/cmd/restic/global.go b/cmd/restic/global.go
index 531447365..57d4e8d8c 100644
--- a/cmd/restic/global.go
+++ b/cmd/restic/global.go
@@ -377,7 +377,7 @@ func OpenRepository(opts GlobalOptions) (*repository.Repository, error) {
 		return nil, err
 	}
 
-	if stdoutIsTerminal() {
+	if stdoutIsTerminal() && !opts.JSON {
 		id := s.Config().ID
 		if len(id) > 8 {
 			id = id[:8]
diff --git a/internal/ui/backup.go b/internal/ui/backup.go
index 3a950d9ad..14d0c0c00 100644
--- a/internal/ui/backup.go
+++ b/internal/ui/backup.go
@@ -254,9 +254,9 @@ func formatBytes(c uint64) string {
 	}
 }
 
-// CompleteItemFn is the status callback function for the archiver when a
+// CompleteItem is the status callback function for the archiver when a
 // file/dir has been saved successfully.
-func (b *Backup) CompleteItemFn(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) {
+func (b *Backup) CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) {
 	b.summary.Lock()
 	b.summary.ItemStats.Add(s)
 	b.summary.Unlock()
@@ -365,3 +365,9 @@ func (b *Backup) Finish() {
 		formatDuration(time.Since(b.start)),
 	)
 }
+
+// SetMinUpdatePause sets b.MinUpdatePause. It satisfies the
+// ArchiveProgressReporter interface.
+func (b *Backup) SetMinUpdatePause(d time.Duration) {
+	b.MinUpdatePause = d
+}
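
Because ArchiveProgressReporter is declared locally inside runBackup, conformance of the two reporters is only checked at the two assignment sites. If the interface were later hoisted into its own package, conventional compile-time assertions would make the contract explicit; a sketch, assuming a hypothetical package `progress` holding the same method set:

    package progress

    import (
        "github.com/restic/restic/internal/ui"
        "github.com/restic/restic/internal/ui/jsonstatus"
    )

    // Reporter would mirror the method set of the runBackup-local
    // ArchiveProgressReporter interface.
    type Reporter interface {
        Finish()
        // ... remaining methods as in ArchiveProgressReporter
    }

    // Compile-time assertions: both reporters must satisfy the contract.
    var (
        _ Reporter = (*ui.Backup)(nil)
        _ Reporter = (*jsonstatus.Backup)(nil)
    )
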
diff --git a/internal/ui/jsonstatus/status.go b/internal/ui/jsonstatus/status.go
new file mode 100644
index 000000000..208dfe896
--- /dev/null
+++ b/internal/ui/jsonstatus/status.go
@@ -0,0 +1,443 @@
+package jsonstatus
+
+import (
+	"context"
+	"encoding/json"
+	"os"
+	"sort"
+	"sync"
+	"time"
+
+	"github.com/restic/restic/internal/archiver"
+	"github.com/restic/restic/internal/restic"
+	"github.com/restic/restic/internal/ui"
+	"github.com/restic/restic/internal/ui/termstatus"
+)
+
+type counter struct {
+	Files, Dirs, Bytes uint64
+}
+
+type fileWorkerMessage struct {
+	filename string
+	done     bool
+}
+
+// Backup reports progress for the `backup` command in JSON.
+type Backup struct {
+	*ui.Message
+	*ui.StdioWrapper
+
+	MinUpdatePause time.Duration
+
+	term  *termstatus.Terminal
+	v     uint
+	start time.Time
+
+	totalBytes uint64
+
+	totalCh     chan counter
+	processedCh chan counter
+	errCh       chan struct{}
+	workerCh    chan fileWorkerMessage
+	finished    chan struct{}
+
+	summary struct {
+		sync.Mutex
+		Files, Dirs struct {
+			New       uint
+			Changed   uint
+			Unchanged uint
+		}
+		archiver.ItemStats
+	}
+}
+
+// NewBackup returns a new backup progress reporter.
+func NewBackup(term *termstatus.Terminal, verbosity uint) *Backup {
+	return &Backup{
+		Message:      ui.NewMessage(term, verbosity),
+		StdioWrapper: ui.NewStdioWrapper(term),
+		term:         term,
+		v:            verbosity,
+		start:        time.Now(),
+
+		// limit to 60fps by default
+		MinUpdatePause: time.Second / 60,
+
+		totalCh:     make(chan counter),
+		processedCh: make(chan counter),
+		errCh:       make(chan struct{}),
+		workerCh:    make(chan fileWorkerMessage),
+		finished:    make(chan struct{}),
+	}
+}
+
+// Run regularly updates the status lines. It should be called in a separate
+// goroutine.
+func (b *Backup) Run(ctx context.Context) error {
+	var (
+		lastUpdate       time.Time
+		total, processed counter
+		errors           uint
+		started          bool
+		currentFiles     = make(map[string]struct{})
+		secondsRemaining uint64
+	)
+
+	t := time.NewTicker(time.Second)
+	defer t.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case <-b.finished:
+			started = false
+		case t, ok := <-b.totalCh:
+			if ok {
+				total = t
+				started = true
+			} else {
+				// scan has finished
+				b.totalCh = nil
+				b.totalBytes = total.Bytes
+			}
+		case s := <-b.processedCh:
+			processed.Files += s.Files
+			processed.Dirs += s.Dirs
+			processed.Bytes += s.Bytes
+			started = true
+		case <-b.errCh:
+			errors++
+			started = true
+		case m := <-b.workerCh:
+			if m.done {
+				delete(currentFiles, m.filename)
+			} else {
+				currentFiles[m.filename] = struct{}{}
+			}
+		case <-t.C:
+			if !started {
+				continue
+			}
+
+			if b.totalCh == nil {
+				secs := float64(time.Since(b.start) / time.Second)
+				todo := float64(total.Bytes - processed.Bytes)
+				secondsRemaining = uint64(secs / float64(processed.Bytes) * todo)
+			}
+		}
+
+		// limit update frequency
+		if time.Since(lastUpdate) < b.MinUpdatePause {
+			continue
+		}
+		lastUpdate = time.Now()
+
+		b.update(total, processed, errors, currentFiles, secondsRemaining)
+	}
+}
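
The secondsRemaining computation in Run extrapolates linearly once the scan has finished: elapsed seconds divided by bytes processed, multiplied by bytes still to do. For example, 30 s elapsed with 1 GiB of 4 GiB processed gives 30/1 * 3 = 90 s remaining. If processed.Bytes is still zero at that point, the float division yields +Inf and the conversion to uint64 is implementation-specific; a guarded variant might look like this (a sketch, not part of the patch):

    package main

    import (
        "fmt"
        "time"
    )

    // etaSeconds extrapolates the remaining time linearly, guarding the
    // processed == 0 case that the formula in Run divides by.
    func etaSeconds(elapsed time.Duration, processed, total uint64) uint64 {
        if processed == 0 || total <= processed {
            return 0
        }
        return uint64(elapsed.Seconds() / float64(processed) * float64(total-processed))
    }

    func main() {
        gib := uint64(1 << 30)
        fmt.Println(etaSeconds(30*time.Second, 1*gib, 4*gib)) // prints 90
    }
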
+
+// update emits a status update message.
+func (b *Backup) update(total, processed counter, errors uint, currentFiles map[string]struct{}, secs uint64) {
+	status := statusUpdate{
+		MessageType:      "status",
+		SecondsElapsed:   uint64(time.Since(b.start) / time.Second),
+		SecondsRemaining: secs,
+		TotalFiles:       total.Files,
+		FilesDone:        processed.Files,
+		TotalBytes:       total.Bytes,
+		BytesDone:        processed.Bytes,
+		ErrorCount:       errors,
+	}
+
+	if total.Bytes > 0 {
+		status.PercentDone = float64(processed.Bytes) / float64(total.Bytes)
+	}
+
+	for filename := range currentFiles {
+		status.CurrentFiles = append(status.CurrentFiles, filename)
+	}
+	sort.Sort(sort.StringSlice(status.CurrentFiles))
+
+	json.NewEncoder(b.StdioWrapper.Stdout()).Encode(status)
+}
+
+// ScannerError is the error callback function for the scanner; it reports
+// the error as a JSON message on stderr and returns nil.
+func (b *Backup) ScannerError(item string, fi os.FileInfo, err error) error {
+	// b.V("scan: %v\n", err)
+	json.NewEncoder(b.StdioWrapper.Stderr()).Encode(errorUpdate{
+		MessageType: "error",
+		Error:       err,
+		During:      "scan",
+		Item:        item,
+	})
+	return nil
+}
+
+// Error is the error callback function for the archiver; it reports the error as a JSON message on stderr and returns nil.
+func (b *Backup) Error(item string, fi os.FileInfo, err error) error {
+	// b.E("error: %v\n", err)
+	json.NewEncoder(b.StdioWrapper.Stderr()).Encode(errorUpdate{
+		MessageType: "error",
+		Error:       err,
+		During:      "archival",
+		Item:        item,
+	})
+	b.errCh <- struct{}{}
+	return nil
+}
+
+// StartFile is called when a file is being processed by a worker.
+func (b *Backup) StartFile(filename string) {
+	b.workerCh <- fileWorkerMessage{
+		filename: filename,
+	}
+}
+
+// CompleteBlob is called for all saved blobs for files.
+func (b *Backup) CompleteBlob(filename string, bytes uint64) {
+	b.processedCh <- counter{Bytes: bytes}
+}
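
For reference, update, ScannerError, and Error produce lines like the following (status on stdout, errors on stderr; values illustrative, and fields tagged omitempty are dropped when zero). Note that a plain error value usually marshals to an empty object, since types like *errors.errorString export no fields, so consumers should not expect a message string inside the error field:

    {"message_type":"status","seconds_elapsed":12,"percent_done":0.52,"total_files":1024,"files_done":533,"total_bytes":4294967296,"bytes_done":2233382994,"current_files":["/home/user/large.iso"]}
    {"message_type":"error","error":{},"during":"scan","item":"/home/user/locked.db"}
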
+
+// CompleteItem is the status callback function for the archiver when a
+// file/dir has been saved successfully.
+func (b *Backup) CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) {
+	b.summary.Lock()
+	b.summary.ItemStats.Add(s)
+	b.summary.Unlock()
+
+	if current == nil {
+		// error occurred, tell the status display to remove the line
+		b.workerCh <- fileWorkerMessage{
+			filename: item,
+			done:     true,
+		}
+		return
+	}
+
+	switch current.Type {
+	case "file":
+		b.processedCh <- counter{Files: 1}
+		b.workerCh <- fileWorkerMessage{
+			filename: item,
+			done:     true,
+		}
+	case "dir":
+		b.processedCh <- counter{Dirs: 1}
+	}
+
+	if current.Type == "dir" {
+		if previous == nil {
+			// b.VV("new %v, saved in %.3fs (%v added, %v metadata)", item, d.Seconds(), formatBytes(s.DataSize), formatBytes(s.TreeSize))
+			if b.v >= 3 {
+				json.NewEncoder(b.StdioWrapper.Stdout()).Encode(verboseUpdate{
+					MessageType:  "verbose_status",
+					Action:       "new",
+					Item:         item,
+					Duration:     d.Seconds(),
+					DataSize:     s.DataSize,
+					MetadataSize: s.TreeSize,
+				})
+			}
+			b.summary.Lock()
+			b.summary.Dirs.New++
+			b.summary.Unlock()
+			return
+		}
+
+		if previous.Equals(*current) {
+			// b.VV("unchanged %v", item)
+			if b.v >= 3 {
+				json.NewEncoder(b.StdioWrapper.Stdout()).Encode(verboseUpdate{
+					MessageType: "verbose_status",
+					Action:      "unchanged",
+					Item:        item,
+				})
+			}
+			b.summary.Lock()
+			b.summary.Dirs.Unchanged++
+			b.summary.Unlock()
+		} else {
+			// b.VV("modified %v, saved in %.3fs (%v added, %v metadata)", item, d.Seconds(), formatBytes(s.DataSize), formatBytes(s.TreeSize))
+			if b.v >= 3 {
+				json.NewEncoder(b.StdioWrapper.Stdout()).Encode(verboseUpdate{
+					MessageType:  "verbose_status",
+					Action:       "modified",
+					Item:         item,
+					Duration:     d.Seconds(),
+					DataSize:     s.DataSize,
+					MetadataSize: s.TreeSize,
+				})
+			}
+			b.summary.Lock()
+			b.summary.Dirs.Changed++
+			b.summary.Unlock()
+		}
+
+	} else if current.Type == "file" {
+
+		b.workerCh <- fileWorkerMessage{
+			done:     true,
+			filename: item,
+		}
+
+		if previous == nil {
+			// b.VV("new %v, saved in %.3fs (%v added)", item, d.Seconds(), formatBytes(s.DataSize))
+			if b.v >= 3 {
+				json.NewEncoder(b.StdioWrapper.Stdout()).Encode(verboseUpdate{
+					MessageType: "verbose_status",
+					Action:      "new",
+					Item:        item,
+					Duration:    d.Seconds(),
+					DataSize:    s.DataSize,
+				})
+			}
+			b.summary.Lock()
+			b.summary.Files.New++
+			b.summary.Unlock()
+			return
+		}
+
+		if previous.Equals(*current) {
+			// b.VV("unchanged %v", item)
+			if b.v >= 3 {
+				json.NewEncoder(b.StdioWrapper.Stdout()).Encode(verboseUpdate{
+					MessageType: "verbose_status",
+					Action:      "unchanged",
+					Item:        item,
+				})
+			}
+			b.summary.Lock()
+			b.summary.Files.Unchanged++
+			b.summary.Unlock()
+		} else {
+			// b.VV("modified %v, saved in %.3fs (%v added)", item, d.Seconds(), formatBytes(s.DataSize))
+			if b.v >= 3 {
+				json.NewEncoder(b.StdioWrapper.Stdout()).Encode(verboseUpdate{
+					MessageType: "verbose_status",
+					Action:      "modified",
+					Item:        item,
+					Duration:    d.Seconds(),
+					DataSize:    s.DataSize,
+				})
+			}
+			b.summary.Lock()
+			b.summary.Files.Changed++
+			b.summary.Unlock()
+		}
+	}
+}
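
When the verbosity level is at least 3, CompleteItem emits one verbose_status line per finished item. verboseUpdate carries no omitempty tags, so the duration and size fields still appear as zeroes for unchanged items (values illustrative):

    {"message_type":"verbose_status","action":"new","item":"/home/user/new.txt","duration":0.003,"data_size":4096,"metadata_size":0,"total_files":0}
    {"message_type":"verbose_status","action":"unchanged","item":"/home/user/old.txt","duration":0,"data_size":0,"metadata_size":0,"total_files":0}
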
+
+// ReportTotal sets the total stats up to now.
+func (b *Backup) ReportTotal(item string, s archiver.ScanStats) {
+	select {
+	case b.totalCh <- counter{Files: uint64(s.Files), Dirs: uint64(s.Dirs), Bytes: s.Bytes}:
+	case <-b.finished:
+	}
+
+	if item == "" {
+		// b.V("scan finished in %.3fs: %v files, %s",
+		// 	time.Since(b.start).Seconds(),
+		// 	s.Files, formatBytes(s.Bytes),
+		// )
+		if b.v >= 2 {
+			json.NewEncoder(b.StdioWrapper.Stdout()).Encode(verboseUpdate{
+				MessageType: "verbose_status",
+				Action:      "scan_finished",
+				Duration:    time.Since(b.start).Seconds(),
+				DataSize:    s.Bytes,
+				TotalFiles:  s.Files,
+			})
+		}
+		close(b.totalCh)
+		return
+	}
+}
+
+// Finish emits the final summary message.
+func (b *Backup) Finish() {
+	close(b.finished)
+
+	// b.P("\n")
+	// b.P("Files: %5d new, %5d changed, %5d unmodified\n", b.summary.Files.New, b.summary.Files.Changed, b.summary.Files.Unchanged)
+	// b.P("Dirs: %5d new, %5d changed, %5d unmodified\n", b.summary.Dirs.New, b.summary.Dirs.Changed, b.summary.Dirs.Unchanged)
+	// b.V("Data Blobs: %5d new\n", b.summary.ItemStats.DataBlobs)
+	// b.V("Tree Blobs: %5d new\n", b.summary.ItemStats.TreeBlobs)
+	// b.P("Added to the repo: %-5s\n", formatBytes(b.summary.ItemStats.DataSize+b.summary.ItemStats.TreeSize))
+	// b.P("\n")
+	// b.P("processed %v files, %v in %s",
+	// 	b.summary.Files.New+b.summary.Files.Changed+b.summary.Files.Unchanged,
+	// 	formatBytes(b.totalBytes),
+	// 	formatDuration(time.Since(b.start)),
+	// )
+	json.NewEncoder(b.StdioWrapper.Stdout()).Encode(summaryOutput{
+		MessageType:         "summary",
+		FilesNew:            b.summary.Files.New,
+		FilesChanged:        b.summary.Files.Changed,
+		FilesUnmodified:     b.summary.Files.Unchanged,
+		DirsNew:             b.summary.Dirs.New,
+		DirsChanged:         b.summary.Dirs.Changed,
+		DirsUnmodified:      b.summary.Dirs.Unchanged,
+		DataBlobs:           b.summary.ItemStats.DataBlobs,
+		TreeBlobs:           b.summary.ItemStats.TreeBlobs,
+		DataAdded:           b.summary.ItemStats.DataSize + b.summary.ItemStats.TreeSize,
+		TotalFilesProcessed: b.summary.Files.New + b.summary.Files.Changed + b.summary.Files.Unchanged,
+		TotalBytesProcessed: b.totalBytes,
+		TotalDuration:       time.Since(b.start).Seconds(),
+	})
+}
+
+// SetMinUpdatePause sets b.MinUpdatePause. It satisfies the
+// ArchiveProgressReporter interface.
+func (b *Backup) SetMinUpdatePause(d time.Duration) {
+	b.MinUpdatePause = d
+}
+
+type statusUpdate struct {
+	MessageType      string   `json:"message_type"` // "status"
+	SecondsElapsed   uint64   `json:"seconds_elapsed,omitempty"`
+	SecondsRemaining uint64   `json:"seconds_remaining,omitempty"`
+	PercentDone      float64  `json:"percent_done"`
+	TotalFiles       uint64   `json:"total_files,omitempty"`
+	FilesDone        uint64   `json:"files_done,omitempty"`
+	TotalBytes       uint64   `json:"total_bytes,omitempty"`
+	BytesDone        uint64   `json:"bytes_done,omitempty"`
+	ErrorCount       uint     `json:"error_count,omitempty"`
+	CurrentFiles     []string `json:"current_files,omitempty"`
+}
+
+type errorUpdate struct {
+	MessageType string `json:"message_type"` // "error"
+	Error       error  `json:"error"`
+	During      string `json:"during"`
+	Item        string `json:"item"`
+}
+
+type verboseUpdate struct {
+	MessageType  string  `json:"message_type"` // "verbose_status"
+	Action       string  `json:"action"`
+	Item         string  `json:"item"`
+	Duration     float64 `json:"duration"` // in seconds
+	DataSize     uint64  `json:"data_size"`
+	MetadataSize uint64  `json:"metadata_size"`
+	TotalFiles   uint    `json:"total_files"`
+}
+
+type summaryOutput struct {
+	MessageType         string  `json:"message_type"` // "summary"
+	FilesNew            uint    `json:"files_new"`
+	FilesChanged        uint    `json:"files_changed"`
+	FilesUnmodified     uint    `json:"files_unmodified"`
+	DirsNew             uint    `json:"dirs_new"`
+	DirsChanged         uint    `json:"dirs_changed"`
+	DirsUnmodified      uint    `json:"dirs_unmodified"`
+	DataBlobs           int     `json:"data_blobs"`
+	TreeBlobs           int     `json:"tree_blobs"`
+	DataAdded           uint64  `json:"data_added"`
+	TotalFilesProcessed uint    `json:"total_files_processed"`
+	TotalBytesProcessed uint64  `json:"total_bytes_processed"`
+	TotalDuration       float64 `json:"total_duration"` // in seconds
+}
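
Finish closes out the stream with a single summary object assembled from the accumulated counters, along these lines (values illustrative):

    {"message_type":"summary","files_new":10,"files_changed":2,"files_unmodified":1012,"dirs_new":0,"dirs_changed":1,"dirs_unmodified":58,"data_blobs":25,"tree_blobs":4,"data_added":3172342,"total_files_processed":1024,"total_bytes_processed":4294967296,"total_duration":42.9}

Because every message is a self-contained object on its own line, the stream can be consumed incrementally with line-oriented tools such as jq, or with a decoder like the one sketched after the cmd_backup.go diff above.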