二、asyncio详细分析


返回

2.1 事件循环

  • 相当于一个while循环,在周期性的运行并执行一些任务,在特定条件下终止循环。

    """ 伪代码 """
    
    任务列表 = [ 任务1, 任务2, 任务3,... ]
    
    while True:
        可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将'可执行''已完成'的任务返回
        
        for 就绪任务 in 已准备就绪的任务列表:
            执行已就绪的任务
            
        for 已完成的任务 in 已完成的任务列表:
            在任务列表中移除 已完成的任务
    
    	如果 任务列表 中的任务都已完成,则终止循环
      
    
  • 获取事件循环

    import asyncio
    
    # 生成一个事件循环
    loop = asyncio.get_event_loop()
    
    # 将任务放到任务列表
    loop.run_until_complete(任务)
    
    

2.2 协程函数

  • 协程函数

    import asyncio
    
    
    # 定义一个协程函数
    async def func():
        print("协程内部代码")
    
    
    # 调用协程函数,返回一个协程对象
    result = func()
    
    # 执行协程函数:方式一
    loop = asyncio.get_event_loop()  # 创建一个事件循环
    loop.run_until_complete(result)  # 将协程任务提交到事件循环的任务列表中,协程执行完成之后终止
    
    # 执行协程函数:方式二:内部实现相同,语法更简洁
    asyncio.run(result)  # asyncio.run()方法在Python3.7中加入
    
    

2.3 await关键字

  • await + 可等待的对象(协程对象、Future、Task对象)

  • await是一个只能在协程函数中使用的关键字

  • 遇到IO操作时挂起 当前协程(任务),等IO操作完成之后再继续往下执行

  • 当前协程(任务)挂起过程中 事件循环可以去执行其他的协程(任务)

  • 示例1

    import asyncio
    
    
    async def func():
        print("执行协程函数内部代码")
        # 遇到IO操作时,挂起当前协程任务,等IO操作完成之后再继续往下执行;当前协程挂起时,事件循环可以去执行其他协程任务
        response = await asyncio.sleep(2)
        print("IO请求结束,结果为:", response)
    
    
    result = func()
    
    asyncio.run(result)
    
    
  • 示例2

    import asyncio
    
    
    async def others():
        print("start")
        await asyncio.sleep(2)
        print('end')
        return '返回值'
    
    
    async def func():
        print("执行协程函数内部代码")
        response = await others()
        print("IO请求结束,结果为:", response)
    
    
    asyncio.run(func())
    
    
  • 示例3

    import asyncio
    
    
    async def others():
        print("start")
        await asyncio.sleep(2)
        print('end')
        return '返回值'
    
    
    async def func():
        print("执行协程函数内部代码")
        response1 = await others()  # 2s
        print("IO请求结束,结果为:", response1)
        response2 = await others()  # +2s
        print("IO请求结束,结果为:", response2)
    
    
    asyncio.run(func())
    
    

2.4 task对象

  • 作用:将协程任务加入事件循环中,等待被调度执行

  • 在 Python 3.7 之前,可以用低层级的 loop.create_task()asyncio.ensure_future() 函数创建Task对象

  • 在 Python 3.7 开始,可以通过asyncio.create_task(协程对象)的方式创建Task对象

  • 示例1

    import asyncio
    
    
    async def func():
        print(1)
        await asyncio.sleep(2)
        print(2)
        return "返回值"
    
    
    async def main():
        print("main开始")
    
        # 创建了一个协程对象,创建了一个task对象,将协程对象封装到task对象中,并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)
        task1 = asyncio.create_task(func())
        task2 = asyncio.create_task(func())
    
        print("main结束")
    
        # 当执行某协程任务遇到IO操作时,会自动切换执行到其他协程任务
        # 此处的await是等待相对应的协程全都执行完毕,并获取结果
        ret1 = await task1
        ret2 = await task2
        print(ret1, ret2)
    
    
    asyncio.run(main())  # 运行流程:1 1 2 2
    
    
  • 示例2:task对象列表,常用

  • asyncio.wait 源码内部会对列表中的每个协程执行ensure_future从而封装为Task对象,所以在和wait配合使用时,task_list的值为[func(),func()] 也是可以的

    import asyncio
    
    
    async def func():
        print(1)
        await asyncio.sleep(2)
        print(2)
        return "返回值"
    
    
    async def main():
        print("main开始")
    
        # 创建协程对象,将协程对象封装到task对象中,并添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)
        # task_list:把task对象封装到一个列表中
        # name="n1":对task对象进行命名
        task_list = [
            asyncio.create_task(func(), name="n1"),
            asyncio.create_task(func(), name="n2")
        ]
    
        print("main结束")
    
        # 当执行某协程任务遇到IO操作时,会自动化切换执行其他任务
        # 此处的await是等待所有协程执行完毕,并将所有协程的返回值保存到done
        # 如果设置了timeout值,则意味着此处最多等待多少秒,完成的协程返回值写入到done中,未完成则写到pending中
        done, pending = await asyncio.wait(task_list, timeout=None)
        print(done, pending)
    
    
    asyncio.run(main())
    
    
  • 示例3:task列表写在外面,不推荐

    import asyncio
    
    
    async def func():
        print("执行协程函数内部代码")
        response = await asyncio.sleep(2)
        print("IO请求结束,结果为:", response)
    
    
    coroutine_list = [func(), func()]
    
    # 错误:coroutine_list = [ asyncio.create_task(func()), asyncio.create_task(func()) ]
    # 此处不能直接 asyncio.create_task,task会立即将任务加入到事件循环的任务列表,但此时事件循环还未创建,所以会报错
    
    
    # 使用asyncio.wait将列表封装为一个协程,并调用asyncio.run实现执行两个协程
    # asyncio.wait内部会对列表中的每个协程执行ensure_future,封装为task对象
    done, pending = asyncio.run(asyncio.wait(coroutine_list))
    
    

2.5 asyncio的future对象

  • 相对更底层的对象,通常不常用

  • Task类继承Future类,task对象内部await结果的处理基于future对象

  • 示例1:任务没有结果

    import asyncio
    
    
    async def main():
        # 获取当前事件循环
        loop = asyncio.get_running_loop()
    
        # 创建一个任务(Future对象),这个任务什么都不干。
        fut = loop.create_future()
    
        # 等待任务最终结果(Future对象),没有结果则会一直等下去。
        await fut
        print(fut)  # 没有结果一直等待
    
    
    asyncio.run(main())
    
    
  • 示例2:任务有结果

    import asyncio
    
    
    async def set_after(fut):
        await asyncio.sleep(2)
        fut.set_result("666")
    
    
    async def main():
        # 获取当前事件循环
        loop = asyncio.get_running_loop()
    
        # 创建一个任务(Future对象),没绑定任何行为,则这个任务永远不知道什么时候结束。
        fut = loop.create_future()
    
        # 创建一个任务(Task对象),绑定了set_after函数,函数内部在2s之后,会给fut赋值。
        # 即手动设置future任务的最终结果,那么fut就可以结束了。
        await loop.create_task(set_after(fut))
    
        # 等待 Future对象获取 最终结果,否则一直等下去
        data = await fut
        print(data)  # 2s后有结果,不会一直卡住
    
    
    asyncio.run(main())
    
    

2.6 concurrent的future对象

  • 在Python的concurrent.futures模块中也有一个Future对象,这个对象是基于线程池和进程池实现异步操作时使用的对象

  • 示例1

    import time
    from concurrent.futures import Future
    from concurrent.futures.thread import ThreadPoolExecutor
    from concurrent.futures.process import ProcessPoolExecutor
    
    
    def func(value):
        time.sleep(2)
        print(value)
        return 123
    
    
    # 创建线程池
    pool = ThreadPoolExecutor(max_workers=5)
    
    # 创建进程池
    # pool = ProcessPoolExecutor(max_workers=5)
    
    
    for i in range(10):
        fut = pool.submit(func, i)
        print(fut)
    
    
  • 示例2

    import time
    import asyncio
    import concurrent.futures
    
    
    def func1():
        # 某个耗时操作
        time.sleep(2)
        return "xxx"
    
    
    async def main():
        loop = asyncio.get_running_loop()
    
        # 1. Run in the default loop's executor ( 默认ThreadPoolExecutor )
        # 第一步:内部会先调用 ThreadPoolExecutor 的 submit 方法去线程池中申请一个线程去执行func1函数,并返回一个concurrent.futures.Future对象
        # 第二步:调用asyncio.wrap_future将concurrent.futures.Future对象包装为asycio.Future对象。
        # 因为concurrent.futures.Future对象不支持await语法,所以需要包装为 asycio.Future对象 才能使用。
        fut = loop.run_in_executor(None, func1)
        result = await fut
        print('default thread pool', result)
    
        # 2. Run in a custom thread pool:
        with concurrent.futures.ThreadPoolExecutor() as pool:
            result = await loop.run_in_executor(
                pool, func1)
            print('custom thread pool', result)
    
        # 3. Run in a custom process pool:
        with concurrent.futures.ProcessPoolExecutor() as pool:
            result = await loop.run_in_executor(
                pool, func1)
            print('custom process pool', result)
    
    
    asyncio.run(main())
    
    
  • 示例3:不支持协程的requests模块,搭配线程池来实现

    import asyncio
    import requests
    
    
    async def download_image(url):
        # 发送网络请求,下载图片(遇到网络下载图片的IO请求,自动化切换到其他任务)
        print("开始下载:", url)
    
        loop = asyncio.get_event_loop()
        # requests模块默认不支持异步操作,所以就使用线程池来配合实现了。
        future = loop.run_in_executor(None, requests.get, url)
    
        response = await future
        print('下载完成')
        # 图片保存到本地文件
        file_name = url.rsplit('_')[-1]
        with open(file_name, mode='wb') as file_object:
            file_object.write(response.content)
    
    
    if __name__ == '__main__':
        url_list = [
            'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
            'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
            'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
        ]
    
        tasks = [download_image(url) for url in url_list]
    
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asyncio.wait(tasks))
    
    

2.7 异步迭代器

  • 实现了 __aiter__()__anext__() 方法的对象。

  • 示例

    import asyncio
    
    
    class Reader(object):
        """ 自定义异步迭代器(同时也是异步可迭代对象) """
    
        def __init__(self):
            self.count = 0
    
        async def readline(self):
            await asyncio.sleep(1)
            self.count += 1
            if self.count == 100:
                return None
            return self.count
    
        def __aiter__(self):
            return self
    
        async def __anext__(self):
            val = await self.readline()
            if val == None:
                raise StopAsyncIteration
            return val
    
    
    async def func():
        # 创建异步可迭代对象
        async_iter = Reader()
        # async for 必须要放在async def函数内,否则语法错误。
        async for item in async_iter:
            print(item)
    
    
    asyncio.run(func())
    
    

2.8 异步上下文管理

  • 此种对象通过定义 __aenter__()__aexit__() 方法来对 async with 语句中的环境进行控制。

  • 示例

    import asyncio
    
    
    class AsyncContextManager:
        def __init__(self):
            self.conn = None
    
        async def do_something(self):
            print('异步操作数据库')
            return 666
    
        async def __aenter__(self):
            print('异步链接数据库')
            self.conn = await asyncio.sleep(1)
            return self
    
        async def __aexit__(self, exc_type, exc, tb):
            await asyncio.sleep(1)
            print('异步关闭数据库链接')
    
    
    async def func():
        async with AsyncContextManager() as f:
            result = await f.do_something()
            print(result)
    
    
    asyncio.run(func())
    
    

2.9 补充:uvloop模块

  • uvloop是 asyncio 中的事件循环的替代方案,替换后可以使得asyncio性能提高

  • asgi uvicorn内部就是使用的uvloop的事件循环

  • 安装pip3 install uvloop

  • 示例

    import asyncio
    import uvloop
    
    # uvloop替代asyncio中的事件循环
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    
    """ 
    编写asyncio的代码,与之前写的代码一致
    内部的事件循环自动化会变为uvloop
    """
    
    asyncio.run(...)
    
    
返回