blob: 9e8e2ca37406c8a8b527af2d49e478c5a977895c [file] [log] [blame]
package fifo
import (
"io"
"os"
"runtime"
"sync"
"syscall"
"github.com/pkg/errors"
"golang.org/x/net/context"
"golang.org/x/sys/unix"
)
// fifo is the io.ReadWriteCloser returned by OpenFifo. Its lifecycle is
// tracked through three signalling channels, each of which is only ever
// closed (never sent on).
type fifo struct {
	// flag holds the open(2) flags passed to OpenFifo with O_CREAT and
	// O_NONBLOCK cleared.
	flag int

	// opened is closed once the underlying file was opened successfully.
	// closed is closed once the fifo has been closed or failed to open.
	// closing is closed by Close when it is called before the open finished.
	opened, closed, closing chan struct{}

	// err is the error recorded at the moment closed was closed.
	err error
	// file is the opened fifo; it is assigned right before opened is closed.
	file *os.File

	// closingOnce guards close(closing) (Close has been called);
	// closedOnce guards close(closed) (fifo is closed).
	closingOnce, closedOnce sync.Once

	// handle is the value returned by getHandle for the fifo path; it is
	// used to resolve the path to open and is released by Close.
	handle *handle
}
// leakCheckWg, when non-nil, is used by OpenFifo to account for the two
// goroutines it starts: OpenFifo adds 2 to it before launching them and each
// goroutine calls Done when it exits. The visible code never assigns it;
// presumably tests set it to detect leaked goroutines (TODO confirm).
var leakCheckWg *sync.WaitGroup
// OpenFifo opens a fifo. Returns io.ReadWriteCloser.
// Context can be used to cancel this function for as long as open(2) has not
// returned; cancelling it after the fifo has been opened has no effect on the
// returned fifo.
// Accepted flags:
// - syscall.O_CREAT - create new fifo if one doesn't exist
// - syscall.O_RDONLY - open fifo only from reader side
// - syscall.O_WRONLY - open fifo only from writer side
// - syscall.O_RDWR - open fifo from both sides, never block on syscall level
// - syscall.O_NONBLOCK - return io.ReadWriteCloser even if other side of the
//     fifo isn't open. read/write will be connected after the actual fifo is
//     open or after fifo is closed.
//
// In blocking mode (O_NONBLOCK not set, or O_RDWR set) the function waits
// until the fifo is either opened or closed and returns the recorded error in
// the latter case.
func OpenFifo(ctx context.Context, fn string, flag int, perm os.FileMode) (io.ReadWriteCloser, error) {
	if _, err := os.Stat(fn); err != nil {
		if os.IsNotExist(err) && flag&syscall.O_CREAT != 0 {
			// An "already exists" failure is tolerated: someone else may
			// have created the fifo between Stat and Mkfifo.
			if err := unix.Mkfifo(fn, uint32(perm&os.ModePerm)); err != nil && !os.IsExist(err) {
				return nil, errors.Wrapf(err, "error creating fifo %v", fn)
			}
		} else {
			return nil, err
		}
	}

	block := flag&syscall.O_NONBLOCK == 0 || flag&syscall.O_RDWR != 0

	// O_CREAT and O_NONBLOCK are handled by this function, not by open(2).
	flag &= ^syscall.O_CREAT
	flag &= ^syscall.O_NONBLOCK

	h, err := getHandle(fn)
	if err != nil {
		return nil, err
	}

	f := &fifo{
		handle:  h,
		flag:    flag,
		opened:  make(chan struct{}),
		closed:  make(chan struct{}),
		closing: make(chan struct{}),
	}

	wg := leakCheckWg
	if wg != nil {
		wg.Add(2)
	}

	// Watcher: aborts a pending open when ctx is cancelled, and exits as
	// soon as the fifo is opened or closed.
	go func() {
		if wg != nil {
			defer wg.Done()
		}
		select {
		case <-ctx.Done():
			// select chooses at random among ready cases, so this branch
			// can run even though the fifo is already open. Re-check so
			// that a cancellation arriving after a successful open does
			// not close a fifo the caller is using.
			select {
			case <-f.opened:
			default:
				f.Close()
			}
		case <-f.opened:
		case <-f.closed:
		}
	}()

	// Opener: performs the (possibly blocking) open and publishes the result
	// by closing either f.opened or f.closed.
	go func() {
		if wg != nil {
			defer wg.Done()
		}
		var file *os.File
		fn, err := h.Path()
		if err == nil {
			file, err = os.OpenFile(fn, flag, 0)
		}
		select {
		case <-f.closing:
			// Close was called while the open was pending: discard the
			// file and report why the open was abandoned.
			if err == nil {
				select {
				case <-ctx.Done():
					err = ctx.Err()
				default:
					err = errors.Errorf("fifo %v was closed before opening", h.Name())
				}
				if file != nil {
					file.Close()
				}
			}
		default:
		}
		if err != nil {
			f.closedOnce.Do(func() {
				f.err = err
				close(f.closed)
			})
			return
		}
		f.file = file
		close(f.opened)
	}()

	if block {
		select {
		case <-f.opened:
		case <-f.closed:
			// The caller never receives f on this path, so nothing else
			// would release the handle. f.closed is already closed, so
			// Close only releases the handle and returns.
			f.Close()
			return nil, f.err
		}
	}
	return f, nil
}
// Read reads from the fifo into b. It fails immediately on a write-only
// fifo. If the fifo has not been opened yet, Read waits until it is either
// opened (then reads from the file) or closed (then returns an error).
func (f *fifo) Read(b []byte) (int, error) {
	if f.flag&syscall.O_WRONLY != 0 {
		return 0, errors.New("reading from write-only fifo")
	}

	// Check f.opened on its own first so that an opened fifo always goes to
	// the file, even when f.closed is ready as well.
	select {
	case <-f.opened:
	default:
		select {
		case <-f.opened:
		case <-f.closed:
			return 0, errors.New("reading from a closed fifo")
		}
	}
	return f.file.Read(b)
}
// Write writes b to the fifo. It fails immediately unless the fifo was
// opened with O_WRONLY or O_RDWR. If the fifo has not been opened yet, Write
// waits until it is either opened (then writes to the file) or closed (then
// returns an error).
func (f *fifo) Write(b []byte) (int, error) {
	const writable = syscall.O_WRONLY | syscall.O_RDWR
	if f.flag&writable == 0 {
		return 0, errors.New("writing to read-only fifo")
	}

	// Check f.opened on its own first so that an opened fifo always goes to
	// the file, even when f.closed is ready as well.
	select {
	case <-f.opened:
	default:
		select {
		case <-f.opened:
		case <-f.closed:
			return 0, errors.New("writing to a closed fifo")
		}
	}
	return f.file.Write(b)
}
// Close closes the fifo. Reads and writes issued afterwards return an error.
// Close may also be called before open(2) has returned, in which case the
// fifo is never opened. The returned error is the result of closing the
// underlying file when this call is the one that closed it, nil otherwise.
func (f *fifo) Close() (retErr error) {
	for {
		// Closed already (possibly by an earlier pass of this loop):
		// release the handle and finish.
		select {
		case <-f.closed:
			f.handle.Close()
			return retErr
		default:
		}

		// Opened: close the file exactly once; the next pass takes the
		// branch above.
		select {
		case <-f.opened:
			f.closedOnce.Do(func() {
				retErr = f.file.Close()
				f.err = retErr
				close(f.closed)
			})
			continue
		default:
		}

		// Neither opened nor closed yet. For O_RDWR only yield and re-check;
		// no attempt is made to interrupt the open.
		if f.flag&syscall.O_RDWR != 0 {
			runtime.Gosched()
			continue
		}

		// Tell the opener goroutine that the fifo is being closed.
		f.closingOnce.Do(func() {
			close(f.closing)
		})

		// Open the opposite end of the fifo without blocking; presumably
		// this lets the pending open(2) of our own end return.
		peerMode := syscall.O_WRONLY
		if f.flag&syscall.O_WRONLY > 0 {
			peerMode = syscall.O_RDONLY
		}
		path, pathErr := f.handle.Path()

		// if Close() is called concurrently(shouldn't) it may cause error
		// because handle is closed
		select {
		case <-f.closed:
			continue
		default:
		}

		if pathErr != nil {
			// Path has become invalid. We will leak a goroutine.
			// This case should not happen in linux.
			f.closedOnce.Do(func() {
				f.err = pathErr
				close(f.closed)
			})
			<-f.closed
			continue
		}

		if peer, err := os.OpenFile(path, peerMode|syscall.O_NONBLOCK, 0); err == nil {
			peer.Close()
		}
		runtime.Gosched()
	}
}