实现一个用户级线程库

导入

用户级线程库指的是创建的线程是内核无法感知的线程,线程的建立和切换都由线程库自己完成,类似协程的概念。

本篇博客将实现一个简单的用户级线程库uthread,一个抢占式的线程库,通过定时器中断定时强制切换线程,也就是轮询或者叫Round-Robin算法。这个简单的算法其实效果不差,因为用户级线程库线程切换开销极小,所以这样频繁的切换也可以承受,且这样还可以隐式解决IO阻塞的问题,因为当你读写IO阻塞时,定时器会强制切换到别的线程,因此无需特意针对IO编写检测代码,更加简单。

正文

首先定义TCB线程控制块,比如:

// 线程控制块结构体
typedef struct {
    jmp_buf context;    // 线程执行上下文
    void (*func)(void*); // 线程执行函数
    void* arg;          // 线程参数
    ThreadState state;  // 线程当前状态
    char* stack;        // 线程栈空间指针
    int id;             // 线程ID
} Thread;

不过因为类unix系统有一个专门的上下文库,也就是ucontext.h,因此这里我就直接使用这个上下文库,主要用其中的三个函数getcontext, makecontext, swapcontext,分别是获取上下文,建立上下文和切换上下文。所谓上下文,就是线程运行所需的寄存器状态和帧指针等信息。

typedef struct {
    ucontext_t context;    // 保存了线程的完整执行上下文(寄存器, 栈指针, 信号掩码等)
    ThreadState state;    // 当前线程的状态
    char* stack;        // 指向为该线程分配的栈空间的指针
} Thread;

然后定义线程状态:

// 线程状态枚举: 用于描述一个线程当前所处的生命周期阶段
typedef enum {
    READY,            // 就绪状态: 线程已准备好, 等待调度器分配CPU时间
    RUNNING,        // 运行状态: 线程当前正在CPU上执行
    TERMINATED        // 终止状态: 线程已经执行完毕
} ThreadState;

再创建线程函数实现之前,首先明确临界区,因为是时间间隔短的定时器中断,在对竞态资源处理时,要注意阻塞定时器信号,防止在此时切换线程。

// 阻塞定时器信号: 用于保护临界区代码,防止在执行敏感操作时被定时器中断
static void block_timer() {
    sigset_t set;                    // 定义一个信号集
    sigemptyset(&set);                    // 清空信号集
    sigaddset(&set, SIGALRM);                // 将定时器信号(SIGALRM)加入集合
    sigprocmask(SIG_BLOCK, &set, NULL);        // 调用sigprocmask阻塞该信号
}

// 解除定时器信号阻塞: 在离开临界区后,恢复定时器信号
static void unblock_timer() {
    sigset_t set;                        // 定义一个信号集
    sigemptyset(&set);                    // 清空信号集
    sigaddset(&set, SIGALRM);                // 将定时器信号(SIGALRM)加入集合
    sigprocmask(SIG_UNBLOCK, &set, NULL);        // 调用sigprocmask解除对该信号的阻塞
}

然后定义线程创建函数,本篇博客给出的代码都会加上详细注释,因此不会过多赘述,注释非常清晰:

//线程运行的函数封装
void thread_wrapper(void (*func)(void*), void* arg) {
    unblock_timer(); // 新线程开始执行,解除信号阻塞,允许被抢占
    func(arg);       // 执行用户传入的实际线程函数
    block_timer();   // 线程函数执行完毕,进入临界区
    threads[current_thread].state = TERMINATED; // 将自身状态标记为"终止"

} 

// 创建一个新线程
int thread_create(void (*func)(void*), void* arg) {
    block_timer(); // 进入临界区,防止在创建过程中被中断
    if (thread_count >= MAX_THREADS) { // 检查是否达到线程数上限
        unblock_timer(); // 解除阻塞
        return -1;                // 返回错误
    }

    int tid = thread_count++; // 分配一个新的线程ID,并增加线程总数
    threads[tid].stack = malloc(STACK_SIZE); // 为新线程的栈分配内存
    if (!threads[tid].stack) { // 检查内存分配是否成功
        thread_count--;    // 分配失败,撤销线程计数
        unblock_timer();   // 解除阻塞
        return -1;            // 返回错误
    }

    // 核心步骤: 初始化新线程的上下文
    getcontext(&threads[tid].context); // 1. 获取一个当前上下文作为模板
    threads[tid].context.uc_stack.ss_sp = threads[tid].stack;        // 2. 设置新线程的栈指针
    threads[tid].context.uc_stack.ss_size = STACK_SIZE;        // 3. 设置新线程的栈大小
    threads[tid].context.uc_link = &main_scheduler_context;        // 4. 设置后继上下文:当线程函数返回时,会自动切换到此上下文
    threads[tid].state = READY;    // 5. 将新线程的状态设置为"就绪"

    // 核心步骤: 修改上下文,使其从thread_wrapper函数开始执行
    makecontext(&threads[tid].context, (void (*)(void))thread_wrapper, 2, func, arg);

    unblock_timer(); // 离开临界区
    return tid;            // 返回新创建的线程ID
}

接下来编写调度器:

//调度器: 决定下一个应该运行哪个线程
void schedule() {
    block_timer(); // 进入临界区

    int prev_thread = current_thread;    // 记录即将被换出的线程
    if (threads[prev_thread].state == RUNNING) {    // 如果它还在运行(而不是阻塞或终止)
        threads[prev_thread].state = READY;        // 则将其状态变更为"就绪"
    }

    int next_thread = -1;    // 准备寻找下一个可运行的线程
    while(next_thread == -1) {    // 持续寻找直到找到或确认无线程可运行
        int all_done = 1;    // 假设所有工作线程都已完成
        for (int i = 1; i < thread_count; i++) {    // 遍历所有工作线程(0号是主线程)
            if (threads[i].state != TERMINATED) {    // 如果发现任何一个没结束
                all_done = 0;            // 推翻假设
                break;                // 无需再检查
            }
        }
        if (all_done) {    // 如果所有工作线程真的都结束了
             unblock_timer(); // 解除阻塞
             // 通过 setcontext 直接跳转回 main 函数的循环, 而不是通过 swapcontext
             // 因为当前线程(通常是main)还要继续执行join, 而不是切换出去
             setcontext(&threads[0].context);
             return;
        }

        // 轮询算法: 从当前线程之后开始,查找下一个处于READY状态的线程
        for (int i = 1; i <= thread_count; i++) {
            int tid = (current_thread + i) % thread_count;
            if (threads[tid].state == READY) {
                next_thread = tid;    // 找到
                break;
            }
        }

        if (next_thread == -1) {
            // 如果没找到可运行的线程,但又不是所有线程都结束,说明可能发生死锁或都在等待
            // 在这个简单的库里,我们直接返回,等待下一次定时器中断再来调度
            unblock_timer();
            return;    
        }
    }

    current_thread = next_thread;    // 更新当前运行线程的ID
    threads[current_thread].state = RUNNING;    // 更新其状态为"运行"

    unblock_timer(); // 准备切换,解除阻塞
    // 核心: 原子地保存当前上下文到prev_thread,并切换到next_thread的上下文
    //thread_wrapper 只在 swapcontext 函数执行之后才开始运行
    swapcontext(&threads[prev_thread].context, &threads[current_thread].context);
}

然后是线程阻塞函数:

// 等待一个线程结束 (阻塞)
void thread_join(int tid) {
    if (tid < 0 || tid >= thread_count) return;    // 无效ID则直接返回
    while (threads[tid].state != TERMINATED) {    // 只要目标线程还没结束
        schedule();                    // 就调用调度器,把CPU让给其他线程
    }
    block_timer(); // 目标线程已结束,进入临界区
    free(threads[tid].stack);            // 释放其栈空间
    threads[tid].stack = NULL;            // 指针置空,防止野指针
    unblock_timer(); // 离开临界区
}

可能会在join函数这里有疑问,join查询到线程未终止就让出CPU,那不是要一直让出CPU吗?那我什么时候才能执行完?一定要注意join函数的执行上下文是谁,join函数是main主函数执行的,这个函数是在主函数的线程中,查询线程号为tid的子线程运行情况,并不是查询自身线程的运行状态,是当子线程没有终止时,main主线程就让出CPU,以便这个没有终止的子线程有机会获取CPU。当然,也可能会选择其他状态为READY的子线程,这是无关紧要的,因为定时器中断触发后,总能切换到这个子线程的,只有当这个子线程完全终止后,才会退出以这个子线程为参数的join函数。此时可能有其他的子线程早于这个子线程结束,同样没有关系,只是当下面调用到其他子线程为参数的join函数的时候,就不需要调用调度器了,直接进入临界区并且释放栈空间,清理资源。所以join函数还有一个线程执行结束后清理线程资源的功能。

具体的定时器中断处理函数和定时器的设置,每10ms触发一次定时器中断,然后强制调用调度器,切换线程:

// 定时器中断处理函数
void timer_handler(int signum) {
    (void)signum;    // 避免编译器关于未使用参数的警告
    schedule();        // 定时器触发,强制调用调度器进行抢占
}

// 初始化线程库
void uthread_init() {
    // 将当前执行流(main函数)初始化为0号线程
    getcontext(&threads[0].context);
    threads[0].state = RUNNING;
    threads[0].stack = NULL;    // 主线程使用进程默认的栈,无需单独分配

    // 设置信号处理
    struct sigaction sa;
    memset(&sa, 0, sizeof(sa));
    sa.sa_handler = timer_handler;    // 指定SIGALRM信号的处理函数
    sigaction(SIGALRM, &sa, NULL);

    // 设置定时器,每10毫秒触发一次SIGALRM信号
    struct itimerval timer;
    timer.it_value.tv_sec = 0;
    timer.it_value.tv_usec = 10000;    // 首次触发时间
    timer.it_interval.tv_sec = 0;
    timer.it_interval.tv_usec = 10000;    // 后续触发间隔
    setitimer(ITIMER_REAL, &timer, NULL);
}

一个线程从创建到销毁的完整流程如下:

  1. 初始化 (uthread_init)

    • 程序启动时,main 函数首先调用 uthread_init。
    • 这个函数会把 main 函数本身“包装”成0号线程,并获取其上下文,将其标记为 RUNNING。
    • 同时,它设置一个定时器,每10毫秒触发一次 SIGALRM 信号。这个信号的处理函数是timer_handler,它会直接调用调度器。这就是“抢占”的来源:无论线程是否愿意,每隔10毫秒,调度器都会被强制执行,从而有机会让其他线程运行。
  2. 创建 (thread_create)

    • 当调用 thread_create 时,我们为新线程分配一个TCB和一块独立的栈空间。
    • 通过 getcontext 获取一个上下文模板,然后修改它:将它的栈指针指向我们新分配的栈,并将它的“后继上下文”(uc_link)设置为一个公共的返回点。
    • 最关键的一步是调用 makecontext,它将线程的上下文与我们的入口函数 thread_wrapper“绑定”起来。从此,当这个上下文被激活时,程序就会从 thread_wrapper 开始执行。
  3. 调度与执行 (schedule & swapcontext)

    • 调度可以由两种方式触发:
      • 抢占式:定时器到期,timer_handler 调用 schedule。
      • 协作式:线程调用 thread_join 等函数时,主动调用 schedule 让出CPU。
    • schedule 函数找到下一个 READY 的线程后,调用swapcontext,CPU的控制权就交给了新线程。新线程从 thread_wrapper开始执行,并最终进入用户提供的函数。
  4. 终止与清理 (thread_wrapper & thread_join)

    • 当用户函数执行完毕返回后,thread_wrapper 会将该线程的状态设置为 TERMINATED,然后调用 schedule(),永久地让出CPU。
    • 与此同时,main 函数中的 thread_join 在一个循环里不断调用schedule。当它检测到目标线程的状态变为 TERMINATED 时,循环结束,它就可以安全地 free掉该线程之前分配的栈空间,完成资源的回收。

测试:

// 一个示例线程函数
void sample_thread_func(void* arg) {
    int id = *(int*)arg;    // 获取传入的参数
    for (int i = 0; i < 5; ++i) {
        printf("Thread %d says: %d\n", id, i);
        // 执行一个耗时循环,以模拟真实工作,并清晰地展示抢占式调度效果
        volatile unsigned long long k = 0;
        for (unsigned long long j = 0; j < 100000000; ++j) { k++; }
    }
    printf("Thread %d finished.\n", id);
    free(arg);        // 释放传入的参数内存
}

// 主函数
int main() {
    printf("Main: Initializing user-level thread library with ucontext.\n");
    uthread_init();    // 初始化线程库

    // 在此保存一个特殊的上下文,作为所有子线程结束后的“返回点”
    getcontext(&main_scheduler_context);

    printf("Main: Creating threads.\n");
    int* tid1 = malloc(sizeof(int)); *tid1 = 1;    // 为线程参数分配内存
    int* tid2 = malloc(sizeof(int)); *tid2 = 2;
    int t1_id = thread_create(sample_thread_func, tid1);    // 创建1号线程
    int t2_id = thread_create(sample_thread_func, tid2);    // 创建2号线程

    // 等待线程结束
    thread_join(t1_id);
    printf("Main: Joined thread %d.\n", t1_id);
    thread_join(t2_id);
    printf("Main: Joined thread %d.\n", t2_id);

    printf("Main: All threads finished. Exiting.\n");

    return 0;
}

运行结果如下:

Main: Initializing user-level thread library with ucontext.
Main: Creating threads.
Thread 1 says: 0
Thread 2 says: 0
Thread 1 says: 1
Thread 2 says: 1
Thread 1 says: 2
Thread 2 says: 2
Thread 1 says: 3
Thread 2 says: 3
Thread 1 says: 4
Thread 2 says: 4
Thread 1 finished.
Main: Joined thread 1.
Thread 2 finished.
Main: Joined thread 1.
Main: Joined thread 2.
Main: All threads finished. Exiting.

可以看到2个线程确实在交替运行。再次强调,整个函数运行都是串行的,没有任何的并行部分,这也是用户级线程库的特点,因为内核无法感知,没有维护这个LWP轻量级进程的队列,因此内核无法调度多核并行。

为了让读者对上述简单多线程测试样例有一个更加清晰的视角,这里给出下面运行的详细的模拟流程:

步骤 执行函数/位置 CPU T0 状态 T1 状态 T2 状态 备注
1 main() T0 RUNNING - - 程序开始
2 uthread_init() T0 RUNNING - - T0上下文被保存, 定时器启动
3 thread_create(T1) T0 RUNNING READY - T1被创建, 但不运行
4 thread_create(T2) T0 RUNNING READY READY T2被创建, 但不运行
5 thread_join(T1) T0 RUNNING READY READY T0进入join函数, 准备让出CPU
6 schedule() T0 RUNNING READY READY T0调用调度器
7 swapcontext(T0, T1) T1 READY RUNNING READY 第一次切换! CPU交给T1
8 thread_wrapper(T1) T1 READY RUNNING READY T1开始运行, 打印”Thread 1 says: 0”
9 …T1在循环中… T1 READY RUNNING READY T1持续运行
10 [定时器中断!] T1 READY RUNNING READY 10ms到达, 强制中断T1
11 schedule() T1 READY RUNNING READY T1被中断, 进入调度器
12 swapcontext(T1, T2) T2 READY READY RUNNING 第二次切换! CPU交给T2
13 thread_wrapper(T2) T2 READY READY RUNNING T2开始运行, 打印”Thread 2 says: 0”
14 [定时器中断!] T2 READY READY RUNNING 再次中断
15 schedule() T2 READY READY RUNNING T2进入调度器
16 swapcontext(T2, T0) T0 RUNNING READY READY 第三次切换! CPU交还给主线程
17 thread_join(T1) T0 RUNNING READY READY T0从schedule返回, 继续while循环, 发现T1未结束, 再次调用schedule让出CPU
18 循环往复: T0, T1, T2交替运行。T0总是在join里检查并让出CPU, T1和T2则在执行自己的任务。
19 thread_wrapper(T1) T1 READY RUNNING READY T1任务完成, 打印”Thread 1 finished.”
20 thread_wrapper(T1) T1 READY TERMINATED READY T1将自己状态设为终止, 然后其上下文自动链接到main_scheduler_context
21 main() T0 RUNNING TERMINATED READY T0的join函数在下一次检查时发现T1已终止, while循环退出
22 main() T0 RUNNING TERMINATED READY thread_join(t1_id)返回, 打印”Main: Joined thread 1.”
23 thread_join(T2) T0 RUNNING TERMINATED READY T0开始以同样的方式等待T2
24 循环往复: T0和T2交替运行, 直到T2也完成
25 main() T0 RUNNING TERMINATED TERMINATED thread_join(t2_id)返回, 打印”Main: Joined thread 2.”
26 main() T0 RUNNING TERMINATED TERMINATED 打印”All threads finished.”, 程序结束

互斥锁

如果要增加互斥锁,需要给线程增加阻塞状态BLOCKED,然后可以参考如下实现,:

// 互斥锁结构体
typedef struct {
    atomic_int locked;          // 原子锁标志
    int blocking_threads[MAX_THREADS]; // 阻塞在该锁上的线程数组
    int count;                  // 阻塞线程计数
} Mutex;

// 创建互斥锁
Mutex mutex_create() {
    Mutex m;
    atomic_init(&m.locked, 0);  // 初始化为未锁定状态
    m.count = 0;                // 初始化阻塞线程数为0
    return m;
}

// 获取互斥锁
void mutex_lock(Mutex* m) {
    // 使用原子交换尝试获取锁
    while (atomic_exchange(&m->locked, 1)) {
        // 如果获取失败,将当前线程加入阻塞队列
        m->blocking_threads[m->count++] = current_thread;
        threads[current_thread].state = BLOCKED;
        // 主动让出CPU
        longjmp(threads[current_thread].context, 1);
    }
}

// 释放互斥锁
void mutex_unlock(Mutex* m) {
    atomic_store(&m->locked, 0);  // 原子操作释放锁
    // 唤醒所有阻塞在该锁上的线程
    for (int i = 0; i < m->count; i++) {
        int tid = m->blocking_threads[i];
        if (threads[tid].state == BLOCKED) {
            threads[tid].state = READY;
        }
    }
    m->count = 0;  // 重置阻塞线程计数
}