Merge remote-tracking branch 'origin/pr/1097'

* origin/pr/1097:
  Revert "Cache file descriptors" (fixes #1096)
This commit is contained in:
Jakob Borg 2014-12-08 13:23:06 +01:00
commit 9c6aedc91b
14 changed files with 295 additions and 546 deletions

8
Godeps/Godeps.json generated
View File

@ -6,8 +6,8 @@
], ],
"Deps": [ "Deps": [
{ {
"ImportPath": "github.com/AudriusButkevicius/lrufdcache", "ImportPath": "github.com/AudriusButkevicius/lfu-go",
"Rev": "9bddff8f67224ab3e7d80525a6ae9bcf1ce10769" "Rev": "164bcecceb92fd6037f4d18a8d97b495ec6ef669"
}, },
{ {
"ImportPath": "github.com/bkaradzic/go-lz4", "ImportPath": "github.com/bkaradzic/go-lz4",
@ -25,10 +25,6 @@
"ImportPath": "github.com/calmh/xdr", "ImportPath": "github.com/calmh/xdr",
"Rev": "45c46b7db7ff83b8b9ee09bbd95f36ab50043ece" "Rev": "45c46b7db7ff83b8b9ee09bbd95f36ab50043ece"
}, },
{
"ImportPath": "github.com/golang/groupcache/lru",
"Rev": "f391194b967ae0d21deadc861ea87120d9687447"
},
{ {
"ImportPath": "github.com/juju/ratelimit", "ImportPath": "github.com/juju/ratelimit",
"Rev": "f9f36d11773655c0485207f0ad30dc2655f69d56" "Rev": "f9f36d11773655c0485207f0ad30dc2655f69d56"

View File

@ -0,0 +1,19 @@
Copyright (C) 2012 Dave Grijalva
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included
in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View File

@ -0,0 +1,19 @@
A simple LFU cache for golang. Based on the paper [An O(1) algorithm for implementing the LFU cache eviction scheme](http://dhruvbird.com/lfu.pdf).
Usage:
```go
import "github.com/dgrijalva/lfu-go"
// Make a new thing
c := lfu.New()
// Set some values
c.Set("myKey", myValue)
// Retrieve some values
myValue = c.Get("myKey")
// Evict some values
c.Evict(1)
```

View File

@ -0,0 +1,156 @@
package lfu
import (
"container/list"
"sync"
)
type Eviction struct {
Key string
Value interface{}
}
type Cache struct {
// If len > UpperBound, cache will automatically evict
// down to LowerBound. If either value is 0, this behavior
// is disabled.
UpperBound int
LowerBound int
values map[string]*cacheEntry
freqs *list.List
len int
lock *sync.Mutex
EvictionChannel chan<- Eviction
}
type cacheEntry struct {
key string
value interface{}
freqNode *list.Element
}
type listEntry struct {
entries map[*cacheEntry]byte
freq int
}
func New() *Cache {
c := new(Cache)
c.values = make(map[string]*cacheEntry)
c.freqs = list.New()
c.lock = new(sync.Mutex)
return c
}
func (c *Cache) Get(key string) interface{} {
c.lock.Lock()
defer c.lock.Unlock()
if e, ok := c.values[key]; ok {
c.increment(e)
return e.value
}
return nil
}
func (c *Cache) Set(key string, value interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
if e, ok := c.values[key]; ok {
// value already exists for key. overwrite
e.value = value
c.increment(e)
} else {
// value doesn't exist. insert
e := new(cacheEntry)
e.key = key
e.value = value
c.values[key] = e
c.increment(e)
c.len++
// bounds mgmt
if c.UpperBound > 0 && c.LowerBound > 0 {
if c.len > c.UpperBound {
c.evict(c.len - c.LowerBound)
}
}
}
}
func (c *Cache) Len() int {
c.lock.Lock()
defer c.lock.Unlock()
return c.len
}
func (c *Cache) Evict(count int) int {
c.lock.Lock()
defer c.lock.Unlock()
return c.evict(count)
}
func (c *Cache) evict(count int) int {
// No lock here so it can be called
// from within the lock (during Set)
var evicted int
for i := 0; i < count; {
if place := c.freqs.Front(); place != nil {
for entry, _ := range place.Value.(*listEntry).entries {
if i < count {
if c.EvictionChannel != nil {
c.EvictionChannel <- Eviction{
Key: entry.key,
Value: entry.value,
}
}
delete(c.values, entry.key)
c.remEntry(place, entry)
evicted++
c.len--
i++
}
}
}
}
return evicted
}
func (c *Cache) increment(e *cacheEntry) {
currentPlace := e.freqNode
var nextFreq int
var nextPlace *list.Element
if currentPlace == nil {
// new entry
nextFreq = 1
nextPlace = c.freqs.Front()
} else {
// move up
nextFreq = currentPlace.Value.(*listEntry).freq + 1
nextPlace = currentPlace.Next()
}
if nextPlace == nil || nextPlace.Value.(*listEntry).freq != nextFreq {
// create a new list entry
li := new(listEntry)
li.freq = nextFreq
li.entries = make(map[*cacheEntry]byte)
if currentPlace != nil {
nextPlace = c.freqs.InsertAfter(li, currentPlace)
} else {
nextPlace = c.freqs.PushFront(li)
}
}
e.freqNode = nextPlace
nextPlace.Value.(*listEntry).entries[e] = 1
if currentPlace != nil {
// remove from current position
c.remEntry(currentPlace, e)
}
}
func (c *Cache) remEntry(place *list.Element, entry *cacheEntry) {
entries := place.Value.(*listEntry).entries
delete(entries, entry)
if len(entries) == 0 {
c.freqs.Remove(place)
}
}

View File

@ -0,0 +1,68 @@
package lfu
import (
"fmt"
"testing"
)
func TestLFU(t *testing.T) {
c := New()
c.Set("a", "a")
if v := c.Get("a"); v != "a" {
t.Errorf("Value was not saved: %v != 'a'", v)
}
if l := c.Len(); l != 1 {
t.Errorf("Length was not updated: %v != 1", l)
}
c.Set("b", "b")
if v := c.Get("b"); v != "b" {
t.Errorf("Value was not saved: %v != 'b'", v)
}
if l := c.Len(); l != 2 {
t.Errorf("Length was not updated: %v != 2", l)
}
c.Get("a")
evicted := c.Evict(1)
if v := c.Get("a"); v != "a" {
t.Errorf("Value was improperly evicted: %v != 'a'", v)
}
if v := c.Get("b"); v != nil {
t.Errorf("Value was not evicted: %v", v)
}
if l := c.Len(); l != 1 {
t.Errorf("Length was not updated: %v != 1", l)
}
if evicted != 1 {
t.Errorf("Number of evicted items is wrong: %v != 1", evicted)
}
}
func TestBoundsMgmt(t *testing.T) {
c := New()
c.UpperBound = 10
c.LowerBound = 5
for i := 0; i < 100; i++ {
c.Set(fmt.Sprintf("%v", i), i)
}
if c.Len() > 10 {
t.Errorf("Bounds management failed to evict properly: %v", c.Len())
}
}
func TestEviction(t *testing.T) {
ch := make(chan Eviction, 1)
c := New()
c.EvictionChannel = ch
c.Set("a", "b")
c.Evict(1)
ev := <-ch
if ev.Key != "a" || ev.Value.(string) != "b" {
t.Error("Incorrect item")
}
}

View File

@ -1,24 +0,0 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe
*.test
*.prof

View File

@ -1,25 +0,0 @@
This is free and unencumbered software released into the public domain.
Anyone is free to copy, modify, publish, use, compile, sell, or
distribute this software, either in source code form or as a compiled
binary, for any purpose, commercial or non-commercial, and by any
means.
In jurisdictions that recognize copyright laws, the author or authors
of this software dedicate any and all copyright interest in the
software to the public domain. We make this dedication for the benefit
of the public at large and to the detriment of our heirs and
successors. We intend this dedication to be an overt act of
relinquishment in perpetuity of all present and future rights to this
software under copyright law.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
For more information, please refer to <http://unlicense.org>

View File

@ -1,4 +0,0 @@
lrufdcache
==========
A LRU file descriptor cache

View File

@ -1,88 +0,0 @@
// Package logger implements a LRU file descriptor cache for concurrent ReadAt
// calls.
package lrufdcache
import (
"os"
"sync"
"github.com/golang/groupcache/lru"
)
// A wrapper around *os.File which counts references
type CachedFile struct {
file *os.File
wg sync.WaitGroup
// Locking between file.Close and file.ReadAt
// (just to please the race detector...)
flock sync.RWMutex
}
// Tells the cache that we are done using the file, but it's up to the cache
// to decide when this file will really be closed. The error, if any, will be
// lost.
func (f *CachedFile) Close() error {
f.wg.Done()
return nil
}
// Read the file at the given offset.
func (f *CachedFile) ReadAt(buf []byte, at int64) (int, error) {
f.flock.RLock()
defer f.flock.RUnlock()
return f.file.ReadAt(buf, at)
}
type FileCache struct {
cache *lru.Cache
mut sync.Mutex
}
// Create a new cache with the number of entries to hold.
func NewCache(entries int) *FileCache {
c := FileCache{
cache: lru.New(entries),
}
c.cache.OnEvicted = func(key lru.Key, fdi interface{}) {
// The file might not have been closed by all openers yet, therefore
// spawn a routine which waits for that to happen and then closes the
// file.
go func(item *CachedFile) {
item.wg.Wait()
item.flock.Lock()
item.file.Close()
item.flock.Unlock()
}(fdi.(*CachedFile))
}
return &c
}
// Open and cache a file descriptor or use an existing cached descriptor for
// the given path.
func (c *FileCache) Open(path string) (*CachedFile, error) {
// Evictions can only happen during c.cache.Add, and there is a potential
// race between c.cache.Get and cfd.wg.Add where if not guarded by a mutex
// could result in cfd getting closed before the counter is incremented if
// a concurrent routine does a c.cache.Add
c.mut.Lock()
defer c.mut.Unlock()
fdi, ok := c.cache.Get(path)
if ok {
cfd := fdi.(*CachedFile)
cfd.wg.Add(1)
return cfd, nil
}
fd, err := os.Open(path)
if err != nil {
return nil, err
}
cfd := &CachedFile{
file: fd,
wg: sync.WaitGroup{},
}
cfd.wg.Add(1)
c.cache.Add(path, cfd)
return cfd, nil
}

View File

@ -1,195 +0,0 @@
package lrufdcache
import (
"io/ioutil"
"os"
"sync"
"time"
"testing"
)
func TestNoopReadFailsOnClosed(t *testing.T) {
fd, err := ioutil.TempFile("", "fdcache")
if err != nil {
t.Fatal(err)
return
}
fd.WriteString("test")
fd.Close()
buf := make([]byte, 4)
defer os.Remove(fd.Name())
_, err = fd.ReadAt(buf, 0)
if err == nil {
t.Fatal("Expected error")
}
}
func TestSingleFileEviction(t *testing.T) {
c := NewCache(1)
wg := sync.WaitGroup{}
fd, err := ioutil.TempFile("", "fdcache")
if err != nil {
t.Fatal(err)
return
}
fd.WriteString("test")
fd.Close()
buf := make([]byte, 4)
defer os.Remove(fd.Name())
for k := 0; k < 100; k++ {
wg.Add(1)
go func() {
defer wg.Done()
cfd, err := c.Open(fd.Name())
if err != nil {
t.Fatal(err)
return
}
defer cfd.Close()
_, err = cfd.ReadAt(buf, 0)
if err != nil {
t.Fatal(err)
}
}()
}
wg.Wait()
}
func TestMultifileEviction(t *testing.T) {
c := NewCache(1)
wg := sync.WaitGroup{}
for k := 0; k < 100; k++ {
wg.Add(1)
go func() {
defer wg.Done()
fd, err := ioutil.TempFile("", "fdcache")
if err != nil {
t.Fatal(err)
return
}
fd.WriteString("test")
fd.Close()
buf := make([]byte, 4)
defer os.Remove(fd.Name())
cfd, err := c.Open(fd.Name())
if err != nil {
t.Fatal(err)
return
}
defer cfd.Close()
_, err = cfd.ReadAt(buf, 0)
if err != nil {
t.Fatal(err)
}
}()
}
wg.Wait()
}
func TestMixedEviction(t *testing.T) {
c := NewCache(1)
wg := sync.WaitGroup{}
wg2 := sync.WaitGroup{}
for i := 0; i < 100; i++ {
wg2.Add(1)
go func() {
defer wg2.Done()
fd, err := ioutil.TempFile("", "fdcache")
if err != nil {
t.Fatal(err)
return
}
fd.WriteString("test")
fd.Close()
buf := make([]byte, 4)
for k := 0; k < 100; k++ {
wg.Add(1)
go func() {
defer wg.Done()
cfd, err := c.Open(fd.Name())
if err != nil {
t.Fatal(err)
return
}
defer cfd.Close()
_, err = cfd.ReadAt(buf, 0)
if err != nil {
t.Fatal(err)
}
}()
}
}()
}
wg2.Wait()
wg.Wait()
}
func TestLimit(t *testing.T) {
testcase := 50
fd, err := ioutil.TempFile("", "fdcache")
if err != nil {
t.Fatal(err)
return
}
fd.Close()
defer os.Remove(fd.Name())
c := NewCache(testcase)
fds := make([]*CachedFile, testcase*2)
for i := 0; i < testcase*2; i++ {
fd, err := ioutil.TempFile("", "fdcache")
if err != nil {
t.Fatal(err)
return
}
fd.WriteString("test")
fd.Close()
defer os.Remove(fd.Name())
nfd, err := c.Open(fd.Name())
if err != nil {
t.Fatal(err)
return
}
fds = append(fds, nfd)
nfd.Close()
}
// Allow closes to happen
time.Sleep(time.Millisecond * 100)
buf := make([]byte, 4)
ok := 0
for _, fd := range fds {
if fd == nil {
continue
}
_, err := fd.ReadAt(buf, 0)
if err == nil {
ok++
}
}
if ok > testcase {
t.Fatal("More than", testcase, "fds open")
}
}

View File

@ -1,121 +0,0 @@
/*
Copyright 2013 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package lru implements an LRU cache.
package lru
import "container/list"
// Cache is an LRU cache. It is not safe for concurrent access.
type Cache struct {
// MaxEntries is the maximum number of cache entries before
// an item is evicted. Zero means no limit.
MaxEntries int
// OnEvicted optionally specificies a callback function to be
// executed when an entry is purged from the cache.
OnEvicted func(key Key, value interface{})
ll *list.List
cache map[interface{}]*list.Element
}
// A Key may be any value that is comparable. See http://golang.org/ref/spec#Comparison_operators
type Key interface{}
type entry struct {
key Key
value interface{}
}
// New creates a new Cache.
// If maxEntries is zero, the cache has no limit and it's assumed
// that eviction is done by the caller.
func New(maxEntries int) *Cache {
return &Cache{
MaxEntries: maxEntries,
ll: list.New(),
cache: make(map[interface{}]*list.Element),
}
}
// Add adds a value to the cache.
func (c *Cache) Add(key Key, value interface{}) {
if c.cache == nil {
c.cache = make(map[interface{}]*list.Element)
c.ll = list.New()
}
if ee, ok := c.cache[key]; ok {
c.ll.MoveToFront(ee)
ee.Value.(*entry).value = value
return
}
ele := c.ll.PushFront(&entry{key, value})
c.cache[key] = ele
if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries {
c.RemoveOldest()
}
}
// Get looks up a key's value from the cache.
func (c *Cache) Get(key Key) (value interface{}, ok bool) {
if c.cache == nil {
return
}
if ele, hit := c.cache[key]; hit {
c.ll.MoveToFront(ele)
return ele.Value.(*entry).value, true
}
return
}
// Remove removes the provided key from the cache.
func (c *Cache) Remove(key Key) {
if c.cache == nil {
return
}
if ele, hit := c.cache[key]; hit {
c.removeElement(ele)
}
}
// RemoveOldest removes the oldest item from the cache.
func (c *Cache) RemoveOldest() {
if c.cache == nil {
return
}
ele := c.ll.Back()
if ele != nil {
c.removeElement(ele)
}
}
func (c *Cache) removeElement(e *list.Element) {
c.ll.Remove(e)
kv := e.Value.(*entry)
delete(c.cache, kv.key)
if c.OnEvicted != nil {
c.OnEvicted(kv.key, kv.value)
}
}
// Len returns the number of items in the cache.
func (c *Cache) Len() int {
if c.cache == nil {
return 0
}
return c.ll.Len()
}

View File

@ -1,73 +0,0 @@
/*
Copyright 2013 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package lru
import (
"testing"
)
type simpleStruct struct {
int
string
}
type complexStruct struct {
int
simpleStruct
}
var getTests = []struct {
name string
keyToAdd interface{}
keyToGet interface{}
expectedOk bool
}{
{"string_hit", "myKey", "myKey", true},
{"string_miss", "myKey", "nonsense", false},
{"simple_struct_hit", simpleStruct{1, "two"}, simpleStruct{1, "two"}, true},
{"simeple_struct_miss", simpleStruct{1, "two"}, simpleStruct{0, "noway"}, false},
{"complex_struct_hit", complexStruct{1, simpleStruct{2, "three"}},
complexStruct{1, simpleStruct{2, "three"}}, true},
}
func TestGet(t *testing.T) {
for _, tt := range getTests {
lru := New(0)
lru.Add(tt.keyToAdd, 1234)
val, ok := lru.Get(tt.keyToGet)
if ok != tt.expectedOk {
t.Fatalf("%s: cache hit = %v; want %v", tt.name, ok, !ok)
} else if ok && val != 1234 {
t.Fatalf("%s expected get to return 1234 but got %v", tt.name, val)
}
}
}
func TestRemove(t *testing.T) {
lru := New(0)
lru.Add("myKey", 1234)
if val, ok := lru.Get("myKey"); !ok {
t.Fatal("TestRemove returned no match")
} else if val != 1234 {
t.Fatalf("TestRemove failed. Expected %d, got %v", 1234, val)
}
lru.Remove("myKey")
if _, ok := lru.Get("myKey"); ok {
t.Fatal("TestRemove returned a removed entry")
}
}

View File

@ -41,8 +41,6 @@ import (
"github.com/syncthing/syncthing/internal/stats" "github.com/syncthing/syncthing/internal/stats"
"github.com/syncthing/syncthing/internal/symlinks" "github.com/syncthing/syncthing/internal/symlinks"
"github.com/syncthing/syncthing/internal/versioner" "github.com/syncthing/syncthing/internal/versioner"
"github.com/AudriusButkevicius/lrufdcache"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
) )
@ -88,7 +86,6 @@ type Model struct {
db *leveldb.DB db *leveldb.DB
finder *files.BlockFinder finder *files.BlockFinder
progressEmitter *ProgressEmitter progressEmitter *ProgressEmitter
cache *lrufdcache.FileCache
deviceName string deviceName string
clientName string clientName string
@ -130,7 +127,6 @@ func NewModel(cfg *config.ConfigWrapper, deviceName, clientName, clientVersion s
m := &Model{ m := &Model{
cfg: cfg, cfg: cfg,
db: db, db: db,
cache: lrufdcache.NewCache(25),
deviceName: deviceName, deviceName: deviceName,
clientName: clientName, clientName: clientName,
clientVersion: clientVersion, clientVersion: clientVersion,
@ -699,11 +695,12 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset
} }
reader = strings.NewReader(target) reader = strings.NewReader(target)
} else { } else {
reader, err = m.cache.Open(fn) reader, err = os.Open(fn) // XXX: Inefficient, should cache fd?
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer reader.(*lrufdcache.CachedFile).Close()
defer reader.(*os.File).Close()
} }
buf := make([]byte, size) buf := make([]byte, size)

View File

@ -26,6 +26,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/AudriusButkevicius/lfu-go"
"github.com/syncthing/syncthing/internal/config" "github.com/syncthing/syncthing/internal/config"
"github.com/syncthing/syncthing/internal/events" "github.com/syncthing/syncthing/internal/events"
"github.com/syncthing/syncthing/internal/osutil" "github.com/syncthing/syncthing/internal/osutil"
@ -600,6 +602,19 @@ nextFile:
p.progressEmitter.Register(state.sharedPullerState) p.progressEmitter.Register(state.sharedPullerState)
} }
evictionChan := make(chan lfu.Eviction)
fdCache := lfu.New()
fdCache.UpperBound = 50
fdCache.LowerBound = 20
fdCache.EvictionChannel = evictionChan
go func() {
for item := range evictionChan {
item.Value.(*os.File).Close()
}
}()
folderRoots := make(map[string]string) folderRoots := make(map[string]string)
p.model.fmut.RLock() p.model.fmut.RLock()
for folder, cfg := range p.model.folderCfgs { for folder, cfg := range p.model.folderCfgs {
@ -613,11 +628,18 @@ nextFile:
found := p.model.finder.Iterate(block.Hash, func(folder, file string, index uint32) bool { found := p.model.finder.Iterate(block.Hash, func(folder, file string, index uint32) bool {
path := filepath.Join(folderRoots[folder], file) path := filepath.Join(folderRoots[folder], file)
fd, err := p.model.cache.Open(path) var fd *os.File
if err != nil {
return false fdi := fdCache.Get(path)
if fdi != nil {
fd = fdi.(*os.File)
} else {
fd, err = os.Open(path)
if err != nil {
return false
}
fdCache.Set(path, fd)
} }
defer fd.Close()
_, err = fd.ReadAt(buf, protocol.BlockSize*int64(index)) _, err = fd.ReadAt(buf, protocol.BlockSize*int64(index))
if err != nil { if err != nil {
@ -666,6 +688,8 @@ nextFile:
state.copyDone() state.copyDone()
} }
} }
fdCache.Evict(fdCache.Len())
close(evictionChan)
out <- state.sharedPullerState out <- state.sharedPullerState
} }
} }