diff --git a/Makefile b/Makefile
index be9c4d90b..c58519465 100644
--- a/Makefile
+++ b/Makefile
@@ -1,7 +1,6 @@
 .PHONY: clean all test

 test:
-	go test -race ./...
 	for dir in cmd/* ; do \
 		(cd "$$dir"; go build -race) \
 	done
diff --git a/archiver.go b/archiver.go
index 15dbb3f43..4fa33819a 100644
--- a/archiver.go
+++ b/archiver.go
@@ -1,32 +1,75 @@
 package khepri

 import (
+	"io"
+	"io/ioutil"
 	"os"
 	"path/filepath"
+	"sync"

 	"github.com/fd0/khepri/backend"
+	"github.com/fd0/khepri/chunker"
+)
+
+const (
+	maxConcurrentFiles = 32
+	maxConcurrentBlobs = 32
 )

 type Archiver struct {
-	be   backend.Server
-	key  *Key
-	ch   *ContentHandler
-	smap *StorageMap // blobs used for the current snapshot
+	be  backend.Server
+	key *Key
+	ch  *ContentHandler
+
+	bl *BlobList // blobs used for the current snapshot
+
+	fileToken chan struct{}
+	blobToken chan struct{}
+
+	Stats Stats

 	Error  func(dir string, fi os.FileInfo, err error) error
 	Filter func(item string, fi os.FileInfo) bool
+
+	ScannerUpdate func(stats Stats)
+	SaveUpdate    func(stats Stats)
+
+	sum sync.Mutex // for SaveUpdate
+}
+
+type Stats struct {
+	Files       int
+	Directories int
+	Other       int
+	Bytes       uint64
 }

 func NewArchiver(be backend.Server, key *Key) (*Archiver, error) {
 	var err error
-	arch := &Archiver{be: be, key: key}
+	arch := &Archiver{
+		be:        be,
+		key:       key,
+		fileToken: make(chan struct{}, maxConcurrentFiles),
+		blobToken: make(chan struct{}, maxConcurrentBlobs),
+	}
+
+	// fill file and blob token
+	for i := 0; i < maxConcurrentFiles; i++ {
+		arch.fileToken <- struct{}{}
+	}
+
+	for i := 0; i < maxConcurrentBlobs; i++ {
+		arch.blobToken <- struct{}{}
+	}

 	// abort on all errors
 	arch.Error = func(string, os.FileInfo, error) error { return err }
 	// allow all files
 	arch.Filter = func(string, os.FileInfo) bool { return true }
+	// do nothing
+	arch.ScannerUpdate = func(Stats) {}

-	arch.smap = NewStorageMap()
+	arch.bl = NewBlobList()

 	arch.ch, err = NewContentHandler(be, key)
 	if err != nil {
 		return nil, err
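The token channels introduced above implement counting semaphores: a buffered channel is pre-filled with one empty-struct token per permitted worker, so receiving a token blocks once all 32 slots are busy. A minimal stand-alone sketch of the same pattern (the names and the fake workload are illustrative, not part of this patch):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const maxWorkers = 4

	// pre-fill the channel with one token per allowed worker,
	// exactly like fileToken/blobToken in NewArchiver above
	sem := make(chan struct{}, maxWorkers)
	for i := 0; i < maxWorkers; i++ {
		sem <- struct{}{}
	}

	var wg sync.WaitGroup
	for i := 0; i < 20; i++ {
		token := <-sem // blocks while maxWorkers goroutines are running
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			defer func() { sem <- token }() // return the token
			fmt.Println("working on", n)
		}(i)
	}
	wg.Wait()
}
```

The same effect is often written without pre-filling, by sending into the buffered channel to acquire and receiving to release; the pre-filled variant used in the patch reads more like handing out a pool of physical tokens.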
@@ -41,59 +84,124 @@ func NewArchiver(be backend.Server, key *Key) (*Archiver, error) {
 	return arch, nil
 }

-func (arch *Archiver) Save(t backend.Type, data []byte) (*Blob, error) {
+func (arch *Archiver) saveUpdate(stats Stats) {
+	if arch.SaveUpdate != nil {
+		arch.sum.Lock()
+		defer arch.sum.Unlock()
+		arch.SaveUpdate(stats)
+	}
+}
+
+func (arch *Archiver) Save(t backend.Type, data []byte) (Blob, error) {
 	blob, err := arch.ch.Save(t, data)
 	if err != nil {
-		return nil, err
+		return Blob{}, err
 	}

 	// store blob in storage map for current snapshot
-	arch.smap.Insert(blob)
+	arch.bl.Insert(blob)

 	return blob, nil
 }

-func (arch *Archiver) SaveJSON(t backend.Type, item interface{}) (*Blob, error) {
+func (arch *Archiver) SaveJSON(t backend.Type, item interface{}) (Blob, error) {
 	blob, err := arch.ch.SaveJSON(t, item)
 	if err != nil {
-		return nil, err
+		return Blob{}, err
 	}

 	// store blob in storage map for current snapshot
-	arch.smap.Insert(blob)
+	arch.bl.Insert(blob)

 	return blob, nil
 }

-func (arch *Archiver) SaveFile(node *Node) (Blobs, error) {
-	blobs, err := arch.ch.SaveFile(node.path, uint(node.Size))
+// SaveFile stores the content of the file on the backend as a Blob by calling
+// Save for each chunk.
+func (arch *Archiver) SaveFile(node *Node) error {
+	file, err := os.Open(node.path)
 	if err != nil {
-		return nil, arch.Error(node.path, nil, err)
+		return err
+	}
+	defer file.Close()
+
+	var blobs Blobs
+
+	// if the file is small enough, store it directly
+	if node.Size < chunker.MinSize {
+		buf, err := ioutil.ReadAll(file)
+		if err != nil {
+			return err
+		}
+
+		blob, err := arch.ch.Save(backend.Data, buf)
+		if err != nil {
+			return err
+		}
+
+		arch.saveUpdate(Stats{Bytes: blob.Size})
+
+		blobs = Blobs{blob}
+	} else {
+		// else store all chunks
+		chnker := chunker.New(file)
+		chans := [](<-chan Blob){}
+
+		for {
+			chunk, err := chnker.Next()
+			if err == io.EOF {
+				break
+			}
+
+			if err != nil {
+				return err
+			}
+
+			// acquire token, start goroutine to save chunk
+			token := <-arch.blobToken
+			resCh := make(chan Blob, 1)
+
+			go func(ch chan<- Blob) {
+				blob, err := arch.ch.Save(backend.Data, chunk.Data)
+				// TODO handle error
+				if err != nil {
+					panic(err)
+				}
+
+				arch.saveUpdate(Stats{Bytes: blob.Size})
+				arch.blobToken <- token
+				ch <- blob
+			}(resCh)
+
+			chans = append(chans, resCh)
+		}
+
+		blobs = []Blob{}
+		for _, ch := range chans {
+			blobs = append(blobs, <-ch)
+		}
 	}

 	node.Content = make([]backend.ID, len(blobs))
 	for i, blob := range blobs {
 		node.Content[i] = blob.ID
-		arch.smap.Insert(blob)
+		arch.bl.Insert(blob)
 	}

-	return blobs, err
+	return nil
 }

-func (arch *Archiver) ImportDir(dir string) (Tree, error) {
+func (arch *Archiver) loadTree(dir string) (*Tree, error) {
+	// open and list path
 	fd, err := os.Open(dir)
 	defer fd.Close()
 	if err != nil {
-		return nil, arch.Error(dir, nil, err)
+		return nil, err
 	}

 	entries, err := fd.Readdir(-1)
 	if err != nil {
-		return nil, arch.Error(dir, nil, err)
-	}
-
-	if len(entries) == 0 {
-		return nil, nil
+		return nil, err
 	}

 	tree := Tree{}
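SaveFile's chunked branch is a bounded fan-out/fan-in: blobToken caps how many uploads run at once, and because every chunk gets its own one-element result channel, reading those channels in creation order reassembles the blobs in chunk order regardless of completion order. The same shape reduced to a sketch (parallelMap and the integer payload are hypothetical, not part of the patch):

```go
package main

import "fmt"

// parallelMap applies f to every input with at most maxParallel goroutines
// in flight, returning results in input order.
func parallelMap(inputs []int, maxParallel int, f func(int) int) []int {
	token := make(chan struct{}, maxParallel)
	for i := 0; i < maxParallel; i++ {
		token <- struct{}{}
	}

	chans := make([]<-chan int, 0, len(inputs))
	for _, in := range inputs {
		<-token                 // acquire a slot
		ch := make(chan int, 1) // buffered: the sender never blocks
		go func(in int, ch chan<- int) {
			res := f(in)
			token <- struct{}{} // free the slot before publishing
			ch <- res
		}(in, ch)
		chans = append(chans, ch)
	}

	// fan-in: reading in creation order preserves input order
	out := make([]int, 0, len(inputs))
	for _, ch := range chans {
		out = append(out, <-ch)
	}
	return out
}

func main() {
	fmt.Println(parallelMap([]int{1, 2, 3, 4, 5}, 2, func(x int) int { return x * x }))
}
```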
@@ -107,71 +215,110 @@ func (arch *Archiver) ImportDir(dir string) (Tree, error) {

 		node, err := NodeFromFileInfo(path, entry)
 		if err != nil {
-			return nil, arch.Error(dir, entry, err)
+			// TODO: error processing
+			return nil, err
 		}

 		tree = append(tree, node)

 		if entry.IsDir() {
-			subtree, err := arch.ImportDir(path)
+			node.Tree, err = arch.loadTree(path)
 			if err != nil {
 				return nil, err
 			}
-
-			blob, err := arch.SaveJSON(backend.Tree, subtree)
-			if err != nil {
-				return nil, err
-			}
-
-			node.Subtree = blob.ID
-
-			continue
 		}

-		if node.Type == "file" {
-			_, err := arch.SaveFile(node)
-			if err != nil {
-				return nil, arch.Error(path, entry, err)
-			}
+		switch node.Type {
+		case "file":
+			arch.Stats.Files++
+			arch.Stats.Bytes += node.Size
+		case "dir":
+			arch.Stats.Directories++
+		default:
+			arch.Stats.Other++
 		}
 	}

-	return tree, nil
+	arch.ScannerUpdate(arch.Stats)
+
+	return &tree, nil
 }

-func (arch *Archiver) Import(dir string) (*Snapshot, *Blob, error) {
+func (arch *Archiver) LoadTree(path string) (*Tree, error) {
+	fi, err := os.Lstat(path)
+	if err != nil {
+		return nil, err
+	}
+
+	node, err := NodeFromFileInfo(path, fi)
+	if err != nil {
+		return nil, err
+	}
+
+	if node.Type != "dir" {
+		arch.Stats.Files = 1
+		arch.Stats.Bytes = node.Size
+		arch.ScannerUpdate(arch.Stats)
+		return &Tree{node}, nil
+	}
+
+	arch.Stats.Directories = 1
+	node.Tree, err = arch.loadTree(path)
+	if err != nil {
+		return nil, err
+	}
+
+	arch.ScannerUpdate(arch.Stats)
+
+	return &Tree{node}, nil
+}
+
+func (arch *Archiver) saveTree(t *Tree) (Blob, error) {
+	var wg sync.WaitGroup
+
+	for _, node := range *t {
+		if node.Tree != nil && node.Subtree == nil {
+			b, err := arch.saveTree(node.Tree)
+			if err != nil {
+				return Blob{}, err
+			}
+
+			node.Subtree = b.ID
+			arch.saveUpdate(Stats{Directories: 1})
+		} else if node.Type == "file" && len(node.Content) == 0 {
+			// start goroutine
+			wg.Add(1)
+			go func(n *Node) {
+				defer wg.Done()
+
+				// get token
+				token := <-arch.fileToken
+				defer func() {
+					arch.fileToken <- token
+				}()
+
+				// TODO: handle error
+				arch.SaveFile(n)
+				arch.saveUpdate(Stats{Files: 1})
+			}(node)
+		} else {
+			arch.saveUpdate(Stats{Other: 1})
+		}
+	}
+
+	wg.Wait()
+
+	blob, err := arch.SaveJSON(backend.Tree, t)
+	if err != nil {
+		return Blob{}, err
+	}
+
+	return blob, nil
+}
+
+func (arch *Archiver) Snapshot(dir string, t *Tree) (*Snapshot, backend.ID, error) {
 	sn := NewSnapshot(dir)

-	fi, err := os.Lstat(dir)
-	if err != nil {
-		return nil, nil, err
-	}
-
-	node, err := NodeFromFileInfo(dir, fi)
-	if err != nil {
-		return nil, nil, err
-	}
-
-	if node.Type == "dir" {
-		tree, err := arch.ImportDir(dir)
-		if err != nil {
-			return nil, nil, err
-		}
-
-		blob, err := arch.SaveJSON(backend.Tree, tree)
-		if err != nil {
-			return nil, nil, err
-		}
-
-		node.Subtree = blob.ID
-	} else if node.Type == "file" {
-		_, err := arch.SaveFile(node)
-		if err != nil {
-			return nil, nil, err
-		}
-	}
-
-	blob, err := arch.SaveJSON(backend.Tree, &Tree{node})
+	blob, err := arch.saveTree(t)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -179,11 +326,11 @@ func (arch *Archiver) Import(dir string) (*Snapshot, *Blob, error) {
 	sn.Content = blob.ID

 	// save snapshot
-	sn.StorageMap = arch.smap
+	sn.BlobList = arch.bl

 	blob, err = arch.SaveJSON(backend.Snapshot, sn)
 	if err != nil {
 		return nil, nil, err
 	}

-	return sn, blob, nil
+	return sn, blob.Storage, nil
 }
diff --git a/archiver_test.go b/archiver_test.go
new file mode 100644
index 000000000..529b2bb5f
--- /dev/null
+++ b/archiver_test.go
@@ -0,0 +1,53 @@
+package khepri_test
+
+import (
+	"bytes"
+	"io"
+	"math/rand"
+	"testing"
+
+	"github.com/fd0/khepri/chunker"
+)
+
+func get_random(seed, count int) []byte {
+	buf := make([]byte, count)
+
+	rnd := rand.New(rand.NewSource(int64(seed)))
+	for i := 0; i < count; i += 4 {
+		r := rnd.Uint32()
+		buf[i] = byte(r)
+		buf[i+1] = byte(r >> 8)
+		buf[i+2] = byte(r >> 16)
+		buf[i+3] = byte(r >> 24)
+	}
+
+	return buf
+}
+
+func BenchmarkChunkEncrypt(b *testing.B) {
+	data := get_random(23, 10<<20) // 10MiB
+
+	be := setupBackend(b)
+	defer teardownBackend(b, be)
+	key := setupKey(b, be, "geheim")
+
+	b.ResetTimer()
+	b.SetBytes(int64(len(data)))
+
+	for i := 0; i < b.N; i++ {
+		ch := chunker.New(bytes.NewReader(data))
+
+		for {
+			chunk_data, err := ch.Next()
+
+			if err == io.EOF {
+				break
+			}
+
+			ok(b, err)
+
+			_, err = key.Encrypt(chunk_data.Data)
+			ok(b, err)
+		}
+	}
+}
diff --git a/backend/id.go b/backend/id.go
index ef9067040..0c15cad26 100644
--- a/backend/id.go
+++ b/backend/id.go
@@ -47,6 +47,13 @@ func (id ID) EqualString(other string) (bool, error) {
 	return id.Equal(ID(s)), nil
 }

+// Compare compares this ID to another one, returning -1, 0, or 1. Note the
+// inverted argument order: the result is bytes.Compare(other, id), which is
+// the ordering BlobList's sort.Search predicate relies on.
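The inverted Compare is easy to misread, so the interplay with sort.Search deserves a note: since id.Compare(other) evaluates bytes.Compare(other, id), the predicate x.Compare(list[i]) >= 0 is equivalent to list[i] >= x, which on an ascending list is exactly the condition sort.Search needs to locate the insertion point. A toy demonstration with a local ID type (stand-alone, not library code):

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

type ID []byte

// Compare with inverted arguments, as in backend/id.go above.
func (id ID) Compare(other ID) int { return bytes.Compare(other, id) }

func main() {
	list := []ID{{1}, {3}, {5}} // kept in ascending order
	x := ID{4}

	pos := sort.Search(len(list), func(i int) bool {
		return x.Compare(list[i]) >= 0 // true iff list[i] >= x
	})

	fmt.Println(pos) // 2: {4} belongs just before {5}
}
```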
+func (id ID) Compare(other ID) int {
+	return bytes.Compare(other, id)
+}
+
 func (id ID) MarshalJSON() ([]byte, error) {
 	return json.Marshal(id.String())
 }
diff --git a/bloblist.go b/bloblist.go
new file mode 100644
index 000000000..48820e95f
--- /dev/null
+++ b/bloblist.go
@@ -0,0 +1,100 @@
+package khepri
+
+import (
+	"bytes"
+	"encoding/json"
+	"errors"
+	"sort"
+	"sync"
+)
+
+type BlobList struct {
+	list []Blob
+	m    sync.Mutex
+}
+
+var ErrBlobNotFound = errors.New("Blob not found")
+
+func NewBlobList() *BlobList {
+	return &BlobList{
+		list: []Blob{},
+	}
+}
+
+func (bl *BlobList) find(blob Blob) (int, Blob, error) {
+	pos := sort.Search(len(bl.list), func(i int) bool {
+		return blob.ID.Compare(bl.list[i].ID) >= 0
+	})
+
+	if pos < len(bl.list) && blob.ID.Compare(bl.list[pos].ID) == 0 {
+		return pos, bl.list[pos], nil
+	}
+
+	return pos, Blob{}, ErrBlobNotFound
+}
+
+func (bl *BlobList) Find(blob Blob) (Blob, error) {
+	bl.m.Lock()
+	defer bl.m.Unlock()
+
+	_, blob, err := bl.find(blob)
+	return blob, err
+}
+
+func (bl *BlobList) Merge(other *BlobList) {
+	bl.m.Lock()
+	defer bl.m.Unlock()
+	other.m.Lock()
+	defer other.m.Unlock()
+
+	for _, blob := range other.list {
+		bl.insert(blob)
+	}
+}
+
+func (bl *BlobList) insert(blob Blob) {
+	pos, _, err := bl.find(blob)
+	if err == nil {
+		// already present
+		return
+	}
+
+	// insert blob
+	// https://code.google.com/p/go-wiki/wiki/SliceTricks
+	bl.list = append(bl.list, Blob{})
+	copy(bl.list[pos+1:], bl.list[pos:])
+	bl.list[pos] = blob
+}
+
+func (bl *BlobList) Insert(blob Blob) {
+	bl.m.Lock()
+	defer bl.m.Unlock()
+
+	bl.insert(blob)
+}
+
+func (bl BlobList) MarshalJSON() ([]byte, error) {
+	return json.Marshal(bl.list)
+}
+
+func (bl *BlobList) UnmarshalJSON(data []byte) error {
+	return json.Unmarshal(data, &bl.list)
+}
+
+// Compare compares two blobs, ordering by ID and then by size. It returns
+// -1, 0, or 1. Unlike ID.Compare, the arguments are compared in natural
+// order.
+func (blob Blob) Compare(other Blob) int {
+	if res := bytes.Compare(blob.ID, other.ID); res != 0 {
+		return res
+	}
+
+	if blob.Size < other.Size {
+		return -1
+	}
+	if blob.Size > other.Size {
+		return 1
+	}
+
+	return 0
+}
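A usage sketch of the BlobList API introduced above (it assumes the khepri package exactly as defined in this diff; the short IDs are toy values, real IDs are backend hashes):

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/fd0/khepri"
)

func main() {
	bl := khepri.NewBlobList()
	bl.Insert(khepri.Blob{ID: []byte{1, 2}, Size: 42})

	// Find matches on the ID only and returns the stored blob.
	if b, err := bl.Find(khepri.Blob{ID: []byte{1, 2}}); err == nil {
		fmt.Println("found blob of size", b.Size)
	}

	other := khepri.NewBlobList()
	other.Insert(khepri.Blob{ID: []byte{3, 4}, Size: 7})
	bl.Merge(other) // bl now contains both blobs

	buf, err := json.Marshal(bl) // serialises as a plain JSON array
	if err != nil {
		panic(err)
	}
	fmt.Println(string(buf))
}
```

One thing to watch with this API: Merge locks the receiver and then the argument, so two goroutines running a.Merge(b) and b.Merge(a) concurrently can deadlock; callers need to agree on a lock order.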
diff --git a/bloblist_test.go b/bloblist_test.go
new file mode 100644
index 000000000..3add85cf9
--- /dev/null
+++ b/bloblist_test.go
@@ -0,0 +1,140 @@
+package khepri_test
+
+import (
+	"crypto/rand"
+	"encoding/json"
+	"flag"
+	"io"
+	mrand "math/rand"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/fd0/khepri"
+)
+
+const backendIDSize = 8
+
+var maxWorkers = flag.Uint("workers", 100, "number of workers to test BlobList concurrent access against")
+
+func randomID() []byte {
+	buf := make([]byte, backendIDSize)
+	_, err := io.ReadFull(rand.Reader, buf)
+	if err != nil {
+		panic(err)
+	}
+	return buf
+}
+
+func newBlob() khepri.Blob {
+	return khepri.Blob{ID: randomID(), Size: uint64(mrand.Uint32())}
+}
+
+// Test basic functionality
+func TestBlobList(t *testing.T) {
+	bl := khepri.NewBlobList()
+
+	b := newBlob()
+	bl.Insert(b)
+
+	for i := 0; i < 1000; i++ {
+		bl.Insert(newBlob())
+	}
+
+	b2, err := bl.Find(khepri.Blob{ID: b.ID})
+	ok(t, err)
+	assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2)
+
+	bl2 := khepri.NewBlobList()
+	for i := 0; i < 1000; i++ {
+		bl2.Insert(newBlob())
+	}
+
+	b2, err = bl2.Find(b)
+	assert(t, err != nil, "found ID in khepri that was never inserted: %v", b2)
+
+	bl2.Merge(bl)
+
+	b2, err = bl2.Find(b)
+
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if b.Compare(b2) != 0 {
+		t.Fatalf("items are not equal: want %v, got %v", b, b2)
+	}
+}
+
+// Test JSON encode/decode
+func TestBlobListJSON(t *testing.T) {
+	bl := khepri.NewBlobList()
+	b := khepri.Blob{ID: []byte{1, 2, 3, 4}}
+	bl.Insert(b)
+
+	b2, err := bl.Find(b)
+	ok(t, err)
+	assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2)
+
+	buf, err := json.Marshal(bl)
+	ok(t, err)
+
+	bl2 := khepri.BlobList{}
+	err = json.Unmarshal(buf, &bl2)
+	ok(t, err)
+
+	b2, err = bl2.Find(b)
+	ok(t, err)
+	assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2)
+
+	buf, err = json.Marshal(bl2)
+	ok(t, err)
+}
+
+// random insert/find access by several goroutines
+func TestBlobListRandom(t *testing.T) {
+	var wg sync.WaitGroup
+
+	worker := func(bl *khepri.BlobList) {
+		defer wg.Done()
+
+		b := newBlob()
+		bl.Insert(b)
+
+		for i := 0; i < 200; i++ {
+			bl.Insert(newBlob())
+		}
+
+		d := time.Duration(mrand.Intn(10)*100) * time.Millisecond
+		time.Sleep(d)
+
+		for i := 0; i < 100; i++ {
+			b2, err := bl.Find(b)
+			if err != nil {
+				t.Error(err)
+				return
+			}
+
+			if b.Compare(b2) != 0 {
+				t.Errorf("items are not equal: want %v, got %v", b, b2)
+				return
+			}
+		}
+
+		bl2 := khepri.NewBlobList()
+		for i := 0; i < 200; i++ {
+			bl2.Insert(newBlob())
+		}
+
+		bl2.Merge(bl)
+	}
+
+	bl := khepri.NewBlobList()
+
+	for i := 0; uint(i) < *maxWorkers; i++ {
+		wg.Add(1)
+		go worker(bl)
+	}
+
+	wg.Wait()
+}
diff --git a/chunker/chunker.go b/chunker/chunker.go
index c317672c0..2cd5a4e34 100644
--- a/chunker/chunker.go
+++ b/chunker/chunker.go
@@ -218,8 +218,6 @@ func (c *chunker) Next() (*Chunk, error) {
 		c.pos += steps
 		c.bpos = c.bmax
 	}
-
-	return nil, nil
 }

 func (c *chunker) append(b byte) {
diff --git a/cmd/khepri/.gitignore b/cmd/khepri/.gitignore
new file mode 100644
index 000000000..aee2e4ce1
--- /dev/null
+++ b/cmd/khepri/.gitignore
@@ -0,0 +1 @@
+config.mk
diff --git a/cmd/khepri/Makefile b/cmd/khepri/Makefile
index dab9cbb16..66819e525 100644
--- a/cmd/khepri/Makefile
+++ b/cmd/khepri/Makefile
@@ -6,12 +6,15 @@ TAGS =

 .PHONY: all clean debug

+# include config file if it exists
+-include $(CURDIR)/config.mk
+
 all: khepri

-khepri: *.go
+khepri: *.go $(wildcard ../../*.go) $(wildcard ../../*/*.go)
 	go build $(TAGS) -ldflags "$(LDFLAGS)"

-debug: TAGS=-tags debug
+debug: TAGS=-tags debug_cmd
 debug: khepri

 clean:
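One subtlety in the concurrent test above: the testing package requires Fatal and FailNow to be called from the goroutine running the test function, so the workers report failures with t.Error/t.Errorf and return instead. The pattern in isolation (doWork is a hypothetical stand-in for the shared-structure access):

```go
package main

import (
	"errors"
	"sync"
	"testing"
)

func doWork() error { return errors.New("boom") } // hypothetical workload

func TestWorkers(t *testing.T) {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := doWork(); err != nil {
				// t.Fatal would stop the wrong goroutine; Error marks the
				// test failed and lets this worker unwind normally.
				t.Error(err)
				return
			}
		}()
	}
	wg.Wait()
}
```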
diff --git a/cmd/khepri/cmd_backup.go b/cmd/khepri/cmd_backup.go
index 6fdc6c761..0e3972186 100644
--- a/cmd/khepri/cmd_backup.go
+++ b/cmd/khepri/cmd_backup.go
@@ -4,11 +4,42 @@ import (
 	"errors"
 	"fmt"
 	"os"
+	"strings"
+	"time"

 	"github.com/fd0/khepri"
 	"github.com/fd0/khepri/backend"
+	"golang.org/x/crypto/ssh/terminal"
 )

+func format_bytes(c uint64) string {
+	b := float64(c)
+
+	switch {
+	case c >= 1<<40:
+		return fmt.Sprintf("%.3f TiB", b/(1<<40))
+	case c >= 1<<30:
+		return fmt.Sprintf("%.3f GiB", b/(1<<30))
+	case c >= 1<<20:
+		return fmt.Sprintf("%.3f MiB", b/(1<<20))
+	case c >= 1<<10:
+		return fmt.Sprintf("%.3f KiB", b/(1<<10))
+	default:
+		return fmt.Sprintf("%d B", c)
+	}
+}
+
+func print_tree2(indent int, t *khepri.Tree) {
+	for _, node := range *t {
+		if node.Tree != nil {
+			fmt.Printf("%s%s/\n", strings.Repeat("  ", indent), node.Name)
+			print_tree2(indent+1, node.Tree)
+		} else {
+			fmt.Printf("%s%s\n", strings.Repeat("  ", indent), node.Name)
+		}
+	}
+}
+
 func commandBackup(be backend.Server, key *khepri.Key, args []string) error {
 	if len(args) != 1 {
 		return errors.New("usage: backup [dir|file]")
@@ -25,12 +56,53 @@ func commandBackup(be backend.Server, key *khepri.Key, args []string) error {
 		return err
 	}

-	_, blob, err := arch.Import(target)
+	fmt.Printf("scanning %s\n", target)
+
+	if terminal.IsTerminal(int(os.Stdout.Fd())) {
+		arch.ScannerUpdate = func(stats khepri.Stats) {
+			fmt.Printf("\r%6d directories, %6d files, %14s", stats.Directories, stats.Files, format_bytes(stats.Bytes))
+		}
+	}
+
+	// TODO: add filter
+	// arch.Filter = func(dir string, fi os.FileInfo) bool {
+	// 	return true
+	// }
+
+	t, err := arch.LoadTree(target)
 	if err != nil {
+		fmt.Fprintf(os.Stderr, "error: %v\n", err)
 		return err
 	}

-	fmt.Printf("snapshot %s saved\n", blob.Storage)
+	fmt.Printf("\r%6d directories, %6d files, %14s\n", arch.Stats.Directories, arch.Stats.Files, format_bytes(arch.Stats.Bytes))
+
+	stats := khepri.Stats{}
+	if terminal.IsTerminal(int(os.Stdout.Fd())) {
+		arch.SaveUpdate = func(s khepri.Stats) {
+			stats.Files += s.Files
+			stats.Directories += s.Directories
+			stats.Other += s.Other
+			stats.Bytes += s.Bytes
+
+			fmt.Printf("\r%3.2f%%  %d/%d directories, %d/%d files, %s/%s",
+				float64(stats.Bytes)/float64(arch.Stats.Bytes)*100,
+				stats.Directories, arch.Stats.Directories,
+				stats.Files, arch.Stats.Files,
+				format_bytes(stats.Bytes), format_bytes(arch.Stats.Bytes))
+		}
+	}
+
+	start := time.Now()
+	sn, id, err := arch.Snapshot(target, t)
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "error: %v\n", err)
+		return err
+	}
+
+	fmt.Printf("\nsnapshot %s saved: %v\n", id, sn)
+	duration := time.Since(start)
+	fmt.Printf("duration: %s, %.2fMiB/s\n", duration, float64(arch.Stats.Bytes)/(1<<20)/duration.Seconds())

 	return nil
 }
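The throughput line above divides by duration.Seconds() rather than the integer expression duration/time.Second, because time.Duration is an integer nanosecond count: integer division truncates whole seconds away and is zero for sub-second runs. A small self-contained illustration (the byte count is made up):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const bytes = 300 << 20 // pretend 300 MiB were written
	d := 1500 * time.Millisecond

	// integer division truncates 1.5s down to 1s: the rate is off by 50%
	fmt.Printf("wrong: %.2f MiB/s\n", float64(bytes)/float64(d/time.Second)/(1<<20))

	// Seconds() keeps the fractional part
	fmt.Printf("right: %.2f MiB/s\n", float64(bytes)/(1<<20)/d.Seconds())
}
```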
diff --git a/cmd/khepri/debug.go b/cmd/khepri/debug.go
index 687f0f162..dc978a51a 100644
--- a/cmd/khepri/debug.go
+++ b/cmd/khepri/debug.go
@@ -1,4 +1,4 @@
-// +build debug
+// +build debug_cmd

 package main

@@ -17,8 +17,8 @@ func initDebugLogger() *log.Logger {
 	// create new log file
 	filename := fmt.Sprintf("khepri-debug-%d-%s",
 		os.Getpid(), time.Now().Format("20060201-150405"))
-	f, err := os.OpenFile(filepath.Join(os.TempDir(), filename),
-		os.O_WRONLY|os.O_CREATE, 0600)
+	path := filepath.Join(os.TempDir(), filename)
+	f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0600)
 	if err != nil {
 		fmt.Fprintf(os.Stderr, "unable to create debug log file: %v", err)
 		os.Exit(2)
@@ -26,7 +26,7 @@ func initDebugLogger() *log.Logger {
 	// open logger
 	l := log.New(io.MultiWriter(os.Stderr, f), "DEBUG: ", log.LstdFlags)

-	fmt.Fprintf(os.Stderr, "logging activated, writing log file %s", filename)
+	fmt.Fprintf(os.Stderr, "debug log for khepri command activated, writing log file %s\n", path)

 	l.Printf("khepri %s", version)

 	return l
diff --git a/cmd/khepri/debug_release.go b/cmd/khepri/debug_release.go
index 77f5c1b76..ef42f0638 100644
--- a/cmd/khepri/debug_release.go
+++ b/cmd/khepri/debug_release.go
@@ -1,4 +1,4 @@
-// +build !debug
+// +build !debug_cmd

 package main

diff --git a/cmd/khepri/main.go b/cmd/khepri/main.go
index 719a84ee8..cd1c0a777 100644
--- a/cmd/khepri/main.go
+++ b/cmd/khepri/main.go
@@ -5,6 +5,7 @@ import (
 	"log"
 	"net/url"
 	"os"
+	"runtime"
 	"sort"
 	"strings"

@@ -128,6 +129,9 @@ func init() {
 	commands["snapshots"] = commandSnapshots
 	commands["cat"] = commandCat
 	commands["ls"] = commandLs
+
+	// set GOMAXPROCS to number of CPUs
+	runtime.GOMAXPROCS(runtime.NumCPU())
 }

 func main() {
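The tag rename from debug to debug_cmd keeps the command's logging switch separate from the library's debug tag introduced later in this patch. The underlying idiom is the two-file build-tag pattern: one file carries the real implementation behind the tag, a twin carries a no-op behind its negation, so exactly one definition is compiled either way. (One latent issue the patch leaves untouched in a context line: the time layout "20060201-150405" looks like a typo for Go's reference layout "20060102-150405".) A minimal sketch of the pattern, with an illustrative function name:

```go
// File debug.go, compiled only via: go build -tags debug_cmd

// +build debug_cmd

package main

import "log"

func debug(format string, args ...interface{}) {
	log.Printf("DEBUG: "+format, args...)
}
```

And its no-op twin for ordinary builds:

```go
// File debug_release.go

// +build !debug_cmd

package main

func debug(format string, args ...interface{}) {}
```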
diff --git a/contenthandler.go b/contenthandler.go
index 271df9b76..625261993 100644
--- a/contenthandler.go
+++ b/contenthandler.go
@@ -3,27 +3,24 @@ package khepri
 import (
 	"encoding/json"
 	"errors"
-	"io"
-	"io/ioutil"
-	"os"
+	"fmt"

 	"github.com/fd0/khepri/backend"
-	"github.com/fd0/khepri/chunker"
 )

 type ContentHandler struct {
 	be  backend.Server
 	key *Key

-	content *StorageMap
+	bl *BlobList
 }

 // NewContentHandler creates a new content handler.
 func NewContentHandler(be backend.Server, key *Key) (*ContentHandler, error) {
 	ch := &ContentHandler{
-		be:      be,
-		key:     key,
-		content: NewStorageMap(),
+		be:  be,
+		key: key,
+		bl:  NewBlobList(),
 	}

 	return ch, nil
@@ -36,7 +33,8 @@ func (ch *ContentHandler) LoadSnapshot(id backend.ID) (*Snapshot, error) {
 		return nil, err
 	}

-	ch.content.Merge(sn.StorageMap)
+	ch.bl.Merge(sn.BlobList)
+
 	return sn, nil
 }

@@ -49,7 +47,8 @@ func (ch *ContentHandler) LoadAllSnapshots() error {
 		if err != nil {
 			return
 		}
-		ch.content.Merge(sn.StorageMap)
+
+		ch.bl.Merge(sn.BlobList)
 	})
 	if err != nil {
 		return err
@@ -60,18 +59,18 @@ func (ch *ContentHandler) LoadAllSnapshots() error {

 // Save encrypts data and stores it to the backend as type t. If the data was
 // already saved before, the blob is returned.
-func (ch *ContentHandler) Save(t backend.Type, data []byte) (*Blob, error) {
+func (ch *ContentHandler) Save(t backend.Type, data []byte) (Blob, error) {
 	// compute plaintext hash
 	id := backend.Hash(data)

 	// test if the hash is already in the backend
-	blob := ch.content.Find(id)
-	if blob != nil {
+	blob, err := ch.bl.Find(Blob{ID: id})
+	if err == nil {
 		return blob, nil
 	}

 	// else create a new blob
-	blob = &Blob{
+	blob = Blob{
 		ID:   id,
 		Size: uint64(len(data)),
 	}
@@ -79,85 +78,36 @@
 	// encrypt blob
 	ciphertext, err := ch.key.Encrypt(data)
 	if err != nil {
-		return nil, err
+		return Blob{}, err
 	}

 	// save blob
 	sid, err := ch.be.Create(t, ciphertext)
 	if err != nil {
-		return nil, err
+		return Blob{}, err
 	}

 	blob.Storage = sid
 	blob.StorageSize = uint64(len(ciphertext))

 	// insert blob into the storage map
-	ch.content.Insert(blob)
+	ch.bl.Insert(blob)

 	return blob, nil
 }

 // SaveJSON serialises item as JSON and uses Save() to store it to the backend as type t.
-func (ch *ContentHandler) SaveJSON(t backend.Type, item interface{}) (*Blob, error) {
+func (ch *ContentHandler) SaveJSON(t backend.Type, item interface{}) (Blob, error) {
 	// convert to json
 	data, err := json.Marshal(item)
 	if err != nil {
-		return nil, err
+		return Blob{}, err
 	}

 	// compress and save data
 	return ch.Save(t, backend.Compress(data))
 }

-// SaveFile stores the content of the file on the backend as a Blob by calling
-// Save for each chunk.
-func (ch *ContentHandler) SaveFile(filename string, size uint) (Blobs, error) {
-	file, err := os.Open(filename)
-	defer file.Close()
-	if err != nil {
-		return nil, err
-	}
-
-	// if the file is small enough, store it directly
-	if size < chunker.MinSize {
-		buf, err := ioutil.ReadAll(file)
-		if err != nil {
-			return nil, err
-		}
-
-		blob, err := ch.Save(backend.Data, buf)
-		if err != nil {
-			return nil, err
-		}
-
-		return Blobs{blob}, nil
-	}
-
-	// else store all chunks
-	blobs := Blobs{}
-	chunker := chunker.New(file)
-
-	for {
-		chunk, err := chunker.Next()
-		if err == io.EOF {
-			break
-		}
-
-		if err != nil {
-			return nil, err
-		}
-
-		blob, err := ch.Save(backend.Data, chunk.Data)
-		if err != nil {
-			return nil, err
-		}
-
-		blobs = append(blobs, blob)
-	}
-
-	return blobs, nil
-}
-
 // Load tries to load and decrypt content identified by t and id from the backend.
 func (ch *ContentHandler) Load(t backend.Type, id backend.ID) ([]byte, error) {
 	if t == backend.Snapshot {
@@ -177,9 +127,9 @@ func (ch *ContentHandler) Load(t backend.Type, id backend.ID) ([]byte, error) {
 	}

 	// lookup storage hash
-	blob := ch.content.Find(id)
-	if blob == nil {
-		return nil, errors.New("Storage ID not found")
+	blob, err := ch.bl.Find(Blob{ID: id})
+	if err != nil {
+		return nil, fmt.Errorf("Storage ID %s not found", id)
 	}

 	// load data
diff --git a/debug.go b/debug.go
new file mode 100644
index 000000000..80952b3e7
--- /dev/null
+++ b/debug.go
@@ -0,0 +1,36 @@
+// +build debug
+
+package khepri
+
+import (
+	"fmt"
+	"io"
+	"log"
+	"os"
+	"path/filepath"
+	"time"
+)
+
+var debugLogger = initDebugLogger()
+
+func initDebugLogger() *log.Logger {
+	// create new log file
+	filename := fmt.Sprintf("khepri-lib-debug-%d-%s",
+		os.Getpid(), time.Now().Format("20060102-150405"))
+	path := filepath.Join(os.TempDir(), filename)
+	f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0600)
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "unable to create debug log file: %v", err)
+		os.Exit(2)
+	}
+
+	// open logger
+	l := log.New(io.MultiWriter(os.Stderr, f), "DEBUG: ", log.LstdFlags)
+	fmt.Fprintf(os.Stderr, "debug log for khepri library activated, writing log file %s\n", path)
+
+	return l
+}
+
+func debug(format string, args ...interface{}) {
+	debugLogger.Printf(format, args...)
+}
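ContentHandler.Save above is the content-addressed core of the store: hash the plaintext, return early when that hash is already known, otherwise encrypt, upload, and record the plaintext-hash to storage-ID mapping. A toy model of just that control flow (a map stands in for the encrypted backend, sha256 for backend.Hash; nothing here is the real API):

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

type store struct {
	blobs map[[32]byte][]byte // plaintext hash -> stored ciphertext
}

func (s *store) save(data []byte) [32]byte {
	id := sha256.Sum256(data)
	if _, ok := s.blobs[id]; ok {
		return id // already stored: deduplicated, nothing uploaded
	}
	ciphertext := append([]byte("enc:"), data...) // stand-in for key.Encrypt
	s.blobs[id] = ciphertext                      // stand-in for be.Create
	return id
}

func main() {
	s := &store{blobs: make(map[[32]byte][]byte)}
	a := s.save([]byte("hello"))
	b := s.save([]byte("hello")) // no-op: same plaintext, same ID
	fmt.Println(a == b, len(s.blobs)) // true 1
}
```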
diff --git a/debug_release.go b/debug_release.go
new file mode 100644
index 000000000..21f2b24a5
--- /dev/null
+++ b/debug_release.go
@@ -0,0 +1,5 @@
+// +build !debug
+
+package khepri
+
+func debug(format string, args ...interface{}) {}
diff --git a/restorer.go b/restorer.go
index 8e94f5411..c1717db3f 100644
--- a/restorer.go
+++ b/restorer.go
@@ -30,12 +30,12 @@ func NewRestorer(be backend.Server, key *Key, snid backend.ID) (*Restorer, error) {
 	var err error
 	r.ch, err = NewContentHandler(be, key)
 	if err != nil {
-		return nil, err
+		return nil, arrar.Annotate(err, "create contenthandler for restorer")
 	}

 	r.sn, err = r.ch.LoadSnapshot(snid)
 	if err != nil {
-		return nil, err
+		return nil, arrar.Annotate(err, "load snapshot for restorer")
 	}

 	// abort on all errors
diff --git a/snapshot.go b/snapshot.go
index fa7d10e1d..c0cc6c2bb 100644
--- a/snapshot.go
+++ b/snapshot.go
@@ -11,14 +11,14 @@ import (
 )

 type Snapshot struct {
-	Time       time.Time   `json:"time"`
-	Content    backend.ID  `json:"content"`
-	StorageMap *StorageMap `json:"map"`
-	Dir        string      `json:"dir"`
-	Hostname   string      `json:"hostname,omitempty"`
-	Username   string      `json:"username,omitempty"`
-	UID        string      `json:"uid,omitempty"`
-	GID        string      `json:"gid,omitempty"`
+	Time     time.Time  `json:"time"`
+	Content  backend.ID `json:"content"`
+	BlobList *BlobList  `json:"blobs"`
+	Dir      string     `json:"dir"`
+	Hostname string     `json:"hostname,omitempty"`
+	Username string     `json:"username,omitempty"`
+	UID      string     `json:"uid,omitempty"`
+	GID      string     `json:"gid,omitempty"`

 	id backend.ID // plaintext ID, used during restore
 }
diff --git a/storagemap.go b/storagemap.go
deleted file mode 100644
index db77952e2..000000000
--- a/storagemap.go
+++ /dev/null
@@ -1,51 +0,0 @@
-package khepri
-
-import (
-	"bytes"
-	"sort"
-
-	"github.com/fd0/khepri/backend"
-)
-
-type StorageMap Blobs
-
-func NewStorageMap() *StorageMap {
-	return &StorageMap{}
-}
-
-func (m StorageMap) find(id backend.ID) (int, *Blob) {
-	i := sort.Search(len(m), func(i int) bool {
-		return bytes.Compare(m[i].ID, id) >= 0
-	})
-
-	if i < len(m) && bytes.Equal(m[i].ID, id) {
-		return i, m[i]
-	}
-
-	return i, nil
-}
-
-func (m StorageMap) Find(id backend.ID) *Blob {
-	_, blob := m.find(id)
-	return blob
-}
-
-func (m *StorageMap) Insert(blob *Blob) {
-	pos, b := m.find(blob.ID)
-	if b != nil {
-		// already present
-		return
-	}
-
-	// insert blob
-	// https://code.google.com/p/go-wiki/wiki/SliceTricks
-	*m = append(*m, nil)
-	copy((*m)[pos+1:], (*m)[pos:])
-	(*m)[pos] = blob
-}
-
-func (m *StorageMap) Merge(sm *StorageMap) {
-	for _, blob := range *sm {
-		m.Insert(blob)
-	}
-}
diff --git a/tree.go b/tree.go
index ad5f64597..0aa41a44d 100644
--- a/tree.go
+++ b/tree.go
@@ -34,6 +34,8 @@ type Node struct {
 	Content []backend.ID `json:"content,omitempty"`
 	Subtree backend.ID   `json:"subtree,omitempty"`

+	Tree *Tree `json:"-"`
+
 	path string
 }

@@ -44,7 +46,7 @@ type Blob struct {
 	StorageSize uint64 `json:"ssize,omitempty"` // encrypted Size
 }

-type Blobs []*Blob
+type Blobs []Blob

 func (n Node) String() string {
 	switch n.Type {
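tree.go's new Node.Tree field carries the in-memory subtree and is tagged json:"-" so it never reaches the serialized tree; only the Subtree storage ID is persisted. A quick stand-alone illustration with a toy node type (not the one from tree.go):

```go
package main

import (
	"encoding/json"
	"fmt"
)

type node struct {
	Name    string  `json:"name"`
	Subtree string  `json:"subtree,omitempty"` // persisted reference
	Tree    []*node `json:"-"`                 // in-memory only, never encoded
}

func main() {
	n := node{Name: "dir", Subtree: "abc123", Tree: []*node{{Name: "child"}}}
	buf, _ := json.Marshal(n)
	fmt.Println(string(buf)) // {"name":"dir","subtree":"abc123"} with Tree omitted
}
```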