// Copyright (C) 2014 The Syncthing Authors. // // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this file, // You can obtain one at https://mozilla.org/MPL/2.0/. //go:generate -command counterfeiter go run github.com/maxbrunsfeld/counterfeiter/v6 //go:generate counterfeiter -o mocks/model.go --fake-name Model . Model package model import ( "bytes" "context" "encoding/json" "errors" "fmt" "io" "net" "os" "path/filepath" "reflect" "runtime" "strings" stdsync "sync" "sync/atomic" "time" "github.com/thejerf/suture/v4" "github.com/syncthing/syncthing/lib/build" "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/connections" "github.com/syncthing/syncthing/lib/db" "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/fs" "github.com/syncthing/syncthing/lib/ignore" "github.com/syncthing/syncthing/lib/osutil" "github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/rand" "github.com/syncthing/syncthing/lib/scanner" "github.com/syncthing/syncthing/lib/semaphore" "github.com/syncthing/syncthing/lib/stats" "github.com/syncthing/syncthing/lib/svcutil" "github.com/syncthing/syncthing/lib/sync" "github.com/syncthing/syncthing/lib/ur/contract" "github.com/syncthing/syncthing/lib/versioner" ) type service interface { suture.Service BringToFront(string) Override() Revert() DelayScan(d time.Duration) ScheduleScan() SchedulePull() // something relevant changed, we should try a pull Jobs(page, perpage int) ([]string, []string, int) // In progress, Queued, skipped Scan(subs []string) error Errors() []FileError WatchError() error ScheduleForceRescan(path string) GetStatistics() (stats.FolderStatistics, error) getState() (folderState, time.Time, error) } type Availability struct { ID protocol.DeviceID `json:"id"` FromTemporary bool `json:"fromTemporary"` } type Model interface { suture.Service connections.Model ResetFolder(folder string) error DelayScan(folder string, next time.Duration) ScanFolder(folder string) error ScanFolders() map[string]error ScanFolderSubdirs(folder string, subs []string) error State(folder string) (string, time.Time, error) FolderErrors(folder string) ([]FileError, error) WatchError(folder string) error Override(folder string) Revert(folder string) BringToFront(folder, file string) LoadIgnores(folder string) ([]string, []string, error) CurrentIgnores(folder string) ([]string, []string, error) SetIgnores(folder string, content []string) error GetFolderVersions(folder string) (map[string][]versioner.FileVersion, error) RestoreFolderVersions(folder string, versions map[string]time.Time) (map[string]error, error) DBSnapshot(folder string) (*db.Snapshot, error) NeedFolderFiles(folder string, page, perpage int) ([]db.FileInfoTruncated, []db.FileInfoTruncated, []db.FileInfoTruncated, error) RemoteNeedFolderFiles(folder string, device protocol.DeviceID, page, perpage int) ([]db.FileInfoTruncated, error) LocalChangedFolderFiles(folder string, page, perpage int) ([]db.FileInfoTruncated, error) FolderProgressBytesCompleted(folder string) int64 CurrentFolderFile(folder string, file string) (protocol.FileInfo, bool, error) CurrentGlobalFile(folder string, file string) (protocol.FileInfo, bool, error) GetMtimeMapping(folder string, file string) (fs.MtimeMapping, error) Availability(folder string, file protocol.FileInfo, block protocol.BlockInfo) ([]Availability, error) Completion(device protocol.DeviceID, folder string) (FolderCompletion, error) ConnectionStats() map[string]interface{} DeviceStatistics() (map[protocol.DeviceID]stats.DeviceStatistics, error) FolderStatistics() (map[string]stats.FolderStatistics, error) UsageReportingStats(report *contract.Report, version int, preview bool) ConnectedTo(remoteID protocol.DeviceID) bool PendingDevices() (map[protocol.DeviceID]db.ObservedDevice, error) PendingFolders(device protocol.DeviceID) (map[string]db.PendingFolder, error) DismissPendingDevice(device protocol.DeviceID) error DismissPendingFolder(device protocol.DeviceID, folder string) error GlobalDirectoryTree(folder, prefix string, levels int, dirsOnly bool) ([]*TreeEntry, error) RequestGlobal(ctx context.Context, deviceID protocol.DeviceID, folder, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) } type model struct { *suture.Supervisor // constructor parameters cfg config.Wrapper id protocol.DeviceID db *db.Lowlevel protectedFiles []string evLogger events.Logger // constant or concurrency safe fields finder *db.BlockFinder progressEmitter *ProgressEmitter shortID protocol.ShortID // globalRequestLimiter limits the amount of data in concurrent incoming // requests globalRequestLimiter *semaphore.Semaphore // folderIOLimiter limits the number of concurrent I/O heavy operations, // such as scans and pulls. folderIOLimiter *semaphore.Semaphore fatalChan chan error started chan struct{} keyGen *protocol.KeyGenerator promotionTimer *time.Timer // fields protected by mut mut sync.RWMutex folderCfgs map[string]config.FolderConfiguration // folder -> cfg folderFiles map[string]*db.FileSet // folder -> files deviceStatRefs map[protocol.DeviceID]*stats.DeviceStatisticsReference // deviceID -> statsRef folderIgnores map[string]*ignore.Matcher // folder -> matcher object folderRunners *serviceMap[string, service] // folder -> puller or scanner folderRestartMuts syncMutexMap // folder -> restart mutex folderVersioners map[string]versioner.Versioner // folder -> versioner (may be nil) folderEncryptionPasswordTokens map[string][]byte // folder -> encryption token (may be missing, and only for encryption type folders) folderEncryptionFailures map[string]map[protocol.DeviceID]error // folder -> device -> error regarding encryption consistency (may be missing) connections map[string]protocol.Connection // connection ID -> connection deviceConnIDs map[protocol.DeviceID][]string // device -> connection IDs (invariant: if the key exists, the value is len >= 1, with the primary connection at the start of the slice) promotedConnID map[protocol.DeviceID]string // device -> latest promoted connection ID connRequestLimiters map[protocol.DeviceID]*semaphore.Semaphore closed map[string]chan struct{} // connection ID -> closed channel helloMessages map[protocol.DeviceID]protocol.Hello deviceDownloads map[protocol.DeviceID]*deviceDownloadState remoteFolderStates map[protocol.DeviceID]map[string]remoteFolderState // deviceID -> folders indexHandlers *serviceMap[protocol.DeviceID, *indexHandlerRegistry] // for testing only foldersRunning atomic.Int32 } var _ config.Verifier = &model{} type folderFactory func(*model, *db.FileSet, *ignore.Matcher, config.FolderConfiguration, versioner.Versioner, events.Logger, *semaphore.Semaphore) service var folderFactories = make(map[config.FolderType]folderFactory) var ( errDeviceUnknown = errors.New("unknown device") errDevicePaused = errors.New("device is paused") ErrFolderPaused = errors.New("folder is paused") ErrFolderNotRunning = errors.New("folder is not running") ErrFolderMissing = errors.New("no such folder") errNoVersioner = errors.New("folder has no versioner") // errors about why a connection is closed errStopped = errors.New("Syncthing is being stopped") errEncryptionInvConfigLocal = errors.New("can't encrypt outgoing data because local data is encrypted (folder-type receive-encrypted)") errEncryptionInvConfigRemote = errors.New("remote has encrypted data and encrypts that data for us - this is impossible") errEncryptionNotEncryptedLocal = errors.New("remote expects to exchange encrypted data, but is configured for plain data") errEncryptionPlainForReceiveEncrypted = errors.New("remote expects to exchange plain data, but is configured to be encrypted") errEncryptionPlainForRemoteEncrypted = errors.New("remote expects to exchange plain data, but local data is encrypted (folder-type receive-encrypted)") errEncryptionNotEncryptedUntrusted = errors.New("device is untrusted, but configured to receive plain data") errEncryptionPassword = errors.New("different encryption passwords used") errEncryptionTokenRead = errors.New("failed to read encryption token") errEncryptionTokenWrite = errors.New("failed to write encryption token") errMissingRemoteInClusterConfig = errors.New("remote device missing in cluster config") errMissingLocalInClusterConfig = errors.New("local device missing in cluster config") ) // NewModel creates and starts a new model. The model starts in read-only mode, // where it sends index information to connected peers and responds to requests // for file data without altering the local folder in any way. func NewModel(cfg config.Wrapper, id protocol.DeviceID, ldb *db.Lowlevel, protectedFiles []string, evLogger events.Logger, keyGen *protocol.KeyGenerator) Model { spec := svcutil.SpecWithDebugLogger(l) m := &model{ Supervisor: suture.New("model", spec), // constructor parameters cfg: cfg, id: id, db: ldb, protectedFiles: protectedFiles, evLogger: evLogger, // constant or concurrency safe fields finder: db.NewBlockFinder(ldb), progressEmitter: NewProgressEmitter(cfg, evLogger), shortID: id.Short(), globalRequestLimiter: semaphore.New(1024 * cfg.Options().MaxConcurrentIncomingRequestKiB()), folderIOLimiter: semaphore.New(cfg.Options().MaxFolderConcurrency()), fatalChan: make(chan error), started: make(chan struct{}), keyGen: keyGen, promotionTimer: time.NewTimer(0), // fields protected by mut mut: sync.NewRWMutex(), folderCfgs: make(map[string]config.FolderConfiguration), folderFiles: make(map[string]*db.FileSet), deviceStatRefs: make(map[protocol.DeviceID]*stats.DeviceStatisticsReference), folderIgnores: make(map[string]*ignore.Matcher), folderRunners: newServiceMap[string, service](evLogger), folderVersioners: make(map[string]versioner.Versioner), folderEncryptionPasswordTokens: make(map[string][]byte), folderEncryptionFailures: make(map[string]map[protocol.DeviceID]error), connections: make(map[string]protocol.Connection), deviceConnIDs: make(map[protocol.DeviceID][]string), promotedConnID: make(map[protocol.DeviceID]string), connRequestLimiters: make(map[protocol.DeviceID]*semaphore.Semaphore), closed: make(map[string]chan struct{}), helloMessages: make(map[protocol.DeviceID]protocol.Hello), deviceDownloads: make(map[protocol.DeviceID]*deviceDownloadState), remoteFolderStates: make(map[protocol.DeviceID]map[string]remoteFolderState), indexHandlers: newServiceMap[protocol.DeviceID, *indexHandlerRegistry](evLogger), } for devID, cfg := range cfg.Devices() { m.deviceStatRefs[devID] = stats.NewDeviceStatisticsReference(m.db, devID) m.setConnRequestLimitersLocked(cfg) } m.Add(m.folderRunners) m.Add(m.progressEmitter) m.Add(m.indexHandlers) m.Add(svcutil.AsService(m.serve, m.String())) return m } func (m *model) serve(ctx context.Context) error { defer m.closeAllConnectionsAndWait() cfg := m.cfg.Subscribe(m) defer m.cfg.Unsubscribe(m) if err := m.initFolders(cfg); err != nil { close(m.started) return svcutil.AsFatalErr(err, svcutil.ExitError) } close(m.started) for { select { case <-ctx.Done(): l.Debugln(m, "context closed, stopping", ctx.Err()) return ctx.Err() case err := <-m.fatalChan: l.Debugln(m, "fatal error, stopping", err) return svcutil.AsFatalErr(err, svcutil.ExitError) case <-m.promotionTimer.C: l.Debugln("promotion timer fired") m.promoteConnections() } } } func (m *model) initFolders(cfg config.Configuration) error { clusterConfigDevices := make(deviceIDSet, len(cfg.Devices)) for _, folderCfg := range cfg.Folders { if folderCfg.Paused { folderCfg.CreateRoot() continue } err := m.newFolder(folderCfg, cfg.Options.CacheIgnoredFiles) if err != nil { return err } clusterConfigDevices.add(folderCfg.DeviceIDs()) } ignoredDevices := observedDeviceSet(m.cfg.IgnoredDevices()) m.cleanPending(cfg.DeviceMap(), cfg.FolderMap(), ignoredDevices, nil) m.sendClusterConfig(clusterConfigDevices.AsSlice()) return nil } func (m *model) closeAllConnectionsAndWait() { m.mut.RLock() closed := make([]chan struct{}, 0, len(m.connections)) for connID, conn := range m.connections { closed = append(closed, m.closed[connID]) go conn.Close(errStopped) } m.mut.RUnlock() for _, c := range closed { <-c } } func (m *model) fatal(err error) { select { case m.fatalChan <- err: default: } } // Need to hold lock on m.mut when calling this. func (m *model) addAndStartFolderLocked(cfg config.FolderConfiguration, fset *db.FileSet, cacheIgnoredFiles bool) { ignores := ignore.New(cfg.Filesystem(nil), ignore.WithCache(cacheIgnoredFiles)) if cfg.Type != config.FolderTypeReceiveEncrypted { if err := ignores.Load(".stignore"); err != nil && !fs.IsNotExist(err) { l.Warnln("Loading ignores:", err) } } m.addAndStartFolderLockedWithIgnores(cfg, fset, ignores) } // Only needed for testing, use addAndStartFolderLocked instead. func (m *model) addAndStartFolderLockedWithIgnores(cfg config.FolderConfiguration, fset *db.FileSet, ignores *ignore.Matcher) { m.folderCfgs[cfg.ID] = cfg m.folderFiles[cfg.ID] = fset m.folderIgnores[cfg.ID] = ignores _, ok := m.folderRunners.Get(cfg.ID) if ok { l.Warnln("Cannot start already running folder", cfg.Description()) panic("cannot start already running folder") } folderFactory, ok := folderFactories[cfg.Type] if !ok { panic(fmt.Sprintf("unknown folder type 0x%x", cfg.Type)) } folder := cfg.ID // Find any devices for which we hold the index in the db, but the folder // is not shared, and drop it. expected := mapDevices(cfg.DeviceIDs()) for _, available := range fset.ListDevices() { if _, ok := expected[available]; !ok { l.Debugln("dropping", folder, "state for", available) fset.Drop(available) } } v, ok := fset.Sequence(protocol.LocalDeviceID), true indexHasFiles := ok && v > 0 if !indexHasFiles { // It's a blank folder, so this may the first time we're looking at // it. Attempt to create and tag with our marker as appropriate. We // don't really do anything with errors at this point except warn - // if these things don't work, we still want to start the folder and // it'll show up as errored later. if err := cfg.CreateRoot(); err != nil { l.Warnln("Failed to create folder root directory:", err) } else if err = cfg.CreateMarker(); err != nil { l.Warnln("Failed to create folder marker:", err) } } if cfg.Type == config.FolderTypeReceiveEncrypted { if encryptionToken, err := readEncryptionToken(cfg); err == nil { m.folderEncryptionPasswordTokens[folder] = encryptionToken } else if !fs.IsNotExist(err) { l.Warnf("Failed to read encryption token: %v", err) } } // These are our metadata files, and they should always be hidden. ffs := cfg.Filesystem(nil) _ = ffs.Hide(config.DefaultMarkerName) _ = ffs.Hide(versioner.DefaultPath) _ = ffs.Hide(".stignore") var ver versioner.Versioner if cfg.Versioning.Type != "" { var err error ver, err = versioner.New(cfg) if err != nil { panic(fmt.Errorf("creating versioner: %w", err)) } } m.folderVersioners[folder] = ver m.warnAboutOverwritingProtectedFiles(cfg, ignores) p := folderFactory(m, fset, ignores, cfg, ver, m.evLogger, m.folderIOLimiter) m.folderRunners.Add(folder, p) l.Infof("Ready to synchronize %s (%s)", cfg.Description(), cfg.Type) } func (m *model) warnAboutOverwritingProtectedFiles(cfg config.FolderConfiguration, ignores *ignore.Matcher) { if cfg.Type == config.FolderTypeSendOnly { return } // This is a bit of a hack. ffs := cfg.Filesystem(nil) if ffs.Type() != fs.FilesystemTypeBasic { return } folderLocation := ffs.URI() var filesAtRisk []string for _, protectedFilePath := range m.protectedFiles { // check if file is synced in this folder if protectedFilePath != folderLocation && !fs.IsParent(protectedFilePath, folderLocation) { continue } // check if file is ignored relPath, _ := filepath.Rel(folderLocation, protectedFilePath) if ignores.Match(relPath).IsIgnored() { continue } filesAtRisk = append(filesAtRisk, protectedFilePath) } if len(filesAtRisk) > 0 { l.Warnln("Some protected files may be overwritten and cause issues. See https://docs.syncthing.net/users/config.html#syncing-configuration-files for more information. The at risk files are:", strings.Join(filesAtRisk, ", ")) } } func (m *model) removeFolder(cfg config.FolderConfiguration) { m.mut.RLock() wait := m.folderRunners.StopAndWaitChan(cfg.ID, 0) m.mut.RUnlock() <-wait m.mut.Lock() isPathUnique := true for folderID, folderCfg := range m.folderCfgs { if folderID != cfg.ID && folderCfg.Path == cfg.Path { isPathUnique = false break } } if isPathUnique { // Remove (if empty and removable) or move away (if non-empty or // otherwise not removable) Syncthing-specific marker files. if err := cfg.RemoveMarker(); err != nil && !errors.Is(err, os.ErrNotExist) { moved := config.DefaultMarkerName + time.Now().Format(".removed-20060102-150405") fs := cfg.Filesystem(nil) _ = fs.Rename(config.DefaultMarkerName, moved) } } m.cleanupFolderLocked(cfg) m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) error { r.Remove(cfg.ID) return nil }) m.mut.Unlock() // Remove it from the database db.DropFolder(m.db, cfg.ID) } // Need to hold lock on m.mut when calling this. func (m *model) cleanupFolderLocked(cfg config.FolderConfiguration) { // clear up our config maps m.folderRunners.Remove(cfg.ID) delete(m.folderCfgs, cfg.ID) delete(m.folderFiles, cfg.ID) delete(m.folderIgnores, cfg.ID) delete(m.folderVersioners, cfg.ID) delete(m.folderEncryptionPasswordTokens, cfg.ID) delete(m.folderEncryptionFailures, cfg.ID) } func (m *model) restartFolder(from, to config.FolderConfiguration, cacheIgnoredFiles bool) error { if to.ID == "" { panic("bug: cannot restart empty folder ID") } if to.ID != from.ID { l.Warnf("bug: folder restart cannot change ID %q -> %q", from.ID, to.ID) panic("bug: folder restart cannot change ID") } folder := to.ID // This mutex protects the entirety of the restart operation, preventing // there from being more than one folder restart operation in progress // at any given time. The usual locking stuff doesn't cover this, // because those locks are released while we are waiting for the folder // to shut down (and must be so because the folder might need them as // part of its operations before shutting down). restartMut := m.folderRestartMuts.Get(folder) restartMut.Lock() defer restartMut.Unlock() m.mut.RLock() wait := m.folderRunners.StopAndWaitChan(from.ID, 0) m.mut.RUnlock() <-wait m.mut.Lock() defer m.mut.Unlock() // Cache the (maybe) existing fset before it's removed by cleanupFolderLocked fset := m.folderFiles[folder] fsetNil := fset == nil m.cleanupFolderLocked(from) if !to.Paused { if fsetNil { // Create a new fset. Might take a while and we do it under // locking, but it's unsafe to create fset:s concurrently so // that's the price we pay. var err error fset, err = db.NewFileSet(folder, m.db) if err != nil { return fmt.Errorf("restarting %v: %w", to.Description(), err) } } m.addAndStartFolderLocked(to, fset, cacheIgnoredFiles) } runner, _ := m.folderRunners.Get(to.ID) m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) error { r.RegisterFolderState(to, fset, runner) return nil }) var infoMsg string switch { case to.Paused: infoMsg = "Paused" case from.Paused: infoMsg = "Unpaused" default: infoMsg = "Restarted" } l.Infof("%v folder %v (%v)", infoMsg, to.Description(), to.Type) return nil } func (m *model) newFolder(cfg config.FolderConfiguration, cacheIgnoredFiles bool) error { // Creating the fileset can take a long time (metadata calculation) so // we do it outside of the lock. fset, err := db.NewFileSet(cfg.ID, m.db) if err != nil { return fmt.Errorf("adding %v: %w", cfg.Description(), err) } m.mut.Lock() defer m.mut.Unlock() m.addAndStartFolderLocked(cfg, fset, cacheIgnoredFiles) // Cluster configs might be received and processed before reaching this // point, i.e. before the folder is started. If that's the case, start // index senders here. m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) error { runner, _ := m.folderRunners.Get(cfg.ID) r.RegisterFolderState(cfg, fset, runner) return nil }) return nil } func (m *model) UsageReportingStats(report *contract.Report, version int, preview bool) { if version >= 3 { // Block stats blockStatsMut.Lock() for k, v := range blockStats { switch k { case "total": report.BlockStats.Total = v case "renamed": report.BlockStats.Renamed = v case "reused": report.BlockStats.Reused = v case "pulled": report.BlockStats.Pulled = v case "copyOrigin": report.BlockStats.CopyOrigin = v case "copyOriginShifted": report.BlockStats.CopyOriginShifted = v case "copyElsewhere": report.BlockStats.CopyElsewhere = v } // Reset counts, as these are incremental if !preview { blockStats[k] = 0 } } blockStatsMut.Unlock() // Transport stats m.mut.RLock() for _, conn := range m.connections { report.TransportStats[conn.Transport()]++ } m.mut.RUnlock() // Ignore stats var seenPrefix [3]bool for folder := range m.cfg.Folders() { lines, _, err := m.CurrentIgnores(folder) if err != nil { continue } report.IgnoreStats.Lines += len(lines) for _, line := range lines { // Allow prefixes to be specified in any order, but only once. for { if strings.HasPrefix(line, "!") && !seenPrefix[0] { seenPrefix[0] = true line = line[1:] report.IgnoreStats.Inverts++ } else if strings.HasPrefix(line, "(?i)") && !seenPrefix[1] { seenPrefix[1] = true line = line[4:] report.IgnoreStats.Folded++ } else if strings.HasPrefix(line, "(?d)") && !seenPrefix[2] { seenPrefix[2] = true line = line[4:] report.IgnoreStats.Deletable++ } else { seenPrefix[0] = false seenPrefix[1] = false seenPrefix[2] = false break } } // Noops, remove line = strings.TrimSuffix(line, "**") line = strings.TrimPrefix(line, "**/") if strings.HasPrefix(line, "/") { report.IgnoreStats.Rooted++ } else if strings.HasPrefix(line, "#include ") { report.IgnoreStats.Includes++ if strings.Contains(line, "..") { report.IgnoreStats.EscapedIncludes++ } } if strings.Contains(line, "**") { report.IgnoreStats.DoubleStars++ // Remove not to trip up star checks. line = strings.ReplaceAll(line, "**", "") } if strings.Contains(line, "*") { report.IgnoreStats.Stars++ } } } } } type ConnectionStats struct { protocol.Statistics // Total for primary + secondaries Connected bool `json:"connected"` Paused bool `json:"paused"` ClientVersion string `json:"clientVersion"` Address string `json:"address"` // mirror values from Primary, for compatibility with <1.24.0 Type string `json:"type"` // mirror values from Primary, for compatibility with <1.24.0 IsLocal bool `json:"isLocal"` // mirror values from Primary, for compatibility with <1.24.0 Crypto string `json:"crypto"` // mirror values from Primary, for compatibility with <1.24.0 Primary ConnectionInfo `json:"primary,omitempty"` Secondary []ConnectionInfo `json:"secondary,omitempty"` } type ConnectionInfo struct { protocol.Statistics Address string `json:"address"` Type string `json:"type"` IsLocal bool `json:"isLocal"` Crypto string `json:"crypto"` } // ConnectionStats returns a map with connection statistics for each device. func (m *model) ConnectionStats() map[string]interface{} { m.mut.RLock() defer m.mut.RUnlock() res := make(map[string]interface{}) devs := m.cfg.Devices() conns := make(map[string]ConnectionStats, len(devs)) for device, deviceCfg := range devs { if device == m.id { continue } hello := m.helloMessages[device] versionString := hello.ClientVersion if hello.ClientName != "syncthing" { versionString = hello.ClientName + " " + hello.ClientVersion } connIDs, ok := m.deviceConnIDs[device] cs := ConnectionStats{ Connected: ok, Paused: deviceCfg.Paused, ClientVersion: strings.TrimSpace(versionString), } if ok { conn := m.connections[connIDs[0]] cs.Primary.Type = conn.Type() cs.Primary.IsLocal = conn.IsLocal() cs.Primary.Crypto = conn.Crypto() cs.Primary.Statistics = conn.Statistics() cs.Primary.Address = conn.RemoteAddr().String() cs.Type = cs.Primary.Type cs.IsLocal = cs.Primary.IsLocal cs.Crypto = cs.Primary.Crypto cs.Address = cs.Primary.Address cs.Statistics = cs.Primary.Statistics for _, connID := range connIDs[1:] { conn = m.connections[connID] sec := ConnectionInfo{ Statistics: conn.Statistics(), Address: conn.RemoteAddr().String(), Type: conn.Type(), IsLocal: conn.IsLocal(), Crypto: conn.Crypto(), } if sec.At.After(cs.At) { cs.At = sec.At } if sec.StartedAt.Before(cs.StartedAt) { cs.StartedAt = sec.StartedAt } cs.InBytesTotal += sec.InBytesTotal cs.OutBytesTotal += sec.OutBytesTotal cs.Secondary = append(cs.Secondary, sec) } } conns[device.String()] = cs } res["connections"] = conns in, out := protocol.TotalInOut() res["total"] = map[string]interface{}{ "at": time.Now().Truncate(time.Second), "inBytesTotal": in, "outBytesTotal": out, } return res } // DeviceStatistics returns statistics about each device func (m *model) DeviceStatistics() (map[protocol.DeviceID]stats.DeviceStatistics, error) { m.mut.RLock() defer m.mut.RUnlock() res := make(map[protocol.DeviceID]stats.DeviceStatistics, len(m.deviceStatRefs)) for id, sr := range m.deviceStatRefs { stats, err := sr.GetStatistics() if err != nil { return nil, err } if len(m.deviceConnIDs[id]) > 0 { // If a device is currently connected, we can see them right // now. stats.LastSeen = time.Now().Truncate(time.Second) } res[id] = stats } return res, nil } // FolderStatistics returns statistics about each folder func (m *model) FolderStatistics() (map[string]stats.FolderStatistics, error) { res := make(map[string]stats.FolderStatistics) m.mut.RLock() defer m.mut.RUnlock() err := m.folderRunners.Each(func(id string, runner service) error { stats, err := runner.GetStatistics() if err != nil { return err } res[id] = stats return nil }) if err != nil { return nil, err } return res, nil } type FolderCompletion struct { CompletionPct float64 GlobalBytes int64 NeedBytes int64 GlobalItems int NeedItems int NeedDeletes int Sequence int64 RemoteState remoteFolderState } func newFolderCompletion(global, need db.Counts, sequence int64, state remoteFolderState) FolderCompletion { comp := FolderCompletion{ GlobalBytes: global.Bytes, NeedBytes: need.Bytes, GlobalItems: global.Files + global.Directories + global.Symlinks, NeedItems: need.Files + need.Directories + need.Symlinks, NeedDeletes: need.Deleted, Sequence: sequence, RemoteState: state, } comp.setCompletionPct() return comp } func (comp *FolderCompletion) add(other FolderCompletion) { comp.GlobalBytes += other.GlobalBytes comp.NeedBytes += other.NeedBytes comp.GlobalItems += other.GlobalItems comp.NeedItems += other.NeedItems comp.NeedDeletes += other.NeedDeletes comp.setCompletionPct() } func (comp *FolderCompletion) setCompletionPct() { if comp.GlobalBytes == 0 { comp.CompletionPct = 100 } else { needRatio := float64(comp.NeedBytes) / float64(comp.GlobalBytes) comp.CompletionPct = 100 * (1 - needRatio) } // If the completion is 100% but there are deletes we need to handle, // drop it down a notch. Hack for consumers that look only at the // percentage (our own GUI does the same calculation as here on its own // and needs the same fixup). if comp.NeedBytes == 0 && comp.NeedDeletes > 0 { comp.CompletionPct = 95 // chosen by fair dice roll } } // Map returns the members as a map, e.g. used in api to serialize as JSON. func (comp *FolderCompletion) Map() map[string]interface{} { return map[string]interface{}{ "completion": comp.CompletionPct, "globalBytes": comp.GlobalBytes, "needBytes": comp.NeedBytes, "globalItems": comp.GlobalItems, "needItems": comp.NeedItems, "needDeletes": comp.NeedDeletes, "sequence": comp.Sequence, "remoteState": comp.RemoteState, } } // Completion returns the completion status, in percent with some counters, // for the given device and folder. The device can be any known device ID // (including the local device) or explicitly protocol.LocalDeviceID. An // empty folder string means the aggregate of all folders shared with the // given device. func (m *model) Completion(device protocol.DeviceID, folder string) (FolderCompletion, error) { // The user specifically asked for our own device ID. Internally that is // known as protocol.LocalDeviceID so translate. if device == m.id { device = protocol.LocalDeviceID } if folder != "" { // We want completion for a specific folder. return m.folderCompletion(device, folder) } // We want completion for all (shared) folders as an aggregate. var comp FolderCompletion for _, fcfg := range m.cfg.FolderList() { if fcfg.Paused { continue } if device == protocol.LocalDeviceID || fcfg.SharedWith(device) { folderComp, err := m.folderCompletion(device, fcfg.ID) if errors.Is(err, ErrFolderPaused) { continue } else if err != nil { return FolderCompletion{}, err } comp.add(folderComp) } } return comp, nil } func (m *model) folderCompletion(device protocol.DeviceID, folder string) (FolderCompletion, error) { m.mut.RLock() err := m.checkFolderRunningRLocked(folder) rf := m.folderFiles[folder] m.mut.RUnlock() if err != nil { return FolderCompletion{}, err } snap, err := rf.Snapshot() if err != nil { return FolderCompletion{}, err } defer snap.Release() m.mut.RLock() state := m.remoteFolderStates[device][folder] downloaded := m.deviceDownloads[device].BytesDownloaded(folder) m.mut.RUnlock() need := snap.NeedSize(device) need.Bytes -= downloaded // This might might be more than it really is, because some blocks can be of a smaller size. if need.Bytes < 0 { need.Bytes = 0 } comp := newFolderCompletion(snap.GlobalSize(), need, snap.Sequence(device), state) l.Debugf("%v Completion(%s, %q): %v", m, device, folder, comp.Map()) return comp, nil } // DBSnapshot returns a snapshot of the database content relevant to the given folder. func (m *model) DBSnapshot(folder string) (*db.Snapshot, error) { m.mut.RLock() err := m.checkFolderRunningRLocked(folder) rf := m.folderFiles[folder] m.mut.RUnlock() if err != nil { return nil, err } return rf.Snapshot() } func (m *model) FolderProgressBytesCompleted(folder string) int64 { return m.progressEmitter.BytesCompleted(folder) } // NeedFolderFiles returns paginated list of currently needed files in // progress, queued, and to be queued on next puller iteration. func (m *model) NeedFolderFiles(folder string, page, perpage int) ([]db.FileInfoTruncated, []db.FileInfoTruncated, []db.FileInfoTruncated, error) { m.mut.RLock() rf, rfOk := m.folderFiles[folder] runner, runnerOk := m.folderRunners.Get(folder) cfg := m.folderCfgs[folder] m.mut.RUnlock() if !rfOk { return nil, nil, nil, ErrFolderMissing } snap, err := rf.Snapshot() if err != nil { return nil, nil, nil, err } defer snap.Release() var progress, queued, rest []db.FileInfoTruncated var seen map[string]struct{} p := newPager(page, perpage) if runnerOk { progressNames, queuedNames, skipped := runner.Jobs(page, perpage) progress = make([]db.FileInfoTruncated, len(progressNames)) queued = make([]db.FileInfoTruncated, len(queuedNames)) seen = make(map[string]struct{}, len(progressNames)+len(queuedNames)) for i, name := range progressNames { if f, ok := snap.GetGlobalTruncated(name); ok { progress[i] = f seen[name] = struct{}{} } } for i, name := range queuedNames { if f, ok := snap.GetGlobalTruncated(name); ok { queued[i] = f seen[name] = struct{}{} } } p.get -= len(seen) if p.get == 0 { return progress, queued, nil, nil } p.toSkip -= skipped } rest = make([]db.FileInfoTruncated, 0, perpage) snap.WithNeedTruncated(protocol.LocalDeviceID, func(f protocol.FileIntf) bool { if cfg.IgnoreDelete && f.IsDeleted() { return true } if p.skip() { return true } ft := f.(db.FileInfoTruncated) if _, ok := seen[ft.Name]; !ok { rest = append(rest, ft) p.get-- } return p.get > 0 }) return progress, queued, rest, nil } // RemoteNeedFolderFiles returns paginated list of currently needed files for a // remote device to become synced with a folder. func (m *model) RemoteNeedFolderFiles(folder string, device protocol.DeviceID, page, perpage int) ([]db.FileInfoTruncated, error) { m.mut.RLock() rf, ok := m.folderFiles[folder] m.mut.RUnlock() if !ok { return nil, ErrFolderMissing } snap, err := rf.Snapshot() if err != nil { return nil, err } defer snap.Release() files := make([]db.FileInfoTruncated, 0, perpage) p := newPager(page, perpage) snap.WithNeedTruncated(device, func(f protocol.FileIntf) bool { if p.skip() { return true } files = append(files, f.(db.FileInfoTruncated)) return !p.done() }) return files, nil } func (m *model) LocalChangedFolderFiles(folder string, page, perpage int) ([]db.FileInfoTruncated, error) { m.mut.RLock() rf, ok := m.folderFiles[folder] m.mut.RUnlock() if !ok { return nil, ErrFolderMissing } snap, err := rf.Snapshot() if err != nil { return nil, err } defer snap.Release() if snap.ReceiveOnlyChangedSize().TotalItems() == 0 { return nil, nil } p := newPager(page, perpage) files := make([]db.FileInfoTruncated, 0, perpage) snap.WithHaveTruncated(protocol.LocalDeviceID, func(f protocol.FileIntf) bool { if !f.IsReceiveOnlyChanged() { return true } if p.skip() { return true } ft := f.(db.FileInfoTruncated) files = append(files, ft) return !p.done() }) return files, nil } type pager struct { toSkip, get int } func newPager(page, perpage int) *pager { return &pager{ toSkip: (page - 1) * perpage, get: perpage, } } func (p *pager) skip() bool { if p.toSkip == 0 { return false } p.toSkip-- return true } func (p *pager) done() bool { if p.get > 0 { p.get-- } return p.get == 0 } // Index is called when a new device is connected and we receive their full index. // Implements the protocol.Model interface. func (m *model) Index(conn protocol.Connection, idx *protocol.Index) error { return m.handleIndex(conn, idx.Folder, idx.Files, false) } // IndexUpdate is called for incremental updates to connected devices' indexes. // Implements the protocol.Model interface. func (m *model) IndexUpdate(conn protocol.Connection, idxUp *protocol.IndexUpdate) error { return m.handleIndex(conn, idxUp.Folder, idxUp.Files, true) } func (m *model) handleIndex(conn protocol.Connection, folder string, fs []protocol.FileInfo, update bool) error { op := "Index" if update { op += " update" } deviceID := conn.DeviceID() l.Debugf("%v (in): %s / %q: %d files", op, deviceID, folder, len(fs)) if cfg, ok := m.cfg.Folder(folder); !ok || !cfg.SharedWith(deviceID) { l.Warnf("%v for unexpected folder ID %q sent from device %q; ensure that the folder exists and that this device is selected under \"Share With\" in the folder configuration.", op, folder, deviceID) return fmt.Errorf("%s: %w", folder, ErrFolderMissing) } else if cfg.Paused { l.Debugf("%v for paused folder (ID %q) sent from device %q.", op, folder, deviceID) return fmt.Errorf("%s: %w", folder, ErrFolderPaused) } m.mut.RLock() indexHandler, ok := m.getIndexHandlerRLocked(conn) m.mut.RUnlock() if !ok { // This should be impossible, as an index handler is registered when // we send a cluster config, and that is what triggers index // sending. m.evLogger.Log(events.Failure, "index sender does not exist for connection on which indexes were received") l.Debugf("%v for folder (ID %q) sent from device %q: missing index handler", op, folder, deviceID) return fmt.Errorf("%s: %w", folder, ErrFolderNotRunning) } return indexHandler.ReceiveIndex(folder, fs, update, op) } type clusterConfigDeviceInfo struct { local, remote protocol.Device } type ClusterConfigReceivedEventData struct { Device protocol.DeviceID `json:"device"` } func (m *model) ClusterConfig(conn protocol.Connection, cm *protocol.ClusterConfig) error { deviceID := conn.DeviceID() if cm.Secondary { // No handling of secondary connection ClusterConfigs; they merely // indicate the connection is ready to start. l.Debugf("Skipping secondary ClusterConfig from %v at %s", deviceID.Short(), conn) return nil } // Check the peer device's announced folders against our own. Emits events // for folders that we don't expect (unknown or not shared). // Also, collect a list of folders we do share, and if he's interested in // temporary indexes, subscribe the connection. l.Debugf("Handling ClusterConfig from %v at %s", deviceID.Short(), conn) indexHandlerRegistry := m.ensureIndexHandler(conn) deviceCfg, ok := m.cfg.Device(deviceID) if !ok { l.Debugf("Device %s disappeared from config while processing cluster-config", deviceID.Short()) return errDeviceUnknown } // Assemble the device information from the connected device about // themselves and us for all folders. ccDeviceInfos := make(map[string]*clusterConfigDeviceInfo, len(cm.Folders)) for _, folder := range cm.Folders { info := &clusterConfigDeviceInfo{} for _, dev := range folder.Devices { if dev.ID == m.id { info.local = dev } else if dev.ID == deviceID { info.remote = dev } if info.local.ID != protocol.EmptyDeviceID && info.remote.ID != protocol.EmptyDeviceID { break } } if info.remote.ID == protocol.EmptyDeviceID { l.Infof("Device %v sent cluster-config without the device info for the remote on folder %v", deviceID.Short(), folder.Description()) return errMissingRemoteInClusterConfig } if info.local.ID == protocol.EmptyDeviceID { l.Infof("Device %v sent cluster-config without the device info for us locally on folder %v", deviceID.Short(), folder.Description()) return errMissingLocalInClusterConfig } ccDeviceInfos[folder.ID] = info } for _, info := range ccDeviceInfos { if deviceCfg.Introducer && info.local.Introducer { l.Warnf("Remote %v is an introducer to us, and we are to them - only one should be introducer to the other, see https://docs.syncthing.net/users/introducer.html", deviceCfg.Description()) } break } // Needs to happen outside of the mut, as can cause CommitConfiguration if deviceCfg.AutoAcceptFolders { w, _ := m.cfg.Modify(func(cfg *config.Configuration) { changedFcfg := make(map[string]config.FolderConfiguration) haveFcfg := cfg.FolderMap() for _, folder := range cm.Folders { from, ok := haveFcfg[folder.ID] if to, changed := m.handleAutoAccepts(deviceID, folder, ccDeviceInfos[folder.ID], from, ok, cfg.Defaults.Folder); changed { changedFcfg[folder.ID] = to } } if len(changedFcfg) == 0 { return } for i := range cfg.Folders { if fcfg, ok := changedFcfg[cfg.Folders[i].ID]; ok { cfg.Folders[i] = fcfg delete(changedFcfg, cfg.Folders[i].ID) } } for _, fcfg := range changedFcfg { cfg.Folders = append(cfg.Folders, fcfg) } }) // Need to wait for the waiter, as this calls CommitConfiguration, // which sets up the folder and as we return from this call, // ClusterConfig starts poking at m.folderFiles and other things // that might not exist until the config is committed. w.Wait() } tempIndexFolders, states, err := m.ccHandleFolders(cm.Folders, deviceCfg, ccDeviceInfos, indexHandlerRegistry) if err != nil { return err } m.mut.Lock() m.remoteFolderStates[deviceID] = states m.mut.Unlock() m.evLogger.Log(events.ClusterConfigReceived, ClusterConfigReceivedEventData{ Device: deviceID, }) if len(tempIndexFolders) > 0 { var connOK bool var conn protocol.Connection m.mut.RLock() if connIDs, connIDOK := m.deviceConnIDs[deviceID]; connIDOK { conn, connOK = m.connections[connIDs[0]] } m.mut.RUnlock() // In case we've got ClusterConfig, and the connection disappeared // from infront of our nose. if connOK { m.progressEmitter.temporaryIndexSubscribe(conn, tempIndexFolders) } } if deviceCfg.Introducer { m.cfg.Modify(func(cfg *config.Configuration) { folders, devices, foldersDevices, introduced := m.handleIntroductions(deviceCfg, cm, cfg.FolderMap(), cfg.DeviceMap()) folders, devices, deintroduced := m.handleDeintroductions(deviceCfg, foldersDevices, folders, devices) if !introduced && !deintroduced { return } cfg.Folders = make([]config.FolderConfiguration, 0, len(folders)) for _, fcfg := range folders { cfg.Folders = append(cfg.Folders, fcfg) } cfg.Devices = make([]config.DeviceConfiguration, 0, len(devices)) for _, dcfg := range devices { cfg.Devices = append(cfg.Devices, dcfg) } }) } return nil } func (m *model) ensureIndexHandler(conn protocol.Connection) *indexHandlerRegistry { deviceID := conn.DeviceID() connID := conn.ConnectionID() m.mut.Lock() defer m.mut.Unlock() indexHandlerRegistry, ok := m.indexHandlers.Get(deviceID) if ok && indexHandlerRegistry.conn.ConnectionID() == connID { // This is an existing and proper index handler for this connection. return indexHandlerRegistry } if ok { // A handler exists, but it's for another connection than the one we // now got a ClusterConfig on. This should be unusual as it means // the other side has decided to start using a new primary // connection but we haven't seen it close yet. Ideally it will // close shortly by itself... l.Infof("Abandoning old index handler for %s (%s) in favour of %s", deviceID.Short(), indexHandlerRegistry.conn.ConnectionID(), connID) m.indexHandlers.RemoveAndWait(deviceID, 0) } // Create a new index handler for this device. indexHandlerRegistry = newIndexHandlerRegistry(conn, m.deviceDownloads[deviceID], m.evLogger) for id, fcfg := range m.folderCfgs { l.Debugln("Registering folder", id, "for", deviceID.Short()) runner, _ := m.folderRunners.Get(id) indexHandlerRegistry.RegisterFolderState(fcfg, m.folderFiles[id], runner) } m.indexHandlers.Add(deviceID, indexHandlerRegistry) return indexHandlerRegistry } func (m *model) getIndexHandlerRLocked(conn protocol.Connection) (*indexHandlerRegistry, bool) { // Reads from index handlers, which requires the mutex to be read locked deviceID := conn.DeviceID() connID := conn.ConnectionID() indexHandlerRegistry, ok := m.indexHandlers.Get(deviceID) if ok && indexHandlerRegistry.conn.ConnectionID() == connID { // This is an existing and proper index handler for this connection. return indexHandlerRegistry, true } // There is no index handler, or it's not registered for this connection. return nil, false } func (m *model) ccHandleFolders(folders []protocol.Folder, deviceCfg config.DeviceConfiguration, ccDeviceInfos map[string]*clusterConfigDeviceInfo, indexHandlers *indexHandlerRegistry) ([]string, map[string]remoteFolderState, error) { var folderDevice config.FolderDeviceConfiguration tempIndexFolders := make([]string, 0, len(folders)) seenFolders := make(map[string]remoteFolderState, len(folders)) updatedPending := make([]updatedPendingFolder, 0, len(folders)) deviceID := deviceCfg.DeviceID expiredPending, err := m.db.PendingFoldersForDevice(deviceID) if err != nil { l.Infof("Could not get pending folders for cleanup: %v", err) } of := db.ObservedFolder{Time: time.Now().Truncate(time.Second)} for _, folder := range folders { seenFolders[folder.ID] = remoteFolderValid cfg, ok := m.cfg.Folder(folder.ID) if ok { folderDevice, ok = cfg.Device(deviceID) } if !ok { indexHandlers.Remove(folder.ID) if deviceCfg.IgnoredFolder(folder.ID) { l.Infof("Ignoring folder %s from device %s since we are configured to", folder.Description(), deviceID) continue } delete(expiredPending, folder.ID) of.Label = folder.Label of.ReceiveEncrypted = len(ccDeviceInfos[folder.ID].local.EncryptionPasswordToken) > 0 of.RemoteEncrypted = len(ccDeviceInfos[folder.ID].remote.EncryptionPasswordToken) > 0 if err := m.db.AddOrUpdatePendingFolder(folder.ID, of, deviceID); err != nil { l.Warnf("Failed to persist pending folder entry to database: %v", err) } if !folder.Paused { indexHandlers.AddIndexInfo(folder.ID, ccDeviceInfos[folder.ID]) } updatedPending = append(updatedPending, updatedPendingFolder{ FolderID: folder.ID, FolderLabel: folder.Label, DeviceID: deviceID, ReceiveEncrypted: of.ReceiveEncrypted, RemoteEncrypted: of.RemoteEncrypted, }) // DEPRECATED: Only for backwards compatibility, should be removed. m.evLogger.Log(events.FolderRejected, map[string]string{ "folder": folder.ID, "folderLabel": folder.Label, "device": deviceID.String(), }) l.Infof("Unexpected folder %s sent from device %q; ensure that the folder exists and that this device is selected under \"Share With\" in the folder configuration.", folder.Description(), deviceID) continue } if folder.Paused { indexHandlers.Remove(folder.ID) seenFolders[cfg.ID] = remoteFolderPaused continue } if cfg.Paused { indexHandlers.AddIndexInfo(folder.ID, ccDeviceInfos[folder.ID]) continue } if err := m.ccCheckEncryption(cfg, folderDevice, ccDeviceInfos[folder.ID], deviceCfg.Untrusted); err != nil { sameError := false m.mut.Lock() if devs, ok := m.folderEncryptionFailures[folder.ID]; ok { sameError = devs[deviceID] == err } else { m.folderEncryptionFailures[folder.ID] = make(map[protocol.DeviceID]error) } m.folderEncryptionFailures[folder.ID][deviceID] = err m.mut.Unlock() msg := fmt.Sprintf("Failure checking encryption consistency with device %v for folder %v: %v", deviceID, cfg.Description(), err) if sameError { l.Debugln(msg) } else { if rerr, ok := err.(*redactedError); ok { err = rerr.redacted } m.evLogger.Log(events.Failure, err.Error()) l.Warnln(msg) } return tempIndexFolders, seenFolders, err } m.mut.Lock() if devErrs, ok := m.folderEncryptionFailures[folder.ID]; ok { if len(devErrs) == 1 { delete(m.folderEncryptionFailures, folder.ID) } else { delete(m.folderEncryptionFailures[folder.ID], deviceID) } } m.mut.Unlock() // Handle indexes if !folder.DisableTempIndexes { tempIndexFolders = append(tempIndexFolders, folder.ID) } indexHandlers.AddIndexInfo(folder.ID, ccDeviceInfos[folder.ID]) } indexHandlers.RemoveAllExcept(seenFolders) // Explicitly mark folders we offer, but the remote has not accepted for folderID, cfg := range m.cfg.Folders() { if _, seen := seenFolders[folderID]; !seen && cfg.SharedWith(deviceID) { l.Debugf("Remote device %v has not accepted sharing folder %s", deviceID.Short(), cfg.Description()) seenFolders[folderID] = remoteFolderNotSharing } } expiredPendingList := make([]map[string]string, 0, len(expiredPending)) for folder := range expiredPending { if err = m.db.RemovePendingFolderForDevice(folder, deviceID); err != nil { msg := "Failed to remove pending folder-device entry" l.Warnf("%v (%v, %v): %v", msg, folder, deviceID, err) m.evLogger.Log(events.Failure, msg) continue } expiredPendingList = append(expiredPendingList, map[string]string{ "folderID": folder, "deviceID": deviceID.String(), }) } if len(updatedPending) > 0 || len(expiredPendingList) > 0 { m.evLogger.Log(events.PendingFoldersChanged, map[string]interface{}{ "added": updatedPending, "removed": expiredPendingList, }) } return tempIndexFolders, seenFolders, nil } func (m *model) ccCheckEncryption(fcfg config.FolderConfiguration, folderDevice config.FolderDeviceConfiguration, ccDeviceInfos *clusterConfigDeviceInfo, deviceUntrusted bool) error { hasTokenRemote := len(ccDeviceInfos.remote.EncryptionPasswordToken) > 0 hasTokenLocal := len(ccDeviceInfos.local.EncryptionPasswordToken) > 0 isEncryptedRemote := folderDevice.EncryptionPassword != "" isEncryptedLocal := fcfg.Type == config.FolderTypeReceiveEncrypted if !isEncryptedRemote && !isEncryptedLocal && deviceUntrusted { return errEncryptionNotEncryptedUntrusted } if !(hasTokenRemote || hasTokenLocal || isEncryptedRemote || isEncryptedLocal) { // No one cares about encryption here return nil } if isEncryptedRemote && isEncryptedLocal { // Should never happen, but config racyness and be safe. return errEncryptionInvConfigLocal } if hasTokenRemote && hasTokenLocal { return errEncryptionInvConfigRemote } if !(hasTokenRemote || hasTokenLocal) { if isEncryptedRemote { return errEncryptionPlainForRemoteEncrypted } else { return errEncryptionPlainForReceiveEncrypted } } if !(isEncryptedRemote || isEncryptedLocal) { return errEncryptionNotEncryptedLocal } if isEncryptedRemote { passwordToken := protocol.PasswordToken(m.keyGen, fcfg.ID, folderDevice.EncryptionPassword) match := false if hasTokenLocal { match = bytes.Equal(passwordToken, ccDeviceInfos.local.EncryptionPasswordToken) } else { // hasTokenRemote == true match = bytes.Equal(passwordToken, ccDeviceInfos.remote.EncryptionPasswordToken) } if !match { return errEncryptionPassword } return nil } // isEncryptedLocal == true var ccToken []byte if hasTokenLocal { ccToken = ccDeviceInfos.local.EncryptionPasswordToken } else { // hasTokenRemote == true ccToken = ccDeviceInfos.remote.EncryptionPasswordToken } m.mut.RLock() token, ok := m.folderEncryptionPasswordTokens[fcfg.ID] m.mut.RUnlock() if !ok { var err error token, err = readEncryptionToken(fcfg) if err != nil && !fs.IsNotExist(err) { if rerr, ok := redactPathError(err); ok { return rerr } return &redactedError{ error: err, redacted: errEncryptionTokenRead, } } if err == nil { m.mut.Lock() m.folderEncryptionPasswordTokens[fcfg.ID] = token m.mut.Unlock() } else { if err := writeEncryptionToken(ccToken, fcfg); err != nil { if rerr, ok := redactPathError(err); ok { return rerr } else { return &redactedError{ error: err, redacted: errEncryptionTokenWrite, } } } m.mut.Lock() m.folderEncryptionPasswordTokens[fcfg.ID] = ccToken m.mut.Unlock() // We can only announce ourselves once we have the token, // thus we need to resend CCs now that we have it. m.sendClusterConfig(fcfg.DeviceIDs()) return nil } } if !bytes.Equal(token, ccToken) { return errEncryptionPassword } return nil } func (m *model) sendClusterConfig(ids []protocol.DeviceID) { if len(ids) == 0 { return } ccConns := make([]protocol.Connection, 0, len(ids)) m.mut.RLock() for _, id := range ids { if connIDs, ok := m.deviceConnIDs[id]; ok { ccConns = append(ccConns, m.connections[connIDs[0]]) } } m.mut.RUnlock() // Generating cluster-configs acquires the mutex. for _, conn := range ccConns { cm, passwords := m.generateClusterConfig(conn.DeviceID()) conn.SetFolderPasswords(passwords) go conn.ClusterConfig(cm) } } // handleIntroductions handles adding devices/folders that are shared by an introducer device func (m *model) handleIntroductions(introducerCfg config.DeviceConfiguration, cm *protocol.ClusterConfig, folders map[string]config.FolderConfiguration, devices map[protocol.DeviceID]config.DeviceConfiguration) (map[string]config.FolderConfiguration, map[protocol.DeviceID]config.DeviceConfiguration, folderDeviceSet, bool) { changed := false foldersDevices := make(folderDeviceSet) for _, folder := range cm.Folders { // Adds devices which we do not have, but the introducer has // for the folders that we have in common. Also, shares folders // with devices that we have in common, yet are currently not sharing // the folder. fcfg, ok := folders[folder.ID] if !ok { // Don't have this folder, carry on. continue } folderChanged := false for _, device := range folder.Devices { // No need to share with self. if device.ID == m.id { continue } foldersDevices.set(device.ID, folder.ID) if _, ok := devices[device.ID]; !ok { // The device is currently unknown. Add it to the config. devices[device.ID] = m.introduceDevice(device, introducerCfg) } else if fcfg.SharedWith(device.ID) { // We already share the folder with this device, so // nothing to do. continue } if fcfg.Type != config.FolderTypeReceiveEncrypted && device.EncryptionPasswordToken != nil { l.Infof("Cannot share folder %s with %v because the introducer %v encrypts data, which requires a password", folder.Description(), device.ID, introducerCfg.DeviceID) continue } // We don't yet share this folder with this device. Add the device // to sharing list of the folder. l.Infof("Sharing folder %s with %v (vouched for by introducer %v)", folder.Description(), device.ID, introducerCfg.DeviceID) fcfg.Devices = append(fcfg.Devices, config.FolderDeviceConfiguration{ DeviceID: device.ID, IntroducedBy: introducerCfg.DeviceID, }) folderChanged = true } if folderChanged { folders[fcfg.ID] = fcfg changed = true } } return folders, devices, foldersDevices, changed } // handleDeintroductions handles removals of devices/shares that are removed by an introducer device func (*model) handleDeintroductions(introducerCfg config.DeviceConfiguration, foldersDevices folderDeviceSet, folders map[string]config.FolderConfiguration, devices map[protocol.DeviceID]config.DeviceConfiguration) (map[string]config.FolderConfiguration, map[protocol.DeviceID]config.DeviceConfiguration, bool) { if introducerCfg.SkipIntroductionRemovals { return folders, devices, false } changed := false devicesNotIntroduced := make(map[protocol.DeviceID]struct{}) // Check if we should unshare some folders, if the introducer has unshared them. for folderID, folderCfg := range folders { for k := 0; k < len(folderCfg.Devices); k++ { if folderCfg.Devices[k].IntroducedBy != introducerCfg.DeviceID { devicesNotIntroduced[folderCfg.Devices[k].DeviceID] = struct{}{} continue } if !foldersDevices.has(folderCfg.Devices[k].DeviceID, folderCfg.ID) { // We could not find that folder shared on the // introducer with the device that was introduced to us. // We should follow and unshare as well. l.Infof("Unsharing folder %s with %v as introducer %v no longer shares the folder with that device", folderCfg.Description(), folderCfg.Devices[k].DeviceID, folderCfg.Devices[k].IntroducedBy) folderCfg.Devices = append(folderCfg.Devices[:k], folderCfg.Devices[k+1:]...) folders[folderID] = folderCfg k-- changed = true } } } // Check if we should remove some devices, if the introducer no longer // shares any folder with them. Yet do not remove if we share other // folders that haven't been introduced by the introducer. for deviceID, device := range devices { if device.IntroducedBy == introducerCfg.DeviceID { if !foldersDevices.hasDevice(deviceID) { if _, ok := devicesNotIntroduced[deviceID]; !ok { // The introducer no longer shares any folder with the // device, remove the device. l.Infof("Removing device %v as introducer %v no longer shares any folders with that device", deviceID, device.IntroducedBy) changed = true delete(devices, deviceID) continue } l.Infof("Would have removed %v as %v no longer shares any folders, yet there are other folders that are shared with this device that haven't been introduced by this introducer.", deviceID, device.IntroducedBy) } } } return folders, devices, changed } // handleAutoAccepts handles adding and sharing folders for devices that have // AutoAcceptFolders set to true. func (m *model) handleAutoAccepts(deviceID protocol.DeviceID, folder protocol.Folder, ccDeviceInfos *clusterConfigDeviceInfo, cfg config.FolderConfiguration, haveCfg bool, defaultFolderCfg config.FolderConfiguration) (config.FolderConfiguration, bool) { if !haveCfg { defaultPathFs := fs.NewFilesystem(defaultFolderCfg.FilesystemType, defaultFolderCfg.Path) var pathAlternatives []string if alt := fs.SanitizePath(folder.Label); alt != "" { pathAlternatives = append(pathAlternatives, alt) } if alt := fs.SanitizePath(folder.ID); alt != "" { pathAlternatives = append(pathAlternatives, alt) } if len(pathAlternatives) == 0 { l.Infof("Failed to auto-accept folder %s from %s due to lack of path alternatives", folder.Description(), deviceID) return config.FolderConfiguration{}, false } for _, path := range pathAlternatives { // Make sure the folder path doesn't already exist. if _, err := defaultPathFs.Lstat(path); !fs.IsNotExist(err) { continue } // Attempt to create it to make sure it does, now. fullPath := filepath.Join(defaultFolderCfg.Path, path) if err := defaultPathFs.MkdirAll(path, 0o700); err != nil { l.Warnf("Failed to create path for auto-accepted folder %s at path %s: %v", folder.Description(), fullPath, err) continue } fcfg := newFolderConfiguration(m.cfg, folder.ID, folder.Label, defaultFolderCfg.FilesystemType, fullPath) fcfg.Devices = append(fcfg.Devices, config.FolderDeviceConfiguration{ DeviceID: deviceID, }) if len(ccDeviceInfos.remote.EncryptionPasswordToken) > 0 || len(ccDeviceInfos.local.EncryptionPasswordToken) > 0 { fcfg.Type = config.FolderTypeReceiveEncrypted // Override the user-configured defaults, as normally done by the GUI fcfg.FSWatcherEnabled = false if fcfg.RescanIntervalS != 0 { minRescanInterval := 3600 * 24 if fcfg.RescanIntervalS < minRescanInterval { fcfg.RescanIntervalS = minRescanInterval } } fcfg.Versioning.Reset() // Other necessary settings are ensured by FolderConfiguration itself } else { ignores := m.cfg.DefaultIgnores() if err := m.setIgnores(fcfg, ignores.Lines); err != nil { l.Warnf("Failed to apply default ignores to auto-accepted folder %s at path %s: %v", folder.Description(), fcfg.Path, err) } } l.Infof("Auto-accepted %s folder %s at path %s", deviceID, folder.Description(), fcfg.Path) return fcfg, true } l.Infof("Failed to auto-accept folder %s from %s due to path conflict", folder.Description(), deviceID) return config.FolderConfiguration{}, false } else { for _, device := range cfg.DeviceIDs() { if device == deviceID { // Already shared nothing todo. return config.FolderConfiguration{}, false } } if cfg.Type == config.FolderTypeReceiveEncrypted { if len(ccDeviceInfos.remote.EncryptionPasswordToken) == 0 && len(ccDeviceInfos.local.EncryptionPasswordToken) == 0 { l.Infof("Failed to auto-accept device %s on existing folder %s as the remote wants to send us unencrypted data, but the folder type is receive-encrypted", folder.Description(), deviceID) return config.FolderConfiguration{}, false } } else { if len(ccDeviceInfos.remote.EncryptionPasswordToken) > 0 || len(ccDeviceInfos.local.EncryptionPasswordToken) > 0 { l.Infof("Failed to auto-accept device %s on existing folder %s as the remote wants to send us encrypted data, but the folder type is not receive-encrypted", folder.Description(), deviceID) return config.FolderConfiguration{}, false } } cfg.Devices = append(cfg.Devices, config.FolderDeviceConfiguration{ DeviceID: deviceID, }) l.Infof("Shared %s with %s due to auto-accept", folder.ID, deviceID) return cfg, true } } func (m *model) introduceDevice(device protocol.Device, introducerCfg config.DeviceConfiguration) config.DeviceConfiguration { addresses := []string{"dynamic"} for _, addr := range device.Addresses { if addr != "dynamic" { addresses = append(addresses, addr) } } l.Infof("Adding device %v to config (vouched for by introducer %v)", device.ID, introducerCfg.DeviceID) newDeviceCfg := m.cfg.DefaultDevice() newDeviceCfg.DeviceID = device.ID newDeviceCfg.Name = device.Name newDeviceCfg.Compression = introducerCfg.Compression newDeviceCfg.Addresses = addresses newDeviceCfg.CertName = device.CertName newDeviceCfg.IntroducedBy = introducerCfg.DeviceID // The introducers' introducers are also our introducers. if device.Introducer { l.Infof("Device %v is now also an introducer", device.ID) newDeviceCfg.Introducer = true newDeviceCfg.SkipIntroductionRemovals = device.SkipIntroductionRemovals } return newDeviceCfg } // Closed is called when a connection has been closed func (m *model) Closed(conn protocol.Connection, err error) { connID := conn.ConnectionID() deviceID := conn.DeviceID() m.mut.Lock() conn, ok := m.connections[connID] if !ok { m.mut.Unlock() return } closed := m.closed[connID] delete(m.closed, connID) delete(m.connections, connID) removedIsPrimary := m.promotedConnID[deviceID] == connID remainingConns := without(m.deviceConnIDs[deviceID], connID) var wait <-chan error if removedIsPrimary { m.progressEmitter.temporaryIndexUnsubscribe(conn) if idxh, ok := m.indexHandlers.Get(deviceID); ok && idxh.conn.ConnectionID() == connID { wait = m.indexHandlers.RemoveAndWaitChan(deviceID, 0) } m.scheduleConnectionPromotion() } if len(remainingConns) == 0 { // All device connections closed delete(m.deviceConnIDs, deviceID) delete(m.promotedConnID, deviceID) delete(m.connRequestLimiters, deviceID) delete(m.helloMessages, deviceID) delete(m.remoteFolderStates, deviceID) delete(m.deviceDownloads, deviceID) } else { // Some connections remain m.deviceConnIDs[deviceID] = remainingConns } m.mut.Unlock() if wait != nil { <-wait } m.mut.RLock() m.deviceDidCloseRLocked(deviceID, time.Since(conn.EstablishedAt())) m.mut.RUnlock() k := map[bool]string{false: "secondary", true: "primary"}[removedIsPrimary] l.Infof("Lost %s connection to %s at %s: %v (%d remain)", k, deviceID.Short(), conn, err, len(remainingConns)) if len(remainingConns) == 0 { l.Infof("Connection to %s at %s closed: %v", deviceID.Short(), conn, err) m.evLogger.Log(events.DeviceDisconnected, map[string]string{ "id": deviceID.String(), "error": err.Error(), }) } close(closed) } // Implements protocol.RequestResponse type requestResponse struct { data []byte closed chan struct{} once stdsync.Once } func newRequestResponse(size int) *requestResponse { return &requestResponse{ data: protocol.BufferPool.Get(size), closed: make(chan struct{}), } } func (r *requestResponse) Data() []byte { return r.data } func (r *requestResponse) Close() { r.once.Do(func() { protocol.BufferPool.Put(r.data) close(r.closed) }) } func (r *requestResponse) Wait() { <-r.closed } // Request returns the specified data segment by reading it from local disk. // Implements the protocol.Model interface. func (m *model) Request(conn protocol.Connection, req *protocol.Request) (out protocol.RequestResponse, err error) { if req.Size < 0 || req.Offset < 0 { return nil, protocol.ErrInvalid } deviceID := conn.DeviceID() m.mut.RLock() folderCfg, ok := m.folderCfgs[req.Folder] folderIgnores := m.folderIgnores[req.Folder] m.mut.RUnlock() if !ok { // The folder might be already unpaused in the config, but not yet // in the model. l.Debugf("Request from %s for file %s in unstarted folder %q", deviceID.Short(), req.Name, req.Folder) return nil, protocol.ErrGeneric } if !folderCfg.SharedWith(deviceID) { l.Warnf("Request from %s for file %s in unshared folder %q", deviceID.Short(), req.Name, req.Folder) return nil, protocol.ErrGeneric } if folderCfg.Paused { l.Debugf("Request from %s for file %s in paused folder %q", deviceID.Short(), req.Name, req.Folder) return nil, protocol.ErrGeneric } // Make sure the path is valid and in canonical form if name, err := fs.Canonicalize(req.Name); err != nil { l.Debugf("Request from %s in folder %q for invalid filename %s", deviceID.Short(), req.Folder, req.Name) return nil, protocol.ErrGeneric } else { req.Name = name } if deviceID != protocol.LocalDeviceID { l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d t=%v", m, deviceID.Short(), req.Folder, req.Name, req.Offset, req.Size, req.FromTemporary) } if fs.IsInternal(req.Name) { l.Debugf("%v REQ(in) for internal file: %s: %q / %q o=%d s=%d", m, deviceID.Short(), req.Folder, req.Name, req.Offset, req.Size) return nil, protocol.ErrInvalid } if folderIgnores.Match(req.Name).IsIgnored() { l.Debugf("%v REQ(in) for ignored file: %s: %q / %q o=%d s=%d", m, deviceID.Short(), req.Folder, req.Name, req.Offset, req.Size) return nil, protocol.ErrInvalid } // Restrict parallel requests by connection/device m.mut.RLock() limiter := m.connRequestLimiters[deviceID] m.mut.RUnlock() // The requestResponse releases the bytes to the buffer pool and the // limiters when its Close method is called. res := newLimitedRequestResponse(int(req.Size), limiter, m.globalRequestLimiter) defer func() { // Close it ourselves if it isn't returned due to an error if err != nil { res.Close() } }() // Grab the FS after limiting, as it causes I/O and we want to minimize // the race time between the symlink check and the read. folderFs := folderCfg.Filesystem(nil) if err := osutil.TraversesSymlink(folderFs, filepath.Dir(req.Name)); err != nil { l.Debugf("%v REQ(in) traversal check: %s - %s: %q / %q o=%d s=%d", m, err, deviceID.Short(), req.Folder, req.Name, req.Offset, req.Size) return nil, protocol.ErrNoSuchFile } // Only check temp files if the flag is set, and if we are set to advertise // the temp indexes. if req.FromTemporary && !folderCfg.DisableTempIndexes { tempFn := fs.TempName(req.Name) if info, err := folderFs.Lstat(tempFn); err != nil || !info.IsRegular() { // Reject reads for anything that doesn't exist or is something // other than a regular file. l.Debugf("%v REQ(in) failed stating temp file (%v): %s: %q / %q o=%d s=%d", m, err, deviceID.Short(), req.Folder, req.Name, req.Offset, req.Size) return nil, protocol.ErrNoSuchFile } _, err := readOffsetIntoBuf(folderFs, tempFn, req.Offset, res.data) if err == nil && scanner.Validate(res.data, req.Hash, req.WeakHash) { return res, nil } // Fall through to reading from a non-temp file, just in case the temp // file has finished downloading. } if info, err := folderFs.Lstat(req.Name); err != nil || !info.IsRegular() { // Reject reads for anything that doesn't exist or is something // other than a regular file. l.Debugf("%v REQ(in) failed stating file (%v): %s: %q / %q o=%d s=%d", m, err, deviceID.Short(), req.Folder, req.Name, req.Offset, req.Size) return nil, protocol.ErrNoSuchFile } n, err := readOffsetIntoBuf(folderFs, req.Name, req.Offset, res.data) if fs.IsNotExist(err) { l.Debugf("%v REQ(in) file doesn't exist: %s: %q / %q o=%d s=%d", m, deviceID.Short(), req.Folder, req.Name, req.Offset, req.Size) return nil, protocol.ErrNoSuchFile } else if err == io.EOF { // Read beyond end of file. This might indicate a problem, or it // might be a short block that gets padded when read for encrypted // folders. We ignore the error and let the hash validation in the // next step take care of it, by only hashing the part we actually // managed to read. } else if err != nil { l.Debugf("%v REQ(in) failed reading file (%v): %s: %q / %q o=%d s=%d", m, err, deviceID.Short(), req.Folder, req.Name, req.Offset, req.Size) return nil, protocol.ErrGeneric } if folderCfg.Type != config.FolderTypeReceiveEncrypted && len(req.Hash) > 0 && !scanner.Validate(res.data[:n], req.Hash, req.WeakHash) { m.recheckFile(deviceID, req.Folder, req.Name, req.Offset, req.Hash, req.WeakHash) l.Debugf("%v REQ(in) failed validating data: %s: %q / %q o=%d s=%d", m, deviceID.Short(), req.Folder, req.Name, req.Offset, req.Size) return nil, protocol.ErrNoSuchFile } return res, nil } // newLimitedRequestResponse takes size bytes from the limiters in order, // skipping nil limiters, then returns a requestResponse of the given size. // When the requestResponse is closed the limiters are given back the bytes, // in reverse order. func newLimitedRequestResponse(size int, limiters ...*semaphore.Semaphore) *requestResponse { multi := semaphore.MultiSemaphore(limiters) multi.Take(size) res := newRequestResponse(size) go func() { res.Wait() multi.Give(size) }() return res } func (m *model) recheckFile(deviceID protocol.DeviceID, folder, name string, offset int64, hash []byte, weakHash uint32) { cf, ok, err := m.CurrentFolderFile(folder, name) if err != nil { l.Debugf("%v recheckFile: %s: %q / %q: current file error: %v", m, deviceID, folder, name, err) return } if !ok { l.Debugf("%v recheckFile: %s: %q / %q: no current file", m, deviceID, folder, name) return } if cf.IsDeleted() || cf.IsInvalid() || cf.IsSymlink() || cf.IsDirectory() { l.Debugf("%v recheckFile: %s: %q / %q: not a regular file", m, deviceID, folder, name) return } blockIndex := int(offset / int64(cf.BlockSize())) if blockIndex >= len(cf.Blocks) { l.Debugf("%v recheckFile: %s: %q / %q i=%d: block index too far", m, deviceID, folder, name, blockIndex) return } block := cf.Blocks[blockIndex] // Seems to want a different version of the file, whatever. if !bytes.Equal(block.Hash, hash) { l.Debugf("%v recheckFile: %s: %q / %q i=%d: hash mismatch %x != %x", m, deviceID, folder, name, blockIndex, block.Hash, hash) return } if weakHash != 0 && block.WeakHash != weakHash { l.Debugf("%v recheckFile: %s: %q / %q i=%d: weak hash mismatch %v != %v", m, deviceID, folder, name, blockIndex, block.WeakHash, weakHash) return } // The hashes provided part of the request match what we expect to find according // to what we have in the database, yet the content we've read off the filesystem doesn't // Something is fishy, invalidate the file and rescan it. // The file will temporarily become invalid, which is ok as the content is messed up. m.mut.RLock() runner, ok := m.folderRunners.Get(folder) m.mut.RUnlock() if !ok { l.Debugf("%v recheckFile: %s: %q / %q: Folder stopped before rescan could be scheduled", m, deviceID, folder, name) return } runner.ScheduleForceRescan(name) l.Debugf("%v recheckFile: %s: %q / %q", m, deviceID, folder, name) } func (m *model) CurrentFolderFile(folder string, file string) (protocol.FileInfo, bool, error) { m.mut.RLock() fs, ok := m.folderFiles[folder] m.mut.RUnlock() if !ok { return protocol.FileInfo{}, false, ErrFolderMissing } snap, err := fs.Snapshot() if err != nil { return protocol.FileInfo{}, false, err } f, ok := snap.Get(protocol.LocalDeviceID, file) snap.Release() return f, ok, nil } func (m *model) CurrentGlobalFile(folder string, file string) (protocol.FileInfo, bool, error) { m.mut.RLock() ffs, ok := m.folderFiles[folder] m.mut.RUnlock() if !ok { return protocol.FileInfo{}, false, ErrFolderMissing } snap, err := ffs.Snapshot() if err != nil { return protocol.FileInfo{}, false, err } f, ok := snap.GetGlobal(file) snap.Release() return f, ok, nil } func (m *model) GetMtimeMapping(folder string, file string) (fs.MtimeMapping, error) { m.mut.RLock() ffs, ok := m.folderFiles[folder] fcfg := m.folderCfgs[folder] m.mut.RUnlock() if !ok { return fs.MtimeMapping{}, ErrFolderMissing } return fs.GetMtimeMapping(fcfg.Filesystem(ffs), file) } // Connection returns if we are connected to the given device. func (m *model) ConnectedTo(deviceID protocol.DeviceID) bool { m.mut.RLock() _, ok := m.deviceConnIDs[deviceID] m.mut.RUnlock() return ok } // LoadIgnores loads or refreshes the ignore patterns from disk, if the // folder is healthy, and returns the refreshed lines and patterns. func (m *model) LoadIgnores(folder string) ([]string, []string, error) { m.mut.RLock() cfg, cfgOk := m.folderCfgs[folder] ignores, ignoresOk := m.folderIgnores[folder] m.mut.RUnlock() if !cfgOk { cfg, cfgOk = m.cfg.Folder(folder) if !cfgOk { return nil, nil, fmt.Errorf("folder %s does not exist", folder) } } if cfg.Type == config.FolderTypeReceiveEncrypted { return nil, nil, nil } if !ignoresOk { ignores = ignore.New(cfg.Filesystem(nil)) } err := ignores.Load(".stignore") if fs.IsNotExist(err) { // Having no ignores is not an error. return nil, nil, nil } // Return lines and patterns, which may have some meaning even when err // != nil, depending on the specific error. return ignores.Lines(), ignores.Patterns(), err } // CurrentIgnores returns the currently loaded set of ignore patterns, // whichever it may be. No attempt is made to load or refresh ignore // patterns from disk. func (m *model) CurrentIgnores(folder string) ([]string, []string, error) { m.mut.RLock() _, cfgOk := m.folderCfgs[folder] ignores, ignoresOk := m.folderIgnores[folder] m.mut.RUnlock() if !cfgOk { return nil, nil, fmt.Errorf("folder %s does not exist", folder) } if !ignoresOk { // Empty ignore patterns return []string{}, []string{}, nil } return ignores.Lines(), ignores.Patterns(), nil } func (m *model) SetIgnores(folder string, content []string) error { cfg, ok := m.cfg.Folder(folder) if !ok { return fmt.Errorf("folder %s does not exist", cfg.Description()) } return m.setIgnores(cfg, content) } func (m *model) setIgnores(cfg config.FolderConfiguration, content []string) error { err := cfg.CheckPath() if err == config.ErrPathMissing { if err = cfg.CreateRoot(); err != nil { return fmt.Errorf("failed to create folder root: %w", err) } err = cfg.CheckPath() } if err != nil && err != config.ErrMarkerMissing { return err } if err := ignore.WriteIgnores(cfg.Filesystem(nil), ".stignore", content); err != nil { l.Warnln("Saving .stignore:", err) return err } m.mut.RLock() runner, ok := m.folderRunners.Get(cfg.ID) m.mut.RUnlock() if ok { runner.ScheduleScan() } return nil } // OnHello is called when an device connects to us. // This allows us to extract some information from the Hello message // and add it to a list of known devices ahead of any checks. func (m *model) OnHello(remoteID protocol.DeviceID, addr net.Addr, hello protocol.Hello) error { if _, ok := m.cfg.Device(remoteID); !ok { if err := m.db.AddOrUpdatePendingDevice(remoteID, hello.DeviceName, addr.String()); err != nil { l.Warnf("Failed to persist pending device entry to database: %v", err) } m.evLogger.Log(events.PendingDevicesChanged, map[string][]interface{}{ "added": {map[string]string{ "deviceID": remoteID.String(), "name": hello.DeviceName, "address": addr.String(), }}, }) // DEPRECATED: Only for backwards compatibility, should be removed. m.evLogger.Log(events.DeviceRejected, map[string]string{ "name": hello.DeviceName, "device": remoteID.String(), "address": addr.String(), }) return errDeviceUnknown } return nil } // AddConnection adds a new peer connection to the model. An initial index will // be sent to the connected peer, thereafter index updates whenever the local // folder changes. func (m *model) AddConnection(conn protocol.Connection, hello protocol.Hello) { deviceID := conn.DeviceID() deviceCfg, ok := m.cfg.Device(deviceID) if !ok { l.Infoln("Trying to add connection to unknown device") return } connID := conn.ConnectionID() closed := make(chan struct{}) m.mut.Lock() m.connections[connID] = conn m.closed[connID] = closed m.helloMessages[deviceID] = hello m.deviceConnIDs[deviceID] = append(m.deviceConnIDs[deviceID], connID) if m.deviceDownloads[deviceID] == nil { m.deviceDownloads[deviceID] = newDeviceDownloadState() } event := map[string]string{ "id": deviceID.String(), "deviceName": hello.DeviceName, "clientName": hello.ClientName, "clientVersion": hello.ClientVersion, "type": conn.Type(), } addr := conn.RemoteAddr() if addr != nil { event["addr"] = addr.String() } m.evLogger.Log(events.DeviceConnected, event) if len(m.deviceConnIDs[deviceID]) == 1 { l.Infof(`Device %s client is "%s %s" named "%s" at %s`, deviceID.Short(), hello.ClientName, hello.ClientVersion, hello.DeviceName, conn) } else { l.Infof(`Additional connection (+%d) for device %s at %s`, len(m.deviceConnIDs[deviceID])-1, deviceID.Short(), conn) } m.mut.Unlock() if (deviceCfg.Name == "" || m.cfg.Options().OverwriteRemoteDevNames) && hello.DeviceName != "" { m.cfg.Modify(func(cfg *config.Configuration) { for i := range cfg.Devices { if cfg.Devices[i].DeviceID == deviceID { if cfg.Devices[i].Name == "" || cfg.Options.OverwriteRemoteDevNames { cfg.Devices[i].Name = hello.DeviceName } return } } }) } m.deviceWasSeen(deviceID) m.scheduleConnectionPromotion() } func (m *model) scheduleConnectionPromotion() { // Keeps deferring to prevent multiple executions in quick succession, // e.g. if multiple connections to a single device are closed. m.promotionTimer.Reset(time.Second) } // promoteConnections checks for devices that have connections, but where // the primary connection hasn't started index handlers etc. yet, and // promotes the primary connection to be the index handling one. This should // be called after adding new connections, and after closing a primary // device connection. func (m *model) promoteConnections() { m.mut.Lock() defer m.mut.Unlock() for deviceID, connIDs := range m.deviceConnIDs { cm, passwords := m.generateClusterConfigRLocked(deviceID) if m.promotedConnID[deviceID] != connIDs[0] { // The previously promoted connection is not the current // primary; we should promote the primary connection to be the // index handling one. We do this by sending a ClusterConfig on // it, which will cause the other side to start sending us index // messages there. (On our side, we manage index handlers based // on where we get ClusterConfigs from the peer.) conn := m.connections[connIDs[0]] l.Debugf("Promoting connection to %s at %s", deviceID.Short(), conn) if conn.Statistics().StartedAt.IsZero() { conn.SetFolderPasswords(passwords) conn.Start() } conn.ClusterConfig(cm) m.promotedConnID[deviceID] = connIDs[0] } // Make sure any other new connections also get started, and that // they get a secondary-marked ClusterConfig. for _, connID := range connIDs[1:] { conn := m.connections[connID] if conn.Statistics().StartedAt.IsZero() { conn.SetFolderPasswords(passwords) conn.Start() conn.ClusterConfig(protocol.ClusterConfig{Secondary: true}) } } } } func (m *model) DownloadProgress(conn protocol.Connection, p *protocol.DownloadProgress) error { deviceID := conn.DeviceID() m.mut.RLock() cfg, ok := m.folderCfgs[p.Folder] m.mut.RUnlock() if !ok || cfg.DisableTempIndexes || !cfg.SharedWith(deviceID) { return nil } m.mut.RLock() downloads := m.deviceDownloads[deviceID] m.mut.RUnlock() downloads.Update(p.Folder, p.Updates) state := downloads.GetBlockCounts(p.Folder) m.evLogger.Log(events.RemoteDownloadProgress, map[string]interface{}{ "device": deviceID.String(), "folder": p.Folder, "state": state, }) return nil } func (m *model) deviceWasSeen(deviceID protocol.DeviceID) { m.mut.RLock() sr, ok := m.deviceStatRefs[deviceID] m.mut.RUnlock() if ok { _ = sr.WasSeen() } } func (m *model) deviceDidCloseRLocked(deviceID protocol.DeviceID, duration time.Duration) { if sr, ok := m.deviceStatRefs[deviceID]; ok { _ = sr.LastConnectionDuration(duration) _ = sr.WasSeen() } } func (m *model) RequestGlobal(ctx context.Context, deviceID protocol.DeviceID, folder, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) { conn, connOK := m.requestConnectionForDevice(deviceID) if !connOK { return nil, fmt.Errorf("requestGlobal: no connection to device: %s", deviceID.Short()) } l.Debugf("%v REQ(out): %s (%s): %q / %q b=%d o=%d s=%d h=%x wh=%x ft=%t", m, deviceID.Short(), conn, folder, name, blockNo, offset, size, hash, weakHash, fromTemporary) return conn.Request(ctx, folder, name, blockNo, offset, size, hash, weakHash, fromTemporary) } // requestConnectionForDevice returns a connection to the given device, to // be used for sending a request. If there is only one device connection, // this is the one to use. If there are multiple then we avoid the first // ("primary") connection, which is dedicated to index data, and pick a // random one of the others. func (m *model) requestConnectionForDevice(deviceID protocol.DeviceID) (protocol.Connection, bool) { m.mut.RLock() defer m.mut.RUnlock() connIDs, ok := m.deviceConnIDs[deviceID] if !ok { return nil, false } // If there is an entry in deviceConns, it always contains at least one // connection. connID := connIDs[0] if len(connIDs) > 1 { // Pick a random connection of the non-primary ones idx := rand.Intn(len(connIDs)-1) + 1 connID = connIDs[idx] } conn, connOK := m.connections[connID] return conn, connOK } func (m *model) ScanFolders() map[string]error { m.mut.RLock() folders := make([]string, 0, len(m.folderCfgs)) for folder := range m.folderCfgs { folders = append(folders, folder) } m.mut.RUnlock() errors := make(map[string]error, len(m.folderCfgs)) errorsMut := sync.NewMutex() wg := sync.NewWaitGroup() wg.Add(len(folders)) for _, folder := range folders { folder := folder go func() { err := m.ScanFolder(folder) if err != nil { errorsMut.Lock() errors[folder] = err errorsMut.Unlock() } wg.Done() }() } wg.Wait() return errors } func (m *model) ScanFolder(folder string) error { return m.ScanFolderSubdirs(folder, nil) } func (m *model) ScanFolderSubdirs(folder string, subs []string) error { m.mut.RLock() err := m.checkFolderRunningRLocked(folder) runner, _ := m.folderRunners.Get(folder) m.mut.RUnlock() if err != nil { return err } return runner.Scan(subs) } func (m *model) DelayScan(folder string, next time.Duration) { m.mut.RLock() runner, ok := m.folderRunners.Get(folder) m.mut.RUnlock() if !ok { return } runner.DelayScan(next) } // numHashers returns the number of hasher routines to use for a given folder, // taking into account configuration and available CPU cores. func (m *model) numHashers(folder string) int { m.mut.RLock() folderCfg := m.folderCfgs[folder] numFolders := len(m.folderCfgs) m.mut.RUnlock() if folderCfg.Hashers > 0 { // Specific value set in the config, use that. return folderCfg.Hashers } if build.IsWindows || build.IsDarwin || build.IsIOS || build.IsAndroid { // Interactive operating systems; don't load the system too heavily by // default. return 1 } // For other operating systems and architectures, lets try to get some // work done... Divide the available CPU cores among the configured // folders. if perFolder := runtime.GOMAXPROCS(-1) / numFolders; perFolder > 0 { return perFolder } return 1 } // generateClusterConfig returns a ClusterConfigMessage that is correct and the // set of folder passwords for the given peer device func (m *model) generateClusterConfig(device protocol.DeviceID) (protocol.ClusterConfig, map[string]string) { m.mut.RLock() defer m.mut.RUnlock() return m.generateClusterConfigRLocked(device) } func (m *model) generateClusterConfigRLocked(device protocol.DeviceID) (protocol.ClusterConfig, map[string]string) { var message protocol.ClusterConfig folders := m.cfg.FolderList() passwords := make(map[string]string, len(folders)) for _, folderCfg := range folders { if !folderCfg.SharedWith(device) { continue } encryptionToken, hasEncryptionToken := m.folderEncryptionPasswordTokens[folderCfg.ID] if folderCfg.Type == config.FolderTypeReceiveEncrypted && !hasEncryptionToken { // We haven't gotten a token for us yet and without one the other // side can't validate us - pretend we don't have the folder yet. continue } protocolFolder := protocol.Folder{ ID: folderCfg.ID, Label: folderCfg.Label, ReadOnly: folderCfg.Type == config.FolderTypeSendOnly, IgnorePermissions: folderCfg.IgnorePerms, IgnoreDelete: folderCfg.IgnoreDelete, DisableTempIndexes: folderCfg.DisableTempIndexes, } fs := m.folderFiles[folderCfg.ID] // Even if we aren't paused, if we haven't started the folder yet // pretend we are. Otherwise the remote might get confused about // the missing index info (and drop all the info). We will send // another cluster config once the folder is started. protocolFolder.Paused = folderCfg.Paused || fs == nil for _, folderDevice := range folderCfg.Devices { deviceCfg, _ := m.cfg.Device(folderDevice.DeviceID) protocolDevice := protocol.Device{ ID: deviceCfg.DeviceID, Name: deviceCfg.Name, Addresses: deviceCfg.Addresses, Compression: deviceCfg.Compression, CertName: deviceCfg.CertName, Introducer: deviceCfg.Introducer, } if deviceCfg.DeviceID == m.id && hasEncryptionToken { protocolDevice.EncryptionPasswordToken = encryptionToken } else if folderDevice.EncryptionPassword != "" { protocolDevice.EncryptionPasswordToken = protocol.PasswordToken(m.keyGen, folderCfg.ID, folderDevice.EncryptionPassword) if folderDevice.DeviceID == device { passwords[folderCfg.ID] = folderDevice.EncryptionPassword } } if fs != nil { if deviceCfg.DeviceID == m.id { protocolDevice.IndexID = fs.IndexID(protocol.LocalDeviceID) protocolDevice.MaxSequence = fs.Sequence(protocol.LocalDeviceID) } else { protocolDevice.IndexID = fs.IndexID(deviceCfg.DeviceID) protocolDevice.MaxSequence = fs.Sequence(deviceCfg.DeviceID) } } protocolFolder.Devices = append(protocolFolder.Devices, protocolDevice) } message.Folders = append(message.Folders, protocolFolder) } return message, passwords } func (m *model) State(folder string) (string, time.Time, error) { m.mut.RLock() runner, ok := m.folderRunners.Get(folder) m.mut.RUnlock() if !ok { // The returned error should be an actual folder error, so returning // errors.New("does not exist") or similar here would be // inappropriate. return "", time.Time{}, nil } state, changed, err := runner.getState() return state.String(), changed, err } func (m *model) FolderErrors(folder string) ([]FileError, error) { m.mut.RLock() err := m.checkFolderRunningRLocked(folder) runner, _ := m.folderRunners.Get(folder) m.mut.RUnlock() if err != nil { return nil, err } return runner.Errors(), nil } func (m *model) WatchError(folder string) error { m.mut.RLock() err := m.checkFolderRunningRLocked(folder) runner, _ := m.folderRunners.Get(folder) m.mut.RUnlock() if err != nil { return nil // If the folder isn't running, there's no error to report. } return runner.WatchError() } func (m *model) Override(folder string) { // Grab the runner and the file set. m.mut.RLock() runner, ok := m.folderRunners.Get(folder) m.mut.RUnlock() if !ok { return } // Run the override, taking updates as if they came from scanning. runner.Override() } func (m *model) Revert(folder string) { // Grab the runner and the file set. m.mut.RLock() runner, ok := m.folderRunners.Get(folder) m.mut.RUnlock() if !ok { return } // Run the revert, taking updates as if they came from scanning. runner.Revert() } type TreeEntry struct { Name string `json:"name"` ModTime time.Time `json:"modTime"` Size int64 `json:"size"` Type protocol.FileInfoType `json:"type"` Children []*TreeEntry `json:"children,omitempty"` } func findByName(slice []*TreeEntry, name string) *TreeEntry { for _, child := range slice { if child.Name == name { return child } } return nil } func (m *model) GlobalDirectoryTree(folder, prefix string, levels int, dirsOnly bool) ([]*TreeEntry, error) { m.mut.RLock() files, ok := m.folderFiles[folder] m.mut.RUnlock() if !ok { return nil, ErrFolderMissing } root := &TreeEntry{ Children: make([]*TreeEntry, 0), } sep := string(filepath.Separator) prefix = osutil.NativeFilename(prefix) if prefix != "" && !strings.HasSuffix(prefix, sep) { prefix = prefix + sep } snap, err := files.Snapshot() if err != nil { return nil, err } defer snap.Release() snap.WithPrefixedGlobalTruncated(prefix, func(fi protocol.FileIntf) bool { f := fi.(db.FileInfoTruncated) // Don't include the prefix itself. if f.IsInvalid() || f.IsDeleted() || strings.HasPrefix(prefix, f.Name) { return true } f.Name = strings.Replace(f.Name, prefix, "", 1) dir := filepath.Dir(f.Name) base := filepath.Base(f.Name) if levels > -1 && strings.Count(f.Name, sep) > levels { return true } parent := root if dir != "." { for _, path := range strings.Split(dir, sep) { child := findByName(parent.Children, path) if child == nil { err = fmt.Errorf("could not find child '%s' for path '%s' in parent '%s'", path, f.Name, parent.Name) return false } parent = child } } if dirsOnly && !f.IsDirectory() { return true } parent.Children = append(parent.Children, &TreeEntry{ Name: base, Type: f.Type, ModTime: f.ModTime(), Size: f.FileSize(), }) return true }) if err != nil { return nil, err } return root.Children, nil } func (m *model) GetFolderVersions(folder string) (map[string][]versioner.FileVersion, error) { m.mut.RLock() err := m.checkFolderRunningRLocked(folder) ver := m.folderVersioners[folder] m.mut.RUnlock() if err != nil { return nil, err } if ver == nil { return nil, errNoVersioner } return ver.GetVersions() } func (m *model) RestoreFolderVersions(folder string, versions map[string]time.Time) (map[string]error, error) { m.mut.RLock() err := m.checkFolderRunningRLocked(folder) fcfg := m.folderCfgs[folder] ver := m.folderVersioners[folder] m.mut.RUnlock() if err != nil { return nil, err } if ver == nil { return nil, errNoVersioner } restoreErrors := make(map[string]error) for file, version := range versions { if err := ver.Restore(file, version); err != nil { restoreErrors[file] = err } } // Trigger scan if !fcfg.FSWatcherEnabled { go func() { _ = m.ScanFolder(folder) }() } return restoreErrors, nil } func (m *model) Availability(folder string, file protocol.FileInfo, block protocol.BlockInfo) ([]Availability, error) { m.mut.RLock() defer m.mut.RUnlock() fs, ok := m.folderFiles[folder] cfg := m.folderCfgs[folder] if !ok { return nil, ErrFolderMissing } snap, err := fs.Snapshot() if err != nil { return nil, err } defer snap.Release() return m.availabilityInSnapshotRLocked(cfg, snap, file, block), nil } func (m *model) availabilityInSnapshot(cfg config.FolderConfiguration, snap *db.Snapshot, file protocol.FileInfo, block protocol.BlockInfo) []Availability { m.mut.RLock() defer m.mut.RUnlock() return m.availabilityInSnapshotRLocked(cfg, snap, file, block) } func (m *model) availabilityInSnapshotRLocked(cfg config.FolderConfiguration, snap *db.Snapshot, file protocol.FileInfo, block protocol.BlockInfo) []Availability { var availabilities []Availability for _, device := range snap.Availability(file.Name) { if _, ok := m.remoteFolderStates[device]; !ok { continue } if state := m.remoteFolderStates[device][cfg.ID]; state != remoteFolderValid { continue } _, ok := m.deviceConnIDs[device] if ok { availabilities = append(availabilities, Availability{ID: device, FromTemporary: false}) } } for _, device := range cfg.Devices { if m.deviceDownloads[device.DeviceID].Has(cfg.ID, file.Name, file.Version, int(block.Offset/int64(file.BlockSize()))) { availabilities = append(availabilities, Availability{ID: device.DeviceID, FromTemporary: true}) } } return availabilities } // BringToFront bumps the given files priority in the job queue. func (m *model) BringToFront(folder, file string) { m.mut.RLock() runner, ok := m.folderRunners.Get(folder) m.mut.RUnlock() if ok { runner.BringToFront(file) } } func (m *model) ResetFolder(folder string) error { m.mut.RLock() defer m.mut.RUnlock() _, ok := m.folderRunners.Get(folder) if ok { return errors.New("folder must be paused when resetting") } l.Infof("Cleaning metadata for reset folder %q", folder) db.DropFolder(m.db, folder) return nil } func (m *model) String() string { return fmt.Sprintf("model@%p", m) } func (*model) VerifyConfiguration(from, to config.Configuration) error { toFolders := to.FolderMap() for _, from := range from.Folders { to, ok := toFolders[from.ID] if ok && from.Type != to.Type && (from.Type == config.FolderTypeReceiveEncrypted || to.Type == config.FolderTypeReceiveEncrypted) { return errors.New("folder type must not be changed from/to receive-encrypted") } } // Verify that any requested versioning is possible to construct, or we // will panic later when starting the folder. for _, to := range to.Folders { if to.Versioning.Type != "" { if _, err := versioner.New(to); err != nil { return err } } } return nil } func (m *model) CommitConfiguration(from, to config.Configuration) bool { // TODO: This should not use reflect, and should take more care to try to handle stuff without restart. // Delay processing config changes until after the initial setup <-m.started // Go through the folder configs and figure out if we need to restart or not. // Tracks devices affected by any configuration change to resend ClusterConfig. clusterConfigDevices := make(deviceIDSet, len(from.Devices)+len(to.Devices)) closeDevices := make([]protocol.DeviceID, 0, len(to.Devices)) fromFolders := mapFolders(from.Folders) toFolders := mapFolders(to.Folders) for folderID, cfg := range toFolders { if _, ok := fromFolders[folderID]; !ok { // A folder was added. if cfg.Paused { l.Infoln("Paused folder", cfg.Description()) } else { l.Infoln("Adding folder", cfg.Description()) if err := m.newFolder(cfg, to.Options.CacheIgnoredFiles); err != nil { m.fatal(err) return true } } clusterConfigDevices.add(cfg.DeviceIDs()) } } removedFolders := make(map[string]struct{}) for folderID, fromCfg := range fromFolders { toCfg, ok := toFolders[folderID] if !ok { // The folder was removed. m.removeFolder(fromCfg) clusterConfigDevices.add(fromCfg.DeviceIDs()) removedFolders[fromCfg.ID] = struct{}{} continue } if fromCfg.Paused && toCfg.Paused { continue } // This folder exists on both sides. Settings might have changed. // Check if anything differs that requires a restart. if !reflect.DeepEqual(fromCfg.RequiresRestartOnly(), toCfg.RequiresRestartOnly()) || from.Options.CacheIgnoredFiles != to.Options.CacheIgnoredFiles { if err := m.restartFolder(fromCfg, toCfg, to.Options.CacheIgnoredFiles); err != nil { m.fatal(err) return true } clusterConfigDevices.add(fromCfg.DeviceIDs()) if toCfg.Type != config.FolderTypeReceiveEncrypted { clusterConfigDevices.add(toCfg.DeviceIDs()) } else { // If we don't have the encryption token yet, we need to drop // the connection to make the remote re-send the cluster-config // and with it the token. m.mut.RLock() _, ok := m.folderEncryptionPasswordTokens[toCfg.ID] m.mut.RUnlock() if !ok { closeDevices = append(closeDevices, toCfg.DeviceIDs()...) } else { clusterConfigDevices.add(toCfg.DeviceIDs()) } } } // Emit the folder pause/resume event if fromCfg.Paused != toCfg.Paused { eventType := events.FolderResumed if toCfg.Paused { eventType = events.FolderPaused } m.evLogger.Log(eventType, map[string]string{"id": toCfg.ID, "label": toCfg.Label}) } } // Pausing a device, unpausing is handled by the connection service. fromDevices := from.DeviceMap() toDevices := to.DeviceMap() for deviceID, toCfg := range toDevices { fromCfg, ok := fromDevices[deviceID] if !ok { sr := stats.NewDeviceStatisticsReference(m.db, deviceID) m.mut.Lock() m.deviceStatRefs[deviceID] = sr m.mut.Unlock() continue } delete(fromDevices, deviceID) if fromCfg.Paused == toCfg.Paused { continue } if toCfg.Paused { l.Infoln("Pausing", deviceID) closeDevices = append(closeDevices, deviceID) m.evLogger.Log(events.DevicePaused, map[string]string{"device": deviceID.String()}) } else { // Ignored folder was removed, reconnect to retrigger the prompt. if len(fromCfg.IgnoredFolders) > len(toCfg.IgnoredFolders) { closeDevices = append(closeDevices, deviceID) } l.Infoln("Resuming", deviceID) m.evLogger.Log(events.DeviceResumed, map[string]string{"device": deviceID.String()}) } if toCfg.MaxRequestKiB != fromCfg.MaxRequestKiB { m.mut.Lock() m.setConnRequestLimitersLocked(toCfg) m.mut.Unlock() } } // Clean up after removed devices removedDevices := make([]protocol.DeviceID, 0, len(fromDevices)) m.mut.Lock() for deviceID := range fromDevices { delete(m.deviceStatRefs, deviceID) removedDevices = append(removedDevices, deviceID) delete(clusterConfigDevices, deviceID) } m.mut.Unlock() m.mut.RLock() for _, id := range closeDevices { delete(clusterConfigDevices, id) if conns, ok := m.deviceConnIDs[id]; ok { for _, connID := range conns { go m.connections[connID].Close(errDevicePaused) } } } for _, id := range removedDevices { delete(clusterConfigDevices, id) if conns, ok := m.deviceConnIDs[id]; ok { for _, connID := range conns { go m.connections[connID].Close(errDevicePaused) } } } m.mut.RUnlock() // Generating cluster-configs acquires the mutex. m.sendClusterConfig(clusterConfigDevices.AsSlice()) ignoredDevices := observedDeviceSet(to.IgnoredDevices) m.cleanPending(toDevices, toFolders, ignoredDevices, removedFolders) m.globalRequestLimiter.SetCapacity(1024 * to.Options.MaxConcurrentIncomingRequestKiB()) m.folderIOLimiter.SetCapacity(to.Options.MaxFolderConcurrency()) // Some options don't require restart as those components handle it fine // by themselves. Compare the options structs containing only the // attributes that require restart and act apprioriately. if !reflect.DeepEqual(from.Options.RequiresRestartOnly(), to.Options.RequiresRestartOnly()) { l.Debugln(m, "requires restart, options differ") return false } return true } func (m *model) setConnRequestLimitersLocked(cfg config.DeviceConfiguration) { // Touches connRequestLimiters which is protected by the mutex. // 0: default, <0: no limiting switch { case cfg.MaxRequestKiB > 0: m.connRequestLimiters[cfg.DeviceID] = semaphore.New(1024 * cfg.MaxRequestKiB) case cfg.MaxRequestKiB == 0: m.connRequestLimiters[cfg.DeviceID] = semaphore.New(1024 * defaultPullerPendingKiB) } } func (m *model) cleanPending(existingDevices map[protocol.DeviceID]config.DeviceConfiguration, existingFolders map[string]config.FolderConfiguration, ignoredDevices deviceIDSet, removedFolders map[string]struct{}) { var removedPendingFolders []map[string]string pendingFolders, err := m.db.PendingFolders() if err != nil { msg := "Could not iterate through pending folder entries for cleanup" l.Warnf("%v: %v", msg, err) m.evLogger.Log(events.Failure, msg) // Continue with pending devices below, loop is skipped. } for folderID, pf := range pendingFolders { if _, ok := removedFolders[folderID]; ok { // Forget pending folder device associations for recently removed // folders as well, assuming the folder is no longer of interest // at all (but might become pending again). l.Debugf("Discarding pending removed folder %v from all devices", folderID) if err := m.db.RemovePendingFolder(folderID); err != nil { msg := "Failed to remove pending folder entry" l.Warnf("%v (%v): %v", msg, folderID, err) m.evLogger.Log(events.Failure, msg) } else { removedPendingFolders = append(removedPendingFolders, map[string]string{ "folderID": folderID, }) } continue } for deviceID := range pf.OfferedBy { if dev, ok := existingDevices[deviceID]; !ok { l.Debugf("Discarding pending folder %v from unknown device %v", folderID, deviceID) goto removeFolderForDevice } else if dev.IgnoredFolder(folderID) { l.Debugf("Discarding now ignored pending folder %v for device %v", folderID, deviceID) goto removeFolderForDevice } if folderCfg, ok := existingFolders[folderID]; ok { if folderCfg.SharedWith(deviceID) { l.Debugf("Discarding now shared pending folder %v for device %v", folderID, deviceID) goto removeFolderForDevice } } continue removeFolderForDevice: if err := m.db.RemovePendingFolderForDevice(folderID, deviceID); err != nil { msg := "Failed to remove pending folder-device entry" l.Warnf("%v (%v, %v): %v", msg, folderID, deviceID, err) m.evLogger.Log(events.Failure, msg) continue } removedPendingFolders = append(removedPendingFolders, map[string]string{ "folderID": folderID, "deviceID": deviceID.String(), }) } } if len(removedPendingFolders) > 0 { m.evLogger.Log(events.PendingFoldersChanged, map[string]interface{}{ "removed": removedPendingFolders, }) } var removedPendingDevices []map[string]string pendingDevices, err := m.db.PendingDevices() if err != nil { msg := "Could not iterate through pending device entries for cleanup" l.Warnf("%v: %v", msg, err) m.evLogger.Log(events.Failure, msg) return } for deviceID := range pendingDevices { if _, ok := ignoredDevices[deviceID]; ok { l.Debugf("Discarding now ignored pending device %v", deviceID) goto removeDevice } if _, ok := existingDevices[deviceID]; ok { l.Debugf("Discarding now added pending device %v", deviceID) goto removeDevice } continue removeDevice: if err := m.db.RemovePendingDevice(deviceID); err != nil { msg := "Failed to remove pending device entry" l.Warnf("%v: %v", msg, err) m.evLogger.Log(events.Failure, msg) continue } removedPendingDevices = append(removedPendingDevices, map[string]string{ "deviceID": deviceID.String(), }) } if len(removedPendingDevices) > 0 { m.evLogger.Log(events.PendingDevicesChanged, map[string]interface{}{ "removed": removedPendingDevices, }) } } // checkFolderRunningRLocked returns nil if the folder is up and running and a // descriptive error if not. // Need to hold (read) lock on m.mut when calling this. func (m *model) checkFolderRunningRLocked(folder string) error { _, ok := m.folderRunners.Get(folder) if ok { return nil } if cfg, ok := m.cfg.Folder(folder); !ok { return ErrFolderMissing } else if cfg.Paused { return ErrFolderPaused } return ErrFolderNotRunning } // PendingDevices lists unknown devices that tried to connect. func (m *model) PendingDevices() (map[protocol.DeviceID]db.ObservedDevice, error) { return m.db.PendingDevices() } // PendingFolders lists folders that we don't yet share with the offering devices. It // returns the entries grouped by folder and filters for a given device unless the // argument is specified as EmptyDeviceID. func (m *model) PendingFolders(device protocol.DeviceID) (map[string]db.PendingFolder, error) { return m.db.PendingFoldersForDevice(device) } // DismissPendingDevices removes the record of a specific pending device. func (m *model) DismissPendingDevice(device protocol.DeviceID) error { l.Debugf("Discarding pending device %v", device) err := m.db.RemovePendingDevice(device) if err != nil { return err } removedPendingDevices := []map[string]string{ {"deviceID": device.String()}, } m.evLogger.Log(events.PendingDevicesChanged, map[string]interface{}{ "removed": removedPendingDevices, }) return nil } // DismissPendingFolders removes records of pending folders. Either a specific folder / // device combination, or all matching a specific folder ID if the device argument is // specified as EmptyDeviceID. func (m *model) DismissPendingFolder(device protocol.DeviceID, folder string) error { var removedPendingFolders []map[string]string if device == protocol.EmptyDeviceID { l.Debugf("Discarding pending removed folder %s from all devices", folder) err := m.db.RemovePendingFolder(folder) if err != nil { return err } removedPendingFolders = []map[string]string{ {"folderID": folder}, } } else { l.Debugf("Discarding pending folder %s from device %v", folder, device) err := m.db.RemovePendingFolderForDevice(folder, device) if err != nil { return err } removedPendingFolders = []map[string]string{ { "folderID": folder, "deviceID": device.String(), }, } } if len(removedPendingFolders) > 0 { m.evLogger.Log(events.PendingFoldersChanged, map[string]interface{}{ "removed": removedPendingFolders, }) } return nil } // mapFolders returns a map of folder ID to folder configuration for the given // slice of folder configurations. func mapFolders(folders []config.FolderConfiguration) map[string]config.FolderConfiguration { m := make(map[string]config.FolderConfiguration, len(folders)) for _, cfg := range folders { m[cfg.ID] = cfg } return m } // mapDevices returns a map of device ID to nothing for the given slice of // device IDs. func mapDevices(devices []protocol.DeviceID) map[protocol.DeviceID]struct{} { m := make(map[protocol.DeviceID]struct{}, len(devices)) for _, dev := range devices { m[dev] = struct{}{} } return m } func observedDeviceSet(devices []config.ObservedDevice) deviceIDSet { res := make(deviceIDSet, len(devices)) for _, dev := range devices { res[dev.ID] = struct{}{} } return res } func readOffsetIntoBuf(fs fs.Filesystem, file string, offset int64, buf []byte) (int, error) { fd, err := fs.Open(file) if err != nil { l.Debugln("readOffsetIntoBuf.Open", file, err) return 0, err } defer fd.Close() n, err := fd.ReadAt(buf, offset) if err != nil { l.Debugln("readOffsetIntoBuf.ReadAt", file, err) } return n, err } // folderDeviceSet is a set of (folder, deviceID) pairs type folderDeviceSet map[string]map[protocol.DeviceID]struct{} // set adds the (dev, folder) pair to the set func (s folderDeviceSet) set(dev protocol.DeviceID, folder string) { devs, ok := s[folder] if !ok { devs = make(map[protocol.DeviceID]struct{}) s[folder] = devs } devs[dev] = struct{}{} } // has returns true if the (dev, folder) pair is in the set func (s folderDeviceSet) has(dev protocol.DeviceID, folder string) bool { _, ok := s[folder][dev] return ok } // hasDevice returns true if the device is set on any folder func (s folderDeviceSet) hasDevice(dev protocol.DeviceID) bool { for _, devices := range s { if _, ok := devices[dev]; ok { return true } } return false } // syncMutexMap is a type safe wrapper for a sync.Map that holds mutexes type syncMutexMap struct { inner stdsync.Map } func (m *syncMutexMap) Get(key string) sync.Mutex { v, _ := m.inner.LoadOrStore(key, sync.NewMutex()) return v.(sync.Mutex) } type deviceIDSet map[protocol.DeviceID]struct{} func (s deviceIDSet) add(ids []protocol.DeviceID) { for _, id := range ids { if _, ok := s[id]; !ok { s[id] = struct{}{} } } } func (s deviceIDSet) AsSlice() []protocol.DeviceID { ids := make([]protocol.DeviceID, 0, len(s)) for id := range s { ids = append(ids, id) } return ids } func encryptionTokenPath(cfg config.FolderConfiguration) string { return filepath.Join(cfg.MarkerName, config.EncryptionTokenName) } type storedEncryptionToken struct { FolderID string Token []byte } func readEncryptionToken(cfg config.FolderConfiguration) ([]byte, error) { fd, err := cfg.Filesystem(nil).Open(encryptionTokenPath(cfg)) if err != nil { return nil, err } defer fd.Close() var stored storedEncryptionToken if err := json.NewDecoder(fd).Decode(&stored); err != nil { return nil, err } return stored.Token, nil } func writeEncryptionToken(token []byte, cfg config.FolderConfiguration) error { tokenName := encryptionTokenPath(cfg) fd, err := cfg.Filesystem(nil).OpenFile(tokenName, fs.OptReadWrite|fs.OptCreate, 0o666) if err != nil { return err } defer fd.Close() return json.NewEncoder(fd).Encode(storedEncryptionToken{ FolderID: cfg.ID, Token: token, }) } func newFolderConfiguration(w config.Wrapper, id, label string, fsType fs.FilesystemType, path string) config.FolderConfiguration { fcfg := w.DefaultFolder() fcfg.ID = id fcfg.Label = label fcfg.FilesystemType = fsType fcfg.Path = path return fcfg } type updatedPendingFolder struct { FolderID string `json:"folderID"` FolderLabel string `json:"folderLabel"` DeviceID protocol.DeviceID `json:"deviceID"` ReceiveEncrypted bool `json:"receiveEncrypted"` RemoteEncrypted bool `json:"remoteEncrypted"` } // redactPathError checks if the error is actually a os.PathError, and if yes // returns a redactedError with the path removed. func redactPathError(err error) (error, bool) { perr, ok := err.(*os.PathError) if !ok { return nil, false } return &redactedError{ error: err, redacted: fmt.Errorf("%v: %w", perr.Op, perr.Err), }, true } type redactedError struct { error redacted error } func without[E comparable, S ~[]E](s S, e E) S { for i, x := range s { if x == e { return append(s[:i], s[i+1:]...) } } return s }