Многопроцессорная обработка в Python: варианты и рекомендации
Основные подходы к созданию нескольких процессов в Python
Многозадачность на уровне процессов позволяет обойти ограничения GIL и эффективно использовать многоядерные процессоры. Рассмотрим наиболее популярные способы запуска и управления процессами, их цели, типичные ошибки и примеры кода.
Как запустить пул процессов для параллельного выполнения задач?
Библиотека multiprocessing.Pool предоставляет самый удобный способ распараллеливания независимых задач. Процессы создаются один раз и переиспользуются.
import multiprocessing as mp
def square(x):
return x * x
if __name__ == '__main__':
with mp.Pool(processes=4) as pool:
results = pool.map(square, range(10))
print(results)несколько процессов python (создание нескольких процессов в python)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Цель: переложить выполнение множества однотипных вызовов на несколько ядер без ручного управления процессами. Подходит для задач, не требующих общего состояния.
Типичная проблема: аргументы и результат должны быть сериализуемы через pickle. Объекты вроде file handler, connection не сериализуются. Решение – передавать только простые типы или использовать наследуемые очереди.
Как создать отдельный процесс с очередью для обмена данными?
Класс multiprocessing.Process в комбинации с Queue даёт полный контроль над запуском и остановкой процессов.
import multiprocessing as mp
def worker(q):
while True:
item = q.get()
if item is None:
break
print(f'Обработка: {item}')
if __name__ == '__main__':
queue = mp.Queue()
proc = mp.Process(target=worker, args=(queue,))
proc.start()
for i in range(5):
queue.put(i)
queue.put(None) # сигнал завершения
proc.join()
Обработка: 0 Обработка: 1 Обработка: 2 Обработка: 3 Обработка: 4
Цель: реализация producer-consumer, когда один процесс генерирует данные, а другой обрабатывает. Удобно для передачи больших объёмов или стриминга.
Типичная ошибка: забыть обработать пустой элемент (None) – процесс зависнет в ожидании. Также нельзя передавать None как полезные данные, иначе путаница. Лучше использовать отдельный сторожевой объект.
Как использовать os.fork() для создания процессов (только Unix)?
Функция os.fork() клонирует текущий процесс. Используется редко из-за ограничений платформы и сложности безопасной работы.
import os
if __name__ == '__main__':
pid = os.fork()
if pid == 0:
# дочерний процесс
print(f'Дочерний процесс PID {os.getpid()}')
os._exit(0)
else:
print(f'Родительский процесс, дочерний PID {pid}')
os.wait() # ожидание завершения дочернего
Родительский процесс, дочерний PID 12345 Дочерний процесс PID 12345
Цель: максимально лёгкий способ получить копию процесса, часто используется в демонах и серверах. Требует ручного управления зомби.
Проблема: после fork дочерний процесс наследует открытые файлы, блокировки, состояние потоков – легко получить deadlock. Обязательно вызывать os._exit() вместо exit(), чтобы не вызвать запуск обработчиков финализации.
Как запустить внешнюю программу в отдельном процессе?
Модуль subprocess позволяет запускать любые команды ОС и общаться с ними через потоки ввода/вывода.
import subprocess
proc = subprocess.Popen(['python3', '-c', 'print("Привет из дочернего")'], stdout=subprocess.PIPE)
output, _ = proc.communicate()
print(f'Вывод: {output.decode()}')
Вывод: Привет из дочернего
Цель: интеграция с внешними утилитами, запуск скриптов или исполняемых файлов. Не требует, чтобы дочерний процесс был Python-программой.
Ошибка: если не прочитать stdout/stderr, буфер может заполниться и процесс зависнет. Используйте communicate() или subprocess.run(), который делает это автоматически.
Как упростить пул процессов через concurrent.futures?
ProcessPoolExecutor из модуля concurrent.futures предоставляет высокоуровневый интерфейс с поддержкой Future.
from concurrent.futures import ProcessPoolExecutor
def cube(x):
return x ** 3
with ProcessPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(cube, i) for i in range(5)]
for future in futures:
print(future.result())
0 1 8 27 64
Цель: альтернатива multiprocessing.Pool с возможностью асинхронного запуска и отмены задач. Подходит, когда нужно собрать результаты по мере готовности.
Нюанс: при использовании внутри интерактивного интерпретатора (Jupyter) возможны проблемы с замораживанием – лучше использовать mp.Pool с явным if __name__ == '__main__'.
Как разделить память между процессами?
Начиная с Python 3.8, модуль multiprocessing.shared_memory позволяет выделять сегменты памяти, доступные всем процессам.
from multiprocessing import shared_memory, Process
def writer(shm_name):
shm = shared_memory.SharedMemory(name=shm_name)
buf = shm.buf
buf[0] = 42
shm.close()
if __name__ == '__main__':
shm = shared_memory.SharedMemory(create=True, size=10)
p = Process(target=writer, args=(shm.name,))
p.start()
p.join()
print(f'Прочитано: {shm.buf[0]}')
shm.close()
shm.unlink()
Прочитано: 42
Цель: высокая скорость обмена данными без сериализации. Идеально для массивов чисел, аудио/видео потоков.
Опасность: необходимо синхронизировать доступ через Lock или Semaphore, иначе race condition. Не забывайте вызывать unlink() для освобождения ресурсов.
Расширенные примеры работы с процессами
Применение Lock для синхронизации конкурентного доступа к общему ресурсу
Использование блокировок предотвращает одновременное изменение данных. Пример с многопроцессным инкрементом счётчика.
import multiprocessing as mp
def increment(counter, lock):
for _ in range(1000):
with lock:
counter.value += 1
if __name__ == '__main__':
counter = mp.Value('i', 0)
lock = mp.Lock()
processes = [mp.Process(target=increment, args=(counter, lock)) for _ in range(4)]
for p in processes:
p.start()
for p in processes:
p.join()
print(f'Итоговое значение: {counter.value}')
Итоговое значение: 4000
Пояснение: mp.Value создаёт разделяемую переменную. Lock гарантирует атомарность операции прибаления. Без блокировки значение было бы меньше 4000 из-за гонок.
Пул процессов с разными функциями и ошибкой сериализации
Лямбда-функции не сериализуются pickle, что приводит к AttributeError. Пример исправления через глобальную функцию.
def bad_lambda():
return 1
if __name__ == '__main__':
import multiprocessing as mp
with mp.Pool(2) as pool:
try:
result = pool.apply_async(lambda: 42).get()
except AttributeError as e:
print(f'Ошибка: {e}')
# Исправление
with mp.Pool(2) as pool:
result = pool.apply_async(bad_lambda).get()
print(f'Результат: {result}')
Ошибка: Can't pickleat 0x...> Результат: 1
Пояснение: лямбды не могут быть переданы в другой процесс, так как они не имеют глобального имени. Всегда определяйте функции на верхнем уровне модуля.
Запуск демонического процесса, завершающегося вместе с родителем
Установка флага daemon=True заставляет процесс завершаться при завершении родительского. Полезно для фоновых служб.
import multiprocessing as mp
import time
def background_task():
while True:
print('Фоновый процесс жив')
time.sleep(1)
if __name__ == '__main__':
p = mp.Process(target=background_task, daemon=True)
p.start()
time.sleep(2)
print('Родитель завершается. Демон будет убит.')
Фоновый процесс жив Фоновый процесс жив Родитель завершается. Демон будет убит.
Предостережение: демонические процессы не могут создавать свои подпроцессы и не должны использовать очереди или трубы, так как при аварийном завершении ресурсы не освобождаются корректно.
Обмен большими объёмами данных через Queue с Batch-обработкой
Использование очереди для передачи чанков данных и сигналов завершения.
import multiprocessing as mp
import numpy as np
def processor(in_q, out_q):
while True:
chunk = in_q.get()
if chunk is None:
break
result = np.mean(chunk)
out_q.put(result)
if __name__ == '__main__':
data = [np.random.rand(1000) for _ in range(5)]
in_q = mp.Queue()
out_q = mp.Queue()
p = mp.Process(target=processor, args=(in_q, out_q))
p.start()
for chunk in data:
in_q.put(chunk)
in_q.put(None)
p.join()
results = []
while not out_q.empty():
results.append(out_q.get())
print(f'Средние: {results}')
Средние: [0.493..., 0.512..., ...]
Пояснение: numpy массивы не сериализуются напрямую, но они поддерживают буферный протокол. Queue автоматически упаковывает их в bytes. Для очень больших массивов лучше использовать shared_memory.
Использование Semaphore для ограничения одновременного доступа к ресурсу
Семафор разрешает фиксированное число процессов одновременно выполнять определённый код.
import multiprocessing as mp
import time
def limited_access(sem, proc_id):
with sem:
print(f'Процесс {proc_id} начал работу')
time.sleep(1)
print(f'Процесс {proc_id} завершил')
if __name__ == '__main__':
sem = mp.Semaphore(2)
procs = [mp.Process(target=limited_access, args=(sem, i)) for i in range(4)]
for p in procs:
p.start()
for p in procs:
p.join()
Процесс 0 начал работу Процесс 1 начал работу Процесс 0 завершил Процесс 1 завершил Процесс 2 начал работу Процесс 3 начал работу Процесс 2 завершил Процесс 3 завершил
Пояснение: семафор счётчиком 2 позволяет только двум процессам одновременно войти в критическую секцию. Остальные ждут освобождения.
Комбинация asyncio и multiprocessing для асинхронного ввода-вывода и параллельных вычислений
С помощью loop.run_in_executor с ProcessPoolExecutor можно выполнять CPU-интенсивные задачи, не блокируя цикл событий.
import asyncio
from concurrent.futures import ProcessPoolExecutor
import time
def heavy_calc(n):
return sum(i**2 for i in range(n))
async def main():
loop = asyncio.get_event_loop()
with ProcessPoolExecutor() as pool:
tasks = [loop.run_in_executor(pool, heavy_calc, 10_000_000) for _ in range(3)]
results = await asyncio.gather(*tasks)
print(results)
if __name__ == '__main__':
asyncio.run(main())
[333283335000000, 333283335000000, 333283335000000]
Пояснение: asyncio управляет I/O, а процессы обрабатывают вычисления. Не забывайте, что количество процессов не должно превышать число ядер для оптимальной производительности.