| // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com> |
| // All rights reserved. |
| // |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| package leveldb |
| |
| import ( |
| "fmt" |
| "sort" |
| "sync/atomic" |
| |
| "github.com/syndtr/goleveldb/leveldb/cache" |
| "github.com/syndtr/goleveldb/leveldb/iterator" |
| "github.com/syndtr/goleveldb/leveldb/opt" |
| "github.com/syndtr/goleveldb/leveldb/storage" |
| "github.com/syndtr/goleveldb/leveldb/table" |
| "github.com/syndtr/goleveldb/leveldb/util" |
| ) |
| |
| // tFile holds basic information about a table. |
| type tFile struct { |
| fd storage.FileDesc |
| seekLeft int32 |
| size int64 |
| imin, imax internalKey |
| } |
| |
| // Returns true if given key is after largest key of this table. |
| func (t *tFile) after(icmp *iComparer, ukey []byte) bool { |
| return ukey != nil && icmp.uCompare(ukey, t.imax.ukey()) > 0 |
| } |
| |
| // Returns true if given key is before smallest key of this table. |
| func (t *tFile) before(icmp *iComparer, ukey []byte) bool { |
| return ukey != nil && icmp.uCompare(ukey, t.imin.ukey()) < 0 |
| } |
| |
| // Returns true if given key range overlaps with this table key range. |
| func (t *tFile) overlaps(icmp *iComparer, umin, umax []byte) bool { |
| return !t.after(icmp, umin) && !t.before(icmp, umax) |
| } |
| |
| // Cosumes one seek and return current seeks left. |
| func (t *tFile) consumeSeek() int32 { |
| return atomic.AddInt32(&t.seekLeft, -1) |
| } |
| |
| // Creates new tFile. |
| func newTableFile(fd storage.FileDesc, size int64, imin, imax internalKey) *tFile { |
| f := &tFile{ |
| fd: fd, |
| size: size, |
| imin: imin, |
| imax: imax, |
| } |
| |
| // We arrange to automatically compact this file after |
| // a certain number of seeks. Let's assume: |
| // (1) One seek costs 10ms |
| // (2) Writing or reading 1MB costs 10ms (100MB/s) |
| // (3) A compaction of 1MB does 25MB of IO: |
| // 1MB read from this level |
| // 10-12MB read from next level (boundaries may be misaligned) |
| // 10-12MB written to next level |
| // This implies that 25 seeks cost the same as the compaction |
| // of 1MB of data. I.e., one seek costs approximately the |
| // same as the compaction of 40KB of data. We are a little |
| // conservative and allow approximately one seek for every 16KB |
| // of data before triggering a compaction. |
| f.seekLeft = int32(size / 16384) |
| if f.seekLeft < 100 { |
| f.seekLeft = 100 |
| } |
| |
| return f |
| } |
| |
| func tableFileFromRecord(r atRecord) *tFile { |
| return newTableFile(storage.FileDesc{storage.TypeTable, r.num}, r.size, r.imin, r.imax) |
| } |
| |
| // tFiles hold multiple tFile. |
| type tFiles []*tFile |
| |
| func (tf tFiles) Len() int { return len(tf) } |
| func (tf tFiles) Swap(i, j int) { tf[i], tf[j] = tf[j], tf[i] } |
| |
| func (tf tFiles) nums() string { |
| x := "[ " |
| for i, f := range tf { |
| if i != 0 { |
| x += ", " |
| } |
| x += fmt.Sprint(f.fd.Num) |
| } |
| x += " ]" |
| return x |
| } |
| |
| // Returns true if i smallest key is less than j. |
| // This used for sort by key in ascending order. |
| func (tf tFiles) lessByKey(icmp *iComparer, i, j int) bool { |
| a, b := tf[i], tf[j] |
| n := icmp.Compare(a.imin, b.imin) |
| if n == 0 { |
| return a.fd.Num < b.fd.Num |
| } |
| return n < 0 |
| } |
| |
| // Returns true if i file number is greater than j. |
| // This used for sort by file number in descending order. |
| func (tf tFiles) lessByNum(i, j int) bool { |
| return tf[i].fd.Num > tf[j].fd.Num |
| } |
| |
| // Sorts tables by key in ascending order. |
| func (tf tFiles) sortByKey(icmp *iComparer) { |
| sort.Sort(&tFilesSortByKey{tFiles: tf, icmp: icmp}) |
| } |
| |
| // Sorts tables by file number in descending order. |
| func (tf tFiles) sortByNum() { |
| sort.Sort(&tFilesSortByNum{tFiles: tf}) |
| } |
| |
| // Returns sum of all tables size. |
| func (tf tFiles) size() (sum int64) { |
| for _, t := range tf { |
| sum += t.size |
| } |
| return sum |
| } |
| |
| // Searches smallest index of tables whose its smallest |
| // key is after or equal with given key. |
| func (tf tFiles) searchMin(icmp *iComparer, ikey internalKey) int { |
| return sort.Search(len(tf), func(i int) bool { |
| return icmp.Compare(tf[i].imin, ikey) >= 0 |
| }) |
| } |
| |
| // Searches smallest index of tables whose its largest |
| // key is after or equal with given key. |
| func (tf tFiles) searchMax(icmp *iComparer, ikey internalKey) int { |
| return sort.Search(len(tf), func(i int) bool { |
| return icmp.Compare(tf[i].imax, ikey) >= 0 |
| }) |
| } |
| |
| // Returns true if given key range overlaps with one or more |
| // tables key range. If unsorted is true then binary search will not be used. |
| func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) bool { |
| if unsorted { |
| // Check against all files. |
| for _, t := range tf { |
| if t.overlaps(icmp, umin, umax) { |
| return true |
| } |
| } |
| return false |
| } |
| |
| i := 0 |
| if len(umin) > 0 { |
| // Find the earliest possible internal key for min. |
| i = tf.searchMax(icmp, makeInternalKey(nil, umin, keyMaxSeq, keyTypeSeek)) |
| } |
| if i >= len(tf) { |
| // Beginning of range is after all files, so no overlap. |
| return false |
| } |
| return !tf[i].before(icmp, umax) |
| } |
| |
| // Returns tables whose its key range overlaps with given key range. |
| // Range will be expanded if ukey found hop across tables. |
| // If overlapped is true then the search will be restarted if umax |
| // expanded. |
| // The dst content will be overwritten. |
| func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, overlapped bool) tFiles { |
| dst = dst[:0] |
| for i := 0; i < len(tf); { |
| t := tf[i] |
| if t.overlaps(icmp, umin, umax) { |
| if umin != nil && icmp.uCompare(t.imin.ukey(), umin) < 0 { |
| umin = t.imin.ukey() |
| dst = dst[:0] |
| i = 0 |
| continue |
| } else if umax != nil && icmp.uCompare(t.imax.ukey(), umax) > 0 { |
| umax = t.imax.ukey() |
| // Restart search if it is overlapped. |
| if overlapped { |
| dst = dst[:0] |
| i = 0 |
| continue |
| } |
| } |
| |
| dst = append(dst, t) |
| } |
| i++ |
| } |
| |
| return dst |
| } |
| |
| // Returns tables key range. |
| func (tf tFiles) getRange(icmp *iComparer) (imin, imax internalKey) { |
| for i, t := range tf { |
| if i == 0 { |
| imin, imax = t.imin, t.imax |
| continue |
| } |
| if icmp.Compare(t.imin, imin) < 0 { |
| imin = t.imin |
| } |
| if icmp.Compare(t.imax, imax) > 0 { |
| imax = t.imax |
| } |
| } |
| |
| return |
| } |
| |
| // Creates iterator index from tables. |
| func (tf tFiles) newIndexIterator(tops *tOps, icmp *iComparer, slice *util.Range, ro *opt.ReadOptions) iterator.IteratorIndexer { |
| if slice != nil { |
| var start, limit int |
| if slice.Start != nil { |
| start = tf.searchMax(icmp, internalKey(slice.Start)) |
| } |
| if slice.Limit != nil { |
| limit = tf.searchMin(icmp, internalKey(slice.Limit)) |
| } else { |
| limit = tf.Len() |
| } |
| tf = tf[start:limit] |
| } |
| return iterator.NewArrayIndexer(&tFilesArrayIndexer{ |
| tFiles: tf, |
| tops: tops, |
| icmp: icmp, |
| slice: slice, |
| ro: ro, |
| }) |
| } |
| |
| // Tables iterator index. |
| type tFilesArrayIndexer struct { |
| tFiles |
| tops *tOps |
| icmp *iComparer |
| slice *util.Range |
| ro *opt.ReadOptions |
| } |
| |
| func (a *tFilesArrayIndexer) Search(key []byte) int { |
| return a.searchMax(a.icmp, internalKey(key)) |
| } |
| |
| func (a *tFilesArrayIndexer) Get(i int) iterator.Iterator { |
| if i == 0 || i == a.Len()-1 { |
| return a.tops.newIterator(a.tFiles[i], a.slice, a.ro) |
| } |
| return a.tops.newIterator(a.tFiles[i], nil, a.ro) |
| } |
| |
| // Helper type for sortByKey. |
| type tFilesSortByKey struct { |
| tFiles |
| icmp *iComparer |
| } |
| |
| func (x *tFilesSortByKey) Less(i, j int) bool { |
| return x.lessByKey(x.icmp, i, j) |
| } |
| |
| // Helper type for sortByNum. |
| type tFilesSortByNum struct { |
| tFiles |
| } |
| |
| func (x *tFilesSortByNum) Less(i, j int) bool { |
| return x.lessByNum(i, j) |
| } |
| |
| // Table operations. |
| type tOps struct { |
| s *session |
| noSync bool |
| cache *cache.Cache |
| bcache *cache.Cache |
| bpool *util.BufferPool |
| } |
| |
| // Creates an empty table and returns table writer. |
| func (t *tOps) create() (*tWriter, error) { |
| fd := storage.FileDesc{storage.TypeTable, t.s.allocFileNum()} |
| fw, err := t.s.stor.Create(fd) |
| if err != nil { |
| return nil, err |
| } |
| return &tWriter{ |
| t: t, |
| fd: fd, |
| w: fw, |
| tw: table.NewWriter(fw, t.s.o.Options), |
| }, nil |
| } |
| |
| // Builds table from src iterator. |
| func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) { |
| w, err := t.create() |
| if err != nil { |
| return |
| } |
| |
| defer func() { |
| if err != nil { |
| w.drop() |
| } |
| }() |
| |
| for src.Next() { |
| err = w.append(src.Key(), src.Value()) |
| if err != nil { |
| return |
| } |
| } |
| err = src.Error() |
| if err != nil { |
| return |
| } |
| |
| n = w.tw.EntriesLen() |
| f, err = w.finish() |
| return |
| } |
| |
| // Opens table. It returns a cache handle, which should |
| // be released after use. |
| func (t *tOps) open(f *tFile) (ch *cache.Handle, err error) { |
| ch = t.cache.Get(0, uint64(f.fd.Num), func() (size int, value cache.Value) { |
| var r storage.Reader |
| r, err = t.s.stor.Open(f.fd) |
| if err != nil { |
| return 0, nil |
| } |
| |
| var bcache *cache.NamespaceGetter |
| if t.bcache != nil { |
| bcache = &cache.NamespaceGetter{Cache: t.bcache, NS: uint64(f.fd.Num)} |
| } |
| |
| var tr *table.Reader |
| tr, err = table.NewReader(r, f.size, f.fd, bcache, t.bpool, t.s.o.Options) |
| if err != nil { |
| r.Close() |
| return 0, nil |
| } |
| return 1, tr |
| |
| }) |
| if ch == nil && err == nil { |
| err = ErrClosed |
| } |
| return |
| } |
| |
| // Finds key/value pair whose key is greater than or equal to the |
| // given key. |
| func (t *tOps) find(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []byte, err error) { |
| ch, err := t.open(f) |
| if err != nil { |
| return nil, nil, err |
| } |
| defer ch.Release() |
| return ch.Value().(*table.Reader).Find(key, true, ro) |
| } |
| |
| // Finds key that is greater than or equal to the given key. |
| func (t *tOps) findKey(f *tFile, key []byte, ro *opt.ReadOptions) (rkey []byte, err error) { |
| ch, err := t.open(f) |
| if err != nil { |
| return nil, err |
| } |
| defer ch.Release() |
| return ch.Value().(*table.Reader).FindKey(key, true, ro) |
| } |
| |
| // Returns approximate offset of the given key. |
| func (t *tOps) offsetOf(f *tFile, key []byte) (offset int64, err error) { |
| ch, err := t.open(f) |
| if err != nil { |
| return |
| } |
| defer ch.Release() |
| return ch.Value().(*table.Reader).OffsetOf(key) |
| } |
| |
| // Creates an iterator from the given table. |
| func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { |
| ch, err := t.open(f) |
| if err != nil { |
| return iterator.NewEmptyIterator(err) |
| } |
| iter := ch.Value().(*table.Reader).NewIterator(slice, ro) |
| iter.SetReleaser(ch) |
| return iter |
| } |
| |
| // Removes table from persistent storage. It waits until |
| // no one use the the table. |
| func (t *tOps) remove(f *tFile) { |
| t.cache.Delete(0, uint64(f.fd.Num), func() { |
| if err := t.s.stor.Remove(f.fd); err != nil { |
| t.s.logf("table@remove removing @%d %q", f.fd.Num, err) |
| } else { |
| t.s.logf("table@remove removed @%d", f.fd.Num) |
| } |
| if t.bcache != nil { |
| t.bcache.EvictNS(uint64(f.fd.Num)) |
| } |
| }) |
| } |
| |
| // Closes the table ops instance. It will close all tables, |
| // regadless still used or not. |
| func (t *tOps) close() { |
| t.bpool.Close() |
| t.cache.Close() |
| if t.bcache != nil { |
| t.bcache.CloseWeak() |
| } |
| } |
| |
| // Creates new initialized table ops instance. |
| func newTableOps(s *session) *tOps { |
| var ( |
| cacher cache.Cacher |
| bcache *cache.Cache |
| bpool *util.BufferPool |
| ) |
| if s.o.GetOpenFilesCacheCapacity() > 0 { |
| cacher = cache.NewLRU(s.o.GetOpenFilesCacheCapacity()) |
| } |
| if !s.o.GetDisableBlockCache() { |
| var bcacher cache.Cacher |
| if s.o.GetBlockCacheCapacity() > 0 { |
| bcacher = cache.NewLRU(s.o.GetBlockCacheCapacity()) |
| } |
| bcache = cache.NewCache(bcacher) |
| } |
| if !s.o.GetDisableBufferPool() { |
| bpool = util.NewBufferPool(s.o.GetBlockSize() + 5) |
| } |
| return &tOps{ |
| s: s, |
| noSync: s.o.GetNoSync(), |
| cache: cache.NewCache(cacher), |
| bcache: bcache, |
| bpool: bpool, |
| } |
| } |
| |
| // tWriter wraps the table writer. It keep track of file descriptor |
| // and added key range. |
| type tWriter struct { |
| t *tOps |
| |
| fd storage.FileDesc |
| w storage.Writer |
| tw *table.Writer |
| |
| first, last []byte |
| } |
| |
| // Append key/value pair to the table. |
| func (w *tWriter) append(key, value []byte) error { |
| if w.first == nil { |
| w.first = append([]byte{}, key...) |
| } |
| w.last = append(w.last[:0], key...) |
| return w.tw.Append(key, value) |
| } |
| |
| // Returns true if the table is empty. |
| func (w *tWriter) empty() bool { |
| return w.first == nil |
| } |
| |
| // Closes the storage.Writer. |
| func (w *tWriter) close() { |
| if w.w != nil { |
| w.w.Close() |
| w.w = nil |
| } |
| } |
| |
| // Finalizes the table and returns table file. |
| func (w *tWriter) finish() (f *tFile, err error) { |
| defer w.close() |
| err = w.tw.Close() |
| if err != nil { |
| return |
| } |
| if !w.t.noSync { |
| err = w.w.Sync() |
| if err != nil { |
| return |
| } |
| } |
| f = newTableFile(w.fd, int64(w.tw.BytesLen()), internalKey(w.first), internalKey(w.last)) |
| return |
| } |
| |
| // Drops the table. |
| func (w *tWriter) drop() { |
| w.close() |
| w.t.s.stor.Remove(w.fd) |
| w.t.s.reuseFileNum(w.fd.Num) |
| w.tw = nil |
| w.first = nil |
| w.last = nil |
| } |