Save multiple files in parallel

This commit is contained in:
Alexander Neumann 2014-11-16 22:50:20 +01:00
parent 1ac4f92299
commit 94d1482888
4 changed files with 81 additions and 12 deletions

View File

@ -3,16 +3,25 @@ package khepri
import (
"os"
"path/filepath"
"sync"
"github.com/fd0/khepri/backend"
)
// maxConcurrentFiles is the number of tokens placed into the Archiver's
// fileToken channel on construction. A file-saving goroutine takes one token
// before it calls SaveFile and puts it back afterwards, so this value caps how
// many files are saved at the same time.
const maxConcurrentFiles = 32
type Archiver struct {
be backend.Server
key *Key
ch *ContentHandler
be backend.Server
key *Key
ch *ContentHandler
m sync.Mutex
smap *StorageMap // blobs used for the current snapshot
fileToken chan struct{}
Stats Stats
Error func(dir string, fi os.FileInfo, err error) error
@ -20,6 +29,8 @@ type Archiver struct {
ScannerUpdate func(stats Stats)
SaveUpdate func(stats Stats)
sum sync.Mutex // for SaveUpdate
}
type Stats struct {
@ -31,7 +42,16 @@ type Stats struct {
func NewArchiver(be backend.Server, key *Key) (*Archiver, error) {
var err error
arch := &Archiver{be: be, key: key}
arch := &Archiver{
be: be,
key: key,
fileToken: make(chan struct{}, maxConcurrentFiles),
}
// fill file token
for i := 0; i < maxConcurrentFiles; i++ {
arch.fileToken <- struct{}{}
}
// abort on all errors
arch.Error = func(string, os.FileInfo, error) error { return err }
@ -39,7 +59,6 @@ func NewArchiver(be backend.Server, key *Key) (*Archiver, error) {
arch.Filter = func(string, os.FileInfo) bool { return true }
// do nothing
arch.ScannerUpdate = func(Stats) {}
arch.SaveUpdate = func(Stats) {}
arch.smap = NewStorageMap()
arch.ch, err = NewContentHandler(be, key)
@ -56,6 +75,14 @@ func NewArchiver(be backend.Server, key *Key) (*Archiver, error) {
return arch, nil
}
// saveUpdate forwards stats to the SaveUpdate callback while holding arch.sum,
// so that callers running in different goroutines do not execute the callback
// concurrently through this method. It does nothing when no callback is set.
func (arch *Archiver) saveUpdate(stats Stats) {
	// No callback registered: nothing to report and no need to take the lock.
	if arch.SaveUpdate == nil {
		return
	}

	arch.sum.Lock()
	// Deferred so the mutex is released even if the callback panics.
	defer arch.sum.Unlock()

	arch.SaveUpdate(stats)
}
func (arch *Archiver) Save(t backend.Type, data []byte) (*Blob, error) {
blob, err := arch.ch.Save(t, data)
if err != nil {
@ -63,6 +90,8 @@ func (arch *Archiver) Save(t backend.Type, data []byte) (*Blob, error) {
}
// store blob in storage map for current snapshot
arch.m.Lock()
defer arch.m.Unlock()
arch.smap.Insert(blob)
return blob, nil
@ -75,6 +104,8 @@ func (arch *Archiver) SaveJSON(t backend.Type, item interface{}) (*Blob, error)
}
// store blob in storage map for current snapshot
arch.m.Lock()
defer arch.m.Unlock()
arch.smap.Insert(blob)
return blob, nil
@ -89,7 +120,9 @@ func (arch *Archiver) SaveFile(node *Node) error {
node.Content = make([]backend.ID, len(blobs))
for i, blob := range blobs {
node.Content[i] = blob.ID
arch.m.Lock()
arch.smap.Insert(blob)
arch.m.Unlock()
}
return err
@ -178,6 +211,8 @@ func (arch *Archiver) LoadTree(path string) (*Tree, error) {
}
func (arch *Archiver) saveTree(t *Tree) (*Blob, error) {
var wg sync.WaitGroup
for _, node := range *t {
if node.Tree != nil && node.Subtree == nil {
b, err := arch.saveTree(node.Tree)
@ -185,19 +220,34 @@ func (arch *Archiver) saveTree(t *Tree) (*Blob, error) {
return nil, err
}
node.Subtree = b.ID
arch.SaveUpdate(Stats{Directories: 1})
arch.saveUpdate(Stats{Directories: 1})
} else if node.Type == "file" && len(node.Content) == 0 {
err := arch.SaveFile(node)
if err != nil {
return nil, err
}
// start goroutine
wg.Add(1)
go func(n *Node) {
defer wg.Done()
arch.SaveUpdate(Stats{Files: 1, Bytes: node.Size})
// get token
token := <-arch.fileToken
defer func() {
arch.fileToken <- token
}()
// debug("start: %s", n.path)
// TODO: handle error
arch.SaveFile(n)
arch.saveUpdate(Stats{Files: 1, Bytes: n.Size})
// debug("done: %s", n.path)
}(node)
} else {
arch.SaveUpdate(Stats{Other: 1})
arch.saveUpdate(Stats{Other: 1})
}
}
wg.Wait()
blob, err := arch.SaveJSON(backend.Tree, t)
if err != nil {
return nil, err

View File

@ -5,6 +5,7 @@ import (
"fmt"
"os"
"strings"
"time"
"github.com/fd0/khepri"
"github.com/fd0/khepri/backend"
@ -92,12 +93,15 @@ func commandBackup(be backend.Server, key *khepri.Key, args []string) error {
}
}
start := time.Now()
sn, id, err := arch.Snapshot(target, t)
if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
}
fmt.Printf("\nsnapshot %s saved: %v\n", id, sn)
duration := time.Now().Sub(start)
fmt.Printf("duration: %s, %.2fMiB/s\n", duration, float64(arch.Stats.Bytes)/float64(duration/time.Second)/(1<<20))
return nil
}

View File

@ -5,6 +5,7 @@ import (
"log"
"net/url"
"os"
"runtime"
"sort"
"strings"
@ -128,6 +129,9 @@ func init() {
commands["snapshots"] = commandSnapshots
commands["cat"] = commandCat
commands["ls"] = commandLs
// set GOMAXPROCS to number of CPUs
runtime.GOMAXPROCS(runtime.NumCPU())
}
func main() {

View File

@ -6,6 +6,7 @@ import (
"io"
"io/ioutil"
"os"
"sync"
"github.com/fd0/khepri/backend"
"github.com/fd0/khepri/chunker"
@ -15,6 +16,7 @@ type ContentHandler struct {
be backend.Server
key *Key
m sync.Mutex
content *StorageMap
}
@ -36,6 +38,8 @@ func (ch *ContentHandler) LoadSnapshot(id backend.ID) (*Snapshot, error) {
return nil, err
}
ch.m.Lock()
defer ch.m.Unlock()
ch.content.Merge(sn.StorageMap)
return sn, nil
}
@ -49,6 +53,9 @@ func (ch *ContentHandler) LoadAllSnapshots() error {
if err != nil {
return
}
ch.m.Lock()
defer ch.m.Unlock()
ch.content.Merge(sn.StorageMap)
})
if err != nil {
@ -65,6 +72,8 @@ func (ch *ContentHandler) Save(t backend.Type, data []byte) (*Blob, error) {
id := backend.Hash(data)
// test if the hash is already in the backend
ch.m.Lock()
defer ch.m.Unlock()
blob := ch.content.Find(id)
if blob != nil {
return blob, nil
@ -177,6 +186,8 @@ func (ch *ContentHandler) Load(t backend.Type, id backend.ID) ([]byte, error) {
}
// lookup storage hash
ch.m.Lock()
defer ch.m.Unlock()
blob := ch.content.Find(id)
if blob == nil {
return nil, errors.New("Storage ID not found")