All roads lead to Finisher (fixes #1201)

This commit is contained in:
Audrius Butkevicius 2015-01-07 23:12:12 +00:00
parent 5d173168cc
commit dec479532e
5 changed files with 193 additions and 34 deletions

View File

@ -47,8 +47,6 @@ func expectTimeout(w *events.Subscription, t *testing.T) {
}
func TestProgressEmitter(t *testing.T) {
l.Debugln("test progress emitter")
w := events.Default.Subscribe(events.DownloadProgress)
c := config.Wrap("/tmp/test", config.Configuration{})

View File

@ -613,17 +613,17 @@ func (p *Puller) shortcutSymlink(curFile, file protocol.FileInfo) {
func (p *Puller) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) {
buf := make([]byte, protocol.BlockSize)
nextFile:
for state := range in {
if p.progressEmitter != nil {
p.progressEmitter.Register(state.sharedPullerState)
}
dstFd, err := state.tempFile()
if err != nil {
// Nothing more to do for this failed file (the error was logged
// when it happened)
continue nextFile
}
if p.progressEmitter != nil {
p.progressEmitter.Register(state.sharedPullerState)
out <- state.sharedPullerState
continue
}
evictionChan := make(chan lfu.Eviction)
@ -687,7 +687,7 @@ nextFile:
_, err = dstFd.WriteAt(buf, block.Offset)
if err != nil {
state.earlyClose("dst write", err)
state.fail("dst write", err)
}
if file == state.file.Name {
state.copiedFromOrigin()
@ -739,9 +739,9 @@ func (p *Puller) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPulle
selected := activity.leastBusy(potentialDevices)
if selected == (protocol.DeviceID{}) {
if lastError != nil {
state.earlyClose("pull", lastError)
state.fail("pull", lastError)
} else {
state.earlyClose("pull", errNoDevice)
state.fail("pull", errNoDevice)
}
break
}
@ -767,13 +767,13 @@ func (p *Puller) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPulle
// Save the block data we got from the cluster
_, err = fd.WriteAt(buf, state.block.Offset)
if err != nil {
state.earlyClose("save", err)
state.fail("save", err)
} else {
state.pullDone()
out <- state.sharedPullerState
}
break
}
out <- state.sharedPullerState
}
}
@ -863,7 +863,9 @@ func (p *Puller) finisherRoutine(in <-chan *sharedPullerState) {
}
p.queue.Done(state.file.Name)
p.performFinish(state)
if state.failed() == nil {
p.performFinish(state)
}
p.model.receivedFile(p.folder, state.file.Name)
if p.progressEmitter != nil {
p.progressEmitter.Deregister(state)

View File

@ -382,3 +382,172 @@ func TestLastResortPulling(t *testing.T) {
(<-finisherChan).fd.Close()
os.Remove(filepath.Join("testdata", defTempNamer.TempName("newfile")))
}
func TestDeregisterOnFailInCopy(t *testing.T) {
file := protocol.FileInfo{
Name: "filex",
Flags: 0,
Modified: 0,
Blocks: []protocol.BlockInfo{
blocks[0], blocks[2], blocks[0], blocks[0],
blocks[5], blocks[0], blocks[0], blocks[8],
},
}
defer os.Remove(defTempNamer.TempName("filex"))
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
cw := config.Wrap("/tmp/test", config.Configuration{})
m := NewModel(cw, "device", "syncthing", "dev", db)
m.AddFolder(config.FolderConfiguration{ID: "default", Path: "testdata"})
emitter := NewProgressEmitter(cw)
go emitter.Serve()
p := Puller{
folder: "default",
dir: "testdata",
model: m,
queue: newJobQueue(),
progressEmitter: emitter,
}
// queue.Done should be called by the finisher routine
p.queue.Push("filex")
p.queue.Pop()
if len(p.queue.progress) != 1 {
t.Fatal("Expected file in progress")
}
copyChan := make(chan copyBlocksState)
pullChan := make(chan pullBlockState)
finisherBufferChan := make(chan *sharedPullerState)
finisherChan := make(chan *sharedPullerState)
go p.copierRoutine(copyChan, pullChan, finisherBufferChan)
go p.finisherRoutine(finisherChan)
p.handleFile(file, copyChan, finisherChan)
// Receive a block at puller, to indicate that atleast a single copier
// loop has been performed.
toPull := <-pullChan
// Wait until copier is trying to pass something down to the puller again
time.Sleep(100 * time.Millisecond)
// Close the file
toPull.sharedPullerState.fail("test", os.ErrNotExist)
// Unblock copier
<-pullChan
select {
case state := <-finisherBufferChan:
// At this point the file should still be registered with both the job
// queue, and the progress emitter. Verify this.
if len(p.progressEmitter.registry) != 1 || len(p.queue.progress) != 1 || len(p.queue.queued) != 0 {
t.Fatal("Could not find file")
}
// Pass the file down the real finisher, and give it time to consume
finisherChan <- state
time.Sleep(100 * time.Millisecond)
if state.fd != nil {
t.Fatal("File not closed?")
}
if len(p.progressEmitter.registry) != 0 || len(p.queue.progress) != 0 || len(p.queue.queued) != 0 {
t.Fatal("Still registered", len(p.progressEmitter.registry), len(p.queue.progress), len(p.queue.queued))
}
// Doing it again should have no effect
finisherChan <- state
time.Sleep(100 * time.Millisecond)
if len(p.progressEmitter.registry) != 0 || len(p.queue.progress) != 0 || len(p.queue.queued) != 0 {
t.Fatal("Still registered")
}
case <-time.After(time.Second):
t.Fatal("Didn't get anything to the finisher")
}
}
func TestDeregisterOnFailInPull(t *testing.T) {
file := protocol.FileInfo{
Name: "filex",
Flags: 0,
Modified: 0,
Blocks: []protocol.BlockInfo{
blocks[0], blocks[2], blocks[0], blocks[0],
blocks[5], blocks[0], blocks[0], blocks[8],
},
}
defer os.Remove(defTempNamer.TempName("filex"))
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
cw := config.Wrap("/tmp/test", config.Configuration{})
m := NewModel(cw, "device", "syncthing", "dev", db)
m.AddFolder(config.FolderConfiguration{ID: "default", Path: "testdata"})
emitter := NewProgressEmitter(cw)
go emitter.Serve()
p := Puller{
folder: "default",
dir: "testdata",
model: m,
queue: newJobQueue(),
progressEmitter: emitter,
}
// queue.Done should be called by the finisher routine
p.queue.Push("filex")
p.queue.Pop()
if len(p.queue.progress) != 1 {
t.Fatal("Expected file in progress")
}
copyChan := make(chan copyBlocksState)
pullChan := make(chan pullBlockState)
finisherBufferChan := make(chan *sharedPullerState)
finisherChan := make(chan *sharedPullerState)
go p.copierRoutine(copyChan, pullChan, finisherBufferChan)
go p.pullerRoutine(pullChan, finisherBufferChan)
go p.finisherRoutine(finisherChan)
p.handleFile(file, copyChan, finisherChan)
// Receove at finisher, we shoud error out as puller has nowhere to pull
// from.
select {
case state := <-finisherBufferChan:
// At this point the file should still be registered with both the job
// queue, and the progress emitter. Verify this.
if len(p.progressEmitter.registry) != 1 || len(p.queue.progress) != 1 || len(p.queue.queued) != 0 {
t.Fatal("Could not find file")
}
// Pass the file down the real finisher, and give it time to consume
finisherChan <- state
time.Sleep(100 * time.Millisecond)
if state.fd != nil {
t.Fatal("File not closed?")
}
if len(p.progressEmitter.registry) != 0 || len(p.queue.progress) != 0 || len(p.queue.queued) != 0 {
t.Fatal("Still registered", len(p.progressEmitter.registry), len(p.queue.progress), len(p.queue.queued))
}
// Doing it again should have no effect
finisherChan <- state
time.Sleep(100 * time.Millisecond)
if len(p.progressEmitter.registry) != 0 || len(p.queue.progress) != 0 || len(p.queue.queued) != 0 {
t.Fatal("Still registered")
}
case <-time.After(time.Second):
t.Fatal("Didn't get anything to the finisher")
}
}

View File

@ -43,7 +43,6 @@ type sharedPullerState struct {
copyOrigin uint32 // Number of blocks copied from the original file
copyNeeded uint32 // Number of copy actions still pending
pullNeeded uint32 // Number of block pulls still pending
closed bool // Set when the file has been closed
mut sync.Mutex // Protects the above
}
@ -93,7 +92,7 @@ func (s *sharedPullerState) tempFile() (io.WriterAt, error) {
// here.
dir := filepath.Dir(s.tempName)
if info, err := os.Stat(dir); err != nil {
s.earlyCloseLocked("dst stat dir", err)
s.failLocked("dst stat dir", err)
return nil, err
} else if info.Mode()&0200 == 0 {
err := os.Chmod(dir, 0755)
@ -119,13 +118,13 @@ func (s *sharedPullerState) tempFile() (io.WriterAt, error) {
// make sure we have write permissions on the file before opening it.
err := os.Chmod(s.tempName, 0644)
if err != nil {
s.earlyCloseLocked("dst create chmod", err)
s.failLocked("dst create chmod", err)
return nil, err
}
}
fd, err := os.OpenFile(s.tempName, flags, 0644)
if err != nil {
s.earlyCloseLocked("dst create", err)
s.failLocked("dst create", err)
return nil, err
}
@ -148,7 +147,7 @@ func (s *sharedPullerState) sourceFile() (*os.File, error) {
// Attempt to open the existing file
fd, err := os.Open(s.realName)
if err != nil {
s.earlyCloseLocked("src open", err)
s.failLocked("src open", err)
return nil, err
}
@ -158,24 +157,20 @@ func (s *sharedPullerState) sourceFile() (*os.File, error) {
// earlyClose prints a warning message composed of the context and
// error, and marks the sharedPullerState as failed. Is a no-op when called on
// an already failed state.
func (s *sharedPullerState) earlyClose(context string, err error) {
func (s *sharedPullerState) fail(context string, err error) {
s.mut.Lock()
defer s.mut.Unlock()
s.earlyCloseLocked(context, err)
s.failLocked(context, err)
}
func (s *sharedPullerState) earlyCloseLocked(context string, err error) {
func (s *sharedPullerState) failLocked(context string, err error) {
if s.err != nil {
return
}
l.Infof("Puller (folder %q, file %q): %s: %v", s.folder, s.file.Name, context, err)
s.err = err
if s.fd != nil {
s.fd.Close()
}
s.closed = true
}
func (s *sharedPullerState) failed() error {
@ -230,21 +225,16 @@ func (s *sharedPullerState) finalClose() (bool, error) {
s.mut.Lock()
defer s.mut.Unlock()
if s.pullNeeded+s.copyNeeded != 0 {
if s.pullNeeded+s.copyNeeded != 0 && s.err == nil {
// Not done yet.
return false, nil
}
if s.closed {
// Already handled.
return false, nil
}
s.closed = true
if fd := s.fd; fd != nil {
s.fd = nil
return true, fd.Close()
}
return true, nil
return false, nil
}
// Returns the momentarily progress for the puller

View File

@ -86,5 +86,5 @@ func TestReadOnlyDir(t *testing.T) {
t.Fatal("Unexpected nil fd")
}
s.earlyClose("Test done", nil)
s.fail("Test done", nil)
}