深圳幻海软件技术有限公司 欢迎您!

如何在Python 的线程中运行协程

2023-02-27

 在一篇文章理解Python异步编程的基本原理这篇文章中,我们讲到,如果在异步代码里面又包含了一段非常耗时的同步代码,异步代码就会被卡住。那么有没有办法让同步代码与异步代码看起来也是同时运行的呢?方法就是使用事件循环的.run_in_executor()方法。我们来看一下Python官方文

 在一篇文章理解Python异步编程的基本原理这篇文章中,我们讲到,如果在异步代码里面又包含了一段非常耗时的同步代码,异步代码就会被卡住。

那么有没有办法让同步代码与异步代码看起来也是同时运行的呢?方法就是使用事件循环的.run_in_executor()方法。

我们来看一下 Python 官方文档[1]中的说法:

 

那么怎么使用呢?还是以非常耗时的递归方式计算斐波那契数列的这个函数为例:

 

def sync_calc_fib(n): 
    if n in [1, 2]: 
        return1 
    return sync_calc_fib(n - 1) + sync_calc_fib(n - 2) 
 
 
async def calc_fib(n): 
    result = sync_calc_fib(n) 
    print(f'第 {n} 项计算完成,结果是:{result}'
    return result 
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.

我们现在需要用 aiohttp 访问一个延迟5秒的网页,同时计算斐波那契数列第36项。

首先我们看看单独计算第36项需要5秒钟:

 

我们再来看看如果直接把这计算斐波那契数列和请求网站的两个异步任务放在一起“并行”,实际时间是两个任务的时间叠加:

具体原因我在上一篇文章里面已经做了说明。

 

现在,我想让两个任务“同时运行”,于是就可以这样修改代码:

 

import aiohttp 
import asyncio 
import time 
from concurrent.futures import ThreadPoolExecutor 
 
 
async def request(sleep_time): 
    async with aiohttp.ClientSession() as client: 
        resp = await client.get(f'http://127.0.0.1:8000/sleep/{sleep_time}'
        resp_json = await resp.json() 
        print(resp_json) 
 
 
def sync_calc_fib(n): 
    if n in [1, 2]: 
        return 1 
    return sync_calc_fib(n - 1) + sync_calc_fib(n - 2) 
 
 
def calc_fib(n): 
    result = sync_calc_fib(n) 
    print(f'第 {n} 项计算完成,结果是:{result}'
    return result 
 
 
async def main(): 
    start = time.perf_counter() 
    loop = asyncio.get_event_loop() 
    with ThreadPoolExecutor(max_workers=4) as executor: 
        tasks_list = [ 
            loop.run_in_executor(executor, calc_fib, 36), 
            asyncio.create_task(request(5)) 
        ] 
        await asyncio.gather(*tasks_list) 
        end = time.perf_counter() 
        print(f'总计耗时:{end - start}'
 
 
asyncio.run(main()) 
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.

运行效果如下图所示:

 

在5秒钟的时间,就把计算斐波那契数列和请求5秒延迟的网站都做完了。

实现这样的转变,关键的代码就是:loop.run_in_executor(executor, calc_fib, 36)

其中的 loop就是主线程的事件循环(event loop),它是用来调度同一个线程里面的多个协程。

executor是我们使用ThreadPoolExecutor(max_workers=4)创建的一个有4个线程的线程池,calc_fib是一个耗时的同步函数,36是传入calc_fib的参数。loop.run_in_executor(executor, calc_fib, 36)的意思是说:

  1. 把calc_fib函数放到线程池里面去运行
  2. 给线程池增加一个回调函数,这个回调函数会在运行结束后的下一次事件循环把结果保存下来。

请注意上图中红色箭头对应的calc_fib这是一个同步函数,请与上一篇文章中的异步函数区分开。run_in_executor的第二个参数需要是一个同步函数的函数名。

在上面的例子中,我们创建的是有4个线程的线程池。所以这个线程池最多允许4个阻塞式的同步函数“并行”。