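If you're tired of multithreading, give Python's asynchronous programming a try. Since the async and await keywords were introduced, the syntax has become more concise and intuitive, and after several years of ecosystem growth async is now a solid concurrency model.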
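In Python async code you will run into several other objects, such as Future and Task (Task is a subclass of Future). For consistency, this article loosely calls all of them coroutines, whether they are Futures or Tasks.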
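A small sketch of that relationship, using only the standard library: `asyncio.ensure_future` wraps a bare coroutine in a Task, and that Task is also a Future instance. The function names `work` and `main` are illustrative only.

```python
import asyncio


async def work() -> str:
    return "done"


async def main() -> str:
    # ensure_future wraps a bare coroutine in a Task and schedules it on the loop
    task = asyncio.ensure_future(work())
    # Task subclasses Future, so the same object passes both checks
    assert isinstance(task, asyncio.Task)
    assert isinstance(task, asyncio.Future)
    return await task


if __name__ == "__main__":
    print(issubclass(asyncio.Task, asyncio.Future))  # True
    print(asyncio.run(main()))  # done
```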
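Because of the GIL, Python threads struggle with CPU-bound work, but for I/O-bound work they still deliver real benefits. Async programming also targets I/O-bound work, so the two approaches can largely substitute for each other. Because of their different designs, async should in theory be faster than multithreading, since each task is cheaper. A coroutine is an ordinary Python object and needs no extra memory from the operating system, whereas creating a thread goes through the OS, which reserves a stack for it, typically a few megabytes (8 MB by default on most Linux systems).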
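To make the cost argument concrete, here is a rough sketch that starts ten thousand concurrent coroutines in a single thread. It assumes `asyncio.sleep(0.1)` is a fair stand-in for a coroutine blocked on I/O. The names `io_task`, `spawn_many` and `run` are hypothetical. Starting 10,000 OS threads instead would mean 10,000 reserved stacks.

```python
import asyncio
import time


async def io_task(i: int) -> int:
    # stands in for a coroutine that is waiting on network I/O
    await asyncio.sleep(0.1)
    return i


async def spawn_many(n: int) -> list:
    # every coroutine becomes a Task inside one thread; no OS thread per task
    return await asyncio.gather(*(io_task(i) for i in range(n)))


def run(n: int = 10_000):
    start = time.perf_counter()
    results = asyncio.run(spawn_many(n))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = run()
    # all tasks sleep concurrently, so elapsed is typically well under a second
    print(len(results), f"{elapsed:.2f}s")
```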
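Async has clear advantages, such as lower memory use and finer control over concurrency (though you may disagree :)). But the async/await syntax sets this code apart from ordinary synchronous code, so existing code has to be adapted, which takes extra effort. The ecosystem is also far less mature than the synchronous one: many libraries do not support async yet, so you will need to do some extra adaptation work.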
```python
# web.py
import asyncio
from random import random

import uvicorn
from fastapi import FastAPI

app = FastAPI()


@app.get("/")
async def index():
    await asyncio.sleep(1)  # simulate a slow, I/O-bound handler
    return {"msg": "ok"}


@app.get("/random")
async def random_number():
    # give this handler its own name instead of redefining index
    await asyncio.sleep(1)
    return {"msg": random()}


if __name__ == "__main__":
    # uvicorn.run(app)
    # hot reload requires an import string rather than the application object
    uvicorn.run("web:app", reload=True)
```

A minimal version without the `/random` route and without hot reload looks like this:

```python
import asyncio

import uvicorn
from fastapi import FastAPI

app = FastAPI()


@app.get("/")
async def index():
    await asyncio.sleep(1)
    return {"msg": "ok"}


if __name__ == "__main__":
    uvicorn.run(app)
```

All dependencies used in this article:
- Python 3.7+
- fastapi
- aiohttp
- uvicorn
```shell
pip install -r requirements.txt
```

Concurrency, concurrency, concurrency
```python
# test1.py
import asyncio
from datetime import datetime

import aiohttp


async def main(workers: int, url: str):
    async with aiohttp.ClientSession() as sess:
        for _ in range(workers):
            # each request is awaited before the next one starts
            async with sess.get(url) as resp:
                print("Response", await resp.json())


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    start = datetime.now()
    # the web.py server URL goes here
    loop.run_until_complete(main(3, ""))
    end = datetime.now()
    print("Elapsed:", end - start)
```

The output is:
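```shell
$ python test1.py
Response {'msg': 'ok'}
Response {'msg': 'ok'}
Response {'msg': 'ok'}
Elapsed: 0:00:03.011565
```

It took 3 seconds, which is not what we expected. Although we used coroutines, they ran serially: each request only started after the previous one had finished. Async code written this way has no concurrency at all, so we need to make these coroutines run concurrently.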
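The difference can be reproduced without a server. This sketch assumes `asyncio.sleep(0.2)` is an acceptable stand-in for one request to web.py, whose handler also just sleeps. The helper names `fake_request`, `serial`, `concurrent` and `timed` are made up for this illustration.

```python
import asyncio
import time


async def fake_request(i: int) -> int:
    # stand-in for one HTTP request; web.py's handler also just sleeps
    await asyncio.sleep(0.2)
    return i


async def serial(n: int) -> list:
    # each await completes before the next request is even created
    return [await fake_request(i) for i in range(n)]


async def concurrent(n: int) -> list:
    # all coroutines are scheduled first, then awaited together
    return await asyncio.gather(*(fake_request(i) for i in range(n)))


def timed(coro):
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start


if __name__ == "__main__":
    print(timed(serial(3)))      # expected to take about 0.6 s
    print(timed(concurrent(3)))  # expected to take about 0.2 s
```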
```python
# test2.py
import asyncio
from datetime import datetime

import aiohttp


async def run(sess: aiohttp.ClientSession, url: str):
    async with sess.get(url) as resp:
        print("Response", await resp.json())


async def main(workers: int, url: str):
    async with aiohttp.ClientSession() as sess:
        for _ in range(workers):
            # schedule the request without waiting for it
            asyncio.ensure_future(run(sess, url))
        # crude: keep the session open long enough for every request to finish
        await asyncio.sleep(1.1)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    start = datetime.now()
    # the web.py server URL goes here
    loop.run_until_complete(main(3, ""))
    end = datetime.now()
    print("Elapsed:", end - start)
```

To keep the code changes small, this version uses a crude way to wait for all tasks to finish. The wait sits inside main so that the ClientSession is not closed too early. If you remove it, you get `RuntimeError: Session is closed`. Waiting by hand like this is clumsy, so let's improve the code again.
```python
# test3.py
import asyncio
from datetime import datetime

import aiohttp


async def run(sess: aiohttp.ClientSession, url: str):
    async with sess.get(url) as resp:
        print("Response", await resp.json())


async def main(workers: int, url: str):
    async with aiohttp.ClientSession() as sess:
        futures = []
        for _ in range(workers):
            futures.append(asyncio.ensure_future(run(sess, url)))
        # by default, wait() returns once every task has finished
        done, pending = await asyncio.wait(futures)
        print(done, pending)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    start = datetime.now()
    # the web.py server URL goes here
    loop.run_until_complete(main(3, ""))
    end = datetime.now()
    print("Elapsed:", end - start)
```

Here the fix is to wait on the list of tasks with `asyncio.wait`. By default it returns once all of them have finished and gives back two sets: the completed tasks (done) and the tasks that are still pending (pending).
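The pending set only matters when `asyncio.wait` returns early. Here is a network-free sketch using its `timeout` parameter. The job delays are arbitrary assumptions, and `job` and `main` are illustrative names.

```python
import asyncio


async def job(delay: float) -> float:
    await asyncio.sleep(delay)
    return delay


async def main():
    tasks = [asyncio.ensure_future(job(d)) for d in (0.05, 0.1, 1.0)]
    # stop waiting after 0.3 s; return_when defaults to ALL_COMPLETED
    done, pending = await asyncio.wait(tasks, timeout=0.3)
    finished = sorted(t.result() for t in done)
    # wait() does not cancel leftovers, so cancel them explicitly
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return type(done).__name__, finished, len(pending)


if __name__ == "__main__":
    print(asyncio.run(main()))  # ('set', [0.05, 0.1], 1)
```

`asyncio.wait` also accepts `return_when=asyncio.FIRST_COMPLETED` or `asyncio.FIRST_EXCEPTION` when you want to react as soon as the first task finishes or fails.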
If you also need the coroutines' return values, `asyncio.gather` is more convenient: it returns their results as a list, in the same order as the awaitables were passed in.

```python
# test4.py
import asyncio
from datetime import datetime

import aiohttp


async def run(sess: aiohttp.ClientSession, url: str, id: int):
    async with sess.get(url) as resp:
        print("Response", await resp.json())
        return id


async def main(workers: int, url: str):
    async with aiohttp.ClientSession() as sess:
        futures = []
        for i in range(workers):
            futures.append(asyncio.ensure_future(run(sess, url, i)))
        # note: the list has to be unpacked here
        rets = await asyncio.gather(*futures)
        print(rets)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    start = datetime.now()
    # the web.py server URL goes here
    loop.run_until_complete(main(3, ""))
    end = datetime.now()
    print("Elapsed:", end - start)
```

The output is:
```shell
$ python test4.py
Response {'msg': 'ok'}
Response {'msg': 'ok'}
Response {'msg': 'ok'}
[0, 1, 2]
Elapsed: 0:00:01.011840
```

Summary
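We can now create coroutines at will, but just as with multithreading, we also have to handle synchronization between them. To keep syntax and usage consistent, asyncio provides counterparts to almost all of the synchronization primitives used with threads, and they work in essentially the same way. The main difference is that you use the async keywords, such as `async with` and `await`.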
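As a side-by-side sketch of that similarity, the two functions below guard a shared counter, one with `threading.Lock` and one with `asyncio.Lock`. The worker and iteration counts are arbitrary, and the function names are hypothetical. The only syntactic change is `with` versus `async with`.

```python
import asyncio
import threading


def count_with_threads(workers: int = 4, n: int = 1000) -> int:
    lock = threading.Lock()
    total = [0]

    def work() -> None:
        for _ in range(n):
            with lock:  # threading: a plain "with" block
                total[0] += 1

    threads = [threading.Thread(target=work) for _ in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return total[0]


async def count_with_tasks(workers: int = 4, n: int = 1000) -> int:
    lock = asyncio.Lock()
    total = [0]

    async def work() -> None:
        for _ in range(n):
            async with lock:  # asyncio: the same idea, spelled "async with"
                total[0] += 1
                await asyncio.sleep(0)  # yield to other tasks while holding the lock

    await asyncio.gather(*(work() for _ in range(workers)))
    return total[0]


if __name__ == "__main__":
    print(count_with_threads())                # 4000
    print(asyncio.run(count_with_tasks()))     # 4000
```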
Lock
```python
# test5.py
import asyncio
from datetime import datetime

import aiohttp

lock = asyncio.Lock()


async def run(sess: aiohttp.ClientSession, url: str, id: int):
    # only one coroutine at a time may hold the lock and send its request
    async with lock:
        async with sess.get(url) as resp:
            print("Response", await resp.json())
            return id


async def main(workers: int, url: str):
    async with aiohttp.ClientSession() as sess:
        futures = []
        for i in range(workers):
            futures.append(asyncio.ensure_future(run(sess, url, i)))
        # note: the list has to be unpacked here
        rets = await asyncio.gather(*futures)
        print(rets)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    start = datetime.now()
    # the web.py server URL goes here
    loop.run_until_complete(main(3, ""))
    end = datetime.now()
    print("Elapsed:", end - start)
```

The output is:
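```shell
$ python test5.py
Response {'msg': 'ok'}
Response {'msg': 'ok'}
Response {'msg': 'ok'}
[0, 1, 2]
Elapsed: 0:00:03.007251
```

Clearly the lock slows things down. Only one coroutine can hold the lock at a time, so the concurrent requests become serial again.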
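Serializing a whole request is a heavy use of a lock. A lock really earns its keep when a read-modify-write spans an `await`, because another coroutine can run in between. Here is a minimal sketch of that lost-update problem. The names `unsafe_increment`, `safe_increment` and the task count are hypothetical.

```python
import asyncio


async def unsafe_increment(state: dict) -> None:
    value = state["count"]       # read
    await asyncio.sleep(0)       # other tasks run here and read the same value
    state["count"] = value + 1   # write back a stale value


async def safe_increment(state: dict, lock: asyncio.Lock) -> None:
    async with lock:             # only one task runs read-yield-write at a time
        value = state["count"]
        await asyncio.sleep(0)
        state["count"] = value + 1


async def main(n: int = 100):
    unsafe = {"count": 0}
    await asyncio.gather(*(unsafe_increment(unsafe) for _ in range(n)))
    lock = asyncio.Lock()
    safe = {"count": 0}
    await asyncio.gather(*(safe_increment(safe, lock) for _ in range(n)))
    return unsafe["count"], safe["count"]


if __name__ == "__main__":
    print(asyncio.run(main()))  # (1, 100)
```

Without the lock, every task reads 0 before any of them writes, so 99 updates are lost.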
Event
```python
# test6.py
import asyncio
from datetime import datetime

import aiohttp

big_event = asyncio.Event()
small_event = asyncio.Event()


async def big_waiter():
    await small_event.wait()
    print(f"{datetime.now()} big waiter received the event")


async def small_waiter():
    await big_event.wait()
    print(f"{datetime.now()} small waiter received the event")


async def run(sess: aiohttp.ClientSession, url: str, id: int):
    async with sess.get(url) as resp:
        ret = await resp.json()
        print("Response", ret)
        data = ret["msg"]
        # set one of the two events depending on the random value
        if data > 0.5:
            big_event.set()
        else:
            small_event.set()
        return data


async def main(workers: int, url: str):
    asyncio.ensure_future(big_waiter())
    asyncio.ensure_future(big_waiter())
    asyncio.ensure_future(small_waiter())
    asyncio.ensure_future(small_waiter())
    async with aiohttp.ClientSession() as sess:
        futures = []
        for i in range(workers):
            futures.append(asyncio.ensure_future(run(sess, url, i)))
        await asyncio.wait(futures)
        # release any waiter that is still blocked
        if not big_event.is_set():
            big_event.set()
        if not small_event.is_set():
            small_event.set()
        # let pending tasks that can now finish immediately run to completion
        await asyncio.sleep(0)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    start = datetime.now()
    # the URL of web.py's /random route goes here
    loop.run_until_complete(main(3, ""))
    end = datetime.now()
    print("Elapsed:", end - start)
```

The output is:
```
Response {'msg': 0.9879470259657458}
2022-07-11 10:16:51.577579 small waiter received the event
2022-07-11 10:16:51.577579 small waiter received the event
Response {'msg': 0.33312954919903903}
2022-07-11 10:16:51.578574 big waiter received the event
2022-07-11 10:16:51.578574 big waiter received the event
Response {'msg': 0.41934453838367824}
Elapsed: 0:00:00.996697
```

The event waiters print only after a response has arrived, and several coroutines can wait on the same event at the same time.
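A network-free sketch of the same behavior: one `event.set()` releases every coroutine currently blocked in `event.wait()`. The flag then stays set until `event.clear()` is called. The names `waiter` and `main` and the waiter count are illustrative.

```python
import asyncio


async def waiter(event: asyncio.Event, name: str, log: list) -> None:
    await event.wait()  # blocks until event.set() is called
    log.append(name)


async def main() -> list:
    event = asyncio.Event()
    log = []
    tasks = [asyncio.ensure_future(waiter(event, f"w{i}", log)) for i in range(3)]
    await asyncio.sleep(0)  # let every waiter reach event.wait()
    assert log == []        # nobody has been released yet
    event.set()             # a single set() releases all current waiters
    await asyncio.gather(*tasks)
    await event.wait()      # the flag stays set, so this returns immediately
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))  # ['w0', 'w1', 'w2']
```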
Condition
```python
# test7.py
import asyncio
from datetime import datetime

import aiohttp

cond = asyncio.Condition()


async def waiter(id):
    async with cond:
        await cond.wait()
        print(f"{datetime.now()} waiter[{id}] done waiting")


async def run(sess: aiohttp.ClientSession, url: str, id: int):
    async with sess.get(url) as resp:
        ret = await resp.json()
        print("Response", ret)
        data = ret["msg"]
        async with cond:
            # cond.notify()      # wake one waiter
            # cond.notify_all()  # wake every waiter
            cond.notify(2)       # wake at most two waiters
        return data


async def main(workers: int, url: str):
    for i in range(workers):
        asyncio.ensure_future(waiter(i))
    async with aiohttp.ClientSession() as sess:
        futures = []
        for i in range(workers):
            futures.append(asyncio.ensure_future(run(sess, url, i)))
        await asyncio.wait(futures)
        # let pending tasks that can now finish immediately run to completion
        await asyncio.sleep(0)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    start = datetime.now()
    # the URL of web.py's /random route goes here
    loop.run_until_complete(main(3, ""))
    end = datetime.now()
    print("Elapsed:", end - start)
```

The output is:
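```shell
$ python test7.py
Response {'msg': 0.587516452693613}
2022-07-11 10:26:13.482781 waiter[0] done waiting
2022-07-11 10:26:13.483778 waiter[1] done waiting
Response {'msg': 0.3391774763719556}
Response {'msg': 0.2653464378663153}
2022-07-11 10:26:13.484771 waiter[2] done waiting
Elapsed: 0:00:01.013655
```

The first two waiters finish at the same moment, but not all of the waiters are released at once. Each `notify(2)` wakes at most two waiting coroutines, so the remaining waiter is only woken by a later notification.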
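A deterministic sketch of the same rule without network calls: three coroutines wait on a condition, `notify(2)` wakes exactly two of them, and `notify_all()` wakes the rest. The waiter count and the 10 ms pause are arbitrary assumptions, and the names are illustrative.

```python
import asyncio


async def waiter(cond: asyncio.Condition, woken: list, i: int) -> None:
    async with cond:
        await cond.wait()  # releases the lock while waiting, reacquires on wake-up
        woken.append(i)


async def main():
    cond = asyncio.Condition()
    woken = []
    tasks = [asyncio.ensure_future(waiter(cond, woken, i)) for i in range(3)]
    await asyncio.sleep(0)  # let all three waiters reach cond.wait()
    async with cond:
        cond.notify(2)      # the lock must be held when notifying
    await asyncio.sleep(0.01)
    after_notify_2 = list(woken)
    async with cond:
        cond.notify_all()
    await asyncio.gather(*tasks)
    return after_notify_2, woken


if __name__ == "__main__":
    print(asyncio.run(main()))  # ([0, 1], [0, 1, 2])
```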
Semaphore
```python
# test8.py
import asyncio
from datetime import datetime

import aiohttp

# at most two coroutines may be inside the semaphore at once
semp = asyncio.Semaphore(2)


async def run(sess: aiohttp.ClientSession, url: str, id: int):
    async with semp:
        async with sess.get(url) as resp:
            ret = await resp.json()
            print(f"{datetime.now()} worker[{id}] response", ret)
            data = ret["msg"]
            return data


async def main(workers: int, url: str):
    async with aiohttp.ClientSession() as sess:
        futures = []
        for i in range(workers):
            futures.append(asyncio.ensure_future(run(sess, url, i)))
        await asyncio.wait(futures)
        # let pending tasks that can now finish immediately run to completion
        await asyncio.sleep(0)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    start = datetime.now()
    # the URL of web.py's /random route goes here
    loop.run_until_complete(main(3, ""))
    end = datetime.now()
    print("Elapsed:", end - start)
```

The output is:
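```shell
$ python test8.py
2022-07-11 10:30:40.634801 worker[0] response {'msg': 0.21337652123021056}
2022-07-11 10:30:40.634801 worker[1] response {'msg': 0.7591980200967501}
2022-07-11 10:30:41.636346 worker[2] response {'msg': 0.8282581038608438}
Elapsed: 0:00:02.011661
```

Although three coroutines were created at the same time, only two of them work at any given moment. The third has to wait until one of the others releases the semaphore before it can run.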
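One way to check a semaphore's limit without a server is to track how many workers are inside it at once. This sketch assumes a 10 ms sleep is a reasonable stand-in for I/O. The `state` dict and function names are hypothetical.

```python
import asyncio


async def worker(sem: asyncio.Semaphore, state: dict) -> None:
    async with sem:
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
        await asyncio.sleep(0.01)  # pretend to do I/O while holding a slot
        state["running"] -= 1


async def main(limit: int = 2, workers: int = 5) -> int:
    sem = asyncio.Semaphore(limit)
    state = {"running": 0, "peak": 0}
    await asyncio.gather(*(worker(sem, state) for _ in range(workers)))
    return state["peak"]


if __name__ == "__main__":
    print(asyncio.run(main()))         # 2
    print(asyncio.run(main(limit=3)))  # 3
```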
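Whether you use coroutines or threads, synchronizing state between tasks matters, so there are synchronization primitives for different needs. A lock ensures that only one task can access a resource at a time. An event lets one task, or several tasks, wait for another task. A condition gives finer control over notification, because it lets you decide how many waiting tasks to wake at once. A semaphore lets up to a fixed number of tasks access a resource at the same time.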
```python
# test9.py
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from datetime import datetime


def wait_io(id: int):
    # for simplicity, simulate I/O with sleep
    time.sleep(1)
    return f"threading({id}): done at {datetime.now()}"


def more_cpu(id: int):
    # CPU-bound work runs in a separate process, so the GIL does not serialize it
    sum(i * i for i in range(10 ** 7))
    return f"process({id}): done at {datetime.now()}"


async def main(workers: int):
    loop = asyncio.get_running_loop()
    futures = []
    with ThreadPoolExecutor(workers) as thread_pool, ProcessPoolExecutor(workers) as process_pool:
        for i in range(workers):
            futures.append(loop.run_in_executor(thread_pool, wait_io, i))
        for i in range(workers):
            futures.append(loop.run_in_executor(process_pool, more_cpu, i))
        print("\n".join(await asyncio.gather(*futures)))


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    start = datetime.now()
    loop.run_until_complete(main(3))
    end = datetime.now()
    print("Elapsed:", end - start)
```

The output is:
```
threading(0): done at 2022-07-11 15:38:36.073547
threading(1): done at 2022-07-11 15:38:36.074540
threading(2): done at 2022-07-11 15:38:36.074540
process(0): done at 2022-07-11 15:38:36.142233
process(1): done at 2022-07-11 15:38:36.177190
process(2): done at 2022-07-11 15:38:36.162244
Elapsed: 0:00:01.107643
```

The total time is about 1 second, which shows that all the threads and processes ran at the same time.
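For quick one-off blocking calls you do not need to create a pool yourself. Passing `None` as the executor to `loop.run_in_executor` uses the event loop's default `ThreadPoolExecutor`. A minimal sketch, assuming `time.sleep(0.2)` stands in for a blocking library call. The names `blocking_io` and `timed_main` are hypothetical.

```python
import asyncio
import time


def blocking_io(i: int) -> int:
    # a blocking call: run directly inside a coroutine it would freeze the event loop
    time.sleep(0.2)
    return i


async def main() -> list:
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor
    futures = [loop.run_in_executor(None, blocking_io, i) for i in range(3)]
    return await asyncio.gather(*futures)


def timed_main():
    start = time.perf_counter()
    results = asyncio.run(main())
    return results, time.perf_counter() - start


if __name__ == "__main__":
    # expected: [0, 1, 2] after roughly 0.2 s rather than 0.6 s
    print(timed_main())
```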
- fastapi: an excellent web framework; once you've used it, you won't want to use anything else
- httpie
- aiohttp
- aioredis: async Redis library
- motor: async MongoDB library
- sqlmodel: an excellent ORM