| use callback::Callback; |
| use config::{Config, MAX_WORKERS}; |
| use park::{BoxPark, BoxedPark, DefaultPark}; |
| use pool::{Pool, MAX_BACKUP}; |
| use shutdown::ShutdownTrigger; |
| use thread_pool::ThreadPool; |
| use worker::{self, Worker, WorkerId}; |
| |
| use std::any::Any; |
| use std::cmp::max; |
| use std::error::Error; |
| use std::fmt; |
| use std::sync::Arc; |
| use std::time::Duration; |
| |
| use crossbeam_deque::Injector; |
| use num_cpus; |
| use tokio_executor::park::Park; |
| use tokio_executor::Enter; |
| |
| /// Builds a thread pool with custom configuration values. |
| /// |
| /// Methods can be chained in order to set the configuration values. The thread |
| /// pool is constructed by calling [`build`]. |
| /// |
| /// New instances of `Builder` are obtained via [`Builder::new`]. |
| /// |
| /// See function level documentation for details on the various configuration |
| /// settings. |
| /// |
| /// [`build`]: #method.build |
| /// [`Builder::new`]: #method.new |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// # extern crate tokio_threadpool; |
| /// # extern crate futures; |
| /// # use tokio_threadpool::Builder; |
| /// use futures::future::{Future, lazy}; |
| /// use std::time::Duration; |
| /// |
| /// # pub fn main() { |
| /// let thread_pool = Builder::new() |
| /// .pool_size(4) |
| /// .keep_alive(Some(Duration::from_secs(30))) |
| /// .build(); |
| /// |
| /// thread_pool.spawn(lazy(|| { |
| /// println!("called from a worker thread"); |
| /// Ok(()) |
| /// })); |
| /// |
| /// // Gracefully shutdown the threadpool |
| /// thread_pool.shutdown().wait().unwrap(); |
| /// # } |
| /// ``` |
| pub struct Builder { |
| /// Thread pool specific configuration values |
| config: Config, |
| |
| /// Number of workers to spawn |
| pool_size: usize, |
| |
| /// Maximum number of futures that can be in a blocking section |
| /// concurrently. |
| max_blocking: usize, |
| |
| /// Generates the `Park` instances |
| new_park: Box<Fn(&WorkerId) -> BoxPark>, |
| } |
| |
| impl Builder { |
| /// Returns a new thread pool builder initialized with default configuration |
| /// values. |
| /// |
| /// Configuration methods can be chained on the return value. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// # extern crate tokio_threadpool; |
| /// # extern crate futures; |
| /// # use tokio_threadpool::Builder; |
| /// use std::time::Duration; |
| /// |
| /// # pub fn main() { |
| /// let thread_pool = Builder::new() |
| /// .pool_size(4) |
| /// .keep_alive(Some(Duration::from_secs(30))) |
| /// .build(); |
| /// # } |
| /// ``` |
| pub fn new() -> Builder { |
| let num_cpus = max(1, num_cpus::get()); |
| |
| let new_park = |
| Box::new(|_: &WorkerId| Box::new(BoxedPark::new(DefaultPark::new())) as BoxPark); |
| |
| Builder { |
| pool_size: num_cpus, |
| max_blocking: 100, |
| config: Config { |
| keep_alive: None, |
| name_prefix: None, |
| stack_size: None, |
| around_worker: None, |
| after_start: None, |
| before_stop: None, |
| panic_handler: None, |
| }, |
| new_park, |
| } |
| } |
| |
| /// Set the maximum number of worker threads for the thread pool instance. |
| /// |
| /// This must be a number between 1 and 32,768 though it is advised to keep |
| /// this value on the smaller side. |
| /// |
| /// The default value is the number of cores available to the system. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// # extern crate tokio_threadpool; |
| /// # extern crate futures; |
| /// # use tokio_threadpool::Builder; |
| /// |
| /// # pub fn main() { |
| /// let thread_pool = Builder::new() |
| /// .pool_size(4) |
| /// .build(); |
| /// # } |
| /// ``` |
| pub fn pool_size(&mut self, val: usize) -> &mut Self { |
| assert!(val >= 1, "at least one thread required"); |
| assert!(val <= MAX_WORKERS, "max value is {}", MAX_WORKERS); |
| |
| self.pool_size = val; |
| self |
| } |
| |
| /// Set the maximum number of concurrent blocking sections. |
| /// |
| /// When the maximum concurrent `blocking` calls is reached, any further |
| /// calls to `blocking` will return `NotReady` and the task is notified once |
| /// previously in-flight calls to `blocking` return. |
| /// |
| /// This must be a number between 1 and 32,768 though it is advised to keep |
| /// this value on the smaller side. |
| /// |
| /// The default value is 100. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// # extern crate tokio_threadpool; |
| /// # extern crate futures; |
| /// # use tokio_threadpool::Builder; |
| /// |
| /// # pub fn main() { |
| /// let thread_pool = Builder::new() |
| /// .max_blocking(200) |
| /// .build(); |
| /// # } |
| /// ``` |
| pub fn max_blocking(&mut self, val: usize) -> &mut Self { |
| assert!(val <= MAX_BACKUP, "max value is {}", MAX_BACKUP); |
| self.max_blocking = val; |
| self |
| } |
| |
| /// Set the thread keep alive duration |
| /// |
| /// If set, a thread that has completed a `blocking` call will wait for up |
| /// to the specified duration to become a worker thread again. Once the |
| /// duration elapses, the thread will shutdown. |
| /// |
| /// When the value is `None`, the thread will wait to become a worker |
| /// thread forever. |
| /// |
| /// The default value is `None`. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// # extern crate tokio_threadpool; |
| /// # extern crate futures; |
| /// # use tokio_threadpool::Builder; |
| /// use std::time::Duration; |
| /// |
| /// # pub fn main() { |
| /// let thread_pool = Builder::new() |
| /// .keep_alive(Some(Duration::from_secs(30))) |
| /// .build(); |
| /// # } |
| /// ``` |
| pub fn keep_alive(&mut self, val: Option<Duration>) -> &mut Self { |
| self.config.keep_alive = val; |
| self |
| } |
| |
| /// Sets a callback to be triggered when a panic during a future bubbles up |
| /// to Tokio. By default Tokio catches these panics, and they will be |
| /// ignored. The parameter passed to this callback is the same error value |
| /// returned from std::panic::catch_unwind(). To abort the process on |
| /// panics, use std::panic::resume_unwind() in this callback as shown |
| /// below. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// # extern crate tokio_threadpool; |
| /// # extern crate futures; |
| /// # use tokio_threadpool::Builder; |
| /// |
| /// # pub fn main() { |
| /// let thread_pool = Builder::new() |
| /// .panic_handler(|err| std::panic::resume_unwind(err)) |
| /// .build(); |
| /// # } |
| /// ``` |
| pub fn panic_handler<F>(&mut self, f: F) -> &mut Self |
| where |
| F: Fn(Box<Any + Send>) + Send + Sync + 'static, |
| { |
| self.config.panic_handler = Some(Arc::new(f)); |
| self |
| } |
| |
| /// Set name prefix of threads spawned by the scheduler |
| /// |
| /// Thread name prefix is used for generating thread names. For example, if |
| /// prefix is `my-pool-`, then threads in the pool will get names like |
| /// `my-pool-1` etc. |
| /// |
| /// If this configuration is not set, then the thread will use the system |
| /// default naming scheme. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// # extern crate tokio_threadpool; |
| /// # extern crate futures; |
| /// # use tokio_threadpool::Builder; |
| /// |
| /// # pub fn main() { |
| /// let thread_pool = Builder::new() |
| /// .name_prefix("my-pool-") |
| /// .build(); |
| /// # } |
| /// ``` |
| pub fn name_prefix<S: Into<String>>(&mut self, val: S) -> &mut Self { |
| self.config.name_prefix = Some(val.into()); |
| self |
| } |
| |
| /// Set the stack size (in bytes) for worker threads. |
| /// |
| /// The actual stack size may be greater than this value if the platform |
| /// specifies minimal stack size. |
| /// |
| /// The default stack size for spawned threads is 2 MiB, though this |
| /// particular stack size is subject to change in the future. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// # extern crate tokio_threadpool; |
| /// # extern crate futures; |
| /// # use tokio_threadpool::Builder; |
| /// |
| /// # pub fn main() { |
| /// let thread_pool = Builder::new() |
| /// .stack_size(32 * 1024) |
| /// .build(); |
| /// # } |
| /// ``` |
| pub fn stack_size(&mut self, val: usize) -> &mut Self { |
| self.config.stack_size = Some(val); |
| self |
| } |
| |
| /// Execute function `f` on each worker thread. |
| /// |
| /// This function is provided a handle to the worker and is expected to call |
| /// [`Worker::run`], otherwise the worker thread will shutdown without doing |
| /// any work. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// # extern crate tokio_threadpool; |
| /// # extern crate futures; |
| /// # use tokio_threadpool::Builder; |
| /// |
| /// # pub fn main() { |
| /// let thread_pool = Builder::new() |
| /// .around_worker(|worker, _| { |
| /// println!("worker is starting up"); |
| /// worker.run(); |
| /// println!("worker is shutting down"); |
| /// }) |
| /// .build(); |
| /// # } |
| /// ``` |
| /// |
| /// [`Worker::run`]: struct.Worker.html#method.run |
| pub fn around_worker<F>(&mut self, f: F) -> &mut Self |
| where |
| F: Fn(&Worker, &mut Enter) + Send + Sync + 'static, |
| { |
| self.config.around_worker = Some(Callback::new(f)); |
| self |
| } |
| |
| /// Execute function `f` after each thread is started but before it starts |
| /// doing work. |
| /// |
| /// This is intended for bookkeeping and monitoring use cases. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// # extern crate tokio_threadpool; |
| /// # extern crate futures; |
| /// # use tokio_threadpool::Builder; |
| /// |
| /// # pub fn main() { |
| /// let thread_pool = Builder::new() |
| /// .after_start(|| { |
| /// println!("thread started"); |
| /// }) |
| /// .build(); |
| /// # } |
| /// ``` |
| pub fn after_start<F>(&mut self, f: F) -> &mut Self |
| where |
| F: Fn() + Send + Sync + 'static, |
| { |
| self.config.after_start = Some(Arc::new(f)); |
| self |
| } |
| |
| /// Execute function `f` before each thread stops. |
| /// |
| /// This is intended for bookkeeping and monitoring use cases. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// # extern crate tokio_threadpool; |
| /// # extern crate futures; |
| /// # use tokio_threadpool::Builder; |
| /// |
| /// # pub fn main() { |
| /// let thread_pool = Builder::new() |
| /// .before_stop(|| { |
| /// println!("thread stopping"); |
| /// }) |
| /// .build(); |
| /// # } |
| /// ``` |
| pub fn before_stop<F>(&mut self, f: F) -> &mut Self |
| where |
| F: Fn() + Send + Sync + 'static, |
| { |
| self.config.before_stop = Some(Arc::new(f)); |
| self |
| } |
| |
| /// Customize the `park` instance used by each worker thread. |
| /// |
| /// The provided closure `f` is called once per worker and returns a `Park` |
| /// instance that is used by the worker to put itself to sleep. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// # extern crate tokio_threadpool; |
| /// # extern crate futures; |
| /// # use tokio_threadpool::Builder; |
| /// # fn decorate<F>(f: F) -> F { f } |
| /// |
| /// # pub fn main() { |
| /// let thread_pool = Builder::new() |
| /// .custom_park(|_| { |
| /// use tokio_threadpool::park::DefaultPark; |
| /// |
| /// // This is the default park type that the worker would use if we |
| /// // did not customize it. |
| /// let park = DefaultPark::new(); |
| /// |
| /// // Decorate the `park` instance, allowing us to customize work |
| /// // that happens when a worker thread goes to sleep. |
| /// decorate(park) |
| /// }) |
| /// .build(); |
| /// # } |
| /// ``` |
| pub fn custom_park<F, P>(&mut self, f: F) -> &mut Self |
| where |
| F: Fn(&WorkerId) -> P + 'static, |
| P: Park + Send + 'static, |
| P::Error: Error, |
| { |
| self.new_park = Box::new(move |id| Box::new(BoxedPark::new(f(id)))); |
| |
| self |
| } |
| |
| /// Create the configured `ThreadPool`. |
| /// |
| /// The returned `ThreadPool` instance is ready to spawn tasks. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// # extern crate tokio_threadpool; |
| /// # extern crate futures; |
| /// # use tokio_threadpool::Builder; |
| /// |
| /// # pub fn main() { |
| /// let thread_pool = Builder::new() |
| /// .build(); |
| /// # } |
| /// ``` |
| pub fn build(&self) -> ThreadPool { |
| trace!("build; num-workers={}", self.pool_size); |
| |
| // Create the worker entry list |
| let workers: Arc<[worker::Entry]> = { |
| let mut workers = vec![]; |
| |
| for i in 0..self.pool_size { |
| let id = WorkerId::new(i); |
| let park = (self.new_park)(&id); |
| let unpark = park.unpark(); |
| |
| workers.push(worker::Entry::new(park, unpark)); |
| } |
| |
| workers.into() |
| }; |
| |
| let queue = Arc::new(Injector::new()); |
| |
| // Create a trigger that will clean up resources on shutdown. |
| // |
| // The `Pool` contains a weak reference to it, while `Worker`s and the `ThreadPool` contain |
| // strong references. |
| let trigger = Arc::new(ShutdownTrigger::new(workers.clone(), queue.clone())); |
| |
| // Create the pool |
| let pool = Arc::new(Pool::new( |
| workers, |
| Arc::downgrade(&trigger), |
| self.max_blocking, |
| self.config.clone(), |
| queue, |
| )); |
| |
| ThreadPool::new2(pool, trigger) |
| } |
| } |
| |
| impl fmt::Debug for Builder { |
| fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
| fmt.debug_struct("Builder") |
| .field("config", &self.config) |
| .field("pool_size", &self.pool_size) |
| .field("new_park", &"Box<Fn() -> BoxPark>") |
| .finish() |
| } |
| } |