mirror of
https://github.com/octoleo/syncthing.git
synced 2024-11-18 19:15:19 +00:00
all: Add Prometheus metrics endpoint
This commit is contained in:
parent
c84e47a7b2
commit
04b121b5f4
@ -32,6 +32,7 @@ import (
|
||||
|
||||
"github.com/calmh/incontainer"
|
||||
"github.com/julienschmidt/httprouter"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"github.com/rcrowley/go-metrics"
|
||||
"github.com/thejerf/suture/v4"
|
||||
"github.com/vitrun/qart/qr"
|
||||
@ -351,6 +352,15 @@ func (s *service) Serve(ctx context.Context) error {
|
||||
// Handle the special meta.js path
|
||||
mux.HandleFunc("/meta.js", s.getJSMetadata)
|
||||
|
||||
// Handle Prometheus metrics
|
||||
promHttpHandler := promhttp.Handler()
|
||||
mux.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) {
|
||||
// fetching metrics counts as an event, for the purpose of whether
|
||||
// we should prepare folder summaries etc.
|
||||
s.fss.OnEventRequest()
|
||||
promHttpHandler.ServeHTTP(w, req)
|
||||
})
|
||||
|
||||
guiCfg := s.cfg.GUI()
|
||||
|
||||
// Wrap everything in CSRF protection. The /rest prefix should be
|
||||
|
@ -297,6 +297,7 @@ loop:
|
||||
case e := <-l.events:
|
||||
// Incoming events get sent
|
||||
l.sendEvent(e)
|
||||
metricEventsCreated.WithLabelValues(e.Type.String()).Inc()
|
||||
|
||||
case fn := <-l.funcs:
|
||||
// Subscriptions are handled here.
|
||||
@ -345,9 +346,11 @@ func (l *logger) sendEvent(e Event) {
|
||||
|
||||
select {
|
||||
case s.events <- e:
|
||||
metricEventsForwarded.WithLabelValues(e.Type.String(), "sent").Inc()
|
||||
case <-l.timeout.C:
|
||||
// if s.events is not ready, drop the event
|
||||
timedOut = true
|
||||
metricEventsForwarded.WithLabelValues(e.Type.String(), "dropped").Inc()
|
||||
}
|
||||
|
||||
// If stop returns false it already sent something to the
|
||||
|
27
lib/events/metrics.go
Normal file
27
lib/events/metrics.go
Normal file
@ -0,0 +1,27 @@
|
||||
// Copyright (C) 2023 The Syncthing Authors.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
||||
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
|
||||
package events
|
||||
|
||||
import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
)
|
||||
|
||||
var (
|
||||
metricEventsCreated = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "syncthing",
|
||||
Subsystem: "events",
|
||||
Name: "created_total",
|
||||
Help: "Total number of created events",
|
||||
}, []string{"event"})
|
||||
metricEventsForwarded = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "syncthing",
|
||||
Subsystem: "events",
|
||||
Name: "forwarded_total",
|
||||
Help: "Total number of events forwarded to subscribers",
|
||||
}, []string{"event", "state"})
|
||||
)
|
@ -137,6 +137,9 @@ func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg conf
|
||||
f.pullPause = f.pullBasePause()
|
||||
f.pullFailTimer = time.NewTimer(0)
|
||||
<-f.pullFailTimer.C
|
||||
|
||||
registerFolderMetrics(f.ID)
|
||||
|
||||
return f
|
||||
}
|
||||
|
||||
@ -459,6 +462,12 @@ func (f *folder) scanSubdirs(subDirs []string) error {
|
||||
}
|
||||
defer f.ioLimiter.Give(1)
|
||||
|
||||
t0 := time.Now()
|
||||
defer func() {
|
||||
metricFolderScans.WithLabelValues(f.ID).Inc()
|
||||
metricFolderScanSeconds.WithLabelValues(f.ID).Add(time.Since(t0).Seconds())
|
||||
}()
|
||||
|
||||
for i := range subDirs {
|
||||
sub := osutil.NativeFilename(subDirs[i])
|
||||
|
||||
|
@ -163,9 +163,12 @@ func (f *sendReceiveFolder) pull() (bool, error) {
|
||||
scanChan := make(chan string)
|
||||
go f.pullScannerRoutine(scanChan)
|
||||
|
||||
t0 := time.Now()
|
||||
defer func() {
|
||||
close(scanChan)
|
||||
f.setState(FolderIdle)
|
||||
metricFolderPulls.WithLabelValues(f.ID).Inc()
|
||||
metricFolderPullSeconds.WithLabelValues(f.ID).Add(time.Since(t0).Seconds())
|
||||
}()
|
||||
|
||||
changed := 0
|
||||
@ -573,9 +576,9 @@ func (f *sendReceiveFolder) handleDir(file protocol.FileInfo, snap *db.Snapshot,
|
||||
})
|
||||
}()
|
||||
|
||||
mode := fs.FileMode(file.Permissions & 0777)
|
||||
mode := fs.FileMode(file.Permissions & 0o777)
|
||||
if f.IgnorePerms || file.NoPermissions {
|
||||
mode = 0777
|
||||
mode = 0o777
|
||||
}
|
||||
|
||||
if shouldDebug() {
|
||||
@ -705,7 +708,7 @@ func (f *sendReceiveFolder) checkParent(file string, scanChan chan<- string) boo
|
||||
return true
|
||||
}
|
||||
l.Debugf("%v creating parent directory of %v", f, file)
|
||||
if err := f.mtimefs.MkdirAll(parent, 0755); err != nil {
|
||||
if err := f.mtimefs.MkdirAll(parent, 0o755); err != nil {
|
||||
f.newPullError(file, fmt.Errorf("creating parent dir: %w", err))
|
||||
return false
|
||||
}
|
||||
@ -1235,7 +1238,7 @@ func (f *sendReceiveFolder) shortcutFile(file protocol.FileInfo, dbUpdateChan ch
|
||||
f.queue.Done(file.Name)
|
||||
|
||||
if !f.IgnorePerms && !file.NoPermissions {
|
||||
if err = f.mtimefs.Chmod(file.Name, fs.FileMode(file.Permissions&0777)); err != nil {
|
||||
if err = f.mtimefs.Chmod(file.Name, fs.FileMode(file.Permissions&0o777)); err != nil {
|
||||
f.newPullError(file.Name, fmt.Errorf("shortcut file (setting permissions): %w", err))
|
||||
return
|
||||
}
|
||||
@ -1249,7 +1252,7 @@ func (f *sendReceiveFolder) shortcutFile(file protocol.FileInfo, dbUpdateChan ch
|
||||
// Still need to re-write the trailer with the new encrypted fileinfo.
|
||||
if f.Type == config.FolderTypeReceiveEncrypted {
|
||||
err = inWritableDir(func(path string) error {
|
||||
fd, err := f.mtimefs.OpenFile(path, fs.OptReadWrite, 0666)
|
||||
fd, err := f.mtimefs.OpenFile(path, fs.OptReadWrite, 0o666)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -1329,7 +1332,7 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
|
||||
// block of all zeroes, so then we should not skip it.
|
||||
|
||||
// Pretend we copied it.
|
||||
state.copiedFromOrigin()
|
||||
state.skippedSparseBlock(block.Size)
|
||||
state.copyDone(block)
|
||||
continue
|
||||
}
|
||||
@ -1348,9 +1351,9 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
|
||||
state.fail(fmt.Errorf("dst write: %w", err))
|
||||
}
|
||||
if offset == block.Offset {
|
||||
state.copiedFromOrigin()
|
||||
state.copiedFromOrigin(block.Size)
|
||||
} else {
|
||||
state.copiedFromOriginShifted()
|
||||
state.copiedFromOriginShifted(block.Size)
|
||||
}
|
||||
|
||||
return false
|
||||
@ -1398,7 +1401,9 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
|
||||
state.fail(fmt.Errorf("dst write: %w", err))
|
||||
}
|
||||
if path == state.file.Name {
|
||||
state.copiedFromOrigin()
|
||||
state.copiedFromOrigin(block.Size)
|
||||
} else {
|
||||
state.copiedFromElsewhere(block.Size)
|
||||
}
|
||||
return true
|
||||
})
|
||||
@ -1608,7 +1613,7 @@ loop:
|
||||
func (f *sendReceiveFolder) performFinish(file, curFile protocol.FileInfo, hasCurFile bool, tempName string, snap *db.Snapshot, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) error {
|
||||
// Set the correct permission bits on the new file
|
||||
if !f.IgnorePerms && !file.NoPermissions {
|
||||
if err := f.mtimefs.Chmod(tempName, fs.FileMode(file.Permissions&0777)); err != nil {
|
||||
if err := f.mtimefs.Chmod(tempName, fs.FileMode(file.Permissions&0o777)); err != nil {
|
||||
return fmt.Errorf("setting permissions: %w", err)
|
||||
}
|
||||
}
|
||||
|
@ -396,6 +396,24 @@ func (c *folderSummaryService) sendSummary(ctx context.Context, folder string) {
|
||||
Summary: data,
|
||||
})
|
||||
|
||||
metricFolderSummary.WithLabelValues(folder, metricScopeGlobal, metricTypeFiles).Set(float64(data.GlobalFiles))
|
||||
metricFolderSummary.WithLabelValues(folder, metricScopeGlobal, metricTypeDirectories).Set(float64(data.GlobalDirectories))
|
||||
metricFolderSummary.WithLabelValues(folder, metricScopeGlobal, metricTypeSymlinks).Set(float64(data.GlobalSymlinks))
|
||||
metricFolderSummary.WithLabelValues(folder, metricScopeGlobal, metricTypeDeleted).Set(float64(data.GlobalDeleted))
|
||||
metricFolderSummary.WithLabelValues(folder, metricScopeGlobal, metricTypeBytes).Set(float64(data.GlobalBytes))
|
||||
|
||||
metricFolderSummary.WithLabelValues(folder, metricScopeLocal, metricTypeFiles).Set(float64(data.LocalFiles))
|
||||
metricFolderSummary.WithLabelValues(folder, metricScopeLocal, metricTypeDirectories).Set(float64(data.LocalDirectories))
|
||||
metricFolderSummary.WithLabelValues(folder, metricScopeLocal, metricTypeSymlinks).Set(float64(data.LocalSymlinks))
|
||||
metricFolderSummary.WithLabelValues(folder, metricScopeLocal, metricTypeDeleted).Set(float64(data.LocalDeleted))
|
||||
metricFolderSummary.WithLabelValues(folder, metricScopeLocal, metricTypeBytes).Set(float64(data.LocalBytes))
|
||||
|
||||
metricFolderSummary.WithLabelValues(folder, metricScopeNeed, metricTypeFiles).Set(float64(data.NeedFiles))
|
||||
metricFolderSummary.WithLabelValues(folder, metricScopeNeed, metricTypeDirectories).Set(float64(data.NeedDirectories))
|
||||
metricFolderSummary.WithLabelValues(folder, metricScopeNeed, metricTypeSymlinks).Set(float64(data.NeedSymlinks))
|
||||
metricFolderSummary.WithLabelValues(folder, metricScopeNeed, metricTypeDeleted).Set(float64(data.NeedDeletes))
|
||||
metricFolderSummary.WithLabelValues(folder, metricScopeNeed, metricTypeBytes).Set(float64(data.NeedBytes))
|
||||
|
||||
for _, devCfg := range c.cfg.Folders()[folder].Devices {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
@ -111,6 +111,10 @@ func (s *stateTracker) setState(newState folderState) {
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
metricFolderState.WithLabelValues(s.folderID).Set(float64(s.current))
|
||||
}()
|
||||
|
||||
/* This should hold later...
|
||||
if s.current != FolderIdle && (newState == FolderScanning || newState == FolderSyncing) {
|
||||
panic("illegal state transition " + s.current.String() + " -> " + newState.String())
|
||||
@ -148,6 +152,10 @@ func (s *stateTracker) setError(err error) {
|
||||
s.mut.Lock()
|
||||
defer s.mut.Unlock()
|
||||
|
||||
defer func() {
|
||||
metricFolderState.WithLabelValues(s.folderID).Set(float64(s.current))
|
||||
}()
|
||||
|
||||
eventData := map[string]interface{}{
|
||||
"folder": s.folderID,
|
||||
"from": s.current.String(),
|
||||
|
94
lib/model/metrics.go
Normal file
94
lib/model/metrics.go
Normal file
@ -0,0 +1,94 @@
|
||||
// Copyright (C) 2023 The Syncthing Authors.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
||||
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
|
||||
package model
|
||||
|
||||
import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
)
|
||||
|
||||
var (
|
||||
metricFolderState = promauto.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: "syncthing",
|
||||
Subsystem: "model",
|
||||
Name: "folder_state",
|
||||
Help: "Current folder state",
|
||||
}, []string{"folder"})
|
||||
|
||||
metricFolderPulls = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "syncthing",
|
||||
Subsystem: "model",
|
||||
Name: "folder_pulls_total",
|
||||
Help: "Total number of folder pull iterations",
|
||||
}, []string{"folder"})
|
||||
metricFolderPullSeconds = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "syncthing",
|
||||
Subsystem: "model",
|
||||
Name: "folder_pull_seconds_total",
|
||||
Help: "Total time spent pulling",
|
||||
}, []string{"folder"})
|
||||
|
||||
metricFolderScans = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "syncthing",
|
||||
Subsystem: "model",
|
||||
Name: "folder_scans_total",
|
||||
Help: "Total number of folder scan iterations",
|
||||
}, []string{"folder"})
|
||||
metricFolderScanSeconds = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "syncthing",
|
||||
Subsystem: "model",
|
||||
Name: "folder_scan_seconds_total",
|
||||
Help: "Total time spent scanning",
|
||||
}, []string{"folder"})
|
||||
|
||||
metricFolderProcessedBytesTotal = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "syncthing",
|
||||
Subsystem: "model",
|
||||
Name: "folder_processed_bytes_total",
|
||||
Help: "Total amount of data processed during folder syncing",
|
||||
}, []string{"folder", "source"})
|
||||
|
||||
metricFolderSummary = promauto.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: "syncthing",
|
||||
Subsystem: "model",
|
||||
Name: "folder_summary",
|
||||
Help: "",
|
||||
}, []string{"folder", "scope", "type"})
|
||||
)
|
||||
|
||||
const (
|
||||
metricSourceNetwork = "network" // from the network
|
||||
metricSourceLocalOrigin = "local_origin" // from the existing version of the local file
|
||||
metricSourceLocalOther = "local_other" // from a different local file
|
||||
metricSourceLocalShifted = "local_shifted" // from the existing version of the local file, rolling hash shifted
|
||||
metricSourceSkipped = "skipped" // block of all zeroes, invented out of thin air
|
||||
|
||||
metricScopeGlobal = "global"
|
||||
metricScopeLocal = "local"
|
||||
metricScopeNeed = "need"
|
||||
|
||||
metricTypeFiles = "files"
|
||||
metricTypeDirectories = "directories"
|
||||
metricTypeSymlinks = "symlinks"
|
||||
metricTypeDeleted = "deleted"
|
||||
metricTypeBytes = "bytes"
|
||||
)
|
||||
|
||||
func registerFolderMetrics(folderID string) {
|
||||
// Register metrics for this folder, so that counters are present even
|
||||
// when zero.
|
||||
metricFolderState.WithLabelValues(folderID)
|
||||
metricFolderPulls.WithLabelValues(folderID)
|
||||
metricFolderPullSeconds.WithLabelValues(folderID)
|
||||
metricFolderScans.WithLabelValues(folderID)
|
||||
metricFolderScanSeconds.WithLabelValues(folderID)
|
||||
metricFolderProcessedBytesTotal.WithLabelValues(folderID, metricSourceNetwork)
|
||||
metricFolderProcessedBytesTotal.WithLabelValues(folderID, metricSourceLocalOrigin)
|
||||
metricFolderProcessedBytesTotal.WithLabelValues(folderID, metricSourceLocalOther)
|
||||
metricFolderProcessedBytesTotal.WithLabelValues(folderID, metricSourceLocalShifted)
|
||||
metricFolderProcessedBytesTotal.WithLabelValues(folderID, metricSourceSkipped)
|
||||
}
|
@ -91,7 +91,7 @@ func TestProgressEmitter(t *testing.T) {
|
||||
expectEvent(w, t, 1)
|
||||
expectTimeout(w, t)
|
||||
|
||||
s.copiedFromOrigin()
|
||||
s.copiedFromOrigin(1)
|
||||
|
||||
expectEvent(w, t, 1)
|
||||
expectTimeout(w, t)
|
||||
|
@ -152,11 +152,11 @@ func (s *sharedPullerState) tempFileInWritableDir(_ string) error {
|
||||
// permissions will be set to the final value later, but in the meantime
|
||||
// we don't want to have a temporary file with looser permissions than
|
||||
// the final outcome.
|
||||
mode := fs.FileMode(s.file.Permissions) | 0600
|
||||
mode := fs.FileMode(s.file.Permissions) | 0o600
|
||||
if s.ignorePerms {
|
||||
// When ignorePerms is set we use a very permissive mode and let the
|
||||
// system umask filter it.
|
||||
mode = 0666
|
||||
mode = 0o666
|
||||
}
|
||||
|
||||
// Attempt to create the temp file
|
||||
@ -261,19 +261,34 @@ func (s *sharedPullerState) copyDone(block protocol.BlockInfo) {
|
||||
s.mut.Unlock()
|
||||
}
|
||||
|
||||
func (s *sharedPullerState) copiedFromOrigin() {
|
||||
func (s *sharedPullerState) copiedFromOrigin(bytes int) {
|
||||
s.mut.Lock()
|
||||
s.copyOrigin++
|
||||
s.updated = time.Now()
|
||||
s.mut.Unlock()
|
||||
metricFolderProcessedBytesTotal.WithLabelValues(s.folder, metricSourceLocalOrigin).Add(float64(bytes))
|
||||
}
|
||||
|
||||
func (s *sharedPullerState) copiedFromOriginShifted() {
|
||||
func (s *sharedPullerState) copiedFromElsewhere(bytes int) {
|
||||
metricFolderProcessedBytesTotal.WithLabelValues(s.folder, metricSourceLocalOther).Add(float64(bytes))
|
||||
}
|
||||
|
||||
func (s *sharedPullerState) skippedSparseBlock(bytes int) {
|
||||
// pretend we copied it, historical
|
||||
s.mut.Lock()
|
||||
s.copyOrigin++
|
||||
s.updated = time.Now()
|
||||
s.mut.Unlock()
|
||||
metricFolderProcessedBytesTotal.WithLabelValues(s.folder, metricSourceSkipped).Add(float64(bytes))
|
||||
}
|
||||
|
||||
func (s *sharedPullerState) copiedFromOriginShifted(bytes int) {
|
||||
s.mut.Lock()
|
||||
s.copyOrigin++
|
||||
s.copyOriginShifted++
|
||||
s.updated = time.Now()
|
||||
s.mut.Unlock()
|
||||
metricFolderProcessedBytesTotal.WithLabelValues(s.folder, metricSourceLocalShifted).Add(float64(bytes))
|
||||
}
|
||||
|
||||
func (s *sharedPullerState) pullStarted() {
|
||||
@ -295,6 +310,7 @@ func (s *sharedPullerState) pullDone(block protocol.BlockInfo) {
|
||||
s.availableUpdated = time.Now()
|
||||
l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded done ->", s.pullNeeded)
|
||||
s.mut.Unlock()
|
||||
metricFolderProcessedBytesTotal.WithLabelValues(s.folder, metricSourceNetwork).Add(float64(block.Size))
|
||||
}
|
||||
|
||||
// finalClose atomically closes and returns closed status of a file. A true
|
||||
|
@ -10,8 +10,9 @@ import (
|
||||
|
||||
type countingReader struct {
|
||||
io.Reader
|
||||
tot atomic.Int64 // bytes
|
||||
last atomic.Int64 // unix nanos
|
||||
idString string
|
||||
tot atomic.Int64 // bytes
|
||||
last atomic.Int64 // unix nanos
|
||||
}
|
||||
|
||||
var (
|
||||
@ -24,6 +25,7 @@ func (c *countingReader) Read(bs []byte) (int, error) {
|
||||
c.tot.Add(int64(n))
|
||||
totalIncoming.Add(int64(n))
|
||||
c.last.Store(time.Now().UnixNano())
|
||||
metricDeviceRecvBytes.WithLabelValues(c.idString).Add(float64(n))
|
||||
return n, err
|
||||
}
|
||||
|
||||
@ -35,8 +37,9 @@ func (c *countingReader) Last() time.Time {
|
||||
|
||||
type countingWriter struct {
|
||||
io.Writer
|
||||
tot atomic.Int64 // bytes
|
||||
last atomic.Int64 // unix nanos
|
||||
idString string
|
||||
tot atomic.Int64 // bytes
|
||||
last atomic.Int64 // unix nanos
|
||||
}
|
||||
|
||||
func (c *countingWriter) Write(bs []byte) (int, error) {
|
||||
@ -44,6 +47,7 @@ func (c *countingWriter) Write(bs []byte) (int, error) {
|
||||
c.tot.Add(int64(n))
|
||||
totalOutgoing.Add(int64(n))
|
||||
c.last.Store(time.Now().UnixNano())
|
||||
metricDeviceSentBytes.WithLabelValues(c.idString).Add(float64(n))
|
||||
return n, err
|
||||
}
|
||||
|
||||
|
56
lib/protocol/metrics.go
Normal file
56
lib/protocol/metrics.go
Normal file
@ -0,0 +1,56 @@
|
||||
// Copyright (C) 2023 The Syncthing Authors.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
||||
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
|
||||
package protocol
|
||||
|
||||
import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
)
|
||||
|
||||
var (
|
||||
metricDeviceSentBytes = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "syncthing",
|
||||
Subsystem: "protocol",
|
||||
Name: "sent_bytes_total",
|
||||
Help: "Total amount of data sent",
|
||||
}, []string{"device"})
|
||||
metricDeviceSentUncompressedBytes = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "syncthing",
|
||||
Subsystem: "protocol",
|
||||
Name: "sent_uncompressed_bytes_total",
|
||||
Help: "Total amount of data sent, before compression",
|
||||
}, []string{"device"})
|
||||
metricDeviceSentMessages = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "syncthing",
|
||||
Subsystem: "protocol",
|
||||
Name: "sent_messages_total",
|
||||
Help: "Total number of messages sent",
|
||||
}, []string{"device"})
|
||||
|
||||
metricDeviceRecvBytes = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "syncthing",
|
||||
Subsystem: "protocol",
|
||||
Name: "recv_bytes_total",
|
||||
Help: "Total amount of data received",
|
||||
}, []string{"device"})
|
||||
metricDeviceRecvMessages = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "syncthing",
|
||||
Subsystem: "protocol",
|
||||
Name: "recv_messages_total",
|
||||
Help: "Total number of messages received",
|
||||
}, []string{"device"})
|
||||
)
|
||||
|
||||
func registerDeviceMetrics(deviceID string) {
|
||||
// Register metrics for this device, so that counters are present even
|
||||
// when zero.
|
||||
metricDeviceSentBytes.WithLabelValues(deviceID)
|
||||
metricDeviceSentUncompressedBytes.WithLabelValues(deviceID)
|
||||
metricDeviceSentMessages.WithLabelValues(deviceID)
|
||||
metricDeviceRecvBytes.WithLabelValues(deviceID)
|
||||
metricDeviceRecvMessages.WithLabelValues(deviceID)
|
||||
}
|
@ -172,6 +172,7 @@ type rawConnection struct {
|
||||
ConnectionInfo
|
||||
|
||||
id DeviceID
|
||||
idString string
|
||||
receiver Model
|
||||
startTime time.Time
|
||||
|
||||
@ -245,12 +246,15 @@ func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, closer
|
||||
}
|
||||
|
||||
func newRawConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, closer io.Closer, receiver Model, connInfo ConnectionInfo, compress Compression) *rawConnection {
|
||||
cr := &countingReader{Reader: reader}
|
||||
cw := &countingWriter{Writer: writer}
|
||||
idString := deviceID.String()
|
||||
cr := &countingReader{Reader: reader, idString: idString}
|
||||
cw := &countingWriter{Writer: writer, idString: idString}
|
||||
registerDeviceMetrics(idString)
|
||||
|
||||
return &rawConnection{
|
||||
ConnectionInfo: connInfo,
|
||||
id: deviceID,
|
||||
idString: deviceID.String(),
|
||||
receiver: receiver,
|
||||
cr: cr,
|
||||
cw: cw,
|
||||
@ -427,6 +431,8 @@ func (c *rawConnection) dispatcherLoop() (err error) {
|
||||
return ErrClosed
|
||||
}
|
||||
|
||||
metricDeviceRecvMessages.WithLabelValues(c.idString).Inc()
|
||||
|
||||
msgContext, err := messageContext(msg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("protocol error: %w", err)
|
||||
@ -740,6 +746,10 @@ func (c *rawConnection) writeMessage(msg message) error {
|
||||
msgContext, _ := messageContext(msg)
|
||||
l.Debugf("Writing %v", msgContext)
|
||||
|
||||
defer func() {
|
||||
metricDeviceSentMessages.WithLabelValues(c.idString).Inc()
|
||||
}()
|
||||
|
||||
size := msg.ProtoSize()
|
||||
hdr := Header{
|
||||
Type: typeOf(msg),
|
||||
@ -766,6 +776,8 @@ func (c *rawConnection) writeMessage(msg message) error {
|
||||
}
|
||||
}
|
||||
|
||||
metricDeviceSentUncompressedBytes.WithLabelValues(c.idString).Add(float64(totSize))
|
||||
|
||||
// Header length
|
||||
binary.BigEndian.PutUint16(buf, uint16(hdrSize))
|
||||
// Header
|
||||
@ -799,6 +811,9 @@ func (c *rawConnection) writeCompressedMessage(msg message, marshaled []byte) (o
|
||||
}
|
||||
|
||||
cOverhead := 2 + hdrSize + 4
|
||||
|
||||
metricDeviceSentUncompressedBytes.WithLabelValues(c.idString).Add(float64(cOverhead + len(marshaled)))
|
||||
|
||||
// The compressed size may be at most n-n/32 = .96875*n bytes,
|
||||
// I.e., if we can't save at least 3.125% bandwidth, we forgo compression.
|
||||
// This number is arbitrary but cheap to compute.
|
||||
|
Loading…
Reference in New Issue
Block a user