blob: c455dc2e325d14ccccfe74cb400856a219b7963a [file] [log] [blame]
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package poll
import (
"io"
"sync/atomic"
"syscall"
"syscall/zx"
"syscall/zx/mxnet"
"syscall/zx/zxsocket"
"unsafe"
)
// FD is a file descriptor. The net and os packages embed this type in
// a larger type representing a network connection or OS file.
type FD struct {
// Lock sysfd and serialize access to Read and Write methods.
fdmu fdMutex
// System file descriptor. Immutable until Close.
Sysfd int
// I/O poller.
pd pollDesc
// Whether this is a streaming descriptor, as opposed to a
// packet-based descriptor like a UDP socket.
IsStream bool
// Whether a zero byte read indicates EOF. This is false for a
// message based socket connection.
ZeroReadIsEOF bool
// Whether this is a normal file.
isFile bool
// Whether in blocking mode
isBlocking uint32
}
// Init initializes the FD. The Sysfd field should already be set.
// This can be called multiple times on a single FD.
// The net argument is a network name from the net package (e.g., "tcp"),
// or "file".
// Set pollable to true if fd should be managed by runtime netpoll.
func (fd *FD) Init(net string, pollable bool) error {
// We don't actually care about the various network types.
if net == "file" {
fd.isFile = true
}
if !pollable {
fd.isBlocking = 1
return nil
}
err := fd.pd.init(fd)
if err != nil {
fd.isBlocking = 1
}
return err
}
// see fd.Close() for reasoning.
func close(fd *FD) error {
sysfd := atomic.SwapInt64((*int64)(unsafe.Pointer(&fd.Sysfd)), -1)
if sysfd != -1 {
return syscall.Close(int(sysfd))
}
return nil
}
// Destroy closes the file descriptor. This is called when there are
// no remaining references.
func (fd *FD) destroy() error {
// Poller may want to unregister fd in readiness notification mechanism,
// so this must be executed before closing the fd.
fd.pd.close()
return close(fd)
}
// Close closes the FD. The underlying file descriptor is closed by the
// destroy method when there are no remaining references.
func (fd *FD) Close() error {
if !fd.fdmu.increfAndClose() {
return errClosing(fd.isFile)
}
fd.pd.evict()
// Note: On other implementations, evict will unblock sibling refs, however,
// that can't be done yet here. Instead, we have to forcefully close the
// connection, sadly ahead of destroy, in order to unblock others.
err := close(fd)
fd.decref()
return err
}
// Shutdown wraps the shutdown network call.
func (fd *FD) Shutdown(how int) error {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
panic("TODO shutdown")
//return syscall.Shutdown(fd.Sysfd, how) // TODO
}
func (fd *FD) Read(p []byte) (int, error) {
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
if len(p) == 0 {
// If the caller wanted a zero byte read, return immediately
// without trying (but after acquiring the readLock).
return 0, nil
}
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
// TODO: pollable() read
n, err := syscall.Read(fd.Sysfd, p)
err = fd.eofError(n, err)
return n, err
}
// Pread wraps the pread system call.
func (fd *FD) Pread(p []byte, off int64) (int, error) {
// Call incref, not readLock, because since pread specifies the
// offset it is independent from other reads.
// Similarly, using the poller doesn't make sense for pread.
if err := fd.incref(); err != nil {
return 0, err
}
n, err := syscall.Pread(fd.Sysfd, p, off)
if err != nil {
n = 0
}
fd.decref()
err = fd.eofError(n, err)
return n, err
}
// Write implements io.Writer.
func (fd *FD) Write(p []byte) (int, error) {
if err := fd.writeLock(); err != nil {
return 0, err
}
defer fd.writeUnlock()
if err := fd.pd.prepareWrite(fd.isFile); err != nil {
return 0, err
}
var nn int
for {
n, err := syscall.Write(fd.Sysfd, p[nn:])
if n > 0 {
nn += n
}
if nn == len(p) {
return nn, err
}
if err != nil {
return nn, err
}
if n == 0 {
return nn, io.ErrUnexpectedEOF
}
}
}
// Pwrite wraps the pwrite system call.
func (fd *FD) Pwrite(p []byte, off int64) (int, error) {
// Call incref, not writeLock, because since pwrite specifies the
// offset it is independent from other writes.
// Similarly, using the poller doesn't make sense for pwrite.
if err := fd.incref(); err != nil {
return 0, err
}
defer fd.decref()
var nn int
for {
n, err := syscall.Pwrite(fd.Sysfd, p[nn:], off+int64(nn))
if n > 0 {
nn += n
}
if nn == len(p) {
return nn, err
}
if err != nil {
return nn, err
}
if n == 0 {
return nn, io.ErrUnexpectedEOF
}
}
}
// Accept wraps the accept network call.
func (fd *FD) Accept() (*zxsocket.Socket, error) {
if err := fd.readLock(); err != nil {
return nil, err
}
defer fd.readUnlock()
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return nil, err
}
if s, ok := syscall.FDIOForFD(fd.Sysfd).(*zxsocket.Socket); ok {
code, control, err := s.Accept(0)
if code == syscall.EAGAIN {
// TODO: non-blocking support, pass this to the poller
if obs, err := s.Wait(mxnet.MXSIO_SIGNAL_INCOMING|zx.SignalSocketPeerClosed, zx.TimensecInfinite); err != nil {
return nil, err
} else if obs&zx.SignalSocketPeerClosed != 0 {
return nil, &zx.Error{Status: zx.ErrPeerClosed, Text: "zxsocket"}
} else if obs&mxnet.MXSIO_SIGNAL_INCOMING != 0 {
// fallthrough
} else {
panic("unreachable")
}
code, control, err = s.Accept(0)
}
if err != nil {
return nil, err
}
if code != 0 {
return nil, syscall.Errno(code)
}
return zxsocket.NewSocket(control)
}
return nil, syscall.EBADF
}
// Seek wraps syscall.Seek.
func (fd *FD) Seek(offset int64, whence int) (int64, error) {
if err := fd.incref(); err != nil {
return 0, err
}
defer fd.decref()
return syscall.Seek(fd.Sysfd, offset, whence)
}
// ReadDirent wraps syscall.ReadDirent.
// We treat this like an ordinary system call rather than a call
// that tries to fill the buffer.
func (fd *FD) ReadDirent(buf []byte) (int, error) {
if err := fd.incref(); err != nil {
return 0, err
}
defer fd.decref()
return syscall.ReadDirent(fd.Sysfd, buf)
}
// Fstat wraps syscall.Fstat
func (fd *FD) Fstat(s *syscall.Stat_t) error {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
return syscall.Fstat(fd.Sysfd, s)
}
// eofError returns io.EOF when fd is available for reading end of
// file.
func (fd *FD) eofError(n int, err error) error {
if n == 0 && err == nil && fd.ZeroReadIsEOF {
return io.EOF
}
return err
}
// RawControl invokes the user-defined function f for a non-IO
// operation.
func (fd *FD) RawControl(f func(uintptr)) error {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
f(uintptr(fd.Sysfd))
return nil
}
// RawRead invokes the user-defined function f for a read operation.
func (fd *FD) RawRead(f func(uintptr) bool) error {
if err := fd.readLock(); err != nil {
return err
}
defer fd.readUnlock()
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return err
}
for {
if f(uintptr(fd.Sysfd)) {
return nil
}
if err := fd.pd.waitRead(fd.isFile); err != nil {
return err
}
}
}
// RawWrite invokes the user-defined function f for a write operation.
func (fd *FD) RawWrite(f func(uintptr) bool) error {
if err := fd.writeLock(); err != nil {
return err
}
defer fd.writeUnlock()
if err := fd.pd.prepareWrite(fd.isFile); err != nil {
return err
}
for {
if f(uintptr(fd.Sysfd)) {
return nil
}
if err := fd.pd.waitWrite(fd.isFile); err != nil {
return err
}
}
}
// SetBlocking puts the file into blocking mode.
func (fd *FD) SetBlocking() error {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
// Atomic store so that concurrent calls to SetBlocking
// do not cause a race condition. isBlocking only ever goes
// from 0 to 1 so there is no real race here.
atomic.StoreUint32(&fd.isBlocking, 1)
return syscall.SetNonblock(fd.Sysfd, false)
}