操作系统第二次作业
1. 实验内容
本次实验的主要内容是在 Linux 内核中设计并实现一种事件同步屏障原语,允许多个进程在某个事件发生前阻塞,直到其他进程触发该事件。当某个进程触发事件时,所有因该事件阻塞的进程将解除阻塞,如果没有进程被阻塞则不产生任何影响。
具体要求是实现四个新的系统调用:
sys_eventopen():用于创建新事件并返回一个全局唯一的事件标识符 (event ID)。sys_eventwait(int event_id):用于使当前进程在指定event_id上阻塞(睡眠)。sys_eventsig(int event_id):用于唤醒所有在指定event_id上阻塞的进程。sys_eventclose(int event_id):用于销毁事件,并唤醒所有可能仍在该事件上阻塞的进程。
最后,需要编写用户态测试程序,至少覆盖“多进程等待同一事件”和“多进程多事件并发”两种场景。
2. 实验思路
本实验采取分阶段推进的策略,首先搭建和验证环境,然后设计核心数据结构,接着实现系统调用功能,最后编写用户态程序进行测试。
阶段一 环境配置与内核裁剪
- 环境优化: 为缩短内核编译的迭代时间,首先对编译环境进行优化。
- 使用
make menuconfig对内核配置进行裁剪,禁用所有与实验无关的模块(如无线、声卡、蓝牙等非必要驱动)。 - 安装
ccache并配置make CC="ccache gcc",利用编译缓存实现后续的快速迭代。 - 搭建调用框架: 在
syscall_64.tbl、syscalls.h和kernel/sys.c中注册eventopen、eventclose、eventwait、eventsig四个空的系统调用。 - 初步验证: 编写一个简单的用户态程序
test_events_skeleton.c逐一调用这四个“空壳”函数,并通过dmesg确认内核打印了对应信息,确保系统调用的“通路”已全部打通。
阶段二:核心数据结构设计
-
全局管理: 在
kernel/sys.c中定义一个全局数组static struct os_event event_table[MAX_EVENTS]来管理所有事件。event_id即为该数组的索引。 -
管程锁(粗粒度): 定义一个全局互斥锁
static DEFINE_MUTEX(event_table_lock)。这把锁只用于保护event_table数组本身,即in_use标志的分配与释放,确保槽位操作的原子性。 -
事件结构体(细粒度): 为了解决复杂的“释放后重用”(Use-After-Free)竞态条件,设计了包含引用计数和销毁队列的事件结构:
/* 事件结构体 */
struct os_event {
bool in_use; // 是否正在使用
bool initialized; // 是否已初始化
wait_queue_head_t queue; // 等待队列
spinlock_t lock; // 保护 gen/closing 字段
unsigned long gen; // 每次当 eventsig 时自增,表示新一轮信号
bool closing; // 表示正在关闭
atomic_t waiters; // 当前处于等待队列中的线程数
wait_queue_head_t destroy_wq; // 用来等待 waiters==0 的队列
};
阶段三:核心逻辑实现
eventopen:获取全局锁event_table_lock,遍历查找in_use == false的槽位。如果槽位是首次使用 (initialized == false),则初始化其queue,lock,destroy_wq和waiters计数器。无论如何,都会重置gen = 0和closing = false,然后释放全局锁。eventwait:- 获取全局锁
event_table_lock,检查in_use标志。 - 在释放全局锁之前,原子地增加引用计数
atomic_inc(&ev->waiters)。这确保eventclose在检查waiters时,不会漏掉正在进入等待的本进程。 - 释放全局锁。
- 获取内部
spinlock,读取当前的gen作为快照mygen。 - 调用
wait_event_interruptible,等待ev->gen != mygen(被eventsig改变) 或ev->closing == true(被eventclose改变)。 - 唤醒后,调用
atomic_dec_and_test(&ev->waiters)。如果是最后一个退出的等待者 (waiters == 0) 且eventclose正在进行 (closing == true),则负责唤醒在destroy_wq上睡眠的eventclose进程。 eventsig:获取内部spinlock,将gen计数器加一,然后释放锁。最后调用wake_up_all(&ev->queue)唤醒所有等待者。eventclose:- 获取内部
spinlock,设置closing = true。 - 调用
wake_up_all(&ev->queue)唤醒所有等待者(它们会检查到closing == true而退出)。 - 调用
wait_event(ev->destroy_wq, ...)在销毁队列上睡眠,直到waiters计数器归零。 - 被唤醒后(由最后一个
eventwait进程唤醒),获取全局锁event_table_lock,安全地将in_use = false,释放该槽位以供eventopen重用。
阶段四:编写用户态测试程序
- 设计三个测试用例来验证功能的完备性和并发正确性:
sync_test1.c:多进程等待单事件。用于验证基本的“等待-唤醒”功能。sync_test2.c:多进程等待多事件。用于验证不同事件之间的隔离性。sync_test3.c:验证eventclose也能唤醒正在等待的进程。sync_test4.c:验证对同一事件的多次重复等待和唤醒。sync_test5.c:高并发压力测试,验证eventclose的引用计数器能正确处理“释放-重用”竞态条件。
3. 实验过程
3.1 内核裁剪与环境配置
为加速内核编译,缩短开发-测试的迭代周期,本次实验对编译环境进行了两项关键优化:内核裁剪与编译缓存。
-
使用
make menuconfig裁剪内核 -
在内核源码根目录执行
cp /boot/config-$(uname -r) .config,以一个已知的、可工作的配置为基础。 - 运行
make menuconfig进入图形化配置界面。 - 裁剪原则: 实验环境为 VMware 虚拟机,且仅通过 SSH 访问,因此所有物理硬件、非必要的驱动和功能均可禁用。
- 主要裁剪项:
Networking support --->- 禁用
Wireless LAN、Bluetooth subsystem support、NFC subsystem support等所有无线和非 IP 协议。 - (保留
TCP/IP networking和Ethernet driver support中的VMware VMXNET3驱动以保证 SSH 连通性)。 Device Drivers --->- 禁用
Sound card support --->。 - 禁用
Multimedia support --->(摄像头、电视卡等)。 - 禁用
Graphics support --->子菜单中除VMware VMSVGA和VGA text console之外的所有物理显卡驱动 (NVIDIA, AMD, Intel)。 - 禁用
USB support --->(VMware 虚拟机通过 SSH 访问,无需 USB)。 - 禁用
File systems --->子菜单中除Ext4(或当前根文件系统类型)之外的所有其他文件系统(如XFS,Btrfs,NTFS)。
-
裁剪后保存配置,大幅减少了需要编译的模块数量。
-
配置
ccache编译缓存 -
ccache(Compiler Cache) 是一个编译加速工具,它能缓存.c文件上一次的编译结果(.o文件)。当下次编译时,若源文件未更改,ccache会直接返回缓存的结果,跳过实际的gcc编译过程。 -
安装:
sudo apt install ccache -
扩大缓存空间 : 内核编译会产生大量缓存,默认空间可能不足。
ccache -M 50G # 将缓存上限设为 50GB -
使用: 在调用
make时,通过CC变量指定ccache作为gcc的“包装器”。make CC="ccache gcc" -j$(nproc) -
效果: 首次编译(填充缓存)依然耗时,但在此后修改代码(如
kernel/sys.c)后再次编译,编译链接时间从2小时缩短到2分钟,极大地提高了调试效率。
3.2 搭建系统调用框架
- 修改
arch/x86/entry/syscalls/syscall_64.tbl:
分配系统调用号。
550 64 eventopen sys_eventopen
551 64 eventclose sys_eventclose
552 64 eventwait sys_eventwait
553 64 eventsig sys_eventsig
- 修改
include/linux/syscalls.h:
声明系统调用。
asmlinkage long sys_eventopen(void);
asmlinkage long sys_eventclose(int event_id);
asmlinkage long sys_eventwait(int event_id);
asmlinkage long sys_eventsig(int event_id);
- 修改
kernel/sys.c:
为系统调用添加空壳实现。
SYSCALL_DEFINE0(eventopen) { printk(KERN_INFO "eventopen call\n"); return 0; }
SYSCALL_DEFINE1(eventclose, int, event_id) { printk(KERN_INFO "eventclose call\n"); return 0; }
SYSCALL_DEFINE1(eventwait, int, event_id) { printk(KERN_INFO "eventwait call\n"); return 0; }
SYSCALL_DEFINE1(eventsig, int, event_id) { printk(KERN_INFO "eventsig call\n"); return 0; }
3.3 核心代码实现
在 kernel/sys.c 顶部添加头文件和全局定义:
/* 需要包含的头文件 */
#include <linux/wait.h>
#include <linux/mutex.h>
#include <linux/spinlock.h>
#include <linux/errno.h>
#include <linux/atomic.h>
#include <linux/sched.h>
#include <linux/uaccess.h> // (如果需要与用户空间交互)
/* 常量 */
#define MAX_EVENTS 64
/* 事件结构体 */
struct os_event {
bool in_use; // 是否正在使用
bool initialized; // 是否已初始化
wait_queue_head_t queue; // 等待队列
spinlock_t lock; // 保护 gen/closing 字段
unsigned long gen; // 每次当 eventsig 时自增,表示新一轮信号
bool closing; // 表示正在关闭
atomic_t waiters; // 当前处于等待队列中的线程数
wait_queue_head_t destroy_wq; // 用来等待 waiters==0 的队列
};
/* 全局事件表与保护锁 */
static struct os_event event_table[MAX_EVENTS];
static DEFINE_MUTEX(event_table_lock);
3.3.1 sys_eventopen 的实现
eventopen 负责分配一个事件槽位。它使用 event_table_lock 这把粗粒度锁来保证槽位分配的原子性。如果一个槽位是首次被使用 (initialized == false),它会负责初始化所有底层的同步原语(等待队列、自旋锁、原子计数器)。无论如何,它都会重置事件的逻辑状态(gen 和 closing),为新的使用者做好准备。
SYSCALL_DEFINE0(eventopen) {
int i, id = -1;
mutex_lock(&event_table_lock);
for (i = 0; i < MAX_EVENTS; ++i) {
if (!event_table[i].in_use) {
event_table[i].in_use = true;
id = i;
break;
}
}
if (id != -1) {
if (!event_table[id].initialized) {
init_waitqueue_head(&event_table[id].queue);
spin_lock_init(&event_table[id].lock);
init_waitqueue_head(&event_table[id].destroy_wq);
atomic_set(&event_table[id].waiters, 0);
event_table[id].initialized = true;
}
/* 重置逻辑状态为初始 generation(安全:eventclose 已等 waiters==0 再 set in_use=false) */
spin_lock(&event_table[id].lock);
event_table[id].gen = 0;
event_table[id].closing = false;
spin_unlock(&event_table[id].lock);
}
mutex_unlock(&event_table_lock);
return id != -1 ? id : -ENOSPC;
}
3.3.2 sys_eventwait 的实现
eventwait 是最复杂的设计。
- 它首先在全局锁
event_table_lock的保护下检查事件是否存在,并原子地递增waiters引用计数。 - 递增计数后,它立即释放全局锁,以允许其他进程(如
eventsig)并发执行。 - 它使用内部自旋锁
ev->lock来安全地读取当前gen的快照mygen。 - 它调用
wait_event_interruptible进入睡眠,等待gen发生变化(被eventsig触发)或closing变为true(被eventclose触发)。 - 唤醒后,它原子地递减
waiters计数。如果是最后一个退出者且closing为true,它负责唤醒正在destroy_wq上等待的eventclose进程。
SYSCALL_DEFINE1(eventwait, int, event_id) {
struct os_event *ev;
unsigned long flags;
unsigned long mygen;
int ret;
if (event_id < 0 || event_id >= MAX_EVENTS)
return -EINVAL;
ev = &event_table[event_id];
/* 在全局锁下检查是否存在 */
mutex_lock(&event_table_lock);
if (!ev->in_use) {
mutex_unlock(&event_table_lock);
return -EINVAL;
}
/* 进入等待前计数 */
atomic_inc(&ev->waiters);
mutex_unlock(&event_table_lock);
/* 读取 snapshot generation(受 spinlock 保护,避免同时修改) */
spin_lock_irqsave(&ev->lock, flags);
mygen = ev->gen;
spin_unlock_irqrestore(&ev->lock, flags);
/* 等待直到 generation 发生变化(eventsig)或 closing 被设置 */
ret = wait_event_interruptible(ev->queue, ({
bool cond;
spin_lock_irqsave(&ev->lock, flags);
cond = (ev->gen != mygen) || ev->closing;
spin_unlock_irqrestore(&ev->lock, flags);
cond;
}));
/* 退出时递减 waiters;如果这是最后一个且正在 closing,则通知 destroy_wq */
if (atomic_dec_and_test(&ev->waiters)) {
spin_lock_irqsave(&ev->lock, flags);
if (ev->closing)
wake_up(&ev->destroy_wq);
spin_unlock_irqrestore(&ev->lock, flags);
}
return ret;
}
3.3.3 sys_eventsig 的实现
eventsig 的实现轻量且高效。它不需要获取全局锁,只在内部 spinlock 的保护下将 gen 计数器加一,然后调用 wake_up_all 唤醒所有在 queue 上等待的进程。
SYSCALL_DEFINE1(eventsig, int, event_id) {
struct os_event *ev;
unsigned long flags;
if (event_id < 0 || event_id >= MAX_EVENTS)
return -EINVAL;
ev = &event_table[event_id];
mutex_lock(&event_table_lock);
if (!ev->in_use) {
mutex_unlock(&event_table_lock);
return -EINVAL;
}
mutex_unlock(&event_table_lock);
/* 增加 generation(受锁保护),然后 wake_up_all */
spin_lock_irqsave(&ev->lock, flags);
ev->gen++; /* 每次 signal 都自增 generation */
spin_unlock_irqrestore(&ev->lock, flags);
wake_up_all(&ev->queue);
return 0;
}
3.3.4 sys_eventclose 的实现
eventclose 是确保“释放后重用”安全的关键。
- 它首先设置
closing = true并唤醒所有等待者。 - 然后,它不会立即释放槽位,而是调用
wait_event在destroy_wq上睡眠。 - 它会一直在此睡眠,直到最后一个
eventwait进程退出并唤醒它。 - 被唤醒后,
eventclose才获取全局锁event_table_lock,并安全地将in_use = false,此时槽位才被真正释放。
SYSCALL_DEFINE1(eventclose, int, event_id) {
struct os_event *ev;
unsigned long flags;
if (event_id < 0 || event_id >= MAX_EVENTS)
return -EINVAL;
ev = &event_table[event_id];
/* 检查并标记为正在关闭(不立即清 in_use) */
mutex_lock(&event_table_lock);
if (!ev->in_use) {
mutex_unlock(&event_table_lock);
return -EINVAL;
}
mutex_unlock(&event_table_lock);
/* 标记 closing 并唤醒所有等待者 */
spin_lock_irqsave(&ev->lock, flags);
ev->closing = true;
spin_unlock_irqrestore(&ev->lock, flags);
wake_up_all(&ev->queue);
/* 等待所有等待者离开(waiters==0),eventwait 在最后会 wake_up destroy_wq */
wait_event(ev->destroy_wq, atomic_read(&ev->waiters) == 0);
/* 现在没有任何线程依赖此槽位,可以安全释放 in_use */
mutex_lock(&event_table_lock);
ev->in_use = false;
ev->closing = false; /* 复位 closing(optional: 保留 initialized) */
mutex_unlock(&event_table_lock);
return 0;
}
3.4 用户态测试程序
为了验证上述内核实现的正确性和健壮性,设计了以下五个测试程序。
3.4.1 测试一:多进程等待单事件 (sync_test1.c)
此测试用于验证基本功能:多个子进程(5个)同时等待同一个事件,父进程在 sleep(3) 后调用 eventsig,验证所有子进程是否能被同时唤醒。
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <sys/wait.h>
#include <errno.h>
#define SYS_eventopen 550
#define sys_eventclose 551
#define sys_eventwait 552
#define sys_eventsig 553
#define NUM_CHILDREN 5
int main() {
long event_id;
int i;
pid_t pid;
printf("(2723)[Parent] 正在创建新事件...\n");
event_id = syscall(SYS_eventopen);
if (event_id < 0) {
perror("syscall(eventopen) 失败");
return 1;
}
printf("(2723)[Parent] 事件创建成功, ID: %ld\n", event_id);
for (i = 0; i < NUM_CHILDREN; i++) {
pid = fork();
if (pid < 0) {
perror("fork 失败");
return 1;
}
if (pid == 0) {
printf("(2723)[Child %d] 正在等待事件 %ld...\n", getpid(), event_id);
long ret = syscall(sys_eventwait, event_id);
if (ret == 0) {
printf("(2723)[Child %d] 成功从事件 %ld 唤醒!\n", getpid(), event_id);
} else {
perror("(2723)[Child] eventwait 失败\n");
}
exit(0);
}
}
printf("(2723)[parent] 等待3秒,确保所有子进程都已进入睡眠...\n");
sleep(3);
printf("(2723)[Parent] 正在发送信号(eventsig)到事件 %ld...\n", event_id);
syscall(sys_eventsig, event_id);
for (i = 0; i < NUM_CHILDREN; i++) {
wait(NULL);
}
printf("(2723)[Parent] 所有子进程已退出。\n");
syscall(sys_eventclose, event_id);
printf("(2723)[Parent] 事件 %ld 已关闭。\n", event_id);
return 0;
}
3.4.2 测试二:多进程等待多事件 (sync_test2.c)
此测试用于验证事件的隔离性。创建三个子进程,分别等待三个不同的事件 (A, B, C)。父进程依次唤醒 B、C、A,验证每次 eventsig 只唤醒了对应事件上的进程。
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <sys/wait.h>
#include <errno.h>
#define SYS_eventopen 550
#define SYS_eventclose 551
#define SYS_eventwait 552
#define SYS_eventsig 553
void wait_for_event(int event_id, const char* event_name) {
printf("(2723)[Child %d] 正在等待事件 %s (ID: %d)...\n", getpid(), event_name, event_id);
long ret = syscall(SYS_eventwait, event_id);
if (ret == 0) {
printf("(2723)[Child %d] 成功从事件 %s (ID: %d) 唤醒!\n", getpid(), event_name, event_id);
} else {
perror("(2723)[Child] eventwait 失败");
}
exit(0);
}
int main() {
long event_A, event_B, event_C;
event_A = syscall(SYS_eventopen);
event_B = syscall(SYS_eventopen);
event_C = syscall(SYS_eventopen);
printf("(2723)[Parent] 创建了三个事件: A=%ld, B=%ld, C=%ld", event_A, event_B, event_C);
if (fork() == 0) {
wait_for_event(event_A, "A");
}
if (fork() == 0) {
wait_for_event(event_B, "B");
}
if (fork() == 0) {
wait_for_event(event_C, "C");
}
printf("(2723)[Parent] 等待3秒,确保所有子进程进入睡眠...\n");
sleep(3);
printf("(2723)[Parent] 正在发送信号(eventsig)到事件B(ID: %ld)...\n", event_B);
syscall(SYS_eventsig, event_B);
sleep(1);
printf("(2723)[Parent] ---------------------------------------\n");
printf("(2723)[Parent] 正在发送信号(eventsig)到事件C(ID: %ld)...\n", event_C);
syscall(SYS_eventsig, event_C);
sleep(1);
printf("(2723)[Parent] ---------------------------------------\n");
printf("(2723)[Parent] 正在发送信号(eventsig)到事件A(ID: %ld)...\n", event_A);
syscall(SYS_eventsig, event_A);
wait(NULL);
wait(NULL);
wait(NULL);
printf("(2723)[Parent] 所有子进程已退出。\n");
syscall(SYS_eventclose, event_A);
syscall(SYS_eventclose, event_B);
syscall(SYS_eventclose, event_C);
printf("(2723)[Parent] 所有事件已关闭。\n");
return 0;
}
3.4.3 测试三:eventclose 唤醒测试 (sync_test3.c)
此测试用于验证 eventclose 的唤醒功能。父进程不调用 eventsig,而是直接调用 eventclose,验证所有正在等待的子进程是否被正确唤醒(它们会检查到 closing == true)。
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <sys/wait.h>
#include <errno.h>
#define SYS_eventopen 550
#define SYS_eventclose 551
#define SYS_eventwait 552
#define SYS_eventsig 553
#define NUM_CHILDREN 3
int main() {
long event_id;
int i;
pid_t pid;
printf("(2723)[Parent] 正在创建新事件...\n");
event_id = syscall(SYS_eventopen);
if (event_id < 0) {
perror("syscall(eventopen) failed.\n");
return 1;
}
printf("(2723)[Parent] 事件创建成功,ID: %ld\n", event_id);
for (i = 0; i < NUM_CHILDREN; i++) {
if (fork() == 0) {
printf("(2723)[Child %d] 正在等待事件 %ld...\n", getpid(), event_id);
long ret = syscall(SYS_eventwait, event_id);
if (ret == 0) {
printf("(2723)[Child %d] 成功从事件 %ld 唤醒 (被close唤醒?)\n", getpid(), event_id);
} else {
printf("(2723)[Child %d] eventwait 返回 %ld, 可能是被 close 中断\n", getpid(), ret);
}
exit(0);
}
}
printf("(2723)[Parent] 等待3秒,确保所有子进程都已进入睡眠...\n");
sleep(3);
printf("(2723)[Parent] 正在关闭(eventclose)事件 %ld 来唤醒子进程...\n", event_id);
syscall(SYS_eventclose, event_id);
for (i = 0; i < NUM_CHILDREN; i++) {
wait(NULL);
}
printf("(2723)[Parent] 所有子进程已退出。\n");
printf("(2723)[Parent] 测试完成。\n");
return 0;
}
3.4.4 测试四:边缘触发测试 (sync_test4.c)
此测试用于验证 gen 计数器实现的“边缘触发”特性。子进程在一个循环中调用 eventwait 5次;父进程也必须在循环中调用 eventsig 5次,每次 eventsig 递增 gen,才能让子进程完成一次循环。
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <sys/wait.h>
#include <errno.h>
#define SYS_eventopen 550
#define SYS_eventclose 551
#define SYS_eventwait 552
#define SYS_eventsig 553
#define LOOP_COUNT 5
int main()
{
long event_id;
int i;
pid_t pid;
printf("(2723)[Parent] 正在创建新事件...\n");
event_id = syscall(SYS_eventopen);
printf("(2723)[Parent] 事件创建成功,ID: %ld\n", event_id);
pid = fork();
if (pid == 0) {
// --- 子进程逻辑 ---
for (i = 0; i < LOOP_COUNT; i++) {
printf("(2723)[Child %d] 正在等待事件 %ld (第 %d/%d 次)\n",
getpid(), event_id, i + 1, LOOP_COUNT);
long ret = syscall(SYS_eventwait, event_id); // 等待
if (ret != 0) {
perror("(2723)[Child] eventwait 失败");
exit(1);
}
printf("(2723)[Child %d] 成功从事件 %ld 唤醒 (第 %d/%d 次)\n",
getpid(), event_id, i + 1, LOOP_COUNT);
// 唤醒后不退出,而是循环回去再次等待
}
printf("(2723)[Child %d] 循环完成,正在退出。\n", getpid());
exit(0);
// --- 子进程结束 ---
}
// --- 父进程逻辑 ---
printf("(2723)[Parent] 等待 2 秒让子进程进入睡眠...\n");
sleep(2);
for (i = 0; i < LOOP_COUNT; i++) {
printf("(2723)[Parent] 正在发送信号 (eventsig) 到事件 %ld (第 %d/%d 次)\n",
event_id, i + 1, LOOP_COUNT);
syscall(SYS_eventsig, event_id); // 每次循环都发一个信号
sleep(1); // 给子进程一点时间去循环并再次睡眠
}
wait(NULL); // 等待子进程退出
printf("(2723)[Parent] 子进程已退出。\n");
syscall(SYS_eventclose, event_id);
printf("(2723)[Parent] 事件 %ld 已关闭。\n", event_id);
return 0;
}
3.4.5 测试五:释放-重用压力测试 (sync_test5.c)
此测试用于验证 atomic_t waiters 引用计数和 destroy_wq 机制的健壮性。
- 首先填满全部 64 个事件槽位。
- 启动一个“慢等待者”线程 (
slow_waiter) 在ID: 0上睡眠。 - 启动一个“关闭者”线程 (
closer) 调用eventclose(0)。 closer会阻塞在destroy_wq上,等待waiters归零。- 主线程 (
main) 此时疯狂循环调用eventopen。 - 预期
main线程会一直失败 (返回-ENOSPC),直到slow_waiter退出、closer返回、ID: 0的in_use最终被设为false后,eventopen才能成功。 - 此测试使用
pthread库,编译时需链接-lpthread。
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <sys/wait.h>
#include <errno.h>
#include <pthread.h>
#define SYS_eventopen 550
#define SYS_eventclose 551
#define SYS_eventwait 552
#define SYS_eventsig 553
// 你的内核中的 MAX_EVENTS
#define KERNEL_MAX_EVENTS 64
long event_id_to_test = 0; // 我们将专门测试 ID 0
// 线程 1:缓慢的等待者
void* slow_waiter(void* arg) {
printf("[Waiter] 线程启动,准备等待事件 %ld\n", event_id_to_test);
syscall(SYS_eventwait, event_id_to_test);
printf("[Waiter] 刚刚被唤醒 (被 close 唤醒)。\n");
printf("[Waiter] 模拟一些清理工作... (睡眠 2 秒)\n");
sleep(2); // 模拟在退出 eventwait 后仍持有锁或资源
printf("[Waiter] 清理工作完成,线程退出。\n");
return NULL;
}
// 线程 2:关闭者
void* closer(void* arg) {
// 等待 waiter 先进入睡眠
sleep(1);
printf("(2723)[Closer] 线程启动,准备关闭事件 %ld\n", event_id_to_test);
long ret = syscall(SYS_eventclose, event_id_to_test);
printf("(2723)[Closer] eventclose 调用返回 (ret=%ld)。这应该在 Waiter 退出后才发生。\n", ret);
return NULL;
}
int main()
{
pthread_t waiter_tid, closer_tid;
long event_ids[KERNEL_MAX_EVENTS];
int i;
// --- 步骤 1: 填满所有事件槽位 ---
printf("(2723)[Main] 正在填满所有 %d 个事件槽位...\n", KERNEL_MAX_EVENTS);
for (i = 0; i < KERNEL_MAX_EVENTS; i++) {
event_ids[i] = syscall(SYS_eventopen);
if (event_ids[i] < 0) {
perror("(2723)[Main] eventopen 失败,无法填满 table");
return 1;
}
}
printf("(2723)[Main] 所有 %d 个槽位已占满 (0 到 %d)。\n", KERNEL_MAX_EVENTS, KERNEL_MAX_EVENTS - 1);
// 我们选择第一个事件 ID (即 0) 来进行测试
event_id_to_test = event_ids[0];
// --- 步骤 2 & 3: 启动 Waiter 和 Closer ---
pthread_create(&waiter_tid, NULL, slow_waiter, NULL);
pthread_create(&closer_tid, NULL, closer, NULL);
// 等待 Closer 线程启动 (Closer 自己会 sleep 1s, 所以 0.1s 足够)
usleep(100000); // 0.1 秒
// --- 步骤 4: 主线程 (Churner) 开始疯狂尝试重用 ---
printf("(2723)[Main] Closer 已启动。Main 线程开始循环尝试 eventopen...\n");
long ret = -1;
int attempts = 0;
while(ret < 0) {
attempts++;
// 尝试打开新事件。我们希望它失败,直到 closer 线程完成。
ret = syscall(SYS_eventopen);
if (ret < 0) {
// -ENOSPC (或 -ENOMEM) 是我们期望的
// 这证明 eventclose 仍在阻塞,event_table[0].in_use 仍为 true
if (attempts % 500000 == 0) { // 调高打印频率
printf("(2723)[Main] 第 %d 次 eventopen 失败 (errno: %d)。这是正常的。\n", attempts, errno);
}
} else {
// 成功了!
printf("\n(2723)[Main] *** 成功在 %d 次尝试后打开了新事件 ID %ld ***\n", attempts, ret);
}
}
printf("(2723)[Main] 这应该在 Closer 和 Waiter 都退出后才发生。\n");
// --- 清理 ---
pthread_join(waiter_tid, NULL);
pthread_join(closer_tid, NULL);
// 关闭我们新打开的事件
syscall(SYS_eventclose, ret);
// 关闭其他所有占位的事件 (ID 1 到 63)
for (i = 1; i < KERNEL_MAX_EVENTS; i++) {
syscall(SYS_eventclose, event_ids[i]);
}
printf("(2723)[Main] 测试完成。\n");
return 0;
}
4. 实验结果
以下是五个测试程序的实际运行输出结果。
4.1 测试一:多进程等待单事件
测试 sync_test1.c,父进程 eventsig 成功唤醒了所有 5 个正在等待的子进程。
(2723)[Parent] 正在创建新事件...
(2723)[Parent] 事件创建成功, ID: 0
(2723)[Child 3070] 正在等待事件 0...
(2723)[Child 3071] 正在等待事件 0...
(2723)[parent] 等待3秒,确保所有子进程都已进入睡眠...
(2723)[Child 3073] 正在等待事件 0...
(2723)[Child 3072] 正在等待事件 0...
(2723)[Child 3074] 正在等待事件 0...
(2723)[Parent] 正在发送信号(eventsig)到事件 0...
(2723)[Child 3074] 成功从事件 0 唤醒!
(2723)[Child 3071] 成功从事件 0 唤醒!
(2723)[Child 3072] 成功从事件 0 唤醒!
(2723)[Child 3073] 成功从事件 0 唤醒!
(2723)[Child 3070] 成功从事件 0 唤醒!
(2723)[Parent] 所有子进程已退出。
(2723)[Parent] 事件 0 已关闭。
4.2 测试二:多进程等待多事件
测试 sync_test2.c,父进程对 B、C、A 的 eventsig 调用,精确地、依次唤醒了各自事件上的等待者,证明了事件的隔离性。
(2723)[Parent] 创建了三个事件: A=0, B=1, C=2(2723)[Child 3143] 正在等待事件 A (ID: 0)...
(2723)[Parent] 创建了三个事件: A=0, B=1, C=2(2723)[Child 3144] 正在等待事件 B (ID: 1)...
(2723)[Parent] 创建了三个事件: A=0, B=1, C=2(2723)[Parent] 等待3秒,确保所有子进程进入睡眠...
(2723)[Parent] 创建了三个事件: A=0, B=1, C=2(2723)[Child 3145] 正在等待事件 C (ID: 2)...
(2723)[Parent] 正在发送信号(eventsig)到事件B(ID: 1)...
(2723)[Child 3144] 成功从事件 B (ID: 1) 唤醒!
(2723)[Parent] ---------------------------------------
(2723)[Parent] 正在发送信号(eventsig)到事件C(ID: 2)...
(2723)[Child 3145] 成功从事件 C (ID: 2) 唤醒!
(2723)[Parent] ---------------------------------------
(2723)[Parent] 正在发送信号(eventsig)到事件A(ID: 0)...
(2723)[Child 3143] 成功从事件 A (ID: 0) 唤醒!
(2723)[Parent] 所有子进程已退出。
(2723)[Parent] 所有事件已关闭。
4.3 测试三:eventclose 唤醒测试
测试 sync_test3.c,父进程直接调用 eventclose,所有正在等待的子进程被成功唤醒,证明 eventclose 满足了“通知所有被阻塞进程”的要求。
(2723)[Parent] 正在创建新事件...
(2723)[Parent] 事件创建成功,ID: 0
(2723)[Child 3236] 正在等待事件 0...
(2723)[Parent] 等待3秒,确保所有子进程都已进入睡眠...
(2723)[Child 3237] 正在等待事件 0...
(2723)[Child 3238] 正在等待事件 0...
(2723)[Parent] 正在关闭(eventclose)事件 0 来唤醒子进程...
(2723)[Child 3236] 成功从事件 0 唤醒 (被close唤醒?)
(2723)[Child 3237] 成功从事件 0 唤醒 (被close唤醒?)
(2723)[Child 3238] 成功从事件 0 唤醒 (被close唤醒?)
(2723)[Parent] 所有子进程已退出。
(2723)[Parent] 测试完成。
4.4 测试四:边缘触发测试
测试 sync_test4.c,父进程的 5 次 eventsig 调用,对应了子进程的 5 次 eventwait 唤醒和循环,证明了 gen 计数器实现的边缘触发逻辑正确。
(2723)[Parent] 正在创建新事件...
(2723)[Parent] 事件创建成功,ID: 0
(2723)[Parent] 等待 2 秒让子进程进入睡眠...
(2723)[Child 3298] 正在等待事件 0 (第 1/5 次)
(2723)[Parent] 正在发送信号 (eventsig) 到事件 0 (第 1/5 次)
(2723)[Child 3298] 成功从事件 0 唤醒 (第 1/5 次)
(2723)[Child 3298] 正在等待事件 0 (第 2/5 次)
(2723)[Parent] 正在发送信号 (eventsig) 到事件 0 (第 2/5 次)
(2723)[Child 3298] 成功从事件 0 唤醒 (第 2/5 次)
(2723)[Child 3298] 正在等待事件 0 (第 3/5 次)
(2723)[Parent] 正在发送信号 (eventsig) 到事件 0 (第 3/5 次)
(2723)[Child 3298] 成功从事件 0 唤醒 (第 3/5 次)
(2723)[Child 3298] 正在等待事件 0 (第 4/5 次)
(2723)[Parent] 正在发送信号 (eventsig) 到事件 0 (第 4/5 次)
(2723)[Child 3298] 成功从事件 0 唤醒 (第 4/5 次)
(2723)[Child 3298] 正在等待事件 0 (第 5/5 次)
(2723)[Parent] 正在发送信号 (eventsig) 到事件 0 (第 5/5 次)
(2723)[Child 3298] 成功从事件 0 唤醒 (第 5/5 次)
(2723)[Child 3298] 循环完成,正在退出。
(2723)[Parent] 子进程已退出。
(2723)[Parent] 事件 0 已关闭。
4.5 测试五:释放-重用压力测试
测试 sync_test5.c,主线程在 2 秒多的时间内 eventopen 失败了超过 160 万次,直到 slow_waiter 线程退出、closer 线程返回后,eventopen 才成功拿到 ID: 0。这证明了 atomic_t waiters 和 destroy_wq 机制成功阻塞了 eventclose,避免了“释放后重用”的竞态条件。
(2723)[Main] 正在填满所有 64 个事件槽位...
(2723)[Main] 所有 64 个槽位已占满 (0 到 63)。
[Waiter] 线程启动,准备等待事件 0
(2723)[Main] Closer 已启动。Main 线程开始循环尝试 eventopen...
(2723)[Main] 第 500000 次 eventopen 失败 (errno: 28)。这是正常的。
(2723)[Main] 第 1000000 次 eventopen 失败 (errno: 28)。这是正常的。
(2723)[Main] 第 1500000 次 eventopen 失败 (errno: 28)。这是正常的。
(2723)[Closer] 线程启动,准备关闭事件 0
[Waiter] 刚刚被唤醒 (被 close 唤醒)。
[Waiter] 模拟一些清理工作... (睡眠 2 秒)
(2723)[Closer] eventclose 调用返回 (ret=0)。这应该在 Waiter 退出后才发生。
(2723)[Main] *** 成功在 1609364 次尝试后打开了新事件 ID 0 ***
(2723)[Main] 这应该在 Closer 和 Waiter 都退出后才发生。
[Waiter] 清理工作完成,线程退出。
(2723)[Main] 测试完成。
5. 实验总结与反思
本次实验在实现进程同步原语的过程中,遇到了多个在课上讨论过的并发问题,这些调试经历是本次实验最大的收获。
最初的探索是关于内核编译的。为了加速开发迭代,本次实验尝试了 make localmodconfig 和 make menuconfig 来裁剪内核。但过于激进的裁剪(如禁用了 VMware PVSCSI 驱动)导致了 (initramfs) 启动失败。通过 GRUB 菜单切换回 ...-generic 内核才得以恢复,这让我深刻理解到内核对底层硬件驱动的依赖性。最终,通过 ccache 编译缓存和针对性的 menuconfig(仅禁用非必要外设,保留核心驱动)相结合,实现了开发效率的平衡。
在实现系统调用逻辑时,第一个挑战是“丢失的唤醒” (Lost Wakeup)。最初的 eventwait 设计是基于 schedule() 的简单睡眠,这导致 eventsig 的 wake_up_all 在 eventwait 进程加入等待队列之前就已执行,进程因此永久睡眠。为了修复这个竞态条件,引入了 bool signaled 标志和 wait_event_interruptible 宏,实现了“条件变量”模式。wait_event_interruptible 宏原子地检查条件和进入睡眠,彻底解决了这个Bug。
然而,signaled 标志又引入了新的竞态条件,即 eventclose 和 eventopen 之间对 signaled 标志的重置冲突。为了解决这个问题,将 signaled 升级为 unsigned long gen 计数器,实现了边缘触发。
最大的挑战来自于 eventclose。eventclose 必须唤醒所有等待者,并安全释放槽位。如果 eventclose 立即释放 in_use = false,另一个 eventopen 进程就可能重用这个槽位,并重置它的同步原语(如 gen 计数器)。而此时,被 eventclose 唤醒的旧进程可能才刚被调度,它们在检查 gen 或 closing 标志时,会读到被新进程重置的脏数据,导致再次睡眠或崩溃。这是一个典型的“释放后重用”竞态条件。最终的解决方案是引入了 atomic_t waiters 引用计数器和 wait_queue_head_t destroy_wq 销毁队列。eventwait 在进入时 atomic_inc,退出时 atomic_dec;eventclose 则必须阻塞在 destroy_wq 上,直到 waiters 归零,才被最后一个退出的 eventwait 进程唤醒,并最终安全地释放槽位。压力测试(sync_test5.c)中 eventopen 失败了 160 万次,也证明了这一机制的健壮性。