大家好啊,我是说的异步。今天来给大家运行时的东西啊。
术语表
Task: 异步任务。可能是 goroutine/coroutine/future/promise 等等。
Runtime: 调度 Task 的异步运行时环境。
OS 线程:由操作系统调用的线程,通常由 pthread 等库创建。
让 Task 运行起来
这一步相当简单。由于 Task 本身就是可执行的代码,使用上下文切换(有栈协程)/函数调用(无栈协程)等方式将控制权转移给 Task 就可以完成工作了。
1 | * |
Task 要能产生新的 Task
相当常用的需求,比如我们正在写一个 HTTP 服务,我们肯定希望给每个连接新建一个 Task。
我们新增一个 task 队列,代表需要运行的所有 task。
1 | * |
spawn 时,向队列中推入一个 task 即可。yield 时,从队头获取并将控制权转移给下一个 task,将当前 task 推入队列尾部。
简陋的运行时
现在,我们已经有一个迫真运行时了。让我们来看看它能做到哪些东西吧。
新建 Task
调度 Task
手动让出控制权
但是它还是有一个相当致命的缺点:
- 执行阻塞操作(比如 socket 上的读操作)时,整个运行时会被阻塞在阻塞操作里。
如果运行时读写都会阻塞,相当于没有并发能力,也就等同于编程比同步程序复杂但是啥好处也没有的异步飞舞。接下来我们将尝试改进这一点。
避免阻塞实现并发
在运行时中避免阻塞操作的方法主要有以下几种:
调用可能阻塞的接口前检测接口是否会阻塞,如果接口会阻塞则直接 yield 转让控制权。典型的例子是在对 socket accept 之前用 select 检测其是否可读。
将接口设置为 non-blocking 模式,调用后检查返回值。如果发现读取失败则 yield 后重试。典型的例子是在设置为 NONBLOCK 的 socket 上读写后检查 errno。
摆烂,把阻塞操作整个扔到另一个线程里执行。典型的例子是著名的 tokio 运行时使用独立的线程来完成磁盘文件 IO 操作。
接下来我们举个例子。
一个包装标准库的例子
我们有时会用到这个系统调用:
*
1 | ssize_t read(int fd, void buf[.count], size_t count); |
包装一下:
*
*
*
*
*
*
*
*
*
*
1 | ssize_t async_read(int fd, void buf, size_t count) while (errno == EAGAIN || errno == EWOULDBLOCK); return result;} |
1 |
好耶,现在它会在没有数据可读时自动切换至另一个 Task 运行,避免了阻塞的问题。
简陋,但是能并发的运行时
假设我们用类似的技巧把所有可能阻塞的函数给包装了一遍,来看看我们的运行时现在能做些什么吧:
新建 Task
调度 Task
手动让出控制权
在操作会阻塞时自动让出控制权
看起来相当完美,但是……
如果所有的 Task 全部会阻塞,那么运行时会反复遍历所有 Task,变成 CPU 吞噬者。
假设任务队列长为 N,最坏情况下当某一个 Task,假设取名为 T,由阻塞变为就绪后,运行时需要依次运行 N - 1 个 Task 后才能轮到 T 执行。但是理想情况下当 T 就绪时,它应该尽快执行。虽然在只有几个 Task 时这看起来有点无关紧要,但是想象一下如果我们有 114514 个 Task 在队列里……
事件循环
如果我们能够有下面这种事件循环:
Task 将会阻塞时,将自己休眠并且把自己因阻塞而无法获取的资源提交到某个事件循环中
事件循环会监控 Task 请求的资源,并且在资源就绪时将对应的 Task 唤醒
很多 Task 共用一个事件循环来减少开销
那么所有问题都解决了!
相当可用的运行时设计
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
1 | digraph }" ] pending_tasks [ rankdir = LR, shape = record, label = "}" ] } event_loop -> pending_tasks event_loop -> resources [ label = "监控" ] ready_tasks:t1 -> current_task [ label = "yield" ]; pending_tasks:t1 -> ready_tasks:add [ label = "唤醒" ]} |
1 |
总结一下
一个能用的异步系统,需要哪些东西?
编程语言中有类似 Task 的概念:由编译器,或者强力的代码生成器支持。
执行器:运行当前活跃的 Task,由运行时支持。
非阻塞的标准库:在 Task 的资源请求可能阻塞时,使用相关设施将其挂起。由运行时支持。
事件循环:挂起和唤醒被阻塞的 Task,由运行时支持。
还能做哪些工作呢?
加入多核喵,使用线程池喵,并行谢谢喵
使用大家最喜欢的 work stealing 机制
实现避免笨蛋程序员阻塞异步系统的 hand off 机制
总之是在多核下新增的一堆复杂小工作
多核下的任务队列
以 Go 语言经典的 GPM 架构为例:
G: Goroutine
P: Processor,逻辑处理器
M: OS 线程
1
digraph subgraph subgraph G4 -> G1 -> P1 G5 -> G2 -> P2 G3 -> P3 P1 -> M1 P2 -> M2 P3 -> M3 M4}
1
本地队列
为什么不采用单一全局任务队列(即老式的 GM 架构),而要采用每个处理器一个局部队列 + 全局队列的模式:
增加局部性
减少多线程操作单一全局队列带来的锁争用开销
为什么不限制 M 的数量而是限制 P 的数量:
被长时间运行的 Task 阻塞时,P 可以分配新的 M 继续运行
M 的数量会随着阻塞情况动态增减
任务偷取机制
当本地队列为空时,不是销毁当前线程,而是试着从别的线程的本地队列/全局队列中偷取任务。
获取 Task 的流程如下 :
从本地队列中获取 Task
从全局队列中获取 Task,数量为全局队列当前任务数量的一半
从事件循环中获取就绪的 Task
从其他线程的本地队列中获取 Task,数量为其他线程的本地队列当前任务数量的一半
由于 Rust 的异步运行时基本抄的 Go,所以流程也大同小异
hand off 机制
当线程被长时间运行 Task 阻塞时所发生的事情
当检测到长时间运行的 Task 时,运行时会从线程池里弄一个新线程来运行本地队列中剩余的所有 Task。长时间运行的 Task 结束后,当前线程会被重新加入线程池。
效果类似调用了 tokio 中的 tokio::block_on 函数,只不过一个是手动的一个是自动的。
对这个机制,Go 语言是支持的。Rust 里 smol 和 async-std 都支持,但是 tokio 不支持,不知道为什么。
async-std 实现这个机制后写的博客:https://async.rs/blog/stop-worrying-about-blocking-the-new-async-std-runtime/