Trivial Concurrency Examples

Here are two concurrent programs implemented in several different languages.

The Plan

We'll look at two different simple programs, each implemented eight different ways: in Java, Perl, Rust, Ada, C (with pthreads), C++, Go, and Erlang.

Yes, that’s a lot of languages. If you are interested in exploring these languages deeper, feel free to check out my own notes on Java, Perl, Rust, Ada, C, C++, and Go; or, of course, find the respective language home pages via a web search.

The two examples are:

  1. A program that runs three threads in parallel, each writing thousands of copies of a certain trinary digit, with no synchronization at all.
  2. A program that sums up a very large array by allocating workers to each sum a distinct portion of the array, finishing with the main thread adding up all the partial results.

These two examples are chosen to be really, really introductory! They don’t do any synchronization other than waiting for other threads to finish. Synchronization techniques that implement mutual exclusion are covered in the next set of notes where we will look at the famous Dining Philosophers.

Why?

A lot of concepts will emerge in looking at real code. We’ll study details and theory later.

Example 1: Large Trinary Numerals

The first program runs three threads in parallel:

  1. One that writes the numeral $0$ five thousand times
  2. One that writes the numeral $1$ five thousand times
  3. One that writes the numeral $2$ five thousand times

The purpose of this introductory example is only to show how to set up and run threads, not how to use them effectively. In fact, we are going to completely ignore all synchronization concerns, letting the threads interleave however they will. Subsequent examples will tackle those concerns.

Java

Java features thread objects, which are instances of class Thread. Although there are several constructors for this class, the preferred way to make threads is to make a builder with the ofPlatform or ofVirtual methods, then create the thread with either start or unstarted. Although starting threads is explicit, we don’t have to do anything special to stop them. A program does not exit until all nondaemon threads have finished (and all of the threads in this example are nondaemon threads; what makes a thread a daemon thread is simply setting the thread’s daemon property to true).
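Neither unstarted nor the daemon property appears in the listing below, so here is a minimal sketch of those builder calls (the thread names are made up for illustration):

// Sketch only: a platform thread built but not yet running, marked as a
// daemon so it will not keep the JVM alive on its own.
var worker = Thread.ofPlatform()
        .name("zero-writer")          // hypothetical name
        .daemon(true)                 // the daemon property: the JVM may exit while it runs
        .unstarted(() -> System.out.print("0"));
worker.start();                       // nothing runs until start is called

// Virtual threads come from the same kind of builder (they are always daemons).
var vworker = Thread.ofVirtual().start(() -> System.out.print("1"));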

Trinary.java
/**
 * A first illustration of Java concurrency. Illustrates three simple threads:
 * one that writes zeros, one that writes ones, and one that writes twos. Each
 * thread writes to standard output with no synchronization, so the output is
 * intentionally all mixed up.
 */

public class Trinary {

    public static void main(String[] args) {
        Thread.ofPlatform().start(() -> {
            for (var i = 0; i < 5000; i++) {
                System.out.print("0");
            }
        });
        Thread.ofPlatform().start(() -> {
            for (var i = 0; i < 5000; i++) {
                System.out.print("1");
            }
        });
        for (var i = 0; i < 5000; i++) {
            System.out.print("2");
        }
    }
}

Run with java Trinary.java.

Exercise: Replace each of the ofPlatform calls with ofVirtual. Notice anything different?

Perl

Perl threads start running as soon as they are created with create (or its alias new). However, once the main thread stops, all other threads will immediately die. So, we need to explicitly wait for threads to finish by calling join.

trinary.pl
# A first illustration of Perl concurrency. Illustrates three simple threads:
# one that writes zeros, one that writes ones, and one that writes twos. Each
# thread writes to standard output with no synchronization, so the output is
# intentionally all mixed up.

use strict;
use warnings;
use threads;

my $zero_thread = threads->new(sub { print "0" for 1..5000; });
my $one_thread  = threads->new(sub { print "1" for 1..5000; });
print "2" for 1..5000;

$zero_thread->join;
$one_thread->join;

Run with perl trinary.pl.

On my machine, I got very, very little interleaving. Some runs had no interleaving at all. But when changing the threads to print 50,000 digits each, I noticed a bit more.

Exercise: Do some experiments. Change the code as follows: (1) write the numerals 0 and 1 a few million times, (2) write the numeral '2' only 10 times, and (3) comment out the join calls. Run the modified program. What happens?

Rust

Rust follows Perl in that you explicitly spawn threads and you explicitly wait for them to finish.

trinary.rs
// A first illustration of Rust concurrency. Illustrates three simple threads:
// one that writes zeros, one that writes ones, and one that writes twos. Each
// thread writes to standard output with no synchronization, so the output is
// intentionally all mixed up.

use std::thread;

fn main() {
    let zeros = thread::spawn(|| {
        for _ in 0..5000 {
            print!("0");
        }
    });

    let ones = thread::spawn(|| {
        for _ in 0..5000 {
            print!("1");
        }
    });

    for _ in 0..5000 {
        print!("2");
    }

    zeros.join().expect("Failed to join zeros thread");
    ones.join().expect("Failed to join ones thread");
}

Run with cargo run --bin trinary or cargo run --release --bin trinary. Not surprisingly, I got a lot of interleaving with the former and less with the latter.

Exercise: If you know Rust, replace the error handling that uses expect with a match on the Result object returned from join, so that the program always exits cleanly.

Ada

In Ada, tasks run in parallel with the unit in which they are declared. The tasks and their parent (more or less) begin together and terminate together. There are no explicit activation or termination calls. It’s all done implicitly based on program structure. Ada was an early implementer of structured concurrency.

trinary.adb
-- A first illustration of Ada concurrency. Illustrates three simple threads:
-- one that writes zeros, one that writes ones, and one that writes twos. Each
-- thread writes to standard output with no synchronization, so the output is
-- intentionally all mixed up.

with Ada.Text_IO;
use Ada.Text_IO;

procedure Trinary is

   task Write_Zeros;
   task Write_Ones;

   task body Write_Zeros is
   begin
      for I in 1..5000 loop
         Put ('0');
      end loop;
   end Write_Zeros;

   task body Write_Ones is
   begin
      for I in 1..5000 loop
         Put ('1');
      end loop;
   end Write_Ones;

begin
   for I in 1..5000 loop
      Put ('2');
   end loop;
end Trinary;

Run with alr build trinary.adb && bin/trinary.

C with Posix Threads

The Posix threads library, a.k.a. pthreads, has been ported to many operating systems. It's quite capable.

trinary.c
// A first illustration of C concurrency using the Posix threads (pthreads)
// library. Illustrates three simple threads: one that writes zeros, one that
// writes ones, and one that writes twos. Each thread writes to standard output
// with no synchronization, so the output is intentionally all mixed up.

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

// Each pthread requires a function to run. This function must take a
// single argument of type void* and return a pointer of type void*.
// In this program, we care about neither.
void* print_zeros(void* ignored) {
    for (int i = 0; i < 5000; i++) {
        printf("0");
    }
    return NULL;
}

void* print_ones(void* ignored) {
    for (int i = 0; i < 5000; i++) {
        printf("1");
    }
    return NULL;
}

int main() {
    pthread_t zero_thread;
    pthread_t one_thread;

    // Create two threads and run them immediately
    int rc = pthread_create(&zero_thread, NULL, &print_zeros, NULL);
    if (rc) puts("Cannot create the zero-writing thread"), exit(rc);
    rc = pthread_create(&one_thread, NULL, &print_ones, NULL);
    if (rc) puts("Cannot create the one-writing thread"), exit(rc);

    // This runs concurrently with the other two threads
    for (int i = 0; i < 5000; i++) {
        printf("2");
    }

    // Wait for the other two threads to finish. The second argument to join
    // is a pointer to a void* that will receive the return value of the thread.
    // In this case, we don't care about the return value, so we pass NULL.
    // If we did care, we would need to allocate memory for the return value
    // and pass a pointer to that memory.
    rc = pthread_join(zero_thread, NULL);
    if (rc) printf("Cannot join the zero-writing thread"), exit(rc);
    rc = pthread_join(one_thread, NULL);
    if (rc) printf("Cannot join the one-writing thread"), exit(rc);
    return 0;
}

Run with gcc -std=c2x trinary.c -lpthread && ./a.out.

On my machine, I got a good deal of interleaving.

For each of the other languages, letting the program throw an exception (Java, Perl, C++, Ada), panic (Rust, Go), or crash (Erlang) when a thread cannot be created or joined is acceptable for this small example. C does not have exceptions or panics, and does not follow Erlang’s “Let it Crash” philosophy, so we’ve opted here to check every return code and deal with it.

C++

In C++, we use std::thread. It feels similar to the threads in Perl and Rust. You can use the pthreads library with C++, but there’s no need.

trinary.cpp
// A first illustration of C++ concurrency. Illustrates three simple threads:
// one that writes zeros, one that writes ones, and one that writes twos. Each
// thread writes to standard output with no synchronization, so the output is
// intentionally all mixed up.

#include <iostream>
#include <thread>

int main() {
    std::thread zero_thread([]() {
        for (int i = 0; i < 5000; i++) {
            std::cout << '0';
        }
    });

    std::thread one_thread([]() {
        for (int i = 0; i < 5000; i++) {
            std::cout << '1';
        }
    });

    for (int i = 0; i < 5000; i++) {
        std::cout << '2';
    }

    zero_thread.join();
    one_thread.join();
}

Run with g++ -std=c++20 trinary.cpp && ./a.out.

Go

As the printing tasks don’t interact with each other, it makes sense for the goroutines to use a wait group so the main program can wait for them all to finish. Channels could also be used here; a channel-based sketch appears after the listing.

trinary.go
// A first illustration of Go concurrency. Illustrates three goroutines:
// one that writes zeros, one that writes ones, and one that writes twos. Each
// goroutine writes to standard output with no synchronization, so the output
// is intentionally all mixed up.

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    wg.Add(2)

    go func() {
        defer wg.Done()
        for i := 0; i < 5000; i++ {
            fmt.Print("0")
        }
    }()

    go func() {
        defer wg.Done()
        for i := 0; i < 5000; i++ {
            fmt.Print("1")
        }
    }()

    for i := 0; i < 5000; i++ {
        fmt.Print("2")
    }

    wg.Wait()
}

Run with go run trinary.go.
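
As noted above, channels would work too. Here is a minimal sketch of a channel-based variant (the done channel is an illustrative name, not part of the program above): each goroutine sends a value when it finishes, and the main goroutine receives one value per goroutine instead of calling wg.Wait.

// Sketch: signal completion on a channel rather than using a wait group.
package main

import "fmt"

func main() {
    done := make(chan struct{})

    go func() {
        for i := 0; i < 5000; i++ {
            fmt.Print("0")
        }
        done <- struct{}{} // signal that this goroutine is finished
    }()

    go func() {
        for i := 0; i < 5000; i++ {
            fmt.Print("1")
        }
        done <- struct{}{}
    }()

    for i := 0; i < 5000; i++ {
        fmt.Print("2")
    }

    // Consume one completion signal per goroutine before exiting.
    <-done
    <-done
}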

Erlang

Erlang is a functional programming language, and processes can only communicate via message passing. There are no join calls or wait groups; we have to arrange for each process to send a message when it’s done, and have the main process wait for them by consuming the messages:

trinary.erl
% A first illustration of Erlang concurrency. Illustrates three processes:
% one that writes zeros, one that writes ones, and one that writes twos. Each
% process writes to standard output with no synchronization, so the output is
% intentionally all mixed up.

main(_) ->
    Waiter = self(),
    spawn(fun() -> write(0, 5000), Waiter ! done end),
    spawn(fun() -> write(1, 5000), Waiter ! done end),
    spawn(fun() -> write(2, 5000), Waiter ! done end),
    wait_for(3).

write(Value, Times) when Times > 0 ->
    io:format("~p", [Value]),
    write(Value, Times-1);
write(_, _) ->
    ok.

wait_for(0) -> ok;
wait_for(N) ->
    receive
        done -> wait_for(N-1)
    end.

Run with escript trinary.erl.

Example 2: Concurrent Array Sum

This next example illustrates a function that sums all the values in an array. If there are fewer than 100 elements in the array, the function just sums them up itself. Otherwise it spawns ten threads to sum up distinct partitions of the array, and the main thread sums up all the partial sums.

We wish to explore different approaches to waiting for the threads to finish and how to get the partial sums. Will the threads each write to a shared array of partial sums? Or will they all try to increment a shared variable? Or will the partial sums be stored within the threads themselves, or an associated object accessible to the main thread?

Java

When you have several threads doing pretty much the same thing, it’s easiest to create a handful of tasks and submit them to, and invoke them with, an executor service. Each task is assigned to a thread created by the service, and a future is created for each task. Calling future.get() blocks until the task’s thread terminates, at which point the task’s return value is available in the future.

Summer.java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.stream.IntStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Callable;

public class Summer {

    /**
     * Sums an array using 10 threads, unless the array has fewer than 100
     * elements, in which case it is summed up directly.
     */
    public static int sum(int[] a) throws InterruptedException, ExecutionException {
        if (a.length < 100) {
            return Arrays.stream(a).sum();
        }

        final var numThreads = 10;
        var sliceSize = (a.length + numThreads - 1) / numThreads;
        var tasks = new ArrayList<Callable<Integer>>(numThreads);
        for (var i = 0; i < numThreads; i++) {
            final int low = i * sliceSize;
            final int high = Math.min(low + sliceSize, a.length);
            tasks.add(() -> Arrays.stream(a, low, high).sum());
        }

        try (var executor = Executors.newFixedThreadPool(numThreads)) {
            var total = 0;
            for (var future : executor.invokeAll(tasks)) {
                total += future.get();
            }
            return total;
        }
    }

    public static void main(String[] args) throws Exception {
        assert sum(new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }) == 55;
        assert sum(new int[] { 100, 200, 300, 400 }) == 1000;
        assert sum(new int[] {}) == 0;
        assert sum(IntStream.generate(() -> 7).limit(20000).toArray()) == 140000;
        assert sum(IntStream.generate(() -> 7).limit(19999).toArray()) == 139993;
        assert sum(IntStream.range(0, 10003).toArray()) == 50025003;
        System.out.println("All tests passed");
    }
}
$ java -ea Summer.java
All tests passed

Perl

By default, Perl threads do not share data. Thus we can’t let a spawned thread write into a variable that we expect the main thread to pick up. We have to get information out of each thread via its return value, which is obtained as the return value of join.

summer.pl
use strict;
use warnings;
use threads;
use List::Util qw(sum);

sub sum_of {
    my ($array_ref) = @_;
    my $length = scalar @$array_ref;

    if ($length < 100) {
        return sum(@$array_ref) || 0;
    }

    my $num_threads = 10;
    my $slice_size = int(($length + $num_threads - 1) / $num_threads);
    my @threads;

    for (my $i = 0; $i < $num_threads; $i++) {
        my $low = $i * $slice_size;
        my $high = ($low + $slice_size < $length) ? ($low + $slice_size) : $length;
        push @threads, threads->create(sub {
            return sum(@$array_ref[$low .. $high - 1]) || 0;
        });
    }

    my $total = 0;
    foreach my $thread (@threads) {
        $total += $thread->join();
    }

    return $total;
}

use Test::More;
is(sum_of([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 55);
is(sum_of([100, 200, 300, 400]), 1000);
is(sum_of([]), 0);
is(sum_of([(7) x 20000]), 140000);
is(sum_of([(7) x 19999]), 139993);
is(sum_of([0 .. 10002]), 50025003);
done_testing();
$ perl summer.pl
ok 1
ok 2
ok 3
ok 4
ok 5
ok 6
1..6

Rust

Rust has the same style of threading as Perl: we have to manually spawn and we get the results from calling join. However, because of the borrow checker, I ended up having to clone the slices. Ouch! Can you find a better way? Is there a better way? One possibility is sketched after the exercise below.

summer.rs
use std::thread;

fn sum(a: &[i32]) -> i32 {
    if a.len() < 100 {
        return a.iter().sum();
    }

    let num_threads = 10;
    let slice_size = (a.len() + num_threads - 1) / num_threads;
    let mut handles = Vec::with_capacity(num_threads);

    for i in 0..num_threads {
        let low = i * slice_size;
        let high = std::cmp::min(low + slice_size, a.len());
        // Clone the slice so the thread owns its data
        let slice = a[low..high].to_vec();
        handles.push(thread::spawn(move || slice.into_iter().sum::<i32>()));
    }

    // Collect results from all threads and sum them up
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    assert_eq!(sum(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 55);
    assert_eq!(sum(&[100, 200, 300, 400]), 1000);
    assert_eq!(sum(&[]), 0);
    assert_eq!(sum(&vec![7; 20000]), 140000);
    assert_eq!(sum(&vec![7; 19999]), 139993);
    assert_eq!(sum(&(0..10003).collect::<Vec<_>>().as_slice()), 50025003);
    println!("All tests passed");
}
$ cargo run --bin summer
All tests passed
Exercise: What errors do you get when you remove .to_vec()? Why does the borrow checker complain? Explain the error message.
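
One possible answer to the “better way” question, assuming a toolchain with Rust 1.63 or later: std::thread::scope creates scoped threads that may borrow from the enclosing function, so the slices need not be cloned. A minimal sketch (sum_scoped is an illustrative name, not part of the program above):

use std::thread;

// Sketch: scoped threads may borrow `a` directly, so no .to_vec() is needed.
// The scope does not return until every thread spawned inside it has finished.
fn sum_scoped(a: &[i32]) -> i32 {
    if a.len() < 100 {
        return a.iter().sum();
    }
    let num_threads = 10;
    let slice_size = (a.len() + num_threads - 1) / num_threads;
    thread::scope(|s| {
        let handles: Vec<_> = (0..num_threads)
            .map(|i| {
                let low = i * slice_size;
                let high = std::cmp::min(low + slice_size, a.len());
                // The closure borrows `a`; the scope guarantees the borrow stays valid.
                s.spawn(move || a[low..high].iter().sum::<i32>())
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).sum()
    })
}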

Ada

Again we take advantage of structured concurrency! We get synchronization and termination pretty much for free by nesting the tasks inside a block:

summer.adb
with Ada.Text_IO, Ada.Assertions;
use Ada.Text_IO, Ada.Assertions;

procedure Summer is

   type Numbers is array (Integer range <>) of Integer;
   NUM_TASKS : constant := 10;

   function Sum (A : Numbers) return Integer is
      --  Where each task will deposit its result.
      Results : array (1 .. NUM_TASKS) of Integer := [others => 0];

      --  Used for the small array case, and for summing up the results.
      Total : Integer := 0;
   begin
      declare

         --  Each task receives a slice range and task index for its result.
         task type Sum_Slice (First, Last, Task_Index : Integer);
         task body Sum_Slice is
            Task_Sum : Integer := 0;
         begin
            for I in First .. Last loop
               Task_Sum := Task_Sum + A (I);
            end loop;
            Results (Task_Index) := Task_Sum;
         end Sum_Slice;

         Tasks : array (1 .. NUM_TASKS) of access Sum_Slice;

      begin
         if A'Length < 100 then
            for I in A'Range loop
               Total := Total + A (I);
            end loop;
            return Total;
         end if;

         for I in 1 .. NUM_TASKS loop
            declare
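               --  Note: integer division; if A'Length were not a multiple of
               --  NUM_TASKS, the remainder elements would go unsummed. The
               --  tests below use lengths that divide evenly or are under 100.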
               Slice_Size : constant Integer := A'Length / NUM_TASKS;
               First      : constant Integer := A'First + (I - 1) * Slice_Size;
               Last       : constant Integer := A'First + I * Slice_Size - 1;
            begin
               Tasks (I) := new Sum_Slice (First, Last, I);
            end;
         end loop;

         --  Tasks is never read; the pragma suppresses the unused-object warning.
         pragma Unreferenced (Tasks);
      end;

      --  All tasks are guaranteed to finish before we get here.
      for I in Results'Range loop
         Total := Total + Results (I);
      end loop;
      return Total;
   end Sum;

begin
   declare
      Unconventional : constant Numbers (103 .. 110) := [9, 9, 3, others => 6];
      Empty : constant Numbers (1 .. 0) := [];
   begin
      Assert (Sum (Unconventional) = 51);
      Assert (Sum ([1, 2, 3]) = 6);
      Assert (Sum ([for I in 1 .. 10 => I]) = 55);
      Assert (Sum ([for I in 1 .. 20000 => 7]) = 140000);
      Assert (Sum (Empty) = 0);
      Put_Line ("All tests passed");
   end;
end Summer;
$ alr build summer.adb && bin/summer
All tests passed

C with Posix Threads

Like a lot of C APIs, pthreads uses void* a lot, so you end up casting a lot. It makes the code a bit ugly, but that’s what you get with a language whose compiler insists that all the types work out but then lets you cast it all away.

Note that the pthreads API, like Perl, also lets you get the return value of a thread by calling the waiting (join) function. But the function is required to return a pointer and we have to manage our own memory. The classic pattern for this is to have the thread allocate memory, return a pointer to it, and have the “caller” free it. Not fun:
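
That pattern does not appear in the program below (which deposits each result in a struct field instead), so here is a minimal sketch of it, with made-up names:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

// Sketch of the allocate-in-thread, free-in-caller pattern: the worker
// heap-allocates its result and returns a pointer to it, and the joiner
// receives that pointer through pthread_join's second argument.
static void* answer(void* ignored) {
    int* result = malloc(sizeof *result);
    if (result != NULL) {
        *result = 42;
    }
    return result;
}

int main(void) {
    pthread_t worker;
    void* raw = NULL;
    if (pthread_create(&worker, NULL, answer, NULL) != 0) return 1;
    if (pthread_join(worker, &raw) != 0) return 1;
    if (raw != NULL) {
        printf("The thread returned %d\n", *(int*)raw);
        free(raw);   // the "caller" frees the memory the thread allocated
    }
    return 0;
}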

summer.c
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <assert.h>

// With pthreads, you can only pass a single argument to the thread function,
// so we use a struct to encapsulate the array and the range. The argument
// also contains the result of the sum, which is set by the thread function.
typedef struct {
    const int* data;
    size_t low;
    size_t high;
    int result;
} Slice;

// A thread function that sums a slice of the array, and deposits the sum
// in the designated field of the struct. An alternative would have been
// to return the result from the thread function, but that would require
// using a global variable or passing a pointer to the result, which is
// arguably less clean than just using a struct.
void* sum_slice(void* arg) {
    Slice* s = (Slice*)arg;
    int sum = 0;
    for (size_t i = s->low; i < s->high; i++) {
        sum += s->data[i];
    }
    s->result = sum;
    return NULL;
}

// This function sums an array of n numbers using ten threads, unless the
// array has fewer than 100 elements, in which case it is summed up directly
// on the main thread.
int sum_vector(const int* a, size_t n) {
    if (n < 100) {
        int sum = 0;
        for (size_t i = 0; i < n; i++) sum += a[i];
        return sum;
    }

    int numThreads = 10;
    int sliceSize = (n + numThreads - 1) / numThreads;
    pthread_t threads[numThreads];
    Slice slices[numThreads];
    for (int i = 0; i < numThreads; i++) {
        slices[i].data = a;
        slices[i].low = i * sliceSize;
        slices[i].high = slices[i].low + sliceSize;
        if (n <  slices[i].high) slices[i].high = n;
        slices[i].result = 0;
        if (pthread_create(&threads[i], NULL, sum_slice, &slices[i]) != 0) {
            fprintf(stderr, "Cannot create thread\n");
            exit(1);
        }
    }

    // Wait for all the threads to finish and collect their results.
    int result = 0;
    for (int i = 0; i < numThreads; i++) {
        if (pthread_join(threads[i], NULL) != 0) {
            fprintf(stderr, "Cannot join thread\n");
            exit(1);
        }
        result += slices[i].result;
    }
    return result;
}

int main() {
    assert(sum_vector((const int[]){ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }, 10) == 55);
    assert(sum_vector((const int[]){ 100, 200, 300, 400 }, 4) == 1000);
    assert(sum_vector((const int[]){}, 0) == 0);
    int a[20000];
    for (int i = 0; i < 20000; ++i) a[i] = 7;
    assert(sum_vector(a, 20000) == 140000);
    int b[19999];
    for (int i = 0; i < 19999; ++i) b[i] = 7;
    assert(sum_vector(b, 19999) == 139993);
    int c[10003];
    for (int i = 0; i < 10003; ++i) c[i] = i;
    assert(sum_vector(c, 10003) == 50025003);
    printf("All tests passed\n");
    return 0;
}
$ gcc -std=c2x summer.c -lpthread && ./a.out
All tests passed

C++

In C++, we could use the std::thread class from the first example, but here we reach for std::async, which feels a lot like the Perl threads: we pass it a function, it runs that function in a separate thread, and the result comes back via a std::future.

summer.cpp
#include <iostream>
#include <vector>
#include <thread>
#include <future>
#include <numeric>
#include <algorithm>
#include <cassert>

int sum(const std::vector<int>& a) {
    if (a.size() < 100) {
        return std::accumulate(a.begin(), a.end(), 0);
    }

    const int numThreads = 10;
    int sliceSize = (a.size() + numThreads - 1) / numThreads;
    std::vector<std::future<int>> futures;

    for (int i = 0; i < numThreads; ++i) {
        int low = i * sliceSize;
        int high = std::min(low + sliceSize, static_cast<int>(a.size()));
        futures.push_back(std::async(std::launch::async, [low, high, &a]() {
            return std::accumulate(a.begin() + low, a.begin() + high, 0);
        }));
    }

    int total = 0;
    for (auto& future : futures) {
        total += future.get();
    }
    return total;
}

int main() {
    assert(sum({ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }) == 55);
    assert(sum({ 100, 200, 300, 400 }) == 1000);
    assert(sum({}) == 0);
    assert(sum(std::vector<int>(20000, 7)) == 140000);
    assert(sum(std::vector<int>(19999, 7)) == 139993);
    std::vector<int> a;
    for (int i = 0; i < 10003; ++i) {
        a.push_back(i);
    }
    assert(sum(a) == 50025003);

    std::cout << "All tests passed" << std::endl;
    return 0;
}
$ g++ -std=c++20 summer.cpp && ./a.out
All tests passed

Go

summer.go
package main

import (
    "fmt"
    "log"
    "slices"
    "sync"
)

func directSum(a []int) int {
    total := 0
    for _, v := range a {
        total += v
    }
    return total
}

func parallelSum(a []int) int {
    if len(a) < 100 {
        return directSum(a)
    }

    numThreads := 10
    sliceSize := (len(a) + numThreads - 1) / numThreads
    var wg sync.WaitGroup
    results := make([]int, numThreads)
    wg.Add(numThreads)

    for i := 0; i < numThreads; i++ {
        go func(i int) {
            defer wg.Done()
            low := i * sliceSize
            high := low + sliceSize
            if high > len(a) {
                high = len(a)
            }
            for _, v := range a[low:high] {
                results[i] += v
            }
        }(i)
    }

    wg.Wait()
    return directSum(results)
}

func expect(a []int, expected int) {
    if parallelSum(a) != expected {
        log.Fatalf("Test failed, did not get %d", expected)
    }
}

func main() {
    expect([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, 55)
    expect([]int{100, 200, 300, 400}, 1000)
    expect([]int{}, 0)
    expect(slices.Repeat([]int{7}, 20000), 140000)
    expect(slices.Repeat([]int{7}, 19999), 139993)
    a := make([]int, 10003)
    for i := 0; i < 10003; i++ {
        a[i] = i
    }
    expect(a, 50025003)
    fmt.Println("All tests passed")
}

Erlang

summer.erl
% The function sum/1 sums an array of numbers using 10 processes,
% unless the array has fewer than 100 elements, in which case it
% is summed up directly on the main process.

sum(A) when length(A) < 100 ->
    lists:sum(A);
sum(A) ->
    NumProcesses = 10,
    Length = length(A),
    SliceSize = (Length + NumProcesses - 1) div NumProcesses,
    Parent = self(),

    Pids = [spawn(fun() ->
        Low = I * SliceSize,
        High = min(Low + SliceSize, Length),
        Slice = lists:sublist(A, Low + 1, High - Low),
        Parent ! {self(), lists:sum(Slice)}
    end) || I <- lists:seq(0, NumProcesses-1)],

    collect_results(Pids, 0).

collect_results([], Total) ->
    Total;
collect_results(Pids, Total) ->
    receive
        {Pid, Result} ->
            collect_results(lists:delete(Pid, Pids), Total + Result)
    end.

main(_) ->
    55 = sum(lists:seq(1, 10)),
    1000 = sum([100, 200, 300, 400]),
    0 = sum([]),
    140000 = sum(lists:duplicate(20000, 7)),
    139993 = sum(lists:duplicate(19999, 7)),
    50025003 = sum(lists:seq(0, 10002)),
    io:format("All tests passed~n").

Recall Practice

Here are some questions useful for your spaced repetition learning. Many of the answers are not found on this page. Some will have popped up in lecture. Others will require you to do your own research.

  1. What is the modern way to create and launch a single (platform) thread in Java?
    Thread.ofPlatform().start(lambda)
  2. When does a Java program end?
    When all nondaemon threads have finished.
  3. How do you create a thread in Perl?
    threads->create(sub { ... })
  4. How do you wait for a thread to finish in Perl?
    join
  5. How do you create a thread in Rust?
    thread::spawn(|| { ... })
  6. How do you wait for a thread to finish in Rust?
    thread.join()
  7. How do you create a task in Ada?
    By declaring a task inside a block.
  8. How do you wait for a task to finish in Ada?
    By exiting the block in which the task was declared.
  9. How do you create a thread in C using pthreads?
    pthread_create(&thread, NULL, function, arg)
  10. How do you wait for a thread to finish in C using pthreads?
    pthread_join(thread, &result)
  11. How do you create a thread in C++?
    std::thread thread(function, arg)
  12. How do you wait for a thread to finish in C++?
    thread.join()
  13. How do you create a goroutine in Go?
    go function(arg)
  14. How do you wait for a goroutine to finish in Go?
    By using a sync.WaitGroup, though you could, if you like, set up a mechanism for the goroutine to message another that it is done.
  15. How do you create a process in Erlang?
    spawn(fun () -> ... end)
  16. How do you wait for a process to finish in Erlang?
    By sending a message to the main process and consuming it.
  17. What is structured concurrency?
    A programming paradigm where tasks are scoped to a block, and their lifetime is tied to that block.
  18. What is a future in the context of threading?
    A future is an object that represents a value that may not yet be available, allowing you to retrieve the result of a thread or task once it has completed.
  19. What is a thread pool?
    A thread pool is a collection of pre-initialized threads that can be reused to execute tasks, improving performance by avoiding the overhead of creating and destroying threads.
  20. What is the larger construct in Java of which thread pools are a kind?
    An executor service.

Summary

We’ve covered:

  • Two thread examples in eight languages each
  • Different ways to define, start, and wait for threads
  • Trivial synchronization examples