mirror of
https://github.com/octoleo/syncthing.git
synced 2025-01-03 07:12:27 +00:00
all: Send deadlocks as failures, crash only as a last resort (#7785)
This commit is contained in:
parent
dc0dd09e93
commit
67b18569cf
@ -19,10 +19,9 @@ import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/syncthing/syncthing/lib/sha256"
|
||||
"github.com/syncthing/syncthing/lib/ur"
|
||||
@ -33,7 +32,7 @@ import (
|
||||
const maxRequestSize = 1 << 20 // 1 MiB
|
||||
|
||||
func main() {
|
||||
dir := flag.String("dir", ".", "Directory to store reports in")
|
||||
dir := flag.String("dir", ".", "Parent directory to store crash and failure reports in")
|
||||
dsn := flag.String("dsn", "", "Sentry DSN")
|
||||
listen := flag.String("listen", ":22039", "HTTP listen address")
|
||||
flag.Parse()
|
||||
@ -41,13 +40,13 @@ func main() {
|
||||
mux := http.NewServeMux()
|
||||
|
||||
cr := &crashReceiver{
|
||||
dir: *dir,
|
||||
dir: filepath.Join(*dir, "crash_reports"),
|
||||
dsn: *dsn,
|
||||
}
|
||||
mux.Handle("/", cr)
|
||||
|
||||
if *dsn != "" {
|
||||
mux.HandleFunc("/newcrash/failure", handleFailureFn(*dsn))
|
||||
mux.HandleFunc("/newcrash/failure", handleFailureFn(*dsn, filepath.Join(*dir, "failure_reports")))
|
||||
}
|
||||
|
||||
log.SetOutput(os.Stdout)
|
||||
@ -56,7 +55,7 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
func handleFailureFn(dsn string) func(w http.ResponseWriter, req *http.Request) {
|
||||
func handleFailureFn(dsn, failureDir string) func(w http.ResponseWriter, req *http.Request) {
|
||||
return func(w http.ResponseWriter, req *http.Request) {
|
||||
lr := io.LimitReader(req.Body, maxRequestSize)
|
||||
bs, err := ioutil.ReadAll(lr)
|
||||
@ -89,6 +88,18 @@ func handleFailureFn(dsn string) func(w http.ResponseWriter, req *http.Request)
|
||||
pkt.Extra = raven.Extra{
|
||||
"count": r.Count,
|
||||
}
|
||||
for k, v := range r.Extra {
|
||||
pkt.Extra[k] = v
|
||||
}
|
||||
if len(r.Goroutines) != 0 {
|
||||
url, err := saveFailureWithGoroutines(r.FailureData, failureDir)
|
||||
if err != nil {
|
||||
log.Println("Saving failure report:", err)
|
||||
http.Error(w, "Internal server error", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
pkt.Extra["goroutinesURL"] = url
|
||||
}
|
||||
message := sanitizeMessageLDB(r.Description)
|
||||
pkt.Fingerprint = []string{message}
|
||||
|
||||
@ -101,19 +112,15 @@ func handleFailureFn(dsn string) func(w http.ResponseWriter, req *http.Request)
|
||||
}
|
||||
}
|
||||
|
||||
// userIDFor returns a string we can use as the user ID for the purpose of
|
||||
// counting affected users. It's the truncated hash of a salt, the user
|
||||
// remote IP, and the current month.
|
||||
func userIDFor(req *http.Request) string {
|
||||
addr := req.RemoteAddr
|
||||
if fwd := req.Header.Get("x-forwarded-for"); fwd != "" {
|
||||
addr = fwd
|
||||
func saveFailureWithGoroutines(data ur.FailureData, failureDir string) (string, error) {
|
||||
bs := make([]byte, len(data.Description)+len(data.Goroutines))
|
||||
copy(bs, data.Description)
|
||||
copy(bs[len(data.Description):], data.Goroutines)
|
||||
id := fmt.Sprintf("%x", sha256.Sum256(bs))
|
||||
path := fullPathCompressed(failureDir, id)
|
||||
err := compressAndWrite(bs, path)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if host, _, err := net.SplitHostPort(addr); err == nil {
|
||||
addr = host
|
||||
}
|
||||
now := time.Now().Format("200601")
|
||||
salt := "stcrashreporter"
|
||||
hash := sha256.Sum256([]byte(salt + addr + now))
|
||||
return fmt.Sprintf("%x", hash[:8])
|
||||
return reportServer + path, nil
|
||||
}
|
||||
|
57
cmd/stcrashreceiver/util.go
Normal file
57
cmd/stcrashreceiver/util.go
Normal file
@ -0,0 +1,57 @@
|
||||
// Copyright (C) 2021 The Syncthing Authors.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
||||
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/syncthing/syncthing/lib/sha256"
|
||||
)
|
||||
|
||||
// userIDFor returns a string we can use as the user ID for the purpose of
|
||||
// counting affected users. It's the truncated hash of a salt, the user
|
||||
// remote IP, and the current month.
|
||||
func userIDFor(req *http.Request) string {
|
||||
addr := req.RemoteAddr
|
||||
if fwd := req.Header.Get("x-forwarded-for"); fwd != "" {
|
||||
addr = fwd
|
||||
}
|
||||
if host, _, err := net.SplitHostPort(addr); err == nil {
|
||||
addr = host
|
||||
}
|
||||
now := time.Now().Format("200601")
|
||||
salt := "stcrashreporter"
|
||||
hash := sha256.Sum256([]byte(salt + addr + now))
|
||||
return fmt.Sprintf("%x", hash[:8])
|
||||
}
|
||||
|
||||
// 01234567890abcdef... => 01/23
|
||||
func dirFor(base string) string {
|
||||
return filepath.Join(base[0:2], base[2:4])
|
||||
}
|
||||
|
||||
func fullPathCompressed(root, reportID string) string {
|
||||
return filepath.Join(root, dirFor(reportID), reportID) + ".gz"
|
||||
}
|
||||
|
||||
func compressAndWrite(bs []byte, fullPath string) error {
|
||||
// Compress the report for storage
|
||||
buf := new(bytes.Buffer)
|
||||
gw := gzip.NewWriter(buf)
|
||||
_, _ = gw.Write(bs) // can't fail
|
||||
gw.Close()
|
||||
|
||||
// Create an output file with the compressed report
|
||||
return ioutil.WriteFile(fullPath, buf.Bytes(), 0644)
|
||||
}
|
@ -324,7 +324,7 @@ func (m *model) fatal(err error) {
|
||||
// period.
|
||||
func (m *model) StartDeadlockDetector(timeout time.Duration) {
|
||||
l.Infof("Starting deadlock detector with %v timeout", timeout)
|
||||
detector := newDeadlockDetector(timeout)
|
||||
detector := newDeadlockDetector(timeout, m.evLogger, m.fatal)
|
||||
detector.Watch("fmut", m.fmut)
|
||||
detector.Watch("pmut", m.pmut)
|
||||
}
|
||||
|
@ -9,59 +9,98 @@ package model
|
||||
import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/syncthing/syncthing/lib/events"
|
||||
"github.com/syncthing/syncthing/lib/fs"
|
||||
"github.com/syncthing/syncthing/lib/ur"
|
||||
)
|
||||
|
||||
type Holdable interface {
|
||||
Holders() string
|
||||
}
|
||||
|
||||
func newDeadlockDetector(timeout time.Duration) *deadlockDetector {
|
||||
func newDeadlockDetector(timeout time.Duration, evLogger events.Logger, fatal func(error)) *deadlockDetector {
|
||||
return &deadlockDetector{
|
||||
timeout: timeout,
|
||||
warnTimeout: timeout,
|
||||
fatalTimeout: 10 * timeout,
|
||||
lockers: make(map[string]sync.Locker),
|
||||
evLogger: evLogger,
|
||||
fatal: fatal,
|
||||
}
|
||||
}
|
||||
|
||||
type deadlockDetector struct {
|
||||
timeout time.Duration
|
||||
warnTimeout, fatalTimeout time.Duration
|
||||
lockers map[string]sync.Locker
|
||||
evLogger events.Logger
|
||||
fatal func(error)
|
||||
}
|
||||
|
||||
func (d *deadlockDetector) Watch(name string, mut sync.Locker) {
|
||||
d.lockers[name] = mut
|
||||
go func() {
|
||||
for {
|
||||
time.Sleep(d.timeout / 4)
|
||||
ok := make(chan bool, 2)
|
||||
time.Sleep(d.warnTimeout / 4)
|
||||
done := make(chan struct{}, 1)
|
||||
|
||||
go func() {
|
||||
mut.Lock()
|
||||
_ = 1 // empty critical section
|
||||
mut.Unlock()
|
||||
ok <- true
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
time.Sleep(d.timeout)
|
||||
ok <- false
|
||||
d.watchInner(name, done)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (d *deadlockDetector) watchInner(name string, done chan struct{}) {
|
||||
warn := time.NewTimer(d.warnTimeout)
|
||||
fatal := time.NewTimer(d.fatalTimeout)
|
||||
defer func() {
|
||||
warn.Stop()
|
||||
fatal.Stop()
|
||||
}()
|
||||
|
||||
if r := <-ok; !r {
|
||||
msg := fmt.Sprintf("deadlock detected at %s", name)
|
||||
select {
|
||||
case <-warn.C:
|
||||
failure := ur.FailureDataWithGoroutines(fmt.Sprintf("potential deadlock detected at %s (short timeout)", name))
|
||||
failure.Extra["timeout"] = d.warnTimeout.String()
|
||||
d.evLogger.Log(events.Failure, failure)
|
||||
case <-done:
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-fatal.C:
|
||||
err := fmt.Errorf("potential deadlock detected at %s (long timeout)", name)
|
||||
failure := ur.FailureDataWithGoroutines(err.Error())
|
||||
failure.Extra["timeout"] = d.fatalTimeout.String()
|
||||
others := d.otherHolders()
|
||||
failure.Extra["other-holders"] = others
|
||||
d.evLogger.Log(events.Failure, failure)
|
||||
d.fatal(err)
|
||||
// Give it a minute to shut down gracefully, maybe shutting down
|
||||
// can get out of the deadlock (or it's not really a deadlock).
|
||||
time.Sleep(time.Minute)
|
||||
panic(fmt.Sprintf("%v:\n%v", err, others))
|
||||
case <-done:
|
||||
}
|
||||
}
|
||||
|
||||
func (d *deadlockDetector) otherHolders() string {
|
||||
var b strings.Builder
|
||||
for otherName, otherMut := range d.lockers {
|
||||
if otherHolder, ok := otherMut.(Holdable); ok {
|
||||
msg += "\n===" + otherName + "===\n" + otherHolder.Holders()
|
||||
b.WriteString("===" + otherName + "===\n" + otherHolder.Holders() + "\n")
|
||||
}
|
||||
}
|
||||
panic(msg)
|
||||
}
|
||||
}
|
||||
}()
|
||||
return b.String()
|
||||
}
|
||||
|
||||
// inWritableDir calls fn(path), while making sure that the directory
|
||||
|
@ -11,6 +11,8 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"runtime/pprof"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/syncthing/syncthing/lib/build"
|
||||
@ -35,11 +37,26 @@ var (
|
||||
)
|
||||
|
||||
type FailureReport struct {
|
||||
Description string
|
||||
FailureData
|
||||
Count int
|
||||
Version string
|
||||
}
|
||||
|
||||
type FailureData struct {
|
||||
Description string
|
||||
Goroutines string
|
||||
Extra map[string]string
|
||||
}
|
||||
|
||||
func FailureDataWithGoroutines(description string) FailureData {
|
||||
var buf *strings.Builder
|
||||
pprof.NewProfile("goroutine").WriteTo(buf, 1)
|
||||
return FailureData{
|
||||
Description: description,
|
||||
Goroutines: buf.String(),
|
||||
}
|
||||
}
|
||||
|
||||
type FailureHandler interface {
|
||||
suture.Service
|
||||
config.Committer
|
||||
@ -64,6 +81,7 @@ type failureHandler struct {
|
||||
type failureStat struct {
|
||||
first, last time.Time
|
||||
count int
|
||||
data FailureData
|
||||
}
|
||||
|
||||
func (h *failureHandler) Serve(ctx context.Context) error {
|
||||
@ -82,23 +100,28 @@ func (h *failureHandler) Serve(ctx context.Context) error {
|
||||
if !ok {
|
||||
// Just to be safe - shouldn't ever happen, as
|
||||
// evChan is set to nil when unsubscribing.
|
||||
h.addReport(evChanClosed, time.Now())
|
||||
h.addReport(FailureData{Description: evChanClosed}, time.Now())
|
||||
evChan = nil
|
||||
continue
|
||||
}
|
||||
descr, ok := e.Data.(string)
|
||||
if !ok {
|
||||
var data FailureData
|
||||
switch d := e.Data.(type) {
|
||||
case string:
|
||||
data.Description = d
|
||||
case FailureData:
|
||||
data = d
|
||||
default:
|
||||
// Same here, shouldn't ever happen.
|
||||
h.addReport(invalidEventDataType, time.Now())
|
||||
h.addReport(FailureData{Description: invalidEventDataType}, time.Now())
|
||||
continue
|
||||
}
|
||||
h.addReport(descr, e.Time)
|
||||
h.addReport(data, e.Time)
|
||||
case <-timer.C:
|
||||
reports := make([]FailureReport, 0, len(h.buf))
|
||||
now := time.Now()
|
||||
for descr, stat := range h.buf {
|
||||
if now.Sub(stat.last) > minDelay || now.Sub(stat.first) > maxDelay {
|
||||
reports = append(reports, newFailureReport(descr, stat.count))
|
||||
reports = append(reports, newFailureReport(stat))
|
||||
delete(h.buf, descr)
|
||||
}
|
||||
}
|
||||
@ -125,8 +148,8 @@ func (h *failureHandler) Serve(ctx context.Context) error {
|
||||
sub.Unsubscribe()
|
||||
if len(h.buf) > 0 {
|
||||
reports := make([]FailureReport, 0, len(h.buf))
|
||||
for descr, stat := range h.buf {
|
||||
reports = append(reports, newFailureReport(descr, stat.count))
|
||||
for _, stat := range h.buf {
|
||||
reports = append(reports, newFailureReport(stat))
|
||||
}
|
||||
timeout, cancel := context.WithTimeout(context.Background(), finalSendTimeout)
|
||||
defer cancel()
|
||||
@ -151,16 +174,17 @@ func (h *failureHandler) applyOpts(opts config.OptionsConfiguration, sub events.
|
||||
return url, nil, nil
|
||||
}
|
||||
|
||||
func (h *failureHandler) addReport(descr string, evTime time.Time) {
|
||||
if stat, ok := h.buf[descr]; ok {
|
||||
func (h *failureHandler) addReport(data FailureData, evTime time.Time) {
|
||||
if stat, ok := h.buf[data.Description]; ok {
|
||||
stat.last = evTime
|
||||
stat.count++
|
||||
return
|
||||
}
|
||||
h.buf[descr] = &failureStat{
|
||||
h.buf[data.Description] = &failureStat{
|
||||
first: evTime,
|
||||
last: evTime,
|
||||
count: 1,
|
||||
data: data,
|
||||
}
|
||||
}
|
||||
|
||||
@ -210,10 +234,10 @@ func sendFailureReports(ctx context.Context, reports []FailureReport, url string
|
||||
return
|
||||
}
|
||||
|
||||
func newFailureReport(descr string, count int) FailureReport {
|
||||
func newFailureReport(stat *failureStat) FailureReport {
|
||||
return FailureReport{
|
||||
Description: descr,
|
||||
Count: count,
|
||||
FailureData: stat.data,
|
||||
Count: stat.count,
|
||||
Version: build.LongVersion,
|
||||
}
|
||||
}
|
||||
|
@ -23,8 +23,8 @@ import (
|
||||
"github.com/syncthing/syncthing/lib/build"
|
||||
"github.com/syncthing/syncthing/lib/config"
|
||||
"github.com/syncthing/syncthing/lib/connections"
|
||||
"github.com/syncthing/syncthing/lib/db"
|
||||
"github.com/syncthing/syncthing/lib/dialer"
|
||||
"github.com/syncthing/syncthing/lib/model"
|
||||
"github.com/syncthing/syncthing/lib/protocol"
|
||||
"github.com/syncthing/syncthing/lib/scanner"
|
||||
"github.com/syncthing/syncthing/lib/upgrade"
|
||||
@ -38,15 +38,20 @@ const Version = 3
|
||||
|
||||
var StartTime = time.Now().Truncate(time.Second)
|
||||
|
||||
type Model interface {
|
||||
DBSnapshot(folder string) (*db.Snapshot, error)
|
||||
UsageReportingStats(report *contract.Report, version int, preview bool)
|
||||
}
|
||||
|
||||
type Service struct {
|
||||
cfg config.Wrapper
|
||||
model model.Model
|
||||
model Model
|
||||
connectionsService connections.Service
|
||||
noUpgrade bool
|
||||
forceRun chan struct{}
|
||||
}
|
||||
|
||||
func New(cfg config.Wrapper, m model.Model, connectionsService connections.Service, noUpgrade bool) *Service {
|
||||
func New(cfg config.Wrapper, m Model, connectionsService connections.Service, noUpgrade bool) *Service {
|
||||
return &Service{
|
||||
cfg: cfg,
|
||||
model: m,
|
||||
|
Loading…
Reference in New Issue
Block a user