Handle sparse files (fixes #245)

This commit is contained in:
Jakob Borg 2015-11-21 16:30:53 +01:00
parent f7ad97918a
commit d46f267663
6 changed files with 136 additions and 1 deletions

View File

@ -35,6 +35,7 @@ type FolderConfiguration struct {
PullerSleepS int `xml:"pullerSleepS" json:"pullerSleepS"` PullerSleepS int `xml:"pullerSleepS" json:"pullerSleepS"`
PullerPauseS int `xml:"pullerPauseS" json:"pullerPauseS"` PullerPauseS int `xml:"pullerPauseS" json:"pullerPauseS"`
MaxConflicts int `xml:"maxConflicts" json:"maxConflicts"` MaxConflicts int `xml:"maxConflicts" json:"maxConflicts"`
DisableSparseFiles bool `xml:"disableSparseFiles" json:"disableSparseFiles"`
Invalid string `xml:"-" json:"invalid"` // Set at runtime when there is an error, not saved Invalid string `xml:"-" json:"invalid"` // Set at runtime when there is an error, not saved
cachedPath string cachedPath string

View File

@ -91,6 +91,7 @@ type rwFolder struct {
maxConflicts int maxConflicts int
sleep time.Duration sleep time.Duration
pause time.Duration pause time.Duration
allowSparse bool
stop chan struct{} stop chan struct{}
queue *jobQueue queue *jobQueue
@ -125,6 +126,7 @@ func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFo
shortID: shortID, shortID: shortID,
order: cfg.Order, order: cfg.Order,
maxConflicts: cfg.MaxConflicts, maxConflicts: cfg.MaxConflicts,
allowSparse: !cfg.DisableSparseFiles,
stop: make(chan struct{}), stop: make(chan struct{}),
queue: newJobQueue(), queue: newJobQueue(),
@ -1027,6 +1029,7 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks
ignorePerms: p.ignorePermissions(file), ignorePerms: p.ignorePermissions(file),
version: curFile.Version, version: curFile.Version,
mut: sync.NewMutex(), mut: sync.NewMutex(),
sparse: p.allowSparse,
} }
l.Debugf("%v need file %s; copy %d, reused %v", p, file.Name, len(blocks), reused) l.Debugf("%v need file %s; copy %d, reused %v", p, file.Name, len(blocks), reused)
@ -1113,6 +1116,18 @@ func (p *rwFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pull
p.model.fmut.RUnlock() p.model.fmut.RUnlock()
for _, block := range state.blocks { for _, block := range state.blocks {
if p.allowSparse && state.reused == 0 && block.IsEmpty() {
// The block is a block of all zeroes, and we are not reusing
// a temp file, so there is no need to do anything with it.
// If we were reusing a temp file and had this block to copy,
// it would be because the block in the temp file was *not* a
// block of all zeroes, so then we should not skip it.
// Pretend we copied it.
state.copiedFromOrigin()
continue
}
buf = buf[:int(block.Size)] buf = buf[:int(block.Size)]
found := p.model.finder.Iterate(folders, block.Hash, func(folder, file string, index int32) bool { found := p.model.finder.Iterate(folders, block.Hash, func(folder, file string, index int32) bool {
fd, err := os.Open(filepath.Join(folderRoots[folder], file)) fd, err := os.Open(filepath.Join(folderRoots[folder], file))
@ -1185,6 +1200,14 @@ func (p *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPul
continue continue
} }
if p.allowSparse && state.reused == 0 && state.block.IsEmpty() {
// There is no need to request a block of all zeroes. Pretend we
// requested it and handled it correctly.
state.pullDone()
out <- state.sharedPullerState
continue
}
var lastError error var lastError error
potentialDevices := p.model.Availability(p.folder, state.file.Name) potentialDevices := p.model.Availability(p.folder, state.file.Name)
for { for {

View File

@ -27,6 +27,7 @@ type sharedPullerState struct {
reused int // Number of blocks reused from temporary file reused int // Number of blocks reused from temporary file
ignorePerms bool ignorePerms bool
version protocol.Vector // The current (old) version version protocol.Vector // The current (old) version
sparse bool
// Mutable, must be locked for access // Mutable, must be locked for access
err error // The first error we hit err error // The first error we hit
@ -138,6 +139,15 @@ func (s *sharedPullerState) tempFile() (io.WriterAt, error) {
return nil, err return nil, err
} }
if s.sparse {
// Truncate sets the size of the file. This creates a sparse file or a
// space reservation, depending on the underlying filesystem.
if err := fd.Truncate(s.file.Size()); err != nil {
s.failLocked("dst truncate", err)
return nil, err
}
}
// Same fd will be used by all writers // Same fd will be used by all writers
s.fd = fd s.fd = fd

View File

@ -5,7 +5,13 @@
package protocol package protocol
import "fmt" import (
"bytes"
"crypto/sha256"
"fmt"
)
var sha256OfEmptyBlock = sha256.Sum256(make([]byte, BlockSize))
type IndexMessage struct { type IndexMessage struct {
Folder string // max:256 Folder string // max:256
@ -98,6 +104,11 @@ func (b BlockInfo) String() string {
return fmt.Sprintf("Block{%d/%d/%x}", b.Offset, b.Size, b.Hash) return fmt.Sprintf("Block{%d/%d/%x}", b.Offset, b.Size, b.Hash)
} }
// IsEmpty returns true if the block is a full block of zeroes.
func (b BlockInfo) IsEmpty() bool {
return b.Size == BlockSize && bytes.Equal(b.Hash, sha256OfEmptyBlock[:])
}
type RequestMessage struct { type RequestMessage struct {
Folder string // max:256 Folder string // max:256
Name string // max:8192 Name string // max:8192

View File

@ -550,3 +550,27 @@ func (p *Process) eventLoop() {
} }
} }
} }
type ConnectionStats struct {
Address string
Type string
Connected bool
Paused bool
ClientVersion string
InBytesTotal int64
OutBytesTotal int64
}
func (p *Process) Connections() (map[string]ConnectionStats, error) {
bs, err := p.Get("/rest/system/connections")
if err != nil {
return nil, err
}
var res map[string]ConnectionStats
if err := json.Unmarshal(bs, &res); err != nil {
return nil, err
}
return res, nil
}

View File

@ -466,3 +466,69 @@ func scSyncAndCompare(p []*rc.Process, expected [][]fileInfo) error {
return nil return nil
} }
func TestSyncSparseFile(t *testing.T) {
// This test verifies that when syncing a file that consists mostly of
// zeroes, those blocks are not transferred. It doesn't verify whether
// the resulting file is actually *sparse* or not.alterFiles
log.Println("Cleaning...")
err := removeAll("s1", "s12-1",
"s2", "s12-2", "s23-2",
"s3", "s23-3",
"h1/index*", "h2/index*", "h3/index*")
if err != nil {
t.Fatal(err)
}
log.Println("Generating files...")
if err := os.Mkdir("s1", 0755); err != nil {
t.Fatal(err)
}
fd, err := os.Create("s1/testfile")
if err != nil {
t.Fatal(err)
}
if _, err := fd.Write([]byte("Start")); err != nil {
t.Fatal(err)
}
kib := make([]byte, 1024)
for i := 0; i < 8192; i++ {
if _, err := fd.Write(kib); err != nil {
t.Fatal(err)
}
}
if _, err := fd.Write([]byte("End")); err != nil {
t.Fatal(err)
}
fd.Close()
// Start the syncers
log.Println("Syncing...")
p0 := startInstance(t, 1)
defer checkedStop(t, p0)
p1 := startInstance(t, 2)
defer checkedStop(t, p1)
rc.AwaitSync("default", p0, p1)
log.Println("Comparing...")
if err := compareDirectories("s1", "s2"); err != nil {
t.Fatal(err)
}
conns, err := p0.Connections()
if err != nil {
t.Fatal(err)
}
tot := conns["total"]
if tot.OutBytesTotal > 256<<10 {
t.Fatal("Sending side has sent", tot.OutBytesTotal, "bytes, which is too much")
}
}