lib/db: Do all update operations on a single item at once (#5441)
To do so, the BlockMap struct has been removed: the block map behaves like any other prefixed part of the database, but had not been integrated in the recent keyer refactor. The database is now only flushed when files are in a consistent state.
commit 42bd42df5a
parent 6421693cce
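The gist of the change, as a minimal self-contained sketch (illustrative names only, not syncthing's real API): previously the file index, sequence index and block map were updated in separate batched passes, so a batch flush could land between passes and expose a half-updated file. Now every record belonging to one file is staged together, and the batch is only flushed at item boundaries.

package main

import "fmt"

// Hypothetical stand-in for the leveldb write batch; flushThreshold
// plays the role of the batch-size check in t.checkFlush().
type batch struct{ ops []string }

func (b *batch) put(k string) { b.ops = append(b.ops, k) }

func (b *batch) flush() {
	fmt.Printf("flushing %d ops\n", len(b.ops))
	b.ops = b.ops[:0]
}

const flushThreshold = 128

// updateOne stages every record belonging to a single file (fileinfo,
// sequence entry, block map entries) and only then considers flushing,
// so a reader never observes a partially updated file.
func updateOne(b *batch, name string, blockHashes []string) {
	b.put("file/" + name)
	b.put("seq/" + name)
	for _, h := range blockHashes {
		b.put("block/" + h + "/" + name)
	}
	if len(b.ops) > flushThreshold {
		b.flush()
	}
}

func main() {
	b := new(batch)
	updateOne(b, "f1", []string{"aaaa", "bbbb"})
	b.flush() // final flush; the on-disk state is consistent
}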
--- a/lib/db/blockmap.go
+++ b/lib/db/blockmap.go
@@ -11,125 +11,14 @@ import (
 	"fmt"
 
 	"github.com/syncthing/syncthing/lib/osutil"
-	"github.com/syncthing/syncthing/lib/protocol"
 
-	"github.com/syndtr/goleveldb/leveldb"
 	"github.com/syndtr/goleveldb/leveldb/util"
 )
 
 var blockFinder *BlockFinder
 
-const maxBatchSize = 1000
-
-type BlockMap struct {
-	db     *Lowlevel
-	folder uint32
-}
-
-func NewBlockMap(db *Lowlevel, folder string) *BlockMap {
-	return &BlockMap{
-		db:     db,
-		folder: db.folderIdx.ID([]byte(folder)),
-	}
-}
-
-// Add files to the block map, ignoring any deleted or invalid files.
-func (m *BlockMap) Add(files []protocol.FileInfo) error {
-	batch := new(leveldb.Batch)
-	buf := make([]byte, 4)
-	var key []byte
-	for _, file := range files {
-		m.checkFlush(batch)
-
-		if file.IsDirectory() || file.IsDeleted() || file.IsInvalid() {
-			continue
-		}
-
-		for i, block := range file.Blocks {
-			binary.BigEndian.PutUint32(buf, uint32(i))
-			key = m.blockKeyInto(key, block.Hash, file.Name)
-			batch.Put(key, buf)
-		}
-	}
-	return m.db.Write(batch, nil)
-}
-
-// Update block map state, removing any deleted or invalid files.
-func (m *BlockMap) Update(files []protocol.FileInfo) error {
-	batch := new(leveldb.Batch)
-	buf := make([]byte, 4)
-	var key []byte
-	for _, file := range files {
-		m.checkFlush(batch)
-
-		switch {
-		case file.IsDirectory():
-		case file.IsDeleted() || file.IsInvalid():
-			for _, block := range file.Blocks {
-				key = m.blockKeyInto(key, block.Hash, file.Name)
-				batch.Delete(key)
-			}
-		default:
-			for i, block := range file.Blocks {
-				binary.BigEndian.PutUint32(buf, uint32(i))
-				key = m.blockKeyInto(key, block.Hash, file.Name)
-				batch.Put(key, buf)
-			}
-		}
-	}
-	return m.db.Write(batch, nil)
-}
-
-// Discard block map state, removing the given files
-func (m *BlockMap) Discard(files []protocol.FileInfo) error {
-	batch := new(leveldb.Batch)
-	var key []byte
-	for _, file := range files {
-		m.checkFlush(batch)
-		m.discard(file, key, batch)
-	}
-	return m.db.Write(batch, nil)
-}
-
-func (m *BlockMap) discard(file protocol.FileInfo, key []byte, batch *leveldb.Batch) {
-	for _, block := range file.Blocks {
-		key = m.blockKeyInto(key, block.Hash, file.Name)
-		batch.Delete(key)
-	}
-}
-
-func (m *BlockMap) checkFlush(batch *leveldb.Batch) error {
-	if batch.Len() > maxBatchSize {
-		if err := m.db.Write(batch, nil); err != nil {
-			return err
-		}
-		batch.Reset()
-	}
-	return nil
-}
-
-// Drop block map, removing all entries related to this block map from the db.
-func (m *BlockMap) Drop() error {
-	batch := new(leveldb.Batch)
-	iter := m.db.NewIterator(util.BytesPrefix(m.blockKeyInto(nil, nil, "")[:keyPrefixLen+keyFolderLen]), nil)
-	defer iter.Release()
-	for iter.Next() {
-		m.checkFlush(batch)
-
-		batch.Delete(iter.Key())
-	}
-	if iter.Error() != nil {
-		return iter.Error()
-	}
-	return m.db.Write(batch, nil)
-}
-
-func (m *BlockMap) blockKeyInto(o, hash []byte, file string) []byte {
-	return blockKeyInto(o, hash, m.folder, file)
-}
-
 type BlockFinder struct {
-	db *Lowlevel
+	db *instance
 }
 
 func NewBlockFinder(db *Lowlevel) *BlockFinder {
@@ -137,11 +26,9 @@ func NewBlockFinder(db *Lowlevel) *BlockFinder {
 		return blockFinder
 	}
 
-	f := &BlockFinder{
-		db: db,
+	return &BlockFinder{
+		db: newInstance(db),
 	}
-
-	return f
 }
 
 func (f *BlockFinder) String() string {
@@ -154,52 +41,23 @@ func (f *BlockFinder) String() string {
 // reason. The iterator finally returns the result, whether or not a
 // satisfying block was eventually found.
 func (f *BlockFinder) Iterate(folders []string, hash []byte, iterFn func(string, string, int32) bool) bool {
+	t := f.db.newReadOnlyTransaction()
+	defer t.close()
+
 	var key []byte
 	for _, folder := range folders {
-		folderID := f.db.folderIdx.ID([]byte(folder))
-		key = blockKeyInto(key, hash, folderID, "")
-		iter := f.db.NewIterator(util.BytesPrefix(key), nil)
-		defer iter.Release()
+		key = f.db.keyer.GenerateBlockMapKey(key, []byte(folder), hash, nil)
+		iter := t.NewIterator(util.BytesPrefix(key), nil)
 
 		for iter.Next() && iter.Error() == nil {
-			file := blockKeyName(iter.Key())
+			file := string(f.db.keyer.NameFromBlockMapKey(iter.Key()))
 			index := int32(binary.BigEndian.Uint32(iter.Value()))
 			if iterFn(folder, osutil.NativeFilename(file), index) {
+				iter.Release()
 				return true
 			}
 		}
+		iter.Release()
 	}
 	return false
 }
-
-// m.blockKey returns a byte slice encoding the following information:
-//     keyTypeBlock (1 byte)
-//     folder (4 bytes)
-//     block hash (32 bytes)
-//     file name (variable size)
-func blockKeyInto(o, hash []byte, folder uint32, file string) []byte {
-	reqLen := keyPrefixLen + keyFolderLen + keyHashLen + len(file)
-	if cap(o) < reqLen {
-		o = make([]byte, reqLen)
-	} else {
-		o = o[:reqLen]
-	}
-	o[0] = KeyTypeBlock
-	binary.BigEndian.PutUint32(o[keyPrefixLen:], folder)
-	copy(o[keyPrefixLen+keyFolderLen:], hash)
-	copy(o[keyPrefixLen+keyFolderLen+keyHashLen:], []byte(file))
-	return o
-}
-
-// blockKeyName returns the file name from the block key
-func blockKeyName(data []byte) string {
-	if len(data) < keyPrefixLen+keyFolderLen+keyHashLen+1 {
-		panic("Incorrect key length")
-	}
-	if data[0] != KeyTypeBlock {
-		panic("Incorrect key type")
-	}
-
-	file := string(data[keyPrefixLen+keyFolderLen+keyHashLen:])
-	return file
-}
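One detail worth noting in the Iterate hunk above: the old code deferred iter.Release() inside the folder loop, so every iterator stayed alive until Iterate returned; the new code releases each iterator explicitly before moving on. A minimal sketch of that pattern, with a hypothetical iterator type standing in for leveldb's:

package main

import "fmt"

// toy iterator standing in for a leveldb iterator
type iterator struct{ id int }

func (it *iterator) Release() { fmt.Println("released iterator", it.id) }

func scanFolders(folders []string) {
	for i := range folders {
		it := &iterator{id: i}
		// Old pattern: "defer it.Release()" here would keep all the
		// iterators alive until scanFolders returns. Releasing
		// explicitly at the end of each pass, as the new Iterate does,
		// frees each iterator before the next folder is scanned.
		it.Release()
	}
}

func main() {
	scanFolders([]string{"folder1", "folder2", "folder3"})
}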
--- a/lib/db/blockmap_test.go
+++ b/lib/db/blockmap_test.go
@@ -7,6 +7,7 @@
 package db
 
 import (
+	"encoding/binary"
 	"testing"
 
 	"github.com/syncthing/syncthing/lib/protocol"
@@ -48,19 +49,53 @@ func init() {
 	}
 }
 
-func setup() (*Lowlevel, *BlockFinder) {
+func setup() (*instance, *BlockFinder) {
 	// Setup
 
 	db := OpenMemory()
-	return db, NewBlockFinder(db)
+	return newInstance(db), NewBlockFinder(db)
 }
 
-func dbEmpty(db *Lowlevel) bool {
+func dbEmpty(db *instance) bool {
 	iter := db.NewIterator(util.BytesPrefix([]byte{KeyTypeBlock}), nil)
 	defer iter.Release()
 	return !iter.Next()
 }
 
+func addToBlockMap(db *instance, folder []byte, fs []protocol.FileInfo) {
+	t := db.newReadWriteTransaction()
+	defer t.close()
+
+	var keyBuf []byte
+	blockBuf := make([]byte, 4)
+	for _, f := range fs {
+		if !f.IsDirectory() && !f.IsDeleted() && !f.IsInvalid() {
+			name := []byte(f.Name)
+			for i, block := range f.Blocks {
+				binary.BigEndian.PutUint32(blockBuf, uint32(i))
+				keyBuf = t.db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
+				t.Put(keyBuf, blockBuf)
+			}
+		}
+	}
+}
+
+func discardFromBlockMap(db *instance, folder []byte, fs []protocol.FileInfo) {
+	t := db.newReadWriteTransaction()
+	defer t.close()
+
+	var keyBuf []byte
+	for _, ef := range fs {
+		if !ef.IsDirectory() && !ef.IsDeleted() && !ef.IsInvalid() {
+			name := []byte(ef.Name)
+			for _, block := range ef.Blocks {
+				keyBuf = t.db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
+				t.Delete(keyBuf)
+			}
+		}
+	}
+}
+
 func TestBlockMapAddUpdateWipe(t *testing.T) {
 	db, f := setup()
 
@@ -68,14 +103,11 @@ func TestBlockMapAddUpdateWipe(t *testing.T) {
 		t.Fatal("db not empty")
 	}
 
-	m := NewBlockMap(db, "folder1")
+	folder := []byte("folder1")
 
 	f3.Type = protocol.FileInfoTypeDirectory
 
-	err := m.Add([]protocol.FileInfo{f1, f2, f3})
-	if err != nil {
-		t.Fatal(err)
-	}
+	addToBlockMap(db, folder, []protocol.FileInfo{f1, f2, f3})
 
 	f.Iterate(folders, f1.Blocks[0].Hash, func(folder, file string, index int32) bool {
 		if folder != "folder1" || file != "f1" || index != 0 {
@@ -96,14 +128,12 @@ func TestBlockMapAddUpdateWipe(t *testing.T) {
 		return true
 	})
 
+	discardFromBlockMap(db, folder, []protocol.FileInfo{f1, f2, f3})
+
 	f1.Deleted = true
 	f2.LocalFlags = protocol.FlagLocalMustRescan // one of the invalid markers
 
-	// Should remove
-	err = m.Update([]protocol.FileInfo{f1, f2, f3})
-	if err != nil {
-		t.Fatal(err)
-	}
+	addToBlockMap(db, folder, []protocol.FileInfo{f1, f2, f3})
 
 	f.Iterate(folders, f1.Blocks[0].Hash, func(folder, file string, index int32) bool {
 		t.Fatal("Unexpected block")
@@ -122,20 +152,14 @@ func TestBlockMapAddUpdateWipe(t *testing.T) {
 		return true
 	})
 
-	err = m.Drop()
-	if err != nil {
-		t.Fatal(err)
-	}
+	db.dropFolder(folder)
 
 	if !dbEmpty(db) {
 		t.Fatal("db not empty")
 	}
 
 	// Should not add
-	err = m.Add([]protocol.FileInfo{f1, f2})
-	if err != nil {
-		t.Fatal(err)
-	}
+	addToBlockMap(db, folder, []protocol.FileInfo{f1, f2})
 
 	if !dbEmpty(db) {
 		t.Fatal("db not empty")
@@ -152,17 +176,11 @@ func TestBlockMapAddUpdateWipe(t *testing.T) {
 func TestBlockFinderLookup(t *testing.T) {
 	db, f := setup()
 
-	m1 := NewBlockMap(db, "folder1")
-	m2 := NewBlockMap(db, "folder2")
+	folder1 := []byte("folder1")
+	folder2 := []byte("folder2")
 
-	err := m1.Add([]protocol.FileInfo{f1})
-	if err != nil {
-		t.Fatal(err)
-	}
-	err = m2.Add([]protocol.FileInfo{f1})
-	if err != nil {
-		t.Fatal(err)
-	}
+	addToBlockMap(db, folder1, []protocol.FileInfo{f1})
+	addToBlockMap(db, folder2, []protocol.FileInfo{f1})
 
 	counter := 0
 	f.Iterate(folders, f1.Blocks[0].Hash, func(folder, file string, index int32) bool {
@@ -186,12 +204,11 @@ func TestBlockFinderLookup(t *testing.T) {
 		t.Fatal("Incorrect count", counter)
 	}
 
+	discardFromBlockMap(db, folder1, []protocol.FileInfo{f1})
+
 	f1.Deleted = true
 
-	err = m1.Update([]protocol.FileInfo{f1})
-	if err != nil {
-		t.Fatal(err)
-	}
+	addToBlockMap(db, folder1, []protocol.FileInfo{f1})
 
 	counter = 0
 	f.Iterate(folders, f1.Blocks[0].Hash, func(folder, file string, index int32) bool {
|
@ -8,6 +8,7 @@ package db
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"encoding/binary"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/syncthing/syncthing/lib/protocol"
|
"github.com/syncthing/syncthing/lib/protocol"
|
||||||
@@ -30,7 +31,9 @@ func newInstance(ll *Lowlevel) *instance {
 	}
 }
 
-func (db *instance) updateFiles(folder, device []byte, fs []protocol.FileInfo, meta *metadataTracker) {
+// updateRemoteFiles adds a list of fileinfos to the database and updates the
+// global versionlist and metadata.
+func (db *instance) updateRemoteFiles(folder, device []byte, fs []protocol.FileInfo, meta *metadataTracker) {
 	t := db.newReadWriteTransaction()
 	defer t.close()
 
@@ -56,34 +59,65 @@ func (db *instance) updateFiles(folder, device []byte, fs []protocol.FileInfo, m
 		gk = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
 		keyBuf, _ = t.updateGlobal(gk, keyBuf, folder, device, f, meta)
 
-		// Write out and reuse the batch every few records, to avoid the batch
-		// growing too large and thus allocating unnecessarily much memory.
 		t.checkFlush()
 	}
 }
 
-func (db *instance) addSequences(folder []byte, fs []protocol.FileInfo) {
+// updateLocalFiles adds fileinfos to the db, and updates the global versionlist,
+// metadata, sequence and blockmap buckets.
+func (db *instance) updateLocalFiles(folder []byte, fs []protocol.FileInfo, meta *metadataTracker) {
 	t := db.newReadWriteTransaction()
 	defer t.close()
 
-	var dk, sk []byte
+	var dk, gk, keyBuf []byte
+	blockBuf := make([]byte, 4)
 	for _, f := range fs {
-		sk = db.keyer.GenerateSequenceKey(sk, folder, f.Sequence)
-		dk = db.keyer.GenerateDeviceFileKey(dk, folder, protocol.LocalDeviceID[:], []byte(f.Name))
-		t.Put(sk, dk)
+		name := []byte(f.Name)
+		dk = db.keyer.GenerateDeviceFileKey(dk, folder, protocol.LocalDeviceID[:], name)
+
+		ef, ok := t.getFileByKey(dk)
+		if ok && unchanged(f, ef) {
+			continue
+		}
+
+		if ok {
+			if !ef.IsDirectory() && !ef.IsDeleted() && !ef.IsInvalid() {
+				for _, block := range ef.Blocks {
+					keyBuf = t.db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
+					t.Delete(keyBuf)
+				}
+			}
+
+			keyBuf = db.keyer.GenerateSequenceKey(keyBuf, folder, ef.SequenceNo())
+			t.Delete(keyBuf)
+			l.Debugf("removing sequence; folder=%q sequence=%v %v", folder, ef.SequenceNo(), ef.FileName())
+		}
+
+		f.Sequence = meta.nextLocalSeq()
+
+		if ok {
+			meta.removeFile(protocol.LocalDeviceID, ef)
+		}
+		meta.addFile(protocol.LocalDeviceID, f)
+
+		l.Debugf("insert (local); folder=%q %v", folder, f)
+		t.Put(dk, mustMarshal(&f))
+
+		gk = t.db.keyer.GenerateGlobalVersionKey(gk, folder, []byte(f.Name))
+		keyBuf, _ = t.updateGlobal(gk, keyBuf, folder, protocol.LocalDeviceID[:], f, meta)
+
+		keyBuf = db.keyer.GenerateSequenceKey(keyBuf, folder, f.Sequence)
+		t.Put(keyBuf, dk)
 		l.Debugf("adding sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name)
-		t.checkFlush()
-	}
-}
 
-func (db *instance) removeSequences(folder []byte, fs []protocol.FileInfo) {
-	t := db.newReadWriteTransaction()
-	defer t.close()
-
-	var sk []byte
-	for _, f := range fs {
-		t.Delete(db.keyer.GenerateSequenceKey(sk, folder, f.Sequence))
-		l.Debugf("removing sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name)
+		if !f.IsDirectory() && !f.IsDeleted() && !f.IsInvalid() {
+			for i, block := range f.Blocks {
+				binary.BigEndian.PutUint32(blockBuf, uint32(i))
+				keyBuf = t.db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
+				t.Put(keyBuf, blockBuf)
+			}
+		}
+
 		t.checkFlush()
 	}
 }
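updateLocalFiles above is the heart of the commit: for each file it removes the stale sequence pointer and block map entries of the previous version, then writes the new fileinfo, global version, sequence pointer and block map entries, all within one transaction, calling checkFlush() only between files. A minimal in-memory sketch of the sequence-index part of that bookkeeping (illustrative types, not syncthing's API):

package main

import "fmt"

// db models just the sequence bucket: each local file owns exactly one
// entry mapping its latest sequence number to its file key.
type db struct {
	seq   map[int64]string // sequence number -> file key
	files map[string]int64 // file key -> current sequence number
	next  int64
}

// updateLocal mirrors the delete-then-insert order in updateLocalFiles:
// the stale pointer is removed before the new one is added, so the
// sequence index never holds two entries for the same file.
func (d *db) updateLocal(name string) {
	if old, ok := d.files[name]; ok {
		delete(d.seq, old)
	}
	d.next++
	d.files[name] = d.next
	d.seq[d.next] = name
}

func main() {
	d := &db{seq: map[int64]string{}, files: map[string]int64{}}
	d.updateLocal("f1")
	d.updateLocal("f1") // re-update replaces the entry instead of duplicating it
	fmt.Println(d.seq)  // map[2:f1]
}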
@@ -383,6 +417,8 @@ func (db *instance) dropFolder(folder []byte) {
 		db.keyer.GenerateGlobalVersionKey(nil, folder, nil).WithoutName(),
 		// Remove all needs related to the folder
 		db.keyer.GenerateNeedFileKey(nil, folder, nil).WithoutName(),
+		// Remove the blockmap of the folder
+		db.keyer.GenerateBlockMapKey(nil, folder, nil, nil).WithoutHashAndName(),
 	} {
 		t.deleteKeyPrefix(key)
 	}
@@ -403,6 +439,9 @@ func (db *instance) dropDeviceFolder(device, folder []byte, meta *metadataTracke
 		t.Delete(dbi.Key())
 		t.checkFlush()
 	}
+	if bytes.Equal(device, protocol.LocalDeviceID[:]) {
+		t.deleteKeyPrefix(db.keyer.GenerateBlockMapKey(nil, folder, nil, nil).WithoutHashAndName())
+	}
 }
 
 func (db *instance) checkGlobals(folder []byte, meta *metadataTracker) {
|
@ -73,6 +73,10 @@ type keyer interface {
|
|||||||
NameFromGlobalVersionKey(key []byte) []byte
|
NameFromGlobalVersionKey(key []byte) []byte
|
||||||
FolderFromGlobalVersionKey(key []byte) ([]byte, bool)
|
FolderFromGlobalVersionKey(key []byte) ([]byte, bool)
|
||||||
|
|
||||||
|
// block map key stuff (former BlockMap)
|
||||||
|
GenerateBlockMapKey(key, folder, hash, name []byte) blockMapKey
|
||||||
|
NameFromBlockMapKey(key []byte) []byte
|
||||||
|
|
||||||
// file need index
|
// file need index
|
||||||
GenerateNeedFileKey(key, folder, name []byte) needFileKey
|
GenerateNeedFileKey(key, folder, name []byte) needFileKey
|
||||||
|
|
||||||
@@ -154,6 +158,25 @@ func (k defaultKeyer) FolderFromGlobalVersionKey(key []byte) ([]byte, bool) {
 	return k.folderIdx.Val(binary.BigEndian.Uint32(key[keyPrefixLen:]))
 }
 
+type blockMapKey []byte
+
+func (k defaultKeyer) GenerateBlockMapKey(key, folder, hash, name []byte) blockMapKey {
+	key = resize(key, keyPrefixLen+keyFolderLen+keyHashLen+len(name))
+	key[0] = KeyTypeBlock
+	binary.BigEndian.PutUint32(key[keyPrefixLen:], k.folderIdx.ID(folder))
+	copy(key[keyPrefixLen+keyFolderLen:], hash)
+	copy(key[keyPrefixLen+keyFolderLen+keyHashLen:], name)
+	return key
+}
+
+func (k defaultKeyer) NameFromBlockMapKey(key []byte) []byte {
+	return key[keyPrefixLen+keyFolderLen+keyHashLen:]
+}
+
+func (k blockMapKey) WithoutHashAndName() []byte {
+	return k[:keyPrefixLen+keyFolderLen]
+}
+
 type needFileKey []byte
 
 func (k needFileKey) WithoutName() []byte {
--- a/lib/db/set.go
+++ b/lib/db/set.go
@@ -14,7 +14,6 @@ package db
 
 import (
 	"os"
-	"sort"
 	"time"
 
 	"github.com/syncthing/syncthing/lib/fs"
@@ -25,11 +24,10 @@ import (
 )
 
 type FileSet struct {
-	folder   string
-	fs       fs.Filesystem
-	db       *instance
-	blockmap *BlockMap
-	meta     *metadataTracker
+	folder string
+	fs     fs.Filesystem
+	db     *instance
+	meta   *metadataTracker
 
 	updateMutex sync.Mutex // protects database updates and the corresponding metadata changes
 }
@@ -75,7 +73,6 @@ func NewFileSet(folder string, fs fs.Filesystem, ll *Lowlevel) *FileSet {
 		folder:      folder,
 		fs:          fs,
 		db:          db,
-		blockmap:    NewBlockMap(ll, folder),
 		meta:        newMetadataTracker(),
 		updateMutex: sync.NewMutex(),
 	}
@@ -116,7 +113,6 @@ func (s *FileSet) Drop(device protocol.DeviceID) {
 	s.db.dropDeviceFolder(device[:], []byte(s.folder), s.meta)
 
 	if device == protocol.LocalDeviceID {
-		s.blockmap.Drop()
 		s.meta.resetCounts(device)
 		// We deliberately do not reset the sequence number here. Dropping
 		// all files for the local device ID only happens in testing - which
@@ -147,52 +143,13 @@ func (s *FileSet) Update(device protocol.DeviceID, fs []protocol.FileInfo) {
 
 	defer s.meta.toDB(s.db, []byte(s.folder))
 
-	if device != protocol.LocalDeviceID {
-		// Easy case, just update the files and we're done.
-		s.db.updateFiles([]byte(s.folder), device[:], fs, s.meta)
+	if device == protocol.LocalDeviceID {
+		// For the local device we have a bunch of metadata to track.
+		s.db.updateLocalFiles([]byte(s.folder), fs, s.meta)
 		return
 	}
 
-	// For the local device we have a bunch of metadata to track however...
-
-	discards := make([]protocol.FileInfo, 0, len(fs))
-	updates := make([]protocol.FileInfo, 0, len(fs))
-	// db.UpdateFiles will sort unchanged files out -> save one db lookup
-	// filter slice according to https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
-	oldFs := fs
-	fs = fs[:0]
-	folder := []byte(s.folder)
-	for _, nf := range oldFs {
-		ef, ok := s.db.getFileDirty(folder, device[:], []byte(nf.Name))
-		if ok && unchanged(nf, ef) {
-			continue
-		}
-
-		nf.Sequence = s.meta.nextLocalSeq()
-		fs = append(fs, nf)
-
-		if ok {
-			discards = append(discards, ef)
-		}
-		updates = append(updates, nf)
-	}
-
-	// The ordering here is important. We first remove stuff that point to
-	// files we are going to update, then update them, then add new index
-	// pointers etc. In addition, we do the discards in reverse order so
-	// that a reader traversing the sequence index will get a consistent
-	// view up until the point they meet the writer.
-
-	sort.Slice(discards, func(a, b int) bool {
-		// n.b. "b < a" instead of the usual "a < b"
-		return discards[b].Sequence < discards[a].Sequence
-	})
-
-	s.blockmap.Discard(discards)
-	s.db.removeSequences(folder, discards)
-	s.db.updateFiles([]byte(s.folder), device[:], fs, s.meta)
-	s.db.addSequences(folder, updates)
-	s.blockmap.Update(updates)
+	// Easy case, just update the files and we're done.
+	s.db.updateRemoteFiles([]byte(s.folder), device[:], fs, s.meta)
 }
 
 func (s *FileSet) WithNeed(device protocol.DeviceID, fn Iterator) {
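With the per-item work moved into the instance, FileSet.Update reduces to a dispatch: remote updates go straight to updateRemoteFiles, local ones to updateLocalFiles, both under the update mutex and with the metadata commit deferred. A rough sketch of that shape (illustrative types only, not syncthing's API):

package main

import (
	"fmt"
	"sync"
)

type fileSet struct {
	mut  sync.Mutex
	meta []string // stand-in for the metadata tracker
}

// update mirrors the new FileSet.Update: one lock, one deferred metadata
// commit, and a single routine per device type that owns all the writes.
func (s *fileSet) update(local bool, files []string) {
	s.mut.Lock()
	defer s.mut.Unlock()
	defer func() { fmt.Println("commit metadata:", s.meta) }() // cf. defer s.meta.toDB(...)

	prefix := "remote:"
	if local {
		// The local device needs extra bookkeeping (sequence numbers,
		// block map); it now all lives in one place, updateLocalFiles.
		prefix = "local:"
	}
	for _, f := range files {
		s.meta = append(s.meta, prefix+f)
	}
}

func main() {
	s := new(fileSet)
	s.update(true, []string{"f1", "f2"})
}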
@@ -327,8 +284,6 @@ func DropFolder(ll *Lowlevel, folder string) {
 	db.dropFolder([]byte(folder))
 	db.dropMtimes([]byte(folder))
 	db.dropFolderMeta([]byte(folder))
-	bm := NewBlockMap(ll, folder)
-	bm.Drop()
 
 	// Also clean out the folder ID mapping.
 	db.folderIdx.Delete([]byte(folder))