2
2
mirror of https://github.com/octoleo/restic.git synced 2025-01-07 08:54:29 +00:00
restic/internal/pipe/pipe_test.go

601 lines
12 KiB
Go
Raw Normal View History

2015-02-15 11:57:09 +00:00
package pipe_test
import (
2017-06-05 21:56:59 +00:00
"context"
2015-11-06 18:41:57 +00:00
"io/ioutil"
2015-02-15 11:57:09 +00:00
"os"
"path/filepath"
"runtime"
2015-02-15 11:57:09 +00:00
"sync"
"testing"
"time"
2017-07-23 12:21:03 +00:00
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/pipe"
2017-10-02 13:06:39 +00:00
rtest "github.com/restic/restic/internal/test"
2015-02-15 11:57:09 +00:00
)
// stats holds the directory and file counters that the tests in this file
// compare before and after a walk.
type stats struct {
	dirs  int
	files int
}
// acceptAll ignores both arguments and always reports true. The tests hand it
// to pipe.Walk as the selection function.
func acceptAll(_ string, _ os.FileInfo) bool {
	return true
}
2015-02-15 11:57:09 +00:00
func statPath(path string) (stats, error) {
var s stats
// count files and directories with filepath.Walk()
2017-10-02 13:06:39 +00:00
err := filepath.Walk(rtest.TestWalkerPath, func(p string, fi os.FileInfo, err error) error {
2015-02-15 11:57:09 +00:00
if fi == nil {
return err
}
if fi.IsDir() {
s.dirs++
2015-02-15 13:44:54 +00:00
} else {
2015-02-15 11:57:09 +00:00
s.files++
}
return err
})
return s, err
}
2015-06-28 11:15:35 +00:00
// maxWorkers is the number of goroutines the pipeline tests and the benchmark
// in this file start per worker function.
const maxWorkers = 100
2015-03-02 13:48:47 +00:00
func TestPipelineWalkerWithSplit(t *testing.T) {
2017-10-02 13:06:39 +00:00
if rtest.TestWalkerPath == "" {
2015-03-02 13:48:47 +00:00
t.Skipf("walkerpath not set, skipping TestPipelineWalker")
2015-02-15 11:57:09 +00:00
}
var err error
2017-10-02 13:06:39 +00:00
if !filepath.IsAbs(rtest.TestWalkerPath) {
rtest.TestWalkerPath, err = filepath.Abs(rtest.TestWalkerPath)
rtest.OK(t, err)
}
2017-10-02 13:06:39 +00:00
before, err := statPath(rtest.TestWalkerPath)
rtest.OK(t, err)
2015-02-15 11:57:09 +00:00
2017-10-02 13:06:39 +00:00
t.Logf("walking path %s with %d dirs, %d files", rtest.TestWalkerPath,
2015-02-15 11:57:09 +00:00
before.dirs, before.files)
// account for top level dir
before.dirs++
2015-02-15 11:57:09 +00:00
after := stats{}
m := sync.Mutex{}
2015-02-15 13:44:54 +00:00
worker := func(wg *sync.WaitGroup, done <-chan struct{}, entCh <-chan pipe.Entry, dirCh <-chan pipe.Dir) {
2015-02-15 11:57:09 +00:00
defer wg.Done()
for {
select {
case e, ok := <-entCh:
if !ok {
// channel is closed
return
}
m.Lock()
after.files++
m.Unlock()
2015-03-07 10:53:32 +00:00
e.Result() <- true
2015-02-15 11:57:09 +00:00
case dir, ok := <-dirCh:
if !ok {
// channel is closed
return
}
// wait for all content
for _, ch := range dir.Entries {
<-ch
}
m.Lock()
after.dirs++
m.Unlock()
2015-03-07 10:53:32 +00:00
dir.Result() <- true
2015-02-15 11:57:09 +00:00
case <-done:
// pipeline was cancelled
return
}
}
}
2015-02-15 13:44:54 +00:00
var wg sync.WaitGroup
2015-02-15 11:57:09 +00:00
done := make(chan struct{})
entCh := make(chan pipe.Entry)
dirCh := make(chan pipe.Dir)
2015-06-28 11:15:35 +00:00
for i := 0; i < maxWorkers; i++ {
2015-02-15 11:57:09 +00:00
wg.Add(1)
2015-02-15 13:44:54 +00:00
go worker(&wg, done, entCh, dirCh)
2015-02-15 11:57:09 +00:00
}
2015-03-07 10:53:32 +00:00
jobs := make(chan pipe.Job, 200)
2015-03-02 13:48:47 +00:00
wg.Add(1)
go func() {
pipe.Split(jobs, dirCh, entCh)
close(entCh)
close(dirCh)
wg.Done()
}()
2015-03-07 10:53:32 +00:00
resCh := make(chan pipe.Result, 1)
2017-10-02 13:06:39 +00:00
pipe.Walk(context.TODO(), []string{rtest.TestWalkerPath}, acceptAll, jobs, resCh)
2015-03-02 13:48:47 +00:00
// wait for all workers to terminate
wg.Wait()
// wait for top-level blob
<-resCh
2017-10-02 13:06:39 +00:00
t.Logf("walked path %s with %d dirs, %d files", rtest.TestWalkerPath,
2015-03-02 13:48:47 +00:00
after.dirs, after.files)
2017-10-02 13:06:39 +00:00
rtest.Assert(t, before == after, "stats do not match, expected %v, got %v", before, after)
2015-03-02 13:48:47 +00:00
}
func TestPipelineWalker(t *testing.T) {
2017-10-02 13:06:39 +00:00
if rtest.TestWalkerPath == "" {
2015-03-02 13:48:47 +00:00
t.Skipf("walkerpath not set, skipping TestPipelineWalker")
}
2017-06-05 21:56:59 +00:00
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
var err error
2017-10-02 13:06:39 +00:00
if !filepath.IsAbs(rtest.TestWalkerPath) {
rtest.TestWalkerPath, err = filepath.Abs(rtest.TestWalkerPath)
rtest.OK(t, err)
}
2017-10-02 13:06:39 +00:00
before, err := statPath(rtest.TestWalkerPath)
rtest.OK(t, err)
2015-03-02 13:48:47 +00:00
2017-10-02 13:06:39 +00:00
t.Logf("walking path %s with %d dirs, %d files", rtest.TestWalkerPath,
2015-03-02 13:48:47 +00:00
before.dirs, before.files)
// account for top level dir
before.dirs++
2015-03-02 13:48:47 +00:00
after := stats{}
m := sync.Mutex{}
2017-06-05 21:56:59 +00:00
worker := func(ctx context.Context, wg *sync.WaitGroup, jobs <-chan pipe.Job) {
2015-03-02 13:48:47 +00:00
defer wg.Done()
for {
select {
case job, ok := <-jobs:
if !ok {
// channel is closed
return
}
2017-10-02 13:06:39 +00:00
rtest.Assert(t, job != nil, "job is nil")
2015-03-02 13:48:47 +00:00
switch j := job.(type) {
case pipe.Dir:
// wait for all content
for _, ch := range j.Entries {
<-ch
}
m.Lock()
after.dirs++
m.Unlock()
2015-03-07 10:53:32 +00:00
j.Result() <- true
2015-03-02 13:48:47 +00:00
case pipe.Entry:
m.Lock()
after.files++
m.Unlock()
2015-03-07 10:53:32 +00:00
j.Result() <- true
2015-03-02 13:48:47 +00:00
}
2017-06-05 21:56:59 +00:00
case <-ctx.Done():
2015-03-02 13:48:47 +00:00
// pipeline was cancelled
return
}
}
}
var wg sync.WaitGroup
2015-03-07 10:53:32 +00:00
jobs := make(chan pipe.Job)
2015-03-02 13:48:47 +00:00
2015-06-28 11:15:35 +00:00
for i := 0; i < maxWorkers; i++ {
2015-03-02 13:48:47 +00:00
wg.Add(1)
2017-06-05 21:56:59 +00:00
go worker(ctx, &wg, jobs)
2015-03-02 13:48:47 +00:00
}
2015-03-07 10:53:32 +00:00
resCh := make(chan pipe.Result, 1)
2017-10-02 13:06:39 +00:00
pipe.Walk(ctx, []string{rtest.TestWalkerPath}, acceptAll, jobs, resCh)
2015-02-15 11:57:09 +00:00
// wait for all workers to terminate
wg.Wait()
// wait for top-level blob
<-resCh
2017-10-02 13:06:39 +00:00
t.Logf("walked path %s with %d dirs, %d files", rtest.TestWalkerPath,
2015-02-15 11:57:09 +00:00
after.dirs, after.files)
2017-10-02 13:06:39 +00:00
rtest.Assert(t, before == after, "stats do not match, expected %v, got %v", before, after)
2015-02-15 11:57:09 +00:00
}
2015-11-06 18:41:57 +00:00
// createFile creates (or truncates) the file filename and writes data to it.
//
// It returns the first error hit while creating, writing or closing the file.
// The close error is reported as well, because a failed close can mean the
// data never reached the file.
func createFile(filename, data string) error {
	f, err := os.Create(filename)
	if err != nil {
		return err
	}

	if _, err := f.WriteString(data); err != nil {
		// the write error is the one worth reporting; closing is best effort
		_ = f.Close()
		return err
	}

	return f.Close()
}
func TestPipeWalkerError(t *testing.T) {
dir, err := ioutil.TempDir("", "restic-test-")
2017-10-02 13:06:39 +00:00
rtest.OK(t, err)
2015-11-06 18:41:57 +00:00
base := filepath.Base(dir)
var testjobs = []struct {
path []string
err bool
}{
{[]string{base, "a", "file_a"}, false},
{[]string{base, "a"}, false},
{[]string{base, "b"}, true},
{[]string{base, "c", "file_c"}, false},
{[]string{base, "c"}, false},
{[]string{base}, false},
{[]string{}, false},
}
2017-10-02 13:06:39 +00:00
rtest.OK(t, os.Mkdir(filepath.Join(dir, "a"), 0755))
rtest.OK(t, os.Mkdir(filepath.Join(dir, "b"), 0755))
rtest.OK(t, os.Mkdir(filepath.Join(dir, "c"), 0755))
2015-11-06 18:41:57 +00:00
2017-10-02 13:06:39 +00:00
rtest.OK(t, createFile(filepath.Join(dir, "a", "file_a"), "file a"))
rtest.OK(t, createFile(filepath.Join(dir, "b", "file_b"), "file b"))
rtest.OK(t, createFile(filepath.Join(dir, "c", "file_c"), "file c"))
2015-11-06 18:41:57 +00:00
2015-11-06 21:38:34 +00:00
ranHook := false
testdir := filepath.Join(dir, "b")
// install hook that removes the dir right before readdirnames()
debug.Hook("pipe.readdirnames", func(context interface{}) {
path := context.(string)
if path != testdir {
return
}
t.Logf("in hook, removing test file %v", testdir)
ranHook = true
2017-10-02 13:06:39 +00:00
rtest.OK(t, os.RemoveAll(testdir))
2015-11-06 21:38:34 +00:00
})
2015-11-06 18:41:57 +00:00
2017-06-05 21:56:59 +00:00
ctx, cancel := context.WithCancel(context.TODO())
2015-11-06 18:41:57 +00:00
ch := make(chan pipe.Job)
resCh := make(chan pipe.Result, 1)
2017-06-05 21:56:59 +00:00
go pipe.Walk(ctx, []string{dir}, acceptAll, ch, resCh)
2015-11-06 18:41:57 +00:00
i := 0
for job := range ch {
if i == len(testjobs) {
t.Errorf("too many jobs received")
break
}
p := filepath.Join(testjobs[i].path...)
if p != job.Path() {
t.Errorf("job %d has wrong path: expected %q, got %q", i, p, job.Path())
}
if testjobs[i].err {
if job.Error() == nil {
t.Errorf("job %d expected error but got nil", i)
}
} else {
if job.Error() != nil {
t.Errorf("job %d expected no error but got %v", i, job.Error())
}
}
i++
}
if i != len(testjobs) {
t.Errorf("expected %d jobs, got %d", len(testjobs), i)
}
2017-06-05 21:56:59 +00:00
cancel()
2015-11-06 18:41:57 +00:00
2017-10-02 13:06:39 +00:00
rtest.Assert(t, ranHook, "hook did not run")
rtest.OK(t, os.RemoveAll(dir))
2015-11-06 18:41:57 +00:00
}
2015-02-15 11:57:09 +00:00
func BenchmarkPipelineWalker(b *testing.B) {
2017-10-02 13:06:39 +00:00
if rtest.TestWalkerPath == "" {
2015-03-02 13:48:47 +00:00
b.Skipf("walkerpath not set, skipping BenchPipelineWalker")
2015-02-15 11:57:09 +00:00
}
var max time.Duration
m := sync.Mutex{}
2017-06-05 21:56:59 +00:00
fileWorker := func(ctx context.Context, wg *sync.WaitGroup, ch <-chan pipe.Entry) {
2015-02-15 11:57:09 +00:00
defer wg.Done()
for {
select {
2015-02-15 13:44:54 +00:00
case e, ok := <-ch:
2015-02-15 11:57:09 +00:00
if !ok {
// channel is closed
return
}
// simulate backup
2015-02-15 13:44:54 +00:00
//time.Sleep(10 * time.Millisecond)
2015-02-15 11:57:09 +00:00
2015-03-07 10:53:32 +00:00
e.Result() <- true
2017-06-05 21:56:59 +00:00
case <-ctx.Done():
2015-02-15 13:44:54 +00:00
// pipeline was cancelled
return
}
}
}
2015-02-15 11:57:09 +00:00
2017-06-05 21:56:59 +00:00
dirWorker := func(ctx context.Context, wg *sync.WaitGroup, ch <-chan pipe.Dir) {
2015-02-15 13:44:54 +00:00
defer wg.Done()
for {
select {
case dir, ok := <-ch:
2015-02-15 11:57:09 +00:00
if !ok {
// channel is closed
return
}
start := time.Now()
// wait for all content
for _, ch := range dir.Entries {
<-ch
}
d := time.Since(start)
m.Lock()
if d > max {
max = d
}
m.Unlock()
2015-03-07 10:53:32 +00:00
dir.Result() <- true
2017-06-05 21:56:59 +00:00
case <-ctx.Done():
2015-02-15 11:57:09 +00:00
// pipeline was cancelled
return
}
}
}
2017-06-05 21:56:59 +00:00
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
2015-02-15 11:57:09 +00:00
for i := 0; i < b.N; i++ {
2015-02-15 13:44:54 +00:00
max = 0
entCh := make(chan pipe.Entry, 200)
dirCh := make(chan pipe.Dir, 200)
2015-02-15 11:57:09 +00:00
var wg sync.WaitGroup
2015-06-28 11:15:35 +00:00
b.Logf("starting %d workers", maxWorkers)
for i := 0; i < maxWorkers; i++ {
2015-02-15 13:44:54 +00:00
wg.Add(2)
2017-06-05 21:56:59 +00:00
go dirWorker(ctx, &wg, dirCh)
go fileWorker(ctx, &wg, entCh)
2015-02-15 11:57:09 +00:00
}
2015-03-07 10:53:32 +00:00
jobs := make(chan pipe.Job, 200)
2015-03-02 13:48:47 +00:00
wg.Add(1)
go func() {
pipe.Split(jobs, dirCh, entCh)
close(entCh)
close(dirCh)
wg.Done()
}()
2015-03-07 10:53:32 +00:00
resCh := make(chan pipe.Result, 1)
2017-10-02 13:06:39 +00:00
pipe.Walk(ctx, []string{rtest.TestWalkerPath}, acceptAll, jobs, resCh)
2015-02-15 11:57:09 +00:00
// wait for all workers to terminate
wg.Wait()
// wait for final result
<-resCh
2015-02-15 13:44:54 +00:00
b.Logf("max duration for a dir: %v", max)
}
2015-02-15 11:57:09 +00:00
}
func TestPipelineWalkerMultiple(t *testing.T) {
2017-10-02 13:06:39 +00:00
if rtest.TestWalkerPath == "" {
t.Skipf("walkerpath not set, skipping TestPipelineWalker")
}
2017-06-05 21:56:59 +00:00
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
2017-10-02 13:06:39 +00:00
paths, err := filepath.Glob(filepath.Join(rtest.TestWalkerPath, "*"))
rtest.OK(t, err)
2017-10-02 13:06:39 +00:00
before, err := statPath(rtest.TestWalkerPath)
rtest.OK(t, err)
t.Logf("walking paths %v with %d dirs, %d files", paths,
before.dirs, before.files)
after := stats{}
m := sync.Mutex{}
2017-06-05 21:56:59 +00:00
worker := func(ctx context.Context, wg *sync.WaitGroup, jobs <-chan pipe.Job) {
defer wg.Done()
for {
select {
case job, ok := <-jobs:
if !ok {
// channel is closed
return
}
2017-10-02 13:06:39 +00:00
rtest.Assert(t, job != nil, "job is nil")
switch j := job.(type) {
case pipe.Dir:
// wait for all content
for _, ch := range j.Entries {
<-ch
}
m.Lock()
after.dirs++
m.Unlock()
j.Result() <- true
case pipe.Entry:
m.Lock()
after.files++
m.Unlock()
j.Result() <- true
}
2017-06-05 21:56:59 +00:00
case <-ctx.Done():
// pipeline was cancelled
return
}
}
}
var wg sync.WaitGroup
jobs := make(chan pipe.Job)
2015-06-28 11:15:35 +00:00
for i := 0; i < maxWorkers; i++ {
wg.Add(1)
2017-06-05 21:56:59 +00:00
go worker(ctx, &wg, jobs)
}
resCh := make(chan pipe.Result, 1)
2017-06-05 21:56:59 +00:00
pipe.Walk(ctx, paths, acceptAll, jobs, resCh)
// wait for all workers to terminate
wg.Wait()
// wait for top-level blob
<-resCh
t.Logf("walked %d paths with %d dirs, %d files", len(paths), after.dirs, after.files)
2017-10-02 13:06:39 +00:00
rtest.Assert(t, before == after, "stats do not match, expected %v, got %v", before, after)
}
// dirsInPath returns how many times filepath.Dir has to be applied to path
// until a root is reached; for a clean path that is its number of components.
// "/", "." and the empty string yield 0.
//
// Besides "/" and ".", any other path that filepath.Dir maps to itself (such
// as a volume root) is treated as a root, so the loop always terminates.
func dirsInPath(path string) int {
	if path == "/" || path == "." || path == "" {
		return 0
	}

	n := 0
	for dir := path; dir != "/" && dir != "."; {
		parent := filepath.Dir(dir)
		if parent == dir {
			// fixed point: nothing left to strip
			break
		}

		n++
		dir = parent
	}

	return n
}
func TestPipeWalkerRoot(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skipf("not running TestPipeWalkerRoot on %s", runtime.GOOS)
return
}
cwd, err := os.Getwd()
2017-10-02 13:06:39 +00:00
rtest.OK(t, err)
testPaths := []string{
string(filepath.Separator),
".",
cwd,
}
for _, path := range testPaths {
testPipeWalkerRootWithPath(path, t)
}
}
func testPipeWalkerRootWithPath(path string, t *testing.T) {
pattern := filepath.Join(path, "*")
rootPaths, err := filepath.Glob(pattern)
2017-10-02 13:06:39 +00:00
rtest.OK(t, err)
for i, p := range rootPaths {
rootPaths[i], err = filepath.Rel(path, p)
2017-10-02 13:06:39 +00:00
rtest.OK(t, err)
}
t.Logf("paths in %v (pattern %q) expanded to %v items", path, pattern, len(rootPaths))
jobCh := make(chan pipe.Job)
var jobs []pipe.Job
worker := func(wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobCh {
jobs = append(jobs, job)
}
}
var wg sync.WaitGroup
wg.Add(1)
go worker(&wg)
filter := func(p string, fi os.FileInfo) bool {
p, err := filepath.Rel(path, p)
2017-10-02 13:06:39 +00:00
rtest.OK(t, err)
return dirsInPath(p) <= 1
}
resCh := make(chan pipe.Result, 1)
2017-06-05 21:56:59 +00:00
pipe.Walk(context.TODO(), []string{path}, filter, jobCh, resCh)
wg.Wait()
t.Logf("received %d jobs", len(jobs))
for i, job := range jobs[:len(jobs)-1] {
path := job.Path()
if path == "." || path == ".." || path == string(filepath.Separator) {
t.Errorf("job %v has invalid path %q", i, path)
}
}
lastPath := jobs[len(jobs)-1].Path()
if lastPath != "" {
t.Errorf("last job has non-empty path %q", lastPath)
}
if len(jobs) < len(rootPaths) {
t.Errorf("want at least %v jobs, got %v for path %v\n", len(rootPaths), len(jobs), path)
}
}