Asyncio.gather: примеры (PYTHON)
asyncio.gather(*aws: awaitable objects, return_exceptions: bool=False): list of resultsФункция asyncio.gather предоставляет возможность параллельного выполнения нескольких асинхронных задач. Ее основное назначение заключается в запуске нескольких корутин или объектов, допускающих ожидание, с последующим сбором результатов.
Аргументы и возвращаемое значение
Сигнатура функции: asyncio.gather(*aws, return_exceptions=False).
- *aws (позиционные аргументы): Один или несколько объектов, допускающих ожидание (корутины, задачи, объекты Future).
- return_exceptions (флаг, по умолчанию
False):- Если установлено в
False, первое возникшее исключение немедленно распространяется на задачу, ожидающуюgather, а остальные задачи продолжают выполняться (но их результаты будут проигнорированы). - Если установлено в
True, исключения обрабатываются как успешные результаты и включаются в возвращаемый список.
- Если установлено в
Функция возвращает список результатов в порядке, соответствующем порядку переданных объектов. Если флаг return_exceptions равен True, в списке могут присутствовать объекты исключений.
Примеры использования
Пример с обычным выполнением трех асинхронных функций:
import asyncio
async def task(name, delay):
await asyncio.sleep(delay)
return f'{name} выполнена'
async def main():
results = await asyncio.gather(
task('Задача 1', 2),
task('Задача 2', 1),
task('Задача 3', 3)
)
print(results)
asyncio.run(main())['Задача 1 выполнена', 'Задача 2 выполнена', 'Задача 3 выполнена']
Пример с флагом return_exceptions=True:
import asyncio
async def successful_task():
await asyncio.sleep(1)
return 'Успех'
async def failing_task():
raise ValueError('Ошибка в задаче')
async def main():
results = await asyncio.gather(
successful_task(),
failing_task(),
return_exceptions=True
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f'Задача {i}: исключение - {result}')
else:
print(f'Задача {i}: {result}')
asyncio.run(main())Задача 0: Успех Задача 1: исключение - Ошибка в задаче
Похожие функции в Python
asyncio.wait: Позволяет ожидать завершения задач с возможностью установки условий (например, FIRST_COMPLETED). Возвращает два множества: завершенные и незавершенные задачи. Удобна для сценариев, где требуется реагировать на завершение задач по мере их выполнения.
asyncio.as_completed: Возвращает итератор по корутинам, который выдает задачи по мере их завершения. Подходит для обработки результатов в порядке их готовности, а не в порядке запуска.
asyncio.create_task с последующим сбором результатов: Создает задачи и ожидает их завершения индивидуально, обеспечивая более тонкий контроль.
Функция gather предпочтительнее, когда необходимо запустить фиксированный набор задач и дождаться всех результатов одновременно.
Аналоги в других языках
JavaScript (Promise.all): Аналогично собирает результаты нескольких промисов.
const promises = [
Promise.resolve('Результат 1'),
Promise.reject(new Error('Ошибка')),
Promise.resolve('Результат 3')
];
Promise.all(promises)
.then(results => console.log(results))
.catch(error => console.error(error));Error: Ошибка
Golang (WaitGroup из пакета sync): Используется для ожидания завершения группы горутин.
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
time.Sleep(time.Second)
fmt.Printf("Воркер %d завершен\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1)
go worker(i, &wg)
}
wg.Wait()
fmt.Println("Все воркеры завершены")
}Воркер 3 завершен Воркер 1 завершен Воркер 2 завершен Все воркеры завершены
Основное отличие асинхронной модели Python от других языков заключается в использовании корутин и цикла событий, что обеспечивает детерминированный порядок завершения при использовании gather.
Распространенные ошибки
Передача обычных функций вместо асинхронных:
import asyncio
async def main():
# Ошибка: функция не является корутиной
await asyncio.gather(time.sleep(1))
asyncio.run(main())TypeError: object NoneType can't be used in 'await' expression
Игнорирование исключений при return_exceptions=False:
import asyncio
async def failing():
raise RuntimeError('Сбой')
async def main():
try:
await asyncio.gather(failing())
except RuntimeError as e:
print(f'Поймано исключение: {e}')
asyncio.run(main())Поймано исключение: Сбой
Неверный порядок результатов при передаче задач в виде словаря:
import asyncio
async def task(num):
return num
async def main():
tasks = {i: task(i) for i in range(3)}
# Ошибка: передача значений словаря, порядок не гарантирован в старых версиях Python
results = await asyncio.gather(*tasks.values())
print(results)
asyncio.run(main())[0, 1, 2]
Изменения в последних версиях
В Python 3.11 функция не претерпела значительных изменений. Однако в Python 3.10 была улучшена обработка отмены задач: при отмене gather отменяются все незавершенные задачи. В Python 3.7 появилась поддержка конкурентного выполнения задач с помощью asyncio.run, что упростило запуск асинхронного кода.
Расширенные примеры
Использование gather с таймаутом для всей группы задач:
import asyncio
async def long_task():
await asyncio.sleep(5)
return 'Долгая задача завершена'
async def main():
try:
results = await asyncio.wait_for(
asyncio.gather(long_task(), long_task()),
timeout=3
)
print(results)
except asyncio.TimeoutError:
print('Таймаут группы задач')
asyncio.run(main())Таймаут группы задач
Комбинирование gather с другими асинхронными примитивами:
import asyncio
async def process_data(data, semaphore):
async with semaphore:
await asyncio.sleep(1)
return data * 2
async def main():
semaphore = asyncio.Semaphore(2) # Ограничение параллелизма
tasks = [process_data(i, semaphore) for i in range(5)]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())[0, 2, 4, 6, 8]
Использование для параллельных HTTP-запросов:
import asyncio
import aiohttp
async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()[:100]
async def main():
urls = [
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/2',
'https://httpbin.org/delay/1'
]
results = await asyncio.gather(*(fetch(url) for url in urls))
for url, content in zip(urls, results):
print(f'{url}: {content[:50]}...')
asyncio.run(main())