blob: 70323c88c9f38a57278b2e4b02d90a27475e4a60 [file] [log] [blame]
package fsutil
import (
"context"
"hash"
"io"
"os"
"path/filepath"
"strconv"
"sync"
"time"
"github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"github.com/tonistiigi/fsutil/types"
"golang.org/x/sync/errgroup"
)
// WriteToFunc is a data callback: it is invoked with the writer's context, the
// slash-separated path of a regular file, and a destination writer into which
// the file contents are to be written.
// NOTE(review): presumably the callback is expected to close the writer once
// all data has been written — confirm against the implementations.
type WriteToFunc func(context.Context, string, io.WriteCloser) error
// DiskWriterOpt configures a DiskWriter. Exactly one of AsyncDataCb and
// SyncDataCb must be set; NewDiskWriter rejects any other combination.
type DiskWriterOpt struct {
	// AsyncDataCb, when set, is called from a goroutine of the writer's
	// errgroup to fill in a regular file after the file has been created.
	AsyncDataCb WriteToFunc
	// SyncDataCb, when set, is called inline from HandleChange to fill in a
	// regular file right after it has been created.
	SyncDataCb WriteToFunc
	// NotifyCb, when set, is called after a change has been applied. For
	// deletions it receives a nil os.FileInfo; for other changes it receives
	// a value that wraps the change's os.FileInfo and carries a digest.
	NotifyCb func(ChangeKind, string, os.FileInfo, error) error
	// ContentHasher creates the hash used to compute the digest handed to
	// NotifyCb. It is only consulted when NotifyCb is set.
	ContentHasher ContentHasher
	// Filter, when set, decides per path whether a change is applied.
	Filter FilterFunc
}
// FilterFunc reports whether the change for the given path should be applied.
// Returning false makes HandleChange skip the change silently. The *types.Stat
// passed in is a private copy of the change's stat (an empty Stat for
// deletions).
type FilterFunc func(string, *types.Stat) bool
// DiskWriter applies a stream of filesystem changes to a destination
// directory. Create one with NewDiskWriter, feed it changes through
// HandleChange, and call Wait to collect the result of asynchronous writes.
type DiskWriter struct {
	// opt holds the options the writer was created with.
	opt DiskWriterOpt
	// dest is the root directory every change path is joined onto.
	dest string
	// NOTE(review): wg is not referenced anywhere in this part of the file.
	wg sync.WaitGroup
	// ctx is the errgroup-derived context; once it is done HandleChange
	// refuses further changes.
	ctx context.Context
	// cancel cancels the parent of ctx; HandleChange calls it on failure.
	cancel func()
	// eg runs the asynchronous file data requests.
	eg *errgroup.Group
	// filter is a copy of opt.Filter.
	filter FilterFunc
}
// NewDiskWriter returns a DiskWriter that applies changes below dest.
//
// Exactly one of opt.SyncDataCb and opt.AsyncDataCb has to be provided;
// an error is returned when neither or both are set. The writer derives a
// cancelable context from ctx and runs asynchronous work in an errgroup
// bound to that context.
func NewDiskWriter(ctx context.Context, dest string, opt DiskWriterOpt) (*DiskWriter, error) {
	hasSync, hasAsync := opt.SyncDataCb != nil, opt.AsyncDataCb != nil
	switch {
	case !hasSync && !hasAsync:
		return nil, errors.New("no data callback specified")
	case hasSync && hasAsync:
		return nil, errors.New("can't specify both sync and async data callbacks")
	}

	cancelCtx, cancel := context.WithCancel(ctx)
	group, groupCtx := errgroup.WithContext(cancelCtx)

	dw := &DiskWriter{
		opt:    opt,
		dest:   dest,
		eg:     group,
		ctx:    groupCtx,
		cancel: cancel,
		filter: opt.Filter,
	}
	return dw, nil
}
// Wait blocks until all asynchronous file data requests started by
// HandleChange have finished and returns the error reported by the writer's
// errgroup, if any.
//
// The ctx argument is accepted for interface compatibility but is not
// consulted; cancellation is driven by the context given to NewDiskWriter.
func (dw *DiskWriter) Wait(ctx context.Context) error {
	// Release the cancelable context created in NewDiskWriter. The errgroup
	// already cancels its derived context (dw.ctx) when Wait returns, so this
	// changes nothing for callers; it only avoids leaking the parent context
	// when no change ever failed.
	defer dw.cancel()
	return dw.eg.Wait()
}
// HandleChange applies one change to the destination directory.
//
// kind is the type of change, p the slash-separated path relative to the
// destination root, fi the file information of the new entry (its Sys() value
// must be a *types.Stat; it is not used for deletions) and err an upstream
// error that is returned unchanged when non-nil.
//
// Deletions remove the destination path recursively. For other changes the
// new entry is created (directory, device/fifo, symlink, hard link or regular
// file), its metadata is rewritten, and, when something already exists at the
// destination, the entry is first built under a temporary ".tmp.*" name in the
// same directory and then renamed into place. Regular file contents are
// supplied by the sync callback inline or requested asynchronously.
//
// Any returned error cancels the writer, so later calls fail with the context
// error.
func (dw *DiskWriter) HandleChange(kind ChangeKind, p string, fi os.FileInfo, err error) (retErr error) {
	if err != nil {
		return err
	}

	// Refuse further work once the writer has been canceled.
	select {
	case <-dw.ctx.Done():
		return dw.ctx.Err()
	default:
	}

	defer func() {
		if retErr != nil {
			dw.cancel()
		}
	}()

	destPath := filepath.Join(dw.dest, filepath.FromSlash(p))

	if kind == ChangeKindDelete {
		if dw.filter != nil {
			// Deletions carry no stat; the filter gets an empty one.
			var empty types.Stat
			if ok := dw.filter(p, &empty); !ok {
				return nil
			}
		}
		// todo: no need to validate if diff is trusted but is it always?
		if err := os.RemoveAll(destPath); err != nil {
			return errors.Wrapf(err, "failed to remove: %s", destPath)
		}
		if dw.opt.NotifyCb != nil {
			if err := dw.opt.NotifyCb(kind, p, nil, nil); err != nil {
				return err
			}
		}
		return nil
	}

	// A missing FileInfo or stat is reported as an error instead of panicking.
	if fi == nil {
		return errors.Errorf("%s invalid change without stat information", p)
	}
	stat, ok := fi.Sys().(*types.Stat)
	if !ok || stat == nil {
		return errors.Errorf("%s invalid change without stat information", p)
	}

	// Work on a copy so the filter cannot modify the caller's stat.
	statCopy := *stat

	if dw.filter != nil {
		if ok := dw.filter(p, &statCopy); !ok {
			return nil
		}
	}

	// rename is true when something already exists at destPath and the new
	// entry therefore has to be built under a temporary name first.
	rename := true
	oldFi, err := os.Lstat(destPath)
	if err != nil {
		if os.IsNotExist(err) {
			if kind != ChangeKindAdd {
				return errors.Wrapf(err, "invalid addition: %s", destPath)
			}
			rename = false
		} else {
			return errors.Wrapf(err, "failed to stat %s", destPath)
		}
	}

	// Directory over directory: only the metadata needs updating.
	if oldFi != nil && fi.IsDir() && oldFi.IsDir() {
		if err := rewriteMetadata(destPath, &statCopy); err != nil {
			return errors.Wrapf(err, "error setting dir metadata for %s", destPath)
		}
		return nil
	}

	newPath := destPath
	if rename {
		newPath = filepath.Join(filepath.Dir(destPath), ".tmp."+nextSuffix())
	}

	// tmpCreated is true while a temporary entry created by this call exists
	// at newPath and has not been renamed over destPath yet. If the call
	// fails in that window, the temporary entry is removed (best effort) so
	// failed changes do not leave ".tmp.*" entries behind.
	tmpCreated := false
	defer func() {
		if retErr != nil && tmpCreated {
			os.RemoveAll(newPath)
		}
	}()

	isRegularFile := false

	switch {
	case fi.IsDir():
		if err := os.Mkdir(newPath, fi.Mode()); err != nil {
			return errors.Wrapf(err, "failed to create dir %s", newPath)
		}
	case fi.Mode()&os.ModeDevice != 0 || fi.Mode()&os.ModeNamedPipe != 0:
		if err := handleTarTypeBlockCharFifo(newPath, &statCopy); err != nil {
			return errors.Wrapf(err, "failed to create device %s", newPath)
		}
	case fi.Mode()&os.ModeSymlink != 0:
		if err := os.Symlink(statCopy.Linkname, newPath); err != nil {
			return errors.Wrapf(err, "failed to symlink %s", newPath)
		}
	case statCopy.Linkname != "":
		// A non-symlink with a link name is a hard link to another path
		// below the destination root.
		if err := os.Link(filepath.Join(dw.dest, statCopy.Linkname), newPath); err != nil {
			return errors.Wrapf(err, "failed to link %s to %s", newPath, statCopy.Linkname)
		}
	default:
		isRegularFile = true
		file, err := os.OpenFile(newPath, os.O_CREATE|os.O_WRONLY, fi.Mode()) //todo: windows
		if err != nil {
			return errors.Wrapf(err, "failed to create %s", newPath)
		}
		tmpCreated = rename
		if dw.opt.SyncDataCb != nil {
			// Sync mode: the data is written now, before metadata is set.
			if err := dw.processChange(ChangeKindAdd, p, fi, file); err != nil {
				file.Close()
				return err
			}
			break
		}
		// Async mode: leave an empty file behind; data is requested below.
		if err := file.Close(); err != nil {
			return errors.Wrapf(err, "failed to close %s", newPath)
		}
	}
	tmpCreated = rename

	if err := rewriteMetadata(newPath, &statCopy); err != nil {
		return errors.Wrapf(err, "error setting metadata for %s", newPath)
	}

	if rename {
		// Replacing a directory with a non-directory (or the reverse) needs
		// the old entry removed before the rename.
		if oldFi.IsDir() != fi.IsDir() {
			if err := os.RemoveAll(destPath); err != nil {
				return errors.Wrapf(err, "failed to remove %s", destPath)
			}
		}
		if err := os.Rename(newPath, destPath); err != nil {
			return errors.Wrapf(err, "failed to rename %s to %s", newPath, destPath)
		}
		// The temporary entry now lives at destPath; nothing left to clean.
		tmpCreated = false
	}

	if isRegularFile {
		if dw.opt.AsyncDataCb != nil {
			dw.requestAsyncFileData(p, destPath, fi, &statCopy)
		}
	} else {
		return dw.processChange(kind, p, fi, nil)
	}

	return nil
}
// requestAsyncFileData schedules, on the writer's errgroup, a request for the
// contents of the regular file p, which has already been created at dest.
// After the data has been written successfully the file's modification time
// is set from st.
func (dw *DiskWriter) requestAsyncFileData(p, dest string, fi os.FileInfo, st *types.Stat) {
	// todo: limit worker threads
	dw.eg.Go(func() error {
		w := &lazyFileWriter{
			dest: dest,
		}
		if err := dw.processChange(ChangeKindAdd, p, fi, w); err != nil {
			// Match the sync path in HandleChange, which closes the file when
			// processing fails: this releases the descriptor and restores a
			// temporarily widened file mode if the callback did not close the
			// writer. The Close error is ignored on purpose; err is the one
			// worth reporting.
			_ = w.Close()
			return err
		}
		return chtimes(dest, st.ModTime) // TODO: parent dirs
	})
}
// processChange feeds file data (if any) through the configured data callback
// and then notifies NotifyCb.
//
// When w is non-nil the sync callback, or the async callback if no sync
// callback is set, is asked to write the contents of p into it. When a
// NotifyCb is configured the writer is wrapped so the data is hashed on the
// way through, and NotifyCb receives the wrapper (which carries fi and the
// digest). With a nil w no data is requested; the hash wrapper is simply
// closed before the notification.
func (dw *DiskWriter) processChange(kind ChangeKind, p string, fi os.FileInfo, w io.WriteCloser) error {
	hasData := w != nil
	target := w

	var hashed *hashedWriter
	if dw.opt.NotifyCb != nil {
		h, err := newHashWriter(dw.opt.ContentHasher, fi, w)
		if err != nil {
			return err
		}
		hashed = h
		target = h
	}

	switch {
	case hasData:
		writeTo := dw.opt.SyncDataCb
		if writeTo == nil {
			writeTo = dw.opt.AsyncDataCb
		}
		if err := writeTo(dw.ctx, p, target); err != nil {
			return err
		}
	case hashed != nil:
		// No data to write: finalize the digest right away.
		hashed.Close()
	}

	if hashed == nil {
		return nil
	}
	return dw.opt.NotifyCb(kind, p, hashed, nil)
}
// hashedWriter is an os.FileInfo and io.Writer in one: data written to it is
// forwarded to the wrapped writer and to a hash, and the resulting digest is
// available through Digest after Close.
type hashedWriter struct {
	// FileInfo is the file information of the change being written.
	os.FileInfo
	// Writer fans writes out to both w and h.
	io.Writer
	// h accumulates the content hash.
	h hash.Hash
	// w is the underlying destination; it may be nil when there is no data.
	w io.WriteCloser
	// dgst is set by Close.
	dgst digest.Digest
}
// newHashWriter wraps w so that everything written is also fed into a hash
// created by ch from the change's stat.
//
// fi must carry a *types.Stat in Sys(); otherwise an error is returned. A nil
// ch is reported as an error as well, rather than panicking on the call. w
// may be nil when no data will be written.
func newHashWriter(ch ContentHasher, fi os.FileInfo, w io.WriteCloser) (*hashedWriter, error) {
	stat, ok := fi.Sys().(*types.Stat)
	if !ok {
		return nil, errors.Errorf("invalid change without stat information")
	}

	// NotifyCb can be configured without a ContentHasher; fail cleanly then.
	if ch == nil {
		return nil, errors.New("no content hasher specified")
	}

	h, err := ch(stat)
	if err != nil {
		return nil, err
	}
	hw := &hashedWriter{
		FileInfo: fi,
		Writer:   io.MultiWriter(w, h),
		h:        h,
		w:        w,
	}
	return hw, nil
}
// Close records the digest of the data hashed so far and then closes the
// wrapped writer, if there is one, returning its error.
func (hw *hashedWriter) Close() error {
	hw.dgst = digest.NewDigest(digest.SHA256, hw.h)
	if hw.w == nil {
		return nil
	}
	return hw.w.Close()
}
// Digest returns the digest recorded by Close. Before Close has been called
// it is the zero value.
func (hw *hashedWriter) Digest() digest.Digest {
	return hw.dgst
}
// lazyFileWriter is an io.WriteCloser that opens its destination file only
// when the first write arrives.
type lazyFileWriter struct {
	// dest is the path of the already existing file to write into.
	dest string
	// NOTE(review): ctx is not referenced anywhere in this part of the file.
	ctx context.Context
	// f is the open file, or nil until the first Write.
	f *os.File
	// fileMode, when non-nil, is the original mode of dest, saved because
	// write permission had to be added; Close restores it.
	fileMode *os.FileMode
}
// Write writes dt to the destination file, opening it for writing on first
// use. If opening fails with a permission error, the file's current mode is
// remembered, write permission bits are added and the open is retried once;
// Close restores the remembered mode.
func (lfw *lazyFileWriter) Write(dt []byte) (int, error) {
	if lfw.f != nil {
		return lfw.f.Write(dt)
	}

	f, err := os.OpenFile(lfw.dest, os.O_WRONLY, 0) //todo: windows
	if os.IsPermission(err) {
		// retry after chmod
		if fi, statErr := os.Stat(lfw.dest); statErr == nil {
			mode := fi.Mode()
			lfw.fileMode = &mode
			if os.Chmod(lfw.dest, mode|0222) == nil {
				f, err = os.OpenFile(lfw.dest, os.O_WRONLY, 0)
			}
		}
	}
	if err != nil {
		return 0, errors.Wrapf(err, "failed to open %s", lfw.dest)
	}

	lfw.f = f
	return f.Write(dt)
}
// Close closes the file if it was ever opened and, provided that succeeded,
// puts back the original file mode when Write had to change it.
func (lfw *lazyFileWriter) Close() error {
	if lfw.f != nil {
		if err := lfw.f.Close(); err != nil {
			return err
		}
	}
	if lfw.fileMode == nil {
		return nil
	}
	return os.Chmod(lfw.dest, *lfw.fileMode)
}
// mkdev packs a major/minor device number pair into a single 32-bit value:
// the low 8 bits of minor occupy bits 0-7, the low 12 bits of major occupy
// bits 8-19, and bits 8-19 of minor are shifted up into bits 20-31.
func mkdev(major int64, minor int64) uint32 {
	minorHigh := (minor & 0xfff00) << 12
	majorBits := (major & 0xfff) << 8
	minorLow := minor & 0xff
	return uint32(minorHigh | majorBits | minorLow)
}
// State of the pseudo-random generator behind nextSuffix.
// Temporary names get a random suffix so that there is a good chance the
// name is not taken yet. rand holds the generator state (zero means "not
// seeded yet") and randmu guards it.
var (
	rand   uint32
	randmu sync.Mutex
)
// reseed derives a fresh generator seed from the current time in nanoseconds
// plus the process ID, truncated to 32 bits.
func reseed() uint32 {
	nanos := time.Now().UnixNano()
	pid := int64(os.Getpid())
	return uint32(nanos + pid)
}
// nextSuffix returns the next nine-digit decimal suffix for a temporary file
// name. It advances the shared generator state under randmu, seeding it first
// if it is still zero.
func nextSuffix() string {
	randmu.Lock()
	defer randmu.Unlock()

	if rand == 0 {
		rand = reseed()
	}
	rand = rand*1664525 + 1013904223 // constants from Numerical Recipes

	// Adding 1e9 and dropping the leading digit zero-pads to nine digits.
	return strconv.Itoa(int(1e9 + rand%1e9))[1:]
}