Major restructure, bundle blobs

This commit also breaks the repository format.
This commit is contained in:
Alexander Neumann 2015-04-26 17:44:38 +02:00
parent b836da1980
commit 60a0fe8349
29 changed files with 937 additions and 1123 deletions

View File

@ -14,6 +14,7 @@ import (
"github.com/restic/restic/backend"
"github.com/restic/restic/chunker"
"github.com/restic/restic/debug"
"github.com/restic/restic/pack"
"github.com/restic/restic/pipe"
"github.com/restic/restic/server"
)
@ -29,8 +30,6 @@ const (
type Archiver struct {
s *server.Server
m *Map
c *Cache
blobToken chan struct{}
@ -50,15 +49,6 @@ func NewArchiver(s *server.Server) (*Archiver, error) {
arch.blobToken <- struct{}{}
}
// create new map to store all blobs in
arch.m = NewMap()
// init cache
arch.c, err = NewCache(s)
if err != nil {
return nil, err
}
// abort on all errors
arch.Error = func(string, os.FileInfo, error) error { return err }
// allow all files
@ -67,119 +57,59 @@ func NewArchiver(s *server.Server) (*Archiver, error) {
return arch, nil
}
// Cache returns the current cache for the Archiver.
func (arch *Archiver) Cache() *Cache {
return arch.c
}
// Preload loads all blobs for all cached snapshots.
func (arch *Archiver) Preload() error {
done := make(chan struct{})
defer close(done)
// list snapshots
// TODO: track seen tree ids, load trees that aren't in the set
snapshots := 0
for name := range arch.s.List(backend.Snapshot, done) {
id, err := backend.ParseID(name)
if err != nil {
debug.Log("Archiver.Preload", "unable to parse name %v as id: %v", name, err)
continue
}
m, err := arch.c.LoadMap(arch.s, id)
if err != nil {
debug.Log("Archiver.Preload", "blobs for snapshot %v not cached: %v", id.Str(), err)
continue
}
arch.m.Merge(m)
debug.Log("Archiver.Preload", "done loading cached blobs for snapshot %v", id.Str())
snapshots++
}
debug.Log("Archiver.Preload", "Loaded %v blobs from %v snapshots", arch.m.Len(), snapshots)
return nil
}
func (arch *Archiver) Save(t backend.Type, id backend.ID, length uint, rd io.Reader) (server.Blob, error) {
func (arch *Archiver) Save(t pack.BlobType, id backend.ID, length uint, rd io.Reader) error {
debug.Log("Archiver.Save", "Save(%v, %v)\n", t, id.Str())
// test if this blob is already known
blob, err := arch.m.FindID(id)
if err == nil {
debug.Log("Archiver.Save", "Save(%v, %v): reusing %v\n", t, id.Str(), blob.Storage.Str())
return blob, nil
if arch.s.Index().Has(id) {
debug.Log("Archiver.Save", "(%v, %v) already saved\n", t, id.Str())
return nil
}
// else encrypt and save data
blob, err = arch.s.SaveFrom(t, id, length, rd)
// store blob in storage map
smapblob := arch.m.Insert(blob)
// if the map has a different storage id for this plaintext blob, use that
// one and remove the other. This happens if the same plaintext blob was
// stored concurrently and finished earlier than this blob.
if blob.Storage.Compare(smapblob.Storage) != 0 {
debug.Log("Archiver.Save", "using other block, removing %v\n", blob.Storage.Str())
// remove the blob again
// TODO: implement a list of blobs in transport, so this doesn't happen so often
err = arch.s.Remove(t, blob.Storage.String())
if err != nil {
return server.Blob{}, err
}
// otherwise save blob
err := arch.s.SaveFrom(t, id, length, rd)
if err != nil {
debug.Log("Archiver.Save", "Save(%v, %v): error %v\n", t, id.Str(), err)
return err
}
debug.Log("Archiver.Save", "Save(%v, %v): new blob %v\n", t, id.Str(), blob)
return smapblob, nil
debug.Log("Archiver.Save", "Save(%v, %v): new blob\n", t, id.Str())
return nil
}
func (arch *Archiver) SaveTreeJSON(item interface{}) (server.Blob, error) {
func (arch *Archiver) SaveTreeJSON(item interface{}) (backend.ID, error) {
// convert to json
data, err := json.Marshal(item)
// append newline
data = append(data, '\n')
if err != nil {
return server.Blob{}, err
return nil, err
}
// check if tree has been saved before
id := backend.Hash(data)
blob, err := arch.m.FindID(id)
// return the blob if we found it
if err == nil {
return blob, nil
if arch.s.Index().Has(id) {
return id, nil
}
// otherwise save the data
blob, err = arch.s.SaveJSON(backend.Tree, item)
if err != nil {
return server.Blob{}, err
}
// store blob in storage map
arch.m.Insert(blob)
return blob, nil
return arch.s.SaveJSON(pack.Tree, item)
}
// SaveFile stores the content of the file on the backend as a Blob by calling
// Save for each chunk.
func (arch *Archiver) SaveFile(p *Progress, node *Node) (server.Blobs, error) {
func (arch *Archiver) SaveFile(p *Progress, node *Node) error {
file, err := node.OpenForReading()
defer file.Close()
if err != nil {
return nil, err
return err
}
// check file again
fi, err := file.Stat()
if err != nil {
return nil, err
return err
}
if fi.ModTime() != node.ModTime {
@ -190,7 +120,7 @@ func (arch *Archiver) SaveFile(p *Progress, node *Node) (server.Blobs, error) {
n, err := NodeFromFileInfo(node.path, fi)
if err != nil {
debug.Log("Archiver.SaveFile", "NodeFromFileInfo returned error for %v: %v", node.path, err)
return nil, err
return err
}
// copy node
@ -198,12 +128,15 @@ func (arch *Archiver) SaveFile(p *Progress, node *Node) (server.Blobs, error) {
}
}
var blobs server.Blobs
type result struct {
id backend.ID
bytes uint64
}
// store all chunks
chnker := GetChunker("archiver.SaveFile")
chnker.Reset(file, arch.s.ChunkerPolynomial())
chans := [](<-chan server.Blob){}
chans := [](<-chan result){}
defer FreeChunker("archiver.SaveFile", chnker)
chunks := 0
@ -215,75 +148,71 @@ func (arch *Archiver) SaveFile(p *Progress, node *Node) (server.Blobs, error) {
}
if err != nil {
return nil, arrar.Annotate(err, "SaveFile() chunker.Next()")
return arrar.Annotate(err, "SaveFile() chunker.Next()")
}
chunks++
// acquire token, start goroutine to save chunk
token := <-arch.blobToken
resCh := make(chan server.Blob, 1)
resCh := make(chan result, 1)
go func(ch chan<- server.Blob) {
blob, err := arch.Save(backend.Data, chunk.Digest, chunk.Length, chunk.Reader(file))
go func(ch chan<- result) {
err := arch.Save(pack.Data, chunk.Digest, chunk.Length, chunk.Reader(file))
// TODO handle error
if err != nil {
panic(err)
}
p.Report(Stat{Bytes: blob.Size})
p.Report(Stat{Bytes: uint64(chunk.Length)})
arch.blobToken <- token
ch <- blob
ch <- result{id: backend.ID(chunk.Digest), bytes: uint64(chunk.Length)}
}(resCh)
chans = append(chans, resCh)
}
blobs = []server.Blob{}
results := []result{}
for _, ch := range chans {
blobs = append(blobs, <-ch)
results = append(results, <-ch)
}
if len(blobs) != chunks {
return nil, fmt.Errorf("chunker returned %v chunks, but only %v blobs saved", chunks, len(blobs))
if len(results) != chunks {
return fmt.Errorf("chunker returned %v chunks, but only %v blobs saved", chunks, len(results))
}
var bytes uint64
node.Content = make([]backend.ID, len(blobs))
node.Content = make([]backend.ID, len(results))
debug.Log("Archiver.Save", "checking size for file %s", node.path)
for i, blob := range blobs {
node.Content[i] = blob.ID
bytes += blob.Size
for i, b := range results {
node.Content[i] = b.id
bytes += b.bytes
debug.Log("Archiver.Save", " adding blob %s", blob)
debug.Log("Archiver.Save", " adding blob %s, %d bytes", b.id.Str(), b.bytes)
}
if bytes != node.Size {
return nil, fmt.Errorf("errors saving node %q: saved %d bytes, wanted %d bytes", node.path, bytes, node.Size)
return fmt.Errorf("errors saving node %q: saved %d bytes, wanted %d bytes", node.path, bytes, node.Size)
}
debug.Log("Archiver.SaveFile", "SaveFile(%q): %v\n", node.path, blobs)
debug.Log("Archiver.SaveFile", "SaveFile(%q): %v blobs\n", node.path, len(results))
return blobs, nil
return nil
}
func (arch *Archiver) saveTree(p *Progress, t *Tree) (server.Blob, error) {
func (arch *Archiver) saveTree(p *Progress, t *Tree) (backend.ID, error) {
debug.Log("Archiver.saveTree", "saveTree(%v)\n", t)
var wg sync.WaitGroup
// add all blobs to global map
arch.m.Merge(t.Map)
// TODO: do all this in parallel
for _, node := range t.Nodes {
if node.tree != nil {
b, err := arch.saveTree(p, node.tree)
id, err := arch.saveTree(p, node.tree)
if err != nil {
return server.Blob{}, err
return nil, err
}
node.Subtree = b.ID
t.Map.Insert(b)
node.Subtree = id
p.Report(Stat{Dirs: 1})
} else if node.Type == "file" {
if len(node.Content) > 0 {
@ -291,22 +220,18 @@ func (arch *Archiver) saveTree(p *Progress, t *Tree) (server.Blob, error) {
// check content
for _, id := range node.Content {
blob, err := t.Map.FindID(id)
packID, _, _, _, err := arch.s.Index().Lookup(id)
if err != nil {
debug.Log("Archiver.saveTree", "unable to find storage id for data blob %v", id.Str())
arch.Error(node.path, nil, fmt.Errorf("unable to find storage id for data blob %v", id.Str()))
debug.Log("Archiver.saveTree", "unable to find storage id for data blob %v: %v", id.Str(), err)
arch.Error(node.path, nil, fmt.Errorf("unable to find storage id for data blob %v: %v", id.Str(), err))
removeContent = true
t.Map.DeleteID(id)
arch.m.DeleteID(id)
continue
}
if ok, err := arch.s.Test(backend.Data, blob.Storage.String()); !ok || err != nil {
debug.Log("Archiver.saveTree", "blob %v not in repository (error is %v)", blob, err)
arch.Error(node.path, nil, fmt.Errorf("blob %v not in repository (error is %v)", blob.Storage.Str(), err))
if ok, err := arch.s.Test(backend.Data, packID.String()); !ok || err != nil {
debug.Log("Archiver.saveTree", "pack %v of blob %v not in repository (error is %v)", packID, id, err)
arch.Error(node.path, nil, fmt.Errorf("pack %v of blob %v not in repository (error is %v)", packID, id, err))
removeContent = true
t.Map.DeleteID(id)
arch.m.DeleteID(id)
}
}
@ -322,12 +247,7 @@ func (arch *Archiver) saveTree(p *Progress, t *Tree) (server.Blob, error) {
go func(n *Node) {
defer wg.Done()
var blobs server.Blobs
blobs, n.err = arch.SaveFile(p, n)
for _, b := range blobs {
t.Map.Insert(b)
}
n.err = arch.SaveFile(p, n)
p.Report(Stat{Files: 1})
}(node)
}
@ -341,7 +261,7 @@ func (arch *Archiver) saveTree(p *Progress, t *Tree) (server.Blob, error) {
// check for invalid file nodes
for _, node := range t.Nodes {
if node.Type == "file" && node.Content == nil && node.err == nil {
return server.Blob{}, fmt.Errorf("node %v has empty content", node.Name)
return nil, fmt.Errorf("node %v has empty content", node.Name)
}
// remember used hashes
@ -358,7 +278,7 @@ func (arch *Archiver) saveTree(p *Progress, t *Tree) (server.Blob, error) {
if node.err != nil {
err := arch.Error(node.path, nil, node.err)
if err != nil {
return server.Blob{}, err
return nil, err
}
// save error message in node
@ -366,20 +286,12 @@ func (arch *Archiver) saveTree(p *Progress, t *Tree) (server.Blob, error) {
}
}
before := len(t.Map.IDs())
t.Map.Prune(usedIDs)
after := len(t.Map.IDs())
if before != after {
debug.Log("Archiver.saveTree", "pruned %d ids from map for tree %v\n", before-after, t)
}
blob, err := arch.SaveTreeJSON(t)
id, err := arch.SaveTreeJSON(t)
if err != nil {
return server.Blob{}, err
return nil, err
}
return blob, nil
return id, nil
}
func (arch *Archiver) fileWorker(wg *sync.WaitGroup, p *Progress, done <-chan struct{}, entCh <-chan pipe.Entry) {
@ -444,7 +356,7 @@ func (arch *Archiver) fileWorker(wg *sync.WaitGroup, p *Progress, done <-chan st
// otherwise read file normally
if node.Type == "file" && len(node.Content) == 0 {
debug.Log("Archiver.fileWorker", " read and save %v, content: %v", e.Path(), node.Content)
node.blobs, err = arch.SaveFile(p, node)
err = arch.SaveFile(p, node)
if err != nil {
// TODO: integrate error reporting
fmt.Fprintf(os.Stderr, "error for %v: %v\n", node.path, err)
@ -501,11 +413,6 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str
if node.Type == "dir" {
debug.Log("Archiver.dirWorker", "got tree node for %s: %v", node.path, node.blobs)
}
// also store blob in tree map
for _, blob := range node.blobs {
tree.Map.Insert(blob)
}
}
var (
@ -525,14 +432,13 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str
}
}
blob, err := arch.SaveTreeJSON(tree)
id, err := arch.SaveTreeJSON(tree)
if err != nil {
panic(err)
}
debug.Log("Archiver.dirWorker", "save tree for %s: %v", dir.Path(), blob)
debug.Log("Archiver.dirWorker", "save tree for %s: %v", dir.Path(), id.Str())
node.Subtree = blob.ID
node.blobs = server.Blobs{blob}
node.Subtree = id
dir.Result() <- node
if dir.Path() != "" {
@ -792,30 +698,35 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, pid backend.ID) (*Sn
// receive the top-level tree
root := (<-resCh).(*Node)
blob := root.blobs[0]
debug.Log("Archiver.Snapshot", "root node received: %v", blob)
sn.Tree = blob
debug.Log("Archiver.Snapshot", "root node received: %v", root.Subtree.Str())
sn.Tree = root.Subtree
// save snapshot
blob, err = arch.s.SaveJSON(backend.Snapshot, sn)
id, err := arch.s.SaveJSONUnpacked(backend.Snapshot, sn)
if err != nil {
return nil, nil, err
}
// store ID in snapshot struct
sn.id = blob.Storage
sn.id = id
debug.Log("Archiver.Snapshot", "saved snapshot %v", id.Str())
debug.Log("Archiver.Snapshot", "saved snapshot %v", blob.Storage.Str())
// cache blobs
err = arch.c.StoreMap(sn.id, arch.m)
// flush server
err = arch.s.Flush()
if err != nil {
debug.Log("Archiver.Snapshot", "unable to cache blobs for snapshot %v: %v", blob.Storage.Str(), err)
fmt.Fprintf(os.Stderr, "unable to cache blobs for snapshot %v: %v\n", blob.Storage.Str(), err)
return sn, blob.Storage, nil
return nil, nil, err
}
return sn, blob.Storage, nil
// save index
indexID, err := arch.s.SaveIndex()
if err != nil {
debug.Log("Archiver.Snapshot", "error saving index: %v", err)
return nil, nil, err
}
debug.Log("Archiver.Snapshot", "saved index %v", indexID.Str())
return sn, id, nil
}
func isFile(fi os.FileInfo) bool {

View File

@ -9,6 +9,7 @@ import (
"github.com/restic/restic"
"github.com/restic/restic/backend"
"github.com/restic/restic/chunker"
"github.com/restic/restic/pack"
"github.com/restic/restic/server"
. "github.com/restic/restic/test"
)
@ -114,11 +115,7 @@ func BenchmarkChunkEncryptParallel(b *testing.B) {
restic.FreeChunkBuf("BenchmarkChunkEncryptParallel", buf)
}
func BenchmarkArchiveDirectory(b *testing.B) {
if *benchArchiveDirectory == "" {
b.Skip("benchdir not set, skipping BenchmarkArchiveDirectory")
}
func archiveDirectory(b testing.TB) {
server := SetupBackend(b)
defer TeardownBackend(b, server)
key := SetupKey(b, server, "geheim")
@ -132,13 +129,25 @@ func BenchmarkArchiveDirectory(b *testing.B) {
b.Logf("snapshot archived as %v", id)
}
func countBlobs(t testing.TB, server *server.Server) (trees int, data int) {
return server.Count(backend.Tree), server.Count(backend.Data)
func TestArchiveDirectory(t *testing.T) {
if *benchArchiveDirectory == "" {
t.Skip("benchdir not set, skipping TestArchiveDirectory")
}
archiveDirectory(t)
}
func archiveWithPreload(t testing.TB) {
func BenchmarkArchiveDirectory(b *testing.B) {
if *benchArchiveDirectory == "" {
t.Skip("benchdir not set, skipping TestArchiverPreload")
b.Skip("benchdir not set, skipping BenchmarkArchiveDirectory")
}
archiveDirectory(b)
}
func archiveWithDedup(t testing.TB) {
if *benchArchiveDirectory == "" {
t.Skip("benchdir not set, skipping TestArchiverDedup")
}
server := SetupBackend(t)
@ -146,78 +155,81 @@ func archiveWithPreload(t testing.TB) {
key := SetupKey(t, server, "geheim")
server.SetKey(key)
var cnt struct {
before, after, after2 struct {
packs, dataBlobs, treeBlobs uint
}
}
// archive a few files
sn := SnapshotDir(t, server, *benchArchiveDirectory, nil)
t.Logf("archived snapshot %v", sn.ID().Str())
// get archive stats
beforeTrees, beforeData := countBlobs(t, server)
t.Logf("found %v trees, %v data blobs", beforeTrees, beforeData)
cnt.before.packs = server.Count(backend.Data)
cnt.before.dataBlobs = server.Index().Count(pack.Data)
cnt.before.treeBlobs = server.Index().Count(pack.Tree)
t.Logf("packs %v, data blobs %v, tree blobs %v",
cnt.before.packs, cnt.before.dataBlobs, cnt.before.treeBlobs)
// archive the same files again, without parent snapshot
sn2 := SnapshotDir(t, server, *benchArchiveDirectory, nil)
t.Logf("archived snapshot %v", sn2.ID().Str())
// get archive stats
afterTrees2, afterData2 := countBlobs(t, server)
t.Logf("found %v trees, %v data blobs", afterTrees2, afterData2)
// get archive stats again
cnt.after.packs = server.Count(backend.Data)
cnt.after.dataBlobs = server.Index().Count(pack.Data)
cnt.after.treeBlobs = server.Index().Count(pack.Tree)
t.Logf("packs %v, data blobs %v, tree blobs %v",
cnt.after.packs, cnt.after.dataBlobs, cnt.after.treeBlobs)
// if there are more blobs, something is wrong
if afterData2 > beforeData {
t.Fatalf("TestArchiverPreload: too many data blobs in repository: before %d, after %d",
beforeData, afterData2)
// if there are more packs or blobs, something is wrong
if cnt.after.packs > cnt.before.packs {
t.Fatalf("TestArchiverDedup: too many packs in repository: before %d, after %d",
cnt.before.packs, cnt.after.packs)
}
if cnt.after.dataBlobs > cnt.before.dataBlobs {
t.Fatalf("TestArchiverDedup: too many data blobs in repository: before %d, after %d",
cnt.before.dataBlobs, cnt.after.dataBlobs)
}
if cnt.after.treeBlobs > cnt.before.treeBlobs {
t.Fatalf("TestArchiverDedup: too many tree blobs in repository: before %d, after %d",
cnt.before.treeBlobs, cnt.after.treeBlobs)
}
// archive the same files again, with a parent snapshot
sn3 := SnapshotDir(t, server, *benchArchiveDirectory, sn2.ID())
t.Logf("archived snapshot %v, parent %v", sn3.ID().Str(), sn2.ID().Str())
// get archive stats
afterTrees3, afterData3 := countBlobs(t, server)
t.Logf("found %v trees, %v data blobs", afterTrees3, afterData3)
// get archive stats again
cnt.after2.packs = server.Count(backend.Data)
cnt.after2.dataBlobs = server.Index().Count(pack.Data)
cnt.after2.treeBlobs = server.Index().Count(pack.Tree)
t.Logf("packs %v, data blobs %v, tree blobs %v",
cnt.after2.packs, cnt.after2.dataBlobs, cnt.after2.treeBlobs)
// if there are more blobs, something is wrong
if afterData3 > beforeData {
t.Fatalf("TestArchiverPreload: too many data blobs in repository: before %d, after %d",
beforeData, afterData3)
// if there are more packs or blobs, something is wrong
if cnt.after2.packs > cnt.before.packs {
t.Fatalf("TestArchiverDedup: too many packs in repository: before %d, after %d",
cnt.before.packs, cnt.after2.packs)
}
if cnt.after2.dataBlobs > cnt.before.dataBlobs {
t.Fatalf("TestArchiverDedup: too many data blobs in repository: before %d, after %d",
cnt.before.dataBlobs, cnt.after2.dataBlobs)
}
if cnt.after2.treeBlobs > cnt.before.treeBlobs {
t.Fatalf("TestArchiverDedup: too many tree blobs in repository: before %d, after %d",
cnt.before.treeBlobs, cnt.after2.treeBlobs)
}
}
func TestArchivePreload(t *testing.T) {
archiveWithPreload(t)
}
func BenchmarkPreload(t *testing.B) {
if *benchArchiveDirectory == "" {
t.Skip("benchdir not set, skipping TestArchiverPreload")
}
server := SetupBackend(t)
defer TeardownBackend(t, server)
key := SetupKey(t, server, "geheim")
server.SetKey(key)
// archive a few files
arch, err := restic.NewArchiver(server)
OK(t, err)
sn, _, err := arch.Snapshot(nil, []string{*benchArchiveDirectory}, nil)
OK(t, err)
t.Logf("archived snapshot %v", sn.ID())
// start benchmark
t.ResetTimer()
for i := 0; i < t.N; i++ {
// create new archiver and preload
arch2, err := restic.NewArchiver(server)
OK(t, err)
OK(t, arch2.Preload())
}
func TestArchiveDedup(t *testing.T) {
archiveWithDedup(t)
}
func BenchmarkLoadTree(t *testing.B) {
if *benchArchiveDirectory == "" {
t.Skip("benchdir not set, skipping TestArchiverPreload")
t.Skip("benchdir not set, skipping TestArchiverDedup")
}
s := SetupBackend(t)
@ -254,7 +266,7 @@ func BenchmarkLoadTree(t *testing.B) {
for i := 0; i < t.N; i++ {
for _, id := range list {
_, err := restic.LoadTree(s, server.Blob{Storage: id})
_, err := restic.LoadTree(s, id)
OK(t, err)
}
}

View File

@ -3,6 +3,7 @@ package backend
import (
"crypto/sha256"
"errors"
"io"
)
const (
@ -96,3 +97,33 @@ outer:
return IDSize, nil
}
// wrap around io.LimitedReader that implements io.ReadCloser
type blobReader struct {
f io.Closer
rd io.Reader
closed bool
}
func (l *blobReader) Read(p []byte) (int, error) {
n, err := l.rd.Read(p)
if err == io.EOF {
l.Close()
}
return n, err
}
func (l *blobReader) Close() error {
if !l.closed {
err := l.f.Close()
l.closed = true
return err
}
return nil
}
func LimitReader(f io.ReadCloser, n int64) *blobReader {
return &blobReader{f: f, rd: io.LimitReader(f, n)}
}

View File

@ -18,7 +18,7 @@ const (
Version = 1
)
// A Backend manages blobs of data.
// A Backend manages data stored somewhere.
type Backend interface {
// Location returns a string that specifies the location of the repository,
// like a URL.
@ -31,6 +31,10 @@ type Backend interface {
// Get returns an io.ReadCloser for the Blob with the given name of type t.
Get(t Type, name string) (io.ReadCloser, error)
// GetReader returns an io.ReadCloser for the Blob with the given name of
// type t at offset and length.
GetReader(t Type, name string, offset, length uint) (io.ReadCloser, error)
// Test a boolean value whether a Blob with the name and type exists.
Test(t Type, name string) (bool, error)

View File

@ -302,6 +302,26 @@ func (b *Local) Get(t backend.Type, name string) (io.ReadCloser, error) {
return os.Open(filename(b.p, t, name))
}
// GetReader returns an io.ReadCloser for the Blob with the given name of
// type t at offset and length. If length is 0, the reader reads until EOF.
func (b *Local) GetReader(t backend.Type, name string, offset, length uint) (io.ReadCloser, error) {
f, err := os.Open(filename(b.p, t, name))
if err != nil {
return nil, err
}
_, err = f.Seek(int64(offset), 0)
if err != nil {
return nil, err
}
if length == 0 {
return f, nil
}
return backend.LimitReader(f, int64(length)), nil
}
// Test returns true if a blob of the given type and name exists in the backend.
func (b *Local) Test(t backend.Type, name string) (bool, error) {
_, err := os.Stat(filename(b.p, t, name))

View File

@ -436,6 +436,26 @@ func (r *SFTP) Get(t backend.Type, name string) (io.ReadCloser, error) {
return file, nil
}
// GetReader returns an io.ReadCloser for the Blob with the given name of
// type t at offset and length. If length is 0, the reader reads until EOF.
func (r *SFTP) GetReader(t backend.Type, name string, offset, length uint) (io.ReadCloser, error) {
f, err := r.c.Open(r.filename(t, name))
if err != nil {
return nil, err
}
_, err = f.Seek(int64(offset), 0)
if err != nil {
return nil, err
}
if length == 0 {
return f, nil
}
return backend.LimitReader(f, int64(length)), nil
}
// Test returns true if a blob of the given type and name exists in the backend.
func (r *SFTP) Test(t backend.Type, name string) (bool, error) {
_, err := r.c.Lstat(r.filename(t, name))

144
cache.go
View File

@ -1,14 +1,12 @@
package restic
import (
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync"
"github.com/restic/restic/backend"
"github.com/restic/restic/debug"
@ -211,146 +209,6 @@ func (c *Cache) filename(t backend.Type, subtype string, id backend.ID) (string,
// high-level functions
// RefreshSnapshots loads the maps for all snapshots and saves them to the
// local cache. Old cache entries are purged.
func (c *Cache) RefreshSnapshots(s *server.Server, p *Progress) error {
defer p.Done()
// list cache entries
entries, err := c.List(backend.Snapshot)
if err != nil {
return err
}
// list snapshots first
done := make(chan struct{})
defer close(done)
// check that snapshot blobs are cached
for name := range s.List(backend.Snapshot, done) {
id, err := backend.ParseID(name)
if err != nil {
continue
}
// remove snapshot from list of entries
for i, e := range entries {
if e.ID.Equal(id) {
entries = append(entries[:i], entries[i+1:]...)
break
}
}
has, err := c.Has(backend.Snapshot, "blobs", id)
if err != nil {
return err
}
if has {
continue
}
// else start progress reporting
p.Start()
// build new cache
_, err = cacheSnapshotBlobs(p, s, c, id)
if err != nil {
debug.Log("Cache.RefreshSnapshots", "unable to cache snapshot blobs for %v: %v", id.Str(), err)
return err
}
}
// remove other entries
for _, e := range entries {
debug.Log("Cache.RefreshSnapshots", "remove entry %v", e)
err = c.Purge(backend.Snapshot, e.Subtype, e.ID)
if err != nil {
return err
}
}
return nil
}
// cacheSnapshotBlobs creates a cache of all the blobs used within the
// snapshot. It collects all blobs from all trees and saves the resulting map
// to the cache and returns the map.
func cacheSnapshotBlobs(p *Progress, s *server.Server, c *Cache, id backend.ID) (*Map, error) {
debug.Log("CacheSnapshotBlobs", "create cache for snapshot %v", id.Str())
sn, err := LoadSnapshot(s, id)
if err != nil {
debug.Log("CacheSnapshotBlobs", "unable to load snapshot %v: %v", id.Str(), err)
return nil, err
}
m := NewMap()
// add top-level node
m.Insert(sn.Tree)
p.Report(Stat{Trees: 1})
// start walker
var wg sync.WaitGroup
ch := make(chan WalkTreeJob)
wg.Add(1)
go func() {
WalkTree(s, sn.Tree, nil, ch)
wg.Done()
}()
for i := 0; i < maxConcurrencyPreload; i++ {
wg.Add(1)
go func() {
for job := range ch {
if job.Tree == nil {
continue
}
p.Report(Stat{Trees: 1})
debug.Log("CacheSnapshotBlobs", "got job %v", job)
m.Merge(job.Tree.Map)
}
wg.Done()
}()
}
wg.Wait()
// save blob list for snapshot
return m, c.StoreMap(id, m)
}
func (c *Cache) StoreMap(snid backend.ID, m *Map) error {
wr, err := c.Store(backend.Snapshot, "blobs", snid)
if err != nil {
return nil
}
defer wr.Close()
enc := json.NewEncoder(wr)
err = enc.Encode(m)
if err != nil {
return err
}
return nil
}
func (c *Cache) LoadMap(s *server.Server, snid backend.ID) (*Map, error) {
rd, err := c.Load(backend.Snapshot, "blobs", snid)
if err != nil {
return nil, err
}
m := &Map{}
err = json.NewDecoder(rd).Decode(m)
return m, err
}
// GetCacheDir returns the cache directory according to XDG basedir spec, see
// http://standards.freedesktop.org/basedir-spec/basedir-spec-latest.html
func GetCacheDir() (string, error) {
@ -389,3 +247,5 @@ func GetCacheDir() (string, error) {
return cachedir, nil
}
// TODO: implement RefreshIndex()

View File

@ -1,11 +1,9 @@
package restic_test
import (
"encoding/json"
"testing"
"github.com/restic/restic"
"github.com/restic/restic/backend"
. "github.com/restic/restic/test"
)
@ -15,46 +13,14 @@ func TestCache(t *testing.T) {
key := SetupKey(t, server, "geheim")
server.SetKey(key)
cache, err := restic.NewCache(server)
_, err := restic.NewCache(server)
OK(t, err)
arch, err := restic.NewArchiver(server)
OK(t, err)
// archive some files, this should automatically cache all blobs from the snapshot
_, id, err := arch.Snapshot(nil, []string{*benchArchiveDirectory}, nil)
_, _, err = arch.Snapshot(nil, []string{*benchArchiveDirectory}, nil)
// try to load map from cache
rd, err := cache.Load(backend.Snapshot, "blobs", id)
OK(t, err)
dec := json.NewDecoder(rd)
m := &restic.Map{}
err = dec.Decode(m)
OK(t, err)
// remove cached blob list
OK(t, cache.Purge(backend.Snapshot, "blobs", id))
// load map from cache again, this should fail
rd, err = cache.Load(backend.Snapshot, "blobs", id)
Assert(t, err != nil, "Expected failure did not occur")
// recreate cached blob list
err = cache.RefreshSnapshots(server, nil)
OK(t, err)
// load map from cache again
rd, err = cache.Load(backend.Snapshot, "blobs", id)
OK(t, err)
dec = json.NewDecoder(rd)
m2 := &restic.Map{}
err = dec.Decode(m2)
OK(t, err)
// compare maps
Assert(t, m.Equals(m2), "Maps are not equal")
// TODO: test caching index
}

View File

@ -96,26 +96,6 @@ func (cmd CmdBackup) Usage() string {
return "DIR/FILE [snapshot-ID]"
}
func newCacheRefreshProgress() *restic.Progress {
p := restic.NewProgress(time.Second)
p.OnStart = func() {
fmt.Printf("refreshing cache\n")
}
if !terminal.IsTerminal(int(os.Stdout.Fd())) {
return p
}
p.OnUpdate = func(s restic.Stat, d time.Duration, ticker bool) {
fmt.Printf("\x1b[2K[%s] %d trees loaded\r", formatDuration(d), s.Trees)
}
p.OnDone = func(s restic.Stat, d time.Duration, ticker bool) {
fmt.Printf("\x1b[2Krefreshed cache in %s\n", formatDuration(d))
}
return p
}
func newScanProgress() *restic.Progress {
if !terminal.IsTerminal(int(os.Stdout.Fd())) {
return nil
@ -200,6 +180,11 @@ func (cmd CmdBackup) Execute(args []string) error {
return err
}
err = s.LoadIndex()
if err != nil {
return err
}
var (
parentSnapshot string
parentSnapshotID backend.ID
@ -278,16 +263,7 @@ func (cmd CmdBackup) Execute(args []string) error {
return nil
}
err = arch.Cache().RefreshSnapshots(s, newCacheRefreshProgress())
if err != nil {
return err
}
fmt.Printf("loading blobs\n")
err = arch.Preload()
if err != nil {
return err
}
// TODO: load index
_, id, err := arch.Snapshot(newArchiveProgress(stat), target, parentSnapshotID)
if err != nil {

View File

@ -4,10 +4,13 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"os"
"github.com/restic/restic"
"github.com/restic/restic/backend"
"github.com/restic/restic/debug"
"github.com/restic/restic/pack"
"github.com/restic/restic/server"
)
@ -24,7 +27,7 @@ func init() {
}
func (cmd CmdCat) Usage() string {
return "[data|tree|snapshot|key|masterkey|lock] ID"
return "[pack|blob|tree|snapshot|key|masterkey|lock] ID"
}
func (cmd CmdCat) Execute(args []string) error {
@ -62,37 +65,20 @@ func (cmd CmdCat) Execute(args []string) error {
}
}
// handle all types that don't need an index
switch tpe {
case "data":
// try storage id
data, err := s.LoadID(backend.Data, id)
if err == nil {
_, err = os.Stdout.Write(data)
case "index":
buf, err := s.Load(backend.Index, id)
if err != nil {
return err
}
_, err = os.Stdout.Write(data)
_, err = os.Stdout.Write(append(buf, '\n'))
return err
case "tree":
// try storage id
tree := &restic.Tree{}
err := s.LoadJSONID(backend.Tree, id, tree)
if err != nil {
return err
}
buf, err := json.MarshalIndent(&tree, "", " ")
if err != nil {
return err
}
fmt.Println(string(buf))
return nil
case "snapshot":
sn := &restic.Snapshot{}
err = s.LoadJSONID(backend.Snapshot, id, sn)
err = s.LoadJSONEncrypted(backend.Snapshot, id, sn)
if err != nil {
return err
}
@ -136,6 +122,52 @@ func (cmd CmdCat) Execute(args []string) error {
return nil
case "lock":
return errors.New("not yet implemented")
}
// load index, handle all the other types
err = s.LoadIndex()
if err != nil {
return err
}
switch tpe {
case "pack":
rd, err := s.Backend().Get(backend.Data, id.String())
if err != nil {
return err
}
_, err = io.Copy(os.Stdout, rd)
return err
case "blob":
data, err := s.LoadBlob(pack.Data, id)
if err == nil {
_, err = os.Stdout.Write(data)
return err
}
_, err = os.Stdout.Write(data)
return err
case "tree":
debug.Log("cat", "cat tree %v", id.Str())
tree := restic.NewTree()
err = s.LoadJSONPack(pack.Tree, id, tree)
if err != nil {
debug.Log("cat", "unable to load tree %v: %v", id.Str(), err)
return err
}
buf, err := json.MarshalIndent(&tree, "", " ")
if err != nil {
debug.Log("cat", "error json.MarshalIndent(): %v", err)
return err
}
_, err = os.Stdout.Write(append(buf, '\n'))
return nil
default:
return errors.New("invalid type")
}

View File

@ -59,9 +59,9 @@ func parseTime(str string) (time.Time, error) {
return time.Time{}, fmt.Errorf("unable to parse time: %q", str)
}
func (c CmdFind) findInTree(s *server.Server, blob server.Blob, path string) ([]findResult, error) {
debug.Log("restic.find", "checking tree %v\n", blob)
tree, err := restic.LoadTree(s, blob)
func (c CmdFind) findInTree(s *server.Server, id backend.ID, path string) ([]findResult, error) {
debug.Log("restic.find", "checking tree %v\n", id)
tree, err := restic.LoadTree(s, id)
if err != nil {
return nil, err
}
@ -93,12 +93,7 @@ func (c CmdFind) findInTree(s *server.Server, blob server.Blob, path string) ([]
}
if node.Type == "dir" {
b, err := tree.Map.FindID(node.Subtree)
if err != nil {
return nil, err
}
subdirResults, err := c.findInTree(s, b, filepath.Join(path, node.Name))
subdirResults, err := c.findInTree(s, id, filepath.Join(path, node.Name))
if err != nil {
return nil, err
}

View File

@ -7,7 +7,9 @@ import (
"github.com/restic/restic"
"github.com/restic/restic/backend"
"github.com/restic/restic/crypto"
"github.com/restic/restic/debug"
"github.com/restic/restic/pack"
"github.com/restic/restic/server"
)
@ -32,31 +34,31 @@ func init() {
}
}
func fsckFile(opts CmdFsck, s *server.Server, m *restic.Map, IDs []backend.ID) (uint64, error) {
func fsckFile(opts CmdFsck, s *server.Server, IDs []backend.ID) (uint64, error) {
debug.Log("restic.fsckFile", "checking file %v", IDs)
var bytes uint64
for _, id := range IDs {
debug.Log("restic.fsck", " checking data blob %v\n", id)
// test if blob is in map
blob, err := m.FindID(id)
// test if blob is in the index
packID, tpe, _, length, err := s.Index().Lookup(id)
if err != nil {
return 0, fmt.Errorf("storage ID for data blob %v not found", id)
return 0, fmt.Errorf("storage for blob %v (%v) not found", id, tpe)
}
bytes += blob.Size
debug.Log("restic.fsck", " data blob found: %v\n", blob)
bytes += uint64(length - crypto.Extension)
debug.Log("restic.fsck", " blob found in pack %v\n", packID)
if opts.CheckData {
// load content
_, err := s.Load(backend.Data, blob)
_, err := s.LoadBlob(pack.Data, id)
if err != nil {
return 0, err
}
} else {
// test if data blob is there
ok, err := s.Test(backend.Data, blob.Storage.String())
ok, err := s.Test(backend.Data, packID.String())
if err != nil {
return 0, err
}
@ -68,17 +70,17 @@ func fsckFile(opts CmdFsck, s *server.Server, m *restic.Map, IDs []backend.ID) (
// if orphan check is active, record storage id
if opts.o_data != nil {
opts.o_data.Insert(blob.Storage)
opts.o_data.Insert(id)
}
}
return bytes, nil
}
func fsckTree(opts CmdFsck, s *server.Server, blob server.Blob) error {
debug.Log("restic.fsckTree", "checking tree %v", blob)
func fsckTree(opts CmdFsck, s *server.Server, id backend.ID) error {
debug.Log("restic.fsckTree", "checking tree %v", id.Str())
tree, err := restic.LoadTree(s, blob)
tree, err := restic.LoadTree(s, id)
if err != nil {
return err
}
@ -86,7 +88,7 @@ func fsckTree(opts CmdFsck, s *server.Server, blob server.Blob) error {
// if orphan check is active, record storage id
if opts.o_trees != nil {
// add ID to list
opts.o_trees.Insert(blob.Storage)
opts.o_trees.Insert(id)
}
var firstErr error
@ -95,23 +97,23 @@ func fsckTree(opts CmdFsck, s *server.Server, blob server.Blob) error {
for i, node := range tree.Nodes {
if node.Name == "" {
return fmt.Errorf("node %v of tree %v has no name", i, blob.ID)
return fmt.Errorf("node %v of tree %v has no name", i, id.Str())
}
if node.Type == "" {
return fmt.Errorf("node %q of tree %v has no type", node.Name, blob.ID)
return fmt.Errorf("node %q of tree %v has no type", node.Name, id.Str())
}
switch node.Type {
case "file":
if node.Content == nil {
debug.Log("restic.fsckTree", "file node %q of tree %v has no content: %v", node.Name, blob.ID, node)
return fmt.Errorf("file node %q of tree %v has no content: %v", node.Name, blob.ID, node)
debug.Log("restic.fsckTree", "file node %q of tree %v has no content: %v", node.Name, id, node)
return fmt.Errorf("file node %q of tree %v has no content: %v", node.Name, id, node)
}
if node.Content == nil && node.Error == "" {
debug.Log("restic.fsckTree", "file node %q of tree %v has no content", node.Name, blob.ID)
return fmt.Errorf("file node %q of tree %v has no content", node.Name, blob.ID)
debug.Log("restic.fsckTree", "file node %q of tree %v has no content", node.Name, id)
return fmt.Errorf("file node %q of tree %v has no content", node.Name, id)
}
// record ids
@ -119,32 +121,25 @@ func fsckTree(opts CmdFsck, s *server.Server, blob server.Blob) error {
seenIDs.Insert(id)
}
debug.Log("restic.fsckTree", "check file %v (%v)", node.Name, blob.ID.Str())
bytes, err := fsckFile(opts, s, tree.Map, node.Content)
debug.Log("restic.fsckTree", "check file %v (%v)", node.Name, id.Str())
bytes, err := fsckFile(opts, s, node.Content)
if err != nil {
return err
}
if bytes != node.Size {
debug.Log("restic.fsckTree", "file node %q of tree %v has size %d, but only %d bytes could be found", node.Name, blob, node.Size, bytes)
return fmt.Errorf("file node %q of tree %v has size %d, but only %d bytes could be found", node.Name, blob, node.Size, bytes)
debug.Log("restic.fsckTree", "file node %q of tree %v has size %d, but only %d bytes could be found", node.Name, id, node.Size, bytes)
return fmt.Errorf("file node %q of tree %v has size %d, but only %d bytes could be found", node.Name, id, node.Size, bytes)
}
case "dir":
if node.Subtree == nil {
return fmt.Errorf("dir node %q of tree %v (storage id %v) has no subtree", node.Name, blob.ID, blob.Storage)
}
// lookup blob
subtreeBlob, err := tree.Map.FindID(node.Subtree)
if err != nil {
firstErr = err
fmt.Fprintf(os.Stderr, "%v\n", err)
return fmt.Errorf("dir node %q of tree %v has no subtree", node.Name, id)
}
// record id
seenIDs.Insert(node.Subtree)
err = fsckTree(opts, s, subtreeBlob)
err = fsckTree(opts, s, node.Subtree)
if err != nil {
firstErr = err
fmt.Fprintf(os.Stderr, "%v\n", err)
@ -153,11 +148,11 @@ func fsckTree(opts CmdFsck, s *server.Server, blob server.Blob) error {
}
// check map for unused ids
for _, id := range tree.Map.IDs() {
if seenIDs.Find(id) != nil {
return fmt.Errorf("tree %v: map contains unused ID %v", blob.ID, id)
}
}
// for _, id := range tree.Map.IDs() {
// if seenIDs.Find(id) != nil {
// return fmt.Errorf("tree %v: map contains unused ID %v", id, id)
// }
// }
return firstErr
}
@ -170,10 +165,6 @@ func fsckSnapshot(opts CmdFsck, s *server.Server, id backend.ID) error {
return fmt.Errorf("loading snapshot %v failed: %v", id, err)
}
if !sn.Tree.Valid() {
return fmt.Errorf("snapshot %s has invalid tree %v", sn.ID(), sn.Tree)
}
err = fsckTree(opts, s, sn.Tree)
if err != nil {
debug.Log("restic.fsck", " checking tree %v for snapshot %v\n", sn.Tree, id)
@ -201,6 +192,11 @@ func (cmd CmdFsck) Execute(args []string) error {
return err
}
err = s.LoadIndex()
if err != nil {
return err
}
if cmd.Snapshot != "" {
name, err := s.FindSnapshot(cmd.Snapshot)
if err != nil {

View File

@ -20,7 +20,7 @@ func init() {
}
func (cmd CmdList) Usage() string {
return "[data|trees|snapshots|keys|locks]"
return "[blobs|packs|index|snapshots|keys|locks]"
}
func (cmd CmdList) Execute(args []string) error {
@ -35,10 +35,21 @@ func (cmd CmdList) Execute(args []string) error {
var t backend.Type
switch args[0] {
case "data":
case "blobs":
err = s.LoadIndex()
if err != nil {
return err
}
for blob := range s.Index().Each(nil) {
fmt.Println(blob.ID)
}
return nil
case "packs":
t = backend.Data
case "trees":
t = backend.Tree
case "index":
t = backend.Index
case "snapshots":
t = backend.Snapshot
case "keys":

View File

@ -38,8 +38,8 @@ func printNode(prefix string, n *restic.Node) string {
}
}
func printTree(prefix string, s *server.Server, blob server.Blob) error {
tree, err := restic.LoadTree(s, blob)
func printTree(prefix string, s *server.Server, id backend.ID) error {
tree, err := restic.LoadTree(s, id)
if err != nil {
return err
}
@ -48,12 +48,7 @@ func printTree(prefix string, s *server.Server, blob server.Blob) error {
fmt.Println(printNode(prefix, entry))
if entry.Type == "dir" && entry.Subtree != nil {
b, err := tree.Map.FindID(entry.Subtree)
if err != nil {
return err
}
err = printTree(filepath.Join(prefix, entry.Name), s, b)
err = printTree(filepath.Join(prefix, entry.Name), s, id)
if err != nil {
return err
}

View File

@ -35,6 +35,11 @@ func (cmd CmdRestore) Execute(args []string) error {
return err
}
err = s.LoadIndex()
if err != nil {
return err
}
name, err := backend.FindSnapshot(s, args[0])
if err != nil {
errx(1, "invalid id %q: %v", args[0], err)

219
map.go
View File

@ -1,219 +0,0 @@
package restic
import (
"encoding/json"
"errors"
"sort"
"sync"
"github.com/restic/restic/backend"
"github.com/restic/restic/debug"
"github.com/restic/restic/server"
)
type Map struct {
list []server.Blob
m sync.Mutex
}
var ErrBlobNotFound = errors.New("Blob not found")
func NewMap() *Map {
return &Map{
list: []server.Blob{},
}
}
func (bl *Map) find(blob server.Blob, checkSize bool) (int, server.Blob, error) {
pos := sort.Search(len(bl.list), func(i int) bool {
return blob.ID.Compare(bl.list[i].ID) >= 0
})
if pos < len(bl.list) {
b := bl.list[pos]
if blob.ID.Compare(b.ID) == 0 && (!checkSize || blob.Size == b.Size) {
return pos, b, nil
}
}
return pos, server.Blob{}, ErrBlobNotFound
}
func (bl *Map) Find(blob server.Blob) (server.Blob, error) {
bl.m.Lock()
defer bl.m.Unlock()
_, blob, err := bl.find(blob, true)
return blob, err
}
func (bl *Map) FindID(id backend.ID) (server.Blob, error) {
bl.m.Lock()
defer bl.m.Unlock()
_, blob, err := bl.find(server.Blob{ID: id}, false)
return blob, err
}
func (bl *Map) Merge(other *Map) {
bl.m.Lock()
defer bl.m.Unlock()
other.m.Lock()
defer other.m.Unlock()
for _, blob := range other.list {
bl.insert(blob)
}
}
func (bl *Map) insert(blob server.Blob) server.Blob {
pos, b, err := bl.find(blob, true)
if err == nil {
// already present
return b
}
// insert blob
// https://code.google.com/p/go-wiki/wiki/SliceTricks
bl.list = append(bl.list, server.Blob{})
copy(bl.list[pos+1:], bl.list[pos:])
bl.list[pos] = blob
return blob
}
func (bl *Map) Insert(blob server.Blob) server.Blob {
bl.m.Lock()
defer bl.m.Unlock()
debug.Log("Map.Insert", " Map<%p> insert %v", bl, blob)
return bl.insert(blob)
}
func (bl *Map) MarshalJSON() ([]byte, error) {
return json.Marshal(bl.list)
}
func (bl *Map) UnmarshalJSON(data []byte) error {
return json.Unmarshal(data, &bl.list)
}
func (bl *Map) IDs() []backend.ID {
bl.m.Lock()
defer bl.m.Unlock()
ids := make([]backend.ID, 0, len(bl.list))
for _, b := range bl.list {
ids = append(ids, b.ID)
}
return ids
}
func (bl *Map) StorageIDs() []backend.ID {
bl.m.Lock()
defer bl.m.Unlock()
ids := make([]backend.ID, 0, len(bl.list))
for _, b := range bl.list {
ids = append(ids, b.Storage)
}
return ids
}
func (bl *Map) Equals(other *Map) bool {
if bl == nil && other == nil {
return true
}
if bl == nil || other == nil {
return false
}
bl.m.Lock()
defer bl.m.Unlock()
if len(bl.list) != len(other.list) {
debug.Log("Map.Equals", "length does not match: %d != %d", len(bl.list), len(other.list))
return false
}
for i := 0; i < len(bl.list); i++ {
if bl.list[i].Compare(other.list[i]) != 0 {
debug.Log("Map.Equals", "entry %d does not match: %v != %v", i, bl.list[i], other.list[i])
return false
}
}
return true
}
// Each calls f for each blob in the Map. While Each is running, no other
// operation is possible, since a mutex is held for the whole time.
func (bl *Map) Each(f func(blob server.Blob)) {
bl.m.Lock()
defer bl.m.Unlock()
for _, blob := range bl.list {
f(blob)
}
}
// Select returns a list of of blobs from the plaintext IDs given in list.
func (bl *Map) Select(list backend.IDs) (server.Blobs, error) {
bl.m.Lock()
defer bl.m.Unlock()
blobs := make(server.Blobs, 0, len(list))
for _, id := range list {
_, blob, err := bl.find(server.Blob{ID: id}, false)
if err != nil {
return nil, err
}
blobs = append(blobs, blob)
}
return blobs, nil
}
// Len returns the number of blobs in the map.
func (bl *Map) Len() int {
bl.m.Lock()
defer bl.m.Unlock()
return len(bl.list)
}
// Prune deletes all IDs from the map except the ones listed in ids.
func (bl *Map) Prune(ids *backend.IDSet) {
bl.m.Lock()
defer bl.m.Unlock()
pos := 0
for pos < len(bl.list) {
blob := bl.list[pos]
if ids.Find(blob.ID) != nil {
// remove element
bl.list = append(bl.list[:pos], bl.list[pos+1:]...)
continue
}
pos++
}
}
// DeleteID removes the plaintext ID id from the map.
func (bl *Map) DeleteID(id backend.ID) {
bl.m.Lock()
defer bl.m.Unlock()
pos, _, err := bl.find(server.Blob{ID: id}, false)
if err != nil {
return
}
bl.list = append(bl.list[:pos], bl.list[pos+1:]...)
}

View File

@ -1,147 +0,0 @@
package restic_test
import (
"crypto/rand"
"encoding/json"
"flag"
"io"
mrand "math/rand"
"sync"
"testing"
"time"
"github.com/restic/restic"
"github.com/restic/restic/backend"
"github.com/restic/restic/server"
. "github.com/restic/restic/test"
)
var maxWorkers = flag.Uint("workers", 20, "number of workers to test Map concurrent access against")
func randomID() []byte {
buf := make([]byte, backend.IDSize)
_, err := io.ReadFull(rand.Reader, buf)
if err != nil {
panic(err)
}
return buf
}
func newBlob() server.Blob {
return server.Blob{
ID: randomID(),
Size: uint64(mrand.Uint32()),
Storage: randomID(),
StorageSize: uint64(mrand.Uint32()),
}
}
// Test basic functionality
func TestMap(t *testing.T) {
bl := restic.NewMap()
b := newBlob()
bl.Insert(b)
for i := 0; i < 1000; i++ {
bl.Insert(newBlob())
}
b2, err := bl.Find(server.Blob{ID: b.ID, Size: b.Size})
OK(t, err)
Assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2)
b2, err = bl.FindID(b.ID)
OK(t, err)
Assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2)
bl2 := restic.NewMap()
for i := 0; i < 1000; i++ {
bl.Insert(newBlob())
}
b2, err = bl2.Find(b)
Assert(t, err != nil, "found ID in restic that was never inserted: %v", b2)
bl2.Merge(bl)
b2, err = bl2.Find(b)
if err != nil {
t.Fatal(err)
}
if b.Compare(b2) != 0 {
t.Fatalf("items are not equal: want %v, got %v", b, b2)
}
}
// Test JSON encode/decode
func TestMapJSON(t *testing.T) {
bl := restic.NewMap()
b := server.Blob{ID: randomID()}
bl.Insert(b)
b2, err := bl.Find(b)
OK(t, err)
Assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2)
buf, err := json.Marshal(bl)
OK(t, err)
bl2 := restic.Map{}
json.Unmarshal(buf, &bl2)
b2, err = bl2.Find(b)
OK(t, err)
Assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2)
buf, err = json.Marshal(bl2)
OK(t, err)
}
// random insert/find access by several goroutines
func TestMapRandom(t *testing.T) {
var wg sync.WaitGroup
worker := func(bl *restic.Map) {
defer wg.Done()
b := newBlob()
bl.Insert(b)
for i := 0; i < 200; i++ {
bl.Insert(newBlob())
}
d := time.Duration(mrand.Intn(10)*100) * time.Millisecond
time.Sleep(d)
for i := 0; i < 100; i++ {
b2, err := bl.Find(b)
if err != nil {
t.Fatal(err)
}
if b.Compare(b2) != 0 {
t.Fatalf("items are not equal: want %v, got %v", b, b2)
}
}
bl2 := restic.NewMap()
for i := 0; i < 200; i++ {
bl2.Insert(newBlob())
}
bl2.Merge(bl)
}
bl := restic.NewMap()
for i := 0; uint(i) < *maxWorkers; i++ {
wg.Add(1)
go worker(bl)
}
wg.Wait()
}

16
node.go
View File

@ -12,6 +12,7 @@ import (
"github.com/juju/arrar"
"github.com/restic/restic/backend"
"github.com/restic/restic/debug"
"github.com/restic/restic/pack"
"github.com/restic/restic/server"
)
@ -98,14 +99,14 @@ func nodeTypeFromFileInfo(fi os.FileInfo) string {
return ""
}
func (node *Node) CreateAt(path string, m *Map, s *server.Server) error {
func (node *Node) CreateAt(path string, s *server.Server) error {
switch node.Type {
case "dir":
if err := node.createDirAt(path); err != nil {
return err
}
case "file":
if err := node.createFileAt(path, m, s); err != nil {
if err := node.createFileAt(path, s); err != nil {
return err
}
case "symlink":
@ -171,7 +172,7 @@ func (node Node) createDirAt(path string) error {
return nil
}
func (node Node) createFileAt(path string, m *Map, s *server.Server) error {
func (node Node) createFileAt(path string, s *server.Server) error {
f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0600)
defer f.Close()
@ -179,13 +180,8 @@ func (node Node) createFileAt(path string, m *Map, s *server.Server) error {
return arrar.Annotate(err, "OpenFile")
}
for _, blobid := range node.Content {
blob, err := m.FindID(blobid)
if err != nil {
return arrar.Annotate(err, "Find Blob")
}
buf, err := s.Load(backend.Data, blob)
for _, id := range node.Content {
buf, err := s.LoadBlob(pack.Data, id)
if err != nil {
return arrar.Annotate(err, "Load")
}

View File

@ -19,6 +19,17 @@ const (
Tree = 1
)
func (t BlobType) String() string {
switch t {
case Data:
return "data"
case Tree:
return "tree"
}
return fmt.Sprintf("<BlobType %d>", t)
}
func (t BlobType) MarshalJSON() ([]byte, error) {
switch t {
case Data:

View File

@ -36,8 +36,8 @@ func NewRestorer(s *server.Server, snid backend.ID) (*Restorer, error) {
return r, nil
}
func (res *Restorer) to(dst string, dir string, treeBlob server.Blob) error {
tree, err := LoadTree(res.s, treeBlob)
func (res *Restorer) to(dst string, dir string, treeID backend.ID) error {
tree, err := LoadTree(res.s, treeID)
if err != nil {
return res.Error(dir, nil, arrar.Annotate(err, "LoadTree"))
}
@ -47,7 +47,7 @@ func (res *Restorer) to(dst string, dir string, treeBlob server.Blob) error {
if res.Filter == nil ||
res.Filter(filepath.Join(dir, node.Name), dstpath, node) {
err := node.CreateAt(dstpath, tree.Map, res.s)
err := node.CreateAt(dstpath, res.s)
// Did it fail because of ENOENT?
if arrar.Check(err, func(err error) bool {
@ -60,7 +60,7 @@ func (res *Restorer) to(dst string, dir string, treeBlob server.Blob) error {
// Create parent directories and retry
err = os.MkdirAll(filepath.Dir(dstpath), 0700)
if err == nil || err == os.ErrExist {
err = node.CreateAt(dstpath, tree.Map, res.s)
err = node.CreateAt(dstpath, res.s)
}
}
@ -74,20 +74,11 @@ func (res *Restorer) to(dst string, dir string, treeBlob server.Blob) error {
if node.Type == "dir" {
if node.Subtree == nil {
return fmt.Errorf("Dir without subtree in tree %s", treeBlob)
return fmt.Errorf("Dir without subtree in tree %v", treeID.Str())
}
subp := filepath.Join(dir, node.Name)
subtreeBlob, err := tree.Map.FindID(node.Subtree)
if err != nil {
err = res.Error(subp, node, arrar.Annotate(err, "lookup subtree"))
if err != nil {
return err
}
}
err = res.to(dst, subp, subtreeBlob)
err = res.to(dst, subp, node.Subtree)
if err != nil {
err = res.Error(subp, node, arrar.Annotate(err, "restore subtree"))
if err != nil {

View File

@ -3,6 +3,7 @@ package server
import (
"encoding/json"
"errors"
"fmt"
"io"
"sync"
@ -47,8 +48,8 @@ func (idx *Index) Store(t pack.BlobType, id, pack backend.ID, offset, length uin
idx.m.Lock()
defer idx.m.Unlock()
debug.Log("Index.Store", "pack %v contains id %v, offset %v, length %v",
pack.Str(), id.Str(), offset, length)
debug.Log("Index.Store", "pack %v contains id %v (%v), offset %v, length %v",
pack.Str(), id.Str(), t, offset, length)
idx.store(t, id, pack, offset, length, false)
}
@ -91,6 +92,22 @@ func (idx *Index) Has(id backend.ID) bool {
return false
}
// Merge loads all items from other into idx.
func (idx *Index) Merge(other *Index) {
debug.Log("Index.Merge", "Merge index with %p", other)
idx.m.Lock()
defer idx.m.Unlock()
for k, v := range other.pack {
if _, ok := idx.pack[k]; ok {
debug.Log("Index.Merge", "index already has key %v, updating", k[:8])
}
idx.pack[k] = v
}
debug.Log("Index.Merge", "done merging index")
}
// Each returns a channel that yields all blobs known to the index. If done is
// closed, the background goroutine terminates. This blocks any modification of
// the index.
@ -128,6 +145,22 @@ func (idx *Index) Each(done chan struct{}) <-chan pack.Blob {
return ch
}
// Count returns the number of blobs of type t in the index.
func (idx *Index) Count(t pack.BlobType) (n uint) {
debug.Log("Index.Count", "counting blobs of type %v", t)
idx.m.Lock()
defer idx.m.Unlock()
for id, blob := range idx.pack {
if blob.tpe == t {
n++
debug.Log("Index.Count", " blob %v counted: %v", id[:8], blob)
}
}
return
}
type packJSON struct {
ID string `json:"id"`
Blobs []blobJSON `json:"blobs"`
@ -144,6 +177,7 @@ type blobJSON struct {
// serialization only contains new blobs added via idx.Store(), not old ones
// introduced via DecodeIndex().
func (idx *Index) Encode(w io.Writer) error {
debug.Log("Index.Encode", "encoding index")
idx.m.Lock()
defer idx.m.Unlock()
@ -155,6 +189,14 @@ func (idx *Index) Encode(w io.Writer) error {
continue
}
debug.Log("Index.Encode", "handle blob %q", id[:8])
if blob.packID == nil {
debug.Log("Index.Encode", "blob %q has no packID! (type %v, offset %v, length %v)",
id[:8], blob.tpe, blob.offset, blob.length)
return fmt.Errorf("unable to serialize index: pack for blob %v hasn't been written yet", id)
}
// see if pack is already in map
p, ok := packs[blob.packID.String()]
if !ok {
@ -175,12 +217,15 @@ func (idx *Index) Encode(w io.Writer) error {
})
}
debug.Log("Index.Encode", "done")
enc := json.NewEncoder(w)
return enc.Encode(list)
}
// DecodeIndex loads and unserializes an index from rd.
func DecodeIndex(rd io.Reader) (*Index, error) {
debug.Log("Index.DecodeIndex", "Start decoding index")
list := []*packJSON{}
dec := json.NewDecoder(rd)
@ -193,12 +238,14 @@ func DecodeIndex(rd io.Reader) (*Index, error) {
for _, pack := range list {
packID, err := backend.ParseID(pack.ID)
if err != nil {
debug.Log("Index.DecodeIndex", "error parsing pack ID %q: %v", pack.ID, err)
return nil, err
}
for _, blob := range pack.Blobs {
blobID, err := backend.ParseID(blob.ID)
if err != nil {
debug.Log("Index.DecodeIndex", "error parsing blob ID %q: %v", blob.ID, err)
return nil, err
}
@ -206,5 +253,6 @@ func DecodeIndex(rd io.Reader) (*Index, error) {
}
}
debug.Log("Index.DecodeIndex", "done")
return idx, err
}

View File

@ -1,24 +1,35 @@
package server
import (
"bytes"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"sync"
"github.com/restic/restic/backend"
"github.com/restic/restic/chunker"
"github.com/restic/restic/debug"
"github.com/restic/restic/pack"
)
type Server struct {
be backend.Backend
key *Key
idx *Index
pm sync.Mutex
packs []*pack.Packer
}
func NewServer(be backend.Backend) *Server {
return &Server{be: be}
return &Server{
be: be,
idx: NewIndex(),
}
}
func (s *Server) SetKey(k *Key) {
@ -49,14 +60,15 @@ func (s *Server) PrefixLength(t backend.Type) (int, error) {
return backend.PrefixLength(s.be, t)
}
// Load tries to load and decrypt content identified by t and blob from the
// backend. If the blob specifies an ID, the decrypted plaintext is checked
// against this ID. The same goes for blob.Size and blob.StorageSize: If they
// are set to a value > 0, this value is checked.
func (s *Server) Load(t backend.Type, blob Blob) ([]byte, error) {
// load data
rd, err := s.be.Get(t, blob.Storage.String())
// Load tries to load and decrypt content identified by t and id from the
// backend.
func (s *Server) Load(t backend.Type, id backend.ID) ([]byte, error) {
debug.Log("Server.Load", "load %v with id %v", t, id.Str())
// load blob from pack
rd, err := s.be.Get(t, id.String())
if err != nil {
debug.Log("Server.Load", "error loading %v: %v", id.Str(), err)
return nil, err
}
@ -65,58 +77,78 @@ func (s *Server) Load(t backend.Type, blob Blob) ([]byte, error) {
return nil, err
}
// check hash
if !backend.Hash(buf).Equal(blob.Storage) {
return nil, errors.New("invalid data returned")
}
// check length
if blob.StorageSize > 0 && len(buf) != int(blob.StorageSize) {
return nil, errors.New("Invalid storage length")
}
// decrypt
buf, err = s.Decrypt(buf)
err = rd.Close()
if err != nil {
return nil, err
}
// check length
if blob.Size > 0 && len(buf) != int(blob.Size) {
return nil, errors.New("Invalid length")
// check hash
if !backend.Hash(buf).Equal(id) {
return nil, errors.New("invalid data returned")
}
// check SHA256 sum
if blob.ID != nil {
id := backend.Hash(buf)
if !blob.ID.Equal(id) {
return nil, fmt.Errorf("load %v: expected plaintext hash %v, got %v", blob.Storage, blob.ID, id)
}
}
return buf, nil
}
// Load tries to load and decrypt content identified by t and id from the backend.
func (s *Server) LoadID(t backend.Type, storageID backend.ID) ([]byte, error) {
return s.Load(t, Blob{Storage: storageID})
}
// LoadJSON calls Load() to get content from the backend and afterwards calls
// json.Unmarshal on the item.
func (s *Server) LoadJSON(t backend.Type, blob Blob, item interface{}) error {
buf, err := s.Load(t, blob)
// decrypt
plain, err := s.Decrypt(buf)
if err != nil {
return err
return nil, err
}
return json.Unmarshal(buf, item)
return plain, nil
}
// LoadJSONID calls Load() to get content from the backend and afterwards calls
// json.Unmarshal on the item.
func (s *Server) LoadJSONID(t backend.Type, id backend.ID, item interface{}) error {
// read
// LoadBlob tries to load and decrypt content identified by t and id from a
// pack from the backend.
func (s *Server) LoadBlob(t pack.BlobType, id backend.ID) ([]byte, error) {
debug.Log("Server.LoadPack", "load %v with id %v", t, id.Str())
// lookup pack
packID, tpe, offset, length, err := s.idx.Lookup(id)
if err != nil {
debug.Log("Server.LoadPack", "id %v not found in index: %v", id.Str(), err)
return nil, err
}
if tpe != t {
debug.Log("Server.LoadPack", "wrong type returned for %v: wanted %v, got %v", id.Str(), t, tpe)
return nil, fmt.Errorf("blob has wrong type %v (wanted: %v)", tpe, t)
}
debug.Log("Server.LoadPack", "id %v found in pack %v at offset %v (length %d)", id.Str(), packID.Str(), offset, length)
// load blob from pack
rd, err := s.be.GetReader(backend.Data, packID.String(), offset, length)
if err != nil {
debug.Log("Server.LoadPack", "error loading pack %v for %v: %v", packID.Str(), id.Str(), err)
return nil, err
}
buf, err := ioutil.ReadAll(rd)
if err != nil {
return nil, err
}
err = rd.Close()
if err != nil {
return nil, err
}
// decrypt
plain, err := s.Decrypt(buf)
if err != nil {
return nil, err
}
// check hash
if !backend.Hash(plain).Equal(id) {
return nil, errors.New("invalid data returned")
}
return plain, nil
}
// LoadJSONEncrypted decrypts the data and afterwards calls json.Unmarshal on
// the item.
func (s *Server) LoadJSONEncrypted(t backend.Type, id backend.ID, item interface{}) error {
// load blob from backend
rd, err := s.be.Get(t, id.String())
if err != nil {
return err
@ -140,132 +172,254 @@ func (s *Server) LoadJSONID(t backend.Type, id backend.ID, item interface{}) err
return nil
}
// Save encrypts data and stores it to the backend as type t.
func (s *Server) Save(t backend.Type, data []byte, id backend.ID) (Blob, error) {
// LoadJSONPack calls LoadBlob() to load a blob from the backend, decrypt the
// data and afterwards call json.Unmarshal on the item.
func (s *Server) LoadJSONPack(t pack.BlobType, id backend.ID, item interface{}) error {
// lookup pack
packID, _, offset, length, err := s.idx.Lookup(id)
if err != nil {
return err
}
// load blob from pack
rd, err := s.be.GetReader(backend.Data, packID.String(), offset, length)
if err != nil {
return err
}
defer rd.Close()
// decrypt
decryptRd, err := s.key.DecryptFrom(rd)
defer decryptRd.Close()
if err != nil {
return err
}
// decode
decoder := json.NewDecoder(decryptRd)
err = decoder.Decode(item)
if err != nil {
return err
}
return nil
}
const minPackSize = 4 * chunker.MiB
const maxPackSize = 16 * chunker.MiB
const maxPackers = 200
// findPacker returns a packer for a new blob of size bytes. Either a new one is
// created or one is returned that already has some blobs.
func (s *Server) findPacker(size uint) (*pack.Packer, error) {
s.pm.Lock()
defer s.pm.Unlock()
// search for a suitable packer
if len(s.packs) > 0 {
debug.Log("Server.findPacker", "searching packer for %d bytes\n", size)
for i, p := range s.packs {
if p.Size()+size < maxPackSize {
debug.Log("Server.findPacker", "found packer %v", p)
// remove from list
s.packs = append(s.packs[:i], s.packs[i+1:]...)
return p, nil
}
}
}
// no suitable packer found, return new
blob, err := s.be.Create()
if err != nil {
return nil, err
}
debug.Log("Server.findPacker", "create new pack %p", blob)
return pack.NewPacker(s.key.Master(), blob), nil
}
// insertPacker appends p to s.packs.
func (s *Server) insertPacker(p *pack.Packer) {
s.pm.Lock()
defer s.pm.Unlock()
s.packs = append(s.packs, p)
debug.Log("Server.insertPacker", "%d packers\n", len(s.packs))
}
// savePacker stores p in the backend.
func (s *Server) savePacker(p *pack.Packer) error {
debug.Log("Server.savePacker", "save packer with %d blobs\n", p.Count())
_, err := p.Finalize()
if err != nil {
return err
}
// move file to the final location
sid := p.ID()
err = p.Writer().(backend.Blob).Finalize(backend.Data, sid.String())
if err != nil {
debug.Log("Server.savePacker", "blob Finalize() error: %v", err)
return err
}
debug.Log("Server.savePacker", "saved as %v", sid.Str())
// update blobs in the index
for _, b := range p.Blobs() {
debug.Log("Server.savePacker", " updating blob %v to pack %v", b.ID.Str(), sid.Str())
s.idx.Store(b.Type, b.ID, sid, b.Offset, uint(b.Length))
}
return nil
}
// countPacker returns the number of open (unfinished) packers.
func (s *Server) countPacker() int {
s.pm.Lock()
defer s.pm.Unlock()
return len(s.packs)
}
// Save encrypts data and stores it to the backend as type t. If data is small
// enough, it will be packed together with other small blobs.
func (s *Server) Save(t pack.BlobType, data []byte, id backend.ID) (backend.ID, error) {
if id == nil {
// compute plaintext hash
id = backend.Hash(data)
}
// create a new blob
blob := Blob{
ID: id,
Size: uint64(len(data)),
}
debug.Log("Server.Save", "save id %v (%v, %d bytes)", id.Str(), t, len(data))
// get buf from the pool
ciphertext := getBuf()
defer freeBuf(ciphertext)
// encrypt blob
ciphertext, err := s.Encrypt(ciphertext, data)
if err != nil {
return Blob{}, err
return nil, err
}
// compute ciphertext hash
sid := backend.Hash(ciphertext)
// save blob
backendBlob, err := s.be.Create()
// find suitable packer and add blob
packer, err := s.findPacker(uint(len(ciphertext)))
if err != nil {
return Blob{}, err
return nil, err
}
_, err = backendBlob.Write(ciphertext)
if err != nil {
return Blob{}, err
// save ciphertext
packer.Add(t, id, bytes.NewReader(ciphertext))
// add this id to the index, although we don't know yet in which pack it
// will be saved, the entry will be updated when the pack is written.
s.idx.Store(t, id, nil, 0, 0)
debug.Log("Server.Save", "saving stub for %v (%v) in index", id.Str, t)
// if the pack is not full enough and there are less than maxPackers
// packers, put back to the list
if packer.Size() < minPackSize && s.countPacker() < maxPackers {
debug.Log("Server.Save", "pack is not full enough (%d bytes)", packer.Size())
s.insertPacker(packer)
return id, nil
}
err = backendBlob.Finalize(t, sid.String())
if err != nil {
return Blob{}, err
}
blob.Storage = sid
blob.StorageSize = uint64(len(ciphertext))
return blob, nil
// else write the pack to the backend
return id, s.savePacker(packer)
}
// SaveFrom encrypts data read from rd and stores it to the backend as type t.
func (s *Server) SaveFrom(t backend.Type, id backend.ID, length uint, rd io.Reader) (Blob, error) {
// SaveFrom encrypts data read from rd and stores it in a pack in the backend as type t.
func (s *Server) SaveFrom(t pack.BlobType, id backend.ID, length uint, rd io.Reader) error {
debug.Log("Server.SaveFrom", "save id %v (%v, %d bytes)", id.Str(), t, length)
if id == nil {
return Blob{}, errors.New("id is nil")
return errors.New("id is nil")
}
backendBlob, err := s.be.Create()
buf, err := ioutil.ReadAll(rd)
if err != nil {
return Blob{}, err
return err
}
hw := backend.NewHashingWriter(backendBlob, sha256.New())
encWr := s.key.EncryptTo(hw)
_, err = io.Copy(encWr, rd)
_, err = s.Save(t, buf, id)
if err != nil {
return Blob{}, err
return err
}
// finish encryption
err = encWr.Close()
if err != nil {
return Blob{}, fmt.Errorf("EncryptedWriter.Close(): %v", err)
}
// finish backend blob
sid := backend.ID(hw.Sum(nil))
err = backendBlob.Finalize(t, sid.String())
if err != nil {
return Blob{}, fmt.Errorf("backend.Blob.Close(): %v", err)
}
return Blob{
ID: id,
Size: uint64(length),
Storage: sid,
StorageSize: uint64(backendBlob.Size()),
}, nil
return nil
}
// SaveJSON serialises item as JSON and encrypts and saves it in the backend as
// type t.
func (s *Server) SaveJSON(t backend.Type, item interface{}) (Blob, error) {
backendBlob, err := s.be.Create()
// SaveJSON serialises item as JSON and encrypts and saves it in a pack in the
// backend as type t.
func (s *Server) SaveJSON(t pack.BlobType, item interface{}) (backend.ID, error) {
debug.Log("Server.SaveJSON", "save %v blob", t)
buf := getBuf()[:0]
defer freeBuf(buf)
wr := bytes.NewBuffer(buf)
enc := json.NewEncoder(wr)
err := enc.Encode(item)
if err != nil {
return Blob{}, fmt.Errorf("Create: %v", err)
return nil, fmt.Errorf("json.Encode: %v", err)
}
storagehw := backend.NewHashingWriter(backendBlob, sha256.New())
encWr := s.key.EncryptTo(storagehw)
plainhw := backend.NewHashingWriter(encWr, sha256.New())
buf = wr.Bytes()
return s.Save(t, buf, nil)
}
enc := json.NewEncoder(plainhw)
// SaveJSONUnpacked serialises item as JSON and encrypts and saves it in the
// backend as type t, without a pack. It returns the storage hash.
func (s *Server) SaveJSONUnpacked(t backend.Type, item interface{}) (backend.ID, error) {
// create blob
blob, err := s.be.Create()
if err != nil {
return nil, err
}
debug.Log("Server.SaveJSONUnpacked", "create new pack %p", blob)
// hash
hw := backend.NewHashingWriter(blob, sha256.New())
// encrypt blob
ewr := s.key.EncryptTo(hw)
enc := json.NewEncoder(ewr)
err = enc.Encode(item)
if err != nil {
return Blob{}, fmt.Errorf("json.NewEncoder: %v", err)
return nil, fmt.Errorf("json.Encode: %v", err)
}
// finish encryption
err = encWr.Close()
err = ewr.Close()
if err != nil {
return Blob{}, fmt.Errorf("EncryptedWriter.Close(): %v", err)
return nil, err
}
// finish backend blob
sid := backend.ID(storagehw.Sum(nil))
err = backendBlob.Finalize(t, sid.String())
// finalize blob in the backend
sid := backend.ID(hw.Sum(nil))
err = blob.Finalize(t, sid.String())
if err != nil {
return Blob{}, fmt.Errorf("backend.Blob.Close(): %v", err)
return nil, err
}
id := backend.ID(plainhw.Sum(nil))
return sid, nil
}
return Blob{
ID: id,
Size: uint64(plainhw.Size()),
Storage: sid,
StorageSize: uint64(backendBlob.Size()),
}, nil
// Flush saves all remaining packs.
func (s *Server) Flush() error {
s.pm.Lock()
defer s.pm.Unlock()
debug.Log("Server.Flush", "manually flushing %d packs", len(s.packs))
for _, p := range s.packs {
err := s.savePacker(p)
if err != nil {
return err
}
}
s.packs = s.packs[:0]
return nil
}
// Returns the backend used for this server.
@ -273,6 +427,106 @@ func (s *Server) Backend() backend.Backend {
return s.be
}
// Returns the index of this server.
func (s *Server) Index() *Index {
return s.idx
}
// SetIndex instructs the server to use the given index.
func (s *Server) SetIndex(i *Index) {
s.idx = i
}
// SaveIndex saves all new packs in the index in the backend, returned is the
// storage ID.
func (s *Server) SaveIndex() (backend.ID, error) {
debug.Log("Server.SaveIndex", "Saving index")
// create blob
blob, err := s.be.Create()
if err != nil {
return nil, err
}
debug.Log("Server.SaveIndex", "create new pack %p", blob)
// hash
hw := backend.NewHashingWriter(blob, sha256.New())
// encrypt blob
ewr := s.key.EncryptTo(hw)
err = s.idx.Encode(ewr)
if err != nil {
return nil, err
}
err = ewr.Close()
if err != nil {
return nil, err
}
// finalize blob in the backend
sid := backend.ID(hw.Sum(nil))
err = blob.Finalize(backend.Index, sid.String())
if err != nil {
return nil, err
}
debug.Log("Server.SaveIndex", "Saved index as %v", sid.Str())
return sid, nil
}
// LoadIndex loads all index files from the backend and merges them with the
// current index.
func (s *Server) LoadIndex() error {
debug.Log("Server.LoadIndex", "Loading index")
done := make(chan struct{})
defer close(done)
for id := range s.be.List(backend.Index, done) {
err := s.loadIndex(id)
if err != nil {
return err
}
}
return nil
}
// loadIndex loads the index id and merges it with the currently used index.
func (s *Server) loadIndex(id string) error {
debug.Log("Server.loadIndex", "Loading index %v", id[:8])
before := len(s.idx.pack)
rd, err := s.be.Get(backend.Index, id)
defer rd.Close()
if err != nil {
return err
}
// decrypt
decryptRd, err := s.key.DecryptFrom(rd)
defer decryptRd.Close()
if err != nil {
return err
}
idx, err := DecodeIndex(decryptRd)
if err != nil {
debug.Log("Server.loadIndex", "error while decoding index %v: %v", id, err)
return err
}
s.idx.Merge(idx)
after := len(s.idx.pack)
debug.Log("Server.loadIndex", "Loaded index %v, added %v blobs", id[:8], after-before)
return nil
}
func (s *Server) SearchKey(password string) error {
key, err := SearchKey(s, password)
if err != nil {
@ -289,7 +543,7 @@ func (s *Server) Decrypt(ciphertext []byte) ([]byte, error) {
return nil, errors.New("key for server not set")
}
return s.key.Decrypt([]byte{}, ciphertext)
return s.key.Decrypt(nil, ciphertext)
}
func (s *Server) Encrypt(ciphertext, plaintext []byte) ([]byte, error) {
@ -305,8 +559,8 @@ func (s *Server) Key() *Key {
}
// Count returns the number of blobs of a given type in the backend.
func (s *Server) Count(t backend.Type) (n int) {
for _ = range s.List(t, nil) {
func (s *Server) Count(t backend.Type) (n uint) {
for _ = range s.be.List(t, nil) {
n++
}

View File

@ -11,6 +11,7 @@ import (
"github.com/restic/restic"
"github.com/restic/restic/backend"
"github.com/restic/restic/pack"
. "github.com/restic/restic/test"
)
@ -38,12 +39,12 @@ func TestSaveJSON(t *testing.T) {
data = append(data, '\n')
h := sha256.Sum256(data)
blob, err := server.SaveJSON(backend.Tree, obj)
id, err := server.SaveJSON(pack.Tree, obj)
OK(t, err)
Assert(t, bytes.Equal(h[:], blob.ID),
Assert(t, bytes.Equal(h[:], id),
"TestSaveJSON: wrong plaintext ID: expected %02x, got %02x",
h, blob.ID)
h, id)
}
}
@ -63,17 +64,51 @@ func BenchmarkSaveJSON(t *testing.B) {
t.ResetTimer()
for i := 0; i < t.N; i++ {
blob, err := server.SaveJSON(backend.Tree, obj)
id, err := server.SaveJSON(pack.Tree, obj)
OK(t, err)
Assert(t, bytes.Equal(h[:], blob.ID),
Assert(t, bytes.Equal(h[:], id),
"TestSaveJSON: wrong plaintext ID: expected %02x, got %02x",
h, blob.ID)
h, id)
}
}
var testSizes = []int{5, 23, 2<<18 + 23, 1 << 20}
func TestSave(t *testing.T) {
server := SetupBackend(t)
defer TeardownBackend(t, server)
key := SetupKey(t, server, "geheim")
server.SetKey(key)
for _, size := range testSizes {
data := make([]byte, size)
_, err := io.ReadFull(rand.Reader, data)
OK(t, err)
id := backend.Hash(data)
// save
sid, err := server.Save(pack.Data, data, nil)
OK(t, err)
Equals(t, id, sid)
OK(t, server.Flush())
// read back
buf, err := server.LoadBlob(pack.Data, id)
Assert(t, len(buf) == len(data),
"number of bytes read back does not match: expected %d, got %d",
len(data), len(buf))
Assert(t, bytes.Equal(buf, data),
"data does not match: expected %02x, got %02x",
data, buf)
}
}
func TestSaveFrom(t *testing.T) {
server := SetupBackend(t)
defer TeardownBackend(t, server)
@ -85,14 +120,16 @@ func TestSaveFrom(t *testing.T) {
_, err := io.ReadFull(rand.Reader, data)
OK(t, err)
id := sha256.Sum256(data)
id := backend.Hash(data)
// save
blob, err := server.SaveFrom(backend.Data, id[:], uint(size), bytes.NewReader(data))
err = server.SaveFrom(pack.Data, id[:], uint(size), bytes.NewReader(data))
OK(t, err)
OK(t, server.Flush())
// read back
buf, err := server.Load(backend.Data, blob)
buf, err := server.LoadBlob(pack.Data, id[:])
Assert(t, len(buf) == len(data),
"number of bytes read back does not match: expected %d, got %d",
@ -123,12 +160,12 @@ func BenchmarkSaveFrom(t *testing.B) {
for i := 0; i < t.N; i++ {
// save
_, err := server.SaveFrom(backend.Data, id[:], uint(size), bytes.NewReader(data))
err = server.SaveFrom(pack.Data, id[:], uint(size), bytes.NewReader(data))
OK(t, err)
}
}
func TestLoadJSONID(t *testing.T) {
func TestLoadJSONPack(t *testing.T) {
if *benchTestDir == "" {
t.Skip("benchdir not set, skipping TestServerStats")
}
@ -140,23 +177,14 @@ func TestLoadJSONID(t *testing.T) {
// archive a few files
sn := SnapshotDir(t, server, *benchTestDir, nil)
t.Logf("archived snapshot %v", sn.ID())
// benchmark loading first tree
done := make(chan struct{})
first, found := <-server.List(backend.Tree, done)
Assert(t, found, "no Trees in repository found")
close(done)
id, err := backend.ParseID(first)
OK(t, err)
OK(t, server.Flush())
tree := restic.NewTree()
err = server.LoadJSONID(backend.Tree, id, &tree)
err := server.LoadJSONPack(pack.Tree, sn.Tree, &tree)
OK(t, err)
}
func BenchmarkLoadJSONID(t *testing.B) {
func TestLoadJSONEncrypted(t *testing.T) {
if *benchTestDir == "" {
t.Skip("benchdir not set, skipping TestServerStats")
}
@ -166,18 +194,20 @@ func BenchmarkLoadJSONID(t *testing.B) {
key := SetupKey(t, server, "geheim")
server.SetKey(key)
// archive a few files
sn := SnapshotDir(t, server, *benchTestDir, nil)
t.Logf("archived snapshot %v", sn.ID())
// archive a snapshot
sn := restic.Snapshot{}
sn.Hostname = "foobar"
sn.Username = "test!"
t.ResetTimer()
id, err := server.SaveJSONUnpacked(backend.Snapshot, &sn)
OK(t, err)
tree := restic.NewTree()
for i := 0; i < t.N; i++ {
for name := range server.List(backend.Tree, nil) {
id, err := backend.ParseID(name)
OK(t, err)
OK(t, server.LoadJSONID(backend.Tree, id, &tree))
}
}
var sn2 restic.Snapshot
// restore
err = server.LoadJSONEncrypted(backend.Snapshot, id, &sn2)
OK(t, err)
Equals(t, sn.Hostname, sn2.Hostname)
Equals(t, sn.Username, sn2.Username)
}

View File

@ -11,14 +11,14 @@ import (
)
type Snapshot struct {
Time time.Time `json:"time"`
Parent backend.ID `json:"parent,omitempty"`
Tree server.Blob `json:"tree"`
Paths []string `json:"paths"`
Hostname string `json:"hostname,omitempty"`
Username string `json:"username,omitempty"`
UID uint32 `json:"uid,omitempty"`
GID uint32 `json:"gid,omitempty"`
Time time.Time `json:"time"`
Parent backend.ID `json:"parent,omitempty"`
Tree backend.ID `json:"tree"`
Paths []string `json:"paths"`
Hostname string `json:"hostname,omitempty"`
Username string `json:"username,omitempty"`
UID uint32 `json:"uid,omitempty"`
GID uint32 `json:"gid,omitempty"`
id backend.ID // plaintext ID, used during restore
}
@ -50,7 +50,7 @@ func NewSnapshot(paths []string) (*Snapshot, error) {
func LoadSnapshot(s *server.Server, id backend.ID) (*Snapshot, error) {
sn := &Snapshot{id: id}
err := s.LoadJSONID(backend.Snapshot, id, sn)
err := s.LoadJSONEncrypted(backend.Snapshot, id, sn)
if err != nil {
return nil, err
}

View File

@ -52,7 +52,6 @@ func SetupKey(t testing.TB, s *server.Server, password string) *server.Key {
func SnapshotDir(t testing.TB, server *server.Server, path string, parent backend.ID) *restic.Snapshot {
arch, err := restic.NewArchiver(server)
OK(t, err)
OK(t, arch.Preload())
sn, _, err := arch.Snapshot(nil, []string{path}, parent)
OK(t, err)
return sn

18
tree.go
View File

@ -7,12 +7,12 @@ import (
"github.com/restic/restic/backend"
"github.com/restic/restic/debug"
"github.com/restic/restic/pack"
"github.com/restic/restic/server"
)
type Tree struct {
Nodes []*Node `json:"nodes"`
Map *Map `json:"map"`
}
var (
@ -23,17 +23,16 @@ var (
func NewTree() *Tree {
return &Tree{
Nodes: []*Node{},
Map: NewMap(),
}
}
func (t Tree) String() string {
return fmt.Sprintf("Tree<%d nodes, %d blobs>", len(t.Nodes), len(t.Map.list))
return fmt.Sprintf("Tree<%d nodes>", len(t.Nodes))
}
func LoadTree(s *server.Server, blob server.Blob) (*Tree, error) {
func LoadTree(s *server.Server, id backend.ID) (*Tree, error) {
tree := &Tree{}
err := s.LoadJSON(backend.Tree, blob, tree)
err := s.LoadJSONPack(pack.Tree, id, tree)
if err != nil {
return nil, err
}
@ -41,18 +40,13 @@ func LoadTree(s *server.Server, blob server.Blob) (*Tree, error) {
return tree, nil
}
// Equals returns true if t and other have exactly the same nodes and map.
func (t Tree) Equals(other Tree) bool {
// Equals returns true if t and other have exactly the same nodes.
func (t Tree) Equals(other *Tree) bool {
if len(t.Nodes) != len(other.Nodes) {
debug.Log("Tree.Equals", "tree.Equals(): trees have different number of nodes")
return false
}
if !t.Map.Equals(other.Map) {
debug.Log("Tree.Equals", "tree.Equals(): maps aren't equal")
return false
}
for i := 0; i < len(t.Nodes); i++ {
if !t.Nodes[i].Equals(*other.Nodes[i]) {
debug.Log("Tree.Equals", "tree.Equals(): node %d is different:", i)

View File

@ -8,6 +8,7 @@ import (
"testing"
"github.com/restic/restic"
"github.com/restic/restic/pack"
. "github.com/restic/restic/test"
)
@ -91,3 +92,26 @@ func TestNodeComparison(t *testing.T) {
n2.Size -= 1
Assert(t, !node.Equals(n2), "nodes are equal")
}
func TestLoadTree(t *testing.T) {
server := SetupBackend(t)
defer TeardownBackend(t, server)
key := SetupKey(t, server, "geheim")
server.SetKey(key)
// save tree
tree := restic.NewTree()
id, err := server.SaveJSON(pack.Tree, tree)
OK(t, err)
// save packs
OK(t, server.Flush())
// load tree again
tree2, err := restic.LoadTree(server, id)
OK(t, err)
Assert(t, tree.Equals(tree2),
"trees are not equal: want %v, got %v",
tree, tree2)
}

27
walk.go
View File

@ -3,6 +3,7 @@ package restic
import (
"path/filepath"
"github.com/restic/restic/backend"
"github.com/restic/restic/debug"
"github.com/restic/restic/server"
)
@ -15,10 +16,10 @@ type WalkTreeJob struct {
Tree *Tree
}
func walkTree(s *server.Server, path string, treeBlob server.Blob, done chan struct{}, jobCh chan<- WalkTreeJob) {
debug.Log("walkTree", "start on %q (%v)", path, treeBlob)
func walkTree(s *server.Server, path string, treeID backend.ID, done chan struct{}, jobCh chan<- WalkTreeJob) {
debug.Log("walkTree", "start on %q (%v)", path, treeID.Str())
// load tree
t, err := LoadTree(s, treeBlob)
t, err := LoadTree(s, treeID)
if err != nil {
jobCh <- WalkTreeJob{Path: path, Error: err}
return
@ -27,32 +28,22 @@ func walkTree(s *server.Server, path string, treeBlob server.Blob, done chan str
for _, node := range t.Nodes {
p := filepath.Join(path, node.Name)
if node.Type == "dir" {
blob, err := t.Map.FindID(node.Subtree)
if err != nil {
jobCh <- WalkTreeJob{Path: p, Error: err}
continue
}
walkTree(s, p, blob, done, jobCh)
walkTree(s, p, node.Subtree, done, jobCh)
} else {
// load old blobs
node.blobs, err = t.Map.Select(node.Content)
if err != nil {
debug.Log("walkTree", "unable to load bobs for %q (%v): %v", path, treeBlob, err)
}
jobCh <- WalkTreeJob{Path: p, Node: node, Error: err}
}
}
jobCh <- WalkTreeJob{Path: filepath.Join(path), Tree: t}
debug.Log("walkTree", "done for %q (%v)", path, treeBlob)
debug.Log("walkTree", "done for %q (%v)", path, treeID.Str())
}
// WalkTree walks the tree specified by ID recursively and sends a job for each
// file and directory it finds. When the channel done is closed, processing
// stops.
func WalkTree(server *server.Server, blob server.Blob, done chan struct{}, jobCh chan<- WalkTreeJob) {
debug.Log("WalkTree", "start on %v", blob)
walkTree(server, "", blob, done, jobCh)
func WalkTree(server *server.Server, id backend.ID, done chan struct{}, jobCh chan<- WalkTreeJob) {
debug.Log("WalkTree", "start on %v", id.Str())
walkTree(server, "", id, done, jobCh)
close(jobCh)
debug.Log("WalkTree", "done")
}

View File

@ -27,6 +27,9 @@ func TestWalkTree(t *testing.T) {
sn, _, err := arch.Snapshot(nil, dirs, nil)
OK(t, err)
// flush server, write all packs
OK(t, server.Flush())
// start benchmark
// t.ResetTimer()
@ -48,6 +51,9 @@ func TestWalkTree(t *testing.T) {
fsJob, fsChOpen := <-fsJobs
Assert(t, !fsChOpen || fsJob != nil,
"received nil job from filesystem: %v %v", fsJob, fsChOpen)
if fsJob != nil {
OK(t, fsJob.Error())
}
var path string
fsEntries := 1
@ -63,6 +69,8 @@ func TestWalkTree(t *testing.T) {
treeJob, treeChOpen := <-treeJobs
treeEntries := 1
OK(t, treeJob.Error)
if treeJob.Tree != nil {
treeEntries = len(treeJob.Tree.Nodes)
}