From 5ac01a3af4a1b6b33013cd452f86cfbc0254f481 Mon Sep 17 00:00:00 2001 From: Audrius Butkevicius Date: Sun, 28 Dec 2014 23:11:32 +0000 Subject: [PATCH] Hash blocks after receipt, try multiple peers (fixes #1166) --- internal/model/model.go | 2 +- internal/model/puller.go | 128 +++++++++++++++++----------------- internal/model/puller_test.go | 11 ++- internal/scanner/blocks.go | 18 +++++ test/sync_test.go | 2 +- 5 files changed, 90 insertions(+), 71 deletions(-) diff --git a/internal/model/model.go b/internal/model/model.go index 403113404..132a8abbe 100644 --- a/internal/model/model.go +++ b/internal/model/model.go @@ -1369,7 +1369,7 @@ func (m *Model) RemoteLocalVersion(folder string) uint64 { func (m *Model) availability(folder, file string) []protocol.DeviceID { // Acquire this lock first, as the value returned from foldersFiles can - // gen heavily modified on Close() + // get heavily modified on Close() m.pmut.RLock() defer m.pmut.RUnlock() diff --git a/internal/model/puller.go b/internal/model/puller.go index cac6815da..7f965a484 100644 --- a/internal/model/puller.go +++ b/internal/model/puller.go @@ -16,8 +16,6 @@ package model import ( - "bytes" - "crypto/sha256" "errors" "fmt" "io/ioutil" @@ -158,16 +156,10 @@ loop: } p.model.setState(p.folder, FolderSyncing) tries := 0 - checksum := false for { tries++ - // Last resort mode, to get around corrupt/invalid block maps. - if tries == 10 { - l.Infoln("Desperation mode ON") - checksum = true - } - changed := p.pullerIteration(checksum, curIgnores) + changed := p.pullerIteration(curIgnores) if debug { l.Debugln(p, "changed", changed) } @@ -255,7 +247,7 @@ func (p *Puller) String() string { // returns the number items that should have been synced (even those that // might have failed). One puller iteration handles all files currently // flagged as needed in the folder. 
-func (p *Puller) pullerIteration(checksum bool, ignores *ignore.Matcher) int { +func (p *Puller) pullerIteration(ignores *ignore.Matcher) int { pullChan := make(chan pullBlockState) copyChan := make(chan copyBlocksState) finisherChan := make(chan *sharedPullerState) @@ -272,7 +264,7 @@ func (p *Puller) pullerIteration(checksum bool, ignores *ignore.Matcher) int { copyWg.Add(1) go func() { // copierRoutine finishes when copyChan is closed - p.copierRoutine(copyChan, pullChan, finisherChan, checksum) + p.copierRoutine(copyChan, pullChan, finisherChan) copyWg.Done() }() } @@ -613,7 +605,7 @@ func (p *Puller) shortcutSymlink(curFile, file protocol.FileInfo) { // copierRoutine reads copierStates until the in channel closes and performs // the relevant copies when possible, or passes it to the puller routine. -func (p *Puller) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState, checksum bool) { +func (p *Puller) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) { buf := make([]byte, protocol.BlockSize) nextFile: @@ -649,7 +641,6 @@ nextFile: } p.model.fmut.RUnlock() - hasher := sha256.New() for _, block := range state.blocks { buf = buf[:int(block.Size)] found := p.model.finder.Iterate(block.Hash, func(folder, file string, index uint32) bool { @@ -673,12 +664,9 @@ nextFile: return false } - // Only done on second to last puller attempt - if checksum { - hasher.Write(buf) - hash := hasher.Sum(nil) - hasher.Reset() - if !bytes.Equal(hash, block.Hash) { + hash, err := scanner.VerifyBuffer(buf, block) + if err != nil { + if hash != nil { if debug { l.Debugf("Finder block mismatch in %s:%s:%d expected %q got %q", folder, file, index, block.Hash, hash) } @@ -686,8 +674,10 @@ nextFile: if err != nil { l.Warnln("finder fix:", err) } - return false + } else if debug { + l.Debugln("Finder failed to verify buffer", err) } + return false } _, err = dstFd.WriteAt(buf, 
block.Offset) @@ -722,20 +712,9 @@ nextFile: } func (p *Puller) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPullerState) { -nextBlock: for state := range in { if state.failed() != nil { - continue nextBlock - } - - // Select the least busy device to pull the block from. If we found no - // feasible device at all, fail the block (and in the long run, the - // file). - potentialDevices := p.model.availability(p.folder, state.file.Name) - selected := activity.leastBusy(potentialDevices) - if selected == (protocol.DeviceID{}) { - state.earlyClose("pull", errNoDevice) - continue nextBlock + continue } // Get an fd to the temporary file. Tehcnically we don't need it until @@ -743,45 +722,58 @@ nextBlock: // no point in issuing the request to the network. fd, err := state.tempFile() if err != nil { - continue nextBlock + continue } - // Fetch the block, while marking the selected device as in use so that - // leastBusy can select another device when someone else asks. - activity.using(selected) - buf, err := p.model.requestGlobal(selected, p.folder, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash) - activity.done(selected) - if err != nil { - state.earlyClose("pull", err) - continue nextBlock - } + var lastError error + potentialDevices := p.model.availability(p.folder, state.file.Name) + for { + // Select the least busy device to pull the block from. If we found no + // feasible device at all, fail the block (and in the long run, the + // file). 
+ selected := activity.leastBusy(potentialDevices) + if selected == (protocol.DeviceID{}) { + if lastError != nil { + state.earlyClose("pull", lastError) + } else { + state.earlyClose("pull", errNoDevice) + } + break + } - // Save the block data we got from the cluster - _, err = fd.WriteAt(buf, state.block.Offset) - if err != nil { - state.earlyClose("save", err) - continue nextBlock - } + potentialDevices = removeDevice(potentialDevices, selected) - state.pullDone() - out <- state.sharedPullerState + // Fetch the block, while marking the selected device as in use so that + // leastBusy can select another device when someone else asks. + activity.using(selected) + var buf []byte // declared separately so the assignment below updates the outer lastError instead of shadowing it + buf, lastError = p.model.requestGlobal(selected, p.folder, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash) + activity.done(selected) + if lastError != nil { + continue + } + // Verify that the received block matches the desired hash, if not + // try pulling it from another device. + _, lastError = scanner.VerifyBuffer(buf, state.block) + if lastError != nil { + continue + } + + // Save the block data we got from the cluster + _, err = fd.WriteAt(buf, state.block.Offset) + if err != nil { + state.earlyClose("save", err) + } else { + state.pullDone() + out <- state.sharedPullerState + } + break + } } } func (p *Puller) performFinish(state *sharedPullerState) { - // Verify the file against expected hashes - fd, err := os.Open(state.tempName) - if err != nil { - l.Warnln("puller: final:", err) - return - } - err = scanner.Verify(fd, protocol.BlockSize, state.file.Blocks) - fd.Close() - if err != nil { - l.Infoln("puller:", state.file.Name, err, "(file changed during pull?)") - return - } - + var err error // Set the correct permission bits on the new file if !p.ignorePerms { err = os.Chmod(state.tempName, os.FileMode(state.file.Flags&0777)) @@ -893,3 +885,13 @@ func invalidateFolder(cfg *config.Configuration, folderID string, err error) { } } } + +func removeDevice(devices 
[]protocol.DeviceID, device protocol.DeviceID) []protocol.DeviceID { + for i := range devices { + if devices[i] == device { + devices[i] = devices[len(devices)-1] + return devices[:len(devices)-1] + } + } + return devices +} diff --git a/internal/model/puller_test.go b/internal/model/puller_test.go index 50408f1b6..d7576b46f 100644 --- a/internal/model/puller_test.go +++ b/internal/model/puller_test.go @@ -221,7 +221,7 @@ func TestCopierFinder(t *testing.T) { finisherChan := make(chan *sharedPullerState, 1) // Run a single fetcher routine - go p.copierRoutine(copyChan, pullChan, finisherChan, false) + go p.copierRoutine(copyChan, pullChan, finisherChan) p.handleFile(requiredFile, copyChan, finisherChan) @@ -317,9 +317,8 @@ func TestCopierCleanup(t *testing.T) { } } -// On the 10th iteration, we start hashing the content which we receive by -// following blockfinder's instructions. Make sure that the copier routine -// hashes the content when asked, and pulls if it fails to find the block. +// Make sure that the copier routine verifies the hash of the content it copies, +// and falls back to pulling if it fails to find a matching block. 
func TestLastResortPulling(t *testing.T) { fcfg := config.FolderConfiguration{ID: "default", Path: "testdata"} cfg := config.Configuration{Folders: []config.FolderConfiguration{fcfg}} @@ -361,8 +360,8 @@ func TestLastResortPulling(t *testing.T) { pullChan := make(chan pullBlockState, 1) finisherChan := make(chan *sharedPullerState, 1) - // Run a single copier routine with checksumming enabled - go p.copierRoutine(copyChan, pullChan, finisherChan, true) + // Run a single copier routine + go p.copierRoutine(copyChan, pullChan, finisherChan) p.handleFile(file, copyChan, finisherChan) diff --git a/internal/scanner/blocks.go b/internal/scanner/blocks.go index 2f28b1fe4..8ec2c26b4 100644 --- a/internal/scanner/blocks.go +++ b/internal/scanner/blocks.go @@ -130,6 +130,24 @@ func Verify(r io.Reader, blocksize int, blocks []protocol.BlockInfo) error { return nil } +func VerifyBuffer(buf []byte, block protocol.BlockInfo) ([]byte, error) { + if len(buf) != int(block.Size) { + return nil, fmt.Errorf("length mismatch %d != %d", len(buf), block.Size) + } + hf := sha256.New() + _, err := hf.Write(buf) + if err != nil { + return nil, err + } + hash := hf.Sum(nil) + + if !bytes.Equal(hash, block.Hash) { + return hash, fmt.Errorf("hash mismatch %x != %x", hash, block.Hash) + } + + return hash, nil +} + // BlockEqual returns whether two slices of blocks are exactly the same hash // and index pair wise. func BlocksEqual(src, tgt []protocol.BlockInfo) bool { diff --git a/test/sync_test.go b/test/sync_test.go index 759941a10..be8c84796 100644 --- a/test/sync_test.go +++ b/test/sync_test.go @@ -28,7 +28,7 @@ import ( "github.com/syncthing/syncthing/internal/protocol" ) -func TestSyncCluster(t *testing.T) { +func TestSyncClusterWithoutVersioning(t *testing.T) { // Use no versioning id, _ := protocol.DeviceIDFromString(id2) cfg, _ := config.Load("h2/config.xml", id)