This commit is contained in:
Jakob Borg 2023-06-29 15:35:25 +02:00
parent dca496cd7d
commit 2a77eb54ee
6 changed files with 63 additions and 22 deletions

View File

@ -1139,12 +1139,12 @@ func (f *sendReceiveFolder) handleFile(file protocol.FileInfo, snap *db.Snapshot
func (f *sendReceiveFolder) reuseBlocks(blocks []protocol.BlockInfo, reused []int, file protocol.FileInfo, tempName string) ([]protocol.BlockInfo, []int) { func (f *sendReceiveFolder) reuseBlocks(blocks []protocol.BlockInfo, reused []int, file protocol.FileInfo, tempName string) ([]protocol.BlockInfo, []int) {
// Check for an old temporary file which might have some blocks we could // Check for an old temporary file which might have some blocks we could
// reuse. // reuse.
tempBlocks, err := scanner.HashFile(f.ctx, f.mtimefs, tempName, file.BlockSize(), nil, false) tempBlocks, err := scanner.HashFile(f.ctx, f.ID, f.mtimefs, tempName, file.BlockSize(), nil, false)
if err != nil { if err != nil {
var caseErr *fs.ErrCaseConflict var caseErr *fs.ErrCaseConflict
if errors.As(err, &caseErr) { if errors.As(err, &caseErr) {
if rerr := f.mtimefs.Rename(caseErr.Real, tempName); rerr == nil { if rerr := f.mtimefs.Rename(caseErr.Real, tempName); rerr == nil {
tempBlocks, err = scanner.HashFile(f.ctx, f.mtimefs, tempName, file.BlockSize(), nil, false) tempBlocks, err = scanner.HashFile(f.ctx, f.ID, f.mtimefs, tempName, file.BlockSize(), nil, false)
} }
} }
} }

View File

@ -299,7 +299,7 @@ func TestCopierFinder(t *testing.T) {
} }
// Verify that the fetched blocks have actually been written to the temp file // Verify that the fetched blocks have actually been written to the temp file
blks, err := scanner.HashFile(context.TODO(), f.Filesystem(nil), tempFile, protocol.MinBlockSize, nil, false) blks, err := scanner.HashFile(context.TODO(), f.ID, f.Filesystem(nil), tempFile, protocol.MinBlockSize, nil, false)
if err != nil { if err != nil {
t.Log(err) t.Log(err)
} }

View File

@ -16,7 +16,7 @@ import (
) )
// HashFile hashes the files and returns a list of blocks representing the file. // HashFile hashes the files and returns a list of blocks representing the file.
func HashFile(ctx context.Context, fs fs.Filesystem, path string, blockSize int, counter Counter, useWeakHashes bool) ([]protocol.BlockInfo, error) { func HashFile(ctx context.Context, folderID string, fs fs.Filesystem, path string, blockSize int, counter Counter, useWeakHashes bool) ([]protocol.BlockInfo, error) {
fd, err := fs.Open(path) fd, err := fs.Open(path)
if err != nil { if err != nil {
l.Debugln("open:", err) l.Debugln("open:", err)
@ -42,6 +42,8 @@ func HashFile(ctx context.Context, fs fs.Filesystem, path string, blockSize int,
return nil, err return nil, err
} }
metricHashesBytes.WithLabelValues(folderID).Add(float64(size))
// Recheck the size and modtime again. If they differ, the file changed // Recheck the size and modtime again. If they differ, the file changed
// while we were reading it and our hash results are invalid. // while we were reading it and our hash results are invalid.
@ -62,22 +64,24 @@ func HashFile(ctx context.Context, fs fs.Filesystem, path string, blockSize int,
// workers are used in parallel. The outbox will become closed when the inbox // workers are used in parallel. The outbox will become closed when the inbox
// is closed and all items handled. // is closed and all items handled.
type parallelHasher struct { type parallelHasher struct {
fs fs.Filesystem folderID string
outbox chan<- ScanResult fs fs.Filesystem
inbox <-chan protocol.FileInfo outbox chan<- ScanResult
counter Counter inbox <-chan protocol.FileInfo
done chan<- struct{} counter Counter
wg sync.WaitGroup done chan<- struct{}
wg sync.WaitGroup
} }
func newParallelHasher(ctx context.Context, fs fs.Filesystem, workers int, outbox chan<- ScanResult, inbox <-chan protocol.FileInfo, counter Counter, done chan<- struct{}) { func newParallelHasher(ctx context.Context, folderID string, fs fs.Filesystem, workers int, outbox chan<- ScanResult, inbox <-chan protocol.FileInfo, counter Counter, done chan<- struct{}) {
ph := &parallelHasher{ ph := &parallelHasher{
fs: fs, folderID: folderID,
outbox: outbox, fs: fs,
inbox: inbox, outbox: outbox,
counter: counter, inbox: inbox,
done: done, counter: counter,
wg: sync.NewWaitGroup(), done: done,
wg: sync.NewWaitGroup(),
} }
ph.wg.Add(workers) ph.wg.Add(workers)
@ -104,7 +108,7 @@ func (ph *parallelHasher) hashFiles(ctx context.Context) {
panic("Bug. Asked to hash a directory or a deleted file.") panic("Bug. Asked to hash a directory or a deleted file.")
} }
blocks, err := HashFile(ctx, ph.fs, f.Name, f.BlockSize(), ph.counter, true) blocks, err := HashFile(ctx, ph.folderID, ph.fs, f.Name, f.BlockSize(), ph.counter, true)
if err != nil { if err != nil {
handleError(ctx, "hashing", f.Name, err, ph.outbox) handleError(ctx, "hashing", f.Name, err, ph.outbox)
continue continue

35
lib/scanner/metrics.go Normal file
View File

@ -0,0 +1,35 @@
// Copyright (C) 2023 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package scanner
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
	// metricHashesBytes counts the total number of bytes hashed, labeled
	// per folder. Incremented in HashFile after a file has been read and
	// hashed.
	metricHashesBytes = promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: "syncthing",
		Subsystem: "scanner",
		Name:      "hashed_bytes_total",
		Help:      "Total amount of data hashed",
	}, []string{"folder"})
	// metricScannedItems counts every file/directory inspected by the
	// walker, labeled per folder. Incremented once per item in
	// walkAndHashFiles, regardless of whether the item needed rehashing.
	metricScannedItems = promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: "syncthing",
		Subsystem: "scanner",
		Name:      "scanned_items_total",
		Help:      "Total number of files/directories inspected",
	}, []string{"folder"})
)
func registerFolderMetrics(folderID string) {
// Register metrics for this folder, so that counters are present even
// when zero.
metricHashesBytes.WithLabelValues(folderID)
metricScannedItems.WithLabelValues(folderID)
}

View File

@ -132,7 +132,7 @@ func (w *walker) walk(ctx context.Context) chan ScanResult {
// We're not required to emit scan progress events, just kick off hashers, // We're not required to emit scan progress events, just kick off hashers,
// and feed inputs directly from the walker. // and feed inputs directly from the walker.
if w.ProgressTickIntervalS < 0 { if w.ProgressTickIntervalS < 0 {
newParallelHasher(ctx, w.Filesystem, w.Hashers, finishedChan, toHashChan, nil, nil) newParallelHasher(ctx, w.Folder, w.Filesystem, w.Hashers, finishedChan, toHashChan, nil, nil)
return finishedChan return finishedChan
} }
@ -163,7 +163,7 @@ func (w *walker) walk(ctx context.Context) chan ScanResult {
done := make(chan struct{}) done := make(chan struct{})
progress := newByteCounter() progress := newByteCounter()
newParallelHasher(ctx, w.Filesystem, w.Hashers, finishedChan, realToHashChan, progress, done) newParallelHasher(ctx, w.Folder, w.Filesystem, w.Hashers, finishedChan, realToHashChan, progress, done)
// A routine which actually emits the FolderScanProgress events // A routine which actually emits the FolderScanProgress events
// every w.ProgressTicker ticks, until the hasher routines terminate. // every w.ProgressTicker ticks, until the hasher routines terminate.
@ -255,6 +255,8 @@ func (w *walker) walkAndHashFiles(ctx context.Context, toHashChan chan<- protoco
default: default:
} }
metricScannedItems.WithLabelValues(w.Folder).Inc()
// Return value used when we are returning early and don't want to // Return value used when we are returning early and don't want to
// process the item. For directories, this means do-not-descend. // process the item. For directories, this means do-not-descend.
var skip error // nil var skip error // nil
@ -599,7 +601,7 @@ func (w *walker) updateFileInfo(dst, src protocol.FileInfo) protocol.FileInfo {
if dst.Type == protocol.FileInfoTypeFile && build.IsWindows { if dst.Type == protocol.FileInfoTypeFile && build.IsWindows {
// If we have an existing index entry, copy the executable bits // If we have an existing index entry, copy the executable bits
// from there. // from there.
dst.Permissions |= (src.Permissions & 0111) dst.Permissions |= (src.Permissions & 0o111)
} }
dst.Version = src.Version.Update(w.ShortID) dst.Version = src.Version.Update(w.ShortID)
dst.ModifiedBy = w.ShortID dst.ModifiedBy = w.ShortID

View File

@ -635,7 +635,7 @@ func BenchmarkHashFile(b *testing.B) {
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
if _, err := HashFile(context.TODO(), testFs, testdataName, protocol.MinBlockSize, nil, true); err != nil { if _, err := HashFile(context.TODO(), "", testFs, testdataName, protocol.MinBlockSize, nil, true); err != nil {
b.Fatal(err) b.Fatal(err)
} }
} }