From 15699a39cf8528b0ef3b970cb38adf64e99522d5 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Tue, 1 Apr 2014 23:18:32 +0200 Subject: [PATCH] Synchronize directory existence & metadata (fixes #11) --- cmd/syncthing/model.go | 27 ++++++++++-- cmd/syncthing/puller.go | 95 ++++++++++++++++++++++++++++++++++++++--- files/set.go | 10 +++-- files/set_test.go | 5 ++- protocol/protocol.go | 5 ++- scanner/walk.go | 39 +++++++++++++++-- 6 files changed, 161 insertions(+), 20 deletions(-) diff --git a/cmd/syncthing/model.go b/cmd/syncthing/model.go index 9f23f7277..fb827cf10 100644 --- a/cmd/syncthing/model.go +++ b/cmd/syncthing/model.go @@ -227,6 +227,14 @@ func (m *Model) NeedFiles() ([]scanner.File, int64) { return nf, bytes } +// NeedFiles returns the list of currently needed files and the total size. +func (m *Model) NeedFilesRepo(repo string) []scanner.File { + m.rmut.RLock() + nf := m.repoFiles[repo].Need(cid.LocalID) + m.rmut.RUnlock() + return nf +} + // Index is called when a new node is connected and we receive their full index. // Implements the protocol.Model interface. func (m *Model) Index(nodeID string, repo string, fs []protocol.FileInfo) { @@ -366,6 +374,20 @@ func (m *Model) SeedLocal(repo string, fs []protocol.FileInfo) { m.rmut.RUnlock() } +func (m *Model) CurrentRepoFile(repo string, file string) scanner.File { + m.rmut.RLock() + f := m.repoFiles[repo].Get(cid.LocalID, file) + m.rmut.RUnlock() + return f +} + +func (m *Model) CurrentGlobalFile(repo string, file string) scanner.File { + m.rmut.RLock() + f := m.repoFiles[repo].GetGlobal(file) + m.rmut.RUnlock() + return f +} + type cFiler struct { m *Model r string @@ -373,10 +395,7 @@ type cFiler struct { // Implements scanner.CurrentFiler func (cf cFiler) CurrentFile(file string) scanner.File { - cf.m.rmut.RLock() - f := cf.m.repoFiles[cf.r].Get(cid.LocalID, file) - cf.m.rmut.RUnlock() - return f + return cf.m.CurrentRepoFile(cf.r, file) } // ConnectedTo returns true if we are connected to the named node. diff --git a/cmd/syncthing/puller.go b/cmd/syncthing/puller.go index cad660e68..d0e968492 100644 --- a/cmd/syncthing/puller.go +++ b/cmd/syncthing/puller.go @@ -119,6 +119,7 @@ func (p *puller) run() { walkTicker := time.Tick(time.Duration(cfg.Options.RescanIntervalS) * time.Second) timeout := time.Tick(5 * time.Second) + changed := true for { // Run the pulling loop as long as there are blocks to fetch @@ -126,16 +127,15 @@ func (p *puller) run() { for { select { case res := <-p.requestResults: + changed = true p.requestSlots <- true p.handleRequestResult(res) case b := <-p.blocks: + changed = true p.handleBlock(b) case <-timeout: - if debugPull { - dlog.Println("timeout") - } if len(p.openFiles) == 0 && p.bq.empty() { // Nothing more to do for the moment break pull @@ -154,6 +154,11 @@ func (p *puller) run() { } } + if changed { + p.fixupDirectories() + changed = false + } + // Do a rescan if it's time for it select { case <-walkTicker: @@ -181,6 +186,72 @@ func (p *puller) runRO() { } } +func (p *puller) fixupDirectories() { + var deleteDirs []string + fn := func(path string, info os.FileInfo, err error) error { + if !info.IsDir() { + return nil + } + + rn, err := filepath.Rel(p.dir, path) + if err != nil { + return nil + } + + if rn == "." { + return nil + } + + cur := p.model.CurrentGlobalFile(p.repo, rn) + if cur.Name != rn { + // No matching dir in current list; weird + return nil + } + + if cur.Flags&protocol.FlagDeleted != 0 { + if debugPull { + dlog.Printf("queue delete dir: %v", cur) + } + + // We queue the directories to delete since we walk the + // tree in depth first order and need to remove the + // directories in the opposite order. + + deleteDirs = append(deleteDirs, path) + return nil + } + + if cur.Flags&uint32(os.ModePerm) != uint32(info.Mode()&os.ModePerm) { + os.Chmod(path, os.FileMode(cur.Flags)&os.ModePerm) + if debugPull { + dlog.Printf("restored dir flags: %o -> %v", info.Mode()&os.ModePerm, cur) + } + } + + if cur.Modified != info.ModTime().Unix() { + t := time.Unix(cur.Modified, 0) + os.Chtimes(path, t, t) + if debugPull { + dlog.Printf("restored dir modtime: %d -> %v", info.ModTime().Unix(), cur) + } + } + + return nil + } + filepath.Walk(p.dir, fn) + + // Delete any queued directories + for i := len(deleteDirs) - 1; i >= 0; i-- { + if debugPull { + dlog.Println("delete dir:", deleteDirs[i]) + } + err := os.Remove(deleteDirs[i]) + if err != nil { + warnln(err) + } + } +} + func (p *puller) handleRequestResult(res requestResult) { p.oustandingPerNode.decrease(res.node) f := res.file @@ -251,6 +322,18 @@ func (p *puller) handleRequestResult(res requestResult) { func (p *puller) handleBlock(b bqBlock) { f := b.file + // For directories, simply making sure they exist is enough + if f.Flags&protocol.FlagDirectory != 0 { + path := filepath.Join(p.dir, f.Name) + _, err := os.Stat(path) + if err != nil && os.IsNotExist(err) { + os.MkdirAll(path, 0777) + } + p.model.updateLocal(p.repo, f) + p.requestSlots <- true + return + } + of, ok := p.openFiles[f.Name] of.done = b.last @@ -429,13 +512,13 @@ func (p *puller) handleEmptyBlock(b bqBlock) { Rename(of.temp, of.filepath) } delete(p.openFiles, f.Name) - p.model.repoFiles[p.repo].Update(cid.LocalID, []scanner.File{f}) + p.model.updateLocal(p.repo, f) } func (p *puller) queueNeededBlocks() { queued := 0 - for _, f := range p.model.repoFiles[p.repo].Need(cid.LocalID) { - lf := p.model.repoFiles[p.repo].Get(cid.LocalID, f.Name) + for _, f := range p.model.NeedFilesRepo(p.repo) { + lf := p.model.CurrentRepoFile(p.repo, f.Name) have, need := scanner.BlockDiff(lf.Blocks, f.Blocks) if debugNeed { dlog.Printf("need:\n local: %v\n global: %v\n haveBlocks: %v\n needBlocks: %v", lf, f, have, need) diff --git a/files/set.go b/files/set.go index 7de636b4b..f691f1f07 100644 --- a/files/set.go +++ b/files/set.go @@ -112,7 +112,7 @@ func (m *Set) ReplaceWithDelete(id uint, fs []scanner.File) { if _, ok := nf[ck.Name]; !ok { cf := m.files[ck].File if cf.Flags&protocol.FlagDeleted != protocol.FlagDeleted { - cf.Flags = protocol.FlagDeleted + cf.Flags |= protocol.FlagDeleted cf.Blocks = nil cf.Size = 0 cf.Version = lamport.Default.Tick(cf.Version) @@ -145,9 +145,13 @@ func (m *Set) Need(id uint) []scanner.File { } var fs []scanner.File m.Lock() + rkID := m.remoteKey[id] for name, gk := range m.globalKey { - if gk.newerThan(m.remoteKey[id][name]) { - fs = append(fs, m.files[gk].File) + if gk.newerThan(rkID[name]) { + if m.files[gk].File.Flags&protocol.FlagDirectory == 0 || // Regular file + m.files[gk].File.Flags&(protocol.FlagDirectory|protocol.FlagDeleted) == protocol.FlagDirectory { // Non-deleted directory + fs = append(fs, m.files[gk].File) + } } } m.Unlock() diff --git a/files/set_test.go b/files/set_test.go index 1731c2019..d28baca8b 100644 --- a/files/set_test.go +++ b/files/set_test.go @@ -77,6 +77,7 @@ func TestLocalDeleted(t *testing.T) { scanner.File{Name: "b", Version: 1000}, scanner.File{Name: "c", Version: 1000}, scanner.File{Name: "d", Version: 1000}, + scanner.File{Name: "z", Version: 1000, Flags: protocol.FlagDirectory}, } m.ReplaceWithDelete(cid.LocalID, local1) @@ -91,6 +92,7 @@ func TestLocalDeleted(t *testing.T) { scanner.File{Name: "b", Version: 1001, Flags: protocol.FlagDeleted}, local1[2], scanner.File{Name: "d", Version: 1002, Flags: protocol.FlagDeleted}, + scanner.File{Name: "z", Version: 1003, Flags: protocol.FlagDeleted | protocol.FlagDirectory}, } m.ReplaceWithDelete(cid.LocalID, local2) @@ -109,8 +111,9 @@ func TestLocalDeleted(t *testing.T) { expectedGlobal2 := []scanner.File{ local1[0], scanner.File{Name: "b", Version: 1001, Flags: protocol.FlagDeleted}, - scanner.File{Name: "c", Version: 1003, Flags: protocol.FlagDeleted}, + scanner.File{Name: "c", Version: 1004, Flags: protocol.FlagDeleted}, scanner.File{Name: "d", Version: 1002, Flags: protocol.FlagDeleted}, + scanner.File{Name: "z", Version: 1003, Flags: protocol.FlagDeleted | protocol.FlagDirectory}, } m.ReplaceWithDelete(cid.LocalID, local3) diff --git a/protocol/protocol.go b/protocol/protocol.go index 69ab7aaf6..59c44baaa 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -27,8 +27,9 @@ const ( ) const ( - FlagDeleted uint32 = 1 << 12 - FlagInvalid = 1 << 13 + FlagDeleted uint32 = 1 << 12 + FlagInvalid = 1 << 13 + FlagDirectory = 1 << 14 ) var ( diff --git a/scanner/walk.go b/scanner/walk.go index e25f48680..d8bf3442c 100644 --- a/scanner/walk.go +++ b/scanner/walk.go @@ -10,6 +10,7 @@ import ( "time" "github.com/calmh/syncthing/lamport" + "github.com/calmh/syncthing/protocol" ) type Walker struct { @@ -137,7 +138,6 @@ func (w *Walker) loadIgnoreFiles(dir string, ign map[string][]string) filepath.W func (w *Walker) walkAndHashFiles(res *[]File, ign map[string][]string) filepath.WalkFunc { return func(p string, info os.FileInfo, err error) error { - if err != nil { if debug { dlog.Println("error:", p, info, err) @@ -153,7 +153,12 @@ func (w *Walker) walkAndHashFiles(res *[]File, ign map[string][]string) filepath return nil } + if rn == "." { + return nil + } + if w.TempNamer != nil && w.TempNamer.IsTemporary(rn) { + // A temporary file if debug { dlog.Println("temporary:", rn) } @@ -161,13 +166,15 @@ func (w *Walker) walkAndHashFiles(res *[]File, ign map[string][]string) filepath } if _, sn := filepath.Split(rn); sn == w.IgnoreFile { + // An ignore-file; these are ignored themselves if debug { dlog.Println("ignorefile:", rn) } return nil } - if rn != "." && w.ignoreFile(ign, rn) { + if w.ignoreFile(ign, rn) { + // An ignored file if debug { dlog.Println("ignored:", rn) } @@ -177,10 +184,34 @@ func (w *Walker) walkAndHashFiles(res *[]File, ign map[string][]string) filepath return nil } - if info.Mode()&os.ModeType == 0 { + if info.Mode().IsDir() { if w.CurrentFiler != nil { cf := w.CurrentFiler.CurrentFile(rn) - if cf.Modified == info.ModTime().Unix() { + if cf.Modified == info.ModTime().Unix() && cf.Flags == uint32(info.Mode()&os.ModePerm|protocol.FlagDirectory) { + if debug { + dlog.Println("unchanged:", cf) + } + *res = append(*res, cf) + } else { + f := File{ + Name: rn, + Version: lamport.Default.Tick(0), + Flags: uint32(info.Mode()&os.ModePerm) | protocol.FlagDirectory, + Modified: info.ModTime().Unix(), + } + if debug { + dlog.Println("dir:", cf, f) + } + *res = append(*res, f) + } + return nil + } + } + + if info.Mode().IsRegular() { + if w.CurrentFiler != nil { + cf := w.CurrentFiler.CurrentFile(rn) + if cf.Flags&protocol.FlagDeleted == 0 && cf.Modified == info.ModTime().Unix() { if debug { dlog.Println("unchanged:", cf) }