2
2
mirror of https://github.com/octoleo/restic.git synced 2024-09-28 06:29:01 +00:00

Break Archiver.SaveFile() method down into smaller ones

This commit is contained in:
Florian Weingarten 2015-05-02 01:05:49 -04:00
parent 9308f1493d
commit e19c87fa7d

View File

@ -12,6 +12,7 @@ import (
"github.com/juju/arrar" "github.com/juju/arrar"
"github.com/restic/restic/backend" "github.com/restic/restic/backend"
"github.com/restic/restic/chunker"
"github.com/restic/restic/debug" "github.com/restic/restic/debug"
"github.com/restic/restic/pack" "github.com/restic/restic/pack"
"github.com/restic/restic/pipe" "github.com/restic/restic/pipe"
@ -88,92 +89,67 @@ func (arch *Archiver) SaveTreeJSON(item interface{}) (backend.ID, error) {
return arch.s.SaveJSON(pack.Tree, item) return arch.s.SaveJSON(pack.Tree, item)
} }
// SaveFile stores the content of the file on the backend as a Blob by calling func (arch *Archiver) reloadFileIfChanged(node *Node, file *os.File) (*Node, error) {
// Save for each chunk.
func (arch *Archiver) SaveFile(p *Progress, node *Node) error {
file, err := node.OpenForReading()
defer file.Close()
if err != nil {
return err
}
// check file again, since it could have disappeared by now
fi, err := file.Stat() fi, err := file.Stat()
if err != nil { if err != nil {
return err return nil, err
} }
if fi.ModTime() != node.ModTime { if fi.ModTime() == node.ModTime {
e2 := arch.Error(node.path, fi, errors.New("file was updated, using new version")) return node, nil
if e2 == nil {
n, err := NodeFromFileInfo(node.path, fi)
if err != nil {
debug.Log("Archiver.SaveFile", "NodeFromFileInfo returned error for %v: %v", node.path, err)
return err
}
*node = *n
}
} }
type result struct { err = arch.Error(node.path, fi, errors.New("file has changed"))
id backend.ID if err != nil {
bytes uint64 return nil, err
} }
// store all chunks node, err = NodeFromFileInfo(node.path, fi)
chnker := GetChunker("archiver.SaveFile") if err != nil {
chnker.Reset(file, arch.s.ChunkerPolynomial()) debug.Log("Archiver.SaveFile", "NodeFromFileInfo returned error for %v: %v", node.path, err)
chans := [](<-chan result){} return nil, err
defer FreeChunker("archiver.SaveFile", chnker)
chunks := 0
for {
chunk, err := chnker.Next()
if err == io.EOF {
break
}
if err != nil {
return arrar.Annotate(err, "SaveFile() chunker.Next()")
}
chunks++
// acquire token, start goroutine to save chunk
token := <-arch.blobToken
resCh := make(chan result, 1)
go func(ch chan<- result) {
err := arch.Save(pack.Data, chunk.Digest, chunk.Length, chunk.Reader(file))
// TODO handle error
if err != nil {
panic(err)
}
p.Report(Stat{Bytes: uint64(chunk.Length)})
arch.blobToken <- token
ch <- result{id: backend.ID(chunk.Digest), bytes: uint64(chunk.Length)}
}(resCh)
chans = append(chans, resCh)
} }
results := []result{} return node, nil
for _, ch := range chans { }
type saveResult struct {
id backend.ID
bytes uint64
}
func (arch *Archiver) saveChunk(chunk *chunker.Chunk, p *Progress, token struct{}, file *os.File, resultChannel chan<- saveResult) {
err := arch.Save(pack.Data, chunk.Digest, chunk.Length, chunk.Reader(file))
// TODO handle error
if err != nil {
panic(err)
}
p.Report(Stat{Bytes: uint64(chunk.Length)})
arch.blobToken <- token
resultChannel <- saveResult{id: backend.ID(chunk.Digest), bytes: uint64(chunk.Length)}
}
func waitForResults(resultChannels [](<-chan saveResult)) ([]saveResult, error) {
results := []saveResult{}
for _, ch := range resultChannels {
results = append(results, <-ch) results = append(results, <-ch)
} }
if len(results) != chunks { if len(results) != len(resultChannels) {
return fmt.Errorf("chunker returned %v chunks, but only %v blobs saved", chunks, len(results)) return nil, fmt.Errorf("chunker returned %v chunks, but only %v blobs saved", len(resultChannels), len(results))
} }
var bytes uint64 return results, nil
}
node.Content = make([]backend.ID, len(results)) func updateNodeContent(node *Node, results []saveResult) error {
debug.Log("Archiver.Save", "checking size for file %s", node.path) debug.Log("Archiver.Save", "checking size for file %s", node.path)
var bytes uint64
node.Content = make([]backend.ID, len(results))
for i, b := range results { for i, b := range results {
node.Content[i] = b.id node.Content[i] = b.id
bytes += b.bytes bytes += b.bytes
@ -190,6 +166,49 @@ func (arch *Archiver) SaveFile(p *Progress, node *Node) error {
return nil return nil
} }
// SaveFile stores the content of the file on the backend as a Blob by calling
// Save for each chunk.
func (arch *Archiver) SaveFile(p *Progress, node *Node) error {
file, err := node.OpenForReading()
defer file.Close()
if err != nil {
return err
}
node, err = arch.reloadFileIfChanged(node, file)
if err != nil {
return err
}
chnker := GetChunker("archiver.SaveFile")
chnker.Reset(file, arch.s.ChunkerPolynomial())
resultChannels := [](<-chan saveResult){}
defer FreeChunker("archiver.SaveFile", chnker)
for {
chunk, err := chnker.Next()
if err == io.EOF {
break
}
if err != nil {
return arrar.Annotate(err, "SaveFile() chunker.Next()")
}
resCh := make(chan saveResult, 1)
go arch.saveChunk(chunk, p, <-arch.blobToken, file, resCh)
resultChannels = append(resultChannels, resCh)
}
results, err := waitForResults(resultChannels)
if err != nil {
return err
}
err = updateNodeContent(node, results)
return err
}
func (arch *Archiver) saveTree(p *Progress, t *Tree) (backend.ID, error) { func (arch *Archiver) saveTree(p *Progress, t *Tree) (backend.ID, error) {
debug.Log("Archiver.saveTree", "saveTree(%v)\n", t) debug.Log("Archiver.saveTree", "saveTree(%v)\n", t)
var wg sync.WaitGroup var wg sync.WaitGroup