Rust: common mistake 2 🐜

In the previous post of this series, we looked at a common mistake involving implicit lifetime specification. Today, let’s say we want to implement a ThreadPool in Rust. We create n threads, each waiting for work to execute. Work arrives through a channel, so all the threads share the channel’s Receiver<Work>, wrapped in Arc<Mutex<...>> because a Receiver cannot be shared between threads directly. Here is a minimal implementation of the ThreadPool just described:

use std::sync::mpsc::{Sender, channel};
use std::sync::{Mutex, Arc};
use std::thread;

type Work = Box<dyn FnOnce() + Send + 'static>;

struct ThreadPool {
    tx: Sender<Work>, // to submit work through the channel
}

impl ThreadPool {
    fn new(n_thread: usize) -> Self {
        let (tx, rx) = channel::<Work>();
        let rx = Arc::new(Mutex::new(rx));
        for _ in 0..n_thread {
            let rx = Arc::clone(&rx); // receive work through the channel
            thread::spawn(move || {
                loop {
                    match rx.lock().unwrap().recv() {
                        Ok(work) => work(), // execute the work
                        Err(_) => break, // threadpool dropped
                    }
                }
            });
        }
        Self { tx }
    }
    
    fn execute(&self, work: impl FnOnce() + Send + 'static) {
        self.tx.send(Box::new(work)).unwrap();
    }
}

The meat of the implementation is the worker’s body being spawned, i.e.,

loop {
    match rx.lock().unwrap().recv() {
        Ok(work) => work(), // execute the work
        Err(_) => break, // threadpool dropped
    }
}

Now, here is the $1M question: do you see any bug 🐜 in the implementation above? If you can spot it right away, you must be a skilled Rust dev!

Bug 🐜

For those, including myself, who cannot spot the bug immediately, let’s put our ThreadPool to work and see what the issue is.

fn main() {
    let pool = ThreadPool::new(4);
    let (tx, rx) = channel();
    for ix in 0..4 {
        let tx = tx.clone();
        pool.execute(move || {
            thread::sleep(std::time::Duration::from_secs(1));
            tx.send(ix).unwrap();
        });
    }
    
    for x in rx.into_iter().take(4) {
        println!("{}", x);
    }
}

What output do you expect? How long will the code take to run? Let’s find out.

[Figure: a bug 🐜 in ThreadPool forces serial execution of work]

Strangely enough, the work is not executed in parallel; it is executed serially! The program takes not 1 second but 4 seconds to run. If we cannot parallelize the work, the whole purpose of our ThreadPool struct is defeated. What is happening?

Well, the issue is that the work is executed while the Mutex is still locked. This blocks the other threads from retrieving the next piece of work, forcing serial execution. The fix is actually quite trivial.

--- before
+++ after
@@ -1,5 +1,6 @@
 loop {
-    match rx.lock().unwrap().recv() {
+    let result = rx.lock().unwrap().recv();
+    match result {
         Ok(work) => work(), // execute the work
         Err(_) => break, // threadpool dropped
     }

I know, it is hard to believe that this makes any difference, but it does. We simply bind the received value to a variable result and then match on it, as two separate statements. By doing so, we release the mutex lock before the match statement, so other threads can access the shared rx object. We then run work() inside the match statement while the mutex is unlocked.
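The difference can be observed directly with try_lock(), which fails while the mutex is held. The following is a minimal sketch, not part of the pool itself: the function names are made up for illustration, and try_recv() stands in for recv() so that nothing blocks on an empty channel.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::sync::Mutex;

/// Hypothetical helper: reports whether the mutex is still locked while
/// the match arm runs, when lock() is called in the match scrutinee.
fn locked_in_single_statement(rx: &Mutex<Receiver<i32>>) -> bool {
    let still_locked = match rx.lock().unwrap().try_recv() {
        // The guard created in the scrutinee is still alive here,
        // so a second attempt to lock fails.
        _ => rx.try_lock().is_err(),
    };
    still_locked
}

/// Hypothetical helper: same check, but with the lock taken in its own
/// `let` statement, as in the fixed worker loop.
fn locked_in_two_statements(rx: &Mutex<Receiver<i32>>) -> bool {
    let result = rx.lock().unwrap().try_recv(); // guard released at the `;`
    let still_locked = match result {
        // The mutex is free again, so try_lock succeeds.
        _ => rx.try_lock().is_err(),
    };
    still_locked
}

fn main() {
    // Keep the sender alive; the channel stays empty on purpose.
    let (_tx, rx) = channel::<i32>();
    let rx = Mutex::new(rx);
    println!("single statement: locked = {}", locked_in_single_statement(&rx));
    println!("two statements:   locked = {}", locked_in_two_statements(&rx));
}
```

The first function reports that the mutex is still locked inside the arm, and the second reports that it is free. That is exactly the situation of a worker calling work() in each version of the loop.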

Let’s see this in action.

[Figure: ThreadPool correctly runs work in parallel]
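Wall-clock time is one way to check for parallelism. Another is to count how many jobs are running at the same moment. The sketch below assumes the fixed worker loop; the worker and peak_concurrency functions and the 200 ms sleep are illustrative choices, not part of the original ThreadPool.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

type Work = Box<dyn FnOnce() + Send + 'static>;

/// Worker loop with the fix applied: the lock is released before work() runs.
fn worker(rx: Arc<Mutex<Receiver<Work>>>) {
    loop {
        let result = rx.lock().unwrap().recv(); // guard released at the `;`
        match result {
            Ok(work) => work(), // execute the work without holding the lock
            Err(_) => break,    // all senders dropped
        }
    }
}

/// Runs `n` sleeping jobs on `n` workers and returns the highest number of
/// jobs observed running at the same time.
fn peak_concurrency(n: usize) -> usize {
    let (tx, rx) = channel::<Work>();
    let rx = Arc::new(Mutex::new(rx));
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || worker(rx))
        })
        .collect();

    let active = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    for _ in 0..n {
        let (active, peak) = (Arc::clone(&active), Arc::clone(&peak));
        tx.send(Box::new(move || {
            // Count this job as running and remember the highest count seen.
            let now = active.fetch_add(1, Ordering::SeqCst) + 1;
            peak.fetch_max(now, Ordering::SeqCst);
            thread::sleep(Duration::from_millis(200));
            active.fetch_sub(1, Ordering::SeqCst);
        }))
        .unwrap();
    }

    drop(tx); // closing the channel lets the workers exit once the queue is empty
    for handle in handles {
        handle.join().unwrap();
    }
    peak.load(Ordering::SeqCst)
}

fn main() {
    println!("peak concurrency: {}", peak_concurrency(4));
}
```

With the single-statement match in worker, the lock would be held across work() and the peak could never exceed 1. With the two-statement version, several jobs overlap.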

I don’t know exactly why this makes a difference. To me, the two versions should be identical, but I guess this is just one of Rust’s quirks. I’d really appreciate it if anyone could explain why they behave differently.

Edit

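A few folks explained why the two versions behave differently. The MutexGuard returned by lock() is a temporary, and a temporary is not dropped until the statement that creates it is complete. In the first version, lock() is called in the expression being matched, so the whole match is one statement and the guard is held for its entire duration, including the work() call inside the arm. In the second version, the let statement ends at its semicolon, so the guard is dropped before the match begins.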
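The drop timing can be made visible without any mutex. In this small sketch, Guard is a hypothetical stand-in for MutexGuard that records when it is dropped, and value() stands in for recv(). All names here are made up for illustration.

```rust
use std::cell::RefCell;

type Log = RefCell<Vec<&'static str>>;

/// Hypothetical stand-in for MutexGuard: records the moment it is dropped.
struct Guard<'a> {
    log: &'a Log,
}

impl<'a> Guard<'a> {
    fn new(log: &'a Log) -> Self {
        Guard { log }
    }

    /// Stand-in for recv(): the returned value does not borrow from the guard.
    fn value(&self) -> i32 {
        42
    }
}

impl Drop for Guard<'_> {
    fn drop(&mut self) {
        self.log.borrow_mut().push("guard dropped");
    }
}

/// The guard temporary is created in the expression being matched,
/// so it stays alive until the match has finished.
fn single_statement(log: &Log) {
    match Guard::new(log).value() {
        _ => {
            log.borrow_mut().push("arm ran");
        }
    }
}

/// The guard temporary belongs to the `let` statement and is dropped at its `;`.
fn two_statements(log: &Log) {
    let value = Guard::new(log).value();
    match value {
        _ => {
            log.borrow_mut().push("arm ran");
        }
    }
}

/// Runs one of the functions above and returns the recorded order of events.
fn trace(f: fn(&Log)) -> Vec<&'static str> {
    let log = RefCell::new(Vec::new());
    f(&log);
    log.into_inner()
}

fn main() {
    println!("{:?}", trace(single_statement)); // ["arm ran", "guard dropped"]
    println!("{:?}", trace(two_statements)); // ["guard dropped", "arm ran"]
}
```

In the single-statement form the arm runs before the guard is dropped. In the two-statement form the order is reversed.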

Here is where I disagree with the current Rust language design. I dare say that the Rust compiler should expand the single statement into multiple statements internally:

// when a user writes this single statement
match rx.lock().unwrap().recv() { ... } // the lock is held by a temporary for the whole match

// the compiler should implicitly expand it into
let lock = rx.lock().unwrap(); // the temporary lock from above
let result = lock.recv();      // last use of the temporary lock
std::mem::drop(lock);          // drop the temporary right after its last use
match result { ... }           // result must be held for the duration of the match

because the temporary lock object should be dropped immediately after the recv() call, as that is where the temporary variable’s scope ends. There is no way to reference this temporary lock object anymore once recv() is called, so this is precisely where the scope of this temporary variable should end. If one wants to keep holding the lock, one will need to declare the lock variable explicitly, i.e.,

let lock = rx.lock().unwrap(); // explicit variable to hold the lock
match lock.recv() { ... }    // lock is still in the scope

Does anyone agree with my suggestion? Isn’t this more intuitive? Or is it just me?

References