跳转至

操作系统第二次作业

1. 实验内容

本次实验的主要内容是在 Linux 内核中设计并实现一种事件同步屏障原语,允许多个进程在某个事件发生前阻塞,直到其他进程触发该事件。当某个进程触发事件时,所有因该事件阻塞的进程将解除阻塞,如果没有进程被阻塞则不产生任何影响。

具体要求是实现四个新的系统调用:

  • sys_eventopen():用于创建新事件并返回一个全局唯一的事件标识符 (event ID)。
  • sys_eventwait(int event_id):用于使当前进程在指定 event_id 上阻塞(睡眠)。
  • sys_eventsig(int event_id):用于唤醒所有在指定 event_id 上阻塞的进程。
  • sys_eventclose(int event_id):用于销毁事件,并唤醒所有可能仍在该事件上阻塞的进程。

最后,需要编写用户态测试程序,至少覆盖“多进程等待同一事件”和“多进程多事件并发”两种场景。

2. 实验思路

本实验采取分阶段推进的策略,首先搭建和验证环境,然后设计核心数据结构,接着实现系统调用功能,最后编写用户态程序进行测试。

阶段一 环境配置与内核裁剪

  • 环境优化: 为缩短内核编译的迭代时间,首先对编译环境进行优化。
  • 使用 make menuconfig 对内核配置进行裁剪,禁用所有与实验无关的模块(如无线、声卡、蓝牙等非必要驱动)。
  • 安装 ccache 并配置 make CC="ccache gcc",利用编译缓存实现后续的快速迭代。
  • 搭建调用框架:syscall_64.tblsyscalls.hkernel/sys.c 中注册 eventopeneventcloseeventwaiteventsig 四个空的系统调用。
  • 初步验证: 编写一个简单的用户态程序 test_events_skeleton.c 逐一调用这四个“空壳”函数,并通过 dmesg 确认内核打印了对应信息,确保系统调用的“通路”已全部打通。

阶段二:核心数据结构设计

  • 全局管理:kernel/sys.c 中定义一个全局数组 static struct os_event event_table[MAX_EVENTS] 来管理所有事件。event_id 即为该数组的索引。

  • 管程锁(粗粒度): 定义一个全局互斥锁 static DEFINE_MUTEX(event_table_lock)。这把锁用于保护 event_table 数组本身,即 in_use 标志的分配与释放,确保槽位操作的原子性。

  • 事件结构体(细粒度): 为了解决复杂的“释放后重用”(Use-After-Free)竞态条件,设计了包含引用计数和销毁队列的事件结构:

/* 事件结构体 */
struct os_event {
    bool in_use;                   // 是否正在使用
    bool initialized;              // 是否已初始化

    wait_queue_head_t queue;       // 等待队列
    spinlock_t lock;               // 保护 gen/closing 字段

    unsigned long gen;             // 每次当 eventsig 时自增,表示新一轮信号
    bool closing;                  // 表示正在关闭

    atomic_t waiters;              // 当前处于等待队列中的线程数
    wait_queue_head_t destroy_wq;  // 用来等待 waiters==0 的队列
};

阶段三:核心逻辑实现

  • eventopen:获取全局锁 event_table_lock,遍历查找 in_use == false 的槽位。如果槽位是首次使用 (initialized == false),则初始化其 queue, lock, destroy_wqwaiters 计数器。无论如何,都会重置 gen = 0closing = false,然后释放全局锁。
  • eventwait
  • 获取全局锁 event_table_lock,检查 in_use 标志。
  • 在释放全局锁之前,原子地增加引用计数 atomic_inc(&ev->waiters)。这确保 eventclose 在检查 waiters 时,不会漏掉正在进入等待的本进程。
  • 释放全局锁。
  • 获取内部 spinlock,读取当前的 gen 作为快照 mygen
  • 调用 wait_event_interruptible,等待 ev->gen != mygen (被 eventsig 改变) 或 ev->closing == true (被 eventclose 改变)。
  • 唤醒后,调用 atomic_dec_and_test(&ev->waiters)。如果是最后一个退出的等待者 (waiters == 0) 且 eventclose 正在进行 (closing == true),则负责唤醒在 destroy_wq 上睡眠的 eventclose 进程。
  • eventsig:获取内部 spinlock,将 gen 计数器加一,然后释放锁。最后调用 wake_up_all(&ev->queue) 唤醒所有等待者。
  • eventclose
  • 获取内部 spinlock,设置 closing = true
  • 调用 wake_up_all(&ev->queue) 唤醒所有等待者(它们会检查到 closing == true 而退出)。
  • 调用 wait_event(ev->destroy_wq, ...) 在销毁队列上睡眠,直到 waiters 计数器归零。
  • 被唤醒后(由最后一个 eventwait 进程唤醒),获取全局锁 event_table_lock,安全地将 in_use = false,释放该槽位以供 eventopen 重用。

阶段四:编写用户态测试程序

  • 设计三个测试用例来验证功能的完备性和并发正确性:
  • sync_test1.c:多进程等待单事件。用于验证基本的“等待-唤醒”功能。
  • sync_test2.c:多进程等待多事件。用于验证不同事件之间的隔离性。
  • sync_test3.c:验证 eventclose 也能唤醒正在等待的进程。
  • sync_test4.c:验证对同一事件的多次重复等待和唤醒。
  • sync_test5.c:高并发压力测试,验证 eventclose 的引用计数器能正确处理“释放-重用”竞态条件。

3. 实验过程

3.1 内核裁剪与环境配置

为加速内核编译,缩短开发-测试的迭代周期,本次实验对编译环境进行了两项关键优化:内核裁剪与编译缓存。

  1. 使用 make menuconfig 裁剪内核

  2. 在内核源码根目录执行 cp /boot/config-$(uname -r) .config,以一个已知的、可工作的配置为基础。

  3. 运行 make menuconfig 进入图形化配置界面。
  4. 裁剪原则: 实验环境为 VMware 虚拟机,且仅通过 SSH 访问,因此所有物理硬件、非必要的驱动和功能均可禁用。
  5. 主要裁剪项:
    • Networking support --->
    • 禁用 Wireless LANBluetooth subsystem supportNFC subsystem support 等所有无线和非 IP 协议。
    • (保留 TCP/IP networkingEthernet driver support 中的 VMware VMXNET3 驱动以保证 SSH 连通性)。
    • Device Drivers --->
    • 禁用 Sound card support --->
    • 禁用 Multimedia support ---> (摄像头、电视卡等)。
    • 禁用 Graphics support ---> 子菜单中除 VMware VMSVGAVGA text console 之外的所有物理显卡驱动 (NVIDIA, AMD, Intel)。
    • 禁用 USB support ---> (VMware 虚拟机通过 SSH 访问,无需 USB)。
    • 禁用 File systems ---> 子菜单中除 Ext4(或当前根文件系统类型)之外的所有其他文件系统(如 XFS, Btrfs, NTFS)。
  6. 裁剪后保存配置,大幅减少了需要编译的模块数量。

  7. 配置 ccache 编译缓存

  8. ccache (Compiler Cache) 是一个编译加速工具,它能缓存 .c 文件上一次的编译结果(.o 文件)。当下次编译时,若源文件未更改,ccache 会直接返回缓存的结果,跳过实际的 gcc 编译过程。

  9. 安装:

    sudo apt install ccache
    
  10. 扩大缓存空间 : 内核编译会产生大量缓存,默认空间可能不足。

    ccache -M 50G  # 将缓存上限设为 50GB
    
  11. 使用: 在调用 make 时,通过 CC 变量指定 ccache 作为 gcc 的“包装器”。

    make CC="ccache gcc" -j$(nproc)
    
  12. 效果: 首次编译(填充缓存)依然耗时,但在此后修改代码(如 kernel/sys.c)后再次编译,编译链接时间从2小时缩短到2分钟,极大地提高了调试效率。

3.2 搭建系统调用框架

  1. 修改 arch/x86/entry/syscalls/syscall_64.tbl

分配系统调用号。

550 64 eventopen   sys_eventopen
551 64 eventclose  sys_eventclose
552 64 eventwait   sys_eventwait
553 64 eventsig    sys_eventsig
  1. 修改 include/linux/syscalls.h

声明系统调用。

asmlinkage long sys_eventopen(void);
asmlinkage long sys_eventclose(int event_id);
asmlinkage long sys_eventwait(int event_id);
asmlinkage long sys_eventsig(int event_id);
  1. 修改 kernel/sys.c

为系统调用添加空壳实现。

SYSCALL_DEFINE0(eventopen) { printk(KERN_INFO "eventopen call\n"); return 0; }
SYSCALL_DEFINE1(eventclose, int, event_id) { printk(KERN_INFO "eventclose call\n"); return 0; }
SYSCALL_DEFINE1(eventwait, int, event_id) { printk(KERN_INFO "eventwait call\n"); return 0; }
SYSCALL_DEFINE1(eventsig, int, event_id) { printk(KERN_INFO "eventsig call\n"); return 0; }

3.3 核心代码实现

kernel/sys.c 顶部添加头文件和全局定义:

/* 需要包含的头文件 */
#include <linux/wait.h>
#include <linux/mutex.h>
#include <linux/spinlock.h>
#include <linux/errno.h>
#include <linux/atomic.h>
#include <linux/sched.h>
#include <linux/uaccess.h> // (如果需要与用户空间交互)

/* 常量 */
#define MAX_EVENTS 64

/* 事件结构体 */
struct os_event {
    bool in_use;                   // 是否正在使用
    bool initialized;              // 是否已初始化

    wait_queue_head_t queue;       // 等待队列
    spinlock_t lock;               // 保护 gen/closing 字段

    unsigned long gen;             // 每次当 eventsig 时自增,表示新一轮信号
    bool closing;                  // 表示正在关闭

    atomic_t waiters;              // 当前处于等待队列中的线程数
    wait_queue_head_t destroy_wq;  // 用来等待 waiters==0 的队列
};

/* 全局事件表与保护锁 */
static struct os_event event_table[MAX_EVENTS];
static DEFINE_MUTEX(event_table_lock);

3.3.1 sys_eventopen 的实现

eventopen 负责分配一个事件槽位。它使用 event_table_lock 这把粗粒度锁来保证槽位分配的原子性。如果一个槽位是首次被使用 (initialized == false),它会负责初始化所有底层的同步原语(等待队列、自旋锁、原子计数器)。无论如何,它都会重置事件的逻辑状态(genclosing),为新的使用者做好准备。

SYSCALL_DEFINE0(eventopen) {
    int i, id = -1;

    mutex_lock(&event_table_lock);
    for (i = 0; i < MAX_EVENTS; ++i) {
        if (!event_table[i].in_use) {
            event_table[i].in_use = true;
            id = i;
            break;
        }
    }

    if (id != -1) {
        if (!event_table[id].initialized) {
            init_waitqueue_head(&event_table[id].queue);
            spin_lock_init(&event_table[id].lock);
            init_waitqueue_head(&event_table[id].destroy_wq);
            atomic_set(&event_table[id].waiters, 0);
            event_table[id].initialized = true;
        }

        /* 重置逻辑状态为初始 generation(安全:eventclose 已等 waiters==0 再 set in_use=false) */
        spin_lock(&event_table[id].lock);
        event_table[id].gen = 0;
        event_table[id].closing = false;
        spin_unlock(&event_table[id].lock);
    }
    mutex_unlock(&event_table_lock);

    return id != -1 ? id : -ENOSPC;
}

3.3.2 sys_eventwait 的实现

eventwait 是最复杂的设计。

  1. 它首先在全局锁 event_table_lock 的保护下检查事件是否存在,并原子地递增 waiters 引用计数。
  2. 递增计数后,它立即释放全局锁,以允许其他进程(如 eventsig)并发执行。
  3. 它使用内部自旋锁 ev->lock 来安全地读取当前 gen 的快照 mygen
  4. 它调用 wait_event_interruptible 进入睡眠,等待 gen 发生变化(被 eventsig 触发)或 closing 变为 true(被 eventclose 触发)。
  5. 唤醒后,它原子地递减 waiters 计数。如果是最后一个退出者且 closingtrue,它负责唤醒正在 destroy_wq 上等待的 eventclose 进程。
SYSCALL_DEFINE1(eventwait, int, event_id) {
    struct os_event *ev;
    unsigned long flags;
    unsigned long mygen;
    int ret;

    if (event_id < 0 || event_id >= MAX_EVENTS)
        return -EINVAL;

    ev = &event_table[event_id];

    /* 在全局锁下检查是否存在 */
    mutex_lock(&event_table_lock);
    if (!ev->in_use) {
        mutex_unlock(&event_table_lock);
        return -EINVAL;
    }
    /* 进入等待前计数 */
    atomic_inc(&ev->waiters);
    mutex_unlock(&event_table_lock);


    /* 读取 snapshot generation(受 spinlock 保护,避免同时修改) */
    spin_lock_irqsave(&ev->lock, flags);
    mygen = ev->gen;
    spin_unlock_irqrestore(&ev->lock, flags);

    /* 等待直到 generation 发生变化(eventsig)或 closing 被设置 */
    ret = wait_event_interruptible(ev->queue, ({
        bool cond;
        spin_lock_irqsave(&ev->lock, flags);
        cond = (ev->gen != mygen) || ev->closing;
        spin_unlock_irqrestore(&ev->lock, flags);
        cond;
    }));

    /* 退出时递减 waiters;如果这是最后一个且正在 closing,则通知 destroy_wq */
    if (atomic_dec_and_test(&ev->waiters)) {
        spin_lock_irqsave(&ev->lock, flags);
        if (ev->closing)
            wake_up(&ev->destroy_wq);
        spin_unlock_irqrestore(&ev->lock, flags);
    }

    return ret;
}

3.3.3 sys_eventsig 的实现

eventsig 的实现轻量且高效。它不需要获取全局锁,只在内部 spinlock 的保护下将 gen 计数器加一,然后调用 wake_up_all 唤醒所有在 queue 上等待的进程。

SYSCALL_DEFINE1(eventsig, int, event_id) {
    struct os_event *ev;
    unsigned long flags;

    if (event_id < 0 || event_id >= MAX_EVENTS)
        return -EINVAL;

    ev = &event_table[event_id];

    mutex_lock(&event_table_lock);
    if (!ev->in_use) {
        mutex_unlock(&event_table_lock);
        return -EINVAL;
    }
    mutex_unlock(&event_table_lock);

    /* 增加 generation(受锁保护),然后 wake_up_all */
    spin_lock_irqsave(&ev->lock, flags);
    ev->gen++;   /* 每次 signal 都自增 generation */
    spin_unlock_irqrestore(&ev->lock, flags);

    wake_up_all(&ev->queue);
    return 0;
}

3.3.4 sys_eventclose 的实现

eventclose 是确保“释放后重用”安全的关键。

  1. 它首先设置 closing = true 并唤醒所有等待者。
  2. 然后,它不会立即释放槽位,而是调用 wait_eventdestroy_wq 上睡眠。
  3. 它会一直在此睡眠,直到最后一个 eventwait 进程退出并唤醒它。
  4. 被唤醒后,eventclose 才获取全局锁 event_table_lock,并安全地将 in_use = false,此时槽位才被真正释放。
SYSCALL_DEFINE1(eventclose, int, event_id) {
    struct os_event *ev;
    unsigned long flags;

    if (event_id < 0 || event_id >= MAX_EVENTS)
        return -EINVAL;

    ev = &event_table[event_id];

    /* 检查并标记为正在关闭(不立即清 in_use) */
    mutex_lock(&event_table_lock);
    if (!ev->in_use) {
        mutex_unlock(&event_table_lock);
        return -EINVAL;
    }
    mutex_unlock(&event_table_lock);

    /* 标记 closing 并唤醒所有等待者 */
    spin_lock_irqsave(&ev->lock, flags);
    ev->closing = true;
    spin_unlock_irqrestore(&ev->lock, flags);

    wake_up_all(&ev->queue);

    /* 等待所有等待者离开(waiters==0),eventwait 在最后会 wake_up destroy_wq */
    wait_event(ev->destroy_wq, atomic_read(&ev->waiters) == 0);

    /* 现在没有任何线程依赖此槽位,可以安全释放 in_use */
    mutex_lock(&event_table_lock);
    ev->in_use = false;
    ev->closing = false; /* 复位 closing(optional: 保留 initialized) */
    mutex_unlock(&event_table_lock);

    return 0;
}

3.4 用户态测试程序

为了验证上述内核实现的正确性和健壮性,设计了以下五个测试程序。

3.4.1 测试一:多进程等待单事件 (sync_test1.c)

此测试用于验证基本功能:多个子进程(5个)同时等待同一个事件,父进程在 sleep(3) 后调用 eventsig,验证所有子进程是否能被同时唤醒。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <sys/wait.h>
#include <errno.h>

#define SYS_eventopen 550
#define sys_eventclose 551
#define sys_eventwait 552
#define sys_eventsig 553

#define NUM_CHILDREN 5

int main() {
    long event_id;
    int i;
    pid_t pid;

    printf("(2723)[Parent] 正在创建新事件...\n");
    event_id = syscall(SYS_eventopen);
    if (event_id < 0) {
        perror("syscall(eventopen) 失败");
        return 1;
    }
    printf("(2723)[Parent] 事件创建成功, ID: %ld\n", event_id);

    for (i = 0; i < NUM_CHILDREN; i++) {
        pid = fork();
        if (pid < 0) {
            perror("fork 失败");
            return 1;
        }
        if (pid == 0) {
            printf("(2723)[Child %d] 正在等待事件 %ld...\n", getpid(), event_id);
            long ret = syscall(sys_eventwait, event_id);
            if (ret == 0) {
                printf("(2723)[Child %d] 成功从事件 %ld 唤醒!\n", getpid(), event_id);
            } else {
                perror("(2723)[Child] eventwait 失败\n");
            }
            exit(0);
        }
    }

    printf("(2723)[parent] 等待3秒,确保所有子进程都已进入睡眠...\n");
    sleep(3);

    printf("(2723)[Parent] 正在发送信号(eventsig)到事件 %ld...\n", event_id);
    syscall(sys_eventsig, event_id);

    for (i = 0; i < NUM_CHILDREN; i++) {
        wait(NULL);
    }
    printf("(2723)[Parent] 所有子进程已退出。\n");

    syscall(sys_eventclose, event_id);
    printf("(2723)[Parent] 事件 %ld 已关闭。\n", event_id);

    return 0;
}

3.4.2 测试二:多进程等待多事件 (sync_test2.c)

此测试用于验证事件的隔离性。创建三个子进程,分别等待三个不同的事件 (A, B, C)。父进程依次唤醒 B、C、A,验证每次 eventsig 只唤醒了对应事件上的进程。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <sys/wait.h>
#include <errno.h>

#define SYS_eventopen 550
#define SYS_eventclose 551
#define SYS_eventwait 552
#define SYS_eventsig 553

void wait_for_event(int event_id, const char* event_name) {
    printf("(2723)[Child %d] 正在等待事件 %s (ID: %d)...\n", getpid(), event_name, event_id);
    long ret = syscall(SYS_eventwait, event_id);
    if (ret == 0) {
        printf("(2723)[Child %d] 成功从事件 %s (ID: %d) 唤醒!\n", getpid(), event_name, event_id);
    } else {
        perror("(2723)[Child] eventwait 失败");
    }
    exit(0);
}

int main() {
    long event_A, event_B, event_C;
    event_A = syscall(SYS_eventopen);
    event_B = syscall(SYS_eventopen);
    event_C = syscall(SYS_eventopen);
    printf("(2723)[Parent] 创建了三个事件: A=%ld, B=%ld, C=%ld", event_A, event_B, event_C);
    if (fork() == 0) {
        wait_for_event(event_A, "A");
    }
    if (fork() == 0) {
        wait_for_event(event_B, "B");
    }
    if (fork() == 0) {
        wait_for_event(event_C, "C");
    }
    printf("(2723)[Parent] 等待3秒,确保所有子进程进入睡眠...\n");
    sleep(3);
    printf("(2723)[Parent] 正在发送信号(eventsig)到事件B(ID: %ld)...\n", event_B);
    syscall(SYS_eventsig, event_B);
    sleep(1);
    printf("(2723)[Parent] ---------------------------------------\n");
    printf("(2723)[Parent] 正在发送信号(eventsig)到事件C(ID: %ld)...\n", event_C);
    syscall(SYS_eventsig, event_C);
    sleep(1);
    printf("(2723)[Parent] ---------------------------------------\n");
    printf("(2723)[Parent] 正在发送信号(eventsig)到事件A(ID: %ld)...\n", event_A);
    syscall(SYS_eventsig, event_A);

    wait(NULL);
    wait(NULL);
    wait(NULL);
    printf("(2723)[Parent] 所有子进程已退出。\n");

    syscall(SYS_eventclose, event_A);
    syscall(SYS_eventclose, event_B);
    syscall(SYS_eventclose, event_C);
    printf("(2723)[Parent] 所有事件已关闭。\n");

    return 0;
}

3.4.3 测试三:eventclose 唤醒测试 (sync_test3.c)

此测试用于验证 eventclose 的唤醒功能。父进程不调用 eventsig,而是直接调用 eventclose,验证所有正在等待的子进程是否被正确唤醒(它们会检查到 closing == true)。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <sys/wait.h>
#include <errno.h>

#define SYS_eventopen 550
#define SYS_eventclose 551
#define SYS_eventwait 552
#define SYS_eventsig 553

#define NUM_CHILDREN 3

int main() {
    long event_id;
    int i;
    pid_t pid;

    printf("(2723)[Parent] 正在创建新事件...\n");
    event_id = syscall(SYS_eventopen);
    if (event_id < 0) {
        perror("syscall(eventopen) failed.\n");
        return 1;
    }
    printf("(2723)[Parent] 事件创建成功,ID: %ld\n", event_id);

    for (i = 0; i < NUM_CHILDREN; i++) {
        if (fork() == 0) {
            printf("(2723)[Child %d] 正在等待事件 %ld...\n", getpid(), event_id);
            long ret = syscall(SYS_eventwait, event_id);
            if (ret == 0) {
                printf("(2723)[Child %d] 成功从事件 %ld 唤醒 (被close唤醒?)\n", getpid(), event_id);
            } else {
                printf("(2723)[Child %d] eventwait 返回 %ld, 可能是被 close 中断\n", getpid(), ret);
            }
            exit(0);
        }
    }

    printf("(2723)[Parent] 等待3秒,确保所有子进程都已进入睡眠...\n");
    sleep(3);

    printf("(2723)[Parent] 正在关闭(eventclose)事件 %ld 来唤醒子进程...\n", event_id);
    syscall(SYS_eventclose, event_id);

    for (i = 0; i < NUM_CHILDREN; i++) {
        wait(NULL);
    }
    printf("(2723)[Parent] 所有子进程已退出。\n");
    printf("(2723)[Parent] 测试完成。\n");

    return 0;
}

3.4.4 测试四:边缘触发测试 (sync_test4.c)

此测试用于验证 gen 计数器实现的“边缘触发”特性。子进程在一个循环中调用 eventwait 5次;父进程也必须在循环中调用 eventsig 5次,每次 eventsig 递增 gen,才能让子进程完成一次循环。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <sys/wait.h>
#include <errno.h>

#define SYS_eventopen 550
#define SYS_eventclose 551
#define SYS_eventwait 552
#define SYS_eventsig 553

#define LOOP_COUNT 5

int main()
{
    long event_id;
    int i;
    pid_t pid;

    printf("(2723)[Parent] 正在创建新事件...\n");
    event_id = syscall(SYS_eventopen);
    printf("(2723)[Parent] 事件创建成功,ID: %ld\n", event_id);

    pid = fork();
    if (pid == 0) {
        // --- 子进程逻辑 ---
        for (i = 0; i < LOOP_COUNT; i++) {
            printf("(2723)[Child %d] 正在等待事件 %ld (第 %d/%d 次)\n",
                   getpid(), event_id, i + 1, LOOP_COUNT);

            long ret = syscall(SYS_eventwait, event_id); // 等待

            if (ret != 0) {
                perror("(2723)[Child] eventwait 失败");
                exit(1);
            }
            printf("(2723)[Child %d] 成功从事件 %ld 唤醒 (第 %d/%d 次)\n",
                   getpid(), event_id, i + 1, LOOP_COUNT);
            // 唤醒后不退出,而是循环回去再次等待
        }
        printf("(2723)[Child %d] 循环完成,正在退出。\n", getpid());
        exit(0);
        // --- 子进程结束 ---
    }

    // --- 父进程逻辑 ---
    printf("(2723)[Parent] 等待 2 秒让子进程进入睡眠...\n");
    sleep(2);

    for (i = 0; i < LOOP_COUNT; i++) {
        printf("(2723)[Parent] 正在发送信号 (eventsig) 到事件 %ld (第 %d/%d 次)\n",
               event_id, i + 1, LOOP_COUNT);

        syscall(SYS_eventsig, event_id); // 每次循环都发一个信号

        sleep(1); // 给子进程一点时间去循环并再次睡眠
    }

    wait(NULL); // 等待子进程退出
    printf("(2723)[Parent] 子进程已退出。\n");

    syscall(SYS_eventclose, event_id);
    printf("(2723)[Parent] 事件 %ld 已关闭。\n", event_id);

    return 0;
}

3.4.5 测试五:释放-重用压力测试 (sync_test5.c)

此测试用于验证 atomic_t waiters 引用计数和 destroy_wq 机制的健壮性。

  1. 首先填满全部 64 个事件槽位。
  2. 启动一个“慢等待者”线程 (slow_waiter) 在 ID: 0 上睡眠。
  3. 启动一个“关闭者”线程 (closer) 调用 eventclose(0)
  4. closer 会阻塞在 destroy_wq 上,等待 waiters 归零。
  5. 主线程 (main) 此时疯狂循环调用 eventopen
  6. 预期 main 线程会一直失败 (返回 -ENOSPC),直到 slow_waiter 退出、closer 返回、ID: 0in_use 最终被设为 false 后,eventopen 才能成功。
  7. 此测试使用 pthread 库,编译时需链接 -lpthread
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <sys/wait.h>
#include <errno.h>
#include <pthread.h>

#define SYS_eventopen 550
#define SYS_eventclose 551
#define SYS_eventwait 552
#define SYS_eventsig 553

// 你的内核中的 MAX_EVENTS
#define KERNEL_MAX_EVENTS 64 

long event_id_to_test = 0; // 我们将专门测试 ID 0

// 线程 1:缓慢的等待者
void* slow_waiter(void* arg) {
    printf("[Waiter] 线程启动,准备等待事件 %ld\n", event_id_to_test);
    syscall(SYS_eventwait, event_id_to_test);

    printf("[Waiter] 刚刚被唤醒 (被 close 唤醒)。\n");
    printf("[Waiter] 模拟一些清理工作... (睡眠 2 秒)\n");
    sleep(2); // 模拟在退出 eventwait 后仍持有锁或资源

    printf("[Waiter] 清理工作完成,线程退出。\n");
    return NULL;
}

// 线程 2:关闭者
void* closer(void* arg) {
    // 等待 waiter 先进入睡眠
    sleep(1); 

    printf("(2723)[Closer] 线程启动,准备关闭事件 %ld\n", event_id_to_test);
    long ret = syscall(SYS_eventclose, event_id_to_test);

    printf("(2723)[Closer] eventclose 调用返回 (ret=%ld)。这应该在 Waiter 退出后才发生。\n", ret);
    return NULL;
}


int main()
{
    pthread_t waiter_tid, closer_tid;
    long event_ids[KERNEL_MAX_EVENTS];
    int i;

    // --- 步骤 1: 填满所有事件槽位 ---
    printf("(2723)[Main] 正在填满所有 %d 个事件槽位...\n", KERNEL_MAX_EVENTS);
    for (i = 0; i < KERNEL_MAX_EVENTS; i++) {
        event_ids[i] = syscall(SYS_eventopen);
        if (event_ids[i] < 0) {
            perror("(2723)[Main] eventopen 失败,无法填满 table");
            return 1;
        }
    }
    printf("(2723)[Main] 所有 %d 个槽位已占满 (0 到 %d)。\n", KERNEL_MAX_EVENTS, KERNEL_MAX_EVENTS - 1);

    // 我们选择第一个事件 ID (即 0) 来进行测试
    event_id_to_test = event_ids[0];

    // --- 步骤 2 & 3: 启动 Waiter 和 Closer ---
    pthread_create(&waiter_tid, NULL, slow_waiter, NULL);
    pthread_create(&closer_tid, NULL, closer, NULL);

    // 等待 Closer 线程启动 (Closer 自己会 sleep 1s, 所以 0.1s 足够)
    usleep(100000); // 0.1 秒

    // --- 步骤 4: 主线程 (Churner) 开始疯狂尝试重用 ---
    printf("(2723)[Main] Closer 已启动。Main 线程开始循环尝试 eventopen...\n");

    long ret = -1;
    int attempts = 0;
    while(ret < 0) {
        attempts++;

        // 尝试打开新事件。我们希望它失败,直到 closer 线程完成。
        ret = syscall(SYS_eventopen);

        if (ret < 0) {
            // -ENOSPC (或 -ENOMEM) 是我们期望的
            // 这证明 eventclose 仍在阻塞,event_table[0].in_use 仍为 true
            if (attempts % 500000 == 0) { // 调高打印频率
                printf("(2723)[Main] 第 %d 次 eventopen 失败 (errno: %d)。这是正常的。\n", attempts, errno);
            }
        } else {
            // 成功了!
            printf("\n(2723)[Main] *** 成功在 %d 次尝试后打开了新事件 ID %ld ***\n", attempts, ret);
        }
    }

    printf("(2723)[Main] 这应该在 Closer 和 Waiter 都退出后才发生。\n");

    // --- 清理 ---
    pthread_join(waiter_tid, NULL);
    pthread_join(closer_tid, NULL);

    // 关闭我们新打开的事件
    syscall(SYS_eventclose, ret);

    // 关闭其他所有占位的事件 (ID 1 到 63)
    for (i = 1; i < KERNEL_MAX_EVENTS; i++) {
        syscall(SYS_eventclose, event_ids[i]);
    }

    printf("(2723)[Main] 测试完成。\n");
    return 0;
}

4. 实验结果

以下是五个测试程序的实际运行输出结果。

4.1 测试一:多进程等待单事件

测试 sync_test1.c,父进程 eventsig 成功唤醒了所有 5 个正在等待的子进程。

(2723)[Parent] 正在创建新事件...
(2723)[Parent] 事件创建成功, ID: 0
(2723)[Child 3070] 正在等待事件 0...
(2723)[Child 3071] 正在等待事件 0...
(2723)[parent] 等待3秒,确保所有子进程都已进入睡眠...
(2723)[Child 3073] 正在等待事件 0...
(2723)[Child 3072] 正在等待事件 0...
(2723)[Child 3074] 正在等待事件 0...
(2723)[Parent] 正在发送信号(eventsig)到事件 0...
(2723)[Child 3074] 成功从事件 0 唤醒!
(2723)[Child 3071] 成功从事件 0 唤醒!
(2723)[Child 3072] 成功从事件 0 唤醒!
(2723)[Child 3073] 成功从事件 0 唤醒!
(2723)[Child 3070] 成功从事件 0 唤醒!
(2723)[Parent] 所有子进程已退出。
(2723)[Parent] 事件 0 已关闭。

4.2 测试二:多进程等待多事件

测试 sync_test2.c,父进程对 B、C、A 的 eventsig 调用,精确地、依次唤醒了各自事件上的等待者,证明了事件的隔离性。

(2723)[Parent] 创建了三个事件: A=0, B=1, C=2(2723)[Child 3143] 正在等待事件 A (ID: 0)...
(2723)[Parent] 创建了三个事件: A=0, B=1, C=2(2723)[Child 3144] 正在等待事件 B (ID: 1)...
(2723)[Parent] 创建了三个事件: A=0, B=1, C=2(2723)[Parent] 等待3秒,确保所有子进程进入睡眠...
(2723)[Parent] 创建了三个事件: A=0, B=1, C=2(2723)[Child 3145] 正在等待事件 C (ID: 2)...
(2723)[Parent] 正在发送信号(eventsig)到事件B(ID: 1)...
(2723)[Child 3144] 成功从事件 B (ID: 1) 唤醒!
(2723)[Parent] ---------------------------------------
(2723)[Parent] 正在发送信号(eventsig)到事件C(ID: 2)...
(2723)[Child 3145] 成功从事件 C (ID: 2) 唤醒!
(2723)[Parent] ---------------------------------------
(2723)[Parent] 正在发送信号(eventsig)到事件A(ID: 0)...
(2723)[Child 3143] 成功从事件 A (ID: 0) 唤醒!
(2723)[Parent] 所有子进程已退出。
(2723)[Parent] 所有事件已关闭。

4.3 测试三:eventclose 唤醒测试

测试 sync_test3.c,父进程直接调用 eventclose,所有正在等待的子进程被成功唤醒,证明 eventclose 满足了“通知所有被阻塞进程”的要求。

(2723)[Parent] 正在创建新事件...
(2723)[Parent] 事件创建成功,ID: 0
(2723)[Child 3236] 正在等待事件 0...
(2723)[Parent] 等待3秒,确保所有子进程都已进入睡眠...
(2723)[Child 3237] 正在等待事件 0...
(2723)[Child 3238] 正在等待事件 0...
(2723)[Parent] 正在关闭(eventclose)事件 0 来唤醒子进程...
(2723)[Child 3236] 成功从事件 0 唤醒 (被close唤醒?)
(2723)[Child 3237] 成功从事件 0 唤醒 (被close唤醒?)
(2723)[Child 3238] 成功从事件 0 唤醒 (被close唤醒?)
(2723)[Parent] 所有子进程已退出。
(2723)[Parent] 测试完成。

4.4 测试四:边缘触发测试

测试 sync_test4.c,父进程的 5 次 eventsig 调用,对应了子进程的 5 次 eventwait 唤醒和循环,证明了 gen 计数器实现的边缘触发逻辑正确。

(2723)[Parent] 正在创建新事件...
(2723)[Parent] 事件创建成功,ID: 0
(2723)[Parent] 等待 2 秒让子进程进入睡眠...
(2723)[Child 3298] 正在等待事件 0 (第 1/5 次)
(2723)[Parent] 正在发送信号 (eventsig) 到事件 0 (第 1/5 次)
(2723)[Child 3298] 成功从事件 0 唤醒 (第 1/5 次)
(2723)[Child 3298] 正在等待事件 0 (第 2/5 次)
(2723)[Parent] 正在发送信号 (eventsig) 到事件 0 (第 2/5 次)
(2723)[Child 3298] 成功从事件 0 唤醒 (第 2/5 次)
(2723)[Child 3298] 正在等待事件 0 (第 3/5 次)
(2723)[Parent] 正在发送信号 (eventsig) 到事件 0 (第 3/5 次)
(2723)[Child 3298] 成功从事件 0 唤醒 (第 3/5 次)
(2723)[Child 3298] 正在等待事件 0 (第 4/5 次)
(2723)[Parent] 正在发送信号 (eventsig) 到事件 0 (第 4/5 次)
(2723)[Child 3298] 成功从事件 0 唤醒 (第 4/5 次)
(2723)[Child 3298] 正在等待事件 0 (第 5/5 次)
(2723)[Parent] 正在发送信号 (eventsig) 到事件 0 (第 5/5 次)
(2723)[Child 3298] 成功从事件 0 唤醒 (第 5/5 次)
(2723)[Child 3298] 循环完成,正在退出。
(2723)[Parent] 子进程已退出。
(2723)[Parent] 事件 0 已关闭。

4.5 测试五:释放-重用压力测试

测试 sync_test5.c,主线程在 2 秒多的时间内 eventopen 失败了超过 160 万次,直到 slow_waiter 线程退出、closer 线程返回后,eventopen 才成功拿到 ID: 0。这证明了 atomic_t waitersdestroy_wq 机制成功阻塞了 eventclose,避免了“释放后重用”的竞态条件。

(2723)[Main] 正在填满所有 64 个事件槽位...
(2723)[Main] 所有 64 个槽位已占满 (0 到 63)。
[Waiter] 线程启动,准备等待事件 0
(2723)[Main] Closer 已启动。Main 线程开始循环尝试 eventopen...
(2723)[Main] 第 500000 次 eventopen 失败 (errno: 28)。这是正常的。
(2723)[Main] 第 1000000 次 eventopen 失败 (errno: 28)。这是正常的。
(2723)[Main] 第 1500000 次 eventopen 失败 (errno: 28)。这是正常的。
(2723)[Closer] 线程启动,准备关闭事件 0
[Waiter] 刚刚被唤醒 (被 close 唤醒)。
[Waiter] 模拟一些清理工作... (睡眠 2 秒)
(2723)[Closer] eventclose 调用返回 (ret=0)。这应该在 Waiter 退出后才发生。

(2723)[Main] *** 成功在 1609364 次尝试后打开了新事件 ID 0 ***
(2723)[Main] 这应该在 Closer 和 Waiter 都退出后才发生。
[Waiter] 清理工作完成,线程退出。
(2723)[Main] 测试完成。

5. 实验总结与反思

本次实验在实现进程同步原语的过程中,遇到了多个在课上讨论过的并发问题,这些调试经历是本次实验最大的收获。

最初的探索是关于内核编译的。为了加速开发迭代,本次实验尝试了 make localmodconfigmake menuconfig 来裁剪内核。但过于激进的裁剪(如禁用了 VMware PVSCSI 驱动)导致了 (initramfs) 启动失败。通过 GRUB 菜单切换回 ...-generic 内核才得以恢复,这让我深刻理解到内核对底层硬件驱动的依赖性。最终,通过 ccache 编译缓存和针对性的 menuconfig(仅禁用非必要外设,保留核心驱动)相结合,实现了开发效率的平衡。

在实现系统调用逻辑时,第一个挑战是“丢失的唤醒” (Lost Wakeup)。最初的 eventwait 设计是基于 schedule() 的简单睡眠,这导致 eventsigwake_up_alleventwait 进程加入等待队列之前就已执行,进程因此永久睡眠。为了修复这个竞态条件,引入了 bool signaled 标志和 wait_event_interruptible 宏,实现了“条件变量”模式。wait_event_interruptible 宏原子地检查条件和进入睡眠,彻底解决了这个Bug。

然而,signaled 标志又引入了新的竞态条件,即 eventcloseeventopen 之间对 signaled 标志的重置冲突。为了解决这个问题,将 signaled 升级为 unsigned long gen 计数器,实现了边缘触发。

最大的挑战来自于 eventcloseeventclose 必须唤醒所有等待者,并安全释放槽位。如果 eventclose 立即释放 in_use = false,另一个 eventopen 进程就可能重用这个槽位,并重置它的同步原语(如 gen 计数器)。而此时,被 eventclose 唤醒的旧进程可能才刚被调度,它们在检查 genclosing 标志时,会读到被新进程重置的脏数据,导致再次睡眠或崩溃。这是一个典型的“释放后重用”竞态条件。最终的解决方案是引入了 atomic_t waiters 引用计数器和 wait_queue_head_t destroy_wq 销毁队列。eventwait 在进入时 atomic_inc,退出时 atomic_deceventclose 则必须阻塞在 destroy_wq 上,直到 waiters 归零,才被最后一个退出的 eventwait 进程唤醒,并最终安全地释放槽位。压力测试(sync_test5.c)中 eventopen 失败了 160 万次,也证明了这一机制的健壮性。