bRPC源码解析·bthread调度执行流程

(作者简介:KIDGINBROOK,在昆仑芯参与训练框架开发工作)

整体流程

task_group负责对bthread的调度执行,一个task_group对应一个pthread,内部有两个执行队列,分别为_rq和_remote_rq,执行队列中存放着待执行的bthread,bthread创建的bthread会被存放在_rq,pthread创建的bthread会被存放在_remote_rq。task_control为全局单例,内部有多个task_group。 bthread整体流程

主要接口

TaskControl

TaskControl是一个单例,下面是初始化的过程,主要逻辑即为创建_concurrency个worker(bthread_worker)线程,每个worker执行worker_thread函数

  1. int TaskControl::init(int concurrency) {
  2. _concurrency = concurrency;
  3. _workers.resize(_concurrency);
  4. for (int i = 0; i < _concurrency; ++i) {
  5. const int rc = pthread_create(&_workers[i], NULL, worker_thread, this);
  6. ...
  7. }
  8. ...
  9. }

worker_thread的逻辑为通过create_group创建一个TaskGroup g,添加到TaskControl中,设置tls_task_group为g,tls_task_group为tls变量,因此只有worker线程的tls_task_group为非null,然后执行TaskGroup的run_main_task函数

  1. void* TaskControl::worker_thread(void* arg) {
  2. TaskControl* c = static_cast<TaskControl*>(arg);
  3. TaskGroup* g = c->create_group();
  4. ...
  5. tls_task_group = g;
  6. c->_nworkers << 1;
  7. g->run_main_task();
  8. ...
  9. }
  10. TaskGroup* TaskControl::create_group() {
  11. ...
  12. g->init(FLAGS_task_group_runqueue_capacity);
  13. ...
  14. }

TaskGroup

TaskGroup对应一个pthread,初始化函数如下,创建rq和remote_rq,都是负责存放待执行bthread的队列,然后创建main_stack和main_tid,main_tid代表主流程对应的bthread id,后面会具体讲main_stack和main_tid的作用。TaskMeta为一个bthread的meta信息,如执行函数,参数,local storage等,这里会将cur_meta设置为main_tid对应的TaskMeta。

  1. int TaskGroup::init(size_t runqueue_capacity) {
  2. _rq.init(runqueue_capacity);
  3. _remote_rq.init(runqueue_capacity / 2);
  4. ContextualStack* stk = get_stack(STACK_TYPE_MAIN, NULL);
  5. ...
  6. butil::ResourceId<TaskMeta> slot;
  7. TaskMeta* m = butil::get_resource<TaskMeta>(&slot);
  8. ...
  9. m->stop = false;
  10. m->interrupted = false;
  11. m->about_to_quit = false;
  12. m->fn = NULL;
  13. m->arg = NULL;
  14. m->local_storage = LOCAL_STORAGE_INIT;
  15. m->cpuwide_start_ns = butil::cpuwide_time_ns();
  16. m->stat = EMPTY_STAT;
  17. m->attr = BTHREAD_ATTR_TASKGROUP;
  18. m->tid = make_tid(*m->version_butex, slot);
  19. m->set_stack(stk);
  20. _cur_meta = m;
  21. _main_tid = m->tid;
  22. _main_stack = stk;
  23. _last_run_ns = butil::cpuwide_time_ns();
  24. return 0;
  25. }

每个worker会一直在while循环中,如果有可执行的bthread,wait_task会返回对应bthread的tid,否则当前worker会阻塞;wait_task的具体逻辑是先去当前task_group的_remote_rq中pop,如果没有,则去其他的task_group的_rq和_remote_rq中pop。

  1. void TaskGroup::run_main_task() {
  2. bvar::PassiveStatus<double> cumulated_cputime(
  3. get_cumulated_cputime_from_this, this);
  4. std::unique_ptr<bvar::PerSecond<bvar::PassiveStatus<double> > > usage_bvar;
  5. TaskGroup* dummy = this;
  6. bthread_t tid;
  7. while (wait_task(&tid)) {
  8. TaskGroup::sched_to(&dummy, tid);
  9. DCHECK_EQ(this, dummy);
  10. DCHECK_EQ(_cur_meta->stack, _main_stack);
  11. if (_cur_meta->tid != _main_tid) {
  12. TaskGroup::task_runner(1/*skip remained*/);
  13. }
  14. }
  15. }

当拿到可执行的tid后,调用sched_to,首先通过tid拿到该tid对应的TaskMeta,如果已经为该meta分配过栈,则调用sched_to(pg, next_meta),该函数的主要逻辑为通过jump_stack(cur_meta->stack, next_meta->stack)跳转至next_meta;否则通过get_stack分配一个新栈,并设置该栈的执行入口为task_runner函数。

  1. inline void TaskGroup::sched_to(TaskGroup** pg, bthread_t next_tid) {
  2. TaskMeta* next_meta = address_meta(next_tid);
  3. if (next_meta->stack == NULL) {
  4. ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
  5. if (stk) {
  6. next_meta->set_stack(stk);
  7. } else {
  8. // stack_type is BTHREAD_STACKTYPE_PTHREAD or out of memory,
  9. // In latter case, attr is forced to be BTHREAD_STACKTYPE_PTHREAD.
  10. // This basically means that if we can't allocate stack, run
  11. // the task in pthread directly.
  12. next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
  13. next_meta->set_stack((*pg)->_main_stack);
  14. }
  15. }
  16. // Update now_ns only when wait_task did yield.
  17. sched_to(pg, next_meta);
  18. }

task_runner核心如下,首先执行remain函数,remain为一个bthread在开始运行自己逻辑前需要做的一些工作,后面会看到;然后执行该meta的函数,因为函数执行过程中该bth可能会调度至其他worker,因此task_group可能发生改变,所以执行完成后重新对g进行设置;最后调用ending_sched。

  1. void TaskGroup::task_runner(intptr_t skip_remained) {
  2. // NOTE: tls_task_group is volatile since tasks are moved around
  3. // different groups.
  4. TaskGroup* g = tls_task_group;
  5. if (!skip_remained) {
  6. while (g->_last_context_remained) {
  7. RemainedFn fn = g->_last_context_remained;
  8. g->_last_context_remained = NULL;
  9. fn(g->_last_context_remained_arg);
  10. g = tls_task_group;
  11. }
  12. }
  13. do {
  14. TaskMeta* const m = g->_cur_meta;
  15. try {
  16. thread_return = m->fn(m->arg);
  17. } catch (ExitException& e) {
  18. thread_return = e.value();
  19. }
  20. // Group is probably changed
  21. g = tls_task_group;
  22. ...
  23. g->set_remained(TaskGroup::_release_last_context, m);
  24. ending_sched(&g);
  25. } while (g->_cur_meta->tid != g->_main_tid);
  26. // Was called from a pthread and we don't have BTHREAD_STACKTYPE_PTHREAD
  27. // tasks to run, quit for more tasks.
  28. }

ending_sched会尝试获取一个可执行的bth,如果没有的话,则下一个执行的则为main_tid对应的meta;然后通过上述的sched_to(next_meta)跳转到next_meta。

  1. void TaskGroup::ending_sched(TaskGroup** pg) {
  2. TaskGroup* g = *pg;
  3. bthread_t next_tid = 0;
  4. // Find next task to run, if none, switch to idle thread of the group.
  5. #ifndef BTHREAD_FAIR_WSQ
  6. // When BTHREAD_FAIR_WSQ is defined, profiling shows that cpu cost of
  7. // WSQ::steal() in example/multi_threaded_echo_c++ changes from 1.9%
  8. // to 2.9%
  9. const bool popped = g->_rq.pop(&next_tid);
  10. #else
  11. const bool popped = g->_rq.steal(&next_tid);
  12. #endif
  13. if (!popped && !g->steal_task(&next_tid)) {
  14. // Jump to main task if there's no task to run.
  15. next_tid = g->_main_tid;
  16. }
  17. TaskMeta* const cur_meta = g->_cur_meta;
  18. TaskMeta* next_meta = address_meta(next_tid);
  19. if (next_meta->stack == NULL) {
  20. if (next_meta->stack_type() == cur_meta->stack_type()) {
  21. // also works with pthread_task scheduling to pthread_task, the
  22. // transfered stack is just _main_stack.
  23. next_meta->set_stack(cur_meta->release_stack());
  24. } else {
  25. ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
  26. if (stk) {
  27. next_meta->set_stack(stk);
  28. } else {
  29. // stack_type is BTHREAD_STACKTYPE_PTHREAD or out of memory,
  30. // In latter case, attr is forced to be BTHREAD_STACKTYPE_PTHREAD.
  31. // This basically means that if we can't allocate stack, run
  32. // the task in pthread directly.
  33. next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
  34. next_meta->set_stack(g->_main_stack);
  35. }
  36. }
  37. }
  38. sched_to(pg, next_meta);
  39. }

main tid

然后说下开始提到的main_tid/main_stack,task_group是一个pthread,在执行bthread时,会运行在该bthread栈中,其他时刻都是运行在pthread栈中。brpc并没有为pthread重新分配一个栈,而是仅仅记录了pthread栈的位置,main_stack即为pthread栈,而main_tid则代表了这个pthread。

下面来看下是如何实现这一过程的

  1. int TaskGroup::init(size_t runqueue_capacity) {
  2. ...
  3. ContextualStack* stk = get_stack(STACK_TYPE_MAIN, NULL);
  4. ...
  5. }

在上面TaskGroup::init中,可以看到ContextualStack* stk = get_stack(STACK_TYPE_MAIN, NULL); STACK_TYPE_MAIN即为main_stack的类型,get_stack会调用StackFactory的get_stack,StackFactory是个模板类,get_stack会分配栈空间,然后针对STACK_TYPE_MAIN做了特化,此时不会分配栈空间,仅仅返回一个ContextualStack对象;

  1. template <> struct StackFactory<MainStackClass> {
  2. static ContextualStack* get_stack(void (*)(intptr_t)) {
  3. ContextualStack* s = new (std::nothrow) ContextualStack;
  4. if (NULL == s) {
  5. return NULL;
  6. }
  7. s->context = NULL;
  8. s->stacktype = STACK_TYPE_MAIN;
  9. s->storage.zeroize();
  10. return s;
  11. }
  12. static void return_stack(ContextualStack* s) {
  13. delete s;
  14. }
  15. };

然后在切换到bthread执行的过程中,会调用jump_stack(cur_meta->stack, next_meta->stack)

  1. inline void jump_stack(ContextualStack* from, ContextualStack* to) {
  2. bthread_jump_fcontext(&from->context, to->context, 0/*not skip remained*/);
  3. }

cur_meta此时为main_tid对应的taskmeta,next_meta为即将要执行的meta;由前面文章可知bthread_jump_fcontext执行时,会将当前各个寄存器push到当前栈中,即pthread栈,然后将esp赋值给(rdi),即from->context,因此main_tid的stack便指向了pthread栈。

主要接口

接下来看下bthread提供的接口,以bthread_start_urgent和bthread_start_background为例,如函数名所示,前者对新建的bthread以”高优先级”处理,后者以”低优先级”处理,后面会看到优先级的意思。首先看下bthread_start_urgent

bthread_start_urgent
  1. int bthread_start_urgent(bthread_t* __restrict tid,
  2. const bthread_attr_t* __restrict attr,
  3. void * (*fn)(void*),
  4. void* __restrict arg) {
  5. bthread::TaskGroup* g = bthread::tls_task_group;
  6. if (g) {
  7. // start from worker
  8. return bthread::TaskGroup::start_foreground(&g, tid, attr, fn, arg);
  9. }
  10. return bthread::start_from_non_worker(tid, attr, fn, arg);
  11. }

由上可知,tls_task_group为tls,普通pthread的tls_task_group为null,先以普通pthread看下整体流程;此时普通pthread会调用start_from_non_worker。

  1. BUTIL_FORCE_INLINE int
  2. start_from_non_worker(bthread_t* __restrict tid,
  3. const bthread_attr_t* __restrict attr,
  4. void * (*fn)(void*),
  5. void* __restrict arg) {
  6. TaskControl* c = get_or_new_task_control();
  7. if (NULL == c) {
  8. return ENOMEM;
  9. }
  10. if (attr != NULL && (attr->flags & BTHREAD_NOSIGNAL)) {
  11. // Remember the TaskGroup to insert NOSIGNAL tasks for 2 reasons:
  12. // 1. NOSIGNAL is often for creating many bthreads in batch,
  13. // inserting into the same TaskGroup maximizes the batch.
  14. // 2. bthread_flush() needs to know which TaskGroup to flush.
  15. TaskGroup* g = tls_task_group_nosignal;
  16. if (NULL == g) {
  17. g = c->choose_one_group();
  18. tls_task_group_nosignal = g;
  19. }
  20. return g->start_background<true>(tid, attr, fn, arg);
  21. }
  22. return c->choose_one_group()->start_background<true>(
  23. tid, attr, fn, arg);
  24. }

start_from_non_worker会尝试获取taskcontrol单例,如果没有则创建一个,并初始化好一定数量的taskgroup;然后选择一个taskgroup,调用start_background。

  1. template <bool REMOTE>
  2. int TaskGroup::start_background(bthread_t* __restrict th,
  3. const bthread_attr_t* __restrict attr,
  4. void * (*fn)(void*),
  5. void* __restrict arg) {
  6. ...
  7. butil::ResourceId<TaskMeta> slot;
  8. TaskMeta* m = butil::get_resource(&slot);
  9. ...
  10. m->fn = fn;
  11. m->arg = arg;
  12. ...
  13. if (REMOTE) {
  14. ready_to_run_remote(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
  15. } else {
  16. ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
  17. }
  18. return 0;
  19. }

REMOTE表示创建该bthread的线程是普通pthread还是bthread_worker,函数主要逻辑为创建taskmeta,然后调用ready_to_run_remote将该tid加入到taskgroup的remote_rq中。

然后看下bthread_worker调用bthread_start_urgent的过程,这种场景其实是在bthread中创建bthread,此时会调用start_foreground,然后创建taskmeta,并直接切换到这个新的bthread运行,即”高优先级”。

  1. int TaskGroup::start_foreground(TaskGroup** pg,
  2. bthread_t* __restrict th,
  3. const bthread_attr_t* __restrict attr,
  4. void * (*fn)(void*),
  5. void* __restrict arg) {
  6. ...
  7. TaskGroup* g = *pg;
  8. g->_control->_nbthreads << 1;
  9. if (g->is_current_pthread_task()) {
  10. // never create foreground task in pthread.
  11. g->ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
  12. } else {
  13. // NOSIGNAL affects current task, not the new task.
  14. RemainedFn fn = NULL;
  15. if (g->current_task()->about_to_quit) {
  16. fn = ready_to_run_in_worker_ignoresignal;
  17. } else {
  18. fn = ready_to_run_in_worker;
  19. }
  20. ReadyToRunArgs args = {
  21. g->current_tid(),
  22. (bool)(using_attr.flags & BTHREAD_NOSIGNAL)
  23. };
  24. g->set_remained(fn, &args);
  25. TaskGroup::sched_to(pg, m->tid);
  26. }
  27. return 0;
  28. }

start_foreground最后,这里会设置当前task_group的remain,上文提到在task_runner中,bthread在真正执行自己meta的逻辑前会先执行remain,start_foreground会抢占当前bthread的执行,因此通过remain将当前bthread重新push到rq中等待执行。

bthread_start_background

接口bthread_start_background对于普通pthread的情况和bthread_start_urgent一致;而对于bthread_worker则会调用start_background,此时在新建taskmeta后会调用ready_to_run,此时会将该bthread push到rq中,而不是直接切换运行,即”低优先级”。

修改于 2024年2月5日: Release bRPC 1.8.0 (254a6bb)