[造轮子] 又一个 c++ coroutine 的实现

Coroutine (又称协程)目前已经是一个比较热门和时髦的概念。现代的编程语言很多都已经从 语言层面对这一概念进行了支持,甚至有些语言将其作为主打的特性,比如说 Golang 。本文就将简单介绍如何在 C++ 中实现一个协程。

协程的定义与分类

根据维基百科,协程被定义如下

Coroutines are computer-program components that generalize subroutines for non-preemptive multitasking, by allowing multiple entry points for suspending and resuming execution at certain locations. Coroutines are well-suited for implementing familiar program components such as cooperative tasks, exceptions, event loops, iterators, infinite lists and pipes.

用中文说就是 协程是一个子程序,只不过这个子程序可以被暂停和继续执行。 传统意义上的同步和异步程序是这样的:要么必须等待函数执行完毕返回;要么提供一个回调,当函数执行完毕之后会调用这个回调。但是协程提供了另一个 可能,可以在函数执行一半的时候返回,当需要的时候还能在返回处继续执行。

从分类上来说,协程分为两种,对称与非对称。所谓非对称协程是指协程之间存在着调用者和被调用者的关系。比如说在 lua 中,一个 coroutine 的结束就意味着另一个 coroutine 中coroutine.resume 的返回。但是对于对称 协程而言,协程之间是不存在从属的关系的,比如说 golang 中的go关键字,两个协程同时执行,其运行关系很类似于操作系统中的*线程*。

本文不讨论对称协程的实现,单纯地在 C++ 中实现一个类似 Lua 中的协程接口。

前置知识

考虑到协程要允许函数在执行的时候中断并重新开始,这很容易使我们想到两个标准C的接口, setjmp, longjmp。这两个函数一般用作异常的处理。像在 Lua 中, 一个 pcall 其实相当于调用了 setjmp,而 error 调用的是 longjmpsetjmp的作用是保存当前的运行环境,所谓运行环境就是局部变量,所谓局部变量在底层汇编也就是栈和寄存器的内容,所谓栈在 x86_64 中也就是 bpsp 指针,说到底还是寄存器。因此setjmp 要保存的其实就是寄存器的内容。同样longjmp所实现的跳转也就是将寄存器恢复出来。并没有什么魔法黑科技。

下面来看看setjmp的实现(来自musl libc)

setjmp:
    mov %rbx,(%rdi)         /* rdi is jmp_buf, move registers onto it */
    mov %rbp,8(%rdi)
    mov %r12,16(%rdi)
    mov %r13,24(%rdi)
    mov %r14,32(%rdi)
    mov %r15,40(%rdi)
    lea 8(%rsp),%rdx        /* this is our rsp WITHOUT current ret addr */
    mov %rdx,48(%rdi)
    mov (%rsp),%rdx         /* save return addr ptr for new rip */
    mov %rdx,56(%rdi)
    xor %rax,%rax           /* always return 0 */
    ret

可以看出主要保存了以下的寄存器rbx, rbp, r12, r13, r14, r15,然后是rsp。最后是rip,当然这个rip是函数调用前的下一条,也就是栈顶上的那个元素。

为什么会选择保存这些寄存器?因为这些寄存器(当然除了 rip 外)都是跨 function call 保留的,也就是当使用call的时候, 这些寄存器的值会被保留到调用的函数中。因此必须要保存这些变量,这其实就是所谓的 Context 。其他常用的寄存器会由编译器自动处理,因此保存下来是没有意义的。

同样,longjmp 只是将这些 mov 操作反过来。但是由于没法直接修改 riplongjmp的代码最后一行是jmp

PS: 这里面实现的 setjmp 其实效率并不够高,libunwind 中有更快的实现方案,只需要保存rsprip就可以,其采用dwarf中定义的栈回溯方式来回复其他变量,因此setjmp 的效率会比较高,而longjmp的效率则比较低

协程库接口设计

可以仿照Lua 中的设计。其中最核心的接口就是 resumeyield。这两个接口其实就是类似于上面的setjmplongjmp的结合体。都是 先保存当前的运行环境,然后 jump 到目标。只不过一个是 jump 到协程里面,一个是从协程里面跳出到外面。互相跳的关系。

context环境

其实就一行

typedef void *co_jmp_buf[8]; /* rip, rsp, rbp, rbx, r12, r13, r14, r15 */

这个与 setjmp 保存的东西是完全一致的。

jump 的实现

由于resumeyield的基础操作都是 jump, 也就是从一个 context 跳转到另一个 context,因此,下面就是co_jump的实现:

static inline void co_jump(co_jmp_buf from, co_jmp_buf to) {
    __asm__ __volatile__("leaq 1f(%%rip), %%rax\n\t"
                         "movq %%rax, (%0)\n\t"
                         "movq %%rsp, 8(%0)\n\t"
                         "movq %%rbp, 16(%0)\n\t"
                         "movq %%rbx, 24(%0)\n\t"
                         "movq %%r12, 32(%0)\n\t"
                         "movq %%r13, 40(%0)\n\t"
                         "movq %%r14, 48(%0)\n\t"
                         "movq %%r15, 56(%0)\n\t"
                         "movq 56(%1), %%r15\n\t"
                         "movq 48(%1), %%r14\n\t"
                         "movq 40(%1), %%r13\n\t"
                         "movq 32(%1), %%r12\n\t"
                         "movq 24(%1), %%rbx\n\t"
                         "movq 16(%1), %%rbp\n\t"
                         "movq 8(%1), %%rsp\n\t"
                         "jmpq *(%1)\n"
                         "1:\n"
                         : "+S"(from), "+D"(to)
                         :
                         : "rax", "rcx", "rdx", "r8", "r9", "r10", "r11", "memory", "cc");
}

其中, from会用来存储当前的 context 以便下次 jump 回来。而 to 则是需要跳转的目标 context。下面对这些代码进行解释

  1. leaq 1f(%%rip), %%rax 这里是要保存当前的iprax,但是由于下次跳转不能直接到当前的ip,否则就死循环了。因此加了一个1f,这玩意儿叫fb label,具体 解释可以看这里。这里相当于存储了1: label 的地址。
  2. 接下来就是存储当前的各种寄存器,和上面的setjmp 类似,不表
  3. 然后将目标环境的寄存器恢复,和longjmp类似
  4. 最后直接 jmp 过去, 也就是目标ip所指的地址。

创建 coroutine

有了co_jump之后,还需要一个初始化的routine,毕竟当我们启动协程的时候,目标的 context 不能是空的,得有一定的内容,其中最核心的就是启动函数。

coroutine 是一个独立的执行单元,需要一个独立的栈,因此我们需要分配一块内存,一般为2M,另外由于栈的地址是从高到低的,也就是说需要将分配内存 的 top 作为 rsp的初始化。

stack_ = malloc(CO_STACK_SIZE);
sp_ = (char *)(stack_) + CO_STACK_SIZE;
regs[1] = sp_; // rsp = sp_

对于 rip的值,其应该指向一个入口函数,这个函数不能直接是传入的入口函数,毕竟我们希望更灵活一点,可以更加精确地控制传参等功能。所以这里做了一些调整

regs[0] = (void *)(co_wrap_main); // rip = co_wrap_main;
regs[4] = reinterpret_cast<void *>(+[](Coroutine *self) { // r12 = 我们的入口函数
        // do with self
    });
regs[5] = this; // r13 其实是 Coroutine 的 this 指针

为了将 this 指针传入到 r12的入口函数中,co_wrap_main需要这样编写

static void co_wrap_main(void) {
    __asm__ __volatile__("\tmovq %r13, %rdi\n" // %rdi is the first argument
                         "\tjmpq *%r12\n");
}

其中 rdiamd64 架构下,第一个形参的寄存器。注意第二句,由于栈之类的已经保存了,因此不能使用call指令,而直接使用jmp,从而避免修改了rsp

以上就是最核心的内容,有了初始化操作和co_jump,就可以很自然地去实现yieldresume了。这里就不再详细介绍。

异常处理

上述的设计并没有考虑到异常。如果一个协程并没有完全执行完而是在协程函数return之前就已经终止了,按理说在这个函数中的局部变量应该被析构。但是由于我们直接修改了寄存器, 并没有调用这些析构函数,因此是有问题的。

好在我们可以借用 C++ 的异常。这里可以在 coroutine 销毁时,进入到目标的栈,然后强制抛出一个异常,再到最外层进行补货。接下来再销毁 coroutine 的栈即可。

Coroutine::~Coroutine() {
    if (status_ == Status::SUSPEND) {
        force_unwind_ = true; //先标记一下,说明要销毁栈了
        resume(NULL); // 进入到目标的执行栈
    }
    free(stack_);
}

// 这是统一入口函数
regs[4] = reinterpret_cast<void *>(+[](Coroutine *self) {
        try {
            self->yield_arg_ = self->start_routine_(self->resume_arg_);

        } catch (const ForceUnwind &) {
            // 最外层包装了一个 try ,专门用来捕获 ForceUnwind 异常
        }
        self->status_ = Status::EXIT;
    });

void *Coroutine::yield(void *ret) {
    yield_arg_ = ret;
    co_jmp_buf target;
    memcpy(target, saved_ctx_, sizeof(target));
    co_jump(saved_ctx_, target);
    if (force_unwind_) { // 从 yield 处恢复,这里其实是上面 resume 的下一句,立马抛出异常
        throw ForceUnwind{};
    }
    return resume_arg_;
}

上面的实现会有一个问题,即如果用户使用try{}catch(...){}之类的包住了yield,那么ForceUnwind异常就没法被最外层的程序捕获,会导致出现问题。目前并没有良好的解决方案。

其他选择

当然,这里实现的协程库并不能作为生产环境使用,现在已经有很多相关的第三方库可供使用,很多基于ucontext。 此外,像Boost.Coroutine2 也已经比较成熟,也提供了一些更为高层的解决方案。如果要在生产中使用,也 应该优先选择这些库。

--EOF--
欢迎关注我的微信公众号