2
2
mirror of https://github.com/octoleo/restic.git synced 2024-12-25 20:11:06 +00:00
restic/pipe/pipe.go

266 lines
5.7 KiB
Go
Raw Normal View History

2015-02-15 11:57:09 +00:00
package pipe
import (
2015-03-02 18:44:16 +00:00
"errors"
2015-02-15 11:57:09 +00:00
"fmt"
"os"
2015-03-15 13:24:58 +00:00
"path"
2015-02-15 11:57:09 +00:00
"path/filepath"
"sort"
2015-03-02 13:48:47 +00:00
"github.com/restic/restic/debug"
2015-02-15 11:57:09 +00:00
)
2015-03-07 10:53:32 +00:00
// Result is the type of the values carried by a Job's result channel. It is
// the empty interface, so a value of any type can be sent.
type Result interface{}
// Job is the common interface of the work items sent on the jobs channel by
// walk and Walk. Both Entry and Dir implement it.
type Job interface {
// Path returns the job's path relative to its base directory.
Path() string
// Fullpath returns the base directory joined with the relative path.
Fullpath() string
// Error returns the error recorded for the job, or nil if there is none.
Error() error
// Info returns the os.FileInfo recorded for the job.
Info() os.FileInfo
// Result returns the send-only channel associated with the job, on which
// its Result is reported.
Result() chan<- Result
}
2015-02-15 11:57:09 +00:00
type Entry struct {
2015-03-07 10:53:32 +00:00
basedir string
path string
info os.FileInfo
error error
result chan<- Result
// points to the old node if available, interface{} is used to prevent
// circular import
Node interface{}
2015-02-15 11:57:09 +00:00
}
2015-03-07 10:53:32 +00:00
func (e Entry) Path() string { return e.path }
func (e Entry) Fullpath() string { return filepath.Join(e.basedir, e.path) }
func (e Entry) Error() error { return e.error }
func (e Entry) Info() os.FileInfo { return e.info }
func (e Entry) Result() chan<- Result { return e.result }
2015-02-15 11:57:09 +00:00
type Dir struct {
2015-03-07 10:53:32 +00:00
basedir string
path string
error error
info os.FileInfo
2015-02-15 11:57:09 +00:00
2015-03-07 10:53:32 +00:00
Entries [](<-chan Result)
result chan<- Result
2015-02-15 11:57:09 +00:00
}
2015-03-07 10:53:32 +00:00
func (e Dir) Path() string { return e.path }
func (e Dir) Fullpath() string { return filepath.Join(e.basedir, e.path) }
func (e Dir) Error() error { return e.error }
func (e Dir) Info() os.FileInfo { return e.info }
func (e Dir) Result() chan<- Result { return e.result }
2015-02-15 11:57:09 +00:00
// readDirNames lists the directory dirname and returns the names of its
// entries in ascending order. If the directory cannot be opened or read, the
// error is returned and the name list is nil.
// Adapted from filepath/path.go.
func readDirNames(dirname string) ([]string, error) {
	dir, openErr := os.Open(dirname)
	if openErr != nil {
		return nil, openErr
	}

	entries, readErr := dir.Readdirnames(-1)
	// The handle is only needed for the listing above; the result of Close is
	// deliberately not inspected.
	_ = dir.Close()
	if readErr != nil {
		return nil, readErr
	}

	sort.Strings(entries)
	return entries, nil
}
// isDir reports whether fi describes a directory.
func isDir(fi os.FileInfo) bool {
	return fi.Mode().IsDir()
}
// isFile reports whether fi describes a regular file, i.e. its mode carries
// none of the file-type bits and is not a character device.
func isFile(fi os.FileInfo) bool {
	const nonRegular = os.ModeType | os.ModeCharDevice
	return fi.Mode()&nonRegular == 0
}
2015-03-02 18:44:16 +00:00
// errCancelled is returned by walk and Walk when the done channel becomes
// ready while a job is waiting to be sent.
var errCancelled = errors.New("walk cancelled")
2015-03-15 13:24:58 +00:00
func walk(basedir, dir string, done chan struct{}, jobs chan<- Job, res chan<- Result) error {
info, err := os.Lstat(dir)
2015-02-15 11:57:09 +00:00
if err != nil {
2015-03-28 15:35:46 +00:00
debug.Log("pipe.walk", "error for %v: %v", dir, err)
2015-02-15 11:57:09 +00:00
return err
}
2015-03-15 13:24:58 +00:00
relpath, _ := filepath.Rel(basedir, dir)
2015-03-07 10:53:32 +00:00
2015-02-15 11:57:09 +00:00
if !info.IsDir() {
2015-03-02 18:44:16 +00:00
select {
2015-03-07 10:53:32 +00:00
case jobs <- Entry{info: info, basedir: basedir, path: relpath, result: res}:
2015-03-02 18:44:16 +00:00
case <-done:
return errCancelled
}
2015-03-02 13:48:47 +00:00
return nil
2015-02-15 11:57:09 +00:00
}
2015-03-15 13:24:58 +00:00
names, err := readDirNames(dir)
2015-02-15 11:57:09 +00:00
if err != nil {
return err
}
2015-03-15 13:24:58 +00:00
// Insert breakpoint to allow testing behaviour with vanishing files
// between Readdir() and lstat()
debug.BreakIf("pipe.walk1", func() bool {
match, err := path.Match(os.Getenv("DEBUG_BREAK_PIPE"), relpath)
if err != nil {
panic(err)
}
if match {
debug.Log("break", "break pattern matches for %v\n", relpath)
}
return match
})
2015-03-07 10:53:32 +00:00
entries := make([]<-chan Result, 0, len(names))
2015-02-15 11:57:09 +00:00
for _, name := range names {
2015-03-15 13:24:58 +00:00
subpath := filepath.Join(dir, name)
2015-02-15 13:44:54 +00:00
2015-03-07 10:53:32 +00:00
ch := make(chan Result, 1)
2015-02-15 13:44:54 +00:00
entries = append(entries, ch)
2015-02-15 11:57:09 +00:00
fi, err := os.Lstat(subpath)
if err != nil {
2015-03-02 18:44:16 +00:00
select {
case jobs <- Entry{info: fi, error: err, basedir: basedir, path: filepath.Join(relpath, name), result: ch}:
2015-03-02 18:44:16 +00:00
case <-done:
return errCancelled
}
continue
2015-02-15 11:57:09 +00:00
}
2015-03-15 13:24:58 +00:00
// Insert breakpoint to allow testing behaviour with vanishing files
// between walk and open
debug.BreakIf("pipe.walk2", func() bool {
p := filepath.Join(relpath, name)
match, err := path.Match(os.Getenv("DEBUG_BREAK_PIPE"), p)
if err != nil {
panic(err)
}
if match {
debug.Log("break", "break pattern matches for %v\n", p)
}
return match
})
2015-02-15 13:44:54 +00:00
if isDir(fi) {
2015-03-07 10:53:32 +00:00
err = walk(basedir, subpath, done, jobs, ch)
2015-02-15 13:44:54 +00:00
if err != nil {
return err
}
} else {
2015-03-02 18:44:16 +00:00
select {
2015-03-07 10:53:32 +00:00
case jobs <- Entry{info: fi, basedir: basedir, path: filepath.Join(relpath, name), result: ch}:
2015-03-02 18:44:16 +00:00
case <-done:
return errCancelled
}
2015-02-15 11:57:09 +00:00
}
}
2015-03-02 18:44:16 +00:00
select {
2015-03-07 10:53:32 +00:00
case jobs <- Dir{basedir: basedir, path: relpath, info: info, Entries: entries, result: res}:
2015-03-02 18:44:16 +00:00
case <-done:
return errCancelled
}
2015-02-15 11:57:09 +00:00
return nil
}
2015-03-02 13:48:47 +00:00
// Walk sends a Job for each file and directory it finds below the paths. When
// the channel done is closed, processing stops.
2015-03-07 10:53:32 +00:00
func Walk(paths []string, done chan struct{}, jobs chan<- Job, res chan<- Result) error {
2015-03-02 13:48:47 +00:00
defer func() {
debug.Log("pipe.Walk", "output channel closed")
2015-03-07 10:53:32 +00:00
close(jobs)
2015-03-02 13:48:47 +00:00
}()
2015-03-07 10:53:32 +00:00
entries := make([]<-chan Result, 0, len(paths))
2015-03-02 13:48:47 +00:00
for _, path := range paths {
debug.Log("pipe.Walk", "start walker for %v", path)
2015-03-07 10:53:32 +00:00
ch := make(chan Result, 1)
err := walk(filepath.Dir(path), path, done, jobs, ch)
2015-03-02 13:48:47 +00:00
if err != nil {
2015-03-28 15:35:46 +00:00
debug.Log("pipe.Walk", "error for %v: %v", path, err)
continue
2015-03-02 13:48:47 +00:00
}
2015-03-28 15:35:46 +00:00
entries = append(entries, ch)
2015-03-02 13:48:47 +00:00
debug.Log("pipe.Walk", "walker for %v done", path)
}
debug.Log("pipe.Walk", "sending root node")
select {
case <-done:
return errCancelled
case jobs <- Dir{Entries: entries, result: res}:
}
debug.Log("pipe.Walk", "walker done")
2015-03-07 10:53:32 +00:00
return nil
2015-03-02 13:48:47 +00:00
}
// Split feeds all elements read from inChan to dirChan and entChan.
2015-03-07 10:53:32 +00:00
func Split(inChan <-chan Job, dirChan chan<- Dir, entChan chan<- Entry) {
2015-03-02 13:48:47 +00:00
debug.Log("pipe.Split", "start")
defer debug.Log("pipe.Split", "done")
inCh := inChan
dirCh := dirChan
entCh := entChan
var (
dir Dir
ent Entry
)
// deactivate sending until we received at least one job
dirCh = nil
entCh = nil
for {
select {
case job, ok := <-inCh:
if !ok {
// channel is closed
return
}
if job == nil {
panic("nil job received")
}
// disable receiving until the current job has been sent
inCh = nil
switch j := job.(type) {
case Dir:
dir = j
dirCh = dirChan
case Entry:
ent = j
entCh = entChan
default:
panic(fmt.Sprintf("unknown job type %v", j))
}
case dirCh <- dir:
// disable sending, re-enable receiving
dirCh = nil
inCh = inChan
case entCh <- ent:
// disable sending, re-enable receiving
entCh = nil
inCh = inChan
}
}
2015-02-15 11:57:09 +00:00
}