Многопроцессорное программирование: создание и управление процессами
Основные способы создания процессов
Наиболее эффективным решением для создания процессов в Python является использование класса multiprocessing.Process. Этот подход обеспечивает полный контроль над жизненным циклом процесса и позволяет запускать произвольные функции.
Пример создания процесса, выполняющего функцию worker:
from multiprocessing import Process
import os
def worker(name):
print(f'Процесс {name} запущен, PID: {os.getpid()}')
if __name__ == '__main__':
p = Process(target=worker, args=('Worker-1',))
p.start()
p.join()
print('Главный процесс завершен')
создание процессов python (создание процессов в python)
Результат выполнения (порядок может отличаться):
Процесс Worker-1 запущен, PID: 12345 Главный процесс завершен
Пояснение: создается объект Process с указанием функции target и аргументов args. Метод start() запускает процесс, join() ожидает его завершения. Защита if __name__ == '__main__' необходима для избежания рекурсивного запуска процессов на Windows.
Типичные ошибки:
- Отсутствие
if __name__ == '__main__'приводит к ошибке «RuntimeError: An attempt has been made to start a new process before the current process has finished its bootstrapping phrase». - Забытый
join()может привести к тому, что главный процесс завершится раньше дочернего, особенно при управлении ресурсами. - Глобальные переменные не разделяются между процессами; для обмена данными используются
Queue,PipeилиManager.
Как запустить множество однотипных задач в пуле процессов?
Для параллельного выполнения большого числа независимых задач удобно использовать multiprocessing.Pool. Он автоматически распределяет задачи по фиксированному числу процессов.
from multiprocessing import Pool
import os
def square(n):
return n * n
if __name__ == '__main__':
with Pool(processes=4) as pool:
result = pool.map(square, range(10))
print(result)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Объяснение: Pool(4) создает 4 процесса. pool.map применяет функцию ко всем элементам и возвращает результаты в порядке вызова. with гарантирует закрытие пула.
Ошибка: при использовании pool.map с большими данными может возникнуть переполнение памяти, если результаты накапливаются в списке. Рекомендуется использовать imap или imap_unordered для потоковой обработки.
Как использовать современный API для пула процессов?
Модуль concurrent.futures предоставляет высокоуровневый интерфейс ProcessPoolExecutor, аналогичный пула потоков.
from concurrent.futures import ProcessPoolExecutor
import os
def cube(x):
return x**3
if __name__ == '__main__':
with ProcessPoolExecutor(max_workers=3) as executor:
results = list(executor.map(cube, range(10)))
print(results)
[0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
Здесь map возвращает итератор, преобразуемый в список. ProcessPoolExecutor автоматически управляет рабочими процессами.
Проблема: сериализация аргументов и возвращаемых значений происходит через pickle. Не все объекты могут быть сериализованы. Решение: использовать только базовые типы или явно передавать необходимые данные через Manager.
Как создать процесс с помощью системного вызова fork?
На Unix-системах доступен низкоуровневый вызов os.fork(). Он создает точную копию текущего процесса (родительский и дочерний) с разными PID.
import os
if __name__ == '__main__':
pid = os.fork()
if pid == 0:
print(f'Дочерний процесс: PID {os.getpid()}, родительский PID {os.getppid()}')
else:
print(f'Родительский процесс: PID {os.getpid()}, дочерний PID {pid}')
(пример вывода на Unix) Родительский процесс: PID 1000, дочерний PID 1001 Дочерний процесс: PID 1001, родительский PID 1000
Важно: fork() не работает на Windows. После fork оба процесса продолжают выполнение с одной точки. Для различения используется возвращаемое значение (0 в дочернем, PID дочернего в родительском).
Ошибки: использование fork в многопоточных приложениях может привести к взаимоблокировкам, так как копируются все потоки. Также сложно синхронизировать доступ к ресурсам. Рекомендуется использовать multiprocessing для более безопасной работы.
Как запустить внешнюю программу как отдельный процесс?
Для выполнения внешних программ (например, утилит командной строки) применяется модуль subprocess. Он создает новый процесс с указанной командой.
import subprocess
result = subprocess.run(['echo', 'Hello from subprocess'], capture_output=True, text=True)
print(result.stdout)
Hello from subprocess
Функция run ожидает завершения процесса. capture_output=True позволяет получить stdout и stderr. Popen предоставляет более гибкое управление.
Проблема: при передаче сложных аргументов необходимо экранировать или использовать список, а не строку. Ошибка: Windows может требовать shell=True для некоторых команд, что несет риски безопасности.
Как создать кастомный процесс с состоянием?
Класс Process можно наследовать и переопределить метод run() для реализации сложной логики с внутренним состоянием.
from multiprocessing import Process
class MyProcess(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
print(f'Процесс {self.name} выполняется')
if __name__ == '__main__':
p = MyProcess('Test')
p.start()
p.join()
Этот подход удобен, когда процесс должен хранить собственные данные или иметь сложный жизненный цикл.
Типичная ошибка: переопределение init без вызова super().__init__() приводит к тому, что процесс не может быть запущен. Также атрибуты экземпляра не передаются в дочерний процесс автоматически; они копируются при старте.
Как выбрать метод запуска процесса (spawn, fork, forkserver)?
Модуль multiprocessing поддерживает три метода: fork (Unix, копия), spawn (Windows по умолчанию, запуск нового интерпретатора), forkserver (Unix, серверный процесс). Выбор влияет на производительность и безопасность.
import multiprocessing as mp
def worker():
print('Работает')
if __name__ == '__main__':
mp.set_start_method('spawn') # или 'fork', 'forkserver'
p = mp.Process(target=worker)
p.start()
p.join()
Метод spawn безопаснее, но медленнее, так как загружает модуль заново. fork быстрее, но может вызывать проблемы с блокировками. forkserver является компромиссом.
Ошибка: set_start_method может быть вызван только один раз в программе. Попытка повторного вызова вызывает RuntimeError. Рекомендуется устанавливать метод в самом начале программы.
Пример 1: Передача данных через Queue
from multiprocessing import Process, Queue
def producer(queue):
for i in range(5):
queue.put(f'Сообщение {i}')
queue.put(None)
def consumer(queue):
while True:
msg = queue.get()
if msg is None:
break
print(f'Получено: {msg}')
if __name__ == '__main__':
queue = Queue()
p1 = Process(target=producer, args=(queue,))
p2 = Process(target=consumer, args=(queue,))
p1.start()
p2.start()
p1.join()
p2.join()
print('Готово')
Получено: Сообщение 0 Получено: Сообщение 1 Получено: Сообщение 2 Получено: Сообщение 3 Получено: Сообщение 4 Готово
Queue обеспечивает безопасную передачу объектов между процессами. None используется как сигнал завершения.
Пример 2: Pool с imap_unordered
from multiprocessing import Pool
import time
def slow_square(n):
time.sleep(0.5)
return n * n
if __name__ == '__main__':
with Pool(2) as pool:
results = pool.imap_unordered(slow_square, range(5))
for r in results:
print(r)
0 1 4 9 16
imap_unordered возвращает результаты по мере готовности, порядок не гарантирован. Полезно для длительных задач.
Пример 3: ProcessPoolExecutor с submit и as_completed
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
def power(x, y):
time.sleep(0.2)
return x ** y
if __name__ == '__main__':
with ProcessPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(power, i, i) for i in range(5)]
for future in as_completed(futures):
print(future.result())
1 4 27 256 3125
as_completed позволяет обрабатывать результаты по мере завершения, что удобно при разнородных задачах.
Пример 4: os.fork с разделением файла
import os
def child_work():
with open('test.txt', 'a') as f:
f.write('Дочерний\n')
if __name__ == '__main__':
pid = os.fork()
if pid == 0:
child_work()
os._exit(0)
else:
import time
time.sleep(0.1)
with open('test.txt', 'a') as f:
f.write('Родительский\n')
os.wait()
Результат: файл test.txt будет содержать две строки (порядок может различаться). После fork оба процесса имеют собственные файловые дескрипторы, но буферизация может привести к неожиданностям. Рекомендуется использовать multiprocessing.
Пример 5: subprocess.Popen с взаимодействием через stdin/stdout
import subprocess
proc = subprocess.Popen(['python3', '-c', 'import sys; print(sys.stdin.read())'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
stdout, _ = proc.communicate('Привет от родителя')
print(f'Получено: {stdout.strip()}')
Получено: Привет от родителя
communicate передает данные в stdin и читает stdout, ожидая завершения.
Пример 6: Наследование Process с атрибутами
from multiprocessing import Process
import os
class WorkerProcess(Process):
def __init__(self, name, data):
super().__init__()
self.name = name
self.data = data
def run(self):
print(f'{self.name} обрабатывает {self.data} в PID {os.getpid()}')
if __name__ == '__main__':
processes = [WorkerProcess(f'Worker-{i}', i) for i in range(3)]
for p in processes:
p.start()
for p in processes:
p.join()
Worker-0 обрабатывает 0 в PID 12346 Worker-1 обрабатывает 1 в PID 12347 Worker-2 обрабатывает 2 в PID 12348
Атрибуты экземпляра копируются в дочерний процесс при запуске.