当前位置 博文首页 > 忞翛:python协程

    忞翛:python协程

    作者:忞翛 时间:2021-02-03 00:23

    python协程

    协程本质上就是一个线程,不过它是协作式的非抢占式的程序,面向的是IO操作。python有GIL的限制,不能充分利用多线程的实现高并发。进程和线程都是通过cpu的调度实现不同任务的有序执行,协程则要在写代码的时候确定执行顺序。由于协程在一个线程中,所以协程不能阻塞。

    优缺点:

    • 无需线程上下文切换的开销
    • 在一个线程中,不需要加锁
    • 无法利用多核资源:协程的本质是单线程,需要和进程配合才能运行在多CPU上
    • 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

    python协程的发展时间较长:

    • python2.5 为生成器引用.send()、.throw()、.close()方法
    • python3.3 为引入yield from,可以接收返回值,可以使用yield from定义协程
    • Python3.4 加入了asyncio模块
    • Python3.5 增加async、await关键字,在语法层面的提供支持
    • python3.7 使用async def + await的方式定义协程
    • 此后asyncio模块更加完善和稳定,对底层的API进行的封装和扩展
    • python将于 3.10版本中移除 以yield from的方式定义协程 (目前版本是3.9.1)

    由于asyncio每个版本都会新增功能,对一些旧的底层的API进行封装,极大地方便的使用者,但正因为此,网上有很多教程使用的接口官方已经不建议直接使用,应该改而使用更加高级的API,所以在这里记录一下如何使用这些API。

    简单例子

    要点

    1. 使用async def的形式定义
    2. 在协程中可以使用await关键字,注意其后跟的是"可等待对象"(协程, 任务 和 Future)
    3. 协程不能直接执行,需要在asyncio.run()中执行,也可以跟在await后面
    4. asyncawait这两个关键字只能在协程中使用
      import asyncio
      
      
      async def foo(name):
      
      	await asyncio.sleep(1)      # 这是一个不会阻塞的sleep,是一个协程
      	print(f"name = {name}")
      
      
      async def main():
      	# 协程本身就是一个可等待对象
      	await foo("lczmx")  # 执行协程
      	print("done")
      
      if __name__ == '__main__':
      	# 使用asyncio.run运行
      	asyncio.run(main())
      

    asyncio.run(main, *, debug=False)方法就是对run_until_complete进行了封装:
    loop = events.new_event_loop()
    return loop.run_until_complete(main)

    关于可等待对象
    可等待对象(awaitable)是能在 await 表达式中使用的对象。可以是 协程 或是具有__await__() 方法的对象。

    那么协程是如何成为可等待对象的呢?

    1. collections.abc.Awaitable类,这是为可等待对象提供的类,可被用于 await 表达式中
      class Awaitable(metaclass=ABCMeta):
      	__slots__ = ()
      
      	@abstractmethod
      	def __await__(self):	# __await__方法必须返回一个 iterator
      		yield
      
      	@classmethod
      	def __subclasshook__(cls, C):
      		if cls is Awaitable:
      			return _check_methods(C, "__await__")
      		return NotImplemented
    2. async def复合语句创建的函数,它返回的是一个Coroutine对象,而Coroutine继承Awaitable

    并发

    使用协程进行并发操作
    方法一
    使用asyncio.create_task(coro)方法,返回一个Task对象,Task类继承Future,在python3.7以下版本中使用asyncio.ensure_future(coro_or_future)

    import asyncio
    
    
    async def foo(char:str, count: int):
        for i in range(count):
            print(f"{char}-{i}")
            await asyncio.sleep(.5)
    
    
    async def main():
        task1 = asyncio.create_task(foo("A", 2))
        task2 = asyncio.create_task(foo("B", 3))
        task3 = asyncio.create_task(foo("C", 2))
    
        await task1
        await task2
        await task3
    
    
    if __name__ == '__main__':
        asyncio.run(main())
    

    执行结果

    A-0
    B-0
    C-0
    A-1
    B-1
    C-1
    B-2

    方法二
    使用asyncio.gather()方法,其内部调用的是asyncio.ensure_future()方法

    import asyncio
    
    
    async def foo(char:str, count: int):
        for i in range(count):
            print(f"{char}-{i}")
            await asyncio.sleep(.5)
    
    
    async def main():
    
        await asyncio.gather(foo("A", 2), foo("B", 3), foo("C", 2))
    
    if __name__ == '__main__':
        asyncio.run(main())
    

    回调、返回值等操作

    要完成这些功能需要Task对象,即asyncio.create_task()的返回值。由于Task继承Future,实现了除Future.set_result()Future.set_exception()外的全部API,而asyncio.Future模仿的是 concurrent.futures.Future,所以Task很多方法和 在使用线/进程池时用到的方法类似(有细微差别)。

    Task的方法,见官方文档
    使用回调函数和取得返回值的例子:

    import asyncio
    
    
    def callback(future):
        # 唯一参数是一个Task对象
        # print(type(future))     # <class '_asyncio.Task'>
    
        print(future)
        # <Task finished name='Task-2' coro=<foo() done, defined at E: ... xxx.py:11> result=123>
    
        print(future.result())      # 123   # 接收返回值
        print(future.get_name())    # foo
    
    
    async def foo():
        print("running")
        return 123
    
    
    async def main():
        task = asyncio.create_task(foo(), name="foo")   # name形参3.8及以上版本可用
        task.add_done_callback(callback)                # 添加回调函数
        await task
    
    
    if __name__ == '__main__':
        asyncio.run(main())
    

    与线程结合

    我们知道,协程本身就只有一个线程,假如这协程阻塞了,那么整个程序也就阻塞了。为此我们在执行一些必然会产生阻塞的代码时,可以把代码放入到其它线程/进程中,这样可以继续执行协程的其它代码了。

    方法一
    coroutine asyncio.to_thread(func, /, *args, **kwargs)
    这是python3.9的新方法,3.9以下版本看方法二
    在不同的线程中异步地运行函数 func。向此函数提供的任何*args**kwargs 会被直接传给 func。其返回值是一个协程,所以假如有回调等操作,使用asyncio.create_task(coro)方法,再调用Task对象的方法。

    import asyncio
    import time
    
    
    def block_func(name: str):
        time.sleep(2)       # 模拟阻塞时间
        print(f"name = {name}")
    
    
    async def foo():
        # 一个协程
        print("async foo")
        await asyncio.sleep(1)
    
    
    async def main():
        await asyncio.gather(
            asyncio.to_thread(block_func, name="lczmx"),
            foo()
        )
    
    if __name__ == '__main__':
        asyncio.run(main())
    

    方法二
    awaitable loop.run_in_executor(executor, func, *args)

    下一篇:没有了