mirror of
https://github.com/octoleo/syncthing.git
synced 2024-11-09 23:00:58 +00:00
parent
b1db328931
commit
9d09fd6af3
114
cmd/stcrashreceiver/main.go
Normal file
114
cmd/stcrashreceiver/main.go
Normal file
@ -0,0 +1,114 @@
|
|||||||
|
// Copyright (C) 2019 The Syncthing Authors.
|
||||||
|
//
|
||||||
|
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||||
|
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
||||||
|
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||||
|
|
||||||
|
// Command stcrashreceiver is a trivial HTTP server that allows two things:
|
||||||
|
//
|
||||||
|
// - uploading files (crash reports) named like a SHA256 hash using a PUT request
|
||||||
|
// - checking whether such file exists using a HEAD request
|
||||||
|
//
|
||||||
|
// Typically this should be deployed behind something that manages HTTPS.
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"log"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/syncthing/syncthing/lib/sha256"
|
||||||
|
"github.com/syncthing/syncthing/lib/ur"
|
||||||
|
|
||||||
|
raven "github.com/getsentry/raven-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
const maxRequestSize = 1 << 20 // 1 MiB
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
dir := flag.String("dir", ".", "Directory to store reports in")
|
||||||
|
dsn := flag.String("dsn", "", "Sentry DSN")
|
||||||
|
listen := flag.String("listen", ":22039", "HTTP listen address")
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
mux := http.NewServeMux()
|
||||||
|
|
||||||
|
cr := &crashReceiver{
|
||||||
|
dir: *dir,
|
||||||
|
dsn: *dsn,
|
||||||
|
}
|
||||||
|
mux.Handle("/", cr)
|
||||||
|
|
||||||
|
if *dsn != "" {
|
||||||
|
mux.HandleFunc("/failure", handleFailureFn(*dsn))
|
||||||
|
}
|
||||||
|
|
||||||
|
log.SetOutput(os.Stdout)
|
||||||
|
if err := http.ListenAndServe(*listen, mux); err != nil {
|
||||||
|
log.Fatalln("HTTP serve:", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleFailureFn(dsn string) func(w http.ResponseWriter, req *http.Request) {
|
||||||
|
return func(w http.ResponseWriter, req *http.Request) {
|
||||||
|
lr := io.LimitReader(req.Body, maxRequestSize)
|
||||||
|
bs, err := ioutil.ReadAll(lr)
|
||||||
|
req.Body.Close()
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), 500)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var reports []ur.FailureReport
|
||||||
|
err = json.Unmarshal(bs, &reports)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), 400)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(reports) == 0 {
|
||||||
|
// Shouldn't happen
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
version, err := parseVersion(reports[0].Version)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), 400)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, r := range reports {
|
||||||
|
pkt := packet(version)
|
||||||
|
pkt.Message = r.Description
|
||||||
|
pkt.Extra = raven.Extra{
|
||||||
|
"count": r.Count,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := sendReport(dsn, pkt, userIDFor(req)); err != nil {
|
||||||
|
log.Println("Failed to send crash report:", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// userIDFor returns a string we can use as the user ID for the purpose of
|
||||||
|
// counting affected users. It's the truncated hash of a salt, the user
|
||||||
|
// remote IP, and the current month.
|
||||||
|
func userIDFor(req *http.Request) string {
|
||||||
|
addr := req.RemoteAddr
|
||||||
|
if fwd := req.Header.Get("x-forwarded-for"); fwd != "" {
|
||||||
|
addr = fwd
|
||||||
|
}
|
||||||
|
if host, _, err := net.SplitHostPort(addr); err == nil {
|
||||||
|
addr = host
|
||||||
|
}
|
||||||
|
now := time.Now().Format("200601")
|
||||||
|
salt := "stcrashreporter"
|
||||||
|
hash := sha256.Sum256([]byte(salt + addr + now))
|
||||||
|
return fmt.Sprintf("%x", hash[:8])
|
||||||
|
}
|
@ -31,12 +31,7 @@ var (
|
|||||||
clientsMut sync.Mutex
|
clientsMut sync.Mutex
|
||||||
)
|
)
|
||||||
|
|
||||||
func sendReport(dsn, path string, report []byte, userID string) error {
|
func sendReport(dsn string, pkt *raven.Packet, userID string) error {
|
||||||
pkt, err := parseReport(path, report)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
pkt.Interfaces = append(pkt.Interfaces, &raven.User{ID: userID})
|
pkt.Interfaces = append(pkt.Interfaces, &raven.User{ID: userID})
|
||||||
|
|
||||||
clientsMut.Lock()
|
clientsMut.Lock()
|
||||||
@ -44,6 +39,7 @@ func sendReport(dsn, path string, report []byte, userID string) error {
|
|||||||
|
|
||||||
cli, ok := clients[dsn]
|
cli, ok := clients[dsn]
|
||||||
if !ok {
|
if !ok {
|
||||||
|
var err error
|
||||||
cli, err = raven.New(dsn)
|
cli, err = raven.New(dsn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -62,7 +58,7 @@ func sendReport(dsn, path string, report []byte, userID string) error {
|
|||||||
return <-errC
|
return <-errC
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseReport(path string, report []byte) (*raven.Packet, error) {
|
func parseCrashReport(path string, report []byte) (*raven.Packet, error) {
|
||||||
parts := bytes.SplitN(report, []byte("\n"), 2)
|
parts := bytes.SplitN(report, []byte("\n"), 2)
|
||||||
if len(parts) != 2 {
|
if len(parts) != 2 {
|
||||||
return nil, errors.New("no first line")
|
return nil, errors.New("no first line")
|
||||||
@ -126,31 +122,12 @@ func parseReport(path string, report []byte) (*raven.Packet, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pkt := &raven.Packet{
|
pkt := packet(version)
|
||||||
Message: string(subjectLine),
|
pkt.Message = string(subjectLine)
|
||||||
Platform: "go",
|
pkt.Extra = raven.Extra{
|
||||||
Release: version.tag,
|
"url": reportServer + path,
|
||||||
Environment: version.environment(),
|
|
||||||
Tags: raven.Tags{
|
|
||||||
raven.Tag{Key: "version", Value: version.version},
|
|
||||||
raven.Tag{Key: "tag", Value: version.tag},
|
|
||||||
raven.Tag{Key: "codename", Value: version.codename},
|
|
||||||
raven.Tag{Key: "runtime", Value: version.runtime},
|
|
||||||
raven.Tag{Key: "goos", Value: version.goos},
|
|
||||||
raven.Tag{Key: "goarch", Value: version.goarch},
|
|
||||||
raven.Tag{Key: "builder", Value: version.builder},
|
|
||||||
},
|
|
||||||
Extra: raven.Extra{
|
|
||||||
"url": reportServer + path,
|
|
||||||
},
|
|
||||||
Interfaces: []raven.Interface{&trace},
|
|
||||||
}
|
|
||||||
if version.commit != "" {
|
|
||||||
pkt.Tags = append(pkt.Tags, raven.Tag{Key: "commit", Value: version.commit})
|
|
||||||
}
|
|
||||||
for _, tag := range version.extra {
|
|
||||||
pkt.Tags = append(pkt.Tags, raven.Tag{Key: tag, Value: "1"})
|
|
||||||
}
|
}
|
||||||
|
pkt.Interfaces = []raven.Interface{&trace}
|
||||||
|
|
||||||
return pkt, nil
|
return pkt, nil
|
||||||
}
|
}
|
||||||
@ -217,3 +194,27 @@ func parseVersion(line string) (version, error) {
|
|||||||
|
|
||||||
return v, nil
|
return v, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func packet(version version) *raven.Packet {
|
||||||
|
pkt := &raven.Packet{
|
||||||
|
Platform: "go",
|
||||||
|
Release: version.tag,
|
||||||
|
Environment: version.environment(),
|
||||||
|
Tags: raven.Tags{
|
||||||
|
raven.Tag{Key: "version", Value: version.version},
|
||||||
|
raven.Tag{Key: "tag", Value: version.tag},
|
||||||
|
raven.Tag{Key: "codename", Value: version.codename},
|
||||||
|
raven.Tag{Key: "runtime", Value: version.runtime},
|
||||||
|
raven.Tag{Key: "goos", Value: version.goos},
|
||||||
|
raven.Tag{Key: "goarch", Value: version.goarch},
|
||||||
|
raven.Tag{Key: "builder", Value: version.builder},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if version.commit != "" {
|
||||||
|
pkt.Tags = append(pkt.Tags, raven.Tag{Key: "commit", Value: version.commit})
|
||||||
|
}
|
||||||
|
for _, tag := range version.extra {
|
||||||
|
pkt.Tags = append(pkt.Tags, raven.Tag{Key: tag, Value: "1"})
|
||||||
|
}
|
||||||
|
return pkt
|
||||||
|
}
|
||||||
|
@ -64,7 +64,7 @@ func TestParseReport(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
pkt, err := parseReport("1/2/345", bs)
|
pkt, err := parseCrashReport("1/2/345", bs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -4,52 +4,21 @@
|
|||||||
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
||||||
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||||
|
|
||||||
// Command stcrashreceiver is a trivial HTTP server that allows two things:
|
|
||||||
//
|
|
||||||
// - uploading files (crash reports) named like a SHA256 hash using a PUT request
|
|
||||||
// - checking whether such file exists using a HEAD request
|
|
||||||
//
|
|
||||||
// Typically this should be deployed behind something that manages HTTPS.
|
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"compress/gzip"
|
"compress/gzip"
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
"net"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/syncthing/syncthing/lib/sha256"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const maxRequestSize = 1 << 20 // 1 MiB
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
dir := flag.String("dir", ".", "Directory to store reports in")
|
|
||||||
dsn := flag.String("dsn", "", "Sentry DSN")
|
|
||||||
listen := flag.String("listen", ":22039", "HTTP listen address")
|
|
||||||
flag.Parse()
|
|
||||||
|
|
||||||
cr := &crashReceiver{
|
|
||||||
dir: *dir,
|
|
||||||
dsn: *dsn,
|
|
||||||
}
|
|
||||||
|
|
||||||
log.SetOutput(os.Stdout)
|
|
||||||
if err := http.ListenAndServe(*listen, cr); err != nil {
|
|
||||||
log.Fatalln("HTTP serve:", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type crashReceiver struct {
|
type crashReceiver struct {
|
||||||
dir string
|
dir string
|
||||||
dsn string
|
dsn string
|
||||||
@ -155,8 +124,13 @@ func (r *crashReceiver) servePut(reportID, fullPath string, w http.ResponseWrite
|
|||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
// There's no need for the client to have to wait for this part.
|
// There's no need for the client to have to wait for this part.
|
||||||
if err := sendReport(r.dsn, reportID, bs, user); err != nil {
|
pkt, err := parseCrashReport(reportID, bs)
|
||||||
log.Println("Failed to send report:", err)
|
if err != nil {
|
||||||
|
log.Println("Failed to parse crash report:", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := sendReport(r.dsn, pkt, user); err != nil {
|
||||||
|
log.Println("Failed to send crash report:", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
@ -166,20 +140,3 @@ func (r *crashReceiver) servePut(reportID, fullPath string, w http.ResponseWrite
|
|||||||
func (r *crashReceiver) dirFor(base string) string {
|
func (r *crashReceiver) dirFor(base string) string {
|
||||||
return filepath.Join(base[0:2], base[2:4])
|
return filepath.Join(base[0:2], base[2:4])
|
||||||
}
|
}
|
||||||
|
|
||||||
// userIDFor returns a string we can use as the user ID for the purpose of
|
|
||||||
// counting affected users. It's the truncated hash of a salt, the user
|
|
||||||
// remote IP, and the current month.
|
|
||||||
func userIDFor(req *http.Request) string {
|
|
||||||
addr := req.RemoteAddr
|
|
||||||
if fwd := req.Header.Get("x-forwarded-for"); fwd != "" {
|
|
||||||
addr = fwd
|
|
||||||
}
|
|
||||||
if host, _, err := net.SplitHostPort(addr); err == nil {
|
|
||||||
addr = host
|
|
||||||
}
|
|
||||||
now := time.Now().Format("200601")
|
|
||||||
salt := "stcrashreporter"
|
|
||||||
hash := sha256.Sum256([]byte(salt + addr + now))
|
|
||||||
return fmt.Sprintf("%x", hash[:8])
|
|
||||||
}
|
|
||||||
|
@ -21,7 +21,7 @@ import (
|
|||||||
"github.com/syncthing/syncthing/lib/util"
|
"github.com/syncthing/syncthing/lib/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
type EventType int
|
type EventType int64
|
||||||
|
|
||||||
const (
|
const (
|
||||||
Starting EventType = 1 << iota
|
Starting EventType = 1 << iota
|
||||||
@ -52,6 +52,7 @@ const (
|
|||||||
FolderWatchStateChanged
|
FolderWatchStateChanged
|
||||||
ListenAddressesChanged
|
ListenAddressesChanged
|
||||||
LoginAttempt
|
LoginAttempt
|
||||||
|
Failure
|
||||||
|
|
||||||
AllEvents = (1 << iota) - 1
|
AllEvents = (1 << iota) - 1
|
||||||
)
|
)
|
||||||
@ -121,6 +122,8 @@ func (t EventType) String() string {
|
|||||||
return "LoginAttempt"
|
return "LoginAttempt"
|
||||||
case FolderWatchStateChanged:
|
case FolderWatchStateChanged:
|
||||||
return "FolderWatchStateChanged"
|
return "FolderWatchStateChanged"
|
||||||
|
case Failure:
|
||||||
|
return "Failure"
|
||||||
default:
|
default:
|
||||||
return "Unknown"
|
return "Unknown"
|
||||||
}
|
}
|
||||||
@ -200,6 +203,8 @@ func UnmarshalEventType(s string) EventType {
|
|||||||
return LoginAttempt
|
return LoginAttempt
|
||||||
case "FolderWatchStateChanged":
|
case "FolderWatchStateChanged":
|
||||||
return FolderWatchStateChanged
|
return FolderWatchStateChanged
|
||||||
|
case "Failure":
|
||||||
|
return Failure
|
||||||
default:
|
default:
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
@ -865,11 +865,13 @@ func (f *folder) monitorWatch(ctx context.Context) {
|
|||||||
f.setWatchError(err, next)
|
f.setWatchError(err, next)
|
||||||
// This error was previously a panic and should never occur, so generate
|
// This error was previously a panic and should never occur, so generate
|
||||||
// a warning, but don't do it repetitively.
|
// a warning, but don't do it repetitively.
|
||||||
if !warnedOutside {
|
var errOutside *fs.ErrWatchEventOutsideRoot
|
||||||
if _, ok := err.(*fs.ErrWatchEventOutsideRoot); ok {
|
if errors.As(err, &errOutside) {
|
||||||
|
if !warnedOutside {
|
||||||
l.Warnln(err)
|
l.Warnln(err)
|
||||||
warnedOutside = true
|
warnedOutside = true
|
||||||
}
|
}
|
||||||
|
f.evLogger.Log(events.Failure, "watching for changes encountered an event outside of the filesystem root")
|
||||||
}
|
}
|
||||||
aggrCancel()
|
aggrCancel()
|
||||||
errChan = nil
|
errChan = nil
|
||||||
|
@ -65,6 +65,7 @@ func (f *sendOnlyFolder) pull() bool {
|
|||||||
if !ok {
|
if !ok {
|
||||||
if intf.IsDeleted() {
|
if intf.IsDeleted() {
|
||||||
l.Debugln("Should never get a deleted file as needed when we don't have it")
|
l.Debugln("Should never get a deleted file as needed when we don't have it")
|
||||||
|
f.evLogger.Log(events.Failure, "got deleted file that doesn't exist locally as needed when pulling on send-only")
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
@ -1973,6 +1973,7 @@ func (s *indexSender) sendIndexTo(ctx context.Context) error {
|
|||||||
l.Warnln("Failed repairing sequence entries:", dbErr)
|
l.Warnln("Failed repairing sequence entries:", dbErr)
|
||||||
panic("Failed repairing sequence entries")
|
panic("Failed repairing sequence entries")
|
||||||
} else {
|
} else {
|
||||||
|
s.evLogger.Log(events.Failure, "detected and repaired non-increasing sequence")
|
||||||
l.Infof("Repaired %v sequence entries in database", fixed)
|
l.Infof("Repaired %v sequence entries in database", fixed)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -133,6 +133,8 @@ func (a *App) Start() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) startup() error {
|
func (a *App) startup() error {
|
||||||
|
a.mainService.Add(ur.NewFailureHandler(a.cfg, a.evLogger))
|
||||||
|
|
||||||
a.mainService.Add(a.ll)
|
a.mainService.Add(a.ll)
|
||||||
|
|
||||||
if a.opts.AuditWriter != nil {
|
if a.opts.AuditWriter != nil {
|
||||||
|
188
lib/ur/failurereporting.go
Normal file
188
lib/ur/failurereporting.go
Normal file
@ -0,0 +1,188 @@
|
|||||||
|
// Copyright (C) 2020 The Syncthing Authors.
|
||||||
|
//
|
||||||
|
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||||
|
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
||||||
|
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||||
|
|
||||||
|
package ur
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/syncthing/syncthing/lib/build"
|
||||||
|
"github.com/syncthing/syncthing/lib/config"
|
||||||
|
"github.com/syncthing/syncthing/lib/dialer"
|
||||||
|
"github.com/syncthing/syncthing/lib/events"
|
||||||
|
"github.com/syncthing/syncthing/lib/util"
|
||||||
|
|
||||||
|
"github.com/thejerf/suture"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// When a specific failure first occurs, it is delayed by minDelay. If
|
||||||
|
// more of the same failures occurs those are further delayed and
|
||||||
|
// aggregated for maxDelay.
|
||||||
|
minDelay = 10 * time.Second
|
||||||
|
maxDelay = time.Minute
|
||||||
|
sendTimeout = time.Minute
|
||||||
|
)
|
||||||
|
|
||||||
|
type FailureReport struct {
|
||||||
|
Description string
|
||||||
|
Count int
|
||||||
|
Version string
|
||||||
|
}
|
||||||
|
|
||||||
|
type FailureHandler interface {
|
||||||
|
suture.Service
|
||||||
|
config.Committer
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewFailureHandler(cfg config.Wrapper, evLogger events.Logger) FailureHandler {
|
||||||
|
h := &failureHandler{
|
||||||
|
cfg: cfg,
|
||||||
|
evLogger: evLogger,
|
||||||
|
optsChan: make(chan config.OptionsConfiguration),
|
||||||
|
}
|
||||||
|
h.Service = util.AsServiceWithError(h.serve, h.String())
|
||||||
|
return h
|
||||||
|
}
|
||||||
|
|
||||||
|
type failureHandler struct {
|
||||||
|
suture.Service
|
||||||
|
cfg config.Wrapper
|
||||||
|
evLogger events.Logger
|
||||||
|
optsChan chan config.OptionsConfiguration
|
||||||
|
evChan <-chan events.Event
|
||||||
|
buf map[string]*failureStat
|
||||||
|
}
|
||||||
|
|
||||||
|
type failureStat struct {
|
||||||
|
first, last time.Time
|
||||||
|
count int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *failureHandler) serve(ctx context.Context) error {
|
||||||
|
go func() {
|
||||||
|
h.optsChan <- h.cfg.Options()
|
||||||
|
}()
|
||||||
|
h.cfg.Subscribe(h)
|
||||||
|
defer h.cfg.Unsubscribe(h)
|
||||||
|
|
||||||
|
var url string
|
||||||
|
var err error
|
||||||
|
var sub events.Subscription
|
||||||
|
timer := time.NewTimer(minDelay)
|
||||||
|
resetTimer := make(chan struct{})
|
||||||
|
outer:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case opts := <-h.optsChan:
|
||||||
|
// Sub nil checks just for safety - config updates can be racy.
|
||||||
|
if opts.URAccepted > 0 {
|
||||||
|
if sub == nil {
|
||||||
|
sub = h.evLogger.Subscribe(events.Failure)
|
||||||
|
h.evChan = sub.C()
|
||||||
|
}
|
||||||
|
} else if sub != nil {
|
||||||
|
sub.Unsubscribe()
|
||||||
|
sub = nil
|
||||||
|
}
|
||||||
|
url = opts.CRURL + "/failure"
|
||||||
|
case e := <-h.evChan:
|
||||||
|
descr := e.Data.(string)
|
||||||
|
if stat, ok := h.buf[descr]; ok {
|
||||||
|
stat.last = e.Time
|
||||||
|
stat.count++
|
||||||
|
} else {
|
||||||
|
h.buf[descr] = &failureStat{
|
||||||
|
first: e.Time,
|
||||||
|
last: e.Time,
|
||||||
|
count: 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case <-timer.C:
|
||||||
|
reports := make([]FailureReport, 0, len(h.buf))
|
||||||
|
now := time.Now()
|
||||||
|
for descr, stat := range h.buf {
|
||||||
|
if now.Sub(stat.last) > minDelay || now.Sub(stat.first) > maxDelay {
|
||||||
|
reports = append(reports, FailureReport{
|
||||||
|
Description: descr,
|
||||||
|
Count: stat.count,
|
||||||
|
Version: build.LongVersion,
|
||||||
|
})
|
||||||
|
delete(h.buf, descr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(reports) > 0 {
|
||||||
|
// Lets keep process events/configs while it might be timing out for a while
|
||||||
|
go func() {
|
||||||
|
sendFailureReports(ctx, reports, url)
|
||||||
|
select {
|
||||||
|
case resetTimer <- struct{}{}:
|
||||||
|
case <-ctx.Done():
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
case <-resetTimer:
|
||||||
|
timer.Reset(minDelay)
|
||||||
|
case <-ctx.Done():
|
||||||
|
break outer
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if sub != nil {
|
||||||
|
sub.Unsubscribe()
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *failureHandler) VerifyConfiguration(_, _ config.Configuration) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *failureHandler) CommitConfiguration(from, to config.Configuration) bool {
|
||||||
|
if from.Options.CREnabled != to.Options.CREnabled || from.Options.CRURL != to.Options.CRURL {
|
||||||
|
h.optsChan <- to.Options
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *failureHandler) String() string {
|
||||||
|
return "FailureHandler"
|
||||||
|
}
|
||||||
|
|
||||||
|
func sendFailureReports(ctx context.Context, reports []FailureReport, url string) {
|
||||||
|
var b bytes.Buffer
|
||||||
|
if err := json.NewEncoder(&b).Encode(reports); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
client := &http.Client{
|
||||||
|
Transport: &http.Transport{
|
||||||
|
DialContext: dialer.DialContext,
|
||||||
|
Proxy: http.ProxyFromEnvironment,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
reqCtx, reqCancel := context.WithTimeout(ctx, sendTimeout)
|
||||||
|
defer reqCancel()
|
||||||
|
req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, url, &b)
|
||||||
|
if err != nil {
|
||||||
|
l.Infoln("Failed to send failure report:", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
|
||||||
|
resp, err := client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
l.Infoln("Failed to send failure report:", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
resp.Body.Close()
|
||||||
|
return
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user