协程本质上就是一个线程,不过它是协作式的非抢占式的程序,面向的是IO操作。python有GIL的限制,不能充分利用多线程的实现高并发。进程和线程都是通过cpu的调度实现不同任务的有序执行,协程则要在写代码的时候确定执行顺序。由于协程在一个线程中,所以协程不能阻塞。
优缺点:
python协程的发展时间较长:
由于asyncio每个版本都会新增功能,对一些旧的底层的API进行封装,极大地方便的使用者,但正因为此,网上有很多教程使用的接口官方已经不建议直接使用,应该改而使用更加高级的API,所以在这里记录一下如何使用这些API。
要点:
async def
的形式定义await
关键字,注意其后跟的是"可等待对象"(协程, 任务 和 Future)asyncio.run()
中执行,也可以跟在await
后面async
和await
这两个关键字只能在协程中使用
import asyncio
async def foo(name):
await asyncio.sleep(1) # 这是一个不会阻塞的sleep,是一个协程
print(f"name = {name}")
async def main():
# 协程本身就是一个可等待对象
await foo("lczmx") # 执行协程
print("done")
if __name__ == '__main__':
# 使用asyncio.run运行
asyncio.run(main())
asyncio.run(main, *, debug=False)
方法就是对run_until_complete
进行了封装:
loop = events.new_event_loop()
return loop.run_until_complete(main)
关于可等待对象
可等待对象(awaitable)是能在 await 表达式中使用的对象。可以是 协程 或是具有__await__()
方法的对象。
那么协程是如何成为可等待对象的呢?
collections.abc.Awaitable
类,这是为可等待对象提供的类,可被用于 await 表达式中
class Awaitable(metaclass=ABCMeta):
__slots__ = ()
@abstractmethod
def __await__(self): # __await__方法必须返回一个 iterator
yield
@classmethod
def __subclasshook__(cls, C):
if cls is Awaitable:
return _check_methods(C, "__await__")
return NotImplemented
async def
复合语句创建的函数,它返回的是一个Coroutine对象
,而Coroutine
继承Awaitable
。使用协程进行并发操作
方法一
使用asyncio.create_task(coro)
方法,返回一个Task对象,Task类继承Future,在python3.7以下版本中使用asyncio.ensure_future(coro_or_future)
。
import asyncio
async def foo(char:str, count: int):
for i in range(count):
print(f"{char}-{i}")
await asyncio.sleep(.5)
async def main():
task1 = asyncio.create_task(foo("A", 2))
task2 = asyncio.create_task(foo("B", 3))
task3 = asyncio.create_task(foo("C", 2))
await task1
await task2
await task3
if __name__ == '__main__':
asyncio.run(main())
执行结果
A-0
B-0
C-0
A-1
B-1
C-1
B-2
方法二
使用asyncio.gather()
方法,其内部调用的是asyncio.ensure_future()
方法
import asyncio
async def foo(char:str, count: int):
for i in range(count):
print(f"{char}-{i}")
await asyncio.sleep(.5)
async def main():
await asyncio.gather(foo("A", 2), foo("B", 3), foo("C", 2))
if __name__ == '__main__':
asyncio.run(main())
要完成这些功能需要Task对象,即asyncio.create_task()
的返回值。由于Task继承Future,实现了除Future.set_result()
和 Future.set_exception()
外的全部API,而asyncio.Future模仿的是 concurrent.futures.Future
类,所以Task很多方法和 在使用线/进程池时用到的方法类似(有细微差别)。
Task的方法,见官方文档
使用回调函数和取得返回值的例子:
import asyncio
def callback(future):
# 唯一参数是一个Task对象
# print(type(future)) # <class '_asyncio.Task'>
print(future)
# <Task finished name='Task-2' coro=<foo() done, defined at E: ... xxx.py:11> result=123>
print(future.result()) # 123 # 接收返回值
print(future.get_name()) # foo
async def foo():
print("running")
return 123
async def main():
task = asyncio.create_task(foo(), name="foo") # name形参3.8及以上版本可用
task.add_done_callback(callback) # 添加回调函数
await task
if __name__ == '__main__':
asyncio.run(main())
我们知道,协程本身就只有一个线程,假如这协程阻塞了,那么整个程序也就阻塞了。为此我们在执行一些必然会产生阻塞的代码时,可以把代码放入到其它线程/进程中,这样可以继续执行协程的其它代码了。
方法一
coroutine asyncio.to_thread(func, /, *args, **kwargs)
这是python3.9的新方法,3.9以下版本看方法二
在不同的线程中异步地运行函数 func。向此函数提供的任何*args
和 **kwargs
会被直接传给 func。其返回值是一个协程,所以假如有回调等操作,使用asyncio.create_task(coro)
方法,再调用Task对象
的方法。
import asyncio
import time
def block_func(name: str):
time.sleep(2) # 模拟阻塞时间
print(f"name = {name}")
async def foo():
# 一个协程
print("async foo")
await asyncio.sleep(1)
async def main():
await asyncio.gather(
asyncio.to_thread(block_func, name="lczmx"),
foo()
)
if __name__ == '__main__':
asyncio.run(main())
方法二
awaitable loop.run_in_executor(executor, func, *args)