From f95788ed90831b7e4a727ab5e3b9d804889aa4af Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 23 Nov 2014 16:48:00 +0100 Subject: [PATCH] Limit memory usage, add several sync.Pool --- archiver.go | 31 ++++++++---- archiver_test.go | 7 ++- backend/generic.go | 5 +- backend/id.go | 14 ++++-- bloblist_test.go | 7 ++- chunker/chunker.go | 64 +++++++++++++------------ chunker/chunker_test.go | 13 +++-- cmd/khepri/main.go | 4 ++ contenthandler.go | 6 ++- key.go | 72 ++++++++++++++++++---------- key_int_test.go | 4 +- key_test.go | 23 +++++---- pools.go | 103 ++++++++++++++++++++++++++++++++++++++++ tree.go | 21 +++++--- 14 files changed, 277 insertions(+), 97 deletions(-) create mode 100644 pools.go diff --git a/archiver.go b/archiver.go index 0bd35b8e2..b4ef607dc 100644 --- a/archiver.go +++ b/archiver.go @@ -2,7 +2,6 @@ package khepri import ( "io" - "io/ioutil" "os" "path/filepath" "sync" @@ -14,8 +13,8 @@ import ( ) const ( - maxConcurrentFiles = 32 - maxConcurrentBlobs = 32 + maxConcurrentFiles = 8 + maxConcurrentBlobs = 8 statTimeout = 20 * time.Millisecond ) @@ -159,12 +158,20 @@ func (arch *Archiver) SaveFile(node *Node) error { // if the file is small enough, store it directly if node.Size < chunker.MinSize { - buf, err := ioutil.ReadAll(file) + // acquire token + token := <-arch.blobToken + defer func() { + arch.blobToken <- token + }() + + buf := GetChunkBuf("blob single file") + defer FreeChunkBuf("blob single file", buf) + n, err := io.ReadFull(file, buf) if err != nil { return err } - blob, err := arch.ch.Save(backend.Data, buf) + blob, err := arch.ch.Save(backend.Data, buf[:n]) if err != nil { return err } @@ -176,14 +183,18 @@ func (arch *Archiver) SaveFile(node *Node) error { // else store all chunks chnker := chunker.New(file) chans := [](<-chan Blob){} + defer chnker.Free() for { - chunk, err := chnker.Next() + buf := GetChunkBuf("blob chunker") + chunk, err := chnker.Next(buf) if err == io.EOF { + FreeChunkBuf("blob chunker", buf) break } if err != nil { + FreeChunkBuf("blob chunker", buf) return err } @@ -198,6 +209,8 @@ func (arch *Archiver) SaveFile(node *Node) error { panic(err) } + FreeChunkBuf("blob chunker", buf) + arch.update(arch.SaveStats, Stats{Bytes: blob.Size}) arch.blobToken <- token ch <- blob @@ -318,13 +331,13 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) { node.Subtree = b.ID arch.update(arch.SaveStats, Stats{Directories: 1}) } else if node.Type == "file" && len(node.Content) == 0 { + // get token + token := <-arch.fileToken + // start goroutine wg.Add(1) go func(n *Node) { defer wg.Done() - - // get token - token := <-arch.fileToken defer func() { arch.fileToken <- token }() diff --git a/archiver_test.go b/archiver_test.go index 529b2bb5f..7739847d4 100644 --- a/archiver_test.go +++ b/archiver_test.go @@ -6,6 +6,7 @@ import ( "math/rand" "testing" + "github.com/fd0/khepri" "github.com/fd0/khepri/chunker" ) @@ -30,6 +31,7 @@ func BenchmarkChunkEncrypt(b *testing.B) { be := setupBackend(b) defer teardownBackend(b, be) key := setupKey(b, be, "geheim") + chunkBuf := make([]byte, chunker.MaxSize) b.ResetTimer() b.SetBytes(int64(len(data))) @@ -38,7 +40,7 @@ func BenchmarkChunkEncrypt(b *testing.B) { ch := chunker.New(bytes.NewReader(data)) for { - chunk_data, err := ch.Next() + chunk_data, err := ch.Next(chunkBuf) if err == io.EOF { break @@ -46,7 +48,8 @@ func BenchmarkChunkEncrypt(b *testing.B) { ok(b, err) - _, err = key.Encrypt(chunk_data.Data) + buf := make([]byte, khepri.MaxCiphertextSize) + _, err = key.Encrypt(buf, chunk_data.Data) ok(b, err) } } diff --git a/backend/generic.go b/backend/generic.go index 237712ced..01b7f3f25 100644 --- a/backend/generic.go +++ b/backend/generic.go @@ -5,8 +5,11 @@ import ( "compress/zlib" "crypto/sha256" "io/ioutil" + "sync" ) +var idPool = sync.Pool{New: func() interface{} { return ID(make([]byte, IDSize)) }} + // Each lists all entries of type t in the backend and calls function f() with // the id and data. func Each(be Server, t Type, f func(id ID, data []byte, err error)) error { @@ -78,7 +81,7 @@ func Uncompress(data []byte) []byte { // Hash returns the ID for data. func Hash(data []byte) ID { h := sha256.Sum256(data) - id := make(ID, 32) + id := idPool.Get().(ID) copy(id, h[:]) return id } diff --git a/backend/id.go b/backend/id.go index 0c15cad26..87071e577 100644 --- a/backend/id.go +++ b/backend/id.go @@ -8,7 +8,8 @@ import ( "errors" ) -const sha256_length = 32 // in bytes +// IDSize contains the size of an ID, in bytes. +const IDSize = sha256.Size // References content within a repository. type ID []byte @@ -21,7 +22,7 @@ func ParseID(s string) (ID, error) { return nil, err } - if len(b) != sha256_length { + if len(b) != IDSize { return nil, errors.New("invalid length for sha256 hash") } @@ -63,7 +64,7 @@ func (id *ID) UnmarshalJSON(b []byte) error { return err } - *id = make([]byte, len(s)/2) + *id = idPool.Get().(ID) _, err = hex.Decode(*id, []byte(s)) if err != nil { return err @@ -74,11 +75,16 @@ func (id *ID) UnmarshalJSON(b []byte) error { func IDFromData(d []byte) ID { hash := sha256.Sum256(d) - id := make([]byte, sha256_length) + id := idPool.Get().(ID) copy(id, hash[:]) return id } +// Free returns the ID byte slice back to the allocation pool. +func (id ID) Free() { + idPool.Put(id) +} + type IDs []ID func (ids IDs) Len() int { diff --git a/bloblist_test.go b/bloblist_test.go index 3add85cf9..dea2fbbc2 100644 --- a/bloblist_test.go +++ b/bloblist_test.go @@ -11,14 +11,13 @@ import ( "time" "github.com/fd0/khepri" + "github.com/fd0/khepri/backend" ) -const backendIDSize = 8 - var maxWorkers = flag.Uint("workers", 100, "number of workers to test BlobList concurrent access against") func randomID() []byte { - buf := make([]byte, backendIDSize) + buf := make([]byte, backend.IDSize) _, err := io.ReadFull(rand.Reader, buf) if err != nil { panic(err) @@ -69,7 +68,7 @@ func TestBlobList(t *testing.T) { // Test JSON encode/decode func TestBlobListJSON(t *testing.T) { bl := khepri.NewBlobList() - b := khepri.Blob{ID: []byte{1, 2, 3, 4}} + b := khepri.Blob{ID: randomID()} bl.Insert(b) b2, err := bl.Find(b) diff --git a/chunker/chunker.go b/chunker/chunker.go index 2cd5a4e34..6fe5ab32c 100644 --- a/chunker/chunker.go +++ b/chunker/chunker.go @@ -30,6 +30,15 @@ var ( once sync.Once mod_table [256]uint64 out_table [256]uint64 + + chunkerPool = sync.Pool{ + New: func() interface{} { + return &Chunker{ + window: make([]byte, WindowSize), + buf: make([]byte, MaxSize), + } + }, + } ) // A chunk is one content-dependent chunk of bytes whose end was cut when the @@ -41,17 +50,8 @@ type Chunk struct { Data []byte } -// A chunker takes a stream of bytes and emits average size chunks. -type Chunker interface { - // Next returns the next chunk of data. If an error occurs while reading, - // the error is returned with a nil chunk. The state of the current chunk - // is undefined. When the last chunk has been returned, all subsequent - // calls yield a nil chunk and an io.EOF error. - Next() (*Chunk, error) -} - // A chunker internally holds everything needed to split content. -type chunker struct { +type Chunker struct { rd io.Reader closed bool @@ -62,7 +62,6 @@ type chunker struct { bpos int bmax int - data []byte start int count int pos int @@ -71,16 +70,9 @@ type chunker struct { } // New returns a new Chunker that reads from data from rd. -func New(rd io.Reader) Chunker { - c := &chunker{ - rd: rd, - - window: make([]byte, WindowSize), - - buf: make([]byte, MaxSize), - - data: make([]byte, 0, MaxSize), - } +func New(rd io.Reader) *Chunker { + c := chunkerPool.Get().(*Chunker) + c.rd = rd once.Do(c.fill_tables) c.reset() @@ -88,7 +80,13 @@ func New(rd io.Reader) Chunker { return c } -func (c *chunker) reset() { +// Free returns this chunker to the allocation pool +func (c *Chunker) Free() { + c.rd = nil + chunkerPool.Put(c) +} + +func (c *Chunker) reset() { for i := 0; i < WindowSize; i++ { c.window[i] = 0 } @@ -97,11 +95,10 @@ func (c *chunker) reset() { c.pos = 0 c.count = 0 c.slide(1) - c.data = make([]byte, 0, MaxSize) } // Calculate out_table and mod_table for optimization. Must be called only once. -func (c *chunker) fill_tables() { +func (c *Chunker) fill_tables() { // calculate table for sliding out bytes. The byte to slide out is used as // the index for the table, the value contains the following: // out_table[b] = Hash(b || 0 || ... || 0) @@ -137,7 +134,12 @@ func (c *chunker) fill_tables() { } } -func (c *chunker) Next() (*Chunk, error) { +// Next returns the next chunk of data. If an error occurs while reading, +// the error is returned with a nil chunk. The state of the current chunk +// is undefined. When the last chunk has been returned, all subsequent +// calls yield a nil chunk and an io.EOF error. +func (c *Chunker) Next(dst []byte) (*Chunk, error) { + dst = dst[:0] for { if c.bpos >= c.bmax { n, err := io.ReadFull(c.rd, c.buf) @@ -160,7 +162,7 @@ func (c *chunker) Next() (*Chunk, error) { Start: c.start, Length: c.count, Cut: c.digest, - Data: c.data, + Data: dst, }, nil } } @@ -188,7 +190,7 @@ func (c *chunker) Next() (*Chunk, error) { c.digest ^= mod_table[index] if (c.count+i+1 >= MinSize && (c.digest&splitmask) == 0) || c.count+i+1 >= MaxSize { - c.data = append(c.data, c.buf[c.bpos:c.bpos+i+1]...) + dst = append(dst, c.buf[c.bpos:c.bpos+i+1]...) c.count += i + 1 c.pos += i + 1 c.bpos += i + 1 @@ -197,7 +199,7 @@ func (c *chunker) Next() (*Chunk, error) { Start: c.start, Length: c.count, Cut: c.digest, - Data: c.data, + Data: dst, } // keep position @@ -212,7 +214,7 @@ func (c *chunker) Next() (*Chunk, error) { steps := c.bmax - c.bpos if steps > 0 { - c.data = append(c.data, c.buf[c.bpos:c.bpos+steps]...) + dst = append(dst, c.buf[c.bpos:c.bpos+steps]...) } c.count += steps c.pos += steps @@ -220,7 +222,7 @@ func (c *chunker) Next() (*Chunk, error) { } } -func (c *chunker) append(b byte) { +func (c *Chunker) append(b byte) { index := c.digest >> uint(pol_shift) c.digest <<= 8 c.digest |= uint64(b) @@ -228,7 +230,7 @@ func (c *chunker) append(b byte) { c.digest ^= mod_table[index] } -func (c *chunker) slide(b byte) { +func (c *Chunker) slide(b byte) { out := c.window[c.wpos] c.window[c.wpos] = b c.digest ^= out_table[out] diff --git a/chunker/chunker_test.go b/chunker/chunker_test.go index a1f735c62..a62075c1f 100644 --- a/chunker/chunker_test.go +++ b/chunker/chunker_test.go @@ -54,9 +54,10 @@ var chunks2 = []chunk{ chunk{chunker.MinSize, 0}, } -func test_with_data(t *testing.T, chunker chunker.Chunker, chunks []chunk) { +func test_with_data(t *testing.T, chnker *chunker.Chunker, chunks []chunk) { + buf := make([]byte, chunker.MaxSize) for i, chunk := range chunks { - c, err := chunker.Next() + c, err := chnker.Next(buf) if err != nil { t.Fatalf("Error returned with chunk %d: %v", i, err) @@ -84,7 +85,7 @@ func test_with_data(t *testing.T, chunker chunker.Chunker, chunks []chunk) { } } - c, err := chunker.Next() + c, err := chnker.Next(buf) if c != nil { t.Fatal("additional non-nil chunk returned") @@ -115,20 +116,24 @@ func TestChunker(t *testing.T) { buf := get_random(23, 32*1024*1024) ch := chunker.New(bytes.NewReader(buf)) test_with_data(t, ch, chunks1) + ch.Free() // setup nullbyte data source buf = bytes.Repeat([]byte{0}, len(chunks2)*chunker.MinSize) ch = chunker.New(bytes.NewReader(buf)) test_with_data(t, ch, chunks2) + ch.Free() } func BenchmarkChunker(b *testing.B) { size := 10 * 1024 * 1024 buf := get_random(23, size) + dst := make([]byte, chunker.MaxSize) b.ResetTimer() b.SetBytes(int64(size)) + var chunks int for i := 0; i < b.N; i++ { chunks = 0 @@ -136,7 +141,7 @@ func BenchmarkChunker(b *testing.B) { ch := chunker.New(bytes.NewReader(buf)) for { - _, err := ch.Next() + _, err := ch.Next(dst) if err == io.EOF { break diff --git a/cmd/khepri/main.go b/cmd/khepri/main.go index cd1c0a777..64ac4e8b5 100644 --- a/cmd/khepri/main.go +++ b/cmd/khepri/main.go @@ -135,6 +135,8 @@ func init() { } func main() { + // defer profile.Start(profile.MemProfileRate(100000), profile.ProfilePath(".")).Stop() + log.SetOutput(os.Stdout) opts.Repo = os.Getenv("KHEPRI_REPOSITORY") @@ -192,4 +194,6 @@ func main() { if err != nil { errx(1, "error executing command %q: %v", cmd, err) } + + khepri.PoolAlloc() } diff --git a/contenthandler.go b/contenthandler.go index 625261993..8ccf7654f 100644 --- a/contenthandler.go +++ b/contenthandler.go @@ -66,6 +66,7 @@ func (ch *ContentHandler) Save(t backend.Type, data []byte) (Blob, error) { // test if the hash is already in the backend blob, err := ch.bl.Find(Blob{ID: id}) if err == nil { + id.Free() return blob, nil } @@ -76,10 +77,13 @@ func (ch *ContentHandler) Save(t backend.Type, data []byte) (Blob, error) { } // encrypt blob - ciphertext, err := ch.key.Encrypt(data) + ciphertext := GetChunkBuf("ch.Save()") + defer FreeChunkBuf("ch.Save()", ciphertext) + n, err := ch.key.Encrypt(ciphertext, data) if err != nil { return Blob{}, err } + ciphertext = ciphertext[:n] // save blob sid, err := ch.be.Create(t, ciphertext) diff --git a/key.go b/key.go index 48cc2f67e..300847001 100644 --- a/key.go +++ b/key.go @@ -15,10 +15,17 @@ import ( "time" "github.com/fd0/khepri/backend" + "github.com/fd0/khepri/chunker" "golang.org/x/crypto/scrypt" ) +// max size is 8MiB, defined in chunker +const maxDataSize = chunker.MaxSize +const ivSize = aes.BlockSize +const hmacSize = sha256.Size +const MaxCiphertextSize = ivSize + maxDataSize + hmacSize + var ( // ErrUnauthenticated is returned when ciphertext verification has failed. ErrUnauthenticated = errors.New("ciphertext verification failed") @@ -108,7 +115,9 @@ func CreateKey(be backend.Server, password string) (*Key, error) { return nil, err } - k.Data, err = k.EncryptUser(buf) + k.Data = GetChunkBuf("key") + n, err = k.EncryptUser(k.Data, buf) + k.Data = k.Data[:n] // dump as json buf, err = json.Marshal(k) @@ -122,6 +131,8 @@ func CreateKey(be backend.Server, password string) (*Key, error) { return nil, err } + FreeChunkBuf("key", k.Data) + return k, nil } @@ -229,20 +240,29 @@ func (k *Key) newKeys() (*keys, error) { return ks, nil } -func (k *Key) newIV() ([]byte, error) { - buf := make([]byte, aes.BlockSize) - _, err := io.ReadFull(rand.Reader, buf) +func (k *Key) newIV(buf []byte) error { + _, err := io.ReadFull(rand.Reader, buf[:ivSize]) + buf = buf[:ivSize] if err != nil { - return nil, err + return err } - return buf, nil + return nil } -// Encrypt encrypts and signs data. Returned is IV || Ciphertext || HMAC. For -// the hash function, SHA256 is used, so the overhead is 16+32=48 byte. -func (k *Key) encrypt(ks *keys, plaintext []byte) ([]byte, error) { - iv, err := k.newIV() +// Encrypt encrypts and signs data. Stored in ciphertext is IV || Ciphertext || +// HMAC. Encrypt returns the ciphertext's length. For the hash function, SHA256 +// is used, so the overhead is 16+32=48 byte. +func (k *Key) encrypt(ks *keys, ciphertext, plaintext []byte) (int, error) { + if cap(ciphertext) < MaxCiphertextSize { + panic("encryption buffer is too small") + } + + if len(plaintext) > maxDataSize { + panic("plaintext is too large") + } + + _, err := io.ReadFull(rand.Reader, ciphertext[:ivSize]) if err != nil { panic(fmt.Sprintf("unable to generate new random iv: %v", err)) } @@ -252,11 +272,9 @@ func (k *Key) encrypt(ks *keys, plaintext []byte) ([]byte, error) { panic(fmt.Sprintf("unable to create cipher: %v", err)) } - e := cipher.NewCTR(c, iv) - l := len(iv) - ciphertext := make([]byte, l+len(plaintext)) - copy(ciphertext[:l], iv) - e.XORKeyStream(ciphertext[l:], plaintext) + e := cipher.NewCTR(c, ciphertext[:ivSize]) + e.XORKeyStream(ciphertext[ivSize:cap(ciphertext)], plaintext) + ciphertext = ciphertext[:ivSize+len(plaintext)] hm := hmac.New(sha256.New, ks.Sign) @@ -265,21 +283,23 @@ func (k *Key) encrypt(ks *keys, plaintext []byte) ([]byte, error) { panic(fmt.Sprintf("unable to calculate hmac of ciphertext: %v", err)) } - return hm.Sum(ciphertext), nil + ciphertext = hm.Sum(ciphertext) + + return len(ciphertext), nil } -// EncryptUser encrypts and signs data with the user key. Returned is IV || -// Ciphertext || HMAC. For the hash function, SHA256 is used, so the overhead -// is 16+32=48 byte. -func (k *Key) EncryptUser(plaintext []byte) ([]byte, error) { - return k.encrypt(k.user, plaintext) +// EncryptUser encrypts and signs data with the user key. Stored in ciphertext +// is IV || Ciphertext || HMAC. Returns the ciphertext length. For the hash +// function, SHA256 is used, so the overhead is 16+32=48 byte. +func (k *Key) EncryptUser(ciphertext, plaintext []byte) (int, error) { + return k.encrypt(k.user, ciphertext, plaintext) } -// Encrypt encrypts and signs data with the master key. Returned is IV || -// Ciphertext || HMAC. For the hash function, SHA256 is used, so the overhead -// is 16+32=48 byte. -func (k *Key) Encrypt(plaintext []byte) ([]byte, error) { - return k.encrypt(k.master, plaintext) +// Encrypt encrypts and signs data with the master key. Stored in ciphertext is +// IV || Ciphertext || HMAC. Returns the ciphertext length. For the hash +// function, SHA256 is used, so the overhead is 16+32=48 byte. +func (k *Key) Encrypt(ciphertext, plaintext []byte) (int, error) { + return k.encrypt(k.master, ciphertext, plaintext) } // Decrypt verifes and decrypts the ciphertext. Ciphertext must be in the form diff --git a/key_int_test.go b/key_int_test.go index 232e68590..a6249bfdf 100644 --- a/key_int_test.go +++ b/key_int_test.go @@ -48,10 +48,12 @@ func TestCrypto(t *testing.T) { Sign: tv.skey, } - msg, err := r.encrypt(r.master, tv.plaintext) + msg := make([]byte, MaxCiphertextSize) + n, err := r.encrypt(r.master, msg, tv.plaintext) if err != nil { t.Fatal(err) } + msg = msg[:n] // decrypt message _, err = r.decrypt(r.master, msg) diff --git a/key_test.go b/key_test.go index 391a6933f..6bf4a284d 100644 --- a/key_test.go +++ b/key_test.go @@ -51,7 +51,7 @@ func TestEncryptDecrypt(t *testing.T) { defer teardownBackend(t, be) k := setupKey(t, be, testPassword) - for _, size := range []int{5, 23, 1 << 20, 10<<20 + 123} { + for _, size := range []int{5, 23, 1 << 20, 7<<20 + 123} { data := make([]byte, size) f, err := os.Open("/dev/urandom") ok(t, err) @@ -59,18 +59,21 @@ func TestEncryptDecrypt(t *testing.T) { _, err = io.ReadFull(f, data) ok(t, err) - ciphertext, err := k.Encrypt(data) + ciphertext := khepri.GetChunkBuf("TestEncryptDecrypt") + n, err := k.Encrypt(ciphertext, data) ok(t, err) - plaintext, err := k.Decrypt(ciphertext) + plaintext, err := k.Decrypt(ciphertext[:n]) ok(t, err) + khepri.FreeChunkBuf("TestEncryptDecrypt", ciphertext) + equals(t, plaintext, data) } } func BenchmarkEncrypt(b *testing.B) { - size := 16 << 20 // 16MiB + size := 8 << 20 // 8MiB data := make([]byte, size) be := setupBackend(b) @@ -80,28 +83,32 @@ func BenchmarkEncrypt(b *testing.B) { b.ResetTimer() b.SetBytes(int64(size)) + buf := khepri.GetChunkBuf("BenchmarkEncrypt") for i := 0; i < b.N; i++ { - _, err := k.Encrypt(data) + _, err := k.Encrypt(buf, data) ok(b, err) } + khepri.FreeChunkBuf("BenchmarkEncrypt", buf) } func BenchmarkDecrypt(b *testing.B) { - size := 16 << 20 // 16MiB + size := 8 << 20 // 8MiB data := make([]byte, size) be := setupBackend(b) defer teardownBackend(b, be) k := setupKey(b, be, testPassword) - ciphertext, err := k.Encrypt(data) + ciphertext := khepri.GetChunkBuf("BenchmarkDecrypt") + n, err := k.Encrypt(ciphertext, data) ok(b, err) b.ResetTimer() b.SetBytes(int64(size)) for i := 0; i < b.N; i++ { - _, err := k.Decrypt(ciphertext) + _, err := k.Decrypt(ciphertext[:n]) ok(b, err) } + khepri.FreeChunkBuf("BenchmarkDecrypt", ciphertext) } diff --git a/pools.go b/pools.go new file mode 100644 index 000000000..a346c1e32 --- /dev/null +++ b/pools.go @@ -0,0 +1,103 @@ +package khepri + +import "sync" + +var ( + chunkPool = sync.Pool{New: newChunkBuf} + nodePool = sync.Pool{New: newNode} +) + +type alloc_stats struct { + m sync.Mutex + alloc_map map[string]int + free_map map[string]int + alloc int + free int + new int + all int + max int +} + +var ( + chunk_stats alloc_stats + node_stats alloc_stats +) + +func init() { + chunk_stats.alloc_map = make(map[string]int) + chunk_stats.free_map = make(map[string]int) +} + +func newChunkBuf() interface{} { + chunk_stats.m.Lock() + chunk_stats.new += 1 + chunk_stats.m.Unlock() + + // create buffer for iv, data and hmac + return make([]byte, MaxCiphertextSize) +} + +func newNode() interface{} { + node_stats.m.Lock() + node_stats.new += 1 + node_stats.m.Unlock() + + // create buffer for iv, data and hmac + return new(Node) +} + +func GetChunkBuf(s string) []byte { + chunk_stats.m.Lock() + if _, ok := chunk_stats.alloc_map[s]; !ok { + chunk_stats.alloc_map[s] = 0 + } + chunk_stats.alloc_map[s] += 1 + chunk_stats.all += 1 + if chunk_stats.all > chunk_stats.max { + chunk_stats.max = chunk_stats.all + } + chunk_stats.m.Unlock() + + return chunkPool.Get().([]byte) +} + +func FreeChunkBuf(s string, buf []byte) { + chunk_stats.m.Lock() + if _, ok := chunk_stats.free_map[s]; !ok { + chunk_stats.free_map[s] = 0 + } + chunk_stats.free_map[s] += 1 + chunk_stats.all -= 1 + chunk_stats.m.Unlock() + + chunkPool.Put(buf) +} + +func GetNode() *Node { + node_stats.m.Lock() + node_stats.alloc += 1 + node_stats.all += 1 + if node_stats.all > node_stats.max { + node_stats.max = node_stats.all + } + node_stats.m.Unlock() + return nodePool.Get().(*Node) +} + +func FreeNode(n *Node) { + node_stats.m.Lock() + node_stats.all -= 1 + node_stats.free += 1 + node_stats.m.Unlock() + nodePool.Put(n) +} + +func PoolAlloc() { + // fmt.Fprintf(os.Stderr, "alloc max: %d, new: %d\n", chunk_stats.max, chunk_stats.new) + // for k, v := range chunk_stats.alloc_map { + // fmt.Fprintf(os.Stderr, "alloc[%s] %d, free %d diff: %d\n", k, v, chunk_stats.free_map[k], v-chunk_stats.free_map[k]) + // } + + // fmt.Fprintf(os.Stderr, "nodes alloc max: %d, new: %d\n", node_stats.max, node_stats.new) + // fmt.Fprintf(os.Stderr, "alloc %d, free %d diff: %d\n", node_stats.alloc, node_stats.free, node_stats.alloc-node_stats.free) +} diff --git a/tree.go b/tree.go index 0aa41a44d..1635db19a 100644 --- a/tree.go +++ b/tree.go @@ -115,12 +115,11 @@ func (node *Node) fill_extra(path string, fi os.FileInfo) (err error) { } func NodeFromFileInfo(path string, fi os.FileInfo) (*Node, error) { - node := &Node{ - path: path, - Name: fi.Name(), - Mode: fi.Mode() & os.ModePerm, - ModTime: fi.ModTime(), - } + node := GetNode() + node.path = path + node.Name = fi.Name() + node.Mode = fi.Mode() & os.ModePerm + node.ModTime = fi.ModTime() switch fi.Mode() & (os.ModeType | os.ModeCharDevice) { case 0: @@ -265,3 +264,13 @@ func (node *Node) CreateAt(ch *ContentHandler, path string) error { return nil } + +func (b Blob) Free() { + if b.ID != nil { + b.ID.Free() + } + + if b.Storage != nil { + b.Storage.Free() + } +}