######################################## 协程 ######################################## 协程又被成为纤程、微线程、用户态线程。与线程相比而言,它是 CPU 无法感知到的,CPU 调度依然以线程为最小单位。协程的根本革命性在于: - 以同步的方式完成了异步编程 - 协程之间无需加锁 协程有两种用途,一种是用作一般的控制流转移,例如 Python 的生成器。而另一种是在执行某些 IO 繁忙的操作时让出协程,去执行其它函数。之所以是 IO 繁忙是因为等待外设就绪的这部分时间对程序来说是无工作的。 协程分类 **************************************** 对称协程和非对称协程 ======================================== 根据控制转移机制可以将协程分为对称协程和非对称协程。对称协程有无调度器,需要程序员手动调度协程,而非对称协程则有调度器,程序会在 *恰当* 的时候主动让出协程。 boost.coroutine2 就是非对称协程,而 boost.fiber 就是对称协程,在使用 boost.fiber 提供的 barrier, mutex 等工具时,会引起协程调度。 .. seealso:: - `What is the difference between asymmetric and symmetric coroutines? `_ - `协程(一)协程的定义与分类_自由的天空-CSDN博客_对称协程和非对称协程 `_ 有栈协程和无栈协程 ======================================== 有栈协程和无栈协程实际上运行时都是有栈的,但是有栈协程会在堆上分配空间用作协程栈来保存上下文信息,而无栈协程则直接使用系统栈。有栈协程由于保存了自己的上下文信息,因此允许在任意嵌套深度的环境下进行协程切换操作。而无栈协程只允许在非嵌套环境下使用。(比如不能在 Lambda 表达式中切换) 另外,有栈协程又分为共享栈和非共享栈,其中的区别是堆上分配的空间是否被协程之间共享使用。共享栈能够降低协程的内存开销,但是非共享栈实现简单,效率也高。 .. hint:: 相较于有栈协程的百花齐放,无栈协程的实现相对有限。知名的仅有 `Protothreads `_ 一个。另外,C++ 20 中引入的协程也是无栈协程 .. seealso:: - `有栈协程与无栈协程 `_ - `如何在C++17中实现stackless coroutine以及相关的任务调度器 - 知乎 `_ - `开源一个超简单的无栈协程原型(2) - 知乎 `_ - `c++20协程入门 - 知乎 `_ - `C++20 协程初探 | Netcan on Programming `_ 调用栈 **************************************** 函数调用栈是一段连续的地址空间,无论 caller 还是 callee 都位于这段空间内。调用栈中一个函数所占用的空间称为一个栈桢,调用栈就是由若干栈桢拼接成的,一个典型的调用栈为: .. image:: assets/call-stack.png 其中, :abbr:`栈顶指针 (Stack Pointer)` 总是指向调用栈的顶部地址,此地址由 esp 寄存器储存。 :abbr:`栈桢指针 (Frame Pointer)` 实际上就是基址指针,由 ebp 储存。 *返回地址* 储存的是在 callee 返回后 caller 需要继续执行的地址。 .. note:: 调用栈和广义意义上程序的栈并无太大区别。一般来说调用栈是程序栈的组成部分,但是对于有栈协程,很多时候是在堆中分配内存,然后将此内存指定为调用栈。这时候调用栈就存在于堆中,而不是系统栈上。调用栈分配过小可能会导致 SIGBUS 错误 协程原语 **************************************** 协程有两个基本原语:yield 用于让出,resume 用于恢复 yield 和 resume 都是由 switch 实现的,switch 的两步功能为: switch(from, to) 1. save from.ctx 2. load to.ctx switch 实现有三种方式: 1. ucontext 2. setjmp/longjmp 3. 使用汇编 粘合 CPU 是计算密集型 - `GitHub - wangbojing/NtyCo: 纯c版本的协程实现,汇编切换,调度器实现,包含服务器端案例,客户端并发测试案例 `_ - `GitHub - gEricy/NtyCo: 轻量级协程库(源码自渎) `_ 协程的实现方式 **************************************** 协程有三种实现方式: - 使用汇编完成上下文切换 - 使用 ucontext 库 - 使用 setlongjump 一些不错的实现为: +-------------------------------------------------------------+----------+ | 库 | 实现方式 | +=============================================================+==========+ | `cloudwu/coroutine `_ | ucontext | +-------------------------------------------------------------+----------+ | `wangbojing/NtyCo `_ | 汇编 | +-------------------------------------------------------------+----------+ 协程 **************************************** 首先要明白协程的本质是什么。实际上协程就是一个用户态的函数,而协程的切换需要完成的工作就是在函数之间进行切换。 寄存器组是唯一被所有过程共享的资源,根据约定,rbx, rbp, r12-r15 是 *被调用者保存* 寄存器,这意味着当过程 P 调用过程 Q 时,过程 Q 负责保存这些寄存器 。除了 rsp,剩下的寄存器被称为 *调用者保存* 寄存器,这意味着当过程 P 调用 Q 时,保存这些寄存器是过程 P 的责任。 x64 的 CPU 一共有 16 个通用寄存器,但是我们只需要其中的 10 个就行,这 10 个寄存器分别是: +-----+--------------+ | rax | 返回值 | +=====+==============+ | rbx | 被调用者保存 | +-----+--------------+ | rcx | 第 4 个参数 | +-----+--------------+ | rdx | 第 3 个参数 | +-----+--------------+ | rsi | 第 2 个参数 | +-----+--------------+ | rdi | 第 1 个参数 | +-----+--------------+ | rbp | 被调用者保存 | +-----+--------------+ | rsp | 栈指针 | +-----+--------------+ | r8 | 第 5 个参数 | +-----+--------------+ | r9 | 第 6 个参数 | +-----+--------------+ | r10 | 调用者保存 | +-----+--------------+ | r11 | 调用者保存 | +-----+--------------+ | r12 | 被调用者保存 | +-----+--------------+ | r13 | 被调用者保存 | +-----+--------------+ | r14 | 被调用者保存 | +-----+--------------+ | r15 | 被调用者保存 | +-----+--------------+ 还有一个特殊寄存器 rip 用来保存 CPU 下一个执行的指令。根据上面的约定,我们只需要保存 rbx, rsi, rdi, rbp, rsp, rip, r12, r13, r14, r15 这 9 个寄存器 .. note:: - rbp 的作用是作为 rsp 的备份。这样当 rsp 前进时,局部变量和函数参数可以相对于 rbp 访问 `c - What is the purpose of the RBP register in x86_64 assembler?`_ - x64 的调用约定都准许 System V ABI 规范。但是 Linux 和 Windows 对于函数参数的处理略有不同 `GCC的调用约定`_ .. seealso:: - `Windows x64 软件约定 `_ - `System V x86-64 ABI `_ - `linux - Where is the x86-64 System V ABI documented? `_ .. _`c - What is the purpose of the RBP register in x86_64 assembler?`: https://stackoverflow.com/questions/41912684/what-is-the-purpose-of-the-rbp-register-in-x86-64-assembler .. _`GCC的调用约定`: https://blog.csdn.net/weixin_44395686/article/details/105036297 使用 ucontext 实现的协程 **************************************** 下面讲述使用 ucontext 实现协程的方法和原理 ucontext ======================================== .. admonition:: 注 代码实现位于仓库:https://github.com/cathaysia/CoCo ucontext 使用结构体 ucontext_t 来保存上下文信息,其定义可以简化为: .. code-block:: c typedef struct { void *ss_sp; // 栈顶 int ss_flags; // 栈标志 size_t ss_size; // 栈大小 } stack_t; typedef struct { ... struct ucontext_t *uc_link; // 指向后继上下文 stack_t uc_stack; // 协程栈 ... } ucontext_t; 而操纵上下文的几个函数分别为: .. code-block:: c int getcontext(ucontext_t *ucp); // 将当前上下文保存到 ucp 中 int setcontext(const ucontext_t *ucp); // 设置当前上下文 void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...); // 为 ucp 设置栈空并将 func 设置为 ucp 的后继上下文 int swapcontext(ucontext_t *oucp, ucontext_t *ucp); // 将当前上下文保存到 oucp 中,然后切换到 ucp 上 所谓的 *后继上下文* 就是指在当前上下文执行完毕后切换到后继上下文上。如果后继上下文为空指针,则推出线程 另外,makecontext 函数使用前 ucp 必须已经使用 getcontext 进行了初始化 一个初始化上下文的代码为: .. code-block:: cpp getcontext(&ctx_); // 使用当前上下文初始化 ctx_ // 设置上下文的栈属性 ctx_.uc_stack.ss_sp = stack_; ctx_.uc_stack.ss_size = kSTACK_SZIE; ctx_.uc_stack.ss_flags = 0; ctx_.uc_link = &(Schedule::getInstance()->main_); // 使用 ucontext 的成员函数初始化实际上下文对象 makecontext(&ctx_, &Schedule::mainCo, 0); 协程库的基本结构 ======================================== 就像线程分为主线程和分线程一样,协程也分为主协程和普通协程,两者的区别在于主协程只负责对普通协程的调度,而普通协程则是工作协程。两者除了职责不同外其余完全相同。 协程完整的实现为: .. code-block:: cpp constexpr const int kSTACK_SZIE = 1024 * 128; // 协程栈的大小,不能在小了 struct Coroutine { enum State { Stopped, Ready, Running, Suspend }; State state_; CoFun func_; void *args_; char stack_[kSTACK_SZIE]; ucontext_t ctx_; Coroutine(CoFun func, void *args) : state_(Coroutine::Ready), func_(func), args_(args), stack_ { 0 } { getcontext(&ctx_); ctx_.uc_stack.ss_sp = stack_; ctx_.uc_stack.ss_size = kSTACK_SZIE; ctx_.uc_stack.ss_flags = 0; ctx_.uc_link = &(Schedule::getInstance()->main_); makecontext(&ctx_, &Schedule::mainCo, 0); } Coroutine(Coroutine const &) = delete; }; 主协程被放置在调度器中,一个调度器包含了主协程的上下文对象 main\_ 和主协程需要执行的函数 mainCo。 当普通协程执行完毕后会自动切换到主协程上下文 main\_ 并开始执行主协程的函数 mainCo 调度器的声明为: .. code-block:: cpp using cid = int; using CoFun = void (*)(Schedule *s, void *args); struct Schedule { ucontext_t main_; // 主协程 上下文 int cur_co_; // 当前正在执行的协程的 id 号 std::deque coroutines_; // 用户协程队列 static Schedule *getInstance(); // 获取调度器 static void mainCo(); // 主协程的调度函数 cid push(CoFun fun, void *args); // 根据 fun 和 args 创建协程并将其储存到 coroutines_ 中 bool finished(); // 判读是否所有协程已经终止 void run(cid id); // 执行一个协程 Coroutine::State state(cid id); // 查看 cid 为 id 的协程的状态 Schedule(Schedule const&) = delete; // 禁止拷贝对象 ~Schedule(); private: Schedule() = default; }; 和线程有 tid 一样,我们也为协程声明了一个新的类型 cid。我们不将 cid 储存在协程对象中,这是因为: - 在协程执行时其 cid 总是等于 Schedule\:\:cur_co\_ - 当协程终止后我们可以重复利用它的 id 号 现在,我可以描述整个函数的流程: .. mermaid:: graph TD A[main 函数] -->|获取调度器| B[Schedule::getInstance] B --> |创建普通协程| C[Schedule::push] C --> D{协程是否已经全部终止} D --> |是| E[结束] D --> |否| F{查看协程状态} F --> |协程还没运行过| G[Schedule::run] F --> |协程已经运行过| H[resume] G & H --> |让出 CPU 到主协程| I[yield] I --> D 整个流程如图所示。 代码如下: .. code-block:: cpp Schedule *s = Schedule::getInstance(); cid id1 = s->push(func1, args1); cid id2 = s->push(func2, args2); while(!s->finished()) { for(int i = 0; i < s->coroutines_.size(); ++i) { switch(s->coroutines_[i]->state_) { case Coroutine::Ready: s->run(i); break; case Coroutine::Suspend: resume(i); break; default: break; } } } 创建调度器 ======================================== 调度器我们一个线程一个调度器,线程之间不共享: .. code-block:: cpp Schedule *Schedule::getInstance() { thread_local static Schedule *s = nullptr; if(s) return s; s = new Schedule; s->cur_co_ = -1; return s; } 另外,函数 mainCo 会在协程第一个被运行前调用,其根据调用当前活跃协程的函数: .. code-block:: cpp void Schedule::mainCo() { Schedule *s = Schedule::getInstance(); if(s->cur_co_ == -1) return; Coroutine *c = s->coroutines_[s->cur_co_]; c->func_(s, c->args_); c->state_ = Coroutine::Stopped; s->cur_co_ = -1; } 创建协程 ======================================== 我们先查找已经结束的协程的 cid,然后复用其 cid,否者创建一个新协程: .. code-block:: cpp cid Schedule::push(CoFun func, void *args) { int i = 0; for(i = 0; i < coroutines_.size(); i++) { if(coroutines_[i]->state_ == Coroutine::Stopped) { delete coroutines_[i]; Coroutine *c = new Coroutine(func, args); coroutines_[i] = c; break; } } if(i == coroutines_.size()) { Coroutine *c = new Coroutine(func, args); coroutines_.push_back(c); } return i; } 让出协程和恢复协程 ======================================== 这一步骤只是简单地在主协程和普通协程之间进行切换: .. code-block:: cpp void yield() { Schedule *s = Schedule::getInstance(); if(s->cur_co_ == -1) return; Coroutine *c = s->coroutines_[s->cur_co_]; c->state_ = Coroutine::Suspend; s->cur_co_ = -1; swapcontext(&(c->ctx_), &(s->main_)); } void resume(cid id) { Schedule *s = Schedule::getInstance(); if(id < 0 || id > s->coroutines_.size()) return; Coroutine *c = s->coroutines_[id]; if(c && c->state_ == Coroutine::Suspend) { c->state_ = Coroutine::Running; s->cur_co_ = id; swapcontext(&(s->main_), &(c->ctx_)); } }