手记

基于ucontext.h的轻量级协程库

本文主要是对自己学习协程并实现轻量级协程过程的一个记录, 语言略显啰嗦, 各位见谅. 水平有限, 如有疏漏, 欢迎各位指正.

一 了解协程

  • 协程可以理解为一种用户态的轻量级线程, 切换由用户定义

  • 协程上下文切换很快, 因为不会陷入内核态

  • 协程拥有自己的寄存器上下文和栈, 协程调度切换时,将寄存器上下文和栈保存到其他地方,在切换回来的时候,恢复先前保存的寄存器上下文和栈

优点

  • 协程具有极高的执行效率 因为子程序切换不是线程切换,是由程序自身控制,因此协程没有线程切换的开销, 多线程的线程数量越多,协程的性能优势就越明显

  • 访问共享资源不需要多线程的锁机制, 因为只有一个线程, 也不存在同时写变量冲突, 所以在协程中控制共享资源无需加锁, 只需要判断状态就好了,执行效率比多线程高很多, 而且代码编写难度也可以相应降低

  • 以同步代码的方式写异步逻辑

缺点

  • 无法利用多核资源, 除非和多进程配合



二 了解ucontext

  • ucontext组件是什么

    • 头文件<ucontext.h>定义了两个数据结构, mcontext_t(暂时用不到)和ucontext_t和四个函数, 可以被用来实现一个进程中的用户级线程(协程)切换

数据结构

  • ucontext_t 结构体至少拥有如下几个域

    typedef struct ucontext {
        struct ucontext *uc_link;
        sigset_t uc_sigmask;    stack_t uc_stack;    mcontext_t uc_mcontext;
        ...
    } ucontext_t;
    • uc_link指向后继上下文, 当前上下文运行终止时系统会恢复指向的上下文

    • uc_sigmask为该上下文中的阻塞信号集合

    • uc_stack为该上下文中使用的栈

    • uc_mcontex保存上下文的特定机器, 包括调用线程的特定寄存器等等

    • 简而言之这个数据结构是用来保存上下文的

函数

  1. int getcontext(ucontext_t * ucp);

  • 获取当前上下文, 初始化ucp结构体, 将当前上下文保存到ucp中

  • 成功返回0; 失败返回-1, 并设置errno

void makecontext(ucontext_t *ucp, void(*func)(), int argc, ...);

ucontext_t tar_ctx;ucontext_t next_ctx;char stack[100];
getcontext(&tar_ctx);
tar_ctx.uc_stack.ss_sp = stack;
tar_ctx.uc_stack.ss_sp = sizeof(stack);
tar_ctx.uc_stack.ss_flags = 0;
tar_ctx.uc_link = &next_ctx;
  • 创建一个目标上下文 创建方式: (1) getcontext, (2) 指定分配给上下文的栈uc_stack.ss_sp, (3) 指定这块栈的大小uc_stack.ss_size, (4) 指定uc_stack.ss_flags, (5) 指定后继上下文uc_link

  • 协程运行时使用主协程划分的栈空间,而协程切回主线程时需要将该部分栈空间的内容copy到每个协程各自的一个空间缓存起来,因为主协程中划分的栈空间并不是只用于一个协程,而是会用于多个协程

  • makecontext可以修改通过getcontext初始化得到的上下文, (必须先调用getcontext), 然后为ucp指定一个栈空间ucp->stack, 设置后继的上下文ucp->uc_link

  • 当上下文通过setcontext或者swapcontext激活后, 执行func函数(argc为后续的参数个数, 可变参数). 当func执行返回后, 继承的上下文被激活(ucp->uc_link), 如果为NULL, 则线程退出

int setcontext(const ucontext_t *ucp)

  • 设置当前的上下文为ucp(激活ucp)

  • ucp来自getcontext, 那么上下文恢复至ucp

  • ucp来自makecontext, 那么将会调用makecontext函数的第二个参数指向的函数func, 如果func返回, 则恢复至ucp->uc_link指定的后继上下文, 如果该ucp中的uc_link为NULL, 那么线程退出

  • 成功不返回, 失败返回-1, 设置errno

int swapcontext(ucontext_t *oucp, ucontext_t *ucp)

  • 切换上下文

  • 保存当前上下文至oucp, 激活ucp上下文(先执行makecontext指定的ucp入口函数, 而后会执行ucp->uc_link指向的后继上下文)

  • 成功不返回, 失败返回-1, 设置errno



三 轻量级协程实现

名词说明

  • 协程调度器 代码中的SingleSchedule

  • 用户协程 代码中的Coroutine

  • 栈空间/栈区 对应SingleSchedule中的成员stack

  • 栈缓存/缓存区 对应的是Coroutine中的成员Buffer

  • 主协程上下文 SingleSchedule中的main_ctx, 对应main函数中的上下文

  • 用户协程上下文 Coroutine中的ctx, 对应每个用户协程自身的上下文


思路

  • 本文基于ucontext.h实现协程库

  • 基本思路:

    • 构造一个协程调度类, 该类用于调度所有的用户协程, 提供一个协程池ctxPool, 使用单例模式实现.

    • 构造一个用户协程类, 该类对象对应每个用户协程, 提供一个用户协程逻辑虚函数CoProcess, 提供一个用户协程上下文ctx

    • 用户协程首次激活, 将会为其分配协程调度器提供的栈区stack

    • 用户协程被挂起, 那么会将该协程的栈信息栈区stack保存到其自身的缓存区buffer;

    • 用户协程被唤醒, 那么会将该用户协程的栈信息缓存区buffer中Reload至栈区

  • 协程库框架

    • 激活 初始化用户协程(指定协程状态RUNNING), 初始化用户协程上下文(指定协程栈空间stack, 指定后继上下文), 将协程加入协程池

    • 挂起 将栈空间stack对应的数据缓存至当前用户协程的栈缓存buffer中, 更改协程状态SUSPEND

    • 恢复 将用户协程栈缓存buffer中的数据reload进栈空间stack

  • 示意图


1 用户协程类 Coroutine

数据成员

  • 协程状态CoState state(FREE, RUNNING, SUSPEND)

  • 协程号int id(对应协程调度类中的协程池的id)

  • 栈缓存char * Buffer, 一个动态数组, 当协程被切出时, 缓存栈空间

  • 所需缓存区尺寸int stack_size

  • 用户协程栈容量尺寸int cap cap如果小于stack_size, 那么需要重新分配缓存区, 否则可以直接缓存

  • 用户协程上下文ucontext_t ctx

主要成员函数

  • 挂起协程函数void Yield();

    • 挂起当前协程, 并SaveStack栈空间, 切换状态至SUSPEND

  • 恢复协程函数void Resume()

    • 恢复该协程, 并ReloadStack栈空间

  • 缓存堆栈函数void SaveStack();

    • 调用时机是协程被切出, 会将协程调度对象中的堆栈缓存入用户协程自身的缓存区

  • 载入堆栈函数void ReloadStack();

    • 调用时机是协程被恢复时, 会将该用户协程的堆栈信息从缓存区回复到协程调度对象的堆栈中

  • 用户协程逻辑虚函数virtual void CoProcess();

    • 用于派生成员中定义业务逻辑


2 协程调度类

  • 单例

数据成员

  • 协程池std::map<int, Coroutine*> crtPool;

  • 主协程上下文ucontext_t main_ctx

  • 协程堆栈char stack[DEFAULT_STACK_SIZE], 所有的协程都利用这块区域

成员函数

  • 协程启动函数 void CoroutineNew(Coroutine * crt);

    • 初始化用户协程(指定协程状态RUNNING), 初始化用户协程上下文(指定协程栈空间stack, 指定后继上下文), 将协程加入协程池

  • 用户协程入口函数static void CoroutineEntry(void * crt);

    • 指向用户协程的入口函数

  • 协程恢复函数void Resume(int id);

    • 根据id恢复对应协程

  • 检查并清理协程池int HasCoroutine();

    • 清理FREE的协程, 并返回剩余的协程数量

  • 协程删除函数void Remove(int id);

    • 删除对应协程


注意点

  1. 所有的用户协程都使用调度器的栈空间, 每个用户协程自身的buffer只不过用来作缓存

  2. SaveStack和ReloadStack函数的实现需要注意, 如何缓存协程栈

    char * stackBottom = SingleSchedule::GetInst()->GetStackBottom();    // 获取到栈底, 即高地址char dumy = 0;                                                       // 最后创建的变量, 必然分配在栈顶assert(stackBottom-&dumy <= DEFAULT_STACK_SIZE);                     // 被栈缓存不能大于栈空间if (cap<stackBottom-&dumy){                                          // cap 代表当前栈缓存大小, 如果不够需要重分配
        if(buffer){                                                      // 释放当前栈缓存
            delete [] buffer;
        }
        cap = stackBottom-&dumy;
        buffer = new char[cap];
    }
    stack_size = stackBottom-&dumy;                                      // 栈空间大小memcpy(buffer, &dumy, stack_size);                                   // 缓存
  • 协程栈是由用户分配的, 如代码中stack数组, 由于该数组的目的是用作栈空间, 而进程中栈是预分配的, 即首先确定栈的高地址, 从高地址开始往低使用, 根据这一点我们可以确定需要被缓存的栈空间大小.


代码实现

https://github.com/trioZwShen/my_code_rep/tree/master/My_Coroutine

1 用户协程

/**
 * @file    : Coroutine.h
 * @author  : neilzwshen
 * @time    : 2018-7-31
 * @version : 3.0
 * @remark  : 用户协程类
 */#ifndef COROUTINE_H_#define COROUTINE_H_#define DEFAULT_STACK_SIZE (1024*1024)#include <stdio.h>#include <string.h>#include <ucontext.h>enum CoState {FREE = 0, RUNNING = 1, SUSPEND = 2};class Coroutine{public:
    Coroutine();    virtual ~Coroutine();    /**
     * 用户协程入口函数
     */
    virtual void CoProcess();    
    /**
     * 用户协程恢复函数
     */
    void Resume();    /**
     * 获取协程id
     * @return [返回id]
     */
    int GetId()const {        return id;
    }    /**
     * 设置协程id
     */
    void SetId(int _id) {
        id = _id;
    }    /**
     * 获取协程状态
     * @return [返回协程状态]
     */
    int GetState()const {        return state;
    }    /**
     * 设置协程状态
     */
    void SetState(CoState _state) {
        state = _state;
    }protected:    /**
     * 用户协程挂起函数
     */
    void Yield();    /**
     * 堆栈缓存
     */
    void SaveStack();    /**
     * 堆栈恢复
     */
    void ReloadStack();    
public:    char *buffer;       // 缓存协程堆栈
    ucontext_t ctx;private:    int stack_size;    int cap;    int id;
    CoState state;
};#endif
#include <assert.h>#include "Coroutine.h"#include "Schedule.h"Coroutine::Coroutine()
        :id(0),state(FREE),cap(0),stack_size(0),buffer(nullptr)
{

}

Coroutine::~Coroutine()
{    delete [] buffer;
}void Coroutine::CoProcess()
{

}void Coroutine::Resume()
{    if(state==SUSPEND){
        ReloadStack();
        state = RUNNING;
        swapcontext(&(SingleSchedule::GetInst()->mainCtx), &ctx);
    }
}void Coroutine::Yield()
{    if (state == RUNNING){
        SaveStack();
        state = SUSPEND;
        swapcontext(&ctx, &(SingleSchedule::GetInst()->mainCtx));
    }
}void Coroutine::SaveStack()
{    char * stackBottom = SingleSchedule::GetInst()->GetStackBottom();    char dumy = 0;

    assert(stackBottom-&dumy <= DEFAULT_STACK_SIZE);    if (cap<stackBottom-&dumy){        if(buffer){            delete [] buffer;
        }
        cap = stackBottom-&dumy;
        buffer = new char[cap];
    }
    stack_size = stackBottom-&dumy;    memcpy(buffer, &dumy, stack_size);
}void Coroutine::ReloadStack()
{    memcpy(SingleSchedule::GetInst()->GetStackBottom()-stack_size, buffer, stack_size);
}

2 单例模板

/**
 * @file    : Singleton.h
 * @author  : neilzwshen
 * @time    : 2018-7-30
 * @version : 1.0
 * @remark  : 单例模板, 只要将对象作为T, 就可以获取到一个单例对象, 构造函数不能传参
 */#ifndef SINGLETON_H_#define SINGLETON_H_template<class T>class Singleton {public:    /**
     * 单例获取
     * @return [返回T的单例对象]
     */
    static T* GetInst(){        if (!flag_instance){
            flag_instance = new Singleton();
        }        return &flag_instance->_instance;
    }protected:    /**
     * 单例构造
     */
    Singleton(){}private:    /**
     * T对象实例
     */
    T _instance;    /**
     * 单例模板实例,
     */
    static Singleton<T> * flag_instance;
};template<class T>Singleton<T> * Singleton<T>::flag_instance = 0;#endif

3 协程调度器

/**
 * @file    : Schedule.h
 * @author  : neilzwshen
 * @time    : 2018-7-31
 * @version : 3.0
 * @remark  : 协程调度类
 */#ifndef SCHEDULE_H_#define SCHEDULE_H_#include <stdio.h>#include <map>#include <ucontext.h>#include "Coroutine.h"#include "Singleton.h"typedef std::map<int, Coroutine*> CrtMap;class Schedule{public:
    Schedule();    virtual ~Schedule();    /**
     * 用户协程入口函数
     */
    static void CoroutineEntry(void * crt);    /**
     * 将协程crt加入协程池, 并开启
     * @param  crt [协程指针]
     */
    void CoroutineNew(Coroutine * crt);    /**
     * 恢复用户协程
     * @param id [description]
     */
    void Resume(int id);    /**
     * 判断协程池中是否还有未完成的协程, 并将已经终止的协程删除
     * @return [返回协程数]
     */
    int HasCoroutine();    /**
     * 根据协程id删除协程
     * @param id [协程id]
     */
    void Remove(int id);    /**
     * 获取到栈底
     * @return [返回栈底地址]
     */
    char* GetStackBottom(){        return stack + DEFAULT_STACK_SIZE;
    }public:    ucontext_t mainCtx;    char stack[DEFAULT_STACK_SIZE];     // 运行协程堆栈private:
    CrtMap crtPool;
};typedef Singleton<Schedule> SingleSchedule;#endif
#include <assert.h>#include "Schedule.h"Schedule::Schedule()
{

}

Schedule::~Schedule()
{

}void Schedule::CoroutineEntry(void * crt) {
    ((Coroutine *)crt)->SetState(RUNNING);
    ((Coroutine *)crt)->CoProcess();
    ((Coroutine *)crt)->SetState(FREE);
}void Schedule::CoroutineNew(Coroutine * crt) {    
    int id = crt->GetId();
    CoState state = CoState(crt->GetState());
    assert(id != 0);
    assert(state == FREE);    //printf("--%d,%d--\n",id, state);

    if (crtPool[id] != nullptr) {
        CrtMap::iterator it = crtPool.find(id);
        crtPool.erase(it);
    }    
    // 构建用户协程上下文
    getcontext(&(crt->ctx));    //memset(stack, 0, DEFAULT_STACK_SIZE);
    crt->ctx.uc_stack.ss_sp = stack;
    crt->ctx.uc_stack.ss_size = DEFAULT_STACK_SIZE;
    crt->ctx.uc_stack.ss_flags = 0;
    crt->ctx.uc_link = &mainCtx;
    crtPool[id] = crt;
    
    makecontext(&crt->ctx, (void(*)(void))CoroutineEntry, 1, (void *)crt);
    swapcontext(&mainCtx, &crt->ctx);
}void Schedule::Resume(int id){    if (crtPool[id] != nullptr) {
        crtPool[id]->Resume();
    }
}int Schedule::HasCoroutine() {    int count = 0;
    CrtMap::iterator it;    for (it = crtPool.begin(); it != crtPool.end(); it++) {        if (it->second->GetState() != FREE) {
            count++;
        }else{
            it=crtPool.erase(it);
            it--;
        }
    }    return count;
}void Schedule::Remove(int id) {    if (crtPool[id] != nullptr) {
        crtPool.erase(crtPool.find(id));
    }
}

4 示例

#include <stdio.h>#include <memory>#include "Coroutine.h"#include "Schedule.h"class Logic1 : public Coroutine{    void CoProcess(){        puts("1");
        Yield();        puts("4");
        Yield();        puts("7");
    }
};class Logic2 : public Coroutine{    void CoProcess(){        puts("2");
        Yield();        puts("5");
        Yield();        puts("8");
    }
};class Logic3 : public Coroutine{    void CoProcess(){        puts("3");
        Yield();        puts("6");
        Yield();        puts("9");
    }
};int main() {    std::shared_ptr<Coroutine> ct1(new Logic1());    std::shared_ptr<Coroutine> ct2(new Logic2());    std::shared_ptr<Coroutine> ct3(new Logic3());

    ct1->SetId(1);
    ct2->SetId(2);
    ct3->SetId(3);

    SingleSchedule::GetInst()->CoroutineNew(ct1.get());
    SingleSchedule::GetInst()->CoroutineNew(ct2.get());
    SingleSchedule::GetInst()->CoroutineNew(ct3.get());

    SingleSchedule::GetInst()->Resume(1);
    SingleSchedule::GetInst()->Resume(2);
    SingleSchedule::GetInst()->Resume(3);
    SingleSchedule::GetInst()->Resume(1);
    SingleSchedule::GetInst()->Resume(2);
    SingleSchedule::GetInst()->Resume(3);    //SingleSchedule::GetInst()->Remove(1);
    //SingleSchedule::GetInst()->Remove(2);
    //SingleSchedule::GetInst()->Remove(3);

    int count = SingleSchedule::GetInst()->HasCoroutine();    printf("%d\n", count);    return 0;
}

5 执行结果

szw@szw-VirtualBox:~/code/coroutine/temp/My_Coroutine_3_0$ ./main 
1234567890



作者:neilzwshen
链接:https://www.jianshu.com/p/4f7d3aa83088


0人推荐
随时随地看视频
慕课网APP