253 lines
5.1 KiB
Go
Raw Normal View History

2014-07-06 14:46:48 +02:00
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package leveldb
import (
"fmt"
"sync/atomic"
"github.com/syndtr/goleveldb/leveldb/journal"
"github.com/syndtr/goleveldb/leveldb/storage"
)
2014-07-23 08:31:36 +02:00
// Logging.
2014-07-06 14:46:48 +02:00
type dropper struct {
s *session
fd storage.FileDesc
2014-07-06 14:46:48 +02:00
}
func (d dropper) Drop(err error) {
2014-11-18 16:24:42 +04:00
if e, ok := err.(*journal.ErrCorrupted); ok {
d.s.logf("journal@drop %s-%d S·%s %q", d.fd.Type, d.fd.Num, shortenb(e.Size), e.Reason)
2014-07-06 14:46:48 +02:00
} else {
d.s.logf("journal@drop %s-%d %q", d.fd.Type, d.fd.Num, err)
2014-07-06 14:46:48 +02:00
}
}
2014-07-23 08:31:36 +02:00
func (s *session) log(v ...interface{}) { s.stor.Log(fmt.Sprint(v...)) }
func (s *session) logf(format string, v ...interface{}) { s.stor.Log(fmt.Sprintf(format, v...)) }
2014-07-06 14:46:48 +02:00
2014-07-23 08:31:36 +02:00
// File utils.
2014-07-06 14:46:48 +02:00
func (s *session) newTemp() storage.FileDesc {
num := atomic.AddInt64(&s.stTempFileNum, 1) - 1
return storage.FileDesc{storage.TypeTemp, num}
2014-11-18 16:24:42 +04:00
}
2014-07-23 08:31:36 +02:00
// Session state.
2014-07-06 14:46:48 +02:00
2014-11-18 16:24:42 +04:00
// Get current version. This will incr version ref, must call
// version.release (exactly once) after use.
2014-07-06 14:46:48 +02:00
func (s *session) version() *version {
s.vmu.Lock()
defer s.vmu.Unlock()
s.stVersion.ref++
return s.stVersion
}
// Set current version to v.
func (s *session) setVersion(v *version) {
s.vmu.Lock()
2014-11-18 16:24:42 +04:00
v.ref = 1 // Holds by session.
2014-07-06 14:46:48 +02:00
if old := s.stVersion; old != nil {
2014-11-18 16:24:42 +04:00
v.ref++ // Holds by old version.
2014-07-06 14:46:48 +02:00
old.next = v
2014-11-18 16:24:42 +04:00
old.releaseNB()
2014-07-06 14:46:48 +02:00
}
s.stVersion = v
s.vmu.Unlock()
}
// Get current unused file number.
func (s *session) nextFileNum() int64 {
return atomic.LoadInt64(&s.stNextFileNum)
2014-07-06 14:46:48 +02:00
}
2014-11-18 16:24:42 +04:00
// Set current unused file number to num.
func (s *session) setNextFileNum(num int64) {
atomic.StoreInt64(&s.stNextFileNum, num)
2014-07-06 14:46:48 +02:00
}
// Mark file number as used.
func (s *session) markFileNum(num int64) {
2014-11-18 16:24:42 +04:00
nextFileNum := num + 1
2014-07-06 14:46:48 +02:00
for {
2014-11-18 16:24:42 +04:00
old, x := s.stNextFileNum, nextFileNum
2014-07-06 14:46:48 +02:00
if old > x {
x = old
}
if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) {
2014-07-06 14:46:48 +02:00
break
}
}
}
// Allocate a file number.
func (s *session) allocFileNum() int64 {
return atomic.AddInt64(&s.stNextFileNum, 1) - 1
2014-07-06 14:46:48 +02:00
}
// Reuse given file number.
func (s *session) reuseFileNum(num int64) {
2014-07-06 14:46:48 +02:00
for {
2014-11-18 16:24:42 +04:00
old, x := s.stNextFileNum, num
2014-07-06 14:46:48 +02:00
if old != x+1 {
x = old
}
if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) {
2014-07-06 14:46:48 +02:00
break
}
}
}
// Set compaction ptr at given level; need external synchronization.
func (s *session) setCompPtr(level int, ik internalKey) {
if level >= len(s.stCompPtrs) {
newCompPtrs := make([]internalKey, level+1)
copy(newCompPtrs, s.stCompPtrs)
s.stCompPtrs = newCompPtrs
}
s.stCompPtrs[level] = append(internalKey{}, ik...)
}
// Get compaction ptr at given level; need external synchronization.
func (s *session) getCompPtr(level int) internalKey {
if level >= len(s.stCompPtrs) {
return nil
}
return s.stCompPtrs[level]
}
2014-07-23 08:31:36 +02:00
// Manifest related utils.
2014-07-06 14:46:48 +02:00
// Fill given session record obj with current states; need external
// synchronization.
func (s *session) fillRecord(r *sessionRecord, snapshot bool) {
2014-11-18 16:24:42 +04:00
r.setNextFileNum(s.nextFileNum())
2014-07-06 14:46:48 +02:00
if snapshot {
if !r.has(recJournalNum) {
r.setJournalNum(s.stJournalNum)
}
2014-11-18 16:24:42 +04:00
if !r.has(recSeqNum) {
r.setSeqNum(s.stSeqNum)
2014-07-06 14:46:48 +02:00
}
2014-11-18 16:24:42 +04:00
for level, ik := range s.stCompPtrs {
2014-07-06 14:46:48 +02:00
if ik != nil {
2014-11-18 16:24:42 +04:00
r.addCompPtr(level, ik)
2014-07-06 14:46:48 +02:00
}
}
2014-07-06 23:13:10 +02:00
r.setComparer(s.icmp.uName())
2014-07-06 14:46:48 +02:00
}
}
2014-10-06 22:07:33 +02:00
// Mark if record has been committed, this will update session state;
2014-07-06 14:46:48 +02:00
// need external synchronization.
func (s *session) recordCommited(rec *sessionRecord) {
if rec.has(recJournalNum) {
s.stJournalNum = rec.journalNum
2014-07-06 14:46:48 +02:00
}
if rec.has(recPrevJournalNum) {
s.stPrevJournalNum = rec.prevJournalNum
2014-07-06 14:46:48 +02:00
}
if rec.has(recSeqNum) {
s.stSeqNum = rec.seqNum
2014-07-06 14:46:48 +02:00
}
for _, r := range rec.compPtrs {
s.setCompPtr(r.level, internalKey(r.ikey))
2014-07-06 14:46:48 +02:00
}
}
// Create a new manifest file; need external synchronization.
func (s *session) newManifest(rec *sessionRecord, v *version) (err error) {
fd := storage.FileDesc{storage.TypeManifest, s.allocFileNum()}
writer, err := s.stor.Create(fd)
2014-07-06 14:46:48 +02:00
if err != nil {
return
}
jw := journal.NewWriter(writer)
if v == nil {
2014-11-18 16:24:42 +04:00
v = s.version()
defer v.release()
2014-07-06 14:46:48 +02:00
}
if rec == nil {
2015-06-15 21:10:18 +02:00
rec = &sessionRecord{}
2014-07-06 14:46:48 +02:00
}
s.fillRecord(rec, true)
v.fillRecord(rec)
defer func() {
if err == nil {
s.recordCommited(rec)
if s.manifest != nil {
s.manifest.Close()
}
if s.manifestWriter != nil {
s.manifestWriter.Close()
}
if !s.manifestFd.Nil() {
s.stor.Remove(s.manifestFd)
2014-07-06 14:46:48 +02:00
}
s.manifestFd = fd
2014-07-06 14:46:48 +02:00
s.manifestWriter = writer
s.manifest = jw
} else {
writer.Close()
s.stor.Remove(fd)
s.reuseFileNum(fd.Num)
2014-07-06 14:46:48 +02:00
}
}()
w, err := jw.Next()
if err != nil {
return
}
err = rec.encode(w)
if err != nil {
return
}
err = jw.Flush()
if err != nil {
return
}
err = s.stor.SetMeta(fd)
2014-07-06 14:46:48 +02:00
return
}
// Flush record to disk.
func (s *session) flushManifest(rec *sessionRecord) (err error) {
s.fillRecord(rec, false)
w, err := s.manifest.Next()
if err != nil {
return
}
err = rec.encode(w)
if err != nil {
return
}
err = s.manifest.Flush()
if err != nil {
return
}
2015-08-18 08:56:07 +02:00
if !s.o.GetNoSync() {
err = s.manifestWriter.Sync()
if err != nil {
return
}
2014-07-06 14:46:48 +02:00
}
s.recordCommited(rec)
return
}