From 1e15b1e0be66808e1f0691de574be0b6cf65de88 Mon Sep 17 00:00:00 2001 From: Audrius Butkevicius Date: Wed, 8 Oct 2014 23:41:23 +0100 Subject: [PATCH] Implement block fetcher (fixes #781, fixes #3) --- internal/model/model.go | 6 +- internal/model/puller.go | 177 +++++++++++++++------------- internal/model/sharedpullerstate.go | 13 +- 3 files changed, 108 insertions(+), 88 deletions(-) diff --git a/internal/model/model.go b/internal/model/model.go index 527f0a55b..a9cd6b151 100644 --- a/internal/model/model.go +++ b/internal/model/model.go @@ -81,8 +81,9 @@ type service interface { } type Model struct { - cfg *config.ConfigWrapper - db *leveldb.DB + cfg *config.ConfigWrapper + db *leveldb.DB + finder *files.BlockFinder deviceName string clientName string @@ -137,6 +138,7 @@ func NewModel(cfg *config.ConfigWrapper, deviceName, clientName, clientVersion s protoConn: make(map[protocol.DeviceID]protocol.Connection), rawConn: make(map[protocol.DeviceID]io.Closer), deviceVer: make(map[protocol.DeviceID]string), + finder: files.NewBlockFinder(db, cfg), } var timeout = 20 * 60 // seconds diff --git a/internal/model/puller.go b/internal/model/puller.go index dc216093e..0ebe58829 100644 --- a/internal/model/puller.go +++ b/internal/model/puller.go @@ -16,6 +16,7 @@ package model import ( + "bytes" "errors" "fmt" "os" @@ -23,6 +24,8 @@ import ( "sync" "time" + "github.com/AudriusButkevicius/lfu-go" + "github.com/syncthing/syncthing/internal/config" "github.com/syncthing/syncthing/internal/events" "github.com/syncthing/syncthing/internal/osutil" @@ -50,7 +53,7 @@ type pullBlockState struct { } // A copyBlocksState is passed to copy routine if the file has blocks to be -// copied from the original. +// copied. type copyBlocksState struct { *sharedPullerState blocks []protocol.BlockInfo @@ -236,24 +239,25 @@ func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int { copyChan := make(chan copyBlocksState) finisherChan := make(chan *sharedPullerState) - var wg sync.WaitGroup + var copyWg sync.WaitGroup + var pullWg sync.WaitGroup var doneWg sync.WaitGroup for i := 0; i < ncopiers; i++ { - wg.Add(1) + copyWg.Add(1) go func() { // copierRoutine finishes when copyChan is closed - p.copierRoutine(copyChan, finisherChan) - wg.Done() + p.copierRoutine(copyChan, pullChan, finisherChan) + copyWg.Done() }() } for i := 0; i < npullers; i++ { - wg.Add(1) + pullWg.Add(1) go func() { // pullerRoutine finishes when pullChan is closed p.pullerRoutine(pullChan, finisherChan) - wg.Done() + pullWg.Done() }() } @@ -310,7 +314,7 @@ func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int { default: // A new or changed file. This is the only case where we do stuff // in the background; the other three are done synchronously. - p.handleFile(file, copyChan, pullChan, finisherChan) + p.handleFile(file, copyChan, finisherChan) } changed++ @@ -318,13 +322,13 @@ func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int { }) // Signal copy and puller routines that we are done with the in data for - // this iteration + // this iteration. Wait for them to finish. close(copyChan) + copyWg.Wait() close(pullChan) + pullWg.Wait() - // Wait for them to finish, then signal the finisher chan that there will - // be no more input. - wg.Wait() + // Signal the finisher chan that there will be no more input. close(finisherChan) // Wait for the finisherChan to finish. @@ -419,11 +423,15 @@ func (p *Puller) deleteFile(file protocol.FileInfo) { // handleFile queues the copies and pulls as necessary for a single new or // changed file. -func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, pullChan chan<- pullBlockState, finisherChan chan<- *sharedPullerState) { +func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, finisherChan chan<- *sharedPullerState) { curFile := p.model.CurrentFolderFile(p.folder, file.Name) - copyBlocks, pullBlocks := scanner.BlockDiff(curFile.Blocks, file.Blocks) - if len(copyBlocks) == len(curFile.Blocks) && len(pullBlocks) == 0 { + if len(curFile.Blocks) == len(file.Blocks) { + for i := range file.Blocks { + if !bytes.Equal(curFile.Blocks[i].Hash, file.Blocks[i].Hash) { + goto FilesAreDifferent + } + } // We are supposed to copy the entire file, and then fetch nothing. We // are only updating metadata, so we don't actually *need* to make the // copy. @@ -434,11 +442,14 @@ func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksSt return } +FilesAreDifferent: + // Figure out the absolute filenames we need once and for all tempName := filepath.Join(p.dir, defTempNamer.TempName(file.Name)) realName := filepath.Join(p.dir, file.Name) var reuse bool + var blocks []protocol.BlockInfo // Check for an old temporary file which might have some blocks we could // reuse. @@ -453,38 +464,26 @@ func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksSt existingBlocks[block.String()] = true } - // Since the blocks are already there, we don't need to copy them - // nor we need to pull them, hence discard blocks which are already - // there, if they are exactly the same... - var newCopyBlocks []protocol.BlockInfo - for _, block := range copyBlocks { + // Since the blocks are already there, we don't need to get them. + for _, block := range file.Blocks { _, ok := existingBlocks[block.String()] if !ok { - newCopyBlocks = append(newCopyBlocks, block) - } - } - - var newPullBlocks []protocol.BlockInfo - for _, block := range pullBlocks { - _, ok := existingBlocks[block.String()] - if !ok { - newPullBlocks = append(newPullBlocks, block) + blocks = append(blocks, block) } } // If any blocks could be reused, let the sharedpullerstate know // which flags it is expected to set on the file. - // Also update the list of work for the routines. - if len(copyBlocks) != len(newCopyBlocks) || len(pullBlocks) != len(newPullBlocks) { + if len(blocks) != len(file.Blocks) { reuse = true - copyBlocks = newCopyBlocks - pullBlocks = newPullBlocks } else { // Otherwise, discard the file ourselves in order for the // sharedpuller not to panic when it fails to exlusively create a // file which already exists os.Remove(tempName) } + } else { + blocks = file.Blocks } s := sharedPullerState{ @@ -492,43 +491,19 @@ func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksSt folder: p.folder, tempName: tempName, realName: realName, - pullNeeded: len(pullBlocks), + copyNeeded: len(blocks), reuse: reuse, } - if len(copyBlocks) > 0 { - s.copyNeeded = 1 - } if debug { - l.Debugf("%v need file %s; copy %d, pull %d, reuse %v", p, file.Name, len(copyBlocks), len(pullBlocks), reuse) + l.Debugf("%v need file %s; copy %d, reuse %v", p, file.Name, len(blocks), reuse) } - if len(copyBlocks) > 0 { - cs := copyBlocksState{ - sharedPullerState: &s, - blocks: copyBlocks, - } - copyChan <- cs - } - - if len(pullBlocks) > 0 { - for _, block := range pullBlocks { - ps := pullBlockState{ - sharedPullerState: &s, - block: block, - } - pullChan <- ps - } - } - - if len(pullBlocks) == 0 && len(copyBlocks) == 0 { - if !reuse { - panic("bug: nothing to do with file?") - } - // We have a temp file that we can reuse totally. Jump directly to the - // finisher stage. - finisherChan <- &s + cs := copyBlocksState{ + sharedPullerState: &s, + blocks: blocks, } + copyChan <- cs } // shortcutFile sets file mode and modification time, when that's the only @@ -561,9 +536,9 @@ func (p *Puller) shortcutFile(file protocol.FileInfo) { p.model.updateLocal(p.folder, file) } -// copierRoutine reads pullerStates until the in channel closes and performs -// the relevant copy. -func (p *Puller) copierRoutine(in <-chan copyBlocksState, out chan<- *sharedPullerState) { +// copierRoutine reads copierStates until the in channel closes and performs +// the relevant copies when possible, or passes it to the puller routine. +func (p *Puller) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) { buf := make([]byte, protocol.BlockSize) nextFile: @@ -575,32 +550,66 @@ nextFile: continue nextFile } - srcFd, err := state.sourceFile() - if err != nil { - // As above - continue nextFile - } + evictionChan := make(chan lfu.Eviction) + + fdCache := lfu.New() + fdCache.UpperBound = 50 + fdCache.LowerBound = 20 + fdCache.EvictionChannel = evictionChan + + go func() { + for item := range evictionChan { + item.Value.(*os.File).Close() + } + }() for _, block := range state.blocks { buf = buf[:int(block.Size)] - _, err = srcFd.ReadAt(buf, block.Offset) - if err != nil { - state.earlyClose("src read", err) - srcFd.Close() - continue nextFile + success := p.model.finder.Iterate(block.Hash, func(folder, file string, index uint32) bool { + path := filepath.Join(p.model.folderCfgs[folder].Path, file) + + var fd *os.File + + fdi := fdCache.Get(path) + if fdi != nil { + fd = fdi.(*os.File) + } else { + fd, err = os.Open(path) + if err != nil { + return false + } + fdCache.Set(path, fd) + } + + _, err = fd.ReadAt(buf, protocol.BlockSize*int64(index)) + if err != nil { + return false + } + + _, err = dstFd.WriteAt(buf, block.Offset) + if err != nil { + state.earlyClose("dst write", err) + } + return true + }) + + if state.failed() != nil { + break } - _, err = dstFd.WriteAt(buf, block.Offset) - if err != nil { - state.earlyClose("dst write", err) - srcFd.Close() - continue nextFile + if !success { + state.pullStarted() + ps := pullBlockState{ + sharedPullerState: state.sharedPullerState, + block: block, + } + pullChan <- ps } + state.copyDone() } - - srcFd.Close() - state.copyDone() + fdCache.Evict(fdCache.Len()) + close(evictionChan) out <- state.sharedPullerState } } diff --git a/internal/model/sharedpullerstate.go b/internal/model/sharedpullerstate.go index 1347e6274..bba67b142 100644 --- a/internal/model/sharedpullerstate.go +++ b/internal/model/sharedpullerstate.go @@ -149,7 +149,16 @@ func (s *sharedPullerState) copyDone() { s.mut.Lock() s.copyNeeded-- if debug { - l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.pullNeeded) + l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.copyNeeded) + } + s.mut.Unlock() +} + +func (s *sharedPullerState) pullStarted() { + s.mut.Lock() + s.pullNeeded++ + if debug { + l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded start ->", s.pullNeeded) } s.mut.Unlock() } @@ -158,7 +167,7 @@ func (s *sharedPullerState) pullDone() { s.mut.Lock() s.pullNeeded-- if debug { - l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded ->", s.pullNeeded) + l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded done ->", s.pullNeeded) } s.mut.Unlock() }