2015-09-27 09:50:54 +01:00
// Copyright (C) 2014 The Syncthing Authors.
2014-09-29 21:43:32 +02:00
//
2015-03-07 21:36:35 +01:00
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
2017-02-09 07:52:18 +01:00
// You can obtain one at https://mozilla.org/MPL/2.0/.
2014-06-01 22:50:14 +02:00
2014-05-15 00:26:55 -03:00
package model
2014-03-28 14:36:57 +01:00
import (
2018-01-14 14:30:11 +00:00
"bytes"
2014-03-28 14:36:57 +01:00
"errors"
2014-08-25 17:45:13 +02:00
"fmt"
2015-01-02 16:15:53 +01:00
"math/rand"
2014-03-28 14:36:57 +01:00
"path/filepath"
2016-08-05 07:13:52 +00:00
"runtime"
2015-06-26 13:31:30 +02:00
"sort"
2016-01-03 21:15:02 +01:00
"strings"
2018-02-25 10:14:02 +01:00
stdsync "sync"
2014-03-28 14:36:57 +01:00
"time"
2014-06-20 00:27:54 +02:00
2015-08-06 11:29:25 +02:00
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/db"
"github.com/syncthing/syncthing/lib/events"
2016-08-05 17:45:45 +00:00
"github.com/syncthing/syncthing/lib/fs"
2015-08-06 11:29:25 +02:00
"github.com/syncthing/syncthing/lib/ignore"
"github.com/syncthing/syncthing/lib/osutil"
2015-09-22 19:38:46 +02:00
"github.com/syncthing/syncthing/lib/protocol"
2015-08-06 11:29:25 +02:00
"github.com/syncthing/syncthing/lib/scanner"
2018-01-14 14:30:11 +00:00
"github.com/syncthing/syncthing/lib/sha256"
2015-08-06 11:29:25 +02:00
"github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/versioner"
2016-12-14 23:30:29 +00:00
"github.com/syncthing/syncthing/lib/weakhash"
2014-03-28 14:36:57 +01:00
)
2017-11-09 21:16:29 +00:00
var (
blockStats = make ( map [ string ] int )
blockStatsMut = sync . NewMutex ( )
)
2016-05-04 10:47:33 +00:00
func init ( ) {
2016-12-16 22:23:35 +00:00
folderFactories [ config . FolderTypeSendReceive ] = newSendReceiveFolder
2016-05-04 10:47:33 +00:00
}
2014-09-27 14:44:15 +02:00
// A pullBlockState is passed to the puller routine for each block that needs
// to be fetched.
type pullBlockState struct {
* sharedPullerState
block protocol . BlockInfo
2014-03-28 14:36:57 +01:00
}
2014-09-27 14:44:15 +02:00
// A copyBlocksState is passed to copy routine if the file has blocks to be
2014-10-08 23:41:23 +01:00
// copied.
2014-09-27 14:44:15 +02:00
type copyBlocksState struct {
* sharedPullerState
blocks [ ] protocol . BlockInfo
2017-01-04 21:04:13 +00:00
have int
2014-03-28 14:36:57 +01:00
}
2015-07-03 10:25:35 +01:00
// Which filemode bits to preserve
2017-08-19 14:36:56 +00:00
const retainBits = fs . ModeSetgid | fs . ModeSetuid | fs . ModeSticky
2015-07-03 10:25:35 +01:00
2014-09-27 14:44:15 +02:00
var (
2017-03-18 00:25:47 +00:00
activity = newDeviceActivity ( )
errNoDevice = errors . New ( "peers who had this file went away, or the file has changed while syncing. will retry later" )
errSymlinksUnsupported = errors . New ( "symlinks not supported" )
2017-12-07 08:42:03 +00:00
errDirHasToBeScanned = errors . New ( "directory contains unexpected files, scheduling scan" )
errDirHasIgnored = errors . New ( "directory contains ignored files (see ignore documentation for (?d) prefix)" )
errDirNotEmpty = errors . New ( "directory is not empty" )
2014-09-27 14:44:15 +02:00
)
2015-06-16 12:12:34 +01:00
const (
dbUpdateHandleDir = iota
dbUpdateDeleteDir
dbUpdateHandleFile
dbUpdateDeleteFile
dbUpdateShortcutFile
2016-12-09 18:02:18 +00:00
dbUpdateHandleSymlink
2017-11-11 19:18:17 +00:00
dbUpdateInvalidate
2015-06-16 12:12:34 +01:00
)
2015-08-14 09:37:04 +02:00
const (
2018-02-25 10:14:02 +01:00
defaultCopiers = 2
defaultPullerPause = 60 * time . Second
defaultPullerPendingKiB = 8192 // must be larger than block size
2017-11-07 06:59:35 +00:00
maxPullerIterations = 3
2015-08-14 09:37:04 +02:00
)
2015-06-16 12:12:34 +01:00
type dbUpdateJob struct {
file protocol . FileInfo
jobType int
}
2016-12-16 22:23:35 +00:00
type sendReceiveFolder struct {
2016-04-26 14:01:46 +00:00
folder
2015-03-16 21:14:19 +01:00
2017-08-19 14:36:56 +00:00
fs fs . Filesystem
2016-12-21 11:23:20 +00:00
versioner versioner . Versioner
pause time . Duration
2015-05-13 14:57:29 +00:00
2018-02-25 09:39:00 +01:00
queue * jobQueue
2015-06-26 13:31:30 +02:00
errors map [ string ] string // path -> error string
errorsMut sync . Mutex
2015-03-16 21:14:19 +01:00
}
2017-08-19 14:36:56 +00:00
func newSendReceiveFolder ( model * Model , cfg config . FolderConfiguration , ver versioner . Versioner , fs fs . Filesystem ) service {
2016-12-16 22:23:35 +00:00
f := & sendReceiveFolder {
2017-08-25 19:47:01 +00:00
folder : newFolder ( model , cfg ) ,
2015-03-16 21:14:19 +01:00
2017-08-19 14:36:56 +00:00
fs : fs ,
2016-12-21 11:23:20 +00:00
versioner : ver ,
2016-04-26 14:01:46 +00:00
2018-02-25 09:39:00 +01:00
queue : newJobQueue ( ) ,
2015-06-26 13:31:30 +02:00
errorsMut : sync . NewMutex ( ) ,
2015-03-16 21:14:19 +01:00
}
2015-08-14 09:37:04 +02:00
2016-12-21 11:23:20 +00:00
if f . Copiers == 0 {
f . Copiers = defaultCopiers
2015-08-14 09:37:04 +02:00
}
2018-02-25 10:14:02 +01:00
// If the configured max amount of pending data is zero, we use the
// default. If it's configured to something non-zero but less than the
// protocol block size we adjust it upwards accordingly.
if f . PullerMaxPendingKiB == 0 {
f . PullerMaxPendingKiB = defaultPullerPendingKiB
}
if blockSizeKiB := protocol . BlockSize / 1024 ; f . PullerMaxPendingKiB < blockSizeKiB {
f . PullerMaxPendingKiB = blockSizeKiB
2015-08-14 09:37:04 +02:00
}
2017-11-07 06:59:35 +00:00
f . pause = f . basePause ( )
2018-02-25 10:14:02 +01:00
return f
2014-03-28 14:36:57 +01:00
}
2014-09-27 14:44:15 +02:00
// Serve will run scans and pulls. It will return when Stop()ed or on a
// critical error.
2016-12-16 22:23:35 +00:00
func ( f * sendReceiveFolder ) Serve ( ) {
2016-04-26 14:01:46 +00:00
l . Debugln ( f , "starting" )
defer l . Debugln ( f , "exiting" )
2014-03-28 14:36:57 +01:00
2014-09-27 14:44:15 +02:00
defer func ( ) {
2016-04-26 14:01:46 +00:00
f . scan . timer . Stop ( )
2014-09-28 12:00:38 +01:00
// TODO: Should there be an actual FolderStopped state?
2016-04-26 14:01:46 +00:00
f . setState ( FolderIdle )
2014-09-27 14:44:15 +02:00
} ( )
2014-03-28 14:36:57 +01:00
2014-12-23 10:06:51 +01:00
var prevIgnoreHash string
2017-11-07 06:59:35 +00:00
var success bool
pullFailTimer := time . NewTimer ( time . Duration ( 0 ) )
<- pullFailTimer . C
2014-07-24 09:38:16 +02:00
2017-10-26 11:49:06 +00:00
if f . FSWatcherEnabled && f . CheckHealth ( ) == nil {
2017-10-24 07:58:55 +00:00
f . startWatch ( )
2017-10-20 14:52:55 +00:00
}
2017-11-17 12:11:45 +00:00
initialCompleted := f . initialScanFinished
2014-03-28 14:36:57 +01:00
for {
2014-09-27 14:44:15 +02:00
select {
2017-04-26 00:15:23 +00:00
case <- f . ctx . Done ( ) :
2014-09-27 14:44:15 +02:00
return
2014-07-24 09:38:16 +02:00
2017-11-07 06:59:35 +00:00
case <- f . pullScheduled :
pullFailTimer . Stop ( )
2016-05-09 12:56:21 +00:00
select {
2017-11-07 06:59:35 +00:00
case <- pullFailTimer . C :
2016-05-09 12:56:21 +00:00
default :
2015-09-12 23:00:43 +02:00
}
2017-11-17 12:11:45 +00:00
if prevIgnoreHash , success = f . pull ( prevIgnoreHash ) ; ! success {
2017-11-07 06:59:35 +00:00
// Pulling failed, try again later.
pullFailTimer . Reset ( f . pause )
2014-03-28 14:36:57 +01:00
}
2014-08-04 22:02:44 +02:00
2017-11-07 06:59:35 +00:00
case <- pullFailTimer . C :
2017-11-17 12:11:45 +00:00
if prevIgnoreHash , success = f . pull ( prevIgnoreHash ) ; ! success {
2017-11-07 06:59:35 +00:00
// Pulling failed, try again later.
pullFailTimer . Reset ( f . pause )
// Back off from retrying to pull with an upper limit.
if f . pause < 60 * f . basePause ( ) {
f . pause *= 2
2014-09-27 14:44:15 +02:00
}
}
2014-04-14 09:58:17 +02:00
2017-11-17 12:11:45 +00:00
case <- initialCompleted :
// Initial scan has completed, we should do a pull
initialCompleted = nil // never hit this case again
if prevIgnoreHash , success = f . pull ( prevIgnoreHash ) ; ! success {
// Pulling failed, try again later.
pullFailTimer . Reset ( f . pause )
}
2014-09-27 14:44:15 +02:00
// The reason for running the scanner from within the puller is that
// this is the easiest way to make sure we are not doing both at the
// same time.
2016-04-26 14:01:46 +00:00
case <- f . scan . timer . C :
2017-04-20 00:20:34 +00:00
l . Debugln ( f , "Scanning subdirectories" )
2017-09-07 06:17:47 +00:00
f . scanTimerFired ( )
2015-05-03 14:18:32 +02:00
2016-04-26 14:01:46 +00:00
case req := <- f . scan . now :
2017-04-20 00:20:34 +00:00
req . err <- f . scanSubdirs ( req . subdirs )
2015-06-20 19:26:25 +02:00
2016-04-26 14:01:46 +00:00
case next := <- f . scan . delay :
f . scan . timer . Reset ( next )
2017-10-20 14:52:55 +00:00
case fsEvents := <- f . watchChan :
l . Debugln ( f , "filesystem notification rescan" )
f . scanSubdirs ( fsEvents )
2017-10-24 07:58:55 +00:00
case <- f . restartWatchChan :
f . restartWatch ( )
2014-03-28 14:36:57 +01:00
}
}
}
2017-11-07 06:59:35 +00:00
func ( f * sendReceiveFolder ) SchedulePull ( ) {
2015-05-07 22:45:07 +02:00
select {
2017-11-07 06:59:35 +00:00
case f . pullScheduled <- struct { } { } :
2015-05-07 22:45:07 +02:00
default :
// We might be busy doing a pull and thus not reading from this
// channel. The channel is 1-buffered, so one notification will be
// queued to ensure we recheck after the pull, but beyond that we must
// make sure to not block index receiving.
}
}
2016-12-16 22:23:35 +00:00
func ( f * sendReceiveFolder ) String ( ) string {
return fmt . Sprintf ( "sendReceiveFolder/%s@%p" , f . folderID , f )
2014-09-27 14:44:15 +02:00
}
2014-09-07 21:29:06 +02:00
2017-11-17 12:11:45 +00:00
func ( f * sendReceiveFolder ) pull ( prevIgnoreHash string ) ( curIgnoreHash string , success bool ) {
2017-11-07 06:59:35 +00:00
select {
case <- f . initialScanFinished :
default :
// Once the initial scan finished, a pull will be scheduled
2017-11-17 12:11:45 +00:00
return prevIgnoreHash , true
2017-11-07 06:59:35 +00:00
}
2017-11-17 12:42:41 +00:00
if err := f . CheckHealth ( ) ; err != nil {
l . Debugln ( "Skipping pull of" , f . Description ( ) , "due to folder error:" , err )
return prevIgnoreHash , true
}
2017-11-07 06:59:35 +00:00
f . model . fmut . RLock ( )
curIgnores := f . model . folderIgnores [ f . folderID ]
f . model . fmut . RUnlock ( )
2017-11-11 19:18:17 +00:00
curIgnoreHash = curIgnores . Hash ( )
ignoresChanged := curIgnoreHash != prevIgnoreHash
2017-11-07 06:59:35 +00:00
2017-11-22 08:05:27 +00:00
l . Debugf ( "%v pulling (ignoresChanged=%v)" , f , ignoresChanged )
2017-11-07 06:59:35 +00:00
f . setState ( FolderSyncing )
f . clearErrors ( )
2017-12-07 08:42:03 +00:00
scanChan := make ( chan string )
go f . pullScannerRoutine ( scanChan )
2017-11-07 06:59:35 +00:00
var changed int
tries := 0
for {
tries ++
2017-12-07 08:42:03 +00:00
changed = f . pullerIteration ( curIgnores , ignoresChanged , scanChan )
2017-11-07 06:59:35 +00:00
l . Debugln ( f , "changed" , changed )
if changed == 0 {
// No files were changed by the puller, so we are in
// sync. Update the local version number.
f . pause = f . basePause ( )
break
}
if tries == maxPullerIterations {
// We've tried a bunch of times to get in sync, but
// we're not making it. Probably there are write
// errors preventing us. Flag this with a warning and
// wait a bit longer before retrying.
2018-01-14 17:01:06 +00:00
if folderErrors := f . PullErrors ( ) ; len ( folderErrors ) > 0 {
2017-11-07 06:59:35 +00:00
events . Default . Log ( events . FolderErrors , map [ string ] interface { } {
"folder" : f . folderID ,
"errors" : folderErrors ,
} )
}
l . Infof ( "Folder %v isn't making progress. Pausing puller for %v." , f . Description ( ) , f . pause )
l . Debugln ( f , "next pull in" , f . pause )
break
}
}
f . setState ( FolderIdle )
2017-12-07 08:42:03 +00:00
close ( scanChan )
2017-11-17 12:42:41 +00:00
if changed == 0 {
return curIgnoreHash , true
}
return prevIgnoreHash , false
2017-11-07 06:59:35 +00:00
}
2014-09-28 12:00:38 +01:00
// pullerIteration runs a single puller iteration for the given folder and
2014-09-27 14:44:15 +02:00
// returns the number items that should have been synced (even those that
// might have failed). One puller iteration handles all files currently
2014-11-23 00:02:09 +00:00
// flagged as needed in the folder.
2017-12-07 08:42:03 +00:00
func ( f * sendReceiveFolder ) pullerIteration ( ignores * ignore . Matcher , ignoresChanged bool , scanChan chan <- string ) int {
2014-09-27 14:44:15 +02:00
pullChan := make ( chan pullBlockState )
copyChan := make ( chan copyBlocksState )
finisherChan := make ( chan * sharedPullerState )
2017-12-07 08:42:03 +00:00
dbUpdateChan := make ( chan dbUpdateJob )
2014-09-27 14:44:15 +02:00
2015-04-22 23:54:31 +01:00
pullWg := sync . NewWaitGroup ( )
2017-12-07 08:42:03 +00:00
copyWg := sync . NewWaitGroup ( )
2015-04-22 23:54:31 +01:00
doneWg := sync . NewWaitGroup ( )
2017-12-07 08:42:03 +00:00
updateWg := sync . NewWaitGroup ( )
2014-09-27 14:44:15 +02:00
2018-02-25 10:14:02 +01:00
l . Debugln ( f , "copiers:" , f . Copiers , "pullerPendingKiB:" , f . PullerMaxPendingKiB )
2014-11-23 18:43:49 +00:00
2015-04-05 15:34:29 +02:00
updateWg . Add ( 1 )
go func ( ) {
2017-12-07 08:42:03 +00:00
// dbUpdaterRoutine finishes when dbUpdateChan is closed
f . dbUpdaterRoutine ( dbUpdateChan )
2015-04-05 15:34:29 +02:00
updateWg . Done ( )
} ( )
2016-12-21 11:23:20 +00:00
for i := 0 ; i < f . Copiers ; i ++ {
2014-10-08 23:41:23 +01:00
copyWg . Add ( 1 )
2014-09-27 14:44:15 +02:00
go func ( ) {
// copierRoutine finishes when copyChan is closed
2016-04-26 14:01:46 +00:00
f . copierRoutine ( copyChan , pullChan , finisherChan )
2014-10-08 23:41:23 +01:00
copyWg . Done ( )
2014-09-27 14:44:15 +02:00
} ( )
}
2018-02-25 10:14:02 +01:00
pullWg . Add ( 1 )
go func ( ) {
// pullerRoutine finishes when pullChan is closed
f . pullerRoutine ( pullChan , finisherChan )
pullWg . Done ( )
} ( )
2014-09-27 14:44:15 +02:00
2014-12-24 23:12:12 +00:00
doneWg . Add ( 1 )
// finisherRoutine finishes when finisherChan is closed
go func ( ) {
2017-12-07 08:42:03 +00:00
f . finisherRoutine ( ignores , finisherChan , dbUpdateChan , scanChan )
2014-12-24 23:12:12 +00:00
doneWg . Done ( )
} ( )
2014-09-27 14:44:15 +02:00
2016-04-26 14:01:46 +00:00
f . model . fmut . RLock ( )
folderFiles := f . model . folderFiles [ f . folderID ]
f . model . fmut . RUnlock ( )
2014-09-27 14:44:15 +02:00
changed := 0
2016-12-13 10:24:10 +00:00
var processDirectly [ ] protocol . FileInfo
// Iterate the list of items that we need and sort them into piles.
// Regular files to pull goes into the file queue, everything else
// (directories, symlinks and deletes) goes into the "process directly"
// pile.
2018-02-25 09:39:00 +01:00
folderFiles . WithNeed ( protocol . LocalDeviceID , func ( intf db . FileIntf ) bool {
2017-11-11 19:18:17 +00:00
if f . IgnoreDelete && intf . IsDeleted ( ) {
2017-11-22 08:05:27 +00:00
l . Debugln ( f , "ignore file deletion (config)" , intf . FileName ( ) )
2016-12-13 10:24:10 +00:00
return true
}
file := intf . ( protocol . FileInfo )
switch {
2017-11-11 19:18:17 +00:00
case ignores . ShouldIgnore ( file . Name ) :
2018-02-25 09:39:00 +01:00
file . Invalidate ( f . shortID )
2017-11-11 19:18:17 +00:00
l . Debugln ( f , "Handling ignored file" , file )
2017-12-07 08:42:03 +00:00
dbUpdateChan <- dbUpdateJob { file , dbUpdateInvalidate }
2017-11-11 19:18:17 +00:00
2017-12-25 17:54:34 +00:00
case runtime . GOOS == "windows" && fs . WindowsInvalidFilename ( file . Name ) :
f . newError ( "need" , file . Name , fs . ErrInvalidFilename )
changed ++
return true
2016-12-13 10:24:10 +00:00
case file . IsDeleted ( ) :
processDirectly = append ( processDirectly , file )
changed ++
case file . Type == protocol . FileInfoTypeFile :
// Queue files for processing after directories and symlinks, if
// it has availability.
devices := folderFiles . Availability ( file . Name )
for _ , dev := range devices {
2017-11-21 07:25:38 +00:00
if _ , ok := f . model . Connection ( dev ) ; ok {
2016-12-13 10:24:10 +00:00
f . queue . Push ( file . Name , file . Size , file . ModTime ( ) )
changed ++
2017-11-11 19:18:17 +00:00
return true
2016-12-13 10:24:10 +00:00
}
}
2017-11-11 19:18:17 +00:00
l . Debugln ( f , "Needed file is unavailable" , file )
case runtime . GOOS == "windows" && file . IsSymlink ( ) :
2018-02-25 09:39:00 +01:00
file . Invalidate ( f . shortID )
2017-11-11 19:18:17 +00:00
l . Debugln ( f , "Invalidating symlink (unsupported)" , file . Name )
2017-12-07 08:42:03 +00:00
dbUpdateChan <- dbUpdateJob { file , dbUpdateInvalidate }
2016-12-13 10:24:10 +00:00
default :
// Directories, symlinks
processDirectly = append ( processDirectly , file )
changed ++
}
return true
} )
// Sort the "process directly" pile by number of path components. This
// ensures that we handle parents before children.
sort . Sort ( byComponentCount ( processDirectly ) )
// Process the list.
2014-10-12 22:01:57 +01:00
2014-12-19 23:12:12 +00:00
fileDeletions := map [ string ] protocol . FileInfo { }
dirDeletions := [ ] protocol . FileInfo { }
buckets := map [ string ] [ ] protocol . FileInfo { }
2014-10-12 22:01:57 +01:00
2016-12-13 10:24:10 +00:00
for _ , fi := range processDirectly {
// Verify that the thing we are handling lives inside a directory,
// and not a symlink or empty space.
2017-08-19 14:36:56 +00:00
if err := osutil . TraversesSymlink ( f . fs , filepath . Dir ( fi . Name ) ) ; err != nil {
2017-09-23 15:22:26 +01:00
f . newError ( "traverses d" , fi . Name , err )
2016-12-13 10:24:10 +00:00
continue
}
2014-09-27 14:44:15 +02:00
switch {
2016-04-26 14:01:46 +00:00
case fi . IsDeleted ( ) :
2014-11-09 04:26:52 +00:00
// A deleted file, directory or symlink
2016-04-26 14:01:46 +00:00
if fi . IsDirectory ( ) {
2016-12-13 10:24:10 +00:00
// Perform directory deletions at the end, as we may have
// files to delete inside them before we get to that point.
2016-04-26 14:01:46 +00:00
dirDeletions = append ( dirDeletions , fi )
2014-12-19 23:12:12 +00:00
} else {
2016-04-26 14:01:46 +00:00
fileDeletions [ fi . Name ] = fi
df , ok := f . model . CurrentFolderFile ( f . folderID , fi . Name )
2014-12-19 23:12:12 +00:00
// Local file can be already deleted, but with a lower version
// number, hence the deletion coming in again as part of
2015-01-30 14:32:59 +00:00
// WithNeed, furthermore, the file can simply be of the wrong
// type if we haven't yet managed to pull it.
2017-11-11 19:18:17 +00:00
if ok && ! df . IsDeleted ( ) && ! df . IsSymlink ( ) && ! df . IsDirectory ( ) && ! df . IsInvalid ( ) {
2014-12-19 23:12:12 +00:00
// Put files into buckets per first hash
key := string ( df . Blocks [ 0 ] . Hash )
buckets [ key ] = append ( buckets [ key ] , df )
}
}
2016-12-09 18:02:18 +00:00
2016-04-26 14:01:46 +00:00
case fi . IsDirectory ( ) && ! fi . IsSymlink ( ) :
2017-11-11 19:18:17 +00:00
l . Debugln ( f , "Handling directory" , fi . Name )
2017-12-07 08:42:03 +00:00
f . handleDir ( fi , dbUpdateChan )
2016-12-09 18:02:18 +00:00
case fi . IsSymlink ( ) :
2017-12-07 08:42:03 +00:00
l . Debugln ( "Handling symlink" , fi . Name )
2017-11-11 19:18:17 +00:00
l . Debugln ( f , "Handling symlink" , fi . Name )
2017-12-07 08:42:03 +00:00
f . handleSymlink ( fi , dbUpdateChan )
2016-12-09 18:02:18 +00:00
2014-09-27 14:44:15 +02:00
default :
2016-12-13 10:24:10 +00:00
l . Warnln ( fi )
panic ( "unhandleable item type, can't happen" )
2016-01-16 17:18:37 +00:00
}
}
2016-12-13 10:24:10 +00:00
// Now do the file queue. Reorder it according to configuration.
2015-04-25 14:13:53 +09:00
2016-12-21 11:23:20 +00:00
switch f . Order {
2015-04-25 14:13:53 +09:00
case config . OrderRandom :
2016-04-26 14:01:46 +00:00
f . queue . Shuffle ( )
2015-04-25 14:13:53 +09:00
case config . OrderAlphabetic :
2016-04-26 14:01:46 +00:00
// The queue is already in alphabetic order.
2015-04-25 14:13:53 +09:00
case config . OrderSmallestFirst :
2016-04-26 14:01:46 +00:00
f . queue . SortSmallestFirst ( )
2015-04-25 14:13:53 +09:00
case config . OrderLargestFirst :
2016-04-26 14:01:46 +00:00
f . queue . SortLargestFirst ( )
2015-04-25 14:13:53 +09:00
case config . OrderOldestFirst :
2016-04-26 14:01:46 +00:00
f . queue . SortOldestFirst ( )
2015-04-25 14:13:53 +09:00
case config . OrderNewestFirst :
2016-04-26 14:01:46 +00:00
f . queue . SortNewestFirst ( )
2015-04-25 14:13:53 +09:00
}
2016-12-13 10:24:10 +00:00
// Process the file queue.
2015-04-25 14:13:53 +09:00
2014-12-19 23:12:12 +00:00
nextFile :
Add job queue (fixes #629)
Request to terminate currently ongoing downloads and jump to the bumped file
incoming in 3, 2, 1.
Also, has a slightly strange effect where we pop a job off the queue, but
the copyChannel is still busy and blocks, though it gets moved to the
progress slice in the jobqueue, and looks like it's in progress which it isn't
as it's waiting to be picked up from the copyChan.
As a result, the progress emitter doesn't register on the task, and hence the file
doesn't have a progress bar, but cannot be replaced by a bump.
I guess I can fix progress bar issue by moving the progressEmiter.Register just
before passing the file to the copyChan, but then we are back to the initial
problem of a file with a progress bar, but no progress happening as it's stuck
on write to copyChan
I checked if there is a way to check for channel writeability (before popping)
but got struck by lightning just for bringing the idea up in #go-nuts.
My ideal scenario would be to check if copyChan is writeable, pop job from the
queue and shove it down handleFile. This way jobs would stay in the queue while
they cannot be handled, meaning that the `Bump` could bring your file up higher.
2014-12-01 19:23:06 +00:00
for {
2016-01-16 21:42:32 +01:00
select {
2017-04-26 00:15:23 +00:00
case <- f . ctx . Done ( ) :
2016-01-16 21:42:32 +01:00
// Stop processing files if the puller has been told to stop.
2016-12-17 15:27:44 +01:00
break nextFile
2016-01-16 21:42:32 +01:00
default :
}
2016-04-26 14:01:46 +00:00
fileName , ok := f . queue . Pop ( )
2014-12-30 09:31:34 +01:00
if ! ok {
Add job queue (fixes #629)
Request to terminate currently ongoing downloads and jump to the bumped file
incoming in 3, 2, 1.
Also, has a slightly strange effect where we pop a job off the queue, but
the copyChannel is still busy and blocks, though it gets moved to the
progress slice in the jobqueue, and looks like it's in progress which it isn't
as it's waiting to be picked up from the copyChan.
As a result, the progress emitter doesn't register on the task, and hence the file
doesn't have a progress bar, but cannot be replaced by a bump.
I guess I can fix progress bar issue by moving the progressEmiter.Register just
before passing the file to the copyChan, but then we are back to the initial
problem of a file with a progress bar, but no progress happening as it's stuck
on write to copyChan
I checked if there is a way to check for channel writeability (before popping)
but got struck by lightning just for bringing the idea up in #go-nuts.
My ideal scenario would be to check if copyChan is writeable, pop job from the
queue and shove it down handleFile. This way jobs would stay in the queue while
they cannot be handled, meaning that the `Bump` could bring your file up higher.
2014-12-01 19:23:06 +00:00
break
}
2014-12-19 23:12:12 +00:00
2016-04-26 14:01:46 +00:00
fi , ok := f . model . CurrentGlobalFile ( f . folderID , fileName )
2014-12-19 23:12:12 +00:00
if ! ok {
2015-01-06 22:12:45 +01:00
// File is no longer in the index. Mark it as done and drop it.
2016-04-26 14:01:46 +00:00
f . queue . Done ( fileName )
2014-12-19 23:12:12 +00:00
continue
}
2016-12-13 10:24:10 +00:00
if fi . IsDeleted ( ) || fi . Type != protocol . FileInfoTypeFile {
// The item has changed type or status in the index while we
// were processing directories above.
f . queue . Done ( fileName )
continue
}
2017-12-07 08:42:03 +00:00
dirName := filepath . Dir ( fi . Name )
2016-12-13 10:24:10 +00:00
// Verify that the thing we are handling lives inside a directory,
// and not a symlink or empty space.
2017-12-07 08:42:03 +00:00
if err := osutil . TraversesSymlink ( f . fs , dirName ) ; err != nil {
2017-09-23 15:22:26 +01:00
f . newError ( "traverses q" , fi . Name , err )
2016-01-16 17:18:37 +00:00
continue
}
2017-12-07 08:42:03 +00:00
// issues #114 and #4475: This works around a race condition
// between two devices, when one device removes a directory and the
// other creates a file in it. However that happens, we end up with
// a directory for "foo" with the delete bit, but a file "foo/bar"
// that we want to sync. We never create the directory, and hence
// fail to create the file and end up looping forever on it. This
// breaks that by creating the directory and scheduling a scan,
// where it will be found and the delete bit on it removed. The
// user can then clean up as they like...
if _ , err := f . fs . Lstat ( dirName ) ; fs . IsNotExist ( err ) {
l . Debugln ( "%v resurrecting parent directory of %v" , f , fi . Name )
if err := f . fs . MkdirAll ( dirName , 0755 ) ; err != nil {
f . newError ( "resurrecting parent dir" , fi . Name , err )
continue
}
scanChan <- dirName
}
2016-12-09 18:02:18 +00:00
// Check our list of files to be removed for a match, in which case
// we can just do a rename instead.
key := string ( fi . Blocks [ 0 ] . Hash )
for i , candidate := range buckets [ key ] {
2018-02-25 09:39:00 +01:00
if protocol . BlocksEqual ( candidate . Blocks , fi . Blocks ) {
2016-12-09 18:02:18 +00:00
// Remove the candidate from the bucket
lidx := len ( buckets [ key ] ) - 1
buckets [ key ] [ i ] = buckets [ key ] [ lidx ]
buckets [ key ] = buckets [ key ] [ : lidx ]
// candidate is our current state of the file, where as the
// desired state with the delete bit set is in the deletion
// map.
desired := fileDeletions [ candidate . Name ]
// Remove the pending deletion (as we perform it by renaming)
delete ( fileDeletions , candidate . Name )
2017-12-07 08:42:03 +00:00
f . renameFile ( desired , fi , dbUpdateChan )
2016-12-09 18:02:18 +00:00
f . queue . Done ( fileName )
continue nextFile
2014-12-19 23:12:12 +00:00
}
2015-01-06 22:12:45 +01:00
}
2014-12-19 23:12:12 +00:00
2016-12-09 18:02:18 +00:00
// Handle the file normally, by coping and pulling, etc.
2017-12-07 08:42:03 +00:00
f . handleFile ( fi , copyChan , finisherChan , dbUpdateChan )
Add job queue (fixes #629)
Request to terminate currently ongoing downloads and jump to the bumped file
incoming in 3, 2, 1.
Also, has a slightly strange effect where we pop a job off the queue, but
the copyChannel is still busy and blocks, though it gets moved to the
progress slice in the jobqueue, and looks like it's in progress which it isn't
as it's waiting to be picked up from the copyChan.
As a result, the progress emitter doesn't register on the task, and hence the file
doesn't have a progress bar, but cannot be replaced by a bump.
I guess I can fix progress bar issue by moving the progressEmiter.Register just
before passing the file to the copyChan, but then we are back to the initial
problem of a file with a progress bar, but no progress happening as it's stuck
on write to copyChan
I checked if there is a way to check for channel writeability (before popping)
but got struck by lightning just for bringing the idea up in #go-nuts.
My ideal scenario would be to check if copyChan is writeable, pop job from the
queue and shove it down handleFile. This way jobs would stay in the queue while
they cannot be handled, meaning that the `Bump` could bring your file up higher.
2014-12-01 19:23:06 +00:00
}
2014-09-27 14:44:15 +02:00
// Signal copy and puller routines that we are done with the in data for
2014-10-08 23:41:23 +01:00
// this iteration. Wait for them to finish.
2014-09-27 14:44:15 +02:00
close ( copyChan )
2014-10-08 23:41:23 +01:00
copyWg . Wait ( )
2014-09-27 14:44:15 +02:00
close ( pullChan )
2014-10-08 23:41:23 +01:00
pullWg . Wait ( )
2014-04-01 23:18:32 +02:00
2014-10-08 23:41:23 +01:00
// Signal the finisher chan that there will be no more input.
2014-09-27 14:44:15 +02:00
close ( finisherChan )
2014-04-01 23:18:32 +02:00
2014-09-27 14:44:15 +02:00
// Wait for the finisherChan to finish.
doneWg . Wait ( )
2014-05-19 22:31:28 +02:00
2014-12-19 23:12:12 +00:00
for _ , file := range fileDeletions {
2017-11-11 19:18:17 +00:00
l . Debugln ( f , "Deleting file" , file . Name )
2017-12-07 08:42:03 +00:00
f . deleteFile ( file , dbUpdateChan )
2014-12-19 23:12:12 +00:00
}
for i := range dirDeletions {
2015-01-30 14:32:59 +00:00
dir := dirDeletions [ len ( dirDeletions ) - i - 1 ]
2017-11-11 19:18:17 +00:00
l . Debugln ( f , "Deleting dir" , dir . Name )
2017-12-07 08:42:03 +00:00
f . handleDeleteDir ( dir , ignores , dbUpdateChan , scanChan )
2014-10-12 22:01:57 +01:00
}
2017-12-07 08:42:03 +00:00
// Wait for db updates and scan scheduling to complete
close ( dbUpdateChan )
2015-04-05 15:34:29 +02:00
updateWg . Wait ( )
2014-09-27 14:44:15 +02:00
return changed
}
2014-04-01 23:18:32 +02:00
2014-09-27 14:44:15 +02:00
// handleDir creates or updates the given directory
2017-12-07 08:42:03 +00:00
func ( f * sendReceiveFolder ) handleDir ( file protocol . FileInfo , dbUpdateChan chan <- dbUpdateJob ) {
2016-12-09 18:02:18 +00:00
// Used in the defer closure below, updated by the function body. Take
// care not declare another err.
2015-02-01 17:31:19 +00:00
var err error
2016-12-09 18:02:18 +00:00
2015-06-14 22:59:21 +02:00
events . Default . Log ( events . ItemStarted , map [ string ] string {
2016-04-26 14:01:46 +00:00
"folder" : f . folderID ,
2015-04-14 20:59:06 +09:00
"item" : file . Name ,
"type" : "dir" ,
"action" : "update" ,
2015-02-01 17:31:19 +00:00
} )
2015-04-14 20:59:06 +09:00
2015-02-01 17:31:19 +00:00
defer func ( ) {
events . Default . Log ( events . ItemFinished , map [ string ] interface { } {
2016-04-26 14:01:46 +00:00
"folder" : f . folderID ,
2015-02-01 17:31:19 +00:00
"item" : file . Name ,
2015-05-27 11:14:39 +02:00
"error" : events . Error ( err ) ,
2015-04-14 20:59:06 +09:00
"type" : "dir" ,
"action" : "update" ,
2015-02-01 17:31:19 +00:00
} )
} ( )
2017-08-19 14:36:56 +00:00
mode := fs . FileMode ( file . Permissions & 0777 )
2018-02-25 09:39:00 +01:00
if f . IgnorePerms || file . NoPermissions {
2015-05-25 00:12:51 +02:00
mode = 0777
2014-10-10 00:34:32 +02:00
}
2014-05-19 22:31:28 +02:00
2015-10-03 17:25:21 +02:00
if shouldDebug ( ) {
2016-04-26 14:01:46 +00:00
curFile , _ := f . model . CurrentFolderFile ( f . folderID , file . Name )
2014-09-27 14:44:15 +02:00
l . Debugf ( "need dir\n\t%v\n\t%v" , file , curFile )
2014-04-01 23:18:32 +02:00
}
2017-08-19 14:36:56 +00:00
info , err := f . fs . Lstat ( file . Name )
2014-11-13 22:59:40 +00:00
switch {
2014-11-17 11:25:32 +00:00
// There is already something under that name, but it's a file/link.
// Most likely a file/link is getting replaced with a directory.
// Remove the file/link and fall through to directory creation.
2017-04-01 09:04:11 +00:00
case err == nil && ( ! info . IsDir ( ) || info . IsSymlink ( ) ) :
2017-08-19 14:36:56 +00:00
err = osutil . InWritableDir ( f . fs . Remove , f . fs , file . Name )
2014-11-13 22:59:40 +00:00
if err != nil {
2017-09-23 15:22:26 +01:00
f . newError ( "dir replace" , file . Name , err )
2014-09-28 01:54:25 +02:00
return
}
2014-11-13 22:59:40 +00:00
fallthrough
// The directory doesn't exist, so we create it with the right
// mode bits from the start.
2017-08-19 14:36:56 +00:00
case err != nil && fs . IsNotExist ( err ) :
2014-11-13 22:59:40 +00:00
// We declare a function that acts on only the path name, so
// we can pass it to InWritableDir. We use a regular Mkdir and
// not MkdirAll because the parent should already exist.
mkdir := func ( path string ) error {
2017-08-19 14:36:56 +00:00
err = f . fs . Mkdir ( path , mode )
2018-02-25 09:39:00 +01:00
if err != nil || f . IgnorePerms || file . NoPermissions {
2015-03-23 21:31:53 +09:00
return err
}
2015-07-03 10:25:35 +01:00
// Stat the directory so we can check its permissions.
2017-08-19 14:36:56 +00:00
info , err := f . fs . Lstat ( path )
2015-07-03 10:25:35 +01:00
if err != nil {
return err
}
// Mask for the bits we want to preserve and add them in to the
// directories permissions.
2017-08-19 14:36:56 +00:00
return f . fs . Chmod ( path , mode | ( info . Mode ( ) & retainBits ) )
2014-11-13 22:59:40 +00:00
}
2014-09-28 01:54:25 +02:00
2017-08-19 14:36:56 +00:00
if err = osutil . InWritableDir ( mkdir , f . fs , file . Name ) ; err == nil {
2017-12-07 08:42:03 +00:00
dbUpdateChan <- dbUpdateJob { file , dbUpdateHandleDir }
2014-11-13 22:59:40 +00:00
} else {
2017-09-23 15:22:26 +01:00
f . newError ( "dir mkdir" , file . Name , err )
2014-11-13 22:59:40 +00:00
}
2014-09-28 01:54:25 +02:00
return
2014-11-13 22:59:40 +00:00
// Weird error when stat()'ing the dir. Probably won't work to do
// anything else with it if we can't even stat() it.
case err != nil :
2017-09-23 15:22:26 +01:00
f . newError ( "dir stat" , file . Name , err )
2014-03-28 14:36:57 +01:00
return
}
2014-09-28 01:54:25 +02:00
// The directory already exists, so we just correct the mode bits. (We
// don't handle modification times on directories, because that sucks...)
// It's OK to change mode bits on stuff within non-writable directories.
2018-02-25 09:39:00 +01:00
if f . IgnorePerms || file . NoPermissions {
2017-12-07 08:42:03 +00:00
dbUpdateChan <- dbUpdateJob { file , dbUpdateHandleDir }
2017-08-19 14:36:56 +00:00
} else if err := f . fs . Chmod ( file . Name , mode | ( fs . FileMode ( info . Mode ( ) ) & retainBits ) ) ; err == nil {
2017-12-07 08:42:03 +00:00
dbUpdateChan <- dbUpdateJob { file , dbUpdateHandleDir }
2014-09-28 01:54:25 +02:00
} else {
2017-09-23 15:22:26 +01:00
f . newError ( "dir chmod" , file . Name , err )
2014-07-15 13:04:37 +02:00
}
2014-09-27 14:44:15 +02:00
}
2014-03-28 14:36:57 +01:00
2016-12-09 18:02:18 +00:00
// handleSymlink creates or updates the given symlink
2017-12-07 08:42:03 +00:00
func ( f * sendReceiveFolder ) handleSymlink ( file protocol . FileInfo , dbUpdateChan chan <- dbUpdateJob ) {
2016-12-09 18:02:18 +00:00
// Used in the defer closure below, updated by the function body. Take
// care not declare another err.
var err error
events . Default . Log ( events . ItemStarted , map [ string ] string {
"folder" : f . folderID ,
"item" : file . Name ,
"type" : "symlink" ,
"action" : "update" ,
} )
defer func ( ) {
events . Default . Log ( events . ItemFinished , map [ string ] interface { } {
"folder" : f . folderID ,
"item" : file . Name ,
"error" : events . Error ( err ) ,
"type" : "symlink" ,
"action" : "update" ,
} )
} ( )
if shouldDebug ( ) {
curFile , _ := f . model . CurrentFolderFile ( f . folderID , file . Name )
l . Debugf ( "need symlink\n\t%v\n\t%v" , file , curFile )
}
if len ( file . SymlinkTarget ) == 0 {
// Index entry from a Syncthing predating the support for including
// the link target in the index entry. We log this as an error.
err = errors . New ( "incompatible symlink entry; rescan with newer Syncthing on source" )
2017-09-23 15:22:26 +01:00
f . newError ( "symlink" , file . Name , err )
2016-12-09 18:02:18 +00:00
return
}
2017-08-19 14:36:56 +00:00
if _ , err = f . fs . Lstat ( file . Name ) ; err == nil {
2016-12-09 18:02:18 +00:00
// There is already something under that name. Remove it to replace
// with the symlink. This also handles the "change symlink type"
// path.
2017-08-19 14:36:56 +00:00
err = osutil . InWritableDir ( f . fs . Remove , f . fs , file . Name )
2016-12-09 18:02:18 +00:00
if err != nil {
2017-09-23 15:22:26 +01:00
f . newError ( "symlink remove" , file . Name , err )
2016-12-09 18:02:18 +00:00
return
}
}
// We declare a function that acts on only the path name, so
// we can pass it to InWritableDir.
createLink := func ( path string ) error {
2017-08-19 14:36:56 +00:00
return f . fs . CreateSymlink ( file . SymlinkTarget , path )
2016-12-09 18:02:18 +00:00
}
2017-08-19 14:36:56 +00:00
if err = osutil . InWritableDir ( createLink , f . fs , file . Name ) ; err == nil {
2017-12-07 08:42:03 +00:00
dbUpdateChan <- dbUpdateJob { file , dbUpdateHandleSymlink }
2016-12-09 18:02:18 +00:00
} else {
2017-09-23 15:22:26 +01:00
f . newError ( "symlink create" , file . Name , err )
2016-12-09 18:02:18 +00:00
}
}
2017-12-07 08:42:03 +00:00
// handleDeleteDir attempts to remove a directory that was deleted on a remote
func ( f * sendReceiveFolder ) handleDeleteDir ( file protocol . FileInfo , ignores * ignore . Matcher , dbUpdateChan chan <- dbUpdateJob , scanChan chan <- string ) {
2016-12-09 18:02:18 +00:00
// Used in the defer closure below, updated by the function body. Take
// care not declare another err.
2015-02-01 17:31:19 +00:00
var err error
2016-12-09 18:02:18 +00:00
2015-06-14 22:59:21 +02:00
events . Default . Log ( events . ItemStarted , map [ string ] string {
2016-04-26 14:01:46 +00:00
"folder" : f . folderID ,
2015-04-14 20:59:06 +09:00
"item" : file . Name ,
"type" : "dir" ,
"action" : "delete" ,
2015-02-01 17:31:19 +00:00
} )
2016-12-09 18:02:18 +00:00
2015-02-01 17:31:19 +00:00
defer func ( ) {
events . Default . Log ( events . ItemFinished , map [ string ] interface { } {
2016-04-26 14:01:46 +00:00
"folder" : f . folderID ,
2015-02-01 17:31:19 +00:00
"item" : file . Name ,
2015-05-27 11:14:39 +02:00
"error" : events . Error ( err ) ,
2015-04-14 20:59:06 +09:00
"type" : "dir" ,
"action" : "delete" ,
2015-02-01 17:31:19 +00:00
} )
} ( )
2017-12-07 08:42:03 +00:00
err = f . deleteDir ( file . Name , ignores , scanChan )
2015-05-23 23:55:50 +02:00
2017-12-07 08:42:03 +00:00
if err != nil {
2017-09-23 15:22:26 +01:00
f . newError ( "delete dir" , file . Name , err )
2017-12-07 08:42:03 +00:00
return
2014-03-28 14:36:57 +01:00
}
2017-12-07 08:42:03 +00:00
dbUpdateChan <- dbUpdateJob { file , dbUpdateDeleteDir }
2014-03-28 14:36:57 +01:00
}
2014-09-27 14:44:15 +02:00
// deleteFile attempts to delete the given file
2017-12-07 08:42:03 +00:00
func ( f * sendReceiveFolder ) deleteFile ( file protocol . FileInfo , dbUpdateChan chan <- dbUpdateJob ) {
2016-12-09 18:02:18 +00:00
// Used in the defer closure below, updated by the function body. Take
// care not declare another err.
2015-02-01 17:31:19 +00:00
var err error
2016-12-09 18:02:18 +00:00
2015-06-14 22:59:21 +02:00
events . Default . Log ( events . ItemStarted , map [ string ] string {
2016-04-26 14:01:46 +00:00
"folder" : f . folderID ,
2015-04-14 20:59:06 +09:00
"item" : file . Name ,
"type" : "file" ,
"action" : "delete" ,
2015-02-01 17:31:19 +00:00
} )
2016-12-09 18:02:18 +00:00
2015-02-01 17:31:19 +00:00
defer func ( ) {
events . Default . Log ( events . ItemFinished , map [ string ] interface { } {
2016-04-26 14:01:46 +00:00
"folder" : f . folderID ,
2015-02-01 17:31:19 +00:00
"item" : file . Name ,
2015-05-27 11:14:39 +02:00
"error" : events . Error ( err ) ,
2015-04-14 20:59:06 +09:00
"type" : "file" ,
"action" : "delete" ,
2015-02-01 17:31:19 +00:00
} )
} ( )
2016-04-26 14:01:46 +00:00
cur , ok := f . model . CurrentFolderFile ( f . folderID , file . Name )
if ok && f . inConflict ( cur . Version , file . Version ) {
2015-04-09 12:53:41 +02:00
// There is a conflict here. Move the file to a conflict copy instead
// of deleting. Also merge with the version vector we had, to indicate
// we have resolved the conflict.
file . Version = file . Version . Merge ( cur . Version )
2017-05-25 10:26:41 +00:00
err = osutil . InWritableDir ( func ( name string ) error {
return f . moveForConflict ( name , file . ModifiedBy . String ( ) )
2017-08-19 14:36:56 +00:00
} , f . fs , file . Name )
2017-07-25 11:36:09 +02:00
} else if f . versioner != nil && ! cur . IsSymlink ( ) {
2017-08-19 14:36:56 +00:00
err = osutil . InWritableDir ( f . versioner . Archive , f . fs , file . Name )
2014-09-27 14:44:15 +02:00
} else {
2017-08-19 14:36:56 +00:00
err = osutil . InWritableDir ( f . fs . Remove , f . fs , file . Name )
2014-09-27 14:44:15 +02:00
}
2014-07-13 21:07:24 +02:00
2017-08-19 14:36:56 +00:00
if err == nil || fs . IsNotExist ( err ) {
2015-05-23 23:55:50 +02:00
// It was removed or it doesn't exist to start with
2017-12-07 08:42:03 +00:00
dbUpdateChan <- dbUpdateJob { file , dbUpdateDeleteFile }
2017-08-19 14:36:56 +00:00
} else if _ , serr := f . fs . Lstat ( file . Name ) ; serr != nil && ! fs . IsPermission ( serr ) {
2015-05-23 23:55:50 +02:00
// We get an error just looking at the file, and it's not a permission
// problem. Lets assume the error is in fact some variant of "file
// does not exist" (possibly expressed as some parent being a file and
// not a directory etc) and that the delete is handled.
2017-12-07 08:42:03 +00:00
dbUpdateChan <- dbUpdateJob { file , dbUpdateDeleteFile }
2015-05-23 23:55:50 +02:00
} else {
2017-09-23 15:22:26 +01:00
f . newError ( "delete file" , file . Name , err )
2014-05-28 11:45:45 +02:00
}
2014-09-27 14:44:15 +02:00
}
2014-05-28 11:45:45 +02:00
2014-12-19 23:12:12 +00:00
// renameFile attempts to rename an existing file to a destination
// and set the right attributes on it.
2017-12-07 08:42:03 +00:00
func ( f * sendReceiveFolder ) renameFile ( source , target protocol . FileInfo , dbUpdateChan chan <- dbUpdateJob ) {
2016-12-09 18:02:18 +00:00
// Used in the defer closure below, updated by the function body. Take
// care not declare another err.
2015-02-01 17:31:19 +00:00
var err error
2016-12-09 18:02:18 +00:00
2015-06-14 22:59:21 +02:00
events . Default . Log ( events . ItemStarted , map [ string ] string {
2016-04-26 14:01:46 +00:00
"folder" : f . folderID ,
2015-04-14 20:59:06 +09:00
"item" : source . Name ,
"type" : "file" ,
"action" : "delete" ,
2015-02-01 17:31:19 +00:00
} )
2015-06-14 22:59:21 +02:00
events . Default . Log ( events . ItemStarted , map [ string ] string {
2016-04-26 14:01:46 +00:00
"folder" : f . folderID ,
2015-04-14 20:59:06 +09:00
"item" : target . Name ,
"type" : "file" ,
"action" : "update" ,
2015-02-01 17:31:19 +00:00
} )
2016-12-09 18:02:18 +00:00
2015-02-01 17:31:19 +00:00
defer func ( ) {
events . Default . Log ( events . ItemFinished , map [ string ] interface { } {
2016-04-26 14:01:46 +00:00
"folder" : f . folderID ,
2015-02-01 17:31:19 +00:00
"item" : source . Name ,
2015-05-27 11:14:39 +02:00
"error" : events . Error ( err ) ,
2015-04-14 20:59:06 +09:00
"type" : "file" ,
"action" : "delete" ,
2015-02-01 17:31:19 +00:00
} )
events . Default . Log ( events . ItemFinished , map [ string ] interface { } {
2016-04-26 14:01:46 +00:00
"folder" : f . folderID ,
2015-02-01 17:31:19 +00:00
"item" : target . Name ,
2015-05-27 11:14:39 +02:00
"error" : events . Error ( err ) ,
2015-04-14 20:59:06 +09:00
"type" : "file" ,
"action" : "update" ,
2015-02-01 17:31:19 +00:00
} )
} ( )
2016-04-26 14:01:46 +00:00
l . Debugln ( f , "taking rename shortcut" , source . Name , "->" , target . Name )
2014-12-19 23:12:12 +00:00
2016-04-26 14:01:46 +00:00
if f . versioner != nil {
2017-08-19 14:36:56 +00:00
err = osutil . Copy ( f . fs , source . Name , target . Name )
2014-12-19 23:12:12 +00:00
if err == nil {
2017-08-19 14:36:56 +00:00
err = osutil . InWritableDir ( f . versioner . Archive , f . fs , source . Name )
2014-12-19 23:12:12 +00:00
}
} else {
2017-08-19 14:36:56 +00:00
err = osutil . TryRename ( f . fs , source . Name , target . Name )
2014-12-19 23:12:12 +00:00
}
2015-03-01 10:46:28 +01:00
if err == nil {
2017-11-09 21:16:29 +00:00
blockStatsMut . Lock ( )
blockStats [ "total" ] += len ( target . Blocks )
blockStats [ "renamed" ] += len ( target . Blocks )
blockStatsMut . Unlock ( )
2017-10-12 06:16:46 +00:00
2015-03-01 10:46:28 +01:00
// The file was renamed, so we have handled both the necessary delete
// of the source and the creation of the target. Fix-up the metadata,
// and update the local index of the target file.
2014-12-19 23:12:12 +00:00
2017-12-07 08:42:03 +00:00
dbUpdateChan <- dbUpdateJob { source , dbUpdateDeleteFile }
2015-03-01 10:46:28 +01:00
2016-04-26 14:01:46 +00:00
err = f . shortcutFile ( target )
2015-03-01 10:46:28 +01:00
if err != nil {
2017-09-23 15:22:26 +01:00
err = fmt . Errorf ( "from %s: %s" , source . Name , err . Error ( ) )
f . newError ( "rename shortcut" , target . Name , err )
2015-03-01 10:46:28 +01:00
return
}
2015-06-16 12:12:34 +01:00
2017-12-07 08:42:03 +00:00
dbUpdateChan <- dbUpdateJob { target , dbUpdateHandleFile }
2015-03-01 10:46:28 +01:00
} else {
// We failed the rename so we have a source file that we still need to
// get rid of. Attempt to delete it instead so that we make *some*
// progress. The target is unhandled.
2014-12-19 23:12:12 +00:00
2017-08-19 14:36:56 +00:00
err = osutil . InWritableDir ( f . fs . Remove , f . fs , source . Name )
2015-03-01 10:46:28 +01:00
if err != nil {
2017-09-23 15:22:26 +01:00
err = fmt . Errorf ( "from %s: %s" , source . Name , err . Error ( ) )
f . newError ( "rename delete" , target . Name , err )
2015-03-01 10:46:28 +01:00
return
}
2017-12-07 08:42:03 +00:00
dbUpdateChan <- dbUpdateJob { source , dbUpdateDeleteFile }
2015-03-01 10:46:28 +01:00
}
2014-12-19 23:12:12 +00:00
}
2015-05-27 11:14:39 +02:00
// This is the flow of data and events here, I think...
//
// +-----------------------+
// | | - - - - > ItemStarted
// | handleFile | - - - - > ItemFinished (on shortcuts)
// | |
// +-----------------------+
// |
// | copyChan (copyBlocksState; unless shortcut taken)
// |
// | +-----------------------+
// | | +-----------------------+
// +--->| | |
// | | copierRoutine |
// +-| |
// +-----------------------+
// |
// | pullChan (sharedPullerState)
// |
// | +-----------------------+
// | | +-----------------------+
// +-->| | |
// | | pullerRoutine |
// +-| |
// +-----------------------+
// |
// | finisherChan (sharedPullerState)
// |
// | +-----------------------+
// | | |
// +-->| finisherRoutine | - - - - > ItemFinished
// | |
// +-----------------------+
2014-09-27 14:44:15 +02:00
// handleFile queues the copies and pulls as necessary for a single new or
// changed file.
2017-12-07 08:42:03 +00:00
func ( f * sendReceiveFolder ) handleFile ( file protocol . FileInfo , copyChan chan <- copyBlocksState , finisherChan chan <- * sharedPullerState , dbUpdateChan chan <- dbUpdateJob ) {
2016-04-26 14:01:46 +00:00
curFile , hasCurFile := f . model . CurrentFolderFile ( f . folderID , file . Name )
2014-03-28 14:36:57 +01:00
2018-01-14 14:30:11 +00:00
have , need := blockDiff ( curFile . Blocks , file . Blocks )
2017-01-04 21:04:13 +00:00
if hasCurFile && len ( need ) == 0 {
2014-09-27 14:44:15 +02:00
// We are supposed to copy the entire file, and then fetch nothing. We
// are only updating metadata, so we don't actually *need* to make the
// copy.
2016-04-26 14:01:46 +00:00
l . Debugln ( f , "taking shortcut on" , file . Name )
2015-06-14 22:56:41 +02:00
2015-06-14 22:59:21 +02:00
events . Default . Log ( events . ItemStarted , map [ string ] string {
2016-04-26 14:01:46 +00:00
"folder" : f . folderID ,
2015-06-14 22:56:41 +02:00
"item" : file . Name ,
"type" : "file" ,
"action" : "metadata" ,
} )
2016-04-26 14:01:46 +00:00
f . queue . Done ( file . Name )
2015-06-14 22:56:41 +02:00
2016-12-09 18:02:18 +00:00
err := f . shortcutFile ( file )
2015-02-01 17:31:19 +00:00
events . Default . Log ( events . ItemFinished , map [ string ] interface { } {
2016-04-26 14:01:46 +00:00
"folder" : f . folderID ,
2015-02-01 17:31:19 +00:00
"item" : file . Name ,
2015-05-27 11:14:39 +02:00
"error" : events . Error ( err ) ,
2015-04-14 20:59:06 +09:00
"type" : "file" ,
2015-06-14 22:56:41 +02:00
"action" : "metadata" ,
2015-02-01 17:31:19 +00:00
} )
2015-06-16 12:12:34 +01:00
if err != nil {
2017-09-23 15:22:26 +01:00
f . newError ( "shortcut" , file . Name , err )
2015-06-18 11:55:04 +02:00
} else {
2017-12-07 08:42:03 +00:00
dbUpdateChan <- dbUpdateJob { file , dbUpdateShortcutFile }
2015-06-16 12:12:34 +01:00
}
2014-09-27 14:44:15 +02:00
return
2014-03-28 14:36:57 +01:00
}
2017-09-02 05:52:38 +00:00
tempName := fs . TempName ( file . Name )
2015-10-29 09:08:03 +01:00
2018-01-14 14:30:11 +00:00
populateOffsets ( file . Blocks )
2014-10-17 23:16:29 +01:00
2018-01-14 21:52:41 +00:00
blocks := make ( [ ] protocol . BlockInfo , 0 , len ( file . Blocks ) )
2015-12-21 13:29:18 -05:00
var blocksSize int64
2018-01-14 21:52:41 +00:00
reused := make ( [ ] int32 , 0 , len ( file . Blocks ) )
2014-10-03 23:15:54 +01:00
// Check for an old temporary file which might have some blocks we could
// reuse.
2017-08-19 14:36:56 +00:00
tempBlocks , err := scanner . HashFile ( f . ctx , f . fs , tempName , protocol . BlockSize , nil , false )
2014-10-03 23:15:54 +01:00
if err == nil {
// Check for any reusable blocks in the temp file
2018-01-14 14:30:11 +00:00
tempCopyBlocks , _ := blockDiff ( tempBlocks , file . Blocks )
2014-10-03 23:15:54 +01:00
// block.String() returns a string unique to the block
2015-01-14 23:00:00 +00:00
existingBlocks := make ( map [ string ] struct { } , len ( tempCopyBlocks ) )
2014-10-03 23:15:54 +01:00
for _ , block := range tempCopyBlocks {
2015-01-14 23:00:00 +00:00
existingBlocks [ block . String ( ) ] = struct { } { }
2014-10-03 23:15:54 +01:00
}
2014-10-08 23:41:23 +01:00
// Since the blocks are already there, we don't need to get them.
2016-04-15 10:59:41 +00:00
for i , block := range file . Blocks {
2014-10-03 23:15:54 +01:00
_ , ok := existingBlocks [ block . String ( ) ]
if ! ok {
2014-10-08 23:41:23 +01:00
blocks = append ( blocks , block )
2015-12-21 13:29:18 -05:00
blocksSize += int64 ( block . Size )
2016-04-15 10:59:41 +00:00
} else {
reused = append ( reused , int32 ( i ) )
2014-10-03 23:15:54 +01:00
}
}
2014-10-12 21:38:22 +01:00
// The sharedpullerstate will know which flags to use when opening the
// temp file depending if we are reusing any blocks or not.
2016-04-15 10:59:41 +00:00
if len ( reused ) == 0 {
2014-10-03 23:15:54 +01:00
// Otherwise, discard the file ourselves in order for the
2015-04-28 18:34:55 +03:00
// sharedpuller not to panic when it fails to exclusively create a
2014-10-03 23:15:54 +01:00
// file which already exists
2017-08-19 14:36:56 +00:00
osutil . InWritableDir ( f . fs . Remove , f . fs , tempName )
2014-10-03 23:15:54 +01:00
}
2014-10-08 23:41:23 +01:00
} else {
2016-04-15 10:59:41 +00:00
// Copy the blocks, as we don't want to shuffle them on the FileInfo
blocks = append ( blocks , file . Blocks ... )
2016-07-04 10:40:29 +00:00
blocksSize = file . Size
2014-10-03 23:15:54 +01:00
}
2017-04-12 09:01:19 +00:00
if f . MinDiskFree . BaseValue ( ) > 0 {
2017-08-19 14:36:56 +00:00
if usage , err := f . fs . Usage ( "." ) ; err == nil && usage . Free < blocksSize {
l . Warnf ( ` Folder "%s": insufficient disk space in %s for %s: have %.2f MiB, need %.2f MiB ` , f . folderID , f . fs . URI ( ) , file . Name , float64 ( usage . Free ) / 1024 / 1024 , float64 ( blocksSize ) / 1024 / 1024 )
2017-09-23 15:22:26 +01:00
f . newError ( "disk space" , file . Name , errors . New ( "insufficient space" ) )
2015-12-21 13:29:18 -05:00
return
}
}
2016-04-15 10:59:41 +00:00
// Shuffle the blocks
for i := range blocks {
j := rand . Intn ( i + 1 )
blocks [ i ] , blocks [ j ] = blocks [ j ] , blocks [ i ]
}
2015-12-21 13:29:18 -05:00
events . Default . Log ( events . ItemStarted , map [ string ] string {
2016-04-26 14:01:46 +00:00
"folder" : f . folderID ,
2015-12-21 13:29:18 -05:00
"item" : file . Name ,
"type" : "file" ,
"action" : "update" ,
} )
2014-09-27 14:44:15 +02:00
s := sharedPullerState {
2016-04-15 10:59:41 +00:00
file : file ,
2017-08-19 14:36:56 +00:00
fs : f . fs ,
2016-04-26 14:01:46 +00:00
folder : f . folderID ,
2016-04-15 10:59:41 +00:00
tempName : tempName ,
2017-08-19 14:36:56 +00:00
realName : file . Name ,
2016-04-15 10:59:41 +00:00
copyTotal : len ( blocks ) ,
copyNeeded : len ( blocks ) ,
reused : len ( reused ) ,
updated : time . Now ( ) ,
available : reused ,
availableUpdated : time . Now ( ) ,
2018-02-25 09:39:00 +01:00
ignorePerms : f . IgnorePerms || file . NoPermissions ,
2017-08-22 06:42:09 +00:00
hasCurFile : hasCurFile ,
curFile : curFile ,
2016-04-15 10:59:41 +00:00
mut : sync . NewRWMutex ( ) ,
2016-12-21 11:23:20 +00:00
sparse : ! f . DisableSparseFiles ,
2016-05-22 10:16:09 +00:00
created : time . Now ( ) ,
2014-03-28 14:36:57 +01:00
}
2016-12-14 23:30:29 +00:00
l . Debugf ( "%v need file %s; copy %d, reused %v" , f , file . Name , len ( blocks ) , len ( reused ) )
2014-03-28 14:36:57 +01:00
2014-10-08 23:41:23 +01:00
cs := copyBlocksState {
sharedPullerState : & s ,
blocks : blocks ,
2017-01-04 21:04:13 +00:00
have : len ( have ) ,
2014-10-06 10:14:36 +02:00
}
2014-10-08 23:41:23 +01:00
copyChan <- cs
2014-03-28 14:36:57 +01:00
}
2018-01-14 14:30:11 +00:00
// blockDiff returns lists of common and missing (to transform src into tgt)
// blocks. Both block lists must have been created with the same block size.
2018-01-14 21:52:41 +00:00
func blockDiff ( src , tgt [ ] protocol . BlockInfo ) ( [ ] protocol . BlockInfo , [ ] protocol . BlockInfo ) {
if len ( tgt ) == 0 {
2018-01-14 14:30:11 +00:00
return nil , nil
}
2018-01-14 21:52:41 +00:00
if len ( src ) == 0 {
2018-01-14 14:30:11 +00:00
// Copy the entire file
return nil , tgt
}
2018-01-14 21:52:41 +00:00
have := make ( [ ] protocol . BlockInfo , 0 , len ( src ) )
need := make ( [ ] protocol . BlockInfo , 0 , len ( tgt ) )
2018-01-14 14:30:11 +00:00
for i := range tgt {
2018-01-14 21:52:41 +00:00
if i >= len ( src ) {
return have , append ( need , tgt [ i : ] ... )
}
if ! bytes . Equal ( tgt [ i ] . Hash , src [ i ] . Hash ) {
2018-01-14 14:30:11 +00:00
// Copy differing block
need = append ( need , tgt [ i ] )
} else {
have = append ( have , tgt [ i ] )
}
}
return have , need
}
// populateOffsets sets the Offset field on each block
func populateOffsets ( blocks [ ] protocol . BlockInfo ) {
var offset int64
for i := range blocks {
blocks [ i ] . Offset = offset
offset += int64 ( blocks [ i ] . Size )
}
}
2014-09-27 14:44:15 +02:00
// shortcutFile sets file mode and modification time, when that's the only
// thing that has changed.
2016-12-16 22:23:35 +00:00
func ( f * sendReceiveFolder ) shortcutFile ( file protocol . FileInfo ) error {
2018-02-25 09:39:00 +01:00
if ! f . IgnorePerms && ! file . NoPermissions {
2017-08-19 14:36:56 +00:00
if err := f . fs . Chmod ( file . Name , fs . FileMode ( file . Permissions & 0777 ) ) ; err != nil {
2017-09-23 15:22:26 +01:00
f . newError ( "shortcut chmod" , file . Name , err )
2015-05-13 14:57:29 +00:00
return err
2014-10-10 00:34:32 +02:00
}
2014-04-27 12:14:53 +02:00
}
2014-03-28 14:36:57 +01:00
2017-08-19 14:36:56 +00:00
f . fs . Chtimes ( file . Name , file . ModTime ( ) , file . ModTime ( ) ) // never fails
2014-03-28 14:36:57 +01:00
2015-04-09 12:53:41 +02:00
// This may have been a conflict. We should merge the version vectors so
// that our clock doesn't move backwards.
2016-04-26 14:01:46 +00:00
if cur , ok := f . model . CurrentFolderFile ( f . folderID , file . Name ) ; ok {
2015-04-09 12:53:41 +02:00
file . Version = file . Version . Merge ( cur . Version )
}
2015-05-13 14:57:29 +00:00
return nil
2014-03-28 14:36:57 +01:00
}
2014-10-08 23:41:23 +01:00
// copierRoutine reads copierStates until the in channel closes and performs
// the relevant copies when possible, or passes it to the puller routine.
2016-12-16 22:23:35 +00:00
func ( f * sendReceiveFolder ) copierRoutine ( in <- chan copyBlocksState , pullChan chan <- pullBlockState , out chan <- * sharedPullerState ) {
2014-09-29 23:01:17 +01:00
buf := make ( [ ] byte , protocol . BlockSize )
2014-03-28 14:36:57 +01:00
2014-09-27 14:44:15 +02:00
for state := range in {
dstFd , err := state . tempFile ( )
if err != nil {
2015-05-27 11:14:39 +02:00
// Nothing more to do for this failed file, since we couldn't create a temporary for it.
2015-01-07 23:12:12 +00:00
out <- state . sharedPullerState
continue
2014-11-16 23:18:59 +00:00
}
2016-04-26 14:01:46 +00:00
if f . model . progressEmitter != nil {
f . model . progressEmitter . Register ( state . sharedPullerState )
2015-05-27 11:14:39 +02:00
}
2017-08-19 14:36:56 +00:00
folderFilesystems := make ( map [ string ] fs . Filesystem )
2015-09-04 12:01:00 +02:00
var folders [ ] string
2016-04-26 14:01:46 +00:00
f . model . fmut . RLock ( )
for folder , cfg := range f . model . folderCfgs {
2017-08-19 14:36:56 +00:00
folderFilesystems [ folder ] = cfg . Filesystem ( )
2015-09-04 12:01:00 +02:00
folders = append ( folders , folder )
2014-11-09 19:03:56 +00:00
}
2016-04-26 14:01:46 +00:00
f . model . fmut . RUnlock ( )
2014-11-09 19:03:56 +00:00
2017-08-19 14:36:56 +00:00
var file fs . File
2016-12-14 23:30:29 +00:00
var weakHashFinder * weakhash . Finder
2017-01-04 21:04:13 +00:00
2017-02-06 10:27:11 +00:00
if weakhash . Enabled {
blocksPercentChanged := 0
if tot := len ( state . file . Blocks ) ; tot > 0 {
blocksPercentChanged = ( tot - state . have ) * 100 / tot
2016-12-14 23:30:29 +00:00
}
2017-02-06 10:27:11 +00:00
if blocksPercentChanged >= f . WeakHashThresholdPct {
hashesToFind := make ( [ ] uint32 , 0 , len ( state . blocks ) )
for _ , block := range state . blocks {
if block . WeakHash != 0 {
hashesToFind = append ( hashesToFind , block . WeakHash )
}
}
if len ( hashesToFind ) > 0 {
2017-08-19 14:36:56 +00:00
file , err = f . fs . Open ( state . file . Name )
if err == nil {
weakHashFinder , err = weakhash . NewFinder ( file , protocol . BlockSize , hashesToFind )
if err != nil {
l . Debugln ( "weak hasher" , err )
}
2017-02-06 10:27:11 +00:00
}
} else {
l . Debugf ( "not weak hashing %s. file did not contain any weak hashes" , state . file . Name )
}
} else {
l . Debugf ( "not weak hashing %s. not enough changed %.02f < %d" , state . file . Name , blocksPercentChanged , f . WeakHashThresholdPct )
2016-12-14 23:30:29 +00:00
}
2017-02-06 10:27:11 +00:00
} else {
l . Debugf ( "not weak hashing %s. weak hashing disabled" , state . file . Name )
2016-12-14 23:30:29 +00:00
}
2014-09-27 14:44:15 +02:00
for _ , block := range state . blocks {
2016-12-21 11:23:20 +00:00
if ! f . DisableSparseFiles && state . reused == 0 && block . IsEmpty ( ) {
2015-11-21 16:30:53 +01:00
// The block is a block of all zeroes, and we are not reusing
// a temp file, so there is no need to do anything with it.
// If we were reusing a temp file and had this block to copy,
// it would be because the block in the temp file was *not* a
// block of all zeroes, so then we should not skip it.
// Pretend we copied it.
state . copiedFromOrigin ( )
2018-01-11 10:36:35 +00:00
state . copyDone ( block )
2015-11-21 16:30:53 +01:00
continue
}
2018-02-24 08:33:54 +01:00
if s := int ( block . Size ) ; s > cap ( buf ) {
buf = make ( [ ] byte , s )
} else {
buf = buf [ : s ]
}
2014-10-08 23:41:23 +01:00
2016-12-14 23:30:29 +00:00
found , err := weakHashFinder . Iterate ( block . WeakHash , buf , func ( offset int64 ) bool {
2018-01-14 14:30:11 +00:00
if _ , err := verifyBuffer ( buf , block ) ; err != nil {
2016-12-14 23:30:29 +00:00
return true
2014-10-24 23:20:08 +01:00
}
2014-10-08 23:41:23 +01:00
_ , err = dstFd . WriteAt ( buf , block . Offset )
if err != nil {
2015-01-07 23:12:12 +00:00
state . fail ( "dst write" , err )
2016-12-14 23:30:29 +00:00
2014-10-08 23:41:23 +01:00
}
2016-12-14 23:30:29 +00:00
if offset == block . Offset {
2014-10-12 21:38:22 +01:00
state . copiedFromOrigin ( )
2016-12-14 23:30:29 +00:00
} else {
state . copiedFromOriginShifted ( )
2014-10-12 21:38:22 +01:00
}
2016-12-14 23:30:29 +00:00
return false
2014-10-08 23:41:23 +01:00
} )
2016-12-14 23:30:29 +00:00
if err != nil {
l . Debugln ( "weak hasher iter" , err )
}
if ! found {
2017-08-19 14:36:56 +00:00
found = f . model . finder . Iterate ( folders , block . Hash , func ( folder , path string , index int32 ) bool {
fs := folderFilesystems [ folder ]
fd , err := fs . Open ( path )
2016-12-14 23:30:29 +00:00
if err != nil {
return false
}
_ , err = fd . ReadAt ( buf , protocol . BlockSize * int64 ( index ) )
fd . Close ( )
if err != nil {
return false
}
2018-01-14 14:30:11 +00:00
hash , err := verifyBuffer ( buf , block )
2016-12-14 23:30:29 +00:00
if err != nil {
if hash != nil {
2017-08-19 14:36:56 +00:00
l . Debugf ( "Finder block mismatch in %s:%s:%d expected %q got %q" , folder , path , index , block . Hash , hash )
err = f . model . finder . Fix ( folder , path , index , block . Hash , hash )
2016-12-14 23:30:29 +00:00
if err != nil {
l . Warnln ( "finder fix:" , err )
}
} else {
l . Debugln ( "Finder failed to verify buffer" , err )
}
return false
}
_ , err = dstFd . WriteAt ( buf , block . Offset )
if err != nil {
state . fail ( "dst write" , err )
}
2017-08-19 14:36:56 +00:00
if path == state . file . Name {
2016-12-14 23:30:29 +00:00
state . copiedFromOrigin ( )
}
return true
} )
}
2014-10-08 23:41:23 +01:00
if state . failed ( ) != nil {
break
2014-08-27 07:00:15 +02:00
}
2014-09-27 14:44:15 +02:00
2014-10-24 23:20:08 +01:00
if ! found {
2014-10-08 23:41:23 +01:00
state . pullStarted ( )
ps := pullBlockState {
sharedPullerState : state . sharedPullerState ,
block : block ,
}
pullChan <- ps
2014-10-12 21:38:22 +01:00
} else {
2016-04-15 10:59:41 +00:00
state . copyDone ( block )
2014-05-25 20:49:08 +02:00
}
2014-05-19 23:42:08 +02:00
}
2017-08-19 14:36:56 +00:00
if file != nil {
// os.File used to return invalid argument if nil.
// fs.File panics as it's an interface.
file . Close ( )
}
2014-09-27 14:44:15 +02:00
out <- state . sharedPullerState
2014-03-28 14:36:57 +01:00
}
}
2018-01-14 14:30:11 +00:00
func verifyBuffer ( buf [ ] byte , block protocol . BlockInfo ) ( [ ] byte , error ) {
if len ( buf ) != int ( block . Size ) {
return nil , fmt . Errorf ( "length mismatch %d != %d" , len ( buf ) , block . Size )
}
hf := sha256 . New ( )
_ , err := hf . Write ( buf )
if err != nil {
return nil , err
}
hash := hf . Sum ( nil )
if ! bytes . Equal ( hash , block . Hash ) {
return hash , fmt . Errorf ( "hash mismatch %x != %x" , hash , block . Hash )
}
return hash , nil
}
2016-12-16 22:23:35 +00:00
func ( f * sendReceiveFolder ) pullerRoutine ( in <- chan pullBlockState , out chan <- * sharedPullerState ) {
2018-02-25 10:14:02 +01:00
requestLimiter := newByteSemaphore ( f . PullerMaxPendingKiB * 1024 )
wg := sync . NewWaitGroup ( )
2014-09-27 14:44:15 +02:00
for state := range in {
if state . failed ( ) != nil {
2015-05-27 11:14:39 +02:00
out <- state . sharedPullerState
2014-12-28 23:11:32 +00:00
continue
2014-09-27 14:44:15 +02:00
}
2014-07-24 09:38:16 +02:00
2018-02-25 10:14:02 +01:00
// The requestLimiter limits how many pending block requests we have
// ongoing at any given time, based on the size of the blocks
// themselves.
2014-08-05 09:46:11 +02:00
2018-02-25 10:14:02 +01:00
state := state
bytes := int ( state . block . Size )
2015-11-21 16:30:53 +01:00
2018-02-25 10:14:02 +01:00
requestLimiter . take ( bytes )
wg . Add ( 1 )
2014-08-05 09:46:11 +02:00
2018-02-25 10:14:02 +01:00
go func ( ) {
defer wg . Done ( )
defer requestLimiter . give ( bytes )
2014-07-24 09:38:16 +02:00
2018-02-25 10:14:02 +01:00
f . pullBlock ( state , out )
} ( )
}
wg . Wait ( )
}
2014-12-28 23:11:32 +00:00
2018-02-25 10:14:02 +01:00
func ( f * sendReceiveFolder ) pullBlock ( state pullBlockState , out chan <- * sharedPullerState ) {
// Get an fd to the temporary file. Technically we don't need it until
// after fetching the block, but if we run into an error here there is
// no point in issuing the request to the network.
fd , err := state . tempFile ( )
if err != nil {
out <- state . sharedPullerState
return
}
2014-12-28 23:11:32 +00:00
2018-02-25 10:14:02 +01:00
if ! f . DisableSparseFiles && state . reused == 0 && state . block . IsEmpty ( ) {
// There is no need to request a block of all zeroes. Pretend we
// requested it and handled it correctly.
state . pullDone ( state . block )
out <- state . sharedPullerState
return
}
var lastError error
candidates := f . model . Availability ( f . folderID , state . file . Name , state . file . Version , state . block )
for {
// Select the least busy device to pull the block from. If we found no
// feasible device at all, fail the block (and in the long run, the
// file).
selected , found := activity . leastBusy ( candidates )
if ! found {
if lastError != nil {
state . fail ( "pull" , lastError )
2014-12-28 23:11:32 +00:00
} else {
2018-02-25 10:14:02 +01:00
state . fail ( "pull" , errNoDevice )
2014-12-28 23:11:32 +00:00
}
break
}
2018-02-25 10:14:02 +01:00
candidates = removeAvailability ( candidates , selected )
// Fetch the block, while marking the selected device as in use so that
// leastBusy can select another device when someone else asks.
activity . using ( selected )
buf , lastError := f . model . requestGlobal ( selected . ID , f . folderID , state . file . Name , state . block . Offset , int ( state . block . Size ) , state . block . Hash , selected . FromTemporary )
activity . done ( selected )
if lastError != nil {
l . Debugln ( "request:" , f . folderID , state . file . Name , state . block . Offset , state . block . Size , "returned error:" , lastError )
continue
}
// Verify that the received block matches the desired hash, if not
// try pulling it from another device.
_ , lastError = verifyBuffer ( buf , state . block )
if lastError != nil {
l . Debugln ( "request:" , f . folderID , state . file . Name , state . block . Offset , state . block . Size , "hash mismatch" )
continue
}
// Save the block data we got from the cluster
_ , err = fd . WriteAt ( buf , state . block . Offset )
if err != nil {
state . fail ( "save" , err )
} else {
state . pullDone ( state . block )
}
break
2014-07-24 09:38:16 +02:00
}
2018-02-25 10:14:02 +01:00
out <- state . sharedPullerState
2014-03-28 14:36:57 +01:00
}
2014-04-27 12:14:53 +02:00
2017-12-07 08:42:03 +00:00
func ( f * sendReceiveFolder ) performFinish ( ignores * ignore . Matcher , state * sharedPullerState , dbUpdateChan chan <- dbUpdateJob , scanChan chan <- string ) error {
2014-11-29 23:18:56 +01:00
// Set the correct permission bits on the new file
2018-02-25 09:39:00 +01:00
if ! f . IgnorePerms && ! state . file . NoPermissions {
2017-08-19 14:36:56 +00:00
if err := f . fs . Chmod ( state . tempName , fs . FileMode ( state . file . Permissions & 0777 ) ) ; err != nil {
2015-05-27 11:14:39 +02:00
return err
2014-11-16 23:18:59 +00:00
}
2014-11-29 23:18:56 +01:00
}
2014-11-16 23:18:59 +00:00
2017-08-19 14:36:56 +00:00
if stat , err := f . fs . Lstat ( state . file . Name ) ; err == nil {
2015-08-08 12:44:17 +02:00
// There is an old file or directory already in place. We need to
// handle that.
2017-08-22 06:42:09 +00:00
curMode := uint32 ( stat . Mode ( ) )
if runtime . GOOS == "windows" && osutil . IsWindowsExecutable ( state . file . Name ) {
curMode |= 0111
}
2017-11-13 06:57:07 +00:00
// Check that the file on disk is what we expect it to be according
// to the database. If there's a mismatch here, there might be local
2017-08-22 06:42:09 +00:00
// changes that we don't know about yet and we should scan before
2017-11-13 06:57:07 +00:00
// touching the file. There is also a case where we think the file
// should be there, but it was removed, which is a conflict, yet
// creations always wins when competing with a deletion, so no need
// to handle that specially.
2017-11-13 15:16:27 +00:00
changed := false
switch {
case ! state . hasCurFile :
// The file appeared from nowhere
l . Debugln ( "file exists but not scanned; not finishing:" , state . file . Name )
changed = true
case stat . IsDir ( ) != state . curFile . IsDirectory ( ) || stat . IsSymlink ( ) != state . curFile . IsSymlink ( ) :
// The file changed type. IsRegular is implicitly tested in the condition above
l . Debugln ( "file type changed but not rescanned; not finishing:" , state . curFile . Name )
changed = true
case stat . IsRegular ( ) :
if ! stat . ModTime ( ) . Equal ( state . curFile . ModTime ( ) ) || stat . Size ( ) != state . curFile . Size {
2017-11-13 06:57:07 +00:00
l . Debugln ( "file modified but not rescanned; not finishing:" , state . curFile . Name )
2017-11-13 15:16:27 +00:00
changed = true
break
}
// check permissions
fallthrough
case stat . IsDir ( ) :
// Dirs only have perm, no modetime/size
2018-02-25 09:39:00 +01:00
if ! f . IgnorePerms && ! state . curFile . NoPermissions && state . curFile . HasPermissionBits ( ) && ! protocol . PermsEqual ( state . curFile . Permissions , curMode ) {
2017-11-13 15:16:27 +00:00
l . Debugln ( "file permission modified but not rescanned; not finishing:" , state . curFile . Name )
changed = true
2017-11-13 06:57:07 +00:00
}
2017-08-22 06:42:09 +00:00
}
2017-11-13 15:16:27 +00:00
if changed {
2017-12-07 08:42:03 +00:00
scanChan <- state . curFile . Name
2017-11-13 15:16:27 +00:00
return fmt . Errorf ( "file modified but not rescanned; will try again later" )
}
2015-08-08 12:44:17 +02:00
switch {
2017-04-01 09:04:11 +00:00
case stat . IsDir ( ) || stat . IsSymlink ( ) :
2015-08-08 12:44:17 +02:00
// It's a directory or a symlink. These are not versioned or
// archived for conflicts, only removed (which of course fails for
// non-empty directories).
2017-12-07 08:42:03 +00:00
if err = f . deleteDir ( state . file . Name , ignores , scanChan ) ; err != nil {
2015-08-08 12:44:17 +02:00
return err
}
2017-08-22 06:42:09 +00:00
case f . inConflict ( state . curFile . Version , state . file . Version ) :
2015-08-08 12:44:17 +02:00
// The new file has been changed in conflict with the existing one. We
// should file it away as a conflict instead of just removing or
// archiving. Also merge with the version vector we had, to indicate
// we have resolved the conflict.
2017-08-22 06:42:09 +00:00
state . file . Version = state . file . Version . Merge ( state . curFile . Version )
2017-05-25 10:26:41 +00:00
err = osutil . InWritableDir ( func ( name string ) error {
return f . moveForConflict ( name , state . file . ModifiedBy . String ( ) )
2017-08-19 14:36:56 +00:00
} , f . fs , state . file . Name )
2017-05-25 10:26:41 +00:00
if err != nil {
2015-08-08 12:44:17 +02:00
return err
}
2017-07-25 11:36:09 +02:00
case f . versioner != nil && ! state . file . IsSymlink ( ) :
2015-08-08 12:44:17 +02:00
// If we should use versioning, let the versioner archive the old
// file before we replace it. Archiving a non-existent file is not
// an error.
2018-01-01 14:39:23 +00:00
if err = osutil . InWritableDir ( f . versioner . Archive , f . fs , state . file . Name ) ; err != nil {
2015-08-08 12:44:17 +02:00
return err
}
}
2014-11-29 23:18:56 +01:00
}
2015-08-08 12:44:17 +02:00
2016-05-22 09:06:07 +00:00
// Replace the original content with the new one. If it didn't work,
// leave the temp file in place for reuse.
2017-08-19 14:36:56 +00:00
if err := osutil . TryRename ( f . fs , state . tempName , state . file . Name ) ; err != nil {
2015-05-27 11:14:39 +02:00
return err
2014-11-29 23:18:56 +01:00
}
2014-04-27 12:14:53 +02:00
2016-08-16 18:22:19 +00:00
// Set the correct timestamp on the new file
2017-08-19 14:36:56 +00:00
f . fs . Chtimes ( state . file . Name , state . file . ModTime ( ) , state . file . ModTime ( ) ) // never fails
2016-08-16 18:22:19 +00:00
2014-11-29 23:18:56 +01:00
// Record the updated file in the index
2017-12-07 08:42:03 +00:00
dbUpdateChan <- dbUpdateJob { state . file , dbUpdateHandleFile }
2015-05-27 11:14:39 +02:00
return nil
2014-11-16 23:18:59 +00:00
}
2014-11-09 04:26:52 +00:00
2017-12-07 08:42:03 +00:00
func ( f * sendReceiveFolder ) finisherRoutine ( ignores * ignore . Matcher , in <- chan * sharedPullerState , dbUpdateChan chan <- dbUpdateJob , scanChan chan <- string ) {
2014-11-16 23:18:59 +00:00
for state := range in {
2014-11-29 23:18:56 +01:00
if closed , err := state . finalClose ( ) ; closed {
2016-04-26 15:11:19 +00:00
l . Debugln ( f , "closing" , state . file . Name )
2014-11-29 23:18:56 +01:00
2016-04-26 15:11:19 +00:00
f . queue . Done ( state . file . Name )
2015-05-27 11:14:39 +02:00
if err == nil {
2017-12-07 08:42:03 +00:00
err = f . performFinish ( ignores , state , dbUpdateChan , scanChan )
2015-01-07 23:12:12 +00:00
}
2015-05-27 11:14:39 +02:00
if err != nil {
2017-09-23 15:22:26 +01:00
f . newError ( "finisher" , state . file . Name , err )
2017-10-12 06:16:46 +00:00
} else {
2017-11-09 21:16:29 +00:00
blockStatsMut . Lock ( )
blockStats [ "total" ] += state . reused + state . copyTotal + state . pullTotal
blockStats [ "reused" ] += state . reused
blockStats [ "pulled" ] += state . pullTotal
// copyOriginShifted is counted towards copyOrigin due to progress bar reasons
// for reporting reasons we want to separate these.
blockStats [ "copyOrigin" ] += state . copyOrigin - state . copyOriginShifted
blockStats [ "copyOriginShifted" ] += state . copyOriginShifted
blockStats [ "copyElsewhere" ] += state . copyTotal - state . copyOrigin
blockStatsMut . Unlock ( )
2015-05-27 11:14:39 +02:00
}
2017-09-23 15:22:26 +01:00
2015-05-27 11:14:39 +02:00
events . Default . Log ( events . ItemFinished , map [ string ] interface { } {
2016-04-26 15:11:19 +00:00
"folder" : f . folderID ,
2015-05-27 11:14:39 +02:00
"item" : state . file . Name ,
"error" : events . Error ( err ) ,
"type" : "file" ,
"action" : "update" ,
} )
2016-04-26 15:11:19 +00:00
if f . model . progressEmitter != nil {
f . model . progressEmitter . Deregister ( state )
2014-11-29 23:18:56 +01:00
}
2014-04-27 12:14:53 +02:00
}
}
2014-09-27 14:44:15 +02:00
}
2014-04-27 12:14:53 +02:00
Add job queue (fixes #629)
Request to terminate currently ongoing downloads and jump to the bumped file
incoming in 3, 2, 1.
Also, has a slightly strange effect where we pop a job off the queue, but
the copyChannel is still busy and blocks, though it gets moved to the
progress slice in the jobqueue, and looks like it's in progress which it isn't
as it's waiting to be picked up from the copyChan.
As a result, the progress emitter doesn't register on the task, and hence the file
doesn't have a progress bar, but cannot be replaced by a bump.
I guess I can fix progress bar issue by moving the progressEmiter.Register just
before passing the file to the copyChan, but then we are back to the initial
problem of a file with a progress bar, but no progress happening as it's stuck
on write to copyChan
I checked if there is a way to check for channel writeability (before popping)
but got struck by lightning just for bringing the idea up in #go-nuts.
My ideal scenario would be to check if copyChan is writeable, pop job from the
queue and shove it down handleFile. This way jobs would stay in the queue while
they cannot be handled, meaning that the `Bump` could bring your file up higher.
2014-12-01 19:23:06 +00:00
// Moves the given filename to the front of the job queue
2016-12-16 22:23:35 +00:00
func ( f * sendReceiveFolder ) BringToFront ( filename string ) {
2016-04-26 15:11:19 +00:00
f . queue . BringToFront ( filename )
Add job queue (fixes #629)
Request to terminate currently ongoing downloads and jump to the bumped file
incoming in 3, 2, 1.
Also, has a slightly strange effect where we pop a job off the queue, but
the copyChannel is still busy and blocks, though it gets moved to the
progress slice in the jobqueue, and looks like it's in progress which it isn't
as it's waiting to be picked up from the copyChan.
As a result, the progress emitter doesn't register on the task, and hence the file
doesn't have a progress bar, but cannot be replaced by a bump.
I guess I can fix progress bar issue by moving the progressEmiter.Register just
before passing the file to the copyChan, but then we are back to the initial
problem of a file with a progress bar, but no progress happening as it's stuck
on write to copyChan
I checked if there is a way to check for channel writeability (before popping)
but got struck by lightning just for bringing the idea up in #go-nuts.
My ideal scenario would be to check if copyChan is writeable, pop job from the
queue and shove it down handleFile. This way jobs would stay in the queue while
they cannot be handled, meaning that the `Bump` could bring your file up higher.
2014-12-01 19:23:06 +00:00
}
2016-12-16 22:23:35 +00:00
func ( f * sendReceiveFolder ) Jobs ( ) ( [ ] string , [ ] string ) {
2016-04-26 15:11:19 +00:00
return f . queue . Jobs ( )
Add job queue (fixes #629)
Request to terminate currently ongoing downloads and jump to the bumped file
incoming in 3, 2, 1.
Also, has a slightly strange effect where we pop a job off the queue, but
the copyChannel is still busy and blocks, though it gets moved to the
progress slice in the jobqueue, and looks like it's in progress which it isn't
as it's waiting to be picked up from the copyChan.
As a result, the progress emitter doesn't register on the task, and hence the file
doesn't have a progress bar, but cannot be replaced by a bump.
I guess I can fix progress bar issue by moving the progressEmiter.Register just
before passing the file to the copyChan, but then we are back to the initial
problem of a file with a progress bar, but no progress happening as it's stuck
on write to copyChan
I checked if there is a way to check for channel writeability (before popping)
but got struck by lightning just for bringing the idea up in #go-nuts.
My ideal scenario would be to check if copyChan is writeable, pop job from the
queue and shove it down handleFile. This way jobs would stay in the queue while
they cannot be handled, meaning that the `Bump` could bring your file up higher.
2014-12-01 19:23:06 +00:00
}
2015-04-05 15:34:29 +02:00
// dbUpdaterRoutine aggregates db updates and commits them in batches no
// larger than 1000 items, and no more delayed than 2 seconds.
2017-12-07 08:42:03 +00:00
func ( f * sendReceiveFolder ) dbUpdaterRoutine ( dbUpdateChan <- chan dbUpdateJob ) {
2017-08-31 10:47:39 +02:00
const maxBatchTime = 2 * time . Second
2015-04-05 15:34:29 +02:00
2017-08-31 10:47:39 +02:00
batch := make ( [ ] dbUpdateJob , 0 , maxBatchSizeFiles )
files := make ( [ ] protocol . FileInfo , 0 , maxBatchSizeFiles )
2015-04-05 15:34:29 +02:00
tick := time . NewTicker ( maxBatchTime )
defer tick . Stop ( )
2017-08-19 14:36:56 +00:00
changedDirs := make ( map [ string ] struct { } )
2016-11-21 18:09:29 +01:00
2015-06-16 12:12:34 +01:00
handleBatch := func ( ) {
found := false
var lastFile protocol . FileInfo
for _ , job := range batch {
files = append ( files , job . file )
2017-08-19 14:36:56 +00:00
switch job . jobType {
case dbUpdateHandleFile , dbUpdateShortcutFile :
changedDirs [ filepath . Dir ( job . file . Name ) ] = struct { } { }
case dbUpdateHandleDir :
changedDirs [ job . file . Name ] = struct { } { }
2017-11-11 19:18:17 +00:00
case dbUpdateHandleSymlink , dbUpdateInvalidate :
// fsyncing symlinks is only supported by MacOS
// and invalidated files are db only changes -> no sync
2016-11-21 18:09:29 +01:00
}
2017-08-19 14:36:56 +00:00
2015-06-16 12:12:34 +01:00
if job . file . IsInvalid ( ) || ( job . file . IsDirectory ( ) && ! job . file . IsSymlink ( ) ) {
continue
}
if job . jobType & ( dbUpdateHandleFile | dbUpdateDeleteFile ) == 0 {
continue
}
found = true
lastFile = job . file
}
2017-08-19 14:36:56 +00:00
// sync directories
for dir := range changedDirs {
delete ( changedDirs , dir )
fd , err := f . fs . Open ( dir )
if err != nil {
2017-10-21 22:00:46 +00:00
l . Debugf ( "fsync %q failed: %v" , dir , err )
2017-08-19 14:36:56 +00:00
continue
}
if err := fd . Sync ( ) ; err != nil {
2017-10-21 22:00:46 +00:00
l . Debugf ( "fsync %q failed: %v" , dir , err )
2017-08-19 14:36:56 +00:00
}
fd . Close ( )
2016-11-21 18:09:29 +01:00
}
2016-05-19 00:19:26 +00:00
// All updates to file/folder objects that originated remotely
// (across the network) use this call to updateLocals
f . model . updateLocalsFromPulling ( f . folderID , files )
2015-06-16 12:12:34 +01:00
if found {
2016-04-26 15:11:19 +00:00
f . model . receivedFile ( f . folderID , lastFile )
2015-06-16 12:12:34 +01:00
}
batch = batch [ : 0 ]
files = files [ : 0 ]
}
2017-08-31 10:47:39 +02:00
batchSizeBytes := 0
2015-04-05 15:34:29 +02:00
loop :
for {
select {
2017-12-07 08:42:03 +00:00
case job , ok := <- dbUpdateChan :
2015-04-05 15:34:29 +02:00
if ! ok {
break loop
}
2016-07-29 19:54:24 +00:00
job . file . Sequence = 0
2015-06-16 12:12:34 +01:00
batch = append ( batch , job )
2015-04-05 15:34:29 +02:00
2017-08-31 10:47:39 +02:00
batchSizeBytes += job . file . ProtoSize ( )
if len ( batch ) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes {
2015-06-16 12:12:34 +01:00
handleBatch ( )
2017-08-31 10:47:39 +02:00
batchSizeBytes = 0
2015-04-05 15:34:29 +02:00
}
case <- tick . C :
if len ( batch ) > 0 {
2015-06-16 12:12:34 +01:00
handleBatch ( )
2017-08-31 10:49:17 +02:00
batchSizeBytes = 0
2015-04-05 15:34:29 +02:00
}
}
}
if len ( batch ) > 0 {
2015-06-16 12:12:34 +01:00
handleBatch ( )
2015-04-05 15:34:29 +02:00
}
}
2017-12-07 08:42:03 +00:00
// pullScannerRoutine aggregates paths to be scanned after pulling. The scan is
// scheduled once when scanChan is closed (scanning can not happen during pulling).
func ( f * sendReceiveFolder ) pullScannerRoutine ( scanChan <- chan string ) {
toBeScanned := make ( map [ string ] struct { } )
for path := range scanChan {
toBeScanned [ path ] = struct { } { }
}
if len ( toBeScanned ) != 0 {
scanList := make ( [ ] string , 0 , len ( toBeScanned ) )
for path := range toBeScanned {
l . Debugln ( f , "scheduling scan after pulling for" , path )
scanList = append ( scanList , path )
}
f . Scan ( scanList )
}
}
2016-12-16 22:23:35 +00:00
func ( f * sendReceiveFolder ) inConflict ( current , replacement protocol . Vector ) bool {
2015-04-09 12:53:41 +02:00
if current . Concurrent ( replacement ) {
// Obvious case
return true
}
2018-02-25 09:39:00 +01:00
if replacement . Counter ( f . shortID ) > current . Counter ( f . shortID ) {
2015-04-09 12:53:41 +02:00
// The replacement file contains a higher version for ourselves than
// what we have. This isn't supposed to be possible, since it's only
// we who can increment that counter. We take it as a sign that
// something is wrong (our index may have been corrupted or removed)
// and flag it as a conflict.
return true
}
return false
}
2016-04-15 10:59:41 +00:00
func removeAvailability ( availabilities [ ] Availability , availability Availability ) [ ] Availability {
for i := range availabilities {
if availabilities [ i ] == availability {
availabilities [ i ] = availabilities [ len ( availabilities ) - 1 ]
return availabilities [ : len ( availabilities ) - 1 ]
2014-12-28 23:11:32 +00:00
}
}
2016-04-15 10:59:41 +00:00
return availabilities
2014-12-28 23:11:32 +00:00
}
2015-03-29 16:16:36 +02:00
2017-05-25 10:26:41 +00:00
func ( f * sendReceiveFolder ) moveForConflict ( name string , lastModBy string ) error {
2016-01-03 21:15:02 +01:00
if strings . Contains ( filepath . Base ( name ) , ".sync-conflict-" ) {
l . Infoln ( "Conflict for" , name , "which is already a conflict copy; not copying again." )
2017-08-19 14:36:56 +00:00
if err := f . fs . Remove ( name ) ; err != nil && ! fs . IsNotExist ( err ) {
2016-01-03 21:15:02 +01:00
return err
}
return nil
}
2016-12-21 11:23:20 +00:00
if f . MaxConflicts == 0 {
2017-08-19 14:36:56 +00:00
if err := f . fs . Remove ( name ) ; err != nil && ! fs . IsNotExist ( err ) {
2015-10-13 20:50:58 +01:00
return err
}
return nil
}
2015-03-29 16:16:36 +02:00
ext := filepath . Ext ( name )
withoutExt := name [ : len ( name ) - len ( ext ) ]
2017-05-25 10:26:41 +00:00
newName := withoutExt + time . Now ( ) . Format ( ".sync-conflict-20060102-150405-" ) + lastModBy + ext
2017-08-19 14:36:56 +00:00
err := f . fs . Rename ( name , newName )
if fs . IsNotExist ( err ) {
2015-04-28 11:33:54 +02:00
// We were supposed to move a file away but it does not exist. Either
// the user has already moved it away, or the conflict was between a
// remote modification and a local delete. In either way it does not
// matter, go ahead as if the move succeeded.
2015-10-13 20:50:58 +01:00
err = nil
}
2016-12-21 11:23:20 +00:00
if f . MaxConflicts > - 1 {
2017-08-19 14:36:56 +00:00
matches , gerr := f . fs . Glob ( withoutExt + ".sync-conflict-????????-??????*" + ext )
2016-12-21 11:23:20 +00:00
if gerr == nil && len ( matches ) > f . MaxConflicts {
2015-10-13 20:50:58 +01:00
sort . Sort ( sort . Reverse ( sort . StringSlice ( matches ) ) )
2016-12-21 11:23:20 +00:00
for _ , match := range matches [ f . MaxConflicts : ] {
2017-08-19 14:36:56 +00:00
gerr = f . fs . Remove ( match )
2015-10-13 20:50:58 +01:00
if gerr != nil {
2016-04-26 15:11:19 +00:00
l . Debugln ( f , "removing extra conflict" , gerr )
2015-10-13 20:50:58 +01:00
}
}
} else if gerr != nil {
2016-04-26 15:11:19 +00:00
l . Debugln ( f , "globbing for conflicts" , gerr )
2015-10-13 20:50:58 +01:00
}
2015-04-28 11:33:54 +02:00
}
return err
2015-03-29 16:16:36 +02:00
}
2015-06-26 13:31:30 +02:00
2017-09-23 15:22:26 +01:00
func ( f * sendReceiveFolder ) newError ( context , path string , err error ) {
2016-04-26 15:11:19 +00:00
f . errorsMut . Lock ( )
defer f . errorsMut . Unlock ( )
2015-06-26 13:31:30 +02:00
// We might get more than one error report for a file (i.e. error on
// Write() followed by Close()); we keep the first error as that is
// probably closer to the root cause.
2016-04-26 15:11:19 +00:00
if _ , ok := f . errors [ path ] ; ok {
2015-06-26 13:31:30 +02:00
return
}
2017-11-22 08:05:27 +00:00
l . Infof ( "Puller (folder %s, file %q): %s: %v" , f . Description ( ) , path , context , err )
2017-09-23 15:22:26 +01:00
f . errors [ path ] = fmt . Sprintf ( "%s: %s" , context , err . Error ( ) )
2015-06-26 13:31:30 +02:00
}
2016-12-16 22:23:35 +00:00
func ( f * sendReceiveFolder ) clearErrors ( ) {
2016-04-26 15:11:19 +00:00
f . errorsMut . Lock ( )
f . errors = make ( map [ string ] string )
f . errorsMut . Unlock ( )
2015-06-26 13:31:30 +02:00
}
2018-01-14 17:01:06 +00:00
func ( f * sendReceiveFolder ) PullErrors ( ) [ ] FileError {
2016-04-26 15:11:19 +00:00
f . errorsMut . Lock ( )
2018-01-14 17:01:06 +00:00
errors := make ( [ ] FileError , 0 , len ( f . errors ) )
2016-04-26 15:11:19 +00:00
for path , err := range f . errors {
2018-01-14 17:01:06 +00:00
errors = append ( errors , FileError { path , err } )
2015-06-26 13:31:30 +02:00
}
sort . Sort ( fileErrorList ( errors ) )
2016-04-26 15:11:19 +00:00
f . errorsMut . Unlock ( )
2015-06-26 13:31:30 +02:00
return errors
}
2017-11-07 06:59:35 +00:00
func ( f * sendReceiveFolder ) basePause ( ) time . Duration {
if f . PullerPauseS == 0 {
return defaultPullerPause
}
return time . Duration ( f . PullerPauseS ) * time . Second
}
2017-12-07 08:42:03 +00:00
// deleteDir attempts to delete a directory. It checks for files/dirs inside
// the directory and removes them if possible or returns an error if it fails
func ( f * sendReceiveFolder ) deleteDir ( dir string , ignores * ignore . Matcher , scanChan chan <- string ) error {
files , _ := f . fs . DirNames ( dir )
toBeDeleted := make ( [ ] string , 0 , len ( files ) )
hasIgnored := false
hasKnown := false
hasToBeScanned := false
for _ , dirFile := range files {
fullDirFile := filepath . Join ( dir , dirFile )
if fs . IsTemporary ( dirFile ) || ignores . Match ( fullDirFile ) . IsDeletable ( ) {
toBeDeleted = append ( toBeDeleted , fullDirFile )
} else if ignores != nil && ignores . Match ( fullDirFile ) . IsIgnored ( ) {
hasIgnored = true
} else if cf , ok := f . model . CurrentFolderFile ( f . ID , fullDirFile ) ; ! ok || cf . IsDeleted ( ) || cf . IsInvalid ( ) {
// Something appeared in the dir that we either are not
// aware of at all, that we think should be deleted or that
// is invalid, but not currently ignored -> schedule scan
scanChan <- fullDirFile
hasToBeScanned = true
} else {
// Dir contains file that is valid according to db and
// not ignored -> something weird is going on
hasKnown = true
}
}
if hasToBeScanned {
return errDirHasToBeScanned
}
if hasIgnored {
return errDirHasIgnored
}
if hasKnown {
return errDirNotEmpty
}
for _ , del := range toBeDeleted {
f . fs . RemoveAll ( del )
}
err := osutil . InWritableDir ( f . fs . Remove , f . fs , dir )
if err == nil || fs . IsNotExist ( err ) {
// It was removed or it doesn't exist to start with
return nil
}
if _ , serr := f . fs . Lstat ( dir ) ; serr != nil && ! fs . IsPermission ( serr ) {
// We get an error just looking at the directory, and it's not a
// permission problem. Lets assume the error is in fact some variant
// of "file does not exist" (possibly expressed as some parent being a
// file and not a directory etc) and that the delete is handled.
return nil
}
return err
}
2018-01-14 17:01:06 +00:00
// A []FileError is sent as part of an event and will be JSON serialized.
type FileError struct {
2015-06-26 13:31:30 +02:00
Path string ` json:"path" `
Err string ` json:"error" `
}
2018-01-14 17:01:06 +00:00
type fileErrorList [ ] FileError
2015-06-26 13:31:30 +02:00
func ( l fileErrorList ) Len ( ) int {
return len ( l )
}
func ( l fileErrorList ) Less ( a , b int ) bool {
return l [ a ] . Path < l [ b ] . Path
}
func ( l fileErrorList ) Swap ( a , b int ) {
l [ a ] , l [ b ] = l [ b ] , l [ a ]
}
2016-08-05 07:13:52 +00:00
2016-12-13 10:24:10 +00:00
// byComponentCount sorts by the number of path components in Name, that is
// "x/y" sorts before "foo/bar/baz".
type byComponentCount [ ] protocol . FileInfo
func ( l byComponentCount ) Len ( ) int {
return len ( l )
}
func ( l byComponentCount ) Less ( a , b int ) bool {
return componentCount ( l [ a ] . Name ) < componentCount ( l [ b ] . Name )
}
func ( l byComponentCount ) Swap ( a , b int ) {
l [ a ] , l [ b ] = l [ b ] , l [ a ]
}
func componentCount ( name string ) int {
count := 0
for _ , codepoint := range name {
2017-08-19 14:36:56 +00:00
if codepoint == fs . PathSeparator {
2016-12-13 10:24:10 +00:00
count ++
}
}
return count
}
2018-02-25 10:14:02 +01:00
type byteSemaphore struct {
max int
available int
mut stdsync . Mutex
cond * stdsync . Cond
}
func newByteSemaphore ( max int ) * byteSemaphore {
s := byteSemaphore {
max : max ,
available : max ,
}
s . cond = stdsync . NewCond ( & s . mut )
return & s
}
func ( s * byteSemaphore ) take ( bytes int ) {
if bytes > s . max {
panic ( "bug: more than max bytes will never be available" )
}
s . mut . Lock ( )
for bytes > s . available {
s . cond . Wait ( )
}
s . available -= bytes
s . mut . Unlock ( )
}
func ( s * byteSemaphore ) give ( bytes int ) {
s . mut . Lock ( )
if s . available + bytes > s . max {
panic ( "bug: can never give more than max" )
}
s . available += bytes
s . cond . Broadcast ( )
s . mut . Unlock ( )
}