Пример кода для запуска одной асинхронной асинхронной задачи выглядит следующим образом:
from __future__ import annotations
import asyncio
import datetime
import uuid
from typing import Dict
async def my_async_func() -> Dict[str, str]:
await asyncio.sleep(1) # Используется по умолчанию asyncio.sleep как трудоемкая задача
ret = {
'uuid': uuid.uuid4().hex,
'time': datetime.datetime.now().isoformat(),
'coroutine_id': id(asyncio.current_task()),
}
return ret
if __name__ == '__main__':
# использовать asyncio.run Запустить одну сопрограмму
# Прибытие в это время my_async_func() это Coroutine объект
print(asyncio.run(my_async_func()))
После запуска вы можете получить следующие результаты печати:
{
'uuid': 'ee99da4393714a73baaddc4e3bb31f89',
'time': '2024-11-23T12:21:48.380767',
'coroutine_id': 5021212608
}
Приведенный выше пример кода является общим asyncio.run
Библиотечная функция для запуска Асинхронного задачифункция,Универсальныйawait
ключевое слово получить асинхронныйфункция Результат после запуска。
from __future__ import annotations
import asyncio
import datetime
import uuid
from typing import Dict
async def my_async_func() -> Dict[str, str]:
await asyncio.sleep(1) # Используется по умолчанию asyncio.sleep как трудоемкая задача
ret = {
'uuid': uuid.uuid4().hex,
'time': datetime.datetime.now().isoformat(),
'coroutine_id': id(asyncio.current_task()),
}
return ret
async def my_async_main():
print('start')
# существовать my_async_main середина Запустить одну сопрограмму
ret = await my_async_func()
print(ret)
print('end')
if __name__ == '__main__':
# использовать asyncio.run Запуск функции записи сопрограммы my_async_main()
# существовать my_async_основная функциясередина Запустить одну сопрограмму:await my_async_func()
print(asyncio.run(my_async_main()))
Приведенный выше интерфейс кода можно резюмировать следующим образом:
from __future__ import annotations
import asyncio
import datetime
import json
import uuid
from typing import Dict
async def my_async_func() -> Dict[str, str]:
await asyncio.sleep(2) # Используется по умолчанию asyncio.sleep как трудоемкая задача
ret = {
'uuid': uuid.uuid4().hex,
'time': datetime.datetime.now().isoformat(),
'coroutine_id': id(asyncio.current_task()),
}
return ret
async def my_async_main():
print('start')
# использовать asyncio.gather Планирование запуска нескольких сопрограмм
ret = await asyncio.gather(my_async_func(), my_async_func(), my_async_func())
print(json.dumps(ret, ensure_ascii = False, indent = 4))
print('end')
if __name__ == '__main__':
# использовать asyncio.run Запуск функции записи сопрограммы my_async_main()
print(asyncio.run(my_async_main()))
Эффект от бега:
start
[
{
"uuid": "4630638d03594f209387a49e7e5926eb",
"time": "2024-11-23T13:45:43.743456",
"coroutine_id": 4860356992
},
{
"uuid": "04ba280c906a421d994f2871344fe224",
"time": "2024-11-23T13:45:43.743578",
"coroutine_id": 4860352960
},
{
"uuid": "a73788bfc4d2410091fa97dfa997b90d",
"time": "2024-11-23T13:45:43.743625",
"coroutine_id": 4860355072
}
]
end
None
Стоит отметить, что в asyncio.gather также можно передать один или несколько объектов Task:
async def my_async_main():
print('start')
# использовать create_task Воля Coroutine объект Преобразовать в Task объект
# использовать asyncio.gather Запланируйте запуск нескольких задач
ret = await asyncio.gather(
asyncio.create_task(my_async_func()),
asyncio.create_task(my_async_func()),
asyncio.create_task(my_async_func())
)
print(json.dumps(ret, ensure_ascii = False, indent = 4))
print('end')
from __future__ import annotations
import asyncio
import datetime
import uuid
from asyncio import FIRST_COMPLETED
from typing import Dict
async def my_async_func(delta: int) -> Dict[str, str]:
await asyncio.sleep(delta) # Используется по умолчанию asyncio.sleep как трудоемкая задача
ret = {
'uuid': uuid.uuid4().hex,
'time': datetime.datetime.now().isoformat(),
'coroutine_id': id(asyncio.current_task()),
}
return ret
async def my_async_main():
print('start')
# использовать create_task Воля Coroutine объект Преобразовать в Task объект
# Волямножественный Task объект передается как список asyncio.wait
# использовать asyncio.wait Запланируйте запуск нескольких задач
# в соответствии снастраиватьиз return_when Различные параметры, возврат done и pending Количество собранных объектов также отличается.
task_list = [
asyncio.create_task(my_async_func(1)),
asyncio.create_task(my_async_func(2)),
asyncio.create_task(my_async_func(3))
]
done, pending = await asyncio.wait(task_list, return_when = FIRST_COMPLETED)
for d in done:
print(type(d))
print('DONE:', d)
for p in pending:
print(type(p))
print('PENDING:', p)
print(await p)
print('end')
if __name__ == '__main__':
# использовать asyncio.run Запуск функции записи сопрограммы my_async_main()
print(asyncio.run(my_async_main()))
Эффект от бега:
start
<class '_asyncio.Task'>
DONE: <Task finished name='Task-2' coro=<my_async_func() done, defined at /Users/cyx/Documents/BoardStudy/Python3/AsyncIO/demo_test/multi_3.py:14> result={'coroutine_id': 5095254400, 'time': '2024-11-23T13:47:59.710775', 'uuid': '8f91bc728b5e...ac7938c5ba36e'}>
<class '_asyncio.Task'>
PENDING: <Task pending name='Task-4' coro=<my_async_func() running at /Users/cyx/Documents/BoardStudy/Python3/AsyncIO/demo_test/multi_3.py:15> wait_for=<Future pending cb=[Task.task_wakeup()]>>
{'uuid': '94e74a0052964923b079f7469fd7976e', 'time': '2024-11-23T13:48:01.710907', 'coroutine_id': 5095252480}
<class '_asyncio.Task'>
PENDING: <Task finished name='Task-3' coro=<my_async_func() done, defined at /Users/cyx/Documents/BoardStudy/Python3/AsyncIO/demo_test/multi_3.py:14> result={'coroutine_id': 5095250368, 'time': '2024-11-23T13:48:00.710883', 'uuid': '68cbf4c14538...a521f0b9513cd'}>
{'uuid': '68cbf4c145384796bf2a521f0b9513cd', 'time': '2024-11-23T13:48:00.710883', 'coroutine_id': 5095250368}
end
None
Следует отметить, что вы не можете забыть передать объект Coroutine в asyncio.wait:
async def my_async_main():
print('start')
# еслииспользовать Coroutine список объектов, переданный asyncio.wait, будет сообщено об ошибке:
# TypeError: Passing coroutines is forbidden, use tasks explicitly.
task_list = [
my_async_func(1),
my_async_func(2),
my_async_func(3)
]
done, pending = await asyncio.wait(task_list, return_when = FIRST_COMPLETED)
for d in done:
print('DONE:', d)
for p in pending:
print('PENDING:', p)
print('end')
В противном случае будет сообщено об ошибке:
from __future__ import annotations
import asyncio
import datetime
import json
import uuid
from typing import Dict
async def my_async_func(delta: int) -> Dict[str, str]:
await asyncio.sleep(delta) # Используется по умолчанию asyncio.sleep как трудоемкая задача
ret = {
'uuid': uuid.uuid4().hex,
'time': datetime.datetime.now().isoformat(),
'coroutine_id': id(asyncio.current_task()),
}
return ret
async def my_async_main():
print('start')
try:
# использовать wait_for Запланируйте сингл Coroutine объект, тайм-аут настройки
ret = await asyncio.wait_for(my_async_func(1), timeout = 10)
print(json.dumps(ret, indent = 4, ensure_ascii = False))
print('end')
except asyncio.TimeoutError as e:
print('Error:', e)
if __name__ == '__main__':
# использовать asyncio.run Запуск функции записи сопрограммы my_async_main()
asyncio.run(my_async_main())
Эффект от операции:
start
{
"uuid": "5f587726d866427783936937972267dc",
"time": "2024-11-23T13:51:39.985166",
"coroutine_id": 5370404800
}
end
Объекты задач также можно передавать в asyncio.wait_for:
async def my_async_main():
print('start')
try:
# использовать wait_for Запланируйте сингл Task объект, тайм-аут настройки
ret = await asyncio.wait_for(asyncio.create_task(my_async_func(1)), timeout = 10)
print(json.dumps(ret, indent = 4, ensure_ascii = False))
print('end')
except asyncio.TimeoutError as e:
print('Error:', e)
from __future__ import annotations
import asyncio
import datetime
import json
import uuid
from typing import Dict
async def my_async_func(delta: int) -> Dict[str, str]:
await asyncio.sleep(delta) # Используется по умолчанию asyncio.sleep как трудоемкая задача
ret = {
'uuid': uuid.uuid4().hex,
'time': datetime.datetime.now().isoformat(),
'coroutine_id': id(asyncio.current_task()),
}
return ret
async def my_async_main():
print('start')
try:
# использовать asyncio.as_completed Планирование Coroutine список объектов
# asyncio.as_completed сгенерирует итератор,
# Итератор возвращает один каждый раз Coroutine объект
# каждый раз, когда нужно await Coroutine объект Получает результат своего выполнения
# asyncio.as_completed Позволяет выполнять несколько программ одновременно задача и существование каждая задача завершается и ее результаты обрабатываются немедленно
# Это отличается от ожидания завершения всех задач перед одновременной обработкой результатов (например, использовать asyncio.gather) по сравнению с
# Может быстрее реагировать на выполнение каждой задачи
coroutines = [my_async_func(1), my_async_func(2), my_async_func(3)]
coroutines_iter = asyncio.as_completed(coroutines, timeout = 4)
for i in coroutines_iter:
print(type(i))
ret = await i # Итератор возвращает Coroutine объект
print(json.dumps(ret, indent = 4, ensure_ascii = False))
print('end')
except asyncio.TimeoutError as e:
print('Error:', e)
if __name__ == '__main__':
# использовать asyncio.run Запуск функции записи сопрограммы my_async_main()
asyncio.run(my_async_main())
Эффект от бега:
start
<class 'coroutine'>
{
"uuid": "d0fdd0d1caba44148a0a531d58811baa",
"time": "2024-11-23T13:53:32.706889",
"coroutine_id": 6047213952
}
<class 'coroutine'>
{
"uuid": "30389fbaa6ae40d7932d5f344d6e0389",
"time": "2024-11-23T13:53:33.708025",
"coroutine_id": 6047209920
}
<class 'coroutine'>
{
"uuid": "856fc8020d874f2a98fadcd26a252762",
"time": "2024-11-23T13:53:34.707807",
"coroutine_id": 6047212032
}
end
Объекты задач также можно передавать в asyncio.as_completed:
async def my_async_main():
print('start')
try:
# использовать asyncio.as_completed Планирование Task список объектов
# asyncio.as_completed сгенерирует итератор,
# Итератор возвращает один каждый раз Task объект
# каждый раз, когда нужно await Task объект Получает результат своего выполнения
tasks = [
asyncio.create_task(my_async_func(1)),
asyncio.create_task(my_async_func(2)),
asyncio.create_task(my_async_func(3))]
coroutines_iter = asyncio.as_completed(tasks, timeout = 4)
for i in coroutines_iter:
print(type(i))
ret = await i # Итератор возвращает Task объект
print(json.dumps(ret, indent = 4, ensure_ascii = False))
print('end')
except asyncio.TimeoutError as e:
print('Error:', e)
from __future__ import annotations
import asyncio
async def my_coroutine():
print('my_coroutine begin')
await asyncio.sleep(2)
print('my_coroutine end')
async def my_async_func():
print('my_async_func begin')
loop = asyncio.get_running_loop()
task = loop.create_task(my_coroutine())
await task
print('my_async_func end')
if __name__ == '__main__':
asyncio.run(my_async_func())
Эффект от операции:
my_async_func begin
my_coroutine begin
my_coroutine end
my_async_func end
Объект Future существует как заполнитель для хранения результатов асинхронных операций.
Когда запускается асинхронная операция, ее результаты могут быть еще недоступны. Именно здесь вступает в действие объект Future, который будет заполнен результатами операции в будущем.
from __future__ import annotations
import asyncio
async def my_coroutine():
print('my_coroutine begin')
await asyncio.sleep(2)
print('my_coroutine end')
async def my_async_func():
print('my_async_func begin')
loop = asyncio.get_running_loop()
future = loop.create_future()
future.set_result('this is a future object')
ret = await future
print(ret)
print('my_async_func end')
if __name__ == '__main__':
asyncio.run(my_async_func())
Эффект операции:
my_async_func begin
this is a future object
my_async_func end
from __future__ import annotations
import asyncio
from asyncio import Future
def on_my_future_done(obj: Future):
print('my_future done begin')
print(obj.result())
print('my_future done end')
async def my_async_func():
print('my_async_func begin')
loop = asyncio.get_running_loop()
future = loop.create_future()
future.set_result('this is a future object')
future.add_done_callback(on_my_future_done) # синхронный интерфейс
ret = await future
print(ret)
print('my_async_func end ------')
if __name__ == '__main__':
asyncio.run(my_async_func())
Эффект от исполнения:
my_async_func begin
this is a future object
my_async_func end ------
my_future done begin
this is a future object
my_future done end
from __future__ import annotations
import asyncio
import time
from asyncio import Future
def on_my_future_done(obj: Future):
print('my_future done begin')
print(obj.result())
print('my_future done end')
async def another_async_func(obj: Future):
print('another_async_func begin')
obj.set_result('this is another future coroutine object')
print('another_async_func end')
return f'another async future coroutine finished at {time.time()}'
async def my_async_func():
print('my_async_func begin')
loop = asyncio.get_running_loop()
future = loop.create_future()
future.add_done_callback(on_my_future_done) # синхронный интерфейс
task = asyncio.create_task(another_async_func(future))
print(await task)
print(await future)
print('my_async_func end ------')
if __name__ == '__main__':
asyncio.run(my_async_func())
Эффект операции:
my_async_func begin
another_async_func begin
another_async_func end
my_future done begin
this is another future coroutine object
my_future done end
another async future coroutine finished at 1732341693.668807
this is another future coroutine object
my_async_func end ------
def sync_func(delta: int) -> str:
begin_time = time.time()
print('sync_func begin@', begin_time)
time.sleep(delta) # Имитация трудоемких процедур синхронизации
finished_time = time.time()
print('sync_func finished@', finished_time)
return f'sync_func begin@{begin_time} finished@{finished_time}'
async def async_func(delta: int) -> str:
begin_time = time.time()
print('async_func begin@', time.time())
await asyncio.sleep(delta) # Имитировать асинхронные трудоемкие программы
finished_time = time.time()
print('async_func finished@', finished_time)
return f'async_func begin@{begin_time} finished@{finished_time}'
async def main():
# Выполнить первым to_thread, Выполнить еще раз async_func, последовательное исполнение
print('async main begin...')
sync_ret = await asyncio.to_thread(sync_func, 3) # Блокировать главный Но отдай это cpu Доступно для других мероприятий
async_ret = await async_func(3)
print(sync_ret, async_ret)
print('async main end...')
if __name__ == '__main__':
asyncio.run(main())
Эффект операции:
async def main2():
# Параллельное выполнение sync_func и async_func
print(f'async main begin @{time.time()}')
tasks = [
asyncio.create_task(asyncio.to_thread(sync_func, 3)),
asyncio.create_task(asyncio.to_thread(sync_func, 3)),
asyncio.create_task(async_func(3)),
asyncio.create_task(async_func(3))
]
_ = await asyncio.wait(tasks, return_when = asyncio.ALL_COMPLETED)
print(f'async main end @{time.time()}')
if __name__ == '__main__':
asyncio.run(main2())
Эффект операции:
существовать main
В функции последовательно выполняются две задачи: синхронизированная функция sync_func
и асинхронная функция async_func
。здесьиз关键是использовать await asyncio.to_thread(sync_func, 3)
Приходитьсуществовать不同из线程середина运行同步функция,Это позволяет избежать блокировки цикла событий.。потому чтоиспользовать了 await
,async_func
толькосуществовать sync_func
Выполнение начинается после завершения. Это означает, что эти две функции постоянно исполнениеиз,даже если их поставятсуществовать了不同из线程или协程середина。
существовать main2
В функции одновременное выполнение используется для обработки четырех задач: двух sync_func
и два async_func
。здесь,Все задачи инкапсулированы в asyncio.Task
и запускаться одновременно. использовать asyncio.wait
дождаться завершения всех задач. Такой подход позволяет всем задачам запускаться примерно в одно и то же время и выполняться независимо, не дожидаясь завершения других задач. Это значительно улучшает производительность параллельной работы программы.
Эти два метода показывают, как выбирать подходящие стратегии планирования задач в соответствии с различными требованиями.
from __future__ import annotations
import asyncio
import random
import time
import uuid
from asyncio import ALL_COMPLETED, StreamReader, StreamWriter
async def handle_echo(reader: StreamReader, writer: StreamWriter):
addr = writer.get_extra_info('peername')
print(f"{addr} connected")
try:
while True:
data = await reader.read(1024) # Прочитайте данные, отправленные клиентом, читай каждый раз 100 байт
message = data.decode('utf-8') # Здесь предполагается, что клиент отправляет все utf-8 нить
print(f'Сервер получил запрос: {message}')
writer.write(f'{message}@{time.time()}'.encode('utf-8')) # Воля полученные данные возвращаются клиенту
await writer.drain() # Убедитесь, что данные отправлены
except Exception as e:
print(f"Error with {addr}: {e}")
finally:
print(f"{addr} disconnected")
writer.close() # тесная связь
async def tcp_server_main():
"""
client_connected_cb: Обратный вызов при установке клиентского соединения обычно получает два параметра: считыватель и writer。
host: нитилинить список,表示服务器监听из IP адрес. может быть IPv4 или IPv6 адрес.
port: Целое число, указывающее номер порта, который прослушивает сервер.
loop: Дополнительные параметры,Укажите цикл событий. Если не указано,Используется по умолчанию asyncio.get_event_loop()。
limit: Необязательный параметр, настройка ограничения размера буфера, по умолчанию 256 KB。
family: Необязательный параметр, семейство адресов настройки, например socket.AF_INET。
flags: Необязательный параметр, настройка разрешения адресов.
sock: Необязательные параметры, которые вы можете передать напрямую в одном socket объект вместо того, чтобы позволить asyncio.start_server Создайте его.
backlog: Необязательный параметр, настройки Максимальное количество соединений, которые может приостановить операционная система.
ssl: Необязательный параметр, если он указан SSL Контекст, обработка сервера Воля SSL Шифрованное соединение.
reuse_address: Дополнительные параметры,Сообщает операционной системе, следует ли разрешить повторное использованиеадрес.
reuse_port: Необязательный параметр, позволяющий нескольким программам подключаться к одному и тому же порту.
"""
server = await asyncio.start_server(
handle_echo, '127.0.0.1', 18082) # имеется локальный порт для запуска сервера
addr = server.sockets[0].getsockname()
print(f'Serving on {addr}')
async with server:
await server.serve_forever() # Непрерывное обслуживание до отмены извне
async def tcp_client_main():
client_id = uuid.uuid4()
random_sleep = random.randint(1, 10)
print(f"Client {client_id} starting... random sleep: {random_sleep}s")
# Введена случайная задержка для каждого клиента, чтобы сервер мог полностью запуститься и быть готовым принимать соединения.
# Поэтапное время запуска: эта случайная задержка распределяет время, когда каждый клиент пытается подключиться к серверу, уменьшая вероятность одновременного подключения всех клиентов к серверу, тем самым снижая риск сбоя соединения.
# Дайте серверу время на подготовку. Эта задержка дает серверу дополнительное время для завершения процесса запуска и начала прослушивания порта, чтобы при попытке клиента подключиться сервер был готов принять новые соединения.
# Уменьшите конкуренцию за ресурсы:существовать多客户端同时启动из情况下,Серверу может потребоваться обработка нескольких одновременных запросов на подключение.,Это может привести к конфликту за ресурсыи Узкое место в производительности。Введя задержку,Запросы клиента на подключение разбросаны по разным моментам времени,从而减轻了服务器из负载压力
await asyncio.sleep(random_sleep)
reader, writer = await asyncio.open_connection(
'127.0.0.1', 18082) # Подключиться к серверу
print(f"Client {client_id} connected")
try:
while True:
writer.write(f'{client_id}, Период интервала: {random_sleep} с, Hello World!'.encode('utf-8')) # Отправить данные
await writer.drain() # Убедитесь, что данные отправлены
data = await reader.read(1024) # Прочитайте ответ сервера
print(f'Клиент получил ответ: {data.decode()}')
await asyncio.sleep(random_sleep)
except Exception as e:
print(f"Error: {e}")
finally:
print('server exit')
# Этот метод используется для закрытия StreamWriter базовый транспорт объекта (например. TCP соединять)
# вызов close() назад,Передача будет остановлена Отправить данные,并尽可能地完成剩余数据из发送
# Однако сам этот метод неблокирующий, т.е. возвращает сразу, не дожидаясь полного закрытия соединения.
writer.close() # тесная связь
# Этот метод является асинхронным, он ждет, пока базовый транспорт фактически не закроется.
# это значит существоватьэто await После выражения вы можете быть уверены, что соединение полностью закрыто и ресурсы освобождены.
# если仅仅вызов writer.close() не дожидаясь wait_closed()
# Тогда вполне возможно, что последующее выполнение может продолжиться без фактического полного закрытия соединения.
# Это может привести к утечке ресурсовили其他一些难以预料из问题
# writer.close() и await writer.wait_closed() 一起использовать是为了确保连接из正确и完全关闭
# 这种做法遵循了良好из异步编程实践,т.е. обеспечение надлежащего управления всеми ресурсамии释放
await writer.wait_closed()
async def my_tcp_demo(client_num: int = 1):
server_task = asyncio.create_task(tcp_server_main())
# await asyncio.sleep(1)
client_task_list = [asyncio.create_task(tcp_client_main()) for _ in range(client_num)]
done, pending = await asyncio.wait([server_task, *client_task_list],
return_when = ALL_COMPLETED)
for task in pending:
task.cancel()
for task in done:
try:
await task
except asyncio.CancelledError:
pass
if __name__ == '__main__':
asyncio.run(my_tcp_demo(3))
Эффект операции: