diff --git a/archiver.go b/archiver.go index 226ea5ecc..4fa33819a 100644 --- a/archiver.go +++ b/archiver.go @@ -13,6 +13,7 @@ import ( const ( maxConcurrentFiles = 32 + maxConcurrentBlobs = 32 ) type Archiver struct { @@ -23,6 +24,7 @@ type Archiver struct { bl *BlobList // blobs used for the current snapshot fileToken chan struct{} + blobToken chan struct{} Stats Stats @@ -48,13 +50,18 @@ func NewArchiver(be backend.Server, key *Key) (*Archiver, error) { be: be, key: key, fileToken: make(chan struct{}, maxConcurrentFiles), + blobToken: make(chan struct{}, maxConcurrentBlobs), } - // fill file token + // fill file and blob token for i := 0; i < maxConcurrentFiles; i++ { arch.fileToken <- struct{}{} } + for i := 0; i < maxConcurrentBlobs; i++ { + arch.blobToken <- struct{}{} + } + // abort on all errors arch.Error = func(string, os.FileInfo, error) error { return err } // allow all files @@ -138,6 +145,7 @@ func (arch *Archiver) SaveFile(node *Node) error { } else { // else store all chunks chnker := chunker.New(file) + chans := [](<-chan Blob){} for { chunk, err := chnker.Next() @@ -149,14 +157,28 @@ func (arch *Archiver) SaveFile(node *Node) error { return err } - blob, err := arch.ch.Save(backend.Data, chunk.Data) - if err != nil { - return err - } + // acquire token, start goroutine to save chunk + token := <-arch.blobToken + resCh := make(chan Blob, 1) - arch.saveUpdate(Stats{Bytes: blob.Size}) + go func(ch chan<- Blob) { + blob, err := arch.ch.Save(backend.Data, chunk.Data) + // TODO handle error + if err != nil { + panic(err) + } - blobs = append(blobs, blob) + arch.saveUpdate(Stats{Bytes: blob.Size}) + arch.blobToken <- token + ch <- blob + }(resCh) + + chans = append(chans, resCh) + } + + blobs = []Blob{} + for _, ch := range chans { + blobs = append(blobs, <-ch) } }