all: Refactor cmd/syncthing creating lib/syncthing (ref #4085) (#5805)

* add skeleton for lib/syncthing

* copy syncthingMain to lib/syncthing (verbatim)

* Remove code to deduplicate copies of syncthingMain

* fix simple build errors

* move stuff from main to syncthing with minimal mod

* merge runtime options

* actually use syncthing.App

* pass io.writer to lib/syncthing for auditing

* get rid of env stuff in lib/syncthing

* add .Error() and comments

* review: Remove fs interactions from lib

* and go 1.13 happened

* utility functions
This commit is contained in:
Simon Frei 2019-07-14 12:43:13 +02:00 committed by Audrius Butkevicius
parent 82b70b9fae
commit 0025e9ccfb
13 changed files with 584 additions and 398 deletions

View File

@ -25,32 +25,23 @@ import (
"runtime/pprof" "runtime/pprof"
"sort" "sort"
"strconv" "strconv"
"strings"
"syscall" "syscall"
"time" "time"
"github.com/syncthing/syncthing/lib/api"
"github.com/syncthing/syncthing/lib/build" "github.com/syncthing/syncthing/lib/build"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/connections"
"github.com/syncthing/syncthing/lib/db"
"github.com/syncthing/syncthing/lib/dialer" "github.com/syncthing/syncthing/lib/dialer"
"github.com/syncthing/syncthing/lib/discover"
"github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/fs" "github.com/syncthing/syncthing/lib/fs"
"github.com/syncthing/syncthing/lib/locations" "github.com/syncthing/syncthing/lib/locations"
"github.com/syncthing/syncthing/lib/logger" "github.com/syncthing/syncthing/lib/logger"
"github.com/syncthing/syncthing/lib/model"
"github.com/syncthing/syncthing/lib/osutil" "github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/rand" "github.com/syncthing/syncthing/lib/syncthing"
"github.com/syncthing/syncthing/lib/sha256"
"github.com/syncthing/syncthing/lib/tlsutil" "github.com/syncthing/syncthing/lib/tlsutil"
"github.com/syncthing/syncthing/lib/upgrade" "github.com/syncthing/syncthing/lib/upgrade"
"github.com/syncthing/syncthing/lib/ur"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/thejerf/suture"
) )
const ( const (
@ -158,15 +149,14 @@ The following are valid values for the STTRACE variable:
// Environment options // Environment options
var ( var (
noUpgradeFromEnv = os.Getenv("STNOUPGRADE") != ""
innerProcess = os.Getenv("STNORESTART") != "" || os.Getenv("STMONITORED") != "" innerProcess = os.Getenv("STNORESTART") != "" || os.Getenv("STMONITORED") != ""
noDefaultFolder = os.Getenv("STNODEFAULTFOLDER") != "" noDefaultFolder = os.Getenv("STNODEFAULTFOLDER") != ""
) )
type RuntimeOptions struct { type RuntimeOptions struct {
syncthing.Options
confDir string confDir string
resetDatabase bool resetDatabase bool
resetDeltaIdxs bool
showVersion bool showVersion bool
showPaths bool showPaths bool
showDeviceId bool showDeviceId bool
@ -179,15 +169,12 @@ type RuntimeOptions struct {
logFile string logFile string
auditEnabled bool auditEnabled bool
auditFile string auditFile string
verbose bool
paused bool paused bool
unpaused bool unpaused bool
guiAddress string guiAddress string
guiAPIKey string guiAPIKey string
generateDir string generateDir string
noRestart bool noRestart bool
profiler string
assetDir string
cpuProfile bool cpuProfile bool
stRestarting bool stRestarting bool
logFlags int logFlags int
@ -197,9 +184,12 @@ type RuntimeOptions struct {
func defaultRuntimeOptions() RuntimeOptions { func defaultRuntimeOptions() RuntimeOptions {
options := RuntimeOptions{ options := RuntimeOptions{
Options: syncthing.Options{
AssetDir: os.Getenv("STGUIASSETS"),
NoUpgrade: os.Getenv("STNOUPGRADE") != "",
ProfilerURL: os.Getenv("STPROFILER"),
},
noRestart: os.Getenv("STNORESTART") != "", noRestart: os.Getenv("STNORESTART") != "",
profiler: os.Getenv("STPROFILER"),
assetDir: os.Getenv("STGUIASSETS"),
cpuProfile: os.Getenv("STCPUPROFILE") != "", cpuProfile: os.Getenv("STCPUPROFILE") != "",
stRestarting: os.Getenv("STRESTART") != "", stRestarting: os.Getenv("STRESTART") != "",
logFlags: log.Ltime, logFlags: log.Ltime,
@ -232,7 +222,7 @@ func parseCommandLineOptions() RuntimeOptions {
flag.BoolVar(&options.browserOnly, "browser-only", false, "Open GUI in browser") flag.BoolVar(&options.browserOnly, "browser-only", false, "Open GUI in browser")
flag.BoolVar(&options.noRestart, "no-restart", options.noRestart, "Disable monitor process, managed restarts and log file writing") flag.BoolVar(&options.noRestart, "no-restart", options.noRestart, "Disable monitor process, managed restarts and log file writing")
flag.BoolVar(&options.resetDatabase, "reset-database", false, "Reset the database, forcing a full rescan and resync") flag.BoolVar(&options.resetDatabase, "reset-database", false, "Reset the database, forcing a full rescan and resync")
flag.BoolVar(&options.resetDeltaIdxs, "reset-deltas", false, "Reset delta index IDs, forcing a full index exchange") flag.BoolVar(&options.ResetDeltaIdxs, "reset-deltas", false, "Reset delta index IDs, forcing a full index exchange")
flag.BoolVar(&options.doUpgrade, "upgrade", false, "Perform upgrade") flag.BoolVar(&options.doUpgrade, "upgrade", false, "Perform upgrade")
flag.BoolVar(&options.doUpgradeCheck, "upgrade-check", false, "Check for available upgrade") flag.BoolVar(&options.doUpgradeCheck, "upgrade-check", false, "Check for available upgrade")
flag.BoolVar(&options.showVersion, "version", false, "Show version") flag.BoolVar(&options.showVersion, "version", false, "Show version")
@ -241,7 +231,7 @@ func parseCommandLineOptions() RuntimeOptions {
flag.BoolVar(&options.showDeviceId, "device-id", false, "Show the device ID") flag.BoolVar(&options.showDeviceId, "device-id", false, "Show the device ID")
flag.StringVar(&options.upgradeTo, "upgrade-to", options.upgradeTo, "Force upgrade directly from specified URL") flag.StringVar(&options.upgradeTo, "upgrade-to", options.upgradeTo, "Force upgrade directly from specified URL")
flag.BoolVar(&options.auditEnabled, "audit", false, "Write events to audit file") flag.BoolVar(&options.auditEnabled, "audit", false, "Write events to audit file")
flag.BoolVar(&options.verbose, "verbose", false, "Print verbose log output") flag.BoolVar(&options.Verbose, "verbose", false, "Print verbose log output")
flag.BoolVar(&options.paused, "paused", false, "Start with all devices and folders paused") flag.BoolVar(&options.paused, "paused", false, "Start with all devices and folders paused")
flag.BoolVar(&options.unpaused, "unpaused", false, "Start with all devices and folders unpaused") flag.BoolVar(&options.unpaused, "unpaused", false, "Start with all devices and folders unpaused")
flag.StringVar(&options.logFile, "logfile", options.logFile, "Log file name (still always logs to stdout). Cannot be used together with -no-restart/STNORESTART environment variable.") flag.StringVar(&options.logFile, "logfile", options.logFile, "Log file name (still always logs to stdout). Cannot be used together with -no-restart/STNORESTART environment variable.")
@ -264,33 +254,6 @@ func parseCommandLineOptions() RuntimeOptions {
return options return options
} }
// exiter implements api.Controller
type exiter struct {
stop chan int
}
func (e *exiter) Restart() {
l.Infoln("Restarting")
e.stop <- exitRestarting
}
func (e *exiter) Shutdown() {
l.Infoln("Shutting down")
e.stop <- exitSuccess
}
func (e *exiter) ExitUpgrading() {
l.Infoln("Shutting down after upgrade")
e.stop <- exitUpgrading
}
// waitForExit must be called synchronously.
func (e *exiter) waitForExit() int {
return <-e.stop
}
var exit = &exiter{make(chan int)}
func main() { func main() {
options := parseCommandLineOptions() options := parseCommandLineOptions()
l.SetFlags(options.logFlags) l.SetFlags(options.logFlags)
@ -339,10 +302,10 @@ func main() {
options.logFile = locations.Get(locations.LogFile) options.logFile = locations.Get(locations.LogFile)
} }
if options.assetDir == "" { if options.AssetDir == "" {
// The asset dir is blank if STGUIASSETS wasn't set, in which case we // The asset dir is blank if STGUIASSETS wasn't set, in which case we
// should look for extra assets in the default place. // should look for extra assets in the default place.
options.assetDir = locations.Get(locations.GUIAssets) options.AssetDir = locations.Get(locations.GUIAssets)
} }
if options.showVersion { if options.showVersion {
@ -536,7 +499,7 @@ func checkUpgrade() upgrade.Release {
func performUpgrade(release upgrade.Release) { func performUpgrade(release upgrade.Release) {
// Use leveldb database locks to protect against concurrent upgrades // Use leveldb database locks to protect against concurrent upgrades
_, err := db.Open(locations.Get(locations.Database)) _, err := syncthing.OpenGoleveldb(locations.Get(locations.Database))
if err == nil { if err == nil {
err = upgrade.To(release) err = upgrade.To(release)
if err != nil { if err != nil {
@ -593,48 +556,18 @@ func upgradeViaRest() error {
} }
func syncthingMain(runtimeOptions RuntimeOptions) { func syncthingMain(runtimeOptions RuntimeOptions) {
setupSignalHandling() cfg, err := loadConfigAtStartup(runtimeOptions.allowNewerConfig)
if err != nil {
// Create a main service manager. We'll add things to this as we go along. l.Warnln("Failed to initialize config:", err)
// We want any logging it does to go through our log system. os.Exit(exitError)
mainService := suture.New("main", suture.Spec{
Log: func(line string) {
l.Debugln(line)
},
PassThroughPanics: true,
})
mainService.ServeBackground()
// Set a log prefix similar to the ID we will have later on, or early log
// lines look ugly.
l.SetPrefix("[start] ")
if runtimeOptions.auditEnabled {
startAuditing(mainService, runtimeOptions.auditFile)
} }
if runtimeOptions.verbose { if runtimeOptions.unpaused {
mainService.Add(newVerboseService()) setPauseState(cfg, false)
} else if runtimeOptions.paused {
setPauseState(cfg, true)
} }
errors := logger.NewRecorder(l, logger.LevelWarn, maxSystemErrors, 0)
systemLog := logger.NewRecorder(l, logger.LevelDebug, maxSystemLog, initialSystemLog)
// Event subscription for the API; must start early to catch the early
// events. The LocalChangeDetected event might overwhelm the event
// receiver in some situations so we will not subscribe to it here.
defaultSub := events.NewBufferedSubscription(events.Default.Subscribe(api.DefaultEventMask), api.EventSubBufferSize)
diskSub := events.NewBufferedSubscription(events.Default.Subscribe(api.DiskEventMask), api.EventSubBufferSize)
if len(os.Getenv("GOMAXPROCS")) == 0 {
runtime.GOMAXPROCS(runtime.NumCPU())
}
// Attempt to increase the limit on number of open files to the maximum
// allowed, in case we have many peers. We don't really care enough to
// report the error if there is one.
osutil.MaximizeOpenFileLimit()
// Ensure that we have a certificate and key. // Ensure that we have a certificate and key.
cert, err := tls.LoadX509KeyPair( cert, err := tls.LoadX509KeyPair(
locations.Get(locations.CertFile), locations.Get(locations.CertFile),
@ -648,190 +581,33 @@ func syncthingMain(runtimeOptions RuntimeOptions) {
tlsDefaultCommonName, tlsDefaultCommonName,
) )
if err != nil { if err != nil {
l.Infoln("Failed to generate certificate:", err) l.Warnln("Failed to generate certificate:", err)
os.Exit(exitError) os.Exit(1)
} }
} }
myID = protocol.NewDeviceID(cert.Certificate[0])
l.SetPrefix(fmt.Sprintf("[%s] ", myID.String()[:5]))
l.Infoln(build.LongVersion)
l.Infoln("My ID:", myID)
// Select SHA256 implementation and report. Affected by the
// STHASHING environment variable.
sha256.SelectAlgo()
sha256.Report()
// Emit the Starting event, now that we know who we are.
events.Default.Log(events.Starting, map[string]string{
"home": locations.GetBaseDir(locations.ConfigBaseDir),
"myID": myID.String(),
})
cfg, err := loadConfigAtStartup(runtimeOptions.allowNewerConfig)
if err != nil {
l.Warnln("Failed to initialize config:", err)
os.Exit(exitError)
}
if err := checkShortIDs(cfg); err != nil {
l.Warnln("Short device IDs are in conflict. Unlucky!\n Regenerate the device ID of one of the following:\n ", err)
os.Exit(exitError)
}
if len(runtimeOptions.profiler) > 0 {
go func() {
l.Debugln("Starting profiler on", runtimeOptions.profiler)
runtime.SetBlockProfileRate(1)
err := http.ListenAndServe(runtimeOptions.profiler, nil)
if err != nil {
l.Warnln(err)
os.Exit(exitError)
}
}()
}
perf := ur.CpuBench(3, 150*time.Millisecond, true)
l.Infof("Hashing performance is %.02f MB/s", perf)
dbFile := locations.Get(locations.Database) dbFile := locations.Get(locations.Database)
ldb, err := db.Open(dbFile) ldb, err := syncthing.OpenGoleveldb(dbFile)
if err != nil { if err != nil {
l.Warnln("Error opening database:", err) l.Warnln("Error opening database:", err)
os.Exit(exitError) os.Exit(1)
}
if err := db.UpdateSchema(ldb); err != nil {
l.Warnln("Database schema:", err)
os.Exit(exitError)
} }
if runtimeOptions.resetDeltaIdxs { appOpts := runtimeOptions.Options
l.Infoln("Reinitializing delta index IDs") if runtimeOptions.auditEnabled {
db.DropDeltaIndexIDs(ldb) appOpts.AuditWriter = auditWriter(runtimeOptions.auditFile)
} }
protectedFiles := []string{
locations.Get(locations.Database),
locations.Get(locations.ConfigFile),
locations.Get(locations.CertFile),
locations.Get(locations.KeyFile),
}
// Remove database entries for folders that no longer exist in the config
folders := cfg.Folders()
for _, folder := range ldb.ListFolders() {
if _, ok := folders[folder]; !ok {
l.Infof("Cleaning data for dropped folder %q", folder)
db.DropFolder(ldb, folder)
}
}
// Grab the previously running version string from the database.
miscDB := db.NewMiscDataNamespace(ldb)
prevVersion, _ := miscDB.String("prevVersion")
// Strip away prerelease/beta stuff and just compare the release
// numbers. 0.14.44 to 0.14.45-banana is an upgrade, 0.14.45-banana to
// 0.14.45-pineapple is not.
prevParts := strings.Split(prevVersion, "-")
curParts := strings.Split(build.Version, "-")
if prevParts[0] != curParts[0] {
if prevVersion != "" {
l.Infoln("Detected upgrade from", prevVersion, "to", build.Version)
}
// Drop delta indexes in case we've changed random stuff we
// shouldn't have. We will resend our index on next connect.
db.DropDeltaIndexIDs(ldb)
// Remember the new version.
miscDB.PutString("prevVersion", build.Version)
}
m := model.NewModel(cfg, myID, "syncthing", build.Version, ldb, protectedFiles)
if t := os.Getenv("STDEADLOCKTIMEOUT"); t != "" { if t := os.Getenv("STDEADLOCKTIMEOUT"); t != "" {
if secs, _ := strconv.Atoi(t); secs > 0 { secs, _ := strconv.Atoi(t)
m.StartDeadlockDetector(time.Duration(secs) * time.Second) appOpts.DeadlockTimeoutS = secs
}
} else if !build.IsRelease || build.IsBeta {
m.StartDeadlockDetector(20 * time.Minute)
} }
if runtimeOptions.unpaused { app := syncthing.New(cfg, ldb, cert, appOpts)
setPauseState(cfg, false)
} else if runtimeOptions.paused {
setPauseState(cfg, true)
}
// Add and start folders setupSignalHandling(app)
for _, folderCfg := range cfg.Folders() {
if folderCfg.Paused {
folderCfg.CreateRoot()
continue
}
m.AddFolder(folderCfg)
m.StartFolder(folderCfg.ID)
}
mainService.Add(m) if len(os.Getenv("GOMAXPROCS")) == 0 {
runtime.GOMAXPROCS(runtime.NumCPU())
// Start discovery
cachedDiscovery := discover.NewCachingMux()
mainService.Add(cachedDiscovery)
// The TLS configuration is used for both the listening socket and outgoing
// connections.
tlsCfg := tlsutil.SecureDefault()
tlsCfg.Certificates = []tls.Certificate{cert}
tlsCfg.NextProtos = []string{bepProtocolName}
tlsCfg.ClientAuth = tls.RequestClientCert
tlsCfg.SessionTicketsDisabled = true
tlsCfg.InsecureSkipVerify = true
// Start connection management
connectionsService := connections.NewService(cfg, myID, m, tlsCfg, cachedDiscovery, bepProtocolName, tlsDefaultCommonName)
mainService.Add(connectionsService)
if cfg.Options().GlobalAnnEnabled {
for _, srv := range cfg.GlobalDiscoveryServers() {
l.Infoln("Using discovery server", srv)
gd, err := discover.NewGlobal(srv, cert, connectionsService)
if err != nil {
l.Warnln("Global discovery:", err)
continue
}
// Each global discovery server gets its results cached for five
// minutes, and is not asked again for a minute when it's returned
// unsuccessfully.
cachedDiscovery.Add(gd, 5*time.Minute, time.Minute)
}
}
if cfg.Options().LocalAnnEnabled {
// v4 broadcasts
bcd, err := discover.NewLocal(myID, fmt.Sprintf(":%d", cfg.Options().LocalAnnPort), connectionsService)
if err != nil {
l.Warnln("IPv4 local discovery:", err)
} else {
cachedDiscovery.Add(bcd, 0, 0)
}
// v6 multicasts
mcd, err := discover.NewLocal(myID, cfg.Options().LocalAnnMCAddr, connectionsService)
if err != nil {
l.Warnln("IPv6 local discovery:", err)
} else {
cachedDiscovery.Add(mcd, 0, 0)
}
} }
if runtimeOptions.cpuProfile { if runtimeOptions.cpuProfile {
@ -846,49 +622,15 @@ func syncthingMain(runtimeOptions RuntimeOptions) {
} }
} }
// Candidate builds always run with usage reporting.
if opts := cfg.Options(); build.IsCandidate {
l.Infoln("Anonymous usage reporting is always enabled for candidate releases.")
if opts.URAccepted != ur.Version {
opts.URAccepted = ur.Version
cfg.SetOptions(opts)
cfg.Save()
// Unique ID will be set and config saved below if necessary.
}
}
// If we are going to do usage reporting, ensure we have a valid unique ID.
if opts := cfg.Options(); opts.URAccepted > 0 && opts.URUniqueID == "" {
opts.URUniqueID = rand.String(8)
cfg.SetOptions(opts)
cfg.Save()
}
usageReportingSvc := ur.New(cfg, m, connectionsService, noUpgradeFromEnv)
mainService.Add(usageReportingSvc)
// GUI
setupGUI(mainService, cfg, m, defaultSub, diskSub, cachedDiscovery, connectionsService, usageReportingSvc, errors, systemLog, runtimeOptions)
myDev, _ := cfg.Device(myID)
l.Infof(`My name is "%v"`, myDev.Name)
for _, device := range cfg.Devices() {
if device.DeviceID != myID {
l.Infof(`Device %s is "%v" at %v`, device.DeviceID, device.Name, device.Addresses)
}
}
if opts := cfg.Options(); opts.RestartOnWakeup { if opts := cfg.Options(); opts.RestartOnWakeup {
go standbyMonitor() go standbyMonitor(app)
} }
// Candidate builds should auto upgrade. Make sure the option is set, // Candidate builds should auto upgrade. Make sure the option is set,
// unless we are in a build where it's disabled or the STNOUPGRADE // unless we are in a build where it's disabled or the STNOUPGRADE
// environment variable is set. // environment variable is set.
if build.IsCandidate && !upgrade.DisabledByCompilation && !noUpgradeFromEnv { if build.IsCandidate && !upgrade.DisabledByCompilation && !runtimeOptions.NoUpgrade {
l.Infoln("Automatic upgrade is always enabled for candidate releases.") l.Infoln("Automatic upgrade is always enabled for candidate releases.")
if opts := cfg.Options(); opts.AutoUpgradeIntervalH == 0 || opts.AutoUpgradeIntervalH > 24 { if opts := cfg.Options(); opts.AutoUpgradeIntervalH == 0 || opts.AutoUpgradeIntervalH > 24 {
opts.AutoUpgradeIntervalH = 12 opts.AutoUpgradeIntervalH = 12
@ -902,54 +644,33 @@ func syncthingMain(runtimeOptions RuntimeOptions) {
} }
if opts := cfg.Options(); opts.AutoUpgradeIntervalH > 0 { if opts := cfg.Options(); opts.AutoUpgradeIntervalH > 0 {
if noUpgradeFromEnv { if runtimeOptions.NoUpgrade {
l.Infof("No automatic upgrades; STNOUPGRADE environment variable defined.") l.Infof("No automatic upgrades; STNOUPGRADE environment variable defined.")
} else { } else {
go autoUpgrade(cfg) go autoUpgrade(cfg, app)
} }
} }
if isSuperUser() { app.Start()
l.Warnln("Syncthing should not run as a privileged or system user. Please consider using a normal user account.")
}
events.Default.Log(events.StartupComplete, map[string]string{
"myID": myID.String(),
})
cleanConfigDirectory() cleanConfigDirectory()
if cfg.Options().SetLowPriority { if cfg.Options().StartBrowser && !runtimeOptions.noBrowser && !runtimeOptions.stRestarting {
if err := osutil.SetLowPriority(); err != nil { // Can potentially block if the utility we are invoking doesn't
l.Warnln("Failed to lower process priority:", err) // fork, and just execs, hence keep it in its own routine.
} go func() { _ = openURL(cfg.GUI().URL()) }()
} }
code := exit.waitForExit() status := app.Wait()
mainService.Stop()
done := make(chan struct{})
go func() {
ldb.Close()
close(done)
}()
select {
case <-done:
case <-time.After(10 * time.Second):
l.Warnln("Database failed to stop within 10s")
}
l.Infoln("Exiting")
if runtimeOptions.cpuProfile { if runtimeOptions.cpuProfile {
pprof.StopCPUProfile() pprof.StopCPUProfile()
} }
os.Exit(code) os.Exit(int(status))
} }
func setupSignalHandling() { func setupSignalHandling(app *syncthing.App) {
// Exit cleanly with "restarting" code on SIGHUP. // Exit cleanly with "restarting" code on SIGHUP.
restartSign := make(chan os.Signal, 1) restartSign := make(chan os.Signal, 1)
@ -957,7 +678,7 @@ func setupSignalHandling() {
signal.Notify(restartSign, sigHup) signal.Notify(restartSign, sigHup)
go func() { go func() {
<-restartSign <-restartSign
exit.Restart() app.Stop(syncthing.ExitRestart)
}() }()
// Exit with "success" code (no restart) on INT/TERM // Exit with "success" code (no restart) on INT/TERM
@ -967,7 +688,7 @@ func setupSignalHandling() {
signal.Notify(stopSign, os.Interrupt, sigTerm) signal.Notify(stopSign, os.Interrupt, sigTerm)
go func() { go func() {
<-stopSign <-stopSign
exit.Shutdown() app.Stop(syncthing.ExitSuccess)
}() }()
} }
@ -1044,8 +765,7 @@ func copyFile(src, dst string) error {
return nil return nil
} }
func startAuditing(mainService *suture.Supervisor, auditFile string) { func auditWriter(auditFile string) io.Writer {
var fd io.Writer var fd io.Writer
var err error var err error
var auditDest string var auditDest string
@ -1072,46 +792,9 @@ func startAuditing(mainService *suture.Supervisor, auditFile string) {
auditDest = auditFile auditDest = auditFile
} }
auditService := newAuditService(fd)
mainService.Add(auditService)
// We wait for the audit service to fully start before we return, to
// ensure we capture all events from the start.
auditService.WaitForStart()
l.Infoln("Audit log in", auditDest) l.Infoln("Audit log in", auditDest)
}
func setupGUI(mainService *suture.Supervisor, cfg config.Wrapper, m model.Model, defaultSub, diskSub events.BufferedSubscription, discoverer discover.CachingMux, connectionsService connections.Service, urService *ur.Service, errors, systemLog logger.Recorder, runtimeOptions RuntimeOptions) { return fd
guiCfg := cfg.GUI()
if !guiCfg.Enabled {
return
}
if guiCfg.InsecureAdminAccess {
l.Warnln("Insecure admin access is enabled.")
}
cpu := newCPUService()
mainService.Add(cpu)
summaryService := model.NewFolderSummaryService(cfg, m, myID)
mainService.Add(summaryService)
apiSvc := api.New(myID, cfg, runtimeOptions.assetDir, tlsDefaultCommonName, m, defaultSub, diskSub, discoverer, connectionsService, urService, summaryService, errors, systemLog, cpu, exit, noUpgradeFromEnv)
mainService.Add(apiSvc)
if err := apiSvc.WaitForStart(); err != nil {
l.Warnln("Failed starting API:", err)
os.Exit(exitError)
}
if cfg.Options().StartBrowser && !runtimeOptions.noBrowser && !runtimeOptions.stRestarting {
// Can potentially block if the utility we are invoking doesn't
// fork, and just execs, hence keep it in its own routine.
go func() { _ = openURL(guiCfg.URL()) }()
}
} }
func defaultConfig(cfgFile string) (config.Wrapper, error) { func defaultConfig(cfgFile string) (config.Wrapper, error) {
@ -1157,7 +840,7 @@ func ensureDir(dir string, mode fs.FileMode) error {
return nil return nil
} }
func standbyMonitor() { func standbyMonitor(app *syncthing.App) {
restartDelay := 60 * time.Second restartDelay := 60 * time.Second
now := time.Now() now := time.Now()
for { for {
@ -1170,14 +853,14 @@ func standbyMonitor() {
// things a moment to stabilize. // things a moment to stabilize.
time.Sleep(restartDelay) time.Sleep(restartDelay)
exit.Restart() app.Stop(syncthing.ExitRestart)
return return
} }
now = time.Now() now = time.Now()
} }
} }
func autoUpgrade(cfg config.Wrapper) { func autoUpgrade(cfg config.Wrapper, app *syncthing.App) {
timer := time.NewTimer(0) timer := time.NewTimer(0)
sub := events.Default.Subscribe(events.DeviceConnected) sub := events.Default.Subscribe(events.DeviceConnected)
for { for {
@ -1228,7 +911,7 @@ func autoUpgrade(cfg config.Wrapper) {
events.Default.Unsubscribe(sub) events.Default.Unsubscribe(sub)
l.Warnf("Automatically upgraded to version %q. Restarting in 1 minute.", rel.Tag) l.Warnf("Automatically upgraded to version %q. Restarting in 1 minute.", rel.Tag)
time.Sleep(time.Minute) time.Sleep(time.Minute)
exit.ExitUpgrading() app.Stop(syncthing.ExitUpgrade)
return return
} }
} }
@ -1276,28 +959,13 @@ func cleanConfigDirectory() {
} }
} }
// checkShortIDs verifies that the configuration won't result in duplicate
// short ID:s; that is, that the devices in the cluster all have unique
// initial 64 bits.
func checkShortIDs(cfg config.Wrapper) error {
exists := make(map[protocol.ShortID]protocol.DeviceID)
for deviceID := range cfg.Devices() {
shortID := deviceID.Short()
if otherID, ok := exists[shortID]; ok {
return fmt.Errorf("%v in conflict with %v", deviceID, otherID)
}
exists[shortID] = deviceID
}
return nil
}
func showPaths(options RuntimeOptions) { func showPaths(options RuntimeOptions) {
fmt.Printf("Configuration file:\n\t%s\n\n", locations.Get(locations.ConfigFile)) fmt.Printf("Configuration file:\n\t%s\n\n", locations.Get(locations.ConfigFile))
fmt.Printf("Database directory:\n\t%s\n\n", locations.Get(locations.Database)) fmt.Printf("Database directory:\n\t%s\n\n", locations.Get(locations.Database))
fmt.Printf("Device private key & certificate files:\n\t%s\n\t%s\n\n", locations.Get(locations.KeyFile), locations.Get(locations.CertFile)) fmt.Printf("Device private key & certificate files:\n\t%s\n\t%s\n\n", locations.Get(locations.KeyFile), locations.Get(locations.CertFile))
fmt.Printf("HTTPS private key & certificate files:\n\t%s\n\t%s\n\n", locations.Get(locations.HTTPSKeyFile), locations.Get(locations.HTTPSCertFile)) fmt.Printf("HTTPS private key & certificate files:\n\t%s\n\t%s\n\n", locations.Get(locations.HTTPSKeyFile), locations.Get(locations.HTTPSCertFile))
fmt.Printf("Log file:\n\t%s\n\n", options.logFile) fmt.Printf("Log file:\n\t%s\n\n", options.logFile)
fmt.Printf("GUI override directory:\n\t%s\n\n", options.assetDir) fmt.Printf("GUI override directory:\n\t%s\n\n", options.AssetDir)
fmt.Printf("Default sync folder directory:\n\t%s\n\n", locations.Get(locations.DefFolder)) fmt.Printf("Default sync folder directory:\n\t%s\n\n", locations.Get(locations.DefFolder))
} }

View File

@ -4,7 +4,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this file, // License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/. // You can obtain one at https://mozilla.org/MPL/2.0/.
package main package syncthing
import ( import (
"encoding/json" "encoding/json"

View File

@ -4,7 +4,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this file, // License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/. // You can obtain one at https://mozilla.org/MPL/2.0/.
package main package syncthing
import ( import (
"bytes" "bytes"

View File

@ -4,7 +4,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this file, // License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/. // You can obtain one at https://mozilla.org/MPL/2.0/.
package main package syncthing
import ( import (
"math" "math"

View File

@ -6,7 +6,7 @@
//+build solaris //+build solaris
package main package syncthing
import ( import (
"encoding/binary" "encoding/binary"

View File

@ -6,7 +6,7 @@
//+build !windows,!solaris //+build !windows,!solaris
package main package syncthing
import "syscall" import "syscall"
import "time" import "time"

View File

@ -6,7 +6,7 @@
//+build windows //+build windows
package main package syncthing
import "syscall" import "syscall"
import "time" import "time"

22
lib/syncthing/debug.go Normal file
View File

@ -0,0 +1,22 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package syncthing
import (
"os"
"strings"
"github.com/syncthing/syncthing/lib/logger"
)
var (
l = logger.DefaultLogger.NewFacility("app", "Main run facility")
)
func init() {
l.SetDebug("app", strings.Contains(os.Getenv("STTRACE"), "app") || os.Getenv("STTRACE") == "all")
}

View File

@ -6,7 +6,7 @@
// +build !windows // +build !windows
package main package syncthing
import ( import (
"os" "os"

View File

@ -4,7 +4,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this file, // License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/. // You can obtain one at https://mozilla.org/MPL/2.0/.
package main package syncthing
import "syscall" import "syscall"

496
lib/syncthing/syncthing.go Normal file
View File

@ -0,0 +1,496 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package syncthing
import (
"crypto/tls"
"fmt"
"io"
"net/http"
"runtime"
"strings"
"sync"
"time"
"github.com/syncthing/syncthing/lib/api"
"github.com/syncthing/syncthing/lib/build"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/connections"
"github.com/syncthing/syncthing/lib/db"
"github.com/syncthing/syncthing/lib/discover"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/locations"
"github.com/syncthing/syncthing/lib/logger"
"github.com/syncthing/syncthing/lib/model"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/rand"
"github.com/syncthing/syncthing/lib/sha256"
"github.com/syncthing/syncthing/lib/tlsutil"
"github.com/syncthing/syncthing/lib/ur"
"github.com/thejerf/suture"
)
const (
bepProtocolName = "bep/1.0"
tlsDefaultCommonName = "syncthing"
maxSystemErrors = 5
initialSystemLog = 10
maxSystemLog = 250
)
type ExitStatus int
const (
ExitSuccess ExitStatus = 0
ExitError ExitStatus = 1
ExitRestart ExitStatus = 3
ExitUpgrade ExitStatus = 4
)
type Options struct {
AssetDir string
AuditWriter io.Writer
DeadlockTimeoutS int
NoUpgrade bool
ProfilerURL string
ResetDeltaIdxs bool
Verbose bool
}
type App struct {
myID protocol.DeviceID
mainService *suture.Supervisor
cfg config.Wrapper
ll *db.Lowlevel
cert tls.Certificate
opts Options
exitStatus ExitStatus
err error
startOnce sync.Once
stop chan struct{}
stopped chan struct{}
}
func New(cfg config.Wrapper, ll *db.Lowlevel, cert tls.Certificate, opts Options) *App {
return &App{
cfg: cfg,
ll: ll,
opts: opts,
cert: cert,
stop: make(chan struct{}),
stopped: make(chan struct{}),
}
}
// Run does the same as start, but then does not return until the app stops. It
// is equivalent to calling Start and then Wait.
func (a *App) Run() ExitStatus {
a.Start()
return a.Wait()
}
// Start executes the app and returns once all the startup operations are done,
// e.g. the API is ready for use.
func (a *App) Start() {
a.startOnce.Do(func() {
if err := a.startup(); err != nil {
close(a.stop)
a.exitStatus = ExitError
a.err = err
close(a.stopped)
return
}
go a.run()
})
}
func (a *App) startup() error {
// Create a main service manager. We'll add things to this as we go along.
// We want any logging it does to go through our log system.
a.mainService = suture.New("main", suture.Spec{
Log: func(line string) {
l.Debugln(line)
},
PassThroughPanics: true,
})
a.mainService.ServeBackground()
// Set a log prefix similar to the ID we will have later on, or early log
// lines look ugly.
l.SetPrefix("[start] ")
if a.opts.AuditWriter != nil {
a.startAuditing()
}
if a.opts.Verbose {
a.mainService.Add(newVerboseService())
}
errors := logger.NewRecorder(l, logger.LevelWarn, maxSystemErrors, 0)
systemLog := logger.NewRecorder(l, logger.LevelDebug, maxSystemLog, initialSystemLog)
// Event subscription for the API; must start early to catch the early
// events. The LocalChangeDetected event might overwhelm the event
// receiver in some situations so we will not subscribe to it here.
defaultSub := events.NewBufferedSubscription(events.Default.Subscribe(api.DefaultEventMask), api.EventSubBufferSize)
diskSub := events.NewBufferedSubscription(events.Default.Subscribe(api.DiskEventMask), api.EventSubBufferSize)
// Attempt to increase the limit on number of open files to the maximum
// allowed, in case we have many peers. We don't really care enough to
// report the error if there is one.
osutil.MaximizeOpenFileLimit()
a.myID = protocol.NewDeviceID(a.cert.Certificate[0])
l.SetPrefix(fmt.Sprintf("[%s] ", a.myID.String()[:5]))
l.Infoln(build.LongVersion)
l.Infoln("My ID:", a.myID)
// Select SHA256 implementation and report. Affected by the
// STHASHING environment variable.
sha256.SelectAlgo()
sha256.Report()
// Emit the Starting event, now that we know who we are.
events.Default.Log(events.Starting, map[string]string{
"home": locations.GetBaseDir(locations.ConfigBaseDir),
"myID": a.myID.String(),
})
if err := checkShortIDs(a.cfg); err != nil {
l.Warnln("Short device IDs are in conflict. Unlucky!\n Regenerate the device ID of one of the following:\n ", err)
return err
}
if len(a.opts.ProfilerURL) > 0 {
go func() {
l.Debugln("Starting profiler on", a.opts.ProfilerURL)
runtime.SetBlockProfileRate(1)
err := http.ListenAndServe(a.opts.ProfilerURL, nil)
if err != nil {
l.Warnln(err)
return
}
}()
}
perf := ur.CpuBench(3, 150*time.Millisecond, true)
l.Infof("Hashing performance is %.02f MB/s", perf)
if err := db.UpdateSchema(a.ll); err != nil {
l.Warnln("Database schema:", err)
return err
}
if a.opts.ResetDeltaIdxs {
l.Infoln("Reinitializing delta index IDs")
db.DropDeltaIndexIDs(a.ll)
}
protectedFiles := []string{
locations.Get(locations.Database),
locations.Get(locations.ConfigFile),
locations.Get(locations.CertFile),
locations.Get(locations.KeyFile),
}
// Remove database entries for folders that no longer exist in the config
folders := a.cfg.Folders()
for _, folder := range a.ll.ListFolders() {
if _, ok := folders[folder]; !ok {
l.Infof("Cleaning data for dropped folder %q", folder)
db.DropFolder(a.ll, folder)
}
}
// Grab the previously running version string from the database.
miscDB := db.NewMiscDataNamespace(a.ll)
prevVersion, _ := miscDB.String("prevVersion")
// Strip away prerelease/beta stuff and just compare the release
// numbers. 0.14.44 to 0.14.45-banana is an upgrade, 0.14.45-banana to
// 0.14.45-pineapple is not.
prevParts := strings.Split(prevVersion, "-")
curParts := strings.Split(build.Version, "-")
if prevParts[0] != curParts[0] {
if prevVersion != "" {
l.Infoln("Detected upgrade from", prevVersion, "to", build.Version)
}
// Drop delta indexes in case we've changed random stuff we
// shouldn't have. We will resend our index on next connect.
db.DropDeltaIndexIDs(a.ll)
// Remember the new version.
miscDB.PutString("prevVersion", build.Version)
}
m := model.NewModel(a.cfg, a.myID, "syncthing", build.Version, a.ll, protectedFiles)
if a.opts.DeadlockTimeoutS > 0 {
m.StartDeadlockDetector(time.Duration(a.opts.DeadlockTimeoutS) * time.Second)
} else if !build.IsRelease || build.IsBeta {
m.StartDeadlockDetector(20 * time.Minute)
}
// Add and start folders
for _, folderCfg := range a.cfg.Folders() {
if folderCfg.Paused {
folderCfg.CreateRoot()
continue
}
m.AddFolder(folderCfg)
m.StartFolder(folderCfg.ID)
}
a.mainService.Add(m)
// Start discovery
cachedDiscovery := discover.NewCachingMux()
a.mainService.Add(cachedDiscovery)
// The TLS configuration is used for both the listening socket and outgoing
// connections.
tlsCfg := tlsutil.SecureDefault()
tlsCfg.Certificates = []tls.Certificate{a.cert}
tlsCfg.NextProtos = []string{bepProtocolName}
tlsCfg.ClientAuth = tls.RequestClientCert
tlsCfg.SessionTicketsDisabled = true
tlsCfg.InsecureSkipVerify = true
// Start connection management
connectionsService := connections.NewService(a.cfg, a.myID, m, tlsCfg, cachedDiscovery, bepProtocolName, tlsDefaultCommonName)
a.mainService.Add(connectionsService)
if a.cfg.Options().GlobalAnnEnabled {
for _, srv := range a.cfg.GlobalDiscoveryServers() {
l.Infoln("Using discovery server", srv)
gd, err := discover.NewGlobal(srv, a.cert, connectionsService)
if err != nil {
l.Warnln("Global discovery:", err)
continue
}
// Each global discovery server gets its results cached for five
// minutes, and is not asked again for a minute when it's returned
// unsuccessfully.
cachedDiscovery.Add(gd, 5*time.Minute, time.Minute)
}
}
if a.cfg.Options().LocalAnnEnabled {
// v4 broadcasts
bcd, err := discover.NewLocal(a.myID, fmt.Sprintf(":%d", a.cfg.Options().LocalAnnPort), connectionsService)
if err != nil {
l.Warnln("IPv4 local discovery:", err)
} else {
cachedDiscovery.Add(bcd, 0, 0)
}
// v6 multicasts
mcd, err := discover.NewLocal(a.myID, a.cfg.Options().LocalAnnMCAddr, connectionsService)
if err != nil {
l.Warnln("IPv6 local discovery:", err)
} else {
cachedDiscovery.Add(mcd, 0, 0)
}
}
// Candidate builds always run with usage reporting.
if opts := a.cfg.Options(); build.IsCandidate {
l.Infoln("Anonymous usage reporting is always enabled for candidate releases.")
if opts.URAccepted != ur.Version {
opts.URAccepted = ur.Version
a.cfg.SetOptions(opts)
a.cfg.Save()
// Unique ID will be set and config saved below if necessary.
}
}
// If we are going to do usage reporting, ensure we have a valid unique ID.
if opts := a.cfg.Options(); opts.URAccepted > 0 && opts.URUniqueID == "" {
opts.URUniqueID = rand.String(8)
a.cfg.SetOptions(opts)
a.cfg.Save()
}
usageReportingSvc := ur.New(a.cfg, m, connectionsService, a.opts.NoUpgrade)
a.mainService.Add(usageReportingSvc)
// GUI
if err := a.setupGUI(m, defaultSub, diskSub, cachedDiscovery, connectionsService, usageReportingSvc, errors, systemLog); err != nil {
l.Warnln("Failed starting API:", err)
return err
}
myDev, _ := a.cfg.Device(a.myID)
l.Infof(`My name is "%v"`, myDev.Name)
for _, device := range a.cfg.Devices() {
if device.DeviceID != a.myID {
l.Infof(`Device %s is "%v" at %v`, device.DeviceID, device.Name, device.Addresses)
}
}
if isSuperUser() {
l.Warnln("Syncthing should not run as a privileged or system user. Please consider using a normal user account.")
}
events.Default.Log(events.StartupComplete, map[string]string{
"myID": a.myID.String(),
})
if a.cfg.Options().SetLowPriority {
if err := osutil.SetLowPriority(); err != nil {
l.Warnln("Failed to lower process priority:", err)
}
}
return nil
}
func (a *App) run() {
<-a.stop
a.mainService.Stop()
done := make(chan struct{})
go func() {
a.ll.Close()
close(done)
}()
select {
case <-done:
case <-time.After(10 * time.Second):
l.Warnln("Database failed to stop within 10s")
}
l.Infoln("Exiting")
close(a.stopped)
}
// Wait blocks until the app stops running.
func (a *App) Wait() ExitStatus {
<-a.stopped
return a.exitStatus
}
// Error returns an error if one occurred while running the app. It does not wait
// for the app to stop before returning.
func (a *App) Error() error {
select {
case <-a.stopped:
return nil
default:
}
return a.err
}
// Stop stops the app and sets its exit status to given reason, unless the app
// was already stopped before. In any case it returns the effective exit status.
func (a *App) Stop(stopReason ExitStatus) ExitStatus {
select {
case <-a.stopped:
case <-a.stop:
default:
close(a.stop)
}
<-a.stopped
// ExitSuccess is the default value for a.exitStatus. If another status
// was already set, ignore the stop reason given as argument to Stop.
if a.exitStatus == ExitSuccess {
a.exitStatus = stopReason
}
return a.exitStatus
}
func (a *App) startAuditing() {
auditService := newAuditService(a.opts.AuditWriter)
a.mainService.Add(auditService)
// We wait for the audit service to fully start before we return, to
// ensure we capture all events from the start.
auditService.WaitForStart()
}
func (a *App) setupGUI(m model.Model, defaultSub, diskSub events.BufferedSubscription, discoverer discover.CachingMux, connectionsService connections.Service, urService *ur.Service, errors, systemLog logger.Recorder) error {
guiCfg := a.cfg.GUI()
if !guiCfg.Enabled {
return nil
}
if guiCfg.InsecureAdminAccess {
l.Warnln("Insecure admin access is enabled.")
}
cpu := newCPUService()
a.mainService.Add(cpu)
summaryService := model.NewFolderSummaryService(a.cfg, m, a.myID)
a.mainService.Add(summaryService)
apiSvc := api.New(a.myID, a.cfg, a.opts.AssetDir, tlsDefaultCommonName, m, defaultSub, diskSub, discoverer, connectionsService, urService, summaryService, errors, systemLog, cpu, &controller{a}, a.opts.NoUpgrade)
a.mainService.Add(apiSvc)
if err := apiSvc.WaitForStart(); err != nil {
return err
}
return nil
}
// checkShortIDs verifies that the configuration won't result in duplicate
// short ID:s; that is, that the devices in the cluster all have unique
// initial 64 bits.
func checkShortIDs(cfg config.Wrapper) error {
exists := make(map[protocol.ShortID]protocol.DeviceID)
for deviceID := range cfg.Devices() {
shortID := deviceID.Short()
if otherID, ok := exists[shortID]; ok {
return fmt.Errorf("%v in conflict with %v", deviceID, otherID)
}
exists[shortID] = deviceID
}
return nil
}
// Implements api.Controller
type controller struct{ *App }
func (e *controller) Restart() {
e.Stop(ExitRestart)
}
func (e *controller) Shutdown() {
e.Stop(ExitSuccess)
}
func (e *controller) ExitUpgrading() {
e.Stop(ExitUpgrade)
}
func LoadCertificate(certFile, keyFile string) (tls.Certificate, error) {
return tls.LoadX509KeyPair(certFile, keyFile)
}
func LoadConfig(path string, cert tls.Certificate) (config.Wrapper, error) {
return config.Load(path, protocol.NewDeviceID(cert.Certificate[0]))
}
func OpenGoleveldb(path string) (*db.Lowlevel, error) {
return db.Open(path)
}

View File

@ -4,7 +4,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this file, // License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/. // You can obtain one at https://mozilla.org/MPL/2.0/.
package main package syncthing
import ( import (
"testing" "testing"

View File

@ -4,7 +4,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this file, // License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/. // You can obtain one at https://mozilla.org/MPL/2.0/.
package main package syncthing
import ( import (
"fmt" "fmt"