
Merge branch 'concurrent-backup'

Commit d6a202a853 by Alexander Neumann, 2014-11-22 22:19:58 +01:00
20 changed files with 678 additions and 219 deletions

View File

@@ -1,7 +1,6 @@
 .PHONY: clean all test
 
 test:
-    go test -race ./...
     for dir in cmd/* ; do \
        (cd "$$dir"; go build -race) \
     done

View File

@@ -1,32 +1,75 @@
 package khepri
 
 import (
+    "io"
+    "io/ioutil"
     "os"
     "path/filepath"
+    "sync"
 
     "github.com/fd0/khepri/backend"
+    "github.com/fd0/khepri/chunker"
+)
+
+const (
+    maxConcurrentFiles = 32
+    maxConcurrentBlobs = 32
 )
 
 type Archiver struct {
     be  backend.Server
     key *Key
     ch  *ContentHandler
 
-    smap *StorageMap // blobs used for the current snapshot
+    bl *BlobList // blobs used for the current snapshot
+
+    fileToken chan struct{}
+    blobToken chan struct{}
+
+    Stats Stats
 
     Error  func(dir string, fi os.FileInfo, err error) error
     Filter func(item string, fi os.FileInfo) bool
+
+    ScannerUpdate func(stats Stats)
+    SaveUpdate    func(stats Stats)
+    sum           sync.Mutex // for SaveUpdate
+}
+
+type Stats struct {
+    Files       int
+    Directories int
+    Other       int
+    Bytes       uint64
 }
 
 func NewArchiver(be backend.Server, key *Key) (*Archiver, error) {
     var err error
-    arch := &Archiver{be: be, key: key}
+    arch := &Archiver{
+        be:        be,
+        key:       key,
+        fileToken: make(chan struct{}, maxConcurrentFiles),
+        blobToken: make(chan struct{}, maxConcurrentBlobs),
+    }
+
+    // fill file and blob token
+    for i := 0; i < maxConcurrentFiles; i++ {
+        arch.fileToken <- struct{}{}
+    }
+
+    for i := 0; i < maxConcurrentBlobs; i++ {
+        arch.blobToken <- struct{}{}
+    }
 
     // abort on all errors
     arch.Error = func(string, os.FileInfo, error) error { return err }
     // allow all files
     arch.Filter = func(string, os.FileInfo) bool { return true }
+    // do nothing
+    arch.ScannerUpdate = func(Stats) {}
 
-    arch.smap = NewStorageMap()
+    arch.bl = NewBlobList()
 
     arch.ch, err = NewContentHandler(be, key)
     if err != nil {
        return nil, err
@@ -41,59 +84,124 @@ func NewArchiver(be backend.Server, key *Key) (*Archiver, error) {
     return arch, nil
 }
 
-func (arch *Archiver) Save(t backend.Type, data []byte) (*Blob, error) {
+func (arch *Archiver) saveUpdate(stats Stats) {
+    if arch.SaveUpdate != nil {
+        arch.sum.Lock()
+        defer arch.sum.Unlock()
+        arch.SaveUpdate(stats)
+    }
+}
+
+func (arch *Archiver) Save(t backend.Type, data []byte) (Blob, error) {
     blob, err := arch.ch.Save(t, data)
     if err != nil {
-        return nil, err
+        return Blob{}, err
     }
 
     // store blob in storage map for current snapshot
-    arch.smap.Insert(blob)
+    arch.bl.Insert(blob)
 
     return blob, nil
 }
 
-func (arch *Archiver) SaveJSON(t backend.Type, item interface{}) (*Blob, error) {
+func (arch *Archiver) SaveJSON(t backend.Type, item interface{}) (Blob, error) {
     blob, err := arch.ch.SaveJSON(t, item)
     if err != nil {
-        return nil, err
+        return Blob{}, err
     }
 
     // store blob in storage map for current snapshot
-    arch.smap.Insert(blob)
+    arch.bl.Insert(blob)
 
     return blob, nil
 }
 
-func (arch *Archiver) SaveFile(node *Node) (Blobs, error) {
-    blobs, err := arch.ch.SaveFile(node.path, uint(node.Size))
+// SaveFile stores the content of the file on the backend as a Blob by calling
+// Save for each chunk.
+func (arch *Archiver) SaveFile(node *Node) error {
+    file, err := os.Open(node.path)
+    defer file.Close()
     if err != nil {
-        return nil, arch.Error(node.path, nil, err)
+        return err
+    }
+
+    var blobs Blobs
+
+    // if the file is small enough, store it directly
+    if node.Size < chunker.MinSize {
+        buf, err := ioutil.ReadAll(file)
+        if err != nil {
+            return err
+        }
+
+        blob, err := arch.ch.Save(backend.Data, buf)
+        if err != nil {
+            return err
+        }
+
+        arch.saveUpdate(Stats{Bytes: blob.Size})
+
+        blobs = Blobs{blob}
+    } else {
+        // else store all chunks
+        chnker := chunker.New(file)
+        chans := [](<-chan Blob){}
+
+        for {
+            chunk, err := chnker.Next()
+            if err == io.EOF {
+                break
+            }
+
+            if err != nil {
+                return err
+            }
+
+            // acquire token, start goroutine to save chunk
+            token := <-arch.blobToken
+            resCh := make(chan Blob, 1)
+
+            go func(ch chan<- Blob) {
+                blob, err := arch.ch.Save(backend.Data, chunk.Data)
+                // TODO handle error
+                if err != nil {
+                    panic(err)
+                }
+
+                arch.saveUpdate(Stats{Bytes: blob.Size})
+
+                arch.blobToken <- token
+                ch <- blob
+            }(resCh)
+
+            chans = append(chans, resCh)
+        }
+
+        blobs = []Blob{}
+        for _, ch := range chans {
+            blobs = append(blobs, <-ch)
+        }
     }
 
     node.Content = make([]backend.ID, len(blobs))
     for i, blob := range blobs {
         node.Content[i] = blob.ID
-        arch.smap.Insert(blob)
+        arch.bl.Insert(blob)
     }
 
-    return blobs, err
+    return err
 }
 
-func (arch *Archiver) ImportDir(dir string) (Tree, error) {
+func (arch *Archiver) loadTree(dir string) (*Tree, error) {
+    // open and list path
     fd, err := os.Open(dir)
     defer fd.Close()
     if err != nil {
-        return nil, arch.Error(dir, nil, err)
+        return nil, err
     }
 
     entries, err := fd.Readdir(-1)
     if err != nil {
-        return nil, arch.Error(dir, nil, err)
+        return nil, err
+    }
+
+    if len(entries) == 0 {
+        return nil, nil
     }
 
     tree := Tree{}
@@ -107,71 +215,110 @@ func (arch *Archiver) ImportDir(dir string) (Tree, error) {
         node, err := NodeFromFileInfo(path, entry)
         if err != nil {
-            return nil, arch.Error(dir, entry, err)
+            // TODO: error processing
+            return nil, err
         }
 
         tree = append(tree, node)
 
         if entry.IsDir() {
-            subtree, err := arch.ImportDir(path)
+            node.Tree, err = arch.loadTree(path)
             if err != nil {
                 return nil, err
             }
-
-            blob, err := arch.SaveJSON(backend.Tree, subtree)
-            if err != nil {
-                return nil, err
-            }
-
-            node.Subtree = blob.ID
-
-            continue
         }
 
-        if node.Type == "file" {
-            _, err := arch.SaveFile(node)
-            if err != nil {
-                return nil, arch.Error(path, entry, err)
-            }
+        switch node.Type {
+        case "file":
+            arch.Stats.Files++
+            arch.Stats.Bytes += node.Size
+        case "dir":
+            arch.Stats.Directories++
+        default:
+            arch.Stats.Other++
         }
     }
 
-    return tree, nil
+    arch.ScannerUpdate(arch.Stats)
+    return &tree, nil
 }
 
-func (arch *Archiver) Import(dir string) (*Snapshot, *Blob, error) {
+func (arch *Archiver) LoadTree(path string) (*Tree, error) {
+    fi, err := os.Lstat(path)
+    if err != nil {
+        return nil, err
+    }
+
+    node, err := NodeFromFileInfo(path, fi)
+    if err != nil {
+        return nil, err
+    }
+
+    if node.Type != "dir" {
+        arch.Stats.Files = 1
+        arch.Stats.Bytes = node.Size
+        arch.ScannerUpdate(arch.Stats)
+        return &Tree{node}, nil
+    }
+
+    arch.Stats.Directories = 1
+
+    node.Tree, err = arch.loadTree(path)
+    if err != nil {
+        return nil, err
+    }
+
+    arch.ScannerUpdate(arch.Stats)
+
+    return &Tree{node}, nil
+}
+
+func (arch *Archiver) saveTree(t *Tree) (Blob, error) {
+    var wg sync.WaitGroup
+
+    for _, node := range *t {
+        if node.Tree != nil && node.Subtree == nil {
+            b, err := arch.saveTree(node.Tree)
+            if err != nil {
+                return Blob{}, err
+            }
+            node.Subtree = b.ID
+            arch.saveUpdate(Stats{Directories: 1})
+        } else if node.Type == "file" && len(node.Content) == 0 {
+            // start goroutine
+            wg.Add(1)
+            go func(n *Node) {
+                defer wg.Done()
+
+                // get token
+                token := <-arch.fileToken
+                defer func() {
+                    arch.fileToken <- token
+                }()
+
+                // TODO: handle error
+                arch.SaveFile(n)
+                arch.saveUpdate(Stats{Files: 1})
+            }(node)
+        } else {
+            arch.saveUpdate(Stats{Other: 1})
+        }
+    }
+
+    wg.Wait()
+
+    blob, err := arch.SaveJSON(backend.Tree, t)
+    if err != nil {
+        return Blob{}, err
+    }
+
+    return blob, nil
+}
+
+func (arch *Archiver) Snapshot(dir string, t *Tree) (*Snapshot, backend.ID, error) {
     sn := NewSnapshot(dir)
 
-    fi, err := os.Lstat(dir)
-    if err != nil {
-        return nil, nil, err
-    }
-
-    node, err := NodeFromFileInfo(dir, fi)
-    if err != nil {
-        return nil, nil, err
-    }
-
-    if node.Type == "dir" {
-        tree, err := arch.ImportDir(dir)
-        if err != nil {
-            return nil, nil, err
-        }
-
-        blob, err := arch.SaveJSON(backend.Tree, tree)
-        if err != nil {
-            return nil, nil, err
-        }
-
-        node.Subtree = blob.ID
-    } else if node.Type == "file" {
-        _, err := arch.SaveFile(node)
-        if err != nil {
-            return nil, nil, err
-        }
-    }
-
-    blob, err := arch.SaveJSON(backend.Tree, &Tree{node})
+    blob, err := arch.saveTree(t)
     if err != nil {
         return nil, nil, err
     }
@@ -179,11 +326,11 @@ func (arch *Archiver) Import(dir string) (*Snapshot, *Blob, error) {
     sn.Content = blob.ID
 
     // save snapshot
-    sn.StorageMap = arch.smap
+    sn.BlobList = arch.bl
 
     blob, err = arch.SaveJSON(backend.Snapshot, sn)
     if err != nil {
         return nil, nil, err
     }
 
-    return sn, blob, nil
+    return sn, blob.Storage, nil
 }
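
Note: the fileToken/blobToken channels above implement a counting semaphore: a buffered channel is pre-filled with N tokens, a worker takes one before it starts and puts it back when done, so at most N goroutines run concurrently. A minimal, self-contained sketch of the pattern (illustration only, not part of this commit; the name maxConcurrent is made up):

package main

import (
    "fmt"
    "sync"
)

const maxConcurrent = 4 // stands in for maxConcurrentFiles / maxConcurrentBlobs

func main() {
    token := make(chan struct{}, maxConcurrent)
    // pre-fill the channel with tokens, as NewArchiver does
    for i := 0; i < maxConcurrent; i++ {
        token <- struct{}{}
    }

    var wg sync.WaitGroup
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            t := <-token                  // acquire a token; blocks while none are free
            defer func() { token <- t }() // release it when done
            fmt.Println("processing item", n)
        }(i)
    }
    wg.Wait()
}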

archiver_test.go (new file, 53 lines)

@@ -0,0 +1,53 @@
package khepri_test

import (
    "bytes"
    "io"
    "math/rand"
    "testing"

    "github.com/fd0/khepri/chunker"
)

func get_random(seed, count int) []byte {
    buf := make([]byte, count)

    rnd := rand.New(rand.NewSource(23))
    for i := 0; i < count; i += 4 {
        r := rnd.Uint32()
        buf[i] = byte(r)
        buf[i+1] = byte(r >> 8)
        buf[i+2] = byte(r >> 16)
        buf[i+3] = byte(r >> 24)
    }

    return buf
}

func BenchmarkChunkEncrypt(b *testing.B) {
    data := get_random(23, 10<<20) // 10MiB

    be := setupBackend(b)
    defer teardownBackend(b, be)
    key := setupKey(b, be, "geheim")

    b.ResetTimer()
    b.SetBytes(int64(len(data)))

    for i := 0; i < b.N; i++ {
        ch := chunker.New(bytes.NewReader(data))

        for {
            chunk_data, err := ch.Next()

            if err == io.EOF {
                break
            }

            ok(b, err)

            _, err = key.Encrypt(chunk_data.Data)
            ok(b, err)
        }
    }
}

View File

@@ -47,6 +47,11 @@ func (id ID) EqualString(other string) (bool, error) {
     return id.Equal(ID(s)), nil
 }
 
+// Compare compares this ID to another one, returning -1, 0, or 1.
+func (id ID) Compare(other ID) int {
+    return bytes.Compare(other, id)
+}
+
 func (id ID) MarshalJSON() ([]byte, error) {
     return json.Marshal(id.String())
 }

bloblist.go (new file, 99 lines)

@@ -0,0 +1,99 @@
package khepri

import (
    "bytes"
    "encoding/json"
    "errors"
    "sort"
    "sync"
)

type BlobList struct {
    list []Blob
    m    sync.Mutex
}

var ErrBlobNotFound = errors.New("Blob not found")

func NewBlobList() *BlobList {
    return &BlobList{
        list: []Blob{},
    }
}

func (bl *BlobList) find(blob Blob) (int, Blob, error) {
    pos := sort.Search(len(bl.list), func(i int) bool {
        return blob.ID.Compare(bl.list[i].ID) >= 0
    })

    if pos < len(bl.list) && blob.ID.Compare(bl.list[pos].ID) == 0 {
        return pos, bl.list[pos], nil
    }

    return pos, Blob{}, ErrBlobNotFound
}

func (bl *BlobList) Find(blob Blob) (Blob, error) {
    bl.m.Lock()
    defer bl.m.Unlock()

    _, blob, err := bl.find(blob)
    return blob, err
}

func (bl *BlobList) Merge(other *BlobList) {
    bl.m.Lock()
    defer bl.m.Unlock()
    other.m.Lock()
    defer other.m.Unlock()

    for _, blob := range other.list {
        bl.insert(blob)
    }
}

func (bl *BlobList) insert(blob Blob) {
    pos, _, err := bl.find(blob)
    if err == nil {
        // already present
        return
    }

    // insert blob
    // https://code.google.com/p/go-wiki/wiki/SliceTricks
    bl.list = append(bl.list, Blob{})
    copy(bl.list[pos+1:], bl.list[pos:])
    bl.list[pos] = blob
}

func (bl *BlobList) Insert(blob Blob) {
    bl.m.Lock()
    defer bl.m.Unlock()

    bl.insert(blob)
}

func (bl BlobList) MarshalJSON() ([]byte, error) {
    return json.Marshal(bl.list)
}

func (bl *BlobList) UnmarshalJSON(data []byte) error {
    return json.Unmarshal(data, &bl.list)
}

// Compare compares two blobs by comparing the ID and the size. It returns -1,
// 0, or 1.
func (blob Blob) Compare(other Blob) int {
    if res := bytes.Compare(other.ID, blob.ID); res != 0 {
        return res
    }

    if blob.Size < other.Size {
        return -1
    }
    if blob.Size > other.Size {
        return 1
    }

    return 0
}
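
Note: BlobList keeps its slice sorted by ID so find can binary-search. ID.Compare(other) returns bytes.Compare(other, id) with the arguments swapped, so the predicate blob.ID.Compare(bl.list[i].ID) >= 0 reads as bl.list[i].ID >= blob.ID, which is exactly the ascending-order predicate sort.Search expects. A self-contained sketch of the same search-and-shift insertion over plain strings (illustration only, not from this commit):

package main

import (
    "fmt"
    "sort"
)

// insertSorted keeps list sorted and duplicate-free, mirroring
// BlobList.insert: binary-search the position, grow the slice by one,
// shift the tail right, write the element (the "SliceTricks" insertion).
func insertSorted(list []string, s string) []string {
    pos := sort.Search(len(list), func(i int) bool { return list[i] >= s })
    if pos < len(list) && list[pos] == s {
        return list // already present
    }
    list = append(list, "")        // grow by one element
    copy(list[pos+1:], list[pos:]) // shift the tail right
    list[pos] = s
    return list
}

func main() {
    l := []string{}
    for _, s := range []string{"pear", "apple", "plum", "apple"} {
        l = insertSorted(l, s)
    }
    fmt.Println(l) // [apple pear plum]
}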

bloblist_test.go (new file, 137 lines)

@@ -0,0 +1,137 @@
package khepri_test

import (
    "crypto/rand"
    "encoding/json"
    "flag"
    "io"
    mrand "math/rand"
    "sync"
    "testing"
    "time"

    "github.com/fd0/khepri"
)

const backendIDSize = 8

var maxWorkers = flag.Uint("workers", 100, "number of workers to test BlobList concurrent access against")

func randomID() []byte {
    buf := make([]byte, backendIDSize)

    _, err := io.ReadFull(rand.Reader, buf)
    if err != nil {
        panic(err)
    }

    return buf
}

func newBlob() khepri.Blob {
    return khepri.Blob{ID: randomID(), Size: uint64(mrand.Uint32())}
}

// Test basic functionality
func TestBlobList(t *testing.T) {
    bl := khepri.NewBlobList()

    b := newBlob()
    bl.Insert(b)

    for i := 0; i < 1000; i++ {
        bl.Insert(newBlob())
    }

    b2, err := bl.Find(khepri.Blob{ID: b.ID})
    ok(t, err)
    assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2)

    bl2 := khepri.NewBlobList()
    for i := 0; i < 1000; i++ {
        bl.Insert(newBlob())
    }

    b2, err = bl2.Find(b)
    assert(t, err != nil, "found ID in khepri that was never inserted: %v", b2)

    bl2.Merge(bl)

    b2, err = bl2.Find(b)
    if err != nil {
        t.Fatal(err)
    }

    if b.Compare(b2) != 0 {
        t.Fatalf("items are not equal: want %v, got %v", b, b2)
    }
}

// Test JSON encode/decode
func TestBlobListJSON(t *testing.T) {
    bl := khepri.NewBlobList()
    b := khepri.Blob{ID: []byte{1, 2, 3, 4}}
    bl.Insert(b)

    b2, err := bl.Find(b)
    ok(t, err)
    assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2)

    buf, err := json.Marshal(bl)
    ok(t, err)

    bl2 := khepri.BlobList{}
    json.Unmarshal(buf, &bl2)

    b2, err = bl2.Find(b)
    ok(t, err)
    assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2)

    buf, err = json.Marshal(bl2)
    ok(t, err)
}

// random insert/find access by several goroutines
func TestBlobListRandom(t *testing.T) {
    var wg sync.WaitGroup

    worker := func(bl *khepri.BlobList) {
        defer wg.Done()

        b := newBlob()
        bl.Insert(b)

        for i := 0; i < 200; i++ {
            bl.Insert(newBlob())
        }

        d := time.Duration(mrand.Intn(10)*100) * time.Millisecond
        time.Sleep(d)

        for i := 0; i < 100; i++ {
            b2, err := bl.Find(b)
            if err != nil {
                t.Fatal(err)
            }

            if b.Compare(b2) != 0 {
                t.Fatalf("items are not equal: want %v, got %v", b, b2)
            }
        }

        bl2 := khepri.NewBlobList()
        for i := 0; i < 200; i++ {
            bl2.Insert(newBlob())
        }

        bl2.Merge(bl)
    }

    bl := khepri.NewBlobList()

    for i := 0; uint(i) < *maxWorkers; i++ {
        wg.Add(1)
        go worker(bl)
    }

    wg.Wait()
}
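
Note: the worker above calls t.Fatal/t.Fatalf from spawned goroutines. Per the testing package, FailNow (and therefore Fatal) must run on the goroutine that runs the test function; from other goroutines only t.Error/t.Errorf plus an explicit return are safe. A minimal sketch of that pattern (illustration only; lookup is a made-up stand-in for BlobList.Find):

package khepri_test

import "testing"

// lookup is a hypothetical helper standing in for BlobList.Find.
func lookup(key string) (string, error) { return key, nil }

func TestWorkerErrors(t *testing.T) {
    done := make(chan struct{})
    go func() {
        defer close(done)
        if _, err := lookup("a"); err != nil {
            t.Error(err) // safe from a spawned goroutine; t.Fatal is not
            return
        }
    }()
    <-done
}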

View File

@@ -218,8 +218,6 @@ func (c *chunker) Next() (*Chunk, error) {
         c.pos += steps
         c.bpos = c.bmax
     }
-
-    return nil, nil
 }
 
 func (c *chunker) append(b byte) {

cmd/khepri/.gitignore (new file, vendored, 1 line)

@@ -0,0 +1 @@
config.mk

View File

@@ -6,12 +6,15 @@ TAGS =
 
 .PHONY: all clean debug
 
+# include config file if it exists
+-include $(CURDIR)/config.mk
+
 all: khepri
 
-khepri: *.go
+khepri: *.go $(wildcard ../../*.go) $(wildcard ../../*/*.go)
     go build $(TAGS) -ldflags "$(LDFLAGS)"
 
-debug: TAGS=-tags debug
+debug: TAGS=-tags debug_cmd
 debug: khepri
 
 clean:

View File

@@ -4,11 +4,42 @@ import (
     "errors"
     "fmt"
     "os"
+    "strings"
+    "time"
 
     "github.com/fd0/khepri"
     "github.com/fd0/khepri/backend"
+    "golang.org/x/crypto/ssh/terminal"
 )
 
+func format_bytes(c uint64) string {
+    b := float64(c)
+
+    switch {
+    case c > 1<<40:
+        return fmt.Sprintf("%.3f TiB", b/(1<<40))
+    case c > 1<<30:
+        return fmt.Sprintf("%.3f GiB", b/(1<<30))
+    case c > 1<<20:
+        return fmt.Sprintf("%.3f MiB", b/(1<<20))
+    case c > 1<<10:
+        return fmt.Sprintf("%.3f KiB", b/(1<<10))
+    default:
+        return fmt.Sprintf("%d B", c)
+    }
+}
+
+func print_tree2(indent int, t *khepri.Tree) {
+    for _, node := range *t {
+        if node.Tree != nil {
+            fmt.Printf("%s%s/\n", strings.Repeat(" ", indent), node.Name)
+            print_tree2(indent+1, node.Tree)
+        } else {
+            fmt.Printf("%s%s\n", strings.Repeat(" ", indent), node.Name)
+        }
+    }
+}
+
 func commandBackup(be backend.Server, key *khepri.Key, args []string) error {
     if len(args) != 1 {
         return errors.New("usage: backup [dir|file]")
@@ -25,12 +56,52 @@ func commandBackup(be backend.Server, key *khepri.Key, args []string) error {
         return err
     }
 
-    _, blob, err := arch.Import(target)
+    fmt.Printf("scanning %s\n", target)
+
+    if terminal.IsTerminal(int(os.Stdout.Fd())) {
+        arch.ScannerUpdate = func(stats khepri.Stats) {
+            fmt.Printf("\r%6d directories, %6d files, %14s", stats.Directories, stats.Files, format_bytes(stats.Bytes))
+        }
+    }
+
+    // TODO: add filter
+    // arch.Filter = func(dir string, fi os.FileInfo) bool {
+    //     return true
+    // }
+
+    t, err := arch.LoadTree(target)
     if err != nil {
+        fmt.Fprintf(os.Stderr, "error: %v\n", err)
         return err
     }
 
-    fmt.Printf("snapshot %s saved\n", blob.Storage)
+    fmt.Printf("\r%6d directories, %6d files, %14s\n", arch.Stats.Directories, arch.Stats.Files, format_bytes(arch.Stats.Bytes))
+
+    stats := khepri.Stats{}
+    if terminal.IsTerminal(int(os.Stdout.Fd())) {
+        arch.SaveUpdate = func(s khepri.Stats) {
+            stats.Files += s.Files
+            stats.Directories += s.Directories
+            stats.Other += s.Other
+            stats.Bytes += s.Bytes
+
+            fmt.Printf("\r%3.2f%% %d/%d directories, %d/%d files, %s/%s",
+                float64(stats.Bytes)/float64(arch.Stats.Bytes)*100,
+                stats.Directories, arch.Stats.Directories,
+                stats.Files, arch.Stats.Files,
+                format_bytes(stats.Bytes), format_bytes(arch.Stats.Bytes))
+        }
+    }
+
+    start := time.Now()
+
+    sn, id, err := arch.Snapshot(target, t)
+    if err != nil {
+        fmt.Fprintf(os.Stderr, "error: %v\n", err)
+    }
+
+    fmt.Printf("\nsnapshot %s saved: %v\n", id, sn)
+    duration := time.Now().Sub(start)
+    fmt.Printf("duration: %s, %.2fMiB/s\n", duration, float64(arch.Stats.Bytes)/float64(duration/time.Second)/(1<<20))
 
     return nil
 }
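
Note: in the duration line above, duration/time.Second is integer division on time.Duration, so the throughput is computed from whole seconds and divides by zero (yielding +Inf) for runs under one second. A sketch of the same calculation using Duration.Seconds (illustration only, not part of this commit; the numbers are invented):

package main

import (
    "fmt"
    "time"
)

func main() {
    bytes := uint64(42 << 20)                         // pretend 42 MiB were written
    start := time.Now().Add(-1500 * time.Millisecond) // pretend 1.5s elapsed
    duration := time.Since(start)

    // Seconds() returns a float64, so sub-second runs are handled too
    mibPerSec := float64(bytes) / duration.Seconds() / (1 << 20)
    fmt.Printf("duration: %s, %.2f MiB/s\n", duration, mibPerSec)
}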

View File

@ -1,4 +1,4 @@
// +build debug // +build debug_cmd
package main package main
@ -17,8 +17,8 @@ func initDebugLogger() *log.Logger {
// create new log file // create new log file
filename := fmt.Sprintf("khepri-debug-%d-%s", filename := fmt.Sprintf("khepri-debug-%d-%s",
os.Getpid(), time.Now().Format("20060201-150405")) os.Getpid(), time.Now().Format("20060201-150405"))
f, err := os.OpenFile(filepath.Join(os.TempDir(), filename), path := filepath.Join(os.TempDir(), filename)
os.O_WRONLY|os.O_CREATE, 0600) f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0600)
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "unable to create debug log file: %v", err) fmt.Fprintf(os.Stderr, "unable to create debug log file: %v", err)
os.Exit(2) os.Exit(2)
@ -26,7 +26,7 @@ func initDebugLogger() *log.Logger {
// open logger // open logger
l := log.New(io.MultiWriter(os.Stderr, f), "DEBUG: ", log.LstdFlags) l := log.New(io.MultiWriter(os.Stderr, f), "DEBUG: ", log.LstdFlags)
fmt.Fprintf(os.Stderr, "logging activated, writing log file %s", filename) fmt.Fprintf(os.Stderr, "debug log for khepri command activated, writing log file %s\n", path)
l.Printf("khepri %s", version) l.Printf("khepri %s", version)
return l return l

View File

@ -1,4 +1,4 @@
// +build !debug // +build !debug_cmd
package main package main

View File

@@ -5,6 +5,7 @@ import (
     "log"
     "net/url"
     "os"
+    "runtime"
     "sort"
     "strings"
 
@@ -128,6 +129,9 @@ func init() {
     commands["snapshots"] = commandSnapshots
     commands["cat"] = commandCat
     commands["ls"] = commandLs
+
+    // set GOMAXPROCS to number of CPUs
+    runtime.GOMAXPROCS(runtime.NumCPU())
 }
 
 func main() {

View File

@@ -3,27 +3,24 @@ package khepri
 import (
     "encoding/json"
     "errors"
-    "io"
-    "io/ioutil"
-    "os"
+    "fmt"
 
     "github.com/fd0/khepri/backend"
-    "github.com/fd0/khepri/chunker"
 )
 
 type ContentHandler struct {
     be  backend.Server
     key *Key
 
-    content *StorageMap
+    bl *BlobList
 }
 
 // NewContentHandler creates a new content handler.
 func NewContentHandler(be backend.Server, key *Key) (*ContentHandler, error) {
     ch := &ContentHandler{
         be:  be,
         key: key,
-        content: NewStorageMap(),
+        bl:  NewBlobList(),
     }
 
     return ch, nil
@@ -36,7 +33,8 @@ func (ch *ContentHandler) LoadSnapshot(id backend.ID) (*Snapshot, error) {
         return nil, err
     }
 
-    ch.content.Merge(sn.StorageMap)
+    ch.bl.Merge(sn.BlobList)
+
     return sn, nil
 }
 
@@ -49,7 +47,8 @@ func (ch *ContentHandler) LoadAllSnapshots() error {
         if err != nil {
             return
         }
 
-        ch.content.Merge(sn.StorageMap)
+
+        ch.bl.Merge(sn.BlobList)
     })
     if err != nil {
         return err
@@ -60,18 +59,18 @@ func (ch *ContentHandler) LoadAllSnapshots() error {
 // Save encrypts data and stores it to the backend as type t. If the data was
 // already saved before, the blob is returned.
-func (ch *ContentHandler) Save(t backend.Type, data []byte) (*Blob, error) {
+func (ch *ContentHandler) Save(t backend.Type, data []byte) (Blob, error) {
     // compute plaintext hash
     id := backend.Hash(data)
 
     // test if the hash is already in the backend
-    blob := ch.content.Find(id)
-    if blob != nil {
+    blob, err := ch.bl.Find(Blob{ID: id})
+    if err == nil {
         return blob, nil
     }
 
     // else create a new blob
-    blob = &Blob{
+    blob = Blob{
         ID:   id,
         Size: uint64(len(data)),
     }
@@ -79,85 +78,36 @@ func (ch *ContentHandler) Save(t backend.Type, data []byte) (Blob, error) {
     // encrypt blob
     ciphertext, err := ch.key.Encrypt(data)
     if err != nil {
-        return nil, err
+        return Blob{}, err
     }
 
     // save blob
     sid, err := ch.be.Create(t, ciphertext)
     if err != nil {
-        return nil, err
+        return Blob{}, err
     }
 
     blob.Storage = sid
     blob.StorageSize = uint64(len(ciphertext))
 
     // insert blob into the storage map
-    ch.content.Insert(blob)
+    ch.bl.Insert(blob)
 
     return blob, nil
 }
 
 // SaveJSON serialises item as JSON and uses Save() to store it to the backend as type t.
-func (ch *ContentHandler) SaveJSON(t backend.Type, item interface{}) (*Blob, error) {
+func (ch *ContentHandler) SaveJSON(t backend.Type, item interface{}) (Blob, error) {
     // convert to json
     data, err := json.Marshal(item)
     if err != nil {
-        return nil, err
+        return Blob{}, err
     }
 
     // compress and save data
     return ch.Save(t, backend.Compress(data))
 }
 
-// SaveFile stores the content of the file on the backend as a Blob by calling
-// Save for each chunk.
-func (ch *ContentHandler) SaveFile(filename string, size uint) (Blobs, error) {
-    file, err := os.Open(filename)
-    defer file.Close()
-    if err != nil {
-        return nil, err
-    }
-
-    // if the file is small enough, store it directly
-    if size < chunker.MinSize {
-        buf, err := ioutil.ReadAll(file)
-        if err != nil {
-            return nil, err
-        }
-
-        blob, err := ch.Save(backend.Data, buf)
-        if err != nil {
-            return nil, err
-        }
-
-        return Blobs{blob}, nil
-    }
-
-    // else store all chunks
-    blobs := Blobs{}
-    chunker := chunker.New(file)
-
-    for {
-        chunk, err := chunker.Next()
-        if err == io.EOF {
-            break
-        }
-
-        if err != nil {
-            return nil, err
-        }
-
-        blob, err := ch.Save(backend.Data, chunk.Data)
-        if err != nil {
-            return nil, err
-        }
-
-        blobs = append(blobs, blob)
-    }
-
-    return blobs, nil
-}
-
 // Load tries to load and decrypt content identified by t and id from the backend.
 func (ch *ContentHandler) Load(t backend.Type, id backend.ID) ([]byte, error) {
     if t == backend.Snapshot {
@@ -177,9 +127,9 @@ func (ch *ContentHandler) Load(t backend.Type, id backend.ID) ([]byte, error) {
     }
 
     // lookup storage hash
-    blob := ch.content.Find(id)
-    if blob == nil {
-        return nil, errors.New("Storage ID not found")
+    blob, err := ch.bl.Find(Blob{ID: id})
+    if err != nil {
+        return nil, fmt.Errorf("Storage ID %s not found", id)
     }
 
     // load data
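
Note: Save above deduplicates on the plaintext hash: it hashes the data, looks the hash up in the BlobList, and returns the existing blob without re-encrypting or re-uploading when the content is already known. A self-contained sketch of that flow, with a map standing in for both the BlobList and the backend (illustration only, not from this commit; names like store and save are made up):

package main

import (
    "crypto/sha256"
    "fmt"
)

type store struct {
    index map[[32]byte]string // plaintext hash -> storage ID
    n     int
}

// save returns the existing storage ID for known content and only
// "encrypts and stores" (simulated here) when the hash is new.
func (s *store) save(data []byte) string {
    id := sha256.Sum256(data)
    if sid, ok := s.index[id]; ok {
        return sid
    }
    s.n++
    sid := fmt.Sprintf("blob-%d", s.n)
    s.index[id] = sid
    return sid
}

func main() {
    s := &store{index: map[[32]byte]string{}}
    fmt.Println(s.save([]byte("hello"))) // blob-1
    fmt.Println(s.save([]byte("hello"))) // blob-1 again: deduplicated
    fmt.Println(s.save([]byte("world"))) // blob-2
}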

debug.go (new file, 36 lines)

@@ -0,0 +1,36 @@
// +build debug

package khepri

import (
    "fmt"
    "io"
    "log"
    "os"
    "path/filepath"
    "time"
)

var debugLogger = initDebugLogger()

func initDebugLogger() *log.Logger {
    // create new log file
    filename := fmt.Sprintf("khepri-lib-debug-%d-%s",
        os.Getpid(), time.Now().Format("20060201-150405"))
    path := filepath.Join(os.TempDir(), filename)
    f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0600)
    if err != nil {
        fmt.Fprintf(os.Stderr, "unable to create debug log file: %v", err)
        os.Exit(2)
    }

    // open logger
    l := log.New(io.MultiWriter(os.Stderr, f), "DEBUG: ", log.LstdFlags)
    fmt.Fprintf(os.Stderr, "debug log for khepri library activated, writing log file %s\n", path)

    return l
}

func debug(fmt string, args ...interface{}) {
    debugLogger.Printf(fmt, args...)
}

debug_release.go (new file, 5 lines)

@@ -0,0 +1,5 @@
// +build !debug

package khepri

func debug(fmt string, args ...interface{}) {}

View File

@@ -30,12 +30,12 @@ func NewRestorer(be backend.Server, key *Key, snid backend.ID) (*Restorer, error
     var err error
 
     r.ch, err = NewContentHandler(be, key)
     if err != nil {
-        return nil, err
+        return nil, arrar.Annotate(err, "create contenthandler for restorer")
     }
 
     r.sn, err = r.ch.LoadSnapshot(snid)
     if err != nil {
-        return nil, err
+        return nil, arrar.Annotate(err, "load snapshot for restorer")
     }
 
     // abort on all errors

View File

@@ -11,14 +11,14 @@ import (
 )
 
 type Snapshot struct {
     Time     time.Time  `json:"time"`
     Content  backend.ID `json:"content"`
-    StorageMap *StorageMap `json:"map"`
+    BlobList *BlobList  `json:"blobs"`
     Dir      string     `json:"dir"`
     Hostname string     `json:"hostname,omitempty"`
     Username string     `json:"username,omitempty"`
     UID      string     `json:"uid,omitempty"`
     GID      string     `json:"gid,omitempty"`
 
     id backend.ID // plaintext ID, used during restore
 }

View File

@@ -1,51 +0,0 @@
package khepri

import (
    "bytes"
    "sort"

    "github.com/fd0/khepri/backend"
)

type StorageMap Blobs

func NewStorageMap() *StorageMap {
    return &StorageMap{}
}

func (m StorageMap) find(id backend.ID) (int, *Blob) {
    i := sort.Search(len(m), func(i int) bool {
        return bytes.Compare(m[i].ID, id) >= 0
    })

    if i < len(m) && bytes.Equal(m[i].ID, id) {
        return i, m[i]
    }

    return i, nil
}

func (m StorageMap) Find(id backend.ID) *Blob {
    _, blob := m.find(id)
    return blob
}

func (m *StorageMap) Insert(blob *Blob) {
    pos, b := m.find(blob.ID)
    if b != nil {
        // already present
        return
    }

    // insert blob
    // https://code.google.com/p/go-wiki/wiki/SliceTricks
    *m = append(*m, nil)
    copy((*m)[pos+1:], (*m)[pos:])
    (*m)[pos] = blob
}

func (m *StorageMap) Merge(sm *StorageMap) {
    for _, blob := range *sm {
        m.Insert(blob)
    }
}

View File

@@ -34,6 +34,8 @@ type Node struct {
     Content []backend.ID `json:"content,omitempty"`
     Subtree backend.ID   `json:"subtree,omitempty"`
 
+    Tree *Tree `json:"-"`
+
     path string
 }
 
@@ -44,7 +46,7 @@ type Blob struct {
     StorageSize uint64 `json:"ssize,omitempty"` // encrypted Size
 }
 
-type Blobs []*Blob
+type Blobs []Blob
 
 func (n Node) String() string {
     switch n.Type {