SRS的协程是基于State-thread(ST)开发的,这篇文章希望能够理清这个框架的基本思路。本篇文章部分讲解来自文章

类似框架

首先应该以一个宏观角度去审视框架。业内还有很多类似的协程框架。

  • 首先是libco,这是一个微信开源的CC++协程库,不同于本文提到的ST,这个框架使用了共享栈技术,多个协程在运行时可以分时复用同一块栈内存。
  • 另外是Libgo,可以提供类似Go语言的并发编程体验。它支持多线程调度(work-stealing)、Channel通信、协程局部存储(CLS)等高级特性。
  • Boost.Coroutine2: C++ Boost库提供的官方协程组件。它更底层、更灵活,提供了构建协程所需的核心能力(上下文切换),但需要用户自己基于它来构建完整的调度器和IO事件循环。
  • 语言级别的协程支持,是未来的标准方向。通过co_await, co_yield, co_return等关键字,编译器会自动将一个函数转换成状态机,从而实现协程。它与语言结合得最紧密,但需要异步框架(如asio)的配合来驱动IO事件。

ST框架结构

核心结构体

_st_vp_t

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
typedef struct _st_vp {

st_utime_t last_clock; /* The last time we went into vp_check_clock() */

//下面是四种线程状态
_st_thread_t *idle_thread; //idle微线程是框架创建的微线程,只有在没有其他微线程可以调度时,才会被调度执行。主要任务是调用IO多路复用函数等待IO事件和处理定时器。
_st_clist_t run_q; //可运行的微线程,等待框架调度即可运行。
_st_clist_t io_q; //当微线程需要等待IO事件时,会被放到IO等待队列中。当等待的IO事件发生 或者 超时 或者 被中断时,会从IO等待队列中移除并加入到runable队列中。
_st_clist_t zombie_q; //当微线程结束时,如果设置了joinable,即需要其他微线程‘收尸’,就会添加到zombie队列。
#ifdef DEBUG
_st_clist_t thread_q; /* all threads of this vp */
#endif
int pagesize;

_st_thread_t *sleep_q; //数据结构为完全二叉树组织的最小堆结构,当微线程设置了定时器时,就会根据超时时间添加到树中。
int sleepq_size; /* number of threads on sleep queue */

#ifdef ST_SWITCH_CB
st_switch_cb_t switch_out_cb; /* called when a thread is switched out */
st_switch_cb_t switch_in_cb; /* called when a thread is switched in */
#endif
} _st_vp_t;

st_thread_t

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
typedef struct _st_thread {
int state; /* Thread's state */
int flags; /* Thread's flags */

void *(*start)(void *arg); /* The start function of the thread */
void *arg; /* Argument of the start function */
void *retval; /* Return value of the start function */

_st_stack_t *stack; /* Info about thread's stack */

_st_clist_t links; /* For putting on run/sleep/zombie queue */
_st_clist_t wait_links; /* For putting on mutex/condvar wait queue */

st_utime_t due; /* Wakeup time when thread is sleeping */
_st_thread_t *left; /* For putting in timeout heap */
_st_thread_t *right; /* -- see docs/timeout_heap.txt for details */
int heap_index;

void **private_data; /* Per thread private data */

_st_cond_t *term; /* Termination condition variable for join */

jmp_buf context; /* Thread's context */
} _st_thread_t;

st_stack_t

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
typedef struct _st_stack {
// 指向实际栈内存块的起始虚拟地址。
// 这块内存通常是通过 mmap 系统调用从操作系统申请的一段连续虚拟内存。
// 协程的所有函数调用、局部变量都存放在这块内存区域中。
void *v_addr;

// 栈内存块的实际大小(以字节为单位)。
// 这个大小是在创建时确定的,例如 64KB。
int v_size;

// 指向下一个 st_stack_t 结构体的指针。
// 这是实现“空闲栈缓存池”的关键所在。当一个协程结束,它所使用的栈不会被立即释放,
// 而是通过这个 next 指针被链接到一个全局的空闲链表(free list)中。
// 当需要创建新协程时,框架会优先从这个链表中获取一个已分配的栈,从而避免了昂贵的 mmap 调用。
struct _st_stack *next;

} st_stack_t;

微线程状态和分类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 微线程状态定义
#define _ST_ST_RUNNING 0 // 执行中
#define _ST_ST_RUNNABLE 1 // 可执行状态,等待调度
#define _ST_ST_IO_WAIT 2 // 等待IO事件
#define _ST_ST_LOCK_WAIT 3 // 等待互斥锁
#define _ST_ST_COND_WAIT 4 // 等待条件变量
#define _ST_ST_SLEEPING 5 // sleep
#define _ST_ST_ZOMBIE 6 // 微线程已结束,待其他微线程调用st_thread_join收尸
#define _ST_ST_SUSPENDED 7 // 暂停,只能调用st_thread_interrupt唤醒

// 微线程flag定义
#define _ST_FL_PRIMORDIAL 0x01 // 原生微线程,即不是创建的微线程,没有分配私有栈资源
#define _ST_FL_IDLE_THREAD 0x02 // 空闲处理微线程,用于调用epoll,处理定时器
#define _ST_FL_ON_SLEEPQ 0x04 // 微线程在sleep队列中,需要定时器的情况:调用st_usleep、st_cond_timedwait、st_poll等待IO事件等
#define _ST_FL_INTERRUPT 0x08 // 微线程被调用st_thread_interrupt()中断
#define _ST_FL_TIMEDOUT 0x10 // 微线程定时器超时

同时微线程被分成了三种角色,分别是Primordial、Idle、User

  • Primoodial,当函数A调用st_init的时候,ST会将函数A的上下文包装成一个协程,负责初始化整个协程。
  • User,由用户通过st_thread_create函数创建,负责具体的业务逻辑。
  • Idle,同样是在st_init时被创建并维护,当ST中没有User正在运行时,会调用Idle微线程,用IO事件轮询器(如poll()或epoll_wait()),并把自己挂起,从而让整个进程进入睡眠状态,等待内核的唤醒。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// idle微线程执行的start函数
void *_st_idle_thread_start(void *arg)
{
_st_thread_t *me = _ST_CURRENT_THREAD();

while (_st_active_count > 0) {
/* Idle vp till I/O is ready or the smallest timeout expired */
// 调用IO多路复用函数(epoll,select等)等待IO事件发生,并处理所有发生的IO事件,
// 如果微线程等待的IO事件发生,会将微线程从io等待队列st_vp_t.io_q中移除
// 并加入到runable队列st_vp_t.run_q
_ST_VP_IDLE();

/* Check sleep queue for expired threads */
// 检查微线程设置的定时器是否超时,针对超时的微线程,将其从sleep队列移除
// 并加入到runable队列
_st_vp_check_clock();

// 交出运行权,并从runable队列st_vp_t.run_q中调度下一个微线程运行
me->state = _ST_ST_RUNNABLE;
_ST_SWITCH_CONTEXT(me);
}

/* No more threads */
exit(0);

/* NOTREACHED */
return NULL;
}

初始化、创建、销毁流程

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// 框架初始化函数
int st_init(void)
{
_st_thread_t *thread;

if (_st_active_count) {
/* Already initialized */
return 0;
}

// 选择使用哪个IO多路复用函数,如epoll、select等
st_set_eventsys(ST_EVENTSYS_DEFAULT);

// 屏蔽SIGPIPE信号 并 初始化IO多路复用函数相关数据结构
if (_st_io_init() < 0)
return -1;

memset(&_st_this_vp, 0, sizeof(_st_vp_t));

// 清空runable队列、io等待队列、zombie队列
ST_INIT_CLIST(&_ST_RUNQ);
ST_INIT_CLIST(&_ST_IOQ);
ST_INIT_CLIST(&_ST_ZOMBIEQ);

// 调用IO多路复用函数对应的初始化函数
if ((*_st_eventsys->init)() < 0)
return -1;

_st_this_vp.pagesize = getpagesize();
_st_this_vp.last_clock = st_utime();

// 创建idle微线程,idle微线程主要用于调用epoll等待IO事件和处理定时器
_st_this_vp.idle_thread = st_thread_create(_st_idle_thread_start,
NULL, 0, 0);
if (!_st_this_vp.idle_thread)
return -1;
_st_this_vp.idle_thread->flags = _ST_FL_IDLE_THREAD;
_st_active_count--;
// 将idle微线程从runable队列移除,由于idle微线程只在没有微线程可以运行时,才会主动调度,
// 所以不需要加入到run队列
_ST_DEL_RUNQ(_st_this_vp.idle_thread);

// 初始化primordial微线程,primordial微线程用来标记系统进程,由于可以直接使用
// 系统进程的栈空间,故只需要为primordial微线程分配st_thread_t和私有key数据区
thread = (_st_thread_t *)calloc(1, sizeof(_st_thread_t) +
(ST_KEYS_MAX * sizeof(void *)));
if (!thread)
return -1;
thread->private_data = (void **)(thread + 1);
thread->state = _ST_ST_RUNNING;
thread->flags = _ST_FL_PRIMORDIAL;
_ST_SET_CURRENT_THREAD(thread);
_st_active_count++;

// 当前运行的微线程是primordial微线程,当primordial微线程退出时,整个进程也会终止
return 0;
}

微线程创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
 // 创建微线程函数
_st_thread_t *st_thread_create(void *(*start)(void *arg), void *arg,
int joinable, int stk_size)
{
_st_thread_t *thread;
_st_stack_t *stack;
void **ptds;
char *sp;
//step 1
if (stk_size == 0)
stk_size = ST_DEFAULT_STACK_SIZE; // 默认栈空间为64K
// 将栈空间调整为PAGE_SIZE的整数倍
stk_size = ((stk_size + _ST_PAGE_SIZE - 1) / _ST_PAGE_SIZE) * _ST_PAGE_SIZE;
// 分配微线程使用的私有数据空间:包括栈空间,私有key数据区,st_thread_t结构体等
// 为了避免频繁调用系统分配函数分配微线程栈空间,框架中保存了stack空闲链表,
// 会优先从空闲链表获取,当空闲链表为空时,会从系统分配
stack = _st_stack_new(stk_size);
if (!stack)
return NULL;
//step 2
// 初始化分配好的空间
sp = stack->stk_top;
sp = sp - (ST_KEYS_MAX * sizeof(void *));
ptds = (void **)sp;
sp = sp - sizeof(_st_thread_t);
thread = (_st_thread_t *)sp;

// 保证栈可用位置一定是64字节对齐的
if ((unsigned long)sp & 0x3f)
sp = sp - ((unsigned long)sp & 0x3f);
stack->sp = sp - _ST_STACK_PAD_SIZE;

// 分配的空间重置为0,因为stack可能是从空闲链表中获取的
memset(thread, 0, sizeof(_st_thread_t));
memset(ptds, 0, ST_KEYS_MAX * sizeof(void *));

thread->private_data = ptds;
thread->stack = stack;
thread->start = start;
thread->arg = arg;
//step 3
// 调用setjmp保存当前的上下文信息到jmp_buf中,主要功能是初始化jmp_buf
// 因为现在保存到jmp_buf中的信息不会在微线程的后续运行中使用到
if (setjmp(thread->context))
// 当微线程首次被调度运行时,会进入该分支,并调用_st_thread_main函数执行
_st_thread_main();
// 非常重要的一点:设置jmp_buf中的rsp指向为微线程分配的私有栈空间的可用位置
// 这样可以保证框架中不同微线程运行在自己的私有栈空间上
thread->context[JB_RSP] = (long)(stack->sp);

// 如果微线程是可joinable的,就创建一个微线程终止时使用的条件变量
// 当该微线程终止时会等待在该条件变量上,当其他微线程调用st_thread_join函数
// 替该微线程收尸后,该微线程才会真正终止
if (joinable) {
thread->term = st_cond_new();
if (thread->term == NULL) {
_st_stack_free(thread->stack);
return NULL;
}
}

// 设置为runable状态,并 加入到runable队列中
thread->state = _ST_ST_RUNNABLE;
_st_active_count++;
_ST_ADD_RUNQ(thread);

return thread;
}

// 微线程启动后调用的main函数
void _st_thread_main(void)
{
_st_thread_t *thread = _ST_CURRENT_THREAD();

/*
* Cap the stack by zeroing out the saved return address register
* value. This allows some debugging/profiling tools to know when
* to stop unwinding the stack. It's a no-op on most platforms.
*/
MD_CAP_STACK(&thread);

// 调用微线程设置的start函数
thread->retval = (*thread->start)(thread->arg);

// 清理微线程使用的资源
st_thread_exit(thread->retval);
}

创建微线程使用的私有栈->初始化私有栈->调用setjmp并修改jmp_buf

在创建微线程的过程中,核心在于,st_thread_create并不会直接让协程跳转到用户指定的函数(start_routine)。而是跳转到一个由ST框架提供的、名为_st_thread_main的内部包裹函数。creat这个函数是目前我见过最复杂的一个函数了,后续再去学习吧。。。

微线程切换与调度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 切换_thread微线程出去,并调度下一个runable状态的微线程运行
#define _ST_SWITCH_CONTEXT(_thread) \
{ \
if (!setjmp(_thread->context)) { \
_st_vp_schedule(); \
} \
}

// 微线程调度逻辑,即调度下一个runable状态微线程运行
void _st_vp_schedule(void)
{
_st_thread_t *thread;

// 如果runable队列_st_this_vp.run_q非空,就选队列首的微线程
// 否则调度idle微线程运行
if (_ST_RUNQ.next != &_ST_RUNQ) {
thread = _ST_THREAD_PTR(_ST_RUNQ.next);
_ST_DEL_RUNQ(thread);
}
else {
thread = _st_this_vp.idle_thread;
}

// 设置选中的微线程的状态为running状态,并调用longjmp函数跳转到选中的微线程,
// 选中的微线程从最后一次的setjmp位置继续运行
thread->state = _ST_ST_RUNNING;
_ST_SET_CURRENT_THREAD(thread);
longjmp(thread->context, 1)
}

线程终止

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
 // 微线程终止时调用的函数
void st_thread_exit(void *retval)
{
_st_thread_t *thread = _ST_CURRENT_THREAD();

thread->retval = retval;

// 释放微线程运行期间调用st_thread_setspecific设置的私有key数据
_st_thread_cleanup(thread);

_st_active_count--;

// 如果创建了term条件变量,需要通知调用st_thread_join()等待该微线程的微线程为该
// 微线程“收尸”
if (thread->term) {
// 添加到zombie队列
thread->state = _ST_ST_ZOMBIE;
_ST_ADD_ZOMBIEQ(thread);

// 通知等待在term条件变量上的微线程
st_cond_signal(thread->term);

// 交出控制权,等到为本线程收尸的微线程调用st_thread_join()返回
// 后,本微线程才会switch回来,并恢复运行
_ST_SWITCH_CONTEXT(thread);

// 清理条件变量
st_cond_destroy(thread->term);
thread->term = NULL;
}

// 如果终止的不是Primordial微线程,就释放为微线程分配的私有栈空间,
// 释放的栈空间会放到空闲链表中
if (!(thread->flags & _ST_FL_PRIMORDIAL))
_st_stack_free(thread->stack);

// 交出控制权,并调度下一个runable状态的微线程,微线程生命周期终止
_ST_SWITCH_CONTEXT(thread);
/* Not going to land here */
}

本站由 Edison.Chen 创建。
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。

undefined