From 53a723b4bab66fb32b066feb3808b73d72b4133c Mon Sep 17 00:00:00 2001 From: bp Date: Tue, 3 Mar 2015 21:48:36 -0500 Subject: [PATCH] making Exa.load() stat files in parallel --- src/main.rs | 78 +++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 61 insertions(+), 17 deletions(-) diff --git a/src/main.rs b/src/main.rs index b9b037d..6ffe614 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,6 +18,9 @@ extern crate git2; use std::env; use std::old_io::{fs, FileType}; +use std::os::num_cpus; +use std::sync::mpsc::{channel, sync_channel}; +use std::thread; use dir::Dir; use file::File; @@ -52,31 +55,72 @@ impl<'a> Exa<'a> { } } - fn load(&mut self, iter: T) where T: Iterator { + fn load(&mut self, files: &[String]) { // Separate the user-supplied paths into directories and files. // Files are shown first, and then each directory is expanded // and listed second. - for file in iter { - let path = Path::new(file); - match fs::stat(&path) { - Ok(stat) => { - if stat.kind == FileType::Directory { - if self.options.dir_action.is_tree() { - self.files.push(File::with_stat(stat, &path, None, true)); + + let is_tree = self.options.dir_action.is_tree(); + let total_files = files.len(); + + // Denotes the maxinum number of concurrent threads + let (thread_capacity_tx, thread_capacity_rs) = sync_channel(8 * num_cpus()); + + // Communication between consumer thread and producer threads + enum StatResult<'a> { + File(File<'a>), + Path(Path) + } + let (results_tx, results_rx) = channel(); + + // Spawn consumer thread + let _consumer = thread::scoped(move || { + for _ in 0..total_files { + + // Make room for more producer threads + let _ = thread_capacity_rs.recv(); + + // Receive a producer's result + match results_rx.recv() { + Ok(result) => match result { + StatResult::File(file) => self.files.push(file), + StatResult::Path(path) => self.dirs.push(path) + }, + Err(_) => unreachable!() + } + self.count += 1; + } + }); + + for file in files.iter() { + let file = file.clone(); + let results_tx = results_tx.clone(); + + // Block until there is room for another thread + let _ = thread_capacity_tx.send(()); + + // Spawn producer thread + thread::spawn(move || { + let path = Path::new(file.clone()); + match fs::stat(&path) { + Ok(stat) => { + if stat.kind == FileType::Directory { + if is_tree { + let _ = results_tx.send(StatResult::File(File::with_stat(stat, &path, None, true))); + } + else { + let _ = results_tx.send(StatResult::Path(path)); + } } else { - self.dirs.push(path); + let _ = results_tx.send(StatResult::File(File::with_stat(stat, &path, None, false))); } } - else { - self.files.push(File::with_stat(stat, &path, None, false)); - } + Err(e) => println!("{}: {}", file, e), } - Err(e) => println!("{}: {}", file, e), - } - - self.count += 1; + }); } + } fn print_files(&self) { @@ -154,7 +198,7 @@ fn main() { match Options::getopts(args.tail()) { Ok((options, paths)) => { let mut exa = Exa::new(options); - exa.load(paths.iter()); + exa.load(&paths); exa.print_files(); exa.print_dirs(); },