Параллельная обработка данных с помощью PHP

Раздел: Разработка на PHP -> Многопоточность

Многопоточность позволяет выполнять несколько операций одновременно, увеличивая производительность приложений. В PHP исторически не было встроенной поддержки потоков, но с появлением расширения parallel появилась возможность создавать настоящие потоки. В этой статье рассматриваются различные подходы к параллельному выполнению кода в PHP.

Расширение parallel

Как организовать параллельное выполнение задач в PHP без блокировки основного потока?

Расширение parallel (PECL) предоставляет полноценные потоки для PHP CLI. Оно основано на модели параллельных задач с передачей сериализованных замыканий между потоками. Для использования требуется PHP 7.4+ и установка через pecl install parallel.


<?
use parallel\Runtime;

$runtime = new Runtime();
$future = $runtime->run(function() {
    return 42;
});
echo $future->value(); // 42
?>
  

В примере создается один поток (Runtime), выполняется замыкание, результат возвращается через объект Future. Метод value() блокирует выполнение до получения результата.

Типичные проблемы и решения

  • Ошибка установки: требуется pecl install parallel и версия PHP 7.4+.
  • Сериализация замыканий: замыкания, захватывающие переменные, должны быть сериализуемы. Передаются только простые типы или объекты, реализующие Serializable.
  • Работа только в CLI: расширение не работает в веб-окружении. Используется для фоновых процессов или консольных скриптов.

Как использовать классические потоки в PHP?

Расширение pthreads (ныне заброшено) предоставляло объектно-ориентированный интерфейс для работы с потоками. Оно требовало специальной сборки PHP с ZTS (Zend Thread Safety) и было доступно только в CLI. Пример:


class WorkerThread extends Thread {
    public $result;
    public function run() {
        $this->result = 1;
    }
}
$worker = new WorkerThread();
$worker->start();
$worker->join();
echo $worker->result;
  

Проблемы и решения

  • Расширение не обновляется с PHP 7.4, поддержка прекращена.
  • Требует сборки PHP с ZTS, что редкость.
  • Сложность синхронизации через mutex.

Как создать дочерние процессы для параллельной обработки?

Функция pcntl_fork() создает копию текущего процесса. Родитель и потомок выполняются параллельно, но не имеют общей памяти (все копируется).


$pid = pcntl_fork();
if ($pid == -1) {
    die('Ошибка fork');
} elseif ($pid) {
    // Родитель
    pcntl_wait($status);
} else {
    // Потомок
    echo "Дочерний процесс\n";
    exit(0);
}
  

Проблемы и решения

  • Нет разделяемой памяти: каждый процесс работает со своей копией данных.
  • Сложность синхронизации: необходимы семафоры или разделяемая память (shmop).
  • Ошибки при завершении дочерних процессов: требуется обработка сигналов.

Как выполнять множество HTTP запросов параллельно?

Функция curl_multi_exec() позволяет обрабатывать несколько cURL-запросов в одном цикле без блокировки.


$mh = curl_multi_init();
$ch1 = curl_init('http://example.com');
$ch2 = curl_init('http://example.org');
curl_multi_add_handle($mh, $ch1);
curl_multi_add_handle($mh, $ch2);
$running = null;
do {
    curl_multi_exec($mh, $running);
} while ($running > 0);
curl_multi_remove_handle($mh, $ch1);
curl_multi_remove_handle($mh, $ch2);
curl_multi_close($mh);
  

Проблемы и решения

  • Подходит только для HTTP запросов, не для CPU-задач.
  • Управление множеством дескрипторов требует аккуратности.
  • Ошибки обработки таймаутов: необходимо проверять через curl_multi_info_read().

Как организовать асинхронное выполнение с event loop?

Библиотека ReactPHP строит асинхронное приложение на основе цикла событий. Код не блокируется, но выполняется в одном процессе.


$loop = React\EventLoop\Loop::get();
$loop->addTimer(1.0, function () {
    echo "Таймер сработал\n";
});
$loop->run();
  

Проблемы и решения

  • Требуется перепроектирование приложения под асинхронный стиль.
  • Не подходит для CPU-интенсивных расчетов: все операции выполняются в одном потоке.
  • Сложность отладки асинхронного кода.

Как использовать корутины для параллелизма?

Расширение Swoole предоставляет корутины (cooperative multitasking) для PHP. Код выглядит синхронным, но внутри переключается между задачами.


Co\run(function() {
    go(function() {
        echo "Корутина 1\n";
    });
    go(function() {
        echo "Корутина 2\n";
    });
});
  

Проблемы и решения

  • Требуется установка расширения swoole.
  • Нестандартный синтаксис: вызовы go(), channel и т.д.
  • Некоторые блокирующие функции (sleep, file_get_contents) не совместимы с корутинами.

Выбор подхода зависит от задачи: для обработки большого объема данных без блокировок подходит parallel; для асинхронных сетевых запросов ReactPHP; для HTTP-запросов curl_multi; для процессов pcntl; для высокопроизводительных серверов Swoole. Каждый вариант имеет свои ограничения и требует учета особенностей среды.

Расширенные примеры с расширением parallel

Пул потоков для пакетной обработки

Создание пула из 4 потоков для выполнения 10 задач:

Пример

<?
use parallel\Runtime;

$tasks = range(1, 10);
$runtimes = [];
$futures = [];

// Создание пула воркеров
for ($i = 0; $i < 4; $i++) {
    $runtimes[$i] = new Runtime();
}

// Распределение задач
foreach ($tasks as $task) {
    $runtime = $runtimes[array_rand($runtimes)]; // случайное назначение
    $futures[] = $runtime->run(function($id) {
        sleep(1); // имитация работы
        return $id * 2;
    }, [$task]);
}

// Сбор результатов
$results = [];
foreach ($futures as $future) {
    $results[] = $future->value();
}

print_r($results);
?>
Array
(
    [0] => 2
    [1] => 4
    [2] => 6
    [3] => 8
    [4] => 10
    [5] => 12
    [6] => 14
    [7] => 16
    [8] => 18
    [9] => 20
)

Пояснение: каждый поток переиспользуется для нескольких задач. Функция run() принимает замыкание и аргументы. Результаты сохраняются и собираются после завершения. Время выполнения сокращается примерно в 4 раза по сравнению с последовательным выполнением.

Обработка большого массива данных

Разбиение массива на части и параллельная обработка:

Пример

<?
use parallel\Runtime;

$data = range(1, 100);
$chunks = array_chunk($data, 25); // 4 части

$runtimes = [];
$futures = [];

foreach ($chunks as $key => $chunk) {
    $runtimes[$key] = new Runtime();
    $futures[$key] = $runtimes[$key]->run(function($chunk) {
        $sum = 0;
        foreach ($chunk as $value) {
            $sum += $value; // может быть любая ресурсоемкая операция
        }
        return $sum;
    }, [$chunk]);
}

$total = 0;
foreach ($futures as $future) {
    $total += $future->value();
}

echo "Сумма: $total\n"; // должно быть 5050
?>
Сумма: 5050

Важно: данные передаются как копия, поэтому большие массивы могут потребовать много памяти. Решение - передавать ссылки или разбивать на более мелкие части.

Обработка ошибок в потоках

Исключения внутри run() пробрасываются в основной поток при вызове value():

Пример

<?
use parallel\Runtime;

$runtime = new Runtime();
$future = $runtime->run(function() {
    throw new \Exception("Ошибка в потоке");
});

try {
    $future->value();
} catch (\Exception $e) {
    echo "Поймано исключение: " . $e->getMessage() . "\n";
}
?>
Поймано исключение: Ошибка в потоке

Для обработки ошибок без прерывания всего пула можно внутри run() перехватывать исключения и возвращать код ошибки.

Мультиплексирование с parallel\Events

Класс parallel\Events позволяет ожидать завершение первого из нескольких Future (как select):

Пример

<?
use parallel\Runtime;
use parallel\Events;

$r1 = new Runtime();
$r2 = new Runtime();

$f1 = $r1->run(function() { sleep(3); return "A"; });
$f2 = $r2->run(function() { sleep(1); return "B"; });

$events = new Events();
$events->addFuture("task1", $f1);
$events->addFuture("task2", $f2);

$event = $events->poll(); // вернет объект Event
if ($event->type === Events\Event\Type::Success) {
    echo $event->name . " -> " . $event->value . "\n";
}
?>
task2 -> B

Это полезно для получения быстрого результата из нескольких параллельных операций.

Передача объектов с автозагрузкой

При использовании классов внутри потоков необходимо обеспечить автозагрузку. Файлы классов должны быть доступны в каждом потоке:

Пример

<?
use parallel\Runtime;

// Файл MyClass.php
class MyClass {
    public function greet() { return "Hello"; }
}

$runtime = new Runtime();
$future = $runtime->run(function() {
    require_once 'MyClass.php'; // подгрузка внутри потока
    $obj = new MyClass();
    return $obj->greet();
});
echo $future->value();
?>
Hello

Для автозагрузки можно использовать spl_autoload_register, но лучше передавать только простые данные или использовать Phar-архивы.

Многопоточность в PHP - comments

En
Php threads (php)