// Copyright 2014 The lldb Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. // Two Phase Commit & Structural ACID package lldb import ( "bufio" "encoding/binary" "fmt" "io" "os" "github.com/cznic/fileutil" "github.com/cznic/mathutil" ) var _ Filer = &ACIDFiler0{} // Ensure ACIDFiler0 is a Filer type acidWrite struct { b []byte off int64 } type acidWriter0 ACIDFiler0 func (a *acidWriter0) WriteAt(b []byte, off int64) (n int, err error) { f := (*ACIDFiler0)(a) if f.newEpoch { f.newEpoch = false f.data = f.data[:0] if err = a.writePacket([]interface{}{wpt00Header, walTypeACIDFiler0, ""}); err != nil { return } } if err = a.writePacket([]interface{}{wpt00WriteData, b, off}); err != nil { return } f.data = append(f.data, acidWrite{b, off}) return len(b), nil } func (a *acidWriter0) writePacket(items []interface{}) (err error) { f := (*ACIDFiler0)(a) b, err := EncodeScalars(items...) if err != nil { return } var b4 [4]byte binary.BigEndian.PutUint32(b4[:], uint32(len(b))) if _, err = f.bwal.Write(b4[:]); err != nil { return } if _, err = f.bwal.Write(b); err != nil { return } if m := (4 + len(b)) % 16; m != 0 { var pad [15]byte _, err = f.bwal.Write(pad[:16-m]) } return } // WAL Packet Tags const ( wpt00Header = iota wpt00WriteData wpt00Checkpoint ) const ( walTypeACIDFiler0 = iota ) // ACIDFiler0 is a very simple, synchronous implementation of 2PC. It uses a // single write ahead log file to provide the structural atomicity // (BeginUpdate/EndUpdate/Rollback) and durability (DB can be recovered from // WAL if a crash occurred). // // ACIDFiler0 is a Filer. // // NOTE: Durable synchronous 2PC involves three fsyncs in this implementation // (WAL, DB, zero truncated WAL). Where possible, it's recommended to collect // transactions for, say one second before performing the two phase commit as // the typical performance for rotational hard disks is about few tens of // fsyncs per second atmost. For an example of such collective transaction // approach please see the colecting FSM STT in Dbm's documentation[1]. // // [1]: http://godoc.org/github.com/cznic/exp/dbm type ACIDFiler0 struct { *RollbackFiler bwal *bufio.Writer data []acidWrite newEpoch bool peakBitFilerPages int // track maximum transaction memory peakWal int64 // tracks WAL maximum used size testHook bool // keeps WAL untruncated (once) wal *os.File } // NewACIDFiler0 returns a newly created ACIDFiler0 with WAL in wal. // // If the WAL is zero sized then a previous clean shutdown of db is taken for // granted and no recovery procedure is taken. // // If the WAL is of non zero size then it is checked for having a // commited/fully finished transaction not yet been reflected in db. If such // transaction exists it's committed to db. If the recovery process finishes // successfully, the WAL is truncated to zero size and fsync'ed prior to return // from NewACIDFiler0. func NewACIDFiler(db Filer, wal *os.File) (r *ACIDFiler0, err error) { fi, err := wal.Stat() if err != nil { return } r = &ACIDFiler0{wal: wal} if fi.Size() != 0 { if err = r.recoverDb(db); err != nil { return } } r.bwal = bufio.NewWriter(r.wal) r.newEpoch = true acidWriter := (*acidWriter0)(r) if r.RollbackFiler, err = NewRollbackFiler( db, func(sz int64) (err error) { // Checkpoint if err = acidWriter.writePacket([]interface{}{wpt00Checkpoint, sz}); err != nil { return } if err = r.bwal.Flush(); err != nil { return } if err = r.wal.Sync(); err != nil { return } wfi, err := r.wal.Stat() if err == nil { r.peakWal = mathutil.MaxInt64(wfi.Size(), r.peakWal) } // Phase 1 commit complete for _, v := range r.data { if _, err := db.WriteAt(v.b, v.off); err != nil { return err } } if err = db.Truncate(sz); err != nil { return } if err = db.Sync(); err != nil { return } // Phase 2 commit complete if !r.testHook { if err = r.wal.Truncate(0); err != nil { return } if _, err = r.wal.Seek(0, 0); err != nil { return } } r.testHook = false r.bwal.Reset(r.wal) r.newEpoch = true return r.wal.Sync() }, acidWriter, ); err != nil { return } return r, nil } // PeakWALSize reports the maximum size WAL has ever used. func (a ACIDFiler0) PeakWALSize() int64 { return a.peakWal } func (a *ACIDFiler0) readPacket(f *bufio.Reader) (items []interface{}, err error) { var b4 [4]byte n, err := io.ReadAtLeast(f, b4[:], 4) if n != 4 { return } ln := int(binary.BigEndian.Uint32(b4[:])) m := (4 + ln) % 16 padd := (16 - m) % 16 b := make([]byte, ln+padd) if n, err = io.ReadAtLeast(f, b, len(b)); n != len(b) { return } return DecodeScalars(b[:ln]) } func (a *ACIDFiler0) recoverDb(db Filer) (err error) { fi, err := a.wal.Stat() if err != nil { return &ErrILSEQ{Type: ErrInvalidWAL, Name: a.wal.Name(), More: err} } if sz := fi.Size(); sz%16 != 0 { return &ErrILSEQ{Type: ErrFileSize, Name: a.wal.Name(), Arg: sz} } f := bufio.NewReader(a.wal) items, err := a.readPacket(f) if err != nil { return } if len(items) != 3 || items[0] != int64(wpt00Header) || items[1] != int64(walTypeACIDFiler0) { return &ErrILSEQ{Type: ErrInvalidWAL, Name: a.wal.Name(), More: fmt.Sprintf("invalid packet items %#v", items)} } tr := NewBTree(nil) for { items, err = a.readPacket(f) if err != nil { return } if len(items) < 2 { return &ErrILSEQ{Type: ErrInvalidWAL, Name: a.wal.Name(), More: fmt.Sprintf("too few packet items %#v", items)} } switch items[0] { case int64(wpt00WriteData): if len(items) != 3 { return &ErrILSEQ{Type: ErrInvalidWAL, Name: a.wal.Name(), More: fmt.Sprintf("invalid data packet items %#v", items)} } b, off := items[1].([]byte), items[2].(int64) var key [8]byte binary.BigEndian.PutUint64(key[:], uint64(off)) if err = tr.Set(key[:], b); err != nil { return } case int64(wpt00Checkpoint): var b1 [1]byte if n, err := f.Read(b1[:]); n != 0 || err == nil { return &ErrILSEQ{Type: ErrInvalidWAL, Name: a.wal.Name(), More: fmt.Sprintf("checkpoint n %d, err %v", n, err)} } if len(items) != 2 { return &ErrILSEQ{Type: ErrInvalidWAL, Name: a.wal.Name(), More: fmt.Sprintf("checkpoint packet invalid items %#v", items)} } sz := items[1].(int64) enum, err := tr.seekFirst() if err != nil { return err } for { k, v, err := enum.current() if err != nil { if fileutil.IsEOF(err) { break } return err } if _, err = db.WriteAt(v, int64(binary.BigEndian.Uint64(k))); err != nil { return err } if err = enum.next(); err != nil { if fileutil.IsEOF(err) { break } return err } } if err = db.Truncate(sz); err != nil { return err } if err = db.Sync(); err != nil { return err } // Recovery complete if err = a.wal.Truncate(0); err != nil { return err } return a.wal.Sync() default: return &ErrILSEQ{Type: ErrInvalidWAL, Name: a.wal.Name(), More: fmt.Sprintf("packet tag %v", items[0])} } } }