blob: 5b5f199f94ae72c711f52009c6a801acfe763bab [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package cache
import (
"context"
"encoding/hex"
"encoding/json"
"io"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"sort"
"sync/atomic"
"time"
)
// the time in milliseconds to wait before checking the size of the directory
// that contains cache files.
const waitBeforeReadDir = 32
// FileKey is the interface of keys that can be used to index cached files.
type FileKey interface {
// Hash should return a valid unique file name to be used as the key.
Hash() string
}
// LinkedFile removes the file it names on closing. This is used to link files
// into a temporary directory while they're being used so that collection of
// old files doesn't delete a file that's in use. This effectivelly uses the
// inode count as a reference counter for the file.
type LinkedFile string
// String returns the name of the file.
func (l LinkedFile) String() string {
return string(l)
}
// Close deletes the file.
func (l LinkedFile) Close() error {
return os.Remove(l.String())
}
type config struct {
// Size is the maximum number of entries before an item is evicted.
// Zero means no limt on the number of entries.
Size uint64
}
// FileCache represents a cache of files stored somewhere on the disk. It is
// possible to use the cache concurrently from any number of processes or
// threads.
type FileCache struct {
dir string
cfg config
curSize uint64
cullLock chan struct{}
}
func getTmpDir(path string) string {
return filepath.Join(path, "tmp")
}
func getTmpFile(path string) string {
out := make([]byte, 8)
rand.Read(out)
return filepath.Join(path, "tmp", hex.EncodeToString(out))
}
func getConfigDir(path string) string {
return filepath.Join(path, "cfg")
}
func getConfigPath(path string) string {
return filepath.Join(path, "cfg", "config.json")
}
func getLockFile(path string) string {
return filepath.Join(path, "cfg", "lock")
}
func getStoreDir(path string) string {
return filepath.Join(path, "store")
}
func getFilePath(path string, hash string) string {
return filepath.Join(path, "store", hash)
}
func initFileCache(cache *FileCache, path string) {
cache.dir = path
cache.cullLock = make(chan struct{}, 1)
cache.cullLock <- struct{}{}
}
// LoadFileCache loads a persistent cache from the file system.
func LoadFileCache(path string) (*FileCache, error) {
var cache FileCache
initFileCache(&cache, path)
file, err := os.Open(getConfigPath(path))
if err != nil {
return nil, err
}
defer file.Close()
dec := json.NewDecoder(file)
if err = dec.Decode(&cache.cfg); err != nil {
return nil, err
}
storeDir := getStoreDir(cache.dir)
files, err := ioutil.ReadDir(storeDir)
if err != nil {
return nil, err
}
// curSize is a rough estimate of the cache size, not the actual true
// cache size which can't be known at all times.
cache.curSize = uint64(len(files))
return &cache, nil
}
// NewFileCache creates a new persistent cache on the file system.
func NewFileCache(path string, size uint64) (*FileCache, error) {
var cache FileCache
initFileCache(&cache, path)
cache.cfg.Size = size
if err := os.MkdirAll(getConfigDir(path), os.ModePerm); err != nil {
return nil, err
}
if err := os.MkdirAll(getStoreDir(path), os.ModePerm); err != nil {
return nil, err
}
if err := os.MkdirAll(getTmpDir(path), os.ModePerm); err != nil {
return nil, err
}
file, err := os.Create(getConfigPath(path))
if err != nil {
return nil, err
}
defer file.Close()
enc := json.NewEncoder(file)
if err = enc.Encode(cache.cfg); err != nil {
return nil, err
}
return &cache, nil
}
// GetFileCache will attempt to load an existing cache if one exists.
// If one does not it will create a cache with the given size.
func GetFileCache(path string, size uint64) (*FileCache, error) {
if cache, err := LoadFileCache(path); err == nil {
return cache, nil
}
return NewFileCache(path, size)
}
// Update starts a goroutine to periodically check on the status of the cache. The
// cache can be operated without using Update but if Update is not called the cache's
// estimate of the cache size will diverge from reality. If many actors are present
// this can cause the cache size to be very different from this actor's estimate.
func (fc *FileCache) Update(ctx context.Context) {
// Make sure we keep curSize more up to date. This avoids
// us needing to limit the number of concurrent actors.
go func() {
storeDir := getStoreDir(fc.dir)
for {
select {
case <-ctx.Done():
return
default:
}
// We don't want to be constantly hitting the file system with expensive
// operations like ReadDir. This sleep ensures that we not be too aggressive.
// By sleeping for 32 milliseconds we make the ReadDir call about 30 times a
// second.
time.Sleep(waitBeforeReadDir * time.Millisecond)
files, err := ioutil.ReadDir(storeDir)
if err == nil {
atomic.StoreUint64(&fc.curSize, uint64(len(files)))
}
}
}()
}
func (fc *FileCache) getFilePath(hash string) string {
return getFilePath(fc.dir, hash)
}
// cullFiles obtains the lock file or exits. If the lock file is obtained
// then clean up occurs (slow). Other processes can read and get files while
// cleanup is occurring since all operations except culling are lock-free and
// atomic.
func (fc *FileCache) cullFiles() {
// TODO(jakehehrlich): Add functionality to trigger a cull for total size as
// well as number of files.
// Acquire the cull lock so the same process doesn't try and create a file
// too frequently.
select {
case <-fc.cullLock:
// Make sure we release this process' cull lock when done.
defer func() { fc.cullLock <- struct{}{} }()
default:
return
}
lockFile := getLockFile(fc.dir)
file, err := os.OpenFile(lockFile, os.O_WRONLY|os.O_EXCL|os.O_CREATE, os.ModeExclusive)
if err != nil {
// Someone else is already cleaning so exit.
return
}
// Make sure we release the lock when done.
defer func() {
name := file.Name()
defer file.Close()
os.Remove(name)
}()
// Now we know we have the lock and are the only process cleaning the cache
// If the directory suddenly deleted or something else causes the ReadDir to
// fail then we can set our count to zero. Some other operation will then
// soon fail in a context where errors can be handled better.
files, _ := ioutil.ReadDir(getStoreDir(fc.dir))
// It could turn out we guessed wrong and no files need to be collected. Just
// return in this case.
numFiles := uint64(len(files))
if numFiles <= fc.cfg.Size {
// Update our count and return. One might be concerned
// that this store might overwrite previous adds or that the collection
// of operations do not look kosher or linerize in someway. This is not
// an issue here as 'curSize' is just an estimate and all actors are
// just trying to ensure that at any given time this value is as close
// to the right value as possible. The value is just an estimate to
// control how often a we try and collect garbage.
atomic.StoreUint64(&fc.curSize, numFiles)
return
}
// Now remove the oldest files first. Note that we update the mod time on
// reads as a means of keeping hot files in the cache.
sort.Slice(files, func(i, j int) bool {
return files[j].ModTime().After(files[i].ModTime())
})
filesToRemove := numFiles - fc.cfg.Size
for _, file := range files[:filesToRemove] {
fpath := fc.getFilePath(file.Name())
os.Remove(fpath)
}
// Finally update our estimate.
atomic.StoreUint64(&fc.curSize, fc.cfg.Size)
}
// Add adds an entry into the cache so that it might later be accessed via
// Get.
func (fc *FileCache) Add(key FileKey, value io.Reader) (*LinkedFile, error) {
file, err := os.Create(getTmpFile(fc.dir))
if err != nil {
return nil, err
}
defer func() {
name := file.Name()
file.Close()
os.Remove(name)
}()
io.Copy(file, value)
path := fc.getFilePath(key.Hash())
// Link the file into the temporary directory for return.
out := LinkedFile(getTmpFile(fc.dir))
err = os.Link(file.Name(), out.String())
if err != nil {
return nil, err
}
// Rename the newly copied file into
err = os.Rename(file.Name(), path)
if err != nil {
out.Close()
return nil, err
}
// We never cull if the maximum size is zero.
if fc.cfg.Size == 0 {
return &out, nil
}
// We don't want to constantly try cleaning so we instead wait for our
// estimate size to hit 1.25. This means that 4 processes can be using
// the cache at the same time and the cache won't go over 2x size.
if atomic.AddUint64(&fc.curSize, 1) >= fc.cfg.Size+fc.cfg.Size/4 {
fc.cullFiles()
}
return &out, nil
}
// Get returns a LinkedFile to an entry for the given key if one exists.
func (fc *FileCache) Get(key FileKey) (*LinkedFile, error) {
// We need to link the file to a temp file before returning the temp file.
// This allows the file to be kept alive as long as the user needs while
// the cache can be removed from freely.
path := fc.getFilePath(key.Hash())
tmpFile := getTmpFile(fc.dir)
err := os.Link(path, tmpFile)
if err != nil {
return nil, err
}
// Update the ModTime so this file is less likely to be removed.
t := time.Now()
os.Chtimes(path, t, t)
// If the file had not existed os.Link would have returned an error.
lf := LinkedFile(tmpFile)
return &lf, nil
}
// DeleteCache removes all traces of a cache located at cachepath.
func DeleteCache(cachepath string) error {
tmpDir := getTmpDir(cachepath)
storeDir := getStoreDir(cachepath)
cfgDir := getConfigDir(cachepath)
if err := os.RemoveAll(tmpDir); err != nil {
return err
}
if err := os.RemoveAll(storeDir); err != nil {
return err
}
if err := os.RemoveAll(cfgDir); err != nil {
return err
}
if err := os.Remove(cachepath); err != nil {
return err
}
return nil
}