并行任务的几种实现方式
批量执行同样的任务是一种很常见的编程模式,比如处理大量图片、爬取网站的所有网页、向数据库批量保存信息等。 下面总结一些实现没有数据依赖任务的批量执行和限流的编程手段,并给出一些对比和选择考量。
Setup
本文主要关注代码质量(可读性、可维护性),并不尝试提供一个完整的性能评估。 真实的系统性能因业务场景而异,很难用一个简单例子得出结论。 本文的例子以包含少量计算和大量 IO 的场景为例,单个任务的流程为 计算-IO-计算-IO-计算 。 以 Python 为例,每次计算耗时 2-3 ms ,每次 IO 用 sleep 模拟,耗时 100ms 。 假设机器有八个处理器核心,需要处理 600 条数据,且有外部 IO 限制导致并发最高为 300 。
下面是每个语言中任务的具体实现。
Python
def gcd(x, y):
while y:
x, y = y, x % y
return x
def compute():
assert all([gcd(9 * i + 4, 2 * i + 1) == 1 for i in range(10000)])
def task():
compute()
time.sleep(0.1)
compute()
time.sleep(0.1)
compute()
NodeJS
const sleep = (timeMs) => new Promise((res) => setTimeout(res, timeMs));
const gcd = (x, y) => {
while (y) {
t = x % y;
x = y;
y = t;
}
return x;
};
const compute = () => {
assert(
[...Array(10000).keys()]
.map((i) => gcd(9 * i + 4, 2 * i + 1) == 1)
.every((x) => x)
);
};
const task = async () => {
compute();
await sleep(100);
compute();
await sleep(100);
compute();
};
Rust
fn gcd(_x: i64, _y: i64) -> i64 {
let (mut x, mut y) = (_x, _y);
while y != 0 {
(x, y) = (y, x % y)
}
return x;
}
fn compute() {
assert!((0..10000)
.map(|i| gcd(9 * i + 4, 2 * i + 1) == 1)
.all(|x| x))
}
async fn task() {
compute();
task::sleep(Duration::from_micros(100)).await;
compute();
task::sleep(Duration::from_micros(100)).await;
compute();
}
Python 多线程
对多线程和多进程场景, Python 提供了对应的线程池/进程池工具,可以直接调用 map 函数。 我们可以方便的通过线程池大小限制最大并发数。
from task import task
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=300) as executor:
executor.map(task, range(600))
python src/python-thread.py 6.44s user 0.13s system 99% cpu 6.603 total
Python 多进程
多进程场景, worker 数太大会造成过多的 overhead,一个适中的结果:
from task import task
import concurrent.futures
with concurrent.futures.ProcessPoolExecutor(max_workers=128) as executor:
executor.map(task, range(600))
python src/python-process.py 12.12s user 0.31s system 862% cpu 1.441 total
Python Async
对 Python 的异步函数,可以通过信号量来限制其并发:
from task import async_task
import asyncio
sem = asyncio.Semaphore(300)
async def run_task():
async with sem:
await async_task()
async def main():
await asyncio.gather(*[run_task() for _ in range(600)])
asyncio.run(main())
python src/python-async-sem.py 6.33s user 0.01s system 99% cpu 6.341 total
也可以通过循环来限制,写法跟下面的 Ray 相同。
Python Ray
Ray 的多线程和协程 Actor 和之前的计算模型重复了,不再尝试。 Ray 的精髓在于通过比较简洁(类似协程)的语法,可以实现对多进程甚至分布式的计算任务调度。 下面的写法是 Ray 官方推荐 的限制进程并发的编程模式。
import ray
import time
from task import task
ray.init()
@ray.remote
def run():
task()
start = time.time()
futures = []
concurrency = 300
for _ in range(600):
futures.append(run.remote())
if len(futures) > concurrency:
resolved_futures, futures = ray.wait(futures, num_returns=len(futures) - concurrency)
print(time.time() - start)
4.14541482925415
python src/python-ray.py 3.78s user 1.16s system 45% cpu 10.965 total
可以看出 Ray 的启动 overhead 还是很高的。 Ray 比较适合长时间的计算任务,在这个场景下看不出优势。
NodeJS Async
JS 是单线程模型,只有异步函数一种并发方式可选。 也可以考虑 service worker ,但是会麻烦很多。 JS 语言本身没有比较方便的限制并发的写法(如 python 的内置信号量,或者 wait 原语),不过有工具库做了这件事情:
import pLimit from "p-limit";
const limit = pLimit(300);
const main = async () => {
await Promise.all([...Array(600).keys()].map(() => limit(() => task())));
};
main();
node src/js-async.js 0.90s user 0.03s system 101% cpu 0.919 total
出乎意料的快。
Rust Async
个人感觉可以充分利用多核的协程是 Rust 的一大优势。 下面是写法:
#[tokio::main]
async fn main() -> () {
let mut tasks = Vec::with_capacity(300);
for _ in 0..600 {
tasks.push(tokio::spawn(task()));
}
for task in tasks {
task.await.unwrap();
}
return ();
}
./target/debug/test 13.66s user 0.05s system 1582% cpu 0.866 total
可以看到有明显多核使用,在计算加重的时候加速可能会更突出。