Python 异步编程:async、await 与同步函数¶
前言¶
在现代 Python 开发中,异步编程已经成为处理 I/O 密集型任务的标准方式。理解 async、await 和同步函数的区别,对于编写高效、可扩展的应用程序至关重要。
与多线程和多进程的关系¶
多线程是在一个进程里创建多个线程,共享资源,线程切换开销小,适合 I/O密集型任务,像网络请求、文件读写。它编程简单,能提高程序响应性,但因全局解释器锁,在 CPU 密集型任务中无法发挥多核优势,还存在线程安全问题。
多进程中每个进程有独立内存和资源,适合 CPU 密集型任务,能充分利用多核 CPU,稳定性高。不过,进程创建和销毁开销大,进程间通信和数据共享复杂。
异步编程基于事件循环和协程,在单线程内实现异步。它并发性能高,代码简洁,适合大量 I/O 密集型任务。但不适合 CPU 密集型任务,编程模型复杂,调试维护难。
简单来说,在开发时,I/O 密集型任务少用多线程,任务多用异步;CPU 密集型任务就选多进程;混合任务则按需组合。
同步函数¶
同步函数是传统的函数调用方式,代码按顺序执行,每一步都必须等待前一步完成。
import time
def sync_function():
"""同步函数示例"""
print("开始执行")
time.sleep(2) # 阻塞 2 秒
print("执行完成")
return "结果"
# 调用同步函数
result = sync_function()
print(result)
简单直观,易于理解;代码执行顺序清晰。
阻塞执行,无法并发;处理大量I/O操作时效率低下。
异步函数¶
异步函数使用 async def 定义,可以在等待 I/O 操作时让出控制权,允许其他任务并发执行。
import asyncio
async def async_function():
"""异步函数示例"""
print("开始执行")
await asyncio.sleep(2) # 异步等待 2 秒
print("执行完成")
return "结果"
# 调用异步函数
async def main():
result = await async_function()
print(result)
# 运行异步函数
asyncio.run(main())
非阻塞执行,可以并发处理多个任务。适合 I/O 密集型操作(网络请求、文件读写等)。提高资源利用率。
代码复杂度较高。要理解事件循环机制。
对比示例:处理 100 个网络请求
import time
import asyncio
import aiohttp
# ========== 同步方式 ==========
def sync_fetch(url):
"""同步获取网页内容"""
time.sleep(1) # 模拟网络延迟
return f"Response from {url}"
def sync_process_100_requests():
"""同步处理 100 个请求"""
start = time.time()
results = []
for i in range(100):
result = sync_fetch(f"http://example.com/{i}")
results.append(result)
end = time.time()
print(f"同步方式耗时: {end - start:.2f} 秒")
return results
# ========== 异步方式 ==========
async def async_fetch(session, url):
"""异步获取网页内容"""
await asyncio.sleep(1) # 模拟网络延迟
return f"Response from {url}"
async def async_process_100_requests():
"""异步处理 100 个请求"""
start = time.time()
async with aiohttp.ClientSession() as session:
tasks = [async_fetch(session, f"http://example.com/{i}") for i in range(100)]
results = await asyncio.gather(*tasks)
end = time.time()
print(f"异步方式耗时: {end - start:.2f} 秒")
return results
# 运行对比
if __name__ == "__main__":
# 同步方式:约 100 秒
sync_process_100_requests()
# 异步方式:约 1 秒(并发执行)
asyncio.run(async_process_100_requests())
同步方式:~100 秒(顺序执行)
异步方式:~1 秒(并发执行)
async关键字¶
async 关键字用于定义一个协程函数(coroutine function),它返回一个协程对象(coroutine object)。
async def my_async_function():
return "Hello"
# 调用异步函数
coro = my_async_function()
print(type(coro)) # <class 'coroutine'>
# ❌ 错误:直接调用不会执行
result = my_async_function() # 这只是创建了协程对象,没有执行
# ✅ 正确:需要使用 await 或 asyncio.run()
result = await my_async_function() # 在异步函数内执行
# 或
result = asyncio.run(my_async_function()) # 在同步上下文中(异步函数外)
async def func():
return 42
# 方式 1:使用 await(在异步函数中)
async def main():
result = await func()
print(result) # 42
# 方式 2:使用 asyncio.run()
result = asyncio.run(func())
print(result) # 42
await关键字¶
await 关键字用于暂停当前协程的执行,等待异步操作完成,同时让出控制权给事件循环,允许其他协程运行。
async def task1():
print("Task 1 开始")
await asyncio.sleep(2) # 暂停,让出控制权
print("Task 1 完成")
async def task2():
print("Task 2 开始")
await asyncio.sleep(1) # 暂停,让出控制权
print("Task 2 完成")
async def main():
# 并发执行两个任务
await asyncio.gather(task1(), task2())
asyncio.run(main())
Task 1 开始
Task 2 开始
Task 2 完成 (1 秒后)
Task 1 完成 (2 秒后)
# 总耗时:约 2 秒(不是 3 秒),因为线程遇到io阻塞会利用控制时间干其他事。
await vs 同步阻塞¶
time.sleep():阻塞整个线程,其他代码无法执行
await asyncio.sleep():暂停当前协程,其他协程可以执行
await 可以等待什么?
# 1. 其他协程对象
async def func1():
return "result"
async def func2():
result = await func1() # ✅ 可以
return result
# 2. 可等待对象(Awaitable)
# - 协程(Coroutine)
# - 任务(Task)
# - Future 对象
# 3. 异步上下文管理器
async def example():
async with aiofiles.open('file.txt') as f:
content = await f.read() # ✅ 可以
# 4. 异步迭代器
async def example():
async for line in async_file_reader():
print(line) # ✅ 可以
asyncio.create_task¶
asyncio.create_task() 是 Python 3.7+ 引入的函数,用于将协程包装成 Task 对象并立即调度执行。它是并发执行多个异步任务的关键工具。
import asyncio
async def my_coroutine():
await asyncio.sleep(1)
return "完成"
# create_task 将协程包装成 Task
task = asyncio.create_task(my_coroutine())
# Task 立即开始执行(不等待完成)
- 立即调度执行:创建 Task 后,协程立即开始执行,不需要等待
- 返回 Task 对象:可以跟踪任务状态、取消任务、获取结果
- 并发执行:多个 Task 可以并发运行
输出:
import asyncio import time async def task(name: str, duration: int): print(f"[{time.time():.2f}] 任务 {name} 开始") await asyncio.sleep(duration) print(f"[{time.time():.2f}] 任务 {name} 完成") return f"任务 {name} 的结果" async def main(): start = time.time() # 创建多个 Task,它们会并发执行 task1 = asyncio.create_task(task("A", 2)) task2 = asyncio.create_task(task("B", 1)) task3 = asyncio.create_task(task("C", 3)) # 等待所有任务完成 results = await asyncio.gather(task1, task2, task3) end = time.time() print(f"\n所有任务完成,耗时: {end - start:.2f} 秒") print(f"结果: {results}") asyncio.run(main())[1774764975.97] 任务 A 开始 [1774764975.97] 任务 B 开始 [1774764975.97] 任务 C 开始 [1774764976.98] 任务 B 完成 [1774764977.97] 任务 A 完成 [1774764978.98] 任务 C 完成 所有任务完成,耗时: 3.01 秒 结果: ['任务 A 的结果', '任务 B 的结果', '任务 C 的结果']
await vs create_task¶
import asyncio
async def slow_task(name: str):
print(f"{name} 开始")
await asyncio.sleep(2)
print(f"{name} 完成")
return f"{name} 的结果"
# ========== 方式 1:顺序执行(使用 await)==========
async def sequential():
print("=== 顺序执行 ===")
result1 = await slow_task("任务1") # 等待完成
result2 = await slow_task("任务2") # 等待完成
result3 = await slow_task("任务3") # 等待完成
# 总耗时:约 6 秒
return [result1, result2, result3]
# ========== 方式 2:并发执行(使用 create_task)==========
async def concurrent():
print("=== 并发执行 ===")
task1 = asyncio.create_task(slow_task("任务1")) # 立即开始
task2 = asyncio.create_task(slow_task("任务2")) # 立即开始
task3 = asyncio.create_task(slow_task("任务3")) # 立即开始
# 等待所有任务完成
result1 = await task1
result2 = await task2
result3 = await task3
# 总耗时:约 2 秒(并发执行)
return [result1, result2, result3]
# ========== 方式 3:使用 gather(推荐)==========
async def using_gather():
print("=== 使用 gather ===")
tasks = [
asyncio.create_task(slow_task("任务1")),
asyncio.create_task(slow_task("任务2")),
asyncio.create_task(slow_task("任务3"))
]
results = await asyncio.gather(*tasks)
# 总耗时:约 2 秒
return results
create_task vs asyncio.gather¶
两者都可以实现并发,但使用场景不同:
import asyncio
async def fetch_data(url: str):
await asyncio.sleep(1) # 模拟网络请求
return f"数据来自 {url}"
# ========== 使用 create_task:需要单独控制每个任务 ==========
async def with_create_task():
task1 = asyncio.create_task(fetch_data("url1"))
task2 = asyncio.create_task(fetch_data("url2"))
# 可以单独等待某个任务
result1 = await task1
print(f"第一个结果: {result1}")
# 可以取消任务
task2.cancel()
try:
result2 = await task2
except asyncio.CancelledError:
print("任务2被取消")
# 可以检查任务状态
print(f"任务1完成: {task1.done()}")
print(f"任务2完成: {task2.done()}")
# ========== 使用 gather:统一处理所有任务 ==========
async def with_gather():
# 更简洁,统一处理结果
results = await asyncio.gather(
fetch_data("url1"),
fetch_data("url2"),
fetch_data("url3")
)
# results 是一个列表,包含所有结果
return results
# ========== gather 的错误处理 ==========
async def with_gather_error_handling():
async def fetch_with_error(url: str):
if url == "error":
raise ValueError("模拟错误")
await asyncio.sleep(1)
return f"数据来自 {url}"
# return_exceptions=True:不抛出异常,返回异常对象
results = await asyncio.gather(
fetch_with_error("url1"),
fetch_with_error("error"),
fetch_with_error("url3"),
return_exceptions=True
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"任务 {i} 失败: {result}")
else:
print(f"任务 {i} 成功: {result}")
* 使用 create_task:需要单独控制任务(取消、检查状态、单独等待)
* 使用 gather:统一处理多个任务,代码更简洁(推荐)
create_task 的实际应用场景¶
场景 1:后台任务(Fire -and-Forget)
import asyncio
async def send_notification(user_id: int, message: str):
"""发送通知(不需要等待完成)"""
await asyncio.sleep(1) # 模拟发送
print(f"通知已发送给用户 {user_id}: {message}")
async def process_order(order_id: int):
"""处理订单"""
print(f"开始处理订单 {order_id}")
await asyncio.sleep(2) # 模拟处理
print(f"订单 {order_id} 处理完成")
# 发送通知(后台任务,不阻塞主流程)
asyncio.create_task(send_notification(order_id, "订单处理完成"))
# 注意:不 await,立即返回
return f"订单 {order_id} 已处理"
async def main():
order = await process_order(123)
print(f"主流程完成: {order}")
# 等待一下,让后台任务完成
await asyncio.sleep(2)
asyncio.run(main())
# 输出:
# 开始处理订单 123
# 订单 123 处理完成
# 主流程完成: 订单 123 已处理
# 通知已发送给用户 123: 订单处理完成
import asyncio
async def fetch_with_timeout(url: str, timeout: float):
"""带超时的请求"""
task = asyncio.create_task(fetch_data(url))
try:
result = await asyncio.wait_for(task, timeout=timeout)
return result
except asyncio.TimeoutError:
task.cancel() # 取消任务
return f"请求 {url} 超时"
async def fetch_data(url: str):
await asyncio.sleep(5) # 模拟慢请求
return f"数据来自 {url}"
async def main():
result = await fetch_with_timeout("http://example.com", timeout=2.0)
print(result) # 请求 http://example.com 超时
asyncio.run(main())
import asyncio
async def long_running_task(name: str, duration: int):
try:
print(f"任务 {name} 开始")
await asyncio.sleep(duration)
print(f"任务 {name} 完成")
return f"任务 {name} 的结果"
except asyncio.CancelledError:
print(f"任务 {name} 被取消")
raise
async def main():
# 创建任务
task1 = asyncio.create_task(long_running_task("A", 5))
task2 = asyncio.create_task(long_running_task("B", 3))
# 等待 2 秒
await asyncio.sleep(2)
# 检查任务状态
print(f"任务A完成: {task1.done()}")
print(f"任务B完成: {task2.done()}")
# 取消任务A
task1.cancel()
# 等待任务完成(或取消)
try:
result1 = await task1
except asyncio.CancelledError:
print("任务A已被取消")
result2 = await task2
print(f"任务B结果: {result2}")
asyncio.run(main())
* 立即调度:createtask() 创建 Task 后立即开始执行 * 并发执行:多个 Task 可以并发运行 * 任务控制:可以取消、检查状态、获取结果 * 与 await 的区别:await 会等待完成,createtask() 立即返回 * 与 gather 的关系:gather() 内部使用 create_task(),但更简洁 * 适用场景:后台任务、超时控制、任务管理
异步实际应用场景¶
场景 1:Web API 请求处理
from fastapi import FastAPI
import asyncio
app = FastAPI()
# ❌ 同步方式:阻塞服务器
@app.get("/sync")
def sync_endpoint():
# 模拟数据库查询
time.sleep(1) # 阻塞!其他请求无法处理
return {"status": "ok"}
# ✅ 异步方式:非阻塞
@app.get("/async")
async def async_endpoint():
# 模拟异步数据库查询
await asyncio.sleep(1) # 不阻塞,其他请求可以处理
return {"status": "ok"}
同步方式:服务器每秒只能处理 1 个请求
异步方式:服务器可以并发处理多个请求
场景 2:批量文件处理
import aiofiles
import asyncio
from pathlib import Path
# ❌ 同步方式
def sync_process_files(file_paths):
results = []
for path in file_paths:
with open(path, 'r') as f:
content = f.read() # 阻塞
results.append(process(content))
return results
# ✅ 异步方式
async def async_process_files(file_paths):
async def process_file(path):
async with aiofiles.open(path, 'r') as f:
content = await f.read() # 非阻塞
return process(content)
tasks = [process_file(path) for path in file_paths]
results = await asyncio.gather(*tasks) # 并发处理
return results
import asyncpg
import asyncio
# ✅ 异步数据库操作
async def fetch_users():
conn = await asyncpg.connect('postgresql://...')
try:
users = await conn.fetch('SELECT * FROM users')
return users
finally:
await conn.close()
# 并发查询多个表
async def fetch_all_data():
users, posts, comments = await asyncio.gather(
fetch_users(),
fetch_posts(),
fetch_comments()
)
return users, posts, comments
import asyncio
async def background_task(task_id: str):
"""后台任务:不需要等待完成"""
print(f"任务 {task_id} 开始")
await asyncio.sleep(5) # 模拟耗时操作
print(f"任务 {task_id} 完成")
async def critical_operation():
"""关键操作:需要等待完成"""
print("关键操作开始")
await asyncio.sleep(1)
print("关键操作完成")
return "结果"
async def main():
# 关键操作:必须等待
result = await critical_operation()
print(f"得到结果: {result}")
# 后台任务:使用 create_task,不需要等待,立即返回
task1 = asyncio.create_task(background_task("task-1"))
task2 = asyncio.create_task(background_task("task-2"))
print("主流程继续执行...")
# 后台任务在后台运行,不影响主流程
# 如果需要,可以稍后等待任务完成
await asyncio.sleep(6) # 等待后台任务完成
print("所有任务完成")
asyncio.run(main())
常见误区¶
误区 1: 认为 await 就是同步阻塞,认为await someasyncfunction() 会阻塞整个程序
# await 是异步等待,会暂停当前协程,但允许其他协程运行
async def task1():
await asyncio.sleep(2) # 暂停 task1
async def task2():
await asyncio.sleep(1) # 暂停 task2
# 两个任务并发执行,总耗时约 2 秒(不是 3 秒)
await asyncio.gather(task1(), task2())
错误理解:
async def main():
# 认为这样会自动并发
await func1()
await func2()
await func3()
# ❌ 顺序执行(不是并发)
async def main():
await func1() # 等待完成
await func2() # 等待完成
await func3() # 等待完成
# 总耗时 = func1 + func2 + func3
# ✅ 并发执行方式 1:使用 create_task
async def main():
task1 = asyncio.create_task(func1())
task2 = asyncio.create_task(func2())
task3 = asyncio.create_task(func3())
await asyncio.gather(task1, task2, task3)
# 总耗时 = max(func1, func2, func3)
# ✅ 并发执行方式 2:使用 gather(推荐)
async def main():
await asyncio.gather(func1(), func2(), func3())
# 总耗时 = max(func1, func2, func3)
# 认为所有函数都应该改成 async
async def add(a, b):
return a + b # CPU 密集型操作,不需要 async
# CPU 密集型:使用同步函数
def add(a, b):
return a + b
# I/O 密集型:使用异步函数
async def fetch_url(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
async def main():
# ❌ 错误:创建任务后不等待,主函数可能提前结束
asyncio.create_task(long_running_task())
return "完成" # 任务可能还没完成就返回了
async def main():
# ✅ 正确方式 1:等待任务完成
task = asyncio.create_task(long_running_task())
result = await task
return result
# ✅ 正确方式 2:使用 gather
await asyncio.gather(long_running_task())
return "完成"
# ✅ 正确方式 3:后台任务(明确不需要等待)
task = asyncio.create_task(background_task())
# 确保事件循环运行足够长时间
await asyncio.sleep(10) # 等待后台任务完成
return "完成"
企业级应用案例¶
异步操作redis¶
在使用python代码操作redis时,链接/操作/断开都是网络IO
pip install aioredis
示例:
import asyncio
import aioredis
async def execute(address,password):
print("开始执行",address)
#网络IO操作:先去连接 47.93.4.197:6379,遇到I0则自动切换任务,去连接47.93.4.198:6379
redis= await aioredis.create_redis_pool(address, password=password)
#网络IO操作:遇到IO会自动切换任务
await redis.hmset_dict('car', key1=1, key2=2, key3=3)
# 网络IO操作:遇到IO会自动切换任务
result=await redis.hgetall('car',encoding='utf-8')
print(result)
redis.close()
#网络IO操作:遇到IO会自动切换任务
await redis.wait_closed()
print("结束",address)
task_1ist=[
execute('redis://47.93.4.197:6379',"root!2345"),
execute('redis://47.93.4.193:6379', "root!2345")]
asyncio.run(asyncio.wait(task_list))
异步操作MySQL¶
pip install aiomysql
import asyncio
import aioredis
async def execute(host,password):
print("开始执行",host)
#网络IO操作:先去连接 47.93.4.197:6379,遇到I0则自动切换任务,去连接47.93.4.198:6379
conn = await aiomysql.connect(host=host, port=3306,user='root',password=password,db='mysql')
#网络IO操作:遇到IO会自动切换任务
cur = await conn.cursor() # 创建游标对象
# 网络IO操作:遇到IO会自动切换任务
await cur.execute("SELECT Host,User FROM user")
# 网络IO操作:遇到IO会自动切换任务
result=await cur.fetchall() # 获取所有查询结果
print(result)
#网络IO操作:遇到IO会自动切换任务
await cur.close() # 关闭游标和连接
conn.close()
print("结束",host)
task_1ist=[
execute('redis://47.93.4.197:6379',"root!2345"),
execute('redis://47.93.4.193:6379', "root!2345")]
asyncio.run(asyncio.wait(task_list))
FastAPI框架异步¶
FastAPI 是一个基于 ASGI(Asynchronous Server Gateway Interface)的现代化 Web 框架,专注于高性能和异步编程。ASGI 是 WSGI 的升级版本,支持异步处理,能够更好地应对高并发场景,例如 WebSocket 和长连接。
先认识一下uvloop,它是asyncio的事件循环的替代方案,是第三方框架,而默认asyncio是python自带的。事件循环效率>默认asyncio的事件循环。FastAPI也是因为内部使用了uvloop所以快,效率高。
pip install uvloop
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
#编写asyncio的代码,与之前写的代码一致
#内部的事件循环会自动变为uvloop
asyncio.run(...)
pip install fastapi
pip install uvicorn (asgi内部基于uvloop)
# 脚本名称luffy.py
#!/usr/bin/env python
-*-coding:utf-8-*-
import asyncio
import uvicorn
import aioredis
from aioredis import Redis
from fastapi import FastAPI
app=FastAPI()
#创建一个redis连接池
REDIS_POOL = aioredis.ConnectionsPool('redis://47.193.14.198:6379', password="root123",minsize=1,maxsize=10)
@app.get("/")
def index():
"""普通操作接口"""
return {"message": "Hello world"}
@app.get("/red")
async def red():
"""异步操作接口"""
print("请求来了")
await asyncio.sleep(3)
#连接池获取一个连接
conn=await REDIS_POOL.acquire()
redis=Redis(conn)
#设置值
await redis.hmset_dict('car', key1=1, key2=2, key3=3)
#读取值
result=await redis.hgetall('car', encoding='utf-8')
print(result)
#连接归还连接池
REDIS_POOL.release(conn)
return result
if __name__ == '__main__':
uvicorn.run("luffy:app",host="127.0.0.1",port=5000,log_level="info")
总结¶
何时使用异步?
✅ 适合异步的场景:
- 网络请求(HTTP API、WebSocket)
- 文件 I/O(读写文件)
- 数据库操作
- 等待外部服务响应
- 处理大量并发连接
❌ 不适合异步的场景:
- CPU 密集型计算(图像处理、数值计算)
- 简单的同步操作(不需要并发)
- 已经优化的同步代码(不要为了异步而异步)
核心概念回顾:
1、同步函数:顺序执行,阻塞等待
简单直观
适合 CPU 密集型任务
不适合大量 I/O 操作
2、异步函数(async):定义协程函数
返回协程对象
必须用 await 或事件循环执行
适合 I/O 密集型任务
3、await 关键字:异步等待
暂停当前协程
让出控制权给事件循环
不阻塞整个线程
允许其他协程并发执行
4、asyncio.create_task():创建并调度任务
将协程包装成 Task 对象
立即开始执行(不等待完成)
实现并发执行多个任务
可以取消、检查状态、获取结果
关键区别总结
特性 |同步函数 |异步函数 + await| create_task
---|---|---|---
执行方式 |顺序执行| 顺序执行(单个协程)| 并发执行(多个任务)
阻塞性 |阻塞线程 |暂停协程,不阻塞线程 |不阻塞,立即返回
适用场景| CPU 密集型| I/O 密集型(单个)| I/O 密集型(多个并发)
并发能力 |无 |无(单个) |高(多个)
代码复杂度| 低 |中 |中高
返回类型| 直接返回值| 协程对象| Task 对象
记忆要点
* async def = 定义异步函数,返回协程对象
* await = 异步等待,暂停协程但不阻塞线程
* 异步函数必须用 await,否则不会执行
* await 不等于同步阻塞,它允许并发执行
* I/O 操作用异步,CPU 计算用同步
* createtask() = 创建任务并立即调度,实现并发执行
* 多个 await 顺序执行,多个 createtask 并发执行
* gather() 内部使用 create_task(),但更简洁
参考链接:
https://blog.csdn.net/liulanba/article/details/156242877