Implement block fetcher (fixes #781, fixes #3)

This commit is contained in:
Audrius Butkevicius 2014-10-08 23:41:23 +01:00 committed by Jakob Borg
parent 0bc50f7284
commit 1e15b1e0be
3 changed files with 108 additions and 88 deletions

View File

@ -81,8 +81,9 @@ type service interface {
} }
type Model struct { type Model struct {
cfg *config.ConfigWrapper cfg *config.ConfigWrapper
db *leveldb.DB db *leveldb.DB
finder *files.BlockFinder
deviceName string deviceName string
clientName string clientName string
@ -137,6 +138,7 @@ func NewModel(cfg *config.ConfigWrapper, deviceName, clientName, clientVersion s
protoConn: make(map[protocol.DeviceID]protocol.Connection), protoConn: make(map[protocol.DeviceID]protocol.Connection),
rawConn: make(map[protocol.DeviceID]io.Closer), rawConn: make(map[protocol.DeviceID]io.Closer),
deviceVer: make(map[protocol.DeviceID]string), deviceVer: make(map[protocol.DeviceID]string),
finder: files.NewBlockFinder(db, cfg),
} }
var timeout = 20 * 60 // seconds var timeout = 20 * 60 // seconds

View File

@ -16,6 +16,7 @@
package model package model
import ( import (
"bytes"
"errors" "errors"
"fmt" "fmt"
"os" "os"
@ -23,6 +24,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/AudriusButkevicius/lfu-go"
"github.com/syncthing/syncthing/internal/config" "github.com/syncthing/syncthing/internal/config"
"github.com/syncthing/syncthing/internal/events" "github.com/syncthing/syncthing/internal/events"
"github.com/syncthing/syncthing/internal/osutil" "github.com/syncthing/syncthing/internal/osutil"
@ -50,7 +53,7 @@ type pullBlockState struct {
} }
// A copyBlocksState is passed to copy routine if the file has blocks to be // A copyBlocksState is passed to copy routine if the file has blocks to be
// copied from the original. // copied.
type copyBlocksState struct { type copyBlocksState struct {
*sharedPullerState *sharedPullerState
blocks []protocol.BlockInfo blocks []protocol.BlockInfo
@ -236,24 +239,25 @@ func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int {
copyChan := make(chan copyBlocksState) copyChan := make(chan copyBlocksState)
finisherChan := make(chan *sharedPullerState) finisherChan := make(chan *sharedPullerState)
var wg sync.WaitGroup var copyWg sync.WaitGroup
var pullWg sync.WaitGroup
var doneWg sync.WaitGroup var doneWg sync.WaitGroup
for i := 0; i < ncopiers; i++ { for i := 0; i < ncopiers; i++ {
wg.Add(1) copyWg.Add(1)
go func() { go func() {
// copierRoutine finishes when copyChan is closed // copierRoutine finishes when copyChan is closed
p.copierRoutine(copyChan, finisherChan) p.copierRoutine(copyChan, pullChan, finisherChan)
wg.Done() copyWg.Done()
}() }()
} }
for i := 0; i < npullers; i++ { for i := 0; i < npullers; i++ {
wg.Add(1) pullWg.Add(1)
go func() { go func() {
// pullerRoutine finishes when pullChan is closed // pullerRoutine finishes when pullChan is closed
p.pullerRoutine(pullChan, finisherChan) p.pullerRoutine(pullChan, finisherChan)
wg.Done() pullWg.Done()
}() }()
} }
@ -310,7 +314,7 @@ func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int {
default: default:
// A new or changed file. This is the only case where we do stuff // A new or changed file. This is the only case where we do stuff
// in the background; the other three are done synchronously. // in the background; the other three are done synchronously.
p.handleFile(file, copyChan, pullChan, finisherChan) p.handleFile(file, copyChan, finisherChan)
} }
changed++ changed++
@ -318,13 +322,13 @@ func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int {
}) })
// Signal copy and puller routines that we are done with the in data for // Signal copy and puller routines that we are done with the in data for
// this iteration // this iteration. Wait for them to finish.
close(copyChan) close(copyChan)
copyWg.Wait()
close(pullChan) close(pullChan)
pullWg.Wait()
// Wait for them to finish, then signal the finisher chan that there will // Signal the finisher chan that there will be no more input.
// be no more input.
wg.Wait()
close(finisherChan) close(finisherChan)
// Wait for the finisherChan to finish. // Wait for the finisherChan to finish.
@ -419,11 +423,15 @@ func (p *Puller) deleteFile(file protocol.FileInfo) {
// handleFile queues the copies and pulls as necessary for a single new or // handleFile queues the copies and pulls as necessary for a single new or
// changed file. // changed file.
func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, pullChan chan<- pullBlockState, finisherChan chan<- *sharedPullerState) { func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, finisherChan chan<- *sharedPullerState) {
curFile := p.model.CurrentFolderFile(p.folder, file.Name) curFile := p.model.CurrentFolderFile(p.folder, file.Name)
copyBlocks, pullBlocks := scanner.BlockDiff(curFile.Blocks, file.Blocks)
if len(copyBlocks) == len(curFile.Blocks) && len(pullBlocks) == 0 { if len(curFile.Blocks) == len(file.Blocks) {
for i := range file.Blocks {
if !bytes.Equal(curFile.Blocks[i].Hash, file.Blocks[i].Hash) {
goto FilesAreDifferent
}
}
// We are supposed to copy the entire file, and then fetch nothing. We // We are supposed to copy the entire file, and then fetch nothing. We
// are only updating metadata, so we don't actually *need* to make the // are only updating metadata, so we don't actually *need* to make the
// copy. // copy.
@ -434,11 +442,14 @@ func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksSt
return return
} }
FilesAreDifferent:
// Figure out the absolute filenames we need once and for all // Figure out the absolute filenames we need once and for all
tempName := filepath.Join(p.dir, defTempNamer.TempName(file.Name)) tempName := filepath.Join(p.dir, defTempNamer.TempName(file.Name))
realName := filepath.Join(p.dir, file.Name) realName := filepath.Join(p.dir, file.Name)
var reuse bool var reuse bool
var blocks []protocol.BlockInfo
// Check for an old temporary file which might have some blocks we could // Check for an old temporary file which might have some blocks we could
// reuse. // reuse.
@ -453,38 +464,26 @@ func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksSt
existingBlocks[block.String()] = true existingBlocks[block.String()] = true
} }
// Since the blocks are already there, we don't need to copy them // Since the blocks are already there, we don't need to get them.
// nor we need to pull them, hence discard blocks which are already for _, block := range file.Blocks {
// there, if they are exactly the same...
var newCopyBlocks []protocol.BlockInfo
for _, block := range copyBlocks {
_, ok := existingBlocks[block.String()] _, ok := existingBlocks[block.String()]
if !ok { if !ok {
newCopyBlocks = append(newCopyBlocks, block) blocks = append(blocks, block)
}
}
var newPullBlocks []protocol.BlockInfo
for _, block := range pullBlocks {
_, ok := existingBlocks[block.String()]
if !ok {
newPullBlocks = append(newPullBlocks, block)
} }
} }
// If any blocks could be reused, let the sharedpullerstate know // If any blocks could be reused, let the sharedpullerstate know
// which flags it is expected to set on the file. // which flags it is expected to set on the file.
// Also update the list of work for the routines. if len(blocks) != len(file.Blocks) {
if len(copyBlocks) != len(newCopyBlocks) || len(pullBlocks) != len(newPullBlocks) {
reuse = true reuse = true
copyBlocks = newCopyBlocks
pullBlocks = newPullBlocks
} else { } else {
// Otherwise, discard the file ourselves in order for the // Otherwise, discard the file ourselves in order for the
// sharedpuller not to panic when it fails to exlusively create a // sharedpuller not to panic when it fails to exlusively create a
// file which already exists // file which already exists
os.Remove(tempName) os.Remove(tempName)
} }
} else {
blocks = file.Blocks
} }
s := sharedPullerState{ s := sharedPullerState{
@ -492,43 +491,19 @@ func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksSt
folder: p.folder, folder: p.folder,
tempName: tempName, tempName: tempName,
realName: realName, realName: realName,
pullNeeded: len(pullBlocks), copyNeeded: len(blocks),
reuse: reuse, reuse: reuse,
} }
if len(copyBlocks) > 0 {
s.copyNeeded = 1
}
if debug { if debug {
l.Debugf("%v need file %s; copy %d, pull %d, reuse %v", p, file.Name, len(copyBlocks), len(pullBlocks), reuse) l.Debugf("%v need file %s; copy %d, reuse %v", p, file.Name, len(blocks), reuse)
} }
if len(copyBlocks) > 0 { cs := copyBlocksState{
cs := copyBlocksState{ sharedPullerState: &s,
sharedPullerState: &s, blocks: blocks,
blocks: copyBlocks,
}
copyChan <- cs
}
if len(pullBlocks) > 0 {
for _, block := range pullBlocks {
ps := pullBlockState{
sharedPullerState: &s,
block: block,
}
pullChan <- ps
}
}
if len(pullBlocks) == 0 && len(copyBlocks) == 0 {
if !reuse {
panic("bug: nothing to do with file?")
}
// We have a temp file that we can reuse totally. Jump directly to the
// finisher stage.
finisherChan <- &s
} }
copyChan <- cs
} }
// shortcutFile sets file mode and modification time, when that's the only // shortcutFile sets file mode and modification time, when that's the only
@ -561,9 +536,9 @@ func (p *Puller) shortcutFile(file protocol.FileInfo) {
p.model.updateLocal(p.folder, file) p.model.updateLocal(p.folder, file)
} }
// copierRoutine reads pullerStates until the in channel closes and performs // copierRoutine reads copierStates until the in channel closes and performs
// the relevant copy. // the relevant copies when possible, or passes it to the puller routine.
func (p *Puller) copierRoutine(in <-chan copyBlocksState, out chan<- *sharedPullerState) { func (p *Puller) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) {
buf := make([]byte, protocol.BlockSize) buf := make([]byte, protocol.BlockSize)
nextFile: nextFile:
@ -575,32 +550,66 @@ nextFile:
continue nextFile continue nextFile
} }
srcFd, err := state.sourceFile() evictionChan := make(chan lfu.Eviction)
if err != nil {
// As above fdCache := lfu.New()
continue nextFile fdCache.UpperBound = 50
} fdCache.LowerBound = 20
fdCache.EvictionChannel = evictionChan
go func() {
for item := range evictionChan {
item.Value.(*os.File).Close()
}
}()
for _, block := range state.blocks { for _, block := range state.blocks {
buf = buf[:int(block.Size)] buf = buf[:int(block.Size)]
_, err = srcFd.ReadAt(buf, block.Offset) success := p.model.finder.Iterate(block.Hash, func(folder, file string, index uint32) bool {
if err != nil { path := filepath.Join(p.model.folderCfgs[folder].Path, file)
state.earlyClose("src read", err)
srcFd.Close() var fd *os.File
continue nextFile
fdi := fdCache.Get(path)
if fdi != nil {
fd = fdi.(*os.File)
} else {
fd, err = os.Open(path)
if err != nil {
return false
}
fdCache.Set(path, fd)
}
_, err = fd.ReadAt(buf, protocol.BlockSize*int64(index))
if err != nil {
return false
}
_, err = dstFd.WriteAt(buf, block.Offset)
if err != nil {
state.earlyClose("dst write", err)
}
return true
})
if state.failed() != nil {
break
} }
_, err = dstFd.WriteAt(buf, block.Offset) if !success {
if err != nil { state.pullStarted()
state.earlyClose("dst write", err) ps := pullBlockState{
srcFd.Close() sharedPullerState: state.sharedPullerState,
continue nextFile block: block,
}
pullChan <- ps
} }
state.copyDone()
} }
fdCache.Evict(fdCache.Len())
srcFd.Close() close(evictionChan)
state.copyDone()
out <- state.sharedPullerState out <- state.sharedPullerState
} }
} }

View File

@ -149,7 +149,16 @@ func (s *sharedPullerState) copyDone() {
s.mut.Lock() s.mut.Lock()
s.copyNeeded-- s.copyNeeded--
if debug { if debug {
l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.pullNeeded) l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.copyNeeded)
}
s.mut.Unlock()
}
func (s *sharedPullerState) pullStarted() {
s.mut.Lock()
s.pullNeeded++
if debug {
l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded start ->", s.pullNeeded)
} }
s.mut.Unlock() s.mut.Unlock()
} }
@ -158,7 +167,7 @@ func (s *sharedPullerState) pullDone() {
s.mut.Lock() s.mut.Lock()
s.pullNeeded-- s.pullNeeded--
if debug { if debug {
l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded ->", s.pullNeeded) l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded done ->", s.pullNeeded)
} }
s.mut.Unlock() s.mut.Unlock()
} }