syncthing/lib/rc/rc.go

661 lines
15 KiB
Go
Raw Normal View History

// Copyright (C) 2015 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
// Package rc provides remote control of a Syncthing process via the REST API.
package rc
import (
"bufio"
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
2015-07-31 16:52:50 +00:00
"net/url"
"os"
"os/exec"
"path/filepath"
2015-07-31 16:52:50 +00:00
"strconv"
"time"
2015-08-06 09:29:25 +00:00
"github.com/syncthing/syncthing/lib/config"
2015-10-13 18:52:22 +00:00
"github.com/syncthing/syncthing/lib/dialer"
"github.com/syncthing/syncthing/lib/events"
2015-09-22 17:38:46 +00:00
"github.com/syncthing/syncthing/lib/protocol"
2015-08-06 09:29:25 +00:00
"github.com/syncthing/syncthing/lib/sync"
)
2015-10-31 10:15:49 +00:00
// APIKey is set via the STGUIAPIKEY variable when we launch the binary, to
// ensure that we have API access regardless of authentication settings.
const APIKey = "592A47BC-A7DF-4C2F-89E0-A80B3E5094EE"
type Process struct {
// Set at initialization
addr string
// Set by eventLoop()
eventMut sync.Mutex
id protocol.DeviceID
folders []string
startComplete chan struct{}
stopped chan struct{}
stopErr error
sequence map[string]map[string]int64 // Folder ID => Device ID => Sequence
done map[string]bool // Folder ID => 100%
cmd *exec.Cmd
logfd *os.File
}
// NewProcess returns a new Process talking to Syncthing at the specified address.
// Example: NewProcess("127.0.0.1:8082")
func NewProcess(addr string) *Process {
p := &Process{
addr: addr,
sequence: make(map[string]map[string]int64),
done: make(map[string]bool),
eventMut: sync.NewMutex(),
startComplete: make(chan struct{}),
stopped: make(chan struct{}),
}
return p
}
func (p *Process) ID() protocol.DeviceID {
return p.id
}
// LogTo creates the specified log file and ensures that stdout and stderr
// from the Start()ed process is redirected there. Must be called before
// Start().
func (p *Process) LogTo(filename string) error {
if p.cmd != nil {
panic("logfd cannot be set with an existing cmd")
}
if p.logfd != nil {
p.logfd.Close()
}
fd, err := os.Create(filename)
if err != nil {
return err
}
p.logfd = fd
return nil
}
// Start runs the specified Syncthing binary with the given arguments.
// Syncthing should be configured to provide an API on the address given to
// NewProcess. Event processing is started.
func (p *Process) Start(bin string, args ...string) error {
cmd := exec.Command(bin, args...)
if p.logfd != nil {
cmd.Stdout = p.logfd
cmd.Stderr = p.logfd
}
cmd.Env = append(os.Environ(), "STNORESTART=1", "STGUIAPIKEY="+APIKey)
err := cmd.Start()
if err != nil {
return err
}
p.cmd = cmd
go p.eventLoop()
go p.wait()
return nil
}
func (p *Process) wait() {
2019-02-02 11:16:27 +00:00
p.cmd.Wait()
if p.logfd != nil {
p.stopErr = p.checkForProblems(p.logfd)
}
close(p.stopped)
}
// AwaitStartup waits for the Syncthing process to start and perform initial
// scans of all folders.
func (p *Process) AwaitStartup() {
select {
case <-p.startComplete:
case <-p.stopped:
}
}
// Stop stops the running Syncthing process. If the process was logging to a
// local file (set by LogTo), the log file will be opened and checked for
// panics and data races. The presence of either will be signalled in the form
// of a returned error.
func (p *Process) Stop() (*os.ProcessState, error) {
select {
case <-p.stopped:
return p.cmd.ProcessState, p.stopErr
default:
}
if _, err := p.Post("/rest/system/shutdown", nil); err != nil && err != io.ErrUnexpectedEOF {
// Unexpected EOF is somewhat expected here, as we may exit before
// returning something sensible.
return nil, err
}
<-p.stopped
return p.cmd.ProcessState, p.stopErr
}
// Stopped returns a channel that will be closed when Syncthing has stopped.
func (p *Process) Stopped() chan struct{} {
return p.stopped
}
// Get performs an HTTP GET and returns the bytes and/or an error. Any non-200
// return code is returned as an error.
func (p *Process) Get(path string) ([]byte, error) {
client := &http.Client{
Timeout: 30 * time.Second,
Transport: &http.Transport{
DialContext: dialer.DialContext,
2015-10-13 18:52:22 +00:00
Proxy: http.ProxyFromEnvironment,
DisableKeepAlives: true,
},
}
url := fmt.Sprintf("http://%s%s", p.addr, path)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
req.Header.Add("X-API-Key", APIKey)
resp, err := client.Do(req)
if err != nil {
return nil, err
}
return p.readResponse(resp)
}
// Post performs an HTTP POST and returns the bytes and/or an error. Any
// non-200 return code is returned as an error.
func (p *Process) Post(path string, data io.Reader) ([]byte, error) {
client := &http.Client{
Timeout: 600 * time.Second,
Transport: &http.Transport{
DisableKeepAlives: true,
},
}
url := fmt.Sprintf("http://%s%s", p.addr, path)
req, err := http.NewRequest("POST", url, data)
if err != nil {
return nil, err
}
req.Header.Add("X-API-Key", APIKey)
req.Header.Add("Content-Type", "application/json")
resp, err := client.Do(req)
if err != nil {
return nil, err
}
return p.readResponse(resp)
}
type Event struct {
ID int
Time time.Time
Type string
Data interface{}
}
func (p *Process) Events(since int) ([]Event, error) {
bs, err := p.Get(fmt.Sprintf("/rest/events?since=%d&timeout=10", since))
if err != nil {
return nil, err
}
var evs []Event
dec := json.NewDecoder(bytes.NewReader(bs))
dec.UseNumber()
err = dec.Decode(&evs)
if err != nil {
return nil, fmt.Errorf("events: %w in %q", err, bs)
}
return evs, err
}
func (p *Process) Rescan(folder string) error {
2015-09-24 17:51:59 +00:00
_, err := p.Post("/rest/db/scan?folder="+url.QueryEscape(folder), nil)
return err
}
func (p *Process) RescanDelay(folder string, delaySeconds int) error {
2015-09-24 17:51:59 +00:00
_, err := p.Post(fmt.Sprintf("/rest/db/scan?folder=%s&next=%d", url.QueryEscape(folder), delaySeconds), nil)
return err
}
2015-07-31 16:52:50 +00:00
func (p *Process) RescanSub(folder string, sub string, delaySeconds int) error {
return p.RescanSubs(folder, []string{sub}, delaySeconds)
}
func (p *Process) RescanSubs(folder string, subs []string, delaySeconds int) error {
data := url.Values{}
data.Set("folder", folder)
for _, sub := range subs {
data.Add("sub", sub)
}
data.Set("next", strconv.Itoa(delaySeconds))
_, err := p.Post("/rest/db/scan?"+data.Encode(), nil)
return err
}
func (p *Process) ConfigInSync() (bool, error) {
bs, err := p.Get("/rest/system/config/insync")
if err != nil {
return false, err
}
return bytes.Contains(bs, []byte("true")), nil
}
func (p *Process) GetConfig() (config.Configuration, error) {
var cfg config.Configuration
bs, err := p.Get("/rest/system/config")
if err != nil {
return cfg, err
}
err = json.Unmarshal(bs, &cfg)
return cfg, err
}
func (p *Process) PostConfig(cfg config.Configuration) error {
buf := new(bytes.Buffer)
if err := json.NewEncoder(buf).Encode(cfg); err != nil {
return err
}
_, err := p.Post("/rest/system/config", buf)
return err
}
func (p *Process) PauseDevice(dev protocol.DeviceID) error {
_, err := p.Post("/rest/system/pause?device="+dev.String(), nil)
return err
}
func (p *Process) ResumeDevice(dev protocol.DeviceID) error {
_, err := p.Post("/rest/system/resume?device="+dev.String(), nil)
return err
}
func (p *Process) PauseAll() error {
_, err := p.Post("/rest/system/pause", nil)
return err
}
func (p *Process) ResumeAll() error {
_, err := p.Post("/rest/system/resume", nil)
return err
}
func InSync(folder string, ps ...*Process) bool {
for _, p := range ps {
p.eventMut.Lock()
}
defer func() {
for _, p := range ps {
p.eventMut.Unlock()
}
}()
for i := range ps {
// If our latest FolderSummary didn't report 100%, then we are not done.
if !ps[i].done[folder] {
l.Debugf("done = ps[%d].done[%q] = false", i, folder)
return false
}
// Check Sequence for each device. The local version seen by remote
// devices should be the same as what it has locally, or the index
// hasn't been sent yet.
sourceID := ps[i].id.String()
sourceSeq := ps[i].sequence[folder][sourceID]
l.Debugf("sourceSeq = ps[%d].sequence[%q][%q] = %d", i, folder, sourceID, sourceSeq)
for j := range ps {
if i != j {
remoteSeq := ps[j].sequence[folder][sourceID]
if remoteSeq != sourceSeq {
l.Debugf("remoteSeq = ps[%d].sequence[%q][%q] = %d", j, folder, sourceID, remoteSeq)
return false
}
}
}
}
return true
}
func AwaitSync(folder string, ps ...*Process) {
for {
time.Sleep(250 * time.Millisecond)
if InSync(folder, ps...) {
return
}
}
}
type Model struct {
GlobalBytes int
GlobalDeleted int
GlobalFiles int
InSyncBytes int
InSyncFiles int
Invalid string
LocalBytes int
LocalDeleted int
LocalFiles int
NeedBytes int
NeedFiles int
State string
StateChanged time.Time
Version int
}
func (p *Process) Model(folder string) (Model, error) {
2015-09-24 17:51:59 +00:00
bs, err := p.Get("/rest/db/status?folder=" + url.QueryEscape(folder))
if err != nil {
return Model{}, err
}
var res Model
if err := json.Unmarshal(bs, &res); err != nil {
return Model{}, err
}
Implement facility based logger, debugging via REST API This implements a new debug/trace infrastructure based on a slightly hacked up logger. Instead of the traditional "if debug { ... }" I've rewritten the logger to have no-op Debugln and Debugf, unless debugging has been enabled for a given "facility". The "facility" is just a string, typically a package name. This will be slightly slower than before; but not that much as it's mostly a function call that returns immediately. For the cases where it matters (the Debugln takes a hex.Dump() of something for example, and it's not in a very occasional "if err != nil" branch) there is an l.ShouldDebug(facility) that is fast enough to be used like the old "if debug". The point of all this is that we can now toggle debugging for the various packages on and off at runtime. There's a new method /rest/system/debug that can be POSTed a set of facilities to enable and disable debug for, or GET from to get a list of facilities with descriptions and their current debug status. Similarly a /rest/system/log?since=... can grab the latest log entries, up to 250 of them (hardcoded constant in main.go) plus the initial few. Not implemented in this commit (but planned) is a simple debug GUI available on /debug that shows the current log in an easily pasteable format and has checkboxes to enable the various debug facilities. The debug instructions to a user then becomes "visit this URL, check these boxes, reproduce your problem, copy and paste the log". The actual log viewer on the hypothetical /debug URL can poll regularly for new log entries and this bypass the 250 line limit. The existing STTRACE=foo variable is still obeyed and just sets the start state of the system.
2015-10-03 15:25:21 +00:00
l.Debugf("%+v", res)
return res, nil
}
func (p *Process) readResponse(resp *http.Response) ([]byte, error) {
bs, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return bs, err
}
if resp.StatusCode != 200 {
return bs, errors.New(resp.Status)
}
return bs, nil
}
func (p *Process) checkForProblems(logfd *os.File) error {
fd, err := os.Open(logfd.Name())
if err != nil {
return err
}
defer fd.Close()
raceConditionStart := []byte("WARNING: DATA RACE")
raceConditionSep := []byte("==================")
panicConditionStart := []byte("panic:")
panicConditionSep := []byte("[") // fallback if we don't already know our ID
if p.id.String() != "" {
panicConditionSep = []byte(p.id.String()[:5])
}
sc := bufio.NewScanner(fd)
race := false
_panic := false
for sc.Scan() {
line := sc.Bytes()
if race || _panic {
if bytes.Contains(line, panicConditionSep) {
_panic = false
continue
}
fmt.Printf("%s\n", line)
if bytes.Contains(line, raceConditionSep) {
race = false
}
} else if bytes.Contains(line, raceConditionStart) {
fmt.Printf("%s\n", raceConditionSep)
fmt.Printf("%s\n", raceConditionStart)
race = true
if err == nil {
err = errors.New("Race condition detected")
}
} else if bytes.Contains(line, panicConditionStart) {
_panic = true
if err == nil {
err = errors.New("Panic detected")
}
}
}
return err
}
func (p *Process) eventLoop() {
since := 0
notScanned := make(map[string]struct{})
start := time.Now()
for {
select {
case <-p.stopped:
return
default:
}
evs, err := p.Events(since)
if err != nil {
if time.Since(start) < 5*time.Second {
// The API has probably not started yet, lets give it some time.
continue
}
// If we're stopping, no need to print the error.
select {
case <-p.stopped:
return
default:
}
log.Println("eventLoop: events:", err)
continue
}
for _, ev := range evs {
if ev.ID != since+1 {
l.Warnln("Event ID jumped", since, "to", ev.ID)
}
since = ev.ID
switch ev.Type {
case "Starting":
// The Starting event tells us where the configuration is. Load
// it and populate our list of folders.
data := ev.Data.(map[string]interface{})
id, err := protocol.DeviceIDFromString(data["myID"].(string))
if err != nil {
log.Println("eventLoop: DeviceIdFromString:", err)
continue
}
p.id = id
home := data["home"].(string)
w, _, err := config.Load(filepath.Join(home, "config.xml"), protocol.LocalDeviceID, events.NoopLogger)
if err != nil {
log.Println("eventLoop: Starting:", err)
continue
}
for id := range w.Folders() {
p.eventMut.Lock()
p.folders = append(p.folders, id)
p.eventMut.Unlock()
notScanned[id] = struct{}{}
}
l.Debugln("Started", p.id)
case "StateChanged":
// When a folder changes to idle, we tick it off by removing
// it from p.notScanned.
if len(p.folders) == 0 {
// We haven't parsed the config yet, shouldn't happen
panic("race, or lost startup event")
}
select {
case <-p.startComplete:
default:
data := ev.Data.(map[string]interface{})
to := data["to"].(string)
if to == "idle" {
folder := data["folder"].(string)
delete(notScanned, folder)
if len(notScanned) == 0 {
close(p.startComplete)
}
}
}
case "LocalIndexUpdated":
data := ev.Data.(map[string]interface{})
folder := data["folder"].(string)
version, _ := data["version"].(json.Number).Int64()
p.eventMut.Lock()
m := p.sequence[folder]
if m == nil {
m = make(map[string]int64)
}
device := p.id.String()
if device == "" {
p.eventMut.Unlock()
panic("race, or startup not complete")
}
m[device] = version
p.sequence[folder] = m
p.done[folder] = false
Implement facility based logger, debugging via REST API This implements a new debug/trace infrastructure based on a slightly hacked up logger. Instead of the traditional "if debug { ... }" I've rewritten the logger to have no-op Debugln and Debugf, unless debugging has been enabled for a given "facility". The "facility" is just a string, typically a package name. This will be slightly slower than before; but not that much as it's mostly a function call that returns immediately. For the cases where it matters (the Debugln takes a hex.Dump() of something for example, and it's not in a very occasional "if err != nil" branch) there is an l.ShouldDebug(facility) that is fast enough to be used like the old "if debug". The point of all this is that we can now toggle debugging for the various packages on and off at runtime. There's a new method /rest/system/debug that can be POSTed a set of facilities to enable and disable debug for, or GET from to get a list of facilities with descriptions and their current debug status. Similarly a /rest/system/log?since=... can grab the latest log entries, up to 250 of them (hardcoded constant in main.go) plus the initial few. Not implemented in this commit (but planned) is a simple debug GUI available on /debug that shows the current log in an easily pasteable format and has checkboxes to enable the various debug facilities. The debug instructions to a user then becomes "visit this URL, check these boxes, reproduce your problem, copy and paste the log". The actual log viewer on the hypothetical /debug URL can poll regularly for new log entries and this bypass the 250 line limit. The existing STTRACE=foo variable is still obeyed and just sets the start state of the system.
2015-10-03 15:25:21 +00:00
l.Debugf("LocalIndexUpdated %v %v done=false\n\t%+v", p.id, folder, m)
p.eventMut.Unlock()
case "RemoteIndexUpdated":
data := ev.Data.(map[string]interface{})
device := data["device"].(string)
folder := data["folder"].(string)
version, _ := data["version"].(json.Number).Int64()
p.eventMut.Lock()
m := p.sequence[folder]
if m == nil {
m = make(map[string]int64)
}
m[device] = version
p.sequence[folder] = m
p.done[folder] = false
Implement facility based logger, debugging via REST API This implements a new debug/trace infrastructure based on a slightly hacked up logger. Instead of the traditional "if debug { ... }" I've rewritten the logger to have no-op Debugln and Debugf, unless debugging has been enabled for a given "facility". The "facility" is just a string, typically a package name. This will be slightly slower than before; but not that much as it's mostly a function call that returns immediately. For the cases where it matters (the Debugln takes a hex.Dump() of something for example, and it's not in a very occasional "if err != nil" branch) there is an l.ShouldDebug(facility) that is fast enough to be used like the old "if debug". The point of all this is that we can now toggle debugging for the various packages on and off at runtime. There's a new method /rest/system/debug that can be POSTed a set of facilities to enable and disable debug for, or GET from to get a list of facilities with descriptions and their current debug status. Similarly a /rest/system/log?since=... can grab the latest log entries, up to 250 of them (hardcoded constant in main.go) plus the initial few. Not implemented in this commit (but planned) is a simple debug GUI available on /debug that shows the current log in an easily pasteable format and has checkboxes to enable the various debug facilities. The debug instructions to a user then becomes "visit this URL, check these boxes, reproduce your problem, copy and paste the log". The actual log viewer on the hypothetical /debug URL can poll regularly for new log entries and this bypass the 250 line limit. The existing STTRACE=foo variable is still obeyed and just sets the start state of the system.
2015-10-03 15:25:21 +00:00
l.Debugf("RemoteIndexUpdated %v %v done=false\n\t%+v", p.id, folder, m)
p.eventMut.Unlock()
case "FolderSummary":
data := ev.Data.(map[string]interface{})
folder := data["folder"].(string)
summary := data["summary"].(map[string]interface{})
need, _ := summary["needBytes"].(json.Number).Int64()
done := need == 0
p.eventMut.Lock()
p.done[folder] = done
Implement facility based logger, debugging via REST API This implements a new debug/trace infrastructure based on a slightly hacked up logger. Instead of the traditional "if debug { ... }" I've rewritten the logger to have no-op Debugln and Debugf, unless debugging has been enabled for a given "facility". The "facility" is just a string, typically a package name. This will be slightly slower than before; but not that much as it's mostly a function call that returns immediately. For the cases where it matters (the Debugln takes a hex.Dump() of something for example, and it's not in a very occasional "if err != nil" branch) there is an l.ShouldDebug(facility) that is fast enough to be used like the old "if debug". The point of all this is that we can now toggle debugging for the various packages on and off at runtime. There's a new method /rest/system/debug that can be POSTed a set of facilities to enable and disable debug for, or GET from to get a list of facilities with descriptions and their current debug status. Similarly a /rest/system/log?since=... can grab the latest log entries, up to 250 of them (hardcoded constant in main.go) plus the initial few. Not implemented in this commit (but planned) is a simple debug GUI available on /debug that shows the current log in an easily pasteable format and has checkboxes to enable the various debug facilities. The debug instructions to a user then becomes "visit this URL, check these boxes, reproduce your problem, copy and paste the log". The actual log viewer on the hypothetical /debug URL can poll regularly for new log entries and this bypass the 250 line limit. The existing STTRACE=foo variable is still obeyed and just sets the start state of the system.
2015-10-03 15:25:21 +00:00
l.Debugf("Foldersummary %v %v\n\t%+v", p.id, folder, p.done)
p.eventMut.Unlock()
}
}
}
}
2015-11-21 15:30:53 +00:00
type ConnectionStats struct {
Address string
Type string
Connected bool
Paused bool
ClientVersion string
InBytesTotal int64
OutBytesTotal int64
}
func (p *Process) Connections() (map[string]ConnectionStats, error) {
bs, err := p.Get("/rest/system/connections")
if err != nil {
return nil, err
}
var res map[string]ConnectionStats
if err := json.Unmarshal(bs, &res); err != nil {
return nil, err
}
return res, nil
}
type SystemStatus struct {
Alloc int64
Goroutines int
MyID protocol.DeviceID
PathSeparator string
StartTime time.Time
Sys int64
Themes []string
Tilde string
Uptime int
}
func (p *Process) SystemStatus() (SystemStatus, error) {
bs, err := p.Get("/rest/system/status")
if err != nil {
return SystemStatus{}, err
}
var res SystemStatus
if err := json.Unmarshal(bs, &res); err != nil {
return SystemStatus{}, err
}
return res, nil
}
type SystemVersion struct {
Arch string
Codename string
LongVersion string
OS string
Version string
}
func (p *Process) SystemVersion() (SystemVersion, error) {
bs, err := p.Get("/rest/system/version")
if err != nil {
return SystemVersion{}, err
}
var res SystemVersion
if err := json.Unmarshal(bs, &res); err != nil {
return SystemVersion{}, err
}
return res, nil
}