Show pull errors, stop repo when not making progress (fixes #302)

This commit is contained in:
Jakob Borg 2014-08-04 22:02:44 +02:00
parent 5a7780ab5f
commit c9edd31993

View File

@ -72,6 +72,7 @@ type puller struct {
blocks chan bqBlock blocks chan bqBlock
requestResults chan requestResult requestResults chan requestResult
versioner versioner.Versioner versioner versioner.Versioner
errors int
} }
func newPuller(repoCfg config.RepositoryConfiguration, model *Model, slots int, cfg *config.Configuration) *puller { func newPuller(repoCfg config.RepositoryConfiguration, model *Model, slots int, cfg *config.Configuration) *puller {
@ -128,6 +129,7 @@ func (p *puller) run() {
prevVer, queued = p.queueNeededBlocks(prevVer) prevVer, queued = p.queueNeededBlocks(prevVer)
if queued > 0 { if queued > 0 {
p.errors = 0
pull: pull:
for { for {
@ -145,6 +147,11 @@ func (p *puller) run() {
if debug { if debug {
l.Debugf("%q: pulling loop needs more blocks", p.repoCfg.ID) l.Debugf("%q: pulling loop needs more blocks", p.repoCfg.ID)
} }
if p.errors > 0 && p.errors >= queued {
break pull
}
prevVer, _ = p.queueNeededBlocks(prevVer) prevVer, _ = p.queueNeededBlocks(prevVer)
b, ok = p.bq.get() b, ok = p.bq.get()
} }
@ -179,6 +186,12 @@ func (p *puller) run() {
} }
} }
} }
if p.errors > 0 && p.errors >= queued {
l.Warnf("All remaining files failed to sync. Stopping repo %q.", p.repoCfg.ID)
invalidateRepo(p.cfg, p.repoCfg.ID, errors.New("too many errors, check logs"))
return
}
} }
if changed { if changed {
@ -365,7 +378,8 @@ func (p *puller) handleBlock(b bqBlock) bool {
} }
err = os.MkdirAll(path, os.FileMode(f.Flags&0777)) err = os.MkdirAll(path, os.FileMode(f.Flags&0777))
if err != nil { if err != nil {
l.Warnf("Create folder: %q: %v", path, err) p.errors++
l.Infof("mkdir: error: %q: %v", path, err)
} }
} }
} else if debug { } else if debug {
@ -384,13 +398,13 @@ func (p *puller) handleBlock(b bqBlock) bool {
fp := filepath.Join(p.repoCfg.Directory, f.Name) fp := filepath.Join(p.repoCfg.Directory, f.Name)
t := time.Unix(f.Modified, 0) t := time.Unix(f.Modified, 0)
err := os.Chtimes(fp, t, t) err := os.Chtimes(fp, t, t)
if debug && err != nil { if err != nil {
l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err) l.Infof("chtimes: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
} }
if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) { if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) {
err = os.Chmod(fp, os.FileMode(f.Flags&0777)) err = os.Chmod(fp, os.FileMode(f.Flags&0777))
if debug && err != nil { if err != nil {
l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err) l.Infof("chmod: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
} }
} }
@ -426,14 +440,13 @@ func (p *puller) handleBlock(b bqBlock) bool {
err = os.MkdirAll(dirName, 0777) err = os.MkdirAll(dirName, 0777)
} }
if err != nil { if err != nil {
l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err) l.Infof("mkdir: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
} }
of.file, of.err = os.Create(of.temp) of.file, of.err = os.Create(of.temp)
if of.err != nil { if of.err != nil {
if debug { p.errors++
l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err) l.Infof("create: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err)
}
if !b.last { if !b.last {
p.openFiles[f.Name] = of p.openFiles[f.Name] = of
} }
@ -482,9 +495,8 @@ func (p *puller) handleCopyBlock(b bqBlock) {
var exfd *os.File var exfd *os.File
exfd, of.err = os.Open(of.filepath) exfd, of.err = os.Open(of.filepath)
if of.err != nil { if of.err != nil {
if debug { p.errors++
l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err) l.Infof("open: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err)
}
of.file.Close() of.file.Close()
of.file = nil of.file = nil
@ -500,9 +512,8 @@ func (p *puller) handleCopyBlock(b bqBlock) {
_, of.err = of.file.WriteAt(bs, b.Offset) _, of.err = of.file.WriteAt(bs, b.Offset)
} }
if of.err != nil { if of.err != nil {
if debug { p.errors++
l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err) l.Infof("write: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err)
}
exfd.Close() exfd.Close()
of.file.Close() of.file.Close()
of.file = nil of.file = nil
@ -663,16 +674,17 @@ func (p *puller) closeFile(f protocol.FileInfo) {
} }
of := p.openFiles[f.Name] of := p.openFiles[f.Name]
of.file.Close() err := of.file.Close()
p.errors++
l.Infof("close: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
defer os.Remove(of.temp) defer os.Remove(of.temp)
delete(p.openFiles, f.Name) delete(p.openFiles, f.Name)
fd, err := os.Open(of.temp) fd, err := os.Open(of.temp)
if err != nil { if err != nil {
if debug { p.errors++
l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err) l.Infof("open: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
}
return return
} }
hb, _ := scanner.Blocks(fd, scanner.StandardBlockSize) hb, _ := scanner.Blocks(fd, scanner.StandardBlockSize)
@ -687,20 +699,22 @@ func (p *puller) closeFile(f protocol.FileInfo) {
for i := range hb { for i := range hb {
if bytes.Compare(hb[i].Hash, f.Blocks[i].Hash) != 0 { if bytes.Compare(hb[i].Hash, f.Blocks[i].Hash) != 0 {
l.Debugf("pull: %q / %q: block %d hash mismatch\n\thave: %x\n\twant: %x", p.repoCfg.ID, f.Name, i, hb[i].Hash, f.Blocks[i].Hash) if debug {
l.Debugf("pull: %q / %q: block %d hash mismatch\n have: %x\n want: %x", p.repoCfg.ID, f.Name, i, hb[i].Hash, f.Blocks[i].Hash)
}
return return
} }
} }
t := time.Unix(f.Modified, 0) t := time.Unix(f.Modified, 0)
err = os.Chtimes(of.temp, t, t) err = os.Chtimes(of.temp, t, t)
if debug && err != nil { if err != nil {
l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err) l.Infof("chtimes: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
} }
if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) { if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) {
err = os.Chmod(of.temp, os.FileMode(f.Flags&0777)) err = os.Chmod(of.temp, os.FileMode(f.Flags&0777))
if debug && err != nil { if err != nil {
l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err) l.Infof("chmod: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
} }
} }
@ -722,7 +736,8 @@ func (p *puller) closeFile(f protocol.FileInfo) {
if err := osutil.Rename(of.temp, of.filepath); err == nil { if err := osutil.Rename(of.temp, of.filepath); err == nil {
p.model.updateLocal(p.repoCfg.ID, f) p.model.updateLocal(p.repoCfg.ID, f)
} else { } else {
l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err) p.errors++
l.Infof("rename: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
} }
} }