Многопроцессорное программирование: создание и управление процессами

Раздел: Параллельное программирование -> Многопроцессность

Основные способы создания процессов

Наиболее эффективным решением для создания процессов в Python является использование класса multiprocessing.Process. Этот подход обеспечивает полный контроль над жизненным циклом процесса и позволяет запускать произвольные функции.

Пример создания процесса, выполняющего функцию worker:


from multiprocessing import Process
import os

def worker(name):
    print(f'Процесс {name} запущен, PID: {os.getpid()}')

if __name__ == '__main__':
    p = Process(target=worker, args=('Worker-1',))
    p.start()
    p.join()
    print('Главный процесс завершен')

создание процессов python (создание процессов в python)

Результат выполнения (порядок может отличаться):

Процесс Worker-1 запущен, PID: 12345
Главный процесс завершен

Пояснение: создается объект Process с указанием функции target и аргументов args. Метод start() запускает процесс, join() ожидает его завершения. Защита if __name__ == '__main__' необходима для избежания рекурсивного запуска процессов на Windows.

Типичные ошибки:

  • Отсутствие if __name__ == '__main__' приводит к ошибке «RuntimeError: An attempt has been made to start a new process before the current process has finished its bootstrapping phrase».
  • Забытый join() может привести к тому, что главный процесс завершится раньше дочернего, особенно при управлении ресурсами.
  • Глобальные переменные не разделяются между процессами; для обмена данными используются Queue, Pipe или Manager.

Как запустить множество однотипных задач в пуле процессов?

Для параллельного выполнения большого числа независимых задач удобно использовать multiprocessing.Pool. Он автоматически распределяет задачи по фиксированному числу процессов.


from multiprocessing import Pool
import os

def square(n):
    return n * n

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        result = pool.map(square, range(10))
    print(result)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Объяснение: Pool(4) создает 4 процесса. pool.map применяет функцию ко всем элементам и возвращает результаты в порядке вызова. with гарантирует закрытие пула.

Ошибка: при использовании pool.map с большими данными может возникнуть переполнение памяти, если результаты накапливаются в списке. Рекомендуется использовать imap или imap_unordered для потоковой обработки.

Как использовать современный API для пула процессов?

Модуль concurrent.futures предоставляет высокоуровневый интерфейс ProcessPoolExecutor, аналогичный пула потоков.


from concurrent.futures import ProcessPoolExecutor
import os

def cube(x):
    return x**3

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(cube, range(10)))
    print(results)
[0, 1, 8, 27, 64, 125, 216, 343, 512, 729]

Здесь map возвращает итератор, преобразуемый в список. ProcessPoolExecutor автоматически управляет рабочими процессами.

Проблема: сериализация аргументов и возвращаемых значений происходит через pickle. Не все объекты могут быть сериализованы. Решение: использовать только базовые типы или явно передавать необходимые данные через Manager.

Как создать процесс с помощью системного вызова fork?

На Unix-системах доступен низкоуровневый вызов os.fork(). Он создает точную копию текущего процесса (родительский и дочерний) с разными PID.


import os

if __name__ == '__main__':
    pid = os.fork()
    if pid == 0:
        print(f'Дочерний процесс: PID {os.getpid()}, родительский PID {os.getppid()}')
    else:
        print(f'Родительский процесс: PID {os.getpid()}, дочерний PID {pid}')
(пример вывода на Unix)
Родительский процесс: PID 1000, дочерний PID 1001
Дочерний процесс: PID 1001, родительский PID 1000

Важно: fork() не работает на Windows. После fork оба процесса продолжают выполнение с одной точки. Для различения используется возвращаемое значение (0 в дочернем, PID дочернего в родительском).

Ошибки: использование fork в многопоточных приложениях может привести к взаимоблокировкам, так как копируются все потоки. Также сложно синхронизировать доступ к ресурсам. Рекомендуется использовать multiprocessing для более безопасной работы.

Как запустить внешнюю программу как отдельный процесс?

Для выполнения внешних программ (например, утилит командной строки) применяется модуль subprocess. Он создает новый процесс с указанной командой.


import subprocess
result = subprocess.run(['echo', 'Hello from subprocess'], capture_output=True, text=True)
print(result.stdout)
Hello from subprocess

Функция run ожидает завершения процесса. capture_output=True позволяет получить stdout и stderr. Popen предоставляет более гибкое управление.

Проблема: при передаче сложных аргументов необходимо экранировать или использовать список, а не строку. Ошибка: Windows может требовать shell=True для некоторых команд, что несет риски безопасности.

Как создать кастомный процесс с состоянием?

Класс Process можно наследовать и переопределить метод run() для реализации сложной логики с внутренним состоянием.


from multiprocessing import Process

class MyProcess(Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print(f'Процесс {self.name} выполняется')

if __name__ == '__main__':
    p = MyProcess('Test')
    p.start()
    p.join()

Этот подход удобен, когда процесс должен хранить собственные данные или иметь сложный жизненный цикл.

Типичная ошибка: переопределение init без вызова super().__init__() приводит к тому, что процесс не может быть запущен. Также атрибуты экземпляра не передаются в дочерний процесс автоматически; они копируются при старте.

Как выбрать метод запуска процесса (spawn, fork, forkserver)?

Модуль multiprocessing поддерживает три метода: fork (Unix, копия), spawn (Windows по умолчанию, запуск нового интерпретатора), forkserver (Unix, серверный процесс). Выбор влияет на производительность и безопасность.


import multiprocessing as mp

def worker():
    print('Работает')

if __name__ == '__main__':
    mp.set_start_method('spawn')  # или 'fork', 'forkserver'
    p = mp.Process(target=worker)
    p.start()
    p.join()

Метод spawn безопаснее, но медленнее, так как загружает модуль заново. fork быстрее, но может вызывать проблемы с блокировками. forkserver является компромиссом.

Ошибка: set_start_method может быть вызван только один раз в программе. Попытка повторного вызова вызывает RuntimeError. Рекомендуется устанавливать метод в самом начале программы.

Пример 1: Передача данных через Queue

Пример

from multiprocessing import Process, Queue

def producer(queue):
    for i in range(5):
        queue.put(f'Сообщение {i}')
    queue.put(None)

def consumer(queue):
    while True:
        msg = queue.get()
        if msg is None:
            break
        print(f'Получено: {msg}')

if __name__ == '__main__':
    queue = Queue()
    p1 = Process(target=producer, args=(queue,))
    p2 = Process(target=consumer, args=(queue,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print('Готово')
Получено: Сообщение 0
Получено: Сообщение 1
Получено: Сообщение 2
Получено: Сообщение 3
Получено: Сообщение 4
Готово

Queue обеспечивает безопасную передачу объектов между процессами. None используется как сигнал завершения.

Пример 2: Pool с imap_unordered

Пример

from multiprocessing import Pool
import time

def slow_square(n):
    time.sleep(0.5)
    return n * n

if __name__ == '__main__':
    with Pool(2) as pool:
        results = pool.imap_unordered(slow_square, range(5))
        for r in results:
            print(r)
0
1
4
9
16

imap_unordered возвращает результаты по мере готовности, порядок не гарантирован. Полезно для длительных задач.

Пример 3: ProcessPoolExecutor с submit и as_completed

Пример

from concurrent.futures import ProcessPoolExecutor, as_completed
import time

def power(x, y):
    time.sleep(0.2)
    return x ** y

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(power, i, i) for i in range(5)]
        for future in as_completed(futures):
            print(future.result())
1
4
27
256
3125

as_completed позволяет обрабатывать результаты по мере завершения, что удобно при разнородных задачах.

Пример 4: os.fork с разделением файла

Пример

import os

def child_work():
    with open('test.txt', 'a') as f:
        f.write('Дочерний\n')

if __name__ == '__main__':
    pid = os.fork()
    if pid == 0:
        child_work()
        os._exit(0)
    else:
        import time
        time.sleep(0.1)
        with open('test.txt', 'a') as f:
            f.write('Родительский\n')
        os.wait()

Результат: файл test.txt будет содержать две строки (порядок может различаться). После fork оба процесса имеют собственные файловые дескрипторы, но буферизация может привести к неожиданностям. Рекомендуется использовать multiprocessing.

Пример 5: subprocess.Popen с взаимодействием через stdin/stdout

Пример

import subprocess

proc = subprocess.Popen(['python3', '-c', 'import sys; print(sys.stdin.read())'], 
                        stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
stdout, _ = proc.communicate('Привет от родителя')
print(f'Получено: {stdout.strip()}')
Получено: Привет от родителя

communicate передает данные в stdin и читает stdout, ожидая завершения.

Пример 6: Наследование Process с атрибутами

Пример

from multiprocessing import Process
import os

class WorkerProcess(Process):
    def __init__(self, name, data):
        super().__init__()
        self.name = name
        self.data = data

    def run(self):
        print(f'{self.name} обрабатывает {self.data} в PID {os.getpid()}')

if __name__ == '__main__':
    processes = [WorkerProcess(f'Worker-{i}', i) for i in range(3)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
Worker-0 обрабатывает 0 в PID 12346
Worker-1 обрабатывает 1 в PID 12347
Worker-2 обрабатывает 2 в PID 12348

Атрибуты экземпляра копируются в дочерний процесс при запуске.

Создание процессов в Python - comments

En
создание процессов python (python)