syncthing/lib/model/folder_sendrecv.go

2068 lines
62 KiB
Go
Raw Normal View History

// Copyright (C) 2014 The Syncthing Authors.
2014-09-29 19:43:32 +00:00
//
2015-03-07 20:36:35 +00:00
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
2014-06-01 20:50:14 +00:00
2014-05-15 03:26:55 +00:00
package model
import (
"bytes"
2014-08-25 15:45:13 +00:00
"fmt"
"io"
"path/filepath"
"runtime"
"sort"
"strings"
"time"
"github.com/pkg/errors"
2015-08-06 09:29:25 +00:00
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/db"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/fs"
2015-08-06 09:29:25 +00:00
"github.com/syncthing/syncthing/lib/ignore"
"github.com/syncthing/syncthing/lib/osutil"
2015-09-22 17:38:46 +00:00
"github.com/syncthing/syncthing/lib/protocol"
2015-08-06 09:29:25 +00:00
"github.com/syncthing/syncthing/lib/scanner"
"github.com/syncthing/syncthing/lib/sha256"
2015-08-06 09:29:25 +00:00
"github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/util"
2015-08-06 09:29:25 +00:00
"github.com/syncthing/syncthing/lib/versioner"
"github.com/syncthing/syncthing/lib/weakhash"
)
var (
blockStats = make(map[string]int)
blockStatsMut = sync.NewMutex()
)
func init() {
folderFactories[config.FolderTypeSendReceive] = newSendReceiveFolder
}
// A pullBlockState is passed to the puller routine for each block that needs
// to be fetched.
type pullBlockState struct {
*sharedPullerState
block protocol.BlockInfo
}
// A copyBlocksState is passed to copy routine if the file has blocks to be
// copied.
type copyBlocksState struct {
*sharedPullerState
blocks []protocol.BlockInfo
have int
}
// Which filemode bits to preserve
const retainBits = fs.ModeSetgid | fs.ModeSetuid | fs.ModeSticky
var (
activity = newDeviceActivity()
errNoDevice = errors.New("peers who had this file went away, or the file has changed while syncing. will retry later")
errDirPrefix = "directory has been deleted on a remote device but "
errDirHasToBeScanned = errors.New(errDirPrefix + "contains changed files, scheduling scan")
errDirHasIgnored = errors.New(errDirPrefix + "contains ignored files (see ignore documentation for (?d) prefix)")
errDirHasReceiveOnlyChanged = errors.New(errDirPrefix + "contains locally changed files")
errDirNotEmpty = errors.New(errDirPrefix + "is not empty; the contents are probably ignored on that remote device, but not locally")
errNotAvailable = errors.New("no connected device has the required version of this file")
errModified = errors.New("file modified but not rescanned; will try again later")
errUnexpectedDirOnFileDel = errors.New("encountered directory when trying to remove file/symlink")
errIncompatibleSymlink = errors.New("incompatible symlink entry; rescan with newer Syncthing on source")
contextRemovingOldItem = "removing item to be replaced"
)
type dbUpdateType int
func (d dbUpdateType) String() string {
switch d {
case dbUpdateHandleDir:
return "dbUpdateHandleDir"
case dbUpdateDeleteDir:
return "dbUpdateDeleteDir"
case dbUpdateHandleFile:
return "dbUpdateHandleFile"
case dbUpdateDeleteFile:
return "dbUpdateDeleteFile"
case dbUpdateShortcutFile:
return "dbUpdateShourtcutFile"
case dbUpdateHandleSymlink:
return "dbUpdateHandleSymlink"
case dbUpdateInvalidate:
return "dbUpdateHandleInvalidate"
}
panic(fmt.Sprintf("unknown dbUpdateType %d", d))
}
const (
dbUpdateHandleDir dbUpdateType = iota
dbUpdateDeleteDir
dbUpdateHandleFile
dbUpdateDeleteFile
dbUpdateShortcutFile
dbUpdateHandleSymlink
dbUpdateInvalidate
)
const (
defaultCopiers = 2
defaultPullerPause = 60 * time.Second
defaultPullerPendingKiB = 2 * protocol.MaxBlockSize / 1024
maxPullerIterations = 3
)
type dbUpdateJob struct {
file protocol.FileInfo
jobType dbUpdateType
}
type sendReceiveFolder struct {
folder
fs fs.Filesystem
queue *jobQueue
blockPullReorderer blockPullReorderer
writeLimiter *byteSemaphore
pullErrors map[string]string // errors for most recent/current iteration
oldPullErrors map[string]string // errors from previous iterations for log filtering only
pullErrorsMut sync.Mutex
}
func newSendReceiveFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, ver versioner.Versioner, fs fs.Filesystem, evLogger events.Logger, ioLimiter *byteSemaphore) service {
f := &sendReceiveFolder{
folder: newFolder(model, fset, ignores, cfg, evLogger, ioLimiter, ver),
fs: fs,
queue: newJobQueue(),
blockPullReorderer: newBlockPullReorderer(cfg.BlockPullOrder, model.id, cfg.DeviceIDs()),
writeLimiter: newByteSemaphore(cfg.MaxConcurrentWrites),
pullErrorsMut: sync.NewMutex(),
}
f.folder.puller = f
f.folder.Service = util.AsService(f.serve, f.String())
if f.Copiers == 0 {
f.Copiers = defaultCopiers
}
// If the configured max amount of pending data is zero, we use the
// default. If it's configured to something non-zero but less than the
// protocol block size we adjust it upwards accordingly.
if f.PullerMaxPendingKiB == 0 {
f.PullerMaxPendingKiB = defaultPullerPendingKiB
}
if blockSizeKiB := protocol.MaxBlockSize / 1024; f.PullerMaxPendingKiB < blockSizeKiB {
f.PullerMaxPendingKiB = blockSizeKiB
}
return f
}
// pull returns true if it manages to get all needed items from peers, i.e. get
// the device in sync with the global state.
func (f *sendReceiveFolder) pull() bool {
// Check if the ignore patterns changed.
oldHash := f.ignores.Hash()
defer func() {
if f.ignores.Hash() != oldHash {
f.ignoresUpdated()
}
}()
err := f.getHealthErrorAndLoadIgnores()
f.setError(err)
if err != nil {
l.Debugln("Skipping pull of", f.Description(), "due to folder error:", err)
return false
}
l.Debugf("%v pulling", f)
scanChan := make(chan string)
go f.pullScannerRoutine(scanChan)
defer func() {
close(scanChan)
f.setState(FolderIdle)
}()
changed := 0
for tries := 0; tries < maxPullerIterations; tries++ {
select {
case <-f.ctx.Done():
return false
default:
}
// Needs to be set on every loop, as the puller might have set
// it to FolderSyncing during the last iteration.
f.setState(FolderSyncPreparing)
changed = f.pullerIteration(scanChan)
l.Debugln(f, "changed", changed, "on try", tries+1)
if changed == 0 {
// No files were changed by the puller, so we are in
// sync (except for unrecoverable stuff like invalid
// filenames on windows).
break
}
}
f.pullErrorsMut.Lock()
pullErrNum := len(f.pullErrors)
f.pullErrorsMut.Unlock()
if pullErrNum > 0 {
l.Infof("%v: Failed to sync %v items", f.Description(), pullErrNum)
f.evLogger.Log(events.FolderErrors, map[string]interface{}{
"folder": f.folderID,
"errors": f.Errors(),
})
}
return changed == 0
}
// pullerIteration runs a single puller iteration for the given folder and
// returns the number items that should have been synced (even those that
// might have failed). One puller iteration handles all files currently
// flagged as needed in the folder.
func (f *sendReceiveFolder) pullerIteration(scanChan chan<- string) int {
f.pullErrorsMut.Lock()
f.oldPullErrors = f.pullErrors
f.pullErrors = make(map[string]string)
f.pullErrorsMut.Unlock()
snap := f.fset.Snapshot()
defer snap.Release()
pullChan := make(chan pullBlockState)
copyChan := make(chan copyBlocksState)
finisherChan := make(chan *sharedPullerState)
dbUpdateChan := make(chan dbUpdateJob)
2015-04-22 22:54:31 +00:00
pullWg := sync.NewWaitGroup()
copyWg := sync.NewWaitGroup()
2015-04-22 22:54:31 +00:00
doneWg := sync.NewWaitGroup()
updateWg := sync.NewWaitGroup()
l.Debugln(f, "copiers:", f.Copiers, "pullerPendingKiB:", f.PullerMaxPendingKiB)
2014-11-23 18:43:49 +00:00
updateWg.Add(1)
go func() {
// dbUpdaterRoutine finishes when dbUpdateChan is closed
f.dbUpdaterRoutine(dbUpdateChan)
updateWg.Done()
}()
for i := 0; i < f.Copiers; i++ {
copyWg.Add(1)
go func() {
// copierRoutine finishes when copyChan is closed
f.copierRoutine(copyChan, pullChan, finisherChan)
copyWg.Done()
}()
}
pullWg.Add(1)
go func() {
// pullerRoutine finishes when pullChan is closed
f.pullerRoutine(pullChan, finisherChan)
pullWg.Done()
}()
doneWg.Add(1)
// finisherRoutine finishes when finisherChan is closed
go func() {
f.finisherRoutine(snap, finisherChan, dbUpdateChan, scanChan)
doneWg.Done()
}()
changed, fileDeletions, dirDeletions, err := f.processNeeded(snap, dbUpdateChan, copyChan, scanChan)
// Signal copy and puller routines that we are done with the in data for
// this iteration. Wait for them to finish.
close(copyChan)
copyWg.Wait()
close(pullChan)
pullWg.Wait()
// Signal the finisher chan that there will be no more input and wait
// for it to finish.
close(finisherChan)
doneWg.Wait()
if err == nil {
f.processDeletions(fileDeletions, dirDeletions, snap, dbUpdateChan, scanChan)
}
// Wait for db updates and scan scheduling to complete
close(dbUpdateChan)
updateWg.Wait()
f.pullErrorsMut.Lock()
f.oldPullErrors = nil
f.pullErrorsMut.Unlock()
f.queue.Reset()
return changed
}
func (f *sendReceiveFolder) processNeeded(snap *db.Snapshot, dbUpdateChan chan<- dbUpdateJob, copyChan chan<- copyBlocksState, scanChan chan<- string) (int, map[string]protocol.FileInfo, []protocol.FileInfo, error) {
changed := 0
var dirDeletions []protocol.FileInfo
fileDeletions := map[string]protocol.FileInfo{}
buckets := map[string][]protocol.FileInfo{}
// Iterate the list of items that we need and sort them into piles.
// Regular files to pull goes into the file queue, everything else
// (directories, symlinks and deletes) goes into the "process directly"
// pile.
snap.WithNeed(protocol.LocalDeviceID, func(intf protocol.FileIntf) bool {
select {
case <-f.ctx.Done():
return false
default:
}
if f.IgnoreDelete && intf.IsDeleted() {
l.Debugln(f, "ignore file deletion (config)", intf.FileName())
return true
}
changed++
file := intf.(protocol.FileInfo)
switch {
case f.ignores.ShouldIgnore(file.Name):
file.SetIgnored(f.shortID)
l.Debugln(f, "Handling ignored file", file)
dbUpdateChan <- dbUpdateJob{file, dbUpdateInvalidate}
case runtime.GOOS == "windows" && fs.WindowsInvalidFilename(file.Name):
if file.IsDeleted() {
// Just pretend we deleted it, no reason to create an error
// about a deleted file that we can't have anyway.
// Reason we need it in the first place is, that it was
// ignored at some point.
dbUpdateChan <- dbUpdateJob{file, dbUpdateDeleteFile}
} else {
// We can't pull an invalid file.
f.newPullError(file.Name, fs.ErrInvalidFilename)
// No reason to retry for this
changed--
}
case file.IsDeleted():
if file.IsDirectory() {
// Perform directory deletions at the end, as we may have
// files to delete inside them before we get to that point.
dirDeletions = append(dirDeletions, file)
} else if file.IsSymlink() {
f.deleteFile(file, snap, dbUpdateChan, scanChan)
} else {
df, ok := snap.Get(protocol.LocalDeviceID, file.Name)
// Local file can be already deleted, but with a lower version
// number, hence the deletion coming in again as part of
// WithNeed, furthermore, the file can simply be of the wrong
// type if we haven't yet managed to pull it.
if ok && !df.IsDeleted() && !df.IsSymlink() && !df.IsDirectory() && !df.IsInvalid() {
fileDeletions[file.Name] = file
// Put files into buckets per first hash
key := string(df.BlocksHash)
buckets[key] = append(buckets[key], df)
} else {
f.deleteFileWithCurrent(file, df, ok, dbUpdateChan, scanChan)
}
}
case file.Type == protocol.FileInfoTypeFile:
curFile, hasCurFile := snap.Get(protocol.LocalDeviceID, file.Name)
if hasCurFile && file.BlocksEqual(curFile) {
// We are supposed to copy the entire file, and then fetch nothing. We
// are only updating metadata, so we don't actually *need* to make the
// copy.
f.shortcutFile(file, curFile, dbUpdateChan)
} else {
// Queue files for processing after directories and symlinks.
f.queue.Push(file.Name, file.Size, file.ModTime())
}
case runtime.GOOS == "windows" && file.IsSymlink():
file.SetUnsupported(f.shortID)
l.Debugln(f, "Invalidating symlink (unsupported)", file.Name)
dbUpdateChan <- dbUpdateJob{file, dbUpdateInvalidate}
case file.IsDirectory() && !file.IsSymlink():
l.Debugln(f, "Handling directory", file.Name)
if f.checkParent(file.Name, scanChan) {
f.handleDir(file, snap, dbUpdateChan, scanChan)
}
case file.IsSymlink():
l.Debugln(f, "Handling symlink", file.Name)
if f.checkParent(file.Name, scanChan) {
f.handleSymlink(file, snap, dbUpdateChan, scanChan)
}
default:
l.Warnln(file)
panic("unhandleable item type, can't happen")
}
return true
})
select {
case <-f.ctx.Done():
return changed, nil, nil, f.ctx.Err()
default:
}
// Now do the file queue. Reorder it according to configuration.
switch f.Order {
case config.OrderRandom:
f.queue.Shuffle()
case config.OrderAlphabetic:
// The queue is already in alphabetic order.
case config.OrderSmallestFirst:
f.queue.SortSmallestFirst()
case config.OrderLargestFirst:
f.queue.SortLargestFirst()
case config.OrderOldestFirst:
f.queue.SortOldestFirst()
case config.OrderNewestFirst:
f.queue.SortNewestFirst()
}
// Process the file queue.
2014-12-19 23:12:12 +00:00
nextFile:
for {
select {
case <-f.ctx.Done():
return changed, fileDeletions, dirDeletions, f.ctx.Err()
default:
}
fileName, ok := f.queue.Pop()
if !ok {
break
}
2014-12-19 23:12:12 +00:00
fi, ok := snap.GetGlobal(fileName)
2014-12-19 23:12:12 +00:00
if !ok {
// File is no longer in the index. Mark it as done and drop it.
f.queue.Done(fileName)
2014-12-19 23:12:12 +00:00
continue
}
if fi.IsDeleted() || fi.IsInvalid() || fi.Type != protocol.FileInfoTypeFile {
// The item has changed type or status in the index while we
// were processing directories above.
f.queue.Done(fileName)
continue
}
if !f.checkParent(fi.Name, scanChan) {
f.queue.Done(fileName)
continue
}
// Check our list of files to be removed for a match, in which case
// we can just do a rename instead.
key := string(fi.BlocksHash)
for candidate, ok := popCandidate(buckets, key); ok; candidate, ok = popCandidate(buckets, key) {
// candidate is our current state of the file, where as the
// desired state with the delete bit set is in the deletion
// map.
desired := fileDeletions[candidate.Name]
if err := f.renameFile(candidate, desired, fi, snap, dbUpdateChan, scanChan); err != nil {
l.Debugf("rename shortcut for %s failed: %s", fi.Name, err.Error())
// Failed to rename, try next one.
continue
}
// Remove the pending deletion (as we performed it by renaming)
delete(fileDeletions, candidate.Name)
f.queue.Done(fileName)
continue nextFile
}
2014-12-19 23:12:12 +00:00
devices := snap.Availability(fileName)
for _, dev := range devices {
if _, ok := f.model.Connection(dev); ok {
// Handle the file normally, by coping and pulling, etc.
f.handleFile(fi, snap, copyChan)
continue nextFile
}
}
f.newPullError(fileName, errNotAvailable)
f.queue.Done(fileName)
}
return changed, fileDeletions, dirDeletions, nil
}
func popCandidate(buckets map[string][]protocol.FileInfo, key string) (protocol.FileInfo, bool) {
cands := buckets[key]
if len(cands) == 0 {
return protocol.FileInfo{}, false
}
buckets[key] = cands[1:]
return cands[0], true
}
func (f *sendReceiveFolder) processDeletions(fileDeletions map[string]protocol.FileInfo, dirDeletions []protocol.FileInfo, snap *db.Snapshot, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
2014-12-19 23:12:12 +00:00
for _, file := range fileDeletions {
select {
case <-f.ctx.Done():
return
default:
}
f.deleteFile(file, snap, dbUpdateChan, scanChan)
2014-12-19 23:12:12 +00:00
}
// Process in reverse order to delete depth first
for i := range dirDeletions {
select {
case <-f.ctx.Done():
return
default:
}
dir := dirDeletions[len(dirDeletions)-i-1]
l.Debugln(f, "Deleting dir", dir.Name)
f.deleteDir(dir, snap, dbUpdateChan, scanChan)
}
}
// handleDir creates or updates the given directory
func (f *sendReceiveFolder) handleDir(file protocol.FileInfo, snap *db.Snapshot, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
// Used in the defer closure below, updated by the function body. Take
// care not declare another err.
2015-02-01 17:31:19 +00:00
var err error
f.evLogger.Log(events.ItemStarted, map[string]string{
"folder": f.folderID,
"item": file.Name,
"type": "dir",
"action": "update",
2015-02-01 17:31:19 +00:00
})
2015-02-01 17:31:19 +00:00
defer func() {
f.evLogger.Log(events.ItemFinished, map[string]interface{}{
"folder": f.folderID,
2015-02-01 17:31:19 +00:00
"item": file.Name,
"error": events.Error(err),
"type": "dir",
"action": "update",
2015-02-01 17:31:19 +00:00
})
}()
mode := fs.FileMode(file.Permissions & 0777)
if f.IgnorePerms || file.NoPermissions {
mode = 0777
2014-10-09 22:34:32 +00:00
}
Implement facility based logger, debugging via REST API This implements a new debug/trace infrastructure based on a slightly hacked up logger. Instead of the traditional "if debug { ... }" I've rewritten the logger to have no-op Debugln and Debugf, unless debugging has been enabled for a given "facility". The "facility" is just a string, typically a package name. This will be slightly slower than before; but not that much as it's mostly a function call that returns immediately. For the cases where it matters (the Debugln takes a hex.Dump() of something for example, and it's not in a very occasional "if err != nil" branch) there is an l.ShouldDebug(facility) that is fast enough to be used like the old "if debug". The point of all this is that we can now toggle debugging for the various packages on and off at runtime. There's a new method /rest/system/debug that can be POSTed a set of facilities to enable and disable debug for, or GET from to get a list of facilities with descriptions and their current debug status. Similarly a /rest/system/log?since=... can grab the latest log entries, up to 250 of them (hardcoded constant in main.go) plus the initial few. Not implemented in this commit (but planned) is a simple debug GUI available on /debug that shows the current log in an easily pasteable format and has checkboxes to enable the various debug facilities. The debug instructions to a user then becomes "visit this URL, check these boxes, reproduce your problem, copy and paste the log". The actual log viewer on the hypothetical /debug URL can poll regularly for new log entries and this bypass the 250 line limit. The existing STTRACE=foo variable is still obeyed and just sets the start state of the system.
2015-10-03 15:25:21 +00:00
if shouldDebug() {
curFile, _ := snap.Get(protocol.LocalDeviceID, file.Name)
l.Debugf("need dir\n\t%v\n\t%v", file, curFile)
}
info, err := f.fs.Lstat(file.Name)
switch {
// There is already something under that name, we need to handle that.
// Unless it already is a directory, as we only track permissions,
// that don't result in a conflict.
case err == nil && !info.IsDir():
// Check that it is what we have in the database.
curFile, hasCurFile := f.model.CurrentFolderFile(f.folderID, file.Name)
if err := f.scanIfItemChanged(file.Name, info, curFile, hasCurFile, scanChan); err != nil {
err = errors.Wrap(err, "handling dir")
f.newPullError(file.Name, err)
return
}
// Remove it to replace with the dir.
if !curFile.IsSymlink() && f.inConflict(curFile.Version, file.Version) {
// The new file has been changed in conflict with the existing one. We
// should file it away as a conflict instead of just removing or
// archiving. Also merge with the version vector we had, to indicate
// we have resolved the conflict.
// Symlinks aren't checked for conflicts.
file.Version = file.Version.Merge(curFile.Version)
err = f.inWritableDir(func(name string) error {
return f.moveForConflict(name, file.ModifiedBy.String(), scanChan)
}, curFile.Name)
} else {
err = f.deleteItemOnDisk(curFile, snap, scanChan)
}
if err != nil {
f.newPullError(file.Name, err)
return
}
fallthrough
// The directory doesn't exist, so we create it with the right
// mode bits from the start.
case err != nil && fs.IsNotExist(err):
// We declare a function that acts on only the path name, so
// we can pass it to InWritableDir. We use a regular Mkdir and
// not MkdirAll because the parent should already exist.
mkdir := func(path string) error {
err = f.fs.Mkdir(path, mode)
if err != nil || f.IgnorePerms || file.NoPermissions {
return err
}
// Copy the parent owner and group, if we are supposed to do that.
if err := f.maybeCopyOwner(path); err != nil {
return err
}
// Stat the directory so we can check its permissions.
info, err := f.fs.Lstat(path)
if err != nil {
return err
}
// Mask for the bits we want to preserve and add them in to the
// directories permissions.
return f.fs.Chmod(path, mode|(info.Mode()&retainBits))
}
if err = f.inWritableDir(mkdir, file.Name); err == nil {
dbUpdateChan <- dbUpdateJob{file, dbUpdateHandleDir}
} else {
f.newPullError(file.Name, errors.Wrap(err, "creating directory"))
}
return
// Weird error when stat()'ing the dir. Probably won't work to do
// anything else with it if we can't even stat() it.
case err != nil:
f.newPullError(file.Name, errors.Wrap(err, "checking file to be replaced"))
return
}
// The directory already exists, so we just correct the mode bits. (We
// don't handle modification times on directories, because that sucks...)
// It's OK to change mode bits on stuff within non-writable directories.
if !f.IgnorePerms && !file.NoPermissions {
if err := f.fs.Chmod(file.Name, mode|(fs.FileMode(info.Mode())&retainBits)); err != nil {
f.newPullError(file.Name, err)
return
}
}
dbUpdateChan <- dbUpdateJob{file, dbUpdateHandleDir}
}
// checkParent verifies that the thing we are handling lives inside a directory,
// and not a symlink or regular file. It also resurrects missing parent dirs.
func (f *sendReceiveFolder) checkParent(file string, scanChan chan<- string) bool {
parent := filepath.Dir(file)
if err := osutil.TraversesSymlink(f.fs, parent); err != nil {
f.newPullError(file, errors.Wrap(err, "checking parent dirs"))
return false
}
// issues #114 and #4475: This works around a race condition
// between two devices, when one device removes a directory and the
// other creates a file in it. However that happens, we end up with
// a directory for "foo" with the delete bit, but a file "foo/bar"
// that we want to sync. We never create the directory, and hence
// fail to create the file and end up looping forever on it. This
// breaks that by creating the directory and scheduling a scan,
// where it will be found and the delete bit on it removed. The
// user can then clean up as they like...
// This can also occur if an entire tree structure was deleted, but only
// a leave has been scanned.
if _, err := f.fs.Lstat(parent); !fs.IsNotExist(err) {
l.Debugf("%v parent not missing %v", f, file)
return true
}
l.Debugf("%v resurrecting parent directory of %v", f, file)
if err := f.fs.MkdirAll(parent, 0755); err != nil {
f.newPullError(file, errors.Wrap(err, "resurrecting parent dir"))
return false
}
scanChan <- parent
return true
}
// handleSymlink creates or updates the given symlink
func (f *sendReceiveFolder) handleSymlink(file protocol.FileInfo, snap *db.Snapshot, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
// Used in the defer closure below, updated by the function body. Take
// care not declare another err.
var err error
f.evLogger.Log(events.ItemStarted, map[string]string{
"folder": f.folderID,
"item": file.Name,
"type": "symlink",
"action": "update",
})
defer func() {
f.evLogger.Log(events.ItemFinished, map[string]interface{}{
"folder": f.folderID,
"item": file.Name,
"error": events.Error(err),
"type": "symlink",
"action": "update",
})
}()
if shouldDebug() {
curFile, _ := snap.Get(protocol.LocalDeviceID, file.Name)
l.Debugf("need symlink\n\t%v\n\t%v", file, curFile)
}
if len(file.SymlinkTarget) == 0 {
// Index entry from a Syncthing predating the support for including
// the link target in the index entry. We log this as an error.
f.newPullError(file.Name, errIncompatibleSymlink)
return
}
// There is already something under that name, we need to handle that.
if info, err := f.fs.Lstat(file.Name); err == nil {
// Check that it is what we have in the database.
curFile, hasCurFile := f.model.CurrentFolderFile(f.folderID, file.Name)
if err := f.scanIfItemChanged(file.Name, info, curFile, hasCurFile, scanChan); err != nil {
err = errors.Wrap(err, "handling symlink")
f.newPullError(file.Name, err)
return
}
// Remove it to replace with the symlink. This also handles the
// "change symlink type" path.
if !curFile.IsDirectory() && !curFile.IsSymlink() && f.inConflict(curFile.Version, file.Version) {
// The new file has been changed in conflict with the existing one. We
// should file it away as a conflict instead of just removing or
// archiving. Also merge with the version vector we had, to indicate
// we have resolved the conflict.
// Directories and symlinks aren't checked for conflicts.
file.Version = file.Version.Merge(curFile.Version)
err = f.inWritableDir(func(name string) error {
return f.moveForConflict(name, file.ModifiedBy.String(), scanChan)
}, curFile.Name)
} else {
err = f.deleteItemOnDisk(curFile, snap, scanChan)
}
if err != nil {
f.newPullError(file.Name, errors.Wrap(err, "symlink remove"))
return
}
}
// We declare a function that acts on only the path name, so
// we can pass it to InWritableDir.
createLink := func(path string) error {
if err := f.fs.CreateSymlink(file.SymlinkTarget, path); err != nil {
return err
}
return f.maybeCopyOwner(path)
}
if err = f.inWritableDir(createLink, file.Name); err == nil {
dbUpdateChan <- dbUpdateJob{file, dbUpdateHandleSymlink}
} else {
f.newPullError(file.Name, errors.Wrap(err, "symlink create"))
}
}
// deleteDir attempts to remove a directory that was deleted on a remote
func (f *sendReceiveFolder) deleteDir(file protocol.FileInfo, snap *db.Snapshot, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
// Used in the defer closure below, updated by the function body. Take
// care not declare another err.
2015-02-01 17:31:19 +00:00
var err error
f.evLogger.Log(events.ItemStarted, map[string]string{
"folder": f.folderID,
"item": file.Name,
"type": "dir",
"action": "delete",
2015-02-01 17:31:19 +00:00
})
2015-02-01 17:31:19 +00:00
defer func() {
if err != nil {
f.newPullError(file.Name, errors.Wrap(err, "delete dir"))
}
f.evLogger.Log(events.ItemFinished, map[string]interface{}{
"folder": f.folderID,
2015-02-01 17:31:19 +00:00
"item": file.Name,
"error": events.Error(err),
"type": "dir",
"action": "delete",
2015-02-01 17:31:19 +00:00
})
}()
cur, hasCur := snap.Get(protocol.LocalDeviceID, file.Name)
if err = f.checkToBeDeleted(file, cur, hasCur, dbUpdateDeleteDir, dbUpdateChan, scanChan); err != nil {
if fs.IsNotExist(err) {
err = nil
}
return
}
if err = f.deleteDirOnDisk(file.Name, snap, scanChan); err != nil {
return
}
dbUpdateChan <- dbUpdateJob{file, dbUpdateDeleteDir}
}
// deleteFile attempts to delete the given file
func (f *sendReceiveFolder) deleteFile(file protocol.FileInfo, snap *db.Snapshot, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
cur, hasCur := snap.Get(protocol.LocalDeviceID, file.Name)
f.deleteFileWithCurrent(file, cur, hasCur, dbUpdateChan, scanChan)
}
func (f *sendReceiveFolder) deleteFileWithCurrent(file, cur protocol.FileInfo, hasCur bool, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
// Used in the defer closure below, updated by the function body. Take
// care not declare another err.
2015-02-01 17:31:19 +00:00
var err error
l.Debugln(f, "Deleting file", file.Name)
f.evLogger.Log(events.ItemStarted, map[string]string{
"folder": f.folderID,
"item": file.Name,
"type": "file",
"action": "delete",
2015-02-01 17:31:19 +00:00
})
2015-02-01 17:31:19 +00:00
defer func() {
if err != nil {
f.newPullError(file.Name, errors.Wrap(err, "delete file"))
}
f.evLogger.Log(events.ItemFinished, map[string]interface{}{
"folder": f.folderID,
2015-02-01 17:31:19 +00:00
"item": file.Name,
"error": events.Error(err),
"type": "file",
"action": "delete",
2015-02-01 17:31:19 +00:00
})
}()
if err = f.checkToBeDeleted(file, cur, hasCur, dbUpdateDeleteFile, dbUpdateChan, scanChan); err != nil {
if fs.IsNotExist(err) {
err = nil
}
return
}
// We are asked to delete a file, but what we have on disk and in db
// is a directory. Something is wrong here, should probably not happen.
if cur.IsDirectory() {
err = errUnexpectedDirOnFileDel
return
}
if f.inConflict(cur.Version, file.Version) {
// There is a conflict here, which shouldn't happen as deletions
// always lose. Merge the version vector of the file we have
// locally and commit it to db to resolve the conflict.
cur.Version = cur.Version.Merge(file.Version)
dbUpdateChan <- dbUpdateJob{cur, dbUpdateHandleFile}
return
}
if f.versioner != nil && !cur.IsSymlink() {
err = f.inWritableDir(f.versioner.Archive, file.Name)
} else {
err = f.inWritableDir(f.fs.Remove, file.Name)
}
2014-07-13 19:07:24 +00:00
if err == nil || fs.IsNotExist(err) {
// It was removed or it doesn't exist to start with
dbUpdateChan <- dbUpdateJob{file, dbUpdateDeleteFile}
return
}
if _, serr := f.fs.Lstat(file.Name); serr != nil && !fs.IsPermission(serr) {
// We get an error just looking at the file, and it's not a permission
// problem. Lets assume the error is in fact some variant of "file
// does not exist" (possibly expressed as some parent being a file and
// not a directory etc) and that the delete is handled.
err = nil
dbUpdateChan <- dbUpdateJob{file, dbUpdateDeleteFile}
}
}
2014-12-19 23:12:12 +00:00
// renameFile attempts to rename an existing file to a destination
// and set the right attributes on it.
func (f *sendReceiveFolder) renameFile(cur, source, target protocol.FileInfo, snap *db.Snapshot, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) error {
// Used in the defer closure below, updated by the function body. Take
// care not declare another err.
2015-02-01 17:31:19 +00:00
var err error
f.evLogger.Log(events.ItemStarted, map[string]string{
"folder": f.folderID,
"item": source.Name,
"type": "file",
"action": "delete",
2015-02-01 17:31:19 +00:00
})
f.evLogger.Log(events.ItemStarted, map[string]string{
"folder": f.folderID,
"item": target.Name,
"type": "file",
"action": "update",
2015-02-01 17:31:19 +00:00
})
2015-02-01 17:31:19 +00:00
defer func() {
f.evLogger.Log(events.ItemFinished, map[string]interface{}{
"folder": f.folderID,
2015-02-01 17:31:19 +00:00
"item": source.Name,
"error": events.Error(err),
"type": "file",
"action": "delete",
2015-02-01 17:31:19 +00:00
})
f.evLogger.Log(events.ItemFinished, map[string]interface{}{
"folder": f.folderID,
2015-02-01 17:31:19 +00:00
"item": target.Name,
"error": events.Error(err),
"type": "file",
"action": "update",
2015-02-01 17:31:19 +00:00
})
}()
l.Debugln(f, "taking rename shortcut", source.Name, "->", target.Name)
2014-12-19 23:12:12 +00:00
// Check that source is compatible with what we have in the DB
if err = f.checkToBeDeleted(source, cur, true, dbUpdateDeleteFile, dbUpdateChan, scanChan); err != nil {
return err
}
// Check that the target corresponds to what we have in the DB
curTarget, ok := snap.Get(protocol.LocalDeviceID, target.Name)
switch stat, serr := f.fs.Lstat(target.Name); {
case serr != nil && fs.IsNotExist(serr):
if !ok || curTarget.IsDeleted() {
break
}
scanChan <- target.Name
err = errModified
case serr != nil:
// We can't check whether the file changed as compared to the db,
// do not delete.
err = serr
case !ok:
// Target appeared from nowhere
scanChan <- target.Name
err = errModified
default:
var fi protocol.FileInfo
if fi, err = scanner.CreateFileInfo(stat, target.Name, f.fs); err == nil {
if !fi.IsEquivalentOptional(curTarget, f.ModTimeWindow(), f.IgnorePerms, true, protocol.LocalAllFlags) {
// Target changed
scanChan <- target.Name
err = errModified
}
}
}
if err != nil {
return err
}
tempName := fs.TempName(target.Name)
if f.versioner != nil {
err = f.CheckAvailableSpace(source.Size)
2014-12-19 23:12:12 +00:00
if err == nil {
err = osutil.Copy(f.CopyRangeMethod, f.fs, f.fs, source.Name, tempName)
if err == nil {
err = f.inWritableDir(f.versioner.Archive, source.Name)
}
2014-12-19 23:12:12 +00:00
}
} else {
err = osutil.RenameOrCopy(f.CopyRangeMethod, f.fs, f.fs, source.Name, tempName)
2014-12-19 23:12:12 +00:00
}
if err != nil {
return err
}
2014-12-19 23:12:12 +00:00
blockStatsMut.Lock()
minBlocksPerBlock := target.BlockSize() / protocol.MinBlockSize
blockStats["total"] += len(target.Blocks) * minBlocksPerBlock
blockStats["renamed"] += len(target.Blocks) * minBlocksPerBlock
blockStatsMut.Unlock()
2014-12-19 23:12:12 +00:00
// The file was renamed, so we have handled both the necessary delete
// of the source and the creation of the target temp file. Fix-up the metadata,
// update the local index of the target file and rename from temp to real name.
if err = f.performFinish(target, curTarget, true, tempName, snap, dbUpdateChan, scanChan); err != nil {
return err
}
2014-12-19 23:12:12 +00:00
dbUpdateChan <- dbUpdateJob{source, dbUpdateDeleteFile}
return nil
2014-12-19 23:12:12 +00:00
}
// This is the flow of data and events here, I think...
//
// +-----------------------+
// | | - - - - > ItemStarted
// | handleFile | - - - - > ItemFinished (on shortcuts)
// | |
// +-----------------------+
// |
// | copyChan (copyBlocksState; unless shortcut taken)
// |
// | +-----------------------+
// | | +-----------------------+
// +--->| | |
// | | copierRoutine |
// +-| |
// +-----------------------+
// |
// | pullChan (sharedPullerState)
// |
// | +-----------------------+
// | | +-----------------------+
// +-->| | |
// | | pullerRoutine |
// +-| |
// +-----------------------+
// |
// | finisherChan (sharedPullerState)
// |
// | +-----------------------+
// | | |
// +-->| finisherRoutine | - - - - > ItemFinished
// | |
// +-----------------------+
// handleFile queues the copies and pulls as necessary for a single new or
// changed file.
func (f *sendReceiveFolder) handleFile(file protocol.FileInfo, snap *db.Snapshot, copyChan chan<- copyBlocksState) {
curFile, hasCurFile := snap.Get(protocol.LocalDeviceID, file.Name)
have, _ := blockDiff(curFile.Blocks, file.Blocks)
tempName := fs.TempName(file.Name)
populateOffsets(file.Blocks)
blocks := make([]protocol.BlockInfo, 0, len(file.Blocks))
reused := make([]int32, 0, len(file.Blocks))
2014-10-03 22:15:54 +00:00
// Check for an old temporary file which might have some blocks we could
// reuse.
tempBlocks, err := scanner.HashFile(f.ctx, f.fs, tempName, file.BlockSize(), nil, false)
2014-10-03 22:15:54 +00:00
if err == nil {
// Check for any reusable blocks in the temp file
tempCopyBlocks, _ := blockDiff(tempBlocks, file.Blocks)
2014-10-03 22:15:54 +00:00
// block.String() returns a string unique to the block
2015-01-14 23:00:00 +00:00
existingBlocks := make(map[string]struct{}, len(tempCopyBlocks))
2014-10-03 22:15:54 +00:00
for _, block := range tempCopyBlocks {
2015-01-14 23:00:00 +00:00
existingBlocks[block.String()] = struct{}{}
2014-10-03 22:15:54 +00:00
}
// Since the blocks are already there, we don't need to get them.
for i, block := range file.Blocks {
2014-10-03 22:15:54 +00:00
_, ok := existingBlocks[block.String()]
if !ok {
blocks = append(blocks, block)
} else {
reused = append(reused, int32(i))
2014-10-03 22:15:54 +00:00
}
}
// The sharedpullerstate will know which flags to use when opening the
// temp file depending if we are reusing any blocks or not.
if len(reused) == 0 {
2014-10-03 22:15:54 +00:00
// Otherwise, discard the file ourselves in order for the
2015-04-28 15:34:55 +00:00
// sharedpuller not to panic when it fails to exclusively create a
2014-10-03 22:15:54 +00:00
// file which already exists
f.inWritableDir(f.fs.Remove, tempName)
2014-10-03 22:15:54 +00:00
}
} else {
// Copy the blocks, as we don't want to shuffle them on the FileInfo
blocks = append(blocks, file.Blocks...)
}
// Reorder blocks
blocks = f.blockPullReorderer.Reorder(blocks)
f.evLogger.Log(events.ItemStarted, map[string]string{
"folder": f.folderID,
"item": file.Name,
"type": "file",
"action": "update",
})
s := newSharedPullerState(file, f.fs, f.folderID, tempName, blocks, reused, f.IgnorePerms || file.NoPermissions, hasCurFile, curFile, !f.DisableSparseFiles, !f.DisableFsync)
l.Debugf("%v need file %s; copy %d, reused %v", f, file.Name, len(blocks), len(reused))
cs := copyBlocksState{
sharedPullerState: s,
blocks: blocks,
have: len(have),
}
copyChan <- cs
}
// blockDiff returns lists of common and missing (to transform src into tgt)
// blocks. Both block lists must have been created with the same block size.
func blockDiff(src, tgt []protocol.BlockInfo) ([]protocol.BlockInfo, []protocol.BlockInfo) {
if len(tgt) == 0 {
return nil, nil
}
if len(src) == 0 {
// Copy the entire file
return nil, tgt
}
have := make([]protocol.BlockInfo, 0, len(src))
need := make([]protocol.BlockInfo, 0, len(tgt))
for i := range tgt {
if i >= len(src) {
return have, append(need, tgt[i:]...)
}
if !bytes.Equal(tgt[i].Hash, src[i].Hash) {
// Copy differing block
need = append(need, tgt[i])
} else {
have = append(have, tgt[i])
}
}
return have, need
}
// populateOffsets sets the Offset field on each block
func populateOffsets(blocks []protocol.BlockInfo) {
var offset int64
for i := range blocks {
blocks[i].Offset = offset
offset += int64(blocks[i].Size)
}
}
// shortcutFile sets file mode and modification time, when that's the only
// thing that has changed.
2018-09-16 09:29:06 +00:00
func (f *sendReceiveFolder) shortcutFile(file, curFile protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob) {
l.Debugln(f, "taking shortcut on", file.Name)
f.evLogger.Log(events.ItemStarted, map[string]string{
2018-09-16 09:29:06 +00:00
"folder": f.folderID,
"item": file.Name,
"type": "file",
"action": "metadata",
})
var err error
defer f.evLogger.Log(events.ItemFinished, map[string]interface{}{
2018-09-16 09:29:06 +00:00
"folder": f.folderID,
"item": file.Name,
"error": events.Error(err),
"type": "file",
"action": "metadata",
})
f.queue.Done(file.Name)
if !f.IgnorePerms && !file.NoPermissions {
2018-09-16 09:29:06 +00:00
if err = f.fs.Chmod(file.Name, fs.FileMode(file.Permissions&0777)); err != nil {
f.newPullError(file.Name, err)
2018-09-16 09:29:06 +00:00
return
2014-10-09 22:34:32 +00:00
}
2014-04-27 10:14:53 +00:00
}
2019-02-02 11:16:27 +00:00
f.fs.Chtimes(file.Name, file.ModTime(), file.ModTime()) // never fails
// This may have been a conflict. We should merge the version vectors so
// that our clock doesn't move backwards.
2018-09-16 09:29:06 +00:00
file.Version = file.Version.Merge(curFile.Version)
dbUpdateChan <- dbUpdateJob{file, dbUpdateShortcutFile}
}
// copierRoutine reads copierStates until the in channel closes and performs
// the relevant copies when possible, or passes it to the puller routine.
func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) {
buf := protocol.BufferPool.Get(protocol.MinBlockSize)
defer func() {
protocol.BufferPool.Put(buf)
}()
folderFilesystems := make(map[string]fs.Filesystem)
// Hope that it's usually in the same folder, so start with that one.
folders := []string{f.folderID}
for folder, cfg := range f.model.cfg.Folders() {
folderFilesystems[folder] = cfg.Filesystem()
if folder != f.folderID {
folders = append(folders, folder)
}
}
for state := range in {
if err := f.CheckAvailableSpace(state.file.Size); err != nil {
state.fail(err)
// Nothing more to do for this failed file, since it would use to much disk space
out <- state.sharedPullerState
continue
}
dstFd, err := state.tempFile()
if err != nil {
// Nothing more to do for this failed file, since we couldn't create a temporary for it.
out <- state.sharedPullerState
continue
}
f.model.progressEmitter.Register(state.sharedPullerState)
var file fs.File
var weakHashFinder *weakhash.Finder
blocksPercentChanged := 0
if tot := len(state.file.Blocks); tot > 0 {
blocksPercentChanged = (tot - state.have) * 100 / tot
}
if blocksPercentChanged >= f.WeakHashThresholdPct {
hashesToFind := make([]uint32, 0, len(state.blocks))
for _, block := range state.blocks {
if block.WeakHash != 0 {
hashesToFind = append(hashesToFind, block.WeakHash)
}
}
if len(hashesToFind) > 0 {
file, err = f.fs.Open(state.file.Name)
if err == nil {
weakHashFinder, err = weakhash.NewFinder(f.ctx, file, state.file.BlockSize(), hashesToFind)
if err != nil {
l.Debugln("weak hasher", err)
}
}
} else {
l.Debugf("not weak hashing %s. file did not contain any weak hashes", state.file.Name)
}
} else {
l.Debugf("not weak hashing %s. not enough changed %.02f < %d", state.file.Name, blocksPercentChanged, f.WeakHashThresholdPct)
}
blocks:
for _, block := range state.blocks {
select {
case <-f.ctx.Done():
state.fail(errors.Wrap(f.ctx.Err(), "folder stopped"))
break blocks
default:
}
if !f.DisableSparseFiles && state.reused == 0 && block.IsEmpty() {
2015-11-21 15:30:53 +00:00
// The block is a block of all zeroes, and we are not reusing
// a temp file, so there is no need to do anything with it.
// If we were reusing a temp file and had this block to copy,
// it would be because the block in the temp file was *not* a
// block of all zeroes, so then we should not skip it.
// Pretend we copied it.
state.copiedFromOrigin()
state.copyDone(block)
2015-11-21 15:30:53 +00:00
continue
}
buf = protocol.BufferPool.Upgrade(buf, int(block.Size))
found, err := weakHashFinder.Iterate(block.WeakHash, buf, func(offset int64) bool {
if verifyBuffer(buf, block) != nil {
return true
2014-10-24 22:20:08 +00:00
}
err = f.limitedWriteAt(dstFd, buf, block.Offset)
if err != nil {
state.fail(errors.Wrap(err, "dst write"))
}
if offset == block.Offset {
state.copiedFromOrigin()
} else {
state.copiedFromOriginShifted()
}
return false
})
if err != nil {
l.Debugln("weak hasher iter", err)
}
if !found {
found = f.model.finder.Iterate(folders, block.Hash, func(folder, path string, index int32) bool {
ffs := folderFilesystems[folder]
fd, err := ffs.Open(path)
if err != nil {
return false
}
srcOffset := int64(state.file.BlockSize()) * int64(index)
_, err = fd.ReadAt(buf, srcOffset)
fd.Close()
if err != nil {
return false
}
if err := verifyBuffer(buf, block); err != nil {
l.Debugln("Finder failed to verify buffer", err)
return false
}
if f.CopyRangeMethod != fs.CopyRangeMethodStandard {
err = f.withLimiter(func() error {
dstFd.mut.Lock()
defer dstFd.mut.Unlock()
return fs.CopyRange(f.CopyRangeMethod, fd, dstFd.fd, srcOffset, block.Offset, int64(block.Size))
})
} else {
err = f.limitedWriteAt(dstFd, buf, block.Offset)
}
if err != nil {
state.fail(errors.Wrap(err, "dst write"))
}
if path == state.file.Name {
state.copiedFromOrigin()
}
return true
})
}
if state.failed() != nil {
break
}
2014-10-24 22:20:08 +00:00
if !found {
state.pullStarted()
ps := pullBlockState{
sharedPullerState: state.sharedPullerState,
block: block,
}
pullChan <- ps
} else {
state.copyDone(block)
2014-05-25 18:49:08 +00:00
}
}
if file != nil {
// os.File used to return invalid argument if nil.
// fs.File panics as it's an interface.
file.Close()
}
out <- state.sharedPullerState
}
}
func verifyBuffer(buf []byte, block protocol.BlockInfo) error {
if len(buf) != int(block.Size) {
return fmt.Errorf("length mismatch %d != %d", len(buf), block.Size)
}
hash := sha256.Sum256(buf)
if !bytes.Equal(hash[:], block.Hash) {
return fmt.Errorf("hash mismatch %x != %x", hash, block.Hash)
}
return nil
}
func (f *sendReceiveFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPullerState) {
requestLimiter := newByteSemaphore(f.PullerMaxPendingKiB * 1024)
wg := sync.NewWaitGroup()
for state := range in {
if state.failed() != nil {
out <- state.sharedPullerState
continue
}
2014-07-24 07:38:16 +00:00
f.setState(FolderSyncing) // Does nothing if already FolderSyncing
// The requestLimiter limits how many pending block requests we have
// ongoing at any given time, based on the size of the blocks
// themselves.
2014-08-05 07:46:11 +00:00
state := state
bytes := int(state.block.Size)
2015-11-21 15:30:53 +00:00
2020-02-24 20:57:15 +00:00
if err := requestLimiter.takeWithContext(f.ctx, bytes); err != nil {
state.fail(err)
out <- state.sharedPullerState
continue
2020-02-24 20:57:15 +00:00
}
wg.Add(1)
2014-08-05 07:46:11 +00:00
go func() {
defer wg.Done()
defer requestLimiter.give(bytes)
2014-07-24 07:38:16 +00:00
f.pullBlock(state, out)
}()
}
wg.Wait()
}
func (f *sendReceiveFolder) pullBlock(state pullBlockState, out chan<- *sharedPullerState) {
// Get an fd to the temporary file. Technically we don't need it until
// after fetching the block, but if we run into an error here there is
// no point in issuing the request to the network.
fd, err := state.tempFile()
if err != nil {
out <- state.sharedPullerState
return
}
if !f.DisableSparseFiles && state.reused == 0 && state.block.IsEmpty() {
// There is no need to request a block of all zeroes. Pretend we
// requested it and handled it correctly.
state.pullDone(state.block)
out <- state.sharedPullerState
return
}
var lastError error
candidates := f.model.Availability(f.folderID, state.file, state.block)
for {
select {
case <-f.ctx.Done():
state.fail(errors.Wrap(f.ctx.Err(), "folder stopped"))
break
default:
}
// Select the least busy device to pull the block from. If we found no
// feasible device at all, fail the block (and in the long run, the
// file).
selected, found := activity.leastBusy(candidates)
if !found {
if lastError != nil {
state.fail(errors.Wrap(lastError, "pull"))
} else {
state.fail(errors.Wrap(errNoDevice, "pull"))
}
break
}
candidates = removeAvailability(candidates, selected)
// Fetch the block, while marking the selected device as in use so that
// leastBusy can select another device when someone else asks.
activity.using(selected)
var buf []byte
buf, lastError = f.model.requestGlobal(f.ctx, selected.ID, f.folderID, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash, state.block.WeakHash, selected.FromTemporary)
activity.done(selected)
if lastError != nil {
l.Debugln("request:", f.folderID, state.file.Name, state.block.Offset, state.block.Size, "returned error:", lastError)
continue
}
// Verify that the received block matches the desired hash, if not
// try pulling it from another device.
lastError = verifyBuffer(buf, state.block)
if lastError != nil {
l.Debugln("request:", f.folderID, state.file.Name, state.block.Offset, state.block.Size, "hash mismatch")
continue
}
// Save the block data we got from the cluster
err = f.limitedWriteAt(fd, buf, state.block.Offset)
if err != nil {
state.fail(errors.Wrap(err, "save"))
} else {
state.pullDone(state.block)
}
break
2014-07-24 07:38:16 +00:00
}
out <- state.sharedPullerState
}
2014-04-27 10:14:53 +00:00
func (f *sendReceiveFolder) performFinish(file, curFile protocol.FileInfo, hasCurFile bool, tempName string, snap *db.Snapshot, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) error {
// Set the correct permission bits on the new file
if !f.IgnorePerms && !file.NoPermissions {
if err := f.fs.Chmod(tempName, fs.FileMode(file.Permissions&0777)); err != nil {
return err
}
}
// Copy the parent owner and group, if we are supposed to do that.
if err := f.maybeCopyOwner(tempName); err != nil {
return err
}
if stat, err := f.fs.Lstat(file.Name); err == nil {
// There is an old file or directory already in place. We need to
// handle that.
if err := f.scanIfItemChanged(file.Name, stat, curFile, hasCurFile, scanChan); err != nil {
return err
}
if !curFile.IsDirectory() && !curFile.IsSymlink() && f.inConflict(curFile.Version, file.Version) {
// The new file has been changed in conflict with the existing one. We
// should file it away as a conflict instead of just removing or
// archiving. Also merge with the version vector we had, to indicate
// we have resolved the conflict.
// Directories and symlinks aren't checked for conflicts.
file.Version = file.Version.Merge(curFile.Version)
err = f.inWritableDir(func(name string) error {
return f.moveForConflict(name, file.ModifiedBy.String(), scanChan)
}, curFile.Name)
} else {
err = f.deleteItemOnDisk(curFile, snap, scanChan)
}
if err != nil {
return err
}
} else if !fs.IsNotExist(err) {
return err
}
// Replace the original content with the new one. If it didn't work,
// leave the temp file in place for reuse.
if err := osutil.RenameOrCopy(f.CopyRangeMethod, f.fs, f.fs, tempName, file.Name); err != nil {
return err
}
2014-04-27 10:14:53 +00:00
// Set the correct timestamp on the new file
2019-02-02 11:16:27 +00:00
f.fs.Chtimes(file.Name, file.ModTime(), file.ModTime()) // never fails
// Record the updated file in the index
dbUpdateChan <- dbUpdateJob{file, dbUpdateHandleFile}
return nil
}
2014-11-09 04:26:52 +00:00
func (f *sendReceiveFolder) finisherRoutine(snap *db.Snapshot, in <-chan *sharedPullerState, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
for state := range in {
if closed, err := state.finalClose(); closed {
l.Debugln(f, "closing", state.file.Name)
f.queue.Done(state.file.Name)
if err == nil {
err = f.performFinish(state.file, state.curFile, state.hasCurFile, state.tempName, snap, dbUpdateChan, scanChan)
}
if err != nil {
f.newPullError(state.file.Name, err)
} else {
minBlocksPerBlock := state.file.BlockSize() / protocol.MinBlockSize
blockStatsMut.Lock()
blockStats["total"] += (state.reused + state.copyTotal + state.pullTotal) * minBlocksPerBlock
blockStats["reused"] += state.reused * minBlocksPerBlock
blockStats["pulled"] += state.pullTotal * minBlocksPerBlock
// copyOriginShifted is counted towards copyOrigin due to progress bar reasons
// for reporting reasons we want to separate these.
blockStats["copyOrigin"] += (state.copyOrigin - state.copyOriginShifted) * minBlocksPerBlock
blockStats["copyOriginShifted"] += state.copyOriginShifted * minBlocksPerBlock
blockStats["copyElsewhere"] += (state.copyTotal - state.copyOrigin) * minBlocksPerBlock
blockStatsMut.Unlock()
}
f.model.progressEmitter.Deregister(state)
f.evLogger.Log(events.ItemFinished, map[string]interface{}{
"folder": f.folderID,
"item": state.file.Name,
"error": events.Error(err),
"type": "file",
"action": "update",
})
2014-04-27 10:14:53 +00:00
}
}
}
2014-04-27 10:14:53 +00:00
// Moves the given filename to the front of the job queue
func (f *sendReceiveFolder) BringToFront(filename string) {
f.queue.BringToFront(filename)
}
func (f *sendReceiveFolder) Jobs(page, perpage int) ([]string, []string, int) {
return f.queue.Jobs(page, perpage)
}
// dbUpdaterRoutine aggregates db updates and commits them in batches no
// larger than 1000 items, and no more delayed than 2 seconds.
func (f *sendReceiveFolder) dbUpdaterRoutine(dbUpdateChan <-chan dbUpdateJob) {
const maxBatchTime = 2 * time.Second
batch := newFileInfoBatch(nil)
tick := time.NewTicker(maxBatchTime)
defer tick.Stop()
changedDirs := make(map[string]struct{})
found := false
var lastFile protocol.FileInfo
batch.flushFn = func(files []protocol.FileInfo) error {
// sync directories
for dir := range changedDirs {
delete(changedDirs, dir)
if !f.FolderConfiguration.DisableFsync {
fd, err := f.fs.Open(dir)
if err != nil {
l.Debugf("fsync %q failed: %v", dir, err)
continue
}
if err := fd.Sync(); err != nil {
l.Debugf("fsync %q failed: %v", dir, err)
}
fd.Close()
}
}
// All updates to file/folder objects that originated remotely
// (across the network) use this call to updateLocals
f.updateLocalsFromPulling(files)
if found {
f.ReceivedFile(lastFile.Name, lastFile.IsDeleted())
found = false
}
return nil
}
loop:
for {
select {
case job, ok := <-dbUpdateChan:
if !ok {
break loop
}
switch job.jobType {
case dbUpdateHandleFile, dbUpdateShortcutFile:
changedDirs[filepath.Dir(job.file.Name)] = struct{}{}
case dbUpdateHandleDir:
changedDirs[job.file.Name] = struct{}{}
case dbUpdateHandleSymlink, dbUpdateInvalidate:
// fsyncing symlinks is only supported by MacOS
// and invalidated files are db only changes -> no sync
}
// For some reason we seem to care about file deletions and
// content modification, but not about metadata and dirs/symlinks.
if !job.file.IsInvalid() && job.jobType&(dbUpdateHandleFile|dbUpdateDeleteFile) != 0 {
found = true
lastFile = job.file
}
job.file.Sequence = 0
batch.append(job.file)
batch.flushIfFull()
case <-tick.C:
batch.flush()
}
}
batch.flush()
}
// pullScannerRoutine aggregates paths to be scanned after pulling. The scan is
// scheduled once when scanChan is closed (scanning can not happen during pulling).
func (f *sendReceiveFolder) pullScannerRoutine(scanChan <-chan string) {
toBeScanned := make(map[string]struct{})
for path := range scanChan {
toBeScanned[path] = struct{}{}
}
if len(toBeScanned) != 0 {
scanList := make([]string, 0, len(toBeScanned))
for path := range toBeScanned {
l.Debugln(f, "scheduling scan after pulling for", path)
scanList = append(scanList, path)
}
2019-02-02 11:16:27 +00:00
f.Scan(scanList)
}
}
func (f *sendReceiveFolder) inConflict(current, replacement protocol.Vector) bool {
if current.Concurrent(replacement) {
// Obvious case
return true
}
if replacement.Counter(f.shortID) > current.Counter(f.shortID) {
// The replacement file contains a higher version for ourselves than
// what we have. This isn't supposed to be possible, since it's only
// we who can increment that counter. We take it as a sign that
// something is wrong (our index may have been corrupted or removed)
// and flag it as a conflict.
return true
}
return false
}
func removeAvailability(availabilities []Availability, availability Availability) []Availability {
for i := range availabilities {
if availabilities[i] == availability {
availabilities[i] = availabilities[len(availabilities)-1]
return availabilities[:len(availabilities)-1]
}
}
return availabilities
}
func (f *sendReceiveFolder) moveForConflict(name, lastModBy string, scanChan chan<- string) error {
if isConflict(name) {
l.Infoln("Conflict for", name, "which is already a conflict copy; not copying again.")
if err := f.fs.Remove(name); err != nil && !fs.IsNotExist(err) {
return errors.Wrap(err, contextRemovingOldItem)
}
return nil
}
if f.MaxConflicts == 0 {
if err := f.fs.Remove(name); err != nil && !fs.IsNotExist(err) {
return errors.Wrap(err, contextRemovingOldItem)
}
return nil
}
newName := conflictName(name, lastModBy)
err := f.fs.Rename(name, newName)
if fs.IsNotExist(err) {
// We were supposed to move a file away but it does not exist. Either
// the user has already moved it away, or the conflict was between a
// remote modification and a local delete. In either way it does not
// matter, go ahead as if the move succeeded.
err = nil
}
if f.MaxConflicts > -1 {
matches := existingConflicts(name, f.fs)
if len(matches) > f.MaxConflicts {
sort.Sort(sort.Reverse(sort.StringSlice(matches)))
for _, match := range matches[f.MaxConflicts:] {
if gerr := f.fs.Remove(match); gerr != nil {
l.Debugln(f, "removing extra conflict", gerr)
}
}
}
}
if err == nil {
scanChan <- newName
}
return err
}
func (f *sendReceiveFolder) newPullError(path string, err error) {
if errors.Cause(err) == f.ctx.Err() {
// Error because the folder stopped - no point logging/tracking
return
}
f.pullErrorsMut.Lock()
defer f.pullErrorsMut.Unlock()
// We might get more than one error report for a file (i.e. error on
// Write() followed by Close()); we keep the first error as that is
// probably closer to the root cause.
if _, ok := f.pullErrors[path]; ok {
return
}
// Establish context to differentiate from errors while scanning.
// Use "syncing" as opposed to "pulling" as the latter might be used
// for errors occurring specificly in the puller routine.
errStr := fmt.Sprintln("syncing:", err)
f.pullErrors[path] = errStr
if oldErr, ok := f.oldPullErrors[path]; ok && oldErr == errStr {
l.Debugf("Repeat error on puller (folder %s, item %q): %v", f.Description(), path, err)
delete(f.oldPullErrors, path) // Potential repeats are now caught by f.pullErrors itself
return
}
l.Infof("Puller (folder %s, item %q): %v", f.Description(), path, err)
}
func (f *sendReceiveFolder) Errors() []FileError {
scanErrors := f.folder.Errors()
f.pullErrorsMut.Lock()
errors := make([]FileError, 0, len(f.pullErrors)+len(f.scanErrors))
for path, err := range f.pullErrors {
errors = append(errors, FileError{path, err})
}
f.pullErrorsMut.Unlock()
errors = append(errors, scanErrors...)
sort.Sort(fileErrorList(errors))
return errors
}
// deleteItemOnDisk deletes the file represented by old that is about to be replaced by new.
func (f *sendReceiveFolder) deleteItemOnDisk(item protocol.FileInfo, snap *db.Snapshot, scanChan chan<- string) (err error) {
defer func() {
err = errors.Wrap(err, contextRemovingOldItem)
}()
switch {
case item.IsDirectory():
// Directories aren't archived and need special treatment due
// to potential children.
return f.deleteDirOnDisk(item.Name, snap, scanChan)
case !item.IsSymlink() && f.versioner != nil:
// If we should use versioning, let the versioner archive the
// file before we replace it. Archiving a non-existent file is not
// an error.
// Symlinks aren't archived.
return f.inWritableDir(f.versioner.Archive, item.Name)
}
return f.inWritableDir(f.fs.Remove, item.Name)
}
// deleteDirOnDisk attempts to delete a directory. It checks for files/dirs inside
// the directory and removes them if possible or returns an error if it fails
func (f *sendReceiveFolder) deleteDirOnDisk(dir string, snap *db.Snapshot, scanChan chan<- string) error {
if err := osutil.TraversesSymlink(f.fs, filepath.Dir(dir)); err != nil {
return err
}
files, _ := f.fs.DirNames(dir)
toBeDeleted := make([]string, 0, len(files))
var hasIgnored, hasKnown, hasToBeScanned, hasReceiveOnlyChanged bool
for _, dirFile := range files {
fullDirFile := filepath.Join(dir, dirFile)
switch {
case fs.IsTemporary(dirFile) || f.ignores.Match(fullDirFile).IsDeletable():
toBeDeleted = append(toBeDeleted, fullDirFile)
continue
case f.ignores != nil && f.ignores.Match(fullDirFile).IsIgnored():
hasIgnored = true
continue
}
cf, ok := snap.Get(protocol.LocalDeviceID, fullDirFile)
switch {
case !ok || cf.IsDeleted():
// Something appeared in the dir that we either are not
// aware of at all or that we think should be deleted
// -> schedule scan.
scanChan <- fullDirFile
hasToBeScanned = true
continue
case ok && f.Type == config.FolderTypeReceiveOnly && cf.IsReceiveOnlyChanged():
hasReceiveOnlyChanged = true
continue
}
info, err := f.fs.Lstat(fullDirFile)
var diskFile protocol.FileInfo
if err == nil {
diskFile, err = scanner.CreateFileInfo(info, fullDirFile, f.fs)
}
if err != nil {
// Lets just assume the file has changed.
scanChan <- fullDirFile
hasToBeScanned = true
continue
}
if !cf.IsEquivalentOptional(diskFile, f.ModTimeWindow(), f.IgnorePerms, true, protocol.LocalAllFlags) {
// File on disk changed compared to what we have in db
// -> schedule scan.
scanChan <- fullDirFile
hasToBeScanned = true
continue
}
// Dir contains file that is valid according to db and
// not ignored -> something weird is going on
hasKnown = true
}
if hasToBeScanned {
return errDirHasToBeScanned
}
if hasIgnored {
return errDirHasIgnored
}
if hasReceiveOnlyChanged {
return errDirHasReceiveOnlyChanged
}
if hasKnown {
return errDirNotEmpty
}
for _, del := range toBeDeleted {
2019-02-02 11:16:27 +00:00
f.fs.RemoveAll(del)
}
err := f.inWritableDir(f.fs.Remove, dir)
if err == nil || fs.IsNotExist(err) {
// It was removed or it doesn't exist to start with
return nil
}
if _, serr := f.fs.Lstat(dir); serr != nil && !fs.IsPermission(serr) {
// We get an error just looking at the directory, and it's not a
// permission problem. Lets assume the error is in fact some variant
// of "file does not exist" (possibly expressed as some parent being a
// file and not a directory etc) and that the delete is handled.
return nil
}
return err
}
// scanIfItemChanged schedules the given file for scanning and returns errModified
// if it differs from the information in the database. Returns nil if the file has
// not changed.
func (f *sendReceiveFolder) scanIfItemChanged(name string, stat fs.FileInfo, item protocol.FileInfo, hasItem bool, scanChan chan<- string) (err error) {
defer func() {
if err == errModified {
scanChan <- name
}
}()
if !hasItem || item.Deleted {
// The item appeared from nowhere
return errModified
}
// Check that the item on disk is what we expect it to be according
// to the database. If there's a mismatch here, there might be local
// changes that we don't know about yet and we should scan before
// touching the item.
statItem, err := scanner.CreateFileInfo(stat, item.Name, f.fs)
if err != nil {
return errors.Wrap(err, "comparing item on disk to db")
}
if !statItem.IsEquivalentOptional(item, f.ModTimeWindow(), f.IgnorePerms, true, protocol.LocalAllFlags) {
return errModified
}
return nil
}
// checkToBeDeleted makes sure the file on disk is compatible with what there is
// in the DB before the caller proceeds with actually deleting it.
// I.e. non-nil error status means "Do not delete!".
func (f *sendReceiveFolder) checkToBeDeleted(file, cur protocol.FileInfo, hasCur bool, updateType dbUpdateType, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) error {
if err := osutil.TraversesSymlink(f.fs, filepath.Dir(file.Name)); err != nil {
l.Debugln(f, "not deleting item behind symlink on disk, but update db", file.Name)
dbUpdateChan <- dbUpdateJob{file, updateType}
return fs.ErrNotExist
}
stat, err := f.fs.Lstat(file.Name)
if !fs.IsNotExist(err) && err != nil {
return err
}
if fs.IsNotExist(err) {
if hasCur && !cur.Deleted {
scanChan <- file.Name
return errModified
}
l.Debugln(f, "not deleting item we don't have, but update db", file.Name)
dbUpdateChan <- dbUpdateJob{file, updateType}
return fs.ErrNotExist
}
return f.scanIfItemChanged(file.Name, stat, cur, hasCur, scanChan)
}
func (f *sendReceiveFolder) maybeCopyOwner(path string) error {
if !f.CopyOwnershipFromParent {
// Not supposed to do anything.
return nil
}
if runtime.GOOS == "windows" {
// Can't do anything.
return nil
}
info, err := f.fs.Lstat(filepath.Dir(path))
if err != nil {
return errors.Wrap(err, "copy owner from parent")
}
if err := f.fs.Lchown(path, info.Owner(), info.Group()); err != nil {
return errors.Wrap(err, "copy owner from parent")
}
return nil
}
func (f *sendReceiveFolder) inWritableDir(fn func(string) error, path string) error {
return inWritableDir(fn, f.fs, path, f.IgnorePerms)
}
func (f *sendReceiveFolder) limitedWriteAt(fd io.WriterAt, data []byte, offset int64) error {
return f.withLimiter(func() error {
_, err := fd.WriteAt(data, offset)
return err
})
}
func (f *sendReceiveFolder) withLimiter(fn func() error) error {
if err := f.writeLimiter.takeWithContext(f.ctx, 1); err != nil {
return err
}
defer f.writeLimiter.give(1)
return fn()
}
// A []FileError is sent as part of an event and will be JSON serialized.
type FileError struct {
Path string `json:"path"`
Err string `json:"error"`
}
type fileErrorList []FileError
func (l fileErrorList) Len() int {
return len(l)
}
func (l fileErrorList) Less(a, b int) bool {
return l[a].Path < l[b].Path
}
func (l fileErrorList) Swap(a, b int) {
l[a], l[b] = l[b], l[a]
}
func conflictName(name, lastModBy string) string {
ext := filepath.Ext(name)
return name[:len(name)-len(ext)] + time.Now().Format(".sync-conflict-20060102-150405-") + lastModBy + ext
}
func isConflict(name string) bool {
return strings.Contains(filepath.Base(name), ".sync-conflict-")
}
func existingConflicts(name string, fs fs.Filesystem) []string {
ext := filepath.Ext(name)
matches, err := fs.Glob(name[:len(name)-len(ext)] + ".sync-conflict-????????-??????*" + ext)
if err != nil {
l.Debugln("globbing for conflicts", err)
}
return matches
}