From ea3a57eb971fad30280f3f8fcea04343b159eb51 Mon Sep 17 00:00:00 2001 From: Ben S Date: Fri, 5 Jun 2015 03:04:56 +0100 Subject: [PATCH] Start using threadpool crate --- Cargo.lock | 11 +++++++++-- Cargo.toml | 1 + src/main.rs | 55 +++++++++++++++++++++-------------------------------- 3 files changed, 32 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fce49ce..f995dfe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4,7 +4,7 @@ version = "0.2.0" dependencies = [ "ansi_term 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", "bitflags 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", - "datetime 0.2.0", + "datetime 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "getopts 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", "git2 0.2.11 (git+https://github.com/alexcrichton/git2-rs.git)", "libc 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", @@ -13,6 +13,7 @@ dependencies = [ "num_cpus 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", "number_prefix 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", "pad 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "threadpool 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "unicode-width 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "users 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -34,7 +35,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "datetime" -version = "0.2.0" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "locale 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "num 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", @@ -239,6 +241,11 @@ dependencies = [ "rand 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "threadpool" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "tz" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 987d0c7..e8f5939 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ natord = "1.0.7" num_cpus = "*" number_prefix = "0.2.3" pad = "0.1.1" +threadpool = "*" unicode-width = "*" users = "0.4.0" diff --git a/src/main.rs b/src/main.rs index ddbad0d..cc4cb85 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,5 @@ #![feature(collections, convert, core, exit_status, file_type, fs_ext, fs_mode)] -#![feature(metadata_ext, raw_ext, scoped, symlink_metadata)] +#![feature(metadata_ext, raw_ext, symlink_metadata)] extern crate ansi_term; extern crate datetime; @@ -10,6 +10,7 @@ extern crate natord; extern crate num_cpus; extern crate number_prefix; extern crate pad; +extern crate threadpool; extern crate users; extern crate unicode_width; @@ -19,8 +20,9 @@ extern crate git2; use std::env; use std::fs; use std::path::{Component, Path, PathBuf}; -use std::sync::mpsc::{channel, sync_channel}; -use std::thread; + +use threadpool::ThreadPool; +use std::sync::mpsc::channel; use dir::Dir; use file::File; @@ -36,6 +38,7 @@ mod options; mod output; mod term; + #[cfg(not(test))] struct Exa<'dir> { count: usize, @@ -56,15 +59,13 @@ impl<'dir> Exa<'dir> { } fn load(&mut self, files: &[String]) { + // Separate the user-supplied paths into directories and files. // Files are shown first, and then each directory is expanded // and listed second. - let is_tree = self.options.dir_action.is_tree() || self.options.dir_action.is_as_file(); let total_files = files.len(); - // Denotes the maxinum number of concurrent threads - let (thread_capacity_tx, thread_capacity_rs) = sync_channel(8 * num_cpus::get()); // Communication between consumer thread and producer threads enum StatResult<'dir> { @@ -73,39 +74,17 @@ impl<'dir> Exa<'dir> { Error } - let (results_tx, results_rx) = channel(); - - // Spawn consumer thread - let _consumer = thread::scoped(move || { - for _ in 0..total_files { - - // Make room for more producer threads - let _ = thread_capacity_rs.recv(); - - // Receive a producer's result - match results_rx.recv() { - Ok(result) => match result { - StatResult::File(file) => self.files.push(file), - StatResult::Dir(path) => self.dirs.push(path), - StatResult::Error => () - }, - Err(_) => unreachable!(), - } - self.count += 1; - } - }); + let pool = ThreadPool::new(8 * num_cpus::get()); + let (tx, rx) = channel(); for file in files.iter() { + let tx = tx.clone(); let file = file.clone(); - let results_tx = results_tx.clone(); - - // Block until there is room for another thread - let _ = thread_capacity_tx.send(()); // Spawn producer thread - thread::spawn(move || { + pool.execute(move || { let path = Path::new(&*file); - let _ = results_tx.send(match fs::metadata(&path) { + let _ = tx.send(match fs::metadata(&path) { Ok(metadata) => { if !metadata.is_dir() { StatResult::File(File::with_metadata(metadata, &path, None, false)) @@ -124,6 +103,16 @@ impl<'dir> Exa<'dir> { }); }); } + + // Spawn consumer thread + for result in rx.iter().take(total_files) { + match result { + StatResult::File(file) => self.files.push(file), + StatResult::Dir(path) => self.dirs.push(path), + StatResult::Error => () + } + self.count += 1; + } } fn print_files(&self) {