Многопроцессорная обработка в Python: варианты и рекомендации

Раздел: Системное программирование -> Многозадачность и процессы

Основные подходы к созданию нескольких процессов в Python

Многозадачность на уровне процессов позволяет обойти ограничения GIL и эффективно использовать многоядерные процессоры. Рассмотрим наиболее популярные способы запуска и управления процессами, их цели, типичные ошибки и примеры кода.

Как запустить пул процессов для параллельного выполнения задач?

Библиотека multiprocessing.Pool предоставляет самый удобный способ распараллеливания независимых задач. Процессы создаются один раз и переиспользуются.

import multiprocessing as mp

def square(x):
    return x * x

if __name__ == '__main__':
    with mp.Pool(processes=4) as pool:
        results = pool.map(square, range(10))
    print(results)

несколько процессов python (создание нескольких процессов в python)

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Цель: переложить выполнение множества однотипных вызовов на несколько ядер без ручного управления процессами. Подходит для задач, не требующих общего состояния.

Типичная проблема: аргументы и результат должны быть сериализуемы через pickle. Объекты вроде file handler, connection не сериализуются. Решение – передавать только простые типы или использовать наследуемые очереди.

Как создать отдельный процесс с очередью для обмена данными?

Класс multiprocessing.Process в комбинации с Queue даёт полный контроль над запуском и остановкой процессов.

import multiprocessing as mp

def worker(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f'Обработка: {item}')

if __name__ == '__main__':
    queue = mp.Queue()
    proc = mp.Process(target=worker, args=(queue,))
    proc.start()
    for i in range(5):
        queue.put(i)
    queue.put(None)  # сигнал завершения
    proc.join()
Обработка: 0
Обработка: 1
Обработка: 2
Обработка: 3
Обработка: 4

Цель: реализация producer-consumer, когда один процесс генерирует данные, а другой обрабатывает. Удобно для передачи больших объёмов или стриминга.

Типичная ошибка: забыть обработать пустой элемент (None) – процесс зависнет в ожидании. Также нельзя передавать None как полезные данные, иначе путаница. Лучше использовать отдельный сторожевой объект.

Как использовать os.fork() для создания процессов (только Unix)?

Функция os.fork() клонирует текущий процесс. Используется редко из-за ограничений платформы и сложности безопасной работы.

import os

if __name__ == '__main__':
    pid = os.fork()
    if pid == 0:
        # дочерний процесс
        print(f'Дочерний процесс PID {os.getpid()}')
        os._exit(0)
    else:
        print(f'Родительский процесс, дочерний PID {pid}')
        os.wait()  # ожидание завершения дочернего
Родительский процесс, дочерний PID 12345
Дочерний процесс PID 12345

Цель: максимально лёгкий способ получить копию процесса, часто используется в демонах и серверах. Требует ручного управления зомби.

Проблема: после fork дочерний процесс наследует открытые файлы, блокировки, состояние потоков – легко получить deadlock. Обязательно вызывать os._exit() вместо exit(), чтобы не вызвать запуск обработчиков финализации.

Как запустить внешнюю программу в отдельном процессе?

Модуль subprocess позволяет запускать любые команды ОС и общаться с ними через потоки ввода/вывода.

import subprocess

proc = subprocess.Popen(['python3', '-c', 'print("Привет из дочернего")'], stdout=subprocess.PIPE)
output, _ = proc.communicate()
print(f'Вывод: {output.decode()}')
Вывод: Привет из дочернего

Цель: интеграция с внешними утилитами, запуск скриптов или исполняемых файлов. Не требует, чтобы дочерний процесс был Python-программой.

Ошибка: если не прочитать stdout/stderr, буфер может заполниться и процесс зависнет. Используйте communicate() или subprocess.run(), который делает это автоматически.

Как упростить пул процессов через concurrent.futures?

ProcessPoolExecutor из модуля concurrent.futures предоставляет высокоуровневый интерфейс с поддержкой Future.

from concurrent.futures import ProcessPoolExecutor

def cube(x):
    return x ** 3

with ProcessPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(cube, i) for i in range(5)]
    for future in futures:
        print(future.result())
0
1
8
27
64

Цель: альтернатива multiprocessing.Pool с возможностью асинхронного запуска и отмены задач. Подходит, когда нужно собрать результаты по мере готовности.

Нюанс: при использовании внутри интерактивного интерпретатора (Jupyter) возможны проблемы с замораживанием – лучше использовать mp.Pool с явным if __name__ == '__main__'.

Как разделить память между процессами?

Начиная с Python 3.8, модуль multiprocessing.shared_memory позволяет выделять сегменты памяти, доступные всем процессам.

from multiprocessing import shared_memory, Process

def writer(shm_name):
    shm = shared_memory.SharedMemory(name=shm_name)
    buf = shm.buf
    buf[0] = 42
    shm.close()

if __name__ == '__main__':
    shm = shared_memory.SharedMemory(create=True, size=10)
    p = Process(target=writer, args=(shm.name,))
    p.start()
    p.join()
    print(f'Прочитано: {shm.buf[0]}')
    shm.close()
    shm.unlink()
Прочитано: 42

Цель: высокая скорость обмена данными без сериализации. Идеально для массивов чисел, аудио/видео потоков.

Опасность: необходимо синхронизировать доступ через Lock или Semaphore, иначе race condition. Не забывайте вызывать unlink() для освобождения ресурсов.

Расширенные примеры работы с процессами

Применение Lock для синхронизации конкурентного доступа к общему ресурсу

Использование блокировок предотвращает одновременное изменение данных. Пример с многопроцессным инкрементом счётчика.

Пример
import multiprocessing as mp

def increment(counter, lock):
    for _ in range(1000):
        with lock:
            counter.value += 1

if __name__ == '__main__':
    counter = mp.Value('i', 0)
    lock = mp.Lock()
    processes = [mp.Process(target=increment, args=(counter, lock)) for _ in range(4)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f'Итоговое значение: {counter.value}')
Итоговое значение: 4000

Пояснение: mp.Value создаёт разделяемую переменную. Lock гарантирует атомарность операции прибаления. Без блокировки значение было бы меньше 4000 из-за гонок.

Пул процессов с разными функциями и ошибкой сериализации

Лямбда-функции не сериализуются pickle, что приводит к AttributeError. Пример исправления через глобальную функцию.

Пример
def bad_lambda():
    return 1

if __name__ == '__main__':
    import multiprocessing as mp
    with mp.Pool(2) as pool:
        try:
            result = pool.apply_async(lambda: 42).get()
        except AttributeError as e:
            print(f'Ошибка: {e}')
    # Исправление
    with mp.Pool(2) as pool:
        result = pool.apply_async(bad_lambda).get()
        print(f'Результат: {result}')
Ошибка: Can't pickle  at 0x...>
Результат: 1

Пояснение: лямбды не могут быть переданы в другой процесс, так как они не имеют глобального имени. Всегда определяйте функции на верхнем уровне модуля.

Запуск демонического процесса, завершающегося вместе с родителем

Установка флага daemon=True заставляет процесс завершаться при завершении родительского. Полезно для фоновых служб.

Пример
import multiprocessing as mp
import time

def background_task():
    while True:
        print('Фоновый процесс жив')
        time.sleep(1)

if __name__ == '__main__':
    p = mp.Process(target=background_task, daemon=True)
    p.start()
    time.sleep(2)
    print('Родитель завершается. Демон будет убит.')
Фоновый процесс жив
Фоновый процесс жив
Родитель завершается. Демон будет убит.

Предостережение: демонические процессы не могут создавать свои подпроцессы и не должны использовать очереди или трубы, так как при аварийном завершении ресурсы не освобождаются корректно.

Обмен большими объёмами данных через Queue с Batch-обработкой

Использование очереди для передачи чанков данных и сигналов завершения.

Пример
import multiprocessing as mp
import numpy as np

def processor(in_q, out_q):
    while True:
        chunk = in_q.get()
        if chunk is None:
            break
        result = np.mean(chunk)
        out_q.put(result)

if __name__ == '__main__':
    data = [np.random.rand(1000) for _ in range(5)]
    in_q = mp.Queue()
    out_q = mp.Queue()
    p = mp.Process(target=processor, args=(in_q, out_q))
    p.start()
    for chunk in data:
        in_q.put(chunk)
    in_q.put(None)
    p.join()
    results = []
    while not out_q.empty():
        results.append(out_q.get())
    print(f'Средние: {results}')
Средние: [0.493..., 0.512..., ...]

Пояснение: numpy массивы не сериализуются напрямую, но они поддерживают буферный протокол. Queue автоматически упаковывает их в bytes. Для очень больших массивов лучше использовать shared_memory.

Использование Semaphore для ограничения одновременного доступа к ресурсу

Семафор разрешает фиксированное число процессов одновременно выполнять определённый код.

Пример
import multiprocessing as mp
import time

def limited_access(sem, proc_id):
    with sem:
        print(f'Процесс {proc_id} начал работу')
        time.sleep(1)
        print(f'Процесс {proc_id} завершил')

if __name__ == '__main__':
    sem = mp.Semaphore(2)
    procs = [mp.Process(target=limited_access, args=(sem, i)) for i in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
Процесс 0 начал работу
Процесс 1 начал работу
Процесс 0 завершил
Процесс 1 завершил
Процесс 2 начал работу
Процесс 3 начал работу
Процесс 2 завершил
Процесс 3 завершил

Пояснение: семафор счётчиком 2 позволяет только двум процессам одновременно войти в критическую секцию. Остальные ждут освобождения.

Комбинация asyncio и multiprocessing для асинхронного ввода-вывода и параллельных вычислений

С помощью loop.run_in_executor с ProcessPoolExecutor можно выполнять CPU-интенсивные задачи, не блокируя цикл событий.

Пример
import asyncio
from concurrent.futures import ProcessPoolExecutor
import time

def heavy_calc(n):
    return sum(i**2 for i in range(n))

async def main():
    loop = asyncio.get_event_loop()
    with ProcessPoolExecutor() as pool:
        tasks = [loop.run_in_executor(pool, heavy_calc, 10_000_000) for _ in range(3)]
        results = await asyncio.gather(*tasks)
    print(results)

if __name__ == '__main__':
    asyncio.run(main())
[333283335000000, 333283335000000, 333283335000000]

Пояснение: asyncio управляет I/O, а процессы обрабатывают вычисления. Не забывайте, что количество процессов не должно превышать число ядер для оптимальной производительности.

Создание нескольких процессов в Python - comments

En
несколько процессов python (python)