Start using threadpool crate

This commit is contained in:
Ben S 2015-06-05 03:04:56 +01:00
parent cc1d6aa5f1
commit ea3a57eb97
3 changed files with 32 additions and 35 deletions

11
Cargo.lock generated
View File

@ -4,7 +4,7 @@ version = "0.2.0"
dependencies = [
"ansi_term 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)",
"bitflags 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"datetime 0.2.0",
"datetime 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"getopts 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
"git2 0.2.11 (git+https://github.com/alexcrichton/git2-rs.git)",
"libc 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
@ -13,6 +13,7 @@ dependencies = [
"num_cpus 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
"number_prefix 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
"pad 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"threadpool 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"unicode-width 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"users 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -34,7 +35,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "datetime"
version = "0.2.0"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"locale 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"num 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
@ -239,6 +241,11 @@ dependencies = [
"rand 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "threadpool"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "tz"
version = "0.2.0"

View File

@ -17,6 +17,7 @@ natord = "1.0.7"
num_cpus = "*"
number_prefix = "0.2.3"
pad = "0.1.1"
threadpool = "*"
unicode-width = "*"
users = "0.4.0"

View File

@ -1,5 +1,5 @@
#![feature(collections, convert, core, exit_status, file_type, fs_ext, fs_mode)]
#![feature(metadata_ext, raw_ext, scoped, symlink_metadata)]
#![feature(metadata_ext, raw_ext, symlink_metadata)]
extern crate ansi_term;
extern crate datetime;
@ -10,6 +10,7 @@ extern crate natord;
extern crate num_cpus;
extern crate number_prefix;
extern crate pad;
extern crate threadpool;
extern crate users;
extern crate unicode_width;
@ -19,8 +20,9 @@ extern crate git2;
use std::env;
use std::fs;
use std::path::{Component, Path, PathBuf};
use std::sync::mpsc::{channel, sync_channel};
use std::thread;
use threadpool::ThreadPool;
use std::sync::mpsc::channel;
use dir::Dir;
use file::File;
@ -36,6 +38,7 @@ mod options;
mod output;
mod term;
#[cfg(not(test))]
struct Exa<'dir> {
count: usize,
@ -56,15 +59,13 @@ impl<'dir> Exa<'dir> {
}
fn load(&mut self, files: &[String]) {
// Separate the user-supplied paths into directories and files.
// Files are shown first, and then each directory is expanded
// and listed second.
let is_tree = self.options.dir_action.is_tree() || self.options.dir_action.is_as_file();
let total_files = files.len();
// Denotes the maxinum number of concurrent threads
let (thread_capacity_tx, thread_capacity_rs) = sync_channel(8 * num_cpus::get());
// Communication between consumer thread and producer threads
enum StatResult<'dir> {
@ -73,39 +74,17 @@ impl<'dir> Exa<'dir> {
Error
}
let (results_tx, results_rx) = channel();
// Spawn consumer thread
let _consumer = thread::scoped(move || {
for _ in 0..total_files {
// Make room for more producer threads
let _ = thread_capacity_rs.recv();
// Receive a producer's result
match results_rx.recv() {
Ok(result) => match result {
StatResult::File(file) => self.files.push(file),
StatResult::Dir(path) => self.dirs.push(path),
StatResult::Error => ()
},
Err(_) => unreachable!(),
}
self.count += 1;
}
});
let pool = ThreadPool::new(8 * num_cpus::get());
let (tx, rx) = channel();
for file in files.iter() {
let tx = tx.clone();
let file = file.clone();
let results_tx = results_tx.clone();
// Block until there is room for another thread
let _ = thread_capacity_tx.send(());
// Spawn producer thread
thread::spawn(move || {
pool.execute(move || {
let path = Path::new(&*file);
let _ = results_tx.send(match fs::metadata(&path) {
let _ = tx.send(match fs::metadata(&path) {
Ok(metadata) => {
if !metadata.is_dir() {
StatResult::File(File::with_metadata(metadata, &path, None, false))
@ -124,6 +103,16 @@ impl<'dir> Exa<'dir> {
});
});
}
// Spawn consumer thread
for result in rx.iter().take(total_files) {
match result {
StatResult::File(file) => self.files.push(file),
StatResult::Dir(path) => self.dirs.push(path),
StatResult::Error => ()
}
self.count += 1;
}
}
fn print_files(&self) {