并行任务的几种实现方式

批量执行同样的任务是一种很常见的编程模式,比如处理大量图片、爬取网站的所有网页、向数据库批量保存信息等。 下面总结一些实现没有数据依赖任务的批量执行和限流的编程手段,并给出一些对比和选择考量。

Setup

本文主要关注代码质量(可读性、可维护性),并不尝试提供一个完整的性能评估。 真实的系统性能因业务场景而异,很难用一个简单例子得出结论。 本文的例子以包含少量计算和大量 IO 的场景为例,单个任务的流程为 计算-IO-计算-IO-计算 。 以 Python 为例,每次计算耗时 2-3 ms ,每次 IO 用 sleep 模拟,耗时 100ms 。 假设机器有八个处理器核心,需要处理 600 条数据,且有外部 IO 限制导致并发最高为 300 。

下面是每个语言中任务的具体实现。

Python

def gcd(x, y):
  while y:
    x, y = y, x % y
  return x


def compute():
  assert all([gcd(9 * i + 4, 2 * i + 1) == 1 for i in range(10000)])


def task():
  compute()
  time.sleep(0.1)
  compute()
  time.sleep(0.1)
  compute()

NodeJS

const sleep = (timeMs) => new Promise((res) => setTimeout(res, timeMs));

const gcd = (x, y) => {
  while (y) {
    t = x % y;
    x = y;
    y = t;
  }
  return x;
};
const compute = () => {
  assert(
    [...Array(10000).keys()]
      .map((i) => gcd(9 * i + 4, 2 * i + 1) == 1)
      .every((x) => x)
  );
};

const task = async () => {
  compute();
  await sleep(100);
  compute();
  await sleep(100);
  compute();
};

Rust

fn gcd(_x: i64, _y: i64) -> i64 {
    let (mut x, mut y) = (_x, _y);
    while y != 0 {
        (x, y) = (y, x % y)
    }
    return x;
}

fn compute() {
    assert!((0..10000)
        .map(|i| gcd(9 * i + 4, 2 * i + 1) == 1)
        .all(|x| x))
}

async fn task() {
    compute();
    task::sleep(Duration::from_micros(100)).await;
    compute();
    task::sleep(Duration::from_micros(100)).await;
    compute();
}

Python 多线程

对多线程和多进程场景, Python 提供了对应的线程池/进程池工具,可以直接调用 map 函数。 我们可以方便的通过线程池大小限制最大并发数。

from task import task
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=300) as executor:
  executor.map(task, range(600))
python src/python-thread.py  6.44s user 0.13s system 99% cpu 6.603 total

Python 多进程

多进程场景, worker 数太大会造成过多的 overhead,一个适中的结果:

from task import task
import concurrent.futures


with concurrent.futures.ProcessPoolExecutor(max_workers=128) as executor:
  executor.map(task, range(600))
python src/python-process.py  12.12s user 0.31s system 862% cpu 1.441 total

Python Async

对 Python 的异步函数,可以通过信号量来限制其并发:

from task import async_task
import asyncio

sem = asyncio.Semaphore(300)
async def run_task():
  async with sem:
    await async_task()

async def main():
  await asyncio.gather(*[run_task() for _ in range(600)])

asyncio.run(main())
python src/python-async-sem.py  6.33s user 0.01s system 99% cpu 6.341 total

也可以通过循环来限制,写法跟下面的 Ray 相同。

Python Ray

Ray 的多线程和协程 Actor 和之前的计算模型重复了,不再尝试。 Ray 的精髓在于通过比较简洁(类似协程)的语法,可以实现对多进程甚至分布式的计算任务调度。 下面的写法是 Ray 官方推荐 的限制进程并发的编程模式。

import ray
import time
from task import task

ray.init()

@ray.remote
def run():
  task()

start = time.time()
futures = []
concurrency = 300
for _ in range(600):
  futures.append(run.remote())
  if len(futures) > concurrency:
    resolved_futures, futures = ray.wait(futures, num_returns=len(futures) - concurrency)
print(time.time() - start)
4.14541482925415
python src/python-ray.py  3.78s user 1.16s system 45% cpu 10.965 total

可以看出 Ray 的启动 overhead 还是很高的。 Ray 比较适合长时间的计算任务,在这个场景下看不出优势。

NodeJS Async

JS 是单线程模型,只有异步函数一种并发方式可选。 也可以考虑 service worker ,但是会麻烦很多。 JS 语言本身没有比较方便的限制并发的写法(如 python 的内置信号量,或者 wait 原语),不过有工具库做了这件事情:

import pLimit from "p-limit";

const limit = pLimit(300);

const main = async () => {
  await Promise.all([...Array(600).keys()].map(() => limit(() => task())));
};

main();
node src/js-async.js  0.90s user 0.03s system 101% cpu 0.919 total

出乎意料的快。

Rust Async

个人感觉可以充分利用多核的协程是 Rust 的一大优势。 下面是写法:

#[tokio::main]
async fn main() -> () {
    let mut tasks = Vec::with_capacity(300);
    for _ in 0..600 {
        tasks.push(tokio::spawn(task()));
    }
    for task in tasks {
        task.await.unwrap();
    }
    return ();
}
./target/debug/test  13.66s user 0.05s system 1582% cpu 0.866 total

可以看到有明显多核使用,在计算加重的时候加速可能会更突出。