From 69e385e4cd5660146763d58c063c4f80b553310b Mon Sep 17 00:00:00 2001 From: Audrius Butkevicius Date: Fri, 3 Oct 2014 23:15:54 +0100 Subject: [PATCH] Reuse temporary files (fixes #4) --- internal/config/config.go | 1 + internal/config/config_test.go | 2 + internal/config/testdata/overridenvalues.xml | 1 + internal/model/puller.go | 54 +++++++++++++++++++- internal/model/sharedpullerstate.go | 7 ++- internal/scanner/blockqueue.go | 45 ++++++++-------- 6 files changed, 87 insertions(+), 23 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index d791c0623..9819d155d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -145,6 +145,7 @@ type OptionsConfiguration struct { URAccepted int `xml:"urAccepted"` // Accepted usage reporting version; 0 for off (undecided), -1 for off (permanently) RestartOnWakeup bool `xml:"restartOnWakeup" default:"true"` AutoUpgradeIntervalH int `xml:"autoUpgradeIntervalH" default:"12"` // 0 for off + KeepTemporariesH int `xml:"keepTemporariesH" default:"24"` // 0 for off Deprecated_RescanIntervalS int `xml:"rescanIntervalS,omitempty" json:"-"` Deprecated_UREnabled bool `xml:"urEnabled,omitempty" json:"-"` diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 68c4213c2..eb488040f 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -49,6 +49,7 @@ func TestDefaultValues(t *testing.T) { UPnPRenewal: 30, RestartOnWakeup: true, AutoUpgradeIntervalH: 12, + KeepTemporariesH: 24, } cfg := New("test", device1) @@ -141,6 +142,7 @@ func TestOverriddenValues(t *testing.T) { UPnPRenewal: 15, RestartOnWakeup: false, AutoUpgradeIntervalH: 24, + KeepTemporariesH: 48, } cfg, err := Load("testdata/overridenvalues.xml", device1) diff --git a/internal/config/testdata/overridenvalues.xml b/internal/config/testdata/overridenvalues.xml index 84d2999ff..9d5340fbd 100755 --- a/internal/config/testdata/overridenvalues.xml +++ b/internal/config/testdata/overridenvalues.xml @@ -17,5 +17,6 @@ 15 false 24 + 48 diff --git a/internal/model/puller.go b/internal/model/puller.go index 73f1f1d3d..b2b3dfd06 100644 --- a/internal/model/puller.go +++ b/internal/model/puller.go @@ -412,12 +412,62 @@ func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksSt tempName := filepath.Join(p.dir, defTempNamer.TempName(file.Name)) realName := filepath.Join(p.dir, file.Name) + var reuse bool + + // Check for an old temporary file which might have some blocks we could + // reuse. + tempBlocks, err := scanner.HashFile(tempName, protocol.BlockSize) + if err == nil { + // Check for any reusable blocks in the temp file + tempCopyBlocks, _ := scanner.BlockDiff(tempBlocks, file.Blocks) + + // block.String() returns a string unique to the block + existingBlocks := make(map[string]bool, len(tempCopyBlocks)) + for _, block := range tempCopyBlocks { + existingBlocks[block.String()] = true + } + + // Since the blocks are already there, we don't need to copy them + // nor we need to pull them, hence discard blocks which are already + // there, if they are exactly the same... + var newCopyBlocks []protocol.BlockInfo + for _, block := range copyBlocks { + _, ok := existingBlocks[block.String()] + if !ok { + newCopyBlocks = append(newCopyBlocks, block) + } + } + + var newPullBlocks []protocol.BlockInfo + for _, block := range pullBlocks { + _, ok := existingBlocks[block.String()] + if !ok { + newPullBlocks = append(newPullBlocks, block) + } + } + + // If any blocks could be reused, let the sharedpullerstate know + // which flags it is expected to set on the file. + // Also update the list of work for the routines. + if len(copyBlocks) != len(newCopyBlocks) || len(pullBlocks) != len(newPullBlocks) { + reuse = true + copyBlocks = newCopyBlocks + pullBlocks = newPullBlocks + } else { + // Otherwise, discard the file ourselves in order for the + // sharedpuller not to panic when it fails to exlusively create a + // file which already exists + os.Remove(tempName) + } + } + s := sharedPullerState{ file: file, folder: p.folder, tempName: tempName, realName: realName, pullNeeded: len(pullBlocks), + reuse: reuse, } if len(copyBlocks) > 0 { s.copyNeeded = 1 @@ -628,12 +678,14 @@ func (p *Puller) finisherRoutine(in <-chan *sharedPullerState) { // clean deletes orphaned temporary files func (p *Puller) clean() { + keep := time.Duration(p.model.cfg.Options.KeepTemporariesH) * time.Hour + now := time.Now() filepath.Walk(p.dir, func(path string, info os.FileInfo, err error) error { if err != nil { return err } - if info.Mode().IsRegular() && defTempNamer.IsTemporary(path) { + if info.Mode().IsRegular() && defTempNamer.IsTemporary(path) && info.ModTime().Add(keep).Before(now) { os.Remove(path) } diff --git a/internal/model/sharedpullerstate.go b/internal/model/sharedpullerstate.go index 61585006b..1347e6274 100644 --- a/internal/model/sharedpullerstate.go +++ b/internal/model/sharedpullerstate.go @@ -31,6 +31,7 @@ type sharedPullerState struct { folder string tempName string realName string + reuse bool // Mutable, must be locked for access err error // The first error we hit @@ -77,7 +78,11 @@ func (s *sharedPullerState) tempFile() (*os.File, error) { } // Attempt to create the temp file - fd, err := os.OpenFile(s.tempName, os.O_CREATE|os.O_WRONLY|os.O_EXCL, 0644) + flags := os.O_WRONLY + if !s.reuse { + flags |= os.O_CREATE | os.O_EXCL + } + fd, err := os.OpenFile(s.tempName, flags, 0644) if err != nil { s.earlyCloseLocked("dst create", err) return nil, err diff --git a/internal/scanner/blockqueue.go b/internal/scanner/blockqueue.go index fd4497a61..4ce67b86a 100644 --- a/internal/scanner/blockqueue.go +++ b/internal/scanner/blockqueue.go @@ -34,7 +34,7 @@ func newParallelHasher(dir string, blockSize, workers int, outbox, inbox chan pr for i := 0; i < workers; i++ { go func() { - hashFile(dir, blockSize, outbox, inbox) + hashFiles(dir, blockSize, outbox, inbox) wg.Done() }() } @@ -45,32 +45,35 @@ func newParallelHasher(dir string, blockSize, workers int, outbox, inbox chan pr }() } -func hashFile(dir string, blockSize int, outbox, inbox chan protocol.FileInfo) { +func HashFile(path string, blockSize int) ([]protocol.BlockInfo, error) { + fd, err := os.Open(path) + if err != nil { + if debug { + l.Debugln("open:", err) + } + return []protocol.BlockInfo{}, err + } + + fi, err := fd.Stat() + if err != nil { + fd.Close() + if debug { + l.Debugln("stat:", err) + } + return []protocol.BlockInfo{}, err + } + defer fd.Close() + return Blocks(fd, blockSize, fi.Size()) +} + +func hashFiles(dir string, blockSize int, outbox, inbox chan protocol.FileInfo) { for f := range inbox { if protocol.IsDirectory(f.Flags) || protocol.IsDeleted(f.Flags) { outbox <- f continue } - fd, err := os.Open(filepath.Join(dir, f.Name)) - if err != nil { - if debug { - l.Debugln("open:", err) - } - continue - } - - fi, err := fd.Stat() - if err != nil { - fd.Close() - if debug { - l.Debugln("stat:", err) - } - continue - } - blocks, err := Blocks(fd, blockSize, fi.Size()) - fd.Close() - + blocks, err := HashFile(filepath.Join(dir, f.Name), blockSize) if err != nil { if debug { l.Debugln("hash error:", f.Name, err)