2014-01-06 10:11:18 +00:00
package model
2013-12-15 10:43:31 +00:00
import (
2014-01-06 10:11:18 +00:00
"crypto/sha1"
2014-01-06 20:31:36 +00:00
"errors"
2013-12-23 17:12:44 +00:00
"fmt"
2014-01-01 02:22:49 +00:00
"io"
2014-01-06 10:11:18 +00:00
"log"
2014-01-05 22:54:57 +00:00
"net"
2013-12-15 10:43:31 +00:00
"os"
"path"
"sync"
"time"
"github.com/calmh/syncthing/buffers"
"github.com/calmh/syncthing/protocol"
)
type Model struct {
2013-12-30 14:30:29 +00:00
dir string
2014-01-09 12:58:35 +00:00
global map [ string ] File // the latest version of each file as it exists in the cluster
2014-01-18 03:06:44 +00:00
gmut sync . RWMutex // protects global
2014-01-09 12:58:35 +00:00
local map [ string ] File // the files we currently have locally on disk
2014-01-18 03:06:44 +00:00
lmut sync . RWMutex // protects local
2014-01-09 12:58:35 +00:00
remote map [ string ] map [ string ] File
2014-01-18 03:06:44 +00:00
rmut sync . RWMutex // protects remote
2014-01-09 12:58:35 +00:00
protoConn map [ string ] Connection
rawConn map [ string ] io . Closer
2014-01-18 03:06:44 +00:00
pmut sync . RWMutex // protects protoConn and rawConn
2013-12-30 14:30:29 +00:00
2014-01-20 21:22:27 +00:00
// Queue for files to fetch. fq can call back into the model, so we must ensure
// to hold no locks when calling methods on fq.
fq * FileQueue
2014-01-18 03:06:44 +00:00
dq chan File // queue for files to delete
2013-12-24 20:21:03 +00:00
2014-01-18 03:06:44 +00:00
updatedLocal int64 // timestamp of last update to local
updateGlobal int64 // timestamp of last update to remote
2013-12-24 20:21:03 +00:00
lastIdxBcast time . Time
lastIdxBcastRequest time . Time
2014-01-18 03:06:44 +00:00
umut sync . RWMutex // provides updated* and lastIdx*
2014-01-06 10:11:18 +00:00
2014-01-09 15:35:49 +00:00
rwRunning bool
delete bool
2014-01-18 03:06:44 +00:00
initmut sync . Mutex // protects rwRunning and delete
2014-01-06 10:11:18 +00:00
trace map [ string ] bool
2014-01-07 15:10:38 +00:00
2014-01-22 10:38:52 +00:00
sup suppressor
2014-01-12 15:59:35 +00:00
2014-01-26 15:48:20 +00:00
parallelRequests int
limitRequestRate chan struct { }
2014-01-18 03:06:44 +00:00
imut sync . Mutex // protects Index
2013-12-15 10:43:31 +00:00
}
2014-01-09 12:58:35 +00:00
type Connection interface {
ID ( ) string
Index ( [ ] protocol . FileInfo )
2014-01-09 15:35:49 +00:00
Request ( name string , offset int64 , size uint32 , hash [ ] byte ) ( [ ] byte , error )
2014-01-09 12:58:35 +00:00
Statistics ( ) protocol . Statistics
2014-01-23 12:12:45 +00:00
Option ( key string ) string
2014-01-09 12:58:35 +00:00
}
2013-12-15 10:43:31 +00:00
const (
	// Wait at least this long after the last index modification
	idxBcastHoldtime = 15 * time.Second
	// Unless we've already waited this long
	idxBcastMaxDelay = 120 * time.Second

	// Never allow file changes more often than this
	minFileHoldTimeS = 60
	// Always allow file changes at least this often
	maxFileHoldTimeS = 600
)
2014-01-07 21:44:21 +00:00
// ErrNoSuchFile is returned by Request when the requested file is not present
// in both the local and the global index.
var ErrNoSuchFile = errors.New("no such file")

// ErrInvalid is returned by Request when the local file carries the invalid flag.
var ErrInvalid = errors.New("file is invalid")
2014-01-06 20:31:36 +00:00
2014-01-06 10:11:18 +00:00
// NewModel creates and starts a new model. The model starts in read-only mode,
// where it sends index information to connected peers and responds to requests
// for file data without altering the local repository in any way.
2014-01-22 10:38:52 +00:00
func NewModel ( dir string , maxChangeBw int ) * Model {
2013-12-15 10:43:31 +00:00
m := & Model {
2014-01-22 10:38:52 +00:00
dir : dir ,
global : make ( map [ string ] File ) ,
local : make ( map [ string ] File ) ,
remote : make ( map [ string ] map [ string ] File ) ,
protoConn : make ( map [ string ] Connection ) ,
rawConn : make ( map [ string ] io . Closer ) ,
lastIdxBcast : time . Now ( ) ,
trace : make ( map [ string ] bool ) ,
sup : suppressor { threshold : int64 ( maxChangeBw ) } ,
fq : NewFileQueue ( ) ,
dq : make ( chan File ) ,
2013-12-15 10:43:31 +00:00
}
2013-12-24 20:21:03 +00:00
go m . broadcastIndexLoop ( )
2013-12-15 10:43:31 +00:00
return m
}
2014-01-12 15:59:35 +00:00
// LimitRate enables rate limiting of the data served by Request. A token
// channel with capacity kbps is installed and a background goroutine adds
// kbps/10+1 tokens to it every 100 ms; Request consumes one token per started
// 1024 bytes it returns, which caps the outgoing rate at roughly kbps KiB/s.
//
// The refill goroutine runs for the lifetime of the process.
func (m *Model) LimitRate(kbps int) {
	// Keep a local reference to the channel: the goroutine must keep feeding
	// the channel it was started for and must not re-read the struct field,
	// which a later call to LimitRate would overwrite concurrently.
	tokens := make(chan struct{}, kbps)
	m.limitRequestRate = tokens

	n := kbps/10 + 1
	go func() {
		for {
			time.Sleep(100 * time.Millisecond)
			for i := 0; i < n; i++ {
				// Blocks while the bucket is full, so unused tokens never
				// accumulate beyond the channel capacity.
				tokens <- struct{}{}
			}
		}
	}()
}
2014-01-06 10:11:18 +00:00
// Trace turns on trace logging for the facility named t. It exists for
// debugging only; grep for m.trace to find the facilities in use.
func (m *Model) Trace(t string) {
	facilities := m.trace
	facilities[t] = true
}
// StartRW starts read/write processing on the current model. When in
// read/write mode the model will attempt to keep in sync with the cluster by
// pulling needed files from peer nodes.
2014-01-09 15:35:49 +00:00
func ( m * Model ) StartRW ( del bool , threads int ) {
2014-01-18 03:06:44 +00:00
m . initmut . Lock ( )
defer m . initmut . Unlock ( )
2014-01-06 10:11:18 +00:00
if m . rwRunning {
panic ( "starting started model" )
}
m . rwRunning = true
m . delete = del
2014-01-26 15:48:20 +00:00
m . parallelRequests = threads
2014-01-06 10:11:18 +00:00
go m . cleanTempFiles ( )
2014-01-09 15:35:49 +00:00
if del {
2014-01-15 00:47:27 +00:00
go m . deleteLoop ( )
2014-01-09 15:35:49 +00:00
}
2013-12-15 10:43:31 +00:00
}
2014-01-06 10:11:18 +00:00
// Generation returns an opaque integer that is guaranteed to increment on
// every change to the local repository or global model.
2014-01-05 15:16:37 +00:00
func ( m * Model ) Generation ( ) int64 {
2014-01-18 03:06:44 +00:00
m . umut . RLock ( )
defer m . umut . RUnlock ( )
2014-01-05 15:16:37 +00:00
return m . updatedLocal + m . updateGlobal
2013-12-23 17:12:44 +00:00
}
2014-01-20 21:22:27 +00:00
// LocalAge returns the number of seconds elapsed since the local index was
// last updated.
func (m *Model) LocalAge() float64 {
	m.umut.RLock()
	defer m.umut.RUnlock()

	lastUpdate := time.Unix(m.updatedLocal, 0)
	return time.Since(lastUpdate).Seconds()
}
2014-01-05 22:54:57 +00:00
type ConnectionInfo struct {
protocol . Statistics
2014-01-23 12:12:45 +00:00
Address string
ClientID string
ClientVersion string
2014-01-05 22:54:57 +00:00
}
2014-01-06 10:11:18 +00:00
// ConnectionStats returns a map with connection statistics for each connected node.
2014-01-05 22:54:57 +00:00
func ( m * Model ) ConnectionStats ( ) map [ string ] ConnectionInfo {
type remoteAddrer interface {
RemoteAddr ( ) net . Addr
}
2014-01-18 03:06:44 +00:00
m . pmut . RLock ( )
2014-01-05 15:16:37 +00:00
2014-01-05 22:54:57 +00:00
var res = make ( map [ string ] ConnectionInfo )
2014-01-09 12:58:35 +00:00
for node , conn := range m . protoConn {
2014-01-05 22:54:57 +00:00
ci := ConnectionInfo {
2014-01-23 12:12:45 +00:00
Statistics : conn . Statistics ( ) ,
ClientID : conn . Option ( "clientId" ) ,
ClientVersion : conn . Option ( "clientVersion" ) ,
2014-01-05 22:54:57 +00:00
}
if nc , ok := m . rawConn [ node ] . ( remoteAddrer ) ; ok {
ci . Address = nc . RemoteAddr ( ) . String ( )
}
res [ node ] = ci
2013-12-30 14:30:29 +00:00
}
2014-01-18 03:06:44 +00:00
m . pmut . RUnlock ( )
2014-01-05 15:16:37 +00:00
return res
2013-12-30 14:30:29 +00:00
}
2014-01-06 10:11:18 +00:00
// LocalSize returns the number of files, deleted files and total bytes for all
// files in the global model.
2014-01-05 22:54:57 +00:00
func ( m * Model ) GlobalSize ( ) ( files , deleted , bytes int ) {
2014-01-18 03:06:44 +00:00
m . gmut . RLock ( )
2014-01-05 15:16:37 +00:00
2013-12-30 14:30:29 +00:00
for _ , f := range m . global {
2014-01-07 21:44:21 +00:00
if f . Flags & protocol . FlagDeleted == 0 {
2014-01-05 22:54:57 +00:00
files ++
bytes += f . Size ( )
} else {
deleted ++
}
2013-12-30 14:30:29 +00:00
}
2014-01-18 03:06:44 +00:00
m . gmut . RUnlock ( )
2014-01-05 15:16:37 +00:00
return
}
2013-12-30 14:30:29 +00:00
2014-01-06 10:11:18 +00:00
// LocalSize returns the number of files, deleted files and total bytes for all
// files in the local repository.
2014-01-05 22:54:57 +00:00
func ( m * Model ) LocalSize ( ) ( files , deleted , bytes int ) {
2014-01-18 03:06:44 +00:00
m . lmut . RLock ( )
2013-12-30 14:30:29 +00:00
2014-01-05 15:16:37 +00:00
for _ , f := range m . local {
2014-01-07 21:44:21 +00:00
if f . Flags & protocol . FlagDeleted == 0 {
2014-01-05 22:54:57 +00:00
files ++
bytes += f . Size ( )
} else {
deleted ++
}
2013-12-30 14:30:29 +00:00
}
2014-01-18 03:06:44 +00:00
m . lmut . RUnlock ( )
2014-01-05 15:16:37 +00:00
return
2013-12-30 14:30:29 +00:00
}
2014-01-06 10:11:18 +00:00
// InSyncSize returns the number and total byte size of the local files that
// are in sync with the global model.
2014-01-06 05:38:01 +00:00
func ( m * Model ) InSyncSize ( ) ( files , bytes int ) {
2014-01-18 03:06:44 +00:00
m . gmut . RLock ( )
m . lmut . RLock ( )
2014-01-06 05:38:01 +00:00
for n , f := range m . local {
2014-01-07 21:44:21 +00:00
if gf , ok := m . global [ n ] ; ok && f . Equals ( gf ) {
2014-01-06 05:38:01 +00:00
files ++
bytes += f . Size ( )
}
}
2014-01-18 03:06:44 +00:00
m . lmut . RUnlock ( )
m . gmut . RUnlock ( )
2014-01-06 05:38:01 +00:00
return
}
2014-01-06 10:11:18 +00:00
// NeedFiles returns the list of currently needed files and the total size.
func ( m * Model ) NeedFiles ( ) ( files [ ] File , bytes int ) {
2014-01-20 21:22:27 +00:00
qf := m . fq . QueuedFiles ( )
2014-01-18 03:06:44 +00:00
m . gmut . RLock ( )
2014-01-05 15:16:37 +00:00
2014-01-20 21:22:27 +00:00
for _ , n := range qf {
2014-01-05 22:54:57 +00:00
f := m . global [ n ]
2014-01-06 10:11:18 +00:00
files = append ( files , f )
bytes += f . Size ( )
2013-12-23 17:12:44 +00:00
}
2014-01-18 03:06:44 +00:00
m . gmut . RUnlock ( )
2014-01-05 15:16:37 +00:00
return
2013-12-23 17:12:44 +00:00
}
2013-12-30 14:30:29 +00:00
// Index is called when a new node is connected and we receive their full index.
2014-01-06 10:11:18 +00:00
// Implements the protocol.Model interface.
2013-12-15 10:43:31 +00:00
func ( m * Model ) Index ( nodeID string , fs [ ] protocol . FileInfo ) {
2014-01-23 21:20:15 +00:00
var files = make ( [ ] File , len ( fs ) )
for i := range fs {
files [ i ] = fileFromFileInfo ( fs [ i ] )
}
2014-01-18 03:06:44 +00:00
m . imut . Lock ( )
defer m . imut . Unlock ( )
2013-12-15 10:43:31 +00:00
2014-01-06 10:11:18 +00:00
if m . trace [ "net" ] {
2014-02-09 22:13:06 +00:00
log . Printf ( "DEBUG: NET IDX(in): %s: %d files" , nodeID , len ( fs ) )
2013-12-15 10:43:31 +00:00
}
2014-01-09 09:59:09 +00:00
repo := make ( map [ string ] File )
2014-01-23 21:20:15 +00:00
for _ , f := range files {
2014-01-09 09:59:09 +00:00
m . indexUpdate ( repo , f )
2013-12-28 13:10:36 +00:00
}
2014-01-18 03:06:44 +00:00
m . rmut . Lock ( )
2014-01-09 09:59:09 +00:00
m . remote [ nodeID ] = repo
2014-01-18 03:06:44 +00:00
m . rmut . Unlock ( )
2013-12-28 13:10:36 +00:00
m . recomputeGlobal ( )
2014-01-23 21:20:15 +00:00
m . recomputeNeedForFiles ( files )
2013-12-28 13:10:36 +00:00
}
2013-12-30 14:30:29 +00:00
// IndexUpdate is called for incremental updates to connected nodes' indexes.
2014-01-06 10:11:18 +00:00
// Implements the protocol.Model interface.
2013-12-28 13:10:36 +00:00
func ( m * Model ) IndexUpdate ( nodeID string , fs [ ] protocol . FileInfo ) {
2014-01-23 21:20:15 +00:00
var files = make ( [ ] File , len ( fs ) )
for i := range fs {
files [ i ] = fileFromFileInfo ( fs [ i ] )
}
2014-01-18 03:06:44 +00:00
m . imut . Lock ( )
defer m . imut . Unlock ( )
2013-12-28 13:10:36 +00:00
2014-01-06 10:11:18 +00:00
if m . trace [ "net" ] {
2014-02-09 22:13:06 +00:00
log . Printf ( "DEBUG: NET IDXUP(in): %s: %d files" , nodeID , len ( files ) )
2013-12-28 13:10:36 +00:00
}
2014-01-18 03:06:44 +00:00
m . rmut . Lock ( )
2013-12-28 13:10:36 +00:00
repo , ok := m . remote [ nodeID ]
if ! ok {
2014-01-09 09:59:09 +00:00
log . Printf ( "WARNING: Index update from node %s that does not have an index" , nodeID )
2014-01-18 03:06:44 +00:00
m . rmut . Unlock ( )
2013-12-28 13:10:36 +00:00
return
}
2014-01-23 21:20:15 +00:00
for _ , f := range files {
2014-01-09 09:59:09 +00:00
m . indexUpdate ( repo , f )
2013-12-15 10:43:31 +00:00
}
2014-01-18 03:06:44 +00:00
m . rmut . Unlock ( )
2013-12-15 10:43:31 +00:00
m . recomputeGlobal ( )
2014-01-23 21:20:15 +00:00
m . recomputeNeedForFiles ( files )
2013-12-15 10:43:31 +00:00
}
2014-01-23 21:20:15 +00:00
func ( m * Model ) indexUpdate ( repo map [ string ] File , f File ) {
2014-01-09 09:59:09 +00:00
if m . trace [ "idx" ] {
var flagComment string
if f . Flags & protocol . FlagDeleted != 0 {
flagComment = " (deleted)"
}
2014-02-09 22:13:06 +00:00
log . Printf ( "DEBUG: IDX(in): %q m=%d f=%o%s v=%d (%d blocks)" , f . Name , f . Modified , f . Flags , flagComment , f . Version , len ( f . Blocks ) )
2014-01-09 09:59:09 +00:00
}
if extraFlags := f . Flags &^ ( protocol . FlagInvalid | protocol . FlagDeleted | 0xfff ) ; extraFlags != 0 {
log . Printf ( "WARNING: IDX(in): Unknown flags 0x%x in index record %+v" , extraFlags , f )
return
}
2014-01-23 21:20:15 +00:00
repo [ f . Name ] = f
2014-01-09 09:59:09 +00:00
}
2014-01-20 21:22:27 +00:00
// Close removes the peer from the model and closes the underlying connection if possible.
2014-01-06 10:11:18 +00:00
// Implements the protocol.Model interface.
2013-12-31 02:21:57 +00:00
func ( m * Model ) Close ( node string , err error ) {
2014-02-09 22:13:06 +00:00
if m . trace [ "net" ] {
log . Printf ( "DEBUG: NET: %s: %v" , node , err )
}
if err == protocol . ErrClusterHash {
log . Printf ( "WARNING: Connection to %s closed due to mismatched cluster hash. Ensure that the configured cluster members are identical on both nodes." , node )
}
2014-01-20 21:22:27 +00:00
m . fq . RemoveAvailable ( node )
2014-01-18 03:06:44 +00:00
m . pmut . Lock ( )
m . rmut . Lock ( )
2013-12-15 10:43:31 +00:00
2014-01-01 02:22:49 +00:00
conn , ok := m . rawConn [ node ]
2014-01-01 13:09:17 +00:00
if ok {
conn . Close ( )
2013-12-31 02:21:57 +00:00
}
2013-12-15 10:43:31 +00:00
delete ( m . remote , node )
2014-01-09 12:58:35 +00:00
delete ( m . protoConn , node )
2014-01-01 02:22:49 +00:00
delete ( m . rawConn , node )
2013-12-15 10:43:31 +00:00
2014-01-18 03:06:44 +00:00
m . rmut . Unlock ( )
m . pmut . Unlock ( )
2013-12-15 10:43:31 +00:00
m . recomputeGlobal ( )
2014-01-23 21:20:15 +00:00
m . recomputeNeedForGlobal ( )
2013-12-15 10:43:31 +00:00
}
2014-01-06 10:11:18 +00:00
// Request returns the specified data segment by reading it from local disk.
// Implements the protocol.Model interface.
2014-01-09 15:35:49 +00:00
func ( m * Model ) Request ( nodeID , name string , offset int64 , size uint32 , hash [ ] byte ) ( [ ] byte , error ) {
2014-01-06 20:31:36 +00:00
// Verify that the requested file exists in the local and global model.
2014-01-18 03:06:44 +00:00
m . lmut . RLock ( )
2014-01-07 21:44:21 +00:00
lf , localOk := m . local [ name ]
2014-01-18 03:06:44 +00:00
m . lmut . RUnlock ( )
m . gmut . RLock ( )
2014-01-06 20:31:36 +00:00
_ , globalOk := m . global [ name ]
2014-01-18 03:06:44 +00:00
m . gmut . RUnlock ( )
2014-01-06 20:31:36 +00:00
if ! localOk || ! globalOk {
log . Printf ( "SECURITY (nonexistent file) REQ(in): %s: %q o=%d s=%d h=%x" , nodeID , name , offset , size , hash )
return nil , ErrNoSuchFile
}
2014-01-07 21:44:21 +00:00
if lf . Flags & protocol . FlagInvalid != 0 {
return nil , ErrInvalid
}
2014-01-06 20:31:36 +00:00
2014-01-06 10:11:18 +00:00
if m . trace [ "net" ] && nodeID != "<local>" {
2014-02-09 22:13:06 +00:00
log . Printf ( "DEBUG: NET REQ(in): %s: %q o=%d s=%d h=%x" , nodeID , name , offset , size , hash )
2013-12-15 10:43:31 +00:00
}
fn := path . Join ( m . dir , name )
fd , err := os . Open ( fn ) // XXX: Inefficient, should cache fd?
if err != nil {
return nil , err
}
defer fd . Close ( )
buf := buffers . Get ( int ( size ) )
2014-01-09 15:35:49 +00:00
_ , err = fd . ReadAt ( buf , offset )
2013-12-15 10:43:31 +00:00
if err != nil {
return nil , err
}
2014-01-12 15:59:35 +00:00
if m . limitRequestRate != nil {
for s := 0 ; s < len ( buf ) ; s += 1024 {
<- m . limitRequestRate
}
}
2013-12-15 10:43:31 +00:00
return buf , nil
}
2014-01-06 10:11:18 +00:00
// ReplaceLocal replaces the local repository index with the given list of files.
2013-12-15 10:43:31 +00:00
func ( m * Model ) ReplaceLocal ( fs [ ] File ) {
var updated bool
var newLocal = make ( map [ string ] File )
2014-01-18 03:06:44 +00:00
m . lmut . RLock ( )
2013-12-15 10:43:31 +00:00
for _ , f := range fs {
newLocal [ f . Name ] = f
2014-01-07 21:44:21 +00:00
if ef := m . local [ f . Name ] ; ! ef . Equals ( f ) {
2013-12-15 10:43:31 +00:00
updated = true
}
}
2014-01-18 03:06:44 +00:00
m . lmut . RUnlock ( )
2013-12-15 10:43:31 +00:00
if m . markDeletedLocals ( newLocal ) {
updated = true
}
2014-01-18 03:06:44 +00:00
m . lmut . RLock ( )
2013-12-15 10:43:31 +00:00
if len ( newLocal ) != len ( m . local ) {
updated = true
}
2014-01-18 03:06:44 +00:00
m . lmut . RUnlock ( )
2013-12-15 10:43:31 +00:00
if updated {
2014-01-18 03:06:44 +00:00
m . lmut . Lock ( )
2013-12-15 10:43:31 +00:00
m . local = newLocal
2014-01-18 03:06:44 +00:00
m . lmut . Unlock ( )
2013-12-15 10:43:31 +00:00
m . recomputeGlobal ( )
2014-01-23 21:20:15 +00:00
m . recomputeNeedForGlobal ( )
2014-01-18 03:06:44 +00:00
m . umut . Lock ( )
2013-12-30 14:30:29 +00:00
m . updatedLocal = time . Now ( ) . Unix ( )
2013-12-25 01:31:25 +00:00
m . lastIdxBcastRequest = time . Now ( )
2014-01-18 03:06:44 +00:00
m . umut . Unlock ( )
2013-12-15 10:43:31 +00:00
}
}
2014-01-06 10:11:18 +00:00
// SeedLocal replaces the local repository index with the given list of files,
// in protocol data types. Does not track deletes, should only be used to seed
// the local index from a cache file at startup.
func ( m * Model ) SeedLocal ( fs [ ] protocol . FileInfo ) {
2014-01-18 03:06:44 +00:00
m . lmut . Lock ( )
2014-01-06 10:11:18 +00:00
m . local = make ( map [ string ] File )
for _ , f := range fs {
m . local [ f . Name ] = fileFromFileInfo ( f )
}
2014-01-18 03:06:44 +00:00
m . lmut . Unlock ( )
2014-01-06 10:11:18 +00:00
m . recomputeGlobal ( )
2014-01-23 21:20:15 +00:00
m . recomputeNeedForGlobal ( )
2014-01-06 10:11:18 +00:00
}
// ConnectedTo returns true if we are connected to the named node.
func ( m * Model ) ConnectedTo ( nodeID string ) bool {
2014-01-18 03:06:44 +00:00
m . pmut . RLock ( )
2014-01-09 12:58:35 +00:00
_ , ok := m . protoConn [ nodeID ]
2014-01-18 03:06:44 +00:00
m . pmut . RUnlock ( )
2014-01-06 10:11:18 +00:00
return ok
}
// RepoID returns an ID representing the current repository location: the
// hex encoded SHA-1 hash of the repository directory path.
func (m *Model) RepoID() string {
	digest := sha1.Sum([]byte(m.dir))
	return fmt.Sprintf("%x", digest)
}
// AddConnection adds a new peer connection to the model. An initial index will
// be sent to the connected peer, thereafter index updates whenever the local
// repository changes.
2014-01-09 12:58:35 +00:00
func ( m * Model ) AddConnection ( rawConn io . Closer , protoConn Connection ) {
nodeID := protoConn . ID ( )
2014-01-18 03:06:44 +00:00
m . pmut . Lock ( )
2014-01-09 12:58:35 +00:00
m . protoConn [ nodeID ] = protoConn
m . rawConn [ nodeID ] = rawConn
2014-01-18 03:06:44 +00:00
m . pmut . Unlock ( )
2014-01-06 10:11:18 +00:00
go func ( ) {
2014-01-18 03:06:44 +00:00
idx := m . ProtocolIndex ( )
2014-01-09 12:58:35 +00:00
protoConn . Index ( idx )
2014-01-06 10:11:18 +00:00
} ( )
2014-01-09 15:35:49 +00:00
2014-01-20 21:22:27 +00:00
m . initmut . Lock ( )
rw := m . rwRunning
m . initmut . Unlock ( )
if ! rw {
return
}
2014-01-26 15:48:20 +00:00
for i := 0 ; i < m . parallelRequests ; i ++ {
2014-01-20 21:22:27 +00:00
i := i
go func ( ) {
if m . trace [ "pull" ] {
2014-02-09 22:13:06 +00:00
log . Println ( "DEBUG: PULL: Starting" , nodeID , i )
2014-01-20 21:22:27 +00:00
}
for {
m . pmut . RLock ( )
if _ , ok := m . protoConn [ nodeID ] ; ! ok {
if m . trace [ "pull" ] {
2014-02-09 22:13:06 +00:00
log . Println ( "DEBUG: PULL: Exiting" , nodeID , i )
2014-01-09 15:35:49 +00:00
}
2014-01-18 03:06:44 +00:00
m . pmut . RUnlock ( )
2014-01-20 21:22:27 +00:00
return
}
m . pmut . RUnlock ( )
2014-01-09 15:35:49 +00:00
2014-01-20 21:22:27 +00:00
qb , ok := m . fq . Get ( nodeID )
if ok {
if m . trace [ "pull" ] {
2014-02-09 22:13:06 +00:00
log . Println ( "DEBUG: PULL: Request" , nodeID , i , qb . name , qb . block . Offset )
2014-01-09 15:35:49 +00:00
}
2014-01-20 21:22:27 +00:00
data , _ := protoConn . Request ( qb . name , qb . block . Offset , qb . block . Size , qb . block . Hash )
m . fq . Done ( qb . name , qb . block . Offset , data )
} else {
time . Sleep ( 1 * time . Second )
2014-01-09 15:35:49 +00:00
}
2014-01-20 21:22:27 +00:00
}
} ( )
2014-01-09 15:35:49 +00:00
}
2014-01-06 10:11:18 +00:00
}
2014-01-18 03:06:44 +00:00
// ProtocolIndex returns the current local index in protocol data types.
2014-01-06 10:11:18 +00:00
// Must be called with the read lock held.
2014-01-18 03:06:44 +00:00
func ( m * Model ) ProtocolIndex ( ) [ ] protocol . FileInfo {
2014-01-06 10:11:18 +00:00
var index [ ] protocol . FileInfo
2014-01-18 03:06:44 +00:00
m . lmut . RLock ( )
2014-01-06 10:11:18 +00:00
for _ , f := range m . local {
mf := fileInfoFromFile ( f )
if m . trace [ "idx" ] {
var flagComment string
2014-01-07 21:44:21 +00:00
if mf . Flags & protocol . FlagDeleted != 0 {
2014-01-06 10:11:18 +00:00
flagComment = " (deleted)"
}
2014-02-09 22:13:06 +00:00
log . Printf ( "DEBUG: IDX(out): %q m=%d f=%o%s v=%d (%d blocks)" , mf . Name , mf . Modified , mf . Flags , flagComment , mf . Version , len ( mf . Blocks ) )
2014-01-06 10:11:18 +00:00
}
index = append ( index , mf )
}
2014-01-18 03:06:44 +00:00
m . lmut . RUnlock ( )
2014-01-06 10:11:18 +00:00
return index
}
2014-01-09 15:35:49 +00:00
func ( m * Model ) requestGlobal ( nodeID , name string , offset int64 , size uint32 , hash [ ] byte ) ( [ ] byte , error ) {
2014-01-18 03:06:44 +00:00
m . pmut . RLock ( )
2014-01-09 12:58:35 +00:00
nc , ok := m . protoConn [ nodeID ]
2014-01-18 03:06:44 +00:00
m . pmut . RUnlock ( )
2014-01-06 10:11:18 +00:00
if ! ok {
return nil , fmt . Errorf ( "requestGlobal: no such node: %s" , nodeID )
}
if m . trace [ "net" ] {
2014-02-09 22:13:06 +00:00
log . Printf ( "DEBUG: NET REQ(out): %s: %q o=%d s=%d h=%x" , nodeID , name , offset , size , hash )
2014-01-06 10:11:18 +00:00
}
return nc . Request ( name , offset , size , hash )
}
2013-12-24 20:21:03 +00:00
func ( m * Model ) broadcastIndexLoop ( ) {
for {
2014-01-18 03:06:44 +00:00
m . umut . RLock ( )
2013-12-24 20:21:03 +00:00
bcastRequested := m . lastIdxBcastRequest . After ( m . lastIdxBcast )
holdtimeExceeded := time . Since ( m . lastIdxBcastRequest ) > idxBcastHoldtime
2014-01-18 03:06:44 +00:00
m . umut . RUnlock ( )
2013-12-28 13:10:36 +00:00
2013-12-24 20:21:03 +00:00
maxDelayExceeded := time . Since ( m . lastIdxBcast ) > idxBcastMaxDelay
if bcastRequested && ( holdtimeExceeded || maxDelayExceeded ) {
2014-01-18 03:06:44 +00:00
idx := m . ProtocolIndex ( )
2013-12-28 13:10:36 +00:00
var indexWg sync . WaitGroup
2014-01-09 12:58:35 +00:00
indexWg . Add ( len ( m . protoConn ) )
2014-01-18 03:06:44 +00:00
m . umut . Lock ( )
2013-12-28 13:10:36 +00:00
m . lastIdxBcast = time . Now ( )
2014-01-18 03:06:44 +00:00
m . umut . Unlock ( )
m . pmut . RLock ( )
2014-01-09 12:58:35 +00:00
for _ , node := range m . protoConn {
2013-12-24 20:21:03 +00:00
node := node
2014-01-06 10:11:18 +00:00
if m . trace [ "net" ] {
2014-02-09 22:13:06 +00:00
log . Printf ( "DEBUG: NET IDX(out/loop): %s: %d files" , node . ID ( ) , len ( idx ) )
2013-12-24 20:21:03 +00:00
}
2013-12-28 13:10:36 +00:00
go func ( ) {
node . Index ( idx )
indexWg . Done ( )
} ( )
2013-12-24 20:21:03 +00:00
}
2014-01-18 03:06:44 +00:00
m . pmut . RUnlock ( )
2013-12-28 13:10:36 +00:00
indexWg . Wait ( )
2013-12-15 10:43:31 +00:00
}
2013-12-28 13:10:36 +00:00
time . Sleep ( idxBcastHoldtime )
2013-12-15 10:43:31 +00:00
}
}
// markDeletedLocals sets the deleted flag on files that have gone missing locally.
func ( m * Model ) markDeletedLocals ( newLocal map [ string ] File ) bool {
// For every file in the existing local table, check if they are also
// present in the new local table. If they are not, check that we already
// had the newest version available according to the global table and if so
// note the file as having been deleted.
var updated bool
2014-01-18 03:06:44 +00:00
m . gmut . RLock ( )
m . lmut . RLock ( )
2013-12-15 10:43:31 +00:00
for n , f := range m . local {
if _ , ok := newLocal [ n ] ; ! ok {
2014-01-07 21:44:21 +00:00
if gf := m . global [ n ] ; ! gf . NewerThan ( f ) {
if f . Flags & protocol . FlagDeleted == 0 {
f . Flags = protocol . FlagDeleted
f . Version ++
2013-12-15 10:43:31 +00:00
f . Blocks = nil
updated = true
}
newLocal [ n ] = f
}
}
}
2014-01-18 03:06:44 +00:00
m . lmut . RUnlock ( )
m . gmut . RUnlock ( )
return updated
2014-01-09 15:35:49 +00:00
}
2014-01-06 10:11:18 +00:00
func ( m * Model ) updateLocal ( f File ) {
2014-01-18 03:06:44 +00:00
var updated bool
m . lmut . Lock ( )
2014-01-07 21:44:21 +00:00
if ef , ok := m . local [ f . Name ] ; ! ok || ! ef . Equals ( f ) {
2013-12-15 10:43:31 +00:00
m . local [ f . Name ] = f
2014-01-18 03:06:44 +00:00
updated = true
}
m . lmut . Unlock ( )
if updated {
2013-12-15 10:43:31 +00:00
m . recomputeGlobal ( )
2014-01-18 03:06:44 +00:00
// We don't recomputeNeed here for two reasons:
// - a need shouldn't have arisen due to having a newer local file
// - recomputeNeed might call into fq.Add but we might have been called by
// fq which would be a deadlock on fq
m . umut . Lock ( )
2013-12-30 14:30:29 +00:00
m . updatedLocal = time . Now ( ) . Unix ( )
2013-12-25 01:31:25 +00:00
m . lastIdxBcastRequest = time . Now ( )
2014-01-18 03:06:44 +00:00
m . umut . Unlock ( )
2013-12-15 10:43:31 +00:00
}
}
2014-01-23 21:20:15 +00:00
/*
XXX: Not done, needs elegant handling of availability

func (m *Model) recomputeGlobalFor(files []File) bool {
	m.gmut.Lock()
	defer m.gmut.Unlock()

	var updated bool
	for _, f := range files {
		if gf, ok := m.global[f.Name]; !ok || f.NewerThan(gf) {
			m.global[f.Name] = f
			updated = true
			// Fix availability
		}
	}
	return updated
}
*/
2013-12-15 10:43:31 +00:00
func ( m * Model ) recomputeGlobal ( ) {
var newGlobal = make ( map [ string ] File )
2014-01-18 03:06:44 +00:00
m . lmut . RLock ( )
2013-12-15 10:43:31 +00:00
for n , f := range m . local {
newGlobal [ n ] = f
}
2014-01-18 03:06:44 +00:00
m . lmut . RUnlock ( )
2013-12-15 10:43:31 +00:00
2014-01-20 21:22:27 +00:00
var available = make ( map [ string ] [ ] string )
2014-01-18 03:06:44 +00:00
m . rmut . RLock ( )
2014-01-09 12:58:35 +00:00
var highestMod int64
2014-01-13 17:29:23 +00:00
for nodeID , fs := range m . remote {
2014-01-08 13:21:47 +00:00
for n , nf := range fs {
if lf , ok := newGlobal [ n ] ; ! ok || nf . NewerThan ( lf ) {
newGlobal [ n ] = nf
2014-01-20 21:22:27 +00:00
available [ n ] = [ ] string { nodeID }
2014-01-09 12:58:35 +00:00
if nf . Modified > highestMod {
highestMod = nf . Modified
}
2014-01-13 17:29:23 +00:00
} else if lf . Equals ( nf ) {
2014-01-20 21:22:27 +00:00
available [ n ] = append ( available [ n ] , nodeID )
2013-12-15 10:43:31 +00:00
}
}
}
2014-01-18 03:06:44 +00:00
m . rmut . RUnlock ( )
2013-12-15 10:43:31 +00:00
2014-01-20 21:22:27 +00:00
for f , ns := range available {
m . fq . SetAvailable ( f , ns )
}
2013-12-30 14:30:29 +00:00
// Figure out if anything actually changed
2014-01-18 03:06:44 +00:00
m . gmut . RLock ( )
2013-12-30 14:30:29 +00:00
var updated bool
2014-01-09 12:58:35 +00:00
if highestMod > m . updateGlobal || len ( newGlobal ) != len ( m . global ) {
2013-12-30 14:30:29 +00:00
updated = true
} else {
for n , f0 := range newGlobal {
2014-01-07 21:44:21 +00:00
if f1 , ok := m . global [ n ] ; ! ok || ! f0 . Equals ( f1 ) {
2013-12-30 14:30:29 +00:00
updated = true
break
}
}
}
2014-01-18 03:06:44 +00:00
m . gmut . RUnlock ( )
2013-12-30 14:30:29 +00:00
if updated {
2014-01-18 03:06:44 +00:00
m . gmut . Lock ( )
m . umut . Lock ( )
2013-12-30 14:30:29 +00:00
m . global = newGlobal
2014-01-18 03:06:44 +00:00
m . updateGlobal = time . Now ( ) . Unix ( )
m . umut . Unlock ( )
m . gmut . Unlock ( )
2013-12-30 14:30:29 +00:00
}
2013-12-15 10:43:31 +00:00
}
2014-01-23 21:20:15 +00:00
// addOrder is a pending call to the file queue's Add method, collected while
// model locks are held and executed after they have been released.
type addOrder struct {
	n      string       // file name
	remote []Block      // blocks handed to the queue for fetching
	fm     *fileMonitor // monitor passed along with the file
}
2014-01-18 03:06:44 +00:00
2014-01-23 21:20:15 +00:00
func ( m * Model ) recomputeNeedForGlobal ( ) {
2014-01-15 00:47:27 +00:00
var toDelete [ ] File
2014-01-18 03:06:44 +00:00
var toAdd [ ] addOrder
m . gmut . RLock ( )
2014-01-15 00:47:27 +00:00
2014-01-23 21:20:15 +00:00
for _ , gf := range m . global {
toAdd , toDelete = m . recomputeNeedForFile ( gf , toAdd , toDelete )
}
2014-01-18 03:06:44 +00:00
2014-01-23 21:20:15 +00:00
m . gmut . RUnlock ( )
2014-01-09 15:35:49 +00:00
2014-01-23 21:20:15 +00:00
for _ , ao := range toAdd {
m . fq . Add ( ao . n , ao . remote , ao . fm )
}
for _ , gf := range toDelete {
m . dq <- gf
}
}
func ( m * Model ) recomputeNeedForFiles ( files [ ] File ) {
var toDelete [ ] File
var toAdd [ ] addOrder
m . gmut . RLock ( )
for _ , gf := range files {
toAdd , toDelete = m . recomputeNeedForFile ( gf , toAdd , toDelete )
2013-12-15 10:43:31 +00:00
}
2014-01-15 00:47:27 +00:00
2014-01-18 03:06:44 +00:00
m . gmut . RUnlock ( )
2013-12-15 10:43:31 +00:00
2014-01-18 03:06:44 +00:00
for _ , ao := range toAdd {
2014-01-23 15:39:12 +00:00
m . fq . Add ( ao . n , ao . remote , ao . fm )
2014-01-18 03:06:44 +00:00
}
for _ , gf := range toDelete {
m . dq <- gf
}
2014-01-13 17:29:23 +00:00
}
2014-01-23 21:20:15 +00:00
func ( m * Model ) recomputeNeedForFile ( gf File , toAdd [ ] addOrder , toDelete [ ] File ) ( [ ] addOrder , [ ] File ) {
m . lmut . RLock ( )
lf , ok := m . local [ gf . Name ]
m . lmut . RUnlock ( )
if ! ok || gf . NewerThan ( lf ) {
if gf . Flags & protocol . FlagInvalid != 0 {
// Never attempt to sync invalid files
return toAdd , toDelete
}
if gf . Flags & protocol . FlagDeleted != 0 && ! m . delete {
// Don't want to delete files, so forget this need
return toAdd , toDelete
}
if gf . Flags & protocol . FlagDeleted != 0 && ! ok {
// Don't have the file, so don't need to delete it
return toAdd , toDelete
}
if m . trace [ "need" ] {
2014-02-09 22:13:06 +00:00
log . Printf ( "DEBUG: NEED: lf:%v gf:%v" , lf , gf )
2014-01-23 21:20:15 +00:00
}
if gf . Flags & protocol . FlagDeleted != 0 {
toDelete = append ( toDelete , gf )
} else {
local , remote := BlockDiff ( lf . Blocks , gf . Blocks )
fm := fileMonitor {
name : gf . Name ,
path : path . Clean ( path . Join ( m . dir , gf . Name ) ) ,
global : gf ,
model : m ,
localBlocks : local ,
}
toAdd = append ( toAdd , addOrder { gf . Name , remote , & fm } )
}
}
return toAdd , toDelete
}
2014-01-18 03:06:44 +00:00
func ( m * Model ) WhoHas ( name string ) [ ] string {
2013-12-15 10:43:31 +00:00
var remote [ ] string
2014-01-18 03:06:44 +00:00
m . gmut . RLock ( )
m . rmut . RLock ( )
2013-12-15 10:43:31 +00:00
gf := m . global [ name ]
for node , files := range m . remote {
2014-01-07 21:44:21 +00:00
if file , ok := files [ name ] ; ok && file . Equals ( gf ) {
2013-12-15 10:43:31 +00:00
remote = append ( remote , node )
}
}
2014-01-18 03:06:44 +00:00
m . rmut . RUnlock ( )
m . gmut . RUnlock ( )
2013-12-15 10:43:31 +00:00
return remote
}
2014-01-15 00:47:27 +00:00
func ( m * Model ) deleteLoop ( ) {
2014-01-09 15:35:49 +00:00
for file := range m . dq {
if m . trace [ "file" ] {
2014-02-09 22:13:06 +00:00
log . Println ( "DEBUG: FILE: Delete" , file . Name )
2014-01-09 15:35:49 +00:00
}
path := path . Clean ( path . Join ( m . dir , file . Name ) )
err := os . Remove ( path )
if err != nil {
log . Printf ( "WARNING: %s: %v" , file . Name , err )
}
2014-01-18 03:06:44 +00:00
m . updateLocal ( file )
2014-01-09 15:35:49 +00:00
}
}
2013-12-30 01:33:57 +00:00
func fileFromFileInfo ( f protocol . FileInfo ) File {
2014-01-09 12:58:35 +00:00
var blocks = make ( [ ] Block , len ( f . Blocks ) )
2014-01-09 15:35:49 +00:00
var offset int64
2014-01-09 12:58:35 +00:00
for i , b := range f . Blocks {
blocks [ i ] = Block {
2013-12-28 13:10:36 +00:00
Offset : offset ,
2014-01-09 15:35:49 +00:00
Size : b . Size ,
2013-12-28 13:10:36 +00:00
Hash : b . Hash ,
2014-01-09 12:58:35 +00:00
}
2014-01-09 15:35:49 +00:00
offset += int64 ( b . Size )
2013-12-28 13:10:36 +00:00
}
2013-12-30 00:49:40 +00:00
return File {
Name : f . Name ,
Flags : f . Flags ,
2014-01-09 15:35:49 +00:00
Modified : f . Modified ,
2014-01-07 21:44:21 +00:00
Version : f . Version ,
2013-12-30 00:49:40 +00:00
Blocks : blocks ,
}
2013-12-28 13:10:36 +00:00
}
2013-12-30 01:33:57 +00:00
func fileInfoFromFile ( f File ) protocol . FileInfo {
2014-01-09 12:58:35 +00:00
var blocks = make ( [ ] protocol . BlockInfo , len ( f . Blocks ) )
for i , b := range f . Blocks {
blocks [ i ] = protocol . BlockInfo {
2014-01-09 15:35:49 +00:00
Size : b . Size ,
Hash : b . Hash ,
2014-01-09 12:58:35 +00:00
}
2013-12-30 01:33:57 +00:00
}
return protocol . FileInfo {
Name : f . Name ,
Flags : f . Flags ,
2014-01-09 15:35:49 +00:00
Modified : f . Modified ,
2014-01-07 21:44:21 +00:00
Version : f . Version ,
2013-12-30 01:33:57 +00:00
Blocks : blocks ,
}
}