2018-04-08 12:02:30 +00:00
package restorer
import (
"io"
"sync"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
)
// packCache is thread safe in-memory cache of pack files required to restore
// one or more files. The cache is meant to hold pack files that cannot be
// fully used right away. This happens when pack files contains blobs from
// "head" of some files and "middle" of other files. "Middle" blobs cannot be
// written to their files until after blobs from some other packs are written
// to the files first.
//
// While the cache is thread safe, implementation assumes (and enforces)
// that individual entries are used by one client at a time. Clients must
// #Close() entry's reader to make the entry available for use by other
// clients. This limitation can be relaxed in the future if necessary.
type packCache struct {
// guards access to cache internal data structures
lock sync . Mutex
// cache capacity
capacity int
reservedCapacity int
allocatedCapacity int
// pack records currently being used by active restore worker
reservedPacks map [ restic . ID ] * packCacheRecord
// unused allocated packs, can be deleted if necessary
cachedPacks map [ restic . ID ] * packCacheRecord
}
type packCacheRecord struct {
master * packCacheRecord
cache * packCache
id restic . ID // cached pack id
offset int64 // cached pack byte range
data [ ] byte
}
type readerAtCloser interface {
io . Closer
io . ReaderAt
}
type bytesWriteSeeker struct {
pos int
data [ ] byte
}
func ( wr * bytesWriteSeeker ) Write ( p [ ] byte ) ( n int , err error ) {
if wr . pos + len ( p ) > len ( wr . data ) {
return - 1 , errors . Errorf ( "not enough space" )
}
n = copy ( wr . data [ wr . pos : ] , p )
wr . pos += n
return n , nil
}
func ( wr * bytesWriteSeeker ) Seek ( offset int64 , whence int ) ( int64 , error ) {
if offset != 0 || whence != io . SeekStart {
return - 1 , errors . Errorf ( "unsupported seek request" )
}
wr . pos = 0
return 0 , nil
}
func newPackCache ( capacity int ) * packCache {
return & packCache {
capacity : capacity ,
reservedPacks : make ( map [ restic . ID ] * packCacheRecord ) ,
cachedPacks : make ( map [ restic . ID ] * packCacheRecord ) ,
}
}
func ( c * packCache ) reserve ( packID restic . ID , offset int64 , length int ) ( record * packCacheRecord , err error ) {
c . lock . Lock ( )
defer c . lock . Unlock ( )
if offset < 0 || length <= 0 {
return nil , errors . Errorf ( "illegal pack cache allocation range %s {offset: %d, length: %d}" , packID . Str ( ) , offset , length )
}
if c . reservedCapacity + length > c . capacity {
return nil , errors . Errorf ( "not enough cache capacity: requested %d, available %d" , length , c . capacity - c . reservedCapacity )
}
if _ , ok := c . reservedPacks [ packID ] ; ok {
return nil , errors . Errorf ( "pack is already reserved %s" , packID . Str ( ) )
}
// the pack is available in the cache and currently unused
if pack , ok := c . cachedPacks [ packID ] ; ok {
// check if cached pack includes requested byte range
// the range can shrink, but it never grows bigger unless there is a bug elsewhere
if pack . offset > offset || ( pack . offset + int64 ( len ( pack . data ) ) ) < ( offset + int64 ( length ) ) {
return nil , errors . Errorf ( "cached range %d-%d is smaller than requested range %d-%d for pack %s" , pack . offset , pack . offset + int64 ( len ( pack . data ) ) , length , offset + int64 ( length ) , packID . Str ( ) )
}
// move the pack to the used map
delete ( c . cachedPacks , packID )
c . reservedPacks [ packID ] = pack
c . reservedCapacity += len ( pack . data )
debug . Log ( "Using cached pack %s (%d bytes)" , pack . id . Str ( ) , len ( pack . data ) )
if pack . offset != offset || len ( pack . data ) != length {
// restrict returned record to requested range
return & packCacheRecord {
cache : c ,
master : pack ,
offset : offset ,
data : pack . data [ int ( offset - pack . offset ) : int ( offset - pack . offset ) + length ] ,
} , nil
}
return pack , nil
}
for c . allocatedCapacity + length > c . capacity {
// all cached packs will be needed at some point
// so it does not matter which one to purge
for _ , cached := range c . cachedPacks {
delete ( c . cachedPacks , cached . id )
c . allocatedCapacity -= len ( cached . data )
debug . Log ( "dropped cached pack %s (%d bytes)" , cached . id . Str ( ) , len ( cached . data ) )
break
}
}
pack := & packCacheRecord {
cache : c ,
id : packID ,
offset : offset ,
}
c . reservedPacks [ pack . id ] = pack
c . allocatedCapacity += length
c . reservedCapacity += length
return pack , nil
}
// get returns reader of the specified cached pack. Uses provided load func
// to download pack content if necessary.
// The returned reader is only able to read pack within byte range specified
// by offset and length parameters, attempts to read outside that range will
// result in an error.
// The returned reader must be closed before the same packID can be requested
// from the cache again.
func ( c * packCache ) get ( packID restic . ID , offset int64 , length int , load func ( offset int64 , length int , wr io . WriteSeeker ) error ) ( readerAtCloser , error ) {
pack , err := c . reserve ( packID , offset , length )
if err != nil {
return nil , err
}
if pack . data == nil {
2018-09-13 01:48:58 +00:00
releasePack := func ( ) {
delete ( c . reservedPacks , pack . id )
c . reservedCapacity -= length
c . allocatedCapacity -= length
}
2018-04-08 12:02:30 +00:00
wr := & bytesWriteSeeker { data : make ( [ ] byte , length ) }
err = load ( offset , length , wr )
if err != nil {
2018-09-13 01:48:58 +00:00
releasePack ( )
2018-04-08 12:02:30 +00:00
return nil , err
}
if wr . pos != length {
2018-09-13 01:48:58 +00:00
releasePack ( )
2018-04-08 12:02:30 +00:00
return nil , errors . Errorf ( "invalid read size" )
}
pack . data = wr . data
debug . Log ( "Downloaded and cached pack %s (%d bytes)" , pack . id . Str ( ) , len ( pack . data ) )
}
return pack , nil
}
// releases the pack record back to the cache
func ( c * packCache ) release ( pack * packCacheRecord ) error {
c . lock . Lock ( )
defer c . lock . Unlock ( )
if _ , ok := c . reservedPacks [ pack . id ] ; ! ok {
return errors . Errorf ( "invalid pack release request" )
}
delete ( c . reservedPacks , pack . id )
c . cachedPacks [ pack . id ] = pack
c . reservedCapacity -= len ( pack . data )
return nil
}
// remove removes specified pack from the cache and frees
// corresponding cache space. should be called after the pack
// was fully used up by the restorer.
func ( c * packCache ) remove ( packID restic . ID ) error {
c . lock . Lock ( )
defer c . lock . Unlock ( )
if _ , ok := c . reservedPacks [ packID ] ; ok {
return errors . Errorf ( "invalid pack remove request, pack %s is reserved" , packID . Str ( ) )
}
pack , ok := c . cachedPacks [ packID ]
if ! ok {
return errors . Errorf ( "invalid pack remove request, pack %s is not cached" , packID . Str ( ) )
}
delete ( c . cachedPacks , pack . id )
c . allocatedCapacity -= len ( pack . data )
return nil
}
// ReadAt reads len(b) bytes from the pack starting at byte offset off.
// It returns the number of bytes read and the error, if any.
func ( r * packCacheRecord ) ReadAt ( b [ ] byte , off int64 ) ( n int , err error ) {
if off < r . offset || off + int64 ( len ( b ) ) > r . offset + int64 ( len ( r . data ) ) {
return - 1 , errors . Errorf ( "read outside available range" )
}
return copy ( b , r . data [ off - r . offset : ] ) , nil
}
// Close closes the pack reader and releases corresponding cache record
// to the cache. Once closed, the record can be reused by subsequent
// requests for the same packID or it can be purged from the cache to make
// room for other packs
func ( r * packCacheRecord ) Close ( ) ( err error ) {
if r . master != nil {
return r . cache . release ( r . master )
}
return r . cache . release ( r )
}