Merge pull request #999 from piobpl/master

Showing detailed sync progress (fixes #476)
Commit a70f3f12c5 by Audrius Butkevicius, 2014-11-25 20:55:12 +00:00
14 changed files with 570 additions and 188 deletions


@ -23,6 +23,7 @@ Marcin Dziadus <dziadus.marcin@gmail.com>
Michael Tilli <pyfisch@gmail.com>
Philippe Schommers <philippe@schommers.be>
Phill Luby <phill.luby@newredo.com>
Piotr Bejda <piotrb10@gmail.com>
Ryan Sullivan <kayoticsully@gmail.com>
Tully Robinson <tully@tojr.org>
Veeti Paananen <veeti.paananen@rojekti.fi>


@ -116,6 +116,7 @@ syncthing.controller('SyncthingCtrl', function ($scope, $http, $translate, $loca
$scope.seenError = '';
$scope.upgradeInfo = null;
$scope.stats = {};
$scope.progress = {};
$http.get(urlbase + "/lang").success(function (langs) {
// Find the first language in the list provided by the user's browser
@ -270,6 +271,55 @@ syncthing.controller('SyncthingCtrl', function ($scope, $http, $translate, $loca
});
});
$scope.$on('DownloadProgress', function (event, arg) {
var stats = arg.data;
var progress = {};
for(var folder in stats){
refreshFolder(folder);
progress[folder] = {};
for(var file in stats[folder]){
var s = stats[folder][file];
var reused = Math.floor(100 * s.Reused / s.Total);
var copiedFromOrigin = Math.floor(100 * s.CopiedFromOrigin / s.Total);
var copiedFromElsewhere = Math.floor(100 * s.CopiedFromElsewhere / s.Total);
var pulled = Math.floor(100 * s.Pulled / s.Total);
var pulling = Math.floor(100 * s.Pulling / s.Total);
// We can do the following because, if s.Pulling > 0, then reused + copied + pulled < 100 due to rounding them down.
// We do this to show which files are currently being pulled.
if (s.Pulling && pulling == 0) {
pulling = 1;
}
progress[folder][file] = {
Reused: reused,
CopiedFromOrigin: copiedFromOrigin,
CopiedFromElsewhere: copiedFromElsewhere,
Pulled: pulled,
Pulling: pulling,
BytesTotal: s.BytesTotal,
BytesDone: s.BytesDone,
};
}
}
for(var folder in $scope.progress){
var refresh = false;
if (!(folder in progress)) {
refresh = true;
refreshFolder(folder);
} else {
for(file in $scope.progress[folder]){
if (!(file in progress[folder])) {
refresh = true;
}
}
}
if (refresh) {
refreshNeed(folder);
}
}
$scope.progress = progress;
console.log("DownloadProgress", $scope.progress);
});
var debouncedFuncs = {};
function refreshFolder(folder) {
@ -394,6 +444,17 @@ syncthing.controller('SyncthingCtrl', function ($scope, $http, $translate, $loca
});
}
function refreshNeed (folder) {
if ($scope.neededFolder == folder) {
$http.get(urlbase + "/need?folder=" + encodeURIComponent(folder)).success(function (data) {
if ($scope.neededFolder == folder) {
console.log("refreshNeed", folder, data);
$scope.needed = data;
}
});
}
}
var refreshDeviceStats = debounce(function () {
$http.get(urlbase + "/stats/device").success(function (data) {
$scope.stats = data;
@ -475,7 +536,7 @@ syncthing.controller('SyncthingCtrl', function ($scope, $http, $translate, $loca
}
var pct = 100 * $scope.model[folder].inSyncBytes / $scope.model[folder].globalBytes;
return Math.floor(pct);
return Math.min(Math.floor(pct), 100);
};
$scope.deviceIcon = function (deviceCfg) {
@ -974,11 +1035,11 @@ syncthing.controller('SyncthingCtrl', function ($scope, $http, $translate, $loca
};
$scope.showNeed = function (folder) {
$scope.neededLoaded = false;
$('#needed').modal();
$http.get(urlbase + "/need?folder=" + encodeURIComponent(folder)).success(function (data) {
$scope.needed = data;
$scope.neededLoaded = true;
$scope.neededFolder = folder;
refreshNeed(folder);
$('#needed').modal().result.finally(function(){
$scope.neededFolder = undefined;
$scope.needed = undefined;
});
};
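
For reference, a minimal Go sketch (not part of this diff; the block counts are made up) of the percentage math the DownloadProgress handler above applies to each file, including the one-percent bump that keeps files with in-flight pulls visible:

package main

import "fmt"

func main() {
	// Hypothetical block counts for one file, mirroring the fields of a
	// DownloadProgress entry: Reused, CopiedFromOrigin, CopiedFromElsewhere,
	// Pulled, Pulling and Total.
	reused, copiedFromOrigin, copiedFromElsewhere, pulled, pulling, total := 40, 30, 9, 120, 1, 200

	pct := func(n int) int { return 100 * n / total } // integer division rounds down, like Math.floor

	pullingPct := pct(pulling)
	// All shares are rounded down, so whenever a pull is still in flight they
	// sum to less than 100; bumping a zero "Pulling" share to 1% therefore
	// cannot overflow the bar, it only makes the in-flight file visible.
	if pulling > 0 && pullingPct == 0 {
		pullingPct = 1
	}

	fmt.Println(pct(reused), pct(copiedFromOrigin), pct(copiedFromElsewhere), pct(pulled), pullingPct)
	// Output: 20 15 4 60 1
}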


@ -765,9 +765,29 @@
<tr ng-repeat="f in needed" ng-init="a = needAction(f)">
<td class="small-data"><span class="glyphicon glyphicon-{{needIcons[a]}}"></span> {{needActions[a]}}</td>
<td title="{{f.Name}}">{{f.Name | basename}}</td>
<td class="text-right small-data"><span ng-if="f.Size > 0">{{f.Size | binary}}B</span></td>
<td>
<span ng-if="a == 'sync' && progress[neededFolder] && progress[neededFolder][f.Name]">
<div class="progress progress-striped active">
<div class="progress-bar progress-bar-success" style="width: {{progress[neededFolder][f.Name].Reused}}%"></div>
<div class="progress-bar" style="width: {{progress[neededFolder][f.Name].CopiedFromOrigin}}%"></div>
<div class="progress-bar progress-bar-info" style="width: {{progress[neededFolder][f.Name].CopiedFromElsewhere}}%"></div>
<div class="progress-bar progress-bar-warning" style="width: {{progress[neededFolder][f.Name].Pulled}}%"></div>
<div class="progress-bar progress-bar-danger" style="width: {{progress[neededFolder][f.Name].Pulling}}%"></div>
<span class="show frontal">{{progress[neededFolder][f.Name].BytesDone | binary}}B / {{progress[neededFolder][f.Name].BytesTotal | binary}}B</span>
</div>
</span>
</td>
</tr>
</table>
<span translate>Legend:</span>
<div class="progress progress-striped active">
<div class="progress-bar progress-bar-success" style="width: 20%"><span translate class="show">Reused</span></div>
<div class="progress-bar" style="width: 20%"><span translate class="show">Copied from original</span></div>
<div class="progress-bar progress-bar-info" style="width: 20%"><span translate class="show">Copied from elsewhere</span></div>
<div class="progress-bar progress-bar-warning" style="width: 20%"><span translate class="show">Downloaded</span></div>
<div class="progress-bar progress-bar-danger" style="width: 20%"><span translate class="show">Downloading</span></div>
</div>
</modal>
<!-- About modal -->
@ -793,11 +813,11 @@
<li>Emil Hessman</li>
<li>Felix Ableitner</li>
<li>Felix Unterpaintner</li>
<li>Gilli Sigurdsson</li>
</ul>
</div>
<div class="col-md-6">
<ul>
<li>Gilli Sigurdsson</li>
<li>James Patterson</li>
<li>Jens Diemer</li>
<li>Jochen Voss</li>
@ -806,6 +826,7 @@
<li>Michael Tilli</li>
<li>Philippe Schommers</li>
<li>Phill Luby</li>
<li>Piotr Bejda</li>
<li>Ryan Sullivan</li>
<li>Tully Robinson</li>
<li>Veeti Paananen</li>


@ -149,3 +149,19 @@ table.table-condensed td {
.dl-horizontal.dl-narrow dd {
margin-left: 60px;
}
/**
* Progress bars with centered text
*/
.progress {
margin-bottom: 0px;
position: relative;
}
.progress span.frontal {
text-align: center;
position: absolute;
display: block;
width: 100%;
}

File diff suppressed because one or more lines are too long


@ -159,24 +159,25 @@ type FolderDeviceConfiguration struct {
}
type OptionsConfiguration struct {
ListenAddress []string `xml:"listenAddress" default:"0.0.0.0:22000"`
GlobalAnnServer string `xml:"globalAnnounceServer" default:"announce.syncthing.net:22026"`
GlobalAnnEnabled bool `xml:"globalAnnounceEnabled" default:"true"`
LocalAnnEnabled bool `xml:"localAnnounceEnabled" default:"true"`
LocalAnnPort int `xml:"localAnnouncePort" default:"21025"`
LocalAnnMCAddr string `xml:"localAnnounceMCAddr" default:"[ff32::5222]:21026"`
MaxSendKbps int `xml:"maxSendKbps"`
MaxRecvKbps int `xml:"maxRecvKbps"`
ReconnectIntervalS int `xml:"reconnectionIntervalS" default:"60"`
StartBrowser bool `xml:"startBrowser" default:"true"`
UPnPEnabled bool `xml:"upnpEnabled" default:"true"`
UPnPLease int `xml:"upnpLeaseMinutes" default:"0"`
UPnPRenewal int `xml:"upnpRenewalMinutes" default:"30"`
URAccepted int `xml:"urAccepted"` // Accepted usage reporting version; 0 for off (undecided), -1 for off (permanently)
RestartOnWakeup bool `xml:"restartOnWakeup" default:"true"`
AutoUpgradeIntervalH int `xml:"autoUpgradeIntervalH" default:"12"` // 0 for off
KeepTemporariesH int `xml:"keepTemporariesH" default:"24"` // 0 for off
CacheIgnoredFiles bool `xml:"cacheIgnoredFiles" default:"true"`
ListenAddress []string `xml:"listenAddress" default:"0.0.0.0:22000"`
GlobalAnnServer string `xml:"globalAnnounceServer" default:"announce.syncthing.net:22026"`
GlobalAnnEnabled bool `xml:"globalAnnounceEnabled" default:"true"`
LocalAnnEnabled bool `xml:"localAnnounceEnabled" default:"true"`
LocalAnnPort int `xml:"localAnnouncePort" default:"21025"`
LocalAnnMCAddr string `xml:"localAnnounceMCAddr" default:"[ff32::5222]:21026"`
MaxSendKbps int `xml:"maxSendKbps"`
MaxRecvKbps int `xml:"maxRecvKbps"`
ReconnectIntervalS int `xml:"reconnectionIntervalS" default:"60"`
StartBrowser bool `xml:"startBrowser" default:"true"`
UPnPEnabled bool `xml:"upnpEnabled" default:"true"`
UPnPLease int `xml:"upnpLeaseMinutes" default:"0"`
UPnPRenewal int `xml:"upnpRenewalMinutes" default:"30"`
URAccepted int `xml:"urAccepted"` // Accepted usage reporting version; 0 for off (undecided), -1 for off (permanently)
RestartOnWakeup bool `xml:"restartOnWakeup" default:"true"`
AutoUpgradeIntervalH int `xml:"autoUpgradeIntervalH" default:"12"` // 0 for off
KeepTemporariesH int `xml:"keepTemporariesH" default:"24"` // 0 for off
CacheIgnoredFiles bool `xml:"cacheIgnoredFiles" default:"true"`
ProgressUpdateIntervalS int `xml:"progressUpdateIntervalS" default:"5"`
Deprecated_RescanIntervalS int `xml:"rescanIntervalS,omitempty" json:"-"`
Deprecated_UREnabled bool `xml:"urEnabled,omitempty" json:"-"`


@ -35,23 +35,24 @@ func init() {
func TestDefaultValues(t *testing.T) {
expected := OptionsConfiguration{
ListenAddress: []string{"0.0.0.0:22000"},
GlobalAnnServer: "announce.syncthing.net:22026",
GlobalAnnEnabled: true,
LocalAnnEnabled: true,
LocalAnnPort: 21025,
LocalAnnMCAddr: "[ff32::5222]:21026",
MaxSendKbps: 0,
MaxRecvKbps: 0,
ReconnectIntervalS: 60,
StartBrowser: true,
UPnPEnabled: true,
UPnPLease: 0,
UPnPRenewal: 30,
RestartOnWakeup: true,
AutoUpgradeIntervalH: 12,
KeepTemporariesH: 24,
CacheIgnoredFiles: true,
ListenAddress: []string{"0.0.0.0:22000"},
GlobalAnnServer: "announce.syncthing.net:22026",
GlobalAnnEnabled: true,
LocalAnnEnabled: true,
LocalAnnPort: 21025,
LocalAnnMCAddr: "[ff32::5222]:21026",
MaxSendKbps: 0,
MaxRecvKbps: 0,
ReconnectIntervalS: 60,
StartBrowser: true,
UPnPEnabled: true,
UPnPLease: 0,
UPnPRenewal: 30,
RestartOnWakeup: true,
AutoUpgradeIntervalH: 12,
KeepTemporariesH: 24,
CacheIgnoredFiles: true,
ProgressUpdateIntervalS: 5,
}
cfg := New(device1)
@ -136,23 +137,24 @@ func TestNoListenAddress(t *testing.T) {
func TestOverriddenValues(t *testing.T) {
expected := OptionsConfiguration{
ListenAddress: []string{":23000"},
GlobalAnnServer: "syncthing.nym.se:22026",
GlobalAnnEnabled: false,
LocalAnnEnabled: false,
LocalAnnPort: 42123,
LocalAnnMCAddr: "quux:3232",
MaxSendKbps: 1234,
MaxRecvKbps: 2341,
ReconnectIntervalS: 6000,
StartBrowser: false,
UPnPEnabled: false,
UPnPLease: 60,
UPnPRenewal: 15,
RestartOnWakeup: false,
AutoUpgradeIntervalH: 24,
KeepTemporariesH: 48,
CacheIgnoredFiles: false,
ListenAddress: []string{":23000"},
GlobalAnnServer: "syncthing.nym.se:22026",
GlobalAnnEnabled: false,
LocalAnnEnabled: false,
LocalAnnPort: 42123,
LocalAnnMCAddr: "quux:3232",
MaxSendKbps: 1234,
MaxRecvKbps: 2341,
ReconnectIntervalS: 6000,
StartBrowser: false,
UPnPEnabled: false,
UPnPLease: 60,
UPnPRenewal: 15,
RestartOnWakeup: false,
AutoUpgradeIntervalH: 24,
KeepTemporariesH: 48,
CacheIgnoredFiles: false,
ProgressUpdateIntervalS: 10,
}
cfg, err := Load("testdata/overridenvalues.xml", device1)


@ -19,5 +19,6 @@
<autoUpgradeIntervalH>24</autoUpgradeIntervalH>
<keepTemporariesH>48</keepTemporariesH>
<cacheIgnoredFiles>false</cacheIgnoredFiles>
<progressUpdateIntervalS>10</progressUpdateIntervalS>
</options>
</configuration>


@ -38,6 +38,7 @@ const (
StateChanged
FolderRejected
ConfigSaved
DownloadProgress
AllEvents = (1 << iota) - 1
)
@ -70,6 +71,8 @@ func (t EventType) String() string {
return "FolderRejected"
case ConfigSaved:
return "ConfigSaved"
case DownloadProgress:
return "DownloadProgress"
default:
return "Unknown"
}
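
As a hedged sketch (not part of this diff, and assuming the code lives inside the syncthing repository so its internal packages are importable), consuming the new event type uses the same events API as the test file further down:

package main

import (
	"fmt"
	"time"

	"github.com/syncthing/syncthing/internal/events"
)

func main() {
	sub := events.Default.Subscribe(events.DownloadProgress)
	for {
		ev, err := sub.Poll(time.Minute)
		if err != nil {
			// events.ErrTimeout simply means nothing was emitted in time.
			continue
		}
		// ev.Data carries the per-folder, per-file progress map emitted by the
		// ProgressEmitter added in this pull request.
		fmt.Println(ev.Type, ev.Data)
	}
}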


@ -82,9 +82,10 @@ type service interface {
}
type Model struct {
cfg *config.ConfigWrapper
db *leveldb.DB
finder *files.BlockFinder
cfg *config.ConfigWrapper
db *leveldb.DB
finder *files.BlockFinder
progressEmitter *ProgressEmitter
deviceName string
clientName string
@ -142,7 +143,9 @@ func NewModel(cfg *config.ConfigWrapper, deviceName, clientName, clientVersion s
rawConn: make(map[protocol.DeviceID]io.Closer),
deviceVer: make(map[protocol.DeviceID]string),
finder: files.NewBlockFinder(db, cfg),
progressEmitter: NewProgressEmitter(cfg),
}
go m.progressEmitter.Serve()
var timeout = 20 * 60 // seconds
if t := os.Getenv("STDEADLOCKTIMEOUT"); len(t) > 0 {
@ -172,15 +175,16 @@ func (m *Model) StartFolderRW(folder string) {
panic("cannot start already running folder " + folder)
}
p := &Puller{
folder: folder,
dir: cfg.Path,
scanIntv: time.Duration(cfg.RescanIntervalS) * time.Second,
model: m,
ignorePerms: cfg.IgnorePerms,
lenientMtimes: cfg.LenientMtimes,
copiers: cfg.Copiers,
pullers: cfg.Pullers,
finishers: cfg.Finishers,
folder: folder,
dir: cfg.Path,
scanIntv: time.Duration(cfg.RescanIntervalS) * time.Second,
model: m,
ignorePerms: cfg.IgnorePerms,
lenientMtimes: cfg.LenientMtimes,
progressEmitter: m.progressEmitter,
copiers: cfg.Copiers,
pullers: cfg.Pullers,
finishers: cfg.Finishers,
}
m.folderRunners[folder] = p
m.fmut.Unlock()
@ -392,6 +396,7 @@ func (m *Model) NeedSize(folder string) (files int, bytes int64) {
return true
})
}
bytes -= m.progressEmitter.BytesCompleted(folder)
if debug {
l.Debugf("%v NeedSize(%q): %d %d", m, folder, files, bytes)
}
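
A trivial worked example of the subtraction above (numbers are hypothetical, not from this diff): if the index says 100 MiB is still needed and the emitter reports 20 MiB already written by in-flight pulls, NeedSize now reports 80 MiB, so the folder's progress moves while files are still being pulled.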

internal/model/progressemitter.go (new executable file, 133 lines)

@ -0,0 +1,133 @@
package model
import (
"path/filepath"
"reflect"
"sync"
"time"
"github.com/syncthing/syncthing/internal/config"
"github.com/syncthing/syncthing/internal/events"
)
type ProgressEmitter struct {
registry map[string]*sharedPullerState
interval time.Duration
last map[string]map[string]*pullerProgress
mut sync.Mutex
timer *time.Timer
stop chan struct{}
}
// Creates a new progress emitter which emits DownloadProgress events every
// interval.
func NewProgressEmitter(cfg *config.ConfigWrapper) *ProgressEmitter {
t := &ProgressEmitter{
stop: make(chan struct{}),
registry: make(map[string]*sharedPullerState),
last: make(map[string]map[string]*pullerProgress),
timer: time.NewTimer(time.Millisecond),
}
t.Changed(cfg.Raw())
cfg.Subscribe(t)
return t
}
// Serve starts the progress emitter, which emits DownloadProgress events as
// progress happens.
func (t *ProgressEmitter) Serve() {
for {
select {
case <-t.stop:
if debug {
l.Debugln("progress emitter: stopping")
}
return
case <-t.timer.C:
if debug {
l.Debugln("progress emitter: timer - looking after", len(t.registry))
}
output := make(map[string]map[string]*pullerProgress)
t.mut.Lock()
for _, puller := range t.registry {
if output[puller.folder] == nil {
output[puller.folder] = make(map[string]*pullerProgress)
}
output[puller.folder][puller.file.Name] = puller.Progress()
}
if !reflect.DeepEqual(t.last, output) {
events.Default.Log(events.DownloadProgress, output)
t.last = output
if debug {
l.Debugf("progress emitter: emitting %#v", output)
}
} else if debug {
l.Debugln("progress emitter: nothing new")
}
if len(t.registry) != 0 {
t.timer.Reset(t.interval)
}
t.mut.Unlock()
}
}
}
// Interface method to handle configuration changes
func (t *ProgressEmitter) Changed(cfg config.Configuration) error {
t.mut.Lock()
defer t.mut.Unlock()
t.interval = time.Duration(cfg.Options.ProgressUpdateIntervalS) * time.Second
if debug {
l.Debugln("progress emitter: updated interval", t.interval)
}
return nil
}
// Stops the emitter.
func (t *ProgressEmitter) Stop() {
t.stop <- struct{}{}
}
// Register adds a puller to the emitter, which then starts broadcasting the
// puller's progress.
func (t *ProgressEmitter) Register(s *sharedPullerState) {
t.mut.Lock()
defer t.mut.Unlock()
if debug {
l.Debugln("progress emitter: registering", s.folder, s.file.Name)
}
if len(t.registry) == 0 {
t.timer.Reset(t.interval)
}
t.registry[filepath.Join(s.folder, s.file.Name)] = s
}
// Deregister removes a puller from the emitter, which stops broadcasting the puller's state.
func (t *ProgressEmitter) Deregister(s *sharedPullerState) {
t.mut.Lock()
defer t.mut.Unlock()
if debug {
l.Debugln("progress emitter: deregistering", s.folder, s.file.Name)
}
delete(t.registry, filepath.Join(s.folder, s.file.Name))
}
// BytesCompleted returns the number of bytes completed in the given folder.
func (t *ProgressEmitter) BytesCompleted(folder string) (bytes int64) {
t.mut.Lock()
defer t.mut.Unlock()
files, ok := t.last[folder]
if ok {
for _, s := range files {
bytes += s.BytesDone
}
}
if debug {
l.Debugf("progress emitter: bytes completed for %s: %d", folder, bytes)
}
return
}


@ -0,0 +1,95 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
// Software Foundation, either version 3 of the License, or (at your option)
// any later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// You should have received a copy of the GNU General Public License along
// with this program. If not, see <http://www.gnu.org/licenses/>.
package model
import (
"testing"
"time"
"github.com/syncthing/syncthing/internal/config"
"github.com/syncthing/syncthing/internal/events"
)
var timeout = 10 * time.Millisecond
func expectEvent(w *events.Subscription, t *testing.T, size int) {
event, err := w.Poll(timeout)
if err != nil {
t.Fatal("Unexpected error:", err)
}
if event.Type != events.DownloadProgress {
t.Fatal("Unexpected event:", event)
}
data := event.Data.(map[string]map[string]*pullerProgress)
if len(data) != size {
t.Fatal("Unexpected event data size:", data)
}
}
func expectTimeout(w *events.Subscription, t *testing.T){
_, err := w.Poll(timeout)
if err != events.ErrTimeout {
t.Fatal("Unexpected non-Timeout error:", err)
}
}
func TestProgressEmitter(t *testing.T) {
l.Debugln("test progress emitter")
w := events.Default.Subscribe(events.DownloadProgress)
c := config.Wrap("/tmp/test", config.Configuration{})
c.SetOptions(config.OptionsConfiguration{
ProgressUpdateIntervalS: 0,
})
p := NewProgressEmitter(c)
go p.Serve()
expectTimeout(w, t)
s := sharedPullerState{}
p.Register(&s)
expectEvent(w, t, 1)
expectTimeout(w, t)
s.copyDone()
expectEvent(w, t, 1)
expectTimeout(w, t)
s.copiedFromOrigin()
expectEvent(w, t, 1)
expectTimeout(w, t)
s.pullStarted()
expectEvent(w, t, 1)
expectTimeout(w, t)
s.pullDone()
expectEvent(w, t, 1)
expectTimeout(w, t)
p.Deregister(&s)
expectEvent(w, t, 0)
expectTimeout(w, t)
}


@ -65,17 +65,18 @@ var (
)
type Puller struct {
folder string
dir string
scanIntv time.Duration
model *Model
stop chan struct{}
versioner versioner.Versioner
ignorePerms bool
lenientMtimes bool
copiers int
pullers int
finishers int
folder string
dir string
scanIntv time.Duration
model *Model
stop chan struct{}
versioner versioner.Versioner
ignorePerms bool
lenientMtimes bool
progressEmitter *ProgressEmitter
copiers int
pullers int
finishers int
}
// Serve will run scans and pulls. It will return when Stop()ed or on a
@ -527,9 +528,9 @@ func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksSt
folder: p.folder,
tempName: tempName,
realName: realName,
copyTotal: len(blocks),
copyNeeded: len(blocks),
reused: reused,
copyTotal: uint32(len(blocks)),
copyNeeded: uint32(len(blocks)),
reused: uint32(reused),
}
if debug {
@ -598,6 +599,10 @@ nextFile:
continue nextFile
}
if p.progressEmitter != nil {
p.progressEmitter.Register(state.sharedPullerState)
}
evictionChan := make(chan lfu.Eviction)
fdCache := lfu.New()
@ -737,101 +742,109 @@ nextBlock:
}
}
func (p *Puller) performFinish(state *sharedPullerState) {
if closed, err := state.finalClose(); closed {
if debug {
l.Debugln(p, "closing", state.file.Name)
}
if err != nil {
l.Warnln("puller: final:", err)
return
}
// Verify the file against expected hashes
fd, err := os.Open(state.tempName)
if err != nil {
l.Warnln("puller: final:", err)
return
}
err = scanner.Verify(fd, protocol.BlockSize, state.file.Blocks)
fd.Close()
if err != nil {
l.Infoln("puller:", state.file.Name, err, "(file changed during pull?)")
return
}
// Set the correct permission bits on the new file
if !p.ignorePerms {
err = os.Chmod(state.tempName, os.FileMode(state.file.Flags&0777))
if err != nil {
l.Warnln("puller: final:", err)
return
}
}
// Set the correct timestamp on the new file
t := time.Unix(state.file.Modified, 0)
err = os.Chtimes(state.tempName, t, t)
if err != nil {
if p.lenientMtimes {
// We accept the failure with a warning here and allow the sync to
// continue. We'll sync the new mtime back to the other devices later.
// If they have the same problem & setting, we might never get in
// sync.
l.Infof("Puller (folder %q, file %q): final: %v (continuing anyway as requested)", p.folder, state.file.Name, err)
} else {
l.Warnln("puller: final:", err)
return
}
}
// If we should use versioning, let the versioner archive the old
// file before we replace it. Archiving a non-existent file is not
// an error.
if p.versioner != nil {
err = p.versioner.Archive(state.realName)
if err != nil {
l.Warnln("puller: final:", err)
return
}
}
// If the target path is a symlink or a directory, we cannot copy
// over it, hence remove it before proceeding.
stat, err := os.Lstat(state.realName)
isLink, _ := symlinks.IsSymlink(state.realName)
if isLink || (err == nil && stat.IsDir()) {
osutil.InWritableDir(os.Remove, state.realName)
}
// Replace the original content with the new one
err = osutil.Rename(state.tempName, state.realName)
if err != nil {
l.Warnln("puller: final:", err)
return
}
// If it's a symlink, the target of the symlink is inside the file.
if state.file.IsSymlink() {
content, err := ioutil.ReadFile(state.realName)
if err != nil {
l.Warnln("puller: final: reading symlink:", err)
return
}
// Remove the file, and replace it with a symlink.
err = osutil.InWritableDir(func(path string) error {
os.Remove(path)
return symlinks.Create(path, string(content), state.file.Flags)
}, state.realName)
if err != nil {
l.Warnln("puller: final: creating symlink:", err)
return
}
}
// Record the updated file in the index
p.model.updateLocal(p.folder, state.file)
}
}
func (p *Puller) finisherRoutine(in <-chan *sharedPullerState) {
for state := range in {
if closed, err := state.finalClose(); closed {
if debug {
l.Debugln(p, "closing", state.file.Name)
}
if err != nil {
l.Warnln("puller: final:", err)
continue
}
// Verify the file against expected hashes
fd, err := os.Open(state.tempName)
if err != nil {
l.Warnln("puller: final:", err)
continue
}
err = scanner.Verify(fd, protocol.BlockSize, state.file.Blocks)
fd.Close()
if err != nil {
l.Infoln("puller:", state.file.Name, err, "(file changed during pull?)")
continue
}
// Set the correct permission bits on the new file
if !p.ignorePerms {
err = os.Chmod(state.tempName, os.FileMode(state.file.Flags&0777))
if err != nil {
l.Warnln("puller: final:", err)
continue
}
}
// Set the correct timestamp on the new file
t := time.Unix(state.file.Modified, 0)
err = os.Chtimes(state.tempName, t, t)
if err != nil {
if p.lenientMtimes {
// We accept the failure with a warning here and allow the sync to
// continue. We'll sync the new mtime back to the other devices later.
// If they have the same problem & setting, we might never get in
// sync.
l.Infof("Puller (folder %q, file %q): final: %v (continuing anyway as requested)", p.folder, state.file.Name, err)
} else {
l.Warnln("puller: final:", err)
continue
}
}
// If we should use versioning, let the versioner archive the old
// file before we replace it. Archiving a non-existent file is not
// an error.
if p.versioner != nil {
err = p.versioner.Archive(state.realName)
if err != nil {
l.Warnln("puller: final:", err)
continue
}
}
// If the target path is a symlink or a directory, we cannot copy
// over it, hence remove it before proceeding.
stat, err := os.Lstat(state.realName)
isLink, _ := symlinks.IsSymlink(state.realName)
if isLink || (err == nil && stat.IsDir()) {
osutil.InWritableDir(os.Remove, state.realName)
}
// Replace the original content with the new one
err = osutil.Rename(state.tempName, state.realName)
if err != nil {
l.Warnln("puller: final:", err)
continue
}
// If it's a symlink, the target of the symlink is inside the file.
if state.file.IsSymlink() {
content, err := ioutil.ReadFile(state.realName)
if err != nil {
l.Warnln("puller: final: reading symlink:", err)
continue
}
// Remove the file, and replace it with a symlink.
err = osutil.InWritableDir(func(path string) error {
os.Remove(path)
return symlinks.Create(path, string(content), state.file.Flags)
}, state.realName)
if err != nil {
l.Warnln("puller: final: creating symlink:", err)
continue
}
}
// Record the updated file in the index
p.model.updateLocal(p.folder, state.file)
p.performFinish(state)
if state.closed && p.progressEmitter != nil {
p.progressEmitter.Deregister(state)
}
}
}


@ -31,20 +31,32 @@ type sharedPullerState struct {
folder string
tempName string
realName string
reused int // Number of blocks reused from temporary file
reused uint32 // Number of blocks reused from temporary file
// Mutable, must be locked for access
err error // The first error we hit
fd *os.File // The fd of the temp file
copyTotal int // Total number of copy actions for the whole job
pullTotal int // Total number of pull actions for the whole job
copyNeeded int // Number of copy actions still pending
pullNeeded int // Number of block pulls still pending
copyOrigin int // Number of blocks copied from the original file
copyTotal uint32 // Total number of copy actions for the whole job
pullTotal uint32 // Total number of pull actions for the whole job
copyOrigin uint32 // Number of blocks copied from the original file
copyNeeded uint32 // Number of copy actions still pending
pullNeeded uint32 // Number of block pulls still pending
closed bool // Set when the file has been closed
mut sync.Mutex // Protects the above
}
// A momentary state representing the progress of the puller
type pullerProgress struct {
Total uint32
Reused uint32
CopiedFromOrigin uint32
CopiedFromElsewhere uint32
Pulled uint32
Pulling uint32
BytesDone int64
BytesTotal int64
}
// tempFile returns the fd for the temporary file, reusing an open fd
// or creating the file as necessary.
func (s *sharedPullerState) tempFile() (*os.File, error) {
@ -208,3 +220,21 @@ func (s *sharedPullerState) finalClose() (bool, error) {
}
return true, nil
}
// Progress returns a momentary snapshot of the puller's progress.
func (s *sharedPullerState) Progress() *pullerProgress {
s.mut.Lock()
defer s.mut.Unlock()
total := s.reused + s.copyTotal + s.pullTotal
done := total - s.copyNeeded - s.pullNeeded
return &pullerProgress{
Total: total,
Reused: s.reused,
CopiedFromOrigin: s.copyOrigin,
CopiedFromElsewhere: s.copyTotal - s.copyNeeded - s.copyOrigin,
Pulled: s.pullTotal - s.pullNeeded,
Pulling: s.pullNeeded,
BytesTotal: protocol.BlocksToSize(total),
BytesDone: protocol.BlocksToSize(done),
}
}
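
To make the counter arithmetic above concrete, a small sketch with made-up numbers (not from this diff) showing how one sharedPullerState maps to a pullerProgress snapshot; BytesTotal and BytesDone are then derived from the block counts via protocol.BlocksToSize:

package main

import "fmt"

func main() {
	// Hypothetical counters for one file mid-pull, mirroring the
	// sharedPullerState fields used by Progress() above.
	var (
		reused     uint32 = 2 // blocks reused from the temporary file
		copyTotal  uint32 = 5 // blocks to be copied locally
		copyNeeded uint32 = 1 // local copies still pending
		copyOrigin uint32 = 3 // blocks copied from the original file
		pullTotal  uint32 = 3 // blocks to be pulled over the network
		pullNeeded uint32 = 2 // network pulls still pending
	)

	total := reused + copyTotal + pullTotal // 10
	done := total - copyNeeded - pullNeeded // 7

	fmt.Println("Total:              ", total)                           // 10
	fmt.Println("Reused:             ", reused)                          // 2
	fmt.Println("CopiedFromOrigin:   ", copyOrigin)                      // 3
	fmt.Println("CopiedFromElsewhere:", copyTotal-copyNeeded-copyOrigin) // 1
	fmt.Println("Pulled:             ", pullTotal-pullNeeded)            // 1
	fmt.Println("Pulling:            ", pullNeeded)                      // 2
	fmt.Println("Done blocks:        ", done)                            // 7
}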