Asyncio.gather: примеры (PYTHON)

Параллельное выполнение задач с asyncio.gather
Раздел: Асинхронное программирование, Управление задачами
asyncio.gather(*aws: awaitable objects, return_exceptions: bool=False): list of results

Функция asyncio.gather предоставляет возможность параллельного выполнения нескольких асинхронных задач. Ее основное назначение заключается в запуске нескольких корутин или объектов, допускающих ожидание, с последующим сбором результатов.

Аргументы и возвращаемое значение

Сигнатура функции: asyncio.gather(*aws, return_exceptions=False).

  • *aws (позиционные аргументы): Один или несколько объектов, допускающих ожидание (корутины, задачи, объекты Future).
  • return_exceptions (флаг, по умолчанию False):
    • Если установлено в False, первое возникшее исключение немедленно распространяется на задачу, ожидающую gather, а остальные задачи продолжают выполняться (но их результаты будут проигнорированы).
    • Если установлено в True, исключения обрабатываются как успешные результаты и включаются в возвращаемый список.

Функция возвращает список результатов в порядке, соответствующем порядку переданных объектов. Если флаг return_exceptions равен True, в списке могут присутствовать объекты исключений.

Примеры использования

Пример с обычным выполнением трех асинхронных функций:

import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    return f'{name} выполнена'

async def main():
    results = await asyncio.gather(
        task('Задача 1', 2),
        task('Задача 2', 1),
        task('Задача 3', 3)
    )
    print(results)

asyncio.run(main())
['Задача 1 выполнена', 'Задача 2 выполнена', 'Задача 3 выполнена']

Пример с флагом return_exceptions=True:

import asyncio

async def successful_task():
    await asyncio.sleep(1)
    return 'Успех'

async def failing_task():
    raise ValueError('Ошибка в задаче')

async def main():
    results = await asyncio.gather(
        successful_task(),
        failing_task(),
        return_exceptions=True
    )
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f'Задача {i}: исключение - {result}')
        else:
            print(f'Задача {i}: {result}')

asyncio.run(main())
Задача 0: Успех
Задача 1: исключение - Ошибка в задаче

Похожие функции в Python

asyncio.wait: Позволяет ожидать завершения задач с возможностью установки условий (например, FIRST_COMPLETED). Возвращает два множества: завершенные и незавершенные задачи. Удобна для сценариев, где требуется реагировать на завершение задач по мере их выполнения.

asyncio.as_completed: Возвращает итератор по корутинам, который выдает задачи по мере их завершения. Подходит для обработки результатов в порядке их готовности, а не в порядке запуска.

asyncio.create_task с последующим сбором результатов: Создает задачи и ожидает их завершения индивидуально, обеспечивая более тонкий контроль.

Функция gather предпочтительнее, когда необходимо запустить фиксированный набор задач и дождаться всех результатов одновременно.

Аналоги в других языках

JavaScript (Promise.all): Аналогично собирает результаты нескольких промисов.

const promises = [
  Promise.resolve('Результат 1'),
  Promise.reject(new Error('Ошибка')),
  Promise.resolve('Результат 3')
];

Promise.all(promises)
  .then(results => console.log(results))
  .catch(error => console.error(error));
Error: Ошибка

Golang (WaitGroup из пакета sync): Используется для ожидания завершения группы горутин.

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    time.Sleep(time.Second)
    fmt.Printf("Воркер %d завершен\n", id)
}

func main() {
    var wg sync.WaitGroup
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }
    wg.Wait()
    fmt.Println("Все воркеры завершены")
}
Воркер 3 завершен
Воркер 1 завершен
Воркер 2 завершен
Все воркеры завершены

Основное отличие асинхронной модели Python от других языков заключается в использовании корутин и цикла событий, что обеспечивает детерминированный порядок завершения при использовании gather.

Распространенные ошибки

Передача обычных функций вместо асинхронных:

import asyncio

async def main():
    # Ошибка: функция не является корутиной
    await asyncio.gather(time.sleep(1))

asyncio.run(main())
TypeError: object NoneType can't be used in 'await' expression

Игнорирование исключений при return_exceptions=False:

import asyncio

async def failing():
    raise RuntimeError('Сбой')

async def main():
    try:
        await asyncio.gather(failing())
    except RuntimeError as e:
        print(f'Поймано исключение: {e}')

asyncio.run(main())
Поймано исключение: Сбой

Неверный порядок результатов при передаче задач в виде словаря:

import asyncio

async def task(num):
    return num

async def main():
    tasks = {i: task(i) for i in range(3)}
    # Ошибка: передача значений словаря, порядок не гарантирован в старых версиях Python
    results = await asyncio.gather(*tasks.values())
    print(results)

asyncio.run(main())
[0, 1, 2]

Изменения в последних версиях

В Python 3.11 функция не претерпела значительных изменений. Однако в Python 3.10 была улучшена обработка отмены задач: при отмене gather отменяются все незавершенные задачи. В Python 3.7 появилась поддержка конкурентного выполнения задач с помощью asyncio.run, что упростило запуск асинхронного кода.

Расширенные примеры

Использование gather с таймаутом для всей группы задач:

Пример python
import asyncio

async def long_task():
    await asyncio.sleep(5)
    return 'Долгая задача завершена'

async def main():
    try:
        results = await asyncio.wait_for(
            asyncio.gather(long_task(), long_task()),
            timeout=3
        )
        print(results)
    except asyncio.TimeoutError:
        print('Таймаут группы задач')

asyncio.run(main())
Таймаут группы задач

Комбинирование gather с другими асинхронными примитивами:

Пример python
import asyncio

async def process_data(data, semaphore):
    async with semaphore:
        await asyncio.sleep(1)
        return data * 2

async def main():
    semaphore = asyncio.Semaphore(2)  # Ограничение параллелизма
    tasks = [process_data(i, semaphore) for i in range(5)]
    results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())
[0, 2, 4, 6, 8]

Использование для параллельных HTTP-запросов:

Пример python
import asyncio
import aiohttp

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()[:100]

async def main():
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/1'
    ]
    results = await asyncio.gather(*(fetch(url) for url in urls))
    for url, content in zip(urls, results):
        print(f'{url}: {content[:50]}...')

asyncio.run(main())

питон asyncio.gather function comments

En
Asyncio.gather Run awaitable objects in the sequence concurrently