mirror of
https://github.com/octoleo/restic.git
synced 2024-11-24 13:47:42 +00:00
Refactor StorageMap to BlobList
This commit is contained in:
parent
d594cd89b7
commit
d1e4431514
1
Makefile
1
Makefile
@ -1,7 +1,6 @@
|
||||
.PHONY: clean all test
|
||||
|
||||
test:
|
||||
go test -race ./...
|
||||
for dir in cmd/* ; do \
|
||||
(cd "$$dir"; go build -race) \
|
||||
done
|
||||
|
33
archiver.go
33
archiver.go
@ -20,8 +20,7 @@ type Archiver struct {
|
||||
key *Key
|
||||
ch *ContentHandler
|
||||
|
||||
m sync.Mutex
|
||||
smap *StorageMap // blobs used for the current snapshot
|
||||
bl *BlobList // blobs used for the current snapshot
|
||||
|
||||
fileToken chan struct{}
|
||||
|
||||
@ -63,7 +62,7 @@ func NewArchiver(be backend.Server, key *Key) (*Archiver, error) {
|
||||
// do nothing
|
||||
arch.ScannerUpdate = func(Stats) {}
|
||||
|
||||
arch.smap = NewStorageMap()
|
||||
arch.bl = NewBlobList()
|
||||
arch.ch, err = NewContentHandler(be, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -86,30 +85,26 @@ func (arch *Archiver) saveUpdate(stats Stats) {
|
||||
}
|
||||
}
|
||||
|
||||
func (arch *Archiver) Save(t backend.Type, data []byte) (*Blob, error) {
|
||||
func (arch *Archiver) Save(t backend.Type, data []byte) (Blob, error) {
|
||||
blob, err := arch.ch.Save(t, data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return Blob{}, err
|
||||
}
|
||||
|
||||
// store blob in storage map for current snapshot
|
||||
arch.m.Lock()
|
||||
defer arch.m.Unlock()
|
||||
arch.smap.Insert(blob)
|
||||
arch.bl.Insert(blob)
|
||||
|
||||
return blob, nil
|
||||
}
|
||||
|
||||
func (arch *Archiver) SaveJSON(t backend.Type, item interface{}) (*Blob, error) {
|
||||
func (arch *Archiver) SaveJSON(t backend.Type, item interface{}) (Blob, error) {
|
||||
blob, err := arch.ch.SaveJSON(t, item)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return Blob{}, err
|
||||
}
|
||||
|
||||
// store blob in storage map for current snapshot
|
||||
arch.m.Lock()
|
||||
defer arch.m.Unlock()
|
||||
arch.smap.Insert(blob)
|
||||
arch.bl.Insert(blob)
|
||||
|
||||
return blob, nil
|
||||
}
|
||||
@ -168,9 +163,7 @@ func (arch *Archiver) SaveFile(node *Node) error {
|
||||
node.Content = make([]backend.ID, len(blobs))
|
||||
for i, blob := range blobs {
|
||||
node.Content[i] = blob.ID
|
||||
arch.m.Lock()
|
||||
arch.smap.Insert(blob)
|
||||
arch.m.Unlock()
|
||||
arch.bl.Insert(blob)
|
||||
}
|
||||
|
||||
return err
|
||||
@ -258,14 +251,14 @@ func (arch *Archiver) LoadTree(path string) (*Tree, error) {
|
||||
return &Tree{node}, nil
|
||||
}
|
||||
|
||||
func (arch *Archiver) saveTree(t *Tree) (*Blob, error) {
|
||||
func (arch *Archiver) saveTree(t *Tree) (Blob, error) {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for _, node := range *t {
|
||||
if node.Tree != nil && node.Subtree == nil {
|
||||
b, err := arch.saveTree(node.Tree)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return Blob{}, err
|
||||
}
|
||||
node.Subtree = b.ID
|
||||
arch.saveUpdate(Stats{Directories: 1})
|
||||
@ -294,7 +287,7 @@ func (arch *Archiver) saveTree(t *Tree) (*Blob, error) {
|
||||
|
||||
blob, err := arch.SaveJSON(backend.Tree, t)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return Blob{}, err
|
||||
}
|
||||
|
||||
return blob, nil
|
||||
@ -311,7 +304,7 @@ func (arch *Archiver) Snapshot(dir string, t *Tree) (*Snapshot, backend.ID, erro
|
||||
sn.Content = blob.ID
|
||||
|
||||
// save snapshot
|
||||
sn.StorageMap = arch.smap
|
||||
sn.BlobList = arch.bl
|
||||
blob, err = arch.SaveJSON(backend.Snapshot, sn)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
|
@ -47,6 +47,11 @@ func (id ID) EqualString(other string) (bool, error) {
|
||||
return id.Equal(ID(s)), nil
|
||||
}
|
||||
|
||||
// Compare compares this ID to another one, returning -1, 0, or 1.
// NOTE(review): the arguments to bytes.Compare are reversed (other before
// id), so the result is the ordering of other relative to id — callers
// such as BlobList.find rely on exactly this orientation.
func (id ID) Compare(other ID) int {
	return bytes.Compare(other, id)
}
|
||||
|
||||
func (id ID) MarshalJSON() ([]byte, error) {
|
||||
return json.Marshal(id.String())
|
||||
}
|
||||
|
99
bloblist.go
Normal file
99
bloblist.go
Normal file
@ -0,0 +1,99 @@
|
||||
package khepri
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"sort"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// BlobList is a list of blobs kept sorted by ID (see find). All exported
// methods take the internal mutex and are safe for concurrent use.
type BlobList struct {
	list []Blob     // sorted by ID; maintained by insert
	m    sync.Mutex // guards list
}

// ErrBlobNotFound is returned by Find when no blob with the given ID is in the list.
var ErrBlobNotFound = errors.New("Blob not found")
|
||||
|
||||
func NewBlobList() *BlobList {
|
||||
return &BlobList{
|
||||
list: []Blob{},
|
||||
}
|
||||
}
|
||||
|
||||
// find locates blob within the sorted list by ID. On success it returns the
// position and the stored element with a nil error; otherwise it returns the
// position at which the blob would be inserted, the zero Blob, and
// ErrBlobNotFound. Only the ID is compared; Size is ignored.
// The caller must hold bl.m.
func (bl *BlobList) find(blob Blob) (int, Blob, error) {
	// ID.Compare takes its arguments in reversed order, so this predicate
	// is the usual ascending search condition list[i].ID >= blob.ID.
	pos := sort.Search(len(bl.list), func(i int) bool {
		return blob.ID.Compare(bl.list[i].ID) >= 0
	})

	if pos < len(bl.list) && blob.ID.Compare(bl.list[pos].ID) == 0 {
		return pos, bl.list[pos], nil
	}

	return pos, Blob{}, ErrBlobNotFound
}
|
||||
|
||||
func (bl *BlobList) Find(blob Blob) (Blob, error) {
|
||||
bl.m.Lock()
|
||||
defer bl.m.Unlock()
|
||||
|
||||
_, blob, err := bl.find(blob)
|
||||
return blob, err
|
||||
}
|
||||
|
||||
func (bl *BlobList) Merge(other *BlobList) {
|
||||
bl.m.Lock()
|
||||
defer bl.m.Unlock()
|
||||
other.m.Lock()
|
||||
defer other.m.Unlock()
|
||||
|
||||
for _, blob := range other.list {
|
||||
bl.insert(blob)
|
||||
}
|
||||
}
|
||||
|
||||
// insert adds blob at its sorted position unless a blob with the same ID
// is already present. The caller must hold bl.m.
func (bl *BlobList) insert(blob Blob) {
	pos, _, err := bl.find(blob)
	if err == nil {
		// already present
		return
	}

	// insert blob
	// https://code.google.com/p/go-wiki/wiki/SliceTricks
	bl.list = append(bl.list, Blob{})
	copy(bl.list[pos+1:], bl.list[pos:])
	bl.list[pos] = blob
}
|
||||
|
||||
func (bl *BlobList) Insert(blob Blob) {
|
||||
bl.m.Lock()
|
||||
defer bl.m.Unlock()
|
||||
|
||||
bl.insert(blob)
|
||||
}
|
||||
|
||||
func (bl BlobList) MarshalJSON() ([]byte, error) {
|
||||
return json.Marshal(bl.list)
|
||||
}
|
||||
|
||||
func (bl *BlobList) UnmarshalJSON(data []byte) error {
|
||||
return json.Unmarshal(data, &bl.list)
|
||||
}
|
||||
|
||||
// Compare compares two blobs by comparing the ID and the size. It returns -1,
|
||||
// 0, or 1.
|
||||
func (blob Blob) Compare(other Blob) int {
|
||||
if res := bytes.Compare(other.ID, blob.ID); res != 0 {
|
||||
return res
|
||||
}
|
||||
|
||||
if blob.Size < other.Size {
|
||||
return -1
|
||||
}
|
||||
if blob.Size > other.Size {
|
||||
return 1
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
137
bloblist_test.go
Normal file
137
bloblist_test.go
Normal file
@ -0,0 +1,137 @@
|
||||
package khepri_test
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"io"
|
||||
mrand "math/rand"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/fd0/khepri"
|
||||
)
|
||||
|
||||
const backendIDSize = 8
|
||||
|
||||
var maxWorkers = flag.Uint("workers", 100, "number of workers to test BlobList concurrent access against")
|
||||
|
||||
func randomID() []byte {
|
||||
buf := make([]byte, backendIDSize)
|
||||
_, err := io.ReadFull(rand.Reader, buf)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return buf
|
||||
}
|
||||
|
||||
// newBlob returns a Blob with a random ID and a random size.
func newBlob() khepri.Blob {
	return khepri.Blob{ID: randomID(), Size: uint64(mrand.Uint32())}
}
|
||||
|
||||
// Test basic functionality
|
||||
func TestBlobList(t *testing.T) {
|
||||
bl := khepri.NewBlobList()
|
||||
|
||||
b := newBlob()
|
||||
bl.Insert(b)
|
||||
|
||||
for i := 0; i < 1000; i++ {
|
||||
bl.Insert(newBlob())
|
||||
}
|
||||
|
||||
b2, err := bl.Find(khepri.Blob{ID: b.ID})
|
||||
ok(t, err)
|
||||
assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2)
|
||||
|
||||
bl2 := khepri.NewBlobList()
|
||||
for i := 0; i < 1000; i++ {
|
||||
bl.Insert(newBlob())
|
||||
}
|
||||
|
||||
b2, err = bl2.Find(b)
|
||||
assert(t, err != nil, "found ID in khepri that was never inserted: %v", b2)
|
||||
|
||||
bl2.Merge(bl)
|
||||
|
||||
b2, err = bl2.Find(b)
|
||||
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if b.Compare(b2) != 0 {
|
||||
t.Fatalf("items are not equal: want %v, got %v", b, b2)
|
||||
}
|
||||
}
|
||||
|
||||
// Test JSON encode/decode
|
||||
func TestBlobListJSON(t *testing.T) {
|
||||
bl := khepri.NewBlobList()
|
||||
b := khepri.Blob{ID: []byte{1, 2, 3, 4}}
|
||||
bl.Insert(b)
|
||||
|
||||
b2, err := bl.Find(b)
|
||||
ok(t, err)
|
||||
assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2)
|
||||
|
||||
buf, err := json.Marshal(bl)
|
||||
ok(t, err)
|
||||
|
||||
bl2 := khepri.BlobList{}
|
||||
json.Unmarshal(buf, &bl2)
|
||||
|
||||
b2, err = bl2.Find(b)
|
||||
ok(t, err)
|
||||
assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2)
|
||||
|
||||
buf, err = json.Marshal(bl2)
|
||||
ok(t, err)
|
||||
}
|
||||
|
||||
// random insert/find access by several goroutines
|
||||
func TestBlobListRandom(t *testing.T) {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
worker := func(bl *khepri.BlobList) {
|
||||
defer wg.Done()
|
||||
|
||||
b := newBlob()
|
||||
bl.Insert(b)
|
||||
|
||||
for i := 0; i < 200; i++ {
|
||||
bl.Insert(newBlob())
|
||||
}
|
||||
|
||||
d := time.Duration(mrand.Intn(10)*100) * time.Millisecond
|
||||
time.Sleep(d)
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
b2, err := bl.Find(b)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if b.Compare(b2) != 0 {
|
||||
t.Fatalf("items are not equal: want %v, got %v", b, b2)
|
||||
}
|
||||
}
|
||||
|
||||
bl2 := khepri.NewBlobList()
|
||||
for i := 0; i < 200; i++ {
|
||||
bl2.Insert(newBlob())
|
||||
}
|
||||
|
||||
bl2.Merge(bl)
|
||||
}
|
||||
|
||||
bl := khepri.NewBlobList()
|
||||
|
||||
for i := 0; uint(i) < *maxWorkers; i++ {
|
||||
wg.Add(1)
|
||||
go worker(bl)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
@ -3,7 +3,7 @@ package khepri
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"sync"
|
||||
"fmt"
|
||||
|
||||
"github.com/fd0/khepri/backend"
|
||||
)
|
||||
@ -12,16 +12,15 @@ type ContentHandler struct {
|
||||
be backend.Server
|
||||
key *Key
|
||||
|
||||
m sync.Mutex
|
||||
content *StorageMap
|
||||
bl *BlobList
|
||||
}
|
||||
|
||||
// NewContentHandler creates a new content handler.
|
||||
func NewContentHandler(be backend.Server, key *Key) (*ContentHandler, error) {
|
||||
ch := &ContentHandler{
|
||||
be: be,
|
||||
key: key,
|
||||
content: NewStorageMap(),
|
||||
be: be,
|
||||
key: key,
|
||||
bl: NewBlobList(),
|
||||
}
|
||||
|
||||
return ch, nil
|
||||
@ -34,9 +33,8 @@ func (ch *ContentHandler) LoadSnapshot(id backend.ID) (*Snapshot, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ch.m.Lock()
|
||||
defer ch.m.Unlock()
|
||||
ch.content.Merge(sn.StorageMap)
|
||||
ch.bl.Merge(sn.BlobList)
|
||||
|
||||
return sn, nil
|
||||
}
|
||||
|
||||
@ -50,9 +48,7 @@ func (ch *ContentHandler) LoadAllSnapshots() error {
|
||||
return
|
||||
}
|
||||
|
||||
ch.m.Lock()
|
||||
defer ch.m.Unlock()
|
||||
ch.content.Merge(sn.StorageMap)
|
||||
ch.bl.Merge(sn.BlobList)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
@ -63,20 +59,18 @@ func (ch *ContentHandler) LoadAllSnapshots() error {
|
||||
|
||||
// Save encrypts data and stores it to the backend as type t. If the data was
|
||||
// already saved before, the blob is returned.
|
||||
func (ch *ContentHandler) Save(t backend.Type, data []byte) (*Blob, error) {
|
||||
func (ch *ContentHandler) Save(t backend.Type, data []byte) (Blob, error) {
|
||||
// compute plaintext hash
|
||||
id := backend.Hash(data)
|
||||
|
||||
// test if the hash is already in the backend
|
||||
ch.m.Lock()
|
||||
defer ch.m.Unlock()
|
||||
blob := ch.content.Find(id)
|
||||
if blob != nil {
|
||||
blob, err := ch.bl.Find(Blob{ID: id})
|
||||
if err == nil {
|
||||
return blob, nil
|
||||
}
|
||||
|
||||
// else create a new blob
|
||||
blob = &Blob{
|
||||
blob = Blob{
|
||||
ID: id,
|
||||
Size: uint64(len(data)),
|
||||
}
|
||||
@ -84,30 +78,30 @@ func (ch *ContentHandler) Save(t backend.Type, data []byte) (*Blob, error) {
|
||||
// encrypt blob
|
||||
ciphertext, err := ch.key.Encrypt(data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return Blob{}, err
|
||||
}
|
||||
|
||||
// save blob
|
||||
sid, err := ch.be.Create(t, ciphertext)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return Blob{}, err
|
||||
}
|
||||
|
||||
blob.Storage = sid
|
||||
blob.StorageSize = uint64(len(ciphertext))
|
||||
|
||||
// insert blob into the storage map
|
||||
ch.content.Insert(blob)
|
||||
ch.bl.Insert(blob)
|
||||
|
||||
return blob, nil
|
||||
}
|
||||
|
||||
// SaveJSON serialises item as JSON and uses Save() to store it to the backend as type t.
|
||||
func (ch *ContentHandler) SaveJSON(t backend.Type, item interface{}) (*Blob, error) {
|
||||
func (ch *ContentHandler) SaveJSON(t backend.Type, item interface{}) (Blob, error) {
|
||||
// convert to json
|
||||
data, err := json.Marshal(item)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return Blob{}, err
|
||||
}
|
||||
|
||||
// compress and save data
|
||||
@ -133,11 +127,9 @@ func (ch *ContentHandler) Load(t backend.Type, id backend.ID) ([]byte, error) {
|
||||
}
|
||||
|
||||
// lookup storage hash
|
||||
ch.m.Lock()
|
||||
defer ch.m.Unlock()
|
||||
blob := ch.content.Find(id)
|
||||
if blob == nil {
|
||||
return nil, errors.New("Storage ID not found")
|
||||
blob, err := ch.bl.Find(Blob{ID: id})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Storage ID %s not found", id)
|
||||
}
|
||||
|
||||
// load data
|
||||
|
@ -30,12 +30,12 @@ func NewRestorer(be backend.Server, key *Key, snid backend.ID) (*Restorer, error
|
||||
var err error
|
||||
r.ch, err = NewContentHandler(be, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, arrar.Annotate(err, "create contenthandler for restorer")
|
||||
}
|
||||
|
||||
r.sn, err = r.ch.LoadSnapshot(snid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, arrar.Annotate(err, "load snapshot for restorer")
|
||||
}
|
||||
|
||||
// abort on all errors
|
||||
|
16
snapshot.go
16
snapshot.go
@ -11,14 +11,14 @@ import (
|
||||
)
|
||||
|
||||
type Snapshot struct {
|
||||
Time time.Time `json:"time"`
|
||||
Content backend.ID `json:"content"`
|
||||
StorageMap *StorageMap `json:"map"`
|
||||
Dir string `json:"dir"`
|
||||
Hostname string `json:"hostname,omitempty"`
|
||||
Username string `json:"username,omitempty"`
|
||||
UID string `json:"uid,omitempty"`
|
||||
GID string `json:"gid,omitempty"`
|
||||
Time time.Time `json:"time"`
|
||||
Content backend.ID `json:"content"`
|
||||
BlobList *BlobList `json:"blobs"`
|
||||
Dir string `json:"dir"`
|
||||
Hostname string `json:"hostname,omitempty"`
|
||||
Username string `json:"username,omitempty"`
|
||||
UID string `json:"uid,omitempty"`
|
||||
GID string `json:"gid,omitempty"`
|
||||
|
||||
id backend.ID // plaintext ID, used during restore
|
||||
}
|
||||
|
@ -1,51 +0,0 @@
|
||||
package khepri
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"sort"
|
||||
|
||||
"github.com/fd0/khepri/backend"
|
||||
)
|
||||
|
||||
// StorageMap is a sorted collection of blob pointers, replaced by BlobList
// in this commit (this file is the pre-refactor version being deleted).
type StorageMap Blobs

// NewStorageMap returns a new, empty StorageMap.
func NewStorageMap() *StorageMap {
	return &StorageMap{}
}

// find returns the position of id in the sorted map and the blob stored
// there, or the insertion position and nil if id is not present.
func (m StorageMap) find(id backend.ID) (int, *Blob) {
	i := sort.Search(len(m), func(i int) bool {
		return bytes.Compare(m[i].ID, id) >= 0
	})

	if i < len(m) && bytes.Equal(m[i].ID, id) {
		return i, m[i]
	}

	return i, nil
}

// Find returns the blob with the given ID, or nil if it is not present.
func (m StorageMap) Find(id backend.ID) *Blob {
	_, blob := m.find(id)
	return blob
}

// Insert adds blob at its sorted position unless a blob with the same ID
// is already present. Note: unlike BlobList, access is not synchronized.
func (m *StorageMap) Insert(blob *Blob) {
	pos, b := m.find(blob.ID)
	if b != nil {
		// already present
		return
	}

	// insert blob
	// https://code.google.com/p/go-wiki/wiki/SliceTricks
	*m = append(*m, nil)
	copy((*m)[pos+1:], (*m)[pos:])
	(*m)[pos] = blob
}

// Merge inserts all blobs from sm into m.
func (m *StorageMap) Merge(sm *StorageMap) {
	for _, blob := range *sm {
		m.Insert(blob)
	}
}
|
Loading…
Reference in New Issue
Block a user