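We'll look at two simple programs, each implemented eight different ways: with Java threads, Perl threads, Rust threads, Ada tasks, C with pthreads, C++ with std::thread, Go goroutines, and Erlang processes.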
Yes, that’s a lot of languages. If you are interested in exploring these languages deeper, feel free to check out my own notes on Java, Perl, Rust, Ada, C, C++, and Go; or, of course, find the respective language home pages via a web search.
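The two examples are a program that runs three threads, each writing a different digit to standard output, and a function that sums an array by splitting the work across ten threads.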
These two examples are chosen to be really, really introductory! They don’t do any synchronization other than waiting for other threads to finish. Synchronization techniques that implement mutual exclusion are covered in the next set of notes where we will look at the famous Dining Philosophers.
A lot of concepts will emerge in looking at real code. We’ll study details and theory later.
The first program runs three threads in parallel: one that writes zeros, one that writes ones, and one that writes twos.
The purpose of this introductory example is only to show how to set up and run threads, not how to use them effectively. In fact, we are going to completely ignore all synchronization concerns, letting the threads interleave however they will. Subsequent examples will tackle those concerns.
Java features thread objects, which are instances of class Thread. Although there are several constructors for this class, the preferred way to make threads is to make a builder with the ofPlatform or ofVirtual methods, then create the thread with either start or unstarted. Although starting threads is explicit, we don’t have to do anything special to stop them. A program does not exit until all nondaemon threads have finished. All of the threads in this example are nondaemon threads: a thread becomes a daemon thread only when its daemon property is set to true.
/**
* A first illustration of Java concurrency. Illustrates three simple threads:
* one that writes zeros, one that writes ones, and one that writes twos. Each
* thread writes to standard output with no synchronization, so the output is
* intentionally all mixed up.
*/
public class Trinary {
public static void main(String[] args) {
Thread.ofPlatform().start(() -> {
for (var i = 0; i < 5000; i++) {
System.out.print("0");
}
});
Thread.ofPlatform().start(() -> {
for (var i = 0; i < 5000; i++) {
System.out.print("1");
}
});
for (var i = 0; i < 5000; i++) {
System.out.print("2");
}
}
}
Run with java Trinary.java.
Exercise: Replace the ofPlatform calls with ofVirtual. Notice anything different?
Perl threads start running as soon as they are created with create (or its alias new). However, once the main thread stops, all other threads will immediately die. So, we need to explicitly wait for threads to finish by calling join.
# A first illustration of Perl concurrency. Illustrates three simple threads:
# one that writes zeros, one that writes ones, and one that writes twos. Each
# thread writes to standard output with no synchronization, so the output is
# intentionally all mixed up.
use strict;
use warnings;
use threads;
my $zero_thread = threads->new(sub { print "0" for 1..5000; });
my $one_thread = threads->new(sub { print "1" for 1..5000; });
print "2" for 1..5000;
$zero_thread->join;
$one_thread->join;
Run with perl trinary.pl.
On my machine, I got very, very little interleaving. Some runs had no interleaving at all. But when changing the threads to print 50,000 digits each, I noticed a bit more.
Rust follows Perl in that you explicitly spawn threads and you explicitly wait for them to finish.
// A first illustration of Rust concurrency. Illustrates three simple threads:
// one that writes zeros, one that writes ones, and one that writes twos. Each
// thread writes to standard output with no synchronization, so the output is
// intentionally all mixed up.
use std::thread;
fn main() {
let zeros = thread::spawn(|| {
for _ in 0..5000 {
print!("0");
}
});
let ones = thread::spawn(|| {
for _ in 0..5000 {
print!("1");
}
});
for _ in 0..5000 {
print!("2");
}
zeros.join().expect("Failed to join zeros thread");
ones.join().expect("Failed to join ones thread");
}
Run with cargo run --bin trinary or cargo run --release --bin trinary. Not surprisingly, I got a lot of interleaving with the former and less with the latter.
Exercise: Change the calls to expect to match on the Result object returned from join and always exit cleanly.
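As a rough sketch of what join hands back, the following shows the Result from a JoinHandle for a thread that finishes normally and for one that panics. The helper name describe_join and the deliberate panic are made up for illustration; they are not part of the program above.

```rust
use std::thread;

// Hypothetical helper (not from the notes): joins a thread and reports
// whether it finished normally or panicked, without panicking itself.
fn describe_join(handle: thread::JoinHandle<u32>) -> String {
    match handle.join() {
        // The thread ran to completion; join yields its return value.
        Ok(value) => format!("finished with {value}"),
        // The thread panicked; join yields the panic payload instead.
        Err(_) => String::from("panicked"),
    }
}

fn main() {
    let good = thread::spawn(|| 5000);
    let bad = thread::spawn(|| -> u32 { panic!("deliberate failure") });
    println!("good thread {}", describe_join(good));
    println!("bad thread {}", describe_join(bad));
}
```

The panicking thread still prints its own panic message to standard error; only the joining thread is spared from panicking in turn.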
In Ada, tasks run in parallel with the unit in which they are declared. The tasks and their parent (more or less) begin together and terminate together. There are no explicit activation or termination calls. It’s all done implicitly based on program structure. Ada was an early implementer of structured concurrency.
-- A first illustration of Ada concurrency. Illustrates three simple threads:
-- one that writes zeros, one that writes ones, and one that writes twos. Each
-- thread writes to standard output with no synchronization, so the output is
-- intentionally all mixed up.
with Ada.Text_IO;
use Ada.Text_IO;
procedure Trinary is
task Write_Zeros;
task Write_Ones;
task body Write_Zeros is
begin
for I in 1..5000 loop
Put ('0');
end loop;
end Write_Zeros;
task body Write_Ones is
begin
for I in 1..5000 loop
Put ('1');
end loop;
end Write_Ones;
begin
for I in 1..5000 loop
Put ('2');
end loop;
end Trinary;
Run with alr build trinary.adb && bin/trinary.
The Posix threads library, a.k.a. pthreads, has been ported to many operating systems. It's quite capable.
// A first illustration of C concurrency using the Posix threads (pthreads)
// library. Illustrates three simple threads: one that writes zeros, one that
// writes ones, and one that writes twos. Each thread writes to standard output
// with no synchronization, so the output is intentionally all mixed up.
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
// Each pthread requires a function to run. This function must take a
// single argument of type void* and return a pointer of type void*.
// In this program, we care about neither.
void* print_zeros(void* ignored) {
for (int i = 0; i < 5000; i++) {
printf("0");
}
return NULL;
}
void* print_ones(void* ignored) {
for (int i = 0; i < 5000; i++) {
printf("1");
}
return NULL;
}
int main() {
pthread_t zero_thread;
pthread_t one_thread;
// Create two threads and run them immediately
int rc = pthread_create(&zero_thread, NULL, &print_zeros, NULL);
if (rc) puts("Cannot create the zero-writing thread"), exit(rc);
rc = pthread_create(&one_thread, NULL, &print_ones, NULL);
if (rc) puts("Cannot create the one-writing thread"), exit(rc);
// This runs concurrently with the other two threads
for (int i = 0; i < 5000; i++) {
printf("2");
}
// Wait for the other two threads to finish. The second argument to join
// is a pointer to a void* that will receive the return value of the thread.
// In this case, we don't care about the return value, so we pass NULL.
// If we did care, we would need to allocate memory for the return value
// and pass a pointer to that memory.
rc = pthread_join(zero_thread, NULL);
if (rc) printf("Cannot join the zero-writing thread"), exit(rc);
rc = pthread_join(one_thread, NULL);
if (rc) printf("Cannot join the one-writing thread"), exit(rc);
return 0;
}
Run with gcc -std=c2x trinary.c -lpthread && ./a.out.
On my machine, I got a good deal of interleaving.
For each of the other languages, it is acceptable in this small example to let the program throw an exception (Java, Perl, C++, Ada), panic (Rust, Go), or crash (Erlang) when a thread cannot be created or joined. C does not have exceptions or panics, and does not follow Erlang’s “Let it Crash” philosophy, so we’ve opted here to check every return code and deal with it.
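For comparison, Rust can take the check-every-result route too. thread::spawn panics if the operating system cannot create the thread, whereas thread::Builder::spawn returns an io::Result that can be inspected, much like the return code of pthread_create. Here is a minimal sketch, in which the helper name spawn_counter and the thread's trivial workload are made up for illustration:

```rust
use std::io;
use std::process;
use std::thread;

// Hypothetical helper: tries to spawn a named thread that counts up to
// `times` and returns the count. Creation failure is reported to the
// caller as an io::Error rather than as a panic.
fn spawn_counter(name: &str, times: u32) -> io::Result<thread::JoinHandle<u32>> {
    thread::Builder::new().name(name.to_string()).spawn(move || {
        let mut count = 0;
        for _ in 0..times {
            count += 1;
        }
        count
    })
}

fn main() {
    // Check creation explicitly, as the C version does.
    let handle = match spawn_counter("zeros", 5000) {
        Ok(handle) => handle,
        Err(e) => {
            eprintln!("Cannot create the thread: {e}");
            process::exit(1)
        }
    };
    // Check the join explicitly as well.
    match handle.join() {
        Ok(n) => println!("The thread counted to {n}"),
        Err(_) => {
            eprintln!("Cannot join the thread");
            process::exit(1)
        }
    }
}
```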
In C++, we use std::thread. It feels similar to Perl and Rust. You can use the pthreads library with C++, but there’s no need.
// A first illustration of C++ concurrency. Illustrates three simple threads:
// one that writes zeros, one that writes ones, and one that writes twos. Each
// thread writes to standard output with no synchronization, so the output is
// intentionally all mixed up.
#include <iostream>
#include <thread>
int main() {
std::thread zero_thread([]() {
for (int i = 0; i < 5000; i++) {
std::cout << '0';
}
});
std::thread one_thread([]() {
for (int i = 0; i < 5000; i++) {
std::cout << '1';
}
});
for (int i = 0; i < 5000; i++) {
std::cout << '2';
}
zero_thread.join();
one_thread.join();
}
Run with g++ -std=c++20 trinary.cpp && ./a.out.
As the printing tasks don’t interact with each other, it makes sense for the goroutines to use a wait group so the main program can wait for them all to finish. Channels could also be used here.
// A first illustration of Go concurrency. Illustrates three goroutines:
// one that writes zeros, one that writes ones, and one that writes twos. Each
// goroutine writes to standard output with no synchronization, so the output
// is intentionally all mixed up.
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < 5000; i++ {
fmt.Print("0")
}
}()
go func() {
defer wg.Done()
for i := 0; i < 5000; i++ {
fmt.Print("1")
}
}()
for i := 0; i < 5000; i++ {
fmt.Print("2")
}
wg.Wait()
}
Run with go run trinary.go.
Erlang is a functional programming language and processes can only communicate via message passing. There are no join calls or wait groups; we have to arrange for each process to send a message when it’s done, and have the main process wait on them by consuming the messages:
% A first illustration of Erlang concurrency. Illustrates three processes:
% one that writes zeros, one that writes ones, and one that writes twos. Each
% process writes to standard output with no synchronization, so the output is
% intentionally all mixed up.
main(_) ->
Waiter = self(),
spawn(fun() -> write(0, 5000), Waiter ! done end),
spawn(fun() -> write(1, 5000), Waiter ! done end),
spawn(fun() -> write(2, 5000), Waiter ! done end),
wait_for(3).
write(Value, Times) when Times > 0 ->
io:format("~p", [Value]),
write(Value, Times-1);
write(_, _) ->
ok.
wait_for(0) -> ok;
wait_for(N) ->
receive
done -> wait_for(N-1)
end.
Run with escript trinary.erl.
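The same "send a message when done, then count the messages" idea can be sketched in other languages that have channels. The Rust version below is only an illustration of the pattern, not a translation of the Erlang program: the function name run_workers is hypothetical, and the workers do no real work.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical sketch: spawns `n` worker threads, each of which sends its
// id on a channel when it is done. The caller waits by receiving exactly
// `n` messages, and returns how many completion messages it saw.
fn run_workers(n: usize) -> usize {
    let (tx, rx) = mpsc::channel::<usize>();
    for id in 0..n {
        // Each worker gets its own clone of the sending end.
        let tx = tx.clone();
        thread::spawn(move || {
            // ... the worker's real job would go here ...
            tx.send(id).expect("the receiver has gone away");
        });
    }
    // Like wait_for(N): consume one message per worker.
    let mut finished = 0;
    for _ in 0..n {
        rx.recv().expect("a worker exited without reporting");
        finished += 1;
    }
    finished
}

fn main() {
    println!("{} workers reported done", run_workers(3));
}
```

No join call is needed here: the receive loop alone tells the main thread that every worker has reached its send.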
This next example illustrates a function to sum all the values in an array, like so: if there are fewer than 100 elements in the array, the function just sums them up itself. Otherwise it spawns ten threads to sum up distinct partitions of the array. The main thread sums up all the partial sums.
We wish to explore different approaches to waiting for the threads to finish and how to get the partial sums. Will the threads each write to a shared array of partial sums? Or will they all try to increment a shared variable? Or will the partial sums be stored within the threads themselves, or an associated object accessible to the main thread?
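To make one of these options concrete, here is a minimal sketch of the "shared variable" approach, written in Rust. It is an assumption-laden illustration rather than one of the implementations in these notes: the function name sum_with_shared_total is made up, it uses scoped threads so the workers can borrow the array, it assumes num_threads is at least 1, and it omits the small-array shortcut.

```rust
use std::sync::atomic::{AtomicI32, Ordering};
use std::thread;

// Hypothetical sketch: every thread computes a partial sum locally and
// then adds it to a single shared atomic total.
fn sum_with_shared_total(a: &[i32], num_threads: usize) -> i32 {
    let total = AtomicI32::new(0);
    // Round the slice size up; chunks() requires a nonzero size.
    let slice_size = ((a.len() + num_threads - 1) / num_threads).max(1);
    thread::scope(|s| {
        for chunk in a.chunks(slice_size) {
            let total = &total;
            s.spawn(move || {
                let partial: i32 = chunk.iter().sum();
                // One atomic update per thread, not one per element.
                total.fetch_add(partial, Ordering::Relaxed);
            });
        }
    });
    // The scope has joined every thread, so all additions are visible.
    total.load(Ordering::Relaxed)
}

fn main() {
    let sevens = vec![7; 19999];
    println!("{}", sum_with_shared_total(&sevens, 10));
}
```

Each thread touches the shared variable only once, so contention stays low. Having every thread add each element directly to the shared total would also be correct with an atomic, but would serialize much of the work.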
When you have several threads doing pretty much the same thing, it’s easiest to create a handful of tasks and submit them to, and invoke them with, an executor service. Each task is assigned to a thread created by the service, and a future is created for each task. Calling future.get() blocks until the task has completed, at which point the task’s return value is available in the future.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.stream.IntStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Callable;
public class Summer {
/**
* Sums an array using 10 threads, unless the array has fewer than 100
* elements, in which case it is summed up directly.
*/
public static int sum(int[] a) throws InterruptedException, ExecutionException {
if (a.length < 100) {
return Arrays.stream(a).sum();
}
final var numThreads = 10;
var sliceSize = (a.length + numThreads - 1) / numThreads;
var tasks = new ArrayList<Callable<Integer>>(numThreads);
for (var i = 0; i < numThreads; i++) {
final int low = i * sliceSize;
final int high = Math.min(low + sliceSize, a.length);
tasks.add(() -> Arrays.stream(a, low, high).sum());
}
try (var executor = Executors.newFixedThreadPool(numThreads)) {
var total = 0;
for (var future : executor.invokeAll(tasks)) {
total += future.get();
}
return total;
}
}
public static void main(String[] args) throws Exception {
assert sum(new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }) == 55;
assert sum(new int[] { 100, 200, 300, 400 }) == 1000;
assert sum(new int[] {}) == 0;
assert sum(IntStream.generate(() -> 7).limit(20000).toArray()) == 140000;
assert sum(IntStream.generate(() -> 7).limit(19999).toArray()) == 139993;
assert sum(IntStream.range(0, 10003).toArray()) == 50025003;
System.out.println("All tests passed");
}
}
$ java -ea Summer.java
All tests passed
By default, Perl threads do not share data. Thus we can’t let the spawned thread write into a variable that we expect the main thread to pick up. We have to get information out of this thread via its return value, which is obtained as the return value of join.
use strict;
use warnings;
use threads;
use List::Util qw(sum);
sub sum_of {
my ($array_ref) = @_;
my $length = scalar @$array_ref;
if ($length < 100) {
return sum(@$array_ref) || 0;
}
my $num_threads = 10;
my $slice_size = int(($length + $num_threads - 1) / $num_threads);
my @threads;
for (my $i = 0; $i < $num_threads; $i++) {
my $low = $i * $slice_size;
my $high = ($low + $slice_size < $length) ? ($low + $slice_size) : $length;
push @threads, threads->create(sub {
return sum(@$array_ref[$low .. $high - 1]) || 0;
});
}
my $total = 0;
foreach my $thread (@threads) {
$total += $thread->join();
}
return $total;
}
use Test::More;
is(sum_of([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 55);
is(sum_of([100, 200, 300, 400]), 1000);
is(sum_of([]), 0);
is(sum_of([(7) x 20000]), 140000);
is(sum_of([(7) x 19999]), 139993);
is(sum_of([0 .. 10002]), 50025003);
done_testing();
$ perl summer.pl
ok 1
ok 2
ok 3
ok 4
ok 5
ok 6
1..6
Rust has the same style of threading as Perl: we have to manually spawn and we get the results from calling join. However, because of the borrow checker, I ended up having to clone the slices. Ouch! Can you find a better way? Is there a better way?
use std::thread;
fn sum(a: &[i32]) -> i32 {
if a.len() < 100 {
return a.iter().sum();
}
let num_threads = 10;
let slice_size = (a.len() + num_threads - 1) / num_threads;
let mut handles = Vec::with_capacity(num_threads);
for i in 0..num_threads {
let low = i * slice_size;
let high = std::cmp::min(low + slice_size, a.len());
// Clone the slice so the thread owns its data
let slice = a[low..high].to_vec();
handles.push(thread::spawn(move || slice.into_iter().sum::<i32>()));
}
// Collect results from all threads and sum them up
handles.into_iter().map(|h| h.join().unwrap()).sum()
}
fn main() {
assert_eq!(sum(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 55);
assert_eq!(sum(&[100, 200, 300, 400]), 1000);
assert_eq!(sum(&[]), 0);
assert_eq!(sum(&vec![7; 20000]), 140000);
assert_eq!(sum(&vec![7; 19999]), 139993);
assert_eq!(sum(&(0..10003).collect::<Vec<_>>().as_slice()), 50025003);
println!("All tests passed");
}
$ cargo run --bin summer
All tests passed
Exercise: What happens if you remove the call to .to_vec()? Why does the borrow checker complain? Explain the error message.
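One possible answer to "is there a better way?" is scoped threads. thread::scope (in the standard library since Rust 1.63) guarantees that every thread spawned inside the scope is joined before the scope returns, so the threads may borrow slices of the array instead of owning copies. This is a sketch of that alternative, not the author's code; the function name scoped_sum is made up.

```rust
use std::thread;

// Hypothetical variant of sum: same partitioning, but each thread borrows
// its slice of `a`, so nothing is cloned.
fn scoped_sum(a: &[i32]) -> i32 {
    if a.len() < 100 {
        return a.iter().sum();
    }
    let num_threads = 10;
    let slice_size = (a.len() + num_threads - 1) / num_threads;
    thread::scope(|s| {
        // Spawn one scoped thread per chunk. Collecting the handles first
        // ensures all threads are running before any of them is joined.
        let handles: Vec<_> = a
            .chunks(slice_size)
            .map(|chunk| s.spawn(move || chunk.iter().sum::<i32>()))
            .collect();
        // Join each thread and add up the partial sums.
        handles.into_iter().map(|h| h.join().unwrap()).sum::<i32>()
    })
}

fn main() {
    assert_eq!(scoped_sum(&vec![7; 20000]), 140000);
    println!("All tests passed");
}
```

The plain thread::spawn used earlier requires its closure to be 'static, because nothing stops the thread from outliving the borrowed array. The scope removes that possibility, which is why the borrow is accepted here.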
Again we take advantage of structured concurrency! We get synchronization and termination pretty much for free by nesting the tasks inside a block:
with Ada.Text_IO, Ada.Assertions;
use Ada.Text_IO, Ada.Assertions;
procedure Summer is
type Numbers is array (Integer range <>) of Integer;
NUM_TASKS : constant := 10;
function Sum (A : Numbers) return Integer is
-- Where each task will deposit its result.
Results : array (1 .. NUM_TASKS) of Integer := [others => 0];
-- Used for the small array case, and for summing up the results.
Total : Integer := 0;
begin
declare
-- Each task receives a slice range and task index for its result.
task type Sum_Slice (First, Last, Task_Index : Integer);
task body Sum_Slice is
Task_Sum : Integer := 0;
begin
for I in First .. Last loop
Task_Sum := Task_Sum + A (I);
end loop;
Results (Task_Index) := Task_Sum;
end Sum_Slice;
Tasks : array (1 .. NUM_TASKS) of access Sum_Slice;
begin
if A'Length < 100 then
for I in A'Range loop
Total := Total + A (I);
end loop;
return Total;
end if;
for I in 1 .. NUM_TASKS loop
declare
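-- Round the slice size up and clamp the last index, so that no
-- trailing elements are dropped when A'Length is not a multiple
-- of NUM_TASKS.
Slice_Size : constant Integer := (A'Length + NUM_TASKS - 1) / NUM_TASKS;
First : constant Integer := A'First + (I - 1) * Slice_Size;
Last : constant Integer := Integer'Min (A'First + I * Slice_Size - 1, A'Last);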
begin
Tasks (I) := new Sum_Slice (First, Last, I);
end;
end loop;
-- Keep tasks alive by referencing them (suppresses warning)
pragma Unreferenced (Tasks);
end;
-- All tasks are guaranteed to finish before we get here.
for I in Results'Range loop
Total := Total + Results (I);
end loop;
return Total;
end Sum;
begin
declare
Unconventional : constant Numbers (103 .. 110) := [9, 9, 3, others => 6];
Empty : constant Numbers (1 .. 0) := [];
begin
Assert (Sum (Unconventional) = 51);
Assert (Sum ([1, 2, 3]) = 6);
Assert (Sum ([for I in 1 .. 10 => I]) = 55);
Assert (Sum ([for I in 1 .. 20000 => 7]) = 140000);
Assert (Sum (Empty) = 0);
Put_Line ("All tests passed");
end;
end Summer;
$ alr build summer.adb && bin/summer
All tests passed
Like a lot of C APIs, pthreads uses void* a lot, so you end up casting a lot. It makes the code a bit ugly, but that’s what you get with a language whose compiler insists that all the types work out but then lets you cast it all away.
Note that the pthreads API, like Perl, also lets you get the return value of a thread by calling the waiting (join) function. But the thread function is required to return a pointer and we have to manage our own memory. The classic pattern for this is to have the thread allocate memory, return a pointer to it, and have the “caller” free it. Not fun, so the code below instead has each thread deposit its result in a struct passed in by the main thread:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <assert.h>
// With pthreads, you can only pass a single argument to the thread function,
// so we use a struct to encapsulate the array and the range. The argument
// also contains the result of the sum, which is set by the thread function.
typedef struct {
const int* data;
size_t low;
size_t high;
int result;
} Slice;
// A thread function that sums a slice of the array, and deposits the sum
// in the designated field of the struct. An alternative would have been
// to return the result from the thread function, but that would require
// using a global variable or passing a pointer to the result, which is
// arguably less clean than just using a struct.
void* sum_slice(void* arg) {
Slice* s = (Slice*)arg;
int sum = 0;
for (size_t i = s->low; i < s->high; i++) {
sum += s->data[i];
}
s->result = sum;
return NULL;
}
// This function will sum an array of n numbers using ten threads, unless the
// array has fewer than 100 elements, in which case it is summed up directly
// on the main thread.
int sum_vector(const int* a, size_t n) {
if (n < 100) {
int sum = 0;
for (size_t i = 0; i < n; i++) sum += a[i];
return sum;
}
int numThreads = 10;
int sliceSize = (n + numThreads - 1) / numThreads;
pthread_t threads[numThreads];
Slice slices[numThreads];
for (int i = 0; i < numThreads; i++) {
slices[i].data = a;
slices[i].low = i * sliceSize;
slices[i].high = slices[i].low + sliceSize;
if (n < slices[i].high) slices[i].high = n;
slices[i].result = 0;
if (pthread_create(&threads[i], NULL, sum_slice, &slices[i]) != 0) {
fprintf(stderr, "Cannot create thread\n");
exit(1);
}
}
// Wait for all the threads to finish and collect their results.
int result = 0;
for (int i = 0; i < numThreads; i++) {
if (pthread_join(threads[i], NULL) != 0) {
fprintf(stderr, "Cannot join thread\n");
exit(1);
}
result += slices[i].result;
}
return result;
}
int main() {
assert(sum_vector((const int[]){ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }, 10) == 55);
assert(sum_vector((const int[]){ 100, 200, 300, 400 }, 4) == 1000);
assert(sum_vector((const int[]){}, 0) == 0);
int a[20000];
for (int i = 0; i < 20000; ++i) a[i] = 7;
assert(sum_vector(a, 20000) == 140000);
int b[19999];
for (int i = 0; i < 19999; ++i) b[i] = 7;
assert(sum_vector(b, 19999) == 139993);
int c[10003];
for (int i = 0; i < 10003; ++i) c[i] = i;
assert(sum_vector(c, 10003) == 50025003);
printf("All tests passed\n");
return 0;
}
$ gcc -std=c2x summer.c -lpthread && ./a.out
All tests passed
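In C++, a std::thread is a lot like a Perl thread: we pass it a function, and it runs that function in a separate thread. A std::thread has no way to hand back a return value, though, so the code below uses std::async with the std::launch::async policy, which runs the function on a separate thread and delivers its result via a std::future.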
#include <iostream>
#include <vector>
#include <thread>
#include <future>
#include <numeric>
#include <algorithm>
#include <cassert>
int sum(const std::vector<int>& a) {
if (a.size() < 100) {
return std::accumulate(a.begin(), a.end(), 0);
}
const int numThreads = 10;
int sliceSize = (a.size() + numThreads - 1) / numThreads;
std::vector<std::future<int>> futures;
for (int i = 0; i < numThreads; ++i) {
int low = i * sliceSize;
int high = std::min(low + sliceSize, static_cast<int>(a.size()));
futures.push_back(std::async(std::launch::async, [low, high, &a]() {
return std::accumulate(a.begin() + low, a.begin() + high, 0);
}));
}
int total = 0;
for (auto& future : futures) {
total += future.get();
}
return total;
}
int main() {
assert(sum({ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }) == 55);
assert(sum({ 100, 200, 300, 400 }) == 1000);
assert(sum({}) == 0);
assert(sum(std::vector<int>(20000, 7)) == 140000);
assert(sum(std::vector<int>(19999, 7)) == 139993);
std::vector<int> a;
for (int i = 0; i < 10003; ++i) {
a.push_back(i);
}
assert(sum(a) == 50025003);
std::cout << "All tests passed" << std::endl;
return 0;
}
$ g++ -std=c++20 summer.cpp && ./a.out
All tests passed
package main
import (
"fmt"
"log"
"slices"
"sync"
)
func directSum(a []int) int {
total := 0
for _, v := range a {
total += v
}
return total
}
func parallelSum(a []int) int {
if len(a) < 100 {
return directSum(a)
}
numThreads := 10
sliceSize := (len(a) + numThreads - 1) / numThreads
var wg sync.WaitGroup
results := make([]int, numThreads)
wg.Add(numThreads)
for i := 0; i < numThreads; i++ {
go func(i int) {
defer wg.Done()
low := i * sliceSize
high := low + sliceSize
if high > len(a) {
high = len(a)
}
for _, v := range a[low:high] {
results[i] += v
}
}(i)
}
wg.Wait()
return directSum(results)
}
func expect(a []int, expected int) {
if parallelSum(a) != expected {
log.Fatalf("Test failed, did not get %d\n", expected)
}
}
func main() {
expect([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, 55)
expect([]int{100, 200, 300, 400}, 1000)
expect([]int{}, 0)
expect(slices.Repeat([]int{7}, 20000), 140000)
expect(slices.Repeat([]int{7}, 19999), 139993)
a := make([]int, 10003)
for i := 0; i < 10003; i++ {
a[i] = i
}
expect(a, 50025003)
fmt.Println("All tests passed")
}
% The function sum/1 sums an array of numbers using 10 processes,
% unless the array has fewer than 100 elements, in which case it
% is summed up directly on the main process.
sum(A) when length(A) < 100 ->
lists:sum(A);
sum(A) ->
NumProcesses = 10,
Length = length(A),
SliceSize = (Length + NumProcesses - 1) div NumProcesses,
Parent = self(),
Pids = [spawn(fun() ->
Low = I * SliceSize,
High = min(Low + SliceSize, Length),
Slice = lists:sublist(A, Low + 1, High - Low),
Parent ! {self(), lists:sum(Slice)}
end) || I <- lists:seq(0, NumProcesses-1)],
collect_results(Pids, 0).
collect_results([], Total) ->
Total;
collect_results(Pids, Total) ->
receive
{Pid, Result} ->
collect_results(lists:delete(Pid, Pids), Total + Result)
end.
main(_) ->
55 = sum(lists:seq(1, 10)),
1000 = sum([100, 200, 300, 400]),
0 = sum([]),
140000 = sum(lists:duplicate(20000, 7)),
139993 = sum(lists:duplicate(19999, 7)),
50025003 = sum(lists:seq(0, 10002)),
io:format("All tests passed~n").
Here are some questions useful for your spaced repetition learning. Many of the answers are not found on this page. Some will have popped up in lecture. Others will require you to do your own research.
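Java, starting a thread: Thread.ofPlatform().start(lambda)
Perl, starting a thread: threads->create(sub { ... })
Perl, waiting for a thread: join
Rust, starting a thread: thread::spawn(|| { ... })
Rust, waiting for a thread: thread.join()
C, starting a thread: pthread_create(&thread, NULL, function, arg)
C, waiting for a thread: pthread_join(thread, &result)
C++, starting a thread: std::thread thread(function, arg)
C++, waiting for a thread: thread.join()
Go, starting a goroutine: go function(arg)
Go, waiting for a goroutine: sync.WaitGroup, though you could, if you like, set up a mechanism for the goroutine to message another that it is done.
Erlang, starting a process: spawn(fun () -> ... end)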
We’ve covered: