mirror of
https://github.com/octoleo/syncthing.git
synced 2024-11-09 14:50:56 +00:00
I was working on indirecting version vectors, and that resulted in some refactoring and improving the existing block indirection stuff. We may or may not end up doing the version vector indirection, but I think these changes are reasonable anyhow and will simplify the diff significantly if we do go there. The main points are: - A bunch of renaming to make the indirection and GC not about "blocks" but about "indirection". - Adding a cutoff so that we don't actually indirect for small block lists. This gets us better performance when handling small files as it cuts out the indirection for quite small loss in space efficiency. - Being paranoid and always recalculating the hash on put. This costs some CPU, but the consequences if a buggy or malicious implementation silently substituted the block list by lying about the hash would be bad.
This commit is contained in:
parent
a5e12a0a3d
commit
ee61da5b6a
@ -148,7 +148,7 @@ func idxck(ldb backend.Backend) (success bool) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if fi.BlocksHash != nil {
|
if len(fi.Blocks) == 0 && len(fi.BlocksHash) != 0 {
|
||||||
key := string(fi.BlocksHash)
|
key := string(fi.BlocksHash)
|
||||||
if _, ok := blocklists[key]; !ok {
|
if _, ok := blocklists[key]; !ok {
|
||||||
fmt.Printf("Missing block list for file %q, block list hash %x\n", fi.Name, fi.BlocksHash)
|
fmt.Printf("Missing block list for file %q, block list hash %x\n", fi.Name, fi.BlocksHash)
|
||||||
|
@ -119,8 +119,8 @@ are mostly useful for developers. Use with care.
|
|||||||
"h", "m" and "s" abbreviations for hours minutes and seconds.
|
"h", "m" and "s" abbreviations for hours minutes and seconds.
|
||||||
Valid values are like "720h", "30s", etc.
|
Valid values are like "720h", "30s", etc.
|
||||||
|
|
||||||
STGCBLOCKSEVERY Set to a time interval to override the default database
|
STGCINDIRECTEVERY Set to a time interval to override the default database
|
||||||
block GC interval of 13 hours. Same format as the
|
indirection GC interval of 13 hours. Same format as the
|
||||||
STRECHECKDBEVERY variable.
|
STRECHECKDBEVERY variable.
|
||||||
|
|
||||||
GOMAXPROCS Set the maximum number of CPU cores to use. Defaults to all
|
GOMAXPROCS Set the maximum number of CPU cores to use. Defaults to all
|
||||||
|
@ -19,22 +19,30 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// We set the bloom filter capacity to handle 100k individual block lists
|
// We set the bloom filter capacity to handle 100k individual items with
|
||||||
// with a false positive probability of 1% for the first pass. Once we know
|
// a false positive probability of 1% for the first pass. Once we know
|
||||||
// how many block lists we have we will use that number instead, if it's
|
// how many items we have we will use that number instead, if it's more
|
||||||
// more than 100k. For fewer than 100k block lists we will just get better
|
// than 100k. For fewer than 100k items we will just get better false
|
||||||
// false positive rate instead.
|
// positive rate instead.
|
||||||
blockGCBloomCapacity = 100000
|
indirectGCBloomCapacity = 100000
|
||||||
blockGCBloomFalsePositiveRate = 0.01 // 1%
|
indirectGCBloomFalsePositiveRate = 0.01 // 1%
|
||||||
blockGCDefaultInterval = 13 * time.Hour
|
indirectGCDefaultInterval = 13 * time.Hour
|
||||||
blockGCTimeKey = "lastBlockGCTime"
|
indirectGCTimeKey = "lastIndirectGCTime"
|
||||||
|
|
||||||
|
// Use indirection for the block list when it exceeds this many entries
|
||||||
|
blocksIndirectionCutoff = 3
|
||||||
)
|
)
|
||||||
|
|
||||||
var blockGCInterval = blockGCDefaultInterval
|
var indirectGCInterval = indirectGCDefaultInterval
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
// deprecated
|
||||||
if dur, err := time.ParseDuration(os.Getenv("STGCBLOCKSEVERY")); err == nil {
|
if dur, err := time.ParseDuration(os.Getenv("STGCBLOCKSEVERY")); err == nil {
|
||||||
blockGCInterval = dur
|
indirectGCInterval = dur
|
||||||
|
}
|
||||||
|
// current
|
||||||
|
if dur, err := time.ParseDuration(os.Getenv("STGCINDIRECTEVERY")); err == nil {
|
||||||
|
indirectGCInterval = dur
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -485,18 +493,18 @@ func (db *Lowlevel) dropPrefix(prefix []byte) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (db *Lowlevel) gcRunner() {
|
func (db *Lowlevel) gcRunner() {
|
||||||
t := time.NewTimer(db.timeUntil(blockGCTimeKey, blockGCInterval))
|
t := time.NewTimer(db.timeUntil(indirectGCTimeKey, indirectGCInterval))
|
||||||
defer t.Stop()
|
defer t.Stop()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-db.gcStop:
|
case <-db.gcStop:
|
||||||
return
|
return
|
||||||
case <-t.C:
|
case <-t.C:
|
||||||
if err := db.gcBlocks(); err != nil {
|
if err := db.gcIndirect(); err != nil {
|
||||||
l.Warnln("Database block GC failed:", err)
|
l.Warnln("Database indirection GC failed:", err)
|
||||||
}
|
}
|
||||||
db.recordTime(blockGCTimeKey)
|
db.recordTime(indirectGCTimeKey)
|
||||||
t.Reset(db.timeUntil(blockGCTimeKey, blockGCInterval))
|
t.Reset(db.timeUntil(indirectGCTimeKey, indirectGCInterval))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -521,15 +529,16 @@ func (db *Lowlevel) timeUntil(key string, every time.Duration) time.Duration {
|
|||||||
return sleepTime
|
return sleepTime
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *Lowlevel) gcBlocks() error {
|
func (db *Lowlevel) gcIndirect() error {
|
||||||
// The block GC uses a bloom filter to track used block lists. This means
|
// The indirection GC uses bloom filters to track used block lists and
|
||||||
// iterating over all items, adding their block lists to the filter, then
|
// versions. This means iterating over all items, adding their hashes to
|
||||||
// iterating over the block lists and removing those that don't match the
|
// the filter, then iterating over the indirected items and removing
|
||||||
// filter. The filter will give false positives so we will keep around one
|
// those that don't match the filter. The filter will give false
|
||||||
// percent of block lists that we don't really need (at most).
|
// positives so we will keep around one percent of things that we don't
|
||||||
|
// really need (at most).
|
||||||
//
|
//
|
||||||
// Block GC needs to run when there are no modifications to the FileInfos or
|
// Indirection GC needs to run when there are no modifications to the
|
||||||
// block lists.
|
// FileInfos or indirected items.
|
||||||
|
|
||||||
db.gcMut.Lock()
|
db.gcMut.Lock()
|
||||||
defer db.gcMut.Unlock()
|
defer db.gcMut.Unlock()
|
||||||
@ -540,18 +549,19 @@ func (db *Lowlevel) gcBlocks() error {
|
|||||||
}
|
}
|
||||||
defer t.Release()
|
defer t.Release()
|
||||||
|
|
||||||
// Set up the bloom filter with the initial capacity and false positive
|
// Set up the bloom filters with the initial capacity and false positive
|
||||||
// rate, or higher capacity if we've done this before and seen lots of block
|
// rate, or higher capacity if we've done this before and seen lots of
|
||||||
// lists.
|
// items. For simplicity's sake we track just one count, which is the
|
||||||
|
// highest of the various indirected items.
|
||||||
|
|
||||||
capacity := blockGCBloomCapacity
|
capacity := indirectGCBloomCapacity
|
||||||
if db.gcKeyCount > capacity {
|
if db.gcKeyCount > capacity {
|
||||||
capacity = db.gcKeyCount
|
capacity = db.gcKeyCount
|
||||||
}
|
}
|
||||||
filter := bloom.NewWithEstimates(uint(capacity), blockGCBloomFalsePositiveRate)
|
blockFilter := bloom.NewWithEstimates(uint(capacity), indirectGCBloomFalsePositiveRate)
|
||||||
|
|
||||||
// Iterate the FileInfos, unmarshal the blocks hashes and add them to
|
// Iterate the FileInfos, unmarshal the block and version hashes and
|
||||||
// the filter.
|
// add them to the filter.
|
||||||
|
|
||||||
it, err := db.NewPrefixIterator([]byte{KeyTypeDevice})
|
it, err := db.NewPrefixIterator([]byte{KeyTypeDevice})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -563,7 +573,7 @@ func (db *Lowlevel) gcBlocks() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if len(bl.BlocksHash) > 0 {
|
if len(bl.BlocksHash) > 0 {
|
||||||
filter.Add(bl.BlocksHash)
|
blockFilter.Add(bl.BlocksHash)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
it.Release()
|
it.Release()
|
||||||
@ -578,11 +588,11 @@ func (db *Lowlevel) gcBlocks() error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
matched := 0
|
matchedBlocks := 0
|
||||||
for it.Next() {
|
for it.Next() {
|
||||||
key := blockListKey(it.Key())
|
key := blockListKey(it.Key())
|
||||||
if filter.Test(key.BlocksHash()) {
|
if blockFilter.Test(key.BlocksHash()) {
|
||||||
matched++
|
matchedBlocks++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err := t.Delete(key); err != nil {
|
if err := t.Delete(key); err != nil {
|
||||||
@ -595,7 +605,7 @@ func (db *Lowlevel) gcBlocks() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Remember the number of unique keys we kept until the next pass.
|
// Remember the number of unique keys we kept until the next pass.
|
||||||
db.gcKeyCount = matched
|
db.gcKeyCount = matchedBlocks
|
||||||
|
|
||||||
if err := t.Commit(); err != nil {
|
if err := t.Commit(); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -446,10 +446,11 @@ func (db *schemaUpdater) updateSchemato9(prev int) error {
|
|||||||
}
|
}
|
||||||
metas := make(map[string]*metadataTracker)
|
metas := make(map[string]*metadataTracker)
|
||||||
for it.Next() {
|
for it.Next() {
|
||||||
var fi protocol.FileInfo
|
intf, err := t.unmarshalTrunc(it.Value(), false)
|
||||||
if err := fi.Unmarshal(it.Value()); err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
fi := intf.(protocol.FileInfo)
|
||||||
device, ok := t.keyer.DeviceFromDeviceFileKey(it.Key())
|
device, ok := t.keyer.DeviceFromDeviceFileKey(it.Key())
|
||||||
if !ok {
|
if !ok {
|
||||||
return errDeviceIdxMissing
|
return errDeviceIdxMissing
|
||||||
@ -510,7 +511,7 @@ func (db *schemaUpdater) updateSchemato9(prev int) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
db.recordTime(blockGCTimeKey)
|
db.recordTime(indirectGCTimeKey)
|
||||||
|
|
||||||
return t.Commit()
|
return t.Commit()
|
||||||
}
|
}
|
||||||
|
@ -78,22 +78,24 @@ func (t readOnlyTransaction) unmarshalTrunc(bs []byte, trunc bool) (FileIntf, er
|
|||||||
return tf, nil
|
return tf, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var tf protocol.FileInfo
|
var fi protocol.FileInfo
|
||||||
if err := tf.Unmarshal(bs); err != nil {
|
if err := fi.Unmarshal(bs); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := t.fillBlockList(&tf); err != nil {
|
if err := t.fillFileInfo(&fi); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return tf, nil
|
return fi, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t readOnlyTransaction) fillBlockList(fi *protocol.FileInfo) error {
|
// fillFileInfo follows the (possible) indirection of blocks and fills it out.
|
||||||
if len(fi.BlocksHash) == 0 {
|
func (t readOnlyTransaction) fillFileInfo(fi *protocol.FileInfo) error {
|
||||||
return nil
|
var key []byte
|
||||||
}
|
|
||||||
blocksKey := t.keyer.GenerateBlockListKey(nil, fi.BlocksHash)
|
if len(fi.Blocks) == 0 && len(fi.BlocksHash) != 0 {
|
||||||
bs, err := t.Get(blocksKey)
|
// The blocks list is indirected and we need to load it.
|
||||||
|
key = t.keyer.GenerateBlockListKey(key, fi.BlocksHash)
|
||||||
|
bs, err := t.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -102,6 +104,8 @@ func (t readOnlyTransaction) fillBlockList(fi *protocol.FileInfo) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
fi.Blocks = bl.Blocks
|
fi.Blocks = bl.Blocks
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -453,26 +457,33 @@ func (t readWriteTransaction) close() {
|
|||||||
t.WriteTransaction.Release()
|
t.WriteTransaction.Release()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t readWriteTransaction) putFile(key []byte, fi protocol.FileInfo) error {
|
func (t readWriteTransaction) putFile(fkey []byte, fi protocol.FileInfo) error {
|
||||||
if fi.Blocks != nil {
|
var bkey []byte
|
||||||
if fi.BlocksHash == nil {
|
|
||||||
|
// Always set the blocks hash when there are blocks.
|
||||||
|
if len(fi.Blocks) > 0 {
|
||||||
fi.BlocksHash = protocol.BlocksHash(fi.Blocks)
|
fi.BlocksHash = protocol.BlocksHash(fi.Blocks)
|
||||||
|
} else {
|
||||||
|
fi.BlocksHash = nil
|
||||||
}
|
}
|
||||||
blocksKey := t.keyer.GenerateBlockListKey(nil, fi.BlocksHash)
|
|
||||||
if _, err := t.Get(blocksKey); backend.IsNotFound(err) {
|
// Indirect the blocks if the block list is large enough.
|
||||||
|
if len(fi.Blocks) > blocksIndirectionCutoff {
|
||||||
|
bkey = t.keyer.GenerateBlockListKey(bkey, fi.BlocksHash)
|
||||||
|
if _, err := t.Get(bkey); backend.IsNotFound(err) {
|
||||||
// Marshal the block list and save it
|
// Marshal the block list and save it
|
||||||
blocksBs := mustMarshal(&BlockList{Blocks: fi.Blocks})
|
blocksBs := mustMarshal(&BlockList{Blocks: fi.Blocks})
|
||||||
if err := t.Put(blocksKey, blocksBs); err != nil {
|
if err := t.Put(bkey, blocksBs); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
fi.Blocks = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
fi.Blocks = nil
|
|
||||||
fiBs := mustMarshal(&fi)
|
fiBs := mustMarshal(&fi)
|
||||||
return t.Put(key, fiBs)
|
return t.Put(fkey, fiBs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateGlobal adds this device+version to the version list for the given
|
// updateGlobal adds this device+version to the version list for the given
|
||||||
@ -723,15 +734,12 @@ func (t *readWriteTransaction) withAllFolderTruncated(folder []byte, fn func(dev
|
|||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var f FileInfoTruncated
|
|
||||||
// The iterator function may keep a reference to the unmarshalled
|
intf, err := t.unmarshalTrunc(dbi.Value(), true)
|
||||||
// struct, which in turn references the buffer it was unmarshalled
|
|
||||||
// from. dbi.Value() just returns an internal slice that it reuses, so
|
|
||||||
// we need to copy it.
|
|
||||||
err := f.Unmarshal(append([]byte{}, dbi.Value()...))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
f := intf.(FileInfoTruncated)
|
||||||
|
|
||||||
switch f.Name {
|
switch f.Name {
|
||||||
case "", ".", "..", "/": // A few obviously invalid filenames
|
case "", ".", "..", "/": // A few obviously invalid filenames
|
||||||
|
Loading…
Reference in New Issue
Block a user