| // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com> |
| // All rights reserved. |
| // |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| package leveldb |
| |
| import ( |
| "fmt" |
| "sync/atomic" |
| |
| "github.com/syndtr/goleveldb/leveldb/journal" |
| "github.com/syndtr/goleveldb/leveldb/storage" |
| ) |
| |
| // Logging. |
| |
| type dropper struct { |
| s *session |
| fd storage.FileDesc |
| } |
| |
| func (d dropper) Drop(err error) { |
| if e, ok := err.(*journal.ErrCorrupted); ok { |
| d.s.logf("journal@drop %s-%d S·%s %q", d.fd.Type, d.fd.Num, shortenb(e.Size), e.Reason) |
| } else { |
| d.s.logf("journal@drop %s-%d %q", d.fd.Type, d.fd.Num, err) |
| } |
| } |
| |
| func (s *session) log(v ...interface{}) { s.stor.Log(fmt.Sprint(v...)) } |
| func (s *session) logf(format string, v ...interface{}) { s.stor.Log(fmt.Sprintf(format, v...)) } |
| |
| // File utils. |
| |
| func (s *session) newTemp() storage.FileDesc { |
| num := atomic.AddInt64(&s.stTempFileNum, 1) - 1 |
| return storage.FileDesc{storage.TypeTemp, num} |
| } |
| |
| // Session state. |
| |
| // Get current version. This will incr version ref, must call |
| // version.release (exactly once) after use. |
| func (s *session) version() *version { |
| s.vmu.Lock() |
| defer s.vmu.Unlock() |
| s.stVersion.ref++ |
| return s.stVersion |
| } |
| |
| // Set current version to v. |
| func (s *session) setVersion(v *version) { |
| s.vmu.Lock() |
| v.ref = 1 // Holds by session. |
| if old := s.stVersion; old != nil { |
| v.ref++ // Holds by old version. |
| old.next = v |
| old.releaseNB() |
| } |
| s.stVersion = v |
| s.vmu.Unlock() |
| } |
| |
| // Get current unused file number. |
| func (s *session) nextFileNum() int64 { |
| return atomic.LoadInt64(&s.stNextFileNum) |
| } |
| |
| // Set current unused file number to num. |
| func (s *session) setNextFileNum(num int64) { |
| atomic.StoreInt64(&s.stNextFileNum, num) |
| } |
| |
| // Mark file number as used. |
| func (s *session) markFileNum(num int64) { |
| nextFileNum := num + 1 |
| for { |
| old, x := s.stNextFileNum, nextFileNum |
| if old > x { |
| x = old |
| } |
| if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) { |
| break |
| } |
| } |
| } |
| |
| // Allocate a file number. |
| func (s *session) allocFileNum() int64 { |
| return atomic.AddInt64(&s.stNextFileNum, 1) - 1 |
| } |
| |
| // Reuse given file number. |
| func (s *session) reuseFileNum(num int64) { |
| for { |
| old, x := s.stNextFileNum, num |
| if old != x+1 { |
| x = old |
| } |
| if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) { |
| break |
| } |
| } |
| } |
| |
| // Set compaction ptr at given level; need external synchronization. |
| func (s *session) setCompPtr(level int, ik internalKey) { |
| if level >= len(s.stCompPtrs) { |
| newCompPtrs := make([]internalKey, level+1) |
| copy(newCompPtrs, s.stCompPtrs) |
| s.stCompPtrs = newCompPtrs |
| } |
| s.stCompPtrs[level] = append(internalKey{}, ik...) |
| } |
| |
| // Get compaction ptr at given level; need external synchronization. |
| func (s *session) getCompPtr(level int) internalKey { |
| if level >= len(s.stCompPtrs) { |
| return nil |
| } |
| return s.stCompPtrs[level] |
| } |
| |
| // Manifest related utils. |
| |
| // Fill given session record obj with current states; need external |
| // synchronization. |
| func (s *session) fillRecord(r *sessionRecord, snapshot bool) { |
| r.setNextFileNum(s.nextFileNum()) |
| |
| if snapshot { |
| if !r.has(recJournalNum) { |
| r.setJournalNum(s.stJournalNum) |
| } |
| |
| if !r.has(recSeqNum) { |
| r.setSeqNum(s.stSeqNum) |
| } |
| |
| for level, ik := range s.stCompPtrs { |
| if ik != nil { |
| r.addCompPtr(level, ik) |
| } |
| } |
| |
| r.setComparer(s.icmp.uName()) |
| } |
| } |
| |
| // Mark if record has been committed, this will update session state; |
| // need external synchronization. |
| func (s *session) recordCommited(rec *sessionRecord) { |
| if rec.has(recJournalNum) { |
| s.stJournalNum = rec.journalNum |
| } |
| |
| if rec.has(recPrevJournalNum) { |
| s.stPrevJournalNum = rec.prevJournalNum |
| } |
| |
| if rec.has(recSeqNum) { |
| s.stSeqNum = rec.seqNum |
| } |
| |
| for _, r := range rec.compPtrs { |
| s.setCompPtr(r.level, internalKey(r.ikey)) |
| } |
| } |
| |
| // Create a new manifest file; need external synchronization. |
| func (s *session) newManifest(rec *sessionRecord, v *version) (err error) { |
| fd := storage.FileDesc{storage.TypeManifest, s.allocFileNum()} |
| writer, err := s.stor.Create(fd) |
| if err != nil { |
| return |
| } |
| jw := journal.NewWriter(writer) |
| |
| if v == nil { |
| v = s.version() |
| defer v.release() |
| } |
| if rec == nil { |
| rec = &sessionRecord{} |
| } |
| s.fillRecord(rec, true) |
| v.fillRecord(rec) |
| |
| defer func() { |
| if err == nil { |
| s.recordCommited(rec) |
| if s.manifest != nil { |
| s.manifest.Close() |
| } |
| if s.manifestWriter != nil { |
| s.manifestWriter.Close() |
| } |
| if !s.manifestFd.Nil() { |
| s.stor.Remove(s.manifestFd) |
| } |
| s.manifestFd = fd |
| s.manifestWriter = writer |
| s.manifest = jw |
| } else { |
| writer.Close() |
| s.stor.Remove(fd) |
| s.reuseFileNum(fd.Num) |
| } |
| }() |
| |
| w, err := jw.Next() |
| if err != nil { |
| return |
| } |
| err = rec.encode(w) |
| if err != nil { |
| return |
| } |
| err = jw.Flush() |
| if err != nil { |
| return |
| } |
| err = s.stor.SetMeta(fd) |
| return |
| } |
| |
| // Flush record to disk. |
| func (s *session) flushManifest(rec *sessionRecord) (err error) { |
| s.fillRecord(rec, false) |
| w, err := s.manifest.Next() |
| if err != nil { |
| return |
| } |
| err = rec.encode(w) |
| if err != nil { |
| return |
| } |
| err = s.manifest.Flush() |
| if err != nil { |
| return |
| } |
| if !s.o.GetNoSync() { |
| err = s.manifestWriter.Sync() |
| if err != nil { |
| return |
| } |
| } |
| s.recordCommited(rec) |
| return |
| } |