Parallel computing
On our computers, we usually have multiple cores which can work in parallel. In this section, we will use several threads. Each thread will analyze a subset of the input data, and we will later aggregate the results.
Here is how it will work for n threads:
- The lines (let's call their number l) will be divided into n chunks. The goal is to give l/n lines to each thread.
- An MPSC (multiple producers single consumer) channel will be created.
- Each thread will compute the statistics on the lines it got and will put the map containing the result on the channel.
- The main thread will aggregate the various statistics into a single map and return it.
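The chunking step can be sketched as follows. This is only an illustration: `split_for_threads` is a hypothetical helper name, and rounding l/n up is one reasonable way to guarantee that no more than n chunks are produced.

```rust
/// Splits `lines` into at most `n` chunks of roughly equal size.
/// `split_for_threads` is a hypothetical helper, not part of the exercise.
fn split_for_threads<S>(lines: &[S], n: usize) -> Vec<&[S]> {
    let l = lines.len();
    // Treat a request for 0 threads as 1 to avoid dividing by zero.
    let n = n.max(1);
    // Round l / n up so that we never get more than n chunks.
    // `.max(1)` is needed because `chunks(0)` panics (empty input case).
    let chunk_size = ((l + n - 1) / n).max(1);
    lines.chunks(chunk_size).collect()
}
```

With 10 lines and 4 threads, this gives chunks of 3, 3, 3 and 1 lines. With very short inputs it may produce fewer than n chunks, which matches the "up to n threads" wording.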
We will use scoped threads to alleviate lifetime constraints.
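As a small illustration of what scoped threads allow, the sketch below borrows two halves of a slice from two threads without any `'static` requirement. The function name `sum_in_two_threads` is made up for this example.

```rust
use std::thread;

/// Sums the two halves of `data` in two scoped threads.
/// `sum_in_two_threads` is a hypothetical name used for illustration.
fn sum_in_two_threads(data: &[u32]) -> u32 {
    let (left, right) = data.split_at(data.len() / 2);
    // Threads spawned inside the scope may borrow `left` and `right`:
    // the scope joins every thread before it returns, so the borrows
    // cannot outlive the data.
    thread::scope(|s| {
        let a = s.spawn(|| left.iter().sum::<u32>());
        let b = s.spawn(|| right.iter().sum::<u32>());
        a.join().unwrap() + b.join().unwrap()
    })
}
```

With `std::thread::spawn()` instead, the closures would have to be `'static` and could not borrow `data`.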
Creating the parallel version
Exercise 2.a: Create a function with the following signature which uses up to n threads to compute the number of occurrences of each character:
fn count_chars_parallel<S: AsRef<str> + Sync>(input: &[S], n: usize) -> HashMap<char, usize>
You will notice that we have an extra constraint on S: since objects of type S will be referenced from other threads (the ones we will be creating), S must be Sync in addition to being AsRef<str>.
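To see where the Sync bound comes into play, here is a minimal sketch with a made-up function, `total_len`, that reads a `&[S]` from another thread. Removing `+ Sync` from the bound should make the compiler reject the `spawn()` call, because a shared reference can only be sent to another thread when the referenced type is Sync.

```rust
use std::thread;

/// Computes the total length of all lines from a spawned thread.
/// `total_len` is a hypothetical function used only for illustration.
fn total_len<S: AsRef<str> + Sync>(input: &[S]) -> usize {
    thread::scope(|s| {
        // The closure borrows `input`, so a reference to `[S]` crosses
        // a thread boundary: this is what requires `S: Sync`.
        let handle = s.spawn(|| {
            input.iter().map(|line| line.as_ref().len()).sum::<usize>()
        });
        handle.join().unwrap()
    })
}
```

Both `&str` and `String` satisfy `AsRef<str> + Sync`, so the function accepts slices of either type.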
Breakdown of the count_chars_parallel() function
This function must do the following things, in order.
First it must create a (tx, rx) pair corresponding to an MPSC channel.
It must then create a scope in which new threads will be spawned.
For each chunk of about l/n lines (the last chunk may be shorter):
- tx must be cloned into a new variable that will be captured by the new thread;
- a new thread should be spawned from the current scope, capturing the tx clone and the current chunk.
This thread will compute the characters' occurrences (using count_chars()) and send the result on the MPSC channel using its captured tx clone.
It is now time to close the thread scope and to concentrate on receiving data from the spawned threads. Since we are after the thread::scope() call, at this stage we know that all analyses have been sent on the MPSC channel.
How does the reception part work?
- rx.recv() will wait until data is available. As long as there is data available on the channel, it will return Ok(some_data).
- If all senders are dead (tx and all its clones have been dropped) and all data has been read, rx.recv() will return Err(…).
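The two cases above can be observed with a single-threaded sketch. `drain_demo` is a hypothetical name; the function sends two values, drops the only sender, then reads until `recv()` fails.

```rust
use std::sync::mpsc;

/// Sends two values, drops the only sender, then drains the channel.
/// Returns the received values and whether a further `recv()` fails.
fn drain_demo() -> (Vec<u32>, bool) {
    let (tx, rx) = mpsc::channel();
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    drop(tx); // no sender is left after this point

    let mut received = Vec::new();
    // Buffered data is still delivered as Ok(…) after the sender is gone.
    while let Ok(value) = rx.recv() {
        received.push(value);
    }
    // Channel empty and no sender alive: recv() returns Err(…) at once.
    (received, rx.recv().is_err())
}
```

Values already in the channel are not lost when the senders disappear: the error only shows up once the channel is both empty and disconnected.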
At this stage, all threads have terminated their work. We can read data from the MPSC channel through rx: we will get a bunch of Ok(…) containing data from the threads, then Err(…) saying that everything has terminated. Right? No… We have to get rid of the tx variable which is still present. We have cloned it each time we have created a thread, but since tx still exists the use of the channel is never considered complete.
How do we get rid of tx? std::mem::drop(tx) is intended for that usage.
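The effect of the lingering tx can be checked without blocking by using `try_recv()`, which reports why no data could be read. The sketch below uses a made-up function name, `sender_state_demo`, and an arbitrary `u32` payload.

```rust
use std::sync::mpsc::{self, TryRecvError};

/// Shows how the channel state changes when the original `tx` is dropped.
/// `sender_state_demo` is a hypothetical name used for illustration.
fn sender_state_demo() -> (TryRecvError, TryRecvError) {
    let (tx, rx) = mpsc::channel::<u32>();
    let tx_clone = tx.clone();
    drop(tx_clone); // the clone is gone, as when a worker thread ends

    // `tx` is still alive: the channel is merely empty, so a blocking
    // `recv()` would wait forever here.
    let before = rx.try_recv().unwrap_err();

    drop(tx); // `drop` is in the prelude; it is `std::mem::drop`
    // No sender left: the channel is now reported as disconnected.
    let after = rx.try_recv().unwrap_err();
    (before, after)
}
```

The function returns `(TryRecvError::Empty, TryRecvError::Disconnected)`: only after the original tx is dropped does the receiver learn that no more data can arrive.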
Now that we have done that, we can collect every map which comes from the MPSC channel. Since you already know how to do that, let us share the code to build the aggregated hash map and save some time:
let mut freq = HashMap::default();
while let Ok(fr) = rx.recv() {
    for (c, n) in fr {
        *freq.entry(c).or_default() += n;
    }
}
Returning freq is the last thing to do.
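Putting the steps together, one possible shape for the whole function is sketched below. It is not the only valid solution. The `count_chars()` shown here is a stand-in for the sequential function from the previous exercise (its exact signature there is an assumption), and the chunk size is computed by rounding l/n up.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

// Stand-in for the sequential version; its signature is an assumption.
fn count_chars<S: AsRef<str>>(input: &[S]) -> HashMap<char, usize> {
    let mut freq = HashMap::new();
    for line in input {
        for c in line.as_ref().chars() {
            *freq.entry(c).or_default() += 1;
        }
    }
    freq
}

fn count_chars_parallel<S: AsRef<str> + Sync>(input: &[S], n: usize) -> HashMap<char, usize> {
    let (tx, rx) = mpsc::channel();
    // l / n rounded up, and never 0 since `chunks(0)` panics.
    let n = n.max(1);
    let chunk_size = ((input.len() + n - 1) / n).max(1);
    thread::scope(|s| {
        for chunk in input.chunks(chunk_size) {
            // Each thread owns its own clone of the sender.
            let tx = tx.clone();
            s.spawn(move || tx.send(count_chars(chunk)).unwrap());
        }
    });
    // Every clone was dropped with its thread; drop the original too,
    // otherwise the receiving loop below would never end.
    drop(tx);
    let mut freq = HashMap::default();
    while let Ok(fr) = rx.recv() {
        for (c, n) in fr {
            *freq.entry(c).or_default() += n;
        }
    }
    freq
}
```

Because the channel created by `mpsc::channel()` is unbounded, the worker threads never block on `send()`, so it is safe to start receiving only after the scope has ended.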
Calling the parallel version
Exercise 2.b: Update your main() function so that it uses count_chars_parallel() with, for example, 4 threads, so that it processes the text of Moby Dick faster.