



  • 异步有序执行: 任务在另外一个单独的线程中执行, 并且执行顺序严格和提交顺序一致,任务提交是wait-free的
  • Multi Producer: 多个线程可以同时向一个ExecutionQueue提交任务
  • 支持cancel一个已经提交的任务
  • 支持stop
  • 支持高优任务插队,且执行顺序也会严格按照提交顺序


首先看一个使用ExecutionQueue的例子:定义执行函数consume和任务类型DemoTask,consume函数遍历本次传入的所有task,依次调用每个task的run;然后定义一个ExecutionQueue,用execution_queue_start启动它,再用execution_queue_execute提交一个新任务。consume中为什么要用for循环,原因后面会讲。

  // Demo: a task type, the consumer callback, and the code that starts a queue and submits one task.
  1. class DemoTask {
  2. public:
  3. void run();
  4. };
  5. // Consumer callback: `iter` visits the tasks handed to this call, in submission order.
  6. int consume(void* meta, TaskIterator<DemoTask*>& iter) {
  7. if (iter.is_queue_stopped()) {
  8. return 0;
  9. }
  10. for (; iter; ++iter) {
  11. DemoTask* task = *iter;
  12. task->run();
  13. delete task;  // the queue stores only a copy of the pointer, so the consumer must free the task
  14. }
  15. return 0;
  16. }
  17. ExecutionQueueId<DemoTask*> exe_queue;
  18. int ret = execution_queue_start(&exe_queue, nullptr, consume, nullptr);
  19. // NOTE: real code should check ret here before submitting any task
  20. DemoTask* task = new DemoTask();
  21. ret = execution_queue_execute(exe_queue, task);
  22. if (ret != 0) {
  23. delete task;  // not enqueued, so consume will never see (or free) it
  24. }



  // Public id type of a queue, and the entry point that creates and starts a queue for tasks of type T.
  1. template <typename T>
  2. struct ExecutionQueueId {
  3. uint64_t value;  // opaque 64-bit id, written by create()
  4. };
  5. template <typename T>  // the function is generic over the task type T used in its signature
  6. inline int execution_queue_start(
  7. ExecutionQueueId<T>* id,  // out: id of the newly created queue
  8. const ExecutionQueueOptions* options,  // may be NULL: default options are used then
  9. int (*execute)(void* meta, TaskIterator<T>&),  // consumer callback, e.g. consume
  10. void* meta) {  // opaque pointer passed back to execute as its first argument
  11. return ExecutionQueue<T>::create(id, options, execute, meta);
  12. }

id是一个64位的值,相当于ExecutionQueue实例的一个弱引用,通过它可以wait-free地在O(1)时间内定位到对应的ExecutionQueue。示例中options和meta传的都是nullptr,所以先不关注;execute就是刚刚定义的consume函数。

  // ExecutionQueue<T>::create forwards to the type-erased ExecutionQueueBase::create;
  // execute_task later adapts the stored callback back to the typed iterator.
  1. inline static int create(id_t* id, const ExecutionQueueOptions* options,
  2. execute_func_t execute_func, void* meta) {
  3. return Base::create(&id->value, options, execute_task,
  4. clear_task_mem, meta, (void*)execute_func);  // the user callback travels as type_specific_function
  5. }
  6. int ExecutionQueueBase::create(uint64_t* id, const ExecutionQueueOptions* options,
  7. execute_func_t execute_func,
  8. clear_task_mem clear_func,
  9. void* meta, void* type_specific_function) {
  10. ...
  11. slot_id_t slot;
  12. ExecutionQueueBase* const m = butil::get_resource(&slot, Forbidden());  // get a queue object; `slot` is encoded into the id below
  13. if (BAIDU_LIKELY(m != NULL)) {
  14. m->_execute_func = execute_func;
  15. m->_clear_func = clear_func;
  16. m->_meta = meta;
  17. m->_type_specific_function = type_specific_function;
  18. CHECK(m->_head.load(butil::memory_order_relaxed) == NULL);  // a fresh queue has no pending tasks
  19. CHECK_EQ(0, m->_high_priority_tasks.load(butil::memory_order_relaxed));
  20. ExecutionQueueOptions opt;
  21. if (options != NULL) {  // NULL options means defaults
  22. opt = *options;
  23. }
  24. m->_options = opt;
  25. m->_stopped.store(false, butil::memory_order_relaxed);  // the new queue starts in the running state
  26. m->_this_id = make_id(  // id = (version taken from _versioned_ref, slot)
  27. _version_of_vref(m->_versioned_ref.fetch_add(
  28. 1, butil::memory_order_release)), slot);
  29. *id = m->_this_id;
  30. get_execq_vars()->execq_count << 1;  // bump the queue-count statistic
  31. return 0;
  32. }
  33. return ENOMEM;  // no queue object could be obtained
  34. }
  35. static int execute_task(void* meta, void* specific_function,
  36. TaskIteratorBase& it) {
  37. execute_func_t f = (execute_func_t)specific_function;  // recover the user callback
  38. return f(meta, static_cast<iterator&>(it));  // `iterator` is presumably TaskIterator<T>
  39. }



然后看下如何提交一个任务。其中butil::add_const_reference<T>::type就是const T&。execution_queue_execute首先通过ExecutionQueue<T>::address(id)定位到对应的ExecutionQueue(定位失败时返回EINVAL),然后调用它的execute。在示例场景下,options和handle均为NULL。

  // Task submission: resolve the id to a queue, then (first half of execute) allocate a node and copy the task into it.
  1. template <typename T>
  2. inline int execution_queue_execute(ExecutionQueueId<T> id,
  3. typename butil::add_const_reference<T>::type task,
  4. const TaskOptions* options,
  5. TaskHandle* handle) {
  6. typename ExecutionQueue<T>::scoped_ptr_t
  7. ptr = ExecutionQueue<T>::address(id);  // NULL when the id does not resolve to a queue
  8. if (ptr != NULL) {
  9. return ptr->execute(task, options, handle);
  10. } else {
  11. return EINVAL;
  12. }
  13. }
  14. int execute(typename butil::add_const_reference<T>::type task,
  15. const TaskOptions* options, TaskHandle* handle) {
  16. if (stopped()) {  // no new tasks once the queue is stopped
  17. return EINVAL;
  18. }
  19. TaskNode* node = allocate_node();  // node that will carry this task through the queue
  20. if (BAIDU_UNLIKELY(node == NULL)) {
  21. return ENOMEM;
  22. }
  23. void* const mem = allocator::allocate(node);  // storage for the task: inside the node or on the heap
  24. if (BAIDU_UNLIKELY(!mem)) {
  25. return_task_node(node);  // give the node back on failure
  26. return ENOMEM;
  27. }
  28. new (mem) T(task);  // copy-construct the task into that storage (placement new)
  29. ...
  30. }


  // Per-task node of the queue's singly linked list (data members only; methods elided).
  1. struct TaskNode {
  2. ...
  3. butil::Mutex mutex; // to guard version and status
  4. int64_t version;  // copied into TaskHandle::version when the task is submitted
  5. uint8_t status;  // e.g. UNEXECUTED, set by start_execute
  6. bool stop_task;  // marks a stop request; iteration does not go past such a node
  7. bool iterated;  // already visited by a TaskIterator
  8. bool high_priority;  // taken from TaskOptions::high_priority
  9. bool in_place;  // taken from TaskOptions::in_place_if_possible
  10. TaskNode* next;  // UNCONNECTED while a producer is still linking this node
  11. ExecutionQueueBase* q;  // the queue this node belongs to
  12. union {
  13. char static_task_mem[56]; // Make sizeof TaskNode exactly 128 bytes
  14. char* dynamic_task_mem;  // malloc'ed buffer for tasks that do not fit in static_task_mem
  15. };
  16. ...
  17. static TaskNode* const UNCONNECTED;  // sentinel value for `next`
  18. };


  // Chooses where a task of type T lives: inline in the node if it fits in static_task_mem, otherwise in a malloc'ed buffer.
  1. template <size_t size, bool small_object> struct TaskAllocatorBase {  // primary template; only the two specializations below are used
  2. };
  3. template <size_t size>
  4. struct TaskAllocatorBase<size, true> {  // small task: stored in static_task_mem, nothing to free
  5. inline static void* allocate(TaskNode* node)
  6. { return node->static_task_mem; }
  7. inline static void* get_allocated_mem(TaskNode* node)
  8. { return node->static_task_mem; }
  9. inline static void deallocate(TaskNode*) {}
  10. };
  11. template<size_t size>
  12. struct TaskAllocatorBase<size, false> {  // large task: stored in a heap buffer referenced by dynamic_task_mem
  13. inline static void* allocate(TaskNode* node) {
  14. node->dynamic_task_mem = (char*)malloc(size);  // may be NULL; execute() then returns ENOMEM
  15. return node->dynamic_task_mem;
  16. }
  17. inline static void* get_allocated_mem(TaskNode* node)
  18. { return node->dynamic_task_mem; }
  19. inline static void deallocate(TaskNode* node) {
  20. free(node->dynamic_task_mem);  // frees the buffer only; no destructor of T is run here
  21. }
  22. };
  23. template <typename T>
  24. struct TaskAllocator : public TaskAllocatorBase<
  25. sizeof(T), sizeof(T) <= sizeof(TaskNode().static_task_mem)>  // "small" means T fits in the 56-byte static_task_mem
  26. {};

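接着调用allocator的allocate。由于sizeof(DemoTask*)只是一个指针的大小(64位平台上为8字节),不超过static_task_mem的56字节,TaskAllocator选用的是small_object为true的特化版本,allocate直接返回node的static_task_mem;随后在这块内存上执行placement new,于是DemoTask*这个指针就被拷贝到了static_task_mem中。然后设置优先级等字段:因为传入的TaskOptions为NULL,使用的是默认选项,所以不是高优任务;最后执行start_execute。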

  // Second half of execute(): fill in the per-task flags, optionally fill the caller's handle, then publish the node.
  1. int execute(typename butil::add_const_reference<T>::type task,
  2. const TaskOptions* options, TaskHandle* handle) {
  3. ...
  4. node->stop_task = false;  // an ordinary task, not a stop request
  5. TaskOptions opt;  // NULL options means default TaskOptions
  6. if (options) {
  7. opt = *options;
  8. }
  9. node->high_priority = opt.high_priority;
  10. node->in_place = opt.in_place_if_possible;
  11. if (handle) {  // node + version let the caller refer to this task later (presumably for cancel)
  12. handle->node = node;
  13. handle->version = node->version;
  14. }
  15. start_execute(node);  // push the node onto the queue's list
  16. return 0;
  17. }


  // Publishes `node` as the new head of the queue's list with a single atomic exchange (the wait-free push).
  1. void ExecutionQueueBase::start_execute(TaskNode* node) {
  2. node->next = TaskNode::UNCONNECTED;  // consumers spin on this sentinel until the link below is set
  3. node->status = UNEXECUTED;
  4. node->iterated = false;
  5. if (node->high_priority) {
  6. // Add _high_priority_tasks before pushing this task into queue to
  7. // make sure that _execute_tasks sees the newest number when this
  8. // task is in the queue. Although there might be some useless for
  9. // loops in _execute_tasks if this thread is scheduled out at this
  10. // point, we think it's just fine.
  11. _high_priority_tasks.fetch_add(1, butil::memory_order_relaxed);
  12. }
  13. TaskNode* const prev_head = _head.exchange(node, butil::memory_order_release);  // node becomes the head; prev_head is the old one (NULL if empty)
  14. if (prev_head != NULL) {
  15. node->next = prev_head;  // queue was non-empty: just link to the old head and return
  16. return;
  17. }
  18. ...
  19. }


然后结合示意图看看之后会发生什么。假设此时为时间点t1,队列里只有一个节点。 图 1


  // Consumer loop: repeatedly runs the tasks reachable from `head`, high-priority ones first, until no tasks remain.
  1. void* ExecutionQueueBase::_execute_tasks(void* arg) {
  2. ExecutionQueueVars* vars = get_execq_vars();
  3. TaskNode* head = (TaskNode*)arg;  // first node to execute
  4. ExecutionQueueBase* m = (ExecutionQueueBase*)head->q;
  5. TaskNode* cur_tail = NULL;
  6. bool destroy_queue = false;
  7. for (;;) {
  8. if (head->iterated) {  // head was already executed in the previous round: release it
  9. CHECK(head->next != NULL);
  10. TaskNode* saved_head = head;
  11. head = head->next;
  12. m->return_task_node(saved_head);
  13. }
  14. int rc = 0;
  15. if (m->_high_priority_tasks.load(butil::memory_order_relaxed) > 0) {  // pending high-priority tasks go first
  16. int nexecuted = 0;
  17. // Don't care the return value
  18. rc = m->_execute(head, true, &nexecuted);
  19. m->_high_priority_tasks.fetch_sub(  // those tasks are no longer pending
  20. nexecuted, butil::memory_order_relaxed);
  21. if (nexecuted == 0) {
  22. // Some high_priority tasks are not in queue
  23. sched_yield();
  24. }
  25. } else {
  26. rc = m->_execute(head, false, NULL);  // otherwise run normal tasks in order
  27. }
  28. ...
  29. }
  30. vars->execq_active_count << -1;  // this consumer is exiting
  31. return NULL;
  32. }


  // Runs one pass over the list starting at `head`, handing the user callback only tasks whose priority equals `high_priority`.
  1. int ExecutionQueueBase::_execute(TaskNode* head, bool high_priority, int* niterated) {
  2. ...
  3. TaskIteratorBase iter(head, this, false, high_priority);  // positioned on the first matching task by its constructor
  4. if (iter) {  // call the user only if there is something to run
  5. _execute_func(_meta, _type_specific_function, iter);  // execute_task -> user callback (e.g. consume)
  6. }
  7. ...
  8. }



  // Iterator state shared by all task types, plus the typed wrapper handed to the user callback.
  1. class TaskIteratorBase {
  2. ...
  3. TaskNode* _cur_node;  // node the iterator currently points at
  4. TaskNode* _head;  // first node of the list being walked
  5. ExecutionQueueBase* _q;  // owning queue
  6. bool _is_stopped;  // presumably backs is_queue_stopped()
  7. bool _high_priority;  // which priority class this iterator visits
  8. bool _should_break;  // presumably set when the pass must yield to high-priority tasks
  9. int _num_iterated;  // tasks marked iterated during this pass
  10. };
  11. template <typename T>
  12. class TaskIterator : public TaskIteratorBase {
  13. TaskIterator();  // private: users cannot construct one themselves
  14. public:
  15. typedef T* pointer;
  16. typedef T& reference;
  17. reference operator*() const;
  18. pointer operator->() const { return &(operator*()); }  // iter->x forwards to the stored T
  19. TaskIterator& operator++();
  20. void operator++(int);
  21. };

在demo的consume函数中,通过对TaskIterator解引用得到了DemoTask*,逻辑如下。上文提到DemoTask*存放在TaskNode的static_task_mem中,而这里的get_allocated_mem直接返回static_task_mem,因此就拿到了提交到队列中的DemoTask*。

  // Dereference: view the current node's task storage as a T.
  1. template <typename T>  // out-of-class member of the class template TaskIterator<T>
  2. inline typename TaskIterator<T>::reference
  3. TaskIterator<T>::operator*() const {
  4. T* const ptr = (T* const)TaskAllocator<T>::get_allocated_mem(cur_node());  // static_task_mem or dynamic_task_mem
  5. return *ptr;  // for T = DemoTask* this is the pointer that was submitted
  6. }

然后看下自增操作,主要逻辑是将_cur_node移动到下一个与当前iterator优先级一致、且没有被遍历过的节点。具体来说,先判断当前节点是否已被遍历过:示例中节点1的iterated为false,所以不移动,直接进入while循环;因为当前生成的是低优先级的iterator,节点也是低优先级,所以进入第一个if;在第二个if中,iterated为false,而peek_to_execute判断当前节点状态是否为UNEXECUTED(是则返回true),因此也进入第二个if,将当前节点的iterated置为true后直接返回。注意TaskIterator的构造函数会先执行一次operator++,所以构造完成时节点1的iterated已被置为true,且_cur_node指向节点1。

  // Advance to the next node of this iterator's priority that is not yet iterated and can still run;
  // stops at the end of the list or at a stop_task node.
  1. void TaskIteratorBase::operator++() {
  2. if (!(*this)) {  // already exhausted
  3. return;
  4. }
  5. if (_cur_node->iterated) {  // step past the node returned last time
  6. _cur_node = _cur_node->next;
  7. }
  8. if (should_break_for_high_priority_tasks()) {
  9. return;
  10. } // else the next high_priority_task would be delayed for at most one task
  11. while (_cur_node && !_cur_node->stop_task) {
  12. if (_high_priority == _cur_node->high_priority) {  // nodes of the other priority are skipped untouched
  13. if (!_cur_node->iterated && _cur_node->peek_to_execute()) {  // fails unless status is UNEXECUTED
  14. ++_num_iterated;
  15. _cur_node->iterated = true;
  16. return;
  17. }
  18. _num_iterated += !_cur_node->iterated;  // a matching task that cannot run is counted once and marked iterated
  19. _cur_node->iterated = true;
  20. }
  21. _cur_node = _cur_node->next;
  22. }
  23. return;
  24. }


  // Marks every task this iterator handed out as executed when the pass ends.
  1. TaskIteratorBase::~TaskIteratorBase() {
  2. // Set the iterated tasks as EXECUTED here instead of waiting them to be
  3. // returned in _start_execute as the high_priority_task might be in the
  4. // middle of the linked list and is not going to be returned soon
  5. if (_is_stopped) {  // nothing is marked when the queue is stopped
  6. return;
  7. }
  8. while (_head != _cur_node) {  // every node before _cur_node was passed by this iterator
  9. if (_head->iterated && _head->high_priority == _high_priority) {  // only nodes of this iterator's priority
  10. _head->set_executed();
  11. }
  12. _head = _head->next;
  13. }
  14. if (_should_break && _cur_node != NULL  // the pass broke out early while _cur_node was already taken
  15. && _cur_node->high_priority == _high_priority && _cur_node->iterated) {
  16. _cur_node->set_executed();
  17. }
  18. }



  // Second half of the consumer loop: free executed nodes, then either fetch newly pushed tasks or exit.
  1. void* ExecutionQueueBase::_execute_tasks(void* arg) {
  2. ExecutionQueueVars* vars = get_execq_vars();
  3. TaskNode* head = (TaskNode*)arg;
  4. ExecutionQueueBase* m = (ExecutionQueueBase*)head->q;
  5. TaskNode* cur_tail = NULL;
  6. bool destroy_queue = false;
  7. for (;;) {
  8. ...
  9. // Release TaskNode until uniterated task or last task
  10. while (head->next != NULL && head->iterated) {  // the last node is kept: new tasks get linked after it
  11. TaskNode* saved_head = head;
  12. head = head->next;
  13. m->return_task_node(saved_head);
  14. }
  15. if (cur_tail == NULL) {  // locate the last node of the current list
  16. for (cur_tail = head; cur_tail->next != NULL;
  17. cur_tail = cur_tail->next) {}
  18. }
  19. // break when no more tasks and head has been executed
  20. if (!m->_more_tasks(cur_tail, &cur_tail, !head->iterated)) {  // third arg: do unexecuted tasks remain?
  21. CHECK_EQ(cur_tail, head);
  22. CHECK(head->iterated);
  23. m->return_task_node(head);  // free the final node and leave the loop
  24. break;
  25. }
  26. }
  27. ...
  28. return NULL;
  29. }


图 2


  // Decides whether the consumer keeps running: true if tasks were pushed after old_head,
  // or if has_uniterated says unexecuted tasks remain; false means the queue is now idle.
  1. inline bool ExecutionQueueBase::_more_tasks(
  2. TaskNode* old_head, TaskNode** new_tail,  // old_head: last node the consumer knows about
  3. bool has_uniterated) {
  4. CHECK(old_head->next == NULL);
  5. // Try to set _head to NULL to mark that the execute is done.
  6. TaskNode* new_head = old_head;
  7. TaskNode* desired = NULL;
  8. bool return_when_no_more = false;
  9. if (has_uniterated) {  // work remains: keep _head at old_head instead of clearing it
  10. desired = old_head;
  11. return_when_no_more = true;
  12. }
  13. if (_head.compare_exchange_strong(  // succeeds only if nobody pushed since old_head
  14. new_head, desired, butil::memory_order_acquire)) {  // on failure new_head receives the current _head
  15. // No one added new tasks.
  16. return return_when_no_more;
  17. }
  18. ...
  19. }

假设执行到此处时为时刻t3,队列中又新加入了两个节点,如下图所示。 图 3


  // CAS-failed path: new tasks were pushed (newest first). Reverse that segment into
  // submission order and append it after old_head.
  1. inline bool ExecutionQueueBase::_more_tasks(
  2. TaskNode* old_head, TaskNode** new_tail,
  3. bool has_uniterated) {
  4. ...
  5. TaskNode* tail = NULL;
  6. if (new_tail) {
  7. *new_tail = new_head;  // the newest node becomes the consumer's new tail
  8. }
  9. TaskNode* p = new_head;
  10. do {
  11. while (p->next == TaskNode::UNCONNECTED) {  // producer swapped _head but has not set next yet
  12. // TODO(gejun): elaborate this
  13. sched_yield();
  14. }
  15. TaskNode* const saved_next = p->next;
  16. p->next = tail;  // reverse this link
  17. tail = p;
  18. p = saved_next;
  19. CHECK(p != NULL);
  20. } while (p != old_head);  // stop once the already-known list is reached
  21. // Link old list with new list.
  22. old_head->next = tail;
  23. return true;
  24. }

此时整个执行队列如下图所示:节点1已被执行过,但仍在队列中。回到上文的_execute_tasks,下一轮循环时首先判断head是否已被遍历过,如果是,则将该节点释放。 图 4

然后执行_execute,其中会执行节点2、3;接着释放节点2(节点3作为链表的最后一个节点暂时保留,新任务会接在它后面),再继续通过_more_tasks获取新任务并反转链表。此时队列结构如下图所示,后面的过程和上文类似,不再赘述。 图 5




假设t3时刻加入的节点5是high_priority任务,那么执行完_more_tasks、进入第二轮循环并摘掉已执行过的节点1之后,队列情况如下图所示。 图 6


  // Excerpt of the consumer loop showing how high-priority tasks are run ahead of normal ones.
  1. void* ExecutionQueueBase::_execute_tasks(void* arg) {
  2. ...
  3. for (;;) {
  4. ...
  5. if (m->_high_priority_tasks.load(butil::memory_order_relaxed) > 0) {  // counter is bumped before the push in start_execute
  6. int nexecuted = 0;
  7. // Don't care the return value
  8. rc = m->_execute(head, true, &nexecuted);  // pass that visits only high-priority nodes
  9. m->_high_priority_tasks.fetch_sub(
  10. nexecuted, butil::memory_order_relaxed);
  11. if (nexecuted == 0) {  // counted but not yet linked into the list: let the producer finish
  12. // Some high_priority tasks are not in queue
  13. sched_yield();
  14. }
  15. } else {
  16. rc = m->_execute(head, false, NULL);  // normal pass over low-priority nodes
  17. }
  18. ...
  19. }
  20. vars->execq_active_count << -1;
  21. return NULL;
  22. }


接着调用_more_tasks,此时cur_tail指向节点3,链表反转后如下图所示。 图 7 重新执行_execute时,由于_high_priority_tasks大于0,会生成高优iterator并执行++:遍历到节点5时将其iterated置为true后返回,随后调用用户自定义的执行函数consume执行节点5这个高优task;consume中的++会跳过所有低优任务,直到链表末尾(NULL)。此时再执行_more_tasks,因为head节点的iterated为false,所以has_uniterated为true,desired指向节点5(即cur_tail),return_when_no_more为true;若CAS成功(即没有新任务加入),_more_tasks会直接返回true。下一轮循环中会生成低优迭代器,执行队列中的2、3、4,并在回收内存时将节点5的内存一并回收。

  // CAS step of _more_tasks: with has_uniterated set, _head is kept at old_head and true is
  // returned, so the consumer stays alive to run the remaining tasks.
  1. inline bool ExecutionQueueBase::_more_tasks(
  2. TaskNode* old_head, TaskNode** new_tail,
  3. bool has_uniterated) {
  4. CHECK(old_head->next == NULL);
  5. // Try to set _head to NULL to mark that the execute is done.
  6. TaskNode* new_head = old_head;
  7. TaskNode* desired = NULL;
  8. bool return_when_no_more = false;
  9. if (has_uniterated) {  // unexecuted tasks remain (e.g. low-priority ones skipped by a high-priority pass)
  10. desired = old_head;
  11. return_when_no_more = true;
  12. }
  13. if (_head.compare_exchange_strong(  // succeeds only if nobody pushed since old_head
  14. new_head, desired, butil::memory_order_acquire)) {
  15. // No one added new tasks.
  16. return return_when_no_more;
  17. }
  18. ...
  19. }

