Rust 基于 Tokio 实现任务管理器

AI摘要
本文是一篇关于使用Rust的Tokio异步运行时构建任务管理器的技术教程,属于知识分享。文章详细介绍了从初始化项目、创建基础TaskManager结构、实现任务添加,到引入TaskHandle包装、实现优雅关机(Graceful Shutdown)机制(分别使用broadcast通道和CancellationToken),以及处理超时控制和竞态条件修复的全过程。内容聚焦于异步编程和并发管理的核心实践。

距离上一篇文章已经是好几个月前的事情了,这几个月好忙好忙。现在终于闲下来了,花了十几个小时写了这篇文章,这真的是体力活,很考验毅力。

这篇文章介绍了如何基于 Tokio 写一个任务管理器,内容涵盖了异步编程、并发编程的核心实践,干货满满当当的。其他废话不多说,开始吧。

初始化项目

在开始之前,你需要先初始化一个项目:

cargo init task_manager_example

Cargo.toml 如下, 只依赖了 tokio 运行时:

[package]
name = "task-manager-example"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1.34.0", features = ["full"] }
tokio-util = { version = "0.7.10" }

然后你会得到一个 hello world 的示例程序,src/main.rs 内容如下:

fn main() { 
    println!("Hello, world!"); 
}

编写 Task Manager

千里之行,始于足下。首先,我们来编写 TaskManager 这个最基础的组件:

use std::collections::HashMap;
use tokio::task::JoinHandle;

struct TaskManager {
    pub tasks: HashMap<String, JoinHandle<()>>,
}

TaskManager 就一个字段 HashMap 类型的 tasks, 包含了多个 task

Rust 基于 Tokio 实现任务管理器

JoinHandle 是对 Future 的包装,对其生命周期进行管理,并且提供了跨任务通信机制。

实现 new 方法

接着,我们来实现 new(), 初始化 tasks 字段:

impl TaskManager {
    pub fn new() -> Self {
        Self {
            tasks: HashMap::new(),
        }
    }
}

简单起见,这里暂时没有使用锁,目前也不会有并发问题:TaskManager 目前也只会有一个可变引用,所有操作都会是顺序的。tasks 也不会跨任务共享。

实现 add_task()

上面我们已经初始化了 tasks 这个字段,但它只是一个 HashMap 类型的容器,我们需要通过 add_task() 方法,将任务加入进去,填充它:

impl TaskManager {
    // ...省略...
    pub fn add_task(&mut self, name: String, handle: JoinHandle<()>) {
        self.tasks.insert(name, handle);
    }
}

Passive Manager Pattern

最后,我们来测试这个最基础版本的 TaskManager, 在 main 函数中 add_task 后调用 run 方法:

#[tokio::main]
async fn main() {
    let mut manager = TaskManager::new();

    manager.add_task(
        "task1".to_string(),
        tokio::spawn(async {
            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            println!("Task 1 executing...");
        }),
    );

    manager.add_task(
        "task2".to_string(),
        tokio::spawn(async {
            tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
            println!("Task 2 executing...");
        }),
    );
    // 你可以添加更多的测试任务

    println!("Starting all tasks...");
    manager.run().await;
    println!("All tasks completed!");
}

猜一下,println! 的输出的顺序是什么?其实是不固定的。

值得一说的是,tokio::spawn(async {}) 执行后,会返回 JoinHandle , 然后 Future 也随即会交给 Tokio 的 Executor 去执行。所以 TaskManager 并没有一个 run 的方法,它采用的是 Passive Manager Pattern (被动管理器模式)。

在 Passive Manager Pattern 下, TaskManager 并不负责 Poll 任务,那是 Tokio Runtime 的事情,它仅仅是持有任务的句柄(JoinHandle),负责任务的生命周期。

在 Tokio 生态下,这是标准的写法。除非你自己去实现一个运行时,才需要采用 Active Manager Pattern(主动管理器模式),写一个 loop 或者 select! , 驱动任务状态的流转。这都是运行时做的事情。

所以,TaskManager也显得非常轻量。

实现 TaskHandle

在 tokio 生态中,一个任务就是 JoinHandle。但是在生产中, JoinHandle 只是最基础的结构,并不能满足我们更多的需求,比如状态管理、优雅关机等。

所以我们需要在 JoinHandle 的基础上,再包装一层,实现 TaskHandle , 核心作用是**实现 Task 完整的生命周期管理,以及实现任务状态的查询和监控。

实现基础结构

根据上面的结构图,我们来新增 TaskHandlestruct

struct TaskHandle {
    join_handle: JoinHandle<()>,
}

然后将 JoinHandle 全部替换为 TaskHandle:

struct TaskManager {
    pub tasks: HashMap<String, TaskHandle>,
}

pub fn add_task(&mut self, name: String, handle: TaskHandle) {}

pub async fn run(self) {
    for (name, handle) in self.tasks {
        match handle.join_handle.await {}  // 这里要改为 `handle.join_handle`
    }
}

manager.add_task(
    "task1".to_string(),
    TaskHandle {    // 在 join_handle 的外面包一层
        join_handle: tokio::spawn(async {}),
    },
);

至此,我们只是用 TaskHandle 简单地替换了 JoinHandle ,并没有实现更多的能力。但是为后面扩展奠定了基础。这在计算机中很常见,也很重要,有一句话是这么说的:

All problems in computer science can be solved by another level of indirection。

所有的问题,如果加一层还不能解决,那就再加上一层(除了性能问题哦~)。当然这也不是真理:

All non-trivial abstractions, to some degree, are leaky.(所有非微不足道的抽象,到最后都是漏洞百出)。

实现 Graceful Shutdown

首先我们来实现优雅停机(Graceful Shutdown), 指的是当接收到 Ctrl + C 信号的时候,不再接收新的任务、新的请求,能够从从容容、游刃有余地的释放资源,停止程序运行。

首先,需要为 TaskHandle 增加一个字段:

use tokio::sync::broadcast;

struct TaskManager {
    tasks: HashMap<String, TaskHandle>,
    shutdown_tx: broadcast::Sender<()>,
}

tokio::sync::broadcast 是一个 MPMC (Multi-Producer, Multi-Consumer:多生产者多消费者)的通道。可以实现消息一对多或者多对多的分发,比如配置更新、群聊以及我们现在要做的 Graceful Shutdown。

TaskManager 中增加了 shutdown_tx , 那么创建 new 方法也要随着更改,如下:

impl TaskManager {
    pub fn new() -> Self {
        // 创建一个容量为 1 的 channel,因为只需要发送一个关闭信号就
        let (shutdown_tx, _) = broadcast::channel::<()>(1);
        Self {
            tasks: HashMap::new(),
            shutdown_tx,
        }
    }
    // 
    // 调用 `.subscribe()` 方法来生成一个接收端,
    // 每一个 task 都需要有一个接收者,来订阅消息
    pub fn get_shutdown_receiver(&self) -> broadcast::Receiver<()> {
        self.shutdown_tx.subscribe()
    }
}

接着我们创建 TaskHandle 的地方,也需要做出变更:

// 获取接收者,然后通过闭包将其 move 到 JoinHandle 的闭包中
let mut shutdown_rx1 = manager.get_shutdown_receiver();

manager.add_task(
    "task1".to_string(),
    tokio::spawn(async move {
        loop {
            tokio::select! {
                // 每秒打印一次 working...
                _ = tokio::time::sleep(tokio::time::Duration::from_secs(1)) => {
                    println!("Task 1 working...");
                }
                // 等待收到关闭的消息
                _ = shutdown_rx1.recv() => {
                    println!("Task 1 received shutdown signal, cleaning up...");
                    tokio::time::sleep(
                        tokio::time::Duration::from_millis(200)
                    ).await;
                    println!("Task 1 cleanup complete");
                    break;
                }
            }
        }
    })
);

这里着重解释一下 tokio::select! 这个宏,本质上是 await 多个 Future ,哪一个分支的 Future 准备好了,就执行哪一个,如果都没有准备好,就会出让控制权。

所以,当你看到 tokio::select! 最外层的 loop 死循环的时候,不要感到惊讶。确实,在同步的代码中,死循环会造成 CPU 满负载运行。但是在这里不会,因为 tokio::select! 每次执行后,当前任务会返回 Pending 的状态,交出 CPU 控制权(Yield),所在的线程就会去执行其他任务。只有在操作系统或者定时器再次通知 Tokio 后,才会重新唤醒这个任务,继续下一次迭代。

然后我们再来实现 TaskManagershutdown 方法:

pub async fn shutdown(self) {
    println!("Shutting down all tasks...");

    // 1. 向所有任务发送关闭信号
    println!("Sending global shutdown signal...");
    let _ = self.shutdown_tx.send(());

    // 2. 等待所有任务完成
    for (name, handle) in self.tasks {
        match handle.join_handle.await {
            Ok(_) => println!("Task '{}' shutdown successfully", name),
            Err(e) => eprintln!("Task '{}' shutdown failed: {:?}", name, e),
        }
    }

    println!("All tasks shutdown completed!");
}

最后,在 main 函数的最后,加上如下代码:

// 监听 Ctrl+C 信号
tokio::select! {
    _ = tokio::signal::ctrl_c() => {
        println!("\nReceived Ctrl+C signal!");
    }
    // 如果没有收到 `ctrl+c` 信号,也会在 5 秒之后触发 shutdown
    _ = tokio::time::sleep(tokio::time::Duration::from_secs(5)) => {
        println!("\nTimeout reached, initiating shutdown...");
    }
}

// 触发优雅关机
manager.shutdown().await;
println!("Program exited gracefully!");

使用 CancellationToken

tokio_util::sync::CancellationToken 是一种协作式取消机制,用于优雅停止异步任务。它可以 clone, 所有 clone 的 token 都会共享一个取消状态(内部通过 AtomicBool 实现),非常轻量。

接下来,我们用它来替换之前使用 broadcast 的实现,其核心 API 如下表所示:

方法 说明 使用场景
new() 创建一个新的根令牌 初始化取消令牌
cancel() 取消令牌及其所有子令牌 触发取消操作
is_cancelled() 同步检查是否已取消 在循环中快速检查状态
cancelled() 返回一个 Future,在令牌被取消时完成 select! 中异步等待取消

我们先将之前代码中的 broadcast 全部删掉,然后使用 CancellationToken 再来实现一次:

struct TaskHandle {
    join_handle: JoinHandle<()>,
    cancellation_token: CancellationToken,
}

你发现我们和之前 broadcast 全局的通知不一样,现在每一个 TaskHandle 都持有一个 CancellationToken ,这样带来的直接好处就是可以单独取消每一个任务。

接着,将其余部分也修改一下:

// 增加 cancellation_token 参数
pub fn add_task(
    &mut self,
    name: String,
    join_handle: JoinHandle<()>,
    cancellation_token: CancellationToken
) {
    self.tasks.insert(name, TaskHandle {
        join_handle,
        cancellation_token,
    });
}

main 函数中,我们需要创建 CancellationToken ,然后传给 add_task 方法:

let token_1 = CancellationToken::new();
let token_1_clone = token_1.clone();
manager.add_task(
    "task1".to_string(),
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = token_1_clone.cancelled() => {
                    println!("Task 1 cancelled");
                    break;
            }
            _ = tokio::time::sleep(tokio::time::Duration::from_secs(1)) => {
                println!("Task 1 working...");
            }
         }
    }}),
    token_1
);

接着,我们需要在 TaskManager 中实现 Graceful shutdown 的方法:

pub async fn shutdown(self) {
    for handle in self.tasks.values() {
        handle.cancellation_token.cancel();
    }
    // 省略其它
}

最后,在 main 函数的最后,去调用 TaskManager.shutdown() 方法就可以了:

manager.shutdown().await;

上文有说到 CancellationToken 是协作式的,这怎么理解呢?你看在 shutdown 方法中,我们只是通过调用 cancellation_token.cancel 方法发出了一个信号,而是否结束、如何结束正在运行的目标程序是 JoinHandle 中持有的 Future 的逻辑来决定的(在上面,我们 print 了日志,并主动 break )。

Rust 基于 Tokio 实现任务管理器

协作式和强制式不是互斥的,在 Graceful shutdown 这个场景中应该是同时存在的。当 task 关停超时了,就应该要上强制手段了。这在下文中马上就会说明如何实现。

最后,我们再来看看 CancellationTokentokio::sync::boardcast 之间的区别:

特性 CancellationToken broadcast 说明
协作式 都需要任务主动检查
Graceful Shutdown 都支持优雅关闭
克隆成本 低(Arc) 低(订阅) 都很轻量
层级结构 ✅ 支持父子令牌 ❌ 不支持 Token 更适合层级管理
携带数据 ❌ 只传递信号 ✅ 可传递数据 Broadcast 可发送关闭原因
API 简洁性 ✅ 专为取消设计 ⚠️ 需要处理 RecvError Token 更直观
多次触发 ❌ 只能取消一次 ✅ 可发送多条消息 Broadcast 更灵活

双向通信机制实现超时控制

我们先来看之前实现的 shutdown 代码:

pub async fn shutdown(self) {
        // 通知所有的 task 结束运行
    for handle in self.tasks.values() {
        handle.cancellation_token.cancel();
    }
    // 使用 await 来等待 task 结束
    for (name, handle) in self.tasks {
        match handle.join_handle.await {
            Ok(_) => println!("Task '{}' shutdown successfully", name),
            Err(e) => eprintln!("Task '{}' shutdown failed: {:?}", name, e),
        }
    }
}

正常情况下,这个程序没什么问题,但异常情况下呢?task 没有正常的关闭,而是一直运行下去,就会导致程序永远也不会停止。所以,我们要增加超时控制,如果超时就上强制手段。

  1. 主程序调用 mange.shutdown() 方法,下达最后通牒;
  2. shutdown() 方法中,通过 cancellation_token.cancel() 发送信号;
  3. task 收到了 cancel 信号,并正常结束的话,通过 boardcast 广播通知主程序;
  4. 如果主程序在预定的超时时间内,没有收到 task 正常关闭的消息,则强制关闭 task;

一个错误的实现

接下来我们来实现上面说的这个双向通信机制,但这个实现是错误的,存在竞态条件,很经典也很有意思,故而先放一个错误的,然后分析并修复。

第一步:先更改 TaskHandle 结构,增加一个 shutdown_tx字段,在收到 cancel 信号并正常结束后发送给主程序通知:

struct TaskHandle {
    join_handle: JoinHandle<()>,
    cancellation_token: CancellationToken,
    shutdown_tx: broadcast::Sender<()>,
}

第二步:修改 add_task 方法,增加 shutdown_tx 参数,用来构建 TaskHandle :

pub fn add_task(
    &mut self,
    name: String,
    join_handle: JoinHandle<()>,
    cancellation_token: CancellationToken,
    shutdown_tx: broadcast::Sender<()>
) {
    self.tasks.insert(name, TaskHandle {
        join_handle,
        cancellation_token,
        shutdown_tx,
    });
}

第三步:修改 main 函数,调用 add_task 的时候传入 shutdown_tx

let (shutdown_tx_1, _) = broadcast::channel::<()>(1);
let shutdown_tx_1_clone = shutdown_tx_1.clone();
manager.add_task(
    "task1".to_string(),
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = token_1_clone.cancelled() => {
                    println!("Task 1 cancelled");
                    // 增加这一行,收到信号后发送通知
                    let _ = shutdown_tx_1_clone.send(());
                    break;
                }
                // 省略这里的代码,不变
            }
        }),
    token_1,
    // 增加这一行,主程序用来订阅消息
    shutdown_tx_1
);

最后:修改 TaskManager::shutdown() 方法,订阅消息并对超时的任务强制退出:

pub async fn shutdown(self) {
        // 对所有 handle 发送通知
    for handle in self.tasks.values() {
        handle.cancellation_token.cancel();
    }
        // 收集所有 handle 的 shutdown_tx
    let task_handles: Vec<_> = self.tasks
        .iter()
        .map(|(name, handle)| (name.clone(), handle.shutdown_tx.clone()))
        .collect();
        // 遍历 shutdown_tx ,如果超时则强制关闭
    for (name, shutdown_tx) in task_handles {
        let mut shutdown_rx = shutdown_tx.subscribe();
        // 判断是否为正常关闭
        let result = tokio::select! {
                _ = shutdown_rx.recv() => {
                    println!("Task shutdown gracefully!");
                    true
                }
                _ = sleep(Duration::from_secs(5)) => {
                    println!("Task shutdown timed out after 5");
                    false
                }
            };
        // 如果超时,则强制结束 handle
        if !result {
            if let Some(handle) = self.tasks.get(&name) {
                handle.join_handle.abort();
            }
        }
    }
    println!("All tasks shutdown completed!");
}

看起来这样实现很完美,编译也能通过,运行也能正常运行。运行后输出如下:

// 省略前面 working... 的输出
Timeout reached, initiating shutdown...
Shutting down all tasks...
Task 2 cancelled
Task 3 cancelled
Task 1 cancelled
Task shutdown gracefully!
Task shutdown timed out after 5
Task shutdown timed out after 5
All tasks shutdown completed!
Program exited gracefully!

你看到了没: Task shutdown timed out after 5 这个输出是错误的,我们的任务不可能会超时 5 秒,这是“伪超时”。具体原因,我们下文详解。

竞态条件分析

我们来看一眼下面的时序图,看这图就很清楚了:

Rust 基于 Tokio 实现任务管理器

我们先来看一下竞态条件(Race Condition) 是什么:

在并发编程中,多个线程或者进程同时访问和操作共享资源,程序的执行结果依赖于线程执行的时序和顺序,导致程序行为不确定或者出现错误的情况。

对比我们上面这个实现,来看看是否符合 Race Condition 的定义:

特征 说明 我们的实现
时序依赖 结果取决于操作的执行顺序 subscribe() 必须在 send() 之前,但无法保证
不确定性 每次运行可能有不同结果 有时能收到信号,有时收不到
并发冲突 多个执行流访问共享状态 TaskManager 和 Task 都在操作 shutdown_tx
难以复现 取决于调度时机 在快速机器上更容易出错

所以这个问题的本质就是, cancel 的信号已经发送了,但是我们都还没有订阅这个消息。

修复竞态条件

问题清楚了,是因为订阅在发送消息之后,那么解决方法也有了,调整程序执行顺序,将订阅放在最前面。修复如下:

pub async fn shutdown(self) {
    // 先订阅
    let mut shutdown_monitors = Vec::new();
    for (name, handle) in &self.tasks {
        shutdown_monitors.push((name.clone(), handle.shutdown_tx.subscribe()));
    }
    // 再取消
    for handle in self.tasks.values() {
        handle.cancellation_token.cancel();
    }
    // 超时则强行关闭
    for (name, mut shutdown_rx) in shutdown_monitors {
        let result =
            tokio::select! {
                _ = shutdown_rx.recv() => {
                    println!("Task '{}' shutdown gracefully!", name);
                    true
                }
                _ = sleep(Duration::from_secs(5)) => {
                    println!("Task '{}' shutdown timed out after 5s", name);
                    false
                }
            };

        if !result && let Some(handle) = self.tasks.get(&name) {
                        handle.join_handle.abort();
        }
    }
}

但是这个程序还是可能会存在问题,不够健壮,join_handle.abort() 这个方法也只是发送出一个信号,是异步的,并不能保证超时的程序立即被中止。存在任务还没有完全中止,而主程序已经退出的可能。所以,我们要在方法的最后 await 所有任务确实已经结束:

for (name, handle) in self.tasks {
    match handle.join_handle.await {
        Ok(_) => println!("Task '{}' shutdown successfully", name),
        Err(e) => eprintln!("Task '{}' shutdown failed: {:?}", name, e),
    }
}

总结

碍于篇幅,我就不继续写下去了,实现一个真正生产级别的 TaskManager 还有很多的问题要考虑,比如支持配置、退避重试、状态机、日志监控等等。有空再继续吧。

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 2

图挂了,,

3周前 评论
苏近之 (楼主) 3周前

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!