Concurrency

Concurrency vs. Parallelism

The definitions of “concurrency” and “parallelism” sometimes get mixed up, but they are not the same.

A concurrent system is one that can be in charge of many tasks, although not necessarily it is executing them at the same time. You can think of yourself being in the kitchen cooking: you chop an onion, put it to fry, and while it’s being fried you chop a tomato, but you are not doing all of those things at the same time: you distribute your time between those tasks. Parallelism would be to stir fry onions with one hand while with the other one you chop a tomato.

At the moment of this writing, Crystal has concurrency support but not parallelism: several tasks can be executed, and a bit of time will be spent on each of these, but two code paths are never executed at the same exact time.

A Crystal program executes in a single operating system thread, except the Garbage Collector (GC) which implements a concurrent mark-and-sweep (currently Boehm GC).

Fibers

To achieve concurrency, Crystal has fibers. A fiber is in a way similar to an operating system thread except that it’s much more lightweight and its execution is managed internally by the process. So, a program will spawn multiple fibers and Crystal will make sure to execute them when the time is right.

Event loop

For everything I/O related there’s an event loop. Some time-consuming operations are delegated to it, and while the event loop waits for that operation to finish the program can continue executing other fibers. A simple example of this is waiting for data to come through a socket.

Channels

Crystal has Channels inspired by CSP. They allow communicating data between fibers without sharing memory and without having to worry about locks, semaphores or other special structures.

Execution of a program

When a program starts, it fires up a main fiber that will execute your top-level code. There, one can spawn many other fibers. The components of a program are:

  • The Runtime Scheduler, in charge of executing all fibers when the time is right.
  • The Event Loop, which is just another fiber, being in charge of async tasks, like for example files, sockets, pipes, signals and timers (like doing a sleep).
  • Channels, to communicate data between fibers. The Runtime Scheduler will coordinate fibers and channels for their communication.
  • Garbage Collector: to clean up “no longer used” memory.

A Fiber

A fiber is an execution unit that is more lightweight than a thread. It’s a small object that has an associated stack of 8MB, which is what is usually assigned to an operating system thread.

Fibers, unlike threads, are cooperative. Threads are pre-emptive: the operating system might interrupt a thread at any time and start executing another one. A fiber must explicitly tell the Runtime Scheduler to switch to another fiber. For example if there’s I/O to be waited on, a fiber will tell the scheduler “Look, I have to wait for this I/O to be available, you continue executing other fibers and come back to me when that I/O is ready”.

The advantage of being cooperative is that a lot of the overhead of doing a context switch (switching between threads) is gone.

A Fiber is much more lightweight than a thread: even though it’s assigned 8MB, it starts with a small stack of 4KB.

On a 64-bit machine it lets us spawn millions and millions of fibers. In a 32-bit machine we can only spawn 512 fibers, which is not a lot. But because 32-bit machines are starting to become obsolete, we’ll bet on the future and focus more on 64-bit machines.

The Runtime Scheduler

The scheduler has a queue of:

  • Fibers ready to be executed: for example when you spawn a fiber, it’s ready to be executed.
  • The event loop: which is another fiber. When there are no other fibers ready to be executed, the event loop checks if there is any async operation that is ready, and then executes the fiber waiting for that operation. The event loop is currently implemented with libevent, which is an abstraction of other event mechanisms like epoll and kqueue.
  • Fibers that voluntarily asked to wait: this is done with Fiber.yield, which means “I can continue executing, but I’ll give you some time to execute other fibers if you want”.

Communicating data

Because at this moment there’s only a single thread executing your code, accessing and modifying a class variable in different fibers will work just fine. However, once multiple threads (parallelism) is introduced in the language, it might break. That’s why the recommended mechanism to communicate data is using channels and sending messages between them. Internally, a channel implements all the locking mechanisms to avoid data races, but from the outside you use them as communication primitives, so you (the user) don’t have to use locks.

Sample code

Spawning a fiber

To spawn a fiber you use spawn with a block:

  1. spawn do
  2. # ...
  3. socket.gets
  4. # ...
  5. end
  6. spawn do
  7. # ...
  8. sleep 5.seconds
  9. # ...
  10. end

Here we have two fibers: one reads from a socket and the other does a sleep. When the first fiber reaches the socket.gets line, it gets suspended, the Event Loop is told to continue executing this fiber when there’s data in the socket, and the program continues with the second fiber. This fiber wants to sleep for 5 seconds, so the Event Loop is told to continue with this fiber in 5 seconds. If there aren’t other fibers to execute, the Event Loop will wait until either of these events happen, without consuming CPU time.

The reason why socket.gets and sleep behave like this is because their implementations talk directly with the Runtime Scheduler and the Event Loop, there’s nothing magical about it. In general, the standard library already takes care of doing all of this so you don’t have to.

Note, however, that fibers don’t get executed right away. For example:

  1. spawn do
  2. loop do
  3. puts "Hello!"
  4. end
  5. end

Running the above code will produce no output and exit immediately.

The reason for this is that a fiber is not executed as soon as it is spawned. So, the main fiber, the one that spawns the above fiber, finishes its execution and the program exits.

One way to solve it is to do a sleep:

  1. spawn do
  2. loop do
  3. puts "Hello!"
  4. end
  5. end
  6. sleep 1.second

This program will now print “Hello!” for one second and then exit. This is because the sleep call will schedule the main fiber to be executed in a second, and then executes another “ready to execute” fiber, which in this case is the one above.

Another way is this:

  1. spawn do
  2. loop do
  3. puts "Hello!"
  4. end
  5. end
  6. Fiber.yield

This time Fiber.yield will tell the scheduler to execute the other fiber. This will print “Hello!” until the standard output blocks (the system call will tell us we have to wait until the output is ready), and then execution continues with the main fiber and the program exits. Here the standard output might never block so the program will continue executing forever.

If we want to execute the spawned fiber for ever, we can use sleep without arguments:

  1. spawn do
  2. loop do
  3. puts "Hello!"
  4. end
  5. end
  6. sleep

Of course the above program can be written without spawn at all, just with a loop. sleep is more useful when spawning more than one fiber.

Spawning a call

You can also spawn by passing a method call instead of a block. To understand why this is useful, let’s look at this example:

  1. i = 0
  2. while i < 10
  3. spawn do
  4. puts(i)
  5. end
  6. i += 1
  7. end
  8. Fiber.yield

The above program prints “10” ten times. The problem is that there’s only one variable i that all spawned fibers refer to, and when Fiber.yield is executed its value is 10.

To solve this, we can do this:

  1. i = 0
  2. while i < 10
  3. proc = ->(x : Int32) do
  4. spawn do
  5. puts(x)
  6. end
  7. end
  8. proc.call(i)
  9. i += 1
  10. end
  11. Fiber.yield

Now it works because we are creating a Proc and we invoke it passing i, so the value gets copied and now the spawned fiber receives a copy.

To avoid all this boilerplate, the standard library provides a spawn macro that accepts a call expression and basically rewrites it to do the above. Using it, we end up with:

  1. i = 0
  2. while i < 10
  3. spawn puts(i)
  4. i += 1
  5. end
  6. Fiber.yield

This is mostly useful with local variables that change at iterations. This doesn’t happen with block arguments. For example, this works as expected:

  1. 10.times do |i|
  2. spawn do
  3. puts i
  4. end
  5. end
  6. Fiber.yield

Spawning a fiber and waiting for it to complete

We can use a channel for this:

  1. channel = Channel(Nil).new
  2. spawn do
  3. puts "Before send"
  4. channel.send(nil)
  5. puts "After send"
  6. end
  7. puts "Before receive"
  8. channel.receive
  9. puts "After receive"

This prints:

  1. Before receive
  2. Before send
  3. After receive

First, the program spawns a fiber but doesn’t execute it yet. When we invoke channel.receive, the main fiber blocks and execution continues with the spawned fiber. Then channel.send(nil) is invoked, and so execution continues at channel.receive, which was waiting for a value. Then the main fiber continues executing and finishes, so the program exits without giving the other fiber a chance to print “After send”.

In the above example we used nil just to communicate that the fiber ended. We can also use channels to communicate values between fibers:

  1. channel = Channel(Int32).new
  2. spawn do
  3. puts "Before first send"
  4. channel.send(1)
  5. puts "Before second send"
  6. channel.send(2)
  7. end
  8. puts "Before first receive"
  9. value = channel.receive
  10. puts value # => 1
  11. puts "Before second receive"
  12. value = channel.receive
  13. puts value # => 2

Output:

  1. Before first receive
  2. Before first send
  3. 1
  4. Before second receive
  5. Before second send
  6. 2

Note that when the program executes a receive, that fiber blocks and execution continues with the other fiber. When send is executed, execution continues with the fiber that was waiting on that channel.

Here we are sending literal values, but the spawned fiber might compute this value by, for example, reading a file, or getting it from a socket. When this fiber will have to wait for I/O, other fibers will be able to continue executing code until I/O is ready, and finally when the value is ready and sent through the channel, the main fiber will receive it. For example:

  1. require "socket"
  2. channel = Channel(String).new
  3. spawn do
  4. server = TCPServer.new("0.0.0.0", 8080)
  5. socket = server.accept
  6. while line = socket.gets
  7. channel.send(line)
  8. end
  9. end
  10. spawn do
  11. while line = gets
  12. channel.send(line)
  13. end
  14. end
  15. 3.times do
  16. puts channel.receive
  17. end

The above program spawns two fibers. The first one creates a TCPServer, accepts one connection and reads lines from it, sending them to the channel. There’s a second fiber reading lines from standard input. The main fiber reads the first 3 messages sent to the channel, either from the socket or stdin, then the program exits. The gets calls will block the fibers and tell the Event Loop to continue from there if data comes.

Likewise, we can wait for multiple fibers to complete execution, and gather their values:

  1. channel = Channel(Int32).new
  2. 10.times do |i|
  3. spawn do
  4. channel.send(i * 2)
  5. end
  6. end
  7. sum = 0
  8. 10.times do
  9. sum += channel.receive
  10. end
  11. puts sum # => 90

You can, of course, use receive inside a spawned fiber:

  1. channel = Channel(Int32).new
  2. spawn do
  3. puts "Before send"
  4. channel.send(1)
  5. puts "After send"
  6. end
  7. spawn do
  8. puts "Before receive"
  9. puts channel.receive
  10. puts "After receive"
  11. end
  12. puts "Before yield"
  13. Fiber.yield
  14. puts "After yield"

Output:

  1. Before yield
  2. Before send
  3. Before receive
  4. 1
  5. After receive
  6. After send
  7. After yield

Here channel.send is executed first, but since there’s no one waiting for a value (yet), execution continues in other fibers. The second fiber is executed, there’s a value on the channel, it’s obtained, and execution continues, first with the first fiber, then with the main fiber, because Fiber.yield puts a fiber at the end of the execution queue.

Buffered channels

The above examples use unbuffered channels: when sending a value, if a fiber is waiting on that channel then execution continues on that fiber.

With a buffered channel, invoking send won’t switch to another fiber unless the buffer is full:

  1. # A buffered channel of capacity 2
  2. channel = Channel(Int32).new(2)
  3. spawn do
  4. puts "Before send 1"
  5. channel.send(1)
  6. puts "Before send 2"
  7. channel.send(2)
  8. puts "Before send 3"
  9. channel.send(3)
  10. puts "After send"
  11. end
  12. 3.times do |i|
  13. puts channel.receive
  14. end

Output:

  1. Before send 1
  2. Before send 2
  3. Before send 3
  4. After send
  5. 1
  6. 2
  7. 3

Note that the first send does not occupy space in the channel. This is because there is a receive invoked prior to the first send whereas the other 2 send invocations take place before their respective receive. The number of send calls do not exceed the bounds of the buffer and so the send fiber runs uninterrupted to completion.

Here’s an example where all space in the buffer gets occupied:

  1. # A buffered channel of capacity 1
  2. channel = Channel(Int32).new(1)
  3. spawn do
  4. puts "Before send 1"
  5. channel.send(1)
  6. puts "Before send 2"
  7. channel.send(2)
  8. puts "Before send 3"
  9. channel.send(3)
  10. puts "End of send fiber"
  11. end
  12. 3.times do |i|
  13. puts channel.receive
  14. end

Output:

  1. Before send 1
  2. Before send 2
  3. Before send 3
  4. 1
  5. 2
  6. 3

Note that “End of send fiber” does not appear in the output because we receive the 3 send calls which means 3.times runs to completion and in turn unblocks the main fiber which executes to completion.

Here’s the same snippet as the one we just saw - with the addition of a Fiber.yield call at the very bottom:

  1. # A buffered channel of capacity 1
  2. channel = Channel(Int32).new(1)
  3. spawn do
  4. puts "Before send 1"
  5. channel.send(1)
  6. puts "Before send 2"
  7. channel.send(2)
  8. puts "Before send 3"
  9. channel.send(3)
  10. puts "End of send fiber"
  11. end
  12. 3.times do |i|
  13. puts channel.receive
  14. end
  15. Fiber.yield

Output:

  1. Before send 1
  2. Before send 2
  3. Before send 3
  4. 1
  5. 2
  6. 3
  7. End of send fiber

With the addition of a Fiber.yield call at the end of the snippet we see the “End of send fiber” message in the output which would have otherwise been missed due to the main fiber executing to completion.