导入
用户级线程库指的是创建的线程是内核无法感知的线程,线程的建立和切换都由线程库自己完成,类似协程的概念。
本篇博客将实现一个简单的用户级线程库uthread
,一个抢占式的线程库,通过定时器中断定时强制切换线程,也就是轮询或者叫Round-Robin
算法。这个简单的算法其实效果不差,因为用户级线程库线程切换开销极小,所以这样频繁的切换也可以承受,且这样还可以隐式解决IO阻塞的问题,因为当你读写IO阻塞时,定时器会强制切换到别的线程,因此无需特意针对IO编写检测代码,更加简单。
正文
首先定义TCB
线程控制块,比如:
// 线程控制块结构体
typedef struct {
jmp_buf context; // 线程执行上下文
void (*func)(void*); // 线程执行函数
void* arg; // 线程参数
ThreadState state; // 线程当前状态
char* stack; // 线程栈空间指针
int id; // 线程ID
} Thread;
不过因为类unix系统有一个专门的上下文库,也就是ucontext.h
,因此这里我就直接使用这个上下文库,主要用其中的三个函数getcontext, makecontext, swapcontext
,分别是获取上下文,建立上下文和切换上下文。所谓上下文,就是线程运行所需的寄存器状态和帧指针等信息。
typedef struct {
ucontext_t context; // 保存了线程的完整执行上下文(寄存器, 栈指针, 信号掩码等)
ThreadState state; // 当前线程的状态
char* stack; // 指向为该线程分配的栈空间的指针
} Thread;
然后定义线程状态:
// 线程状态枚举: 用于描述一个线程当前所处的生命周期阶段
typedef enum {
READY, // 就绪状态: 线程已准备好, 等待调度器分配CPU时间
RUNNING, // 运行状态: 线程当前正在CPU上执行
TERMINATED // 终止状态: 线程已经执行完毕
} ThreadState;
再创建线程函数实现之前,首先明确临界区
,因为是时间间隔短的定时器中断,在对竞态资源处理时,要注意阻塞定时器信号,防止在此时切换线程。
// 阻塞定时器信号: 用于保护临界区代码,防止在执行敏感操作时被定时器中断
static void block_timer() {
sigset_t set; // 定义一个信号集
sigemptyset(&set); // 清空信号集
sigaddset(&set, SIGALRM); // 将定时器信号(SIGALRM)加入集合
sigprocmask(SIG_BLOCK, &set, NULL); // 调用sigprocmask阻塞该信号
}
// 解除定时器信号阻塞: 在离开临界区后,恢复定时器信号
static void unblock_timer() {
sigset_t set; // 定义一个信号集
sigemptyset(&set); // 清空信号集
sigaddset(&set, SIGALRM); // 将定时器信号(SIGALRM)加入集合
sigprocmask(SIG_UNBLOCK, &set, NULL); // 调用sigprocmask解除对该信号的阻塞
}
然后定义线程创建函数,本篇博客给出的代码都会加上详细注释,因此不会过多赘述,注释非常清晰:
//线程运行的函数封装
void thread_wrapper(void (*func)(void*), void* arg) {
unblock_timer(); // 新线程开始执行,解除信号阻塞,允许被抢占
func(arg); // 执行用户传入的实际线程函数
block_timer(); // 线程函数执行完毕,进入临界区
threads[current_thread].state = TERMINATED; // 将自身状态标记为"终止"
}
// 创建一个新线程
int thread_create(void (*func)(void*), void* arg) {
block_timer(); // 进入临界区,防止在创建过程中被中断
if (thread_count >= MAX_THREADS) { // 检查是否达到线程数上限
unblock_timer(); // 解除阻塞
return -1; // 返回错误
}
int tid = thread_count++; // 分配一个新的线程ID,并增加线程总数
threads[tid].stack = malloc(STACK_SIZE); // 为新线程的栈分配内存
if (!threads[tid].stack) { // 检查内存分配是否成功
thread_count--; // 分配失败,撤销线程计数
unblock_timer(); // 解除阻塞
return -1; // 返回错误
}
// 核心步骤: 初始化新线程的上下文
getcontext(&threads[tid].context); // 1. 获取一个当前上下文作为模板
threads[tid].context.uc_stack.ss_sp = threads[tid].stack; // 2. 设置新线程的栈指针
threads[tid].context.uc_stack.ss_size = STACK_SIZE; // 3. 设置新线程的栈大小
threads[tid].context.uc_link = &main_scheduler_context; // 4. 设置后继上下文:当线程函数返回时,会自动切换到此上下文
threads[tid].state = READY; // 5. 将新线程的状态设置为"就绪"
// 核心步骤: 修改上下文,使其从thread_wrapper函数开始执行
makecontext(&threads[tid].context, (void (*)(void))thread_wrapper, 2, func, arg);
unblock_timer(); // 离开临界区
return tid; // 返回新创建的线程ID
}
接下来编写调度器:
//调度器: 决定下一个应该运行哪个线程
void schedule() {
block_timer(); // 进入临界区
int prev_thread = current_thread; // 记录即将被换出的线程
if (threads[prev_thread].state == RUNNING) { // 如果它还在运行(而不是阻塞或终止)
threads[prev_thread].state = READY; // 则将其状态变更为"就绪"
}
int next_thread = -1; // 准备寻找下一个可运行的线程
while(next_thread == -1) { // 持续寻找直到找到或确认无线程可运行
int all_done = 1; // 假设所有工作线程都已完成
for (int i = 1; i < thread_count; i++) { // 遍历所有工作线程(0号是主线程)
if (threads[i].state != TERMINATED) { // 如果发现任何一个没结束
all_done = 0; // 推翻假设
break; // 无需再检查
}
}
if (all_done) { // 如果所有工作线程真的都结束了
unblock_timer(); // 解除阻塞
// 通过 setcontext 直接跳转回 main 函数的循环, 而不是通过 swapcontext
// 因为当前线程(通常是main)还要继续执行join, 而不是切换出去
setcontext(&threads[0].context);
return;
}
// 轮询算法: 从当前线程之后开始,查找下一个处于READY状态的线程
for (int i = 1; i <= thread_count; i++) {
int tid = (current_thread + i) % thread_count;
if (threads[tid].state == READY) {
next_thread = tid; // 找到
break;
}
}
if (next_thread == -1) {
// 如果没找到可运行的线程,但又不是所有线程都结束,说明可能发生死锁或都在等待
// 在这个简单的库里,我们直接返回,等待下一次定时器中断再来调度
unblock_timer();
return;
}
}
current_thread = next_thread; // 更新当前运行线程的ID
threads[current_thread].state = RUNNING; // 更新其状态为"运行"
unblock_timer(); // 准备切换,解除阻塞
// 核心: 原子地保存当前上下文到prev_thread,并切换到next_thread的上下文
//thread_wrapper 只在 swapcontext 函数执行之后才开始运行
swapcontext(&threads[prev_thread].context, &threads[current_thread].context);
}
然后是线程阻塞函数:
// 等待一个线程结束 (阻塞)
void thread_join(int tid) {
if (tid < 0 || tid >= thread_count) return; // 无效ID则直接返回
while (threads[tid].state != TERMINATED) { // 只要目标线程还没结束
schedule(); // 就调用调度器,把CPU让给其他线程
}
block_timer(); // 目标线程已结束,进入临界区
free(threads[tid].stack); // 释放其栈空间
threads[tid].stack = NULL; // 指针置空,防止野指针
unblock_timer(); // 离开临界区
}
可能会在join函数这里有疑问,join查询到线程未终止就让出CPU,那不是要一直让出CPU吗?那我什么时候才能执行完?一定要注意join函数的执行上下文是谁,join函数是main主函数执行的,这个函数是在主函数的线程中,查询线程号为tid
的子线程运行情况,并不是查询自身线程的运行状态,是当子线程没有终止时,main主线程就让出CPU,以便这个没有终止的子线程有机会获取CPU。当然,也可能会选择其他状态为READY
的子线程,这是无关紧要的,因为定时器中断触发后,总能切换到这个子线程的,只有当这个子线程完全终止后,才会退出以这个子线程为参数的join函数。此时可能有其他的子线程早于这个子线程结束,同样没有关系,只是当下面调用到其他子线程为参数的join函数的时候,就不需要调用调度器了,直接进入临界区并且释放栈空间,清理资源。所以join函数还有一个线程执行结束后清理线程资源的功能。
具体的定时器中断处理函数和定时器的设置,每10ms触发一次定时器中断,然后强制调用调度器,切换线程:
// 定时器中断处理函数
void timer_handler(int signum) {
(void)signum; // 避免编译器关于未使用参数的警告
schedule(); // 定时器触发,强制调用调度器进行抢占
}
// 初始化线程库
void uthread_init() {
// 将当前执行流(main函数)初始化为0号线程
getcontext(&threads[0].context);
threads[0].state = RUNNING;
threads[0].stack = NULL; // 主线程使用进程默认的栈,无需单独分配
// 设置信号处理
struct sigaction sa;
memset(&sa, 0, sizeof(sa));
sa.sa_handler = timer_handler; // 指定SIGALRM信号的处理函数
sigaction(SIGALRM, &sa, NULL);
// 设置定时器,每10毫秒触发一次SIGALRM信号
struct itimerval timer;
timer.it_value.tv_sec = 0;
timer.it_value.tv_usec = 10000; // 首次触发时间
timer.it_interval.tv_sec = 0;
timer.it_interval.tv_usec = 10000; // 后续触发间隔
setitimer(ITIMER_REAL, &timer, NULL);
}
一个线程从创建到销毁的完整流程如下:
-
初始化 (
uthread_init
)- 程序启动时,main 函数首先调用 uthread_init。
- 这个函数会把 main 函数本身“包装”成0号线程,并获取其上下文,将其标记为 RUNNING。
- 同时,它设置一个定时器,每10毫秒触发一次 SIGALRM 信号。这个信号的处理函数是timer_handler,它会直接调用调度器。这就是“抢占”的来源:无论线程是否愿意,每隔10毫秒,调度器都会被强制执行,从而有机会让其他线程运行。
-
创建 (
thread_create
)- 当调用 thread_create 时,我们为新线程分配一个TCB和一块独立的栈空间。
- 通过 getcontext 获取一个上下文模板,然后修改它:将它的栈指针指向我们新分配的栈,并将它的“后继上下文”(uc_link)设置为一个公共的返回点。
- 最关键的一步是调用 makecontext,它将线程的上下文与我们的入口函数 thread_wrapper“绑定”起来。从此,当这个上下文被激活时,程序就会从 thread_wrapper 开始执行。
-
调度与执行 (
schedule
&swapcontext
)- 调度可以由两种方式触发:
- 抢占式:定时器到期,timer_handler 调用 schedule。
- 协作式:线程调用 thread_join 等函数时,主动调用 schedule 让出CPU。
- schedule 函数找到下一个 READY 的线程后,调用swapcontext,CPU的控制权就交给了新线程。新线程从 thread_wrapper开始执行,并最终进入用户提供的函数。
- 调度可以由两种方式触发:
-
终止与清理 (
thread_wrapper
&thread_join
)- 当用户函数执行完毕返回后,thread_wrapper 会将该线程的状态设置为 TERMINATED,然后调用 schedule(),永久地让出CPU。
- 与此同时,main 函数中的 thread_join 在一个循环里不断调用schedule。当它检测到目标线程的状态变为 TERMINATED 时,循环结束,它就可以安全地 free掉该线程之前分配的栈空间,完成资源的回收。
测试:
// 一个示例线程函数
void sample_thread_func(void* arg) {
int id = *(int*)arg; // 获取传入的参数
for (int i = 0; i < 5; ++i) {
printf("Thread %d says: %d\n", id, i);
// 执行一个耗时循环,以模拟真实工作,并清晰地展示抢占式调度效果
volatile unsigned long long k = 0;
for (unsigned long long j = 0; j < 100000000; ++j) { k++; }
}
printf("Thread %d finished.\n", id);
free(arg); // 释放传入的参数内存
}
// 主函数
int main() {
printf("Main: Initializing user-level thread library with ucontext.\n");
uthread_init(); // 初始化线程库
// 在此保存一个特殊的上下文,作为所有子线程结束后的“返回点”
getcontext(&main_scheduler_context);
printf("Main: Creating threads.\n");
int* tid1 = malloc(sizeof(int)); *tid1 = 1; // 为线程参数分配内存
int* tid2 = malloc(sizeof(int)); *tid2 = 2;
int t1_id = thread_create(sample_thread_func, tid1); // 创建1号线程
int t2_id = thread_create(sample_thread_func, tid2); // 创建2号线程
// 等待线程结束
thread_join(t1_id);
printf("Main: Joined thread %d.\n", t1_id);
thread_join(t2_id);
printf("Main: Joined thread %d.\n", t2_id);
printf("Main: All threads finished. Exiting.\n");
return 0;
}
运行结果如下:
Main: Initializing user-level thread library with ucontext.
Main: Creating threads.
Thread 1 says: 0
Thread 2 says: 0
Thread 1 says: 1
Thread 2 says: 1
Thread 1 says: 2
Thread 2 says: 2
Thread 1 says: 3
Thread 2 says: 3
Thread 1 says: 4
Thread 2 says: 4
Thread 1 finished.
Main: Joined thread 1.
Thread 2 finished.
Main: Joined thread 1.
Main: Joined thread 2.
Main: All threads finished. Exiting.
可以看到2个线程确实在交替运行。再次强调,整个函数运行都是串行的,没有任何的并行部分,这也是用户级线程库的特点,因为内核无法感知,没有维护这个LWP
轻量级进程的队列,因此内核无法调度多核并行。
为了让读者对上述简单多线程测试样例有一个更加清晰的视角,这里给出下面运行的详细的模拟流程:
步骤 | 执行函数/位置 | CPU | T0 状态 | T1 状态 | T2 状态 | 备注 |
---|---|---|---|---|---|---|
1 | main() |
T0 | RUNNING | - | - | 程序开始 |
2 | uthread_init() |
T0 | RUNNING | - | - | T0上下文被保存, 定时器启动 |
3 | thread_create(T1) |
T0 | RUNNING | READY | - | T1被创建, 但不运行 |
4 | thread_create(T2) |
T0 | RUNNING | READY | READY | T2被创建, 但不运行 |
5 | thread_join(T1) |
T0 | RUNNING | READY | READY | T0进入join函数, 准备让出CPU |
6 | schedule() |
T0 | RUNNING | READY | READY | T0调用调度器 |
7 | swapcontext(T0, T1) |
T1 | READY | RUNNING | READY | 第一次切换! CPU交给T1 |
8 | thread_wrapper(T1) |
T1 | READY | RUNNING | READY | T1开始运行, 打印”Thread 1 says: 0” |
9 | …T1在循环中… | T1 | READY | RUNNING | READY | T1持续运行 |
10 | [定时器中断!] | T1 | READY | RUNNING | READY | 10ms到达, 强制中断T1 |
11 | schedule() |
T1 | READY | RUNNING | READY | T1被中断, 进入调度器 |
12 | swapcontext(T1, T2) |
T2 | READY | READY | RUNNING | 第二次切换! CPU交给T2 |
13 | thread_wrapper(T2) |
T2 | READY | READY | RUNNING | T2开始运行, 打印”Thread 2 says: 0” |
14 | [定时器中断!] | T2 | READY | READY | RUNNING | 再次中断 |
15 | schedule() |
T2 | READY | READY | RUNNING | T2进入调度器 |
16 | swapcontext(T2, T0) |
T0 | RUNNING | READY | READY | 第三次切换! CPU交还给主线程 |
17 | thread_join(T1) |
T0 | RUNNING | READY | READY | T0从schedule 返回, 继续while 循环, 发现T1未结束, 再次调用schedule 让出CPU |
18 | … | … | … | … | … | 循环往复: T0, T1, T2交替运行。T0总是在join 里检查并让出CPU, T1和T2则在执行自己的任务。 |
19 | thread_wrapper(T1) |
T1 | READY | RUNNING | READY | T1任务完成, 打印”Thread 1 finished.” |
20 | thread_wrapper(T1) |
T1 | READY | TERMINATED | READY | T1将自己状态设为终止, 然后其上下文自动链接到main_scheduler_context |
21 | main() |
T0 | RUNNING | TERMINATED | READY | T0的join 函数在下一次检查时发现T1已终止, while 循环退出 |
22 | main() |
T0 | RUNNING | TERMINATED | READY | thread_join(t1_id) 返回, 打印”Main: Joined thread 1.” |
23 | thread_join(T2) |
T0 | RUNNING | TERMINATED | READY | T0开始以同样的方式等待T2 |
24 | … | … | … | … | … | 循环往复: T0和T2交替运行, 直到T2也完成 |
25 | main() |
T0 | RUNNING | TERMINATED | TERMINATED | thread_join(t2_id) 返回, 打印”Main: Joined thread 2.” |
26 | main() |
T0 | RUNNING | TERMINATED | TERMINATED | 打印”All threads finished.”, 程序结束 |
互斥锁
如果要增加互斥锁,需要给线程增加阻塞状态BLOCKED
,然后可以参考如下实现,:
// 互斥锁结构体
typedef struct {
atomic_int locked; // 原子锁标志
int blocking_threads[MAX_THREADS]; // 阻塞在该锁上的线程数组
int count; // 阻塞线程计数
} Mutex;
// 创建互斥锁
Mutex mutex_create() {
Mutex m;
atomic_init(&m.locked, 0); // 初始化为未锁定状态
m.count = 0; // 初始化阻塞线程数为0
return m;
}
// 获取互斥锁
void mutex_lock(Mutex* m) {
// 使用原子交换尝试获取锁
while (atomic_exchange(&m->locked, 1)) {
// 如果获取失败,将当前线程加入阻塞队列
m->blocking_threads[m->count++] = current_thread;
threads[current_thread].state = BLOCKED;
// 主动让出CPU
longjmp(threads[current_thread].context, 1);
}
}
// 释放互斥锁
void mutex_unlock(Mutex* m) {
atomic_store(&m->locked, 0); // 原子操作释放锁
// 唤醒所有阻塞在该锁上的线程
for (int i = 0; i < m->count; i++) {
int tid = m->blocking_threads[i];
if (threads[tid].state == BLOCKED) {
threads[tid].state = READY;
}
}
m->count = 0; // 重置阻塞线程计数
}