Don't load block lists from db unless necessary

This commit is contained in:
Jakob Borg 2014-08-12 13:53:31 +02:00
parent fe7b77198c
commit 92eed3b33b
6 changed files with 221 additions and 105 deletions

View File

@ -32,7 +32,8 @@ func main() {
if *node == "" {
log.Printf("*** Global index for repo %q", *repo)
fs.WithGlobal(func(f protocol.FileInfo) bool {
fs.WithGlobalTruncated(func(fi protocol.FileIntf) bool {
f := fi.(protocol.FileInfoTruncated)
fmt.Println(f)
fmt.Println("\t", fs.Availability(f.Name))
return true
@ -43,7 +44,8 @@ func main() {
log.Fatal(err)
}
log.Printf("*** Have index for repo %q node %q", *repo, n)
fs.WithHave(n, func(f protocol.FileInfo) bool {
fs.WithHaveTruncated(n, func(fi protocol.FileIntf) bool {
f := fi.(protocol.FileInfoTruncated)
fmt.Println(f)
return true
})

View File

@ -119,7 +119,7 @@ func globalKeyName(key []byte) []byte {
type deletionHandler func(db dbReader, batch dbWriter, repo, node, name []byte, dbi iterator.Iterator) uint64
type fileIterator func(f protocol.FileInfo) bool
type fileIterator func(f protocol.FileIntf) bool
func ldbGenericReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo, deleteFn deletionHandler) uint64 {
defer runtime.GC()
@ -181,7 +181,7 @@ func ldbGenericReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo
case moreFs && moreDb && cmp == 0:
// File exists on both sides - compare versions.
var ef protocol.FileInfo
var ef protocol.FileInfoTruncated
ef.UnmarshalXDR(dbi.Value())
if fs[fsi].Version > ef.Version {
if lv := ldbInsert(batch, repo, node, newName, fs[fsi]); lv > maxLocalVer {
@ -226,20 +226,23 @@ func ldbReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) uint6
func ldbReplaceWithDelete(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) uint64 {
return ldbGenericReplace(db, repo, node, fs, func(db dbReader, batch dbWriter, repo, node, name []byte, dbi iterator.Iterator) uint64 {
var f protocol.FileInfo
err := f.UnmarshalXDR(dbi.Value())
var tf protocol.FileInfoTruncated
err := tf.UnmarshalXDR(dbi.Value())
if err != nil {
panic(err)
}
if !protocol.IsDeleted(f.Flags) {
if !tf.IsDeleted() {
if debug {
l.Debugf("mark deleted; repo=%q node=%v name=%q", repo, protocol.NodeIDFromBytes(node), name)
}
ts := clock(f.LocalVersion)
f.Blocks = nil
f.Version = lamport.Default.Tick(f.Version)
f.Flags |= protocol.FlagDeleted
f.LocalVersion = ts
ts := clock(tf.LocalVersion)
f := protocol.FileInfo{
Name: tf.Name,
Version: lamport.Default.Tick(tf.Version),
LocalVersion: ts,
Flags: tf.Flags | protocol.FlagDeleted,
Modified: tf.Modified,
}
batch.Put(dbi.Key(), f.MarshalXDR())
ldbUpdateGlobal(db, batch, repo, node, nodeKeyName(dbi.Key()), f.Version)
return ts
@ -271,7 +274,7 @@ func ldbUpdate(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) uint64
continue
}
var ef protocol.FileInfo
var ef protocol.FileInfoTruncated
err = ef.UnmarshalXDR(bs)
if err != nil {
panic(err)
@ -395,7 +398,7 @@ func ldbRemoveFromGlobal(db dbReader, batch dbWriter, repo, node, file []byte) {
}
}
func ldbWithHave(db *leveldb.DB, repo, node []byte, fn fileIterator) {
func ldbWithHave(db *leveldb.DB, repo, node []byte, truncate bool, fn fileIterator) {
start := nodeKey(repo, node, nil) // before all repo/node files
limit := nodeKey(repo, node, []byte{0xff, 0xff, 0xff, 0xff}) // after all repo/node files
snap, err := db.GetSnapshot()
@ -407,8 +410,7 @@ func ldbWithHave(db *leveldb.DB, repo, node []byte, fn fileIterator) {
defer dbi.Release()
for dbi.Next() {
var f protocol.FileInfo
err := f.UnmarshalXDR(dbi.Value())
f, err := unmarshalTrunc(dbi.Value(), truncate)
if err != nil {
panic(err)
}
@ -418,7 +420,7 @@ func ldbWithHave(db *leveldb.DB, repo, node []byte, fn fileIterator) {
}
}
func ldbWithAllRepo(db *leveldb.DB, repo []byte, fn func(node []byte, f protocol.FileInfo) bool) {
func ldbWithAllRepoTruncated(db *leveldb.DB, repo []byte, fn func(node []byte, f protocol.FileInfoTruncated) bool) {
defer runtime.GC()
start := nodeKey(repo, nil, nil) // before all repo/node files
@ -433,7 +435,7 @@ func ldbWithAllRepo(db *leveldb.DB, repo []byte, fn func(node []byte, f protocol
for dbi.Next() {
node := nodeKeyNode(dbi.Key())
var f protocol.FileInfo
var f protocol.FileInfoTruncated
err := f.UnmarshalXDR(dbi.Value())
if err != nil {
panic(err)
@ -444,40 +446,6 @@ func ldbWithAllRepo(db *leveldb.DB, repo []byte, fn func(node []byte, f protocol
}
}
/*
func ldbCheckGlobalConsistency(db *leveldb.DB, repo []byte) {
l.Debugf("Checking global consistency for %q", repo)
start := nodeKey(repo, nil, nil) // before all repo/node files
limit := nodeKey(repo, protocol.LocalNodeID[:], []byte{0xff, 0xff, 0xff, 0xff}) // after all repo/node files
snap, err := db.GetSnapshot()
if err != nil {
panic(err)
}
defer snap.Release()
dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
defer dbi.Release()
batch := new(leveldb.Batch)
i := 0
for dbi.Next() {
repo := nodeKeyRepo(dbi.Key())
node := nodeKeyNode(dbi.Key())
var f protocol.FileInfo
err := f.UnmarshalXDR(dbi.Value())
if err != nil {
panic(err)
}
if ldbUpdateGlobal(snap, batch, repo, node, []byte(f.Name), f.Version) {
var nodeID protocol.NodeID
copy(nodeID[:], node)
l.Debugf("fixed global for %q %s %q", repo, nodeID, f.Name)
}
i++
}
l.Debugln("Done", i)
}
*/
func ldbGet(db *leveldb.DB, repo, node, file []byte) protocol.FileInfo {
nk := nodeKey(repo, node, file)
bs, err := db.Get(nk, nil)
@ -536,7 +504,7 @@ func ldbGetGlobal(db *leveldb.DB, repo, file []byte) protocol.FileInfo {
return f
}
func ldbWithGlobal(db *leveldb.DB, repo []byte, fn fileIterator) {
func ldbWithGlobal(db *leveldb.DB, repo []byte, truncate bool, fn fileIterator) {
defer runtime.GC()
start := globalKey(repo, nil)
@ -565,8 +533,7 @@ func ldbWithGlobal(db *leveldb.DB, repo []byte, fn fileIterator) {
panic(err)
}
var f protocol.FileInfo
err = f.UnmarshalXDR(bs)
f, err := unmarshalTrunc(bs, truncate)
if err != nil {
panic(err)
}
@ -605,7 +572,7 @@ func ldbAvailability(db *leveldb.DB, repo, file []byte) []protocol.NodeID {
return nodes
}
func ldbWithNeed(db *leveldb.DB, repo, node []byte, fn fileIterator) {
func ldbWithNeed(db *leveldb.DB, repo, node []byte, truncate bool, fn fileIterator) {
defer runtime.GC()
start := globalKey(repo, nil)
@ -649,13 +616,12 @@ func ldbWithNeed(db *leveldb.DB, repo, node []byte, fn fileIterator) {
panic(err)
}
var gf protocol.FileInfo
err = gf.UnmarshalXDR(bs)
gf, err := unmarshalTrunc(bs, truncate)
if err != nil {
panic(err)
}
if protocol.IsDeleted(gf.Flags) && !have {
if gf.IsDeleted() && !have {
// We don't need deleted files that we don't have
continue
}
@ -670,3 +636,15 @@ func ldbWithNeed(db *leveldb.DB, repo, node []byte, fn fileIterator) {
}
}
}
func unmarshalTrunc(bs []byte, truncate bool) (protocol.FileIntf, error) {
if truncate {
var tf protocol.FileInfoTruncated
err := tf.UnmarshalXDR(bs)
return tf, err
} else {
var tf protocol.FileInfo
err := tf.UnmarshalXDR(bs)
return tf, err
}
}

View File

@ -36,7 +36,7 @@ func NewSet(repo string, db *leveldb.DB) *Set {
}
var nodeID protocol.NodeID
ldbWithAllRepo(db, []byte(repo), func(node []byte, f protocol.FileInfo) bool {
ldbWithAllRepoTruncated(db, []byte(repo), func(node []byte, f protocol.FileInfoTruncated) bool {
copy(nodeID[:], node)
if f.LocalVersion > s.localVersion[nodeID] {
s.localVersion[nodeID] = f.LocalVersion
@ -87,21 +87,35 @@ func (s *Set) WithNeed(node protocol.NodeID, fn fileIterator) {
if debug {
l.Debugf("%s WithNeed(%v)", s.repo, node)
}
ldbWithNeed(s.db, []byte(s.repo), node[:], fn)
ldbWithNeed(s.db, []byte(s.repo), node[:], false, fn)
}
func (s *Set) WithNeedTruncated(node protocol.NodeID, fn fileIterator) {
if debug {
l.Debugf("%s WithNeedTruncated(%v)", s.repo, node)
}
ldbWithNeed(s.db, []byte(s.repo), node[:], true, fn)
}
func (s *Set) WithHave(node protocol.NodeID, fn fileIterator) {
if debug {
l.Debugf("%s WithHave(%v)", s.repo, node)
}
ldbWithHave(s.db, []byte(s.repo), node[:], fn)
ldbWithHave(s.db, []byte(s.repo), node[:], false, fn)
}
func (s *Set) WithGlobal(fn fileIterator) {
func (s *Set) WithHaveTruncated(node protocol.NodeID, fn fileIterator) {
if debug {
l.Debugf("%s WithGlobal()", s.repo)
l.Debugf("%s WithHaveTruncated(%v)", s.repo, node)
}
ldbWithGlobal(s.db, []byte(s.repo), fn)
ldbWithHave(s.db, []byte(s.repo), node[:], true, fn)
}
func (s *Set) WithGlobalTruncated(fn fileIterator) {
if debug {
l.Debugf("%s WithGlobalTrucnated()", s.repo)
}
ldbWithGlobal(s.db, []byte(s.repo), true, fn)
}
func (s *Set) Get(node protocol.NodeID, file string) protocol.FileInfo {

View File

@ -208,15 +208,9 @@ func (m *Model) Completion(node protocol.NodeID, repo string) float64 {
return 0 // Repo doesn't exist, so we hardly have any of it
}
rf.WithGlobal(func(f protocol.FileInfo) bool {
if !protocol.IsDeleted(f.Flags) {
var size int64
if protocol.IsDirectory(f.Flags) {
size = zeroEntrySize
} else {
size = f.Size()
}
tot += size
rf.WithGlobalTruncated(func(f protocol.FileIntf) bool {
if !f.IsDeleted() {
tot += f.Size()
}
return true
})
@ -226,20 +220,19 @@ func (m *Model) Completion(node protocol.NodeID, repo string) float64 {
}
var need int64
rf.WithNeed(node, func(f protocol.FileInfo) bool {
if !protocol.IsDeleted(f.Flags) {
var size int64
if protocol.IsDirectory(f.Flags) {
size = zeroEntrySize
} else {
size = f.Size()
}
need += size
rf.WithNeedTruncated(node, func(f protocol.FileIntf) bool {
if !f.IsDeleted() {
need += f.Size()
}
return true
})
return 100 * (1 - float64(need)/float64(tot))
res := 100 * (1 - float64(need)/float64(tot))
if debug {
l.Debugf("Completion(%s, %q): %f (%d / %d)", node, repo, res, need, tot)
}
return res
}
func sizeOf(fs []protocol.FileInfo) (files, deleted int, bytes int64) {
@ -252,18 +245,13 @@ func sizeOf(fs []protocol.FileInfo) (files, deleted int, bytes int64) {
return
}
func sizeOfFile(f protocol.FileInfo) (files, deleted int, bytes int64) {
if !protocol.IsDeleted(f.Flags) {
func sizeOfFile(f protocol.FileIntf) (files, deleted int, bytes int64) {
if !f.IsDeleted() {
files++
if !protocol.IsDirectory(f.Flags) {
bytes += f.Size()
} else {
bytes += zeroEntrySize
}
} else {
deleted++
bytes += zeroEntrySize
}
bytes += f.Size()
return
}
@ -273,7 +261,7 @@ func (m *Model) GlobalSize(repo string) (files, deleted int, bytes int64) {
m.rmut.RLock()
defer m.rmut.RUnlock()
if rf, ok := m.repoFiles[repo]; ok {
rf.WithGlobal(func(f protocol.FileInfo) bool {
rf.WithGlobalTruncated(func(f protocol.FileIntf) bool {
fs, de, by := sizeOfFile(f)
files += fs
deleted += de
@ -290,7 +278,7 @@ func (m *Model) LocalSize(repo string) (files, deleted int, bytes int64) {
m.rmut.RLock()
defer m.rmut.RUnlock()
if rf, ok := m.repoFiles[repo]; ok {
rf.WithHave(protocol.LocalNodeID, func(f protocol.FileInfo) bool {
rf.WithHaveTruncated(protocol.LocalNodeID, func(f protocol.FileIntf) bool {
fs, de, by := sizeOfFile(f)
files += fs
deleted += de
@ -306,13 +294,16 @@ func (m *Model) NeedSize(repo string) (files int, bytes int64) {
m.rmut.RLock()
defer m.rmut.RUnlock()
if rf, ok := m.repoFiles[repo]; ok {
rf.WithNeed(protocol.LocalNodeID, func(f protocol.FileInfo) bool {
rf.WithNeedTruncated(protocol.LocalNodeID, func(f protocol.FileIntf) bool {
fs, de, by := sizeOfFile(f)
files += fs + de
bytes += by
return true
})
}
if debug {
l.Debugf("NeedSize(%q): %d %d", repo, files, bytes)
}
return
}
@ -322,8 +313,8 @@ func (m *Model) NeedFilesRepo(repo string) []protocol.FileInfo {
defer m.rmut.RUnlock()
if rf, ok := m.repoFiles[repo]; ok {
fs := make([]protocol.FileInfo, 0, indexBatchSize)
rf.WithNeed(protocol.LocalNodeID, func(f protocol.FileInfo) bool {
fs = append(fs, f)
rf.WithNeed(protocol.LocalNodeID, func(f protocol.FileIntf) bool {
fs = append(fs, f.(protocol.FileInfo))
return len(fs) < indexBatchSize
})
return fs
@ -597,7 +588,8 @@ func sendIndexTo(initial bool, minLocalVer uint64, conn protocol.Connection, rep
maxLocalVer := uint64(0)
var err error
fs.WithHave(protocol.LocalNodeID, func(f protocol.FileInfo) bool {
fs.WithHave(protocol.LocalNodeID, func(fi protocol.FileIntf) bool {
f := fi.(protocol.FileInfo)
if f.LocalVersion <= minLocalVer {
return true
}
@ -802,7 +794,8 @@ func (m *Model) ScanRepoSub(repo, sub string) error {
batch = batch[:0]
// TODO: We should limit the Have scanning to start at sub
seenPrefix := false
fs.WithHave(protocol.LocalNodeID, func(f protocol.FileInfo) bool {
fs.WithHaveTruncated(protocol.LocalNodeID, func(fi protocol.FileIntf) bool {
f := fi.(protocol.FileInfoTruncated)
if !strings.HasPrefix(f.Name, sub) {
return !seenPrefix
}
@ -814,10 +807,12 @@ func (m *Model) ScanRepoSub(repo, sub string) error {
}
if _, err := os.Stat(filepath.Join(dir, f.Name)); err != nil && os.IsNotExist(err) {
// File has been deleted
f.Blocks = nil
f.Flags |= protocol.FlagDeleted
f.Version = lamport.Default.Tick(f.Version)
f.LocalVersion = 0
nf := protocol.FileInfo{
Name: f.Name,
Flags: f.Flags | protocol.FlagDeleted,
Modified: f.Modified,
Version: lamport.Default.Tick(f.Version),
}
events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
"repo": repo,
"name": f.Name,
@ -825,7 +820,7 @@ func (m *Model) ScanRepoSub(repo, sub string) error {
"flags": fmt.Sprintf("0%o", f.Flags),
"size": f.Size(),
})
batch = append(batch, f)
batch = append(batch, nf)
}
}
return true
@ -898,7 +893,8 @@ func (m *Model) Override(repo string) {
m.rmut.RUnlock()
batch := make([]protocol.FileInfo, 0, indexBatchSize)
fs.WithNeed(protocol.LocalNodeID, func(need protocol.FileInfo) bool {
fs.WithNeed(protocol.LocalNodeID, func(fi protocol.FileIntf) bool {
need := fi.(protocol.FileInfo)
if len(batch) == indexBatchSize {
fs.Update(protocol.LocalNodeID, batch)
batch = batch[:0]

View File

@ -26,12 +26,46 @@ func (f FileInfo) String() string {
}
func (f FileInfo) Size() (bytes int64) {
if IsDeleted(f.Flags) || IsDirectory(f.Flags) {
return 128
}
for _, b := range f.Blocks {
bytes += int64(b.Size)
}
return
}
func (f FileInfo) IsDeleted() bool {
return IsDeleted(f.Flags)
}
// Used for unmarshalling a FileInfo structure but skipping the actual block list
type FileInfoTruncated struct {
Name string // max:1024
Flags uint32
Modified int64
Version uint64
LocalVersion uint64
NumBlocks uint32
}
// Returns an upper bound on the size, not the exact figure
func (f FileInfoTruncated) Size() int64 {
if IsDeleted(f.Flags) || IsDirectory(f.Flags) {
return 128
}
return int64(f.NumBlocks) * BlockSize
}
func (f FileInfoTruncated) IsDeleted() bool {
return IsDeleted(f.Flags)
}
type FileIntf interface {
Size() int64
IsDeleted() bool
}
type BlockInfo struct {
Offset int64 // noencode (cache only)
Size uint32

View File

@ -199,6 +199,98 @@ func (o *FileInfo) decodeXDR(xr *xdr.Reader) error {
/*
FileInfoTruncated Structure:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length of Name |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Name (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Flags |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| |
+ Modified (64 bits) +
| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| |
+ Version (64 bits) +
| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| |
+ Local Version (64 bits) +
| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Num Blocks |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
struct FileInfoTruncated {
string Name<1024>;
unsigned int Flags;
hyper Modified;
unsigned hyper Version;
unsigned hyper LocalVersion;
unsigned int NumBlocks;
}
*/
func (o FileInfoTruncated) EncodeXDR(w io.Writer) (int, error) {
var xw = xdr.NewWriter(w)
return o.encodeXDR(xw)
}
func (o FileInfoTruncated) MarshalXDR() []byte {
return o.AppendXDR(make([]byte, 0, 128))
}
func (o FileInfoTruncated) AppendXDR(bs []byte) []byte {
var aw = xdr.AppendWriter(bs)
var xw = xdr.NewWriter(&aw)
o.encodeXDR(xw)
return []byte(aw)
}
func (o FileInfoTruncated) encodeXDR(xw *xdr.Writer) (int, error) {
if len(o.Name) > 1024 {
return xw.Tot(), xdr.ErrElementSizeExceeded
}
xw.WriteString(o.Name)
xw.WriteUint32(o.Flags)
xw.WriteUint64(uint64(o.Modified))
xw.WriteUint64(o.Version)
xw.WriteUint64(o.LocalVersion)
xw.WriteUint32(o.NumBlocks)
return xw.Tot(), xw.Error()
}
func (o *FileInfoTruncated) DecodeXDR(r io.Reader) error {
xr := xdr.NewReader(r)
return o.decodeXDR(xr)
}
func (o *FileInfoTruncated) UnmarshalXDR(bs []byte) error {
var br = bytes.NewReader(bs)
var xr = xdr.NewReader(br)
return o.decodeXDR(xr)
}
func (o *FileInfoTruncated) decodeXDR(xr *xdr.Reader) error {
o.Name = xr.ReadStringMax(1024)
o.Flags = xr.ReadUint32()
o.Modified = int64(xr.ReadUint64())
o.Version = xr.ReadUint64()
o.LocalVersion = xr.ReadUint64()
o.NumBlocks = xr.ReadUint32()
return xr.Error()
}
/*
BlockInfo Structure:
0 1 2 3