Параллельная обработка данных с помощью PHP
Многопоточность позволяет выполнять несколько операций одновременно, увеличивая производительность приложений. В PHP исторически не было встроенной поддержки потоков, но с появлением расширения parallel появилась возможность создавать настоящие потоки. В этой статье рассматриваются различные подходы к параллельному выполнению кода в PHP.
Расширение parallel
Как организовать параллельное выполнение задач в PHP без блокировки основного потока?
Расширение parallel (PECL) предоставляет полноценные потоки для PHP CLI. Оно основано на модели параллельных задач с передачей сериализованных замыканий между потоками. Для использования требуется PHP 7.4+ и установка через pecl install parallel.
<?
use parallel\Runtime;
$runtime = new Runtime();
$future = $runtime->run(function() {
return 42;
});
echo $future->value(); // 42
?>
В примере создается один поток (Runtime), выполняется замыкание, результат возвращается через объект Future. Метод value() блокирует выполнение до получения результата.
Типичные проблемы и решения
- Ошибка установки: требуется pecl install parallel и версия PHP 7.4+.
- Сериализация замыканий: замыкания, захватывающие переменные, должны быть сериализуемы. Передаются только простые типы или объекты, реализующие Serializable.
- Работа только в CLI: расширение не работает в веб-окружении. Используется для фоновых процессов или консольных скриптов.
Как использовать классические потоки в PHP?
Расширение pthreads (ныне заброшено) предоставляло объектно-ориентированный интерфейс для работы с потоками. Оно требовало специальной сборки PHP с ZTS (Zend Thread Safety) и было доступно только в CLI. Пример:
class WorkerThread extends Thread {
public $result;
public function run() {
$this->result = 1;
}
}
$worker = new WorkerThread();
$worker->start();
$worker->join();
echo $worker->result;
Проблемы и решения
- Расширение не обновляется с PHP 7.4, поддержка прекращена.
- Требует сборки PHP с ZTS, что редкость.
- Сложность синхронизации через mutex.
Как создать дочерние процессы для параллельной обработки?
Функция pcntl_fork() создает копию текущего процесса. Родитель и потомок выполняются параллельно, но не имеют общей памяти (все копируется).
$pid = pcntl_fork();
if ($pid == -1) {
die('Ошибка fork');
} elseif ($pid) {
// Родитель
pcntl_wait($status);
} else {
// Потомок
echo "Дочерний процесс\n";
exit(0);
}
Проблемы и решения
- Нет разделяемой памяти: каждый процесс работает со своей копией данных.
- Сложность синхронизации: необходимы семафоры или разделяемая память (shmop).
- Ошибки при завершении дочерних процессов: требуется обработка сигналов.
Как выполнять множество HTTP запросов параллельно?
Функция curl_multi_exec() позволяет обрабатывать несколько cURL-запросов в одном цикле без блокировки.
$mh = curl_multi_init();
$ch1 = curl_init('http://example.com');
$ch2 = curl_init('http://example.org');
curl_multi_add_handle($mh, $ch1);
curl_multi_add_handle($mh, $ch2);
$running = null;
do {
curl_multi_exec($mh, $running);
} while ($running > 0);
curl_multi_remove_handle($mh, $ch1);
curl_multi_remove_handle($mh, $ch2);
curl_multi_close($mh);
Проблемы и решения
- Подходит только для HTTP запросов, не для CPU-задач.
- Управление множеством дескрипторов требует аккуратности.
- Ошибки обработки таймаутов: необходимо проверять через curl_multi_info_read().
Как организовать асинхронное выполнение с event loop?
Библиотека ReactPHP строит асинхронное приложение на основе цикла событий. Код не блокируется, но выполняется в одном процессе.
$loop = React\EventLoop\Loop::get();
$loop->addTimer(1.0, function () {
echo "Таймер сработал\n";
});
$loop->run();
Проблемы и решения
- Требуется перепроектирование приложения под асинхронный стиль.
- Не подходит для CPU-интенсивных расчетов: все операции выполняются в одном потоке.
- Сложность отладки асинхронного кода.
Как использовать корутины для параллелизма?
Расширение Swoole предоставляет корутины (cooperative multitasking) для PHP. Код выглядит синхронным, но внутри переключается между задачами.
Co\run(function() {
go(function() {
echo "Корутина 1\n";
});
go(function() {
echo "Корутина 2\n";
});
});
Проблемы и решения
- Требуется установка расширения swoole.
- Нестандартный синтаксис: вызовы go(), channel и т.д.
- Некоторые блокирующие функции (sleep, file_get_contents) не совместимы с корутинами.
Выбор подхода зависит от задачи: для обработки большого объема данных без блокировок подходит parallel; для асинхронных сетевых запросов ReactPHP; для HTTP-запросов curl_multi; для процессов pcntl; для высокопроизводительных серверов Swoole. Каждый вариант имеет свои ограничения и требует учета особенностей среды.
Расширенные примеры с расширением parallel
Пул потоков для пакетной обработки
Создание пула из 4 потоков для выполнения 10 задач:
<?
use parallel\Runtime;
$tasks = range(1, 10);
$runtimes = [];
$futures = [];
// Создание пула воркеров
for ($i = 0; $i < 4; $i++) {
$runtimes[$i] = new Runtime();
}
// Распределение задач
foreach ($tasks as $task) {
$runtime = $runtimes[array_rand($runtimes)]; // случайное назначение
$futures[] = $runtime->run(function($id) {
sleep(1); // имитация работы
return $id * 2;
}, [$task]);
}
// Сбор результатов
$results = [];
foreach ($futures as $future) {
$results[] = $future->value();
}
print_r($results);
?>
Array
(
[0] => 2
[1] => 4
[2] => 6
[3] => 8
[4] => 10
[5] => 12
[6] => 14
[7] => 16
[8] => 18
[9] => 20
)
Пояснение: каждый поток переиспользуется для нескольких задач. Функция run() принимает замыкание и аргументы. Результаты сохраняются и собираются после завершения. Время выполнения сокращается примерно в 4 раза по сравнению с последовательным выполнением.
Обработка большого массива данных
Разбиение массива на части и параллельная обработка:
<?
use parallel\Runtime;
$data = range(1, 100);
$chunks = array_chunk($data, 25); // 4 части
$runtimes = [];
$futures = [];
foreach ($chunks as $key => $chunk) {
$runtimes[$key] = new Runtime();
$futures[$key] = $runtimes[$key]->run(function($chunk) {
$sum = 0;
foreach ($chunk as $value) {
$sum += $value; // может быть любая ресурсоемкая операция
}
return $sum;
}, [$chunk]);
}
$total = 0;
foreach ($futures as $future) {
$total += $future->value();
}
echo "Сумма: $total\n"; // должно быть 5050
?>
Сумма: 5050
Важно: данные передаются как копия, поэтому большие массивы могут потребовать много памяти. Решение - передавать ссылки или разбивать на более мелкие части.
Обработка ошибок в потоках
Исключения внутри run() пробрасываются в основной поток при вызове value():
<?
use parallel\Runtime;
$runtime = new Runtime();
$future = $runtime->run(function() {
throw new \Exception("Ошибка в потоке");
});
try {
$future->value();
} catch (\Exception $e) {
echo "Поймано исключение: " . $e->getMessage() . "\n";
}
?>
Поймано исключение: Ошибка в потоке
Для обработки ошибок без прерывания всего пула можно внутри run() перехватывать исключения и возвращать код ошибки.
Мультиплексирование с parallel\Events
Класс parallel\Events позволяет ожидать завершение первого из нескольких Future (как select):
<?
use parallel\Runtime;
use parallel\Events;
$r1 = new Runtime();
$r2 = new Runtime();
$f1 = $r1->run(function() { sleep(3); return "A"; });
$f2 = $r2->run(function() { sleep(1); return "B"; });
$events = new Events();
$events->addFuture("task1", $f1);
$events->addFuture("task2", $f2);
$event = $events->poll(); // вернет объект Event
if ($event->type === Events\Event\Type::Success) {
echo $event->name . " -> " . $event->value . "\n";
}
?>
task2 -> B
Это полезно для получения быстрого результата из нескольких параллельных операций.
Передача объектов с автозагрузкой
При использовании классов внутри потоков необходимо обеспечить автозагрузку. Файлы классов должны быть доступны в каждом потоке:
<?
use parallel\Runtime;
// Файл MyClass.php
class MyClass {
public function greet() { return "Hello"; }
}
$runtime = new Runtime();
$future = $runtime->run(function() {
require_once 'MyClass.php'; // подгрузка внутри потока
$obj = new MyClass();
return $obj->greet();
});
echo $future->value();
?>
Hello
Для автозагрузки можно использовать spl_autoload_register, но лучше передавать только простые данные или использовать Phar-архивы.