1.简介

  • Asyncio是Python3.4加入的异步编程模块,用来取代yield,实现协程。asyncio 模块在单线程上启动一个事件循环(event loop),时刻监听新进入循环的事件,加以处理,并不断重复这个过程,直到异步任务结束

  • 官方文档:https://docs.python.org/zh-cn/3.7/library/asyncio-task.html#

  • 基本概念

event_loop 事件循环:程序开启一个无限的循环,程序员会把一些函数注册到事件循环上。当满足事件发生的时候,调用相应的协程函数

coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用

task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态

future: 代表将来执行或没有执行的任务的结果。它和task上没有本质的区别

async/await 关键字:python3.5 用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口

2.使用示例

  • 简单示例:
import asyncio

async def count():
print("One")
await asyncio.sleep(1)
print("Two")

async def main():
await asyncio.gather(count(), count(), count())

asyncio.run(main())

注意,直接执行main()函数并不会运行代码,需要使用asyncio.run()将main函数注册到事件循环并等待运行结束

  • 并发执行协程函数
import asyncio


async def test(p):
await asyncio.sleep(1)
print(f"{p} complete")
return p+1


async def main():
"""
create_task在python3.7加入,python3.7之前可用asyncio.ensure_future()函数;
asyncio.create_task只能在协程函数中调用,因为这个函数中需要用到事件循环(event_loop);
如果使用的是 asyncio.gather创建协程对象,那么await的返回值就是协程运行的结果;
"""
# tasks = [asyncio.create_task(test(i)) for i in range(10)]
# result = await asyncio.gather(*tasks)

"""
gather接收的参数可以是task或者协程对象,而wait不可以直接接收协程对象
gather返回值的顺序是按照原始顺序排列
"""
result = await asyncio.gather(*[test(i) for i in range(10)])
print(result)

"""
如果使用的是 asyncio.wait,那么返回值是(已完成任务,等待中任务)的集合
"""
# tasks = [asyncio.create_task(test(i)) for i in range(10)]
# done, pending = await asyncio.wait(tasks)
# for task in done:
# print(task.result())


def main2():
"""
通过event_loop执行协程函数
:return:
"""
loop = asyncio.get_event_loop()
tasks = asyncio.gather(*[test(i) for i in range(10)])
result = loop.run_until_complete(tasks)
print(result)


def main3():
"""
- asyncio.gather和asyncio.wait区别

gather只会返回协程结果,而wait会返回封装的task, wait支持自定义返回的时机,通过接受参数return_when,可选值有ALL_COMPLETED/FIRST_COMPLETED/FIRST_EXCEPTION。因此,如果需要取消task、添加回调函数、自定义返回时机,那么就可以使用wait

- asyncio.create_task、loop.create_task、asyncio.ensure_future区别

asyncio.create_task源码是调用的loop.create_task

loop.create_task接收的参数只能是协程,而ensure_future则可以接收三种参数:
1.接收协程,调用loop.create_task,返回task对象
2.接收Future对象,直接返回
3.接收一个awaitable对象,会await这个对象的__await__方法,再执行一次ensure_future,最后返回Task或者Future
"""
loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(test(i)) for i in range(10)]
done, pending = loop.run_until_complete(asyncio.wait(tasks))

for task in done:
print(task.result())

# for task in tasks:
# print(task.result())


if __name__ == '__main__':
asyncio.run(main())
  • 利用run_in_executor 开启多线程执行阻塞任务

由于很多第三方库还未直接支持await的写法,因此碰到一些I/O阻塞的任务(比如request, selenium等),还是需要通过多线程并发执行
官方文档:https://docs.python.org/zh-cn/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor

loop.run_in_executor(executor, func, *args) 接收三个参数:
executor: 必须是一个concurrent.futures.Executor,如果未指定则使用默认executor
func 要执行的函数
*args 要传递给函数func的参数
使用示例如下:

import asyncio
from time import sleep, strftime
from concurrent import futures

executor = futures.ThreadPoolExecutor(max_workers=5)


async def blocked_sleep(name, t):
print(strftime('[%H:%M:%S]'),end=' ')
print('sleep {} is running {}s'.format(name, t))
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, sleep, 1)
print(strftime('[%H:%M:%S]'),end=' ')
print('sleep {} is end'.format(name))
return t


async def main():
future = (blocked_sleep(i, i) for i in range(1, 6))
fs = asyncio.gather(*future)
return await fs

asyncio.run(main())

  • 利用asyncio实现异步爬虫的示例

https://github.com/Ray-SR/fund_query

3.参考文章

《Python黑魔法 — 异步IO( asyncio) 协程》
https://www.jianshu.com/p/b5e347b3a17c

《asyncio中使用阻塞函数》
https://juejin.cn/post/6844903576825774093

《深入理解asyncio》
https://zhuanlan.zhihu.com/p/73568282



Python

本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!