blob: cc4a187d78f78654ac5a9156fb445be5d860902f [file] [log] [blame]
//! Types for working with [`File`].
//!
//! [`File`]: File
use self::State::*;
use crate::fs::{asyncify, sys};
use crate::io::blocking::Buf;
use crate::io::{AsyncRead, AsyncSeek, AsyncWrite};
use std::fmt;
use std::fs::{Metadata, Permissions};
use std::future::Future;
use std::io::{self, Seek, SeekFrom};
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::task::Poll::*;
/// A reference to an open file on the filesystem.
///
/// This is a specialized version of [`std::fs::File`][std] for usage from the
/// Tokio runtime.
///
/// An instance of a `File` can be read and/or written depending on what options
/// it was opened with. Files also implement [`AsyncSeek`] to alter the logical
/// cursor that the file contains internally.
///
/// A file will not be closed immediately when it goes out of scope if there
/// are any IO operations that have not yet completed. To ensure that a file is
/// closed immediately when it is dropped, you should call [`flush`] before
/// dropping it. Note that this does not ensure that the file has been fully
/// written to disk; the operating system might keep the changes around in an
/// in-memory buffer. See the [`sync_all`] method for telling the OS to write
/// the data to disk.
///
/// Reading and writing to a `File` is usually done using the convenience
/// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits. Examples
/// import these traits through [the prelude].
///
/// [std]: struct@std::fs::File
/// [`AsyncSeek`]: trait@crate::io::AsyncSeek
/// [`flush`]: fn@crate::io::AsyncWriteExt::flush
/// [`sync_all`]: fn@crate::fs::File::sync_all
/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
/// [the prelude]: crate::prelude
///
/// # Examples
///
/// Create a new file and asynchronously write bytes to it:
///
/// ```no_run
/// use tokio::fs::File;
/// use tokio::prelude::*; // for write_all()
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut file = File::create("foo.txt").await?;
/// file.write_all(b"hello, world!").await?;
/// # Ok(())
/// # }
/// ```
///
/// Read the contents of a file into a buffer
///
/// ```no_run
/// use tokio::fs::File;
/// use tokio::prelude::*; // for read_to_end()
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut file = File::open("foo.txt").await?;
///
/// let mut contents = vec![];
/// file.read_to_end(&mut contents).await?;
///
/// println!("len = {}", contents.len());
/// # Ok(())
/// # }
/// ```
pub struct File {
std: Arc<sys::File>,
state: State,
/// Errors from writes/flushes are returned in write/flush calls. If a write
/// error is observed while performing a read, it is saved until the next
/// write / flush call.
last_write_err: Option<io::ErrorKind>,
}
#[derive(Debug)]
enum State {
Idle(Option<Buf>),
Busy(sys::Blocking<(Operation, Buf)>),
}
#[derive(Debug)]
enum Operation {
Read(io::Result<usize>),
Write(io::Result<()>),
Seek(io::Result<u64>),
}
impl File {
/// Attempts to open a file in read-only mode.
///
/// See [`OpenOptions`] for more details.
///
/// [`OpenOptions`]: super::OpenOptions
///
/// # Errors
///
/// This function will return an error if called from outside of the Tokio
/// runtime or if path does not already exist. Other errors may also be
/// returned according to OpenOptions::open.
///
/// # Examples
///
/// ```no_run
/// use tokio::fs::File;
/// use tokio::prelude::*;
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut file = File::open("foo.txt").await?;
///
/// let mut contents = vec![];
/// file.read_to_end(&mut contents).await?;
///
/// println!("len = {}", contents.len());
/// # Ok(())
/// # }
/// ```
///
/// The [`read_to_end`] method is defined on the [`AsyncReadExt`] trait.
///
/// [`read_to_end`]: fn@crate::io::AsyncReadExt::read_to_end
/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
pub async fn open(path: impl AsRef<Path>) -> io::Result<File> {
let path = path.as_ref().to_owned();
let std = asyncify(|| sys::File::open(path)).await?;
Ok(File::from_std(std))
}
/// Opens a file in write-only mode.
///
/// This function will create a file if it does not exist, and will truncate
/// it if it does.
///
/// See [`OpenOptions`] for more details.
///
/// [`OpenOptions`]: super::OpenOptions
///
/// # Errors
///
/// Results in an error if called from outside of the Tokio runtime or if
/// the underlying [`create`] call results in an error.
///
/// [`create`]: std::fs::File::create
///
/// # Examples
///
/// ```no_run
/// use tokio::fs::File;
/// use tokio::prelude::*;
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut file = File::create("foo.txt").await?;
/// file.write_all(b"hello, world!").await?;
/// # Ok(())
/// # }
/// ```
///
/// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
///
/// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
pub async fn create(path: impl AsRef<Path>) -> io::Result<File> {
let path = path.as_ref().to_owned();
let std_file = asyncify(move || sys::File::create(path)).await?;
Ok(File::from_std(std_file))
}
/// Converts a [`std::fs::File`][std] to a [`tokio::fs::File`][file].
///
/// [std]: std::fs::File
/// [file]: File
///
/// # Examples
///
/// ```no_run
/// // This line could block. It is not recommended to do this on the Tokio
/// // runtime.
/// let std_file = std::fs::File::open("foo.txt").unwrap();
/// let file = tokio::fs::File::from_std(std_file);
/// ```
pub fn from_std(std: sys::File) -> File {
File {
std: Arc::new(std),
state: State::Idle(Some(Buf::with_capacity(0))),
last_write_err: None,
}
}
/// Seeks to an offset, in bytes, in a stream.
///
/// # Examples
///
/// ```no_run
/// use tokio::fs::File;
/// use tokio::prelude::*;
///
/// use std::io::SeekFrom;
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut file = File::open("foo.txt").await?;
/// file.seek(SeekFrom::Start(6)).await?;
///
/// let mut contents = vec![0u8; 10];
/// file.read_exact(&mut contents).await?;
/// # Ok(())
/// # }
/// ```
///
/// The [`read_exact`] method is defined on the [`AsyncReadExt`] trait.
///
/// [`read_exact`]: fn@crate::io::AsyncReadExt::read_exact
/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
pub async fn seek(&mut self, mut pos: SeekFrom) -> io::Result<u64> {
self.complete_inflight().await;
let mut buf = match self.state {
Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
_ => unreachable!(),
};
// Factor in any unread data from the buf
if !buf.is_empty() {
let n = buf.discard_read();
if let SeekFrom::Current(ref mut offset) = pos {
*offset += n;
}
}
let std = self.std.clone();
// Start the operation
self.state = Busy(sys::run(move || {
let res = (&*std).seek(pos);
(Operation::Seek(res), buf)
}));
let (op, buf) = match self.state {
Idle(_) => unreachable!(),
Busy(ref mut rx) => rx.await.unwrap(),
};
self.state = Idle(Some(buf));
match op {
Operation::Seek(res) => res,
_ => unreachable!(),
}
}
/// Attempts to sync all OS-internal metadata to disk.
///
/// This function will attempt to ensure that all in-core data reaches the
/// filesystem before returning.
///
/// # Examples
///
/// ```no_run
/// use tokio::fs::File;
/// use tokio::prelude::*;
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut file = File::create("foo.txt").await?;
/// file.write_all(b"hello, world!").await?;
/// file.sync_all().await?;
/// # Ok(())
/// # }
/// ```
///
/// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
///
/// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
pub async fn sync_all(&mut self) -> io::Result<()> {
self.complete_inflight().await;
let std = self.std.clone();
asyncify(move || std.sync_all()).await
}
/// This function is similar to `sync_all`, except that it may not
/// synchronize file metadata to the filesystem.
///
/// This is intended for use cases that must synchronize content, but don't
/// need the metadata on disk. The goal of this method is to reduce disk
/// operations.
///
/// Note that some platforms may simply implement this in terms of `sync_all`.
///
/// # Examples
///
/// ```no_run
/// use tokio::fs::File;
/// use tokio::prelude::*;
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut file = File::create("foo.txt").await?;
/// file.write_all(b"hello, world!").await?;
/// file.sync_data().await?;
/// # Ok(())
/// # }
/// ```
///
/// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
///
/// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
pub async fn sync_data(&mut self) -> io::Result<()> {
self.complete_inflight().await;
let std = self.std.clone();
asyncify(move || std.sync_data()).await
}
/// Truncates or extends the underlying file, updating the size of this file to become size.
///
/// If the size is less than the current file's size, then the file will be
/// shrunk. If it is greater than the current file's size, then the file
/// will be extended to size and have all of the intermediate data filled in
/// with 0s.
///
/// # Errors
///
/// This function will return an error if the file is not opened for
/// writing.
///
/// # Examples
///
/// ```no_run
/// use tokio::fs::File;
/// use tokio::prelude::*;
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut file = File::create("foo.txt").await?;
/// file.write_all(b"hello, world!").await?;
/// file.set_len(10).await?;
/// # Ok(())
/// # }
/// ```
///
/// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
///
/// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
pub async fn set_len(&mut self, size: u64) -> io::Result<()> {
self.complete_inflight().await;
let mut buf = match self.state {
Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
_ => unreachable!(),
};
let seek = if !buf.is_empty() {
Some(SeekFrom::Current(buf.discard_read()))
} else {
None
};
let std = self.std.clone();
self.state = Busy(sys::run(move || {
let res = if let Some(seek) = seek {
(&*std).seek(seek).and_then(|_| std.set_len(size))
} else {
std.set_len(size)
}
.map(|_| 0); // the value is discarded later
// Return the result as a seek
(Operation::Seek(res), buf)
}));
let (op, buf) = match self.state {
Idle(_) => unreachable!(),
Busy(ref mut rx) => rx.await?,
};
self.state = Idle(Some(buf));
match op {
Operation::Seek(res) => res.map(|_| ()),
_ => unreachable!(),
}
}
/// Queries metadata about the underlying file.
///
/// # Examples
///
/// ```no_run
/// use tokio::fs::File;
///
/// # async fn dox() -> std::io::Result<()> {
/// let file = File::open("foo.txt").await?;
/// let metadata = file.metadata().await?;
///
/// println!("{:?}", metadata);
/// # Ok(())
/// # }
/// ```
pub async fn metadata(&self) -> io::Result<Metadata> {
let std = self.std.clone();
asyncify(move || std.metadata()).await
}
/// Create a new `File` instance that shares the same underlying file handle
/// as the existing `File` instance. Reads, writes, and seeks will affect both
/// File instances simultaneously.
///
/// # Examples
///
/// ```no_run
/// use tokio::fs::File;
///
/// # async fn dox() -> std::io::Result<()> {
/// let file = File::open("foo.txt").await?;
/// let file_clone = file.try_clone().await?;
/// # Ok(())
/// # }
/// ```
pub async fn try_clone(&self) -> io::Result<File> {
let std = self.std.clone();
let std_file = asyncify(move || std.try_clone()).await?;
Ok(File::from_std(std_file))
}
/// Destructures `File` into a [`std::fs::File`][std]. This function is
/// async to allow any in-flight operations to complete.
///
/// Use `File::try_into_std` to attempt conversion immediately.
///
/// [std]: std::fs::File
///
/// # Examples
///
/// ```no_run
/// use tokio::fs::File;
///
/// # async fn dox() -> std::io::Result<()> {
/// let tokio_file = File::open("foo.txt").await?;
/// let std_file = tokio_file.into_std().await;
/// # Ok(())
/// # }
/// ```
pub async fn into_std(mut self) -> sys::File {
self.complete_inflight().await;
Arc::try_unwrap(self.std).expect("Arc::try_unwrap failed")
}
/// Tries to immediately destructure `File` into a [`std::fs::File`][std].
///
/// [std]: std::fs::File
///
/// # Errors
///
/// This function will return an error containing the file if some
/// operation is in-flight.
///
/// # Examples
///
/// ```no_run
/// use tokio::fs::File;
///
/// # async fn dox() -> std::io::Result<()> {
/// let tokio_file = File::open("foo.txt").await?;
/// let std_file = tokio_file.try_into_std().unwrap();
/// # Ok(())
/// # }
/// ```
pub fn try_into_std(mut self) -> Result<sys::File, Self> {
match Arc::try_unwrap(self.std) {
Ok(file) => Ok(file),
Err(std_file_arc) => {
self.std = std_file_arc;
Err(self)
}
}
}
/// Changes the permissions on the underlying file.
///
/// # Platform-specific behavior
///
/// This function currently corresponds to the `fchmod` function on Unix and
/// the `SetFileInformationByHandle` function on Windows. Note that, this
/// [may change in the future][changes].
///
/// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior
///
/// # Errors
///
/// This function will return an error if the user lacks permission change
/// attributes on the underlying file. It may also return an error in other
/// os-specific unspecified cases.
///
/// # Examples
///
/// ```no_run
/// use tokio::fs::File;
///
/// # async fn dox() -> std::io::Result<()> {
/// let file = File::open("foo.txt").await?;
/// let mut perms = file.metadata().await?.permissions();
/// perms.set_readonly(true);
/// file.set_permissions(perms).await?;
/// # Ok(())
/// # }
/// ```
pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
let std = self.std.clone();
asyncify(move || std.set_permissions(perm)).await
}
async fn complete_inflight(&mut self) {
use crate::future::poll_fn;
if let Err(e) = poll_fn(|cx| Pin::new(&mut *self).poll_flush(cx)).await {
self.last_write_err = Some(e.kind());
}
}
}
impl AsyncRead for File {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
dst: &mut [u8],
) -> Poll<io::Result<usize>> {
loop {
match self.state {
Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();
if !buf.is_empty() {
let n = buf.copy_to(dst);
*buf_cell = Some(buf);
return Ready(Ok(n));
}
buf.ensure_capacity_for(dst);
let std = self.std.clone();
self.state = Busy(sys::run(move || {
let res = buf.read_from(&mut &*std);
(Operation::Read(res), buf)
}));
}
Busy(ref mut rx) => {
let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;
match op {
Operation::Read(Ok(_)) => {
let n = buf.copy_to(dst);
self.state = Idle(Some(buf));
return Ready(Ok(n));
}
Operation::Read(Err(e)) => {
assert!(buf.is_empty());
self.state = Idle(Some(buf));
return Ready(Err(e));
}
Operation::Write(Ok(_)) => {
assert!(buf.is_empty());
self.state = Idle(Some(buf));
continue;
}
Operation::Write(Err(e)) => {
assert!(self.last_write_err.is_none());
self.last_write_err = Some(e.kind());
self.state = Idle(Some(buf));
}
Operation::Seek(_) => {
assert!(buf.is_empty());
self.state = Idle(Some(buf));
continue;
}
}
}
}
}
}
}
impl AsyncSeek for File {
fn start_seek(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
mut pos: SeekFrom,
) -> Poll<io::Result<()>> {
loop {
match self.state {
Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();
// Factor in any unread data from the buf
if !buf.is_empty() {
let n = buf.discard_read();
if let SeekFrom::Current(ref mut offset) = pos {
*offset += n;
}
}
let std = self.std.clone();
self.state = Busy(sys::run(move || {
let res = (&*std).seek(pos);
(Operation::Seek(res), buf)
}));
return Ready(Ok(()));
}
Busy(ref mut rx) => {
let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
self.state = Idle(Some(buf));
match op {
Operation::Read(_) => {}
Operation::Write(Err(e)) => {
assert!(self.last_write_err.is_none());
self.last_write_err = Some(e.kind());
}
Operation::Write(_) => {}
Operation::Seek(_) => {}
}
}
}
}
}
fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
loop {
match self.state {
Idle(_) => panic!("must call start_seek before calling poll_complete"),
Busy(ref mut rx) => {
let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
self.state = Idle(Some(buf));
match op {
Operation::Read(_) => {}
Operation::Write(Err(e)) => {
assert!(self.last_write_err.is_none());
self.last_write_err = Some(e.kind());
}
Operation::Write(_) => {}
Operation::Seek(res) => return Ready(res),
}
}
}
}
}
}
impl AsyncWrite for File {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
src: &[u8],
) -> Poll<io::Result<usize>> {
if let Some(e) = self.last_write_err.take() {
return Ready(Err(e.into()));
}
loop {
match self.state {
Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();
let seek = if !buf.is_empty() {
Some(SeekFrom::Current(buf.discard_read()))
} else {
None
};
let n = buf.copy_from(src);
let std = self.std.clone();
self.state = Busy(sys::run(move || {
let res = if let Some(seek) = seek {
(&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
} else {
buf.write_to(&mut &*std)
};
(Operation::Write(res), buf)
}));
return Ready(Ok(n));
}
Busy(ref mut rx) => {
let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
self.state = Idle(Some(buf));
match op {
Operation::Read(_) => {
// We don't care about the result here. The fact
// that the cursor has advanced will be reflected in
// the next iteration of the loop
continue;
}
Operation::Write(res) => {
// If the previous write was successful, continue.
// Otherwise, error.
res?;
continue;
}
Operation::Seek(_) => {
// Ignore the seek
continue;
}
}
}
}
}
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
if let Some(e) = self.last_write_err.take() {
return Ready(Err(e.into()));
}
let (op, buf) = match self.state {
Idle(_) => return Ready(Ok(())),
Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?,
};
// The buffer is not used here
self.state = Idle(Some(buf));
match op {
Operation::Read(_) => Ready(Ok(())),
Operation::Write(res) => Ready(res),
Operation::Seek(_) => Ready(Ok(())),
}
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}
}
impl From<sys::File> for File {
fn from(std: sys::File) -> Self {
Self::from_std(std)
}
}
impl fmt::Debug for File {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("tokio::fs::File")
.field("std", &self.std)
.finish()
}
}
#[cfg(unix)]
impl std::os::unix::io::AsRawFd for File {
fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
self.std.as_raw_fd()
}
}
#[cfg(windows)]
impl std::os::windows::io::AsRawHandle for File {
fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
self.std.as_raw_handle()
}
}