Merge pull request #1880 from calmh/itemfinished-err

Fix ItemFinished
This commit is contained in:
Audrius Butkevicius 2015-06-03 16:50:33 +01:00
commit 11cb040ad1
4 changed files with 114 additions and 67 deletions

View File

@ -102,8 +102,12 @@ func (s *verboseSvc) formatEvent(ev events.Event) string {
return fmt.Sprintf("Started syncing %q / %q (%v %v)", data["folder"], data["item"], data["action"], data["type"]) return fmt.Sprintf("Started syncing %q / %q (%v %v)", data["folder"], data["item"], data["action"], data["type"])
case events.ItemFinished: case events.ItemFinished:
data := ev.Data.(map[string]interface{}) data := ev.Data.(map[string]interface{})
if err := data["err"]; err != nil { if err := data["error"]; err != nil {
return fmt.Sprintf("Finished syncing %q / %q (%v %v): %v", data["folder"], data["item"], data["action"], data["type"], err) // If the err interface{} is not nil, it is a string pointer.
// Dereference it to get the actual error or Sprintf will print
// the pointer value....
errStr := *err.(*string)
return fmt.Sprintf("Finished syncing %q / %q (%v %v): %v", data["folder"], data["item"], data["action"], data["type"], errStr)
} }
return fmt.Sprintf("Finished syncing %q / %q (%v %v): Success", data["folder"], data["item"], data["action"], data["type"]) return fmt.Sprintf("Finished syncing %q / %q (%v %v): Success", data["folder"], data["item"], data["action"], data["type"])

View File

@ -258,3 +258,14 @@ func (s *BufferedSubscription) Since(id int, into []Event) []Event {
return into return into
} }
// Error returns a string pointer suitable for JSON marshalling errors. It
// retains the "null on sucess" semantics, but ensures the error result is a
// string regardless of the underlying concrete error type.
func Error(err error) *string {
if err == nil {
return nil
}
str := err.Error()
return &str
}

View File

@ -539,7 +539,7 @@ func (p *rwFolder) handleDir(file protocol.FileInfo) {
events.Default.Log(events.ItemFinished, map[string]interface{}{ events.Default.Log(events.ItemFinished, map[string]interface{}{
"folder": p.folder, "folder": p.folder,
"item": file.Name, "item": file.Name,
"error": err, "error": events.Error(err),
"type": "dir", "type": "dir",
"action": "update", "action": "update",
}) })
@ -621,7 +621,7 @@ func (p *rwFolder) deleteDir(file protocol.FileInfo) {
events.Default.Log(events.ItemFinished, map[string]interface{}{ events.Default.Log(events.ItemFinished, map[string]interface{}{
"folder": p.folder, "folder": p.folder,
"item": file.Name, "item": file.Name,
"error": err, "error": events.Error(err),
"type": "dir", "type": "dir",
"action": "delete", "action": "delete",
}) })
@ -667,7 +667,7 @@ func (p *rwFolder) deleteFile(file protocol.FileInfo) {
events.Default.Log(events.ItemFinished, map[string]interface{}{ events.Default.Log(events.ItemFinished, map[string]interface{}{
"folder": p.folder, "folder": p.folder,
"item": file.Name, "item": file.Name,
"error": err, "error": events.Error(err),
"type": "file", "type": "file",
"action": "delete", "action": "delete",
}) })
@ -722,14 +722,14 @@ func (p *rwFolder) renameFile(source, target protocol.FileInfo) {
events.Default.Log(events.ItemFinished, map[string]interface{}{ events.Default.Log(events.ItemFinished, map[string]interface{}{
"folder": p.folder, "folder": p.folder,
"item": source.Name, "item": source.Name,
"error": err, "error": events.Error(err),
"type": "file", "type": "file",
"action": "delete", "action": "delete",
}) })
events.Default.Log(events.ItemFinished, map[string]interface{}{ events.Default.Log(events.ItemFinished, map[string]interface{}{
"folder": p.folder, "folder": p.folder,
"item": target.Name, "item": target.Name,
"error": err, "error": events.Error(err),
"type": "file", "type": "file",
"action": "update", "action": "update",
}) })
@ -778,6 +778,40 @@ func (p *rwFolder) renameFile(source, target protocol.FileInfo) {
} }
} }
// This is the flow of data and events here, I think...
//
// +-----------------------+
// | | - - - - > ItemStarted
// | handleFile | - - - - > ItemFinished (on shortcuts)
// | |
// +-----------------------+
// |
// | copyChan (copyBlocksState; unless shortcut taken)
// |
// | +-----------------------+
// | | +-----------------------+
// +--->| | |
// | | copierRoutine |
// +-| |
// +-----------------------+
// |
// | pullChan (sharedPullerState)
// |
// | +-----------------------+
// | | +-----------------------+
// +-->| | |
// | | pullerRoutine |
// +-| |
// +-----------------------+
// |
// | finisherChan (sharedPullerState)
// |
// | +-----------------------+
// | | |
// +-->| finisherRoutine | - - - - > ItemFinished
// | |
// +-----------------------+
// handleFile queues the copies and pulls as necessary for a single new or // handleFile queues the copies and pulls as necessary for a single new or
// changed file. // changed file.
func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, finisherChan chan<- *sharedPullerState) { func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, finisherChan chan<- *sharedPullerState) {
@ -807,7 +841,7 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks
events.Default.Log(events.ItemFinished, map[string]interface{}{ events.Default.Log(events.ItemFinished, map[string]interface{}{
"folder": p.folder, "folder": p.folder,
"item": file.Name, "item": file.Name,
"error": err, "error": events.Error(err),
"type": "file", "type": "file",
"action": "update", "action": "update",
}) })
@ -931,18 +965,17 @@ func (p *rwFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pull
buf := make([]byte, protocol.BlockSize) buf := make([]byte, protocol.BlockSize)
for state := range in { for state := range in {
if p.progressEmitter != nil {
p.progressEmitter.Register(state.sharedPullerState)
}
dstFd, err := state.tempFile() dstFd, err := state.tempFile()
if err != nil { if err != nil {
// Nothing more to do for this failed file (the error was logged // Nothing more to do for this failed file, since we couldn't create a temporary for it.
// when it happened)
out <- state.sharedPullerState out <- state.sharedPullerState
continue continue
} }
if p.progressEmitter != nil {
p.progressEmitter.Register(state.sharedPullerState)
}
folderRoots := make(map[string]string) folderRoots := make(map[string]string)
p.model.fmut.RLock() p.model.fmut.RLock()
for folder, cfg := range p.model.folderCfgs { for folder, cfg := range p.model.folderCfgs {
@ -1012,6 +1045,7 @@ func (p *rwFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pull
func (p *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPullerState) { func (p *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPullerState) {
for state := range in { for state := range in {
if state.failed() != nil { if state.failed() != nil {
out <- state.sharedPullerState
continue continue
} }
@ -1020,6 +1054,7 @@ func (p *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPul
// no point in issuing the request to the network. // no point in issuing the request to the network.
fd, err := state.tempFile() fd, err := state.tempFile()
if err != nil { if err != nil {
out <- state.sharedPullerState
continue continue
} }
@ -1070,39 +1105,26 @@ func (p *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPul
} }
} }
func (p *rwFolder) performFinish(state *sharedPullerState) { func (p *rwFolder) performFinish(state *sharedPullerState) error {
var err error
defer func() {
events.Default.Log(events.ItemFinished, map[string]interface{}{
"folder": p.folder,
"item": state.file.Name,
"error": err,
"type": "file",
"action": "update",
})
}()
// Set the correct permission bits on the new file // Set the correct permission bits on the new file
if !p.ignorePermissions(state.file) { if !p.ignorePermissions(state.file) {
err = os.Chmod(state.tempName, os.FileMode(state.file.Flags&0777)) if err := os.Chmod(state.tempName, os.FileMode(state.file.Flags&0777)); err != nil {
if err != nil { return err
l.Warnln("Puller: final:", err)
return
} }
} }
// Set the correct timestamp on the new file // Set the correct timestamp on the new file
t := time.Unix(state.file.Modified, 0) t := time.Unix(state.file.Modified, 0)
err = os.Chtimes(state.tempName, t, t) if err := os.Chtimes(state.tempName, t, t); err != nil {
// Try using virtual mtimes instead
info, err := os.Stat(state.tempName)
if err != nil { if err != nil {
// First try using virtual mtimes return err
if info, err := os.Stat(state.tempName); err != nil { }
l.Infof("Puller (folder %q, file %q): final: unable to stat file: %v", p.folder, state.file.Name, err)
} else {
p.virtualMtimeRepo.UpdateMtime(state.file.Name, info.ModTime(), t) p.virtualMtimeRepo.UpdateMtime(state.file.Name, info.ModTime(), t)
} }
}
var err error
if p.inConflict(state.version, state.file.Version) { if p.inConflict(state.version, state.file.Version) {
// The new file has been changed in conflict with the existing one. We // The new file has been changed in conflict with the existing one. We
// should file it away as a conflict instead of just removing or // should file it away as a conflict instead of just removing or
@ -1119,8 +1141,7 @@ func (p *rwFolder) performFinish(state *sharedPullerState) {
err = nil err = nil
} }
if err != nil { if err != nil {
l.Warnln("Puller: final:", err) return err
return
} }
// If the target path is a symlink or a directory, we cannot copy // If the target path is a symlink or a directory, we cannot copy
@ -1130,18 +1151,15 @@ func (p *rwFolder) performFinish(state *sharedPullerState) {
osutil.InWritableDir(osutil.Remove, state.realName) osutil.InWritableDir(osutil.Remove, state.realName)
} }
// Replace the original content with the new one // Replace the original content with the new one
err = osutil.Rename(state.tempName, state.realName) if err = osutil.Rename(state.tempName, state.realName); err != nil {
if err != nil { return err
l.Warnln("Puller: final:", err)
return
} }
// If it's a symlink, the target of the symlink is inside the file. // If it's a symlink, the target of the symlink is inside the file.
if state.file.IsSymlink() { if state.file.IsSymlink() {
content, err := ioutil.ReadFile(state.realName) content, err := ioutil.ReadFile(state.realName)
if err != nil { if err != nil {
l.Warnln("Puller: final: reading symlink:", err) return err
return
} }
// Remove the file, and replace it with a symlink. // Remove the file, and replace it with a symlink.
@ -1150,13 +1168,13 @@ func (p *rwFolder) performFinish(state *sharedPullerState) {
return symlinks.Create(path, string(content), state.file.Flags) return symlinks.Create(path, string(content), state.file.Flags)
}, state.realName) }, state.realName)
if err != nil { if err != nil {
l.Warnln("Puller: final: creating symlink:", err) return err
return
} }
} }
// Record the updated file in the index // Record the updated file in the index
p.dbUpdates <- state.file p.dbUpdates <- state.file
return nil
} }
func (p *rwFolder) finisherRoutine(in <-chan *sharedPullerState) { func (p *rwFolder) finisherRoutine(in <-chan *sharedPullerState) {
@ -1165,23 +1183,24 @@ func (p *rwFolder) finisherRoutine(in <-chan *sharedPullerState) {
if debug { if debug {
l.Debugln(p, "closing", state.file.Name) l.Debugln(p, "closing", state.file.Name)
} }
if err != nil {
l.Warnln("Puller: final:", err)
continue
}
p.queue.Done(state.file.Name) p.queue.Done(state.file.Name)
if state.failed() == nil {
p.performFinish(state) if err == nil {
} else { err = p.performFinish(state)
}
if err != nil {
l.Infoln("Puller: final:", err)
}
events.Default.Log(events.ItemFinished, map[string]interface{}{ events.Default.Log(events.ItemFinished, map[string]interface{}{
"folder": p.folder, "folder": p.folder,
"item": state.file.Name, "item": state.file.Name,
"error": state.failed(), "error": events.Error(err),
"type": "file", "type": "file",
"action": "update", "action": "update",
}) })
}
if p.progressEmitter != nil { if p.progressEmitter != nil {
p.progressEmitter.Deregister(state) p.progressEmitter.Deregister(state)
} }

View File

@ -36,6 +36,7 @@ type sharedPullerState struct {
copyOrigin int // Number of blocks copied from the original file copyOrigin int // Number of blocks copied from the original file
copyNeeded int // Number of copy actions still pending copyNeeded int // Number of copy actions still pending
pullNeeded int // Number of block pulls still pending pullNeeded int // Number of block pulls still pending
closed bool // True if the file has been finalClosed.
mut sync.Mutex // Protects the above mut sync.Mutex // Protects the above
} }
@ -218,18 +219,30 @@ func (s *sharedPullerState) finalClose() (bool, error) {
s.mut.Lock() s.mut.Lock()
defer s.mut.Unlock() defer s.mut.Unlock()
if s.pullNeeded+s.copyNeeded != 0 && s.err == nil { if s.closed {
// Not done yet. // Already closed
return false, nil return false, nil
} }
if fd := s.fd; fd != nil { if s.pullNeeded+s.copyNeeded != 0 && s.err == nil {
s.fd = nil // Not done yet, and not errored
return true, fd.Close()
}
return false, nil return false, nil
} }
if s.fd != nil {
if closeErr := s.fd.Close(); closeErr != nil && s.err == nil {
// This is our error if we weren't errored before. Otherwise we
// keep the earlier error.
s.err = closeErr
}
s.fd = nil
}
s.closed = true
return true, s.err
}
// Returns the momentarily progress for the puller // Returns the momentarily progress for the puller
func (s *sharedPullerState) Progress() *pullerProgress { func (s *sharedPullerState) Progress() *pullerProgress {
s.mut.Lock() s.mut.Lock()