blob: e53434cab73169a76e42125cfeb38ce7b9cd3d0c [file] [log] [blame]
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reservefs.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package storage
import (
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"time"
)
var (
errFileOpen = errors.New("leveldb/storage: file still open")
errReadOnly = errors.New("leveldb/storage: storage is read-only")
)
type fileLock interface {
release() error
}
type fileStorageLock struct {
fs *fileStorage
}
func (lock *fileStorageLock) Unlock() {
if lock.fs != nil {
lock.fs.mu.Lock()
defer lock.fs.mu.Unlock()
if lock.fs.slock == lock {
lock.fs.slock = nil
}
}
}
const logSizeThreshold = 1024 * 1024 // 1 MiB
// fileStorage is a file-system backed storage.
type fileStorage struct {
path string
readOnly bool
mu sync.Mutex
flock fileLock
slock *fileStorageLock
logw *os.File
logSize int64
buf []byte
// Opened file counter; if open < 0 means closed.
open int
day int
}
// OpenFile returns a new filesytem-backed storage implementation with the given
// path. This also acquire a file lock, so any subsequent attempt to open the
// same path will fail.
//
// The storage must be closed after use, by calling Close method.
func OpenFile(path string, readOnly bool) (Storage, error) {
if fi, err := os.Stat(path); err == nil {
if !fi.IsDir() {
return nil, fmt.Errorf("leveldb/storage: open %s: not a directory", path)
}
} else if os.IsNotExist(err) && !readOnly {
if err := os.MkdirAll(path, 0755); err != nil {
return nil, err
}
} else {
return nil, err
}
flock, err := newFileLock(filepath.Join(path, "LOCK"), readOnly)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
flock.release()
}
}()
var (
logw *os.File
logSize int64
)
if !readOnly {
logw, err = os.OpenFile(filepath.Join(path, "LOG"), os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
return nil, err
}
logSize, err = logw.Seek(0, os.SEEK_END)
if err != nil {
logw.Close()
return nil, err
}
}
fs := &fileStorage{
path: path,
readOnly: readOnly,
flock: flock,
logw: logw,
logSize: logSize,
}
runtime.SetFinalizer(fs, (*fileStorage).Close)
return fs, nil
}
func (fs *fileStorage) Lock() (Locker, error) {
fs.mu.Lock()
defer fs.mu.Unlock()
if fs.open < 0 {
return nil, ErrClosed
}
if fs.readOnly {
return &fileStorageLock{}, nil
}
if fs.slock != nil {
return nil, ErrLocked
}
fs.slock = &fileStorageLock{fs: fs}
return fs.slock, nil
}
func itoa(buf []byte, i int, wid int) []byte {
u := uint(i)
if u == 0 && wid <= 1 {
return append(buf, '0')
}
// Assemble decimal in reverse order.
var b [32]byte
bp := len(b)
for ; u > 0 || wid > 0; u /= 10 {
bp--
wid--
b[bp] = byte(u%10) + '0'
}
return append(buf, b[bp:]...)
}
func (fs *fileStorage) printDay(t time.Time) {
if fs.day == t.Day() {
return
}
fs.day = t.Day()
fs.logw.Write([]byte("=============== " + t.Format("Jan 2, 2006 (MST)") + " ===============\n"))
}
func (fs *fileStorage) doLog(t time.Time, str string) {
if fs.logSize > logSizeThreshold {
// Rotate log file.
fs.logw.Close()
fs.logw = nil
fs.logSize = 0
rename(filepath.Join(fs.path, "LOG"), filepath.Join(fs.path, "LOG.old"))
}
if fs.logw == nil {
var err error
fs.logw, err = os.OpenFile(filepath.Join(fs.path, "LOG"), os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
return
}
// Force printDay on new log file.
fs.day = 0
}
fs.printDay(t)
hour, min, sec := t.Clock()
msec := t.Nanosecond() / 1e3
// time
fs.buf = itoa(fs.buf[:0], hour, 2)
fs.buf = append(fs.buf, ':')
fs.buf = itoa(fs.buf, min, 2)
fs.buf = append(fs.buf, ':')
fs.buf = itoa(fs.buf, sec, 2)
fs.buf = append(fs.buf, '.')
fs.buf = itoa(fs.buf, msec, 6)
fs.buf = append(fs.buf, ' ')
// write
fs.buf = append(fs.buf, []byte(str)...)
fs.buf = append(fs.buf, '\n')
fs.logw.Write(fs.buf)
}
func (fs *fileStorage) Log(str string) {
if !fs.readOnly {
t := time.Now()
fs.mu.Lock()
defer fs.mu.Unlock()
if fs.open < 0 {
return
}
fs.doLog(t, str)
}
}
func (fs *fileStorage) log(str string) {
if !fs.readOnly {
fs.doLog(time.Now(), str)
}
}
func (fs *fileStorage) SetMeta(fd FileDesc) (err error) {
if !FileDescOk(fd) {
return ErrInvalidFile
}
if fs.readOnly {
return errReadOnly
}
fs.mu.Lock()
defer fs.mu.Unlock()
if fs.open < 0 {
return ErrClosed
}
defer func() {
if err != nil {
fs.log(fmt.Sprintf("CURRENT: %v", err))
}
}()
path := fmt.Sprintf("%s.%d", filepath.Join(fs.path, "CURRENT"), fd.Num)
w, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return
}
_, err = fmt.Fprintln(w, fsGenName(fd))
// Close the file first.
if cerr := w.Close(); cerr != nil {
fs.log(fmt.Sprintf("close CURRENT.%d: %v", fd.Num, cerr))
}
if err != nil {
return
}
return rename(path, filepath.Join(fs.path, "CURRENT"))
}
func (fs *fileStorage) GetMeta() (fd FileDesc, err error) {
fs.mu.Lock()
defer fs.mu.Unlock()
if fs.open < 0 {
return FileDesc{}, ErrClosed
}
dir, err := os.Open(fs.path)
if err != nil {
return
}
names, err := dir.Readdirnames(0)
// Close the dir first before checking for Readdirnames error.
if ce := dir.Close(); ce != nil {
fs.log(fmt.Sprintf("close dir: %v", ce))
}
if err != nil {
return
}
// Find latest CURRENT file.
var rem []string
var pend bool
var cerr error
for _, name := range names {
if strings.HasPrefix(name, "CURRENT") {
pend1 := len(name) > 7
var pendNum int64
// Make sure it is valid name for a CURRENT file, otherwise skip it.
if pend1 {
if name[7] != '.' || len(name) < 9 {
fs.log(fmt.Sprintf("skipping %s: invalid file name", name))
continue
}
var e1 error
if pendNum, e1 = strconv.ParseInt(name[8:], 10, 0); e1 != nil {
fs.log(fmt.Sprintf("skipping %s: invalid file num: %v", name, e1))
continue
}
}
path := filepath.Join(fs.path, name)
r, e1 := os.OpenFile(path, os.O_RDONLY, 0)
if e1 != nil {
return FileDesc{}, e1
}
b, e1 := ioutil.ReadAll(r)
if e1 != nil {
r.Close()
return FileDesc{}, e1
}
var fd1 FileDesc
if len(b) < 1 || b[len(b)-1] != '\n' || !fsParseNamePtr(string(b[:len(b)-1]), &fd1) {
fs.log(fmt.Sprintf("skipping %s: corrupted or incomplete", name))
if pend1 {
rem = append(rem, name)
}
if !pend1 || cerr == nil {
metaFd, _ := fsParseName(name)
cerr = &ErrCorrupted{
Fd: metaFd,
Err: errors.New("leveldb/storage: corrupted or incomplete meta file"),
}
}
} else if pend1 && pendNum != fd1.Num {
fs.log(fmt.Sprintf("skipping %s: inconsistent pending-file num: %d vs %d", name, pendNum, fd1.Num))
rem = append(rem, name)
} else if fd1.Num < fd.Num {
fs.log(fmt.Sprintf("skipping %s: obsolete", name))
if pend1 {
rem = append(rem, name)
}
} else {
fd = fd1
pend = pend1
}
if err := r.Close(); err != nil {
fs.log(fmt.Sprintf("close %s: %v", name, err))
}
}
}
// Don't remove any files if there is no valid CURRENT file.
if fd.Zero() {
if cerr != nil {
err = cerr
} else {
err = os.ErrNotExist
}
return
}
if !fs.readOnly {
// Rename pending CURRENT file to an effective CURRENT.
if pend {
path := fmt.Sprintf("%s.%d", filepath.Join(fs.path, "CURRENT"), fd.Num)
if err := rename(path, filepath.Join(fs.path, "CURRENT")); err != nil {
fs.log(fmt.Sprintf("CURRENT.%d -> CURRENT: %v", fd.Num, err))
}
}
// Remove obsolete or incomplete pending CURRENT files.
for _, name := range rem {
path := filepath.Join(fs.path, name)
if err := os.Remove(path); err != nil {
fs.log(fmt.Sprintf("remove %s: %v", name, err))
}
}
}
return
}
func (fs *fileStorage) List(ft FileType) (fds []FileDesc, err error) {
fs.mu.Lock()
defer fs.mu.Unlock()
if fs.open < 0 {
return nil, ErrClosed
}
dir, err := os.Open(fs.path)
if err != nil {
return
}
names, err := dir.Readdirnames(0)
// Close the dir first before checking for Readdirnames error.
if cerr := dir.Close(); cerr != nil {
fs.log(fmt.Sprintf("close dir: %v", cerr))
}
if err == nil {
for _, name := range names {
if fd, ok := fsParseName(name); ok && fd.Type&ft != 0 {
fds = append(fds, fd)
}
}
}
return
}
func (fs *fileStorage) Open(fd FileDesc) (Reader, error) {
if !FileDescOk(fd) {
return nil, ErrInvalidFile
}
fs.mu.Lock()
defer fs.mu.Unlock()
if fs.open < 0 {
return nil, ErrClosed
}
of, err := os.OpenFile(filepath.Join(fs.path, fsGenName(fd)), os.O_RDONLY, 0)
if err != nil {
if fsHasOldName(fd) && os.IsNotExist(err) {
of, err = os.OpenFile(filepath.Join(fs.path, fsGenOldName(fd)), os.O_RDONLY, 0)
if err == nil {
goto ok
}
}
return nil, err
}
ok:
fs.open++
return &fileWrap{File: of, fs: fs, fd: fd}, nil
}
func (fs *fileStorage) Create(fd FileDesc) (Writer, error) {
if !FileDescOk(fd) {
return nil, ErrInvalidFile
}
if fs.readOnly {
return nil, errReadOnly
}
fs.mu.Lock()
defer fs.mu.Unlock()
if fs.open < 0 {
return nil, ErrClosed
}
of, err := os.OpenFile(filepath.Join(fs.path, fsGenName(fd)), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return nil, err
}
fs.open++
return &fileWrap{File: of, fs: fs, fd: fd}, nil
}
func (fs *fileStorage) Remove(fd FileDesc) error {
if !FileDescOk(fd) {
return ErrInvalidFile
}
if fs.readOnly {
return errReadOnly
}
fs.mu.Lock()
defer fs.mu.Unlock()
if fs.open < 0 {
return ErrClosed
}
err := os.Remove(filepath.Join(fs.path, fsGenName(fd)))
if err != nil {
if fsHasOldName(fd) && os.IsNotExist(err) {
if e1 := os.Remove(filepath.Join(fs.path, fsGenOldName(fd))); !os.IsNotExist(e1) {
fs.log(fmt.Sprintf("remove %s: %v (old name)", fd, err))
err = e1
}
} else {
fs.log(fmt.Sprintf("remove %s: %v", fd, err))
}
}
return err
}
func (fs *fileStorage) Rename(oldfd, newfd FileDesc) error {
if !FileDescOk(oldfd) || !FileDescOk(newfd) {
return ErrInvalidFile
}
if oldfd == newfd {
return nil
}
if fs.readOnly {
return errReadOnly
}
fs.mu.Lock()
defer fs.mu.Unlock()
if fs.open < 0 {
return ErrClosed
}
return rename(filepath.Join(fs.path, fsGenName(oldfd)), filepath.Join(fs.path, fsGenName(newfd)))
}
func (fs *fileStorage) Close() error {
fs.mu.Lock()
defer fs.mu.Unlock()
if fs.open < 0 {
return ErrClosed
}
// Clear the finalizer.
runtime.SetFinalizer(fs, nil)
if fs.open > 0 {
fs.log(fmt.Sprintf("close: warning, %d files still open", fs.open))
}
fs.open = -1
if fs.logw != nil {
fs.logw.Close()
}
return fs.flock.release()
}
type fileWrap struct {
*os.File
fs *fileStorage
fd FileDesc
closed bool
}
func (fw *fileWrap) Sync() error {
if err := fw.File.Sync(); err != nil {
return err
}
if fw.fd.Type == TypeManifest {
// Also sync parent directory if file type is manifest.
// See: https://code.google.com/p/leveldb/issues/detail?id=190.
if err := syncDir(fw.fs.path); err != nil {
fw.fs.log(fmt.Sprintf("syncDir: %v", err))
return err
}
}
return nil
}
func (fw *fileWrap) Close() error {
fw.fs.mu.Lock()
defer fw.fs.mu.Unlock()
if fw.closed {
return ErrClosed
}
fw.closed = true
fw.fs.open--
err := fw.File.Close()
if err != nil {
fw.fs.log(fmt.Sprintf("close %s: %v", fw.fd, err))
}
return err
}
func fsGenName(fd FileDesc) string {
switch fd.Type {
case TypeManifest:
return fmt.Sprintf("MANIFEST-%06d", fd.Num)
case TypeJournal:
return fmt.Sprintf("%06d.log", fd.Num)
case TypeTable:
return fmt.Sprintf("%06d.ldb", fd.Num)
case TypeTemp:
return fmt.Sprintf("%06d.tmp", fd.Num)
default:
panic("invalid file type")
}
}
func fsHasOldName(fd FileDesc) bool {
return fd.Type == TypeTable
}
func fsGenOldName(fd FileDesc) string {
switch fd.Type {
case TypeTable:
return fmt.Sprintf("%06d.sst", fd.Num)
}
return fsGenName(fd)
}
func fsParseName(name string) (fd FileDesc, ok bool) {
var tail string
_, err := fmt.Sscanf(name, "%d.%s", &fd.Num, &tail)
if err == nil {
switch tail {
case "log":
fd.Type = TypeJournal
case "ldb", "sst":
fd.Type = TypeTable
case "tmp":
fd.Type = TypeTemp
default:
return
}
return fd, true
}
n, _ := fmt.Sscanf(name, "MANIFEST-%d%s", &fd.Num, &tail)
if n == 1 {
fd.Type = TypeManifest
return fd, true
}
return
}
func fsParseNamePtr(name string, fd *FileDesc) bool {
_fd, ok := fsParseName(name)
if fd != nil {
*fd = _fd
}
return ok
}