Реализация сложных Python проектов с продвинутыми алгоритмами и архитектурой

Раздел: Практические задачи -> Сложные проекты

Основные подходы к построению сложных программ на Python

Как реализовать высокопроизводительную параллельную загрузку данных?

Наиболее эффективное решение для задач, связанных с множественными сетевыми запросами (веб-скрапинг, API), основано на асинхронном программировании через asyncio и библиотеку aiohttp. Асинхронный подход позволяет выполнять сотни запросов одновременно без затрат на потоки и процессы, утилизируя один поток и событийный цикл.


import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = ["https://example.com"] * 100
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    print(f"Получено {len(results)} страниц")

asyncio.run(main())

сложные программы python (сложные программы на python)

Код создаёт сессию, формирует список корутин и запускает их параллельно через asyncio.gather. Это минимизирует время ожидания I/O. Для обработки ошибок используйте try/except внутри корутины или установите таймаут через asyncio.timeout.

Типичные проблемы асинхронного подхода:

  • Сложность отладки: стек вызовов часто «разорван» из-за переключения контекста. Используйте asyncio.run_coroutine_threadsafe осторожно.
  • Необходимость корректного управления сессией: всегда используйте async with для закрытия соединений.
  • Блокирующие вызовы внутри корутин (например, time.sleep) останавливают весь цикл событий. Заменяйте их на asyncio.sleep.
  • Большое количество одновременных запросов может привести к срабатыванию защиты сервера. Добавьте задержку через asyncio.sleep или используйте семафоры.

Когда многопоточность оправдана для сложных программ?

Если проект использует синхронные библиотеки (например, requests, BeautifulSoup), асинхронная замена невозможна или требует значительных изменений, применяется многопоточность через threading. Однако из-за GIL (Global Interpreter Lock) настоящий параллелизм достигается только для I/O задач.


import threading
import requests

urls = ["https://example.com"] * 20
results = []
lock = threading.Lock()

def download(url):
    resp = requests.get(url)
    with lock:
        results.append(resp.text)

threads = [threading.Thread(target=download, args=(url,)) for url in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Скачано {len(results)} страниц")

Здесь блокировка Lock защищает общий список results от одновременной записи. Для уменьшения конкуренции следует минимизировать критические секции.

Ошибки многопоточности:

  • Гонка данных: неправильная синхронизация приводит к перемешиванию или потере данных. Всегда используйте Lock, Semaphore или Queue.
  • Deadlock: блокировка в обратном порядке при вложенных захватах. Применяйте try/finally или контекстный менеджер with lock.
  • GIL не позволяет ускорить CPU-интенсивные операции (например, обработка изображений). Для таких задач переходите к multiprocessing.

Как обойти GIL для CPU-интенсивных задач?

Многопроцессорный подход multiprocessing запускает отдельные интерпретаторы Python, каждый со своей памятью. Это позволяет добиться настоящего параллелизма на многоядерных системах.


from multiprocessing import Pool

def square(x):
    return x ** 2

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        results = pool.map(square, range(1000000))
    print(f"Обработано {len(results)} чисел")

Функция pool.map распределяет задачи по рабочим процессам. Передача данных между процессами через сериализацию (pickle) может быть дорогой, поэтому для больших объёмов данных используйте разделяемую память (multiprocessing.Array, shared_memory).

Сложности multiprocessing:

  • Затраты на создание процессов выше, чем на потоки. Не рекомендуется для лёгких задач.
  • Необходимость оборачивать основной код в if __name__ == "__main__" для избежания рекурсивного запуска.
  • Трудности отладки: исключения в дочерних процессах не видны напрямую. Используйте Queue для сбора ошибок.

Простота и предсказуемость: когда хватает синхронного подхода?

Для маленьких проектов или однократных задач (например, обработка одного файла) избыточность асинхронности и многопоточности неоправданна. Последовательный код проще писать, отлаживать и поддерживать.


import requests

urls = ["https://example.com"] * 5
for url in urls:
    response = requests.get(url)
    print(f"Длина страницы: {len(response.text)}")

Этот вариант не требует импорта дополнительных модулей, управления блокировками или циклами событий. Однако время выполнения линейно растёт с количеством запросов.

Как организовать фоновые задачи в веб-приложении с Celery?

Для распределённой обработки задач вне основного потока веб-сервера (например, отправка писем, генерация отчётов) подходит Celery с брокером Redis или RabbitMQ. Это позволяет вынести тяжёлые вычисления в отдельные воркеры.


# tasks.py
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379/0")

@app.task
def process_data(data_id):
    # имитация обработки
    return f"Обработан {data_id}"

Вызов задачи через process_data.delay(data_id) возвращается немедленно, а выполнение происходит асинхронно. Для мониторинга и повторной обработки ошибок используйте Flower или retry-механизмы Celery.

Проблемы Celery:

  • Дополнительная инфраструктура: брокер (Redis), бэкенд результатов.
  • Сложность настройки сериализации для сложных объектов.
  • Возможны потерянные задачи при сбое воркера без подтверждения (acks late).

Расширенные примеры кода для сложных Python программ

Асинхронный парсинг с обработкой ошибок и таймером

Следующий пример демонстрирует загрузку 50 страниц с замером времени и логированием ошибок. Используется получевие и семафор для ограничения параллельных запросов.

Пример

import asyncio
import aiohttp
import time

async def fetch(session, url, semaphore):
    async with semaphore:
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
                return await resp.text()
        except Exception as e:
            return f"Ошибка {url}: {e}"

async def main():
    urls = [f"https://httpbin.org/delay/1?n={i}" for i in range(50)]
    sem = asyncio.Semaphore(5)  # не более 5 одновременных запросов
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url, sem) for url in urls]
        start = time.time()
        results = await asyncio.gather(*tasks)
        elapsed = time.time() - start
    print(f"Время: {elapsed:.2f} сек")
    errors = [r for r in results if r.startswith("Ошибка")]
    print(f"Ошибок: {len(errors)}")

if __name__ == "__main__":
    asyncio.run(main())
Время: 10.12 сек
Ошибок: 0

Благодаря семафору не создаётся одновременная нагрузка в 50 запросов, что снижает риск блокировки.

Многопоточное скачивание с прогрессом через tqdm

Пример использует concurrent.futures.ThreadPoolExecutor и библиотеку tqdm для отображения прогресса.

Пример

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

urls = ["https://httpbin.org/get"] * 30

def download(url):
    resp = requests.get(url, timeout=5)
    return (url, len(resp.text))

with ThreadPoolExecutor(max_workers=10) as executor:
    futures = {executor.submit(download, url): url for url in urls}
    for future in tqdm(as_completed(futures), total=len(urls), desc="Загрузка"):
        url, length = future.result()
        # можно сохранять результат
Загрузка: 100%|██████████| 30/30 [00:03<00:00,  8.99it/s]

Асинхронное обновление прогресс-бара происходит благодаря tqdm, которое перерисовывает строку при каждом завершении задачи.

Многопроцессорная обработка изображений с Pool

Реальный случай: преобразование 1000 изображений (уменьшение размера) с задействованием всех ядер CPU.

Пример

import os
import sys
from PIL import Image
from multiprocessing import Pool

def resize_image(filename):
    img = Image.open(filename)
    img = img.resize((100, 100))
    out_name = f"thumb_{filename}"
    img.save(out_name)
    return out_name

if __name__ == "__main__":
    files = [f"img_{i}.jpg" for i in range(1000)]
    with Pool(processes=os.cpu_count()) as pool:
        results = pool.map(resize_image, files)
    print(f"Создано {len(results)} миниатюр")
Создано 1000 миниатюр

Для передачи файлов между процессами не требуется сериализация, так как каждый процесс открывает свой файл независимо. Это даёт почти линейное ускорение на многоядерных системах.

Кастомный декоратор для измерения времени выполнения

Использование декораторов - один из признаков сложной программы. Пример автоматически замеряет время работы любой функции и выводит метрики.

Пример

import functools
import time

def log_time(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"{func.__name__} выполнена за {elapsed:.4f} сек")
        return result
    return wrapper

@log_time
def heavy_calculation(n):
    return sum(i**2 for i in range(n))

print(heavy_calculation(10_000_000))
heavy_calculation выполнена за 0.4712 сек
333333283333335000000

Декоратор использован без изменения исходной функции. При добавлении stacklevel в wraps сохраняется сигнатура.

Контекстный менеджер для временной смены рабочей директории

Контекстные менеджеры помогают управлять ресурсами. Этот пример безопасно переключает директорию и возвращает обратно, даже при возникновении исключения.

Пример

import os
from contextlib import contextmanager

@contextmanager
def changedir(path):
    old_dir = os.getcwd()
    try:
        os.chdir(path)
        yield
    finally:
        os.chdir(old_dir)

with changedir("/tmp"):
    print("Текущая директория:", os.getcwd())
print("После выхода:", os.getcwd())
Текущая директория: /tmp
После выхода: /home/user

Реализация через contextmanager упрощает создание своих менеджеров, избегая класса с __enter__ и __exit__.

Сложные программы на Python - comments

En
сложные программы python (python)