diff --git a/scanner/blockqueue.go b/scanner/blockqueue.go new file mode 100644 index 000000000..b37cf6491 --- /dev/null +++ b/scanner/blockqueue.go @@ -0,0 +1,65 @@ +// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). +// All rights reserved. Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +package scanner + +import ( + "os" + "path/filepath" + "sync" + + "github.com/calmh/syncthing/protocol" +) + +// The parallell hasher reads FileInfo structures from the inbox, hashes the +// file to populate the Blocks element and sends it to the outbox. A number of +// workers are used in parallel. The outbox will become closed when the inbox +// is closed and all items handled. + +func newParallelHasher(dir string, blockSize, workers int, outbox, inbox chan protocol.FileInfo) { + var wg sync.WaitGroup + wg.Add(workers) + + for i := 0; i < workers; i++ { + go func() { + hashFile(dir, blockSize, outbox, inbox) + wg.Done() + }() + } + + go func() { + wg.Wait() + close(outbox) + }() +} + +func hashFile(dir string, blockSize int, outbox, inbox chan protocol.FileInfo) { + for f := range inbox { + if protocol.IsDirectory(f.Flags) || protocol.IsDeleted(f.Flags) { + outbox <- f + continue + } + + fd, err := os.Open(filepath.Join(dir, f.Name)) + if err != nil { + if debug { + l.Debugln("open:", err) + } + continue + } + + blocks, err := Blocks(fd, blockSize) + fd.Close() + + if err != nil { + if debug { + l.Debugln("hash error:", f.Name, err) + } + continue + } + + f.Blocks = blocks + outbox <- f + } +} diff --git a/scanner/walk.go b/scanner/walk.go index 011025d43..a188f0c1b 100644 --- a/scanner/walk.go +++ b/scanner/walk.go @@ -13,7 +13,6 @@ import ( "path/filepath" "runtime" "strings" - "time" "code.google.com/p/go.text/unicode/norm" "github.com/calmh/syncthing/lamport" @@ -60,18 +59,20 @@ type CurrentFiler interface { // Walk returns the list of files found in the local repository by scanning the // file system. Files are blockwise hashed. -func (w *Walker) Walk() (files chan protocol.FileInfo, ignore map[string][]string, err error) { +func (w *Walker) Walk() (chan protocol.FileInfo, map[string][]string, error) { if debug { l.Debugln("Walk", w.Dir, w.BlockSize, w.IgnoreFile) } - err = checkDir(w.Dir) + err := checkDir(w.Dir) if err != nil { - return + return nil, nil, err } - ignore = make(map[string][]string) - files = make(chan protocol.FileInfo) + ignore := make(map[string][]string) + files := make(chan protocol.FileInfo) + hashedFiles := make(chan protocol.FileInfo) + newParallelHasher(w.Dir, w.BlockSize, runtime.NumCPU(), hashedFiles, files) hashFiles := w.walkAndHashFiles(files, ignore) go func() { @@ -80,7 +81,7 @@ func (w *Walker) Walk() (files chan protocol.FileInfo, ignore map[string][]strin close(files) }() - return + return hashedFiles, ignore, nil } // CleanTempFiles removes all files that match the temporary filename pattern. @@ -219,40 +220,17 @@ func (w *Walker) walkAndHashFiles(fchan chan protocol.FileInfo, ign map[string][ } } - fd, err := os.Open(p) - if err != nil { - if debug { - l.Debugln("open:", p, err) - } - return nil - } - defer fd.Close() - - t0 := time.Now() - blocks, err := Blocks(fd, w.BlockSize) - if err != nil { - if debug { - l.Debugln("hash error:", rn, err) - } - return nil - } - if debug { - t1 := time.Now() - l.Debugln("hashed:", rn, ";", len(blocks), "blocks;", info.Size(), "bytes;", int(float64(info.Size())/1024/t1.Sub(t0).Seconds()), "KB/s") - } - var flags = uint32(info.Mode() & os.ModePerm) if w.IgnorePerms { flags = protocol.FlagNoPermBits | 0666 } - f := protocol.FileInfo{ + + fchan <- protocol.FileInfo{ Name: rn, Version: lamport.Default.Tick(0), Flags: flags, Modified: info.ModTime().Unix(), - Blocks: blocks, } - fchan <- f } return nil diff --git a/scanner/walk_test.go b/scanner/walk_test.go index 62e28c4fd..6b10e9b76 100644 --- a/scanner/walk_test.go +++ b/scanner/walk_test.go @@ -7,6 +7,7 @@ package scanner import ( "fmt" "reflect" + "sort" "testing" "time" @@ -39,6 +40,7 @@ func TestWalk(t *testing.T) { for f := range fchan { files = append(files, f) } + sort.Sort(fileList(files)) if err != nil { t.Fatal(err) @@ -133,3 +135,17 @@ func TestIgnore(t *testing.T) { } } } + +type fileList []protocol.FileInfo + +func (f fileList) Len() int { + return len(f) +} + +func (f fileList) Less(a, b int) bool { + return f[a].Name < f[b].Name +} + +func (f fileList) Swap(a, b int) { + f[a], f[b] = f[b], f[a] +}