lib/model: Don't close file early (fixes #6875) (#6876)

Audrius Butkevicius 2020-08-03 20:54:42 +01:00 committed by Jakob Borg
parent 4faa5882f2
commit 00008994e4
2 changed files with 86 additions and 80 deletions


@@ -1317,10 +1317,10 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
 				if err != nil {
 					return false
 				}
+				defer fd.Close()
 
 				srcOffset := int64(state.file.BlockSize()) * int64(index)
 				_, err = fd.ReadAt(buf, srcOffset)
-				fd.Close()
 				if err != nil {
 					return false
 				}
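
The fix itself is the two-line change above: the old code called fd.Close() immediately after the verification ReadAt, while the closure apparently still needs the open handle afterwards for the copy-range path (the path the test change below starts exercising per fs.CopyRangeMethod); deferring the close keeps the handle alive until the closure returns. Below is a minimal sketch of that pattern using only the standard library, not the Syncthing code itself; copyBlock and the copyRange callback are hypothetical names used for illustration.

package example

import (
	"io"
	"os"
)

// copyBlock sketches the pattern the fix restores: the source handle stays
// open until the whole function is done, because a later step may still use
// it. The copyRange callback stands in for that later, handle-based step.
func copyBlock(srcPath string, buf []byte, srcOffset int64, dst io.WriterAt, dstOffset int64, copyRange func(src *os.File) error) error {
	fd, err := os.Open(srcPath)
	if err != nil {
		return err
	}
	defer fd.Close() // not closed until every user of fd below has run

	if _, err := fd.ReadAt(buf, srcOffset); err != nil {
		return err
	}
	// Calling fd.Close() here, right after the verification read as the old
	// code did, would hand a closed descriptor to copyRange below.
	if copyRange != nil {
		return copyRange(fd)
	}
	_, err = dst.WriteAt(buf, dstOffset)
	return err
}

Under the standard method only the buffered write path runs, which would explain why the early close went unnoticed until the handle-based copy-range methods were in play.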


@@ -202,94 +202,100 @@ func TestHandleFileWithTemp(t *testing.T) {
 }
 
 func TestCopierFinder(t *testing.T) {
+	for _, method := range []fs.CopyRangeMethod{fs.CopyRangeMethodStandard, fs.CopyRangeMethodAllWithFallback} {
+		t.Run(method.String(), func(t *testing.T) {
 	// After diff between required and existing we should:
 	// Copy: 1, 2, 3, 4, 6, 7, 8
 	// Since there is no existing file, nor a temp file
 	// After dropping out blocks found locally:
 	// Pull: 1, 5, 6, 8
 	tempFile := fs.TempName("file2")
 	existingBlocks := []int{0, 2, 3, 4, 0, 0, 7, 0}
 	existingFile := setupFile(fs.TempName("file"), existingBlocks)
 	existingFile.Size = 1
 	requiredFile := existingFile
 	requiredFile.Blocks = blocks[1:]
 	requiredFile.Name = "file2"
 	m, f := setupSendReceiveFolder(existingFile)
+			f.CopyRangeMethod = method
 	defer cleanupSRFolder(f, m)
 	if _, err := prepareTmpFile(f.Filesystem()); err != nil {
 		t.Fatal(err)
 	}
 	copyChan := make(chan copyBlocksState)
 	pullChan := make(chan pullBlockState, 4)
 	finisherChan := make(chan *sharedPullerState, 1)
 	// Run a single fetcher routine
 	go f.copierRoutine(copyChan, pullChan, finisherChan)
 	defer close(copyChan)
 	f.handleFile(requiredFile, f.fset.Snapshot(), copyChan)
 	timeout := time.After(10 * time.Second)
 	pulls := make([]pullBlockState, 4)
 	for i := 0; i < 4; i++ {
 		select {
 		case pulls[i] = <-pullChan:
 		case <-timeout:
 			t.Fatalf("Timed out before receiving all 4 states on pullChan (already got %v)", i)
 		}
 	}
 	var finish *sharedPullerState
 	select {
 	case finish = <-finisherChan:
 	case <-timeout:
 		t.Fatal("Timed out before receiving 4 states on pullChan")
 	}
 	defer cleanupSharedPullerState(finish)
 	select {
 	case <-pullChan:
 		t.Fatal("Pull channel has data to be read")
 	case <-finisherChan:
 		t.Fatal("Finisher channel has data to be read")
 	default:
 	}
 	// Verify that the right blocks went into the pull list.
 	// They are pulled in random order.
 	for _, idx := range []int{1, 5, 6, 8} {
 		found := false
 		block := blocks[idx]
 		for _, pulledBlock := range pulls {
 			if string(pulledBlock.block.Hash) == string(block.Hash) {
 				found = true
 				break
 			}
 		}
 		if !found {
 			t.Errorf("Did not find block %s", block.String())
 		}
 		if string(finish.file.Blocks[idx-1].Hash) != string(blocks[idx].Hash) {
 			t.Errorf("Block %d mismatch: %s != %s", idx, finish.file.Blocks[idx-1].String(), blocks[idx].String())
 		}
 	}
 	// Verify that the fetched blocks have actually been written to the temp file
 	blks, err := scanner.HashFile(context.TODO(), f.Filesystem(), tempFile, protocol.MinBlockSize, nil, false)
 	if err != nil {
 		t.Log(err)
 	}
 	for _, eq := range []int{2, 3, 4, 7} {
 		if string(blks[eq-1].Hash) != string(blocks[eq].Hash) {
 			t.Errorf("Block %d mismatch: %s != %s", eq, blks[eq-1].String(), blocks[eq].String())
 		}
 	}
+		})
+	}
 }
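
Almost all of the churn in the test hunk is re-indentation: the existing body of TestCopierFinder moves, unchanged, into a t.Run closure, and the only functional additions are the loop over the two fs.CopyRangeMethod values and the f.CopyRangeMethod = method assignment, so the copier now runs once with the standard method and once with the all-with-fallback method that exercises handle-based copying. Stripped to that structure, a sketch (test body elided, test name changed to avoid clashing with the real one):

package model

import (
	"testing"

	"github.com/syncthing/syncthing/lib/fs"
)

// Sketch of the shape the commit gives TestCopierFinder: the unchanged test
// body runs once per copy-range method as a named subtest, so a failure
// reports which method broke.
func TestCopierFinderPerMethod(t *testing.T) {
	for _, method := range []fs.CopyRangeMethod{
		fs.CopyRangeMethodStandard,
		fs.CopyRangeMethodAllWithFallback,
	} {
		t.Run(method.String(), func(t *testing.T) {
			// m, f := setupSendReceiveFolder(existingFile)
			// f.CopyRangeMethod = method
			// defer cleanupSRFolder(f, m)
			// ... the rest of the original body, as in the diff above ...
		})
	}
}

Each method then appears as its own subtest, so a single variant can be re-run with go test -run 'TestCopierFinder/<name>' ./lib/model, where <name> is whatever method.String() prints for that method.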