diff --git a/archiver.go b/archiver.go index acc2b9a1f..028e94162 100644 --- a/archiver.go +++ b/archiver.go @@ -3,16 +3,25 @@ package khepri import ( "os" "path/filepath" + "sync" "github.com/fd0/khepri/backend" ) +const ( + maxConcurrentFiles = 32 +) + type Archiver struct { - be backend.Server - key *Key - ch *ContentHandler + be backend.Server + key *Key + ch *ContentHandler + + m sync.Mutex smap *StorageMap // blobs used for the current snapshot + fileToken chan struct{} + Stats Stats Error func(dir string, fi os.FileInfo, err error) error @@ -20,6 +29,8 @@ type Archiver struct { ScannerUpdate func(stats Stats) SaveUpdate func(stats Stats) + + sum sync.Mutex // for SaveUpdate } type Stats struct { @@ -31,7 +42,16 @@ type Stats struct { func NewArchiver(be backend.Server, key *Key) (*Archiver, error) { var err error - arch := &Archiver{be: be, key: key} + arch := &Archiver{ + be: be, + key: key, + fileToken: make(chan struct{}, maxConcurrentFiles), + } + + // fill file token + for i := 0; i < maxConcurrentFiles; i++ { + arch.fileToken <- struct{}{} + } // abort on all errors arch.Error = func(string, os.FileInfo, error) error { return err } @@ -39,7 +59,6 @@ func NewArchiver(be backend.Server, key *Key) (*Archiver, error) { arch.Filter = func(string, os.FileInfo) bool { return true } // do nothing arch.ScannerUpdate = func(Stats) {} - arch.SaveUpdate = func(Stats) {} arch.smap = NewStorageMap() arch.ch, err = NewContentHandler(be, key) @@ -56,6 +75,14 @@ func NewArchiver(be backend.Server, key *Key) (*Archiver, error) { return arch, nil } +func (arch *Archiver) saveUpdate(stats Stats) { + if arch.SaveUpdate != nil { + arch.sum.Lock() + defer arch.sum.Unlock() + arch.SaveUpdate(stats) + } +} + func (arch *Archiver) Save(t backend.Type, data []byte) (*Blob, error) { blob, err := arch.ch.Save(t, data) if err != nil { @@ -63,6 +90,8 @@ func (arch *Archiver) Save(t backend.Type, data []byte) (*Blob, error) { } // store blob in storage map for current snapshot + arch.m.Lock() + defer arch.m.Unlock() arch.smap.Insert(blob) return blob, nil @@ -75,6 +104,8 @@ func (arch *Archiver) SaveJSON(t backend.Type, item interface{}) (*Blob, error) } // store blob in storage map for current snapshot + arch.m.Lock() + defer arch.m.Unlock() arch.smap.Insert(blob) return blob, nil @@ -89,7 +120,9 @@ func (arch *Archiver) SaveFile(node *Node) error { node.Content = make([]backend.ID, len(blobs)) for i, blob := range blobs { node.Content[i] = blob.ID + arch.m.Lock() arch.smap.Insert(blob) + arch.m.Unlock() } return err @@ -178,6 +211,8 @@ func (arch *Archiver) LoadTree(path string) (*Tree, error) { } func (arch *Archiver) saveTree(t *Tree) (*Blob, error) { + var wg sync.WaitGroup + for _, node := range *t { if node.Tree != nil && node.Subtree == nil { b, err := arch.saveTree(node.Tree) @@ -185,19 +220,34 @@ func (arch *Archiver) saveTree(t *Tree) (*Blob, error) { return nil, err } node.Subtree = b.ID - arch.SaveUpdate(Stats{Directories: 1}) + arch.saveUpdate(Stats{Directories: 1}) } else if node.Type == "file" && len(node.Content) == 0 { - err := arch.SaveFile(node) - if err != nil { - return nil, err - } + // start goroutine + wg.Add(1) + go func(n *Node) { + defer wg.Done() - arch.SaveUpdate(Stats{Files: 1, Bytes: node.Size}) + // get token + token := <-arch.fileToken + defer func() { + arch.fileToken <- token + }() + + // debug("start: %s", n.path) + + // TODO: handle error + arch.SaveFile(n) + arch.saveUpdate(Stats{Files: 1, Bytes: n.Size}) + + // debug("done: %s", n.path) + }(node) } else { - arch.SaveUpdate(Stats{Other: 1}) + arch.saveUpdate(Stats{Other: 1}) } } + wg.Wait() + blob, err := arch.SaveJSON(backend.Tree, t) if err != nil { return nil, err diff --git a/cmd/khepri/cmd_backup.go b/cmd/khepri/cmd_backup.go index da2fef8ab..0e3972186 100644 --- a/cmd/khepri/cmd_backup.go +++ b/cmd/khepri/cmd_backup.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "strings" + "time" "github.com/fd0/khepri" "github.com/fd0/khepri/backend" @@ -92,12 +93,15 @@ func commandBackup(be backend.Server, key *khepri.Key, args []string) error { } } + start := time.Now() sn, id, err := arch.Snapshot(target, t) if err != nil { fmt.Fprintf(os.Stderr, "error: %v\n", err) } fmt.Printf("\nsnapshot %s saved: %v\n", id, sn) + duration := time.Now().Sub(start) + fmt.Printf("duration: %s, %.2fMiB/s\n", duration, float64(arch.Stats.Bytes)/float64(duration/time.Second)/(1<<20)) return nil } diff --git a/cmd/khepri/main.go b/cmd/khepri/main.go index 719a84ee8..cd1c0a777 100644 --- a/cmd/khepri/main.go +++ b/cmd/khepri/main.go @@ -5,6 +5,7 @@ import ( "log" "net/url" "os" + "runtime" "sort" "strings" @@ -128,6 +129,9 @@ func init() { commands["snapshots"] = commandSnapshots commands["cat"] = commandCat commands["ls"] = commandLs + + // set GOMAXPROCS to number of CPUs + runtime.GOMAXPROCS(runtime.NumCPU()) } func main() { diff --git a/contenthandler.go b/contenthandler.go index 271df9b76..514bc0fe3 100644 --- a/contenthandler.go +++ b/contenthandler.go @@ -6,6 +6,7 @@ import ( "io" "io/ioutil" "os" + "sync" "github.com/fd0/khepri/backend" "github.com/fd0/khepri/chunker" @@ -15,6 +16,7 @@ type ContentHandler struct { be backend.Server key *Key + m sync.Mutex content *StorageMap } @@ -36,6 +38,8 @@ func (ch *ContentHandler) LoadSnapshot(id backend.ID) (*Snapshot, error) { return nil, err } + ch.m.Lock() + defer ch.m.Unlock() ch.content.Merge(sn.StorageMap) return sn, nil } @@ -49,6 +53,9 @@ func (ch *ContentHandler) LoadAllSnapshots() error { if err != nil { return } + + ch.m.Lock() + defer ch.m.Unlock() ch.content.Merge(sn.StorageMap) }) if err != nil { @@ -65,6 +72,8 @@ func (ch *ContentHandler) Save(t backend.Type, data []byte) (*Blob, error) { id := backend.Hash(data) // test if the hash is already in the backend + ch.m.Lock() + defer ch.m.Unlock() blob := ch.content.Find(id) if blob != nil { return blob, nil @@ -177,6 +186,8 @@ func (ch *ContentHandler) Load(t backend.Type, id backend.ID) ([]byte, error) { } // lookup storage hash + ch.m.Lock() + defer ch.m.Unlock() blob := ch.content.Find(id) if blob == nil { return nil, errors.New("Storage ID not found")