1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
#![deny(missing_docs)]
#![cfg_attr(test, deny(warnings))]
#![deny(missing_debug_implementations)]
#![doc(html_root_url = "https://docs.rs/futures-fs/0.0.5")]
//! A thread pool to handle file IO operations.
//!
//! # Examples
//!
//! ```rust
//! extern crate futures;
//! extern crate futures_fs;
//!
//! use futures::{Future, Stream};
//! use futures_fs::FsPool;
//!
//! # fn run() {
//! let fs = FsPool::default();
//!
//! // our source file
//! let read = fs.read("/home/sean/foo.txt", Default::default());
//!
//! // default writes options to create a new file
//! let write = fs.write("/home/sean/out.txt", Default::default());
//!
//! // block this thread!
//! // the reading and writing however will happen off-thread
//! read.forward(write).wait()
//! .expect("IO error piping foo.txt to out.txt");
//! # }
//! # fn main() {}
//! ```
extern crate bytes;
#[macro_use]
extern crate futures;
extern crate futures_cpupool;
use std::{fmt, fs, io};
use std::path::Path;
use std::sync::Arc;
use futures::{Async, Future, Poll};
use futures::future::{lazy, Executor};
use futures::sync::oneshot::{self, Receiver};
use futures_cpupool::CpuPool;
pub use self::read::{FsReadStream, ReadOptions};
pub use self::write::{FsWriteSink, WriteOptions};
mod read;
mod write;
/// A pool of threads to handle file IO.
#[derive(Clone)]
pub struct FsPool {
executor: Arc<Executor<Box<Future<Item = (), Error = ()> + Send>> + Send + Sync>,
}
// ===== impl FsPool ======
impl FsPool {
/// Creates a new `FsPool`, with the supplied number of threads.
pub fn new(threads: usize) -> Self {
FsPool {
executor: Arc::new(CpuPool::new(threads)),
}
}
/// Creates a new `FsPool`, from an existing `Executor`.
///
/// # Note
///
/// The executor will be used to spawn tasks that can block the thread.
/// It likely should not be an executor that is also handling light-weight
/// tasks, but a dedicated thread pool.
///
/// The most common use of this constructor is to allow creating a single
/// `CpuPool` for your application for blocking tasks, and sharing it with
/// `FsPool` and any other things needing a thread pool.
pub fn with_executor<E>(executor: E) -> Self
where
E: Executor<Box<Future<Item = (), Error = ()> + Send>> + Send + Sync + 'static,
{
FsPool {
executor: Arc::new(executor),
}
}
#[doc(hidden)]
#[deprecated(note = "renamed to with_executor")]
pub fn from_executor<E>(executor: E) -> Self
where
E: Executor<Box<Future<Item = (), Error = ()> + Send>> + Send + Sync + 'static,
{
FsPool {
executor: Arc::new(executor),
}
}
/// Returns a `Stream` of the contents of the file at the supplied path.
pub fn read<P>(&self, path: P, opts: ReadOptions) -> FsReadStream
where
P: AsRef<Path> + Send + 'static,
{
::read::new(self, path, opts)
}
/// Returns a `Stream` of the contents of the supplied file.
pub fn read_file(&self, file: fs::File, opts: ReadOptions) -> FsReadStream {
::read::new_from_file(self, file, opts)
}
/// Returns a `Sink` to send bytes to be written to the file at the supplied path.
pub fn write<P>(&self, path: P, opts: WriteOptions) -> FsWriteSink
where
P: AsRef<Path> + Send + 'static,
{
::write::new(self, path, opts)
}
/// Returns a `Sink` to send bytes to be written to the supplied file.
pub fn write_file(&self, file: fs::File) -> FsWriteSink {
::write::new_from_file(self, file)
}
/// Returns a `Future` that resolves when the target file is deleted.
pub fn delete<P>(&self, path: P) -> FsFuture<()>
where
P: AsRef<Path> + Send + 'static,
{
let (tx, rx) = oneshot::channel();
let fut = Box::new(lazy(move || {
tx.send(fs::remove_file(path).map_err(From::from))
.map_err(|_| ())
}));
self.executor.execute(fut).unwrap();
fs(rx)
}
}
impl Default for FsPool {
fn default() -> FsPool {
FsPool::new(4)
}
}
impl fmt::Debug for FsPool {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("FsPool").finish()
}
}
// ===== impl FsFuture =====
/// A future representing work in the `FsPool`.
pub struct FsFuture<T> {
inner: Receiver<io::Result<T>>,
}
fn fs<T: Send>(rx: Receiver<io::Result<T>>) -> FsFuture<T> {
FsFuture { inner: rx }
}
impl<T: Send + 'static> Future for FsFuture<T> {
type Item = T;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.inner.poll().unwrap() {
Async::Ready(Ok(item)) => Ok(Async::Ready(item)),
Async::Ready(Err(e)) => Err(e),
Async::NotReady => Ok(Async::NotReady),
}
}
}
impl<T> fmt::Debug for FsFuture<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("FsFuture").finish()
}
}
fn _assert_kinds() {
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}
fn assert_clone<T: Clone>() {}
assert_send::<FsPool>();
assert_sync::<FsPool>();
assert_clone::<FsPool>();
assert_send::<FsFuture<()>>();
}