Chapter 46. Boost.Lockfree

Boost.Lockfree provides thread-safe and lock-free containers. Containers from this library can be accessed from multiple threads without having to synchronize access.

In version 1.56.0, Boost.Lockfree provides only two containers: a queue of type boost::lockfree::queue and a stack of type boost::lockfree::stack. For the queue, a second implementation is available: boost::lockfree::spsc_queue. This class is optimized for use cases where exactly one thread writes to the queue and exactly one thread reads from the queue. The abbreviation spsc in the class name stands for single producer/single consumer.

Example 46.1. Using boost::lockfree::spsc_queue

  1. #include <boost/lockfree/spsc_queue.hpp>
  2. #include <thread>
  3. #include <iostream>
  4. boost::lockfree::spsc_queue<int> q{100};
  5. int sum = 0;
  6. void produce()
  7. {
  8. for (int i = 1; i <= 100; ++i)
  9. q.push(i);
  10. }
  11. void consume()
  12. {
  13. int i;
  14. while (q.pop(i))
  15. sum += i;
  16. }
  17. int main()
  18. {
  19. std::thread t1{produce};
  20. std::thread t2{consume};
  21. t1.join();
  22. t2.join();
  23. consume();
  24. std::cout << sum << '\n';
  25. }

Example 46.1 uses the container boost::lockfree::spscqueue. The first thread, which executes the function produce(), adds the numbers 1 to 100 to the container. The second thread, which executes consume(), reads the numbers from the container and adds them up in _sum. Because the container boost::lockfree::spsc_queue explicitly supports concurrent access from two threads, it isn’t necessary to synchronize the threads.

Please note that the function consume() is called a second time after the threads terminate. This is required to calculate the total of all 100 numbers, which is 5050. Because consume() accesses the queue in a loop, it is possible that it will read the numbers faster than they are inserted by produce(). If the queue is empty, pop() returns false. Thus, the thread executing consume() could terminate because produce() in the other thread couldn’t fill the queue fast enough. If the thread executing produce() is terminated, then it’s clear that all of the numbers were added to the queue. The second call to consume() makes sure that numbers that may not have been read yet are added to sum.

The size of the queue is passed to the constructor. Because boost::lockfree::spsc_queue is implemented with a circular buffer, the queue in Example 46.1 has a capacity of 100 elements. If a value can’t be added because the queue is full, push() returns false. The example doesn’t check the return value of push() because exactly 100 numbers are added to the queue. Thus, 100 elements is sufficient.

Example 46.2. boost::lockfree::spsc_queue with boost::lockfree::capacity

  1. #include <boost/lockfree/spsc_queue.hpp>
  2. #include <boost/lockfree/policies.hpp>
  3. #include <thread>
  4. #include <iostream>
  5. using namespace boost::lockfree;
  6. spsc_queue<int, capacity<100>> q;
  7. int sum = 0;
  8. void produce()
  9. {
  10. for (int i = 1; i <= 100; ++i)
  11. q.push(i);
  12. }
  13. void consume()
  14. {
  15. while (q.consume_one([](int i){ sum += i; }))
  16. ;
  17. }
  18. int main()
  19. {
  20. std::thread t1{produce};
  21. std::thread t2{consume};
  22. t1.join();
  23. t2.join();
  24. q.consume_all([](int i){ sum += i; });
  25. std::cout << sum << '\n';
  26. }

Example 46.2 works like the previous example, but this time the size of the circular buffer is set at compile time. This is done with the template boost::lockfree::capacity, which expects the capacity as a template parameter. q is instantiated with the default constructor – the capacity cannot be set at run time.

The function consume() has been changed to use consume_one(), rather than pop(), to read the number. A lambda function is passed as a parameter to consume_one(). consume_one() reads a number just like pop(), but the number isn’t returned through a reference to the caller. It is passed as the sole parameter to the lambda function.

When the threads terminate, main() calls the member function consume_all(), instead of consume(). consume_all() works like consume_one() but makes sure that the queue is empty after the call. consume_all() calls the lambda function as long as there are elements in the queue.

Once again, Example 46.2 writes 5050 to standard output.

Example 46.3. boost::lockfree::queue with variable container size

  1. #include <boost/lockfree/queue.hpp>
  2. #include <thread>
  3. #include <atomic>
  4. #include <iostream>
  5. boost::lockfree::queue<int> q{100};
  6. std::atomic<int> sum{0};
  7. void produce()
  8. {
  9. for (int i = 1; i <= 10000; ++i)
  10. q.push(i);
  11. }
  12. void consume()
  13. {
  14. int i;
  15. while (q.pop(i))
  16. sum += i;
  17. }
  18. int main()
  19. {
  20. std::thread t1{produce};
  21. std::thread t2{consume};
  22. std::thread t3{consume};
  23. t1.join();
  24. t2.join();
  25. t3.join();
  26. consume();
  27. std::cout << sum << '\n';
  28. }

Example 46.3 executes consume() in two threads. Because more than one thread reads from the queue, the class boost::lockfree::spsc_queue must not be used. This example uses boost::lockfree::queue instead.

Thanks to std::atomic, access to the variable sum is also now thread safe.

The size of the queue is set to 100 – this is the parameter passed to the constructor. However, this is only the initial size. By default, boost::lockfree::queue is not implemented with a circular buffer. If more items are added to the queue than the capacity is set to, it is automatically increased. boost::lockfree::queue dynamically allocates additional memory if the initial size isn’t sufficient.

That means that boost::lockfree::queue isn’t necessarily lock free. The allocator used by boost::lockfree::queue by default is boost::lockfree::allocator, which is based on std::allocator. Thus, this allocator determines whether boost::lockfree::queue is lock free without constraints.

Example 46.4. boost::lockfree::queue with constant container size

  1. #include <boost/lockfree/queue.hpp>
  2. #include <thread>
  3. #include <atomic>
  4. #include <iostream>
  5. using namespace boost::lockfree;
  6. queue<int, fixed_sized<true>> q{10000};
  7. std::atomic<int> sum{0};
  8. void produce()
  9. {
  10. for (int i = 1; i <= 10000; ++i)
  11. q.push(i);
  12. }
  13. void consume()
  14. {
  15. int i;
  16. while (q.pop(i))
  17. sum += i;
  18. }
  19. int main()
  20. {
  21. std::thread t1{produce};
  22. std::thread t2{consume};
  23. std::thread t3{consume};
  24. t1.join();
  25. t2.join();
  26. t3.join();
  27. consume();
  28. std::cout << sum << '\n';
  29. }

Example 46.4 uses a constant size of 10,000 elements. In this example, the queue doesn’t allocate additional memory when it is full. 10,000 is a fixed upper limit.

The queue’s capacity is constant because boost::lockfree::fixed_sized is passed as a template parameter. The capacity is passed as a parameter to the constructor and can be updated at any time using reserve(). If the capacity must be set at compile time, boost::lockfree::capacity can be passed as a template parameter to boost::lockfree::queue. boost::lockfree::capacity includes boost::lockfree::fixed_sized.

In Example 46.4, the queue has a capacity of 10,000 elements. Because consume() inserts 10,000 numbers into the queue, the upper limit isn’t exceeded. If it were exceeded, push() would return false.

boost::lockfree::queue is similar to boost::lockfree::spsc_queue and also provides member functions like consume_one() and consume_all().

The third class, boost::lockfree::stack, is similar to the other ones. As with boost::lockfree::queue, boost::lockfree::fixed_size and boost::lockfree::capacity can be passed as template parameters. The member functions are similar, too.

Exercise

Remove the class boost::lockfree::spsc_queue from Example 46.1 and implement the program with std::queue.